use std::sync::{Arc, Mutex};
use crate::error::KgError;
use crate::graph::dir_graph::DirGraph;
/// Holder of the current graph, from which transactions are started and into
/// which their results are published.
///
/// The graph is stored as an `Arc` inside a `Mutex`: `snapshot` clones the
/// `Arc` out of the slot, and `commit` replaces the `Arc` in the slot with a
/// newly built one.
pub struct Session {
// Slot holding the current graph; the mutex guards the slot itself, while
// the graph is handed out through `Arc` clones.
graph: Mutex<Arc<DirGraph>>,
}
impl Session {
    /// Creates a session whose current graph is `graph`, wrapped in a fresh `Arc`.
    pub fn new(graph: DirGraph) -> Self {
        Self {
            graph: Mutex::new(Arc::new(graph)),
        }
    }

    /// Creates a session around an already shared graph, reusing the given `Arc`.
    pub fn from_arc(graph: Arc<DirGraph>) -> Self {
        Self {
            graph: Mutex::new(graph),
        }
    }

    /// Locks the slot that holds the current graph.
    ///
    /// A poisoned mutex is recovered instead of propagated: the slot is only
    /// ever written by one whole-value assignment in [`Session::commit`], so
    /// the `Arc` found inside is always a complete value.
    fn lock_current(&self) -> std::sync::MutexGuard<'_, Arc<DirGraph>> {
        self.graph
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Returns a shared handle to the current graph.
    ///
    /// Only the `Arc` is cloned; the graph itself is not copied.
    pub fn snapshot(&self) -> Arc<DirGraph> {
        Arc::clone(&self.lock_current())
    }

    /// Returns the version reported by the current graph.
    pub fn version(&self) -> u64 {
        self.snapshot().version()
    }

    /// Starts a read-write transaction based on the current graph.
    ///
    /// The transaction records the snapshot's version as its base version and
    /// starts without a working copy.
    pub fn begin(&self) -> Transaction {
        let snapshot = self.snapshot();
        let base_version = snapshot.version();
        Transaction {
            snapshot: Some(snapshot),
            working: None,
            base_version,
            read_only: false,
        }
    }

    /// Starts a transaction exactly like [`Session::begin`], flagged read-only.
    pub fn begin_read(&self) -> Transaction {
        let mut tx = self.begin();
        tx.read_only = true;
        tx
    }

    /// Publishes the working copy of `tx` as the session's current graph.
    ///
    /// Outcomes:
    /// * [`CommitOutcome::NoWritesNoOp`] — `tx` has no working copy; the
    ///   session is left untouched.
    /// * [`CommitOutcome::ConflictDetected`] — `check_occ` is `true` and the
    ///   current graph's version differs from the transaction's base version;
    ///   the working copy is discarded.
    /// * [`CommitOutcome::Committed`] — the working copy, stamped with
    ///   `base_version + 1`, replaced the current graph.
    ///
    /// When `check_occ` is `false` the new version is still derived from the
    /// transaction's base version, not from the graph being replaced, so the
    /// last committer wins and the version is not guaranteed to increase.
    pub fn commit(&self, tx: Transaction, check_occ: bool) -> CommitOutcome {
        let (working_opt, base_version) = tx.take_working();
        let Some(mut working) = working_opt else {
            return CommitOutcome::NoWritesNoOp;
        };

        // One guard covers both the version check and the replacement.
        // Checking under one lock acquisition and publishing under another
        // would let two concurrent committers both pass the check, with the
        // second silently overwriting the first.
        let mut current = self.lock_current();
        if check_occ {
            let current_version = current.version();
            if current_version != base_version {
                return CommitOutcome::ConflictDetected {
                    current_version,
                    base_version,
                };
            }
        }

        let new_version = base_version + 1;
        working.set_version(new_version);
        *current = Arc::new(working);
        CommitOutcome::Committed { new_version }
    }

    /// Abandons `tx`.
    ///
    /// Nothing is written to the session; the transaction is simply dropped
    /// together with whatever it holds.
    pub fn rollback(&self, _tx: Transaction) {}
}
/// State carried by one transaction started from a `Session`.
///
/// A transaction starts with a shared `snapshot` and no `working` copy. The
/// first successful `working_mut` call moves the snapshot out and stores an
/// owned graph in `working`.
pub struct Transaction {
// Shared graph captured when the transaction began; taken (left as `None`)
// once the working copy has been materialized.
pub(super) snapshot: Option<Arc<DirGraph>>,
// Owned graph that mutations are applied to; `None` until `working_mut`
// first succeeds.
pub(super) working: Option<DirGraph>,
// Version of the snapshot at the time the transaction began.
pub(super) base_version: u64,
// When `true`, `working_mut` refuses to hand out a mutable graph.
pub(super) read_only: bool,
}
impl Transaction {
    /// Reports whether this transaction was opened as read-only.
    pub fn is_read_only(&self) -> bool {
        self.read_only
    }

