use std::{
cmp::Ordering,
collections, iter,
ops::Bound::{Excluded, Included, Unbounded},
vec,
};
use collections::BTreeMap;
use iter::Peekable;
use reifydb_core::{
common::CommitVersion,
encoded::{
key::{EncodedKey, EncodedKeyRange},
row::EncodedRow,
},
interface::store::{MultiVersionBatch, MultiVersionRow},
key::{Key, kind::KeyKind},
};
use reifydb_type::Result;
use vec::IntoIter;
use super::{FlowTransaction, PendingWrite};
impl FlowTransaction {
pub fn get(&mut self, key: &EncodedKey) -> Result<Option<EncodedRow>> {
let inner = self.inner();
if inner.pending.is_removed(key) {
return Ok(None);
}
if let Some(value) = inner.pending.get(key) {
return Ok(Some(value.clone()));
}
if let Self::Transactional {
base_pending,
..
} = self
{
if base_pending.is_removed(key) {
return Ok(None);
}
if let Some(value) = base_pending.get(key) {
return Ok(Some(value.clone()));
}
}
let inner = self.inner_mut();
let query = if Self::is_flow_state_key(key) {
&inner.state_query
} else {
&inner.primitive_query
};
match query.get(key)? {
Some(multi) => Ok(Some(multi.row().clone())),
None => Ok(None),
}
}
pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
let inner = self.inner();
if inner.pending.is_removed(key) {
return Ok(false);
}
if inner.pending.get(key).is_some() {
return Ok(true);
}
if let Self::Transactional {
base_pending,
..
} = self
{
if base_pending.is_removed(key) {
return Ok(false);
}
if base_pending.get(key).is_some() {
return Ok(true);
}
}
let inner = self.inner_mut();
let query = if Self::is_flow_state_key(key) {
&inner.state_query
} else {
&inner.primitive_query
};
query.contains_key(key)
}
pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
let range = EncodedKeyRange::prefix(prefix);
let items = self.range(range, 1024).collect::<Result<Vec<_>>>()?;
Ok(MultiVersionBatch {
items,
has_more: false,
})
}
fn is_flow_state_key(key: &EncodedKey) -> bool {
match Key::kind(&key) {
None => false,
Some(kind) => match kind {
KeyKind::FlowNodeState => true,
KeyKind::FlowNodeInternalState => true,
_ => false,
},
}
}
pub fn range(
&mut self,
range: EncodedKeyRange,
batch_size: usize,
) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
match self {
Self::Deferred {
inner,
..
} => {
let merged: BTreeMap<EncodedKey, PendingWrite> = inner
.pending
.range((range.start.as_ref(), range.end.as_ref()))
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().collect();
let query = match range.start.as_ref() {
Included(start) | Excluded(start) => {
if Self::is_flow_state_key(start) {
&inner.state_query
} else {
&inner.primitive_query
}
}
Unbounded => &inner.primitive_query,
};
let storage_iter = query.range(range, batch_size);
let v = inner.version;
Box::new(flow_merge_pending_iterator(pending_vec, storage_iter, v))
}
Self::Transactional {
inner,
base_pending,
..
} => {
let mut merged: BTreeMap<EncodedKey, PendingWrite> = base_pending
.range((range.start.as_ref(), range.end.as_ref()))
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
for (k, v) in inner.pending.range((range.start.as_ref(), range.end.as_ref())) {
merged.insert(k.clone(), v.clone());
}
let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().collect();
let query = match range.start.as_ref() {
Included(start) | Excluded(start) => {
if Self::is_flow_state_key(start) {
&inner.state_query
} else {
&inner.primitive_query
}
}
Unbounded => &inner.primitive_query,
};
let storage_iter = query.range(range, batch_size);
let v = inner.version;
Box::new(flow_merge_pending_iterator(pending_vec, storage_iter, v))
}
}
}
pub fn range_rev(
&mut self,
range: EncodedKeyRange,
batch_size: usize,
) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
match self {
Self::Deferred {
inner,
..
} => {
let merged: BTreeMap<EncodedKey, PendingWrite> = inner
.pending
.range((range.start.as_ref(), range.end.as_ref()))
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().rev().collect();
let query = match range.start.as_ref() {
Included(start) | Excluded(start) => {
if Self::is_flow_state_key(start) {
&inner.state_query
} else {
&inner.primitive_query
}
}
Unbounded => &inner.primitive_query,
};
let storage_iter = query.range_rev(range, batch_size);
let v = inner.version;
Box::new(flow_merge_pending_iterator_rev(pending_vec, storage_iter, v))
}
Self::Transactional {
inner,
base_pending,
..
} => {
let mut merged: BTreeMap<EncodedKey, PendingWrite> = base_pending
.range((range.start.as_ref(), range.end.as_ref()))
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
for (k, v) in inner.pending.range((range.start.as_ref(), range.end.as_ref())) {
merged.insert(k.clone(), v.clone());
}
let pending_vec: Vec<(EncodedKey, PendingWrite)> = merged.into_iter().rev().collect();
let query = match range.start.as_ref() {
Included(start) | Excluded(start) => {
if Self::is_flow_state_key(start) {
&inner.state_query
} else {
&inner.primitive_query
}
}
Unbounded => &inner.primitive_query,
};
let storage_iter = query.range_rev(range, batch_size);
let v = inner.version;
Box::new(flow_merge_pending_iterator_rev(pending_vec, storage_iter, v))
}
}
}
}
/// Produces the next row of a two-way merge between pending writes and a storage scan.
///
/// Both inputs must already be sorted in the same direction. `pending_first` is the result of
/// `pending_key.cmp(&storage_key)` for which the pending entry comes first in that direction:
/// `Ordering::Less` for an ascending merge, `Ordering::Greater` for a descending one.
///
/// Rules applied on each step:
/// - a storage error is returned as soon as it reaches the front of the storage scan;
/// - when both sides hold the same key, the storage row is discarded and the pending entry decides;
/// - a pending `Set` is emitted as a row stamped with `version`;
/// - any other pending entry (a removal) emits nothing, and the merge continues.
fn flow_merge_next<I>(
	storage_iter: &mut Peekable<I>,
	pending_iter: &mut Peekable<IntoIter<(EncodedKey, PendingWrite)>>,
	version: CommitVersion,
	pending_first: Ordering,
) -> Option<Result<MultiVersionRow>>
where
	I: Iterator<Item = Result<MultiVersionRow>>,
{
	loop {
		let next_storage = storage_iter.peek();
		// Decide which side advances. Every branch that yields a storage item returns right here;
		// falling through means the pending entry is consumed next.
		let shadows_storage = match (pending_iter.peek(), next_storage) {
			(None, None) => return None,
			(None, Some(_)) | (Some(_), Some(Err(_))) => return storage_iter.next(),
			(Some(_), None) => false,
			(Some((pending_key, _)), Some(Ok(stored))) => match pending_key.cmp(&stored.key) {
				Ordering::Equal => true,
				order if order == pending_first => false,
				_ => return storage_iter.next(),
			},
		};

		let (key, write) = pending_iter.next().expect("pending entry was just peeked");
		if shadows_storage {
			// Same key on both sides: the pending write replaces the stored row.
			storage_iter.next();
		}
		if let PendingWrite::Set(row) = write {
			return Some(Ok(MultiVersionRow {
				key,
				row,
				version,
			}));
		}
		// Pending removal: the key is hidden, keep scanning.
	}
}

