use super::*;
use crate::core::id::TxIdGenerator;
use crate::storage::wal::concurrent_system::ConcurrentWalSystemConfig;
use tempfile::TempDir;
mod tombstone_tests {
use super::*;
/// Builds a standalone [`WriteTransaction`] over freshly constructed storage.
///
/// The WAL is rooted in a new temporary directory. The [`TempDir`] guard is
/// returned next to the transaction so the caller decides how long it lives.
fn create_test_write_tx() -> (WriteTransaction, TempDir) {
    // Storage layers.
    let current_store = Arc::new(CurrentStorage::new());
    let historical_store = Arc::new(RwLock::new(HistoricalStorage::new()));
    let indexes = Arc::new(TemporalIndexes::new());

    // Write-ahead log backed by a throwaway directory.
    let wal_dir = TempDir::new().unwrap();
    let wal_system = Arc::new(
        ConcurrentWalSystem::new(ConcurrentWalSystemConfig::new(wal_dir.path())).unwrap(),
    );

    // Clock and ID sources.
    let clock = Arc::new(Mutex::new(time::now()));
    let node_ids = Arc::new(IdGenerator::new());
    let edge_ids = Arc::new(IdGenerator::new());
    let version_ids = Arc::new(IdGenerator::new());
    let tx_ids = TxIdGenerator::new();
    let visibility = Arc::new(TxVisibilityManager::new());

    // Snapshot taken "now" with no other transaction in flight.
    let snapshot = TransactionSnapshot {
        snapshot_timestamp: time::now(),
        active_transactions: Arc::new(std::collections::HashSet::new()),
    };

    let write_tx = WriteTransaction::new(
        tx_ids.next(),
        snapshot,
        current_store,
        historical_store,
        indexes,
        wal_system,
        clock,
        visibility,
        node_ids,
        edge_ids,
        version_ids,
    );
    (write_tx, wal_dir)
}
/// Applying a `DeleteNode` write with an empty tombstone-ID iterator must fail
/// with an `InconsistentState` error mentioning tombstone ID exhaustion.
#[test]
fn test_tombstone_exhaustion_error() {
    let (tx, _temp_dir) = create_test_write_tx();
    let mut historical_guard = tx.historical.write();
    let commit_ts = time::now();
    // Deliberately empty: no ID is available for the tombstone.
    let mut no_tombstone_ids = Vec::new().into_iter();
    let delete_op = crate::api::transaction::BufferedWrite::DeleteNode {
        node_id: NodeId::new(1).unwrap(),
        valid_from: time::now(),
    };

    let outcome = crate::api::transaction::write::apply::apply_single_write(
        &tx,
        &delete_op,
        commit_ts,
        &mut historical_guard,
        &mut no_tombstone_ids,
        1,
    );

    assert!(outcome.is_err());
    match outcome.unwrap_err() {
        crate::core::error::Error::Storage(StorageError::InconsistentState { reason }) => {
            assert!(reason.contains("Tombstone ID exhaustion"));
        }
        err => panic!("Expected InconsistentState error, got: {:?}", err),
    }
}
}
mod general_tests {
use super::*;
use crate::core::property::PropertyMapBuilder;
/// Creates a [`WriteTransaction`] wired to brand-new storage, indexes, WAL and
/// ID generators.
///
/// Returns the transaction together with the [`TempDir`] that hosts the WAL so
/// the caller can keep the directory alive for the duration of the test.
fn create_test_write_tx() -> (WriteTransaction, TempDir) {
    let storage = Arc::new(CurrentStorage::new());
    let history = Arc::new(RwLock::new(HistoricalStorage::new()));
    let indexes = Arc::new(TemporalIndexes::new());

    // The WAL writes into a temporary directory owned by the returned guard.
    let wal_dir = TempDir::new().unwrap();
    let wal_cfg = ConcurrentWalSystemConfig::new(wal_dir.path());
    let wal_system = Arc::new(ConcurrentWalSystem::new(wal_cfg).unwrap());

    let clock = Arc::new(Mutex::new(time::now()));
    let node_ids = Arc::new(IdGenerator::new());
    let edge_ids = Arc::new(IdGenerator::new());
    let version_ids = Arc::new(IdGenerator::new());
    let tx_ids = TxIdGenerator::new();
    let visibility = Arc::new(TxVisibilityManager::new());

    // No concurrently active transactions are recorded in the snapshot.
    let snapshot = TransactionSnapshot {
        snapshot_timestamp: time::now(),
        active_transactions: Arc::new(std::collections::HashSet::new()),
    };

    let write_tx = WriteTransaction::new(
        tx_ids.next(),
        snapshot,
        storage,
        history,
        indexes,
        wal_system,
        clock,
        visibility,
        node_ids,
        edge_ids,
        version_ids,
    );
    (write_tx, wal_dir)
}
/// A newly constructed write transaction is `Active` and not read-only.
#[test]
fn test_write_transaction_creation() {
    let (tx, _temp_dir) = create_test_write_tx();

    assert_eq!(tx.state, TxState::Active);
    assert!(!tx.metadata().is_read_only);
}
/// A node created inside the transaction gets ID 0 and can be read back
/// through the same transaction before any commit.
#[test]
fn test_create_node_buffering() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let props = PropertyMapBuilder::new().insert("name", "Alice").build();

    // `props` is not used again, so it is moved rather than cloned.
    let node_id = tx.create_node("Person", props).unwrap();
    assert_eq!(node_id.as_u64(), 0);

