reifydb_sub_flow/transaction/
state.rs

// Copyright (c) reifydb.com 2025
// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	EncodedKey, EncodedKeyRange,
6	interface::{BoxedMultiVersionIter, FlowNodeId},
7	key::{EncodableKey, FlowNodeStateKey},
8	value::encoded::{EncodedValues, EncodedValuesLayout},
9};
10
11use super::FlowTransaction;
12
13impl FlowTransaction {
14	/// Get state for a specific flow node and key
15	pub fn state_get(&mut self, id: FlowNodeId, key: &EncodedKey) -> crate::Result<Option<EncodedValues>> {
16		self.metrics.increment_state_operations();
17		let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
18		let encoded_key = state_key.encode();
19		self.get(&encoded_key)
20	}
21
22	/// Set state for a specific flow node and key
23	pub fn state_set(&mut self, id: FlowNodeId, key: &EncodedKey, value: EncodedValues) -> crate::Result<()> {
24		self.metrics.increment_state_operations();
25		let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
26		let encoded_key = state_key.encode();
27		self.set(&encoded_key, value)
28	}
29
30	/// Remove state for a specific flow node and key
31	pub fn state_remove(&mut self, id: FlowNodeId, key: &EncodedKey) -> crate::Result<()> {
32		self.metrics.increment_state_operations();
33		let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
34		let encoded_key = state_key.encode();
35		self.remove(&encoded_key)
36	}
37
38	/// Scan all state for a specific flow node
39	pub fn state_scan(&mut self, id: FlowNodeId) -> crate::Result<BoxedMultiVersionIter> {
40		self.metrics.increment_state_operations();
41		let range = FlowNodeStateKey::node_range(id);
42		self.range(range)
43	}
44
45	/// Range query on state for a specific flow node
46	pub fn state_range(&mut self, id: FlowNodeId, range: EncodedKeyRange) -> crate::Result<BoxedMultiVersionIter> {
47		self.metrics.increment_state_operations();
48		let prefixed_range = range.with_prefix(FlowNodeStateKey::new(id, vec![]).encode());
49		self.range(prefixed_range)
50	}
51
52	/// Clear all state for a specific flow node
53	pub fn state_clear(&mut self, id: FlowNodeId) -> crate::Result<()> {
54		self.metrics.increment_state_operations();
55		let range = FlowNodeStateKey::node_range(id);
56		let keys_to_remove: Vec<_> = self.range(range)?.map(|multi| multi.key).collect();
57
58		for key in keys_to_remove {
59			self.remove(&key)?;
60		}
61
62		Ok(())
63	}
64
65	/// Load state for a key, creating if not exists
66	pub fn load_or_create_row(
67		&mut self,
68		id: FlowNodeId,
69		key: &EncodedKey,
70		layout: &EncodedValuesLayout,
71	) -> crate::Result<EncodedValues> {
72		match self.state_get(id, key)? {
73			Some(row) => Ok(row),
74			None => Ok(layout.allocate()),
75		}
76	}
77
78	/// Save state encoded
79	pub fn save_row(&mut self, id: FlowNodeId, key: &EncodedKey, row: EncodedValues) -> crate::Result<()> {
80		self.state_set(id, key, row)
81	}
82}
83
#[cfg(test)]
mod tests {
	use reifydb_core::{
		CommitVersion, CowVec, EncodedKey, EncodedKeyRange, interface::FlowNodeId,
		value::encoded::EncodedValues,
	};
	use reifydb_type::Type;

	use super::*;
	use crate::operator::stateful::test_utils::test::create_test_transaction;

	/// Builds an encoded key from the UTF-8 bytes of `s`.
	fn make_key(s: &str) -> EncodedKey {
		EncodedKey::new(s.as_bytes().to_vec())
	}

	/// Builds an encoded value whose payload is the UTF-8 bytes of `s`.
	fn make_value(s: &str) -> EncodedValues {
		EncodedValues(CowVec::new(s.as_bytes().to_vec()))
	}

	/// A value written with `state_set` is returned by `state_get`.
	#[test]
	fn test_state_get_set() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let node_id = FlowNodeId(1);
		let key = make_key("state_key");
		let value = make_value("state_value");

		txn.state_set(node_id, &key, value.clone()).unwrap();

