use std::sync::Arc;
use crossbeam_skiplist::SkipMap;
use reifydb_core::{
delta::Delta,
encoded::{key::EncodedKey, row::EncodedRow},
event::EventBus,
interface::WithEventBus,
};
use reifydb_runtime::sync::rwlock::RwLock;
use reifydb_store_single::SingleStore;
pub mod read;
pub mod write;
use read::{KeyReadLock, SingleReadTransaction};
use reifydb_runtime::{
actor::system::ActorSystem,
context::clock::Clock,
pool::{PoolConfig, Pools},
};
use reifydb_type::Result;
use write::{KeyWriteLock, SingleWriteTransaction};
/// Handle used to start key-scoped read and write transactions against a `SingleStore`.
///
/// The handle only wraps an `Arc`, so cloning it yields another handle to the
/// same shared state (store, event bus and per-key lock table).
#[derive(Clone)]
pub struct SingleTransaction {
inner: Arc<SingleTransactionInner>,
}
/// State shared by every clone of `SingleTransaction`.
struct SingleTransactionInner {
/// The underlying store, wrapped in a reader-writer lock.
store: RwLock<SingleStore>,
/// Event bus returned by `WithEventBus::event_bus`.
event_bus: EventBus,
/// Lock table with one `RwLock<()>` per key, filled lazily by `get_or_create_lock`.
/// NOTE(review): no code visible in this file removes entries, so the table
/// appears to grow with the number of distinct keys - confirm against `read`/`write`.
key_locks: SkipMap<EncodedKey, Arc<RwLock<()>>>,
}
impl SingleTransactionInner {
    /// Returns the lock associated with `key`, creating it on first use.
    ///
    /// Every caller asking for the same key must receive the same `Arc`;
    /// otherwise two transactions could each hold "the" lock for one key at
    /// the same time.
    fn get_or_create_lock(&self, key: &EncodedKey) -> Arc<RwLock<()>> {
        // Fast path: the lock already exists, so skip the key clone and allocation.
        if let Some(entry) = self.key_locks.get(key) {
            return entry.value().clone();
        }
        // Slow path: `get_or_insert` keeps an entry that another thread may have
        // installed between the lookup above and this call. A plain `insert`
        // would replace that entry, leaving the two threads with different
        // locks for the same key.
        self.key_locks
            .get_or_insert(key.clone(), Arc::new(RwLock::new(())))
            .value()
            .clone()
    }
}
impl SingleTransaction {
    /// Creates a transaction manager over `store` that exposes `event_bus`.
    ///
    /// The per-key lock table starts out empty.
    pub fn new(store: SingleStore, event_bus: EventBus) -> Self {
        Self {
            inner: Arc::new(SingleTransactionInner {
                store: RwLock::new(store),
                event_bus,
                key_locks: SkipMap::new(),
            }),
        }
    }

    /// Builds an instance for tests: an in-memory store
    /// (`SingleStore::testing_memory`) and an event bus attached to an actor
    /// system created with `PoolConfig::sync_only()` and `Clock::Real`.
    pub fn testing() -> Self {
        let pools = Pools::new(PoolConfig::sync_only());
        let actor_system = ActorSystem::new(pools, Clock::Real);
        Self::new(SingleStore::testing_memory(), EventBus::new(&actor_system))
    }

    /// Runs `f` inside a read transaction scoped to `keys` and returns its result.
    ///
    /// # Errors
    /// Propagates any error from [`Self::begin_query`] or from `f`.
    ///
    /// # Panics
    /// Panics if `keys` is empty.
    pub fn with_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
    where
        I: IntoIterator<Item = &'a EncodedKey>,
        F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R>,
    {
        let mut tx = self.begin_query(keys)?;
        f(&mut tx)
    }