    // The uncommitted node is readable via the transaction itself.
    let node = tx.get_node(node_id).unwrap();
    assert_eq!(node.id, node_id);
    assert_eq!(
        node.properties.get("name").unwrap(),
        &crate::core::property::PropertyValue::from("Alice")
    );
}
/// An edge created inside the transaction gets ID 0 and can be read back,
/// with its endpoints and properties, through the same transaction.
#[test]
fn test_create_edge_buffering() {
    let (mut tx, _temp_dir) = create_test_write_tx();

    // The endpoints are written straight into current storage, bypassing the
    // transaction's write buffer.
    let props = PropertyMapBuilder::new().build();
    let node1 = tx.current.create_node("Person", props.clone()).unwrap();
    // Last use of `props`: move it instead of cloning.
    let node2 = tx.current.create_node("Person", props).unwrap();

    let edge_props = PropertyMapBuilder::new().insert("since", 2020i64).build();
    // `edge_props` is not used again, so it is moved as well.
    let edge_id = tx.create_edge(node1, node2, "KNOWS", edge_props).unwrap();
    assert_eq!(edge_id.as_u64(), 0);

    let edge = tx.get_edge(edge_id).unwrap();
    assert_eq!(edge.id, edge_id);
    assert_eq!(edge.source, node1);
    assert_eq!(edge.target, node2);
    assert_eq!(
        edge.properties.get("since").unwrap(),
        &crate::core::property::PropertyValue::from(2020i64)
    );
}
/// Committing makes a node created in the transaction readable from the
/// shared current storage.
#[test]
fn test_commit_applies_changes() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    // Grab a storage handle up front so it can be inspected after the commit.
    let storage = Arc::clone(&tx.current);

    let bob_props = PropertyMapBuilder::new().insert("name", "Bob").build();
    let bob = tx.create_node("Person", bob_props).unwrap();
    tx.commit().unwrap();

    let committed = storage.get_node(bob).unwrap();
    assert_eq!(
        committed.get_property("name").and_then(|v| v.as_str()),
        Some("Bob")
    );
}
/// After an explicit rollback, a node created in the transaction is not
/// present in current storage.
#[test]
fn test_rollback_discards_changes() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let charlie_props = PropertyMapBuilder::new().insert("name", "Charlie").build();
    let charlie = tx.create_node("Person", charlie_props).unwrap();
    tx.rollback().unwrap();

    assert!(storage.get_node(charlie).is_err());
}
/// Buffering an edge between node IDs that were never created succeeds, but
/// the subsequent commit is rejected.
#[test]
fn test_validation_fails_for_invalid_edge() {
    let (mut tx, _temp_dir) = create_test_write_tx();

    // Neither endpoint exists anywhere in storage.
    let missing_source = NodeId::new(999).unwrap();
    let missing_target = NodeId::new(1000).unwrap();
    let edge_props = PropertyMapBuilder::new().build();
    tx.create_edge(missing_source, missing_target, "KNOWS", edge_props)
        .unwrap();

    assert!(tx.commit().is_err());
}
/// Dropping a transaction without committing must leave its buffered node out
/// of the storage the transaction was operating on.
#[test]
fn test_auto_rollback_on_drop() {
    // Inspect the transaction's *own* storage. Checking a separately
    // constructed `CurrentStorage` would pass regardless of what the dropped
    // transaction did.
    let (current, node_id) = {
        let (mut tx, _temp_dir) = create_test_write_tx();
        let current = Arc::clone(&tx.current);
        let props = PropertyMapBuilder::new().build();
        let node_id = tx.create_node("Person", props).unwrap();
        (current, node_id)
        // `tx` goes out of scope here without a commit.
    };

    assert!(current.get_node(node_id).is_err());
}
/// Updating an existing node's property through a transaction is reflected in
/// current storage once the transaction commits.
#[test]
fn test_update_node() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let current = Arc::clone(&tx.current);

    // Seed the node directly in storage so the transaction sees it as existing.
    let props = PropertyMapBuilder::new().insert("age", 30i64).build();
    let node_id = current.create_node("Person", props).unwrap();

    let new_props = PropertyMapBuilder::new().insert("age", 31i64).build();
    // `new_props` is not used again, so it is moved rather than cloned.
    tx.update_node(node_id, new_props).unwrap();
    tx.commit().unwrap();

    let node = current.get_node(node_id).unwrap();
    assert_eq!(node.get_property("age").and_then(|v| v.as_int()), Some(31));
}
/// Updating a node ID that does not exist returns an error.
#[test]
fn test_update_node_not_found() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let missing = NodeId::new(999).unwrap();
    let patch = PropertyMapBuilder::new().insert("age", 30i64).build();

    assert!(tx.update_node(missing, patch).is_err());
}
/// Updating one property of a node leaves its other properties untouched.
#[test]
fn test_update_node_patch_preserves_existing_properties() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let seed = PropertyMapBuilder::new()
        .insert("name", "Alice")
        .insert("age", 30i64)
        .insert("city", "London")
        .build();
    let alice = storage.create_node("Person", seed).unwrap();

    // Only "age" is part of the update.
    let patch = PropertyMapBuilder::new().insert("age", 31i64).build();
    tx.update_node(alice, patch).unwrap();
    tx.commit().unwrap();

    let stored = storage.get_node(alice).unwrap();
    assert_eq!(
        stored.get_property("name").and_then(|v| v.as_str()),
        Some("Alice")
    );
    assert_eq!(
        stored.get_property("age").and_then(|v| v.as_int()),
        Some(31)
    );
    assert_eq!(
        stored.get_property("city").and_then(|v| v.as_str()),
        Some("London")
    );
}
/// Updating a node with a property it did not have adds that property while
/// keeping the existing one.
#[test]
fn test_update_node_patch_adds_new_properties() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let seed = PropertyMapBuilder::new().insert("name", "Bob").build();
    let bob = storage.create_node("Person", seed).unwrap();

    // "age" is new to this node.
    let patch = PropertyMapBuilder::new().insert("age", 25i64).build();
    tx.update_node(bob, patch).unwrap();
    tx.commit().unwrap();

    let stored = storage.get_node(bob).unwrap();
    assert_eq!(
        stored.get_property("name").and_then(|v| v.as_str()),
        Some("Bob")
    );
    assert_eq!(
        stored.get_property("age").and_then(|v| v.as_int()),
        Some(25)
    );
}
/// Updating two of a node's four properties changes exactly those two.
#[test]
fn test_update_node_patch_modifies_multiple_properties() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let seed = PropertyMapBuilder::new()
        .insert("name", "Charlie")
        .insert("age", 40i64)
        .insert("city", "Paris")
        .insert("occupation", "Engineer")
        .build();
    let charlie = storage.create_node("Person", seed).unwrap();

    let patch = PropertyMapBuilder::new()
        .insert("age", 41i64)
        .insert("city", "Berlin")
        .build();
    tx.update_node(charlie, patch).unwrap();
    tx.commit().unwrap();

    let stored = storage.get_node(charlie).unwrap();
    // Untouched by the update.
    assert_eq!(
        stored.get_property("name").and_then(|v| v.as_str()),
        Some("Charlie")
    );
    // Overwritten by the update.
    assert_eq!(
        stored.get_property("age").and_then(|v| v.as_int()),
        Some(41)
    );
    assert_eq!(
        stored.get_property("city").and_then(|v| v.as_str()),
        Some("Berlin")
    );
    // Untouched by the update.
    assert_eq!(
        stored.get_property("occupation").and_then(|v| v.as_str()),
        Some("Engineer")
    );
}
/// Updating a node with an empty property map leaves its properties as they
/// were.
#[test]
fn test_update_node_patch_empty_update() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let seed = PropertyMapBuilder::new()
        .insert("name", "Diana")
        .insert("age", 35i64)
        .build();
    let diana = storage.create_node("Person", seed).unwrap();

    let empty_patch = PropertyMapBuilder::new().build();
    tx.update_node(diana, empty_patch).unwrap();
    tx.commit().unwrap();

    let stored = storage.get_node(diana).unwrap();
    assert_eq!(
        stored.get_property("name").and_then(|v| v.as_str()),
        Some("Diana")
    );
    assert_eq!(
        stored.get_property("age").and_then(|v| v.as_int()),
        Some(35)
    );
}
/// Updating a scalar property of a node keeps its vector property intact.
#[test]
fn test_update_node_patch_with_vector_properties() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let embedding = vec![0.1f32, 0.2, 0.3];
    let seed = PropertyMapBuilder::new()
        .insert("name", "Eve")
        .insert_vector("embedding", &embedding)
        .build();
    let doc = storage.create_node("Document", seed).unwrap();

    // The update only mentions "name".
    let patch = PropertyMapBuilder::new()
        .insert("name", "Eve Updated")
        .build();
    tx.update_node(doc, patch).unwrap();
    tx.commit().unwrap();

    let stored = storage.get_node(doc).unwrap();
    assert_eq!(
        stored.get_property("name").and_then(|v| v.as_str()),
        Some("Eve Updated")
    );
    assert!(stored.get_property("embedding").is_some());
    match stored
        .get_property("embedding")
        .and_then(|v| v.as_vector())
    {
        Some(vec_val) => assert_eq!(vec_val, &[0.1f32, 0.2, 0.3]),
        None => panic!("Vector property not found or wrong type"),
    }
}
/// Updating an existing edge's property through a transaction is reflected in
/// current storage once the transaction commits.
#[test]
fn test_update_edge() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let current = Arc::clone(&tx.current);

    // Seed two nodes and an edge directly in storage.
    let props = PropertyMapBuilder::new().build();
    let node1 = current.create_node("Person", props.clone()).unwrap();
    let node2 = current.create_node("Person", props).unwrap();
    let edge_props = PropertyMapBuilder::new().insert("strength", 5i64).build();
    let edge_id = current
        .create_edge(node1, node2, "KNOWS", edge_props)
        .unwrap();

    let new_props = PropertyMapBuilder::new().insert("strength", 10i64).build();
    // `new_props` is not used again, so it is moved rather than cloned.
    tx.update_edge(edge_id, new_props).unwrap();
    tx.commit().unwrap();

    let edge = current.get_edge(edge_id).unwrap();
    assert_eq!(
        edge.get_property("strength").and_then(|v| v.as_int()),
        Some(10)
    );
}
/// Updating an edge ID that does not exist returns an error.
#[test]
fn test_update_edge_not_found() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let missing = EdgeId::new(999).unwrap();
    let patch = PropertyMapBuilder::new().insert("strength", 5i64).build();

    assert!(tx.update_edge(missing, patch).is_err());
}
/// Updating one property of an edge leaves its other properties untouched.
#[test]
fn test_update_edge_patch_preserves_existing_properties() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let blank = PropertyMapBuilder::new().build();
    let src = storage.create_node("Person", blank.clone()).unwrap();
    let dst = storage.create_node("Person", blank).unwrap();

    let seed = PropertyMapBuilder::new()
        .insert("weight", 5i64)
        .insert("type", "friendship")
        .insert("since", "2020")
        .build();
    let knows = storage.create_edge(src, dst, "KNOWS", seed).unwrap();

    // Only "weight" is part of the update.
    let patch = PropertyMapBuilder::new().insert("weight", 10i64).build();
    tx.update_edge(knows, patch).unwrap();
    tx.commit().unwrap();

    let stored = storage.get_edge(knows).unwrap();
    assert_eq!(
        stored.get_property("weight").and_then(|v| v.as_int()),
        Some(10)
    );
    assert_eq!(
        stored.get_property("type").and_then(|v| v.as_str()),
        Some("friendship")
    );
    assert_eq!(
        stored.get_property("since").and_then(|v| v.as_str()),
        Some("2020")
    );
}
/// Updating an edge with a property it did not have adds that property while
/// keeping the existing one.
#[test]
fn test_update_edge_patch_adds_new_properties() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let blank = PropertyMapBuilder::new().build();
    let src = storage.create_node("Person", blank.clone()).unwrap();
    let dst = storage.create_node("Person", blank).unwrap();

    let seed = PropertyMapBuilder::new().insert("weight", 5i64).build();
    let knows = storage.create_edge(src, dst, "KNOWS", seed).unwrap();

    // "type" is new to this edge.
    let patch = PropertyMapBuilder::new()
        .insert("type", "colleague")
        .build();
    tx.update_edge(knows, patch).unwrap();
    tx.commit().unwrap();

    let stored = storage.get_edge(knows).unwrap();
    assert_eq!(
        stored.get_property("weight").and_then(|v| v.as_int()),
        Some(5)
    );
    assert_eq!(
        stored.get_property("type").and_then(|v| v.as_str()),
        Some("colleague")
    );
}
/// Updating two of an edge's four properties changes exactly those two.
#[test]
fn test_update_edge_patch_modifies_multiple_properties() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let blank = PropertyMapBuilder::new().build();
    let src = storage.create_node("Person", blank.clone()).unwrap();
    let dst = storage.create_node("Person", blank).unwrap();

    let seed = PropertyMapBuilder::new()
        .insert("weight", 5i64)
        .insert("type", "friendship")
        .insert("since", "2020")
        .insert("active", true)
        .build();
    let knows = storage.create_edge(src, dst, "KNOWS", seed).unwrap();

    let patch = PropertyMapBuilder::new()
        .insert("weight", 8i64)
        .insert("since", "2021")
        .build();
    tx.update_edge(knows, patch).unwrap();
    tx.commit().unwrap();

    let stored = storage.get_edge(knows).unwrap();
    // Overwritten by the update.
    assert_eq!(
        stored.get_property("weight").and_then(|v| v.as_int()),
        Some(8)
    );
    // Untouched by the update.
    assert_eq!(
        stored.get_property("type").and_then(|v| v.as_str()),
        Some("friendship")
    );
    // Overwritten by the update.
    assert_eq!(
        stored.get_property("since").and_then(|v| v.as_str()),
        Some("2021")
    );
    // Untouched by the update.
    assert_eq!(
        stored.get_property("active").and_then(|v| v.as_bool()),
        Some(true)
    );
}
/// Updating an edge with an empty property map leaves its properties as they
/// were.
#[test]
fn test_update_edge_patch_empty_update() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let blank = PropertyMapBuilder::new().build();
    let src = storage.create_node("Person", blank.clone()).unwrap();
    let dst = storage.create_node("Person", blank).unwrap();

    let seed = PropertyMapBuilder::new()
        .insert("weight", 5i64)
        .insert("type", "friendship")
        .build();
    let knows = storage.create_edge(src, dst, "KNOWS", seed).unwrap();

    let empty_patch = PropertyMapBuilder::new().build();
    tx.update_edge(knows, empty_patch).unwrap();
    tx.commit().unwrap();

    let stored = storage.get_edge(knows).unwrap();
    assert_eq!(
        stored.get_property("weight").and_then(|v| v.as_int()),
        Some(5)
    );
    assert_eq!(
        stored.get_property("type").and_then(|v| v.as_str()),
        Some("friendship")
    );
}
/// Updating a scalar property of an edge keeps its vector property intact.
#[test]
fn test_update_edge_patch_with_vector_properties() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let blank = PropertyMapBuilder::new().build();
    let src = storage.create_node("Person", blank.clone()).unwrap();
    let dst = storage.create_node("Person", blank).unwrap();

    let embedding = vec![0.5f32, 0.6, 0.7];
    let seed = PropertyMapBuilder::new()
        .insert("weight", 5i64)
        .insert_vector("embedding", &embedding)
        .build();
    let similar = storage.create_edge(src, dst, "SIMILAR", seed).unwrap();

    // The update only mentions "weight".
    let patch = PropertyMapBuilder::new().insert("weight", 10i64).build();
    tx.update_edge(similar, patch).unwrap();
    tx.commit().unwrap();

    let stored = storage.get_edge(similar).unwrap();
    assert_eq!(
        stored.get_property("weight").and_then(|v| v.as_int()),
        Some(10)
    );
    assert!(stored.get_property("embedding").is_some());
    match stored
        .get_property("embedding")
        .and_then(|v| v.as_vector())
    {
        Some(vec_val) => assert_eq!(vec_val, &[0.5f32, 0.6, 0.7]),
        None => panic!("Vector property not found or wrong type"),
    }
}
/// Deleting an existing node through a committed transaction removes it from
/// current storage.
#[test]
fn test_delete_node() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let victim = storage
        .create_node("Person", PropertyMapBuilder::new().build())
        .unwrap();
    // Sanity check: the node is there before the delete.
    assert!(storage.get_node(victim).is_ok());

    tx.delete_node(victim).unwrap();
    tx.commit().unwrap();

    assert!(storage.get_node(victim).is_err());
}
/// Deleting a node ID that does not exist returns an error.
#[test]
fn test_delete_node_not_found() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let missing = NodeId::new(999).unwrap();

    assert!(tx.delete_node(missing).is_err());
}
/// Deleting an existing edge through a committed transaction removes it from
/// current storage.
#[test]
fn test_delete_edge() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let blank = PropertyMapBuilder::new().build();
    let src = storage.create_node("Person", blank.clone()).unwrap();
    let dst = storage.create_node("Person", blank).unwrap();
    let victim = storage
        .create_edge(src, dst, "KNOWS", PropertyMapBuilder::new().build())
        .unwrap();
    // Sanity check: the edge is there before the delete.
    assert!(storage.get_edge(victim).is_ok());

    tx.delete_edge(victim).unwrap();
    tx.commit().unwrap();

    assert!(storage.get_edge(victim).is_err());
}
/// Deleting an edge ID that does not exist returns an error.
#[test]
fn test_delete_edge_not_found() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let missing = EdgeId::new(999).unwrap();

    assert!(tx.delete_edge(missing).is_err());
}
/// Creates a node and commits once, asserting only that the commit succeeds.
///
/// NOTE(review): despite the name, no second commit is attempted here.
/// Presumably `commit` consumes the transaction, so a double commit cannot be
/// written as a runtime test; confirm against `WriteTransaction::commit`.
#[test]
fn test_commit_after_commit_fails() {
    let (mut tx, _temp_dir) = create_test_write_tx();

    tx.create_node("Person", PropertyMapBuilder::new().build())
        .unwrap();
    tx.commit().unwrap();
}
/// Creates a node and commits, asserting only that both calls succeed.
///
/// NOTE(review): the name suggests that use-after-commit is rejected at
/// compile time because `commit` takes the transaction by value, so nothing
/// further is exercised at runtime; confirm against `WriteTransaction::commit`.
#[test]
fn test_operations_after_commit_prevented_by_move() {
    let (mut tx, _temp_dir) = create_test_write_tx();

    tx.create_node("Person", PropertyMapBuilder::new().build())
        .unwrap();
    tx.commit().unwrap();
}
/// Read operations on the transaction report data that was written directly
/// into the underlying current storage.
#[test]
fn test_read_ops_delegation() {
    let (tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    // Two nodes joined by one KNOWS edge, created behind the transaction's back.
    let blank = PropertyMapBuilder::new().build();
    let src = storage.create_node("Person", blank.clone()).unwrap();
    let dst = storage.create_node("Person", blank.clone()).unwrap();
    storage.create_edge(src, dst, "KNOWS", blank).unwrap();

    // Counts.
    assert_eq!(tx.node_count(), 2);
    assert_eq!(tx.edge_count(), 1);

    // Lookups and adjacency.
    assert!(tx.get_node(src).is_ok());
    assert_eq!(tx.get_outgoing_edges(src).len(), 1);
    assert_eq!(tx.get_incoming_edges(dst).len(), 1);
    assert_eq!(tx.get_outgoing_edges_with_label(src, "KNOWS").len(), 1);
}
/// Committing a node delete removes the node from current storage and leaves
/// at least one node version in historical storage.
#[test]
fn test_delete_node_creates_tombstone() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);
    let history_handle = Arc::clone(&tx.historical);

    let seed = PropertyMapBuilder::new()
        .insert("name", "Alice")
        .insert("age", 30i64)
        .build();
    let alice = storage.create_node("Person", seed).unwrap();
    assert!(storage.get_node(alice).is_ok());

    tx.delete_node(alice).unwrap();
    tx.commit().unwrap();
    assert!(storage.get_node(alice).is_err());

    // Historical storage must have recorded something for the delete.
    let history = history_handle.read();
    let stats = history.stats();
    assert!(
        stats.total_node_versions > 0,
        "Expected at least one node version (tombstone) in historical storage"
    );
}
/// Committing an edge delete removes the edge from current storage and leaves
/// at least one edge version in historical storage.
#[test]
fn test_delete_edge_creates_tombstone() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);
    let history_handle = Arc::clone(&tx.historical);

    let blank = PropertyMapBuilder::new().build();
    let src = storage.create_node("Person", blank.clone()).unwrap();
    let dst = storage.create_node("Person", blank.clone()).unwrap();
    let seed = PropertyMapBuilder::new().insert("since", 2020i64).build();
    let knows = storage.create_edge(src, dst, "KNOWS", seed).unwrap();
    assert!(storage.get_edge(knows).is_ok());

    tx.delete_edge(knows).unwrap();
    tx.commit().unwrap();
    assert!(storage.get_edge(knows).is_err());

    // Historical storage must have recorded something for the delete.
    let history = history_handle.read();
    let stats = history.stats();
    assert!(
        stats.total_edge_versions > 0,
        "Expected at least one edge version (tombstone) in historical storage"
    );
}
/// An uncommitted node update is visible when reading the node back through
/// the same transaction.
#[test]
fn test_read_your_writes_update() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let seed = PropertyMapBuilder::new().insert("age", 30i64).build();
    let person = storage.create_node("Person", seed).unwrap();

    let patch = PropertyMapBuilder::new().insert("age", 31i64).build();
    tx.update_node(person, patch).unwrap();

    // No commit: the read goes through the transaction.
    let seen = tx.get_node(person).unwrap();
    assert_eq!(
        seen.properties.get("age").unwrap(),
        &crate::core::property::PropertyValue::from(31i64)
    );
}
/// An uncommitted node delete makes the node unreadable through the same
/// transaction.
#[test]
fn test_read_your_writes_delete() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let person = storage
        .create_node("Person", PropertyMapBuilder::new().build())
        .unwrap();
    tx.delete_node(person).unwrap();

    assert!(tx.get_node(person).is_err());
}
/// Committing a transaction with no buffered writes succeeds and leaves
/// current storage without nodes or edges.
#[test]
fn test_empty_transaction_commit() {
    let (tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    tx.commit().unwrap();

    assert_eq!(storage.node_count(), 0);
    assert_eq!(storage.edge_count(), 0);
}
/// A transaction that only creates one node commits to exactly one node and
/// zero edges in current storage.
#[test]
fn test_empty_transaction_with_only_node_operations() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let alice_props = PropertyMapBuilder::new().insert("name", "Alice").build();
    tx.create_node("Person", alice_props).unwrap();
    tx.commit().unwrap();

    assert_eq!(storage.node_count(), 1);
    assert_eq!(storage.edge_count(), 0);
}
/// Nodes and an edge created in one transaction are all present after commit,
/// and the edge shows up in both endpoints' adjacency lists.
#[test]
fn test_transaction_with_edge_operations() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let alice_props = PropertyMapBuilder::new().insert("name", "Alice").build();
    let alice = tx.create_node("Person", alice_props).unwrap();
    let bob_props = PropertyMapBuilder::new().insert("name", "Bob").build();
    let bob = tx.create_node("Person", bob_props).unwrap();
    let knows_props = PropertyMapBuilder::new().insert("since", 2020i64).build();
    let knows = tx.create_edge(alice, bob, "KNOWS", knows_props).unwrap();
    tx.commit().unwrap();

    assert_eq!(storage.node_count(), 2);
    assert_eq!(storage.edge_count(), 1);

    // alice --KNOWS--> bob
    let from_alice = storage.get_outgoing_edges(alice);
    assert_eq!(from_alice.len(), 1);
    assert_eq!(from_alice[0], knows);

    let into_bob = storage.get_incoming_edges(bob);
    assert_eq!(into_bob.len(), 1);
    assert_eq!(into_bob[0], knows);
}
/// After committing a single edge, storage reports one edge, an out-degree of
/// one for the source, and a delta edge count of one.
#[test]
fn test_edge_commit_does_not_force_adjacency_compaction() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let blank = PropertyMapBuilder::new().build();
    let src = tx.create_node("Person", blank.clone()).unwrap();
    let dst = tx.create_node("Person", blank).unwrap();
    let knows_props = PropertyMapBuilder::new().insert("since", 2020i64).build();
    tx.create_edge(src, dst, "KNOWS", knows_props).unwrap();
    tx.commit().unwrap();

    assert_eq!(storage.edge_count(), 1);
    assert_eq!(storage.out_degree(src), 1);
    assert_eq!(storage.delta_edge_count(), 1);
}
/// Runs two consecutive transactions over shared infrastructure: the first
/// creates three nodes and one edge, the second interleaves two edge creations
/// with an update of the first edge. Checks edge count, the updated property
/// and node degrees afterwards.
#[test]
fn test_interleaved_create_update_delete_operations() {
    // Infrastructure shared by both transactions.
    let storage = Arc::new(CurrentStorage::new());
    let history = Arc::new(RwLock::new(HistoricalStorage::new()));
    let indexes = Arc::new(TemporalIndexes::new());
    let wal_dir = TempDir::new().unwrap();
    let wal_cfg = ConcurrentWalSystemConfig::new(wal_dir.path());
    let wal_system = Arc::new(ConcurrentWalSystem::new(wal_cfg).unwrap());
    let clock = Arc::new(Mutex::new(time::now()));
    let node_ids = Arc::new(IdGenerator::new());
    let edge_ids = Arc::new(IdGenerator::new());
    let version_ids = Arc::new(IdGenerator::new());
    let tx_ids = TxIdGenerator::new();
    let visibility = Arc::new(TxVisibilityManager::new());

    // --- Transaction 1: three nodes and a KNOWS edge between the first two.
    let first_snapshot = TransactionSnapshot {
        snapshot_timestamp: time::now(),
        active_transactions: Arc::new(std::collections::HashSet::new()),
    };
    let mut first_tx = WriteTransaction::new(
        tx_ids.next(),
        first_snapshot,
        storage.clone(),
        history.clone(),
        indexes.clone(),
        wal_system.clone(),
        clock.clone(),
        visibility.clone(),
        node_ids.clone(),
        edge_ids.clone(),
        version_ids.clone(),
    );
    let blank = PropertyMapBuilder::new().build();
    let node1 = first_tx.create_node("Person", blank.clone()).unwrap();
    let node2 = first_tx.create_node("Person", blank.clone()).unwrap();
    let node3 = first_tx.create_node("Person", blank.clone()).unwrap();
    let knows_props = PropertyMapBuilder::new().insert("weight", 5i64).build();
    let edge1 = first_tx
        .create_edge(node1, node2, "KNOWS", knows_props)
        .unwrap();
    first_tx.commit().unwrap();
    assert_eq!(storage.edge_count(), 1);

    // --- Transaction 2: create, update, create.
    let second_snapshot = TransactionSnapshot {
        snapshot_timestamp: time::now(),
        active_transactions: Arc::new(std::collections::HashSet::new()),
    };
    let mut second_tx = WriteTransaction::new(
        tx_ids.next(),
        second_snapshot,
        storage.clone(),
        history.clone(),
        indexes.clone(),
        wal_system.clone(),
        clock.clone(),
        visibility.clone(),
        node_ids.clone(),
        edge_ids.clone(),
        version_ids.clone(),
    );
    let follows_props = PropertyMapBuilder::new().insert("weight", 8i64).build();
    second_tx
        .create_edge(node2, node3, "FOLLOWS", follows_props)
        .unwrap();
    let knows_patch = PropertyMapBuilder::new().insert("weight", 7i64).build();
    second_tx.update_edge(edge1, knows_patch).unwrap();
    second_tx
        .create_edge(node1, node3, "LIKES", PropertyMapBuilder::new().build())
        .unwrap();
    second_tx.commit().unwrap();

    assert_eq!(storage.edge_count(), 3);
    let refreshed = storage.get_edge(edge1).unwrap();
    assert_eq!(
        refreshed.get_property("weight").and_then(|v| v.as_int()),
        Some(7)
    );

    // node1 -> {node2, node3}, node2 -> node3, node3 <- {node2, node1}
    assert_eq!(storage.out_degree(node1), 2);
    assert_eq!(storage.out_degree(node2), 1);
    assert_eq!(storage.in_degree(node3), 2);
}
/// A transaction mixing creates, an update and four deletes (two nodes, two
/// edges) commits successfully, and exactly the deleted entities disappear
/// from current storage.
#[test]
fn test_tombstone_id_count_matches_deletes() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);
    let blank = PropertyMapBuilder::new().build();

    // Pre-existing chain: node1 -> node2 -> node3 -> node4.
    let node1 = storage.create_node("Person", blank.clone()).unwrap();
    let node2 = storage.create_node("Person", blank.clone()).unwrap();
    let node3 = storage.create_node("Person", blank.clone()).unwrap();
    let node4 = storage.create_node("Person", blank.clone()).unwrap();
    let edge1 = storage
        .create_edge(node1, node2, "KNOWS", blank.clone())
        .unwrap();
    let edge2 = storage
        .create_edge(node2, node3, "KNOWS", blank.clone())
        .unwrap();
    let edge3 = storage
        .create_edge(node3, node4, "KNOWS", blank.clone())
        .unwrap();

    // Non-delete operations buffered in the transaction.
    let node5 = tx.create_node("Person", blank.clone()).unwrap();
    tx.create_edge(node1, node5, "FOLLOWS", blank.clone())
        .unwrap();
    let age_patch = PropertyMapBuilder::new().insert("age", 30i64).build();
    tx.update_node(node4, age_patch).unwrap();

    // Four deletes: two nodes followed by two edges.
    tx.delete_node(node3).unwrap();
    tx.delete_node(node4).unwrap();
    tx.delete_edge(edge1).unwrap();
    tx.delete_edge(edge2).unwrap();

    assert!(
        tx.commit().is_ok(),
        "Commit should succeed with correct tombstone ID count"
    );

    // Deleted entities are gone ...
    assert!(storage.get_node(node3).is_err());
    assert!(storage.get_node(node4).is_err());
    assert!(storage.get_edge(edge1).is_err());
    assert!(storage.get_edge(edge2).is_err());
    // ... and the rest is still readable.
    assert!(storage.get_node(node1).is_ok());
    assert!(storage.get_node(node2).is_ok());
    assert!(storage.get_node(node5).is_ok());
    assert!(storage.get_edge(edge3).is_ok());
}
/// Builds a 100-node chain (99 edges) in one transaction and checks that
/// every edge is reflected in edge count and node degrees right after commit.
#[test]
fn test_batch_edge_operations_visible_without_manual_compaction() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let storage = Arc::clone(&tx.current);

    let mut chain = Vec::new();
    for i in 0..100 {
        let id_props = PropertyMapBuilder::new().insert("id", i as i64).build();
        chain.push(tx.create_node("Node", id_props).unwrap());
    }

    // Link each node to its successor.
    for link in chain.windows(2) {
        tx.create_edge(
            link[0],
            link[1],
            "CONNECTS",
            PropertyMapBuilder::new().build(),
        )
        .unwrap();
    }
    tx.commit().unwrap();

    assert_eq!(storage.edge_count(), 99);
    for link in chain.windows(2) {
        assert_eq!(storage.out_degree(link[0]), 1);
        assert_eq!(storage.in_degree(link[1]), 1);
    }
}
/// A cascading delete of a node removes the node together with all of its
/// outgoing and incoming edges, while leaving the neighbouring nodes in place.
#[test]
fn test_delete_node_cascade_removes_edges() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let current = Arc::clone(&tx.current);

    // Star around `central_node`: two outgoing edges and one incoming edge.
    let props = PropertyMapBuilder::new().build();
    let central_node = tx.create_node("Person", props.clone()).unwrap();
    let node1 = tx.create_node("Person", props.clone()).unwrap();
    let node2 = tx.create_node("Person", props.clone()).unwrap();
    let node3 = tx.create_node("Person", props).unwrap();
    let edge1 = tx
        .create_edge(
            central_node,
            node1,
            "KNOWS",
            PropertyMapBuilder::new().build(),
        )
        .unwrap();
    let edge2 = tx
        .create_edge(
            central_node,
            node2,
            "FOLLOWS",
            PropertyMapBuilder::new().build(),
        )
        .unwrap();
    let edge3 = tx
        .create_edge(
            node3,
            central_node,
            "LIKES",
            PropertyMapBuilder::new().build(),
        )
        .unwrap();
    tx.commit().unwrap();

    assert!(current.get_edge(edge1).is_ok());
    assert!(current.get_edge(edge2).is_ok());
    assert!(current.get_edge(edge3).is_ok());

    // Second transaction over the same current storage performs the cascade.
    let (mut tx2, _temp_dir2) = create_test_write_tx_from_existing(Arc::clone(&current));
    tx2.delete_node_cascade(central_node).unwrap();
    tx2.commit().unwrap();

    assert!(current.get_node(central_node).is_err());
    assert!(
        current.get_edge(edge1).is_err(),
        "Outgoing edge should be deleted with cascade"
    );
    assert!(
        current.get_edge(edge2).is_err(),
        "Outgoing edge should be deleted with cascade"
    );
    assert!(
        current.get_edge(edge3).is_err(),
        "Incoming edge should be deleted with cascade"
    );

    // Neighbours survive the cascade.
    assert!(current.get_node(node1).is_ok());
    assert!(current.get_node(node2).is_ok());
    assert!(current.get_node(node3).is_ok());
}
/// A plain (non-cascading) node delete removes the node but leaves its edge
/// readable in current storage.
#[test]
fn test_delete_node_no_cascade_keeps_edges() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let current = Arc::clone(&tx.current);

    let props = PropertyMapBuilder::new().build();
    let central_node = tx.create_node("Person", props.clone()).unwrap();
    let node1 = tx.create_node("Person", props).unwrap();
    let edge1 = tx
        .create_edge(
            central_node,
            node1,
            "KNOWS",
            PropertyMapBuilder::new().build(),
        )
        .unwrap();
    tx.commit().unwrap();
    assert!(current.get_edge(edge1).is_ok());

    // Second transaction over the same current storage performs the delete.
    let (mut tx2, _temp_dir2) = create_test_write_tx_from_existing(Arc::clone(&current));
    tx2.delete_node(central_node).unwrap();
    tx2.commit().unwrap();

    assert!(current.get_node(central_node).is_err());
    assert!(
        current.get_edge(edge1).is_ok(),
        "Edge should remain when cascade is not enabled (current behavior)"
    );
}
/// With edges in both directions between two nodes, a cascading delete of one
/// node removes both edges and keeps the other node.
#[test]
fn test_delete_node_cascade_with_bidirectional_edges() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let current = Arc::clone(&tx.current);

    let props = PropertyMapBuilder::new().build();
    let node_a = tx.create_node("Person", props.clone()).unwrap();
    let node_b = tx.create_node("Person", props).unwrap();
    let edge_a_to_b = tx
        .create_edge(node_a, node_b, "KNOWS", PropertyMapBuilder::new().build())
        .unwrap();
    let edge_b_to_a = tx
        .create_edge(node_b, node_a, "KNOWS", PropertyMapBuilder::new().build())
        .unwrap();
    tx.commit().unwrap();

    // Second transaction over the same current storage performs the cascade.
    let (mut tx2, _temp_dir2) = create_test_write_tx_from_existing(Arc::clone(&current));
    tx2.delete_node_cascade(node_a).unwrap();
    tx2.commit().unwrap();

    assert!(current.get_edge(edge_a_to_b).is_err());
    assert!(current.get_edge(edge_b_to_a).is_err());
    assert!(current.get_node(node_b).is_ok());
}
/// Cascading delete of a hub node with 100 outgoing and 100 incoming edges
/// must remove the hub and all 200 edges, and finish in under two seconds.
#[test]
fn test_delete_node_cascade_performance_many_edges() {
    let (mut tx, _temp_dir) = create_test_write_tx();
    let current = Arc::clone(&tx.current);

    let props = PropertyMapBuilder::new().build();
    let central_node = tx.create_node("Hub", props.clone()).unwrap();
    let mut outgoing_edges = Vec::new();
    let mut incoming_edges = Vec::new();

    // hub -> target_i
    for i in 0..100 {
        let target = tx
            .create_node(
                "Target",
                PropertyMapBuilder::new().insert("id", i as i64).build(),
            )
            .unwrap();
        let edge = tx
            .create_edge(
                central_node,
                target,
                "OUT",
                PropertyMapBuilder::new().build(),
            )
            .unwrap();
        outgoing_edges.push(edge);
    }

    // source_i -> hub
    for i in 0..100 {
        let source = tx
            .create_node(
                "Source",
                PropertyMapBuilder::new().insert("id", i as i64).build(),
            )
            .unwrap();
        let edge = tx
            .create_edge(
                source,
                central_node,
                "IN",
                PropertyMapBuilder::new().build(),
            )
            .unwrap();
        incoming_edges.push(edge);
    }
    tx.commit().unwrap();

    for edge in outgoing_edges.iter().chain(incoming_edges.iter()) {
        assert!(current.get_edge(*edge).is_ok());
    }

    // Second transaction over the same current storage performs the cascade.
    let (mut tx2, _temp_dir2) = create_test_write_tx_from_existing(Arc::clone(&current));
    // Only the cascade delete and its commit are timed.
    let start = std::time::Instant::now();
    tx2.delete_node_cascade(central_node).unwrap();
    tx2.commit().unwrap();
    let elapsed = start.elapsed();
    assert!(
        elapsed.as_millis() < 2000,
        "Cascade delete of 200 edges took too long: {:?}",
        elapsed
    );

    assert!(current.get_node(central_node).is_err());
    for edge in outgoing_edges.iter().chain(incoming_edges.iter()) {
        assert!(current.get_edge(*edge).is_err());
    }
}
/// Builds a `WriteTransaction` over the supplied `current` storage.
///
/// Everything else (historical storage, temporal indexes, WAL, ID
/// generators, visibility manager) is created fresh. The returned `TempDir`
/// is the directory whose path was given to the WAL config.
fn create_test_write_tx_from_existing(
    current: Arc<CurrentStorage>,
) -> (WriteTransaction, TempDir) {
    use crate::core::id::TxIdGenerator;
    use crate::core::temporal::time;
    use crate::storage::wal::concurrent_system::ConcurrentWalSystemConfig;
    use tempfile::TempDir;

    let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
    let temporal_indexes = Arc::new(TemporalIndexes::new());

    let wal_dir = TempDir::new().unwrap();
    let wal = Arc::new(
        ConcurrentWalSystem::new(ConcurrentWalSystemConfig::new(wal_dir.path())).unwrap(),
    );

    // The commit clock is sampled before the snapshot timestamp.
    let current_timestamp = Arc::new(Mutex::new(time::now()));
    let node_ids = Arc::new(IdGenerator::new());
    let edge_ids = Arc::new(IdGenerator::new());
    let version_ids = Arc::new(IdGenerator::new());
    let tx_ids = TxIdGenerator::new();
    let visibility_manager = Arc::new(TxVisibilityManager::new());
    let snapshot = TransactionSnapshot {
        snapshot_timestamp: time::now(),
        active_transactions: Arc::new(std::collections::HashSet::new()),
    };

    let write_tx = WriteTransaction::new(
        tx_ids.next(),
        snapshot,
        current,
        historical,
        temporal_indexes,
        wal,
        current_timestamp,
        visibility_manager,
        node_ids,
        edge_ids,
        version_ids,
    );
    (write_tx, wal_dir)
}
}
mod conflict_detection_tests {
use super::*;
use crate::core::id::TxIdGenerator;
use crate::core::property::PropertyMapBuilder;
use crate::storage::wal::concurrent_system::ConcurrentWalSystemConfig;
use tempfile::TempDir;
/// Shared fixture for the conflict-detection tests.
///
/// Holds one set of handles from which `create_tx` builds every
/// `WriteTransaction`, so all transactions in a test share the same storage,
/// WAL, commit clock and ID generators.
struct TestHarness {
/// Current storage; tests read committed nodes and edges from here.
current: Arc<CurrentStorage>,
/// Historical storage handle passed to every transaction.
historical: Arc<RwLock<HistoricalStorage>>,
/// Temporal index handle passed to every transaction.
temporal_indexes: Arc<TemporalIndexes>,
/// WAL created over `_temp_dir`'s path in `new`.
wal: Arc<ConcurrentWalSystem>,
/// Shared timestamp; `create_tx` copies its value as the snapshot timestamp.
current_timestamp: Arc<Mutex<Timestamp>>,
/// Visibility manager handle passed to every transaction.
visibility_manager: Arc<TxVisibilityManager>,
/// Generator handle passed as the node ID generator.
node_id_gen: Arc<IdGenerator>,
/// Generator handle passed as the edge ID generator.
edge_id_gen: Arc<IdGenerator>,
/// Generator handle passed as the version ID generator.
version_id_gen: Arc<IdGenerator>,
/// Source of the transaction ID for each `create_tx` call.
tx_id_gen: TxIdGenerator,
/// Temporary directory backing the WAL; stored so it lives as long as the harness.
_temp_dir: TempDir, }
impl TestHarness {
    /// Creates a harness with empty storage and a WAL rooted in a fresh
    /// temporary directory.
    fn new() -> Self {
        let temp_dir = TempDir::new().unwrap();
        let wal_config = ConcurrentWalSystemConfig::new(temp_dir.path());
        Self {
            current: Arc::new(CurrentStorage::new()),
            historical: Arc::new(RwLock::new(HistoricalStorage::new())),
            temporal_indexes: Arc::new(TemporalIndexes::new()),
            wal: Arc::new(ConcurrentWalSystem::new(wal_config).unwrap()),
            current_timestamp: Arc::new(Mutex::new(time::now())),
            visibility_manager: Arc::new(TxVisibilityManager::new()),
            node_id_gen: Arc::new(IdGenerator::new()),
            edge_id_gen: Arc::new(IdGenerator::new()),
            version_id_gen: Arc::new(IdGenerator::new()),
            tx_id_gen: TxIdGenerator::new(),
            _temp_dir: temp_dir,
        }
    }

