1use reifydb_core::{
5 encoded::{
6 key::{EncodedKey, EncodedKeyRange},
7 row::EncodedRow,
8 shape::RowShape,
9 },
10 interface::{catalog::flow::FlowNodeId, store::MultiVersionBatch},
11 key::{EncodableKey, flow_node_state::FlowNodeStateKey},
12};
13use reifydb_type::Result;
14use tracing::{Span, field, instrument};
15
16use super::FlowTransaction;
17
18impl FlowTransaction {
19 #[instrument(name = "flow::state::get", level = "trace", skip(self), fields(
20 node_id = id.0,
21 key_len = key.as_bytes().len(),
22 found = field::Empty
23 ))]
24 pub fn state_get(&mut self, id: FlowNodeId, key: &EncodedKey) -> Result<Option<EncodedRow>> {
25 let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
26 let encoded_key = state_key.encode();
27 let result = self.get(&encoded_key)?;
28 Span::current().record("found", result.is_some());
29 Ok(result)
30 }
31
32 #[instrument(name = "flow::state::set", level = "trace", skip(self, value), fields(
33 node_id = id.0,
34 key_len = key.as_bytes().len(),
35 value_len = value.len()
36 ))]
37 pub fn state_set(&mut self, id: FlowNodeId, key: &EncodedKey, value: EncodedRow) -> Result<()> {
38 let state_key = FlowNodeStateKey::new(id, key.to_vec());
39 let encoded_key = state_key.encode();
40 self.set(&encoded_key, value)
41 }
42
43 #[instrument(name = "flow::state::remove", level = "trace", skip(self), fields(
44 node_id = id.0,
45 key_len = key.as_bytes().len()
46 ))]
47 pub fn state_remove(&mut self, id: FlowNodeId, key: &EncodedKey) -> Result<()> {
48 let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
49 let encoded_key = state_key.encode();
50 self.remove(&encoded_key)
51 }
52
53 #[instrument(name = "flow::state::scan", level = "debug", skip(self), fields(
54 node_id = id.0,
55 result_count = field::Empty
56 ))]
57 pub fn state_scan(&mut self, id: FlowNodeId) -> Result<MultiVersionBatch> {
58 let range = FlowNodeStateKey::node_range(id);
59 let iter = self.range(range, 1024);
60 let mut items = Vec::new();
61 for result in iter {
62 items.push(result?);
63 }
64 Span::current().record("result_count", items.len());
65 Ok(MultiVersionBatch {
66 items,
67 has_more: false,
68 })
69 }
70
71 #[instrument(name = "flow::state::range", level = "debug", skip(self, range), fields(
72 node_id = id.0
73 ))]
74 pub fn state_range(&mut self, id: FlowNodeId, range: EncodedKeyRange) -> Result<MultiVersionBatch> {
75 let prefixed_range = range.with_prefix(FlowNodeStateKey::encoded(id, vec![]));
76 let iter = self.range(prefixed_range, 1024);
77 let mut items = Vec::new();
78 for result in iter {
79 items.push(result?);
80 }
81 Ok(MultiVersionBatch {
82 items,
83 has_more: false,
84 })
85 }
86
87 #[instrument(name = "flow::state::clear", level = "trace", skip(self), fields(
88 node_id = id.0,
89 keys_removed = field::Empty
90 ))]
91 pub fn state_clear(&mut self, id: FlowNodeId) -> Result<()> {
92 let keys_to_remove = self.scan_keys_for_clear(id)?;
94
95 let count = keys_to_remove.len();
97 self.remove_keys(keys_to_remove)?;
98
99 Span::current().record("keys_removed", count);
100 Ok(())
101 }
102
103 #[inline]
104 #[instrument(name = "flow::state::clear::scan", level = "trace", skip(self), fields(node_id = id.0))]
105 fn scan_keys_for_clear(&mut self, id: FlowNodeId) -> Result<Vec<EncodedKey>> {
106 let range = FlowNodeStateKey::node_range(id);
107 let iter = self.range(range, 1024);
108 let mut keys = Vec::new();
109 for result in iter {
110 let multi = result?;
111 keys.push(multi.key);
112 }
113 Ok(keys)
114 }
115
116 #[inline]
117 #[instrument(name = "flow::state::clear::remove", level = "trace", skip(self, keys), fields(count = keys.len()))]
118 fn remove_keys(&mut self, keys: Vec<EncodedKey>) -> Result<()> {
119 for key in keys {
120 self.remove(&key)?;
121 }
122 Ok(())
123 }
124
125 #[instrument(name = "flow::state::load_or_create", level = "debug", skip(self, shape), fields(
126 node_id = id.0,
127 key_len = key.as_bytes().len(),
128 created
129 ))]
130 pub fn load_or_create_row(&mut self, id: FlowNodeId, key: &EncodedKey, shape: &RowShape) -> Result<EncodedRow> {
131 match self.state_get(id, key)? {
132 Some(row) => {
133 Span::current().record("created", false);
134 Ok(row)
135 }
136 None => {
137 Span::current().record("created", true);
138 Ok(shape.allocate())
139 }
140 }
141 }
142
143 #[instrument(name = "flow::state::save", level = "trace", skip(self, row), fields(
144 node_id = id.0,
145 key_len = key.as_bytes().len()
146 ))]
147 pub fn save_row(&mut self, id: FlowNodeId, key: &EncodedKey, row: EncodedRow) -> Result<()> {
148 self.state_set(id, key, row)
149 }
150}
151
#[cfg(test)]
pub mod tests {
    use std::collections::Bound;

