reifydb_sub_flow/operator/stateful/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{EncodedKey, interface::MultiVersionBatch, value::encoded::EncodedValues};
5
6mod keyed;
7mod raw;
8mod row;
9mod single;
10#[cfg(test)]
11pub mod test_utils;
12mod utils;
13mod window;
14
15pub use keyed::KeyedStateful;
16pub use raw::RawStatefulOperator;
17use reifydb_core::key::{EncodableKey, FlowNodeStateKey};
18pub use row::RowNumberProvider;
19pub use single::SingleStateful;
20pub use utils::*;
21pub use window::WindowStateful;
22
23/// Iterator wrapper for state entries
24///
25/// Wraps a MultiVersionBatch and provides an iterator over decoded state keys.
26/// The batch is eagerly decoded during construction for efficiency.
27pub struct StateIterator {
28	items: Vec<(EncodedKey, EncodedValues)>,
29	position: usize,
30}
31
32impl StateIterator {
33	/// Create a new StateIterator from a MultiVersionBatch
34	pub fn new(batch: MultiVersionBatch) -> Self {
35		let items = batch
36			.items
37			.into_iter()
38			.map(|multi| {
39				if let Some(state_key) = FlowNodeStateKey::decode(&multi.key) {
40					(EncodedKey::new(state_key.key), multi.values)
41				} else {
42					(multi.key, multi.values)
43				}
44			})
45			.collect();
46
47		Self {
48			items,
49			position: 0,
50		}
51	}
52}
53
54impl Iterator for StateIterator {
55	type Item = (EncodedKey, EncodedValues);
56
57	fn next(&mut self) -> Option<Self::Item> {
58		if self.position < self.items.len() {
59			let item = self.items[self.position].clone();
60			self.position += 1;
61			Some(item)
62		} else {
63			None
64		}
65	}
66}