    /// Starts a write transaction whose snapshot timestamp is the current
    /// value of the shared clock and whose active-transaction set is empty.
    fn create_tx(&self) -> WriteTransaction {
        let snapshot_timestamp = *self.current_timestamp.lock().unwrap();
        let snapshot = TransactionSnapshot {
            snapshot_timestamp,
            active_transactions: Arc::new(std::collections::HashSet::new()),
        };
        WriteTransaction::new(
            self.tx_id_gen.next(),
            snapshot,
            Arc::clone(&self.current),
            Arc::clone(&self.historical),
            Arc::clone(&self.temporal_indexes),
            Arc::clone(&self.wal),
            Arc::clone(&self.current_timestamp),
            Arc::clone(&self.visibility_manager),
            Arc::clone(&self.node_id_gen),
            Arc::clone(&self.edge_id_gen),
            Arc::clone(&self.version_id_gen),
        )
    }
}
/// Two transactions update the same node. The one that commits first wins;
/// the later commit must fail with a serialization failure and leave the
/// winner's value in place.
#[test]
fn test_first_committer_wins_node_update() {
    let harness = TestHarness::new();

    // Seed a node with age = 30.
    let node_id = {
        let mut seed_tx = harness.create_tx();
        let seed_props = PropertyMapBuilder::new().insert("age", 30i64).build();
        let id = seed_tx.create_node("Person", seed_props).unwrap();
        seed_tx.commit().unwrap();
        id
    };

    // tx1 buffers its update first but commits last.
    let mut tx1 = harness.create_tx();
    let tx1_props = PropertyMapBuilder::new().insert("age", 31i64).build();
    tx1.update_node(node_id, tx1_props).unwrap();

    let mut tx2 = harness.create_tx();
    let tx2_props = PropertyMapBuilder::new().insert("age", 32i64).build();
    tx2.update_node(node_id, tx2_props).unwrap();
    tx2.commit().unwrap();

    assert_eq!(
        harness
            .current
            .get_node(node_id)
            .unwrap()
            .get_property("age")
            .and_then(|v| v.as_int()),
        Some(32),
        "tx2's update should have been applied"
    );

    let result = tx1.commit();
    assert!(
        result.is_err(),
        "tx1.commit() should fail due to write-write conflict"
    );
    let err_str = format!("{:?}", result.unwrap_err());
    assert!(
        err_str.contains("SerializationFailure"),
        "Expected SerializationFailure, got: {}",
        err_str
    );

    assert_eq!(
        harness
            .current
            .get_node(node_id)
            .unwrap()
            .get_property("age")
            .and_then(|v| v.as_int()),
        Some(32),
        "Final value should be tx2's value (first committer wins)"
    );
}
/// Same first-committer-wins scenario as the node variant, but both
/// transactions update the same edge.
#[test]
fn test_first_committer_wins_edge_update() {
    let harness = TestHarness::new();

    // Seed two nodes joined by an edge with weight = 5; only the edge ID is
    // needed afterwards.
    let edge_id = {
        let mut seed_tx = harness.create_tx();
        let source = seed_tx
            .create_node("Person", PropertyMapBuilder::new().build())
            .unwrap();
        let target = seed_tx
            .create_node("Person", PropertyMapBuilder::new().build())
            .unwrap();
        let edge_props = PropertyMapBuilder::new().insert("weight", 5i64).build();
        let edge = seed_tx
            .create_edge(source, target, "KNOWS", edge_props)
            .unwrap();
        seed_tx.commit().unwrap();
        edge
    };

    let mut tx1 = harness.create_tx();
    let tx1_props = PropertyMapBuilder::new().insert("weight", 10i64).build();
    tx1.update_edge(edge_id, tx1_props).unwrap();

    let mut tx2 = harness.create_tx();
    let tx2_props = PropertyMapBuilder::new().insert("weight", 20i64).build();
    tx2.update_edge(edge_id, tx2_props).unwrap();
    tx2.commit().unwrap();

    let result = tx1.commit();
    assert!(
        result.is_err(),
        "tx1.commit() should fail due to edge update conflict"
    );

    assert_eq!(
        harness
            .current
            .get_edge(edge_id)
            .unwrap()
            .get_property("weight")
            .and_then(|v| v.as_int()),
        Some(20)
    );
}
/// An update buffered against a node must fail to commit once a concurrent
/// transaction has committed a delete of that node.
#[test]
fn test_first_committer_wins_node_delete() {
    let harness = TestHarness::new();
    let node_id = {
        let mut seed_tx = harness.create_tx();
        let id = seed_tx
            .create_node("Person", PropertyMapBuilder::new().build())
            .unwrap();
        seed_tx.commit().unwrap();
        id
    };

    // Updater starts first, deleter commits first.
    let mut updater = harness.create_tx();
    let new_props = PropertyMapBuilder::new().insert("age", 31i64).build();
    updater.update_node(node_id, new_props).unwrap();

    let mut deleter = harness.create_tx();
    deleter.delete_node(node_id).unwrap();
    deleter.commit().unwrap();
    assert!(harness.current.get_node(node_id).is_err());

    let update_outcome = updater.commit();
    assert!(
        update_outcome.is_err(),
        "tx1.commit() should fail - node was modified (deleted) by tx2"
    );
}
/// Two transactions delete the same node; the second commit must be rejected.
#[test]
fn test_delete_delete_conflict() {
    let harness = TestHarness::new();
    let node_id = {
        let mut seed_tx = harness.create_tx();
        let id = seed_tx
            .create_node("Person", PropertyMapBuilder::new().build())
            .unwrap();
        seed_tx.commit().unwrap();
        id
    };

    let mut first_deleter = harness.create_tx();
    first_deleter.delete_node(node_id).unwrap();

    let mut second_deleter = harness.create_tx();
    second_deleter.delete_node(node_id).unwrap();
    // The transaction that started second commits first.
    second_deleter.commit().unwrap();

    let late_commit = first_deleter.commit();
    assert!(
        late_commit.is_err(),
        "tx1.commit() should fail - node was already deleted by tx2"
    );
}
/// Overlapping transactions that touch different nodes must both commit, and
/// both updates must be visible afterwards.
#[test]
fn test_no_conflict_different_entities() {
    let harness = TestHarness::new();
    let (alice_id, bob_id) = {
        let mut seed_tx = harness.create_tx();
        let alice_props = PropertyMapBuilder::new().insert("name", "Alice").build();
        let alice = seed_tx.create_node("Person", alice_props).unwrap();
        let bob_props = PropertyMapBuilder::new().insert("name", "Bob").build();
        let bob = seed_tx.create_node("Person", bob_props).unwrap();
        seed_tx.commit().unwrap();
        (alice, bob)
    };

    let mut tx1 = harness.create_tx();
    let alice_update = PropertyMapBuilder::new().insert("age", 30i64).build();
    tx1.update_node(alice_id, alice_update).unwrap();

    let mut tx2 = harness.create_tx();
    let bob_update = PropertyMapBuilder::new().insert("age", 25i64).build();
    tx2.update_node(bob_id, bob_update).unwrap();

    // Commit in reverse start order; neither should conflict.
    tx2.commit().unwrap();
    tx1.commit().unwrap();

    let alice_node = harness.current.get_node(alice_id).unwrap();
    assert_eq!(
        alice_node.get_property("age").and_then(|v| v.as_int()),
        Some(30)
    );
    let bob_node = harness.current.get_node(bob_id).unwrap();
    assert_eq!(
        bob_node.get_property("age").and_then(|v| v.as_int()),
        Some(25)
    );
}
/// Overlapping transactions that only create nodes never conflict.
#[test]
fn test_no_conflict_for_creates() {
    let harness = TestHarness::new();

    let mut first_tx = harness.create_tx();
    let first_node = first_tx
        .create_node("Person", PropertyMapBuilder::new().build())
        .unwrap();

    let mut second_tx = harness.create_tx();
    let second_node = second_tx
        .create_node("Person", PropertyMapBuilder::new().build())
        .unwrap();

    // Commit in reverse start order.
    second_tx.commit().unwrap();
    first_tx.commit().unwrap();

    for created in [first_node, second_node] {
        assert!(harness.current.get_node(created).is_ok());
    }
}
/// The error from a losing commit must name the conflicting entity and
/// describe the conflict.
#[test]
fn test_conflict_error_message() {
    let harness = TestHarness::new();
    let node_id = {
        let mut seed_tx = harness.create_tx();
        let id = seed_tx
            .create_node("Person", PropertyMapBuilder::new().build())
            .unwrap();
        seed_tx.commit().unwrap();
        id
    };

    let mut loser = harness.create_tx();
    loser
        .update_node(node_id, PropertyMapBuilder::new().build())
        .unwrap();

    let mut winner = harness.create_tx();
    winner
        .update_node(node_id, PropertyMapBuilder::new().build())
        .unwrap();
    winner.commit().unwrap();

    // Inspect the Debug rendering of the losing commit's error.
    let err_str = format!("{:?}", loser.commit().unwrap_err());
    assert!(
        err_str.contains("NodeId"),
        "Error should mention the entity: {}",
        err_str
    );
    let explains_conflict = err_str.contains("committed") || err_str.contains("snapshot");
    assert!(
        explains_conflict,
        "Error should explain the conflict: {}",
        err_str
    );
}
// Interleaves a delete of one node with the creation of an unrelated node,
// in both commit orders, and asserts that both transactions commit.
#[test]
fn test_delete_then_recreate_race() {
let harness = TestHarness::new();
// Scenario 1: the deleting transaction (tx1) commits before the creating one (tx2).
{
let node_id = {
let mut tx = harness.create_tx();
let id = tx
.create_node(
"Person",
PropertyMapBuilder::new().insert("name", "Alice").build(),
)
.unwrap();
tx.commit().unwrap();
id
};
// Both transactions are started before either commits.
let mut tx1 = harness.create_tx();
tx1.delete_node(node_id).unwrap();
let mut tx2 = harness.create_tx();
let new_node_id = tx2
.create_node(
"Person",
PropertyMapBuilder::new().insert("name", "Bob").build(),
)
.unwrap();
tx1.commit().unwrap();
assert!(
harness.current.get_node(node_id).is_err(),
"Original node should be deleted"
);
// The create touches a different node ID, so this commit is expected to succeed.
tx2.commit().unwrap();
assert!(
harness.current.get_node(new_node_id).is_ok(),
"New node should be created successfully"
);
assert!(
harness.current.get_node(node_id).is_err(),
"Original node should remain deleted"
);
}
// Scenario 2: same shape, but the creating transaction (tx3) commits
// before the deleting one (tx4).
{
let node_to_delete = {
let mut tx = harness.create_tx();
let id = tx
.create_node(
"Person",
PropertyMapBuilder::new().insert("name", "Charlie").build(),
)
.unwrap();
tx.commit().unwrap();
id
};
let mut tx3 = harness.create_tx();
let created_node_id = tx3
.create_node(
"Person",
PropertyMapBuilder::new().insert("name", "Dave").build(),
)
.unwrap();
let mut tx4 = harness.create_tx();
tx4.delete_node(node_to_delete).unwrap();
tx3.commit().unwrap();
assert!(
harness.current.get_node(created_node_id).is_ok(),
"New node should be created"
);
tx4.commit().unwrap();
assert!(
harness.current.get_node(node_to_delete).is_err(),
"Original node should be deleted"
);
assert!(
harness.current.get_node(created_node_id).is_ok(),
"Created node should still exist"
);
}
}
// Races an update against a delete of the same node in both commit orders
// and asserts that whichever transaction commits second is rejected.
#[test]
fn test_update_delete_conflict() {
let harness = TestHarness::new();
let node_id = {
let mut tx = harness.create_tx();
let id = tx
.create_node(
"Person",
PropertyMapBuilder::new().insert("age", 30i64).build(),
)
.unwrap();
tx.commit().unwrap();
id
};
// Scenario A: the update (tx1) commits first; the delete (tx2) must fail.
{
let mut tx1 = harness.create_tx();
tx1.update_node(
node_id,
PropertyMapBuilder::new().insert("age", 35i64).build(),
)
.unwrap();
let mut tx2 = harness.create_tx();
tx2.delete_node(node_id).unwrap();
tx1.commit().unwrap();
let node = harness.current.get_node(node_id).unwrap();
assert_eq!(
node.get_property("age").and_then(|v| v.as_int()),
Some(35),
"Update should be applied"
);
let result = tx2.commit();
assert!(
result.is_err(),
"tx2 should fail - node was modified by tx1"
);
// The failed delete must not have removed or altered the node.
let node = harness.current.get_node(node_id).unwrap();
assert_eq!(
node.get_property("age").and_then(|v| v.as_int()),
Some(35),
"Node should still have tx1's update"
);
}
// Scenario B uses a fresh node so it is independent of scenario A.
let node_id_b = {
let mut tx = harness.create_tx();
let id = tx
.create_node(
"Person",
PropertyMapBuilder::new().insert("age", 50i64).build(),
)
.unwrap();
tx.commit().unwrap();
id
};
// Scenario B: the delete (tx4) commits first; the update (tx3) must fail.
{
let mut tx3 = harness.create_tx();
tx3.update_node(
node_id_b,
PropertyMapBuilder::new().insert("age", 55i64).build(),
)
.unwrap();
let mut tx4 = harness.create_tx();
tx4.delete_node(node_id_b).unwrap();
tx4.commit().unwrap();
assert!(
harness.current.get_node(node_id_b).is_err(),
"Node should be deleted by tx4"
);
let result = tx3.commit();
assert!(result.is_err(), "tx3 should fail - node was deleted by tx4");
let err = result.unwrap_err();
let err_str = format!("{:?}", err);
// Any one of these substrings in the Debug output is accepted.
assert!(
err_str.contains("NodeId")
|| err_str.contains("deleted")
|| err_str.contains("not found"),
"Error should explain the conflict: {}",
err_str
);
}
}
// Races edge creation against deletion of the edge's target node, in both
// commit orders. The assertions pin the behavior the test expects today,
// including the orphaned edge left behind in the first scenario.
#[test]
fn test_edge_creation_with_concurrent_node_deletion() {
let harness = TestHarness::new();
let (node1, node2) = {
let mut tx = harness.create_tx();
let n1 = tx
.create_node("Person", PropertyMapBuilder::new().build())
.unwrap();
let n2 = tx
.create_node("Person", PropertyMapBuilder::new().build())
.unwrap();
tx.commit().unwrap();
(n1, n2)
};
// Scenario 1: the edge creation (tx1) commits before the delete of its
// target node (tx2).
{
let mut tx1 = harness.create_tx();
let edge_id = tx1
.create_edge(node1, node2, "KNOWS", PropertyMapBuilder::new().build())
.unwrap();
let mut tx2 = harness.create_tx();
tx2.delete_node(node2).unwrap();
tx1.commit().unwrap();
assert!(
harness.current.get_edge(edge_id).is_ok(),
"Edge should be created"
);
// The delete is asserted to succeed even though an edge now points at node2.
let result = tx2.commit();
assert!(
result.is_ok(),
"tx2 commit should succeed - edge addition doesn't create version conflict on node2"
);
assert!(
harness.current.get_node(node2).is_err(),
"Node should be deleted after successful tx2 commit"
);
// The edge is expected to outlive its target node.
assert!(
harness.current.get_edge(edge_id).is_ok(),
"Edge still exists as orphan (documents current behavior)"
);
let edge = harness.current.get_edge(edge_id).unwrap();
assert_eq!(
edge.target, node2,
"Edge still references deleted node (orphaned)"
);
}
// Scenario 2 uses fresh nodes so it is independent of scenario 1.
let (node3, node4) = {
let mut tx = harness.create_tx();
let n3 = tx
.create_node("Person", PropertyMapBuilder::new().build())
.unwrap();
let n4 = tx
.create_node("Person", PropertyMapBuilder::new().build())
.unwrap();
tx.commit().unwrap();
(n3, n4)
};
// Scenario 2: tx3 is started before tx4 deletes node4, but only buffers its
// edge after that delete has committed.
{
let mut tx3 = harness.create_tx();
let mut tx4 = harness.create_tx();
tx4.delete_node(node4).unwrap();
tx4.commit().unwrap();
assert!(
harness.current.get_node(node4).is_err(),
"Node should be deleted"
);
let edge_result =
tx3.create_edge(node3, node4, "KNOWS", PropertyMapBuilder::new().build());
assert!(
edge_result.is_ok(),
"Edge creation should succeed - node4 exists in tx3's snapshot"
);
let edge_id = edge_result.unwrap();
// Buffering the edge is accepted; the rejection is expected at commit time.
let commit_result = tx3.commit();
assert!(
commit_result.is_err(),
"tx3 commit should fail - node4 was deleted by tx4, breaking referential integrity"
);
assert!(
harness.current.get_edge(edge_id).is_err(),
"Edge should not exist after failed commit"
);
}
}
/// A transaction that is dropped without committing must leave no trace:
/// the previously committed value stays visible, and a later transaction
/// can still update the node.
#[test]
fn test_rollback_during_concurrent_commit() {
    let harness = TestHarness::new();
    let node_id = {
        let mut seed_tx = harness.create_tx();
        let seed_props = PropertyMapBuilder::new().insert("age", 30i64).build();
        let id = seed_tx.create_node("Person", seed_props).unwrap();
        seed_tx.commit().unwrap();
        id
    };

    // tx1: committed update to 35.
    let mut tx1 = harness.create_tx();
    let tx1_props = PropertyMapBuilder::new().insert("age", 35i64).build();
    tx1.update_node(node_id, tx1_props).unwrap();
    tx1.commit().unwrap();
    let node_after_tx1 = harness.current.get_node(node_id).unwrap();
    assert_eq!(
        node_after_tx1.get_property("age").and_then(|v| v.as_int()),
        Some(35),
        "tx1's update should be visible"
    );

    // tx2: buffers an update to 40, then goes out of scope uncommitted.
    {
        let mut abandoned_tx = harness.create_tx();
        let abandoned_props = PropertyMapBuilder::new().insert("age", 40i64).build();
        abandoned_tx.update_node(node_id, abandoned_props).unwrap();
    }
    let node_after_rollback = harness.current.get_node(node_id).unwrap();
    assert_eq!(
        node_after_rollback
            .get_property("age")
            .and_then(|v| v.as_int()),
        Some(35),
        "Only tx1's changes should be visible; tx2 rolled back"
    );

    // tx3: committed update to 45.
    let mut tx3 = harness.create_tx();
    let tx3_props = PropertyMapBuilder::new().insert("age", 45i64).build();
    tx3.update_node(node_id, tx3_props).unwrap();
    tx3.commit().unwrap();
    let node_final = harness.current.get_node(node_id).unwrap();
    assert_eq!(
        node_final.get_property("age").and_then(|v| v.as_int()),
        Some(45),
        "tx3's update should be visible"
    );

    // Read once more to confirm the stored state is stable.
    let node_final_check = harness.current.get_node(node_id).unwrap();
    assert_eq!(
        node_final_check
            .get_property("age")
            .and_then(|v| v.as_int()),
        Some(45),
        "Storage should consistently show latest committed state"
    );
}
}
mod clock_skew_tests {
use super::*;
use crate::core::hlc::ClockSkewAutoHealTestGuard;
use crate::core::id::TxIdGenerator;
use crate::core::property::PropertyMapBuilder;
use crate::storage::wal::concurrent_system::ConcurrentWalSystemConfig;
use std::time::{Duration, Instant};
use tempfile::TempDir;
/// Shared fixture for the clock-skew tests.
///
/// Holds the handles from which `create_tx` and
/// `create_tx_with_shared_observation_clock` build transactions, plus the
/// two clocks the tests overwrite to simulate skew.
struct TestHarness {
/// Current storage handle passed to every transaction.
current: Arc<CurrentStorage>,
/// Historical storage handle passed to every transaction.
historical: Arc<RwLock<HistoricalStorage>>,
/// Temporal index handle passed to every transaction.
temporal_indexes: Arc<TemporalIndexes>,
/// WAL created over `_temp_dir`'s path in `new`.
wal: Arc<ConcurrentWalSystem>,
/// Shared timestamp; tests overwrite it with past or future values before committing.
current_timestamp: Arc<Mutex<Timestamp>>,
/// Instant handed to `WriteTransaction::new_with_clock_observed_at`; tests move it into the past.
commit_clock_observed_at: Arc<Mutex<Instant>>,
/// Visibility manager; also used to capture each transaction's snapshot.
visibility_manager: Arc<TxVisibilityManager>,
/// Generator handle passed as the node ID generator.
node_id_gen: Arc<IdGenerator>,
/// Generator handle passed as the edge ID generator.
edge_id_gen: Arc<IdGenerator>,
/// Generator handle passed as the version ID generator.
version_id_gen: Arc<IdGenerator>,
/// Source of the transaction ID for each new transaction.
tx_id_gen: TxIdGenerator,
/// Temporary directory backing the WAL; stored so it lives as long as the harness.
_temp_dir: TempDir,
}
impl TestHarness {
    /// Creates a harness with empty storage, a WAL rooted in a fresh
    /// temporary directory, and both clocks initialised to "now".
    fn new() -> Self {
        let temp_dir = TempDir::new().unwrap();
        let wal_config = ConcurrentWalSystemConfig::new(temp_dir.path());
        Self {
            current: Arc::new(CurrentStorage::new()),
            historical: Arc::new(RwLock::new(HistoricalStorage::new())),
            temporal_indexes: Arc::new(TemporalIndexes::new()),
            wal: Arc::new(ConcurrentWalSystem::new(wal_config).unwrap()),
            current_timestamp: Arc::new(Mutex::new(time::now())),
            commit_clock_observed_at: Arc::new(Mutex::new(Instant::now())),
            visibility_manager: Arc::new(TxVisibilityManager::new()),
            node_id_gen: Arc::new(IdGenerator::new()),
            edge_id_gen: Arc::new(IdGenerator::new()),
            version_id_gen: Arc::new(IdGenerator::new()),
            tx_id_gen: TxIdGenerator::new(),
            _temp_dir: temp_dir,
        }
    }

