1use reifydb_core::{
5 EncodedKey, EncodedKeyRange,
6 interface::{BoxedMultiVersionIter, FlowNodeId},
7 key::{EncodableKey, FlowNodeStateKey},
8 value::encoded::{EncodedValues, EncodedValuesLayout},
9};
10
11use super::FlowTransaction;
12
13impl FlowTransaction {
14 pub fn state_get(&mut self, id: FlowNodeId, key: &EncodedKey) -> crate::Result<Option<EncodedValues>> {
16 self.metrics.increment_state_operations();
17 let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
18 let encoded_key = state_key.encode();
19 self.get(&encoded_key)
20 }
21
22 pub fn state_set(&mut self, id: FlowNodeId, key: &EncodedKey, value: EncodedValues) -> crate::Result<()> {
24 self.metrics.increment_state_operations();
25 let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
26 let encoded_key = state_key.encode();
27 self.set(&encoded_key, value)
28 }
29
30 pub fn state_remove(&mut self, id: FlowNodeId, key: &EncodedKey) -> crate::Result<()> {
32 self.metrics.increment_state_operations();
33 let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
34 let encoded_key = state_key.encode();
35 self.remove(&encoded_key)
36 }
37
38 pub fn state_scan(&mut self, id: FlowNodeId) -> crate::Result<BoxedMultiVersionIter> {
40 self.metrics.increment_state_operations();
41 let range = FlowNodeStateKey::node_range(id);
42 self.range(range)
43 }
44
45 pub fn state_range(&mut self, id: FlowNodeId, range: EncodedKeyRange) -> crate::Result<BoxedMultiVersionIter> {
47 self.metrics.increment_state_operations();
48 let prefixed_range = range.with_prefix(FlowNodeStateKey::new(id, vec![]).encode());
49 self.range(prefixed_range)
50 }
51
52 pub fn state_clear(&mut self, id: FlowNodeId) -> crate::Result<()> {
54 self.metrics.increment_state_operations();
55 let range = FlowNodeStateKey::node_range(id);
56 let keys_to_remove: Vec<_> = self.range(range)?.map(|multi| multi.key).collect();
57
58 for key in keys_to_remove {
59 self.remove(&key)?;
60 }
61
62 Ok(())
63 }
64
65 pub fn load_or_create_row(
67 &mut self,
68 id: FlowNodeId,
69 key: &EncodedKey,
70 layout: &EncodedValuesLayout,
71 ) -> crate::Result<EncodedValues> {
72 match self.state_get(id, key)? {
73 Some(row) => Ok(row),
74 None => Ok(layout.allocate()),
75 }
76 }
77
78 pub fn save_row(&mut self, id: FlowNodeId, key: &EncodedKey, row: EncodedValues) -> crate::Result<()> {
80 self.state_set(id, key, row)
81 }
82}
83
#[cfg(test)]
mod tests {
    use reifydb_core::{
        CommitVersion, CowVec, EncodedKey, EncodedKeyRange, interface::FlowNodeId,
        value::encoded::EncodedValues,
    };
    use reifydb_type::Type;

    use super::*;
    use crate::operator::stateful::test_utils::test::create_test_transaction;

    /// Builds an `EncodedKey` from the UTF-8 bytes of `s`.
    fn make_key(s: &str) -> EncodedKey {
        let bytes: Vec<u8> = s.as_bytes().to_owned();
        EncodedKey::new(bytes)
    }

    /// Builds an `EncodedValues` whose payload is the UTF-8 bytes of `s`.
    fn make_value(s: &str) -> EncodedValues {
        let bytes: Vec<u8> = s.as_bytes().to_owned();
        EncodedValues(CowVec::new(bytes))
    }

    /// A value written with `state_set` is returned unchanged by `state_get`.
    #[test]
    fn test_state_get_set() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let node = FlowNodeId(1);
        let state_key = make_key("state_key");
        let expected = make_value("state_value");

        txn.state_set(node, &state_key, expected.clone()).unwrap();

