Skip to main content

reifydb_sub_flow/transaction/
state.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	encoded::{
6		key::{EncodedKey, EncodedKeyRange},
7		row::EncodedRow,
8		shape::RowShape,
9	},
10	interface::{catalog::flow::FlowNodeId, store::MultiVersionBatch},
11	key::{EncodableKey, flow_node_state::FlowNodeStateKey},
12};
13use reifydb_type::Result;
14use tracing::{Span, field, instrument};
15
16use super::FlowTransaction;
17
18impl FlowTransaction {
19	#[instrument(name = "flow::state::get", level = "trace", skip(self), fields(
20		node_id = id.0,
21		key_len = key.as_bytes().len(),
22		found = field::Empty
23	))]
24	pub fn state_get(&mut self, id: FlowNodeId, key: &EncodedKey) -> Result<Option<EncodedRow>> {
25		let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
26		let encoded_key = state_key.encode();
27		let result = self.get(&encoded_key)?;
28		Span::current().record("found", result.is_some());
29		Ok(result)
30	}
31
32	#[instrument(name = "flow::state::set", level = "trace", skip(self, value), fields(
33		node_id = id.0,
34		key_len = key.as_bytes().len(),
35		value_len = value.len()
36	))]
37	pub fn state_set(&mut self, id: FlowNodeId, key: &EncodedKey, value: EncodedRow) -> Result<()> {
38		let state_key = FlowNodeStateKey::new(id, key.to_vec());
39		let encoded_key = state_key.encode();
40		self.set(&encoded_key, value)
41	}
42
43	#[instrument(name = "flow::state::remove", level = "trace", skip(self), fields(
44		node_id = id.0,
45		key_len = key.as_bytes().len()
46	))]
47	pub fn state_remove(&mut self, id: FlowNodeId, key: &EncodedKey) -> Result<()> {
48		let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
49		let encoded_key = state_key.encode();
50		self.remove(&encoded_key)
51	}
52
53	#[instrument(name = "flow::state::scan", level = "debug", skip(self), fields(
54		node_id = id.0,
55		result_count = field::Empty
56	))]
57	pub fn state_scan(&mut self, id: FlowNodeId) -> Result<MultiVersionBatch> {
58		let range = FlowNodeStateKey::node_range(id);
59		let iter = self.range(range, 1024);
60		let mut items = Vec::new();
61		for result in iter {
62			items.push(result?);
63		}
64		Span::current().record("result_count", items.len());
65		Ok(MultiVersionBatch {
66			items,
67			has_more: false,
68		})
69	}
70
71	#[instrument(name = "flow::state::range", level = "debug", skip(self, range), fields(
72		node_id = id.0
73	))]
74	pub fn state_range(&mut self, id: FlowNodeId, range: EncodedKeyRange) -> Result<MultiVersionBatch> {
75		let prefixed_range = range.with_prefix(FlowNodeStateKey::encoded(id, vec![]));
76		let iter = self.range(prefixed_range, 1024);
77		let mut items = Vec::new();
78		for result in iter {
79			items.push(result?);
80		}
81		Ok(MultiVersionBatch {
82			items,
83			has_more: false,
84		})
85	}
86
87	#[instrument(name = "flow::state::clear", level = "trace", skip(self), fields(
88		node_id = id.0,
89		keys_removed = field::Empty
90	))]
91	pub fn state_clear(&mut self, id: FlowNodeId) -> Result<()> {
92		// Phase 1: Scan to collect all keys
93		let keys_to_remove = self.scan_keys_for_clear(id)?;
94
95		// Phase 2: Remove all collected keys
96		let count = keys_to_remove.len();
97		self.remove_keys(keys_to_remove)?;
98
99		Span::current().record("keys_removed", count);
100		Ok(())
101	}
102
103	#[inline]
104	#[instrument(name = "flow::state::clear::scan", level = "trace", skip(self), fields(node_id = id.0))]
105	fn scan_keys_for_clear(&mut self, id: FlowNodeId) -> Result<Vec<EncodedKey>> {
106		let range = FlowNodeStateKey::node_range(id);
107		let iter = self.range(range, 1024);
108		let mut keys = Vec::new();
109		for result in iter {
110			let multi = result?;
111			keys.push(multi.key);
112		}
113		Ok(keys)
114	}
115
116	#[inline]
117	#[instrument(name = "flow::state::clear::remove", level = "trace", skip(self, keys), fields(count = keys.len()))]
118	fn remove_keys(&mut self, keys: Vec<EncodedKey>) -> Result<()> {
119		for key in keys {
120			self.remove(&key)?;
121		}
122		Ok(())
123	}
124
125	#[instrument(name = "flow::state::load_or_create", level = "debug", skip(self, shape), fields(
126		node_id = id.0,
127		key_len = key.as_bytes().len(),
128		created
129	))]
130	pub fn load_or_create_row(&mut self, id: FlowNodeId, key: &EncodedKey, shape: &RowShape) -> Result<EncodedRow> {
131		match self.state_get(id, key)? {
132			Some(row) => {
133				Span::current().record("created", false);
134				Ok(row)
135			}
136			None => {
137				Span::current().record("created", true);
138				Ok(shape.allocate())
139			}
140		}
141	}
142
143	#[instrument(name = "flow::state::save", level = "trace", skip(self, row), fields(
144		node_id = id.0,
145		key_len = key.as_bytes().len()
146	))]
147	pub fn save_row(&mut self, id: FlowNodeId, key: &EncodedKey, row: EncodedRow) -> Result<()> {
148		self.state_set(id, key, row)
149	}
150}
151
#[cfg(test)]
pub mod tests {
	use std::collections::Bound;

	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::{
		common::CommitVersion,
		encoded::{
			key::{EncodedKey, EncodedKeyRange},
			row::EncodedRow,
			shape::RowShape,
		},
		interface::catalog::flow::FlowNodeId,
	};
	use reifydb_runtime::context::clock::{Clock, MockClock};
	use reifydb_transaction::interceptor::interceptors::Interceptors;
	use reifydb_type::{util::cowvec::CowVec, value::r#type::Type};

	use super::*;
	use crate::operator::stateful::test_utils::test::create_test_transaction;

	/// Builds an [`EncodedKey`] from the bytes of `s`.
	fn make_key(s: &str) -> EncodedKey {
		EncodedKey::new(s.as_bytes().to_vec())
	}

	/// Builds an [`EncodedRow`] whose payload is the bytes of `s`.
	fn make_value(s: &str) -> EncodedRow {
		EncodedRow(CowVec::new(s.as_bytes().to_vec()))
	}

	/// Runs `test` against a fresh deferred [`FlowTransaction`].
	///
	/// The parent transaction is kept alive until `test` returns, so every test sees the
	/// same setup that used to be copied into each test body.
	fn with_txn(test: impl FnOnce(&mut FlowTransaction)) {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::deferred(
			&parent,
			CommitVersion(1),
			Catalog::testing(),
			Interceptors::new(),
			Clock::Mock(MockClock::from_millis(1000)),
		);
		test(&mut txn);
	}

	#[test]
	fn test_state_get_set() {
		with_txn(|txn| {
			let node_id = FlowNodeId(1);
			let key = make_key("state_key");
			let value = make_value("state_value");

			// Set state
			txn.state_set(node_id, &key, value.clone()).unwrap();

			// Get state back
			let result = txn.state_get(node_id, &key).unwrap();
			assert_eq!(result, Some(value));
		});
	}

	#[test]
	fn test_state_get_nonexistent() {
		with_txn(|txn| {
			let node_id = FlowNodeId(1);
			let key = make_key("missing");

			let result = txn.state_get(node_id, &key).unwrap();
			assert_eq!(result, None);
		});
	}

	#[test]
	fn test_state_remove() {
		with_txn(|txn| {
			let node_id = FlowNodeId(1);
			let key = make_key("state_key");
			let value = make_value("state_value");

			// Set then remove
			txn.state_set(node_id, &key, value.clone()).unwrap();
			assert_eq!(txn.state_get(node_id, &key).unwrap(), Some(value));

			txn.state_remove(node_id, &key).unwrap();
			assert_eq!(txn.state_get(node_id, &key).unwrap(), None);
		});
	}

	#[test]
	fn test_state_isolation_between_nodes() {
		with_txn(|txn| {
			let node1 = FlowNodeId(1);
			let node2 = FlowNodeId(2);
			let key = make_key("same_key");

			txn.state_set(node1, &key, make_value("node1_value")).unwrap();
			txn.state_set(node2, &key, make_value("node2_value")).unwrap();

			// Each node should have its own value
			assert_eq!(txn.state_get(node1, &key).unwrap(), Some(make_value("node1_value")));
			assert_eq!(txn.state_get(node2, &key).unwrap(), Some(make_value("node2_value")));
		});
	}

	#[test]
	fn test_state_scan() {
		with_txn(|txn| {
			let node_id = FlowNodeId(1);

			txn.state_set(node_id, &make_key("key1"), make_value("value1")).unwrap();
			txn.state_set(node_id, &make_key("key2"), make_value("value2")).unwrap();
			txn.state_set(node_id, &make_key("key3"), make_value("value3")).unwrap();

			let batch = txn.state_scan(node_id).unwrap();
			assert_eq!(batch.items.len(), 3);
		});
	}

	#[test]
	fn test_state_scan_only_own_node() {
		with_txn(|txn| {
			let node1 = FlowNodeId(1);
			let node2 = FlowNodeId(2);

			txn.state_set(node1, &make_key("key1"), make_value("value1")).unwrap();
			txn.state_set(node1, &make_key("key2"), make_value("value2")).unwrap();
			txn.state_set(node2, &make_key("key3"), make_value("value3")).unwrap();

			// Scan node1 should only return node1's state
			assert_eq!(txn.state_scan(node1).unwrap().items.len(), 2);

			// Scan node2 should only return node2's state
			assert_eq!(txn.state_scan(node2).unwrap().items.len(), 1);
		});
	}

	#[test]
	fn test_state_scan_empty() {
		with_txn(|txn| {
			let node_id = FlowNodeId(1);

			assert!(txn.state_scan(node_id).unwrap().items.is_empty());
		});
	}

	#[test]
	fn test_state_range() {
		with_txn(|txn| {
			let node_id = FlowNodeId(1);

			txn.state_set(node_id, &make_key("a"), make_value("1")).unwrap();
			txn.state_set(node_id, &make_key("b"), make_value("2")).unwrap();
			txn.state_set(node_id, &make_key("c"), make_value("3")).unwrap();
			txn.state_set(node_id, &make_key("d"), make_value("4")).unwrap();

			// Range query from "b" to "d" (exclusive)
			let range = EncodedKeyRange::new(Bound::Included(make_key("b")), Bound::Excluded(make_key("d")));
			let batch = txn.state_range(node_id, range).unwrap();

			// Should only include "b" and "c"
			assert_eq!(batch.items.len(), 2);
		});
	}

	#[test]
	fn test_state_clear() {
		with_txn(|txn| {
			let node_id = FlowNodeId(1);

			txn.state_set(node_id, &make_key("key1"), make_value("value1")).unwrap();
			txn.state_set(node_id, &make_key("key2"), make_value("value2")).unwrap();
			txn.state_set(node_id, &make_key("key3"), make_value("value3")).unwrap();

			// Verify state exists
			assert_eq!(txn.state_scan(node_id).unwrap().items.len(), 3);

			// Clear all state
			txn.state_clear(node_id).unwrap();

			// Verify state is empty
			assert_eq!(txn.state_scan(node_id).unwrap().items.len(), 0);
		});
	}

	#[test]
	fn test_state_clear_only_own_node() {
		with_txn(|txn| {
			let node1 = FlowNodeId(1);
			let node2 = FlowNodeId(2);

			txn.state_set(node1, &make_key("key1"), make_value("value1")).unwrap();
			txn.state_set(node1, &make_key("key2"), make_value("value2")).unwrap();
			txn.state_set(node2, &make_key("key3"), make_value("value3")).unwrap();

			// Clear node1
			txn.state_clear(node1).unwrap();

			// Node1 should be empty
			assert_eq!(txn.state_scan(node1).unwrap().items.len(), 0);

			// Node2 should still have state
			assert_eq!(txn.state_scan(node2).unwrap().items.len(), 1);
		});
	}

	#[test]
	fn test_state_clear_empty_node() {
		with_txn(|txn| {
			let node_id = FlowNodeId(1);

			// Clear on empty node should not error
			txn.state_clear(node_id).unwrap();

			// ...and the node is still empty afterwards
			assert!(txn.state_scan(node_id).unwrap().items.is_empty());
		});
	}

	#[test]
	fn test_load_or_create_existing() {
		with_txn(|txn| {
			let node_id = FlowNodeId(1);
			let key = make_key("key1");
			let value = make_value("existing");
			let shape = RowShape::testing(&[Type::Int8, Type::Float8]);

			// Set existing state
			txn.state_set(node_id, &key, value.clone()).unwrap();

			// load_or_create should return existing value
			let result = txn.load_or_create_row(node_id, &key, &shape).unwrap();
			assert_eq!(result, value);
		});
	}

	#[test]
	fn test_load_or_create_new() {
		with_txn(|txn| {
			let node_id = FlowNodeId(1);
			let key = make_key("key1");
			let shape = RowShape::testing(&[Type::Int8, Type::Float8]);

			// load_or_create should allocate new row
			let result = txn.load_or_create_row(node_id, &key, &shape).unwrap();

			// Result should be a newly allocated row (shape.allocate())
			assert!(!result.is_empty());

			// Allocation alone must not persist anything; that is save_row's job
			assert_eq!(txn.state_get(node_id, &key).unwrap(), None);
		});
	}

	#[test]
	fn test_save_row() {
		with_txn(|txn| {
			let node_id = FlowNodeId(1);
			let key = make_key("key1");
			let row = make_value("row_data");

			txn.save_row(node_id, &key, row.clone()).unwrap();

			// Verify saved
			let result = txn.state_get(node_id, &key).unwrap();
			assert_eq!(result, Some(row));
		});
	}

	#[test]
	fn test_state_multiple_nodes() {
		with_txn(|txn| {
			let node1 = FlowNodeId(1);
			let node2 = FlowNodeId(2);
			let node3 = FlowNodeId(3);

			txn.state_set(node1, &make_key("a"), make_value("n1_a")).unwrap();
			txn.state_set(node1, &make_key("b"), make_value("n1_b")).unwrap();
			txn.state_set(node2, &make_key("a"), make_value("n2_a")).unwrap();
			txn.state_set(node3, &make_key("c"), make_value("n3_c")).unwrap();

			// Verify each node has correct state
			assert_eq!(txn.state_get(node1, &make_key("a")).unwrap(), Some(make_value("n1_a")));
			assert_eq!(txn.state_get(node1, &make_key("b")).unwrap(), Some(make_value("n1_b")));
			assert_eq!(txn.state_get(node2, &make_key("a")).unwrap(), Some(make_value("n2_a")));
			assert_eq!(txn.state_get(node3, &make_key("c")).unwrap(), Some(make_value("n3_c")));

			// Cross-node keys should not exist
			assert_eq!(txn.state_get(node2, &make_key("b")).unwrap(), None);
			assert_eq!(txn.state_get(node3, &make_key("a")).unwrap(), None);
		});
	}
}