use std::sync::Arc;
use grafeo_common::types::{EpochId, Value};
use grafeo_core::graph::lpg::LpgStore;
use grafeo_engine::{
GrafeoDB,
transaction::{TransactionManager, TransactionState},
};
/// Builds a shared store seeded with two `Person` nodes (`name: "Alix", age: 30`
/// and `name: "Gus", age: 25`) plus one `KNOWS` edge created with the Alix node
/// as the first endpoint and the Gus node as the second.
fn create_test_store() -> Arc<LpgStore> {
    let person_label = ["Person"];
    let store = Arc::new(LpgStore::new().unwrap());

    let alix = store.create_node_with_props(
        &person_label,
        [
            ("name", Value::String("Alix".into())),
            ("age", Value::Int64(30)),
        ],
    );
    let gus = store.create_node_with_props(
        &person_label,
        [
            ("name", Value::String("Gus".into())),
            ("age", Value::Int64(25)),
        ],
    );

    store.create_edge(alix, gus, "KNOWS");
    store
}
/// A reader that started at an earlier epoch must still resolve exactly the two
/// seeded nodes after another transaction adds a node at a later epoch.
#[test]
fn test_snapshot_isolation_reads_see_consistent_data() {
    let store = create_test_store();
    let tx_manager = Arc::new(TransactionManager::new());

    // T1: begin and capture the epoch it reads at.
    let reader_tx = tx_manager.begin();
    let reader_epoch = tx_manager.start_epoch(reader_tx).unwrap();

    // A second transaction creates a node stamped ten epochs past T1's, then commits.
    let later_epoch = EpochId::new(reader_epoch.as_u64() + 10);
    let writer_tx = tx_manager.begin();
    store.create_node_versioned(&["Person"], later_epoch, writer_tx);
    tx_manager.commit(writer_tx).unwrap();

    // Count the ids that still resolve through T1's (epoch, transaction) view.
    let all_nodes = store.node_ids();
    let visible_count = all_nodes
        .iter()
        .filter(|id| store.get_node_versioned(**id, reader_epoch, reader_tx).is_some())
        .count();

    assert_eq!(visible_count, 2, "T1 should only see 2 initial nodes");
}
/// A node created inside a transaction must be readable by that same
/// transaction before any commit takes place.
#[test]
fn test_transaction_sees_own_writes() {
    let store = Arc::new(LpgStore::new().unwrap());
    let tx_manager = Arc::new(TransactionManager::new());

    let writer = tx_manager.begin();
    let write_epoch = tx_manager.current_epoch();
    let created = store.create_node_versioned(&["TestNode"], write_epoch, writer);

    // The lookup reuses the writer's own transaction id and epoch; nothing was committed.
    assert!(
        store
            .get_node_versioned(created, write_epoch, writer)
            .is_some(),
        "Transaction should see its own uncommitted writes"
    );
}
/// Once a transaction commits and its versions are finalized at the commit
/// epoch, a transaction started afterwards must be able to read the node.
#[test]
fn test_committed_writes_visible_to_new_transactions() {
    let store = Arc::new(LpgStore::new().unwrap());
    let tx_manager = Arc::new(TransactionManager::new());

    // Writer: create the node, commit, then stamp its versions with the commit epoch.
    let writer = tx_manager.begin();
    let write_epoch = tx_manager.current_epoch();
    let created = store.create_node_versioned(&["Committed"], write_epoch, writer);
    let committed_at = tx_manager.commit(writer).unwrap();
    store.finalize_version_epochs(writer, committed_at);

    // Reader: begins only after the commit above.
    let reader = tx_manager.begin();
    let read_epoch = tx_manager.current_epoch();

    assert!(
        store
            .get_node_versioned(created, read_epoch, reader)
            .is_some(),
        "New transaction should see committed writes"
    );
}
/// While T1 is still active and has recorded a write on a node, a second
/// transaction recording a write on the same node is expected to fail with an
/// error whose text mentions "conflict".
#[test]
fn test_write_write_conflict_detection() {
    let store = create_test_store();
    let tx_manager = Arc::new(TransactionManager::new());
    let contested = store.node_ids()[0];

    // T1 records its write first and stays open.
    let first_writer = tx_manager.begin();
    tx_manager.record_write(first_writer, contested).unwrap();

    // T2 targets the same node while T1 is uncommitted.
    let second_writer = tx_manager.begin();
    let err = match tx_manager.record_write(second_writer, contested) {
        Err(rejection) => rejection,
        Ok(_) => panic!("Second writer should be rejected immediately"),
    };

    let rendered = err.to_string();
    assert!(
        rendered.contains("conflict"),
        "Error should indicate write conflict: {err}"
    );
}
/// Two transactions begin concurrently; T1 writes a node and commits, then T2
/// writes the same node. T2's commit is expected to fail with an error whose
/// text mentions "conflict".
#[test]
fn test_committed_transaction_conflict_at_commit() {
    let store = create_test_store();
    let tx_manager = Arc::new(TransactionManager::new());
    let contested = store.node_ids()[0];

    // Both transactions are opened before either one writes.
    let first = tx_manager.begin();
    let second = tx_manager.begin();

    tx_manager.record_write(first, contested).unwrap();
    tx_manager.commit(first).unwrap();

    // Recording the write succeeds here; the failure is expected at commit time.
    tx_manager.record_write(second, contested).unwrap();
    let err = match tx_manager.commit(second) {
        Err(rejection) => rejection,
        Ok(_) => panic!("T2 should fail at commit due to write-write conflict with committed T1"),
    };

    let rendered = err.to_string();
    assert!(
        rendered.contains("conflict"),
        "Error should indicate write conflict: {err}"
    );
}
/// T2 begins only after T1 has committed, so writing the same node and
/// committing is expected to succeed.
#[test]
fn test_post_commit_write_succeeds() {
    let store = create_test_store();
    let tx_manager = Arc::new(TransactionManager::new());
    let target = store.node_ids()[0];

    // T1 runs to completion before T2 exists.
    let first = tx_manager.begin();
    tx_manager.record_write(first, target).unwrap();
    tx_manager.commit(first).unwrap();

    let second = tx_manager.begin();
    tx_manager.record_write(second, target).unwrap();
    let outcome = tx_manager.commit(second);

    assert!(
        outcome.is_ok(),
        "T2 should commit after T1 committed (no conflict)"
    );
}
/// Two concurrently open transactions that write to different nodes are both
/// expected to commit.
#[test]
fn test_non_overlapping_writes_succeed() {
    let store = create_test_store();
    let tx_manager = Arc::new(TransactionManager::new());

    let nodes = store.node_ids();
    assert!(nodes.len() >= 2, "Need at least 2 nodes for this test");
    let (first_node, second_node) = (nodes[0], nodes[1]);

    // Each transaction touches its own node; both are open at the same time.
    let first = tx_manager.begin();
    tx_manager.record_write(first, first_node).unwrap();
    let second = tx_manager.begin();
    tx_manager.record_write(second, second_node).unwrap();

    let first_outcome = tx_manager.commit(first);
    assert!(first_outcome.is_ok(), "T1 should commit");

    let second_outcome = tx_manager.commit(second);
    assert!(second_outcome.is_ok(), "T2 should commit (no conflict)");
}
/// A node written by a transaction that is then aborted (with its uncommitted
/// versions discarded from the store) must not resolve for a later transaction.
#[test]
fn test_rollback_makes_writes_invisible() {
    let store = Arc::new(LpgStore::new().unwrap());
    let tx_manager = Arc::new(TransactionManager::new());

    let doomed = tx_manager.begin();
    let write_epoch = tx_manager.current_epoch();
    let created = store.create_node_versioned(&["Rollback"], write_epoch, doomed);
    tx_manager.record_write(doomed, created).unwrap();

    // Abort in the manager first, then drop the transaction's versions from the store.
    tx_manager.abort(doomed).unwrap();
    store.discard_uncommitted_versions(doomed);

    let reader = tx_manager.begin();
    let read_epoch = tx_manager.current_epoch();

    assert!(
        store
            .get_node_versioned(created, read_epoch, reader)
            .is_none(),
        "Aborted transaction's writes should not be visible"
    );
}
/// After T1 records a write and aborts (followed by a `gc` pass), T2 must be
/// able to record a write on the same node and commit.
#[test]
fn test_abort_releases_write_locks() {
    let store = create_test_store();
    let tx_manager = Arc::new(TransactionManager::new());
    let target = store.node_ids()[0];

    let aborted = tx_manager.begin();
    tx_manager.record_write(aborted, target).unwrap();
    tx_manager.abort(aborted).unwrap();
    // The gc result is not inspected in this test.
    tx_manager.gc();

    let successor = tx_manager.begin();
    tx_manager.record_write(successor, target).unwrap();
    let outcome = tx_manager.commit(successor);

    assert!(outcome.is_ok(), "T2 should commit after T1 aborted");
}
/// Runs five committed and three aborted transactions, checks that none remain
/// active, and expects `gc` to report all eight as cleaned.
#[test]
fn test_gc_cleans_up_completed_transactions() {
    let tx_manager = Arc::new(TransactionManager::new());

    // Five transactions that commit immediately.
    (0..5).for_each(|_| {
        let tx = tx_manager.begin();
        tx_manager.commit(tx).unwrap();
    });
    // Three transactions that abort immediately.
    (0..3).for_each(|_| {
        let tx = tx_manager.begin();
        tx_manager.abort(tx).unwrap();
    });

    assert_eq!(tx_manager.active_count(), 0, "No active transactions");

    let cleaned = tx_manager.gc();
    assert_eq!(
        cleaned, 8,
        "GC should clean up all 8 completed transactions"
    );
}
/// Inserts a node inside an explicit session transaction, commits, and checks
/// that a follow-up query on the same session returns at least one row.
#[test]
fn test_session_transaction_isolation() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    session.begin_transaction().unwrap();
    let insert = "INSERT (:TestIsolation {value: 42})";
    session.execute(insert).unwrap();
    session.commit().unwrap();

    // Query runs after the commit, outside the explicit transaction.
    let lookup = "MATCH (n:TestIsolation) RETURN n.value";
    let rows_after_commit = session.execute(lookup).unwrap().row_count();

    assert!(
        rows_after_commit >= 1,
        "Node should be visible after commit"
    );
}
/// Checks the session's `in_transaction` flag across begin → rollback → begin,
/// then commits the second transaction.
#[test]
fn test_session_rollback_state() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    session.begin_transaction().unwrap();
    let active_after_begin = session.in_transaction();
    assert!(active_after_begin, "Should be in transaction after begin");

    session.rollback().unwrap();
    let active_after_rollback = session.in_transaction();
    assert!(
        !active_after_rollback,
        "Should not be in transaction after rollback"
    );

    // A fresh transaction must be startable on the same session after the rollback.
    session.begin_transaction().unwrap();
    let active_after_restart = session.in_transaction();
    assert!(
        active_after_restart,
        "Should be able to begin new transaction"
    );
    session.commit().unwrap();
}
/// A transaction that records no writes is expected to commit without error.
#[test]
fn test_empty_transaction_commits_successfully() {
    let tx_manager = Arc::new(TransactionManager::new());
    let untouched = tx_manager.begin();

    assert!(
        tx_manager.commit(untouched).is_ok(),
        "Empty transaction should commit successfully"
    );
}
/// Checks the state reported by the manager: `Active` after begin, `Committed`
/// after commit, and `Aborted` after abort.
#[test]
fn test_transaction_state_transitions() {
    let tx_manager = Arc::new(TransactionManager::new());

    // Active -> Committed
    let committed = tx_manager.begin();
    let state_after_begin = tx_manager.state(committed);
    assert_eq!(state_after_begin, Some(TransactionState::Active));
    tx_manager.commit(committed).unwrap();
    let state_after_commit = tx_manager.state(committed);
    assert_eq!(state_after_commit, Some(TransactionState::Committed));

    // Active -> Aborted
    let aborted = tx_manager.begin();
    tx_manager.abort(aborted).unwrap();
    let state_after_abort = tx_manager.state(aborted);
    assert_eq!(state_after_abort, Some(TransactionState::Aborted));
}
/// Committing a transaction that has already been committed is expected to
/// return an error.
#[test]
fn test_double_commit_fails() {
    let tx_manager = Arc::new(TransactionManager::new());
    let tx = tx_manager.begin();

    // First commit must succeed; only the repeat is under test.
    tx_manager.commit(tx).unwrap();
    let second_attempt = tx_manager.commit(tx);

    assert!(second_attempt.is_err(), "Double commit should fail");
}
/// Committing a transaction that has already been aborted is expected to
/// return an error.
#[test]
fn test_commit_after_abort_fails() {
    let tx_manager = Arc::new(TransactionManager::new());
    let tx = tx_manager.begin();

    tx_manager.abort(tx).unwrap();
    let late_commit = tx_manager.commit(tx);

    assert!(late_commit.is_err(), "Commit after abort should fail");
}
/// Opens 100 transactions that each create and record a write on their own
/// node, then commits the even-indexed ones and aborts the odd-indexed ones.
/// Expects 100 active before completion, 0 after, and `gc` to report 100.
#[test]
fn test_many_concurrent_transactions() {
    let store = Arc::new(LpgStore::new().unwrap());
    let tx_manager = Arc::new(TransactionManager::new());

    // Every transaction is left open until the whole batch has been started.
    let mut open_txs = Vec::with_capacity(100);
    for _ in 0..100 {
        let tx = tx_manager.begin();
        let epoch = tx_manager.current_epoch();
        let node_id = store.create_node_versioned(&["Stress"], epoch, tx);
        tx_manager.record_write(tx, node_id).unwrap();
        open_txs.push(tx);
    }

    assert_eq!(
        tx_manager.active_count(),
        100,
        "Should have 100 active transactions"
    );

    // Alternate outcomes by position: even -> commit, odd -> abort.
    for (position, tx) in open_txs.iter().enumerate() {
        match position % 2 {
            0 => {
                tx_manager.commit(*tx).unwrap();
            }
            _ => {
                tx_manager.abort(*tx).unwrap();
            }
        }
    }

    assert_eq!(
        tx_manager.active_count(),
        0,
        "No active transactions after completion"
    );

    let cleaned = tx_manager.gc();
    assert_eq!(cleaned, 100, "GC should clean up all 100 transactions");
}
/// Two sessions on the same database each insert a differently labelled node
/// and must each get at least one row back when querying their own label.
#[test]
fn test_multiple_sessions_independent() {
    let db = GrafeoDB::new_in_memory();
    let first = db.session();
    let second = db.session();

    // Both inserts happen before either query runs.
    first.execute("INSERT (:Session1Node)").unwrap();
    second.execute("INSERT (:Session2Node)").unwrap();

    let first_rows = first.execute("MATCH (n:Session1Node) RETURN n").unwrap();
    let second_rows = second.execute("MATCH (n:Session2Node) RETURN n").unwrap();

    assert!(first_rows.row_count() >= 1, "Session 1 should see its data");
    assert!(second_rows.row_count() >= 1, "Session 2 should see its data");
}
/// A new session must report `auto_commit()` as true, and a node it inserts
/// without an explicit transaction must be returned to a second session.
#[test]
fn test_session_auto_commit_mode() {
    let db = GrafeoDB::new_in_memory();

    let writer = db.session();
    assert!(writer.auto_commit());
    writer.execute("INSERT (:AutoCommit)").unwrap();

    // Read back through a separate session opened after the insert.
    let reader = db.session();
    let found = reader
        .execute("MATCH (n:AutoCommit) RETURN n")
        .unwrap()
        .row_count();

    assert!(found >= 1, "Auto-committed data should be visible");
}
/// Nodes are inserted inside an explicit transaction; the edge between them is
/// created afterwards without one. `type(r)` must then return `"KNOWS"`.
#[test]
fn edge_type_visible_after_tx_commit_autocommit_edge() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Phase 1: both endpoints inside one explicit transaction.
    session.begin_transaction().unwrap();
    session.execute("INSERT (:Person {id: 'tx_a'})").unwrap();
    session.execute("INSERT (:Person {id: 'tx_b'})").unwrap();
    session.commit().unwrap();

    // Phase 2: edge created outside any explicit transaction.
    let create_edge = "MATCH (a {id: 'tx_a'}), (b {id: 'tx_b'}) CREATE (a)-[:KNOWS]->(b)";
    session.execute(create_edge).unwrap();

    let read_type = "MATCH ({id: 'tx_a'})-[r]->() RETURN type(r) AS t";
    let result = session.execute(read_type).unwrap();

    assert_eq!(result.row_count(), 1, "Edge should exist");
    let expected = Value::String("KNOWS".into());
    assert_eq!(
        result.rows()[0][0],
        expected,
        "Edge type must not be NULL after tx-committed nodes"
    );
}
/// Nodes are inserted in one explicit transaction and the edge in a second
/// one. After both commits, `type(r)` must return `"FRIENDS"`.
#[test]
fn edge_type_visible_after_tx_commit_transaction_edge() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Transaction 1: endpoints.
    session.begin_transaction().unwrap();
    session.execute("INSERT (:Person {id: 'tx2_a'})").unwrap();
    session.execute("INSERT (:Person {id: 'tx2_b'})").unwrap();
    session.commit().unwrap();

    // Transaction 2: the edge between them.
    session.begin_transaction().unwrap();
    let create_edge = "MATCH (a {id: 'tx2_a'}), (b {id: 'tx2_b'}) CREATE (a)-[:FRIENDS]->(b)";
    session.execute(create_edge).unwrap();
    session.commit().unwrap();

    let read_type = "MATCH ({id: 'tx2_a'})-[r]->() RETURN type(r) AS t";
    let result = session.execute(read_type).unwrap();

    assert_eq!(result.row_count(), 1);
    let expected = Value::String("FRIENDS".into());
    assert_eq!(
        result.rows()[0][0],
        expected,
        "Edge type must be visible after two sequential transactions"
    );
}
/// Nodes and the edge between them are created inside the same explicit
/// transaction. After commit, `type(r)` must return `"WORKS_WITH"`.
#[test]
fn edge_type_visible_same_tx() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Everything below runs inside a single explicit transaction.
    session.begin_transaction().unwrap();
    session.execute("INSERT (:Person {id: 'same_a'})").unwrap();
    session.execute("INSERT (:Person {id: 'same_b'})").unwrap();
    let create_edge =
        "MATCH (a {id: 'same_a'}), (b {id: 'same_b'}) CREATE (a)-[:WORKS_WITH]->(b)";
    session.execute(create_edge).unwrap();
    session.commit().unwrap();

    let read_type = "MATCH ({id: 'same_a'})-[r]->() RETURN type(r) AS t";
    let result = session.execute(read_type).unwrap();

    assert_eq!(result.row_count(), 1);
    let expected = Value::String("WORKS_WITH".into());
    assert_eq!(result.rows()[0][0], expected);
}
/// Inserts 20 nodes (idx 0..=19) in one explicit transaction, then links each
/// idx to idx + 1 with a `NEXT` edge outside any explicit transaction. A
/// type-filtered match must return all 19 edges.
#[test]
fn edge_types_preserved_bulk_after_tx() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    session.begin_transaction().unwrap();
    for i in 0..20 {
        let insert = format!("INSERT (:Node {{idx: {i}}})");
        session.execute(&insert).unwrap();
    }
    session.commit().unwrap();

    // Chain consecutive nodes: 0 -> 1 -> ... -> 19, one statement per edge.
    for i in 0..19 {
        let next = i + 1;
        let link =
            format!("MATCH (a {{idx: {i}}}), (b {{idx: {next}}}) CREATE (a)-[:NEXT]->(b)");
        session.execute(&link).unwrap();
    }

    let typed_edges = session
        .execute("MATCH ()-[r:NEXT]->() RETURN type(r) AS t")
        .unwrap();

    assert_eq!(
        typed_edges.row_count(),
        19,
        "All 19 typed edges should be found by type filter"
    );
}
/// One endpoint is inserted without an explicit transaction, the other inside
/// one, and the edge again without one. `type(r)` must return `"LINKED"`.
#[test]
fn edge_types_interleaved_autocommit_and_tx() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Endpoint 1: no explicit transaction.
    session.execute("INSERT (:Person {id: 'auto_x'})").unwrap();

    // Endpoint 2: explicit transaction.
    session.begin_transaction().unwrap();
    session.execute("INSERT (:Person {id: 'tx_y'})").unwrap();
    session.commit().unwrap();

    // Edge: no explicit transaction.
    let create_edge = "MATCH (a {id: 'auto_x'}), (b {id: 'tx_y'}) CREATE (a)-[:LINKED]->(b)";
    session.execute(create_edge).unwrap();

    let read_type = "MATCH ({id: 'auto_x'})-[r]->() RETURN type(r) AS t";
    let result = session.execute(read_type).unwrap();

    assert_eq!(result.row_count(), 1);
    let expected = Value::String("LINKED".into());
    assert_eq!(result.rows()[0][0], expected);
}