use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use parking_lot::{Mutex, MutexGuard};
use cypherlite_core::{CypherLiteError, Result, TransactionView};
/// Hands out read and write transactions and tracks the current frame number.
///
/// Any number of read transactions may exist at once; at most one write
/// transaction exists at a time, enforced by `write_lock`.
#[derive(Debug)]
pub struct TransactionManager {
    /// Most recently published frame number. Shared (via `Arc`) with write
    /// transactions so that committing can update it.
    current_frame: Arc<AtomicU64>,
    /// Lock held for the whole lifetime of a write transaction.
    write_lock: Arc<Mutex<()>>,
    /// Counter from which the next transaction ID is taken.
    next_tx_id: AtomicU64,
}
impl TransactionManager {
    /// Creates a manager whose current frame is `0`, whose write lock is free,
    /// and whose first handed-out transaction ID is `1`.
    pub fn new() -> Self {
        Self {
            current_frame: Arc::new(AtomicU64::new(0)),
            write_lock: Arc::new(Mutex::new(())),
            next_tx_id: AtomicU64::new(1),
        }
    }

    /// Starts a read transaction pinned to the frame that is current at the
    /// time of the call.
    ///
    /// The write lock is not touched, so this succeeds whether or not a write
    /// transaction is open.
    pub fn begin_read(&self) -> ReadTransaction {
        let snapshot = self.current_frame.load(Ordering::Acquire);
        // `fetch_add` is atomic, so IDs are distinct regardless of ordering.
        let tx_id = self.next_tx_id.fetch_add(1, Ordering::Relaxed);
        ReadTransaction {
            tx_id,
            snapshot_frame: snapshot,
        }
    }

