reifydb_sub_flow/operator/stateful/
mod.rs1use reifydb_core::{EncodedKey, interface::MultiVersionBatch, value::encoded::EncodedValues};
5
6mod keyed;
7mod raw;
8mod row;
9mod single;
10#[cfg(test)]
11pub mod test_utils;
12mod utils;
13mod window;
14
15pub use keyed::KeyedStateful;
16pub use raw::RawStatefulOperator;
17use reifydb_core::key::{EncodableKey, FlowNodeStateKey};
18pub use row::RowNumberProvider;
19pub use single::SingleStateful;
20pub use utils::*;
21pub use window::WindowStateful;
22
23pub struct StateIterator {
28 items: Vec<(EncodedKey, EncodedValues)>,
29 position: usize,
30}
31
32impl StateIterator {
33 pub fn new(batch: MultiVersionBatch) -> Self {
35 let items = batch
36 .items
37 .into_iter()
38 .map(|multi| {
39 if let Some(state_key) = FlowNodeStateKey::decode(&multi.key) {
40 (EncodedKey::new(state_key.key), multi.values)
41 } else {
42 (multi.key, multi.values)
43 }
44 })
45 .collect();
46
47 Self {
48 items,
49 position: 0,
50 }
51 }
52}
53
54impl Iterator for StateIterator {
55 type Item = (EncodedKey, EncodedValues);
56
57 fn next(&mut self) -> Option<Self::Item> {
58 if self.position < self.items.len() {
59 let item = self.items[self.position].clone();
60 self.position += 1;
61 Some(item)
62 } else {
63 None
64 }
65 }
66}