use std::{cell::RefCell, collections::HashMap};
use reifydb_core::{
interface::store::MultiVersionBatch,
key::{EncodableKey, flow_node_state::FlowNodeStateKey},
};
/// Identifier for a state iterator held in the current thread's iterator registry.
///
/// Returned by `create_iterator` and accepted by `next_iterator` and `free_iterator`.
pub type StateIteratorHandle = u64;
// Per-thread store of live iterators. Because the registry is thread-local, every thread
// has its own instance, so a handle only resolves on the thread that created it.
thread_local! {
static ITERATOR_REGISTRY: RefCell<IteratorRegistry> = RefCell::new(IteratorRegistry::new());
}
/// Forward-only cursor over a list of `(key bytes, value bytes)` pairs.
struct BatchIterator {
    /// The pairs this iterator walks over, in iteration order.
    items: Vec<(Vec<u8>, Vec<u8>)>,
    /// Index into `items` of the pair that the next call to `next` will yield.
    position: usize,
}
impl BatchIterator {
    /// Builds an iterator over the state entries contained in `batch`.
    ///
    /// Each item's key is decoded as a `FlowNodeStateKey`; items whose key does not
    /// decode are skipped. The remaining entries keep the order they had in the batch
    /// and are stored as `(state key bytes, row bytes)` pairs.
    fn new(batch: MultiVersionBatch) -> Self {
        let items = batch
            .items
            .into_iter()
            .filter_map(|multi| {
                // `decode` yields `None` for keys that are not flow-node state keys;
                // `?` drops those items from the result.
                let state_key = FlowNodeStateKey::decode(&multi.key)?;
                Some((state_key.key, multi.row.to_vec()))
            })
            .collect();
        Self { items, position: 0 }
    }

    /// Returns the next `(key, value)` pair, or `None` once every pair has been yielded.
    ///
    /// The cursor only advances when a pair is returned, so calling this again after
    /// exhaustion keeps returning `None`.
    fn next(&mut self) -> Option<(Vec<u8>, Vec<u8>)> {
        let slot = self.items.get_mut(self.position)?;
        self.position += 1;
        // The cursor never moves backwards, so an entry is read at most once. Moving it
        // out (leaving an empty pair behind) avoids cloning both byte vectors per call.
        Some(std::mem::take(slot))
    }
}
/// Table of live iterators together with the counter used to issue their handles.
struct IteratorRegistry {
/// Handle that the next `insert` call will hand out.
next_handle: StateIteratorHandle,
/// Iterators that have been inserted and not yet removed, keyed by their handle.
iterators: HashMap<StateIteratorHandle, BatchIterator>,
}
impl IteratorRegistry {
    /// Creates an empty registry; the first handle it issues is `1`.
    fn new() -> Self {
        let iterators = HashMap::new();
        Self {
            next_handle: 1,
            iterators,
        }
    }

    /// Stores `iter` and returns the handle under which it can be looked up.
    ///
    /// Handles are issued sequentially; the counter wraps around on overflow.
    fn insert(&mut self, iter: BatchIterator) -> StateIteratorHandle {
        let issued = self.next_handle;
        self.iterators.insert(issued, iter);
        self.next_handle = issued.wrapping_add(1);
        issued
    }

    /// Returns mutable access to the iterator registered under `handle`, if any.
    fn get_mut(&mut self, handle: StateIteratorHandle) -> Option<&mut BatchIterator> {
        let live = &mut self.iterators;
        live.get_mut(&handle)
    }

