1use reifydb_core::{
5 encoded::{
6 encoded::EncodedValues,
7 key::{EncodedKey, EncodedKeyRange},
8 schema::Schema,
9 },
10 interface::{catalog::flow::FlowNodeId, store::MultiVersionBatch},
11 key::{EncodableKey, flow_node_state::FlowNodeStateKey},
12};
13use tracing::{Span, instrument};
14
15use super::FlowTransaction;
16
17impl FlowTransaction {
18 #[instrument(name = "flow::state::get", level = "trace", skip(self), fields(
20 node_id = id.0,
21 key_len = key.as_bytes().len(),
22 found = tracing::field::Empty
23 ))]
24 pub fn state_get(&mut self, id: FlowNodeId, key: &EncodedKey) -> reifydb_type::Result<Option<EncodedValues>> {
25 let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
26 let encoded_key = state_key.encode();
27 let result = self.get(&encoded_key)?;
28 Span::current().record("found", result.is_some());
29 Ok(result)
30 }
31
32 #[instrument(name = "flow::state::set", level = "trace", skip(self, value), fields(
34 node_id = id.0,
35 key_len = key.as_bytes().len(),
36 value_len = value.as_ref().len()
37 ))]
38 pub fn state_set(
39 &mut self,
40 id: FlowNodeId,
41 key: &EncodedKey,
42 value: EncodedValues,
43 ) -> reifydb_type::Result<()> {
44 let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
45 let encoded_key = state_key.encode();
46 self.set(&encoded_key, value)
47 }
48
49 #[instrument(name = "flow::state::remove", level = "trace", skip(self), fields(
51 node_id = id.0,
52 key_len = key.as_bytes().len()
53 ))]
54 pub fn state_remove(&mut self, id: FlowNodeId, key: &EncodedKey) -> reifydb_type::Result<()> {
55 let state_key = FlowNodeStateKey::new(id, key.as_ref().to_vec());
56 let encoded_key = state_key.encode();
57 self.remove(&encoded_key)
58 }
59
60 #[instrument(name = "flow::state::scan", level = "debug", skip(self), fields(
62 node_id = id.0,
63 result_count = tracing::field::Empty
64 ))]
65 pub fn state_scan(&mut self, id: FlowNodeId) -> reifydb_type::Result<MultiVersionBatch> {
66 let range = FlowNodeStateKey::node_range(id);
67 let mut iter = self.range(range, 1024);
68 let mut items = Vec::new();
69 while let Some(result) = iter.next() {
70 items.push(result?);
71 }
72 Span::current().record("result_count", items.len());
73 Ok(MultiVersionBatch {
74 items,
75 has_more: false,
76 })
77 }
78
79 #[instrument(name = "flow::state::range", level = "debug", skip(self, range), fields(
81 node_id = id.0
82 ))]
83 pub fn state_range(
84 &mut self,
85 id: FlowNodeId,
86 range: EncodedKeyRange,
87 ) -> reifydb_type::Result<MultiVersionBatch> {
88 let prefixed_range = range.with_prefix(FlowNodeStateKey::encoded(id, vec![]));
89 let mut iter = self.range(prefixed_range, 1024);
90 let mut items = Vec::new();
91 while let Some(result) = iter.next() {
92 items.push(result?);
93 }
94 Ok(MultiVersionBatch {
95 items,
96 has_more: false,
97 })
98 }
99
100 #[instrument(name = "flow::state::clear", level = "trace", skip(self), fields(
102 node_id = id.0,
103 keys_removed = tracing::field::Empty
104 ))]
105 pub fn state_clear(&mut self, id: FlowNodeId) -> reifydb_type::Result<()> {
106 let keys_to_remove = self.scan_keys_for_clear(id)?;
108
109 let count = keys_to_remove.len();
111 self.remove_keys(keys_to_remove)?;
112
113 Span::current().record("keys_removed", count);
114 Ok(())
115 }
116
117 #[inline]
119 #[instrument(name = "flow::state::clear::scan", level = "trace", skip(self), fields(node_id = id.0))]
120 fn scan_keys_for_clear(&mut self, id: FlowNodeId) -> reifydb_type::Result<Vec<EncodedKey>> {
121 let range = FlowNodeStateKey::node_range(id);
122 let mut iter = self.range(range, 1024);
123 let mut keys = Vec::new();
124 while let Some(result) = iter.next() {
125 let multi = result?;
126 keys.push(multi.key);
127 }
128 Ok(keys)
129 }
130
131 #[inline]
133 #[instrument(name = "flow::state::clear::remove", level = "trace", skip(self, keys), fields(count = keys.len()))]
134 fn remove_keys(&mut self, keys: Vec<EncodedKey>) -> reifydb_type::Result<()> {
135 for key in keys {
136 self.remove(&key)?;
137 }
138 Ok(())
139 }
140
141 #[instrument(name = "flow::state::load_or_create", level = "debug", skip(self, schema), fields(
143 node_id = id.0,
144 key_len = key.as_bytes().len(),
145 created
146 ))]
147 pub fn load_or_create_row(
148 &mut self,
149 id: FlowNodeId,
150 key: &EncodedKey,
151 schema: &Schema,
152 ) -> reifydb_type::Result<EncodedValues> {
153 match self.state_get(id, key)? {
154 Some(row) => {
155 Span::current().record("created", false);
156 Ok(row)
157 }
158 None => {
159 Span::current().record("created", true);
160 Ok(schema.allocate())
161 }
162 }
163 }
164
165 #[instrument(name = "flow::state::save", level = "trace", skip(self, row), fields(
167 node_id = id.0,
168 key_len = key.as_bytes().len()
169 ))]
170 pub fn save_row(&mut self, id: FlowNodeId, key: &EncodedKey, row: EncodedValues) -> reifydb_type::Result<()> {
171 self.state_set(id, key, row)
172 }
173}
174
175#[cfg(test)]
176pub mod tests {
177 use reifydb_catalog::catalog::Catalog;
178 use reifydb_core::{
179 common::CommitVersion,
180 encoded::{
181 encoded::EncodedValues,
182 key::{EncodedKey, EncodedKeyRange},
183 schema::Schema,
184 },
185 interface::catalog::flow::FlowNodeId,
186 };
187 use reifydb_transaction::interceptor::interceptors::Interceptors;
188 use reifydb_type::{util::cowvec::CowVec, value::r#type::Type};
189
190 use super::*;
191 use crate::operator::stateful::test_utils::test::create_test_transaction;
192
193 fn make_key(s: &str) -> EncodedKey {
194 EncodedKey::new(s.as_bytes().to_vec())
195 }
196
197 fn make_value(s: &str) -> EncodedValues {
198 EncodedValues(CowVec::new(s.as_bytes().to_vec()))
199 }
200
201 #[test]
202 fn test_state_get_set() {
203 let parent = create_test_transaction();
204 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
205
206 let node_id = FlowNodeId(1);
207 let key = make_key("state_key");
208 let value = make_value("state_value");
209
210 txn.state_set(node_id, &key, value.clone()).unwrap();
212
213 let result = txn.state_get(node_id, &key).unwrap();
215 assert_eq!(result, Some(value));
216 }
217
218 #[test]
219 fn test_state_get_nonexistent() {
220 let parent = create_test_transaction();
221 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
222
223 let node_id = FlowNodeId(1);
224 let key = make_key("missing");
225
226 let result = txn.state_get(node_id, &key).unwrap();
227 assert_eq!(result, None);
228 }
229
230 #[test]
231 fn test_state_remove() {
232 let parent = create_test_transaction();
233 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
234
235 let node_id = FlowNodeId(1);
236 let key = make_key("state_key");
237 let value = make_value("state_value");
238
239 txn.state_set(node_id, &key, value.clone()).unwrap();
241 assert_eq!(txn.state_get(node_id, &key).unwrap(), Some(value));
242
243 txn.state_remove(node_id, &key).unwrap();
244 assert_eq!(txn.state_get(node_id, &key).unwrap(), None);
245 }
246
247 #[test]
248 fn test_state_isolation_between_nodes() {
249 let parent = create_test_transaction();
250 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
251
252 let node1 = FlowNodeId(1);
253 let node2 = FlowNodeId(2);
254 let key = make_key("same_key");
255
256 txn.state_set(node1, &key, make_value("node1_value")).unwrap();
257 txn.state_set(node2, &key, make_value("node2_value")).unwrap();
258
259 assert_eq!(txn.state_get(node1, &key).unwrap(), Some(make_value("node1_value")));
261 assert_eq!(txn.state_get(node2, &key).unwrap(), Some(make_value("node2_value")));
262 }
263
264 #[test]
265 fn test_state_scan() {
266 let parent = create_test_transaction();
267 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
268
269 let node_id = FlowNodeId(1);
270
271 txn.state_set(node_id, &make_key("key1"), make_value("value1")).unwrap();
272 txn.state_set(node_id, &make_key("key2"), make_value("value2")).unwrap();
273 txn.state_set(node_id, &make_key("key3"), make_value("value3")).unwrap();
274
275 let iter = txn.state_scan(node_id).unwrap();
276 let items: Vec<_> = iter.items.into_iter().collect();
277
278 assert_eq!(items.len(), 3);
279 }
280
281 #[test]
282 fn test_state_scan_only_own_node() {
283 let parent = create_test_transaction();
284 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
285
286 let node1 = FlowNodeId(1);
287 let node2 = FlowNodeId(2);
288
289 txn.state_set(node1, &make_key("key1"), make_value("value1")).unwrap();
290 txn.state_set(node1, &make_key("key2"), make_value("value2")).unwrap();
291 txn.state_set(node2, &make_key("key3"), make_value("value3")).unwrap();
292
293 let items: Vec<_> = txn.state_scan(node1).unwrap().items.into_iter().collect();
295 assert_eq!(items.len(), 2);
296
297 let items: Vec<_> = txn.state_scan(node2).unwrap().items.into_iter().collect();
299 assert_eq!(items.len(), 1);
300 }
301
302 #[test]
303 fn test_state_scan_empty() {
304 let parent = create_test_transaction();
305 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
306
307 let node_id = FlowNodeId(1);
308
309 let iter = txn.state_scan(node_id).unwrap();
310 assert!(iter.items.into_iter().next().is_none());
311 }
312
313 #[test]
314 fn test_state_range() {
315 let parent = create_test_transaction();
316 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
317
318 let node_id = FlowNodeId(1);
319
320 txn.state_set(node_id, &make_key("a"), make_value("1")).unwrap();
321 txn.state_set(node_id, &make_key("b"), make_value("2")).unwrap();
322 txn.state_set(node_id, &make_key("c"), make_value("3")).unwrap();
323 txn.state_set(node_id, &make_key("d"), make_value("4")).unwrap();
324
325 use std::collections::Bound;
327 let range = EncodedKeyRange::new(Bound::Included(make_key("b")), Bound::Excluded(make_key("d")));
328 let iter = txn.state_range(node_id, range).unwrap();
329 let items: Vec<_> = iter.items.into_iter().collect();
330
331 assert_eq!(items.len(), 2);
333 }
334
335 #[test]
336 fn test_state_clear() {
337 let parent = create_test_transaction();
338 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
339
340 let node_id = FlowNodeId(1);
341
342 txn.state_set(node_id, &make_key("key1"), make_value("value1")).unwrap();
343 txn.state_set(node_id, &make_key("key2"), make_value("value2")).unwrap();
344 txn.state_set(node_id, &make_key("key3"), make_value("value3")).unwrap();
345
346 assert_eq!(txn.state_scan(node_id).unwrap().items.into_iter().count(), 3);
348
349 txn.state_clear(node_id).unwrap();
351
352 assert_eq!(txn.state_scan(node_id).unwrap().items.into_iter().count(), 0);
354 }
355
356 #[test]
357 fn test_state_clear_only_own_node() {
358 let parent = create_test_transaction();
359 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
360
361 let node1 = FlowNodeId(1);
362 let node2 = FlowNodeId(2);
363
364 txn.state_set(node1, &make_key("key1"), make_value("value1")).unwrap();
365 txn.state_set(node1, &make_key("key2"), make_value("value2")).unwrap();
366 txn.state_set(node2, &make_key("key3"), make_value("value3")).unwrap();
367
368 txn.state_clear(node1).unwrap();
370
371 assert_eq!(txn.state_scan(node1).unwrap().items.into_iter().count(), 0);
373
374 assert_eq!(txn.state_scan(node2).unwrap().items.into_iter().count(), 1);
376 }
377
378 #[test]
379 fn test_state_clear_empty_node() {
380 let parent = create_test_transaction();
381 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
382
383 let node_id = FlowNodeId(1);
384
385 txn.state_clear(node_id).unwrap();
387 }
388
389 #[test]
390 fn test_load_or_create_existing() {
391 let parent = create_test_transaction();
392 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
393
394 let node_id = FlowNodeId(1);
395 let key = make_key("key1");
396 let value = make_value("existing");
397 let schema = Schema::testing(&[Type::Int8, Type::Float8]);
398
399 txn.state_set(node_id, &key, value.clone()).unwrap();
401
402 let result = txn.load_or_create_row(node_id, &key, &schema).unwrap();
404 assert_eq!(result, value);
405 }
406
407 #[test]
408 fn test_load_or_create_new() {
409 let parent = create_test_transaction();
410 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
411
412 let node_id = FlowNodeId(1);
413 let key = make_key("key1");
414 let schema = Schema::testing(&[Type::Int8, Type::Float8]);
415
416 let result = txn.load_or_create_row(node_id, &key, &schema).unwrap();
418
419 assert!(!result.as_ref().is_empty());
421 }
422
423 #[test]
424 fn test_save_row() {
425 let parent = create_test_transaction();
426 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
427
428 let node_id = FlowNodeId(1);
429 let key = make_key("key1");
430 let row = make_value("row_data");
431
432 txn.save_row(node_id, &key, row.clone()).unwrap();
433
434 let result = txn.state_get(node_id, &key).unwrap();
436 assert_eq!(result, Some(row));
437 }
438
439 #[test]
440 fn test_state_multiple_nodes() {
441 let parent = create_test_transaction();
442 let mut txn = FlowTransaction::new(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
443
444 let node1 = FlowNodeId(1);
445 let node2 = FlowNodeId(2);
446 let node3 = FlowNodeId(3);
447
448 txn.state_set(node1, &make_key("a"), make_value("n1_a")).unwrap();
449 txn.state_set(node1, &make_key("b"), make_value("n1_b")).unwrap();
450 txn.state_set(node2, &make_key("a"), make_value("n2_a")).unwrap();
451 txn.state_set(node3, &make_key("c"), make_value("n3_c")).unwrap();
452
453 assert_eq!(txn.state_get(node1, &make_key("a")).unwrap(), Some(make_value("n1_a")));
455 assert_eq!(txn.state_get(node1, &make_key("b")).unwrap(), Some(make_value("n1_b")));
456 assert_eq!(txn.state_get(node2, &make_key("a")).unwrap(), Some(make_value("n2_a")));
457 assert_eq!(txn.state_get(node3, &make_key("c")).unwrap(), Some(make_value("n3_c")));
458
459 assert_eq!(txn.state_get(node2, &make_key("b")).unwrap(), None);
461 assert_eq!(txn.state_get(node3, &make_key("a")).unwrap(), None);
462 }
463}