/// Ascending merge of pending writes over a storage scan.
///
/// Expects both `pending_iter` and `storage_iter` to yield keys in ascending order.
struct FlowMergePendingIterator<I>
where
	I: Iterator<Item = Result<MultiVersionRow>>,
{
	storage_iter: Peekable<I>,
	pending_iter: Peekable<IntoIter<(EncodedKey, PendingWrite)>>,
	/// Version stamped on rows that originate from pending writes.
	version: CommitVersion,
}

impl<I> Iterator for FlowMergePendingIterator<I>
where
	I: Iterator<Item = Result<MultiVersionRow>>,
{
	type Item = Result<MultiVersionRow>;

	fn next(&mut self) -> Option<Self::Item> {
		flow_merge_next(&mut self.storage_iter, &mut self.pending_iter, self.version, Ordering::Less)
	}
}

/// Builds an ascending merge iterator from key-ascending `pending` entries and `storage_iter`.
fn flow_merge_pending_iterator<I>(
	pending: Vec<(EncodedKey, PendingWrite)>,
	storage_iter: I,
	version: CommitVersion,
) -> FlowMergePendingIterator<I>
where
	I: Iterator<Item = Result<MultiVersionRow>>,
{
	FlowMergePendingIterator {
		storage_iter: storage_iter.peekable(),
		pending_iter: pending.into_iter().peekable(),
		version,
	}
}

/// Descending merge of pending writes over a storage scan.
///
/// Expects both `pending_iter` and `storage_iter` to yield keys in descending order.
struct FlowMergePendingIteratorRev<I>
where
	I: Iterator<Item = Result<MultiVersionRow>>,
{
	storage_iter: Peekable<I>,
	pending_iter: Peekable<IntoIter<(EncodedKey, PendingWrite)>>,
	/// Version stamped on rows that originate from pending writes.
	version: CommitVersion,
}

