use reifydb_core::encoded::{key::EncodedKey, row::EncodedRow};
use reifydb_type::Result;
use super::FlowTransaction;
impl FlowTransaction {
pub fn set(&mut self, key: &EncodedKey, value: EncodedRow) -> Result<()> {
self.inner_mut().pending.insert(key.clone(), value);
Ok(())
}
pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
self.inner_mut().pending.remove(key.clone());
Ok(())
}
pub fn set_batch(&mut self, keys: &[EncodedKey], values: &[EncodedRow]) -> Result<()> {
self.inner_mut().pending.insert_batch(keys, values);
Ok(())
}
pub fn remove_batch(&mut self, keys: &[EncodedKey]) -> Result<()> {
self.inner_mut().pending.remove_batch(keys);
Ok(())
}
}
#[cfg(test)]
pub mod tests {
use reifydb_catalog::catalog::Catalog;
use reifydb_core::{
common::CommitVersion,
encoded::{key::EncodedKey, row::EncodedRow},
};
use reifydb_runtime::context::clock::{Clock, MockClock};
use reifydb_transaction::{interceptor::interceptors::Interceptors, transaction::admin::AdminTransaction};
use reifydb_type::util::cowvec::CowVec;
use super::*;
use crate::operator::stateful::test_utils::test::create_test_transaction;
fn make_key(s: &str) -> EncodedKey {
EncodedKey::new(s.as_bytes().to_vec())
}
fn make_value(s: &str) -> EncodedRow {
EncodedRow(CowVec::new(s.as_bytes().to_vec()))
}
fn get_row(parent: &mut AdminTransaction, key: &EncodedKey) -> Option<EncodedRow> {
parent.get(key).unwrap().map(|m| m.row.clone())
}
#[test]
fn test_set_buffers_to_pending() {
let parent = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&parent,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let key = make_key("key1");
let value = make_value("value1");
txn.set(&key, value.clone()).unwrap();
assert_eq!(txn.pending().get(&key), Some(&value));
}
#[test]
fn test_set_multiple_keys() {
let parent = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&parent,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
txn.set(&make_key("key1"), make_value("value1")).unwrap();
txn.set(&make_key("key2"), make_value("value2")).unwrap();
txn.set(&make_key("key3"), make_value("value3")).unwrap();
assert_eq!(txn.pending().get(&make_key("key1")), Some(&make_value("value1")));
assert_eq!(txn.pending().get(&make_key("key2")), Some(&make_value("value2")));
assert_eq!(txn.pending().get(&make_key("key3")), Some(&make_value("value3")));
}
#[test]
fn test_set_overwrites_same_key() {
let parent = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&parent,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let key = make_key("key1");
txn.set(&key, make_value("value1")).unwrap();
txn.set(&key, make_value("value2")).unwrap();
assert_eq!(txn.pending().get(&key), Some(&make_value("value2")));
}
#[test]
fn test_remove_buffers_to_pending() {
let parent = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&parent,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let key = make_key("key1");
txn.remove(&key).unwrap();
assert!(txn.pending().is_removed(&key));
}
#[test]
fn test_remove_multiple_keys() {
let parent = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&parent,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
txn.remove(&make_key("key1")).unwrap();
txn.remove(&make_key("key2")).unwrap();
txn.remove(&make_key("key3")).unwrap();
assert!(txn.pending().is_removed(&make_key("key1")));
assert!(txn.pending().is_removed(&make_key("key2")));
assert!(txn.pending().is_removed(&make_key("key3")));
}
#[test]
fn test_set_then_remove() {
let parent = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&parent,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let key = make_key("key1");
txn.set(&key, make_value("value1")).unwrap();
assert_eq!(txn.pending().get(&key), Some(&make_value("value1")));
txn.remove(&key).unwrap();
assert!(txn.pending().is_removed(&key));
assert_eq!(txn.pending().get(&key), None);
}
#[test]
fn test_remove_then_set() {
let parent = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&parent,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let key = make_key("key1");
txn.remove(&key).unwrap();
assert!(txn.pending().is_removed(&key));
txn.set(&key, make_value("value1")).unwrap();
assert!(!txn.pending().is_removed(&key));
assert_eq!(txn.pending().get(&key), Some(&make_value("value1")));
}
#[test]
fn test_writes_not_visible_to_parent() {
let mut parent = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&parent,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
let key = make_key("key1");
let value = make_value("value1");
txn.set(&key, value.clone()).unwrap();
assert_eq!(get_row(&mut parent, &key), None);
}
#[test]
fn test_removes_not_visible_to_parent() {
let mut parent = create_test_transaction();
let key = make_key("key1");
let value = make_value("value1");
parent.set(&key, value.clone()).unwrap();
assert_eq!(get_row(&mut parent, &key), Some(value.clone()));
let parent_version = parent.version();
let mut txn = FlowTransaction::deferred(
&parent,
parent_version,
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
txn.remove(&key).unwrap();
assert_eq!(get_row(&mut parent, &key), Some(value));
}
#[test]
fn test_mixed_writes_and_removes() {
let parent = create_test_transaction();
let mut txn = FlowTransaction::deferred(
&parent,
CommitVersion(1),
Catalog::testing(),
Interceptors::new(),
Clock::Mock(MockClock::from_millis(1000)),
);
txn.set(&make_key("write1"), make_value("v1")).unwrap();
txn.remove(&make_key("remove1")).unwrap();
txn.set(&make_key("write2"), make_value("v2")).unwrap();
txn.remove(&make_key("remove2")).unwrap();
assert_eq!(txn.pending().get(&make_key("write1")), Some(&make_value("v1")));
assert_eq!(txn.pending().get(&make_key("write2")), Some(&make_value("v2")));
assert!(txn.pending().is_removed(&make_key("remove1")));
assert!(txn.pending().is_removed(&make_key("remove2")));
}
}