reifydb_sub_flow/transaction/
state.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	EncodedKey, EncodedKeyRange,
6	interface::{BoxedMultiVersionIter, FlowNodeId},
7	key::{EncodableKey, FlowNodeStateKey},
8	value::encoded::{EncodedValues, EncodedValuesLayout},
9};
10
11use super::FlowTransaction;
12
13impl FlowTransaction {
14	/// Get state for a specific flow node and key
15	pub fn state_get(&mut self, id: FlowNodeId, key: &EncodedKey) -> crate::Result<Option<EncodedValues>> {
16		self.metrics.increment_state_operations();
17		let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
18		let encoded_key = state_key.encode();
19		self.get(&encoded_key)
20	}
21
22	/// Set state for a specific flow node and key
23	pub fn state_set(&mut self, id: FlowNodeId, key: &EncodedKey, value: EncodedValues) -> crate::Result<()> {
24		self.metrics.increment_state_operations();
25		let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
26		let encoded_key = state_key.encode();
27		self.set(&encoded_key, value)
28	}
29
30	/// Remove state for a specific flow node and key
31	pub fn state_remove(&mut self, id: FlowNodeId, key: &EncodedKey) -> crate::Result<()> {
32		self.metrics.increment_state_operations();
33		let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
34		let encoded_key = state_key.encode();
35		self.remove(&encoded_key)
36	}
37
38	/// Scan all state for a specific flow node
39	pub fn state_scan(&mut self, id: FlowNodeId) -> crate::Result<BoxedMultiVersionIter<'_>> {
40		self.metrics.increment_state_operations();
41		let range = FlowNodeStateKey::node_range(id);
42		self.range(range)
43	}
44
45	/// Range query on state for a specific flow node
46	pub fn state_range(
47		&mut self,
48		id: FlowNodeId,
49		range: EncodedKeyRange,
50	) -> crate::Result<BoxedMultiVersionIter<'_>> {
51		self.metrics.increment_state_operations();
52		let prefixed_range = range.with_prefix(FlowNodeStateKey::new(id, vec![]).encode());
53		self.range(prefixed_range)
54	}
55
56	/// Clear all state for a specific flow node
57	pub fn state_clear(&mut self, id: FlowNodeId) -> crate::Result<()> {
58		self.metrics.increment_state_operations();
59		let range = FlowNodeStateKey::node_range(id);
60		let keys_to_remove: Vec<_> = self.range(range)?.map(|multi| multi.key).collect();
61
62		for key in keys_to_remove {
63			self.remove(&key)?;
64		}
65
66		Ok(())
67	}
68
69	/// Load state for a key, creating if not exists
70	pub fn load_or_create_row(
71		&mut self,
72		id: FlowNodeId,
73		key: &EncodedKey,
74		layout: &EncodedValuesLayout,
75	) -> crate::Result<EncodedValues> {
76		match self.state_get(id, key)? {
77			Some(row) => Ok(row),
78			None => Ok(layout.allocate()),
79		}
80	}
81
82	/// Save state encoded
83	pub fn save_row(&mut self, id: FlowNodeId, key: &EncodedKey, row: EncodedValues) -> crate::Result<()> {
84		self.state_set(id, key, row)
85	}
86}
87
#[cfg(test)]
mod tests {
	use std::collections::Bound;

	use reifydb_core::{
		CommitVersion, CowVec, EncodedKey, EncodedKeyRange, interface::FlowNodeId,
		value::encoded::EncodedValues,
	};
	use reifydb_type::Type;

	use super::*;
	use crate::operator::stateful::test_utils::test::create_test_transaction;

	/// Build an encoded key from the UTF-8 bytes of `s`.
	fn make_key(s: &str) -> EncodedKey {
		let bytes: Vec<u8> = s.bytes().collect();
		EncodedKey::new(bytes)
	}

	/// Build encoded values from the UTF-8 bytes of `s`.
	fn make_value(s: &str) -> EncodedValues {
		let bytes: Vec<u8> = s.bytes().collect();
		EncodedValues(CowVec::new(bytes))
	}