    /// Version of the graph this transaction started from.
    pub fn base_version(&self) -> u64 {
        self.base_version
    }

    /// `true` once a working copy has been materialized by [`Transaction::working_mut`].
    pub fn has_writes(&self) -> bool {
        self.working.is_some()
    }

    /// Graph that reads should go through: the working copy when one exists,
    /// otherwise the snapshot taken at begin time (`None` if neither is held).
    pub fn current(&self) -> Option<&DirGraph> {
        if let Some(own) = &self.working {
            return Some(own);
        }
        self.snapshot.as_deref()
    }

    /// Hands out the mutable working copy, creating it on first use.
    ///
    /// The first call moves the snapshot out of the transaction and turns it
    /// into an owned graph: the `Arc` is unwrapped when this is its only
    /// holder, and the graph is cloned otherwise.
    ///
    /// # Errors
    ///
    /// Returns `KgError::Argument` when the transaction is read-only, or when
    /// there is neither a working copy nor a snapshot left to build one from.
    pub fn working_mut(&mut self) -> Result<&mut DirGraph, KgError> {
        if self.read_only {
            return Err(KgError::Argument(
                "read-only transaction does not support mutations \
                 (CREATE/SET/DELETE/REMOVE/MERGE) — open a read-write tx \
                 via Session::begin"
                    .to_string(),
            ));
        }

        if self.working.is_none() {
            let Some(shared) = self.snapshot.take() else {
                return Err(KgError::Argument(
                    "transaction already committed or rolled back".to_string(),
                ));
            };
            let owned = match Arc::try_unwrap(shared) {
                Ok(graph) => graph,
                Err(still_shared) => (*still_shared).clone(),
            };
            self.working = Some(owned);
        }

        let graph = self
            .working
            .as_mut()
            .expect("invariant: just materialized above");
        Ok(graph)
    }

    /// Consumes the transaction, yielding its working copy (if any) together
    /// with its base version; the snapshot, if still held, is dropped.
    pub fn take_working(self) -> (Option<DirGraph>, u64) {
        let Self {
            working,
            base_version,
            ..
        } = self;
        (working, base_version)
    }
}
/// Result of attempting to commit a transaction.
///
/// The enum is plain data (unit and `u64` fields only), so it derives the
/// value-type traits: outcomes can be copied and compared with `==` or
/// `assert_eq!` as well as pattern-matched.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitOutcome {
    /// The transaction had no working copy, so there was nothing to publish.
    NoWritesNoOp,
    /// The working copy was published under `new_version`.
    Committed { new_version: u64 },
    /// The version check failed: the graph's `current_version` no longer
    /// equals the transaction's `base_version`.
    ConflictDetected {
        current_version: u64,
        base_version: u64,
    },
}
#[cfg(test)]
mod tests {
    //! Unit tests for the session / transaction lifecycle: snapshots, lazy
    //! materialization of the working copy, commit outcomes and version
    //! bookkeeping.

    use super::*;

    /// Graph every test session starts from.
    fn empty_graph() -> DirGraph {
        DirGraph::new()
    }

    #[test]
    fn new_session_version_is_zero() {
        let s = Session::new(empty_graph());
        assert_eq!(s.version(), 0);
    }

    /// Two snapshots taken without an intervening commit share one allocation.
    #[test]
    fn snapshot_is_cheap_arc_clone() {
        let s = Session::new(empty_graph());
        let snap1 = s.snapshot();
        let snap2 = s.snapshot();
        assert!(Arc::ptr_eq(&snap1, &snap2));
    }

    #[test]
    fn begin_then_commit_no_writes_is_noop() {
        let s = Session::new(empty_graph());
        let tx = s.begin();
        let outcome = s.commit(tx, true);
        assert!(matches!(outcome, CommitOutcome::NoWritesNoOp));
        assert_eq!(s.version(), 0);
    }

    #[test]
    fn begin_then_rollback_is_noop() {
        let s = Session::new(empty_graph());
        let tx = s.begin();
        s.rollback(tx);
        assert_eq!(s.version(), 0);
    }

