Skip to main content

reifydb_sub_flow/operator/stateful/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	encoded::{encoded::EncodedValues, key::EncodedKey},
6	interface::store::MultiVersionBatch,
7};
8
9pub mod counter;
10pub mod keyed;
11pub mod raw;
12pub mod row;
13pub mod single;
14#[cfg(test)]
15pub mod test_utils;
16pub mod utils;
17pub mod window;
18
19use reifydb_core::key::{EncodableKey, flow_node_state::FlowNodeStateKey};
20
21/// Iterator wrapper for state entries
22///
23/// Wraps a MultiVersionBatch and provides an iterator over decoded state keys.
24/// The batch is eagerly decoded during construction for efficiency.
25pub struct StateIterator {
26	items: Vec<(EncodedKey, EncodedValues)>,
27	position: usize,
28}
29
30impl StateIterator {
31	/// Create a new StateIterator from a MultiVersionBatch
32	pub fn new(batch: MultiVersionBatch) -> Self {
33		let items = batch
34			.items
35			.into_iter()
36			.map(|multi| {
37				if let Some(state_key) = FlowNodeStateKey::decode(&multi.key) {
38					(EncodedKey::new(state_key.key), multi.values)
39				} else {
40					(multi.key, multi.values)
41				}
42			})
43			.collect();
44
45		Self {
46			items,
47			position: 0,
48		}
49	}
50
51	/// Create a new StateIterator from pre-decoded items
52	pub fn from_items(items: Vec<(EncodedKey, EncodedValues)>) -> Self {
53		Self {
54			items,
55			position: 0,
56		}
57	}
58}
59
60impl Iterator for StateIterator {
61	type Item = (EncodedKey, EncodedValues);
62
63	fn next(&mut self) -> Option<Self::Item> {
64		if self.position < self.items.len() {
65			let item = self.items[self.position].clone();
66			self.position += 1;
67			Some(item)
68		} else {
69			None
70		}
71	}
72}