    /// Starts the single write transaction, without blocking.
    ///
    /// # Errors
    ///
    /// Returns [`CypherLiteError::TransactionConflict`] if the write lock is
    /// already held. No transaction ID is consumed in that case.
    pub fn begin_write(&self) -> Result<WriteTransaction> {
        let guard = self
            .write_lock
            .try_lock()
            .ok_or(CypherLiteError::TransactionConflict)?;
        let snapshot = self.current_frame.load(Ordering::Acquire);
        let tx_id = self.next_tx_id.fetch_add(1, Ordering::Relaxed);
        // SAFETY: the guard borrows the mutex that lives inside the `Arc`
        // allocation behind `self.write_lock`. The `WriteTransaction` built
        // below stores a clone of that `Arc` in `_write_lock_arc`, which keeps
        // the mutex alive, and declares `_guard` before `_write_lock_arc`, so
        // the guard is dropped before the `Arc` clone. The reference inside
        // the guard therefore never outlives the mutex.
        let guard: MutexGuard<'static, ()> = unsafe {
            std::mem::transmute::<MutexGuard<'_, ()>, MutexGuard<'static, ()>>(guard)
        };
        Ok(WriteTransaction {
            tx_id,
            snapshot_frame: snapshot,
            committed: false,
            _guard: Some(guard),
            _write_lock_arc: Arc::clone(&self.write_lock),
            current_frame: Arc::clone(&self.current_frame),
        })
    }

    /// Overwrites the current frame number with `frame`.
    ///
    /// Transactions that already captured a snapshot are unaffected.
    pub fn update_current_frame(&self, frame: u64) {
        self.current_frame.store(frame, Ordering::Release);
    }

    /// Returns the current frame number.
    pub fn current_frame(&self) -> u64 {
        self.current_frame.load(Ordering::Acquire)
    }
}
impl Default for TransactionManager {
fn default() -> Self {
Self::new()
}
}
/// A read-only transaction: an ID plus the frame number captured when it began.
#[derive(Debug)]
pub struct ReadTransaction {
    /// Identifier assigned when the transaction was started.
    tx_id: u64,
    /// Frame number that was current when the transaction was started.
    snapshot_frame: u64,
}
impl ReadTransaction {
    /// Identifier assigned to this transaction when it was started.
    pub fn tx_id(&self) -> u64 {
        let Self { tx_id, .. } = self;
        *tx_id
    }
}
impl TransactionView for ReadTransaction {
    /// Frame number stored in this transaction's `snapshot_frame` field.
    fn snapshot_frame(&self) -> u64 {
        let Self { snapshot_frame, .. } = self;
        *snapshot_frame
    }
}
/// The single writer transaction; it owns the write lock for as long as it lives.
///
/// # Field order is load-bearing
///
/// `_guard` holds a `MutexGuard` whose lifetime was extended to `'static`.
/// It really borrows the mutex kept alive by `_write_lock_arc`. Rust drops
/// struct fields in declaration order, so `_guard` MUST stay declared before
/// `_write_lock_arc`: the guard has to be released while the mutex still exists.
#[derive(Debug)]
pub struct WriteTransaction {
    /// Identifier assigned when the transaction was started.
    tx_id: u64,
    /// Frame number that was current when the transaction was started.
    snapshot_frame: u64,
    /// Set to `true` once `commit` has been called.
    committed: bool,
    /// The held write lock; `None` after `rollback` has released it early.
    _guard: Option<MutexGuard<'static, ()>>,
    /// Keeps the mutex referenced by `_guard` alive. Must come after `_guard`.
    _write_lock_arc: Arc<Mutex<()>>,
    /// Shared current-frame counter that `commit` writes the new frame into.
    current_frame: Arc<AtomicU64>,
}
impl WriteTransaction {
pub fn tx_id(&self) -> u64 {
self.tx_id
}
pub fn commit(&mut self, new_frame: u64) {
self.committed = true;
self.current_frame.store(new_frame, Ordering::Release);
}
pub fn is_committed(&self) -> bool {
self.committed
}
pub fn rollback(mut self) {
self._guard.take();
}
}
impl TransactionView for WriteTransaction {
    /// Frame number stored in this transaction's `snapshot_frame` field.
    fn snapshot_frame(&self) -> u64 {
        let Self { snapshot_frame, .. } = self;
        *snapshot_frame
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A reader sees the frame that was current when it began.
    #[test]
    fn test_begin_read_captures_snapshot() {
        let tm = TransactionManager::new();
        tm.update_current_frame(42);
        let tx = tm.begin_read();
        assert_eq!(tx.snapshot_frame(), 42);
    }

    /// Several readers can coexist and get distinct IDs.
    #[test]
    fn test_multiple_readers() {
        let tm = TransactionManager::new();
        let r1 = tm.begin_read();
        let r2 = tm.begin_read();
        assert_ne!(r1.tx_id(), r2.tx_id());
    }

    /// A second writer is rejected while the first is still alive.
    #[test]
    fn test_begin_write_exclusive() {
        let tm = TransactionManager::new();
        let _w1 = tm.begin_write().expect("first write");
        let result = tm.begin_write();
        assert!(matches!(result, Err(CypherLiteError::TransactionConflict)));
    }

    /// Dropping a writer frees the lock for the next one.
    #[test]
    fn test_write_lock_released_on_drop() {
        let tm = TransactionManager::new();
        {
            let _w1 = tm.begin_write().expect("first write");
        }
        let _w2 = tm.begin_write().expect("second write should succeed");
    }

    /// Committing flips the flag and publishes the new frame.
    #[test]
    fn test_commit_updates_frame() {
        let tm = TransactionManager::new();
        let mut w = tm.begin_write().expect("write");
        assert!(!w.is_committed());
        w.commit(10);
        assert!(w.is_committed());
        assert_eq!(tm.current_frame(), 10);
    }

    /// Rolling back frees the lock for the next writer.
    #[test]
    fn test_rollback_releases_lock() {
        let tm = TransactionManager::new();
        let w = tm.begin_write().expect("write");
        w.rollback();
        let _w2 = tm.begin_write().expect("should succeed after rollback");
    }

    /// A reader's snapshot does not move when the current frame advances.
    #[test]
    fn test_snapshot_isolation() {
        let tm = TransactionManager::new();
        tm.update_current_frame(5);
        let r = tm.begin_read();
        assert_eq!(r.snapshot_frame(), 5);
        tm.update_current_frame(10);
        assert_eq!(r.snapshot_frame(), 5);
    }

    /// IDs are pairwise distinct across readers and writers.
    #[test]
    fn test_transaction_ids_are_unique() {
        let tm = TransactionManager::new();
        let t1 = tm.begin_read();
        let t2 = tm.begin_read();
        let t3 = tm.begin_write().expect("w");
        assert_ne!(t1.tx_id(), t2.tx_id());
        assert_ne!(t2.tx_id(), t3.tx_id());
        // Without this pair the test would accept t1 and t3 sharing an ID.
        assert_ne!(t1.tx_id(), t3.tx_id());
    }

    /// A fresh manager starts at frame zero.
    #[test]
    fn test_initial_frame_is_zero() {
        let tm = TransactionManager::new();
        assert_eq!(tm.current_frame(), 0);
    }

    /// Opening a writer without committing does not change what readers see.
    #[test]
    fn test_uncommitted_not_visible() {
        let tm = TransactionManager::new();
        let r = tm.begin_read();
        assert_eq!(r.snapshot_frame(), 0);
        let _w = tm.begin_write().expect("w");
        let r2 = tm.begin_read();
        assert_eq!(r2.snapshot_frame(), 0);
    }

    /// Readers can start while a writer holds the lock.
    #[test]
    fn test_reader_not_blocked_by_writer() {
        let tm = TransactionManager::new();
        let _w = tm.begin_write().expect("write");
        let r = tm.begin_read();
        assert_eq!(r.snapshot_frame(), 0);
    }
}