use super::*;
use crate::core::interning::GLOBAL_INTERNER;
use crate::core::observer::{StorageEvent, StorageObserver};
use crate::core::property::PropertyMapBuilder;
use crate::core::temporal::TIMESTAMP_MAX;
#[test]
fn test_create_first_version() {
    // The very first version of a node must be stored as an anchor
    // (full snapshot) with no predecessor in the version chain.
    let mut store = HistoricalStorage::new();
    let nid = NodeId::new(1).unwrap();
    let vid = VersionId::new(100).unwrap();
    let person = GLOBAL_INTERNER.intern("Person").unwrap();
    let interval = BiTemporalInterval::current(1000.into());
    let properties = PropertyMapBuilder::new().insert("name", "Alice").build();
    store
        .add_node_version(
            nid,
            vid,
            interval.valid_time().start(),
            interval.transaction_time().start(),
            person,
            properties,
            false,
        )
        .unwrap();
    let stored = store.get_node_version(vid).unwrap();
    assert!(stored.is_anchor());
    assert_eq!(stored.node_id, nid);
    assert_eq!(stored.prev_version, None);
}
#[test]
fn test_version_chain() {
    // With anchor_interval = 3, versions at indices 0 and 3 are anchors
    // and the versions in between are stored as deltas.
    let mut store = HistoricalStorage::with_config(AnchorConfig {
        anchor_interval: 3,
        max_delta_chain: 10,
    });
    let nid = NodeId::new(1).unwrap();
    let person = GLOBAL_INTERNER.intern("Person").unwrap();
    let ids: Vec<VersionId> = (0..5)
        .map(|i| {
            let vid = VersionId::new(100 + i).unwrap();
            let interval = BiTemporalInterval::current((1000 + (i as i64) * 100).into());
            let properties = PropertyMapBuilder::new()
                .insert("name", "Alice")
                .insert("age", i as i64)
                .build();
            store
                .add_node_version(
                    nid,
                    vid,
                    interval.valid_time().start(),
                    interval.transaction_time().start(),
                    person,
                    properties,
                    false,
                )
                .unwrap();
            vid
        })
        .collect();
    // Expected storage kind per insertion index.
    let expect_anchor = [true, false, false, true, false];
    for (vid, is_anchor) in ids.iter().zip(expect_anchor) {
        let version = store.get_node_version(*vid).unwrap();
        if is_anchor {
            assert!(version.is_anchor());
        } else {
            assert!(version.is_delta());
        }
    }
}
#[test]
fn test_property_reconstruction() {
    // A delta version must reconstruct to its full property map,
    // including values changed since the anchor.
    let mut store = HistoricalStorage::new();
    let nid = NodeId::new(1).unwrap();
    let person = GLOBAL_INTERNER.intern("Person").unwrap();
    let first = VersionId::new(1).unwrap();
    let base = PropertyMapBuilder::new()
        .insert("name", "Alice")
        .insert("age", 30i64)
        .build();
    store
        .add_node_version(nid, first, 1000.into(), 1000.into(), person, base, false)
        .unwrap();
    let second = VersionId::new(2).unwrap();
    let updated = PropertyMapBuilder::new()
        .insert("name", "Alice")
        .insert("age", 31i64)
        .build();
    store
        .add_node_version(nid, second, 2000.into(), 2000.into(), person, updated, false)
        .unwrap();
    let rebuilt = store.reconstruct_node_properties(second).unwrap();
    assert_eq!(rebuilt.get("name").and_then(|v| v.as_str()), Some("Alice"));
    assert_eq!(rebuilt.get("age").and_then(|v| v.as_int()), Some(31.into()));
}
#[test]
fn test_find_version_at_time() {
    // A valid-time lookup must resolve to the version whose interval
    // covers the queried timestamp.
    let mut store = HistoricalStorage::new();
    let nid = NodeId::new(1).unwrap();
    let person = GLOBAL_INTERNER.intern("Person").unwrap();
    let v1 = VersionId::new(1).unwrap();
    let v2 = VersionId::new(2).unwrap();
    let v3 = VersionId::new(3).unwrap();
    // Three versions with valid-time starts at 0, 1000, 2000.
    for (vid, start, age) in [(v1, 0i64, 30i64), (v2, 1000, 31), (v3, 2000, 32)] {
        store
            .add_node_version(
                nid,
                vid,
                start.into(),
                0.into(),
                person,
                PropertyMapBuilder::new().insert("age", age).build(),
                false,
            )
            .unwrap();
    }
    // Each query timestamp falls inside exactly one version's interval.
    for (query, expected) in [(500i64, v1), (1500, v2), (2500, v3)] {
        assert_eq!(
            store.find_node_version_at_time(nid, query.into(), 100.into()),
            Some(expected)
        );
    }
}
#[test]
fn test_retention_policy_node_limit() {
    // With a per-node cap of 3 versions, the fourth insert must fail
    // with CapacityExceeded carrying the current count and limit.
    let mut store = HistoricalStorage::with_config_and_retention(
        AnchorConfig::default(),
        RetentionPolicy::new(3, i64::MAX),
    );
    let nid = NodeId::new(1).unwrap();
    let lbl = GLOBAL_INTERNER.intern("Test").unwrap();
    for i in 0..3u64 {
        let ts = 1000 + (i as i64) * 100;
        store
            .add_node_version(
                nid,
                VersionId::new(i).unwrap(),
                ts.into(),
                ts.into(),
                lbl,
                PropertyMapBuilder::new().build(),
                false,
            )
            .unwrap();
    }
    let err = store
        .add_node_version(
            nid,
            VersionId::new(3).unwrap(),
            1300.into(),
            1300.into(),
            lbl,
            PropertyMapBuilder::new().build(),
            false,
        )
        .expect_err("fourth version must exceed the retention limit");
    match err {
        crate::core::error::Error::Storage(StorageError::CapacityExceeded {
            resource,
            current,
            limit,
        }) => {
            assert!(resource.contains("node"));
            assert_eq!(current, 3);
            assert_eq!(limit, 3);
        }
        _ => panic!("Expected CapacityExceeded error"),
    }
}
#[test]
fn test_retention_policy_edge_limit() {
    // With a per-edge cap of 2 versions, the third insert must fail
    // with CapacityExceeded naming the edge resource.
    let mut store = HistoricalStorage::with_config_and_retention(
        AnchorConfig::default(),
        RetentionPolicy::new(2, i64::MAX),
    );
    let eid = EdgeId::new(1).unwrap();
    let from = NodeId::new(1).unwrap();
    let to = NodeId::new(2).unwrap();
    let knows = GLOBAL_INTERNER.intern("KNOWS").unwrap();
    for i in 0..2u64 {
        let ts = 1000 + (i as i64) * 100;
        store
            .add_edge_version(
                eid,
                VersionId::new(i).unwrap(),
                ts.into(),
                ts.into(),
                knows,
                from,
                to,
                PropertyMapBuilder::new().build(),
                false,
            )
            .unwrap();
    }
    let err = store
        .add_edge_version(
            eid,
            VersionId::new(2).unwrap(),
            1200.into(),
            1200.into(),
            knows,
            from,
            to,
            PropertyMapBuilder::new().build(),
            false,
        )
        .expect_err("third version must exceed the retention limit");
    match err {
        crate::core::error::Error::Storage(StorageError::CapacityExceeded {
            resource,
            current,
            limit,
        }) => {
            assert!(resource.contains("edge"));
            assert_eq!(current, 2);
            assert_eq!(limit, 2);
        }
        _ => panic!("Expected CapacityExceeded error"),
    }
}
#[test]
fn test_stats() {
    // Three versions at anchor_interval = 2 yield two anchors and one
    // delta for a single node; compression ratio is asserted ~= 2/3.
    let mut store = HistoricalStorage::with_config(AnchorConfig {
        anchor_interval: 2,
        max_delta_chain: 10,
    });
    let lbl = GLOBAL_INTERNER.intern("Test").unwrap();
    for i in 0..3u64 {
        let ts = 1000 + (i as i64) * 100;
        store
            .add_node_version(
                NodeId::new(1).unwrap(),
                VersionId::new(i).unwrap(),
                ts.into(),
                ts.into(),
                lbl,
                PropertyMapBuilder::new().build(),
                false,
            )
            .unwrap();
    }
    let stats = store.stats();
    assert_eq!(stats.total_node_versions, 3);
    assert_eq!(stats.node_anchor_count, 2);
    assert_eq!(stats.node_delta_count, 1);
    assert_eq!(stats.unique_nodes, 1);
    assert!((stats.compression_ratio() - 0.6666).abs() < 0.01);
}
#[test]
fn test_create_node_version_with_vector_property() {
    // A vector-valued property must survive storage and reconstruction
    // on an anchor version.
    let mut store = HistoricalStorage::new();
    let nid = NodeId::new(1).unwrap();
    let vid = VersionId::new(100).unwrap();
    let doc = GLOBAL_INTERNER.intern("Document").unwrap();
    let interval = BiTemporalInterval::current(1000.into());
    let vector = vec![0.1f32, 0.2, 0.3, 0.4, 0.5];
    let properties = PropertyMapBuilder::new()
        .insert("title", "Test Document")
        .insert_vector("embedding", &vector)
        .build();
    store
        .add_node_version(
            nid,
            vid,
            interval.valid_time().start(),
            interval.transaction_time().start(),
            doc,
            properties,
            false,
        )
        .unwrap();
    assert!(store.get_node_version(vid).unwrap().is_anchor());
    let rebuilt = store.reconstruct_node_properties(vid).unwrap();
    assert_eq!(
        rebuilt.get("embedding").and_then(|v| v.as_vector()),
        Some(&vector[..])
    );
}
#[test]
fn test_delta_computation_with_vector_change() {
    // Changing the vector between two versions still produces a delta,
    // and each version reconstructs to its own embedding.
    let mut store = HistoricalStorage::new();
    let nid = NodeId::new(1).unwrap();
    let doc = GLOBAL_INTERNER.intern("Document").unwrap();
    let first = VersionId::new(1).unwrap();
    let second = VersionId::new(2).unwrap();
    let old_vec = vec![0.1f32, 0.2, 0.3];
    let new_vec = vec![0.4f32, 0.5, 0.6];
    for (vid, ts, vector) in [(first, 1000i64, &old_vec), (second, 2000, &new_vec)] {
        store
            .add_node_version(
                nid,
                vid,
                ts.into(),
                ts.into(),
                doc,
                PropertyMapBuilder::new()
                    .insert("title", "Doc")
                    .insert_vector("embedding", vector)
                    .build(),
                false,
            )
            .unwrap();
    }
    assert!(store.get_node_version(second).unwrap().is_delta());
    for (vid, vector) in [(first, &old_vec), (second, &new_vec)] {
        let rebuilt = store.reconstruct_node_properties(vid).unwrap();
        assert_eq!(
            rebuilt.get("embedding").and_then(|v| v.as_vector()),
            Some(&vector[..])
        );
    }
}
#[test]
fn test_delta_only_vector_changes() {
    // When only the embedding differs between versions, the second
    // version is still a delta and both properties reconstruct.
    let mut store = HistoricalStorage::new();
    let nid = NodeId::new(1).unwrap();
    let doc = GLOBAL_INTERNER.intern("Document").unwrap();
    let first = VersionId::new(1).unwrap();
    let second = VersionId::new(2).unwrap();
    let old_vec = vec![0.1f32, 0.2];
    let new_vec = vec![0.9f32, 0.8];
    for (vid, ts, vector) in [(first, 1000i64, &old_vec), (second, 2000, &new_vec)] {
        store
            .add_node_version(
                nid,
                vid,
                ts.into(),
                ts.into(),
                doc,
                PropertyMapBuilder::new()
                    .insert("title", "Same Title")
                    .insert_vector("embedding", vector)
                    .build(),
                false,
            )
            .unwrap();
    }
    assert!(store.get_node_version(second).unwrap().is_delta());
    let rebuilt = store.reconstruct_node_properties(second).unwrap();
    assert_eq!(
        rebuilt.get("title").and_then(|v| v.as_str()),
        Some("Same Title")
    );
    assert_eq!(
        rebuilt.get("embedding").and_then(|v| v.as_vector()),
        Some(&new_vec[..])
    );
}
#[test]
fn test_vector_unchanged_between_versions() {
    // An identical embedding across two versions must survive delta
    // encoding while the changed title is tracked per version.
    let mut store = HistoricalStorage::new();
    let nid = NodeId::new(1).unwrap();
    let doc = GLOBAL_INTERNER.intern("Document").unwrap();
    let shared = vec![0.5f32, 0.5, 0.5];
    let first = VersionId::new(1).unwrap();
    let second = VersionId::new(2).unwrap();
    for (vid, ts, title) in [(first, 1000i64, "V1 Title"), (second, 2000, "V2 Title")] {
        store
            .add_node_version(
                nid,
                vid,
                ts.into(),
                ts.into(),
                doc,
                PropertyMapBuilder::new()
                    .insert("title", title)
                    .insert_vector("embedding", &shared)
                    .build(),
                false,
            )
            .unwrap();
    }
    for (vid, title) in [(first, "V1 Title"), (second, "V2 Title")] {
        let rebuilt = store.reconstruct_node_properties(vid).unwrap();
        assert_eq!(
            rebuilt.get("embedding").and_then(|v| v.as_vector()),
            Some(&shared[..])
        );
        assert_eq!(rebuilt.get("title").and_then(|v| v.as_str()), Some(title));
    }
}
#[test]
fn test_anchor_creation_with_vector() {
    // At anchor_interval = 2 the pattern is anchor/delta/anchor, and
    // every version reconstructs to the embedding it was written with.
    let mut store = HistoricalStorage::with_config(AnchorConfig {
        anchor_interval: 2,
        max_delta_chain: 10,
    });
    let nid = NodeId::new(1).unwrap();
    let doc = GLOBAL_INTERNER.intern("Document").unwrap();
    let vectors = [vec![0.1f32, 0.2], vec![0.3f32, 0.4], vec![0.5f32, 0.6]];
    for (i, vector) in vectors.iter().enumerate() {
        let ts = 1000 + (i as i64) * 100;
        store
            .add_node_version(
                nid,
                VersionId::new(i as u64).unwrap(),
                ts.into(),
                ts.into(),
                doc,
                PropertyMapBuilder::new()
                    .insert_vector("embedding", vector)
                    .build(),
                false,
            )
            .unwrap();
    }
    // Expected storage kind per version index.
    for (i, expect_anchor) in [true, false, true].iter().enumerate() {
        let version = store
            .get_node_version(VersionId::new(i as u64).unwrap())
            .unwrap();
        if *expect_anchor {
            assert!(version.is_anchor());
        } else {
            assert!(version.is_delta());
        }
    }
    for (i, vector) in vectors.iter().enumerate() {
        let rebuilt = store
            .reconstruct_node_properties(VersionId::new(i as u64).unwrap())
            .unwrap();
        assert_eq!(
            rebuilt.get("embedding").and_then(|v| v.as_vector()),
            Some(&vector[..])
        );
    }
}
#[test]
fn test_edge_version_with_vector() {
    // Edges support vector properties too: the first edge version is an
    // anchor and reconstructs both scalar and vector values.
    let mut store = HistoricalStorage::new();
    let eid = EdgeId::new(1).unwrap();
    let vid = VersionId::new(100).unwrap();
    let similar = GLOBAL_INTERNER.intern("SIMILAR_TO").unwrap();
    let interval = BiTemporalInterval::current(1000.into());
    let from = NodeId::new(10).unwrap();
    let to = NodeId::new(20).unwrap();
    let vector = vec![0.8f32, 0.1, 0.1];
    store
        .add_edge_version(
            eid,
            vid,
            interval.valid_time().start(),
            interval.transaction_time().start(),
            similar,
            from,
            to,
            PropertyMapBuilder::new()
                .insert("weight", 0.95f64)
                .insert_vector("embedding", &vector)
                .build(),
            false,
        )
        .unwrap();
    assert!(store.get_edge_version(vid).unwrap().is_anchor());
    let rebuilt = store.reconstruct_edge_properties(vid).unwrap();
    assert_eq!(
        rebuilt.get("embedding").and_then(|v| v.as_vector()),
        Some(&vector[..])
    );
    assert_eq!(rebuilt.get("weight").and_then(|v| v.as_float()), Some(0.95));
}
#[test]
fn test_edge_delta_with_vector_change() {
    // The second edge version becomes a delta and reconstructs to the
    // updated weight and embedding.
    let mut store = HistoricalStorage::new();
    let eid = EdgeId::new(1).unwrap();
    let similar = GLOBAL_INTERNER.intern("SIMILAR_TO").unwrap();
    let from = NodeId::new(10).unwrap();
    let to = NodeId::new(20).unwrap();
    let first = VersionId::new(1).unwrap();
    let second = VersionId::new(2).unwrap();
    let old_vec = vec![0.5f32, 0.5];
    let new_vec = vec![0.9f32, 0.1];
    for (vid, ts, weight, vector) in
        [(first, 1000i64, 0.5f64, &old_vec), (second, 2000, 0.9, &new_vec)]
    {
        store
            .add_edge_version(
                eid,
                vid,
                ts.into(),
                ts.into(),
                similar,
                from,
                to,
                PropertyMapBuilder::new()
                    .insert("weight", weight)
                    .insert_vector("embedding", vector)
                    .build(),
                false,
            )
            .unwrap();
    }
    assert!(store.get_edge_version(second).unwrap().is_delta());
    let rebuilt = store.reconstruct_edge_properties(second).unwrap();
    assert_eq!(
        rebuilt.get("embedding").and_then(|v| v.as_vector()),
        Some(&new_vec[..])
    );
    assert_eq!(rebuilt.get("weight").and_then(|v| v.as_float()), Some(0.9));
}
#[test]
fn test_high_dimensional_vector_versioning() {
    // A 1536-dimensional embedding must round-trip intact through
    // storage and reconstruction.
    const DIMENSIONS: usize = 1536;
    let mut store = HistoricalStorage::new();
    let nid = NodeId::new(1).unwrap();
    let lbl = GLOBAL_INTERNER.intern("Embedding").unwrap();
    let vector: Vec<f32> = (0..DIMENSIONS)
        .map(|i| (i as f32) / DIMENSIONS as f32)
        .collect();
    let vid = VersionId::new(1).unwrap();
    store
        .add_node_version(
            nid,
            vid,
            1000.into(),
            1000.into(),
            lbl,
            PropertyMapBuilder::new()
                .insert_vector("embedding", &vector)
                .build(),
            false,
        )
        .unwrap();
    let rebuilt = store.reconstruct_node_properties(vid).unwrap();
    let got = rebuilt
        .get("embedding")
        .and_then(|v| v.as_vector())
        .expect("Should have embedding");
    assert_eq!(got.len(), DIMENSIONS);
    assert_eq!(got, &vector[..]);
}
#[test]
fn test_version_time_travel_with_vectors() {
    // Each version carries a distinct embedding; valid-time queries must
    // land on the covering version and reconstruct its vector.
    let mut store = HistoricalStorage::new();
    let nid = NodeId::new(1).unwrap();
    let doc = GLOBAL_INTERNER.intern("Document").unwrap();
    // (valid-time start, end, embedding) per version; ends are unused by
    // the insert path but document the intended intervals.
    let timeline = [
        (0, 500, vec![0.1f32, 0.0]),
        (500, 1000, vec![0.2f32, 0.0]),
        (1000, TIMESTAMP_MAX.wallclock(), vec![0.3f32, 0.0]),
    ];
    for (i, (start, _end, vector)) in timeline.iter().enumerate() {
        store
            .add_node_version(
                nid,
                VersionId::new(i as u64).unwrap(),
                (*start).into(),
                0.into(),
                doc,
                PropertyMapBuilder::new()
                    .insert_vector("embedding", vector)
                    .build(),
                false,
            )
            .unwrap();
    }
    // Query timestamps chosen to fall inside each interval in turn.
    for (query, idx) in [(250i64, 0u64), (750, 1), (1500, 2)] {
        let found = store.find_node_version_at_time(nid, query.into(), 0.into());
        assert_eq!(found, Some(VersionId::new(idx).unwrap()));
        let rebuilt = store.reconstruct_node_properties(found.unwrap()).unwrap();
        assert_eq!(
            rebuilt.get("embedding").and_then(|v| v.as_vector()),
            Some(&timeline[idx as usize].2[..])
        );
    }
}
#[test]
fn test_empty_vector_versioning() {
    // A zero-length vector must round-trip through both the anchor and
    // the subsequent delta version.
    let mut store = HistoricalStorage::new();
    let nid = NodeId::new(1).unwrap();
    let lbl = GLOBAL_INTERNER.intern("EmptyEmbedding").unwrap();
    let nothing: Vec<f32> = Vec::new();
    let first = VersionId::new(1).unwrap();
    let second = VersionId::new(2).unwrap();
    for (vid, ts, name) in [(first, 1000i64, "empty"), (second, 2000, "updated")] {
        store
            .add_node_version(
                nid,
                vid,
                ts.into(),
                ts.into(),
                lbl,
                PropertyMapBuilder::new()
                    .insert("name", name)
                    .insert_vector("embedding", &nothing)
                    .build(),
                false,
            )
            .unwrap();
    }
    for vid in [first, second] {
        let rebuilt = store.reconstruct_node_properties(vid).unwrap();
        assert_eq!(
            rebuilt.get("embedding").and_then(|v| v.as_vector()),
            Some(&nothing[..])
        );
    }
}
#[test]
fn test_vector_with_special_float_values() {
    // Infinities and signed zeros must be preserved through storage.
    let mut store = HistoricalStorage::new();
    let nid = NodeId::new(1).unwrap();
    let lbl = GLOBAL_INTERNER.intern("SpecialFloats").unwrap();
    let oddballs = vec![f32::INFINITY, f32::NEG_INFINITY, 0.0, -0.0];
    let vid = VersionId::new(1).unwrap();
    store
        .add_node_version(
            nid,
            vid,
            1000.into(),
            1000.into(),
            lbl,
            PropertyMapBuilder::new()
                .insert_vector("embedding", &oddballs)
                .build(),
            false,
        )
        .unwrap();
    let rebuilt = store.reconstruct_node_properties(vid).unwrap();
    let got = rebuilt
        .get("embedding")
        .and_then(|v| v.as_vector())
        .expect("Should have embedding");
    assert!(got[0].is_infinite() && got[0].is_sign_positive());
    assert!(got[1].is_infinite() && got[1].is_sign_negative());
    assert_eq!(got[2], 0.0);
    assert_eq!(got[3], -0.0);
}
#[test]
fn test_nan_in_vector_delta_behavior() {
    // NaN elements (never equal to themselves) must not break delta
    // computation between two versions carrying the same vector.
    let mut store = HistoricalStorage::new();
    let nid = NodeId::new(1).unwrap();
    let lbl = GLOBAL_INTERNER.intern("NaNTest").unwrap();
    let with_nan = vec![f32::NAN, 1.0];
    let first = VersionId::new(1).unwrap();
    let second = VersionId::new(2).unwrap();
    for (vid, ts) in [(first, 1000i64), (second, 2000)] {
        store
            .add_node_version(
                nid,
                vid,
                ts.into(),
                ts.into(),
                lbl,
                PropertyMapBuilder::new()
                    .insert_vector("embedding", &with_nan)
                    .build(),
                false,
            )
            .unwrap();
    }
    for vid in [first, second] {
        let rebuilt = store.reconstruct_node_properties(vid).unwrap();
        let got = rebuilt
            .get("embedding")
            .and_then(|v| v.as_vector())
            .unwrap();
        assert!(got[0].is_nan());
        assert_eq!(got[1], 1.0);
    }
}
#[test]
fn test_cache_hit_on_second_read() {
    // Reconstructing the same version twice must not grow the cache
    // beyond the single entry created by the first read.
    let mut store = HistoricalStorage::new();
    let nid = NodeId::new(1).unwrap();
    let vid = VersionId::new(100).unwrap();
    let person = GLOBAL_INTERNER.intern("Person").unwrap();
    let ts = 1000.into();
    let properties = PropertyMapBuilder::new()
        .insert("name", "Alice")
        .insert("age", 30i64)
        .build();
    store
        .add_node_version(nid, vid, ts, ts, person, properties.clone(), false)
        .unwrap();
    for _ in 0..2 {
        let rebuilt = store.reconstruct_node_properties(vid).unwrap();
        assert_eq!(rebuilt.get("name").and_then(|v| v.as_str()), Some("Alice"));
        assert_eq!(store.stats().node_cache_entries, 1);
    }
}
#[test]
fn test_cache_populates_delta_chain() {
    // Reconstructing the tail of a four-version chain should warm the
    // cache for the intermediate versions as well (>= 4 entries).
    let mut store = HistoricalStorage::new();
    let nid = NodeId::new(1).unwrap();
    let person = GLOBAL_INTERNER.intern("Person").unwrap();
    // Versions 1..=4, each at timestamp i * 1000 with value = i.
    for i in 1..=4u64 {
        let ts = (i as i64) * 1000;
        store
            .add_node_version(
                nid,
                VersionId::new(i).unwrap(),
                ts.into(),
                ts.into(),
                person,
                PropertyMapBuilder::new().insert("value", i as i64).build(),
                false,
            )
            .unwrap();
    }
    let tail = VersionId::new(4).unwrap();
    let rebuilt = store.reconstruct_node_properties(tail).unwrap();
    assert_eq!(rebuilt.get("value").and_then(|v| v.as_int()), Some(4.into()));
    let stats = store.stats();
    assert!(stats.node_cache_entries >= 4);
}
#[test]
fn test_cache_with_custom_size() {
    // A storage built with an explicit cache capacity starts out with
    // empty node and edge caches.
    let store = HistoricalStorage::with_config_retention_and_cache_size(
        AnchorConfig::default(),
        RetentionPolicy::default(),
        100,
    );
    let stats = store.stats();
    assert_eq!(stats.node_cache_entries, 0);
    assert_eq!(stats.edge_cache_entries, 0);
}
#[test]
fn test_edge_cache_functionality() {
    // Edge reconstructions are cached like node reconstructions:
    // repeated reads keep reusing the single cache entry.
    let mut store = HistoricalStorage::new();
    let eid = EdgeId::new(1).unwrap();
    let from = NodeId::new(10).unwrap();
    let to = NodeId::new(20).unwrap();
    let vid = VersionId::new(100).unwrap();
    let knows = GLOBAL_INTERNER.intern("KNOWS").unwrap();
    let interval = BiTemporalInterval::current(1000.into());
    store
        .add_edge_version(
            eid,
            vid,
            interval.valid_time().start(),
            interval.transaction_time().start(),
            knows,
            from,
            to,
            PropertyMapBuilder::new().insert("since", 2020i64).build(),
            false,
        )
        .unwrap();
    for _ in 0..2 {
        let rebuilt = store.reconstruct_edge_properties(vid).unwrap();
        assert_eq!(
            rebuilt.get("since").and_then(|v| v.as_int()),
            Some(2020.into())
        );
        assert_eq!(store.stats().edge_cache_entries, 1);
    }
}
#[test]
fn test_cache_stats_accuracy() {
    // Five node reconstructions and three edge reconstructions must be
    // reflected exactly in the cache-entry counters.
    let mut store = HistoricalStorage::new();
    let nid = NodeId::new(1).unwrap();
    let eid = EdgeId::new(1).unwrap();
    let lbl = GLOBAL_INTERNER.intern("Test").unwrap();
    let ts = 1000.into();
    for i in 0..5u64 {
        let vid = VersionId::new(i).unwrap();
        store
            .add_node_version(
                nid,
                vid,
                ts,
                ts,
                lbl,
                PropertyMapBuilder::new().insert("value", i as i64).build(),
                false,
            )
            .unwrap();
        store.reconstruct_node_properties(vid).unwrap();
    }
    for i in 0..3u64 {
        let vid = VersionId::new(100 + i).unwrap();
        store
            .add_edge_version(
                eid,
                vid,
                ts,
                ts,
                lbl,
                nid,
                nid,
                PropertyMapBuilder::new().insert("value", i as i64).build(),
                false,
            )
            .unwrap();
        store.reconstruct_edge_properties(vid).unwrap();
    }
    let stats = store.stats();
    assert_eq!(stats.node_cache_entries, 5);
    assert_eq!(stats.edge_cache_entries, 3);
}
#[test]
fn test_cache_with_large_properties() {
    // Large payloads (1536-dim vector plus a 10 KB string) must still
    // cache as a single entry and stay consistent across reads.
    let mut store = HistoricalStorage::new();
    let nid = NodeId::new(1).unwrap();
    let vid = VersionId::new(1).unwrap();
    let doc = GLOBAL_INTERNER.intern("Document").unwrap();
    let big_vector: Vec<f32> = (0..1536).map(|i| i as f32 / 1536.0).collect();
    let body = "x".repeat(10000);
    let properties = PropertyMapBuilder::new()
        .insert("title", "Large Document")
        .insert_vector("embedding", &big_vector)
        .insert("content", body.as_str())
        .build();
    store
        .add_node_version(nid, vid, 1000.into(), 1000.into(), doc, properties, false)
        .unwrap();
    let first_read = store.reconstruct_node_properties(vid).unwrap();
    assert_eq!(
        first_read
            .get("embedding")
            .and_then(|v| v.as_vector())
            .map(|v| v.len()),
        Some(1536)
    );
    let second_read = store.reconstruct_node_properties(vid).unwrap();
    assert_eq!(first_read.get("title"), second_read.get("title"));
    assert_eq!(store.stats().node_cache_entries, 1);
}
#[test]
fn test_extract_node_version_data() {
    // extract_node_version_data returns the ids, label, and raw anchor
    // payload for an anchor version.
    let mut store = HistoricalStorage::new();
    let nid = NodeId::new(1).unwrap();
    let vid = VersionId::new(100).unwrap();
    let person = GLOBAL_INTERNER.intern("Person").unwrap();
    let interval = BiTemporalInterval::current(1000.into());
    store
        .add_node_version(
            nid,
            vid,
            interval.valid_time().start(),
            interval.transaction_time().start(),
            person,
            PropertyMapBuilder::new().insert("name", "Bob").build(),
            false,
        )
        .unwrap();
    let (got_vid, got_nid, got_lbl, data) = store.extract_node_version_data(vid).unwrap();
    assert_eq!(got_vid, vid);
    assert_eq!(got_nid, nid);
    assert_eq!(got_lbl, person);
    match data {
        VersionData::Anchor { properties, .. } => {
            assert_eq!(properties.get("name").and_then(|v| v.as_str()), Some("Bob"));
        }
        _ => panic!("Expected anchor"),
    }
}
#[test]
fn test_extract_edge_version_data() {
    // extract_edge_version_data returns the ids, label, endpoints, and
    // raw anchor payload for an anchor edge version.
    let mut store = HistoricalStorage::new();
    let eid = EdgeId::new(1).unwrap();
    let from = NodeId::new(10).unwrap();
    let to = NodeId::new(20).unwrap();
    let vid = VersionId::new(100).unwrap();
    let knows = GLOBAL_INTERNER.intern("KNOWS").unwrap();
    let interval = BiTemporalInterval::current(1000.into());
    store
        .add_edge_version(
            eid,
            vid,
            interval.valid_time().start(),
            interval.transaction_time().start(),
            knows,
            from,
            to,
            PropertyMapBuilder::new().insert("since", 2021i64).build(),
            false,
        )
        .unwrap();
    let (got_vid, got_eid, got_lbl, got_src, got_tgt, data) =
        store.extract_edge_version_data(vid).unwrap();
    assert_eq!(got_vid, vid);
    assert_eq!(got_eid, eid);
    assert_eq!(got_lbl, knows);
    assert_eq!(got_src, from);
    assert_eq!(got_tgt, to);
    match data {
        VersionData::Anchor { properties, .. } => {
            assert_eq!(
                properties.get("since").and_then(|v| v.as_int()),
                Some(2021.into())
            );
        }
        _ => panic!("Expected anchor"),
    }
}
use std::sync::Mutex as StdMutex;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Test observer that tallies anchor-creation and version-creation events
/// in separate atomic counters, so it can be shared across calls via `Arc`.
struct CountingObserver {
    // Incremented for every Node/EdgeAnchorCreated event.
    anchor_count: AtomicUsize,
    // Incremented for every Node/EdgeVersionCreated event.
    version_count: AtomicUsize,
}
impl StorageObserver for CountingObserver {
    /// Route the event to the matching counter and bump it.
    fn on_event(&self, event: &StorageEvent) -> Result<()> {
        let counter = match event {
            StorageEvent::NodeAnchorCreated { .. } | StorageEvent::EdgeAnchorCreated { .. } => {
                &self.anchor_count
            }
            StorageEvent::NodeVersionCreated { .. } | StorageEvent::EdgeVersionCreated { .. } => {
                &self.version_count
            }
        };
        counter.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}
/// Test observer that (via `interested_in`) subscribes only to
/// `NodeAnchorCreated` events and counts how many it was delivered.
struct NodeAnchorObserver {
    // Number of events actually delivered to this observer.
    count: AtomicUsize,
}
impl StorageObserver for NodeAnchorObserver {
    /// Count every delivered event; filtering happens in `interested_in`.
    fn on_event(&self, _ev: &StorageEvent) -> Result<()> {
        self.count.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }

    /// Declare interest in node-anchor creation events only.
    fn interested_in(&self, event: &StorageEvent) -> bool {
        matches!(event, StorageEvent::NodeAnchorCreated { .. })
    }
}
/// Test observer that records a clone of every event it receives so tests
/// can assert on exact event payloads afterwards.
struct CollectingObserver {
    // All events seen so far, in delivery order.
    events: StdMutex<Vec<StorageEvent>>,
}
impl StorageObserver for CollectingObserver {
    /// Append a clone of the event to the in-memory log.
    fn on_event(&self, event: &StorageEvent) -> Result<()> {
        let mut log = self.events.lock().unwrap();
        log.push(event.clone());
        Ok(())
    }
}
#[test]
fn test_observer_triggered_on_node_anchor_creation() {
    // Five versions at anchor_interval = 3 produce anchors at indices
    // 0 and 3: two anchor events, five version events in total.
    let mut store = HistoricalStorage::with_config(AnchorConfig {
        anchor_interval: 3,
        max_delta_chain: 10,
    });
    let counter = Arc::new(CountingObserver {
        anchor_count: AtomicUsize::new(0),
        version_count: AtomicUsize::new(0),
    });
    store.add_observer(counter.clone());
    let nid = NodeId::new(1).unwrap();
    let lbl = GLOBAL_INTERNER.intern("Test").unwrap();
    for i in 0..5u64 {
        let ts = 1000 + (i as i64) * 100;
        store
            .add_node_version(
                nid,
                VersionId::new(i).unwrap(),
                ts.into(),
                ts.into(),
                lbl,
                PropertyMapBuilder::new().insert("value", i as i64).build(),
                false,
            )
            .unwrap();
    }
    assert_eq!(counter.anchor_count.load(Ordering::SeqCst), 2);
    assert_eq!(counter.version_count.load(Ordering::SeqCst), 5);
}
#[test]
fn test_observer_triggered_on_edge_anchor_creation() {
    // Three edge versions at anchor_interval = 2 create anchors at
    // indices 0 and 2, so the observer counts two anchor events.
    let mut store = HistoricalStorage::with_config(AnchorConfig {
        anchor_interval: 2,
        max_delta_chain: 10,
    });
    let counter = Arc::new(CountingObserver {
        anchor_count: AtomicUsize::new(0),
        version_count: AtomicUsize::new(0),
    });
    store.add_observer(counter.clone());
    let eid = EdgeId::new(1).unwrap();
    let from = NodeId::new(10).unwrap();
    let to = NodeId::new(20).unwrap();
    let knows = GLOBAL_INTERNER.intern("KNOWS").unwrap();
    for i in 0..3u64 {
        let ts = 1000 + (i as i64) * 100;
        store
            .add_edge_version(
                eid,
                VersionId::new(i).unwrap(),
                ts.into(),
                ts.into(),
                knows,
                from,
                to,
                PropertyMapBuilder::new().build(),
                false,
            )
            .unwrap();
    }
    assert_eq!(counter.anchor_count.load(Ordering::SeqCst), 2);
}
#[test]
fn test_observer_filtering() {
    // NodeAnchorObserver declares interest only in NodeAnchorCreated,
    // so the edge write below must not be counted.
    let mut store = HistoricalStorage::new();
    let filtered = Arc::new(NodeAnchorObserver {
        count: AtomicUsize::new(0),
    });
    store.add_observer(filtered.clone());
    let nid = NodeId::new(1).unwrap();
    let eid = EdgeId::new(1).unwrap();
    let lbl = GLOBAL_INTERNER.intern("Test").unwrap();
    store
        .add_node_version(
            nid,
            VersionId::new(1).unwrap(),
            1000.into(),
            1000.into(),
            lbl,
            PropertyMapBuilder::new().build(),
            false,
        )
        .unwrap();
    store
        .add_edge_version(
            eid,
            VersionId::new(2).unwrap(),
            2000.into(),
            2000.into(),
            lbl,
            nid,
            nid,
            PropertyMapBuilder::new().build(),
            false,
        )
        .unwrap();
    assert_eq!(filtered.count.load(Ordering::SeqCst), 1);
}
#[test]
fn test_observer_receives_correct_event_data() {
    // The first version of a node emits both an anchor event and a
    // version event; the anchor event must carry the exact ids and time.
    let mut store = HistoricalStorage::new();
    let log = Arc::new(CollectingObserver {
        events: StdMutex::new(Vec::new()),
    });
    store.add_observer(log.clone());
    let nid = NodeId::new(42).unwrap();
    let vid = VersionId::new(100).unwrap();
    let lbl = GLOBAL_INTERNER.intern("Test").unwrap();
    let when = 5000i64;
    store
        .add_node_version(
            nid,
            vid,
            when.into(),
            when.into(),
            lbl,
            PropertyMapBuilder::new().build(),
            false,
        )
        .unwrap();
    let seen = log.events.lock().unwrap();
    assert_eq!(seen.len(), 2);
    let anchor_event = seen
        .iter()
        .find(|e| matches!(e, StorageEvent::NodeAnchorCreated { .. }))
        .expect("Should have NodeAnchorCreated event");
    match anchor_event {
        StorageEvent::NodeAnchorCreated {
            version_id,
            node_id,
            timestamp,
        } => {
            assert_eq!(*version_id, vid);
            assert_eq!(*node_id, nid);
            assert_eq!(*timestamp, when.into());
        }
        _ => panic!("Wrong event type"),
    }
}
#[test]
fn test_multiple_observers() {
    // Every registered observer receives each event independently.
    let mut store = HistoricalStorage::new();
    let observers: Vec<Arc<CountingObserver>> = (0..2)
        .map(|_| {
            Arc::new(CountingObserver {
                anchor_count: AtomicUsize::new(0),
                version_count: AtomicUsize::new(0),
            })
        })
        .collect();
    for obs in &observers {
        store.add_observer(obs.clone());
    }
    let nid = NodeId::new(1).unwrap();
    let lbl = GLOBAL_INTERNER.intern("Test").unwrap();
    store
        .add_node_version(
            nid,
            VersionId::new(1).unwrap(),
            1000.into(),
            1000.into(),
            lbl,
            PropertyMapBuilder::new().build(),
            false,
        )
        .unwrap();
    for obs in &observers {
        assert_eq!(obs.anchor_count.load(Ordering::SeqCst), 1);
    }
}
#[test]
fn test_observer_error_doesnt_block_storage() {
    // An observer returning Err must not abort the write path: the
    // version is stored and retrievable regardless.
    struct FailingObserver;
    impl StorageObserver for FailingObserver {
        fn on_event(&self, _ev: &StorageEvent) -> Result<()> {
            Err(crate::core::error::Error::Storage(
                StorageError::InconsistentState {
                    reason: "Test error".to_string(),
                },
            ))
        }
    }
    let mut store = HistoricalStorage::new();
    store.add_observer(Arc::new(FailingObserver));
    let nid = NodeId::new(1).unwrap();
    let lbl = GLOBAL_INTERNER.intern("Test").unwrap();
    let outcome = store.add_node_version(
        nid,
        VersionId::new(1).unwrap(),
        1000.into(),
        1000.into(),
        lbl,
        PropertyMapBuilder::new().build(),
        false,
    );
    assert!(outcome.is_ok());
    assert!(
        store
            .get_node_version(VersionId::new(1).unwrap())
            .is_some()
    );
}
#[test]
fn test_pre_anchor_hook_called_before_storage() {
    // `Ordering` is already imported at module scope; only AtomicBool
    // is needed locally.
    use std::sync::atomic::AtomicBool;
    // The registered pre-anchor hook must fire when the first (anchor)
    // version of a node is written.
    let mut store = HistoricalStorage::new();
    let fired = Arc::new(AtomicBool::new(false));
    let fired_in_hook = Arc::clone(&fired);
    let hook: PreAnchorHook =
        Arc::new(move |_entity_type, _entity_id, _timestamp, _properties| {
            fired_in_hook.store(true, Ordering::SeqCst);
            Ok(Some(42))
        });
    store.register_pre_node_anchor_hook(hook);
    let nid = NodeId::new(1).unwrap();
    let lbl = GLOBAL_INTERNER.intern("Test").unwrap();
    store
        .add_node_version(
            nid,
            VersionId::new(1).unwrap(),
            1000.into(),
            1000.into(),
            lbl,
            PropertyMapBuilder::new().build(),
            false,
        )
        .unwrap();
    assert!(fired.load(Ordering::SeqCst));
}
/// The snapshot id returned by the pre-anchor hook must be attached to the
/// stored anchor version.
#[test]
fn test_pre_anchor_hook_returns_snapshot_id() {
    let mut storage = HistoricalStorage::new();
    let hook: PreAnchorHook =
        Arc::new(move |_entity_type, _entity_id, _timestamp, _properties| Ok(Some(123)));
    storage.register_pre_node_anchor_hook(hook);
    let label = GLOBAL_INTERNER.intern("Test").unwrap();
    let version_id = VersionId::new(1).unwrap();
    storage
        .add_node_version(
            NodeId::new(1).unwrap(),
            version_id,
            1000.into(),
            1000.into(),
            label,
            PropertyMapBuilder::new().build(),
            false,
        )
        .unwrap();
    let stored = storage.get_node_version(version_id).unwrap();
    assert!(stored.is_anchor());
    // The anchor carries the hook-provided snapshot id.
    assert_eq!(stored.data.get_vector_snapshot_id(), Some(123));
}
/// A hook that returns Ok(None) must leave the anchor without a snapshot id.
#[test]
fn test_pre_anchor_hook_none_handling() {
    let mut storage = HistoricalStorage::new();
    let hook: PreAnchorHook =
        Arc::new(move |_entity_type, _entity_id, _timestamp, _properties| Ok(None));
    storage.register_pre_node_anchor_hook(hook);
    let label = GLOBAL_INTERNER.intern("Test").unwrap();
    let version_id = VersionId::new(1).unwrap();
    storage
        .add_node_version(
            NodeId::new(1).unwrap(),
            version_id,
            1000.into(),
            1000.into(),
            label,
            PropertyMapBuilder::new().build(),
            false,
        )
        .unwrap();
    let stored = storage.get_node_version(version_id).unwrap();
    assert!(stored.is_anchor());
    // Ok(None) from the hook means "no snapshot" — nothing is attached.
    assert_eq!(stored.data.get_vector_snapshot_id(), None);
}
/// A hook error must degrade gracefully: the anchor is still written, just
/// without a snapshot id.
#[test]
fn test_pre_anchor_hook_error_graceful_degradation() {
    let mut storage = HistoricalStorage::new();
    let hook: PreAnchorHook =
        Arc::new(move |_entity_type, _entity_id, _timestamp, _properties| {
            Err(crate::core::error::Error::Storage(
                StorageError::InconsistentState {
                    reason: "Test hook error".to_string(),
                },
            ))
        });
    storage.register_pre_node_anchor_hook(hook);
    let label = GLOBAL_INTERNER.intern("Test").unwrap();
    let version_id = VersionId::new(1).unwrap();
    let outcome = storage.add_node_version(
        NodeId::new(1).unwrap(),
        version_id,
        1000.into(),
        1000.into(),
        label,
        PropertyMapBuilder::new().build(),
        false,
    );
    // The write itself must not fail because of the hook.
    assert!(outcome.is_ok());
    let stored = storage.get_node_version(version_id).unwrap();
    assert!(stored.is_anchor());
    assert_eq!(stored.data.get_vector_snapshot_id(), None);
}
/// Hooks fire only for anchors: with anchor_interval = 3 and five writes,
/// only versions 0 and 3 are anchors, so the hook runs exactly twice.
#[test]
fn test_pre_anchor_hook_not_called_for_delta() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    let mut storage = HistoricalStorage::with_config(AnchorConfig {
        anchor_interval: 3,
        max_delta_chain: 10,
    });
    let calls = Arc::new(AtomicUsize::new(0));
    let calls_in_hook = Arc::clone(&calls);
    let hook: PreAnchorHook =
        Arc::new(move |_entity_type, _entity_id, _timestamp, _properties| {
            calls_in_hook.fetch_add(1, Ordering::SeqCst);
            Ok(Some(42))
        });
    storage.register_pre_node_anchor_hook(hook);
    let node_id = NodeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("Test").unwrap();
    for i in 0..5 {
        storage
            .add_node_version(
                node_id,
                VersionId::new(100 + i).unwrap(),
                (1000 + (i as i64) * 100).into(),
                (1000 + (i as i64) * 100).into(),
                label,
                PropertyMapBuilder::new().build(),
                false,
            )
            .unwrap();
    }
    assert_eq!(calls.load(Ordering::SeqCst), 2);
}
/// Node and edge pre-anchor hooks are independent registrations: each fires
/// only for its own entity kind and supplies its own snapshot id.
#[test]
fn test_pre_anchor_hook_node_and_edge_separate() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    let mut storage = HistoricalStorage::new();
    let node_calls = Arc::new(AtomicUsize::new(0));
    let node_calls_in_hook = Arc::clone(&node_calls);
    let node_hook: PreAnchorHook =
        Arc::new(move |_entity_type, _entity_id, _timestamp, _properties| {
            node_calls_in_hook.fetch_add(1, Ordering::SeqCst);
            Ok(Some(1))
        });
    let edge_calls = Arc::new(AtomicUsize::new(0));
    let edge_calls_in_hook = Arc::clone(&edge_calls);
    let edge_hook: PreAnchorHook =
        Arc::new(move |_entity_type, _entity_id, _timestamp, _properties| {
            edge_calls_in_hook.fetch_add(1, Ordering::SeqCst);
            Ok(Some(2))
        });
    storage.register_pre_node_anchor_hook(node_hook);
    storage.register_pre_edge_anchor_hook(edge_hook);
    let node1_id = NodeId::new(1).unwrap();
    let node2_id = NodeId::new(2).unwrap();
    let edge_id = EdgeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("Test").unwrap();
    storage
        .add_node_version(
            node1_id,
            VersionId::new(1).unwrap(),
            1000.into(),
            1000.into(),
            label,
            PropertyMapBuilder::new().build(),
            false,
        )
        .unwrap();
    storage
        .add_edge_version(
            edge_id,
            VersionId::new(2).unwrap(),
            2000.into(),
            2000.into(),
            label,
            node1_id,
            node2_id,
            PropertyMapBuilder::new().build(),
            false,
        )
        .unwrap();
    // One anchor write per entity kind → one call per hook.
    assert_eq!(node_calls.load(Ordering::SeqCst), 1);
    assert_eq!(edge_calls.load(Ordering::SeqCst), 1);
    // Each anchor carries the snapshot id from its own hook.
    let node_version = storage
        .get_node_version(VersionId::new(1).unwrap())
        .unwrap();
    let edge_version = storage
        .get_edge_version(VersionId::new(2).unwrap())
        .unwrap();
    assert_eq!(node_version.data.get_vector_snapshot_id(), Some(1));
    assert_eq!(edge_version.data.get_vector_snapshot_id(), Some(2));
}
use super::{MAX_RECONSTRUCTION_DEPTH, RetentionPolicy};
/// A node delta chain one longer than MAX_RECONSTRUCTION_DEPTH must fail to
/// reconstruct with TemporalError::MaxDepthExceeded.
#[test]
fn test_reconstruction_depth_limit_exceeded_for_nodes() {
    // Anchor/retention limits are set high enough that nothing interferes
    // with building one long delta chain.
    let mut storage = HistoricalStorage::with_config_retention_and_cache_size(
        AnchorConfig {
            anchor_interval: (MAX_RECONSTRUCTION_DEPTH * 2) as u32,
            max_delta_chain: (MAX_RECONSTRUCTION_DEPTH * 2) as u32,
        },
        RetentionPolicy {
            max_versions_per_entity: MAX_RECONSTRUCTION_DEPTH * 2,
            max_age_ms: i64::MAX,
        },
        0,
    );
    let node_id = NodeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("Test").unwrap();
    // Anchor at version 0 followed by MAX_RECONSTRUCTION_DEPTH deltas.
    for i in 0..=MAX_RECONSTRUCTION_DEPTH {
        storage
            .add_node_version(
                node_id,
                VersionId::new(i as u64).unwrap(),
                (i as i64 * 1000).into(),
                (i as i64 * 1000).into(),
                label,
                PropertyMapBuilder::new()
                    .insert("counter", i as i64)
                    .build(),
                false,
            )
            .unwrap();
    }
    let deepest = VersionId::new(MAX_RECONSTRUCTION_DEPTH as u64).unwrap();
    let result = storage.reconstruct_node_properties(deepest);
    assert!(result.is_err(), "Expected MaxDepthExceeded error");
    match result.unwrap_err() {
        crate::core::error::Error::Temporal(
            crate::core::error::TemporalError::MaxDepthExceeded { max_depth, .. },
        ) => assert_eq!(max_depth, MAX_RECONSTRUCTION_DEPTH),
        other => panic!("Expected MaxDepthExceeded error, got: {:?}", other),
    }
}
/// A node delta chain exactly at MAX_RECONSTRUCTION_DEPTH - 1 deltas must
/// reconstruct successfully.
#[test]
fn test_reconstruction_within_depth_limit_works_for_nodes() {
    let mut storage = HistoricalStorage::with_config_retention_and_cache_size(
        AnchorConfig {
            anchor_interval: 200,
            max_delta_chain: 200,
        },
        RetentionPolicy::default(),
        0,
    );
    let node_id = NodeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("Test").unwrap();
    // Anchor at version 0 plus MAX_RECONSTRUCTION_DEPTH - 1 deltas: right
    // at the limit.
    for i in 0..MAX_RECONSTRUCTION_DEPTH {
        storage
            .add_node_version(
                node_id,
                VersionId::new(i as u64).unwrap(),
                (i as i64 * 1000).into(),
                (i as i64 * 1000).into(),
                label,
                PropertyMapBuilder::new()
                    .insert("counter", i as i64)
                    .build(),
                false,
            )
            .unwrap();
    }
    let deepest = VersionId::new((MAX_RECONSTRUCTION_DEPTH - 1) as u64).unwrap();
    let result = storage.reconstruct_node_properties(deepest);
    assert!(
        result.is_ok(),
        "Expected successful reconstruction within depth limit"
    );
    assert_eq!(
        result.unwrap().get("counter").and_then(|v| v.as_int()),
        Some((MAX_RECONSTRUCTION_DEPTH - 1) as i64)
    );
}
/// An edge delta chain one longer than MAX_RECONSTRUCTION_DEPTH must fail to
/// reconstruct with TemporalError::MaxDepthExceeded.
#[test]
fn test_reconstruction_depth_limit_exceeded_for_edges() {
    let mut storage = HistoricalStorage::with_config_retention_and_cache_size(
        AnchorConfig {
            anchor_interval: (MAX_RECONSTRUCTION_DEPTH * 2) as u32,
            max_delta_chain: (MAX_RECONSTRUCTION_DEPTH * 2) as u32,
        },
        RetentionPolicy {
            max_versions_per_entity: MAX_RECONSTRUCTION_DEPTH * 2,
            max_age_ms: i64::MAX,
        },
        0,
    );
    let edge_id = EdgeId::new(1).unwrap();
    let source = NodeId::new(100).unwrap();
    let target = NodeId::new(200).unwrap();
    let label = GLOBAL_INTERNER.intern("TestEdge").unwrap();
    // Anchor at version 0 followed by MAX_RECONSTRUCTION_DEPTH deltas.
    for i in 0..=MAX_RECONSTRUCTION_DEPTH {
        storage
            .add_edge_version(
                edge_id,
                VersionId::new(i as u64).unwrap(),
                (i as i64 * 1000).into(),
                (i as i64 * 1000).into(),
                label,
                source,
                target,
                PropertyMapBuilder::new().insert("weight", i as f64).build(),
                false,
            )
            .unwrap();
    }
    let deepest = VersionId::new(MAX_RECONSTRUCTION_DEPTH as u64).unwrap();
    let result = storage.reconstruct_edge_properties(deepest);
    assert!(result.is_err(), "Expected MaxDepthExceeded error");
    match result.unwrap_err() {
        crate::core::error::Error::Temporal(
            crate::core::error::TemporalError::MaxDepthExceeded { max_depth, .. },
        ) => assert_eq!(max_depth, MAX_RECONSTRUCTION_DEPTH),
        other => panic!("Expected MaxDepthExceeded error, got: {:?}", other),
    }
}
/// An edge delta chain exactly at MAX_RECONSTRUCTION_DEPTH - 1 deltas must
/// reconstruct successfully.
#[test]
fn test_reconstruction_within_depth_limit_works_for_edges() {
    let mut storage = HistoricalStorage::with_config_retention_and_cache_size(
        AnchorConfig {
            anchor_interval: 200,
            max_delta_chain: 200,
        },
        RetentionPolicy::default(),
        0,
    );
    let edge_id = EdgeId::new(1).unwrap();
    let source = NodeId::new(100).unwrap();
    let target = NodeId::new(200).unwrap();
    let label = GLOBAL_INTERNER.intern("TestEdge").unwrap();
    // Anchor at version 0 plus MAX_RECONSTRUCTION_DEPTH - 1 deltas: right
    // at the limit.
    for i in 0..MAX_RECONSTRUCTION_DEPTH {
        storage
            .add_edge_version(
                edge_id,
                VersionId::new(i as u64).unwrap(),
                (i as i64 * 1000).into(),
                (i as i64 * 1000).into(),
                label,
                source,
                target,
                PropertyMapBuilder::new().insert("weight", i as f64).build(),
                false,
            )
            .unwrap();
    }
    let deepest = VersionId::new((MAX_RECONSTRUCTION_DEPTH - 1) as u64).unwrap();
    let result = storage.reconstruct_edge_properties(deepest);
    assert!(
        result.is_ok(),
        "Expected successful reconstruction within depth limit"
    );
    assert_eq!(
        result.unwrap().get("weight").and_then(|v| v.as_float()),
        Some((MAX_RECONSTRUCTION_DEPTH - 1) as f64)
    );
}
/// Writing a node anchor must populate the property cache right away, and a
/// subsequent cache hit must not add a second entry.
#[test]
fn test_anchor_properties_are_cached_immediately_for_nodes() {
    let mut storage = HistoricalStorage::new();
    let node_id = NodeId::new(1).unwrap();
    let version_id = VersionId::new(100).unwrap();
    let label = GLOBAL_INTERNER.intern("Person").unwrap();
    let timestamp = 1000.into();
    let props = PropertyMapBuilder::new()
        .insert("name", "Alice")
        .insert("age", 30i64)
        .build();
    storage
        .add_node_version(
            node_id,
            version_id,
            timestamp,
            timestamp,
            label,
            props.clone(),
            false,
        )
        .unwrap();
    assert!(
        storage.get_node_version(version_id).unwrap().is_anchor(),
        "First version should be an anchor"
    );
    assert_eq!(
        storage.stats().node_cache_entries,
        1,
        "Anchor properties should be cached immediately"
    );
    // A cache hit returns the exact anchor properties without growing the cache.
    let reconstructed = storage.reconstruct_node_properties(version_id).unwrap();
    assert_eq!(reconstructed, props);
    assert_eq!(
        storage.stats().node_cache_entries,
        1,
        "Cache should still have 1 entry after hit"
    );
}
/// Writing an edge anchor must populate the property cache right away, and a
/// subsequent cache hit must not add a second entry.
#[test]
fn test_anchor_properties_are_cached_immediately_for_edges() {
    let mut storage = HistoricalStorage::new();
    let edge_id = EdgeId::new(1).unwrap();
    let source = NodeId::new(100).unwrap();
    let target = NodeId::new(200).unwrap();
    let version_id = VersionId::new(100).unwrap();
    let label = GLOBAL_INTERNER.intern("KNOWS").unwrap();
    let timestamp = 1000.into();
    let props = PropertyMapBuilder::new()
        .insert("since", 2020i64)
        .insert("weight", 0.8f64)
        .build();
    storage
        .add_edge_version(
            edge_id,
            version_id,
            timestamp,
            timestamp,
            label,
            source,
            target,
            props.clone(),
            false,
        )
        .unwrap();
    assert!(
        storage.get_edge_version(version_id).unwrap().is_anchor(),
        "First version should be an anchor"
    );
    assert_eq!(
        storage.stats().edge_cache_entries,
        1,
        "Anchor properties should be cached immediately"
    );
    // A cache hit returns the exact anchor properties without growing the cache.
    let reconstructed = storage.reconstruct_edge_properties(version_id).unwrap();
    assert_eq!(reconstructed, props);
    assert_eq!(
        storage.stats().edge_cache_entries,
        1,
        "Cache should still have 1 entry after hit"
    );
}
/// Later anchors (not just the first) must land in the property cache:
/// anchor_interval = 5 over 11 writes gives anchors at versions 0, 5 and 10.
#[test]
fn test_subsequent_anchors_are_also_cached() {
    let mut storage = HistoricalStorage::with_config(AnchorConfig {
        anchor_interval: 5,
        max_delta_chain: 5,
    });
    let node_id = NodeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("Person").unwrap();
    for i in 0..11 {
        let temporal = BiTemporalInterval::current((i as i64 * 1000).into());
        storage
            .add_node_version(
                node_id,
                VersionId::new(i).unwrap(),
                temporal.valid_time().start(),
                temporal.transaction_time().start(),
                label,
                PropertyMapBuilder::new()
                    .insert("counter", i as i64)
                    .build(),
                false,
            )
            .unwrap();
    }
    let stats = storage.stats();
    assert_eq!(stats.node_anchor_count, 3, "Should have 3 anchors");
    assert_eq!(stats.node_delta_count, 8, "Should have 8 deltas");
    assert!(
        stats.node_cache_entries >= 3,
        "At least the 3 anchors should be cached, got {}",
        stats.node_cache_entries
    );
    // Each anchor must reconstruct without error (straight from the cache).
    for anchor in [0u64, 5, 10] {
        storage
            .reconstruct_node_properties(VersionId::new(anchor).unwrap())
            .unwrap();
    }
}
/// Reconstructing many deltas through a tiny cache must not corrupt anchor
/// reconstruction: all three anchors still yield their own property values.
#[test]
fn test_anchor_cache_survives_delta_cache_pressure() {
    let mut storage = HistoricalStorage::with_config_retention_and_cache_size(
        AnchorConfig {
            anchor_interval: 10,
            max_delta_chain: 10,
        },
        RetentionPolicy::default(),
        5,
    );
    let node_id = NodeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("Person").unwrap();
    for i in 0..21 {
        let temporal = BiTemporalInterval::current((i as i64 * 1000).into());
        storage
            .add_node_version(
                node_id,
                VersionId::new(i).unwrap(),
                temporal.valid_time().start(),
                temporal.transaction_time().start(),
                label,
                PropertyMapBuilder::new()
                    .insert("counter", i as i64)
                    .build(),
                false,
            )
            .unwrap();
    }
    assert_eq!(storage.stats().node_anchor_count, 3, "Should have 3 anchors");
    // Churn the small cache with delta reconstructions.
    for i in 1..10 {
        storage
            .reconstruct_node_properties(VersionId::new(i).unwrap())
            .unwrap();
    }
    // Anchors at 0, 10 and 20 must still reconstruct to their own values.
    for (vid, expected) in [(0u64, 0i64), (10, 10), (20, 20)] {
        let props = storage
            .reconstruct_node_properties(VersionId::new(vid).unwrap())
            .unwrap();
        assert_eq!(
            props.get("counter").and_then(|v| v.as_int()),
            Some(expected)
        );
    }
}
/// Reconstructing a delta should work with both anchors present in the
/// cache, yielding the delta's own property values.
#[test]
fn test_delta_reconstruction_uses_anchor_cache() {
    let mut storage = HistoricalStorage::with_config_retention_and_cache_size(
        AnchorConfig {
            anchor_interval: 5,
            max_delta_chain: 5,
        },
        RetentionPolicy::default(),
        100,
    );
    let node_id = NodeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("Document").unwrap();
    for i in 0..8 {
        let temporal = BiTemporalInterval::current((i as i64 * 1000).into());
        storage
            .add_node_version(
                node_id,
                VersionId::new(i).unwrap(),
                temporal.valid_time().start(),
                temporal.transaction_time().start(),
                label,
                PropertyMapBuilder::new()
                    .insert("version", i as i64)
                    .insert("data", format!("content_{}", i))
                    .build(),
                false,
            )
            .unwrap();
    }
    assert!(
        storage.stats().node_cache_entries >= 2,
        "Both anchors should be cached"
    );
    // Version 7 is a delta behind the second anchor (version 5).
    let props = storage
        .reconstruct_node_properties(VersionId::new(7).unwrap())
        .unwrap();
    assert_eq!(props.get("version").and_then(|v| v.as_int()), Some(7));
    assert_eq!(props.get("data").and_then(|v| v.as_str()), Some("content_7"));
}
/// Reconstructing several versions behind the second anchor must each yield
/// the correct value; two anchors exist for 10 writes at interval 5.
#[test]
fn test_anchor_cache_improves_multi_version_reconstruction() {
    let mut storage = HistoricalStorage::with_config(AnchorConfig {
        anchor_interval: 5,
        max_delta_chain: 5,
    });
    let node_id = NodeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("Entity").unwrap();
    for i in 0..10 {
        let temporal = BiTemporalInterval::current((i as i64 * 1000).into());
        storage
            .add_node_version(
                node_id,
                VersionId::new(i).unwrap(),
                temporal.valid_time().start(),
                temporal.transaction_time().start(),
                label,
                PropertyMapBuilder::new().insert("value", i as i64).build(),
                false,
            )
            .unwrap();
    }
    // Versions 6..9 all sit behind the anchor at version 5.
    for i in 6..10 {
        let props = storage
            .reconstruct_node_properties(VersionId::new(i).unwrap())
            .unwrap();
        assert_eq!(props.get("value").and_then(|v| v.as_int()), Some(i as i64));
    }
    assert_eq!(storage.stats().node_anchor_count, 2, "Should have 2 anchors");
}
/// A freshly constructed storage must start with an empty node property
/// cache regardless of the configured cache size.
#[test]
fn test_anchor_cache_size_calculation() {
    for cache_size in [100, 1000, 10000, 10] {
        let storage = HistoricalStorage::with_config_retention_and_cache_size(
            AnchorConfig::default(),
            RetentionPolicy::default(),
            cache_size,
        );
        assert_eq!(storage.node_property_cache.len(), 0);
    }
}
/// With a tiny cache and 50 distinct versions the hit rate stays low, so
/// should_resize_cache must recommend growing.
#[test]
fn test_should_resize_cache_recommends_growth_on_low_hit_rate() {
    let mut storage = HistoricalStorage::with_config_retention_and_cache_size(
        AnchorConfig {
            anchor_interval: 10,
            max_delta_chain: 10,
        },
        RetentionPolicy::default(),
        10,
    );
    let node_id = NodeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("Person").unwrap();
    for i in 0..50 {
        let temporal = BiTemporalInterval::current((i as i64 * 1000).into());
        storage
            .add_node_version(
                node_id,
                VersionId::new(i).unwrap(),
                temporal.valid_time().start(),
                temporal.transaction_time().start(),
                label,
                PropertyMapBuilder::new()
                    .insert("counter", i as i64)
                    .insert("data", format!("value_{}", i))
                    .build(),
                false,
            )
            .unwrap();
    }
    // Touch every version once — mostly misses against a 10-entry cache.
    for i in 0..50 {
        storage
            .reconstruct_node_properties(VersionId::new(i).unwrap())
            .unwrap();
    }
    let metrics = storage.cache_metrics();
    assert!(
        metrics.total_operations() > 0,
        "Should have cache operations"
    );
    let resize_recommendation = storage.should_resize_cache(0.8, 10);
    assert!(
        resize_recommendation.is_some(),
        "should_resize_cache should recommend resizing with low hit rate"
    );
    let hit_rate = resize_recommendation.unwrap();
    assert!(hit_rate < 0.8, "Hit rate should be below the threshold");
    println!(
        "Cache hit rate {:.2}% is below threshold, resize recommended",
        hit_rate * 100.0
    );
    assert!(
        storage.stats().node_cache_entries > 0,
        "Cache should have some entries"
    );
}
/// Repeated reconstruction of one version should register as cache hits and
/// push the tracked hit rate above 20%.
#[test]
fn test_cache_hit_rate_tracking() {
    let mut storage = HistoricalStorage::with_config(AnchorConfig::default());
    let node_id = NodeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("TestNode").unwrap();
    for i in 0..20 {
        let temporal = BiTemporalInterval::current((i as i64 * 1000).into());
        storage
            .add_node_version(
                node_id,
                VersionId::new(i).unwrap(),
                temporal.valid_time().start(),
                temporal.transaction_time().start(),
                label,
                PropertyMapBuilder::new().insert("value", i as i64).build(),
                false,
            )
            .unwrap();
    }
    // Hammer a single version so most lookups are repeats.
    let v5 = VersionId::new(5).unwrap();
    for _ in 0..10 {
        storage.reconstruct_node_properties(v5).unwrap();
    }
    assert!(
        storage.stats().node_cache_entries > 0,
        "Cache should have entries after reconstruction"
    );
    let hit_rate = storage.cache_hit_rate();
    assert!(hit_rate.is_some(), "Should have cache hit rate data");
    assert!(
        hit_rate.unwrap() > 0.20,
        "Hit rate should be > 20% with some repeated access, got {:?}",
        hit_rate
    );
}
/// Every version must reconstruct to its own property values even as the
/// cache churns through 30 versions.
#[test]
fn test_cache_resize_maintains_correctness() {
    let mut storage = HistoricalStorage::with_config_retention_and_cache_size(
        AnchorConfig {
            anchor_interval: 5,
            max_delta_chain: 5,
        },
        RetentionPolicy::default(),
        100,
    );
    let node_id = NodeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("Data").unwrap();
    for i in 0..30 {
        let temporal = BiTemporalInterval::current((i as i64 * 1000).into());
        storage
            .add_node_version(
                node_id,
                VersionId::new(i).unwrap(),
                temporal.valid_time().start(),
                temporal.transaction_time().start(),
                label,
                PropertyMapBuilder::new()
                    .insert("id", i as i64)
                    .insert("name", format!("item_{}", i))
                    .build(),
                false,
            )
            .unwrap();
    }
    for i in 0..30 {
        let props = storage
            .reconstruct_node_properties(VersionId::new(i).unwrap())
            .unwrap();
        assert_eq!(props.get("id").and_then(|v| v.as_int()), Some(i as i64));
        assert_eq!(
            props.get("name").and_then(|v| v.as_str()),
            Some(format!("item_{}", i).as_str())
        );
    }
    let metrics = storage.cache_metrics();
    assert!(
        metrics.total_operations() > 0,
        "Should have cache operations"
    );
    if let Some(hit_rate) = storage.cache_hit_rate() {
        println!("Cache hit rate: {:.2}%", hit_rate * 100.0);
    }
}
/// With anchor_interval = 3, five edge writes produce the pattern
/// anchor, delta, delta, anchor, delta.
#[test]
fn test_edge_version_chain() {
    let mut storage = HistoricalStorage::with_config(AnchorConfig {
        anchor_interval: 3,
        max_delta_chain: 10,
    });
    let edge_id = EdgeId::new(1).unwrap();
    let source = NodeId::new(1).unwrap();
    let target = NodeId::new(2).unwrap();
    let label = GLOBAL_INTERNER.intern("KNOWS").unwrap();
    let mut version_ids = Vec::new();
    for i in 0..5 {
        let version_id = VersionId::new(100 + i).unwrap();
        let temporal = BiTemporalInterval::current((1000 + (i as i64) * 100).into());
        storage
            .add_edge_version(
                edge_id,
                version_id,
                temporal.valid_time().start(),
                temporal.transaction_time().start(),
                label,
                source,
                target,
                PropertyMapBuilder::new()
                    .insert("weight", i as i64)
                    .insert("since", "2024")
                    .build(),
                false,
            )
            .unwrap();
        version_ids.push(version_id);
    }
    let expect_anchor = [true, false, false, true, false];
    for (version_id, &is_anchor) in version_ids.iter().zip(expect_anchor.iter()) {
        let version = storage.get_edge_version(*version_id).unwrap();
        if is_anchor {
            assert!(version.is_anchor());
        } else {
            assert!(version.is_delta());
        }
    }
}
/// Each edge version must reconstruct to its own property values: the
/// changed "weight" and the unchanged "since".
#[test]
fn test_edge_property_reconstruction() {
    let mut storage = HistoricalStorage::new();
    let edge_id = EdgeId::new(1).unwrap();
    let source = NodeId::new(1).unwrap();
    let target = NodeId::new(2).unwrap();
    let label = GLOBAL_INTERNER.intern("KNOWS").unwrap();
    let v1 = VersionId::new(1).unwrap();
    let v2 = VersionId::new(2).unwrap();
    for (vid, ts, weight) in [(v1, 1000i64, 10i64), (v2, 2000, 20)] {
        storage
            .add_edge_version(
                edge_id,
                vid,
                ts.into(),
                ts.into(),
                label,
                source,
                target,
                PropertyMapBuilder::new()
                    .insert("weight", weight)
                    .insert("since", "2020")
                    .build(),
                false,
            )
            .unwrap();
    }
    for (vid, weight) in [(v1, 10), (v2, 20)] {
        let props = storage.reconstruct_edge_properties(vid).unwrap();
        assert_eq!(props.get("weight").and_then(|v| v.as_int()), Some(weight));
        assert_eq!(props.get("since").and_then(|v| v.as_str()), Some("2020"));
    }
}
/// find_edge_version_at_time must select the version whose valid interval
/// covers the queried valid time.
#[test]
fn test_edge_find_version_at_time() {
    let mut storage = HistoricalStorage::new();
    let edge_id = EdgeId::new(1).unwrap();
    let source = NodeId::new(1).unwrap();
    let target = NodeId::new(2).unwrap();
    let label = GLOBAL_INTERNER.intern("KNOWS").unwrap();
    let v1 = VersionId::new(1).unwrap();
    let v2 = VersionId::new(2).unwrap();
    let v3 = VersionId::new(3).unwrap();
    // Three versions valid from t=0, t=1000 and t=2000, all recorded at
    // transaction time 0.
    for (vid, valid_from, weight) in [(v1, 0i64, 10i64), (v2, 1000, 20), (v3, 2000, 30)] {
        storage
            .add_edge_version(
                edge_id,
                vid,
                valid_from.into(),
                0.into(),
                label,
                source,
                target,
                PropertyMapBuilder::new().insert("weight", weight).build(),
                false,
            )
            .unwrap();
    }
    for (query_time, expected) in [(500i64, v1), (1500, v2), (2500, v3)] {
        assert_eq!(
            storage.find_edge_version_at_time(edge_id, query_time.into(), 100.into()),
            Some(expected)
        );
    }
}
/// Three sequential edge versions must be doubly linked: each version points
/// at its predecessor and successor.
#[test]
fn test_edge_version_chain_links() {
    let mut storage = HistoricalStorage::new();
    let edge_id = EdgeId::new(1).unwrap();
    let source = NodeId::new(1).unwrap();
    let target = NodeId::new(2).unwrap();
    let label = GLOBAL_INTERNER.intern("KNOWS").unwrap();
    let v1 = VersionId::new(1).unwrap();
    let v2 = VersionId::new(2).unwrap();
    let v3 = VersionId::new(3).unwrap();
    for (i, vid) in [v1, v2, v3].iter().enumerate() {
        storage
            .add_edge_version(
                edge_id,
                *vid,
                (1000 + (i as i64) * 100).into(),
                (1000 + (i as i64) * 100).into(),
                label,
                source,
                target,
                PropertyMapBuilder::new()
                    .insert("version", i as i64)
                    .build(),
                false,
            )
            .unwrap();
    }
    let expected_links = [(None, Some(v2)), (Some(v1), Some(v3)), (Some(v2), None)];
    for (vid, (prev, next)) in [v1, v2, v3].into_iter().zip(expected_links) {
        let version = storage.get_edge_version(vid).unwrap();
        assert_eq!(version.prev_version, prev);
        assert_eq!(version.next_version, next);
    }
}
/// The very first version of an edge is stored as an anchor with no
/// predecessor and the supplied endpoints.
#[test]
fn test_first_edge_version_is_anchor() {
    let mut storage = HistoricalStorage::new();
    let edge_id = EdgeId::new(1).unwrap();
    let source = NodeId::new(1).unwrap();
    let target = NodeId::new(2).unwrap();
    let version_id = VersionId::new(100).unwrap();
    let label = GLOBAL_INTERNER.intern("KNOWS").unwrap();
    let temporal = BiTemporalInterval::current(1000.into());
    storage
        .add_edge_version(
            edge_id,
            version_id,
            temporal.valid_time().start(),
            temporal.transaction_time().start(),
            label,
            source,
            target,
            PropertyMapBuilder::new().insert("weight", 5i64).build(),
            false,
        )
        .unwrap();
    let version = storage.get_edge_version(version_id).unwrap();
    assert!(version.is_anchor());
    assert_eq!(version.edge_id, edge_id);
    assert_eq!(version.prev_version, None);
    assert_eq!(version.source, source);
    assert_eq!(version.target, target);
}
/// Node and edge chains anchor independently but, with the same
/// anchor_interval = 3, both follow the pattern anchor, delta, delta,
/// anchor, delta over five writes.
#[test]
fn test_independent_node_edge_anchor_intervals() {
    let mut storage = HistoricalStorage::with_config(AnchorConfig {
        anchor_interval: 3,
        max_delta_chain: 10,
    });
    let node_id = NodeId::new(1).unwrap();
    let edge_id = EdgeId::new(1).unwrap();
    let source = NodeId::new(2).unwrap();
    let target = NodeId::new(3).unwrap();
    let node_label = GLOBAL_INTERNER.intern("Person").unwrap();
    let edge_label = GLOBAL_INTERNER.intern("KNOWS").unwrap();
    let mut node_version_ids = Vec::new();
    let mut edge_version_ids = Vec::new();
    for i in 0..5 {
        let node_vid = VersionId::new(i).unwrap();
        storage
            .add_node_version(
                node_id,
                node_vid,
                (1000 + (i as i64) * 100).into(),
                (1000 + (i as i64) * 100).into(),
                node_label,
                PropertyMapBuilder::new()
                    .insert("version", i as i64)
                    .build(),
                false,
            )
            .unwrap();
        node_version_ids.push(node_vid);
        let edge_vid = VersionId::new(100 + i).unwrap();
        storage
            .add_edge_version(
                edge_id,
                edge_vid,
                (1000 + (i as i64) * 100).into(),
                (1000 + (i as i64) * 100).into(),
                edge_label,
                source,
                target,
                PropertyMapBuilder::new()
                    .insert("version", i as i64)
                    .build(),
                false,
            )
            .unwrap();
        edge_version_ids.push(edge_vid);
    }
    let expect_anchor = [true, false, false, true, false];
    for (i, &is_anchor) in expect_anchor.iter().enumerate() {
        let node_version = storage.get_node_version(node_version_ids[i]).unwrap();
        let edge_version = storage.get_edge_version(edge_version_ids[i]).unwrap();
        if is_anchor {
            assert!(node_version.is_anchor());
            assert!(edge_version.is_anchor());
        } else {
            assert!(node_version.is_delta());
            assert!(edge_version.is_delta());
        }
    }
}
/// count_versions_since_anchor_node counts deltas back to the most recent
/// anchor; a new anchor at version 3 resets the count.
#[test]
fn test_count_versions_since_anchor_generic() {
    let mut storage = HistoricalStorage::with_config(AnchorConfig {
        anchor_interval: 3,
        max_delta_chain: 10,
    });
    let node_id = NodeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("Person").unwrap();
    let mut version_ids = Vec::new();
    // First window: anchor at version 0 plus two deltas.
    for i in 0..3 {
        let vid = VersionId::new(i).unwrap();
        storage
            .add_node_version(
                node_id,
                vid,
                (1000 + (i as i64) * 100).into(),
                (1000 + (i as i64) * 100).into(),
                label,
                PropertyMapBuilder::new()
                    .insert("version", i as i64)
                    .build(),
                false,
            )
            .unwrap();
        version_ids.push(vid);
    }
    for (idx, expected) in [(2usize, 2), (1, 1), (0, 0)] {
        assert_eq!(
            storage.count_versions_since_anchor_node(version_ids[idx]),
            expected
        );
    }
    // Version 3 starts the next anchor window.
    for i in 3..5 {
        let vid = VersionId::new(i).unwrap();
        storage
            .add_node_version(
                node_id,
                vid,
                (1000 + (i as i64) * 100).into(),
                (1000 + (i as i64) * 100).into(),
                label,
                PropertyMapBuilder::new()
                    .insert("version", i as i64)
                    .build(),
                false,
            )
            .unwrap();
        version_ids.push(vid);
    }
    for (idx, expected) in [(4usize, 1), (3, 0)] {
        assert_eq!(
            storage.count_versions_since_anchor_node(version_ids[idx]),
            expected
        );
    }
}
/// The per-node version counter must drive anchor placement: with
/// anchor_interval = 5, node 1 anchors at versions 0 and 5, and a second
/// node starts its own independent counter at a fresh anchor.
#[test]
fn test_version_counter_cache() {
    let mut storage = HistoricalStorage::with_config(AnchorConfig {
        anchor_interval: 5,
        max_delta_chain: 10,
    });
    let node_id = NodeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("Person").unwrap();
    // Versions 0..5: anchor then four deltas.
    for i in 0..5 {
        let vid = VersionId::new(i).unwrap();
        storage
            .add_node_version(
                node_id,
                vid,
                (1000 + (i as i64) * 100).into(),
                (1000 + (i as i64) * 100).into(),
                label,
                PropertyMapBuilder::new()
                    .insert("version", i as i64)
                    .build(),
                false,
            )
            .unwrap();
        let stored = storage.get_node_version(vid).unwrap();
        if i == 0 {
            assert!(stored.is_anchor());
        } else {
            assert!(stored.is_delta());
        }
    }
    // The fifth write rolls the counter over to a new anchor.
    let v5 = VersionId::new(5).unwrap();
    storage
        .add_node_version(
            node_id,
            v5,
            1600.into(),
            1600.into(),
            label,
            PropertyMapBuilder::new().insert("version", 5i64).build(),
            false,
        )
        .unwrap();
    assert!(storage.get_node_version(v5).unwrap().is_anchor());
    let v6 = VersionId::new(6).unwrap();
    storage
        .add_node_version(
            node_id,
            v6,
            1700.into(),
            1700.into(),
            label,
            PropertyMapBuilder::new().insert("version", 6i64).build(),
            false,
        )
        .unwrap();
    assert!(storage.get_node_version(v6).unwrap().is_delta());
    // A second node keeps its own counter: fresh anchor then deltas.
    let node_id2 = NodeId::new(2).unwrap();
    for i in 0..3 {
        storage
            .add_node_version(
                node_id2,
                VersionId::new(100 + i).unwrap(),
                (2000 + (i as i64) * 100).into(),
                (2000 + (i as i64) * 100).into(),
                label,
                PropertyMapBuilder::new()
                    .insert("version", i as i64)
                    .build(),
                false,
            )
            .unwrap();
    }
    for (vid, is_anchor) in [(100u64, true), (101, false), (102, false)] {
        let stored = storage
            .get_node_version(VersionId::new(vid).unwrap())
            .unwrap();
        if is_anchor {
            assert!(stored.is_anchor());
        } else {
            assert!(stored.is_delta());
        }
    }
}
#[test]
fn test_edge_version_counter_cache() {
    // anchor_interval = 3: every third edge version is promoted to an anchor,
    // mirroring the node-side counter cache behavior.
    let mut storage = HistoricalStorage::with_config(AnchorConfig {
        anchor_interval: 3,
        max_delta_chain: 10,
    });
    let edge_id = EdgeId::new(1).unwrap();
    let from = NodeId::new(1).unwrap();
    let to = NodeId::new(2).unwrap();
    let label = GLOBAL_INTERNER.intern("KNOWS").unwrap();

    // Insert one edge version with a single "version" property equal to its id.
    let add = |s: &mut HistoricalStorage, id: u64, ts: i64| {
        let vid = VersionId::new(id).unwrap();
        s.add_edge_version(
            edge_id,
            vid,
            ts.into(),
            ts.into(),
            label,
            from,
            to,
            PropertyMapBuilder::new().insert("version", id as i64).build(),
            false,
        )
        .unwrap();
        vid
    };

    // The first edge version always starts the chain as an anchor.
    let v0 = add(&mut storage, 0, 1000);
    assert!(storage.get_edge_version(v0).unwrap().is_anchor());

    // Versions 1 and 2 sit within the interval and are deltas.
    for i in 1..3u64 {
        let vid = add(&mut storage, i, 1000 + (i as i64) * 100);
        assert!(storage.get_edge_version(vid).unwrap().is_delta());
    }

    // The third version since the anchor rolls the counter over: new anchor.
    let v3 = add(&mut storage, 3, 1400);
    assert!(storage.get_edge_version(v3).unwrap().is_anchor());
}
#[test]
fn test_counter_cache_rebuilt_after_persistence_restore() {
    // With anchor_interval = 5, anchors are created every fifth version per
    // node. This test checks that the per-node "versions since last anchor"
    // counter is rebuilt from the raw version records after a persistence
    // round-trip, so the next anchor lands at exactly the right version.
    let config = AnchorConfig {
        anchor_interval: 5,
        max_delta_chain: 10,
    };
    let mut original = HistoricalStorage::with_config(config.clone());
    let node_id = NodeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("Person").unwrap();
    // Versions 0..=6: v0 and v5 become anchors, the rest are deltas.
    for i in 0..7 {
        let vid = VersionId::new(i).unwrap();
        original
            .add_node_version(
                node_id,
                vid,
                (1000 + (i as i64) * 100).into(),
                (1000 + (i as i64) * 100).into(),
                label,
                PropertyMapBuilder::new()
                    .insert("version", i as i64)
                    .build(),
                false,
            )
            .unwrap();
    }
    // Sanity-check the anchor/delta layout before the restore.
    assert!(
        original
            .get_node_version(VersionId::new(0).unwrap())
            .unwrap()
            .is_anchor()
    );
    assert!(
        original
            .get_node_version(VersionId::new(1).unwrap())
            .unwrap()
            .is_delta()
    );
    assert!(
        original
            .get_node_version(VersionId::new(4).unwrap())
            .unwrap()
            .is_delta()
    );
    assert!(
        original
            .get_node_version(VersionId::new(5).unwrap())
            .unwrap()
            .is_anchor()
    );
    assert!(
        original
            .get_node_version(VersionId::new(6).unwrap())
            .unwrap()
            .is_delta()
    );
    // Simulate persistence: clone out the raw version records, restore them
    // into a fresh storage, then rebuild the chains and counter caches.
    let saved_versions: Vec<NodeVersion> = original.node_versions.values().cloned().collect();
    let mut restored = HistoricalStorage::with_config(config);
    for version in saved_versions {
        restored.insert_restored_node_version(version).unwrap();
    }
    restored.rebuild_version_chains();
    // v6 is the only delta since the anchor at v5, so the counter must be 1.
    let counter = restored
        .node_versions_since_anchor
        .get(&node_id)
        .copied()
        .unwrap_or(0);
    assert_eq!(
        counter, 1,
        "Counter should be 1 after version 6 (one delta since anchor at v5)"
    );
    // New versions added after the restore continue the old chain:
    // v7 and v8 are still deltas (counter 2 and 3)...
    for i in 7..9 {
        let vid = VersionId::new(i).unwrap();
        restored
            .add_node_version(
                node_id,
                vid,
                (1000 + (i as i64) * 100).into(),
                (1000 + (i as i64) * 100).into(),
                label,
                PropertyMapBuilder::new()
                    .insert("version", i as i64)
                    .build(),
                false,
            )
            .unwrap();
        assert!(
            restored.get_node_version(vid).unwrap().is_delta(),
            "Version {} should be delta",
            i
        );
    }
    // ...v9 is the fourth delta since v5...
    let v9 = VersionId::new(9).unwrap();
    restored
        .add_node_version(
            node_id,
            v9,
            1900.into(),
            1900.into(),
            label,
            PropertyMapBuilder::new().insert("version", 9i64).build(),
            false,
        )
        .unwrap();
    assert!(
        restored.get_node_version(v9).unwrap().is_delta(),
        "Version 9 should be delta"
    );
    // ...and v10 (the fifth version since the v5 anchor) rolls the counter
    // over into a new anchor.
    let v10 = VersionId::new(10).unwrap();
    restored
        .add_node_version(
            node_id,
            v10,
            2000.into(),
            2000.into(),
            label,
            PropertyMapBuilder::new().insert("version", 10i64).build(),
            false,
        )
        .unwrap();
    assert!(
        restored.get_node_version(v10).unwrap().is_anchor(),
        "Version 10 should be anchor after 5 deltas"
    );
    // The anchor creation must also reset the cached counter to zero.
    let counter_after = restored
        .node_versions_since_anchor
        .get(&node_id)
        .copied()
        .unwrap_or(0);
    assert_eq!(
        counter_after, 0,
        "Counter should be reset to 0 after creating anchor"
    );
}
#[test]
fn test_edge_counter_cache_rebuilt_after_restore() {
    // Edge analogue of the node counter-rebuild test: with anchor_interval = 3,
    // the per-edge "versions since last anchor" counter must be reconstructed
    // from raw records after a restore so anchor placement stays correct.
    let config = AnchorConfig {
        anchor_interval: 3,
        max_delta_chain: 10,
    };
    let mut original = HistoricalStorage::with_config(config.clone());
    let edge_id = EdgeId::new(1).unwrap();
    let from = NodeId::new(1).unwrap();
    let to = NodeId::new(2).unwrap();
    let label = GLOBAL_INTERNER.intern("KNOWS").unwrap();
    // Versions 0..=4: anchors at v0 and v3, deltas at v1, v2, v4.
    for i in 0..5 {
        let vid = VersionId::new(i).unwrap();
        original
            .add_edge_version(
                edge_id,
                vid,
                (1000 + (i as i64) * 100).into(),
                (1000 + (i as i64) * 100).into(),
                label,
                from,
                to,
                PropertyMapBuilder::new()
                    .insert("version", i as i64)
                    .build(),
                false,
            )
            .unwrap();
    }
    // Simulate persistence: restore the raw edge versions into a fresh
    // storage and rebuild the chains/counter caches.
    let saved_versions: Vec<EdgeVersion> = original.edge_versions.values().cloned().collect();
    let mut restored = HistoricalStorage::with_config(config);
    for version in saved_versions {
        restored.insert_restored_edge_version(version).unwrap();
    }
    restored.rebuild_version_chains();
    // v4 is the only delta since the anchor at v3, so the counter must be 1.
    let counter = restored
        .edge_versions_since_anchor
        .get(&edge_id)
        .copied()
        .unwrap_or(0);
    assert_eq!(counter, 1, "Edge counter should be 1 after restore");
    // Continue the chain post-restore: v5 is a second delta, and v6 (the
    // third version since the v3 anchor) rolls over into a new anchor.
    for i in 5..7 {
        let vid = VersionId::new(i).unwrap();
        restored
            .add_edge_version(
                edge_id,
                vid,
                (1000 + (i as i64) * 100).into(),
                (1000 + (i as i64) * 100).into(),
                label,
                from,
                to,
                PropertyMapBuilder::new()
                    .insert("version", i as i64)
                    .build(),
                false,
            )
            .unwrap();
    }
    assert!(
        restored
            .get_edge_version(VersionId::new(5).unwrap())
            .unwrap()
            .is_delta()
    );
    assert!(
        restored
            .get_edge_version(VersionId::new(6).unwrap())
            .unwrap()
            .is_anchor()
    );
}
#[test]
fn test_node_reconstruction_with_long_delta_chain() {
    // Cache size 0 disables the reconstruction cache, and the large anchor
    // interval keeps versions 1..=40 as one long delta chain, so rebuilding
    // the final version forces a full anchor-to-tip walk.
    let mut storage = HistoricalStorage::with_config_retention_and_cache_size(
        AnchorConfig {
            anchor_interval: 50,
            max_delta_chain: 50,
        },
        RetentionPolicy::default(),
        0,
    );
    let node_id = NodeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("TestNode").unwrap();
    // Anchor version with the initial property set.
    let v0 = VersionId::new(0).unwrap();
    storage
        .add_node_version(
            node_id,
            v0,
            0.into(),
            0.into(),
            label,
            PropertyMapBuilder::new()
                .insert("counter", 0i64)
                .insert("name", "test")
                .insert("active", true)
                .build(),
            false,
        )
        .unwrap();
    // 40 delta versions: "counter" changes every version, "name" changes on
    // every 3rd (last change at i=39 -> "test_39"), "active" changes on every
    // 5th (last change at i=40 -> 40 % 2 == 0 -> true).
    let mut current_name = "test".to_string();
    let mut current_active = true;
    for i in 1..=40 {
        let vid = VersionId::new(i).unwrap();
        if i % 3 == 0 {
            current_name = format!("test_{}", i);
        }
        if i % 5 == 0 {
            current_active = i % 2 == 0;
        }
        storage
            .add_node_version(
                node_id,
                vid,
                (i as i64 * 1000).into(),
                (i as i64 * 1000).into(),
                label,
                PropertyMapBuilder::new()
                    .insert("counter", i as i64)
                    .insert("name", current_name.clone())
                    .insert("active", current_active)
                    .build(),
                false,
            )
            .unwrap();
    }
    // Reconstructing v40 must apply all 40 deltas on top of the anchor.
    let final_version = VersionId::new(40).unwrap();
    let props = storage.reconstruct_node_properties(final_version).unwrap();
    assert_eq!(props.get("counter").and_then(|v| v.as_int()), Some(40));
    assert_eq!(props.get("name").and_then(|v| v.as_str()), Some("test_39"));
    assert_eq!(props.get("active").and_then(|v| v.as_bool()), Some(true));
    // With the cache disabled, the walk must register as a full reconstruction.
    let metrics = storage.cache_metrics();
    assert!(
        metrics.full_reconstructions > 0,
        "Should have performed reconstruction"
    );
}
#[test]
fn test_edge_reconstruction_with_long_delta_chain() {
    // Edge analogue of the long-chain node test: cache size 0 plus a large
    // anchor interval forces a full walk over 40 edge deltas.
    let mut storage = HistoricalStorage::with_config_retention_and_cache_size(
        AnchorConfig {
            anchor_interval: 50,
            max_delta_chain: 50,
        },
        RetentionPolicy::default(),
        0,
    );
    let edge_id = EdgeId::new(1).unwrap();
    let source = NodeId::new(100).unwrap();
    let target = NodeId::new(200).unwrap();
    let label = GLOBAL_INTERNER.intern("TestEdge").unwrap();
    // Anchor version with the initial property set.
    let v0 = VersionId::new(0).unwrap();
    storage
        .add_edge_version(
            edge_id,
            v0,
            0.into(),
            0.into(),
            label,
            source,
            target,
            PropertyMapBuilder::new()
                .insert("weight", 0.0f64)
                .insert("type", "initial")
                .build(),
            false,
        )
        .unwrap();
    // 40 deltas: "weight" changes every version, "type" changes on every 7th
    // (last change at i = 35 -> "updated_35").
    let mut current_type = "initial".to_string();
    for i in 1..=40 {
        let vid = VersionId::new(i).unwrap();
        if i % 7 == 0 {
            current_type = format!("updated_{}", i);
        }
        storage
            .add_edge_version(
                edge_id,
                vid,
                (i as i64 * 1000).into(),
                (i as i64 * 1000).into(),
                label,
                source,
                target,
                PropertyMapBuilder::new()
                    .insert("weight", i as f64)
                    .insert("type", current_type.clone())
                    .build(),
                false,
            )
            .unwrap();
    }
    // Reconstructing v40 must apply all 40 deltas on top of the anchor.
    let final_version = VersionId::new(40).unwrap();
    let props = storage.reconstruct_edge_properties(final_version).unwrap();
    assert_eq!(props.get("weight").and_then(|v| v.as_float()), Some(40.0));
    assert_eq!(
        props.get("type").and_then(|v| v.as_str()),
        Some("updated_35")
    );
    // With the cache disabled, the walk must register as a full reconstruction.
    let metrics = storage.cache_metrics();
    assert!(
        metrics.full_reconstructions > 0,
        "Should have performed reconstruction"
    );
}
#[test]
fn test_reconstruction_correctness_at_various_depths() {
    // Cache size 0 forces a real delta-chain walk on every reconstruction,
    // so each depth within the chain gets exercised individually.
    let mut storage = HistoricalStorage::with_config_retention_and_cache_size(
        AnchorConfig {
            anchor_interval: 20,
            max_delta_chain: 20,
        },
        RetentionPolicy::default(),
        0,
    );
    let node_id = NodeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("Test").unwrap();

    // Build a 15-version chain. "sum" carries the running triangular number
    // so a reconstruction that stops at the wrong depth is instantly visible.
    for i in 0..15u64 {
        storage
            .add_node_version(
                node_id,
                VersionId::new(i).unwrap(),
                (i as i64 * 1000).into(),
                (i as i64 * 1000).into(),
                label,
                PropertyMapBuilder::new()
                    .insert("version", i as i64)
                    .insert("sum", (i * (i + 1) / 2) as i64)
                    .build(),
                false,
            )
            .unwrap();
    }

    // Reconstruct every version and verify both properties round-trip intact.
    for i in 0..15u64 {
        let props = storage
            .reconstruct_node_properties(VersionId::new(i).unwrap())
            .unwrap();
        assert_eq!(
            props.get("version").and_then(|v| v.as_int()),
            Some(i as i64),
            "Version {} should have version={}",
            i,
            i
        );
        assert_eq!(
            props.get("sum").and_then(|v| v.as_int()),
            Some((i * (i + 1) / 2) as i64),
            "Version {} should have correct sum",
            i
        );
    }
}
#[test]
fn test_reconstruction_with_property_deletion() {
    // Deltas must record property removals, not only additions and changes:
    // a property that disappears between versions ("b" here) must not be
    // resurrected when later versions are reconstructed. Cache size 0 makes
    // sure the deletion handling in the walk itself is exercised.
    let mut storage = HistoricalStorage::with_config_retention_and_cache_size(
        AnchorConfig {
            anchor_interval: 10,
            max_delta_chain: 10,
        },
        RetentionPolicy::default(),
        0,
    );
    let node_id = NodeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("Test").unwrap();
    // v0 (anchor): properties a, b, c.
    storage
        .add_node_version(
            node_id,
            VersionId::new(0).unwrap(),
            0.into(),
            0.into(),
            label,
            PropertyMapBuilder::new()
                .insert("a", "value_a")
                .insert("b", "value_b")
                .insert("c", "value_c")
                .build(),
            false,
        )
        .unwrap();
    // v1: "b" deleted, "c" changed.
    storage
        .add_node_version(
            node_id,
            VersionId::new(1).unwrap(),
            1000.into(),
            1000.into(),
            label,
            PropertyMapBuilder::new()
                .insert("a", "value_a")
                .insert("c", "new_value_c")
                .build(),
            false,
        )
        .unwrap();
    // v2: "d" added; "b" must remain absent.
    storage
        .add_node_version(
            node_id,
            VersionId::new(2).unwrap(),
            2000.into(),
            2000.into(),
            label,
            PropertyMapBuilder::new()
                .insert("a", "value_a")
                .insert("c", "new_value_c")
                .insert("d", "value_d")
                .build(),
            false,
        )
        .unwrap();
    let props_v1 = storage
        .reconstruct_node_properties(VersionId::new(1).unwrap())
        .unwrap();
    assert!(
        props_v1.get("b").is_none(),
        "v1 should not have property 'b'"
    );
    assert_eq!(
        props_v1.get("c").and_then(|v| v.as_str()),
        Some("new_value_c")
    );
    let props_v2 = storage
        .reconstruct_node_properties(VersionId::new(2).unwrap())
        .unwrap();
    assert!(
        props_v2.get("b").is_none(),
        "v2 should not have property 'b'"
    );
    assert_eq!(props_v2.get("d").and_then(|v| v.as_str()), Some("value_d"));
}
#[test]
fn test_reconstruction_with_anchor_interval() {
    // anchor_interval = 5 yields anchors at v0, v5 and v10 with deltas in
    // between, so reconstruction gets checked both at anchors and mid-chain.
    let mut storage = HistoricalStorage::with_config(AnchorConfig {
        anchor_interval: 5,
        max_delta_chain: 5,
    });
    let node_id = NodeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("Test").unwrap();
    for i in 0..12u64 {
        storage
            .add_node_version(
                node_id,
                VersionId::new(i).unwrap(),
                (i as i64 * 1000).into(),
                (i as i64 * 1000).into(),
                label,
                PropertyMapBuilder::new().insert("value", i as i64).build(),
                false,
            )
            .unwrap();
    }
    // Every probed version's "value" must reconstruct to its own index:
    // v3, v7 and v11 are deltas at different depths, v5 is an anchor.
    for id in [3u64, 7, 11, 5] {
        let props = storage
            .reconstruct_node_properties(VersionId::new(id).unwrap())
            .unwrap();
        assert_eq!(props.get("value").and_then(|v| v.as_int()), Some(id as i64));
    }
}
#[test]
fn test_stats_uses_cached_counters() {
    // With anchor_interval = 3: node versions 0, 3, 6 become anchors
    // (3 anchors / 4 deltas out of 7) and edge versions 0, 3 become anchors
    // (2 anchors / 3 deltas out of 5). stats() must report exactly these
    // counts from the cached counters.
    let config = AnchorConfig {
        anchor_interval: 3,
        max_delta_chain: 10,
    };
    let mut storage = HistoricalStorage::with_config(config);
    let label = GLOBAL_INTERNER.intern("Test").unwrap();
    let node_id = NodeId::new(1).unwrap();
    let edge_id = EdgeId::new(1).unwrap();
    // Seven versions on one node.
    for i in 0..7 {
        storage
            .add_node_version(
                node_id,
                VersionId::new(i).unwrap(),
                (1000 + (i as i64) * 100).into(),
                (1000 + (i as i64) * 100).into(),
                label,
                PropertyMapBuilder::new().insert("value", i as i64).build(),
                false,
            )
            .unwrap();
    }
    // Five versions on one edge (a self-loop on node 1; the endpoints are
    // irrelevant to the counters).
    for i in 0..5 {
        storage
            .add_edge_version(
                edge_id,
                VersionId::new(100 + i).unwrap(),
                (2000 + (i as i64) * 100).into(),
                (2000 + (i as i64) * 100).into(),
                label,
                node_id,
                node_id,
                PropertyMapBuilder::new().insert("value", i as i64).build(),
                false,
            )
            .unwrap();
    }
    let stats = storage.stats();
    assert_eq!(stats.total_node_versions, 7);
    assert_eq!(stats.node_anchor_count, 3, "Should have 3 node anchors");
    assert_eq!(stats.node_delta_count, 4, "Should have 4 node deltas");
    assert_eq!(stats.total_edge_versions, 5);
    assert_eq!(stats.edge_anchor_count, 2, "Should have 2 edge anchors");
    assert_eq!(stats.edge_delta_count, 3, "Should have 3 edge deltas");
    assert_eq!(stats.unique_nodes, 1);
    assert_eq!(stats.unique_edges, 1);
}
#[test]
fn test_stats_counters_with_multiple_entities() {
    // anchor_interval = 2 means every other version is an anchor. Per node:
    // 4 versions -> anchors at 0 and 2 (2 anchors, 2 deltas); per edge:
    // 3 versions -> anchors at 0 and 2 (2 anchors, 1 delta). The aggregate
    // stats must sum these across 3 nodes and 2 edges.
    let config = AnchorConfig {
        anchor_interval: 2,
        max_delta_chain: 10,
    };
    let mut storage = HistoricalStorage::with_config(config);
    let label = GLOBAL_INTERNER.intern("Test").unwrap();
    for node_idx in 1..=3 {
        let node_id = NodeId::new(node_idx).unwrap();
        for i in 0..4 {
            storage
                .add_node_version(
                    node_id,
                    // Version ids are namespaced per node (100, 200, 300 blocks).
                    VersionId::new(node_idx * 100 + i).unwrap(),
                    (1000 + (i as i64) * 100).into(),
                    (1000 + (i as i64) * 100).into(),
                    label,
                    PropertyMapBuilder::new().insert("value", i as i64).build(),
                    false,
                )
                .unwrap();
        }
    }
    for edge_idx in 1..=2 {
        let edge_id = EdgeId::new(edge_idx).unwrap();
        for i in 0..3 {
            storage
                .add_edge_version(
                    edge_id,
                    // Version ids are namespaced per edge (1000, 2000 blocks).
                    VersionId::new(edge_idx * 1000 + i).unwrap(),
                    (2000 + (i as i64) * 100).into(),
                    (2000 + (i as i64) * 100).into(),
                    label,
                    NodeId::new(1).unwrap(),
                    NodeId::new(2).unwrap(),
                    PropertyMapBuilder::new().insert("value", i as i64).build(),
                    false,
                )
                .unwrap();
        }
    }
    let stats = storage.stats();
    assert_eq!(stats.total_node_versions, 12);
    assert_eq!(stats.node_anchor_count, 6, "Should have 6 node anchors");
    assert_eq!(stats.node_delta_count, 6, "Should have 6 node deltas");
    assert_eq!(stats.total_edge_versions, 6);
    assert_eq!(stats.edge_anchor_count, 4, "Should have 4 edge anchors");
    assert_eq!(stats.edge_delta_count, 2, "Should have 2 edge deltas");
    assert_eq!(stats.unique_nodes, 3);
    assert_eq!(stats.unique_edges, 2);
}
#[test]
fn test_stats_counters_remain_accurate_after_persistence_restore() {
    // Five versions with anchor_interval = 3 give anchors at v0 and v3
    // (2 anchors, 3 deltas). The same counts must be reported by stats()
    // after the versions are restored into a fresh storage and the chains
    // are rebuilt.
    let config = AnchorConfig {
        anchor_interval: 3,
        max_delta_chain: 10,
    };
    let mut original = HistoricalStorage::with_config(config.clone());
    let label = GLOBAL_INTERNER.intern("Test").unwrap();
    let node_id = NodeId::new(1).unwrap();
    for i in 0..5 {
        original
            .add_node_version(
                node_id,
                VersionId::new(i).unwrap(),
                (1000 + (i as i64) * 100).into(),
                (1000 + (i as i64) * 100).into(),
                label,
                PropertyMapBuilder::new().insert("value", i as i64).build(),
                false,
            )
            .unwrap();
    }
    // Baseline counts before the round-trip.
    let stats_before = original.stats();
    assert_eq!(stats_before.total_node_versions, 5);
    assert_eq!(stats_before.node_anchor_count, 2);
    assert_eq!(stats_before.node_delta_count, 3);
    // Simulate persistence: restore raw versions and rebuild the chains.
    let saved_versions: Vec<NodeVersion> = original.node_versions.values().cloned().collect();
    let mut restored = HistoricalStorage::with_config(config);
    for version in saved_versions {
        restored.insert_restored_node_version(version).unwrap();
    }
    restored.rebuild_version_chains();
    let stats_after = restored.stats();
    assert_eq!(stats_after.total_node_versions, 5);
    assert_eq!(
        stats_after.node_anchor_count, 2,
        "Anchor count should be preserved after restore"
    );
    assert_eq!(
        stats_after.node_delta_count, 3,
        "Delta count should be preserved after restore"
    );
}
#[test]
fn test_delta_creation_caches_properties() {
    // Regression test for issue #210: when appending consecutive deltas, the
    // predecessor's properties were just written, so building the next delta
    // must reuse them from the write-time cache instead of reconstructing the
    // chain from the anchor. The huge anchor interval keeps every version
    // after the first a delta.
    let mut storage = HistoricalStorage::with_config(AnchorConfig {
        anchor_interval: 100,
        max_delta_chain: 200,
    });
    let node_id = NodeId::new(1).unwrap();
    let label = GLOBAL_INTERNER.intern("Person").unwrap();
    for i in 0..10 {
        let version_id = VersionId::new(100 + i).unwrap();
        let temporal = BiTemporalInterval::current((1000 + (i as i64) * 100).into());
        let props = PropertyMapBuilder::new()
            .insert("name", "Alice")
            .insert("version", i as i64)
            .build();
        storage
            .add_node_version(
                node_id,
                version_id,
                temporal.valid_time().start(),
                temporal.transaction_time().start(),
                label,
                props,
                false,
            )
            .unwrap();
    }
    // No reconstruction work may have happened during the writes.
    let metrics = storage.cache_metrics();
    assert_eq!(
        metrics.full_reconstructions, 0,
        "Expected 0 full reconstructions when adding consecutive deltas, but got {}. \
        This indicates we're reconstructing properties we just added. \
        Issue #210: Cache properties at write-time to avoid this.",
        metrics.full_reconstructions
    );
    // The cached write path must still produce correct reconstructions.
    for i in 0..10 {
        let version_id = VersionId::new(100 + i).unwrap();
        let props = storage.reconstruct_node_properties(version_id).unwrap();
        assert_eq!(
            props.get("version").unwrap().as_int().unwrap(),
            i as i64,
            "Property reconstruction failed for version {}",
            i
        );
    }
}
#[test]
fn test_edge_delta_creation_caches_properties() {
    // With a huge anchor interval every version after the first is a delta;
    // writing those deltas must not trigger any full reconstruction, because
    // the just-written properties are cached at write time (issue #210).
    let mut storage = HistoricalStorage::with_config(AnchorConfig {
        anchor_interval: 100,
        max_delta_chain: 200,
    });
    let edge_id = EdgeId::new(1).unwrap();
    let from = NodeId::new(1).unwrap();
    let to = NodeId::new(2).unwrap();
    let label = GLOBAL_INTERNER.intern("KNOWS").unwrap();

    for i in 0..10u64 {
        let timestamp = (1000 + (i as i64) * 100).into();
        let props = PropertyMapBuilder::new()
            .insert("strength", i as i64)
            .build();
        storage
            .add_edge_version(
                edge_id,
                VersionId::new(100 + i).unwrap(),
                timestamp,
                timestamp,
                label,
                from,
                to,
                props,
                false,
            )
            .unwrap();
    }

    // No reconstruction work may have happened during the writes.
    let metrics = storage.cache_metrics();
    assert_eq!(
        metrics.full_reconstructions, 0,
        "Expected 0 full reconstructions for edge deltas, but got {}. \
        Issue #210: Cache edge properties at write-time.",
        metrics.full_reconstructions
    );

    // And every delta's properties still reconstruct correctly on demand.
    for i in 0..10u64 {
        let props = storage
            .reconstruct_edge_properties(VersionId::new(100 + i).unwrap())
            .unwrap();
        assert_eq!(props.get("strength").unwrap().as_int().unwrap(), i as i64);
    }
}
#[test]
fn test_sentry_find_node_version_at_time_cycle_detection() {
    // Guards against an infinite loop in find_node_version_at_time when the
    // prev_version chain is corrupted into a cycle. The lookup runs on a
    // separate thread; if it hangs, recv_timeout fires and the test fails
    // with a diagnostic instead of blocking the whole suite.
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut storage = HistoricalStorage::new();
        let node_id = NodeId::new(1).unwrap();
        let label = GLOBAL_INTERNER.intern("CycleTest").unwrap();
        // v1 (anchor) and v2 (delta whose prev is v1) are inserted via the
        // restore path so the chain can then be rewired by hand.
        let v1_id = VersionId::new(1).unwrap();
        let v1 = NodeVersion::new_anchor(
            v1_id,
            node_id,
            BiTemporalInterval::current(1000.into()),
            label,
            PropertyMapBuilder::new().build(),
        );
        storage.insert_restored_node_version(v1).unwrap();
        let v2_id = VersionId::new(2).unwrap();
        let v2 = NodeVersion::new_delta(
            v2_id,
            node_id,
            BiTemporalInterval::current(2000.into()),
            label,
            &PropertyMapBuilder::new().build(),
            &PropertyMapBuilder::new().build(),
            v1_id,
        );
        storage.insert_restored_node_version(v2).unwrap();
        // Corrupt the chain: point v1 back at v2, forming a 2-cycle.
        if let Some(v1_mut) = storage.node_versions.get_mut(&v1_id) {
            v1_mut.prev_version = Some(v2_id);
        }
        // Query a time before both versions so the walk must traverse the
        // (now cyclic) chain until it detects the cycle or gives up.
        let result = storage.find_node_version_at_time(
            node_id,
            500.into(),
            500.into(),
        );
        tx.send(result).unwrap();
    });
    let result = rx.recv_timeout(Duration::from_millis(500));
    match result {
        Ok(found) => {
            assert!(
                found.is_none(),
                "Should return None on cycle detection/exhaustion"
            );
        }
        Err(_) => {
            panic!("find_node_version_at_time hung (infinite loop detected by timeout)");
        }
    }
}