    /// Starts a write transaction through `WriteTransaction::new`, with a
    /// snapshot captured at the shared clock's current value.
    fn create_tx(&self) -> WriteTransaction {
        let snapshot = {
            let at = *self.current_timestamp.lock().unwrap();
            self.visibility_manager.capture_snapshot(at)
        };
        WriteTransaction::new(
            self.tx_id_gen.next(),
            snapshot,
            Arc::clone(&self.current),
            Arc::clone(&self.historical),
            Arc::clone(&self.temporal_indexes),
            Arc::clone(&self.wal),
            Arc::clone(&self.current_timestamp),
            Arc::clone(&self.visibility_manager),
            Arc::clone(&self.node_id_gen),
            Arc::clone(&self.edge_id_gen),
            Arc::clone(&self.version_id_gen),
        )
    }

    /// Like `create_tx`, but also hands the transaction the harness's
    /// `commit_clock_observed_at` handle via
    /// `WriteTransaction::new_with_clock_observed_at`.
    fn create_tx_with_shared_observation_clock(&self) -> WriteTransaction {
        let snapshot = {
            let at = *self.current_timestamp.lock().unwrap();
            self.visibility_manager.capture_snapshot(at)
        };
        WriteTransaction::new_with_clock_observed_at(
            self.tx_id_gen.next(),
            snapshot,
            Arc::clone(&self.current),
            Arc::clone(&self.historical),
            Arc::clone(&self.temporal_indexes),
            Arc::clone(&self.wal),
            Arc::clone(&self.current_timestamp),
            Arc::clone(&self.commit_clock_observed_at),
            Arc::clone(&self.visibility_manager),
            Arc::clone(&self.node_id_gen),
            Arc::clone(&self.edge_id_gen),
            Arc::clone(&self.version_id_gen),
        )
    }
}
/// With auto-heal forced off, committing while the shared clock sits ten
/// minutes ahead of the wall clock must fail with `ClockSkew` and a drift
/// below `-MAX_BACKWARD_DRIFT_US`.
#[test]
fn test_clock_skew_backward_error() {
    let _auto_heal_guard = ClockSkewAutoHealTestGuard::force(false);
    let harness = TestHarness::new();
    let mut tx = harness.create_tx();
    let marker_props = PropertyMapBuilder::new().insert("test", true).build();
    tx.create_node("Test", marker_props).unwrap();

    // Push the shared clock 10 minutes (in microseconds) into the future.
    {
        let mut shared_clock = harness.current_timestamp.lock().unwrap();
        let ten_minutes_ahead = time::now().wallclock() + 10 * 60 * 1_000_000;
        *shared_clock = crate::core::hlc::HybridTimestamp::new(ten_minutes_ahead, 0).unwrap();
    }

    let result = tx.commit();
    assert!(result.is_err());
    let err = result.unwrap_err();
    if let crate::core::error::Error::Transaction(TransactionError::ClockSkew {
        drift_us, ..
    }) = &err
    {
        assert!(*drift_us < -super::MAX_BACKWARD_DRIFT_US);
    } else {
        panic!("Expected ClockSkew error, got: {:?}", err);
    }
}
/// With auto-heal forced off, committing while the shared clock sits two
/// hours behind the wall clock must fail with `ClockSkew` and a drift above
/// `MAX_FORWARD_JUMP_US`.
#[test]
fn test_clock_skew_forward_error() {
    let _auto_heal_guard = ClockSkewAutoHealTestGuard::force(false);
    let harness = TestHarness::new();
    let mut tx = harness.create_tx();
    let marker_props = PropertyMapBuilder::new().insert("test", true).build();
    tx.create_node("Test", marker_props).unwrap();

    // Pull the shared clock 2 hours (in microseconds) into the past.
    {
        let mut shared_clock = harness.current_timestamp.lock().unwrap();
        let two_hours_behind = time::now().wallclock() - 2 * 60 * 60 * 1_000_000;
        *shared_clock = crate::core::hlc::HybridTimestamp::new(two_hours_behind, 0).unwrap();
    }

    let result = tx.commit();
    assert!(result.is_err());
    let err = result.unwrap_err();
    if let crate::core::error::Error::Transaction(TransactionError::ClockSkew {
        drift_us, ..
    }) = &err
    {
        assert!(*drift_us > super::MAX_FORWARD_JUMP_US);
    } else {
        panic!("Expected ClockSkew error, got: {:?}", err);
    }
}
/// A commit rejected for clock skew must leave the shared
/// `commit_clock_observed_at` instant exactly as it was before the attempt.
#[test]
fn test_clock_skew_failure_does_not_advance_observation_timestamp() {
    let _auto_heal_guard = ClockSkewAutoHealTestGuard::force(false);
    let harness = TestHarness::new();
    let mut tx = harness.create_tx_with_shared_observation_clock();
    let marker_props = PropertyMapBuilder::new().insert("test", true).build();
    tx.create_node("Test", marker_props).unwrap();

    // Shared clock: six hours behind the wall clock.
    {
        let mut shared_clock = harness.current_timestamp.lock().unwrap();
        let six_hours_behind = time::now().wallclock() - (6 * 60 * 60 * 1_000_000);
        *shared_clock = crate::core::hlc::HybridTimestamp::new(six_hours_behind, 0).unwrap();
    }

    // Observation instant: five seconds ago, or "now" if that cannot be
    // represented.
    let old_observed_at = {
        let mut observed_slot = harness.commit_clock_observed_at.lock().unwrap();
        let now = Instant::now();
        let rewound = now.checked_sub(Duration::from_secs(5)).unwrap_or(now);
        *observed_slot = rewound;
        rewound
    };

    let result = tx.commit();
    let is_clock_skew = matches!(
        result,
        Err(crate::core::error::Error::Transaction(
            TransactionError::ClockSkew { .. }
        ))
    );
    assert!(is_clock_skew);

    let observed_after_failure = *harness.commit_clock_observed_at.lock().unwrap();
    assert_eq!(
        observed_after_failure, old_observed_at,
        "failed skew validation must not consume idle-time budget"
    );
}
/// When the shared clock and the observation instant are both rewound by
/// the same gap (just over `MAX_FORWARD_JUMP_US`), the commit must succeed.
/// The test returns early if the monotonic clock cannot represent an
/// instant that far back.
#[test]
fn test_clock_skew_allows_idle_forward_drift_with_shared_observation_clock() {
    let _auto_heal_guard = ClockSkewAutoHealTestGuard::force(false);
    let harness = TestHarness::new();
    let mut tx = harness.create_tx_with_shared_observation_clock();
    let marker_props = PropertyMapBuilder::new().insert("test", true).build();
    tx.create_node("Test", marker_props).unwrap();

    // Two seconds beyond the forward-jump limit, in microseconds.
    let idle_gap_us = super::MAX_FORWARD_JUMP_US + 2_000_000;
    {
        let mut shared_clock = harness.current_timestamp.lock().unwrap();
        let rewound_wallclock = time::now().wallclock() - idle_gap_us;
        *shared_clock = crate::core::hlc::HybridTimestamp::new(rewound_wallclock, 0).unwrap();
    }
    {
        let mut observed_slot = harness.commit_clock_observed_at.lock().unwrap();
        let idle_gap = Duration::from_micros(idle_gap_us as u64);
        if let Some(past_instant) = Instant::now().checked_sub(idle_gap) {
            *observed_slot = past_instant;
        } else {
            println!(
                "Skipping test_clock_skew_allows_idle_forward_drift_with_shared_observation_clock: uptime insufficient for 1h+ idle gap."
            );
            return;
        }
    }

    let commit_outcome = tx.commit();
    assert!(
        commit_outcome.is_ok(),
        "normal idle time should not be treated as forward clock skew"
    );
}
}
mod timestamp_ordering_tests {
use super::*;
use crate::core::id::TxIdGenerator;
use crate::core::property::PropertyMapBuilder;
use crate::storage::wal::concurrent_system::ConcurrentWalSystemConfig;
use std::thread;
use tempfile::TempDir;
/// Shared fixture for the timestamp-ordering tests.
///
/// Holds one set of handles from which `create_tx` builds every
/// `WriteTransaction`; the concurrent test shares it across threads behind
/// an `Arc`.
struct TestHarness {
/// Current storage; tests read committed nodes from here.
current: Arc<CurrentStorage>,
/// Historical storage handle passed to every transaction.
historical: Arc<RwLock<HistoricalStorage>>,
/// Temporal index handle passed to every transaction.
temporal_indexes: Arc<TemporalIndexes>,
/// WAL created over `_temp_dir`'s path in `new`.
wal: Arc<ConcurrentWalSystem>,
/// Shared timestamp; read after each commit by the sequential-commit test.
current_timestamp: Arc<Mutex<Timestamp>>,
/// Visibility manager handle passed to every transaction.
visibility_manager: Arc<TxVisibilityManager>,
/// Generator handle passed as the node ID generator.
node_id_gen: Arc<IdGenerator>,
/// Generator handle passed as the edge ID generator.
edge_id_gen: Arc<IdGenerator>,
/// Generator handle passed as the version ID generator.
version_id_gen: Arc<IdGenerator>,
/// Source of the transaction ID for each `create_tx` call.
tx_id_gen: TxIdGenerator,
/// Temporary directory backing the WAL; stored so it lives as long as the harness.
_temp_dir: TempDir,
}
impl TestHarness {
    /// Creates a harness with empty storage and a WAL rooted in a fresh
    /// temporary directory.
    fn new() -> Self {
        let wal_dir = TempDir::new().unwrap();
        let wal = ConcurrentWalSystem::new(ConcurrentWalSystemConfig::new(wal_dir.path()))
            .map(Arc::new)
            .unwrap();
        Self {
            current: Arc::new(CurrentStorage::new()),
            historical: Arc::new(RwLock::new(HistoricalStorage::new())),
            temporal_indexes: Arc::new(TemporalIndexes::new()),
            wal,
            current_timestamp: Arc::new(Mutex::new(time::now())),
            visibility_manager: Arc::new(TxVisibilityManager::new()),
            node_id_gen: Arc::new(IdGenerator::new()),
            edge_id_gen: Arc::new(IdGenerator::new()),
            version_id_gen: Arc::new(IdGenerator::new()),
            tx_id_gen: TxIdGenerator::new(),
            _temp_dir: wal_dir,
        }
    }

