1use reifydb_core::{
5 EncodedKey, EncodedKeyRange,
6 interface::{BoxedMultiVersionIter, FlowNodeId},
7 key::{EncodableKey, FlowNodeStateKey},
8 value::encoded::{EncodedValues, EncodedValuesLayout},
9};
10
11use super::FlowTransaction;
12
13impl FlowTransaction {
14 pub fn state_get(&mut self, id: FlowNodeId, key: &EncodedKey) -> crate::Result<Option<EncodedValues>> {
16 self.metrics.increment_state_operations();
17 let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
18 let encoded_key = state_key.encode();
19 self.get(&encoded_key)
20 }
21
22 pub fn state_set(&mut self, id: FlowNodeId, key: &EncodedKey, value: EncodedValues) -> crate::Result<()> {
24 self.metrics.increment_state_operations();
25 let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
26 let encoded_key = state_key.encode();
27 self.set(&encoded_key, value)
28 }
29
30 pub fn state_remove(&mut self, id: FlowNodeId, key: &EncodedKey) -> crate::Result<()> {
32 self.metrics.increment_state_operations();
33 let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
34 let encoded_key = state_key.encode();
35 self.remove(&encoded_key)
36 }
37
38 pub fn state_scan(&mut self, id: FlowNodeId) -> crate::Result<BoxedMultiVersionIter<'_>> {
40 self.metrics.increment_state_operations();
41 let range = FlowNodeStateKey::node_range(id);
42 self.range(range)
43 }
44
45 pub fn state_range(
47 &mut self,
48 id: FlowNodeId,
49 range: EncodedKeyRange,
50 ) -> crate::Result<BoxedMultiVersionIter<'_>> {
51 self.metrics.increment_state_operations();
52 let prefixed_range = range.with_prefix(FlowNodeStateKey::new(id, vec![]).encode());
53 self.range(prefixed_range)
54 }
55
56 pub fn state_clear(&mut self, id: FlowNodeId) -> crate::Result<()> {
58 self.metrics.increment_state_operations();
59 let range = FlowNodeStateKey::node_range(id);
60 let keys_to_remove: Vec<_> = self.range(range)?.map(|multi| multi.key).collect();
61
62 for key in keys_to_remove {
63 self.remove(&key)?;
64 }
65
66 Ok(())
67 }
68
69 pub fn load_or_create_row(
71 &mut self,
72 id: FlowNodeId,
73 key: &EncodedKey,
74 layout: &EncodedValuesLayout,
75 ) -> crate::Result<EncodedValues> {
76 match self.state_get(id, key)? {
77 Some(row) => Ok(row),
78 None => Ok(layout.allocate()),
79 }
80 }
81
82 pub fn save_row(&mut self, id: FlowNodeId, key: &EncodedKey, row: EncodedValues) -> crate::Result<()> {
84 self.state_set(id, key, row)
85 }
86}
87
88#[cfg(test)]
89mod tests {
90 use reifydb_core::{
91 CommitVersion, CowVec, EncodedKey, EncodedKeyRange, interface::FlowNodeId,
92 value::encoded::EncodedValues,
93 };
94 use reifydb_type::Type;
95
96 use super::*;
97 use crate::operator::stateful::test_utils::test::create_test_transaction;
98
99 fn make_key(s: &str) -> EncodedKey {
100 EncodedKey::new(s.as_bytes().to_vec())
101 }
102
103 fn make_value(s: &str) -> EncodedValues {
104 EncodedValues(CowVec::new(s.as_bytes().to_vec()))
105 }
106
107 #[test]
108 fn test_state_get_set() {
109 let parent = create_test_transaction();
110 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
111
112 let node_id = FlowNodeId(1);
113 let key = make_key("state_key");
114 let value = make_value("state_value");
115
116 txn.state_set(node_id, &key, value.clone()).unwrap();
118
119 let result = txn.state_get(node_id, &key).unwrap();
121 assert_eq!(result, Some(value));
122 }
123
124 #[test]
125 fn test_state_get_nonexistent() {
126 let parent = create_test_transaction();
127 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
128
129 let node_id = FlowNodeId(1);
130 let key = make_key("missing");
131
132 let result = txn.state_get(node_id, &key).unwrap();
133 assert_eq!(result, None);
134 }
135
136 #[test]
137 fn test_state_remove() {
138 let parent = create_test_transaction();
139 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
140
141 let node_id = FlowNodeId(1);
142 let key = make_key("state_key");
143 let value = make_value("state_value");
144
145 txn.state_set(node_id, &key, value.clone()).unwrap();
147 assert_eq!(txn.state_get(node_id, &key).unwrap(), Some(value));
148
149 txn.state_remove(node_id, &key).unwrap();
150 assert_eq!(txn.state_get(node_id, &key).unwrap(), None);
151 }
152
153 #[test]
154 fn test_state_isolation_between_nodes() {
155 let parent = create_test_transaction();
156 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
157
158 let node1 = FlowNodeId(1);
159 let node2 = FlowNodeId(2);
160 let key = make_key("same_key");
161
162 txn.state_set(node1, &key, make_value("node1_value")).unwrap();
163 txn.state_set(node2, &key, make_value("node2_value")).unwrap();
164
165 assert_eq!(txn.state_get(node1, &key).unwrap(), Some(make_value("node1_value")));
167 assert_eq!(txn.state_get(node2, &key).unwrap(), Some(make_value("node2_value")));
168 }
169
170 #[test]
171 fn test_state_scan() {
172 let parent = create_test_transaction();
173 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
174
175 let node_id = FlowNodeId(1);
176
177 txn.state_set(node_id, &make_key("key1"), make_value("value1")).unwrap();
178 txn.state_set(node_id, &make_key("key2"), make_value("value2")).unwrap();
179 txn.state_set(node_id, &make_key("key3"), make_value("value3")).unwrap();
180
181 let mut iter = txn.state_scan(node_id).unwrap();
182 let items: Vec<_> = iter.by_ref().collect();
183
184 assert_eq!(items.len(), 3);
185 }
186
187 #[test]
188 fn test_state_scan_only_own_node() {
189 let parent = create_test_transaction();
190 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
191
192 let node1 = FlowNodeId(1);
193 let node2 = FlowNodeId(2);
194
195 txn.state_set(node1, &make_key("key1"), make_value("value1")).unwrap();
196 txn.state_set(node1, &make_key("key2"), make_value("value2")).unwrap();
197 txn.state_set(node2, &make_key("key3"), make_value("value3")).unwrap();
198
199 let items: Vec<_> = txn.state_scan(node1).unwrap().collect();
201 assert_eq!(items.len(), 2);
202
203 let items: Vec<_> = txn.state_scan(node2).unwrap().collect();
205 assert_eq!(items.len(), 1);
206 }
207
208 #[test]
209 fn test_state_scan_empty() {
210 let parent = create_test_transaction();
211 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
212
213 let node_id = FlowNodeId(1);
214
215 let mut iter = txn.state_scan(node_id).unwrap();
216 assert!(iter.next().is_none());
217 }
218
219 #[test]
220 fn test_state_range() {
221 let parent = create_test_transaction();
222 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
223
224 let node_id = FlowNodeId(1);
225
226 txn.state_set(node_id, &make_key("a"), make_value("1")).unwrap();
227 txn.state_set(node_id, &make_key("b"), make_value("2")).unwrap();
228 txn.state_set(node_id, &make_key("c"), make_value("3")).unwrap();
229 txn.state_set(node_id, &make_key("d"), make_value("4")).unwrap();
230
231 use std::collections::Bound;
233 let range = EncodedKeyRange::new(Bound::Included(make_key("b")), Bound::Excluded(make_key("d")));
234 let mut iter = txn.state_range(node_id, range).unwrap();
235 let items: Vec<_> = iter.by_ref().collect();
236
237 assert_eq!(items.len(), 2);
239 }
240
241 #[test]
242 fn test_state_clear() {
243 let parent = create_test_transaction();
244 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
245
246 let node_id = FlowNodeId(1);
247
248 txn.state_set(node_id, &make_key("key1"), make_value("value1")).unwrap();
249 txn.state_set(node_id, &make_key("key2"), make_value("value2")).unwrap();
250 txn.state_set(node_id, &make_key("key3"), make_value("value3")).unwrap();
251
252 assert_eq!(txn.state_scan(node_id).unwrap().count(), 3);
254
255 txn.state_clear(node_id).unwrap();
257
258 assert_eq!(txn.state_scan(node_id).unwrap().count(), 0);
260 }
261
262 #[test]
263 fn test_state_clear_only_own_node() {
264 let parent = create_test_transaction();
265 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
266
267 let node1 = FlowNodeId(1);
268 let node2 = FlowNodeId(2);
269
270 txn.state_set(node1, &make_key("key1"), make_value("value1")).unwrap();
271 txn.state_set(node1, &make_key("key2"), make_value("value2")).unwrap();
272 txn.state_set(node2, &make_key("key3"), make_value("value3")).unwrap();
273
274 txn.state_clear(node1).unwrap();
276
277 assert_eq!(txn.state_scan(node1).unwrap().count(), 0);
279
280 assert_eq!(txn.state_scan(node2).unwrap().count(), 1);
282 }
283
284 #[test]
285 fn test_state_clear_empty_node() {
286 let parent = create_test_transaction();
287 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
288
289 let node_id = FlowNodeId(1);
290
291 txn.state_clear(node_id).unwrap();
293 }
294
295 #[test]
296 fn test_load_or_create_existing() {
297 let parent = create_test_transaction();
298 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
299
300 let node_id = FlowNodeId(1);
301 let key = make_key("key1");
302 let value = make_value("existing");
303 let layout = EncodedValuesLayout::new(&[Type::Int8, Type::Float8]);
304
305 txn.state_set(node_id, &key, value.clone()).unwrap();
307
308 let result = txn.load_or_create_row(node_id, &key, &layout).unwrap();
310 assert_eq!(result, value);
311 }
312
313 #[test]
314 fn test_load_or_create_new() {
315 let parent = create_test_transaction();
316 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
317
318 let node_id = FlowNodeId(1);
319 let key = make_key("key1");
320 let layout = EncodedValuesLayout::new(&[Type::Int8, Type::Float8]);
321
322 let result = txn.load_or_create_row(node_id, &key, &layout).unwrap();
324
325 assert!(!result.as_ref().is_empty());
327 }
328
329 #[test]
330 fn test_save_row() {
331 let parent = create_test_transaction();
332 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
333
334 let node_id = FlowNodeId(1);
335 let key = make_key("key1");
336 let row = make_value("row_data");
337
338 txn.save_row(node_id, &key, row.clone()).unwrap();
339
340 let result = txn.state_get(node_id, &key).unwrap();
342 assert_eq!(result, Some(row));
343 }
344
345 #[test]
346 fn test_state_operations_increment_metrics() {
347 let parent = create_test_transaction();
348 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
349
350 let node_id = FlowNodeId(1);
351 let key = make_key("key1");
352
353 assert_eq!(txn.metrics().state_operations, 0);
354
355 txn.state_set(node_id, &key, make_value("value")).unwrap();
356 assert_eq!(txn.metrics().state_operations, 1);
357
358 txn.state_get(node_id, &key).unwrap();
359 assert_eq!(txn.metrics().state_operations, 2);
360
361 txn.state_remove(node_id, &key).unwrap();
362 assert_eq!(txn.metrics().state_operations, 3);
363
364 let _ = txn.state_scan(node_id).unwrap();
365 assert_eq!(txn.metrics().state_operations, 4);
366
367 let range = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
368 let _ = txn.state_range(node_id, range).unwrap();
369 assert_eq!(txn.metrics().state_operations, 5);
370
371 txn.state_clear(node_id).unwrap();
372 assert!(txn.metrics().state_operations >= 6);
375 }
376
377 #[test]
378 fn test_state_multiple_nodes() {
379 let parent = create_test_transaction();
380 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
381
382 let node1 = FlowNodeId(1);
383 let node2 = FlowNodeId(2);
384 let node3 = FlowNodeId(3);
385
386 txn.state_set(node1, &make_key("a"), make_value("n1_a")).unwrap();
387 txn.state_set(node1, &make_key("b"), make_value("n1_b")).unwrap();
388 txn.state_set(node2, &make_key("a"), make_value("n2_a")).unwrap();
389 txn.state_set(node3, &make_key("c"), make_value("n3_c")).unwrap();
390
391 assert_eq!(txn.state_get(node1, &make_key("a")).unwrap(), Some(make_value("n1_a")));
393 assert_eq!(txn.state_get(node1, &make_key("b")).unwrap(), Some(make_value("n1_b")));
394 assert_eq!(txn.state_get(node2, &make_key("a")).unwrap(), Some(make_value("n2_a")));
395 assert_eq!(txn.state_get(node3, &make_key("c")).unwrap(), Some(make_value("n3_c")));
396
397 assert_eq!(txn.state_get(node2, &make_key("b")).unwrap(), None);
399 assert_eq!(txn.state_get(node3, &make_key("a")).unwrap(), None);
400 }
401
402 #[test]
403 fn test_load_or_create_increments_state_operations() {
404 let parent = create_test_transaction();
405 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
406
407 let node_id = FlowNodeId(1);
408 let key = make_key("key1");
409 let layout = EncodedValuesLayout::new(&[Type::Int8]);
410
411 let initial_count = txn.metrics().state_operations;
412
413 txn.load_or_create_row(node_id, &key, &layout).unwrap();
414
415 assert!(txn.metrics().state_operations > initial_count);
417 }
418
419 #[test]
420 fn test_save_row_increments_state_operations() {
421 let parent = create_test_transaction();
422 let mut txn = FlowTransaction::new(&parent, CommitVersion(1));
423
424 let node_id = FlowNodeId(1);
425 let key = make_key("key1");
426
427 let initial_count = txn.metrics().state_operations;
428
429 txn.save_row(node_id, &key, make_value("data")).unwrap();
430
431 assert!(txn.metrics().state_operations > initial_count);
433 }
434}