reifydb_sub_flow/operator/stateful/
mod.rs1use reifydb_core::{
5 encoded::{encoded::EncodedValues, key::EncodedKey},
6 interface::store::MultiVersionBatch,
7};
8
9pub mod counter;
10pub mod keyed;
11pub mod raw;
12pub mod row;
13pub mod single;
14#[cfg(test)]
15pub mod test_utils;
16pub mod utils;
17pub mod window;
18
19use reifydb_core::key::{EncodableKey, flow_node_state::FlowNodeStateKey};
20
21pub struct StateIterator {
26 items: Vec<(EncodedKey, EncodedValues)>,
27 position: usize,
28}
29
30impl StateIterator {
31 pub fn new(batch: MultiVersionBatch) -> Self {
33 let items = batch
34 .items
35 .into_iter()
36 .map(|multi| {
37 if let Some(state_key) = FlowNodeStateKey::decode(&multi.key) {
38 (EncodedKey::new(state_key.key), multi.values)
39 } else {
40 (multi.key, multi.values)
41 }
42 })
43 .collect();
44
45 Self {
46 items,
47 position: 0,
48 }
49 }
50
51 pub fn from_items(items: Vec<(EncodedKey, EncodedValues)>) -> Self {
53 Self {
54 items,
55 position: 0,
56 }
57 }
58}
59
60impl Iterator for StateIterator {
61 type Item = (EncodedKey, EncodedValues);
62
63 fn next(&mut self) -> Option<Self::Item> {
64 if self.position < self.items.len() {
65 let item = self.items[self.position].clone();
66 self.position += 1;
67 Some(item)
68 } else {
69 None
70 }
71 }
72}