    /// Starts a write transaction whose snapshot timestamp is the shared
    /// clock's current value and whose active-transaction set is empty.
    fn create_tx(&self) -> WriteTransaction {
        let snapshot_timestamp = *self.current_timestamp.lock().unwrap();
        let no_active_transactions = Arc::new(std::collections::HashSet::new());
        let snapshot = TransactionSnapshot {
            snapshot_timestamp,
            active_transactions: no_active_transactions,
        };
        WriteTransaction::new(
            self.tx_id_gen.next(),
            snapshot,
            Arc::clone(&self.current),
            Arc::clone(&self.historical),
            Arc::clone(&self.temporal_indexes),
            Arc::clone(&self.wal),
            Arc::clone(&self.current_timestamp),
            Arc::clone(&self.visibility_manager),
            Arc::clone(&self.node_id_gen),
            Arc::clone(&self.edge_id_gen),
            Arc::clone(&self.version_id_gen),
        )
    }
}
/// Ten back-to-back commits must leave the shared clock strictly increasing
/// from one commit to the next.
#[test]
fn test_sequential_commits_monotonic_timestamps() {
    let harness = TestHarness::new();
    let mut timestamps = Vec::new();
    for seq in 0..10 {
        let mut tx = harness.create_tx();
        let seq_props = PropertyMapBuilder::new().insert("seq", seq as i64).build();
        tx.create_node("Test", seq_props).unwrap();
        tx.commit().unwrap();
        // Sample the shared clock right after the commit.
        let after_commit = *harness.current_timestamp.lock().unwrap();
        timestamps.push(after_commit);
    }

    // Compare each timestamp with its predecessor.
    for (prev_idx, pair) in timestamps.windows(2).enumerate() {
        let (earlier, later) = (pair[0], pair[1]);
        assert!(
            later > earlier,
            "Timestamp {} ({}) should be > timestamp {} ({})",
            prev_idx + 1,
            later,
            prev_idx,
            earlier
        );
    }
}
/// Eight threads each commit five node creations against a shared harness.
/// Every committed node's commit timestamp must be unique, and all 40
/// commits must be recorded.
#[test]
fn test_concurrent_commits_ordered_timestamps() {
    let harness = Arc::new(TestHarness::new());
    let results = Arc::new(Mutex::new(Vec::new()));
    let num_threads = 8;
    let commits_per_thread = 5;

    let mut workers = Vec::with_capacity(num_threads);
    for thread_id in 0..num_threads {
        let worker_harness = Arc::clone(&harness);
        let worker_results = Arc::clone(&results);
        workers.push(thread::spawn(move || {
            for i in 0..commits_per_thread {
                let mut tx = worker_harness.create_tx();
                let props = PropertyMapBuilder::new()
                    .insert("thread", thread_id as i64)
                    .insert("iteration", i as i64)
                    .build();
                let node_id = tx.create_node("Test", props).unwrap();
                tx.commit().unwrap();
                // Read the commit timestamp back from the stored node.
                let node = worker_harness.current.get_node(node_id).unwrap();
                let commit_ts = node.metadata.commit_timestamp.unwrap();
                worker_results
                    .lock()
                    .unwrap()
                    .push((commit_ts, thread_id, i, node_id));
            }
        }));
    }
    for worker in workers {
        worker.join().unwrap();
    }

    let mut recorded = results.lock().unwrap();
    recorded.sort_by_key(|entry| entry.0);
    // After sorting, equal neighbours mean a duplicated timestamp.
    for pair in recorded.windows(2) {
        let (ts_prev, thread_prev, iter_prev, _) = pair[0];
        let (ts_curr, thread_curr, iter_curr, _) = pair[1];
        assert!(
            ts_curr > ts_prev,
            "Duplicate or out-of-order timestamp detected: \
             Thread {} iter {} (ts={}) vs Thread {} iter {} (ts={})",
            thread_prev,
            iter_prev,
            ts_prev,
            thread_curr,
            iter_curr,
            ts_curr
        );
    }
    let expected_commits = num_threads * commits_per_thread;
    assert_eq!(
        recorded.len(),
        expected_commits,
        "Expected {} commits, got {}",
        expected_commits,
        recorded.len()
    );
}
/// Five successive updates to one node must each carry a commit timestamp
/// strictly greater than the previous update's, and the final stored value
/// must be the last one written.
#[test]
fn test_version_chain_ordering() {
    let harness = TestHarness::new();
    let node_id = {
        let mut tx = harness.create_tx();
        let id = tx
            .create_node(
                "Person",
                PropertyMapBuilder::new().insert("version", 0i64).build(),
            )
            .unwrap();
        tx.commit().unwrap();
        id
    };

    let mut commit_timestamps = Vec::new();
    for version in 1..=5 {
        let mut tx = harness.create_tx();
        tx.update_node(
            node_id,
            PropertyMapBuilder::new()
                .insert("version", version as i64)
                .build(),
        )
        .unwrap();
        tx.commit().unwrap();
        let node = harness.current.get_node(node_id).unwrap();
        // A missing timestamp is a failure, not something to skip: silently
        // dropping it would let the ordering check below pass with fewer
        // (or zero) pairs to compare.
        let ts = node
            .metadata
            .commit_timestamp
            .expect("committed node should carry a commit timestamp");
        commit_timestamps.push(ts);
    }

    // Window 0 compares version 1 with version 2, and so on.
    for (idx, pair) in commit_timestamps.windows(2).enumerate() {
        assert!(
            pair[1] > pair[0],
            "Version {} timestamp ({}) should be > version {} timestamp ({})",
            idx + 2,
            pair[1],
            idx + 1,
            pair[0]
        );
    }

    let final_node = harness.current.get_node(node_id).unwrap();
    assert_eq!(
        final_node.get_property("version").and_then(|v| v.as_int()),
        Some(5)
    );
}
}
mod bitemporal_validation_tests {
use super::*;
use crate::core::id::TxIdGenerator;
use crate::core::property::PropertyMap;
use crate::storage::wal::concurrent_system::ConcurrentWalSystemConfig;
use tempfile::TempDir;
/// Shared fixture for the bitemporal validation tests.
///
/// Holds one set of handles from which `begin_write` builds every
/// `WriteTransaction`.
struct TestHarness {
/// Current storage handle passed to every transaction.
current: Arc<CurrentStorage>,
/// Historical storage; tests read stored node and edge versions from here.
historical: Arc<RwLock<HistoricalStorage>>,
/// Temporal index handle passed to every transaction.
temporal_indexes: Arc<TemporalIndexes>,
/// WAL created over `_temp_dir`'s path in `new`.
wal: Arc<ConcurrentWalSystem>,
/// Shared timestamp; `begin_write` captures each snapshot at its current value.
current_timestamp: Arc<Mutex<Timestamp>>,
/// Generator handle passed as the node ID generator.
node_id_gen: Arc<IdGenerator>,
/// Generator handle passed as the edge ID generator.
edge_id_gen: Arc<IdGenerator>,
/// Generator handle passed as the version ID generator.
version_id_gen: Arc<IdGenerator>,
/// Source of the transaction ID for each `begin_write` call.
tx_id_gen: TxIdGenerator,
/// Visibility manager; also used to capture each transaction's snapshot.
visibility_manager: Arc<TxVisibilityManager>,
/// Temporary directory backing the WAL; stored so it lives as long as the harness.
_temp_dir: TempDir,
}
impl TestHarness {
    /// Creates a harness with empty storage and a WAL rooted in a fresh
    /// temporary directory.
    fn new() -> Self {
        let temp_dir = TempDir::new().unwrap();
        let wal_config = ConcurrentWalSystemConfig::new(temp_dir.path());
        Self {
            current: Arc::new(CurrentStorage::new()),
            historical: Arc::new(RwLock::new(HistoricalStorage::new())),
            temporal_indexes: Arc::new(TemporalIndexes::new()),
            wal: Arc::new(ConcurrentWalSystem::new(wal_config).unwrap()),
            current_timestamp: Arc::new(Mutex::new(time::now())),
            node_id_gen: Arc::new(IdGenerator::new()),
            edge_id_gen: Arc::new(IdGenerator::new()),
            version_id_gen: Arc::new(IdGenerator::new()),
            tx_id_gen: TxIdGenerator::new(),
            visibility_manager: Arc::new(TxVisibilityManager::new()),
            _temp_dir: temp_dir,
        }
    }

