use crate::AletheiaDB;
use crate::core::error::Result;
use crate::core::id::NodeId;
use crate::core::temporal::TimeRange;
use std::time::Duration;
/// Result of a closest-approach prediction between two moving nodes.
///
/// Produced by `Omen::predict_encounter`.
#[derive(Clone, Debug)]
pub struct Encounter {
    /// Magnitude of the time offset to the closest approach. The sign is
    /// carried separately in [`Encounter::is_past`].
    pub time_to_encounter: Duration,
    /// `true` when the computed closest-approach time is negative, i.e. the
    /// closest approach lies before the reference instant of the prediction.
    pub is_past: bool,
    /// Euclidean distance between the two extrapolated positions at the
    /// closest approach.
    pub predicted_distance: f32,
}
/// Encounter predictor bound to a borrowed [`AletheiaDB`].
///
/// Holds only a shared reference to the database, so it cannot outlive the
/// database it was created from.
pub struct Omen<'a> {
    /// Database that node histories and read transactions are obtained from.
    db: &'a AletheiaDB,
}
impl<'a> Omen<'a> {
    /// Creates a predictor that reads from `db`.
    pub fn new(db: &'a AletheiaDB) -> Self {
        Self { db }
    }

    /// Predicts the closest approach of two nodes, assuming each keeps moving
    /// linearly at the average velocity it showed across `window`.
    ///
    /// For each node the value of `vector_property` is sampled at
    /// `window.start()` and `window.end()`. The position used for
    /// extrapolation is the sample at `window.end()`, so the returned time
    /// offset is relative to the end of the window.
    ///
    /// # Returns
    ///
    /// * `Ok(None)` when either node has no usable vector at both ends of the
    ///   window, when the vectors have mismatched dimensions (within a node
    ///   or between the two nodes), or when the computed closest-approach
    ///   time is not a finite number (e.g. NaN components in the vectors).
    /// * `Ok(Some(_))` otherwise. If the relative velocity is (almost) zero
    ///   the distance never changes, and the current distance is reported
    ///   with a zero time offset and `is_past == false`.
    ///
    /// # Errors
    ///
    /// Propagates any error from opening the read transaction or from
    /// loading a node's history.
    pub fn predict_encounter(
        &self,
        node_a: NodeId,
        node_b: NodeId,
        window: TimeRange,
        vector_property: &str,
    ) -> Result<Option<Encounter>> {
        let tx = self.db.read_transaction()?;
        // Both histories are filtered against the same transaction timestamp
        // so the two trajectories are built from one consistent view.
        let tx_time = tx.metadata().start_timestamp;

        let traj_a = self.calculate_trajectory(node_a, window, vector_property, tx_time)?;
        let traj_b = self.calculate_trajectory(node_b, window, vector_property, tx_time)?;

        // All database reads are done; release the transaction explicitly.
        // (`let _ = tx;` would NOT drop it: a wildcard pattern does not move.)
        drop(tx);

        let (Some((pos_a, vel_a)), Some((pos_b, vel_b))) = (traj_a, traj_b) else {
            return Ok(None);
        };

        Ok(Self::closest_approach(&pos_a, &vel_a, &pos_b, &vel_b))
    }

