Skip to main content

reifydb_sub_flow/operator/stateful/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	encoded::{key::EncodedKey, row::EncodedRow},
6	interface::store::MultiVersionBatch,
7};
8
9pub mod counter;
10pub mod keyed;
11pub mod raw;
12pub mod row;
13pub mod single;
14pub mod test_utils;
15pub mod utils;
16pub mod window;
17
18use reifydb_core::key::{EncodableKey, flow_node_state::FlowNodeStateKey};
19
20/// Iterator wrapper for state entries
21///
22/// Wraps a MultiVersionBatch and provides an iterator over decoded state keys.
23/// The batch is eagerly decoded during construction for efficiency.
24pub struct StateIterator {
25	items: Vec<(EncodedKey, EncodedRow)>,
26	position: usize,
27}
28
29impl StateIterator {
30	/// Create a new StateIterator from a MultiVersionBatch
31	pub fn new(batch: MultiVersionBatch) -> Self {
32		let items = batch
33			.items
34			.into_iter()
35			.map(|multi| {
36				if let Some(state_key) = FlowNodeStateKey::decode(&multi.key) {
37					(EncodedKey::new(state_key.key), multi.row)
38				} else {
39					(multi.key, multi.row)
40				}
41			})
42			.collect();
43
44		Self {
45			items,
46			position: 0,
47		}
48	}
49
50	/// Create a new StateIterator from pre-decoded items
51	pub fn from_items(items: Vec<(EncodedKey, EncodedRow)>) -> Self {
52		Self {
53			items,
54			position: 0,
55		}
56	}
57}
58
59impl Iterator for StateIterator {
60	type Item = (EncodedKey, EncodedRow);
61
62	fn next(&mut self) -> Option<Self::Item> {
63		if self.position < self.items.len() {
64			let item = self.items[self.position].clone();
65			self.position += 1;
66			Some(item)
67		} else {
68			None
69		}
70	}
71}