    /// Starts a write transaction: allocates a transaction ID, then captures
    /// a snapshot at the shared clock's current value.
    fn begin_write(&self) -> WriteTransaction {
        let tx_id = self.tx_id_gen.next();
        let snapshot = {
            let at = *self.current_timestamp.lock().unwrap();
            self.visibility_manager.capture_snapshot(at)
        };
        WriteTransaction::new(
            tx_id,
            snapshot,
            Arc::clone(&self.current),
            Arc::clone(&self.historical),
            Arc::clone(&self.temporal_indexes),
            Arc::clone(&self.wal),
            Arc::clone(&self.current_timestamp),
            Arc::clone(&self.visibility_manager),
            Arc::clone(&self.node_id_gen),
            Arc::clone(&self.edge_id_gen),
            Arc::clone(&self.version_id_gen),
        )
    }
}
/// Creating a node with a valid time one hour in the past must store that
/// exact valid-time start, with a transaction-time start that is later by
/// roughly an hour.
#[test]
fn test_create_node_with_backdated_valid_time_verified() {
    use crate::core::hlc::HybridTimestamp;
    let harness = TestHarness::new();
    // 3_600_000_000 microseconds = one hour.
    let backdated_wallclock = time::now().wallclock() - 3_600_000_000;
    let one_hour_ago = HybridTimestamp::new(backdated_wallclock, 0).unwrap();

    let mut tx = harness.begin_write();
    let alice_props = PropertyMapBuilder::new().insert("name", "Alice").build();
    let node_id = tx
        .create_node_with_valid_time("Person", alice_props, Some(one_hour_ago))
        .unwrap();
    let commit_result = tx.commit();
    assert!(commit_result.is_ok(), "Commit failed: {:?}", commit_result);

    // Inspect the stored version directly in historical storage.
    let historical = harness.historical.read();
    let version_id = historical.get_current_node_version(node_id).unwrap();
    let node_version = historical.get_node_version(version_id).unwrap();
    let valid_start = node_version.temporal.valid_time().start();
    let tx_start = node_version.temporal.transaction_time().start();
    assert_eq!(
        valid_start, one_hour_ago,
        "valid_time should be 1 hour ago"
    );
    assert!(
        tx_start > one_hour_ago,
        "transaction_time should be after valid_time"
    );
    let gap_us = tx_start.wallclock() - valid_start.wallclock();
    assert!(
        gap_us > 3_500_000_000,
        "Gap should be approximately 1 hour, got {}µs",
        gap_us
    );
}
/// Creating a node with a valid time two years ahead must be rejected with
/// `TemporalError::ValidTimeTooFarInFuture`.
#[test]
fn test_create_node_rejects_far_future_valid_time() {
    use crate::core::error::TemporalError;
    use crate::core::hlc::HybridTimestamp;
    let harness = TestHarness::new();
    // Two 365-day years expressed in microseconds.
    let far_future_wallclock = time::now().wallclock() + 2 * 365 * 24 * 3_600_000_000;
    let far_future = HybridTimestamp::new(far_future_wallclock, 0).unwrap();

    let mut tx = harness.begin_write();
    let outcome = tx.create_node_with_valid_time("Person", PropertyMap::new(), Some(far_future));
    assert!(outcome.is_err(), "Should reject far-future valid_time");

    let err = outcome.unwrap_err();
    let is_expected_variant = matches!(
        err,
        crate::core::error::Error::Temporal(TemporalError::ValidTimeTooFarInFuture { .. })
    );
    if !is_expected_variant {
        panic!("Expected ValidTimeTooFarInFuture, got: {:?}", err);
    }
}
/// Updating a node with a valid time earlier than the valid-time start of
/// its stored version must be rejected with
/// `TemporalError::ValidTimeBeforeEntityCreation`.
#[test]
fn test_update_node_rejects_valid_time_before_creation() {
    use crate::core::error::TemporalError;
    use crate::core::hlc::HybridTimestamp;
    let harness = TestHarness::new();

    let mut create_tx = harness.begin_write();
    let node_id = create_tx.create_node("Person", PropertyMap::new()).unwrap();
    let commit_result = create_tx.commit();
    assert!(commit_result.is_ok());

    // Read the creation valid time, releasing the read guard before the
    // next write transaction begins.
    let historical = harness.historical.read();
    let version_id = historical.get_current_node_version(node_id).unwrap();
    let creation_version = historical.get_node_version(version_id).unwrap();
    let creation_time = creation_version.temporal.valid_time().start();
    drop(historical);

    let way_in_past = HybridTimestamp::new(1000, 0).unwrap();
    assert!(
        way_in_past < creation_time,
        "Test setup: way_in_past should be before creation_time"
    );

    let mut update_tx = harness.begin_write();
    let bob_props = PropertyMapBuilder::new().insert("name", "Bob").build();
    let outcome = update_tx.update_node_with_valid_time(node_id, bob_props, Some(way_in_past));
    assert!(
        outcome.is_err(),
        "Should reject valid_time before entity creation"
    );

    let err = outcome.unwrap_err();
    let is_expected_variant = matches!(
        err,
        crate::core::error::Error::Temporal(TemporalError::ValidTimeBeforeEntityCreation { .. })
    );
    if !is_expected_variant {
        panic!("Expected ValidTimeBeforeEntityCreation, got: {:?}", err);
    }
}
/// An edge created with a valid time thirty minutes in the past must store that
/// exact valid-time start, while its transaction-time start is later than it.
#[test]
fn test_create_edge_with_backdated_valid_time_verified() {
    use crate::core::hlc::HybridTimestamp;

    let harness = TestHarness::new();

    // First transaction: commit the two endpoint nodes.
    let mut nodes_tx = harness.begin_write();
    let source_id = nodes_tx.create_node("Person", PropertyMap::new()).unwrap();
    let target_id = nodes_tx.create_node("Person", PropertyMap::new()).unwrap();
    nodes_tx.commit().unwrap();

    // 1_800_000_000 is half of the one-hour constant used elsewhere in these tests.
    let backdated_wallclock = time::now().wallclock() - 1_800_000_000;
    let thirty_min_ago = HybridTimestamp::new(backdated_wallclock, 0).unwrap();

    // Second transaction: create the edge with the backdated valid time.
    let mut edge_tx = harness.begin_write();
    let created = edge_tx.create_edge_with_valid_time(
        source_id,
        target_id,
        "KNOWS",
        PropertyMap::new(),
        Some(thirty_min_ago),
    );
    let edge_id = created.unwrap();
    edge_tx.commit().unwrap();

    // Inspect the stored version through the historical storage.
    let storage = harness.historical.read();
    let current_version_id = storage.get_current_edge_version(edge_id).unwrap();
    let stored = storage.get_edge_version(current_version_id).unwrap();

    assert_eq!(stored.temporal.valid_time().start(), thirty_min_ago);
    assert!(stored.temporal.transaction_time().start() > thirty_min_ago);
}
/// Deleting a committed node with a valid time of wallclock 1000 must return an error.
///
/// NOTE(review): only `is_err()` is checked here, so any error variant satisfies the
/// test. The sibling update test pins `ValidTimeBeforeEntityCreation`; confirm whether
/// the delete path returns the same variant and tighten the assertion if so.
#[test]
fn test_delete_node_rejects_valid_time_before_creation() {
    use crate::core::hlc::HybridTimestamp;

    let harness = TestHarness::new();

    // Commit the node that the second transaction will try to delete.
    let mut create_tx = harness.begin_write();
    let node_id = create_tx.create_node("Person", PropertyMap::new()).unwrap();
    create_tx.commit().unwrap();

    let way_in_past = HybridTimestamp::new(1000, 0).unwrap();

    let mut delete_tx = harness.begin_write();
    let outcome = delete_tx.delete_node_with_valid_time(node_id, Some(way_in_past));
    if outcome.is_ok() {
        panic!("Should reject valid_time before entity creation");
    }
}
}
mod find_nodes_by_property_tests {
use super::*;
use crate::api::transaction::ReadOps;
use crate::core::property::{PropertyMapBuilder, PropertyValue};
fn create_test_write_tx() -> (WriteTransaction, TempDir) {
let current = Arc::new(CurrentStorage::new());
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let temporal_indexes = Arc::new(TemporalIndexes::new());
let temp_dir = TempDir::new().unwrap();
let wal_config = ConcurrentWalSystemConfig::new(temp_dir.path());
let wal = Arc::new(ConcurrentWalSystem::new(wal_config).unwrap());
let current_timestamp = Arc::new(Mutex::new(time::now()));
let node_id_gen = Arc::new(IdGenerator::new());
let edge_id_gen = Arc::new(IdGenerator::new());
let version_id_gen = Arc::new(IdGenerator::new());
let tx_id_gen = TxIdGenerator::new();
let visibility_manager = Arc::new(TxVisibilityManager::new());
let snapshot = TransactionSnapshot {
snapshot_timestamp: time::now(),
active_transactions: Arc::new(std::collections::HashSet::new()),
};
let tx = WriteTransaction::new(
tx_id_gen.next(),
snapshot,
current,
historical,
temporal_indexes,
wal,
current_timestamp,
visibility_manager,
node_id_gen,
edge_id_gen,
version_id_gen,
);
(tx, temp_dir)
}
#[test]
fn test_committed_nodes_visible() {
let current = Arc::new(CurrentStorage::new());
let alice_id = current
.create_node(
"Person",
PropertyMapBuilder::new().insert("name", "Alice").build(),
)
.unwrap();
current
.create_node(
"Person",
PropertyMapBuilder::new().insert("name", "Bob").build(),
)
.unwrap();
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let temporal_indexes = Arc::new(TemporalIndexes::new());
let temp_dir = TempDir::new().unwrap();
let wal_config = ConcurrentWalSystemConfig::new(temp_dir.path());
let wal = Arc::new(ConcurrentWalSystem::new(wal_config).unwrap());
let current_timestamp = Arc::new(Mutex::new(time::now()));
let node_id_gen = Arc::new(IdGenerator::new());
let edge_id_gen = Arc::new(IdGenerator::new());
let version_id_gen = Arc::new(IdGenerator::new());
let tx_id_gen = TxIdGenerator::new();
let visibility_manager = Arc::new(TxVisibilityManager::new());
let snapshot = TransactionSnapshot {
snapshot_timestamp: time::now(),
active_transactions: Arc::new(std::collections::HashSet::new()),
};
let tx = WriteTransaction::new(
tx_id_gen.next(),
snapshot,
current,
historical,
temporal_indexes,
wal,
current_timestamp,
visibility_manager,
node_id_gen,
edge_id_gen,
version_id_gen,
);
let results =
tx.find_nodes_by_property("Person", "name", &PropertyValue::String("Alice".into()));
assert_eq!(results, vec![alice_id]);
}
#[test]
fn test_buffered_create_node_visible() {
let (mut tx, _temp_dir) = create_test_write_tx();
let alice_id = tx
.create_node(
"Person",
PropertyMapBuilder::new().insert("name", "Alice").build(),
)
.unwrap();
let results =
tx.find_nodes_by_property("Person", "name", &PropertyValue::String("Alice".into()));
assert_eq!(results, vec![alice_id]);
}
#[test]
fn test_buffered_delete_excluded() {
let current = Arc::new(CurrentStorage::new());
let alice_id = current
.create_node(
"Person",
PropertyMapBuilder::new().insert("name", "Alice").build(),
)
.unwrap();
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let temporal_indexes = Arc::new(TemporalIndexes::new());
let temp_dir = TempDir::new().unwrap();
let wal_config = ConcurrentWalSystemConfig::new(temp_dir.path());
let wal = Arc::new(ConcurrentWalSystem::new(wal_config).unwrap());
let current_timestamp = Arc::new(Mutex::new(time::now()));
let node_id_gen = Arc::new(IdGenerator::new());
let edge_id_gen = Arc::new(IdGenerator::new());
let version_id_gen = Arc::new(IdGenerator::new());
let tx_id_gen = TxIdGenerator::new();
let visibility_manager = Arc::new(TxVisibilityManager::new());
let snapshot = TransactionSnapshot {
snapshot_timestamp: time::now(),
active_transactions: Arc::new(std::collections::HashSet::new()),
};
let mut tx = WriteTransaction::new(
tx_id_gen.next(),
snapshot,
current,
historical,
temporal_indexes,
wal,
current_timestamp,
visibility_manager,
node_id_gen,
edge_id_gen,
version_id_gen,
);
tx.delete_node(alice_id).unwrap();
let results =
tx.find_nodes_by_property("Person", "name", &PropertyValue::String("Alice".into()));
assert!(results.is_empty());
}
#[test]
fn test_buffered_update_with_matching_property() {
let current = Arc::new(CurrentStorage::new());
let node_id = current
.create_node(
"Person",
PropertyMapBuilder::new().insert("name", "OldName").build(),
)
.unwrap();
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let temporal_indexes = Arc::new(TemporalIndexes::new());
let temp_dir = TempDir::new().unwrap();
let wal_config = ConcurrentWalSystemConfig::new(temp_dir.path());
let wal = Arc::new(ConcurrentWalSystem::new(wal_config).unwrap());
let current_timestamp = Arc::new(Mutex::new(time::now()));
let node_id_gen = Arc::new(IdGenerator::new());
let edge_id_gen = Arc::new(IdGenerator::new());
let version_id_gen = Arc::new(IdGenerator::new());
let tx_id_gen = TxIdGenerator::new();
let visibility_manager = Arc::new(TxVisibilityManager::new());
let snapshot = TransactionSnapshot {
snapshot_timestamp: time::now(),
active_transactions: Arc::new(std::collections::HashSet::new()),
};
let mut tx = WriteTransaction::new(
tx_id_gen.next(),
snapshot,
current,
historical,
temporal_indexes,
wal,
current_timestamp,
visibility_manager,
node_id_gen,
edge_id_gen,
version_id_gen,
);
tx.update_node(
node_id,
PropertyMapBuilder::new().insert("name", "NewName").build(),
)
.unwrap();
let results =
tx.find_nodes_by_property("Person", "name", &PropertyValue::String("NewName".into()));
assert_eq!(results, vec![node_id]);
let results =
tx.find_nodes_by_property("Person", "name", &PropertyValue::String("OldName".into()));
assert!(results.is_empty());
}
}
mod lock_poisoning_tests {
use super::*;
use crate::core::id::TxIdGenerator;
use crate::core::property::PropertyMapBuilder;
use crate::storage::wal::concurrent_system::ConcurrentWalSystemConfig;
use std::sync::Barrier;
use std::thread;
use std::time::Instant;
use tempfile::TempDir;
struct TestHarness {
current: Arc<CurrentStorage>,
historical: Arc<RwLock<HistoricalStorage>>,
temporal_indexes: Arc<TemporalIndexes>,
wal: Arc<ConcurrentWalSystem>,
current_timestamp: Arc<Mutex<Timestamp>>,
visibility_manager: Arc<TxVisibilityManager>,
node_id_gen: Arc<IdGenerator>,
edge_id_gen: Arc<IdGenerator>,
version_id_gen: Arc<IdGenerator>,
tx_id_gen: TxIdGenerator,
_temp_dir: TempDir,
}
impl TestHarness {
fn new() -> Self {
let current = Arc::new(CurrentStorage::new());
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let temporal_indexes = Arc::new(TemporalIndexes::new());
let temp_dir = TempDir::new().unwrap();
let wal_config = ConcurrentWalSystemConfig::new(temp_dir.path());
let wal = Arc::new(ConcurrentWalSystem::new(wal_config).unwrap());
let current_timestamp = Arc::new(Mutex::new(time::now()));
let node_id_gen = Arc::new(IdGenerator::new());
let edge_id_gen = Arc::new(IdGenerator::new());
let version_id_gen = Arc::new(IdGenerator::new());
let tx_id_gen = TxIdGenerator::new();
let visibility_manager = Arc::new(TxVisibilityManager::new());
TestHarness {
current,
historical,
temporal_indexes,
wal,
current_timestamp,
visibility_manager,
node_id_gen,
edge_id_gen,
version_id_gen,
tx_id_gen,
_temp_dir: temp_dir,
}
}
fn create_tx_with_timestamp(
&self,
current_timestamp: Arc<Mutex<Timestamp>>,
) -> WriteTransaction {
let snapshot = TransactionSnapshot {
snapshot_timestamp: time::now(),
active_transactions: Arc::new(std::collections::HashSet::new()),
};
WriteTransaction::new(
self.tx_id_gen.next(),
snapshot,
self.current.clone(),
self.historical.clone(),
self.temporal_indexes.clone(),
self.wal.clone(),
current_timestamp,
self.visibility_manager.clone(),
self.node_id_gen.clone(),
self.edge_id_gen.clone(),
self.version_id_gen.clone(),
)
}
fn create_tx_with_clock(
&self,
current_timestamp: Arc<Mutex<Timestamp>>,
commit_clock_observed_at: Arc<Mutex<Instant>>,
) -> WriteTransaction {
let snapshot = TransactionSnapshot {
snapshot_timestamp: time::now(),
active_transactions: Arc::new(std::collections::HashSet::new()),
};
WriteTransaction::new_with_clock_observed_at(
self.tx_id_gen.next(),
snapshot,
self.current.clone(),
self.historical.clone(),
self.temporal_indexes.clone(),
self.wal.clone(),
current_timestamp,
commit_clock_observed_at,
self.visibility_manager.clone(),
self.node_id_gen.clone(),
self.edge_id_gen.clone(),
self.version_id_gen.clone(),
)
}
}
fn poison_mutex<T: Send + 'static>(mutex: &Arc<Mutex<T>>) {
let clone = mutex.clone();
let _ = thread::spawn(move || {
let _guard = clone.lock().unwrap();
panic!("intentional panic to poison the lock");
})
.join();
}
#[test]
fn test_timestamp_lock_poisoning_during_commit() {
let harness = TestHarness::new();
let poisoned_ts: Arc<Mutex<Timestamp>> = Arc::new(Mutex::new(time::now()));
poison_mutex(&poisoned_ts);
assert!(poisoned_ts.is_poisoned());
let mut tx = harness.create_tx_with_timestamp(poisoned_ts);
tx.create_node("Test", PropertyMapBuilder::new().insert("x", 1i64).build())
.unwrap();
let result = tx.commit();
assert!(
result.is_err(),
"commit should fail with poisoned timestamp lock"
);
match result.unwrap_err() {
crate::core::error::Error::Transaction(TransactionError::LockPoisoned { resource }) => {
assert!(
resource.contains("current_timestamp"),
"expected current_timestamp in resource, got: {resource}"
);
}
err => panic!("expected LockPoisoned error, got: {err:?}"),
}
}
#[test]
fn test_concurrent_commits_with_poisoned_lock() {
let poisoned_ts: Arc<Mutex<Timestamp>> = Arc::new(Mutex::new(time::now()));
poison_mutex(&poisoned_ts);
assert!(poisoned_ts.is_poisoned());
let num_threads = 4;
let harness = Arc::new(TestHarness::new());
let barrier = Arc::new(Barrier::new(num_threads));
let mut handles = Vec::new();
for _ in 0..num_threads {
let h = harness.clone();
let ts = poisoned_ts.clone();
let barrier = barrier.clone();
handles.push(thread::spawn(move || {
let mut tx = h.create_tx_with_timestamp(ts);
tx.create_node("Test", PropertyMapBuilder::new().insert("x", 1i64).build())
.unwrap();
barrier.wait();
tx.commit()
}));
}
for handle in handles {
let result = handle.join().expect("thread should not panic");
assert!(result.is_err(), "commit should fail");
match result.unwrap_err() {
crate::core::error::Error::Transaction(TransactionError::LockPoisoned {
resource,
}) => {
assert!(
resource.contains("current_timestamp"),
"expected current_timestamp in resource, got: {resource}"
);
}
err => panic!("expected LockPoisoned error, got: {err:?}"),
}
}
}
#[test]
fn test_commit_clock_observed_at_lock_poisoning() {
let harness = TestHarness::new();
let poisoned_clock: Arc<Mutex<Instant>> = Arc::new(Mutex::new(Instant::now()));
poison_mutex(&poisoned_clock);
assert!(poisoned_clock.is_poisoned());
let mut tx =
harness.create_tx_with_clock(harness.current_timestamp.clone(), poisoned_clock);
tx.create_node("Test", PropertyMapBuilder::new().insert("x", 1i64).build())
.unwrap();
let result = tx.commit();
assert!(
result.is_err(),
"commit should fail with poisoned clock lock"
);
match result.unwrap_err() {
crate::core::error::Error::Transaction(TransactionError::LockPoisoned { resource }) => {
assert!(
resource.contains("commit_clock_observed_at"),
"expected commit_clock_observed_at in resource, got: {resource}"
);
}
err => panic!("expected LockPoisoned error, got: {err:?}"),
}
}
}