		assert_eq!(txn.state_get(node_id, &key).unwrap(), Some(value));
	}

	/// Reading a key that was never written yields `None`.
	#[test]
	fn test_state_get_nonexistent() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let node_id = FlowNodeId(1);
		let key = make_key("missing");

		assert_eq!(txn.state_get(node_id, &key).unwrap(), None);
	}

	/// A removed key is no longer visible to `state_get`.
	#[test]
	fn test_state_remove() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let node_id = FlowNodeId(1);
		let key = make_key("state_key");
		let value = make_value("state_value");

		// Set then remove
		txn.state_set(node_id, &key, value.clone()).unwrap();
		assert_eq!(txn.state_get(node_id, &key).unwrap(), Some(value));

		txn.state_remove(node_id, &key).unwrap();
		assert_eq!(txn.state_get(node_id, &key).unwrap(), None);
	}

	/// The same key used by two nodes maps to two independent values.
	#[test]
	fn test_state_isolation_between_nodes() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let node1 = FlowNodeId(1);
		let node2 = FlowNodeId(2);
		let key = make_key("same_key");

		txn.state_set(node1, &key, make_value("node1_value")).unwrap();
		txn.state_set(node2, &key, make_value("node2_value")).unwrap();

		// Each node should have its own value
		assert_eq!(txn.state_get(node1, &key).unwrap(), Some(make_value("node1_value")));
		assert_eq!(txn.state_get(node2, &key).unwrap(), Some(make_value("node2_value")));
	}

	/// `state_scan` returns every entry written for the node.
	#[test]
	fn test_state_scan() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let node_id = FlowNodeId(1);

		txn.state_set(node_id, &make_key("key1"), make_value("value1")).unwrap();
		txn.state_set(node_id, &make_key("key2"), make_value("value2")).unwrap();
		txn.state_set(node_id, &make_key("key3"), make_value("value3")).unwrap();

		assert_eq!(txn.state_scan(node_id).unwrap().count(), 3);
	}

	/// `state_scan` never returns entries that belong to another node.
	#[test]
	fn test_state_scan_only_own_node() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let node1 = FlowNodeId(1);
		let node2 = FlowNodeId(2);

		txn.state_set(node1, &make_key("key1"), make_value("value1")).unwrap();
		txn.state_set(node1, &make_key("key2"), make_value("value2")).unwrap();
		txn.state_set(node2, &make_key("key3"), make_value("value3")).unwrap();

		// Scan node1 should only return node1's state
		assert_eq!(txn.state_scan(node1).unwrap().count(), 2);

		// Scan node2 should only return node2's state
		assert_eq!(txn.state_scan(node2).unwrap().count(), 1);
	}

	/// Scanning a node without any state yields an empty iterator.
	#[test]
	fn test_state_scan_empty() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let node_id = FlowNodeId(1);

		assert!(txn.state_scan(node_id).unwrap().next().is_none());
	}

	/// `state_range` honours the inclusive start and exclusive end of the caller's range.
	#[test]
	fn test_state_range() {
		use std::collections::Bound;

		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let node_id = FlowNodeId(1);

		txn.state_set(node_id, &make_key("a"), make_value("1")).unwrap();
		txn.state_set(node_id, &make_key("b"), make_value("2")).unwrap();
		txn.state_set(node_id, &make_key("c"), make_value("3")).unwrap();
		txn.state_set(node_id, &make_key("d"), make_value("4")).unwrap();

		// Range query from "b" to "d" (exclusive)
		let range = EncodedKeyRange::new(Bound::Included(make_key("b")), Bound::Excluded(make_key("d")));

		// Should only include "b" and "c"
		assert_eq!(txn.state_range(node_id, range).unwrap().count(), 2);
	}

	/// `state_clear` removes every entry of the node.
	#[test]
	fn test_state_clear() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let node_id = FlowNodeId(1);

		txn.state_set(node_id, &make_key("key1"), make_value("value1")).unwrap();
		txn.state_set(node_id, &make_key("key2"), make_value("value2")).unwrap();
		txn.state_set(node_id, &make_key("key3"), make_value("value3")).unwrap();

		// Verify state exists
		assert_eq!(txn.state_scan(node_id).unwrap().count(), 3);

		// Clear all state
		txn.state_clear(node_id).unwrap();

		// Verify state is empty
		assert_eq!(txn.state_scan(node_id).unwrap().count(), 0);
	}

	/// `state_clear` leaves the state of other nodes untouched.
	#[test]
	fn test_state_clear_only_own_node() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let node1 = FlowNodeId(1);
		let node2 = FlowNodeId(2);

		txn.state_set(node1, &make_key("key1"), make_value("value1")).unwrap();
		txn.state_set(node1, &make_key("key2"), make_value("value2")).unwrap();
		txn.state_set(node2, &make_key("key3"), make_value("value3")).unwrap();

		// Clear node1
		txn.state_clear(node1).unwrap();

		// Node1 should be empty
		assert_eq!(txn.state_scan(node1).unwrap().count(), 0);

		// Node2 should still have state
		assert_eq!(txn.state_scan(node2).unwrap().count(), 1);
	}

	/// Clearing a node that has no state succeeds.
	#[test]
	fn test_state_clear_empty_node() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let node_id = FlowNodeId(1);

		// Clear on empty node should not error
		txn.state_clear(node_id).unwrap();
	}

	/// `load_or_create_row` returns the stored row when one exists.
	#[test]
	fn test_load_or_create_existing() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let node_id = FlowNodeId(1);
		let key = make_key("key1");
		let value = make_value("existing");
		let layout = EncodedValuesLayout::new(&[Type::Int8, Type::Float8]);

		// Set existing state
		txn.state_set(node_id, &key, value.clone()).unwrap();

		// load_or_create should return existing value
		let result = txn.load_or_create_row(node_id, &key, &layout).unwrap();
		assert_eq!(result, value);
	}

	/// `load_or_create_row` allocates a row from the layout when nothing is stored.
	#[test]
	fn test_load_or_create_new() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let node_id = FlowNodeId(1);
		let key = make_key("key1");
		let layout = EncodedValuesLayout::new(&[Type::Int8, Type::Float8]);

		// load_or_create should allocate new row
		let result = txn.load_or_create_row(node_id, &key, &layout).unwrap();

		// Result should be a newly allocated row (layout.allocate())
		assert!(!result.as_ref().is_empty());
	}

	/// A row written with `save_row` is visible through `state_get`.
	#[test]
	fn test_save_row() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let node_id = FlowNodeId(1);
		let key = make_key("key1");
		let row = make_value("row_data");

		txn.save_row(node_id, &key, row.clone()).unwrap();

		// Verify saved
		assert_eq!(txn.state_get(node_id, &key).unwrap(), Some(row));
	}

	/// Each public state operation bumps the `state_operations` metric exactly once.
	#[test]
	fn test_state_operations_increment_metrics() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let node_id = FlowNodeId(1);
		let key = make_key("key1");

		assert_eq!(txn.metrics().state_operations, 0);

		txn.state_set(node_id, &key, make_value("value")).unwrap();
		assert_eq!(txn.metrics().state_operations, 1);

		txn.state_get(node_id, &key).unwrap();
		assert_eq!(txn.metrics().state_operations, 2);

		txn.state_remove(node_id, &key).unwrap();
		assert_eq!(txn.metrics().state_operations, 3);

		let _ = txn.state_scan(node_id).unwrap();
		assert_eq!(txn.metrics().state_operations, 4);

		let range = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
		let _ = txn.state_range(node_id, range).unwrap();
		assert_eq!(txn.metrics().state_operations, 5);

		txn.state_clear(node_id).unwrap();
		// state_clear increments the counter once itself and then goes through the
		// transaction's `range`/`remove` directly (not `state_scan`/`state_remove`), which
		// the exact counts above show do not touch this metric.
		assert_eq!(txn.metrics().state_operations, 6);
	}

	/// Several nodes can hold overlapping key sets without seeing each other's values.
	#[test]
	fn test_state_multiple_nodes() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let node1 = FlowNodeId(1);
		let node2 = FlowNodeId(2);
		let node3 = FlowNodeId(3);

		txn.state_set(node1, &make_key("a"), make_value("n1_a")).unwrap();
		txn.state_set(node1, &make_key("b"), make_value("n1_b")).unwrap();
		txn.state_set(node2, &make_key("a"), make_value("n2_a")).unwrap();
		txn.state_set(node3, &make_key("c"), make_value("n3_c")).unwrap();

		// Verify each node has correct state
		assert_eq!(txn.state_get(node1, &make_key("a")).unwrap(), Some(make_value("n1_a")));
		assert_eq!(txn.state_get(node1, &make_key("b")).unwrap(), Some(make_value("n1_b")));
		assert_eq!(txn.state_get(node2, &make_key("a")).unwrap(), Some(make_value("n2_a")));
		assert_eq!(txn.state_get(node3, &make_key("c")).unwrap(), Some(make_value("n3_c")));

		// Cross-node keys should not exist
		assert_eq!(txn.state_get(node2, &make_key("b")).unwrap(), None);
		assert_eq!(txn.state_get(node3, &make_key("a")).unwrap(), None);
	}

	/// `load_or_create_row` is accounted for in the `state_operations` metric.
	#[test]
	fn test_load_or_create_increments_state_operations() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let node_id = FlowNodeId(1);
		let key = make_key("key1");
		let layout = EncodedValuesLayout::new(&[Type::Int8]);

		let initial_count = txn.metrics().state_operations;

		txn.load_or_create_row(node_id, &key, &layout).unwrap();

		// load_or_create calls state_get internally
		assert!(txn.metrics().state_operations > initial_count);
	}

	/// `save_row` is accounted for in the `state_operations` metric.
	#[test]
	fn test_save_row_increments_state_operations() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let node_id = FlowNodeId(1);
		let key = make_key("key1");

		let initial_count = txn.metrics().state_operations;

		txn.save_row(node_id, &key, make_value("data")).unwrap();

		// save_row calls state_set internally
		assert!(txn.metrics().state_operations > initial_count);
	}
}