    /// Unregisters the iterator stored under `handle` and returns it, or `None` when the
    /// handle is unknown.
    fn remove(&mut self, handle: StateIteratorHandle) -> Option<BatchIterator> {
        let live = &mut self.iterators;
        live.remove(&handle)
    }
}
/// Registers a new iterator over `batch` in the current thread's registry.
///
/// Returns the handle to pass to `next_iterator` and `free_iterator`. The iterator stays
/// registered until it is freed.
pub(crate) fn create_iterator(batch: MultiVersionBatch) -> StateIteratorHandle {
    // Decode the batch before touching the registry so the borrow is held only for the insert.
    let iter = BatchIterator::new(batch);
    ITERATOR_REGISTRY.with(|cell| {
        let mut registry = cell.borrow_mut();
        registry.insert(iter)
    })
}
/// Advances the iterator identified by `handle` and returns its next `(key, value)` pair.
///
/// Returns `None` when the handle is not registered on the current thread or when the
/// iterator has no more pairs to yield.
pub(crate) fn next_iterator(handle: StateIteratorHandle) -> Option<(Vec<u8>, Vec<u8>)> {
    ITERATOR_REGISTRY.with(|cell| {
        cell.borrow_mut()
            .get_mut(handle)
            .and_then(|iter| iter.next())
    })
}
/// Removes the iterator identified by `handle` from the current thread's registry.
///
/// Returns `true` when an iterator was registered under `handle` and has been dropped,
/// `false` when the handle was unknown (for example, already freed).
pub(crate) fn free_iterator(handle: StateIteratorHandle) -> bool {
    ITERATOR_REGISTRY.with(|cell| {
        let removed = cell.borrow_mut().remove(handle);
        removed.is_some()
    })
}
#[cfg(test)]
pub mod tests {
    use reifydb_core::{
        common::CommitVersion,
        encoded::{key::EncodedKey, row::EncodedRow},
        interface::{
            catalog::flow::FlowNodeId,
            store::{MultiVersionBatch, MultiVersionRow},
        },
        key::{EncodableKey, flow_node_state::FlowNodeStateKey},
    };
    use reifydb_type::util::cowvec::CowVec;

    use super::*;

    /// Encodes a flow-node state key for node `node_id` carrying the raw `key` bytes.
    fn make_state_key(node_id: u64, key: &[u8]) -> EncodedKey {
        let state_key = FlowNodeStateKey::new(FlowNodeId(node_id), key.to_vec());
        state_key.encode()
    }

    /// Wraps `data` in an `EncodedRow`.
    fn make_value(data: &[u8]) -> EncodedRow {
        let bytes = CowVec::new(data.to_vec());
        EncodedRow(bytes)
    }

    /// Builds a version-1 row for node `node_id` mapping `key` to `value`.
    fn make_row(node_id: u64, key: &[u8], value: &[u8]) -> MultiVersionRow {
        MultiVersionRow {
            key: make_state_key(node_id, key),
            row: make_value(value),
            version: CommitVersion(1),
        }
    }

    /// Builds a final batch (`has_more: false`) holding `items`.
    fn make_batch(items: Vec<MultiVersionRow>) -> MultiVersionBatch {
        MultiVersionBatch {
            items,
            has_more: false,
        }
    }

    #[test]
    fn test_create_and_free_iterator() {
        let batch = make_batch(vec![make_row(1, b"key1", b"value1")]);

        let handle = create_iterator(batch);
        assert!(handle > 0);

        // The first free releases the iterator; a second free on the same handle finds nothing.
        assert!(free_iterator(handle));
        assert!(!free_iterator(handle));
    }

    #[test]
    fn test_iterator_next() {
        let batch = make_batch(vec![
            make_row(1, b"key1", b"value1"),
            make_row(1, b"key2", b"value2"),
        ]);
        let handle = create_iterator(batch);

        // Pairs come back in batch order, with the state key stripped down to its raw bytes.
        assert_eq!(
            next_iterator(handle),
            Some((b"key1".to_vec(), b"value1".to_vec()))
        );
        assert_eq!(
            next_iterator(handle),
            Some((b"key2".to_vec(), b"value2".to_vec()))
        );
        assert!(next_iterator(handle).is_none());

        free_iterator(handle);
    }

    #[test]
    fn test_iterator_invalid_handle() {
        let unknown: StateIteratorHandle = 999999;

        assert!(next_iterator(unknown).is_none());
        assert!(!free_iterator(unknown));
    }

    #[test]
    fn test_multiple_iterators() {
        let batch1 = make_batch(vec![make_row(1, b"iter1", b"value1")]);
        let batch2 = make_batch(vec![make_row(2, b"iter2", b"value2")]);

        let handle1 = create_iterator(batch1);
        let handle2 = create_iterator(batch2);
        assert_ne!(handle1, handle2);

        // Each handle advances its own iterator independently of the other.
        let (key1, _) = next_iterator(handle1).unwrap();
        let (key2, _) = next_iterator(handle2).unwrap();
        assert_eq!(key1, b"iter1");
        assert_eq!(key2, b"iter2");

        free_iterator(handle1);
        free_iterator(handle2);
    }
}