use aletheiadb::core::error::Result;
use aletheiadb::core::id::NodeId;
use aletheiadb::core::temporal::{TimeRange, time};
use aletheiadb::index::vector::temporal::{
RetentionPolicy, SnapshotStrategy, TemporalVectorConfig, TemporalVectorIndex,
};
use aletheiadb::index::vector::{DistanceMetric, HnswConfig};
use std::time::Duration;
/// Builds the standard test fixture: a 4-dimensional cosine-metric temporal
/// index that snapshots every 2nd transaction and retains 10 snapshots.
fn create_test_index() -> Result<TemporalVectorIndex> {
    TemporalVectorIndex::new(TemporalVectorConfig {
        snapshot_strategy: SnapshotStrategy::TransactionInterval(2),
        retention_policy: RetentionPolicy::KeepN(10),
        max_snapshots: 100,
        full_snapshot_interval: 10,
        hnsw_config: Some(HnswConfig::new(4, DistanceMetric::Cosine)),
    })
}
#[test]
fn test_temporal_index_creation() -> Result<()> {
    // A freshly built index reflects its HNSW configuration and
    // starts with zero snapshots.
    let idx = create_test_index()?;
    assert_eq!(idx.dimensions(), 4);
    assert_eq!(idx.distance_metric(), DistanceMetric::Cosine);
    assert_eq!(idx.snapshot_count(), 0);
    Ok(())
}
#[test]
fn test_snapshot_creation_with_transaction_interval() -> Result<()> {
    // With TransactionInterval(2), a snapshot should only materialize
    // on every second on_transaction() call.
    let index = create_test_index()?;
    let first = NodeId::new(1).unwrap();
    let second = NodeId::new(2).unwrap();
    index.add(first, &[1.0, 0.0, 0.0, 0.0], 1000.into())?;
    index.add(second, &[0.0, 1.0, 0.0, 0.0], 2000.into())?;
    // Adds alone never trigger a snapshot.
    assert_eq!(index.snapshot_count(), 0);
    index.on_transaction()?;
    assert_eq!(index.snapshot_count(), 0);
    index.on_transaction()?;
    assert_eq!(index.snapshot_count(), 1);
    Ok(())
}
#[test]
fn test_point_in_time_vector_query() -> Result<()> {
    // A vector inserted after the queried timestamp must be invisible
    // to an as-of query at the earlier point in time.
    let index = create_test_index()?;
    let a = NodeId::new(1).unwrap();
    let b = NodeId::new(2).unwrap();
    let c = NodeId::new(3).unwrap();
    let t1 = time::now();
    index.add(a, &[1.0, 0.0, 0.0, 0.0], t1)?;
    index.add(b, &[0.9, 0.1, 0.0, 0.0], t1)?;
    index.on_transaction()?;
    index.on_transaction()?;
    assert_eq!(index.snapshot_count(), 1);
    // Third vector arrives strictly later than the snapshot time.
    let t2 = (t1.wallclock() + 1000).into();
    index.add(c, &[0.0, 0.0, 1.0, 0.0], t2)?;
    let hits = index.find_similar_as_of(&[1.0, 0.0, 0.0, 0.0], 10, t1)?;
    assert!(hits.len() <= 2);
    assert!(hits.iter().all(|(id, _)| *id == a || *id == b));
    Ok(())
}
#[test]
fn test_time_range_vector_query() -> Result<()> {
    // Each iteration inserts one vector and forces a snapshot (two
    // on_transaction_at calls against the interval-2 strategy); the
    // range query should then surface non-empty per-snapshot results.
    let index = create_test_index()?;
    let start = time::now();
    for i in 0..3 {
        let ts = (start.wallclock() + i as i64 * 1000).into();
        index.add(NodeId::new(i).unwrap(), &[i as f32, 0.0, 0.0, 0.0], ts)?;
        index.on_transaction_at(ts)?;
        index.on_transaction_at(ts)?;
    }
    assert!(index.snapshot_count() >= 2);
    let window = TimeRange::between(start, (start.wallclock() + 3000).into()).unwrap();
    let results = index.find_similar_in_range(&[1.0, 0.0, 0.0, 0.0], 5, window)?;
    assert!(!results.is_empty());
    for (ts, matches) in results {
        assert!(ts >= start);
        assert!(!matches.is_empty());
    }
    Ok(())
}
#[test]
fn test_snapshot_pruning_with_keep_n() -> Result<()> {
    // KeepN(3) with one snapshot per transaction: after 5 snapshots,
    // pruning must drop the 2 oldest and leave exactly 3 behind.
    let config = TemporalVectorConfig {
        snapshot_strategy: SnapshotStrategy::TransactionInterval(1),
        retention_policy: RetentionPolicy::KeepN(3),
        max_snapshots: 100,
        full_snapshot_interval: 10,
        hnsw_config: Some(HnswConfig::new(4, DistanceMetric::Cosine)),
    };
    let index = TemporalVectorIndex::new(config)?;
    for i in 0..5 {
        index.add(
            NodeId::new(i).unwrap(),
            &[i as f32, 0.0, 0.0, 0.0],
            ((i * 1000) as i64).into(),
        )?;
        index.on_transaction()?;
    }
    assert_eq!(index.snapshot_count(), 5);
    let removed = index.prune_snapshots()?;
    assert_eq!(removed, 2);
    assert_eq!(index.snapshot_count(), 3);
    Ok(())
}
#[test]
fn test_snapshot_pruning_with_keep_duration() -> Result<()> {
    // With a KeepDuration(10s) retention policy, these closely spaced
    // snapshots should all survive pruning — nothing is removed.
    let config = TemporalVectorConfig {
        snapshot_strategy: SnapshotStrategy::TransactionInterval(1),
        retention_policy: RetentionPolicy::KeepDuration(Duration::from_secs(10)),
        max_snapshots: 100,
        full_snapshot_interval: 10,
        hnsw_config: Some(HnswConfig::new(4, DistanceMetric::Cosine)),
    };
    let index = TemporalVectorIndex::new(config)?;
    for i in 0..3 {
        index.add(
            NodeId::new(i).unwrap(),
            &[i as f32, 0.0, 0.0, 0.0],
            ((i * 1000) as i64).into(),
        )?;
        index.on_transaction()?;
    }
    let before = index.snapshot_count();
    assert_eq!(index.prune_snapshots()?, 0);
    assert_eq!(index.snapshot_count(), before);
    Ok(())
}
#[test]
fn test_snapshot_info_retrieval() -> Result<()> {
    // One snapshot yields exactly one info record carrying sane
    // metadata (id 0, positive timestamp, counts and sizes).
    let index = create_test_index()?;
    index.add(NodeId::new(1).unwrap(), &[1.0, 0.0, 0.0, 0.0], 1000.into())?;
    index.on_transaction()?;
    index.on_transaction()?;
    let infos = index.get_snapshot_info()?;
    assert_eq!(infos.len(), 1);
    let first = &infos[0];
    assert_eq!(first.snapshot_id, 0);
    assert!(first.timestamp.wallclock() > 0);
    assert!(first.vector_count > 0);
    assert!(first.size_bytes > 0);
    Ok(())
}
#[test]
fn test_temporal_vector_with_different_strategies() -> Result<()> {
    let hnsw = HnswConfig::new(4, DistanceMetric::Cosine);

    // Part 1: TimeInterval(1) — a transaction occurring past the
    // interval boundary in wallclock time should produce a snapshot.
    let base_time = 1_000_000_000;
    let time_index = TemporalVectorIndex::new_at(
        TemporalVectorConfig {
            snapshot_strategy: SnapshotStrategy::TimeInterval(1),
            retention_policy: RetentionPolicy::KeepN(10),
            max_snapshots: 100,
            full_snapshot_interval: 10,
            hnsw_config: Some(hnsw.clone()),
        },
        base_time.into(),
    )?;
    time_index.add(NodeId::new(1).unwrap(), &[1.0, 0.0, 0.0, 0.0], base_time.into())?;
    let later = base_time + 2_000_000;
    time_index.add(NodeId::new(2).unwrap(), &[0.0, 1.0, 0.0, 0.0], later.into())?;
    time_index.on_transaction_at(later.into())?;
    assert_eq!(time_index.snapshot_count(), 1);

    // Part 2: ChangeThreshold(0.5) — rewriting 2 of the 4 stored
    // vectors crosses the 50% change ratio; expect a snapshot.
    let drift_index = TemporalVectorIndex::new_at(
        TemporalVectorConfig {
            snapshot_strategy: SnapshotStrategy::ChangeThreshold(0.5),
            retention_policy: RetentionPolicy::KeepN(10),
            max_snapshots: 100,
            full_snapshot_interval: 10,
            hnsw_config: Some(hnsw),
        },
        1000.into(),
    )?;
    for i in 0..4 {
        drift_index.add(NodeId::new(i).unwrap(), &[i as f32, 0.0, 0.0, 0.0], 1000.into())?;
    }
    drift_index.add(NodeId::new(0).unwrap(), &[10.0, 0.0, 0.0, 0.0], 2000.into())?;
    drift_index.add(NodeId::new(1).unwrap(), &[11.0, 0.0, 0.0, 0.0], 2000.into())?;
    drift_index.on_transaction_at(2000.into())?;
    assert!(drift_index.snapshot_count() >= 1);
    Ok(())
}
#[test]
fn test_empty_index_queries() -> Result<()> {
    // Querying an index with no vectors must not panic: it either
    // returns an error or an empty hit list.
    let index = create_test_index()?;
    match index.find_similar_as_of(&[1.0, 0.0, 0.0, 0.0], 10, time::now()) {
        Ok(hits) => assert!(hits.is_empty()),
        Err(_) => {}
    }
    Ok(())
}
#[test]
fn test_concurrent_snapshot_creation() -> Result<()> {
    use std::sync::Arc;
    use std::thread;
    // Four threads insert distinct vectors concurrently; the shared
    // index must absorb all writes and still snapshot afterwards.
    let shared = Arc::new(create_test_index()?);
    let workers: Vec<_> = (0..4)
        .map(|i| {
            let idx = Arc::clone(&shared);
            thread::spawn(move || -> Result<()> {
                idx.add(
                    NodeId::new(i).unwrap(),
                    &[i as f32, 0.0, 0.0, 0.0],
                    ((i * 1000) as i64).into(),
                )?;
                Ok(())
            })
        })
        .collect();
    for worker in workers {
        // Propagate both panics (join) and index errors (?).
        worker.join().unwrap()?;
    }
    shared.on_transaction()?;
    shared.on_transaction()?;
    assert!(shared.snapshot_count() > 0);
    Ok(())
}
#[test]
fn test_vector_dimension_validation() -> Result<()> {
    // A 2-dimensional vector must be rejected by the 4-d index.
    let index = create_test_index()?;
    let outcome = index.add(NodeId::new(1).unwrap(), &[1.0, 2.0], 1000.into());
    assert!(outcome.is_err());
    Ok(())
}
#[test]
fn test_semantic_evolution_end_to_end() -> Result<()> {
    // A single node's vector is rewritten four times, snapshotting
    // after each write; semantic_evolution must replay all four
    // versions in insertion order.
    let index = create_test_index()?;
    let node = NodeId::new(100).unwrap();
    let start = time::now();
    let history = [
        vec![1.0, 0.0, 0.0, 0.0],
        vec![0.9, 0.1, 0.0, 0.0],
        vec![0.7, 0.3, 0.0, 0.0],
        vec![0.5, 0.5, 0.0, 0.0],
    ];
    for (step, v) in history.iter().enumerate() {
        let ts = (start.wallclock() + step as i64 * 1000).into();
        index.add(node, v, ts)?;
        index.on_transaction_at(ts)?;
        index.on_transaction_at(ts)?;
    }
    let full_range = TimeRange::between(0.into(), i64::MAX.into()).unwrap();
    let evolution = index.semantic_evolution(node, full_range)?;
    assert_eq!(evolution.len(), 4);
    for (step, (_, v)) in evolution.iter().enumerate() {
        let got: &[f32] = v.as_ref();
        assert_eq!(got, history[step].as_slice());
    }
    Ok(())
}
#[test]
fn test_track_semantic_drift_over_time() -> Result<()> {
    // The node's vector rotates progressively away from [1,0,0,0],
    // so similarity to that reference strictly decreases per step.
    let index = create_test_index()?;
    let node = NodeId::new(200).unwrap();
    let t1 = time::now();
    index.add(node, &[1.0, 0.0, 0.0, 0.0], t1)?;
    index.on_transaction_at(t1)?;
    index.on_transaction_at(t1)?;
    let t2 = (t1.wallclock() + 1000).into();
    index.add(node, &[0.8, 0.2, 0.0, 0.0], t2)?;
    index.on_transaction_at(t2)?;
    index.on_transaction_at(t2)?;
    let t3 = (t1.wallclock() + 2000).into();
    index.add(node, &[0.5, 0.5, 0.0, 0.0], t3)?;
    index.on_transaction_at(t3)?;
    index.on_transaction_at(t3)?;
    let t4 = (t1.wallclock() + 3000).into();
    index.add(node, &[0.0, 1.0, 0.0, 0.0], t4)?;
    index.on_transaction_at(t4)?;
    index.on_transaction_at(t4)?;
    let reference = [1.0, 0.0, 0.0, 0.0];
    let full_range = TimeRange::between(0.into(), i64::MAX.into()).unwrap();
    let drift = index.track_semantic_drift(node, &reference, full_range)?;
    assert_eq!(drift.len(), 4);
    assert!(drift[0].1 > drift[1].1);
    assert!(drift[1].1 > drift[2].1);
    assert!(drift[2].1 > drift[3].1);
    Ok(())
}
#[test]
fn test_calculate_consecutive_drift_end_to_end() -> Result<()> {
    // Pairwise drift between consecutive versions: identical -> ~0,
    // a small rotation -> small, an orthogonal flip -> large.
    let index = create_test_index()?;
    let node = NodeId::new(300).unwrap();
    let start = time::now();
    let versions = [
        vec![1.0, 0.0, 0.0, 0.0],
        vec![1.0, 0.0, 0.0, 0.0],
        vec![0.9, 0.1, 0.0, 0.0],
        vec![0.0, 1.0, 0.0, 0.0],
    ];
    for (step, v) in versions.iter().enumerate() {
        let ts = (start.wallclock() + step as i64 * 1000).into();
        index.add(node, v, ts)?;
        index.on_transaction_at(ts)?;
        index.on_transaction_at(ts)?;
    }
    let full_range = TimeRange::between(0.into(), i64::MAX.into()).unwrap();
    let drift = index.calculate_consecutive_drift(node, full_range)?;
    assert_eq!(drift.len(), 3);
    assert!(drift[0].1 < 0.001);
    assert!(drift[1].1 > 0.0 && drift[1].1 < 0.2);
    assert!(drift[2].1 > 0.8);
    Ok(())
}
#[test]
fn test_semantic_evolution_with_gaps() -> Result<()> {
    // Only the other node changes in the middle snapshot, so the
    // primary node's evolution carries its previous vector forward
    // at t2 (three entries, middle one unchanged).
    let index = create_test_index()?;
    let primary = NodeId::new(400).unwrap();
    let other = NodeId::new(401).unwrap();
    let t1 = time::now();
    index.add(primary, &[1.0, 0.0, 0.0, 0.0], t1)?;
    index.on_transaction_at(t1)?;
    index.on_transaction_at(t1)?;
    let t2 = (t1.wallclock() + 1000).into();
    index.add(other, &[0.0, 1.0, 0.0, 0.0], t2)?;
    index.on_transaction_at(t2)?;
    index.on_transaction_at(t2)?;
    let t3 = (t1.wallclock() + 2000).into();
    index.add(primary, &[0.0, 0.0, 1.0, 0.0], t3)?;
    index.on_transaction_at(t3)?;
    index.on_transaction_at(t3)?;
    let full_range = TimeRange::between(0.into(), i64::MAX.into()).unwrap();
    let evolution = index.semantic_evolution(primary, full_range)?;
    assert_eq!(evolution.len(), 3);
    assert_eq!(evolution[0].1.as_ref(), &[1.0, 0.0, 0.0, 0.0]);
    assert_eq!(evolution[1].1.as_ref(), &[1.0, 0.0, 0.0, 0.0]);
    assert_eq!(evolution[2].1.as_ref(), &[0.0, 0.0, 1.0, 0.0]);
    Ok(())
}
#[test]
fn test_empty_evolution_for_nonexistent_node() -> Result<()> {
    // Asking for the history of an id that was never inserted must
    // yield an empty evolution, not an error or panic.
    let index = create_test_index()?;
    let now = time::now();
    index.add(NodeId::new(1).unwrap(), &[1.0, 0.0, 0.0, 0.0], now)?;
    index.on_transaction()?;
    index.on_transaction()?;
    let missing = NodeId::new(999).unwrap();
    let window = TimeRange::between(now, (now.wallclock() + 1000).into()).unwrap();
    let evolution = index.semantic_evolution(missing, window)?;
    assert_eq!(evolution.len(), 0);
    Ok(())
}
#[test]
fn test_drift_calculation_with_normalized_vectors() -> Result<()> {
    use aletheiadb::core::vector::normalize;
    // Each step rotates the unit vector by 45 degrees, so the cosine
    // drift per step is 1 - cos(45 deg) = 1 - 1/sqrt(2).
    let index = create_test_index()?;
    let node = NodeId::new(500).unwrap();
    let t1 = time::now();
    let step0 = normalize(&[1.0, 0.0, 0.0, 0.0]);
    let step1 = normalize(&[0.707, 0.707, 0.0, 0.0]);
    let step2 = normalize(&[0.0, 1.0, 0.0, 0.0]);
    index.add(node, &step0, t1)?;
    index.on_transaction_at(t1)?;
    index.on_transaction_at(t1)?;
    let t2 = (t1.wallclock() + 1000).into();
    index.add(node, &step1, t2)?;
    index.on_transaction_at(t2)?;
    index.on_transaction_at(t2)?;
    let t3 = (t1.wallclock() + 2000).into();
    index.add(node, &step2, t3)?;
    index.on_transaction_at(t3)?;
    index.on_transaction_at(t3)?;
    let full_range = TimeRange::between(0.into(), i64::MAX.into()).unwrap();
    let drift = index.calculate_consecutive_drift(node, full_range)?;
    assert_eq!(drift.len(), 2);
    let expected = 1.0 - (1.0 / std::f32::consts::SQRT_2);
    assert!((drift[0].1 - expected).abs() < 1e-4);
    assert!((drift[1].1 - expected).abs() < 1e-4);
    Ok(())
}