impl<I> Iterator for FlowMergePendingIteratorRev<I>
where
	I: Iterator<Item = Result<MultiVersionRow>>,
{
	type Item = Result<MultiVersionRow>;

	fn next(&mut self) -> Option<Self::Item> {
		flow_merge_next(&mut self.storage_iter, &mut self.pending_iter, self.version, Ordering::Greater)
	}
}

/// Builds a descending merge iterator from key-descending `pending` entries and `storage_iter`.
fn flow_merge_pending_iterator_rev<I>(
	pending: Vec<(EncodedKey, PendingWrite)>,
	storage_iter: I,
	version: CommitVersion,
) -> FlowMergePendingIteratorRev<I>
where
	I: Iterator<Item = Result<MultiVersionRow>>,
{
	FlowMergePendingIteratorRev {
		storage_iter: storage_iter.peekable(),
		pending_iter: pending.into_iter().peekable(),
		version,
	}
}
#[cfg(test)]
pub mod tests {
	//! Tests for the read path of `FlowTransaction`: `get`, `contains_key`, `range` and `prefix`.

	use reifydb_catalog::catalog::Catalog;
	use reifydb_core::encoded::{
		key::{EncodedKey, EncodedKeyRange},
		row::EncodedRow,
	};
	use reifydb_engine::test_harness::TestEngine;
	use reifydb_transaction::interceptor::interceptors::Interceptors;
	use reifydb_type::{util::cowvec::CowVec, value::identity::IdentityId};

	use super::*;
	use crate::operator::stateful::test_utils::test::create_test_transaction;

	/// Builds a key from the UTF-8 bytes of `s`.
	fn make_key(s: &str) -> EncodedKey {
		EncodedKey::new(s.as_bytes().to_vec())
	}

	/// Builds a row from the UTF-8 bytes of `s`.
	fn make_value(s: &str) -> EncodedRow {
		EncodedRow(CowVec::new(s.as_bytes().to_vec()))
	}

	/// Extracts the keys of `rows`, preserving their order.
	fn keys_of(rows: &[MultiVersionRow]) -> Vec<EncodedKey> {
		rows.iter().map(|row| row.key.clone()).collect()
	}

	/// A value written through the flow transaction is returned by `get`.
	#[test]
	fn test_get_from_pending() {
		let parent = create_test_transaction();
		let mut flow_txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let key = make_key("key1");
		let expected = make_value("value1");

		flow_txn.set(&key, expected.clone()).unwrap();

		assert_eq!(flow_txn.get(&key).unwrap(), Some(expected));
	}

	/// A value committed by an earlier admin transaction is returned by `get`.
	#[test]
	fn test_get_from_committed() {
		let engine = TestEngine::new();
		let key = make_key("key1");
		let expected = make_value("value1");
		{
			let mut writer = engine.begin_admin(IdentityId::system()).unwrap();
			writer.set(&key, expected.clone()).unwrap();
			writer.commit().unwrap();
		}

		let parent = engine.begin_admin(IdentityId::system()).unwrap();
		let version = parent.version();
		let mut flow_txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());

