use crate::AletheiaDB;
use crate::core::error::{Error, Result, VectorError};
use crate::core::id::NodeId;
use crate::core::temporal::TimeRange;
use std::time::Duration;
pub struct Dreamer<'a> {
db: &'a AletheiaDB,
}
impl<'a> Dreamer<'a> {
pub fn new(db: &'a AletheiaDB) -> Self {
Self { db }
}
pub fn predict_future(
&self,
node_id: NodeId,
property: &str,
history_window: TimeRange,
future_horizon: Duration,
k: usize,
) -> Result<Vec<(NodeId, f32)>> {
let history = self.db.get_node_history(node_id)?;
let mut snapshots: Vec<(i64, Vec<f32>)> = Vec::new();
for version in &history.versions {
let valid_time = version.temporal.valid_time();
if valid_time.start() < history_window.end()
&& valid_time.end() > history_window.start()
{
if let Some(prop_val) = version.properties.get(property)
&& let Some(vec) = prop_val.as_vector()
{
let effective_time = valid_time
.start()
.wallclock()
.max(history_window.start().wallclock());
snapshots.push((effective_time, vec.to_vec()));
}
}
}
snapshots.sort_by_key(|(t, _)| *t);
snapshots.dedup_by_key(|(t, _)| *t);
if snapshots.is_empty() {
return Err(Error::Vector(VectorError::IndexError(format!(
"No vector history found for node {} in property '{}' within the specified window",
node_id, property
))));
}
let (last_time, last_vec) = snapshots.last().unwrap();
let projected_vec = if snapshots.len() < 2 {
last_vec.clone()
} else {
let (first_time, first_vec) = snapshots.first().unwrap();
if first_vec.len() != last_vec.len() {
return Err(Error::Vector(VectorError::DimensionMismatch {
expected: first_vec.len(),
actual: last_vec.len(),
}));
}
let duration_micros = last_time - first_time;
if duration_micros <= 0 {
last_vec.clone()
} else {
let duration_secs = duration_micros as f32 / 1_000_000.0;
let horizon_secs = future_horizon.as_secs_f32();
if first_vec.len() != last_vec.len() {
return Err(Error::Vector(VectorError::DimensionMismatch {
expected: first_vec.len(),
actual: last_vec.len(),
}));
}
let mut proj = Vec::with_capacity(last_vec.len());
for (start, end) in first_vec.iter().zip(last_vec.iter()) {
let velocity = (end - start) / duration_secs;
let future_val = end + (velocity * horizon_secs);
proj.push(future_val);
}
proj
}
};
self.db.search_vectors_in(property, &projected_vec, k)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::api::transaction::WriteOps;
use crate::core::property::PropertyMapBuilder;
use crate::core::temporal::time;
use crate::index::vector::{DistanceMetric, HnswConfig};
#[test]
fn test_dreamer_trajectory_extrapolation() {
let db = AletheiaDB::new().unwrap();
let config = HnswConfig::new(2, DistanceMetric::Euclidean); db.enable_vector_index("embedding", config).unwrap();
let t_start = time::now();
let _novice = db
.create_node(
"Level",
PropertyMapBuilder::new()
.insert_vector("embedding", &[0.0, 0.0])
.build(),
)
.unwrap();
let _inter = db
.create_node(
"Level",
PropertyMapBuilder::new()
.insert_vector("embedding", &[5.0, 5.0])
.build(),
)
.unwrap();
let expert = db
.create_node(
"Level",
PropertyMapBuilder::new()
.insert_vector("embedding", &[10.0, 10.0])
.build(),
)
.unwrap();
let learner_props = PropertyMapBuilder::new()
.insert("name", "Learner")
.insert_vector("embedding", &[0.0, 0.0])
.build();
let learner = db.create_node("Student", learner_props).unwrap();
std::thread::sleep(std::time::Duration::from_millis(50));
let _t_mid = time::now();
let update_props = PropertyMapBuilder::new()
.insert_vector("embedding", &[5.0, 5.0])
.build();
db.write(|tx| tx.update_node(learner, update_props))
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(50));
let t_end = time::now();
let history = db.get_node_history(learner).unwrap();
assert!(history.version_count() >= 2);
let dreamer = Dreamer::new(&db);
let window = TimeRange::new(t_start, t_end).unwrap();
let duration_micros = t_end.wallclock() - t_start.wallclock();
let _duration = Duration::from_micros(duration_micros as u64);
let horizon = Duration::from_millis(50);
let predictions = dreamer
.predict_future(learner, "embedding", window, horizon, 3)
.unwrap();
assert!(!predictions.is_empty());
assert!(
predictions.iter().any(|(id, _)| *id == expert),
"Dreamer should predict the learner becomes an expert"
);
}
#[test]
fn test_dreamer_static_trajectory() {
let db = AletheiaDB::new().unwrap();
db.enable_vector_index("vec", HnswConfig::new(2, DistanceMetric::Euclidean))
.unwrap();
let t0 = time::now();
let props = PropertyMapBuilder::new()
.insert_vector("vec", &[1.0, 1.0])
.build();
let node = db.create_node("Node", props).unwrap();
let t1 = time::now();
let dreamer = Dreamer::new(&db);
let window = TimeRange::new(t0, t1).unwrap();
let res = dreamer
.predict_future(node, "vec", window, Duration::from_secs(10), 1)
.unwrap();
assert_eq!(res[0].0, node);
}
#[test]
fn test_dreamer_mixed_dimensions_error() {
let db = AletheiaDB::new().unwrap();
let t0 = time::now();
let props = PropertyMapBuilder::new()
.insert_vector("vec", &[0.0, 0.0])
.build();
let node = db.create_node("Node", props).unwrap();
std::thread::sleep(std::time::Duration::from_millis(1));
db.write(|tx| {
tx.update_node(
node,
PropertyMapBuilder::new()
.insert_vector("vec", &[1.0, 1.0, 1.0])
.build(),
)
})
.unwrap();
let t1 = time::now();
let dreamer = Dreamer::new(&db);
let window = TimeRange::new(t0, t1).unwrap();
let res = dreamer.predict_future(node, "vec", window, Duration::from_secs(10), 1);
assert!(res.is_err());
match res {
Err(Error::Vector(VectorError::DimensionMismatch { .. })) => (),
_ => panic!("Expected DimensionMismatch error, got {:?}", res),
}
}
}