1use reifydb_core::{
5 EncodedKey, EncodedKeyRange,
6 interface::{BoxedMultiVersionIter, FlowNodeId},
7 key::{EncodableKey, FlowNodeStateKey},
8 value::encoded::{EncodedValues, EncodedValuesLayout},
9};
10use tracing::instrument;
11
12use super::FlowTransaction;
13
14impl FlowTransaction {
15 #[instrument(level = "trace", skip(self), fields(
17 node_id = id.0,
18 key_len = key.as_bytes().len(),
19 found
20 ))]
21 pub fn state_get(&mut self, id: FlowNodeId, key: &EncodedKey) -> crate::Result<Option<EncodedValues>> {
22 self.metrics.increment_state_operations();
23 let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
24 let encoded_key = state_key.encode();
25 let result = self.get(&encoded_key)?;
26 tracing::Span::current().record("found", result.is_some());
27 Ok(result)
28 }
29
30 #[instrument(level = "trace", skip(self, value), fields(
32 node_id = id.0,
33 key_len = key.as_bytes().len(),
34 value_len = value.as_ref().len()
35 ))]
36 pub fn state_set(&mut self, id: FlowNodeId, key: &EncodedKey, value: EncodedValues) -> crate::Result<()> {
37 self.metrics.increment_state_operations();
38 let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
39 let encoded_key = state_key.encode();
40 self.set(&encoded_key, value)
41 }
42
43 #[instrument(level = "trace", skip(self), fields(
45 node_id = id.0,
46 key_len = key.as_bytes().len()
47 ))]
48 pub fn state_remove(&mut self, id: FlowNodeId, key: &EncodedKey) -> crate::Result<()> {
49 self.metrics.increment_state_operations();
50 let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
51 let encoded_key = state_key.encode();
52 self.remove(&encoded_key)
53 }
54
55 #[instrument(level = "debug", skip(self), fields(
57 node_id = id.0
58 ))]
59 pub fn state_scan(&mut self, id: FlowNodeId) -> crate::Result<BoxedMultiVersionIter<'_>> {
60 self.metrics.increment_state_operations();
61 let range = FlowNodeStateKey::node_range(id);
62 self.range(range)
63 }
64
65 #[instrument(level = "debug", skip(self, range), fields(
67 node_id = id.0
68 ))]
69 pub fn state_range(
70 &mut self,
71 id: FlowNodeId,
72 range: EncodedKeyRange,
73 ) -> crate::Result<BoxedMultiVersionIter<'_>> {
74 self.metrics.increment_state_operations();
75 let prefixed_range = range.with_prefix(FlowNodeStateKey::new(id, vec![]).encode());
76 self.range(prefixed_range)
77 }
78
79 #[instrument(level = "debug", skip(self), fields(
81 node_id = id.0,
82 removed_count
83 ))]
84 pub fn state_clear(&mut self, id: FlowNodeId) -> crate::Result<()> {
85 self.metrics.increment_state_operations();
86 let range = FlowNodeStateKey::node_range(id);
87 let keys_to_remove: Vec<_> = self.range(range)?.map(|multi| multi.key).collect();
88
89 let count = keys_to_remove.len();
90 for key in keys_to_remove {
91 self.remove(&key)?;
92 }
93
94 tracing::Span::current().record("removed_count", count);
95 Ok(())
96 }
97
98 #[instrument(level = "debug", skip(self, layout), fields(
100 node_id = id.0,
101 key_len = key.as_bytes().len(),
102 created
103 ))]
104 pub fn load_or_create_row(
105 &mut self,
106 id: FlowNodeId,
107 key: &EncodedKey,
108 layout: &EncodedValuesLayout,
109 ) -> crate::Result<EncodedValues> {
110 match self.state_get(id, key)? {
111 Some(row) => {
112 tracing::Span::current().record("created", false);
113 Ok(row)
114 }
115 None => {
116 tracing::Span::current().record("created", true);
117 Ok(layout.allocate())
118 }
119 }
120 }
121
122 #[instrument(level = "trace", skip(self, row), fields(
124 node_id = id.0,
125 key_len = key.as_bytes().len()
126 ))]
127 pub fn save_row(&mut self, id: FlowNodeId, key: &EncodedKey, row: EncodedValues) -> crate::Result<()> {
128 self.state_set(id, key, row)
129 }
130}
131
#[cfg(test)]
mod tests {
	use reifydb_core::{
		CommitVersion, CowVec, EncodedKey, EncodedKeyRange, interface::FlowNodeId,
		value::encoded::EncodedValues,
	};
	use reifydb_type::Type;

