use crate::AletheiaDB;
use crate::core::error::Result;
use crate::core::id::{EdgeId, NodeId};
use crate::core::temporal::{TimeRange, Timestamp};
#[cfg(feature = "semantic-temporal")]
use std::collections::{HashSet, VecDeque};
#[cfg(feature = "semantic-temporal")]
pub struct Chronos<'a> {
db: &'a AletheiaDB,
}
#[cfg(not(feature = "semantic-temporal"))]
#[deprecated(
note = "Chronos requires the 'nova' feature. Add 'features = [\"nova\"]' to your Cargo.toml."
)]
pub struct Chronos<'a> {
_marker: std::marker::PhantomData<&'a AletheiaDB>,
}
#[cfg(feature = "semantic-temporal")]
impl<'a> Chronos<'a> {
pub fn new(db: &'a AletheiaDB) -> Self {
Self { db }
}
pub fn find_path_at_time(
&self,
start: NodeId,
end: NodeId,
valid_time: Timestamp,
tx_time: Timestamp,
) -> Result<Option<Vec<NodeId>>> {
if start == end {
return Ok(Some(vec![start]));
}
let mut queue = VecDeque::new();
queue.push_back(vec![start]);
let mut visited = HashSet::new();
visited.insert(start);
while let Some(path) = queue.pop_front() {
let current = *path.last().unwrap();
if current == end {
return Ok(Some(path));
}
let edge_ids = self
.db
.get_outgoing_edges_at_time(current, valid_time, tx_time);
for edge_id in edge_ids {
if let Ok(edge) = self.db.get_edge_at_time(edge_id, valid_time, tx_time) {
let neighbor = edge.target;
if !visited.contains(&neighbor) {
visited.insert(neighbor);
let mut new_path = path.clone();
new_path.push(neighbor);
queue.push_back(new_path);
}
}
}
}
Ok(None)
}
pub fn node_volatility(&self, node_id: NodeId, window: TimeRange) -> Result<f32> {
let history = self.db.get_node_history(node_id)?;
let mut count = 0;
for version in &history.versions {
let valid_interval = version.temporal.valid_time();
if valid_interval.start() < window.end() && valid_interval.end() > window.start() {
count += 1;
}
}
let duration_micros = window.end().wallclock() - window.start().wallclock();
if duration_micros == 0 {
return Ok(0.0);
}
let duration_secs = duration_micros as f32 / 1_000_000.0;
Ok(count as f32 / duration_secs)
}
pub fn path_stability(&self, path: &[EdgeId], window: TimeRange) -> Result<f32> {
if path.is_empty() {
return Ok(1.0);
}
let mut common_intervals = vec![window];
for &edge_id in path {
let history = self.db.get_edge_history(edge_id)?;
let mut edge_intervals = Vec::new();
for version in &history.versions {
let valid = version.temporal.valid_time();
let start = valid.start().max(window.start());
let end = valid.end().min(window.end());
if start < end {
edge_intervals.push(TimeRange::new(start, end)?);
}
}
edge_intervals = Self::merge_intervals(edge_intervals);
common_intervals = Self::intersect_interval_sets(&common_intervals, &edge_intervals)?;
if common_intervals.is_empty() {
return Ok(0.0);
}
}
let total_valid_duration: i64 = common_intervals
.iter()
.map(|r| r.end().wallclock() - r.start().wallclock())
.sum();
let window_duration = window.end().wallclock() - window.start().wallclock();
if window_duration == 0 {
return Ok(0.0);
}
Ok(total_valid_duration as f32 / window_duration as f32)
}
fn merge_intervals(mut intervals: Vec<TimeRange>) -> Vec<TimeRange> {
if intervals.is_empty() {
return intervals;
}
intervals.sort_by_key(|r| r.start());
let mut merged = Vec::new();
let mut current = intervals[0];
for next in intervals.into_iter().skip(1) {
if next.start() <= current.end() {
if next.end() > current.end() {
current = TimeRange::new(current.start(), next.end()).unwrap();
}
} else {
merged.push(current);
current = next;
}
}
merged.push(current);
merged
}
fn intersect_interval_sets(set_a: &[TimeRange], set_b: &[TimeRange]) -> Result<Vec<TimeRange>> {
let mut result = Vec::new();
let mut i = 0;
let mut j = 0;
while i < set_a.len() && j < set_b.len() {
let a = set_a[i];
let b = set_b[j];
let start = a.start().max(b.start());
let end = a.end().min(b.end());
if start < end {
result.push(TimeRange::new(start, end)?);
}
if a.end() < b.end() {
i += 1;
} else {
j += 1;
}
}
Ok(result)
}
}
#[cfg(not(feature = "semantic-temporal"))]
#[allow(deprecated)]
impl<'a> Chronos<'a> {
#[allow(unused_variables)]
#[track_caller]
pub fn new(db: &'a AletheiaDB) -> Self {
panic!(
"Chronos requires the 'nova' feature. Add 'features = [\"nova\"]' to your Cargo.toml."
);
}
#[allow(unused_variables)]
#[track_caller]
pub fn find_path_at_time(
&self,
start: NodeId,
end: NodeId,
valid_time: Timestamp,
tx_time: Timestamp,
) -> Result<Option<Vec<NodeId>>> {
panic!(
"Chronos requires the 'nova' feature. Add 'features = [\"nova\"]' to your Cargo.toml."
);
}
#[allow(unused_variables)]
#[track_caller]
pub fn node_volatility(&self, node_id: NodeId, window: TimeRange) -> Result<f32> {
panic!(
"Chronos requires the 'nova' feature. Add 'features = [\"nova\"]' to your Cargo.toml."
);
}
#[allow(unused_variables)]
#[track_caller]
pub fn path_stability(&self, path: &[EdgeId], window: TimeRange) -> Result<f32> {
panic!(
"Chronos requires the 'nova' feature. Add 'features = [\"nova\"]' to your Cargo.toml."
);
}
}
#[cfg(all(test, feature = "semantic-temporal"))]
mod tests {
use super::*;
use crate::api::transaction::WriteOps;
use crate::core::property::PropertyMapBuilder;
use crate::core::temporal::time;
#[test]
fn test_snapshot_pathfinding() {
let db = AletheiaDB::new().unwrap();
let _t0 = time::now();
std::thread::sleep(std::time::Duration::from_millis(10));
let _t1 = time::now();
let props = PropertyMapBuilder::new().build();
let a = db.create_node("Node", props.clone()).unwrap();
let b = db.create_node("Node", props.clone()).unwrap();
let c = db.create_node("Node", props.clone()).unwrap();
let e1 = db.create_edge(a, b, "NEXT", props.clone()).unwrap();
let _e2 = db.create_edge(b, c, "NEXT", props.clone()).unwrap();
std::thread::sleep(std::time::Duration::from_millis(10));
let t2 = time::now();
db.write(|tx| tx.delete_edge(e1)).unwrap();
std::thread::sleep(std::time::Duration::from_millis(10));
let t3 = time::now();
let chronos = Chronos::new(&db);
let path_t3 = chronos.find_path_at_time(a, c, t3, t3).unwrap();
assert!(path_t3.is_none(), "Path should be broken at T3");
let path_t2 = chronos.find_path_at_time(a, c, t2, t2).unwrap();
assert!(path_t2.is_some(), "Path should exist at T2");
assert_eq!(path_t2.unwrap(), vec![a, b, c]);
}
#[test]
fn test_node_volatility() {
let db = AletheiaDB::new().unwrap();
let props = PropertyMapBuilder::new().insert("val", 0).build();
let node = db.create_node("Node", props.clone()).unwrap();
let t_start = time::now();
for i in 1..=5 {
std::thread::sleep(std::time::Duration::from_millis(10));
db.write(|tx| tx.update_node(node, PropertyMapBuilder::new().insert("val", i).build()))
.unwrap();
}
let t_end = time::now();
let chronos = Chronos::new(&db);
let window = TimeRange::new(t_start, t_end).unwrap();
let vol = chronos.node_volatility(node, window).unwrap();
assert!(vol > 0.0);
assert!(vol > 10.0, "Volatility should be high (got {})", vol);
}
}
#[cfg(all(test, not(feature = "semantic-temporal")))]
#[allow(deprecated)]
mod stub_tests {
use super::*;
#[test]
#[should_panic(
expected = "Chronos requires the 'nova' feature. Add 'features = [\"nova\"]' to your Cargo.toml."
)]
fn test_stub_panic_on_new() {
let db = AletheiaDB::new().unwrap();
let _ = Chronos::new(&db);
}
#[test]
#[should_panic(
expected = "Chronos requires the 'nova' feature. Add 'features = [\"nova\"]' to your Cargo.toml."
)]
fn test_stub_panic_on_find_path_at_time() {
let chronos = Chronos {
_marker: std::marker::PhantomData,
};
let _ = chronos.find_path_at_time(
NodeId::new(0).unwrap(),
NodeId::new(1).unwrap(),
crate::core::temporal::time::now(),
crate::core::temporal::time::now(),
);
}
#[test]
#[should_panic(
expected = "Chronos requires the 'nova' feature. Add 'features = [\"nova\"]' to your Cargo.toml."
)]
fn test_stub_panic_on_node_volatility() {
let chronos = Chronos {
_marker: std::marker::PhantomData,
};
let _ = chronos.node_volatility(
NodeId::new(0).unwrap(),
TimeRange::new(
crate::core::temporal::time::now(),
crate::core::temporal::time::now(),
)
.unwrap(),
);
}
#[test]
#[should_panic(
expected = "Chronos requires the 'nova' feature. Add 'features = [\"nova\"]' to your Cargo.toml."
)]
fn test_stub_panic_on_path_stability() {
let chronos = Chronos {
_marker: std::marker::PhantomData,
};
let _ = chronos.path_stability(
&[],
TimeRange::new(
crate::core::temporal::time::now(),
crate::core::temporal::time::now(),
)
.unwrap(),
);
}
}