	/// Write `v` under `k` for `node`, panicking if the write fails.
	fn put(txn: &mut FlowTransaction, node: FlowNodeId, k: &str, v: &str) {
		txn.state_set(node, &make_key(k), make_value(v)).unwrap();
	}

	/// Number of entries a full scan of `node` yields.
	fn scan_len(txn: &mut FlowTransaction, node: FlowNodeId) -> usize {
		txn.state_scan(node).unwrap().count()
	}

	#[test]
	fn test_state_get_set() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);
		let (key, expected) = (make_key("state_key"), make_value("state_value"));

		// Round-trip: what was written must be read back unchanged.
		txn.state_set(node, &key, expected.clone()).unwrap();
		let fetched = txn.state_get(node, &key).unwrap();

		assert_eq!(fetched, Some(expected));
	}

	#[test]
	fn test_state_get_nonexistent() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		// Nothing was ever written, so the lookup must come back empty.
		let lookup = txn.state_get(FlowNodeId(1), &make_key("missing")).unwrap();

		assert!(lookup.is_none());
	}

	#[test]
	fn test_state_remove() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);
		let key = make_key("state_key");
		let stored = make_value("state_value");

		// Present after the write...
		txn.state_set(node, &key, stored.clone()).unwrap();
		assert_eq!(txn.state_get(node, &key).unwrap(), Some(stored));

		// ...and gone after the removal.
		txn.state_remove(node, &key).unwrap();
		assert!(txn.state_get(node, &key).unwrap().is_none());
	}

	#[test]
	fn test_state_isolation_between_nodes() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let (first, second) = (FlowNodeId(1), FlowNodeId(2));
		let shared = make_key("same_key");

		txn.state_set(first, &shared, make_value("node1_value")).unwrap();
		txn.state_set(second, &shared, make_value("node2_value")).unwrap();

		// The same key must resolve to a different value per node.
		for (node, expected) in [(first, "node1_value"), (second, "node2_value")] {
			assert_eq!(txn.state_get(node, &shared).unwrap(), Some(make_value(expected)));
		}
	}

	#[test]
	fn test_state_scan() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);

		for (k, v) in [("key1", "value1"), ("key2", "value2"), ("key3", "value3")] {
			put(&mut txn, node, k, v);
		}

		assert_eq!(scan_len(&mut txn, node), 3);
	}

	#[test]
	fn test_state_scan_only_own_node() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let (first, second) = (FlowNodeId(1), FlowNodeId(2));

		put(&mut txn, first, "key1", "value1");
		put(&mut txn, first, "key2", "value2");
		put(&mut txn, second, "key3", "value3");

		// A scan must see exactly the entries written for that node.
		assert_eq!(scan_len(&mut txn, first), 2);
		assert_eq!(scan_len(&mut txn, second), 1);
	}

	#[test]
	fn test_state_scan_empty() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		// With no state written the iterator yields nothing at all.
		assert!(txn.state_scan(FlowNodeId(1)).unwrap().next().is_none());
	}

	#[test]
	fn test_state_range() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);

		for (k, v) in [("a", "1"), ("b", "2"), ("c", "3"), ("d", "4")] {
			put(&mut txn, node, k, v);
		}

		// Half-open interval ["b", "d"): only "b" and "c" qualify.
		let bounds = EncodedKeyRange::new(Bound::Included(make_key("b")), Bound::Excluded(make_key("d")));
		let matched = txn.state_range(node, bounds).unwrap().count();

		assert_eq!(matched, 2);
	}

	#[test]
	fn test_state_clear() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);

		for (k, v) in [("key1", "value1"), ("key2", "value2"), ("key3", "value3")] {
			put(&mut txn, node, k, v);
		}

		// Populated before the clear, empty afterwards.
		assert_eq!(scan_len(&mut txn, node), 3);
		txn.state_clear(node).unwrap();
		assert_eq!(scan_len(&mut txn, node), 0);
	}

	#[test]
	fn test_state_clear_only_own_node() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let (cleared, untouched) = (FlowNodeId(1), FlowNodeId(2));

		put(&mut txn, cleared, "key1", "value1");
		put(&mut txn, cleared, "key2", "value2");
		put(&mut txn, untouched, "key3", "value3");

		txn.state_clear(cleared).unwrap();

		// Only the cleared node loses its entries; the other keeps its one.
		assert_eq!(scan_len(&mut txn, cleared), 0);
		assert_eq!(scan_len(&mut txn, untouched), 1);
	}

	#[test]
	fn test_state_clear_empty_node() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		// Clearing a node that has no state must succeed.
		txn.state_clear(FlowNodeId(1)).unwrap();
	}

	#[test]
	fn test_load_or_create_existing() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);
		let key = make_key("key1");
		let stored = make_value("existing");
		let layout = EncodedValuesLayout::new(&[Type::Int8, Type::Float8]);

		txn.state_set(node, &key, stored.clone()).unwrap();

		// Existing state wins over a fresh allocation.
		let loaded = txn.load_or_create_row(node, &key, &layout).unwrap();
		assert_eq!(loaded, stored);
	}

	#[test]
	fn test_load_or_create_new() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let layout = EncodedValuesLayout::new(&[Type::Int8, Type::Float8]);

		// No state exists, so the row must come from `layout.allocate()`.
		let fresh = txn.load_or_create_row(FlowNodeId(1), &make_key("key1"), &layout).unwrap();

		assert!(!fresh.as_ref().is_empty());
	}

	#[test]
	fn test_save_row() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);
		let key = make_key("key1");
		let payload = make_value("row_data");

		txn.save_row(node, &key, payload.clone()).unwrap();

		// The saved row must be visible through the plain getter.
		assert_eq!(txn.state_get(node, &key).unwrap(), Some(payload));
	}

	#[test]
	fn test_state_operations_increment_metrics() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);
		let key = make_key("key1");

		assert_eq!(txn.metrics().state_operations, 0);

		// Each single-key operation bumps the counter by exactly one.
		txn.state_set(node, &key, make_value("value")).unwrap();
		assert_eq!(txn.metrics().state_operations, 1);

		txn.state_get(node, &key).unwrap();
		assert_eq!(txn.metrics().state_operations, 2);

		txn.state_remove(node, &key).unwrap();
		assert_eq!(txn.metrics().state_operations, 3);

		// Scans count as soon as they are created; the iterator itself is discarded.
		drop(txn.state_scan(node).unwrap());
		assert_eq!(txn.metrics().state_operations, 4);

		let bounds = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
		drop(txn.state_range(node, bounds).unwrap());
		assert_eq!(txn.metrics().state_operations, 5);

		// state_clear records one state operation itself and then goes through the
		// transaction's range/remove calls; only a lower bound is asserted here.
		txn.state_clear(node).unwrap();
		assert!(txn.metrics().state_operations >= 6);
	}

	#[test]
	fn test_state_multiple_nodes() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let (node1, node2, node3) = (FlowNodeId(1), FlowNodeId(2), FlowNodeId(3));

		let writes = [(node1, "a", "n1_a"), (node1, "b", "n1_b"), (node2, "a", "n2_a"), (node3, "c", "n3_c")];
		for (node, k, v) in writes {
			put(&mut txn, node, k, v);
		}

		// Every write is readable from the node it was written to.
		for (node, k, v) in writes {
			assert_eq!(txn.state_get(node, &make_key(k)).unwrap(), Some(make_value(v)));
		}

		// Keys written for one node must not leak into another.
		for (node, k) in [(node2, "b"), (node3, "a")] {
			assert!(txn.state_get(node, &make_key(k)).unwrap().is_none());
		}
	}

	#[test]
	fn test_load_or_create_increments_state_operations() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let layout = EncodedValuesLayout::new(&[Type::Int8]);

		let before = txn.metrics().state_operations;
		txn.load_or_create_row(FlowNodeId(1), &make_key("key1"), &layout).unwrap();
		let after = txn.metrics().state_operations;

		// load_or_create_row goes through state_get, which is counted.
		assert!(after > before);
	}

	#[test]
	fn test_save_row_increments_state_operations() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		let before = txn.metrics().state_operations;
		txn.save_row(FlowNodeId(1), &make_key("key1"), make_value("data")).unwrap();
		let after = txn.metrics().state_operations;

		// save_row goes through state_set, which is counted.
		assert!(after > before);
	}
}