reifydb_sub_flow/transaction/
state.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	EncodedKey, EncodedKeyRange,
6	interface::{BoxedMultiVersionIter, FlowNodeId},
7	key::{EncodableKey, FlowNodeStateKey},
8	value::encoded::{EncodedValues, EncodedValuesLayout},
9};
10use tracing::instrument;
11
12use super::FlowTransaction;
13
14impl FlowTransaction {
15	/// Get state for a specific flow node and key
16	#[instrument(level = "trace", skip(self), fields(
17		node_id = id.0,
18		key_len = key.as_bytes().len(),
19		found
20	))]
21	pub fn state_get(&mut self, id: FlowNodeId, key: &EncodedKey) -> crate::Result<Option<EncodedValues>> {
22		self.metrics.increment_state_operations();
23		let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
24		let encoded_key = state_key.encode();
25		let result = self.get(&encoded_key)?;
26		tracing::Span::current().record("found", result.is_some());
27		Ok(result)
28	}
29
30	/// Set state for a specific flow node and key
31	#[instrument(level = "trace", skip(self, value), fields(
32		node_id = id.0,
33		key_len = key.as_bytes().len(),
34		value_len = value.as_ref().len()
35	))]
36	pub fn state_set(&mut self, id: FlowNodeId, key: &EncodedKey, value: EncodedValues) -> crate::Result<()> {
37		self.metrics.increment_state_operations();
38		let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
39		let encoded_key = state_key.encode();
40		self.set(&encoded_key, value)
41	}
42
43	/// Remove state for a specific flow node and key
44	#[instrument(level = "trace", skip(self), fields(
45		node_id = id.0,
46		key_len = key.as_bytes().len()
47	))]
48	pub fn state_remove(&mut self, id: FlowNodeId, key: &EncodedKey) -> crate::Result<()> {
49		self.metrics.increment_state_operations();
50		let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
51		let encoded_key = state_key.encode();
52		self.remove(&encoded_key)
53	}
54
55	/// Scan all state for a specific flow node
56	#[instrument(level = "debug", skip(self), fields(
57		node_id = id.0
58	))]
59	pub fn state_scan(&mut self, id: FlowNodeId) -> crate::Result<BoxedMultiVersionIter<'_>> {
60		self.metrics.increment_state_operations();
61		let range = FlowNodeStateKey::node_range(id);
62		self.range(range)
63	}
64
65	/// Range query on state for a specific flow node
66	#[instrument(level = "debug", skip(self, range), fields(
67		node_id = id.0
68	))]
69	pub fn state_range(
70		&mut self,
71		id: FlowNodeId,
72		range: EncodedKeyRange,
73	) -> crate::Result<BoxedMultiVersionIter<'_>> {
74		self.metrics.increment_state_operations();
75		let prefixed_range = range.with_prefix(FlowNodeStateKey::new(id, vec![]).encode());
76		self.range(prefixed_range)
77	}
78
79	/// Clear all state for a specific flow node
80	#[instrument(level = "debug", skip(self), fields(
81		node_id = id.0,
82		removed_count
83	))]
84	pub fn state_clear(&mut self, id: FlowNodeId) -> crate::Result<()> {
85		self.metrics.increment_state_operations();
86		let range = FlowNodeStateKey::node_range(id);
87		let keys_to_remove: Vec<_> = self.range(range)?.map(|multi| multi.key).collect();
88
89		let count = keys_to_remove.len();
90		for key in keys_to_remove {
91			self.remove(&key)?;
92		}
93
94		tracing::Span::current().record("removed_count", count);
95		Ok(())
96	}
97
98	/// Load state for a key, creating if not exists
99	#[instrument(level = "debug", skip(self, layout), fields(
100		node_id = id.0,
101		key_len = key.as_bytes().len(),
102		created
103	))]
104	pub fn load_or_create_row(
105		&mut self,
106		id: FlowNodeId,
107		key: &EncodedKey,
108		layout: &EncodedValuesLayout,
109	) -> crate::Result<EncodedValues> {
110		match self.state_get(id, key)? {
111			Some(row) => {
112				tracing::Span::current().record("created", false);
113				Ok(row)
114			}
115			None => {
116				tracing::Span::current().record("created", true);
117				Ok(layout.allocate())
118			}
119		}
120	}
121
122	/// Save state encoded
123	#[instrument(level = "trace", skip(self, row), fields(
124		node_id = id.0,
125		key_len = key.as_bytes().len()
126	))]
127	pub fn save_row(&mut self, id: FlowNodeId, key: &EncodedKey, row: EncodedValues) -> crate::Result<()> {
128		self.state_set(id, key, row)
129	}
130}
131
132#[cfg(test)]
133mod tests {
134	use reifydb_core::{
135		CommitVersion, CowVec, EncodedKey, EncodedKeyRange, interface::FlowNodeId,
136		value::encoded::EncodedValues,
137	};
138	use reifydb_type::Type;
139
140	use super::*;
141	use crate::operator::stateful::test_utils::test::create_test_transaction;
142
143	fn make_key(s: &str) -> EncodedKey {
144		EncodedKey::new(s.as_bytes().to_vec())
145	}
146
147	fn make_value(s: &str) -> EncodedValues {
148		EncodedValues(CowVec::new(s.as_bytes().to_vec()))
149	}
150
151	#[test]
152	fn test_state_get_set() {
153		let parent = create_test_transaction();
154		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
155
156		let node_id = FlowNodeId(1);
157		let key = make_key("state_key");
158		let value = make_value("state_value");
159
160		// Set state
161		txn.state_set(node_id, &key, value.clone()).unwrap();
162
163		// Get state back
164		let result = txn.state_get(node_id, &key).unwrap();
165		assert_eq!(result, Some(value));
166	}
167
168	#[test]
169	fn test_state_get_nonexistent() {
170		let parent = create_test_transaction();
171		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
172
173		let node_id = FlowNodeId(1);
174		let key = make_key("missing");
175
176		let result = txn.state_get(node_id, &key).unwrap();
177		assert_eq!(result, None);
178	}
179
180	#[test]
181	fn test_state_remove() {
182		let parent = create_test_transaction();
183		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
184
185		let node_id = FlowNodeId(1);
186		let key = make_key("state_key");
187		let value = make_value("state_value");
188
189		// Set then remove
190		txn.state_set(node_id, &key, value.clone()).unwrap();
191		assert_eq!(txn.state_get(node_id, &key).unwrap(), Some(value));
192
193		txn.state_remove(node_id, &key).unwrap();
194		assert_eq!(txn.state_get(node_id, &key).unwrap(), None);
195	}
196
197	#[test]
198	fn test_state_isolation_between_nodes() {
199		let parent = create_test_transaction();
200		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
201
202		let node1 = FlowNodeId(1);
203		let node2 = FlowNodeId(2);
204		let key = make_key("same_key");
205
206		txn.state_set(node1, &key, make_value("node1_value")).unwrap();
207		txn.state_set(node2, &key, make_value("node2_value")).unwrap();
208
209		// Each node should have its own value
210		assert_eq!(txn.state_get(node1, &key).unwrap(), Some(make_value("node1_value")));
211		assert_eq!(txn.state_get(node2, &key).unwrap(), Some(make_value("node2_value")));
212	}
213
214	#[test]
215	fn test_state_scan() {
216		let parent = create_test_transaction();
217		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
218
219		let node_id = FlowNodeId(1);
220
221		txn.state_set(node_id, &make_key("key1"), make_value("value1")).unwrap();
222		txn.state_set(node_id, &make_key("key2"), make_value("value2")).unwrap();
223		txn.state_set(node_id, &make_key("key3"), make_value("value3")).unwrap();
224
225		let mut iter = txn.state_scan(node_id).unwrap();
226		let items: Vec<_> = iter.by_ref().collect();
227
228		assert_eq!(items.len(), 3);
229	}
230
231	#[test]
232	fn test_state_scan_only_own_node() {
233		let parent = create_test_transaction();
234		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
235
236		let node1 = FlowNodeId(1);
237		let node2 = FlowNodeId(2);
238
239		txn.state_set(node1, &make_key("key1"), make_value("value1")).unwrap();
240		txn.state_set(node1, &make_key("key2"), make_value("value2")).unwrap();
241		txn.state_set(node2, &make_key("key3"), make_value("value3")).unwrap();
242
243		// Scan node1 should only return node1's state
244		let items: Vec<_> = txn.state_scan(node1).unwrap().collect();
245		assert_eq!(items.len(), 2);
246
247		// Scan node2 should only return node2's state
248		let items: Vec<_> = txn.state_scan(node2).unwrap().collect();
249		assert_eq!(items.len(), 1);
250	}
251
252	#[test]
253	fn test_state_scan_empty() {
254		let parent = create_test_transaction();
255		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
256
257		let node_id = FlowNodeId(1);
258
259		let mut iter = txn.state_scan(node_id).unwrap();
260		assert!(iter.next().is_none());
261	}
262
263	#[test]
264	fn test_state_range() {
265		let parent = create_test_transaction();
266		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
267
268		let node_id = FlowNodeId(1);
269
270		txn.state_set(node_id, &make_key("a"), make_value("1")).unwrap();
271		txn.state_set(node_id, &make_key("b"), make_value("2")).unwrap();
272		txn.state_set(node_id, &make_key("c"), make_value("3")).unwrap();
273		txn.state_set(node_id, &make_key("d"), make_value("4")).unwrap();
274
275		// Range query from "b" to "d" (exclusive)
276		use std::collections::Bound;
277		let range = EncodedKeyRange::new(Bound::Included(make_key("b")), Bound::Excluded(make_key("d")));
278		let mut iter = txn.state_range(node_id, range).unwrap();
279		let items: Vec<_> = iter.by_ref().collect();
280
281		// Should only include "b" and "c"
282		assert_eq!(items.len(), 2);
283	}
284
285	#[test]
286	fn test_state_clear() {
287		let parent = create_test_transaction();
288		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
289
290		let node_id = FlowNodeId(1);
291
292		txn.state_set(node_id, &make_key("key1"), make_value("value1")).unwrap();
293		txn.state_set(node_id, &make_key("key2"), make_value("value2")).unwrap();
294		txn.state_set(node_id, &make_key("key3"), make_value("value3")).unwrap();
295
296		// Verify state exists
297		assert_eq!(txn.state_scan(node_id).unwrap().count(), 3);
298
299		// Clear all state
300		txn.state_clear(node_id).unwrap();
301
302		// Verify state is empty
303		assert_eq!(txn.state_scan(node_id).unwrap().count(), 0);
304	}
305
306	#[test]
307	fn test_state_clear_only_own_node() {
308		let parent = create_test_transaction();
309		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
310
311		let node1 = FlowNodeId(1);
312		let node2 = FlowNodeId(2);
313
314		txn.state_set(node1, &make_key("key1"), make_value("value1")).unwrap();
315		txn.state_set(node1, &make_key("key2"), make_value("value2")).unwrap();
316		txn.state_set(node2, &make_key("key3"), make_value("value3")).unwrap();
317
318		// Clear node1
319		txn.state_clear(node1).unwrap();
320
321		// Node1 should be empty
322		assert_eq!(txn.state_scan(node1).unwrap().count(), 0);
323
324		// Node2 should still have state
325		assert_eq!(txn.state_scan(node2).unwrap().count(), 1);
326	}
327
328	#[test]
329	fn test_state_clear_empty_node() {
330		let parent = create_test_transaction();
331		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
332
333		let node_id = FlowNodeId(1);
334
335		// Clear on empty node should not error
336		txn.state_clear(node_id).unwrap();
337	}
338
339	#[test]
340	fn test_load_or_create_existing() {
341		let parent = create_test_transaction();
342		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
343
344		let node_id = FlowNodeId(1);
345		let key = make_key("key1");
346		let value = make_value("existing");
347		let layout = EncodedValuesLayout::new(&[Type::Int8, Type::Float8]);
348
349		// Set existing state
350		txn.state_set(node_id, &key, value.clone()).unwrap();
351
352		// load_or_create should return existing value
353		let result = txn.load_or_create_row(node_id, &key, &layout).unwrap();
354		assert_eq!(result, value);
355	}
356
357	#[test]
358	fn test_load_or_create_new() {
359		let parent = create_test_transaction();
360		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
361
362		let node_id = FlowNodeId(1);
363		let key = make_key("key1");
364		let layout = EncodedValuesLayout::new(&[Type::Int8, Type::Float8]);
365
366		// load_or_create should allocate new row
367		let result = txn.load_or_create_row(node_id, &key, &layout).unwrap();
368
369		// Result should be a newly allocated row (layout.allocate())
370		assert!(!result.as_ref().is_empty());
371	}
372
373	#[test]
374	fn test_save_row() {
375		let parent = create_test_transaction();
376		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
377
378		let node_id = FlowNodeId(1);
379		let key = make_key("key1");
380		let row = make_value("row_data");
381
382		txn.save_row(node_id, &key, row.clone()).unwrap();
383
384		// Verify saved
385		let result = txn.state_get(node_id, &key).unwrap();
386		assert_eq!(result, Some(row));
387	}
388
389	#[test]
390	fn test_state_operations_increment_metrics() {
391		let parent = create_test_transaction();
392		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
393
394		let node_id = FlowNodeId(1);
395		let key = make_key("key1");
396
397		assert_eq!(txn.metrics().state_operations, 0);
398
399		txn.state_set(node_id, &key, make_value("value")).unwrap();
400		assert_eq!(txn.metrics().state_operations, 1);
401
402		txn.state_get(node_id, &key).unwrap();
403		assert_eq!(txn.metrics().state_operations, 2);
404
405		txn.state_remove(node_id, &key).unwrap();
406		assert_eq!(txn.metrics().state_operations, 3);
407
408		let _ = txn.state_scan(node_id).unwrap();
409		assert_eq!(txn.metrics().state_operations, 4);
410
411		let range = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
412		let _ = txn.state_range(node_id, range).unwrap();
413		assert_eq!(txn.metrics().state_operations, 5);
414
415		txn.state_clear(node_id).unwrap();
416		// state_clear calls state_scan internally, so it increments by 2 (one for clear, one for the range
417		// scan)
418		assert!(txn.metrics().state_operations >= 6);
419	}
420
421	#[test]
422	fn test_state_multiple_nodes() {
423		let parent = create_test_transaction();
424		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
425
426		let node1 = FlowNodeId(1);
427		let node2 = FlowNodeId(2);
428		let node3 = FlowNodeId(3);
429
430		txn.state_set(node1, &make_key("a"), make_value("n1_a")).unwrap();
431		txn.state_set(node1, &make_key("b"), make_value("n1_b")).unwrap();
432		txn.state_set(node2, &make_key("a"), make_value("n2_a")).unwrap();
433		txn.state_set(node3, &make_key("c"), make_value("n3_c")).unwrap();
434
435		// Verify each node has correct state
436		assert_eq!(txn.state_get(node1, &make_key("a")).unwrap(), Some(make_value("n1_a")));
437		assert_eq!(txn.state_get(node1, &make_key("b")).unwrap(), Some(make_value("n1_b")));
438		assert_eq!(txn.state_get(node2, &make_key("a")).unwrap(), Some(make_value("n2_a")));
439		assert_eq!(txn.state_get(node3, &make_key("c")).unwrap(), Some(make_value("n3_c")));
440
441		// Cross-node keys should not exist
442		assert_eq!(txn.state_get(node2, &make_key("b")).unwrap(), None);
443		assert_eq!(txn.state_get(node3, &make_key("a")).unwrap(), None);
444	}
445
446	#[test]
447	fn test_load_or_create_increments_state_operations() {
448		let parent = create_test_transaction();
449		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
450
451		let node_id = FlowNodeId(1);
452		let key = make_key("key1");
453		let layout = EncodedValuesLayout::new(&[Type::Int8]);
454
455		let initial_count = txn.metrics().state_operations;
456
457		txn.load_or_create_row(node_id, &key, &layout).unwrap();
458
459		// load_or_create calls state_get internally
460		assert!(txn.metrics().state_operations > initial_count);
461	}
462
463	#[test]
464	fn test_save_row_increments_state_operations() {
465		let parent = create_test_transaction();
466		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
467
468		let node_id = FlowNodeId(1);
469		let key = make_key("key1");
470
471		let initial_count = txn.metrics().state_operations;
472
473		txn.save_row(node_id, &key, make_value("data")).unwrap();
474
475		// save_row calls state_set internally
476		assert!(txn.metrics().state_operations > initial_count);
477	}
478}