    /// The working copy does not exist until `working_mut` is first called;
    /// afterwards the snapshot has been moved out of the transaction.
    #[test]
    fn working_mut_materializes_only_on_first_call() {
        let s = Session::new(empty_graph());
        let mut tx = s.begin();
        assert!(!tx.has_writes());
        assert!(tx.current().is_some());
        let _ = tx.working_mut().unwrap();
        assert!(tx.has_writes());
        assert!(tx.snapshot.is_none());
        assert!(tx.working.is_some());
    }

    /// With the snapshot gone, `current` must still resolve via the working copy.
    #[test]
    fn current_routes_through_working_after_materialize() {
        let s = Session::new(empty_graph());
        let mut tx = s.begin();
        let _ = tx.working_mut().unwrap();
        let _: &DirGraph = tx.current().unwrap();
    }

    #[test]
    fn commit_with_writes_bumps_version() {
        let s = Session::new(empty_graph());
        let mut tx = s.begin();
        let _ = tx.working_mut().unwrap();
        let outcome = s.commit(tx, true);
        match outcome {
            CommitOutcome::Committed { new_version } => assert_eq!(new_version, 1),
            other => panic!("expected Committed, got {other:?}"),
        }
        assert_eq!(s.version(), 1);
    }

    #[test]
    fn read_only_tx_rejects_working_mut() {
        let s = Session::new(empty_graph());
        let mut tx = s.begin_read();
        assert!(tx.is_read_only());
        match tx.working_mut() {
            Err(KgError::Argument(msg)) => assert!(msg.contains("read-only")),
            Err(other) => panic!("expected Argument, got different error: {other}"),
            Ok(_) => panic!("expected read-only rejection but got Ok"),
        }
    }

    #[test]
    fn read_only_tx_commit_is_noop() {
        let s = Session::new(empty_graph());
        let tx = s.begin_read();
        let outcome = s.commit(tx, true);
        assert!(matches!(outcome, CommitOutcome::NoWritesNoOp));
        assert_eq!(s.version(), 0);
    }

    /// Both transactions start at version 0; once B commits, A's checked
    /// commit must be rejected and leave B's result in place.
    #[test]
    fn occ_conflict_detected_when_other_writer_commits() {
        let s = Session::new(empty_graph());
        let mut tx_a = s.begin();
        let _ = tx_a.working_mut().unwrap();
        let mut tx_b = s.begin();
        let _ = tx_b.working_mut().unwrap();
        let outcome_b = s.commit(tx_b, true);
        assert!(matches!(
            outcome_b,
            CommitOutcome::Committed { new_version: 1 }
        ));
        let outcome_a = s.commit(tx_a, true);
        match outcome_a {
            CommitOutcome::ConflictDetected {
                current_version,
                base_version,
            } => {
                assert_eq!(current_version, 1);
                assert_eq!(base_version, 0);
            }
            other => panic!("expected ConflictDetected, got {other:?}"),
        }
        assert_eq!(s.version(), 1);
    }

    /// Without the version check both commits succeed. Each stamps
    /// `base_version + 1`, so the session ends at version 1, not 2.
    #[test]
    fn occ_disabled_means_last_writer_wins() {
        let s = Session::new(empty_graph());
        let mut tx_a = s.begin();
        let _ = tx_a.working_mut().unwrap();
        let mut tx_b = s.begin();
        let _ = tx_b.working_mut().unwrap();
        let outcome_a = s.commit(tx_a, false);
        let outcome_b = s.commit(tx_b, false);
        assert!(matches!(outcome_a, CommitOutcome::Committed { .. }));
        assert!(matches!(outcome_b, CommitOutcome::Committed { .. }));
        assert_eq!(s.version(), 1);
    }

    /// A snapshot held across a commit keeps the old graph; a new snapshot
    /// sees the committed one.
    #[test]
    fn snapshot_after_commit_sees_new_graph() {
        let s = Session::new(empty_graph());
        let pre = s.snapshot();
        assert_eq!(pre.version(), 0);
        let mut tx = s.begin();
        let _ = tx.working_mut().unwrap();
        let _ = s.commit(tx, true);
        let post = s.snapshot();
        assert_eq!(post.version(), 1);
        assert!(!Arc::ptr_eq(&pre, &post));
    }

    /// `commit` takes the transaction by value, so committing the same `tx`
    /// twice cannot be written. What remains checkable is that committing a
    /// transaction without writes reports a no-op and leaves the session at
    /// its initial version.
    #[test]
    fn double_commit_via_take_working_drops_state() {
        let s = Session::new(empty_graph());
        let tx = s.begin();
        let outcome = s.commit(tx, true);
        assert!(matches!(outcome, CommitOutcome::NoWritesNoOp));
        assert_eq!(s.version(), 0);
    }
}