use super::*;
use crate::core::temporal::{BiTemporalInterval, TIMESTAMP_MAX};
#[test]
fn test_insert_and_find_node_versions() {
    // Three consecutive versions on one node, each valid for 1000 ticks.
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(1).unwrap();
    let v1 = VersionId::new(100).unwrap();
    let v2 = VersionId::new(101).unwrap();
    let v3 = VersionId::new(102).unwrap();
    for (version, start, end) in [(v1, 0i64, 1000i64), (v2, 1000, 2000), (v3, 2000, 3000)] {
        indexes
            .insert_node_version(
                node_id,
                version,
                BiTemporalInterval::new(
                    TimeRange::new(start.into(), end.into()).unwrap(),
                    TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
                ),
            )
            .unwrap();
    }
    // [500, 1500) overlaps v1's and v2's valid intervals but not v3's.
    let results = indexes.find_node_versions_in_valid_time_range(
        node_id,
        TimeRange::new(500.into(), 1500.into()).unwrap(),
    );
    assert_eq!(results.len(), 2);
    assert!(results.contains(&v1));
    assert!(results.contains(&v2));
    assert!(!results.contains(&v3));
}
#[test]
fn test_edge_cases_overlap() {
    // Two versions with overlapping valid windows: [0, 2000) and [1000, 3000).
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(1).unwrap();
    let v1 = VersionId::new(1).unwrap();
    let v2 = VersionId::new(2).unwrap();
    for (version, start, end) in [(v1, 0i64, 2000i64), (v2, 1000, 3000)] {
        indexes
            .insert_node_version(
                node_id,
                version,
                BiTemporalInterval::new(
                    TimeRange::new(start.into(), end.into()).unwrap(),
                    TimeRange::from(0.into()),
                ),
            )
            .unwrap();
    }
    // A point inside the overlap sees both versions.
    let at_1500 =
        indexes.find_node_versions_in_valid_time_range(node_id, TimeRange::at(1500.into()));
    assert_eq!(at_1500.len(), 2);
    assert!(at_1500.contains(&v1));
    assert!(at_1500.contains(&v2));
    // Before v2 begins: only v1.
    let at_500 =
        indexes.find_node_versions_in_valid_time_range(node_id, TimeRange::at(500.into()));
    assert_eq!(at_500.len(), 1);
    assert!(at_500.contains(&v1));
    // After v1 ends: only v2.
    let at_2500 =
        indexes.find_node_versions_in_valid_time_range(node_id, TimeRange::at(2500.into()));
    assert_eq!(at_2500.len(), 1);
    assert!(at_2500.contains(&v2));
}
#[test]
fn test_adjacent_intervals() {
    // v1 covers [0, 1000) and v2 covers [1000, 2000); the shared boundary
    // must belong to exactly one version (start inclusive, end exclusive).
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(1).unwrap();
    let v1 = VersionId::new(1).unwrap();
    let v2 = VersionId::new(2).unwrap();
    for (version, start, end) in [(v1, 0i64, 1000i64), (v2, 1000, 2000)] {
        indexes
            .insert_node_version(
                node_id,
                version,
                BiTemporalInterval::new(
                    TimeRange::new(start.into(), end.into()).unwrap(),
                    TimeRange::from(0.into()),
                ),
            )
            .unwrap();
    }
    // A query starting exactly on the boundary sees only the later version.
    let at_boundary = indexes.find_node_versions_in_valid_time_range(
        node_id,
        TimeRange::new(1000.into(), 1001.into()).unwrap(),
    );
    assert_eq!(at_boundary.len(), 1);
    assert!(at_boundary.contains(&v2));
    // A query ending exactly on the boundary sees only the earlier version.
    let before_boundary = indexes.find_node_versions_in_valid_time_range(
        node_id,
        TimeRange::new(500.into(), 1000.into()).unwrap(),
    );
    assert_eq!(before_boundary.len(), 1);
    assert!(before_boundary.contains(&v1));
}
#[test]
fn test_batch_insertion() {
    // Batch entries are supplied out of order (ids 1, 3, 2); the resulting
    // timeline must still end up sorted by valid-time start.
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(1).unwrap();
    let make = |id: u64, start: i64, end: i64| {
        (
            VersionId::new(id).unwrap(),
            BiTemporalInterval::new(
                TimeRange::new(start.into(), end.into()).unwrap(),
                TimeRange::from(0.into()),
            ),
        )
    };
    let batch = vec![make(1, 0, 10), make(3, 20, 30), make(2, 10, 20)];
    indexes.insert_node_versions_batch(node_id, batch).unwrap();
    // A range spanning all three intervals returns every version.
    let results = indexes.find_node_versions_in_valid_time_range(
        node_id,
        TimeRange::new(5.into(), 25.into()).unwrap(),
    );
    assert_eq!(results.len(), 3);
    // Verify the stored entries are ordered by start: 0, 10, 20.
    let timelines = indexes.index.get(&EntityId::Node(node_id)).unwrap();
    assert_eq!(timelines.valid.versions[0].start, 0.into());
    assert_eq!(timelines.valid.versions[1].start, 10.into());
    assert_eq!(timelines.valid.versions[2].start, 20.into());
}
#[test]
fn test_transaction_time_range_query() {
    // Two edge versions, both valid forever, recorded at tx-time 1000 and
    // 2000 with open-ended transaction intervals.
    let indexes = TemporalIndexes::new();
    let edge_id = EdgeId::new(1).unwrap();
    let v1 = VersionId::new(100).unwrap();
    let v2 = VersionId::new(101).unwrap();
    for (version, tx_start) in [(v1, 1000i64), (v2, 2000)] {
        indexes
            .insert_edge_version(
                edge_id,
                version,
                BiTemporalInterval::new(
                    TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
                    TimeRange::new(tx_start.into(), TIMESTAMP_MAX).unwrap(),
                ),
            )
            .unwrap();
    }
    // [1500, 2500) intersects both open-ended transaction intervals.
    let results = indexes.find_edge_versions_in_transaction_time_range(
        edge_id,
        TimeRange::new(1500.into(), 2500.into()).unwrap(),
    );
    assert_eq!(results.len(), 2);
    assert!(results.contains(&v1));
    assert!(results.contains(&v2));
}
#[test]
fn test_multiple_entities() {
    // Versions indexed under different nodes must never leak into each
    // other's query results.
    let indexes = TemporalIndexes::new();
    let node1 = NodeId::new(1).unwrap();
    let node2 = NodeId::new(2).unwrap();
    let v1 = VersionId::new(100).unwrap();
    let v2 = VersionId::new(101).unwrap();
    for (node, version) in [(node1, v1), (node2, v2)] {
        indexes
            .insert_node_version(node, version, BiTemporalInterval::current(1000.into()))
            .unwrap();
    }
    // Querying node1 returns only node1's version.
    let results = indexes.find_node_versions_in_valid_time_range(
        node1,
        TimeRange::new(0.into(), 2000.into()).unwrap(),
    );
    assert_eq!(results.len(), 1);
    assert!(results.contains(&v1));
    assert!(!results.contains(&v2));
}
#[test]
fn test_clear() {
    // After one insert the count is non-zero; `clear` resets it to zero.
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(1).unwrap();
    let version_id = VersionId::new(100).unwrap();
    let interval = BiTemporalInterval::current(1000.into());
    indexes
        .insert_node_version(node_id, version_id, interval)
        .unwrap();
    assert!(indexes.version_count() > 0);
    indexes.clear();
    assert_eq!(indexes.version_count(), 0);
}
#[test]
fn test_empty_timeline_query() {
    // Querying a node that was never inserted must yield an empty result set.
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(1).unwrap();
    let query = TimeRange::new(0.into(), 1000.into()).unwrap();
    let results = indexes.find_node_versions_in_valid_time_range(node_id, query);
    assert_eq!(results.len(), 0, "Empty timeline should return no results");
}
#[test]
fn test_retroactive_single_inserts() {
    // Versions are inserted out of valid-time order (v1, then v3, then v2
    // retroactively); the stored timeline must nonetheless end up sorted by
    // valid-time start with each entry's metadata resolving to the right id.
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(1).unwrap();
    let v1 = VersionId::new(100).unwrap();
    let v2 = VersionId::new(101).unwrap();
    let v3 = VersionId::new(102).unwrap();
    // v1: earliest interval [0, 1000), inserted first.
    indexes
        .insert_node_version(
            node_id,
            v1,
            BiTemporalInterval::new(
                TimeRange::new(0.into(), 1000.into()).unwrap(),
                TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
            ),
        )
        .unwrap();
    // v3: latest interval [20000, 21000), inserted second.
    indexes
        .insert_node_version(
            node_id,
            v3,
            BiTemporalInterval::new(
                TimeRange::new(20000.into(), 21000.into()).unwrap(),
                TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
            ),
        )
        .unwrap();
    // v2: middle interval [10000, 11000), inserted last (retroactive).
    indexes
        .insert_node_version(
            node_id,
            v2,
            BiTemporalInterval::new(
                TimeRange::new(10000.into(), 11000.into()).unwrap(),
                TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
            ),
        )
        .unwrap();
    // Inspect the raw timeline: entries must appear as v1, v2, v3.
    let entity_id = EntityId::Node(node_id);
    let timelines = indexes.index.get(&entity_id).unwrap();
    assert_eq!(timelines.valid.versions.len(), 3, "Should have 3 versions");
    assert_eq!(
        timelines.valid.versions[0].start,
        0.into(),
        "First version should start at 0"
    );
    let idx0 = timelines.valid.versions[0].metadata_index();
    assert_eq!(
        timelines.get_version_metadata(idx0).unwrap().version_id(),
        v1,
        "First version should be v1"
    );
    assert_eq!(
        timelines.valid.versions[1].start,
        10000.into(),
        "Second version should start at 10000"
    );
    let idx1 = timelines.valid.versions[1].metadata_index();
    assert_eq!(
        timelines.get_version_metadata(idx1).unwrap().version_id(),
        v2,
        "Second version should be v2"
    );
    assert_eq!(
        timelines.valid.versions[2].start,
        20000.into(),
        "Third version should start at 20000"
    );
    let idx2 = timelines.valid.versions[2].metadata_index();
    assert_eq!(
        timelines.get_version_metadata(idx2).unwrap().version_id(),
        v3,
        "Third version should be v3"
    );
    // A range around the retroactively-inserted interval finds only v2.
    let results = indexes.find_node_versions_in_valid_time_range(
        node_id,
        TimeRange::new(9000.into(), 11000.into()).unwrap(),
    );
    assert_eq!(results.len(), 1, "Should find 1 version in range");
    assert_eq!(results[0], v2, "Should find v2 in the middle");
}
#[test]
fn test_batch_insert_deduplication() {
    // Seeds a timeline with two versions, then batch-inserts two raw entries
    // where one reuses metadata_idx 0 (already backing the v1 entry). The
    // default deduplication policy must drop that duplicate and keep the
    // pre-existing entry.
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(1).unwrap();
    let v1 = VersionId::new(100).unwrap();
    let v2 = VersionId::new(101).unwrap();
    indexes
        .insert_node_version(
            node_id,
            v1,
            BiTemporalInterval::new(
                TimeRange::new(0.into(), 1000.into()).unwrap(),
                TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
            ),
        )
        .unwrap();
    indexes
        .insert_node_version(
            node_id,
            v2,
            BiTemporalInterval::new(
                TimeRange::new(1000.into(), 2000.into()).unwrap(),
                TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
            ),
        )
        .unwrap();
    // Reach into the raw timeline to inject entries directly.
    let mut timelines = indexes.index.get_mut(&EntityId::Node(node_id)).unwrap();
    let v3 = VersionId::new(102).unwrap();
    let new_metadata_idx = timelines
        .add_version_metadata(TimelineVersionMetadata::new(v3))
        .unwrap();
    let duplicate_entries = vec![
        // Duplicate: metadata_idx 0 already backs the existing [0, 1000) entry.
        TimelineEntry {
            start: 0.into(),
            end: 1500.into(),
            metadata_idx: 0,
        },
        // Fresh entry for v3.
        TimelineEntry {
            start: 2000.into(),
            end: 3000.into(),
            metadata_idx: new_metadata_idx,
        },
    ];
    timelines
        .valid
        .insert_batch(duplicate_entries, DeduplicationPolicy::default())
        .unwrap();
    // 2 seeded + 1 new = 3; the duplicate must not have been added.
    assert_eq!(
        timelines.valid.versions.len(),
        3,
        "Deduplication should remove duplicate entry with metadata_idx=0"
    );
    let idx0_count = timelines
        .valid
        .versions
        .iter()
        .filter(|e| e.metadata_idx == 0)
        .count();
    assert_eq!(
        idx0_count, 1,
        "metadata_idx=0 should appear exactly once after dedup"
    );
    // The survivor must be the pre-existing entry (end=1000), not the
    // batch duplicate (end=1500).
    let idx0_entry = timelines
        .valid
        .versions
        .iter()
        .find(|e| e.metadata_idx == 0)
        .unwrap();
    assert_eq!(
        idx0_entry.end,
        1000.into(),
        "Should keep first occurrence (end=1000)"
    );
}
#[test]
fn test_batch_insert_deduplication_non_consecutive_first_occurrence() {
    // The duplicate in this batch does NOT sort adjacent to the existing
    // metadata_idx=0 entry (start 1500 vs. 0), so dedup cannot rely on
    // neighboring entries; FirstOccurrence must still detect it and keep
    // the original [0, 1000) entry.
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(1).unwrap();
    let v1 = VersionId::new(200).unwrap();
    let v2 = VersionId::new(201).unwrap();
    indexes
        .insert_node_version(
            node_id,
            v1,
            BiTemporalInterval::new(
                TimeRange::new(0.into(), 1000.into()).unwrap(),
                TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
            ),
        )
        .unwrap();
    indexes
        .insert_node_version(
            node_id,
            v2,
            BiTemporalInterval::new(
                TimeRange::new(1000.into(), 2000.into()).unwrap(),
                TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
            ),
        )
        .unwrap();
    let mut timelines = indexes.index.get_mut(&EntityId::Node(node_id)).unwrap();
    let v3 = VersionId::new(202).unwrap();
    let new_metadata_idx = timelines
        .add_version_metadata(TimelineVersionMetadata::new(v3))
        .unwrap();
    timelines
        .valid
        .insert_batch(
            vec![
                // Duplicate of metadata_idx 0, far from the original's position.
                TimelineEntry {
                    start: 1500.into(),
                    end: 2500.into(),
                    metadata_idx: 0,
                },
                TimelineEntry {
                    start: 2000.into(),
                    end: 3000.into(),
                    metadata_idx: new_metadata_idx,
                },
            ],
            DeduplicationPolicy::FirstOccurrence,
        )
        .unwrap();
    assert_eq!(
        timelines.valid.versions.len(),
        3,
        "FirstOccurrence must deduplicate non-consecutive duplicate metadata indices"
    );
    // FirstOccurrence keeps the pre-existing entry's span [0, 1000).
    let idx0_entries: Vec<_> = timelines
        .valid
        .versions
        .iter()
        .filter(|e| e.metadata_idx == 0)
        .collect();
    assert_eq!(idx0_entries.len(), 1);
    assert_eq!(idx0_entries[0].start, 0.into());
    assert_eq!(idx0_entries[0].end, 1000.into());
}
#[test]
fn test_batch_insert_deduplication_non_consecutive_last_occurrence() {
    // Mirror of the FirstOccurrence test: with LastOccurrence the incoming
    // batch entry ([1500, 2500)) must REPLACE the pre-existing
    // metadata_idx=0 entry ([0, 1000)) rather than being dropped.
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(2).unwrap();
    let v1 = VersionId::new(300).unwrap();
    let v2 = VersionId::new(301).unwrap();
    indexes
        .insert_node_version(
            node_id,
            v1,
            BiTemporalInterval::new(
                TimeRange::new(0.into(), 1000.into()).unwrap(),
                TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
            ),
        )
        .unwrap();
    indexes
        .insert_node_version(
            node_id,
            v2,
            BiTemporalInterval::new(
                TimeRange::new(1000.into(), 2000.into()).unwrap(),
                TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
            ),
        )
        .unwrap();
    let mut timelines = indexes.index.get_mut(&EntityId::Node(node_id)).unwrap();
    timelines
        .valid
        .insert_batch(
            // Duplicate of metadata_idx 0 at a non-adjacent position.
            vec![TimelineEntry {
                start: 1500.into(),
                end: 2500.into(),
                metadata_idx: 0,
            }],
            DeduplicationPolicy::LastOccurrence,
        )
        .unwrap();
    assert_eq!(
        timelines.valid.versions.len(),
        2,
        "LastOccurrence must deduplicate non-consecutive duplicate metadata indices"
    );
    // The surviving idx-0 entry must carry the batch entry's span.
    let idx0_entries: Vec<_> = timelines
        .valid
        .versions
        .iter()
        .filter(|e| e.metadata_idx == 0)
        .collect();
    assert_eq!(idx0_entries.len(), 1);
    assert_eq!(idx0_entries[0].start, 1500.into());
    assert_eq!(idx0_entries[0].end, 2500.into());
}
#[test]
fn test_concurrent_entity_writes() {
    // 10 threads write 1000 versions each, every thread to its OWN node, so
    // writes land on disjoint timelines; verifies no cross-thread loss.
    use std::sync::Arc;
    use std::thread;
    let indexes = Arc::new(TemporalIndexes::new());
    let num_threads = 10;
    let versions_per_thread = 1000;
    let handles: Vec<_> = (0..num_threads)
        .map(|thread_id| {
            let idx = Arc::clone(&indexes);
            thread::spawn(move || {
                // Each thread exclusively owns node (thread_id + 1).
                let node_id = NodeId::new(thread_id + 1).unwrap();
                for v in 0..versions_per_thread {
                    // Version ids are globally unique across threads.
                    let version_id = VersionId::new(thread_id * versions_per_thread + v).unwrap();
                    idx.insert_node_version(
                        node_id,
                        version_id,
                        BiTemporalInterval::new(
                            TimeRange::new(
                                ((v * 1000) as i64).into(),
                                (((v + 1) * 1000) as i64).into(),
                            )
                            .unwrap(),
                            TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
                        ),
                    )
                    .unwrap();
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // Every node must contain exactly its own thread's versions.
    for thread_id in 0..num_threads {
        let node_id = NodeId::new(thread_id + 1).unwrap();
        let results = indexes.find_node_versions_in_valid_time_range(
            node_id,
            TimeRange::new(0.into(), ((versions_per_thread * 1000) as i64).into()).unwrap(),
        );
        assert_eq!(
            results.len(),
            versions_per_thread as usize,
            "Thread {} should have {} versions",
            thread_id,
            versions_per_thread
        );
    }
    // Global count equals the total number of successful inserts.
    assert_eq!(
        indexes.version_count(),
        (num_threads * versions_per_thread) as usize,
        "Total version count should match number of inserts"
    );
}
#[test]
fn test_concurrent_same_entity_contention() {
    // 8 threads hammer the SAME node concurrently to exercise lock
    // contention on a single timeline: no insert may be lost and the
    // sort invariant must survive interleaved writes.
    use std::sync::Arc;
    use std::thread;
    use std::time::Instant;
    let indexes = Arc::new(TemporalIndexes::new());
    let node_id = NodeId::new(1).unwrap();
    let num_threads = 8;
    let versions_per_thread = 500;
    let start = Instant::now();
    let handles: Vec<_> = (0..num_threads)
        .map(|thread_id| {
            let idx = Arc::clone(&indexes);
            thread::spawn(move || {
                for v in 0..versions_per_thread {
                    // Disjoint version ids and disjoint 100-tick valid
                    // intervals per (thread, iteration) pair.
                    let version_id = VersionId::new(thread_id * versions_per_thread + v).unwrap();
                    idx.insert_node_version(
                        node_id,
                        version_id,
                        BiTemporalInterval::new(
                            TimeRange::new(
                                (((thread_id * versions_per_thread + v) * 100) as i64).into(),
                                ((((thread_id * versions_per_thread + v) + 1) * 100) as i64).into(),
                            )
                            .unwrap(),
                            TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
                        ),
                    )
                    .unwrap();
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let elapsed = start.elapsed();
    // A full-range query must see every insert.
    let results = indexes.find_node_versions_in_valid_time_range(
        node_id,
        TimeRange::new(
            0.into(),
            (((num_threads * versions_per_thread) * 100) as i64).into(),
        )
        .unwrap(),
    );
    assert_eq!(
        results.len(),
        (num_threads * versions_per_thread) as usize,
        "All versions should be indexed despite contention"
    );
    // Coarse liveness bound (generous to avoid CI flakiness).
    assert!(
        elapsed.as_secs() < 2,
        "Should complete in reasonable time despite contention, took {:?}",
        elapsed
    );
    // Timeline entries must remain non-decreasing by start.
    let entity_id = EntityId::Node(node_id);
    let timelines = indexes.index.get(&entity_id).unwrap();
    let versions = &timelines.valid.versions;
    for i in 1..versions.len() {
        assert!(
            versions[i - 1].start <= versions[i].start,
            "Timeline must remain sorted despite concurrent writes"
        );
    }
}
#[test]
fn test_very_large_history() {
    // 100k contiguous 100-tick versions; narrow and full-range queries
    // must both stay correct at this scale.
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(1).unwrap();
    let version_count = 100_000;
    for i in 0..version_count {
        let interval = BiTemporalInterval::new(
            TimeRange::new(((i * 100) as i64).into(), (((i + 1) * 100) as i64).into())
                .unwrap(),
            TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
        );
        indexes
            .insert_node_version(node_id, VersionId::new(i).unwrap(), interval)
            .unwrap();
    }
    // A 1000-tick window covers 10 full intervals (11 if it clips an edge).
    let results = indexes.find_node_versions_in_valid_time_range(
        node_id,
        TimeRange::new(5_000_000.into(), 5_001_000.into()).unwrap(),
    );
    assert!(
        (10..=11).contains(&results.len()),
        "Should find ~10 versions in 1000-tick range, found {}",
        results.len()
    );
    // The full range must return every inserted version.
    let all_results = indexes.find_node_versions_in_valid_time_range(
        node_id,
        TimeRange::new(0.into(), ((version_count * 100) as i64).into()).unwrap(),
    );
    assert_eq!(
        all_results.len(),
        version_count as usize,
        "Should find all versions in full range query"
    );
}
#[test]
fn test_same_start_time_different_versions() {
    // Two versions share the same valid-time start (1000) but differ in
    // end; both must be stored and both must be queryable.
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(1).unwrap();
    let v1 = VersionId::new(1).unwrap();
    let v2 = VersionId::new(2).unwrap();
    for (version, end) in [(v1, 2000i64), (v2, 3000)] {
        indexes
            .insert_node_version(
                node_id,
                version,
                BiTemporalInterval::new(
                    TimeRange::new(1000.into(), end.into()).unwrap(),
                    TimeRange::from(0.into()),
                ),
            )
            .unwrap();
    }
    let results =
        indexes.find_node_versions_in_valid_time_range(node_id, TimeRange::at(1500.into()));
    assert_eq!(
        results.len(),
        2,
        "Both versions should be found when they have the same start time"
    );
    assert!(results.contains(&v1), "Version 1 should be in results");
    assert!(results.contains(&v2), "Version 2 should be in results");
    // The underlying timeline keeps both entries, each starting at 1000.
    let timelines = indexes.index.get(&EntityId::Node(node_id)).unwrap();
    assert_eq!(
        timelines.valid.versions.len(),
        2,
        "Timeline should contain both versions"
    );
    assert_eq!(timelines.valid.versions[0].start, 1000.into());
    assert_eq!(timelines.valid.versions[1].start, 1000.into());
}
#[test]
fn test_batch_insert_equivalence_with_same_start_retroactive_pattern() {
    // Regression pattern: entries 1 and 3 share start=10 and entry 3 arrives
    // last (retroactive). Individual inserts and a single batch insert must
    // produce identical timelines: same spans, same ids, same order.
    let node_id = NodeId::new(1).unwrap();
    let versions = vec![
        (
            VersionId::new(1).unwrap(),
            BiTemporalInterval::new(
                TimeRange::new(10.into(), 15.into()).unwrap(),
                TimeRange::from(0.into()),
            ),
        ),
        (
            VersionId::new(2).unwrap(),
            BiTemporalInterval::new(
                TimeRange::new(20.into(), 25.into()).unwrap(),
                TimeRange::from(0.into()),
            ),
        ),
        (
            // Same start (10) as version 1 but a shorter span, listed last.
            VersionId::new(3).unwrap(),
            BiTemporalInterval::new(
                TimeRange::new(10.into(), 12.into()).unwrap(),
                TimeRange::from(0.into()),
            ),
        ),
    ];
    // Path A: one insert_node_version call per entry.
    let indexes_individual = TemporalIndexes::new();
    for (version_id, temporal) in &versions {
        indexes_individual
            .insert_node_version(node_id, *version_id, *temporal)
            .unwrap();
    }
    // Path B: the same entries in a single batch call.
    let indexes_batch = TemporalIndexes::new();
    indexes_batch
        .insert_node_versions_batch(node_id, versions)
        .unwrap();
    // Project both timelines to (start, end, version id) and compare;
    // metadata indices may differ between the two indexes, so resolve them.
    let entity_id = EntityId::Node(node_id);
    let timelines_individual = indexes_individual.index.get(&entity_id).unwrap();
    let timelines_batch = indexes_batch.index.get(&entity_id).unwrap();
    let individual_entries: Vec<_> = timelines_individual
        .valid
        .versions
        .iter()
        .map(|entry| {
            (
                entry.start,
                entry.end,
                timelines_individual.resolve_version_id(entry.metadata_idx),
            )
        })
        .collect();
    let batch_entries: Vec<_> = timelines_batch
        .valid
        .versions
        .iter()
        .map(|entry| {
            (
                entry.start,
                entry.end,
                timelines_batch.resolve_version_id(entry.metadata_idx),
            )
        })
        .collect();
    assert_eq!(individual_entries, batch_entries);
}
#[test]
fn test_dos_protection_version_limit_node() {
    // Cap the per-entity version count at 10 and verify the 11th insert is
    // rejected without corrupting the timeline.
    let indexes = TemporalIndexes::with_config(TemporalIndexConfig {
        max_versions_per_entity: 10,
    });
    let node_id = NodeId::new(1).unwrap();
    let insert = |id: u64, start: i64, end: i64| {
        indexes.insert_node_version(
            node_id,
            VersionId::new(id).unwrap(),
            BiTemporalInterval::new(
                TimeRange::new(start.into(), end.into()).unwrap(),
                TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
            ),
        )
    };
    // Fill the node exactly to the limit.
    for i in 0..10 {
        assert!(
            insert(i, (i * 100) as i64, ((i + 1) * 100) as i64).is_ok(),
            "Insert {} should succeed",
            i
        );
    }
    // The 11th version exceeds the configured cap.
    let result = insert(10, 1000, 1100);
    assert!(result.is_err(), "Insert beyond limit should fail");
    let err_str = result.unwrap_err().to_string();
    assert!(
        err_str.contains("Capacity") || err_str.contains("exceeded"),
        "Error should mention capacity: {}",
        err_str
    );
    assert!(
        err_str.contains("versions for entity"),
        "Error should identify the entity: {}",
        err_str
    );
    // The rejected insert must not have partially modified the timeline.
    let timelines = indexes.index.get(&EntityId::Node(node_id)).unwrap();
    assert_eq!(
        timelines.valid.versions.len(),
        10,
        "Should have exactly 10 versions after rejection"
    );
}
#[test]
fn test_dos_protection_version_limit_edge() {
    // Edge entities honor the same per-entity cap as nodes (here: 5).
    let indexes = TemporalIndexes::with_config(TemporalIndexConfig {
        max_versions_per_entity: 5,
    });
    let edge_id = EdgeId::new(1).unwrap();
    let insert = |id: u64, start: i64, end: i64| {
        indexes.insert_edge_version(
            edge_id,
            VersionId::new(id).unwrap(),
            BiTemporalInterval::new(
                TimeRange::new(start.into(), end.into()).unwrap(),
                TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
            ),
        )
    };
    for i in 0..5 {
        assert!(
            insert(i, (i * 100) as i64, ((i + 1) * 100) as i64).is_ok(),
            "Insert {} should succeed",
            i
        );
    }
    // The 6th insert must be rejected with a capacity error.
    let result = insert(5, 500, 600);
    assert!(result.is_err(), "Insert beyond limit should fail");
    let err_str = result.unwrap_err().to_string();
    assert!(
        err_str.contains("Capacity") || err_str.contains("exceeded"),
        "Error should mention capacity: {}",
        err_str
    );
}
#[test]
fn test_dos_protection_different_entities_independent() {
    // The version cap is tracked per entity: filling node1 to its limit
    // must not consume any of node2's budget.
    let indexes = TemporalIndexes::with_config(TemporalIndexConfig {
        max_versions_per_entity: 5,
    });
    let node1 = NodeId::new(1).unwrap();
    let node2 = NodeId::new(2).unwrap();
    let insert = |node, id: u64, start: i64, end: i64| {
        indexes.insert_node_version(
            node,
            VersionId::new(id).unwrap(),
            BiTemporalInterval::new(
                TimeRange::new(start.into(), end.into()).unwrap(),
                TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
            ),
        )
    };
    // Fill node1 exactly to its limit.
    for i in 0..5u64 {
        insert(node1, i, (i * 100) as i64, ((i + 1) * 100) as i64).unwrap();
    }
    // node2 has an independent budget of 5.
    for i in 0..5u64 {
        let result = insert(node2, 100 + i, (i * 100) as i64, ((i + 1) * 100) as i64);
        assert!(
            result.is_ok(),
            "node2 insert {} should succeed (independent limit)",
            i
        );
    }
    // node1 remains saturated.
    let result = insert(node1, 10, 500, 600);
    assert!(result.is_err(), "node1 should still be at limit");
}
#[test]
fn test_dos_protection_batch_insert_respects_limit() {
    // With a cap of 10 and 8 versions already present, a batch of 3 must be
    // rejected as a whole — no partial application.
    let indexes = TemporalIndexes::with_config(TemporalIndexConfig {
        max_versions_per_entity: 10,
    });
    let node_id = NodeId::new(1).unwrap();
    let interval = |start: i64, end: i64| {
        BiTemporalInterval::new(
            TimeRange::new(start.into(), end.into()).unwrap(),
            TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
        )
    };
    for i in 0..8u64 {
        indexes
            .insert_node_version(
                node_id,
                VersionId::new(i).unwrap(),
                interval((i * 100) as i64, ((i + 1) * 100) as i64),
            )
            .unwrap();
    }
    let batch = vec![
        (VersionId::new(8).unwrap(), interval(800, 900)),
        (VersionId::new(9).unwrap(), interval(900, 1000)),
        (VersionId::new(10).unwrap(), interval(1000, 1100)),
    ];
    let result = indexes.insert_node_versions_batch(node_id, batch);
    assert!(result.is_err(), "Batch insert exceeding limit should fail");
    let err_str = result.unwrap_err().to_string();
    assert!(
        err_str.contains("Capacity") || err_str.contains("exceeded"),
        "Error should mention capacity: {}",
        err_str
    );
    // The original 8 versions are untouched after the rejected batch.
    let timelines = indexes.index.get(&EntityId::Node(node_id)).unwrap();
    assert_eq!(
        timelines.valid.versions.len(),
        8,
        "Should still have 8 versions after batch rejection"
    );
}
#[test]
fn test_dos_protection_default_limit_reasonable() {
    // The default cap should be 1M — large enough that an ordinary workload
    // (1000 versions here) never trips it.
    let config = TemporalIndexConfig::default();
    assert_eq!(
        config.max_versions_per_entity, 1_000_000,
        "Default limit should be 1 million"
    );
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(1).unwrap();
    for i in 0..1000u64 {
        let interval = BiTemporalInterval::new(
            TimeRange::new(((i * 100) as i64).into(), (((i + 1) * 100) as i64).into()).unwrap(),
            TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
        );
        let result = indexes.insert_node_version(node_id, VersionId::new(i).unwrap(), interval);
        assert!(
            result.is_ok(),
            "Insert {} should succeed with default limit",
            i
        );
    }
}
#[test]
fn test_find_node_version_at_point_basic() {
    // v1: valid [0, 1000), recorded from tx 0.
    // v2: valid [1000, 2000), recorded from tx 500.
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(1).unwrap();
    let v1 = VersionId::new(100).unwrap();
    let v2 = VersionId::new(101).unwrap();
    for (version, valid_start, valid_end, tx_start) in
        [(v1, 0i64, 1000i64, 0i64), (v2, 1000, 2000, 500)]
    {
        indexes
            .insert_node_version(
                node_id,
                version,
                BiTemporalInterval::new(
                    TimeRange::new(valid_start.into(), valid_end.into()).unwrap(),
                    TimeRange::new(tx_start.into(), TIMESTAMP_MAX).unwrap(),
                ),
            )
            .unwrap();
    }
    // valid=500, tx=600 falls inside v1 only.
    let results = indexes.find_node_version_at_point(node_id, 500.into(), 600.into());
    assert_eq!(results.len(), 1, "Should find exactly one version");
    assert_eq!(results[0], v1, "Should find v1");
    // valid=1500, tx=600 falls inside v2 only.
    let results = indexes.find_node_version_at_point(node_id, 1500.into(), 600.into());
    assert_eq!(results.len(), 1, "Should find exactly one version");
    assert_eq!(results[0], v2, "Should find v2");
    // tx=400 predates v2's recording time (tx start 500).
    let results = indexes.find_node_version_at_point(node_id, 1500.into(), 400.into());
    assert!(
        results.is_empty(),
        "Should find no versions before v2 was recorded"
    );
}
#[test]
fn test_find_node_version_at_point_empty_index() {
    // A point query against an index with no entries must return nothing.
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(1).unwrap();
    let found = indexes.find_node_version_at_point(node_id, 500.into(), 600.into());
    assert!(found.is_empty(), "Empty index should return no results");
}
#[test]
fn test_find_edge_version_at_point_basic() {
    // One edge version valid and recorded over the full time axis; any
    // point query must hit it.
    let indexes = TemporalIndexes::new();
    let edge_id = EdgeId::new(1).unwrap();
    let v1 = VersionId::new(100).unwrap();
    let always = TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap();
    indexes
        .insert_edge_version(edge_id, v1, BiTemporalInterval::new(always, always))
        .unwrap();
    let results = indexes.find_edge_version_at_point(edge_id, 500.into(), 600.into());
    assert_eq!(results.len(), 1, "Should find exactly one version");
    assert_eq!(results[0], v1, "Should find v1");
}
#[test]
fn test_find_node_version_at_point_overlapping_intervals() {
    // v1 valid [0, 2000) and v2 valid [1000, 3000); both recorded from tx 0.
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(1).unwrap();
    let v1 = VersionId::new(100).unwrap();
    let v2 = VersionId::new(101).unwrap();
    for (version, start, end) in [(v1, 0i64, 2000i64), (v2, 1000, 3000)] {
        indexes
            .insert_node_version(
                node_id,
                version,
                BiTemporalInterval::new(
                    TimeRange::new(start.into(), end.into()).unwrap(),
                    TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
                ),
            )
            .unwrap();
    }
    // 1500 lies inside both valid intervals.
    let results = indexes.find_node_version_at_point(node_id, 1500.into(), 100.into());
    assert_eq!(results.len(), 2, "Should find both overlapping versions");
    assert!(results.contains(&v1), "Should include v1");
    assert!(results.contains(&v2), "Should include v2");
    // 500 precedes v2's validity.
    let results = indexes.find_node_version_at_point(node_id, 500.into(), 100.into());
    assert_eq!(results.len(), 1, "Should find only v1");
    assert_eq!(results[0], v1, "Should be v1");
    // 2500 is past v1's validity.
    let results = indexes.find_node_version_at_point(node_id, 2500.into(), 100.into());
    assert_eq!(results.len(), 1, "Should find only v2");
    assert_eq!(results[0], v2, "Should be v2");
}
#[test]
fn test_find_node_version_at_point_boundary_conditions() {
    // Pins the inclusive-start / exclusive-end semantics of point lookups
    // for a version valid over [1000, 2000) and recorded over tx [500, 1500).
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(1).unwrap();
    let v1 = VersionId::new(100).unwrap();
    indexes
        .insert_node_version(
            node_id,
            v1,
            BiTemporalInterval::new(
                TimeRange::new(1000.into(), 2000.into()).unwrap(),
                TimeRange::new(500.into(), 1500.into()).unwrap(),
            ),
        )
        .unwrap();
    // Valid-time boundaries (tx-time fixed at 1000, inside [500, 1500)).
    let results = indexes.find_node_version_at_point(node_id, 999.into(), 1000.into());
    assert!(
        results.is_empty(),
        "Should not find version before valid_time start"
    );
    let results = indexes.find_node_version_at_point(node_id, 1000.into(), 1000.into());
    assert_eq!(results.len(), 1, "Should find at valid_time start");
    let results = indexes.find_node_version_at_point(node_id, 1999.into(), 1000.into());
    assert_eq!(results.len(), 1, "Should find just before valid_time end");
    let results = indexes.find_node_version_at_point(node_id, 2000.into(), 1000.into());
    assert!(
        results.is_empty(),
        "Should not find at valid_time end (exclusive)"
    );
    // Transaction-time boundaries (valid-time fixed at 1500, inside [1000, 2000)).
    let results = indexes.find_node_version_at_point(node_id, 1500.into(), 499.into());
    assert!(results.is_empty(), "Should not find before tx_time start");
    let results = indexes.find_node_version_at_point(node_id, 1500.into(), 1500.into());
    assert!(
        results.is_empty(),
        "Should not find at tx_time end (exclusive)"
    );
}
mod proptests {
use super::*;
use proptest::prelude::*;
/// Strategy yielding a timestamp in [0, 1_000_000_000) — small enough that
/// `time_range_strategy` can add up to 10_000 without overflowing i64.
fn timestamp_strategy() -> impl Strategy<Value = i64> {
    0i64..1_000_000_000i64
}
/// Strategy yielding a (start, end) pair with start < end and a span of
/// 1..=10_000 ticks.
fn time_range_strategy() -> impl Strategy<Value = (i64, i64)> {
    timestamp_strategy().prop_flat_map(|start| (Just(start), (start + 1)..=(start + 10_000)))
}
/// Strategy yielding a shuffled list of versions with unique ids in
/// 0..10_000 and arbitrary valid-time ranges; transaction time is always
/// [0, TIMESTAMP_MAX). The `btree_map` collection guarantees id uniqueness.
fn unique_version_entries_strategy(
    size: impl Into<proptest::collection::SizeRange>,
) -> impl Strategy<Value = Vec<(VersionId, BiTemporalInterval)>> {
    prop::collection::btree_map(
        0u64..10_000u64,
        time_range_strategy(),
        size,
    )
    .prop_flat_map(|map| {
        let vec: Vec<_> = map
            .into_iter()
            .map(|(vid, (start, end))| {
                (
                    VersionId::new(vid).unwrap(),
                    BiTemporalInterval::new(
                        TimeRange::new(start.into(), end.into()).unwrap(),
                        TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
                    ),
                )
            })
            .collect();
        // Randomize order so callers exercise out-of-order insertion.
        Just(vec).prop_shuffle()
    })
}
proptest! {
#[test]
fn prop_insert_order_irrelevant(
    versions in unique_version_entries_strategy(1..100)
) {
    // Inserting the same version set in two different orders (the strategy's
    // shuffled order vs. sorted-by-version-id) must yield timelines with the
    // same contents, each sorted by valid-time start.
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(1).unwrap();
    for (version_id, temporal) in &versions {
        indexes.insert_node_version(node_id, *version_id, *temporal).unwrap();
    }
    let entity_id = EntityId::Node(node_id);
    let timelines1 = indexes.index.get(&entity_id).unwrap();
    let timeline1_entries: Vec<_> = timelines1.valid.versions.iter().map(|e| {
        (e.start, e.end, timelines1.resolve_version_id(e.metadata_idx))
    }).collect();
    // Release the map guard before clearing and re-inserting.
    drop(timelines1);
    indexes.clear();
    // Second pass: re-insert in a deterministic (sorted-by-id) order.
    let mut shuffled = versions.clone();
    shuffled.sort_by_key(|(vid, _)| *vid);
    for (version_id, temporal) in shuffled {
        indexes.insert_node_version(node_id, version_id, temporal).unwrap();
    }
    let timelines2 = indexes.index.get(&entity_id).unwrap();
    let timeline2_entries: Vec<_> = timelines2.valid.versions.iter().map(|e| {
        (e.start, e.end, timelines2.resolve_version_id(e.metadata_idx))
    }).collect();
    // Each timeline must individually be non-decreasing by start.
    for i in 1..timeline1_entries.len() {
        prop_assert!(timeline1_entries[i-1].0 <= timeline1_entries[i].0);
    }
    for i in 1..timeline2_entries.len() {
        prop_assert!(timeline2_entries[i-1].0 <= timeline2_entries[i].0);
    }
    // Compare contents order-insensitively (entries tied on start may be
    // legally ordered differently between the two runs).
    let mut t1_sorted = timeline1_entries.clone();
    t1_sorted.sort_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)).then(a.2.cmp(&b.2)));
    let mut t2_sorted = timeline2_entries.clone();
    t2_sorted.sort_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)).then(a.2.cmp(&b.2)));
    prop_assert_eq!(t1_sorted, t2_sorted);
}
#[test]
fn prop_range_query_correctness(
    versions in unique_version_entries_strategy(1..50),
    query_range in time_range_strategy()
) {
    // Oracle check: the indexed range query must match a brute-force scan
    // using half-open overlap (valid.end > q.start && valid.start < q.end).
    let indexes = TemporalIndexes::new();
    let node_id = NodeId::new(1).unwrap();
    for (version_id, temporal) in &versions {
        indexes.insert_node_version(node_id, *version_id, *temporal).unwrap();
    }
    let query_time_range = TimeRange::new(query_range.0.into(), query_range.1.into()).unwrap();
    let results = indexes.find_node_versions_in_valid_time_range(node_id, query_time_range);
    // Brute-force expected set computed directly from the raw inputs.
    let mut expected: Vec<VersionId> = versions
        .iter()
        .filter(|(_, temporal)| {
            let valid = temporal.valid_time();
            valid.end() > query_range.0.into() && valid.start() < query_range.1.into()
        })
        .map(|(vid, _)| *vid)
        .collect();
    // Result order is unspecified; compare as sorted sets.
    let mut results_sorted = results.clone();
    results_sorted.sort();
    expected.sort();
    prop_assert_eq!(results_sorted, expected, "Query returned incorrect versions");
}
#[test]
// Property: inserting versions one-by-one and inserting the same versions
// as a single batch must produce identical valid timelines, entry by entry
// (same start, same end, same resolved VersionId at every position).
fn prop_batch_insert_equivalence(
versions in unique_version_entries_strategy(1..50)
) {
let node_id = NodeId::new(1).unwrap();
// Index built via individual inserts.
let indexes1 = TemporalIndexes::new();
for (version_id, temporal) in &versions {
indexes1.insert_node_version(node_id, *version_id, *temporal).unwrap();
}
// Index built via one batch insert of the same data.
let indexes2 = TemporalIndexes::new();
indexes2.insert_node_versions_batch(node_id, versions.clone()).unwrap();
let entity_id = EntityId::Node(node_id);
let timelines1 = indexes1.index.get(&entity_id).unwrap();
let timelines2 = indexes2.index.get(&entity_id).unwrap();
prop_assert_eq!(timelines1.valid.versions.len(), timelines2.valid.versions.len());
// Positional comparison: the two construction paths must agree exactly.
for i in 0..timelines1.valid.versions.len() {
let e1 = &timelines1.valid.versions[i];
let e2 = &timelines2.valid.versions[i];
prop_assert_eq!(e1.start, e2.start);
prop_assert_eq!(e1.end, e2.end);
let v1 = timelines1.resolve_version_id(e1.metadata_idx);
let v2 = timelines2.resolve_version_id(e2.metadata_idx);
prop_assert_eq!(v1, v2);
}
}
#[test]
// Property: no matter the insertion order (including retroactive inserts
// with earlier start times), the valid timeline stays sorted by start.
fn prop_retroactive_inserts_maintain_order(
versions in unique_version_entries_strategy(1..100)
) {
let indexes = TemporalIndexes::new();
let node_id = NodeId::new(1).unwrap();
for (version_id, temporal) in &versions {
indexes.insert_node_version(node_id, *version_id, *temporal).unwrap();
}
let entity_id = EntityId::Node(node_id);
let timeline = indexes.index.get(&entity_id).unwrap().valid.versions.clone();
// Verify non-decreasing start times across adjacent entries.
for i in 1..timeline.len() {
prop_assert!(
timeline[i-1].start <= timeline[i].start,
"Timeline not sorted: timeline[{}].start={} > timeline[{}].start={}",
i-1, timeline[i-1].start, i, timeline[i].start
);
}
}
#[test]
// Property: a narrow point-like query [point, point+1) must be a subset of
// a wide range query [point-1000, point+1000) around the same point.
// NOTE(review): the point +/- arithmetic assumes timestamp_strategy stays
// well inside the i64 range so no overflow occurs — confirm strategy bounds.
fn prop_point_query_subset_of_range(
versions in unique_version_entries_strategy(1..50),
point in timestamp_strategy()
) {
let indexes = TemporalIndexes::new();
let node_id = NodeId::new(1).unwrap();
for (version_id, temporal) in &versions {
indexes.insert_node_version(node_id, *version_id, *temporal).unwrap();
}
// Minimal half-open range covering exactly one timestamp.
let point_results = indexes.find_node_versions_in_valid_time_range(
node_id,
TimeRange::new(point.into(), (point + 1).into()).unwrap()
);
// Enclosing range around the same point.
let range_results = indexes.find_node_versions_in_valid_time_range(
node_id,
TimeRange::new((point - 1000).into(), (point + 1000).into()).unwrap()
);
for version_id in point_results {
prop_assert!(
range_results.contains(&version_id),
"Point query returned version not in range query: {:?}",
version_id
);
}
}
#[test]
// Property: after a batch insert the valid timeline is sorted by start
// time, and entries sharing a start time never repeat a metadata index.
fn prop_batch_maintains_sort_order(
versions in unique_version_entries_strategy(1..50)
) {
let indexes = TemporalIndexes::new();
let node_id = NodeId::new(1).unwrap();
indexes.insert_node_versions_batch(node_id, versions).unwrap();
let entity_id = EntityId::Node(node_id);
// A batch could legitimately leave no timeline if empty; guard with if-let.
if let Some(timelines) = indexes.index.get(&entity_id) {
let timeline = &timelines.valid.versions;
// Invariant 1: non-decreasing start times.
for i in 1..timeline.len() {
prop_assert!(
timeline[i-1].start <= timeline[i].start,
"Timeline must be sorted by start time"
);
}
// Invariant 2: ties on start must not duplicate the same metadata entry.
for i in 1..timeline.len() {
if timeline[i-1].start == timeline[i].start {
prop_assert!(
timeline[i-1].metadata_idx != timeline[i].metadata_idx,
"No consecutive entries with same start time and metadata index"
);
}
}
}
}
}
}
// Tests verifying that version metadata is stored exactly once per version
// and shared (by index) between the valid-time and transaction-time
// timelines, rather than being duplicated in each.
mod consolidated_storage_tests {
use super::*;
// A single insert must create one metadata record and one entry in each
// of the two timelines (valid and tx).
#[test]
fn test_version_metadata_stored_once() {
let indexes = TemporalIndexes::new();
let node_id = NodeId::new(1).unwrap();
let v1 = VersionId::new(100).unwrap();
indexes
.insert_node_version(
node_id,
v1,
BiTemporalInterval::new(
TimeRange::new(0.into(), 1000.into()).unwrap(),
TimeRange::new(500.into(), TIMESTAMP_MAX).unwrap(),
),
)
.unwrap();
let entity_id = EntityId::Node(node_id);
let timelines = indexes.index.get(&entity_id).unwrap();
// One metadata record backs both timeline entries.
assert_eq!(
timelines.version_metadata_count(),
1,
"Version metadata should be stored exactly once, not duplicated"
);
assert_eq!(timelines.valid.versions.len(), 1);
assert_eq!(timelines.tx.versions.len(), 1);
}
// 100 inserts -> 100 metadata records (not 200, i.e. not one per timeline).
#[test]
fn test_multiple_versions_no_duplication() {
let indexes = TemporalIndexes::new();
let node_id = NodeId::new(1).unwrap();
for i in 0..100 {
let version_id = VersionId::new(i).unwrap();
let start = (i * 1000) as i64;
indexes
.insert_node_version(
node_id,
version_id,
BiTemporalInterval::new(
TimeRange::new(start.into(), (start + 1000).into()).unwrap(),
TimeRange::new(start.into(), TIMESTAMP_MAX).unwrap(),
),
)
.unwrap();
}
let entity_id = EntityId::Node(node_id);
let timelines = indexes.index.get(&entity_id).unwrap();
assert_eq!(
timelines.version_metadata_count(),
100,
"Should store 100 unique versions, not 200 (duplicated)"
);
assert_eq!(timelines.valid.versions.len(), 100);
assert_eq!(timelines.tx.versions.len(), 100);
}
// Queries over both time dimensions must still resolve correct VersionIds
// through the consolidated metadata storage.
#[test]
fn test_queries_return_version_ids_correctly() {
let indexes = TemporalIndexes::new();
let node_id = NodeId::new(1).unwrap();
let v1 = VersionId::new(100).unwrap();
let v2 = VersionId::new(101).unwrap();
let v3 = VersionId::new(102).unwrap();
// v1: valid [0,1000), tx [0,MAX)
indexes
.insert_node_version(
node_id,
v1,
BiTemporalInterval::new(
TimeRange::new(0.into(), 1000.into()).unwrap(),
TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
),
)
.unwrap();
// v2: valid [1000,2000), tx [500,MAX)
indexes
.insert_node_version(
node_id,
v2,
BiTemporalInterval::new(
TimeRange::new(1000.into(), 2000.into()).unwrap(),
TimeRange::new(500.into(), TIMESTAMP_MAX).unwrap(),
),
)
.unwrap();
// v3: valid [2000,3000), tx [1000,MAX)
indexes
.insert_node_version(
node_id,
v3,
BiTemporalInterval::new(
TimeRange::new(2000.into(), 3000.into()).unwrap(),
TimeRange::new(1000.into(), TIMESTAMP_MAX).unwrap(),
),
)
.unwrap();
// Valid range [500,1500) overlaps v1 and v2 only.
let results = indexes.find_node_versions_in_valid_time_range(
node_id,
TimeRange::new(500.into(), 1500.into()).unwrap(),
);
assert_eq!(results.len(), 2);
assert!(results.contains(&v1));
assert!(results.contains(&v2));
// Tx range [600,800) overlaps v1 and v2; v3's tx starts at 1000.
let results = indexes.find_node_versions_in_transaction_time_range(
node_id,
TimeRange::new(600.into(), 800.into()).unwrap(),
);
assert_eq!(results.len(), 2);
assert!(results.contains(&v1));
assert!(results.contains(&v2));
// Bitemporal point (valid=1500, tx=1500) matches only v2.
let results = indexes.find_node_version_at_point(node_id, 1500.into(), 1500.into());
assert_eq!(results.len(), 1);
assert!(results.contains(&v2));
}
// Batch insert must consolidate metadata exactly like individual inserts.
#[test]
fn test_batch_insert_consolidated_storage() {
let indexes = TemporalIndexes::new();
let node_id = NodeId::new(1).unwrap();
let versions: Vec<_> = (0..50)
.map(|i| {
let version_id = VersionId::new(i).unwrap();
let start = (i * 100) as i64;
(
version_id,
BiTemporalInterval::new(
TimeRange::new(start.into(), (start + 100).into()).unwrap(),
TimeRange::new(start.into(), TIMESTAMP_MAX).unwrap(),
),
)
})
.collect();
indexes
.insert_node_versions_batch(node_id, versions)
.unwrap();
let entity_id = EntityId::Node(node_id);
let timelines = indexes.index.get(&entity_id).unwrap();
assert_eq!(
timelines.version_metadata_count(),
50,
"Batch insert should store 50 unique versions, not 100"
);
}
// Global version_count must sum consolidated per-entity counts (10 + 5).
#[test]
fn test_version_count_reflects_consolidated_storage() {
let indexes = TemporalIndexes::new();
let node1 = NodeId::new(1).unwrap();
for i in 0..10 {
indexes
.insert_node_version(
node1,
VersionId::new(i).unwrap(),
BiTemporalInterval::new(
TimeRange::new((i as i64 * 100).into(), ((i as i64 + 1) * 100).into())
.unwrap(),
TimeRange::from(0.into()),
),
)
.unwrap();
}
let node2 = NodeId::new(2).unwrap();
for i in 10..15 {
indexes
.insert_node_version(
node2,
VersionId::new(i).unwrap(),
BiTemporalInterval::new(
TimeRange::new((i as i64 * 100).into(), ((i as i64 + 1) * 100).into())
.unwrap(),
TimeRange::from(0.into()),
),
)
.unwrap();
}
assert_eq!(
indexes.version_count(),
15,
"version_count should reflect consolidated storage (15 versions, not 30)"
);
}
// Metadata must be retrievable by its storage index (index 0 after one
// insert) and carry the inserted VersionId.
#[test]
fn test_memory_layout_efficiency() {
let indexes = TemporalIndexes::new();
let node_id = NodeId::new(1).unwrap();
let v1 = VersionId::new(42).unwrap();
indexes
.insert_node_version(
node_id,
v1,
BiTemporalInterval::new(
TimeRange::new(0.into(), 1000.into()).unwrap(),
TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
),
)
.unwrap();
let entity_id = EntityId::Node(node_id);
let timelines = indexes.index.get(&entity_id).unwrap();
let metadata = timelines.get_version_metadata(0);
assert!(
metadata.is_some(),
"Should be able to retrieve version metadata by index"
);
assert_eq!(
metadata.unwrap().version_id(),
v1,
"Version metadata should contain the correct version_id"
);
}
// Each valid-timeline entry and its tx-timeline counterpart must point
// (via metadata_index) at metadata resolving to the same VersionId.
#[test]
fn test_timeline_entries_use_metadata_index() {
let indexes = TemporalIndexes::new();
let node_id = NodeId::new(1).unwrap();
for i in 0..5 {
indexes
.insert_node_version(
node_id,
VersionId::new(i * 10).unwrap(),
BiTemporalInterval::new(
TimeRange::new((i as i64 * 100).into(), ((i as i64 + 1) * 100).into())
.unwrap(),
TimeRange::from(0.into()),
),
)
.unwrap();
}
let entity_id = EntityId::Node(node_id);
let timelines = indexes.index.get(&entity_id).unwrap();
assert_eq!(timelines.valid.versions.len(), 5);
assert_eq!(timelines.tx.versions.len(), 5);
for i in 0..5 {
let valid_entry = &timelines.valid.versions[i];
let tx_entry = &timelines.tx.versions[i];
let valid_version = timelines
.get_version_metadata(valid_entry.metadata_index())
.unwrap()
.version_id();
let tx_version = timelines
.get_version_metadata(tx_entry.metadata_index())
.unwrap()
.version_id();
assert_eq!(
valid_version, tx_version,
"Valid and tx entries for same version should resolve to same VersionId"
);
}
}
}
#[test]
/// Exercises `find_indices_in_range_iter` through several iterator
/// consumers (count, next, collect, sum) plus a degenerate point range.
fn test_find_indices_in_range_iterator() {
    // Ten contiguous versions: [0,100), [100,200), ..., [900,1000).
    let mut tl = EntityTimeline::default();
    for idx in 0..10 {
        tl.insert((idx * 100).into(), ((idx + 1) * 100).into(), idx as u32);
    }
    // [250,550) overlaps versions 2 through 5.
    let window = TimeRange::new(250.into(), 550.into()).unwrap();
    assert_eq!(
        tl.find_indices_in_range_iter(window).count(),
        4,
        "Should find 4 overlapping versions"
    );
    assert_eq!(
        tl.find_indices_in_range_iter(window).next(),
        Some(2),
        "First result should be version 2"
    );
    // Collected results come back in timeline order.
    let hits: Vec<_> = tl.find_indices_in_range_iter(window).collect();
    assert_eq!(hits, vec![2, 3, 4, 5]);
    let total: u32 = tl.find_indices_in_range_iter(window).sum();
    assert_eq!(total, 2 + 3 + 4 + 5, "Sum of indices should be 14");
    // A point range hits exactly the single version covering t=250.
    let at_250: Vec<_> = tl
        .find_indices_in_range_iter(TimeRange::at(250.into()))
        .collect();
    assert_eq!(at_250, vec![2]);
}
#[test]
/// Point-query iterator: only the intervals containing the queried
/// timestamp are yielded, in timeline order.
fn test_find_indices_at_point_iterator() {
    let mut tl = EntityTimeline::default();
    // Intervals [500,1500) and [800,1200) cover t=1000; [1100,2000) does not start until 1100... 
    // actually it does not cover t=1000 because its start is after the point.
    tl.insert(500.into(), 1500.into(), 0);
    tl.insert(800.into(), 1200.into(), 1);
    tl.insert(1100.into(), 2000.into(), 2);
    assert_eq!(
        tl.find_indices_at_point_iter(1000.into()).count(),
        2,
        "Should find 2 versions at timestamp 1000"
    );
    assert_eq!(
        tl.find_indices_at_point_iter(1000.into()).next(),
        Some(0),
        "First result should be version 0"
    );
    let hits: Vec<_> = tl.find_indices_at_point_iter(1000.into()).collect();
    assert_eq!(hits.len(), 2);
    assert!(hits.contains(&0));
    assert!(hits.contains(&1));
    assert!(!hits.contains(&2));
}
#[test]
// The iterator variant of the bitemporal point query must agree with the
// Vec-returning variant, and return nothing for out-of-range points or
// unknown nodes.
fn test_find_node_version_at_point_iterator() {
let indexes = TemporalIndexes::new();
let node_id = NodeId::new(1).unwrap();
let v1 = VersionId::new(100).unwrap();
let v2 = VersionId::new(101).unwrap();
let v3 = VersionId::new(102).unwrap();
// v1: valid [0,2000), tx from 0
indexes
.insert_node_version(
node_id,
v1,
BiTemporalInterval::new(
TimeRange::new(0.into(), 2000.into()).unwrap(),
TimeRange::from(0.into()),
),
)
.unwrap();
// v2: valid [1000,3000), tx from 500
indexes
.insert_node_version(
node_id,
v2,
BiTemporalInterval::new(
TimeRange::new(1000.into(), 3000.into()).unwrap(),
TimeRange::from(500.into()),
),
)
.unwrap();
// v3: valid [1500,4000), tx from 1000
indexes
.insert_node_version(
node_id,
v3,
BiTemporalInterval::new(
TimeRange::new(1500.into(), 4000.into()).unwrap(),
TimeRange::from(1000.into()),
),
)
.unwrap();
// At valid=1600, tx=1200 all three versions match; iteration order is
// not asserted, only membership.
let first = indexes
.find_node_version_at_point_iter(node_id, 1600.into(), 1200.into())
.next();
assert!(first.is_some(), "Should find at least one version");
assert!(
first == Some(v1) || first == Some(v2) || first == Some(v3),
"First result should be one of the matching versions"
);
let count = indexes
.find_node_version_at_point_iter(node_id, 1600.into(), 1200.into())
.count();
assert_eq!(count, 3, "Should find 3 versions at this point");
// Cross-check: iterator results must match the Vec-based API.
let iter_results: Vec<_> = indexes
.find_node_version_at_point_iter(node_id, 1600.into(), 1200.into())
.collect();
let vec_results = indexes.find_node_version_at_point(node_id, 1600.into(), 1200.into());
assert_eq!(iter_results.len(), vec_results.len());
for version_id in &iter_results {
assert!(
vec_results.contains(version_id),
"Iterator results should match Vec results"
);
}
// valid=5000 is beyond every valid interval -> empty.
let empty_count = indexes
.find_node_version_at_point_iter(node_id, 5000.into(), 5000.into())
.count();
assert_eq!(empty_count, 0, "Should find no versions outside time range");
// Unknown node -> empty iterator, no panic.
let missing_node = NodeId::new(999).unwrap();
let missing_count = indexes
.find_node_version_at_point_iter(missing_node, 1000.into(), 1000.into())
.count();
assert_eq!(missing_count, 0, "Should find no versions for missing node");
}
#[test]
// Edge counterpart of the point-query iterator test: the iterator API must
// agree with the Vec-returning API for edges as well.
fn test_find_edge_version_at_point_iterator() {
let indexes = TemporalIndexes::new();
let edge_id = EdgeId::new(1).unwrap();
let v1 = VersionId::new(100).unwrap();
let v2 = VersionId::new(101).unwrap();
// v1: valid [0,1500), tx from 0
indexes
.insert_edge_version(
edge_id,
v1,
BiTemporalInterval::new(
TimeRange::new(0.into(), 1500.into()).unwrap(),
TimeRange::from(0.into()),
),
)
.unwrap();
// v2: valid [1000,2000), tx from 500
indexes
.insert_edge_version(
edge_id,
v2,
BiTemporalInterval::new(
TimeRange::new(1000.into(), 2000.into()).unwrap(),
TimeRange::from(500.into()),
),
)
.unwrap();
// valid=1200, tx=600: both versions' intervals contain the point.
let first = indexes
.find_edge_version_at_point_iter(edge_id, 1200.into(), 600.into())
.next();
assert!(first.is_some());
// Iterator and Vec APIs must return the same set of versions.
let iter_results: Vec<_> = indexes
.find_edge_version_at_point_iter(edge_id, 1200.into(), 600.into())
.collect();
let vec_results = indexes.find_edge_version_at_point(edge_id, 1200.into(), 600.into());
assert_eq!(iter_results.len(), vec_results.len());
for version_id in &iter_results {
assert!(vec_results.contains(version_id));
}
}
#[test]
/// Truncating a node version's valid-time end hides it from queries past
/// the new end while keeping it visible before it.
fn test_update_node_valid_time_end() {
    let index = TemporalIndexes::new();
    let node = NodeId::new(1).unwrap();
    let version = VersionId::new(100).unwrap();
    // Open-ended valid time starting at 1000; transaction time from 0.
    let interval = BiTemporalInterval::new(
        TimeRange::from(1000.into()),
        TimeRange::from(0.into()),
    );
    index.insert_node_version(node, version, interval).unwrap();
    // Before the update, t=2000 lies inside the open-ended interval.
    let found = index.find_node_version_at_point(node, 2000.into(), 100.into());
    assert_eq!(found.len(), 1);
    assert_eq!(found[0], version);
    // Close the valid interval at 1500.
    index.update_node_valid_time_end(node, version, 1500.into());
    // Still visible inside [1000, 1500)...
    let found = index.find_node_version_at_point(node, 1200.into(), 100.into());
    assert_eq!(found.len(), 1);
    assert_eq!(found[0], version);
    // ...but no longer at 2000.
    let found = index.find_node_version_at_point(node, 2000.into(), 100.into());
    assert_eq!(found.len(), 0);
}
#[test]
// Truncating an edge version's transaction-time end must hide it from
// queries at tx points past the new end while keeping earlier tx points.
fn test_update_edge_transaction_time_end() {
let indexes = TemporalIndexes::new();
let edge_id = EdgeId::new(1).unwrap();
let v1 = VersionId::new(100).unwrap();
// valid [1000,2000), tx open-ended from 500.
indexes
.insert_edge_version(
edge_id,
v1,
BiTemporalInterval::new(
TimeRange::new(1000.into(), 2000.into()).unwrap(),
TimeRange::from(500.into()), ),
)
.unwrap();
// tx=1000 is inside the open-ended tx interval before the update.
let results = indexes.find_edge_version_at_point(edge_id, 1500.into(), 1000.into());
assert_eq!(results.len(), 1);
assert_eq!(results[0], v1);
// Close the tx interval at 800.
indexes.update_edge_transaction_time_end(edge_id, v1, 800.into());
// tx=600 still inside [500,800) -> visible.
let results = indexes.find_edge_version_at_point(edge_id, 1500.into(), 600.into());
assert_eq!(results.len(), 1);
assert_eq!(results[0], v1);
// tx=1000 now past the truncated end -> hidden.
let results = indexes.find_edge_version_at_point(edge_id, 1500.into(), 1000.into());
assert_eq!(results.len(), 0);
}
#[test]
// Chain three open-ended versions, then close v1 and v2 so the valid
// intervals become [1000,2000), [2000,3000), [3000,inf) — each point query
// should then match exactly one version.
fn test_update_multiple_versions() {
let indexes = TemporalIndexes::new();
let node_id = NodeId::new(1).unwrap();
let v1 = VersionId::new(100).unwrap();
let v2 = VersionId::new(101).unwrap();
let v3 = VersionId::new(102).unwrap();
indexes
.insert_node_version(
node_id,
v1,
BiTemporalInterval::new(TimeRange::from(1000.into()), TimeRange::from(0.into())),
)
.unwrap();
indexes
.insert_node_version(
node_id,
v2,
BiTemporalInterval::new(TimeRange::from(2000.into()), TimeRange::from(0.into())),
)
.unwrap();
indexes
.insert_node_version(
node_id,
v3,
BiTemporalInterval::new(TimeRange::from(3000.into()), TimeRange::from(0.into())),
)
.unwrap();
// Close each predecessor at its successor's start time.
indexes.update_node_valid_time_end(node_id, v1, 2000.into());
indexes.update_node_valid_time_end(node_id, v2, 3000.into());
let results = indexes.find_node_version_at_point(node_id, 1500.into(), 100.into());
assert_eq!(results, vec![v1]);
let results = indexes.find_node_version_at_point(node_id, 2500.into(), 100.into());
assert_eq!(results, vec![v2]);
let results = indexes.find_node_version_at_point(node_id, 3500.into(), 100.into());
assert_eq!(results, vec![v3]);
}
#[test]
// Smoke test for thread-safety of update_node_valid_time_end: 10 threads
// each close a disjoint slice of 10 versions (no two threads touch the
// same version), then the resulting chained intervals are verified.
fn test_concurrent_updates() {
use std::sync::Arc;
use std::thread;
let indexes = Arc::new(TemporalIndexes::new());
let node_id = NodeId::new(1).unwrap();
// 100 open-ended versions starting at 0, 1000, 2000, ...
for i in 0..100 {
let v_id = VersionId::new(i).unwrap();
indexes
.insert_node_version(
node_id,
v_id,
BiTemporalInterval::new(
TimeRange::from(((i * 1000) as i64).into()),
TimeRange::from(0.into()),
),
)
.unwrap();
}
let mut handles = vec![];
for i in 0..10 {
let indexes_clone = Arc::clone(&indexes);
let handle = thread::spawn(move || {
// Thread i owns versions [i*10, i*10+10) — partitions are disjoint.
for j in 0..10 {
let idx = i * 10 + j;
let v_id = VersionId::new(idx).unwrap();
indexes_clone.update_node_valid_time_end(
node_id,
v_id,
(((idx + 1) * 1000) as i64).into(),
);
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
// After all updates, version i occupies exactly [i*1000, (i+1)*1000).
for i in 0..100 {
let v_id = VersionId::new(i).unwrap();
let query_time = (((i * 1000) + 500) as i64).into();
let results = indexes.find_node_version_at_point(node_id, query_time, 100.into());
assert_eq!(results.len(), 1);
assert_eq!(results[0], v_id);
// The midpoint of the next interval must resolve to the next version
// (skipped for i == 99, which has no successor).
let query_time = (((i + 1) * 1000 + 500) as i64).into();
let results = indexes.find_node_version_at_point(node_id, query_time, 100.into());
if i < 99 {
assert_eq!(results.len(), 1);
assert_eq!(results[0], VersionId::new(i + 1).unwrap());
}
}
}
#[test]
// Exercises the three DeduplicationPolicy variants against a batch that
// repeats the same VersionId with two different intervals.
// NOTE(review): the assertions imply FirstOccurrence retains the
// later-listed interval [500,1500) and LastOccurrence retains the
// earlier-listed [1000,2000) — the opposite of the intuitive reading of the
// names. Confirm the intended policy semantics against the implementation.
fn test_deduplication_policy_variants() {
let indexes = TemporalIndexes::new();
let node_id = NodeId::new(1).unwrap();
let v1 = VersionId::new(100).unwrap();
// Duplicate v1: first interval [1000,2000), second [500,1500).
let batch_first = vec![
(
v1,
BiTemporalInterval::new(
TimeRange::new(1000.into(), 2000.into()).unwrap(), TimeRange::from(0.into()),
),
),
(
v1,
BiTemporalInterval::new(
TimeRange::new(500.into(), 1500.into()).unwrap(), TimeRange::from(0.into()),
),
),
];
indexes
.insert_node_versions_batch_with_policy(
node_id,
batch_first,
DeduplicationPolicy::FirstOccurrence,
)
.unwrap();
// Exactly one surviving entry over the whole range.
let results = indexes.find_node_versions_in_valid_time_range(
node_id,
TimeRange::new(0.into(), 5000.into()).unwrap(),
);
assert_eq!(results.len(), 1);
// t=600 hits [500,1500); t=1800 would only hit [1000,2000).
assert_eq!(
indexes
.find_node_versions_in_valid_time_range(node_id, TimeRange::at(600.into()))
.len(),
1
);
assert_eq!(
indexes
.find_node_versions_in_valid_time_range(node_id, TimeRange::at(1800.into()))
.len(),
0
);
indexes.clear();
// Same duplicate batch, LastOccurrence policy.
let batch_last = vec![
(
v1,
BiTemporalInterval::new(
TimeRange::new(1000.into(), 2000.into()).unwrap(), TimeRange::from(0.into()),
),
),
(
v1,
BiTemporalInterval::new(
TimeRange::new(500.into(), 1500.into()).unwrap(), TimeRange::from(0.into()),
),
),
];
indexes
.insert_node_versions_batch_with_policy(
node_id,
batch_last,
DeduplicationPolicy::LastOccurrence,
)
.unwrap();
let results = indexes.find_node_versions_in_valid_time_range(
node_id,
TimeRange::new(0.into(), 5000.into()).unwrap(),
);
assert_eq!(results.len(), 1);
// Here t=1800 hits ([1000,2000) survived) and t=600 does not.
assert_eq!(
indexes
.find_node_versions_in_valid_time_range(node_id, TimeRange::at(1800.into()))
.len(),
1
);
assert_eq!(
indexes
.find_node_versions_in_valid_time_range(node_id, TimeRange::at(600.into()))
.len(),
0
);
indexes.clear();
// Reject policy: any duplicated VersionId makes the whole batch fail.
let batch_reject = vec![
(
v1,
BiTemporalInterval::new(
TimeRange::new(1000.into(), 2000.into()).unwrap(),
TimeRange::from(0.into()),
),
),
(
v1,
BiTemporalInterval::new(
TimeRange::new(500.into(), 1500.into()).unwrap(),
TimeRange::from(0.into()),
),
),
];
let result = indexes.insert_node_versions_batch_with_policy(
node_id,
batch_reject,
DeduplicationPolicy::Reject,
);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Duplicate"));
}
#[test]
// Inserting the same VersionId in two separate batches must reuse the
// existing metadata record and leave a single (deduplicated) timeline entry.
fn test_metadata_index_reuse_across_batches() {
let indexes = TemporalIndexes::new();
let node_id = NodeId::new(1).unwrap();
let v1 = VersionId::new(100).unwrap();
// First batch: v1 with valid [1000,2000).
indexes
.insert_node_versions_batch(
node_id,
vec![(
v1,
BiTemporalInterval::new(
TimeRange::new(1000.into(), 2000.into()).unwrap(),
TimeRange::from(0.into()),
),
)],
)
.unwrap();
// Second batch: same v1 again, different interval [500,1500).
indexes
.insert_node_versions_batch(
node_id,
vec![(
v1,
BiTemporalInterval::new(
TimeRange::new(500.into(), 1500.into()).unwrap(),
TimeRange::from(0.into()),
),
)],
)
.unwrap();
let timelines = indexes.index.get(&EntityId::Node(node_id)).unwrap();
assert_eq!(
timelines.version_metadata_count(),
1,
"Metadata should be reused"
);
assert_eq!(
timelines.valid.versions.len(),
1,
"Duplicate should be removed by insert_batch"
);
}
#[test]
/// The Reject policy must not flag a batch whose version IDs are all
/// distinct — rejection applies only to actual duplicates.
fn test_batch_insert_reject_policy_valid_batch() {
    let index = TemporalIndexes::new();
    let node = NodeId::new(1).unwrap();
    let v1 = VersionId::new(100).unwrap();
    let v2 = VersionId::new(101).unwrap();
    // Two distinct versions with adjacent valid intervals.
    let entries = vec![
        (
            v1,
            BiTemporalInterval::new(
                TimeRange::new(1000.into(), 2000.into()).unwrap(),
                TimeRange::from(0.into()),
            ),
        ),
        (
            v2,
            BiTemporalInterval::new(
                TimeRange::new(2000.into(), 3000.into()).unwrap(),
                TimeRange::from(0.into()),
            ),
        ),
    ];
    let outcome = index.insert_node_versions_batch_with_policy(
        node,
        entries,
        DeduplicationPolicy::Reject,
    );
    assert!(
        outcome.is_ok(),
        "Reject policy should accept valid batch without duplicates"
    );
}
#[test]
/// `entity_ids` must enumerate every indexed entity — both nodes and edges.
fn test_get_all_entity_ids() {
    let index = TemporalIndexes::new();
    let node = NodeId::new(1).unwrap();
    let edge = EdgeId::new(2).unwrap();
    // One node version and one edge version, both "current" as of t=1000.
    index
        .insert_node_version(
            node,
            VersionId::new(1).unwrap(),
            BiTemporalInterval::current(1000.into()),
        )
        .unwrap();
    index
        .insert_edge_version(
            edge,
            VersionId::new(2).unwrap(),
            BiTemporalInterval::current(1000.into()),
        )
        .unwrap();
    let ids: Vec<_> = index.entity_ids().collect();
    assert_eq!(ids.len(), 2);
    assert!(ids.contains(&EntityId::Node(node)));
    assert!(ids.contains(&EntityId::Edge(edge)));
}
#[test]
// Updates must find the right entry even after a retroactive insert has
// shifted timeline positions (v2 is inserted after v3 despite starting
// earlier).
fn test_update_after_retroactive_insert() {
let indexes = TemporalIndexes::new();
let node_id = NodeId::new(1).unwrap();
let v1 = VersionId::new(100).unwrap();
let v2 = VersionId::new(101).unwrap();
let v3 = VersionId::new(102).unwrap();
// v1: open-ended from 0.
indexes
.insert_node_version(
node_id,
v1,
BiTemporalInterval::new(TimeRange::from(0.into()), TimeRange::from(0.into())),
)
.unwrap();
// v3: open-ended from 20 (inserted before v2 on purpose).
indexes
.insert_node_version(
node_id,
v3,
BiTemporalInterval::new(TimeRange::from(20.into()), TimeRange::from(0.into())),
)
.unwrap();
// v2: retroactive insert, open-ended from 10.
indexes
.insert_node_version(
node_id,
v2,
BiTemporalInterval::new(TimeRange::from(10.into()), TimeRange::from(0.into())),
)
.unwrap();
// Close v3 at 30 -> v3 now occupies [20,30).
indexes.update_node_valid_time_end(node_id, v3, 30.into());
let mut results = indexes.find_node_version_at_point(node_id, 25.into(), 0.into());
results.sort();
assert_eq!(
results,
vec![v1, v2, v3],
"v1, v2, and v3 should be visible at 25"
);
let mut results = indexes.find_node_version_at_point(node_id, 35.into(), 0.into());
results.sort();
assert_eq!(
results,
vec![v1, v2],
"Only v1 and v2 should be visible at 35"
);
let mut results = indexes.find_node_version_at_point(node_id, 15.into(), 0.into());
results.sort();
assert_eq!(results, vec![v1, v2], "v1 and v2 should be visible at 15");
// Close v2 at 18 -> only v1 remains visible at 19.
indexes.update_node_valid_time_end(node_id, v2, 18.into());
let results = indexes.find_node_version_at_point(node_id, 19.into(), 0.into());
assert_eq!(results, vec![v1], "Only v1 should be visible at 19");
}
#[test]
/// Batch insert must produce correct query results even when the batch is
/// not sorted by valid-time start.
fn test_batch_insert_unsorted_query() {
    let index = TemporalIndexes::new();
    let node = NodeId::new(1).unwrap();
    let v1 = VersionId::new(100).unwrap();
    let v2 = VersionId::new(101).unwrap();
    let v3 = VersionId::new(102).unwrap();
    // Deliberately out of order: open-ended starts are 0, 20, 10.
    let entries = vec![
        (
            v1,
            BiTemporalInterval::new(TimeRange::from(0.into()), TimeRange::from(0.into())),
        ),
        (
            v2,
            BiTemporalInterval::new(TimeRange::from(20.into()), TimeRange::from(0.into())),
        ),
        (
            v3,
            BiTemporalInterval::new(TimeRange::from(10.into()), TimeRange::from(0.into())),
        ),
    ];
    index.insert_node_versions_batch(node, entries).unwrap();
    // [0,15) overlaps the versions starting at 0 and 10, but not 20.
    let found = index.find_node_versions_in_valid_time_range(
        node,
        TimeRange::new(0.into(), 15.into()).unwrap(),
    );
    assert_eq!(found.len(), 2, "Should find 2 versions (v1 and v3)");
    assert!(found.contains(&v1));
    assert!(found.contains(&v3));
    assert!(!found.contains(&v2));
}
#[test]
// Reject policy must also detect collisions against versions already in
// the index, not just duplicates within the incoming batch itself.
fn test_reject_policy_collision_with_existing_sentinel() {
let indexes = TemporalIndexes::new();
let node_id = NodeId::new(1).unwrap();
let v1 = VersionId::new(100).unwrap();
// v1 is already present before the batch arrives.
indexes
.insert_node_version(
node_id,
v1,
BiTemporalInterval::new(
TimeRange::new(0.into(), 1000.into()).unwrap(),
TimeRange::from(0.into()),
),
)
.unwrap();
// Batch containing the same VersionId with a different interval.
let batch = vec![(
v1,
BiTemporalInterval::new(
TimeRange::new(1000.into(), 2000.into()).unwrap(),
TimeRange::from(0.into()),
),
)];
let result =
indexes.insert_node_versions_batch_with_policy(node_id, batch, DeduplicationPolicy::Reject);
assert!(
result.is_err(),
"Should reject duplicate version ID when it already exists in index"
);
}
#[test]
// Capacity boundary: with max_versions_per_entity = 10 and 8 versions
// already present, a batch of 2 fills the entity to exactly its limit and
// must be accepted (off-by-one guard on the capacity check).
fn test_batch_insert_exact_capacity_sentinel() {
let config = TemporalIndexConfig {
max_versions_per_entity: 10,
};
let indexes = TemporalIndexes::with_config(config);
let node_id = NodeId::new(1).unwrap();
// Pre-fill 8 of the 10 slots.
for i in 0..8 {
indexes
.insert_node_version(
node_id,
VersionId::new(i).unwrap(),
BiTemporalInterval::new(
TimeRange::new(((i * 100) as i64).into(), (((i + 1) * 100) as i64).into())
.unwrap(),
TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
),
)
.unwrap();
}
// Batch of exactly the 2 remaining slots.
let batch = vec![
(
VersionId::new(8).unwrap(),
BiTemporalInterval::new(
TimeRange::new(800.into(), 900.into()).unwrap(),
TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
),
),
(
VersionId::new(9).unwrap(),
BiTemporalInterval::new(
TimeRange::new(900.into(), 1000.into()).unwrap(),
TimeRange::new(0.into(), TIMESTAMP_MAX).unwrap(),
),
),
];
let result = indexes.insert_node_versions_batch(node_id, batch);
assert!(
result.is_ok(),
"Should accept batch that fits exactly into capacity"
);
}