    use reifydb_catalog::catalog::Catalog;
    use reifydb_core::{
        common::CommitVersion,
        encoded::{
            key::{EncodedKey, EncodedKeyRange},
            row::EncodedRow,
            shape::RowShape,
        },
        interface::catalog::flow::FlowNodeId,
    };
    use reifydb_runtime::context::clock::{Clock, MockClock};
    use reifydb_transaction::interceptor::interceptors::Interceptors;
    use reifydb_type::{util::cowvec::CowVec, value::r#type::Type};

    use super::*;
    use crate::operator::stateful::test_utils::test::create_test_transaction;

    /// Builds an [`EncodedKey`] from the UTF-8 bytes of `s`.
    fn make_key(s: &str) -> EncodedKey {
        let bytes = s.as_bytes().to_owned();
        EncodedKey::new(bytes)
    }

    /// Builds an [`EncodedRow`] whose payload is the UTF-8 bytes of `s`.
    fn make_value(s: &str) -> EncodedRow {
        let bytes = s.as_bytes().to_owned();
        EncodedRow(CowVec::new(bytes))
    }

    /// Runs `body` against a fresh deferred [`FlowTransaction`] at commit version 1.
    ///
    /// The parent test transaction stays alive for the whole duration of `body` and is
    /// dropped after the flow transaction.
    fn with_txn(body: impl FnOnce(&mut FlowTransaction)) {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::deferred(
            &parent,
            CommitVersion(1),
            Catalog::testing(),
            Interceptors::new(),
            Clock::Mock(MockClock::from_millis(1000)),
        );
        body(&mut txn);
    }

    /// A value written with `state_set` is returned by `state_get`.
    #[test]
    fn test_state_get_set() {
        with_txn(|txn| {
            let node = FlowNodeId(1);
            let key = make_key("state_key");
            let stored = make_value("state_value");

            txn.state_set(node, &key, stored.clone()).unwrap();

            let fetched = txn.state_get(node, &key).unwrap();
            assert_eq!(fetched, Some(stored));
        });
    }

    /// Reading a key that was never written yields `None`.
    #[test]
    fn test_state_get_nonexistent() {
        with_txn(|txn| {
            let node = FlowNodeId(1);
            let absent = make_key("missing");

            assert_eq!(txn.state_get(node, &absent).unwrap(), None);
        });
    }

    /// A removed key is no longer readable.
    #[test]
    fn test_state_remove() {
        with_txn(|txn| {
            let node = FlowNodeId(1);
            let key = make_key("state_key");
            let stored = make_value("state_value");

            txn.state_set(node, &key, stored.clone()).unwrap();
            assert_eq!(txn.state_get(node, &key).unwrap(), Some(stored));

            txn.state_remove(node, &key).unwrap();
            assert_eq!(txn.state_get(node, &key).unwrap(), None);
        });
    }

    /// The same key under two nodes addresses two independent entries.
    #[test]
    fn test_state_isolation_between_nodes() {
        with_txn(|txn| {
            let first = FlowNodeId(1);
            let second = FlowNodeId(2);
            let shared_key = make_key("same_key");

            txn.state_set(first, &shared_key, make_value("node1_value")).unwrap();
            txn.state_set(second, &shared_key, make_value("node2_value")).unwrap();

            assert_eq!(txn.state_get(first, &shared_key).unwrap(), Some(make_value("node1_value")));
            assert_eq!(txn.state_get(second, &shared_key).unwrap(), Some(make_value("node2_value")));
        });
    }

    /// `state_scan` returns every entry written for the node.
    #[test]
    fn test_state_scan() {
        with_txn(|txn| {
            let node = FlowNodeId(1);

            for (k, v) in [("key1", "value1"), ("key2", "value2"), ("key3", "value3")] {
                txn.state_set(node, &make_key(k), make_value(v)).unwrap();
            }

            let batch = txn.state_scan(node).unwrap();
            assert_eq!(batch.items.len(), 3);
        });
    }

    /// `state_scan` never returns entries belonging to another node.
    #[test]
    fn test_state_scan_only_own_node() {
        with_txn(|txn| {
            let first = FlowNodeId(1);
            let second = FlowNodeId(2);

            txn.state_set(first, &make_key("key1"), make_value("value1")).unwrap();
            txn.state_set(first, &make_key("key2"), make_value("value2")).unwrap();
            txn.state_set(second, &make_key("key3"), make_value("value3")).unwrap();

            let first_batch = txn.state_scan(first).unwrap();
            assert_eq!(first_batch.items.len(), 2);

            let second_batch = txn.state_scan(second).unwrap();
            assert_eq!(second_batch.items.len(), 1);
        });
    }

    /// Scanning a node without state yields an empty batch.
    #[test]
    fn test_state_scan_empty() {
        with_txn(|txn| {
            let node = FlowNodeId(1);

            let batch = txn.state_scan(node).unwrap();
            assert!(batch.items.is_empty());
        });
    }

    /// `state_range` with `[b, d)` over keys a..=d returns two entries.
    #[test]
    fn test_state_range() {
        with_txn(|txn| {
            let node = FlowNodeId(1);

            for (k, v) in [("a", "1"), ("b", "2"), ("c", "3"), ("d", "4")] {
                txn.state_set(node, &make_key(k), make_value(v)).unwrap();
            }

            let lower = Bound::Included(make_key("b"));
            let upper = Bound::Excluded(make_key("d"));
            let batch = txn.state_range(node, EncodedKeyRange::new(lower, upper)).unwrap();

            assert_eq!(batch.items.len(), 2);
        });
    }

    /// `state_clear` removes every entry of the node.
    #[test]
    fn test_state_clear() {
        with_txn(|txn| {
            let node = FlowNodeId(1);

            for (k, v) in [("key1", "value1"), ("key2", "value2"), ("key3", "value3")] {
                txn.state_set(node, &make_key(k), make_value(v)).unwrap();
            }

            assert_eq!(txn.state_scan(node).unwrap().items.len(), 3);

            txn.state_clear(node).unwrap();

            assert_eq!(txn.state_scan(node).unwrap().items.len(), 0);
        });
    }

    /// `state_clear` leaves the entries of other nodes untouched.
    #[test]
    fn test_state_clear_only_own_node() {
        with_txn(|txn| {
            let first = FlowNodeId(1);
            let second = FlowNodeId(2);

            txn.state_set(first, &make_key("key1"), make_value("value1")).unwrap();
            txn.state_set(first, &make_key("key2"), make_value("value2")).unwrap();
            txn.state_set(second, &make_key("key3"), make_value("value3")).unwrap();

            txn.state_clear(first).unwrap();

            assert_eq!(txn.state_scan(first).unwrap().items.len(), 0);

            assert_eq!(txn.state_scan(second).unwrap().items.len(), 1);
        });
    }

    /// Clearing a node without state succeeds.
    #[test]
    fn test_state_clear_empty_node() {
        with_txn(|txn| {
            let node = FlowNodeId(1);

            txn.state_clear(node).unwrap();
        });
    }

    /// `load_or_create_row` returns the stored row when one exists.
    #[test]
    fn test_load_or_create_existing() {
        with_txn(|txn| {
            let node = FlowNodeId(1);
            let key = make_key("key1");
            let stored = make_value("existing");
            let shape = RowShape::testing(&[Type::Int8, Type::Float8]);

            txn.state_set(node, &key, stored.clone()).unwrap();

            let loaded = txn.load_or_create_row(node, &key, &shape).unwrap();
            assert_eq!(loaded, stored);
        });
    }

    /// `load_or_create_row` allocates a non-empty row when nothing is stored.
    #[test]
    fn test_load_or_create_new() {
        with_txn(|txn| {
            let node = FlowNodeId(1);
            let key = make_key("key1");
            let shape = RowShape::testing(&[Type::Int8, Type::Float8]);

            let created = txn.load_or_create_row(node, &key, &shape).unwrap();

            assert!(!created.is_empty());
        });
    }

    /// A row written with `save_row` is readable through `state_get`.
    #[test]
    fn test_save_row() {
        with_txn(|txn| {
            let node = FlowNodeId(1);
            let key = make_key("key1");
            let saved = make_value("row_data");

            txn.save_row(node, &key, saved.clone()).unwrap();

            let fetched = txn.state_get(node, &key).unwrap();
            assert_eq!(fetched, Some(saved));
        });
    }

    /// Entries of several nodes coexist, and keys absent for a node read as `None`.
    #[test]
    fn test_state_multiple_nodes() {
        with_txn(|txn| {
            let first = FlowNodeId(1);
            let second = FlowNodeId(2);
            let third = FlowNodeId(3);

            txn.state_set(first, &make_key("a"), make_value("n1_a")).unwrap();
            txn.state_set(first, &make_key("b"), make_value("n1_b")).unwrap();
            txn.state_set(second, &make_key("a"), make_value("n2_a")).unwrap();
            txn.state_set(third, &make_key("c"), make_value("n3_c")).unwrap();

            assert_eq!(txn.state_get(first, &make_key("a")).unwrap(), Some(make_value("n1_a")));
            assert_eq!(txn.state_get(first, &make_key("b")).unwrap(), Some(make_value("n1_b")));
            assert_eq!(txn.state_get(second, &make_key("a")).unwrap(), Some(make_value("n2_a")));
            assert_eq!(txn.state_get(third, &make_key("c")).unwrap(), Some(make_value("n3_c")));

            assert_eq!(txn.state_get(second, &make_key("b")).unwrap(), None);
            assert_eq!(txn.state_get(third, &make_key("a")).unwrap(), None);
        });
    }
}