		assert_eq!(flow_txn.get(&key).unwrap(), Some(expected));
	}

	/// A write made through the flow transaction wins over a value set on the parent.
	#[test]
	fn test_get_pending_shadows_committed() {
		let mut parent = create_test_transaction();
		let key = make_key("key1");
		parent.set(&key, make_value("old")).unwrap();
		let version = parent.version();
		let mut flow_txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());
		let replacement = make_value("new");

		flow_txn.set(&key, replacement.clone()).unwrap();

		assert_eq!(flow_txn.get(&key).unwrap(), Some(replacement));
	}

	/// After `remove`, `get` returns `None` even though the parent set a value for the key.
	#[test]
	fn test_get_removed_returns_none() {
		let mut parent = create_test_transaction();
		let key = make_key("key1");
		parent.set(&key, make_value("value1")).unwrap();
		let version = parent.version();
		let mut flow_txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());

		flow_txn.remove(&key).unwrap();

		assert_eq!(flow_txn.get(&key).unwrap(), None);
	}

	/// `get` returns `None` for a key that was never written.
	#[test]
	fn test_get_nonexistent_key() {
		let parent = create_test_transaction();
		let mut flow_txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		assert_eq!(flow_txn.get(&make_key("missing")).unwrap(), None);
	}

	/// `contains_key` is true for a key written through the flow transaction.
	#[test]
	fn test_contains_key_pending() {
		let parent = create_test_transaction();
		let mut flow_txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let key = make_key("key1");

		flow_txn.set(&key, make_value("value1")).unwrap();

		assert!(flow_txn.contains_key(&key).unwrap());
	}

	/// `contains_key` is true for a key committed by an earlier admin transaction.
	#[test]
	fn test_contains_key_committed() {
		let engine = TestEngine::new();
		let key = make_key("key1");
		{
			let mut writer = engine.begin_admin(IdentityId::system()).unwrap();
			writer.set(&key, make_value("value1")).unwrap();
			writer.commit().unwrap();
		}

		let parent = engine.begin_admin(IdentityId::system()).unwrap();
		let version = parent.version();
		let mut flow_txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());

		assert!(flow_txn.contains_key(&key).unwrap());
	}

	/// After `remove`, `contains_key` is false even though the parent set a value for the key.
	#[test]
	fn test_contains_key_removed_returns_false() {
		let mut parent = create_test_transaction();
		let key = make_key("key1");
		parent.set(&key, make_value("value1")).unwrap();
		let version = parent.version();
		let mut flow_txn = FlowTransaction::deferred(&parent, version, Catalog::testing(), Interceptors::new());

		flow_txn.remove(&key).unwrap();

		assert!(!flow_txn.contains_key(&key).unwrap());
	}

	/// `contains_key` is false for a key that was never written.
	#[test]
	fn test_contains_key_nonexistent() {
		let parent = create_test_transaction();
		let mut flow_txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		assert!(!flow_txn.contains_key(&make_key("missing")).unwrap());
	}

	/// A full-range scan over a fresh transaction yields nothing.
	#[test]
	fn test_scan_empty() {
		let parent = create_test_transaction();
		let mut flow_txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		assert!(flow_txn.range(EncodedKeyRange::all(), 1024).next().is_none());
	}

	/// Pending writes inserted out of order come back sorted by key.
	#[test]
	fn test_scan_only_pending() {
		let parent = create_test_transaction();
		let mut flow_txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
		for (key, value) in [("b", "2"), ("a", "1"), ("c", "3")] {
			flow_txn.set(&make_key(key), make_value(value)).unwrap();
		}

		let rows = flow_txn.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>>>().unwrap();

		assert_eq!(keys_of(&rows), vec![make_key("a"), make_key("b"), make_key("c")]);
	}

	/// A pending removal does not show up in a scan.
	#[test]
	fn test_scan_filters_removes() {
		let parent = create_test_transaction();
		let mut flow_txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
		flow_txn.set(&make_key("a"), make_value("1")).unwrap();
		flow_txn.remove(&make_key("b")).unwrap();
		flow_txn.set(&make_key("c"), make_value("3")).unwrap();

		let rows = flow_txn.range(EncodedKeyRange::all(), 1024).collect::<Result<Vec<_>>>().unwrap();

		assert_eq!(keys_of(&rows), vec![make_key("a"), make_key("c")]);
	}

	/// A bounded range over a fresh transaction yields nothing.
	#[test]
	fn test_range_empty() {
		let parent = create_test_transaction();
		let mut flow_txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
		let bounds = EncodedKeyRange::start_end(Some(make_key("a")), Some(make_key("z")));

		assert!(flow_txn.range(bounds, 1024).next().is_none());
	}

	/// An inclusive start and exclusive end select only the pending keys in between.
	#[test]
	fn test_range_only_pending() {
		let parent = create_test_transaction();
		let mut flow_txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
		for (key, value) in [("a", "1"), ("b", "2"), ("c", "3"), ("d", "4")] {
			flow_txn.set(&make_key(key), make_value(value)).unwrap();
		}
		let bounds = EncodedKeyRange::new(Included(make_key("b")), Excluded(make_key("d")));

		let rows = flow_txn.range(bounds, 1024).collect::<Result<Vec<_>>>().unwrap();

		assert_eq!(keys_of(&rows), vec![make_key("b"), make_key("c")]);
	}

	/// A prefix scan over a fresh transaction returns an empty batch.
	#[test]
	fn test_prefix_empty() {
		let parent = create_test_transaction();
		let mut flow_txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());

		let batch = flow_txn.prefix(&make_key("test_")).unwrap();

		assert!(batch.items.is_empty());
	}

	/// A prefix scan returns only the pending keys that start with the prefix, sorted by key.
	#[test]
	fn test_prefix_only_pending() {
		let parent = create_test_transaction();
		let mut flow_txn =
			FlowTransaction::deferred(&parent, CommitVersion(1), Catalog::testing(), Interceptors::new());
		for (key, value) in [("test_a", "1"), ("test_b", "2"), ("other_c", "3")] {
			flow_txn.set(&make_key(key), make_value(value)).unwrap();
		}

		let batch = flow_txn.prefix(&make_key("test_")).unwrap();

		assert_eq!(keys_of(&batch.items), vec![make_key("test_a"), make_key("test_b")]);
	}
}