        let fetched = txn.state_get(node, &state_key).unwrap();
        assert_eq!(fetched, Some(expected));
    }

    /// Reading a key that was never written yields `None`.
    #[test]
    fn test_state_get_nonexistent() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let node = FlowNodeId(1);
        let absent_key = make_key("missing");

        let fetched = txn.state_get(node, &absent_key).unwrap();
        assert!(fetched.is_none());
    }

    /// After `state_remove`, a previously stored key reads back as `None`.
    #[test]
    fn test_state_remove() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let node = FlowNodeId(1);
        let state_key = make_key("state_key");
        let stored = make_value("state_value");

        txn.state_set(node, &state_key, stored.clone()).unwrap();
        let before = txn.state_get(node, &state_key).unwrap();
        assert_eq!(before, Some(stored));

        txn.state_remove(node, &state_key).unwrap();
        let after = txn.state_get(node, &state_key).unwrap();
        assert!(after.is_none());
    }

    /// Two nodes writing the same key keep separate values.
    #[test]
    fn test_state_isolation_between_nodes() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let shared_key = make_key("same_key");
        let per_node = [(FlowNodeId(1), "node1_value"), (FlowNodeId(2), "node2_value")];

        for (node, text) in per_node {
            txn.state_set(node, &shared_key, make_value(text)).unwrap();
        }

        for (node, text) in per_node {
            assert_eq!(txn.state_get(node, &shared_key).unwrap(), Some(make_value(text)));
        }
    }

    /// `state_scan` yields every entry written for the node.
    #[test]
    fn test_state_scan() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let node = FlowNodeId(1);

        for (k, v) in [("key1", "value1"), ("key2", "value2"), ("key3", "value3")] {
            txn.state_set(node, &make_key(k), make_value(v)).unwrap();
        }

        let seen = txn.state_scan(node).unwrap().count();
        assert_eq!(seen, 3);
    }

    /// `state_scan` does not return entries that belong to a different node.
    #[test]
    fn test_state_scan_only_own_node() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let first = FlowNodeId(1);
        let second = FlowNodeId(2);

        for (node, k, v) in [(first, "key1", "value1"), (first, "key2", "value2"), (second, "key3", "value3")] {
            txn.state_set(node, &make_key(k), make_value(v)).unwrap();
        }

        let first_count = txn.state_scan(first).unwrap().count();
        assert_eq!(first_count, 2);

        let second_count = txn.state_scan(second).unwrap().count();
        assert_eq!(second_count, 1);
    }

    /// Scanning a node without any state yields an empty iterator.
    #[test]
    fn test_state_scan_empty() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let node = FlowNodeId(1);

        let mut entries = txn.state_scan(node).unwrap();
        let first = entries.next();
        assert!(first.is_none());
    }

    /// `state_range` with bounds `[b, d)` over keys a, b, c, d yields two entries.
    #[test]
    fn test_state_range() {
        use std::collections::Bound;

        let parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let node = FlowNodeId(1);

        for (k, v) in [("a", "1"), ("b", "2"), ("c", "3"), ("d", "4")] {
            txn.state_set(node, &make_key(k), make_value(v)).unwrap();
        }

        let lower = Bound::Included(make_key("b"));
        let upper = Bound::Excluded(make_key("d"));
        let bounds = EncodedKeyRange::new(lower, upper);

        let matched = txn.state_range(node, bounds).unwrap().count();
        assert_eq!(matched, 2);
    }

    /// `state_clear` removes every entry of the node.
    #[test]
    fn test_state_clear() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let node = FlowNodeId(1);

        for (k, v) in [("key1", "value1"), ("key2", "value2"), ("key3", "value3")] {
            txn.state_set(node, &make_key(k), make_value(v)).unwrap();
        }

        let before = txn.state_scan(node).unwrap().count();
        assert_eq!(before, 3);

        txn.state_clear(node).unwrap();

        let after = txn.state_scan(node).unwrap().count();
        assert_eq!(after, 0);
    }

    /// `state_clear` leaves the entries of other nodes in place.
    #[test]
    fn test_state_clear_only_own_node() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let cleared = FlowNodeId(1);
        let untouched = FlowNodeId(2);

        for (node, k, v) in [(cleared, "key1", "value1"), (cleared, "key2", "value2"), (untouched, "key3", "value3")] {
            txn.state_set(node, &make_key(k), make_value(v)).unwrap();
        }

        txn.state_clear(cleared).unwrap();

        let cleared_count = txn.state_scan(cleared).unwrap().count();
        assert_eq!(cleared_count, 0);

        let untouched_count = txn.state_scan(untouched).unwrap().count();
        assert_eq!(untouched_count, 1);
    }

    /// Clearing a node that has no state succeeds.
    #[test]
    fn test_state_clear_empty_node() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let node = FlowNodeId(1);

        let outcome = txn.state_clear(node);
        assert!(outcome.is_ok());
    }

    /// `load_or_create_row` returns the stored row when one exists.
    #[test]
    fn test_load_or_create_existing() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let node = FlowNodeId(1);
        let row_key = make_key("key1");
        let stored = make_value("existing");
        let layout = EncodedValuesLayout::new(&[Type::Int8, Type::Float8]);

        txn.state_set(node, &row_key, stored.clone()).unwrap();

        let loaded = txn.load_or_create_row(node, &row_key, &layout).unwrap();
        assert_eq!(loaded, stored);
    }

    /// `load_or_create_row` hands back a non-empty row when nothing is stored.
    #[test]
    fn test_load_or_create_new() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let node = FlowNodeId(1);
        let row_key = make_key("key1");
        let layout = EncodedValuesLayout::new(&[Type::Int8, Type::Float8]);

        let created = txn.load_or_create_row(node, &row_key, &layout).unwrap();

        assert!(!created.as_ref().is_empty());
    }

    /// A row written with `save_row` is readable through `state_get`.
    #[test]
    fn test_save_row() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let node = FlowNodeId(1);
        let row_key = make_key("key1");
        let saved = make_value("row_data");

        txn.save_row(node, &row_key, saved.clone()).unwrap();

        let fetched = txn.state_get(node, &row_key).unwrap();
        assert_eq!(fetched, Some(saved));
    }

    /// Each state call bumps the `state_operations` metric.
    #[test]
    fn test_state_operations_increment_metrics() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let node = FlowNodeId(1);
        let state_key = make_key("key1");

        // Fresh transaction: nothing counted yet.
        assert_eq!(txn.metrics().state_operations, 0);

        // set -> 1
        txn.state_set(node, &state_key, make_value("value")).unwrap();
        assert_eq!(txn.metrics().state_operations, 1);

        // get -> 2
        txn.state_get(node, &state_key).unwrap();
        assert_eq!(txn.metrics().state_operations, 2);

        // remove -> 3
        txn.state_remove(node, &state_key).unwrap();
        assert_eq!(txn.metrics().state_operations, 3);

        // scan -> 4
        let _ = txn.state_scan(node).unwrap();
        assert_eq!(txn.metrics().state_operations, 4);

        // range -> 5
        let bounds = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));
        let _ = txn.state_range(node, bounds).unwrap();
        assert_eq!(txn.metrics().state_operations, 5);

        // clear -> at least one more
        txn.state_clear(node).unwrap();
        assert!(txn.metrics().state_operations >= 6);
    }

    /// Several nodes with overlapping and distinct keys stay independent.
    #[test]
    fn test_state_multiple_nodes() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let first = FlowNodeId(1);
        let second = FlowNodeId(2);
        let third = FlowNodeId(3);

        let written = [(first, "a", "n1_a"), (first, "b", "n1_b"), (second, "a", "n2_a"), (third, "c", "n3_c")];

        for (node, k, v) in written {
            txn.state_set(node, &make_key(k), make_value(v)).unwrap();
        }

        // Everything that was written is readable under its own node.
        for (node, k, v) in written {
            assert_eq!(txn.state_get(node, &make_key(k)).unwrap(), Some(make_value(v)));
        }

        // Keys written for one node are absent from the others.
        assert!(txn.state_get(second, &make_key("b")).unwrap().is_none());
        assert!(txn.state_get(third, &make_key("a")).unwrap().is_none());
    }

    /// `load_or_create_row` is counted as a state operation.
    #[test]
    fn test_load_or_create_increments_state_operations() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let node = FlowNodeId(1);
        let row_key = make_key("key1");
        let layout = EncodedValuesLayout::new(&[Type::Int8]);

        let baseline = txn.metrics().state_operations;

        txn.load_or_create_row(node, &row_key, &layout).unwrap();

        let current = txn.metrics().state_operations;
        assert!(current > baseline);
    }

    /// `save_row` is counted as a state operation.
    #[test]
    fn test_save_row_increments_state_operations() {
        let parent = create_test_transaction();
        let mut txn = FlowTransaction::new(&parent, CommitVersion(1));

        let node = FlowNodeId(1);
        let row_key = make_key("key1");

        let baseline = txn.metrics().state_operations;

        txn.save_row(node, &row_key, make_value("data")).unwrap();

        let current = txn.metrics().state_operations;
        assert!(current > baseline);
    }
}