reifydb_sub_flow/operator/stateful/
mod.rs1use reifydb_core::{
5 encoded::{key::EncodedKey, row::EncodedRow},
6 interface::store::MultiVersionBatch,
7};
8
9pub mod counter;
10pub mod keyed;
11pub mod raw;
12pub mod row;
13pub mod single;
14pub mod test_utils;
15pub mod utils;
16pub mod window;
17
18use reifydb_core::key::{EncodableKey, flow_node_state::FlowNodeStateKey};
19
20pub struct StateIterator {
25 items: Vec<(EncodedKey, EncodedRow)>,
26 position: usize,
27}
28
29impl StateIterator {
30 pub fn new(batch: MultiVersionBatch) -> Self {
32 let items = batch
33 .items
34 .into_iter()
35 .map(|multi| {
36 if let Some(state_key) = FlowNodeStateKey::decode(&multi.key) {
37 (EncodedKey::new(state_key.key), multi.row)
38 } else {
39 (multi.key, multi.row)
40 }
41 })
42 .collect();
43
44 Self {
45 items,
46 position: 0,
47 }
48 }
49
50 pub fn from_items(items: Vec<(EncodedKey, EncodedRow)>) -> Self {
52 Self {
53 items,
54 position: 0,
55 }
56 }
57}
58
59impl Iterator for StateIterator {
60 type Item = (EncodedKey, EncodedRow);
61
62 fn next(&mut self) -> Option<Self::Item> {
63 if self.position < self.items.len() {
64 let item = self.items[self.position].clone();
65 self.position += 1;
66 Some(item)
67 } else {
68 None
69 }
70 }
71}