    /// Computes the closest approach of two points moving linearly.
    ///
    /// With relative position `p = pos_b - pos_a` and relative velocity
    /// `v = vel_b - vel_a`, the squared distance `|p + v t|^2` is minimal at
    /// `t = -(p . v) / (v . v)`.
    ///
    /// Returns `None` when the two positions differ in dimension or when the
    /// resulting time is not finite.
    fn closest_approach(
        pos_a: &[f32],
        vel_a: &[f32],
        pos_b: &[f32],
        vel_b: &[f32],
    ) -> Option<Encounter> {
        // Below this squared relative speed the nodes are treated as not
        // moving relative to each other.
        const STATIONARY_SPEED_SQ: f32 = 1e-9;

        // `zip` would silently truncate to the shorter vector; comparing
        // vectors of different dimension is meaningless, so refuse instead.
        if pos_a.len() != pos_b.len() || vel_a.len() != vel_b.len() {
            return None;
        }

        let rel_pos: Vec<f32> = pos_b.iter().zip(pos_a).map(|(b, a)| b - a).collect();
        let rel_vel: Vec<f32> = vel_b.iter().zip(vel_a).map(|(b, a)| b - a).collect();

        let p_dot_v: f32 = rel_pos.iter().zip(&rel_vel).map(|(p, v)| p * v).sum();
        let v_dot_v: f32 = rel_vel.iter().map(|v| v * v).sum();

        if v_dot_v < STATIONARY_SPEED_SQ {
            // No relative motion: the separation is constant.
            let current_dist = rel_pos.iter().map(|x| x * x).sum::<f32>().sqrt();
            return Some(Encounter {
                time_to_encounter: Duration::from_secs(0),
                is_past: false,
                predicted_distance: current_dist,
            });
        }

        let t_secs = -p_dot_v / v_dot_v;
        // NaN/inf here means the inputs were not finite; there is no
        // meaningful prediction, and `Duration` cannot represent it anyway.
        if !t_secs.is_finite() {
            return None;
        }

        let min_dist = rel_pos
            .iter()
            .zip(&rel_vel)
            .map(|(p, v)| {
                let offset = p + v * t_secs;
                offset * offset
            })
            .sum::<f32>()
            .sqrt();

        // `Duration::from_secs_f32` panics on overflow; saturate instead for
        // finite values too large for `Duration`.
        let time_to_encounter =
            Duration::try_from_secs_f32(t_secs.abs()).unwrap_or(Duration::MAX);

        Some(Encounter {
            time_to_encounter,
            is_past: t_secs < 0.0,
            predicted_distance: min_dist,
        })
    }

    /// Derives `(position, velocity)` for one node from its history.
    ///
    /// The position is the vector found at `window.end()`; the velocity is
    /// the per-second difference between the vectors found at `window.end()`
    /// and `window.start()`. A window whose duration is zero (or unavailable)
    /// yields a zero velocity.
    ///
    /// Returns `Ok(None)` when a vector is missing at either end of the
    /// window or the two vectors differ in length.
    ///
    /// # Errors
    ///
    /// Propagates any error from loading the node history.
    fn calculate_trajectory(
        &self,
        node_id: NodeId,
        window: TimeRange,
        property: &str,
        tx_time: crate::core::temporal::Timestamp,
    ) -> Result<Option<(Vec<f32>, Vec<f32>)>> {
        let history = self.db.get_node_history(node_id)?;
        let start_vec = self.find_vector_at(&history, window.start(), property, tx_time);
        let end_vec = self.find_vector_at(&history, window.end(), property, tx_time);

        let (Some(start), Some(end)) = (start_vec, end_vec) else {
            return Ok(None);
        };
        if start.len() != end.len() {
            return Ok(None);
        }

        let duration_micros = window.duration_micros().unwrap_or(0);
        if duration_micros == 0 {
            // Avoid dividing by zero: no elapsed time means no measurable motion.
            let zero_vel = vec![0.0; start.len()];
            return Ok(Some((end, zero_vel)));
        }

        let duration_secs = duration_micros as f32 / 1_000_000.0;
        let velocity: Vec<f32> = end
            .iter()
            .zip(start.iter())
            .map(|(e, s)| (e - s) / duration_secs)
            .collect();

        Ok(Some((end, velocity)))
    }