    /// Runs `f` inside a write transaction scoped to `keys` and commits it when
    /// `f` succeeds.
    ///
    /// # Errors
    /// Propagates any error from [`Self::begin_command`], from `f`, or from
    /// `commit`. When `f` fails the transaction is dropped without `commit`
    /// being called.
    ///
    /// # Panics
    /// Panics if `keys` is empty.
    pub fn with_command<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
    where
        I: IntoIterator<Item = &'a EncodedKey>,
        F: FnOnce(&mut SingleWriteTransaction<'_>) -> Result<R>,
    {
        let mut tx = self.begin_command(keys)?;
        let result = f(&mut tx)?;
        tx.commit()?;
        Ok(result)
    }

    /// Starts a read transaction restricted to the declared `keys`.
    ///
    /// One `KeyReadLock` is created per distinct key, in ascending key order.
    ///
    /// # Panics
    /// Panics if `keys` is empty.
    pub fn begin_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
    where
        I: IntoIterator<Item = &'a EncodedKey>,
    {
        let keys_vec = Self::collect_declared_keys(keys);
        // The iterator is consumed sequentially, so locks are requested in
        // sorted key order; every transaction therefore uses the same order.
        let locks: Vec<_> = keys_vec
            .iter()
            .map(|key| KeyReadLock::new(self.inner.get_or_create_lock(key)))
            .collect();
        Ok(SingleReadTransaction {
            inner: &self.inner,
            keys: keys_vec,
            _key_locks: locks,
        })
    }

    /// Starts a write transaction restricted to the declared `keys`.
    ///
    /// One `KeyWriteLock` is created per distinct key, in ascending key order.
    ///
    /// # Panics
    /// Panics if `keys` is empty.
    pub fn begin_command<'a, I>(&self, keys: I) -> Result<SingleWriteTransaction<'_>>
    where
        I: IntoIterator<Item = &'a EncodedKey>,
    {
        let keys_vec = Self::collect_declared_keys(keys);
        // Same ordering rule as `begin_query`: sorted, one lock per distinct key.
        let locks: Vec<_> = keys_vec
            .iter()
            .map(|key| KeyWriteLock::new(self.inner.get_or_create_lock(key)))
            .collect();
        Ok(SingleWriteTransaction::new(&self.inner, keys_vec, locks))
    }

    /// Clones the declared keys into a sorted, duplicate-free vector.
    ///
    /// Duplicates are removed so that a key listed twice is locked only once.
    /// NOTE(review): presumably `KeyWriteLock::new` acquires the lock, in which
    /// case requesting the same key twice would block on itself - confirm in `write`.
    ///
    /// # Panics
    /// Panics if no keys were declared.
    fn collect_declared_keys<'a, I>(keys: I) -> Vec<EncodedKey>
    where
        I: IntoIterator<Item = &'a EncodedKey>,
    {
        let mut keys_vec: Vec<EncodedKey> = keys.into_iter().cloned().collect();
        assert!(
            !keys_vec.is_empty(),
            "SVL transactions must declare keys upfront - empty keysets are not allowed"
        );
        keys_vec.sort();
        // `dedup` only removes adjacent equals, which is sufficient after sorting.
        keys_vec.dedup();
        keys_vec
    }
}
impl WithEventBus for SingleTransaction {
fn event_bus(&self) -> &EventBus {
&self.inner.event_bus
}
}
#[cfg(test)]
pub mod tests {
use std::{
iter,
sync::{Arc, Barrier},
thread,
time::Duration,
};
use reifydb_type::util::cowvec::CowVec;
use super::*;
// Test helper: builds an `EncodedKey` from the UTF-8 bytes of `s`.
fn make_key(s: &str) -> EncodedKey {
    let bytes: Vec<u8> = Vec::from(s.as_bytes());
    EncodedKey::new(bytes)
}
// Test helper: builds an `EncodedRow` whose payload is the UTF-8 bytes of `s`.
fn make_value(s: &str) -> EncodedRow {
    let bytes: Vec<u8> = Vec::from(s.as_bytes());
    EncodedRow(CowVec::new(bytes))
}
// Test helper: returns a fresh `SingleTransaction` built by `SingleTransaction::testing()`.
fn create_test_svl() -> SingleTransaction {
SingleTransaction::testing()
}
// A key declared when the query begins can be read without error.
#[test]
fn test_allowed_key_query() {
    let svl = create_test_svl();
    let declared = make_key("test_key");
    let mut query = svl.begin_query(vec![&declared]).unwrap();
    let outcome = query.get(&declared);
    assert!(outcome.is_ok());
}
// Reading a key that was not declared for the query fails with code `TXN_010`.
#[test]
fn test_disallowed_key_query() {
    let svl = create_test_svl();
    let declared = make_key("allowed");
    let undeclared = make_key("disallowed");
    let mut query = svl.begin_query(vec![&declared]).unwrap();

    // The declared key is readable.
    assert!(query.get(&declared).is_ok());

    // The undeclared key is rejected.
    let outcome = query.get(&undeclared);
    assert!(outcome.is_err());
    let error = outcome.unwrap_err();
    assert_eq!(error.0.code, "TXN_010");
}
// Beginning a query without any declared key must panic.
#[test]
#[should_panic(expected = "SVL transactions must declare keys upfront - empty keysets are not allowed")]
fn test_empty_keyset_query_panics() {
    let svl = create_test_svl();
    let no_keys = iter::empty::<&EncodedKey>();
    let _tx = svl.begin_query(no_keys);
}
// Beginning a command without any declared key must panic.
#[test]
#[should_panic(expected = "SVL transactions must declare keys upfront - empty keysets are not allowed")]
fn test_empty_keyset_command_panics() {
    let svl = create_test_svl();
    let no_keys = iter::empty::<&EncodedKey>();
    let _tx = svl.begin_command(no_keys);
}
// Setting, reading and committing a declared key all succeed.
#[test]
fn test_allowed_key_command() {
    let svl = create_test_svl();
    let declared = make_key("test_key");
    let payload = make_value("test_value");
    let mut command = svl.begin_command(vec![&declared]).unwrap();

    let set_outcome = command.set(&declared, payload.clone());
    assert!(set_outcome.is_ok());

    let get_outcome = command.get(&declared);
    assert!(get_outcome.is_ok());

    let commit_outcome = command.commit();
    assert!(commit_outcome.is_ok());
}
// Writing a key that was not declared for the command fails with code `TXN_010`.
#[test]
fn test_disallowed_key_command() {
    let svl = create_test_svl();
    let declared = make_key("allowed");
    let undeclared = make_key("disallowed");
    let payload = make_value("test_value");
    let mut command = svl.begin_command(vec![&declared]).unwrap();

    // The declared key accepts the write.
    assert!(command.set(&declared, payload.clone()).is_ok());

    // The undeclared key is rejected.
    let outcome = command.set(&undeclared, payload);
    assert!(outcome.is_err());
    let error = outcome.unwrap_err();
    assert_eq!(error.0.code, "TXN_010");
}
// Values committed by a command are visible to a later query over the same keys.
#[test]
fn test_command_commit_with_valid_keys() {
    let svl = create_test_svl();
    let (key1, key2) = (make_key("key1"), make_key("key2"));
    let (value1, value2) = (make_value("value1"), make_value("value2"));

    // Write both keys and commit.
    {
        let mut writer = svl.begin_command(vec![&key1, &key2]).unwrap();
        writer.set(&key1, value1.clone()).unwrap();
        writer.set(&key2, value2.clone()).unwrap();
        writer.commit().unwrap();
    }

    // Read both keys back in a separate transaction.
    {
        let mut reader = svl.begin_query(vec![&key1, &key2]).unwrap();
        let found1 = reader.get(&key1).unwrap();
        let found2 = reader.get(&key2).unwrap();
        assert!(found1.is_some());
        assert!(found2.is_some());
        assert_eq!(found1.unwrap().row, value1);
        assert_eq!(found2.unwrap().row, value2);
    }
}
// A value set in a command that is rolled back is not visible to a later query.
#[test]
fn test_rollback_with_scoped_keys() {
    let svl = create_test_svl();
    let key = make_key("test_key");
    let payload = make_value("test_value");

    // Stage a write, then roll it back.
    {
        let mut writer = svl.begin_command(vec![&key]).unwrap();
        writer.set(&key, payload).unwrap();
        writer.rollback().unwrap();
    }

    // The key must still be absent.
    {
        let mut reader = svl.begin_query(vec![&key]).unwrap();
        let found = reader.get(&key).unwrap();
        assert!(found.is_none());
    }
}
// Five threads each open a query on the same committed key and read the same value.
#[test]
fn test_concurrent_reads() {
    let svl = Arc::new(create_test_svl());
    let key = make_key("shared_key");
    let value = make_value("shared_value");

    // Seed the store so every reader has something to find.
    {
        let mut seed = svl.begin_command(vec![&key]).unwrap();
        seed.set(&key, value.clone()).unwrap();
        seed.commit().unwrap();
    }

    // `collect` spawns all readers before any of them is joined.
    let readers: Vec<_> = (0..5)
        .map(|_| {
            let shared = Arc::clone(&svl);
            let reader_key = key.clone();
            let expected = value.clone();
            thread::spawn(move || {
                let mut query = shared.begin_query(vec![&reader_key]).unwrap();
                let found = query.get(&reader_key).unwrap();
                assert!(found.is_some());
                assert_eq!(found.unwrap().row, expected);
            })
        })
        .collect();

    for reader in readers {
        reader.join().unwrap();
    }
}
// Five threads each commit a different key; afterwards every key holds its own value.
#[test]
fn test_concurrent_writers_disjoint_keys() {
    let svl = Arc::new(create_test_svl());

    // `collect` spawns all writers before any of them is joined.
    let writers: Vec<_> = (0..5)
        .map(|i| {
            let shared = Arc::clone(&svl);
            let key = make_key(&format!("key_{}", i));
            let value = make_value(&format!("value_{}", i));
            thread::spawn(move || {
                let mut command = shared.begin_command(vec![&key]).unwrap();
                command.set(&key, value).unwrap();
                command.commit().unwrap();
            })
        })
        .collect();

    for writer in writers {
        writer.join().unwrap();
    }

    // Verify each key independently once all writers have finished.
    for i in 0..5 {
        let key = make_key(&format!("key_{}", i));
        let expected = make_value(&format!("value_{}", i));
        let mut query = svl.begin_query(vec![&key]).unwrap();
        let found = query.get(&key).unwrap();
        assert!(found.is_some());
        assert_eq!(found.unwrap().row, expected);
    }
}
// Three readers of `key1` run alongside one writer of `key2`; all threads must finish.
#[test]
fn test_concurrent_readers_and_writer() {
    let svl = Arc::new(create_test_svl());
    let key1 = make_key("key1");
    let key2 = make_key("key2");
    let value1 = make_value("value1");
    let value2 = make_value("value2");

    // Seed both keys.
    {
        let mut seed = svl.begin_command(vec![&key1, &key2]).unwrap();
        seed.set(&key1, value1.clone()).unwrap();
        seed.set(&key2, value2.clone()).unwrap();
        seed.commit().unwrap();
    }

    // Readers only touch `key1`, which the writer below never modifies.
    let mut handles: Vec<_> = (0..3)
        .map(|_| {
            let shared = Arc::clone(&svl);
            let reader_key = key1.clone();
            let expected = value1.clone();
            thread::spawn(move || {
                let mut query = shared.begin_query(vec![&reader_key]).unwrap();
                let found = query.get(&reader_key).unwrap();
                assert!(found.is_some());
                assert_eq!(found.unwrap().row, expected);
            })
        })
        .collect();

    // The writer replaces the value of `key2`.
    let writer = {
        let shared = Arc::clone(&svl);
        let new_value = make_value("new_value2");
        thread::spawn(move || {
            let mut command = shared.begin_command(vec![&key2]).unwrap();
            command.set(&key2, new_value).unwrap();
            command.commit().unwrap();
        })
    };
    handles.push(writer);

    for handle in handles {
        handle.join().unwrap();
    }
}
// Ten threads mix reads and writes over three keys; the test only requires
// that no thread panics (errors from `set`, `commit` and `get` are ignored).
#[test]
fn test_no_panics_with_rwlock() {
    let svl = Arc::new(create_test_svl());

    let workers: Vec<_> = (0..10)
        .map(|i| {
            let shared = Arc::clone(&svl);
            // `i % 3` makes the threads contend on three shared keys.
            let key = make_key(&format!("key_{}", i % 3));
            let value = make_value(&format!("value_{}", i));
            thread::spawn(move || {
                // Odd workers read, even workers write.
                if i % 2 != 0 {
                    let mut query = shared.begin_query(vec![&key]).unwrap();
                    let _ = query.get(&key);
                } else {
                    let mut command = shared.begin_command(vec![&key]).unwrap();
                    let _ = command.set(&key, value);
                    let _ = command.commit();
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
}
// Two writers contend for the same key. Thread 1 opens its command before
// reaching the barrier, while thread 2 calls `begin_command` only after the
// barrier, so thread 2 always starts second. The final read expects thread 2's
// value to be the stored one.
// NOTE(review): presumably thread 2's `begin_command` waits on the key lock
// until thread 1 has committed - confirm against `KeyWriteLock`.
#[test]
fn test_write_blocks_concurrent_write() {
let svl = Arc::new(create_test_svl());
let key = make_key("blocking_key");
let barrier = Arc::new(Barrier::new(2));
let svl1 = Arc::clone(&svl);
let key1 = key.clone();
let barrier1 = Arc::clone(&barrier);
let handle1 = thread::spawn(move || {
let mut tx = svl1.begin_command(vec![&key1]).unwrap();
tx.set(&key1, make_value("value1")).unwrap();
// Signal thread 2 only once this transaction is already open.
barrier1.wait();
// Keep the transaction open while thread 2 tries to start its own.
thread::sleep(Duration::from_millis(100));
tx.commit().unwrap();
});
let svl2 = Arc::clone(&svl);
let key2 = key.clone();
let barrier2 = Arc::clone(&barrier);
let handle2 = thread::spawn(move || {
barrier2.wait();
thread::sleep(Duration::from_millis(10));
let mut tx = svl2.begin_command(vec![&key2]).unwrap();
tx.set(&key2, make_value("value2")).unwrap();
tx.commit().unwrap();
});
handle1.join().unwrap();
handle2.join().unwrap();
// Thread 2 wrote last, so its value is expected to win.
let mut tx = svl.begin_query(vec![&key]).unwrap();
let result = tx.get(&key).unwrap();
assert!(result.is_some());
assert_eq!(result.unwrap().row, make_value("value2"));
}
// A reader that starts while a writer still has its command open on the same
// key is expected to observe the writer's committed value ("updated"), not the
// initial one. The writer opens its command before the barrier; the reader
// calls `begin_query` only after the barrier.
// NOTE(review): presumably `begin_query` waits on the key lock until the
// writer commits - confirm against `KeyReadLock`/`KeyWriteLock`.
#[test]
fn test_write_blocks_concurrent_read() {
let svl = Arc::new(create_test_svl());
let key = make_key("blocking_key");
// Seed the key so the reader would see "initial" if it did not wait.
{
let mut tx = svl.begin_command(vec![&key]).unwrap();
tx.set(&key, make_value("initial")).unwrap();
tx.commit().unwrap();
}
let barrier = Arc::new(Barrier::new(2));
let svl1 = Arc::clone(&svl);
let key1 = key.clone();
let barrier1 = Arc::clone(&barrier);
let handle1 = thread::spawn(move || {
let mut tx = svl1.begin_command(vec![&key1]).unwrap();
tx.set(&key1, make_value("updated")).unwrap();
// Release the reader only once the write transaction is open.
barrier1.wait();
thread::sleep(Duration::from_millis(100));
tx.commit().unwrap();
});
let svl2 = Arc::clone(&svl);
let key2 = key.clone();
let barrier2 = Arc::clone(&barrier);
let handle2 = thread::spawn(move || {
barrier2.wait();
thread::sleep(Duration::from_millis(10));
let mut tx = svl2.begin_query(vec![&key2]).unwrap();
let result = tx.get(&key2).unwrap();
assert!(result.is_some());
assert_eq!(result.unwrap().row, make_value("updated"));
});
handle1.join().unwrap();
handle2.join().unwrap();
}
// Three threads each open a query on the same key and then wait on a
// three-party barrier while that query is still open. The barrier can only
// release if all three queries are open at the same time, so the test would
// hang if one query excluded the others.
#[test]
fn test_concurrent_reads_allowed() {
let svl = Arc::new(create_test_svl());
let key = make_key("shared_read_key");
// Seed the key that every reader will fetch.
{
let mut tx = svl.begin_command(vec![&key]).unwrap();
tx.set(&key, make_value("shared")).unwrap();
tx.commit().unwrap();
}
let barrier = Arc::new(Barrier::new(3));
let mut handles = vec![];
for _ in 0..3 {
let svl_clone = Arc::clone(&svl);
let key_clone = key.clone();
let barrier_clone = Arc::clone(&barrier);
let handle = thread::spawn(move || {
let mut tx = svl_clone.begin_query(vec![&key_clone]).unwrap();
// Rendezvous while the query is still open.
barrier_clone.wait();
let result = tx.get(&key_clone).unwrap();
assert!(result.is_some());
assert_eq!(result.unwrap().row, make_value("shared"));
thread::sleep(Duration::from_millis(50));
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}
// Two threads declare the same two keys in opposite order and start together
// via a barrier. Each sets one key, sleeps briefly and commits. The test
// passes when both threads finish and both keys hold a value afterwards.
// `begin_command` sorts the declared keys, so the order given by the caller
// does not decide the order in which the key locks are requested.
#[test]
fn test_overlapping_keys_different_order() {
let svl = Arc::new(create_test_svl());
let key1 = make_key("deadlock_key1");
let key2 = make_key("deadlock_key2");
let barrier = Arc::new(Barrier::new(2));
let svl1 = Arc::clone(&svl);
let key1_clone = key1.clone();
let key2_clone = key2.clone();
let barrier1 = Arc::clone(&barrier);
let handle1 = thread::spawn(move || {
barrier1.wait();
// Declared as (key1, key2).
let mut tx = svl1.begin_command(vec![&key1_clone, &key2_clone]).unwrap();
tx.set(&key1_clone, make_value("from_thread1")).unwrap();
thread::sleep(Duration::from_millis(10)); tx.commit().unwrap();
});
let svl2 = Arc::clone(&svl);
let key1_clone2 = key1.clone();
let key2_clone2 = key2.clone();
let barrier2 = Arc::clone(&barrier);
let handle2 = thread::spawn(move || {
barrier2.wait();
// Declared as (key2, key1) - the reverse of thread 1.
let mut tx = svl2.begin_command(vec![&key2_clone2, &key1_clone2]).unwrap();
tx.set(&key2_clone2, make_value("from_thread2")).unwrap();
thread::sleep(Duration::from_millis(10)); tx.commit().unwrap();
});
handle1.join().unwrap();
handle2.join().unwrap();
let mut tx = svl.begin_query(vec![&key1, &key2]).unwrap();
let result1 = tx.get(&key1).unwrap();
let result2 = tx.get(&key2).unwrap();
assert!(result1.is_some());
assert!(result2.is_some());
}
// Three threads start together and declare overlapping key pairs that form a
// cycle: (key1, key2), (key2, key3) and (key3, key1). The test asserts nothing
// about stored values; it only requires that all three threads join, i.e. that
// none of them panics or hangs.
#[test]
fn test_circular_dependency_three_transactions() {
let svl = Arc::new(create_test_svl());
let key1 = make_key("circular_key1");
let key2 = make_key("circular_key2");
let key3 = make_key("circular_key3");
let barrier = Arc::new(Barrier::new(3));
let svl1 = Arc::clone(&svl);
let k1_1 = key1.clone();
let k2_1 = key2.clone();
let barrier1 = Arc::clone(&barrier);
let handle1 = thread::spawn(move || {
barrier1.wait();
// Pair (key1, key2).
let mut tx = svl1.begin_command(vec![&k1_1, &k2_1]).unwrap();
tx.set(&k1_1, make_value("t1")).unwrap();
thread::sleep(Duration::from_millis(10));
tx.commit().unwrap();
});
let svl2 = Arc::clone(&svl);
let k2_2 = key2.clone();
let k3_2 = key3.clone();
let barrier2 = Arc::clone(&barrier);
let handle2 = thread::spawn(move || {
barrier2.wait();
// Pair (key2, key3).
let mut tx = svl2.begin_command(vec![&k2_2, &k3_2]).unwrap();
tx.set(&k2_2, make_value("t2")).unwrap();
thread::sleep(Duration::from_millis(10));
tx.commit().unwrap();
});
let svl3 = Arc::clone(&svl);
let barrier3 = Arc::clone(&barrier);
let handle3 = thread::spawn(move || {
barrier3.wait();
// Pair (key3, key1), closing the cycle; the original keys are moved in here.
let mut tx = svl3.begin_command(vec![&key3, &key1]).unwrap();
tx.set(&key3, make_value("t3")).unwrap();
thread::sleep(Duration::from_millis(10));
tx.commit().unwrap();
});
handle1.join().unwrap();
handle2.join().unwrap();
handle3.join().unwrap();
}
// A command that is dropped without commit must not prevent a later command on
// the same key, and its staged value must not be the one that ends up stored.
#[test]
fn test_locks_released_on_drop() {
    let svl = Arc::new(create_test_svl());
    let key = make_key("drop_test_key");

    // First writer: stage a value, then let the transaction go out of scope uncommitted.
    let abandoned = {
        let shared = Arc::clone(&svl);
        let writer_key = key.clone();
        thread::spawn(move || {
            let mut command = shared.begin_command(vec![&writer_key]).unwrap();
            command.set(&writer_key, make_value("dropped")).unwrap();
        })
    };
    abandoned.join().unwrap();
    thread::sleep(Duration::from_millis(10));

    // Second writer: must be able to open a command on the same key and commit.
    let committed = {
        let shared = Arc::clone(&svl);
        let writer_key = key.clone();
        thread::spawn(move || {
            let mut command = shared.begin_command(vec![&writer_key]).unwrap();
            command.set(&writer_key, make_value("success")).unwrap();
            command.commit().unwrap();
        })
    };
    committed.join().unwrap();

    // Only the committed value is visible.
    let mut query = svl.begin_query(vec![&key]).unwrap();
    let found = query.get(&key).unwrap();
    assert!(found.is_some());
    assert_eq!(found.unwrap().row, make_value("success"));
}
}