Skip to main content

reifydb_sub_flow/transaction/
state.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	encoded::{
6		encoded::EncodedValues,
7		key::{EncodedKey, EncodedKeyRange},
8		schema::Schema,
9	},
10	interface::{catalog::flow::FlowNodeId, store::MultiVersionBatch},
11	key::{EncodableKey, flow_node_state::FlowNodeStateKey},
12};
13use tracing::{Span, instrument};
14
15use super::FlowTransaction;
16
17impl FlowTransaction {
18	/// Get state for a specific flow node and key
19	#[instrument(name = "flow::state::get", level = "trace", skip(self), fields(
20		node_id = id.0,
21		key_len = key.as_bytes().len(),
22		found = tracing::field::Empty
23	))]
24	pub fn state_get(&mut self, id: FlowNodeId, key: &EncodedKey) -> reifydb_type::Result<Option<EncodedValues>> {
25		let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
26		let encoded_key = state_key.encode();
27		let result = self.get(&encoded_key)?;
28		Span::current().record("found", result.is_some());
29		Ok(result)
30	}
31
32	/// Set state for a specific flow node and key
33	#[instrument(name = "flow::state::set", level = "trace", skip(self, value), fields(
34		node_id = id.0,
35		key_len = key.as_bytes().len(),
36		value_len = value.as_ref().len()
37	))]
38	pub fn state_set(
39		&mut self,
40		id: FlowNodeId,
41		key: &EncodedKey,
42		value: EncodedValues,
43	) -> reifydb_type::Result<()> {
44		let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
45		let encoded_key = state_key.encode();
46		self.set(&encoded_key, value)
47	}
48
49	/// Remove state for a specific flow node and key
50	#[instrument(name = "flow::state::remove", level = "trace", skip(self), fields(
51		node_id = id.0,
52		key_len = key.as_bytes().len()
53	))]
54	pub fn state_remove(&mut self, id: FlowNodeId, key: &EncodedKey) -> reifydb_type::Result<()> {
55		let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
56		let encoded_key = state_key.encode();
57		self.remove(&encoded_key)
58	}
59
60	/// Scan all state for a specific flow node
61	#[instrument(name = "flow::state::scan", level = "debug", skip(self), fields(
62		node_id = id.0,
63		result_count = tracing::field::Empty
64	))]
65	pub fn state_scan(&mut self, id: FlowNodeId) -> reifydb_type::Result<MultiVersionBatch> {
66		let range = FlowNodeStateKey::node_range(id);
67		let mut iter = self.range(range, 1024);
68		let mut items = Vec::new();
69		while let Some(result) = iter.next() {
70			items.push(result?);
71		}
72		Span::current().record("result_count", items.len());
73		Ok(MultiVersionBatch {
74			items,
75			has_more: false,
76		})
77	}
78
79	/// Range query on state for a specific flow node
80	#[instrument(name = "flow::state::range", level = "debug", skip(self, range), fields(
81		node_id = id.0
82	))]
83	pub fn state_range(
84		&mut self,
85		id: FlowNodeId,
86		range: EncodedKeyRange,
87	) -> reifydb_type::Result<MultiVersionBatch> {
88		let prefixed_range = range.with_prefix(FlowNodeStateKey::encoded(id, vec![]));
89		let mut iter = self.range(prefixed_range, 1024);
90		let mut items = Vec::new();
91		while let Some(result) = iter.next() {
92			items.push(result?);
93		}
94		Ok(MultiVersionBatch {
95			items,
96			has_more: false,
97		})
98	}
99
100	/// Clear all state for a specific flow node
101	#[instrument(name = "flow::state::clear", level = "trace", skip(self), fields(
102		node_id = id.0,
103		keys_removed = tracing::field::Empty
104	))]
105	pub fn state_clear(&mut self, id: FlowNodeId) -> reifydb_type::Result<()> {
106		// Phase 1: Scan to collect all keys
107		let keys_to_remove = self.scan_keys_for_clear(id)?;
108
109		// Phase 2: Remove all collected keys
110		let count = keys_to_remove.len();
111		self.remove_keys(keys_to_remove)?;
112
113		Span::current().record("keys_removed", count);
114		Ok(())
115	}
116
117	/// Scan and collect all keys for a node (used by state_clear)
118	#[inline]
119	#[instrument(name = "flow::state::clear::scan", level = "trace", skip(self), fields(node_id = id.0))]
120	fn scan_keys_for_clear(&mut self, id: FlowNodeId) -> reifydb_type::Result<Vec<EncodedKey>> {
121		let range = FlowNodeStateKey::node_range(id);
122		let mut iter = self.range(range, 1024);
123		let mut keys = Vec::new();
124		while let Some(result) = iter.next() {
125			let multi = result?;
126			keys.push(multi.key);
127		}
128		Ok(keys)
129	}
130
131	/// Remove a list of keys (used by state_clear)
132	#[inline]
133	#[instrument(name = "flow::state::clear::remove", level = "trace", skip(self, keys), fields(count = keys.len()))]
134	fn remove_keys(&mut self, keys: Vec<EncodedKey>) -> reifydb_type::Result<()> {
135		for key in keys {
136			self.remove(&key)?;
137		}
138		Ok(())
139	}
140
141	/// Load state for a key, creating if not exists
142	#[instrument(name = "flow::state::load_or_create", level = "debug", skip(self, schema), fields(
143		node_id = id.0,
144		key_len = key.as_bytes().len(),
145		created
146	))]
147	pub fn load_or_create_row(
148		&mut self,
149		id: FlowNodeId,
150		key: &EncodedKey,
151		schema: &Schema,
152	) -> reifydb_type::Result<EncodedValues> {
153		match self.state_get(id, key)? {
154			Some(row) => {
155				Span::current().record("created", false);
156				Ok(row)
157			}
158			None => {
159				Span::current().record("created", true);
160				Ok(schema.allocate())
161			}
162		}
163	}
164
165	/// Save state encoded
166	#[instrument(name = "flow::state::save", level = "trace", skip(self, row), fields(
167		node_id = id.0,
168		key_len = key.as_bytes().len()
169	))]
170	pub fn save_row(&mut self, id: FlowNodeId, key: &EncodedKey, row: EncodedValues) -> reifydb_type::Result<()> {
171		self.state_set(id, key, row)
172	}
173}
174
#[cfg(test)]
pub mod tests {
	// `std::collections::Bound` is a deprecated re-export of this type.
	use std::ops::Bound;

	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::{
		common::CommitVersion,
		encoded::{
			encoded::EncodedValues,
			key::{EncodedKey, EncodedKeyRange},
			schema::Schema,
		},
		interface::catalog::flow::FlowNodeId,
	};
	use reifydb_transaction::interceptor::interceptors::Interceptors;
	use reifydb_type::{util::cowvec::CowVec, value::r#type::Type};

	use super::*;
	use crate::operator::stateful::test_utils::test::create_test_transaction;

	/// Build a raw (not node-prefixed) state key from a string.
	fn make_key(s: &str) -> EncodedKey {
		EncodedKey::new(s.as_bytes().to_vec())
	}

	/// Build an opaque state value from a string.
	fn make_value(s: &str) -> EncodedValues {
		EncodedValues(CowVec::new(s.as_bytes().to_vec()))
	}

	#[test]
	fn test_state_get_set() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let node_id = FlowNodeId(1);
		let key = make_key("state_key");
		let value = make_value("state_value");

		// Set state
		txn.state_set(node_id, &key, value.clone()).unwrap();

		// Get state back
		let result = txn.state_get(node_id, &key).unwrap();
		assert_eq!(result, Some(value));
	}