    /// Finds the value of the vector property `property` in effect at `time`.
    ///
    /// Considers only versions whose transaction-time start is at or before
    /// `tx_time` and whose valid-time start is at or before `time`, and that
    /// actually carry `property` as a vector. Among those, the version with
    /// the greatest valid-time start wins; on a tie the version that appears
    /// later in `history.versions` wins.
    ///
    /// Only the *start* of each version's valid/transaction range is
    /// examined; range ends are not checked here.
    fn find_vector_at(
        &self,
        history: &crate::core::history::EntityHistory,
        time: crate::core::temporal::Timestamp,
        property: &str,
        tx_time: crate::core::temporal::Timestamp,
    ) -> Option<Vec<f32>> {
        let mut best_vec = None;
        let mut best_time = i64::MIN;

        for v in &history.versions {
            let vt_start = v.temporal.valid_time().start().wallclock();
            let tt_start = v.temporal.transaction_time().start().wallclock();

            if tt_start <= tx_time.wallclock()
                && vt_start <= time.wallclock()
                && vt_start >= best_time
                && let Some(val) = v.properties.get(property)
                && let Some(vec) = val.as_vector()
            {
                best_vec = Some(vec.to_vec());
                best_time = vt_start;
            }
        }

        best_vec
    }
}
// Scenario tests for `Omen::predict_encounter`.
//
// Every test follows the same wall-clock choreography, and the statement
// order matters: create nodes -> short sleep -> capture `t_start` -> longer
// sleep -> update positions -> capture `t_end`. The observation window
// `[t_start, t_end]` therefore brackets the position updates, while the
// initial positions were written before the window opened.
#[cfg(test)]
mod tests {
use super::*;
use crate::api::transaction::WriteOps;
use crate::core::property::PropertyMapBuilder;
use crate::core::temporal::time;
use crate::index::vector::{DistanceMetric, HnswConfig};
// A moves (0,0) -> (1,0) while B moves (10,0) -> (9,0): the nodes approach
// each other along the x axis, so the closest approach is expected ahead in
// time with (near) zero miss distance.
#[test]
fn test_omen_head_on_collision() {
let db = AletheiaDB::new().unwrap();
db.enable_vector_index("vec", HnswConfig::new(2, DistanceMetric::Euclidean))
.unwrap();
let props_a = PropertyMapBuilder::new()
.insert_vector("vec", &[0.0, 0.0])
.build();
let a = db.create_node("A", props_a).unwrap();
let props_b = PropertyMapBuilder::new()
.insert_vector("vec", &[10.0, 0.0])
.build();
let b = db.create_node("B", props_b).unwrap();
// Presumably ensures the creation versions are timestamped strictly before
// the window opens.
std::thread::sleep(Duration::from_millis(10));
let t_start = time::now();
// Gives the window a non-zero duration so a velocity can be derived.
std::thread::sleep(Duration::from_millis(100));
db.write(|tx| {
tx.update_node(
a,
PropertyMapBuilder::new()
.insert_vector("vec", &[1.0, 0.0])
.build(),
)
})
.unwrap();
db.write(|tx| {
tx.update_node(
b,
PropertyMapBuilder::new()
.insert_vector("vec", &[9.0, 0.0])
.build(),
)
})
.unwrap();
let t_end = time::now();
let omen = Omen::new(&db);
let window = TimeRange::new(t_start, t_end).unwrap();
let encounter = omen
.predict_encounter(a, b, window, "vec")
.unwrap()
.unwrap();
println!("Encounter: {:?}", encounter);
assert!(!encounter.is_past, "Collision should be in future");
assert!(
encounter.predicted_distance < 0.01,
"Should be a direct hit"
);
assert!(encounter.time_to_encounter.as_secs_f32() > 0.0);
}
// Both nodes start at (0,0); A moves to (-1,0) and B to (1,0). They are
// moving apart, so the closest approach is expected to lie in the past, at
// (near) zero distance.
#[test]
fn test_omen_diverging() {
let db = AletheiaDB::new().unwrap();
db.enable_vector_index("vec", HnswConfig::new(2, DistanceMetric::Euclidean))
.unwrap();
let pa = PropertyMapBuilder::new()
.insert_vector("vec", &[0.0, 0.0])
.build();
let a = db.create_node("A", pa.clone()).unwrap();
let b = db.create_node("B", pa.clone()).unwrap();
std::thread::sleep(Duration::from_millis(10));
let t_start = time::now();
std::thread::sleep(Duration::from_millis(50));
db.write(|tx| {
tx.update_node(
a,
PropertyMapBuilder::new()
.insert_vector("vec", &[-1.0, 0.0])
.build(),
)
})
.unwrap();
db.write(|tx| {
tx.update_node(
b,
PropertyMapBuilder::new()
.insert_vector("vec", &[1.0, 0.0])
.build(),
)
})
.unwrap();
let t_end = time::now();
let omen = Omen::new(&db);
let window = TimeRange::new(t_start, t_end).unwrap();
let encounter = omen
.predict_encounter(a, b, window, "vec")
.unwrap()
.unwrap();
assert!(encounter.is_past, "Encounter should be in the past");
assert!(encounter.predicted_distance < 0.1);
}
// The target is never updated and stays at (10,0); the mover goes
// (0,0) -> (1,0), i.e. straight toward it. Expects a future encounter with
// (near) zero miss distance.
#[test]
fn test_omen_static_target() {
let db = AletheiaDB::new().unwrap();
db.enable_vector_index("vec", HnswConfig::new(2, DistanceMetric::Euclidean))
.unwrap();
let props_t = PropertyMapBuilder::new()
.insert_vector("vec", &[10.0, 0.0])
.build();
let target = db.create_node("Target", props_t).unwrap();
let props_m = PropertyMapBuilder::new()
.insert_vector("vec", &[0.0, 0.0])
.build();
let mover = db.create_node("Mover", props_m).unwrap();
std::thread::sleep(Duration::from_millis(10));
let t_start = time::now();
std::thread::sleep(Duration::from_millis(50));
db.write(|tx| {
tx.update_node(
mover,
PropertyMapBuilder::new()
.insert_vector("vec", &[1.0, 0.0])
.build(),
)
})
.unwrap();
let t_end = time::now();
let omen = Omen::new(&db);
let window = TimeRange::new(t_start, t_end).unwrap();
let encounter = omen
.predict_encounter(mover, target, window, "vec")
.unwrap()
.unwrap();
assert!(!encounter.is_past);
assert!(encounter.predicted_distance < 0.01);
}
// A moves (0,0) -> (1,0) and B moves (0,1) -> (1,1): identical displacement
// over the same window, so there is no relative motion. Expects the
// zero-time result with the constant separation of 1.0.
#[test]
fn test_omen_parallel() {
let db = AletheiaDB::new().unwrap();
db.enable_vector_index("vec", HnswConfig::new(2, DistanceMetric::Euclidean))
.unwrap();
let a = db
.create_node(
"A",
PropertyMapBuilder::new()
.insert_vector("vec", &[0.0, 0.0])
.build(),
)
.unwrap();
let b = db
.create_node(
"B",
PropertyMapBuilder::new()
.insert_vector("vec", &[0.0, 1.0])
.build(),
)
.unwrap();
std::thread::sleep(Duration::from_millis(10));
let t_start = time::now();
std::thread::sleep(Duration::from_millis(50));
db.write(|tx| {
tx.update_node(
a,
PropertyMapBuilder::new()
.insert_vector("vec", &[1.0, 0.0])
.build(),
)
})
.unwrap();
db.write(|tx| {
tx.update_node(
b,
PropertyMapBuilder::new()
.insert_vector("vec", &[1.0, 1.0])
.build(),
)
})
.unwrap();
let t_end = time::now();
let omen = Omen::new(&db);
let window = TimeRange::new(t_start, t_end).unwrap();
let encounter = omen
.predict_encounter(a, b, window, "vec")
.unwrap()
.unwrap();
assert_eq!(encounter.time_to_encounter.as_secs(), 0);
assert!((encounter.predicted_distance - 1.0).abs() < 0.01);
}
}