	use super::*;
	use crate::operator::stateful::test_utils::test::create_test_transaction;

	/// Builds an `EncodedKey` from the bytes of `s`.
	fn make_key(s: &str) -> EncodedKey {
		let bytes = s.as_bytes().to_vec();
		EncodedKey::new(bytes)
	}

	/// Builds an `EncodedValues` wrapping the bytes of `s`.
	fn make_value(s: &str) -> EncodedValues {
		let bytes = s.as_bytes().to_vec();
		EncodedValues(CowVec::new(bytes))
	}

	/// Writes every `(key, value)` pair, in order, into the state of `node`.
	fn seed(txn: &mut FlowTransaction, node: FlowNodeId, entries: &[(&str, &str)]) {
		for &(key, value) in entries {
			txn.state_set(node, &make_key(key), make_value(value)).unwrap();
		}
	}

	/// A value written with `state_set` is returned by `state_get`.
	#[test]
	fn test_state_get_set() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);
		let key = make_key("state_key");
		let stored = make_value("state_value");

		txn.state_set(node, &key, stored.clone()).unwrap();

		let loaded = txn.state_get(node, &key).unwrap();
		assert_eq!(loaded, Some(stored));
	}

	/// Looking up a key that was never written yields `None`.
	#[test]
	fn test_state_get_nonexistent() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);

		let loaded = txn.state_get(node, &make_key("missing")).unwrap();

		assert_eq!(loaded, None);
	}

	/// After `state_remove`, the key is no longer readable.
	#[test]
	fn test_state_remove() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);
		let key = make_key("state_key");
		let stored = make_value("state_value");

		txn.state_set(node, &key, stored.clone()).unwrap();
		assert_eq!(txn.state_get(node, &key).unwrap(), Some(stored));

		txn.state_remove(node, &key).unwrap();
		assert_eq!(txn.state_get(node, &key).unwrap(), None);
	}

	/// The same key under two nodes holds two independent values.
	#[test]
	fn test_state_isolation_between_nodes() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let key = make_key("same_key");
		let per_node = [(FlowNodeId(1), "node1_value"), (FlowNodeId(2), "node2_value")];

		for (node, value) in per_node {
			txn.state_set(node, &key, make_value(value)).unwrap();
		}

		for (node, value) in per_node {
			assert_eq!(txn.state_get(node, &key).unwrap(), Some(make_value(value)));
		}
	}

	/// `state_scan` yields every entry written for the node.
	#[test]
	fn test_state_scan() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);

		seed(&mut txn, node, &[("key1", "value1"), ("key2", "value2"), ("key3", "value3")]);

		let seen = txn.state_scan(node).unwrap().count();
		assert_eq!(seen, 3);
	}

	/// `state_scan` does not yield entries that belong to another node.
	#[test]
	fn test_state_scan_only_own_node() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let first = FlowNodeId(1);
		let second = FlowNodeId(2);

		seed(&mut txn, first, &[("key1", "value1"), ("key2", "value2")]);
		seed(&mut txn, second, &[("key3", "value3")]);

		assert_eq!(txn.state_scan(first).unwrap().count(), 2);
		assert_eq!(txn.state_scan(second).unwrap().count(), 1);
	}

	/// Scanning a node without state yields nothing.
	#[test]
	fn test_state_scan_empty() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);

		assert!(txn.state_scan(node).unwrap().next().is_none());
	}

	/// `state_range` honours an included lower and excluded upper bound.
	#[test]
	fn test_state_range() {
		use std::collections::Bound;

		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);

		seed(&mut txn, node, &[("a", "1"), ("b", "2"), ("c", "3"), ("d", "4")]);

		let lower = Bound::Included(make_key("b"));
		let upper = Bound::Excluded(make_key("d"));
		let matched = txn.state_range(node, EncodedKeyRange::new(lower, upper)).unwrap().count();

		assert_eq!(matched, 2);
	}

	/// `state_clear` removes every entry of the node.
	#[test]
	fn test_state_clear() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);

		seed(&mut txn, node, &[("key1", "value1"), ("key2", "value2"), ("key3", "value3")]);
		assert_eq!(txn.state_scan(node).unwrap().count(), 3);

		txn.state_clear(node).unwrap();

		assert_eq!(txn.state_scan(node).unwrap().count(), 0);
	}

	/// `state_clear` leaves the state of other nodes untouched.
	#[test]
	fn test_state_clear_only_own_node() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let cleared = FlowNodeId(1);
		let untouched = FlowNodeId(2);

		seed(&mut txn, cleared, &[("key1", "value1"), ("key2", "value2")]);
		seed(&mut txn, untouched, &[("key3", "value3")]);

		txn.state_clear(cleared).unwrap();

		assert_eq!(txn.state_scan(cleared).unwrap().count(), 0);
		assert_eq!(txn.state_scan(untouched).unwrap().count(), 1);
	}

	/// Clearing a node without state succeeds.
	#[test]
	fn test_state_clear_empty_node() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

		txn.state_clear(FlowNodeId(1)).unwrap();
	}

	/// `load_or_create_row` returns the stored row when one exists.
	#[test]
	fn test_load_or_create_existing() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);
		let key = make_key("key1");
		let stored = make_value("existing");
		let layout = EncodedValuesLayout::new(&[Type::Int8, Type::Float8]);

		txn.state_set(node, &key, stored.clone()).unwrap();

		let row = txn.load_or_create_row(node, &key, &layout).unwrap();
		assert_eq!(row, stored);
	}

	/// `load_or_create_row` hands back a non-empty row when nothing is stored.
	#[test]
	fn test_load_or_create_new() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);
		let key = make_key("key1");
		let layout = EncodedValuesLayout::new(&[Type::Int8, Type::Float8]);

		let row = txn.load_or_create_row(node, &key, &layout).unwrap();

		assert!(!row.as_ref().is_empty());
	}

	/// A row written with `save_row` is readable through `state_get`.
	#[test]
	fn test_save_row() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);
		let key = make_key("key1");
		let row = make_value("row_data");

		txn.save_row(node, &key, row.clone()).unwrap();

		let loaded = txn.state_get(node, &key).unwrap();
		assert_eq!(loaded, Some(row));
	}

	/// Each state operation bumps the `state_operations` metric.
	#[test]
	fn test_state_operations_increment_metrics() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);
		let key = make_key("key1");

		assert_eq!(txn.metrics().state_operations, 0);

		txn.state_set(node, &key, make_value("value")).unwrap();
		assert_eq!(txn.metrics().state_operations, 1);

		txn.state_get(node, &key).unwrap();
		assert_eq!(txn.metrics().state_operations, 2);

		txn.state_remove(node, &key).unwrap();
		assert_eq!(txn.metrics().state_operations, 3);

		drop(txn.state_scan(node).unwrap());
		assert_eq!(txn.metrics().state_operations, 4);

		let bounds = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
		drop(txn.state_range(node, bounds).unwrap());
		assert_eq!(txn.metrics().state_operations, 5);

		// Only a lower bound is asserted for `state_clear`.
		txn.state_clear(node).unwrap();
		assert!(txn.metrics().state_operations >= 6);
	}

	/// Entries of several nodes can be read back and never leak across nodes.
	#[test]
	fn test_state_multiple_nodes() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node1 = FlowNodeId(1);
		let node2 = FlowNodeId(2);
		let node3 = FlowNodeId(3);

		let written = [(node1, "a", "n1_a"), (node1, "b", "n1_b"), (node2, "a", "n2_a"), (node3, "c", "n3_c")];
		for (node, key, value) in written {
			txn.state_set(node, &make_key(key), make_value(value)).unwrap();
		}

		for (node, key, value) in written {
			assert_eq!(txn.state_get(node, &make_key(key)).unwrap(), Some(make_value(value)));
		}

		for (node, key) in [(node2, "b"), (node3, "a")] {
			assert_eq!(txn.state_get(node, &make_key(key)).unwrap(), None);
		}
	}

	/// `load_or_create_row` counts as at least one state operation.
	#[test]
	fn test_load_or_create_increments_state_operations() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);
		let key = make_key("key1");
		let layout = EncodedValuesLayout::new(&[Type::Int8]);

		let before = txn.metrics().state_operations;

		txn.load_or_create_row(node, &key, &layout).unwrap();

		assert!(txn.metrics().state_operations > before);
	}

	/// `save_row` counts as at least one state operation.
	#[test]
	fn test_save_row_increments_state_operations() {
		let parent = create_test_transaction();
		let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
		let node = FlowNodeId(1);
		let key = make_key("key1");

		let before = txn.metrics().state_operations;

		txn.save_row(node, &key, make_value("data")).unwrap();

		assert!(txn.metrics().state_operations > before);
	}
}