	#[test]
	fn test_state_get_nonexistent() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let node_id = FlowNodeId(1);
		let key = make_key("missing");

		let result = txn.state_get(node_id, &key).unwrap();
		assert_eq!(result, None);
	}

	#[test]
	fn test_state_remove() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let node_id = FlowNodeId(1);
		let key = make_key("state_key");
		let value = make_value("state_value");

		// Set then remove
		txn.state_set(node_id, &key, value.clone()).unwrap();
		assert_eq!(txn.state_get(node_id, &key).unwrap(), Some(value));

		txn.state_remove(node_id, &key).unwrap();
		assert_eq!(txn.state_get(node_id, &key).unwrap(), None);
	}

	#[test]
	fn test_state_isolation_between_nodes() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let node1 = FlowNodeId(1);
		let node2 = FlowNodeId(2);
		let key = make_key("same_key");

		txn.state_set(node1, &key, make_value("node1_value")).unwrap();
		txn.state_set(node2, &key, make_value("node2_value")).unwrap();

		// Each node should have its own value
		assert_eq!(txn.state_get(node1, &key).unwrap(), Some(make_value("node1_value")));
		assert_eq!(txn.state_get(node2, &key).unwrap(), Some(make_value("node2_value")));
	}

	#[test]
	fn test_state_scan() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let node_id = FlowNodeId(1);

		txn.state_set(node_id, &make_key("key1"), make_value("value1")).unwrap();
		txn.state_set(node_id, &make_key("key2"), make_value("value2")).unwrap();
		txn.state_set(node_id, &make_key("key3"), make_value("value3")).unwrap();

		let batch = txn.state_scan(node_id).unwrap();
		assert_eq!(batch.items.len(), 3);
	}

	#[test]
	fn test_state_scan_only_own_node() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let node1 = FlowNodeId(1);
		let node2 = FlowNodeId(2);

		txn.state_set(node1, &make_key("key1"), make_value("value1")).unwrap();
		txn.state_set(node1, &make_key("key2"), make_value("value2")).unwrap();
		txn.state_set(node2, &make_key("key3"), make_value("value3")).unwrap();

		// Scan node1 should only return node1's state
		assert_eq!(txn.state_scan(node1).unwrap().items.len(), 2);

		// Scan node2 should only return node2's state
		assert_eq!(txn.state_scan(node2).unwrap().items.len(), 1);
	}

	#[test]
	fn test_state_scan_empty() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let node_id = FlowNodeId(1);

		assert!(txn.state_scan(node_id).unwrap().items.is_empty());
	}

	#[test]
	fn test_state_range() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let node_id = FlowNodeId(1);

		txn.state_set(node_id, &make_key("a"), make_value("1")).unwrap();
		txn.state_set(node_id, &make_key("b"), make_value("2")).unwrap();
		txn.state_set(node_id, &make_key("c"), make_value("3")).unwrap();
		txn.state_set(node_id, &make_key("d"), make_value("4")).unwrap();

		// Range query from "b" (inclusive) to "d" (exclusive)
		let range = EncodedKeyRange::new(Bound::Included(make_key("b")), Bound::Excluded(make_key("d")));
		let batch = txn.state_range(node_id, range).unwrap();

		// Should only include "b" and "c"
		assert_eq!(batch.items.len(), 2);
	}

	#[test]
	fn test_state_clear() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let node_id = FlowNodeId(1);

		txn.state_set(node_id, &make_key("key1"), make_value("value1")).unwrap();
		txn.state_set(node_id, &make_key("key2"), make_value("value2")).unwrap();
		txn.state_set(node_id, &make_key("key3"), make_value("value3")).unwrap();

		// Verify state exists
		assert_eq!(txn.state_scan(node_id).unwrap().items.len(), 3);

		// Clear all state
		txn.state_clear(node_id).unwrap();

		// Verify state is empty
		assert!(txn.state_scan(node_id).unwrap().items.is_empty());
	}

	#[test]
	fn test_state_clear_only_own_node() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let node1 = FlowNodeId(1);
		let node2 = FlowNodeId(2);

		txn.state_set(node1, &make_key("key1"), make_value("value1")).unwrap();
		txn.state_set(node1, &make_key("key2"), make_value("value2")).unwrap();
		txn.state_set(node2, &make_key("key3"), make_value("value3")).unwrap();

		// Clear node1
		txn.state_clear(node1).unwrap();

		// Node1 should be empty
		assert!(txn.state_scan(node1).unwrap().items.is_empty());

		// Node2 should still have state
		assert_eq!(txn.state_scan(node2).unwrap().items.len(), 1);
	}

	#[test]
	fn test_state_clear_empty_node() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let node_id = FlowNodeId(1);

		// Clear on empty node should not error
		txn.state_clear(node_id).unwrap();
	}

	#[test]
	fn test_load_or_create_existing() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let node_id = FlowNodeId(1);
		let key = make_key("key1");
		let value = make_value("existing");
		let schema = Schema::testing(&[Type::Int8, Type::Float8]);

		// Set existing state
		txn.state_set(node_id, &key, value.clone()).unwrap();

		// load_or_create should return existing value
		let result = txn.load_or_create_row(node_id, &key, &schema).unwrap();
		assert_eq!(result, value);
	}

	#[test]
	fn test_load_or_create_new() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let node_id = FlowNodeId(1);
		let key = make_key("key1");
		let schema = Schema::testing(&[Type::Int8, Type::Float8]);

		// load_or_create should allocate new row
		let result = txn.load_or_create_row(node_id, &key, &schema).unwrap();

		// Result should be a newly allocated row (schema.allocate())
		assert!(!result.as_ref().is_empty());

		// Allocating a fresh row must not persist it; that is save_row's job.
		assert_eq!(txn.state_get(node_id, &key).unwrap(), None);
	}

	#[test]
	fn test_save_row() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let node_id = FlowNodeId(1);
		let key = make_key("key1");
		let row = make_value("row_data");

		txn.save_row(node_id, &key, row.clone()).unwrap();

		// Verify saved
		let result = txn.state_get(node_id, &key).unwrap();
		assert_eq!(result, Some(row));
	}

	#[test]
	fn test_state_multiple_nodes() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let node1 = FlowNodeId(1);
		let node2 = FlowNodeId(2);
		let node3 = FlowNodeId(3);

		txn.state_set(node1, &make_key("a"), make_value("n1_a")).unwrap();
		txn.state_set(node1, &make_key("b"), make_value("n1_b")).unwrap();
		txn.state_set(node2, &make_key("a"), make_value("n2_a")).unwrap();
		txn.state_set(node3, &make_key("c"), make_value("n3_c")).unwrap();

		// Verify each node has correct state
		assert_eq!(txn.state_get(node1, &make_key("a")).unwrap(), Some(make_value("n1_a")));
		assert_eq!(txn.state_get(node1, &make_key("b")).unwrap(), Some(make_value("n1_b")));
		assert_eq!(txn.state_get(node2, &make_key("a")).unwrap(), Some(make_value("n2_a")));
		assert_eq!(txn.state_get(node3, &make_key("c")).unwrap(), Some(make_value("n3_c")));

		// Cross-node keys should not exist
		assert_eq!(txn.state_get(node2, &make_key("b")).unwrap(), None);
		assert_eq!(txn.state_get(node3, &make_key("a")).unwrap(), None);
	}
}