#![allow(clippy::collapsible_if)]
use crate::AletheiaDB;
use crate::core::error::{Result, StorageError};
use crate::core::graph::{Edge, Node};
use crate::core::id::{EdgeId, MAX_VALID_ID, NodeId, VersionId};
use crate::core::interning::GLOBAL_INTERNER;
use crate::core::property::{PropertyMap, PropertyMapBuilder};
use crate::core::temporal::Timestamp;
use std::collections::{HashMap, HashSet, VecDeque};
/// The set of hypothetical changes that a [`Hindsight`] session layers over
/// the database. Nothing stored here is written back to the database by the
/// code in this file.
#[derive(Debug, Clone)]
pub struct Scenario {
/// Nodes created by `Hindsight::add_node`, keyed by their scenario-allocated id.
pub added_nodes: HashMap<NodeId, Node>,
/// Property patches recorded by `Hindsight::update_node` for nodes that are
/// not in `added_nodes`; `Hindsight::get_node` overlays them on the stored properties.
pub modified_nodes: HashMap<NodeId, PropertyMap>,
/// Ids passed to `Hindsight::remove_node` that were not scenario-added nodes.
pub removed_nodes: HashSet<NodeId>,
/// Edges created by `Hindsight::add_edge`, keyed by their scenario-allocated id.
pub added_edges: HashMap<EdgeId, Edge>,
/// Ids passed to `Hindsight::remove_edge` that were not scenario-added edges.
pub removed_edges: HashSet<EdgeId>,
/// Ids of scenario-added edges grouped by their source node.
pub added_outgoing: HashMap<NodeId, Vec<EdgeId>>,
// Next raw id handed out for a hypothetical node; starts at `MAX_VALID_ID + 1`.
next_node_id: u64,
// Next raw id handed out for a hypothetical edge; starts at `MAX_VALID_ID + 1`.
next_edge_id: u64,
}
impl Default for Scenario {
    /// Builds an empty scenario: no additions, patches or removals.
    fn default() -> Self {
        // Both id counters begin one past `MAX_VALID_ID`, so scenario-allocated
        // ids are always greater than `MAX_VALID_ID`.
        let first_hypothetical_id = MAX_VALID_ID + 1;
        Self {
            added_nodes: HashMap::default(),
            modified_nodes: HashMap::default(),
            removed_nodes: HashSet::default(),
            added_edges: HashMap::default(),
            removed_edges: HashSet::default(),
            added_outgoing: HashMap::default(),
            next_node_id: first_hypothetical_id,
            next_edge_id: first_hypothetical_id,
        }
    }
}
impl Scenario {
    /// Creates an empty scenario; equivalent to [`Scenario::default`].
    pub fn new() -> Self {
        <Self as Default>::default()
    }
}
/// Node-level summary of a scenario, as produced by `Hindsight::diff`.
/// Edge changes are not represented in this type.
#[derive(Debug, Clone)]
pub struct HindsightDiff {
/// Copy of the scenario's hypothetical nodes.
pub added_nodes: HashMap<NodeId, Node>,
/// Copy of the scenario's removed node ids.
pub removed_nodes: HashSet<NodeId>,
/// Property patches for nodes that were modified and not also removed.
pub modified_nodes: HashMap<NodeId, PropertyMap>,
}
/// A what-if view over an [`AletheiaDB`]: reads consult the in-memory
/// [`Scenario`] first and fall back to the database.
pub struct Hindsight<'a> {
// Database that backs every read not answered by the scenario.
db: &'a AletheiaDB,
// Hypothetical additions, patches and removals accumulated so far.
scenario: Scenario,
// `(valid_time, tx_time)` set by `at`; when `None`, database reads use the
// non-temporal accessors (e.g. `get_node` instead of `get_node_at_time`).
base_time: Option<(Timestamp, Timestamp)>,
}
impl<'a> Hindsight<'a> {
/// Starts a session over `db` with an empty scenario and no pinned time.
pub fn new(db: &'a AletheiaDB) -> Self {
    let scenario = Scenario::new();
    Self {
        base_time: None,
        scenario,
        db,
    }
}
/// Pins database reads to the given `(valid_time, tx_time)` pair and returns
/// the session, builder-style.
pub fn at(mut self, valid_time: Timestamp, tx_time: Timestamp) -> Self {
    let pinned = (valid_time, tx_time);
    self.base_time = Some(pinned);
    self
}
pub fn scenario(&self) -> &Scenario {
&self.scenario
}
pub fn diff(&self) -> Result<HindsightDiff> {
let mut modified = self.scenario.modified_nodes.clone();
modified.retain(|id, _| !self.scenario.removed_nodes.contains(id));
Ok(HindsightDiff {
added_nodes: self.scenario.added_nodes.clone(),
removed_nodes: self.scenario.removed_nodes.clone(),
modified_nodes: modified,
})
}
/// Hands out the next hypothetical node id and advances the counter.
fn next_node_id(&mut self) -> NodeId {
    let counter = &mut self.scenario.next_node_id;
    let allocated = *counter;
    *counter += 1;
    NodeId::new_unchecked(allocated)
}
/// Hands out the next hypothetical edge id and advances the counter.
fn next_edge_id(&mut self) -> EdgeId {
    let counter = &mut self.scenario.next_edge_id;
    let allocated = *counter;
    *counter += 1;
    EdgeId::new_unchecked(allocated)
}
/// Adds a hypothetical node with the given label and properties and returns
/// its scenario-allocated id.
///
/// # Errors
/// Propagates the error from interning `label`.
pub fn add_node(&mut self, label: &str, properties: PropertyMap) -> Result<NodeId> {
    let node_id = self.next_node_id();
    let label_id = GLOBAL_INTERNER.intern(label)?;
    // Hypothetical nodes are created with version id 0.
    let placeholder_version = VersionId::new_unchecked(0);
    self.scenario.added_nodes.insert(
        node_id,
        Node::new(node_id, label_id, properties, placeholder_version),
    );
    Ok(node_id)
}
/// Adds a hypothetical edge from `source` to `target` and returns its
/// scenario-allocated id. The endpoints are not checked for existence.
///
/// # Errors
/// Propagates the error from interning `label`.
pub fn add_edge(
    &mut self,
    source: NodeId,
    target: NodeId,
    label: &str,
    properties: PropertyMap,
) -> Result<EdgeId> {
    let edge_id = self.next_edge_id();
    let label_id = GLOBAL_INTERNER.intern(label)?;
    // Hypothetical edges are created with version id 0.
    let placeholder_version = VersionId::new_unchecked(0);
    let hypothetical = Edge::new(
        edge_id,
        label_id,
        source,
        target,
        properties,
        placeholder_version,
    );

    let scenario = &mut self.scenario;
    scenario.added_edges.insert(edge_id, hypothetical);
    // Keep the per-source adjacency list in step with `added_edges`.
    let siblings = scenario.added_outgoing.entry(source).or_default();
    siblings.push(edge_id);
    Ok(edge_id)
}
/// Removes a node from the what-if view.
///
/// * If `id` is a hypothetical node, it is dropped from the scenario together
///   with every hypothetical edge that starts or ends at it, so no dangling
///   edge stays traversable through `added_outgoing`.
/// * Otherwise `id` is recorded in `removed_nodes`, which hides the stored
///   node from subsequent reads.
pub fn remove_node(&mut self, id: NodeId) {
    if self.scenario.added_nodes.remove(&id).is_none() {
        self.scenario.removed_nodes.insert(id);
        return;
    }

    // Drop hypothetical edges incident to the node, remembering each edge's
    // source so its adjacency list can be pruned afterwards.
    let mut orphaned = Vec::new();
    self.scenario.added_edges.retain(|edge_id, edge| {
        let incident = edge.source == id || edge.target == id;
        if incident {
            orphaned.push((*edge_id, edge.source));
        }
        !incident
    });

    // All outgoing edges of the node are gone; discard its list wholesale.
    self.scenario.added_outgoing.remove(&id);
    for (edge_id, source) in orphaned {
        if let Some(list) = self.scenario.added_outgoing.get_mut(&source) {
            list.retain(|&e| e != edge_id);
        }
    }
}
/// Removes an edge from the what-if view.
///
/// A hypothetical edge is dropped from `added_edges` and from its source's
/// adjacency list; any other id is recorded in `removed_edges`.
pub fn remove_edge(&mut self, id: EdgeId) {
    match self.scenario.added_edges.remove(&id) {
        Some(edge) => {
            if let Some(siblings) = self.scenario.added_outgoing.get_mut(&edge.source) {
                siblings.retain(|candidate| *candidate != id);
            }
        }
        None => {
            self.scenario.removed_edges.insert(id);
        }
    }
}
/// Applies `properties` as a patch to node `id`.
///
/// * Hypothetical node: the patch is merged into the node's own properties.
/// * Node with an existing patch: the new patch is merged over the old one.
/// * Otherwise: `properties` is stored as the node's patch.
///
/// In a merge, keys from `properties` win over existing keys. Keys whose
/// interned name cannot be resolved are skipped. This function currently
/// always returns `Ok`.
pub fn update_node(&mut self, id: NodeId, properties: PropertyMap) -> Result<()> {
    /// Rebuilds a map from `base` overlaid with `patch` (later inserts win).
    fn overlay(base: &PropertyMap, patch: &PropertyMap) -> PropertyMap {
        let mut merged = PropertyMapBuilder::new();
        for layer in [base, patch] {
            for (key, value) in layer.iter() {
                if let Some(name) = GLOBAL_INTERNER.resolve_with(*key, |s| s.to_string()) {
                    merged = merged.insert(&name, value.clone());
                }
            }
        }
        merged.build()
    }

    let scenario = &mut self.scenario;
    if let Some(node) = scenario.added_nodes.get_mut(&id) {
        node.properties = overlay(&node.properties, &properties);
    } else if let Some(pending) = scenario.modified_nodes.get_mut(&id) {
        *pending = overlay(pending, &properties);
    } else {
        scenario.modified_nodes.insert(id, properties);
    }
    Ok(())
}
/// Reads node `id` as it appears in the what-if view.
///
/// Hypothetical nodes are returned as stored in the scenario. Stored nodes
/// are read from the database (at the pinned time, if one was set with `at`)
/// and then overlaid with any patch recorded for them.
///
/// # Errors
/// Returns `StorageError::NodeNotFound` if the node was removed in the
/// scenario, and propagates any error from the database lookup.
pub fn get_node(&self, id: NodeId) -> Result<Node> {
    let scenario = &self.scenario;
    if scenario.removed_nodes.contains(&id) {
        return Err(crate::core::error::Error::Storage(
            StorageError::NodeNotFound(id),
        ));
    }
    if let Some(hypothetical) = scenario.added_nodes.get(&id) {
        return Ok(hypothetical.clone());
    }

    let mut node = match self.base_time {
        Some((vt, tt)) => self.db.get_node_at_time(id, vt, tt)?,
        None => self.db.get_node(id)?,
    };

    let Some(patch) = scenario.modified_nodes.get(&id) else {
        return Ok(node);
    };

    // Stored properties first, then the patch, so patched keys win.
    // Keys whose interned name cannot be resolved are skipped.
    let mut merged = PropertyMapBuilder::new();
    for layer in [&node.properties, patch] {
        for (key, value) in layer.iter() {
            if let Some(name) = GLOBAL_INTERNER.resolve_with(*key, |s| s.to_string()) {
                merged = merged.insert(&name, value.clone());
            }
        }
    }
    node.properties = merged.build();
    Ok(node)
}
pub fn get_edge(&self, id: EdgeId) -> Result<Edge> {
if self.scenario.removed_edges.contains(&id) {
return Err(crate::core::error::Error::Storage(
StorageError::EdgeNotFound(id),
));
}
if let Some(edge) = self.scenario.added_edges.get(&id) {
return Ok(edge.clone());
}
let edge = if let Some((vt, tt)) = self.base_time {
self.db.get_edge_at_time(id, vt, tt)?
} else {
self.db.get_edge(id)?
};
if self.scenario.removed_nodes.contains(&edge.source)
|| self.scenario.removed_nodes.contains(&edge.target)
{
return Err(crate::core::error::Error::Storage(
StorageError::EdgeNotFound(id),
));
}
Ok(edge)
}
pub fn get_outgoing_edges(&self, id: NodeId) -> Vec<EdgeId> {
if self.scenario.removed_nodes.contains(&id) {
return Vec::new();
}
let mut edges = Vec::new();
if !self.scenario.added_nodes.contains_key(&id) {
let db_edges = if let Some((vt, tt)) = self.base_time {
self.db.get_outgoing_edges_at_time(id, vt, tt)
} else {
self.db.current.get_outgoing_edges(id)
};
for edge_id in db_edges {
if self.scenario.removed_edges.contains(&edge_id) {
continue;
}
let target_opt = if let Some((vt, tt)) = self.base_time {
self.db
.get_edge_at_time(edge_id, vt, tt)
.map(|e| e.target)
.ok()
} else {
self.db.current.get_edge_target(edge_id).ok()
};
if let Some(target) = target_opt
&& !self.scenario.removed_nodes.contains(&target)
{
edges.push(edge_id);
}
}
}
if let Some(added) = self.scenario.added_outgoing.get(&id) {
for &edge_id in added {
if let Some(edge) = self.scenario.added_edges.get(&edge_id)
&& !self.scenario.removed_nodes.contains(&edge.target)
{
edges.push(edge_id);
}
}
}
edges
}
pub fn find_path_bfs(&self, start: NodeId, end: NodeId) -> Option<Vec<EdgeId>> {
if self.scenario.removed_nodes.contains(&start)
|| self.scenario.removed_nodes.contains(&end)
{
return None;
}
if start == end {
return Some(Vec::new());
}
let mut queue = VecDeque::new();
queue.push_back((start, Vec::new()));
let mut visited = HashSet::new();
visited.insert(start);
let max_depth = 1000;
while let Some((current, path)) = queue.pop_front() {
if path.len() > max_depth {
continue;
}
if current == end {
return Some(path);
}
for edge_id in self.get_outgoing_edges(current) {
let target_opt = if let Some(edge) = self.scenario.added_edges.get(&edge_id) {
Some(edge.target)
} else {
if let Some((vt, tt)) = self.base_time {
self.db
.get_edge_at_time(edge_id, vt, tt)
.map(|e| e.target)
.ok()
} else {
self.db.current.get_edge_target(edge_id).ok()
}
};
if let Some(target) = target_opt
&& !visited.contains(&target)
{
visited.insert(target);
let mut new_path = path.clone();
new_path.push(edge_id);
queue.push_back((target, new_path));
}
}
}
None
}
/// Finds up to `k` nodes whose `property` vector is most similar to `vector`
/// in the what-if view, best score first.
///
/// Candidates come from three places:
/// 1. hypothetical nodes, scored directly with cosine similarity;
/// 2. patched nodes whose patch sets `property` to a vector, scored from the
///    patch (nodes that were also removed are skipped);
/// 3. the database's vector index for `property`, if one exists, excluding
///    removed nodes and nodes whose patch overrides `property`.
///
/// With a pinned time, only the valid-time component is passed to the
/// database search. The over-fetch size uses saturating addition so a very
/// large `k` (e.g. `usize::MAX`) cannot overflow.
///
/// # Errors
/// Propagates errors from the cosine-similarity computation and from the
/// database search.
pub fn find_similar(
    &self,
    property: &str,
    vector: &[f32],
    k: usize,
) -> Result<Vec<(NodeId, f32)>> {
    let scenario = &self.scenario;
    let mut candidates = Vec::new();

    for (id, node) in &scenario.added_nodes {
        if let Some(val) = node.properties.get(property)
            && let Some(vec) = val.as_vector()
        {
            let score = crate::core::vector::cosine_similarity(vector, vec)?;
            candidates.push((*id, score));
        }
    }

    // Ids the database search must not return: removed nodes, plus nodes whose
    // patch overrides `property` (their stored vector is stale here).
    let mut exclude_ids = scenario.removed_nodes.clone();
    for (id, patch) in &scenario.modified_nodes {
        if scenario.removed_nodes.contains(id) {
            // Already excluded, and a removed node must not be a candidate.
            continue;
        }
        if patch.contains_key(property) {
            exclude_ids.insert(*id);
        }
        if let Some(val) = patch.get(property)
            && let Some(vec) = val.as_vector()
        {
            let score = crate::core::vector::cosine_similarity(vector, vec)?;
            candidates.push((*id, score));
        }
    }

    let db_results: Vec<(NodeId, f32)> = if !self.db.has_vector_index(property) {
        Vec::new()
    } else if let Some((vt, _)) = self.base_time {
        // The as-of search takes no predicate, so over-fetch by the number of
        // excluded ids and filter afterwards.
        let fetch_k = k.saturating_add(exclude_ids.len());
        self.db
            .find_similar_as_of_in(property, vector, fetch_k, vt)?
            .into_iter()
            .filter(|(id, _)| !exclude_ids.contains(id))
            .collect()
    } else {
        self.db
            .find_similar_with_predicate(property, vector, k, |id| {
                !exclude_ids.contains(id)
            })?
    };

    candidates.extend(db_results);
    // Descending by score; incomparable scores (NaN) are treated as equal.
    candidates.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
    candidates.truncate(k);
    Ok(candidates)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::api::transaction::WriteOps;
use crate::core::property::PropertyMapBuilder;
use crate::core::temporal::time;
use crate::index::vector::temporal::TemporalVectorConfig;
use crate::index::vector::{DistanceMetric, HnswConfig};
use std::thread;
use std::time::Duration;
// A session pinned with `at` must read the node version that was current at
// the pinned time.
#[test]
fn test_hindsight_temporal_get() {
let db = AletheiaDB::new().unwrap();
let props_v1 = PropertyMapBuilder::new().insert("version", 1).build();
let id = db.create_node("Node", props_v1).unwrap();
// The sleeps separate the writes from the sampled timestamps: t1 is taken
// after version 1 is created and before version 2 is written.
thread::sleep(Duration::from_millis(10));
let t1 = time::now();
thread::sleep(Duration::from_millis(10));
let props_v2 = PropertyMapBuilder::new().insert("version", 2).build();
db.write(|tx| tx.update_node(id, props_v2)).unwrap();
thread::sleep(Duration::from_millis(10));
// t2 is taken after the update.
let t2 = time::now();
let hs_t1 = Hindsight::new(&db).at(t1, t1);
let node_t1 = hs_t1.get_node(id).unwrap();
assert_eq!(
node_t1.get_property("version").unwrap().as_int().unwrap(),
1
);
let hs_t2 = Hindsight::new(&db).at(t2, t2);
let node_t2 = hs_t2.get_node(id).unwrap();
assert_eq!(
node_t2.get_property("version").unwrap().as_int().unwrap(),
2
);
}
/// `diff` reports added, removed and modified nodes, and leaves a node that
/// was patched and then removed out of `modified_nodes`.
#[test]
fn test_hindsight_diff() {
    let db = AletheiaDB::new().unwrap();
    let empty = PropertyMapBuilder::new().build();

    // Three stored nodes: one to remove, one to patch, one to patch then remove.
    let removed_only = db.create_node("Node", empty.clone()).unwrap();
    let patched = db.create_node("Node", empty.clone()).unwrap();
    let patched_then_removed = db.create_node("Node", empty.clone()).unwrap();

    let mut hs = Hindsight::new(&db);
    let hypothetical = hs.add_node("Node", empty.clone()).unwrap();
    hs.remove_node(removed_only);
    let patch = PropertyMapBuilder::new().insert("mod", true).build();
    hs.update_node(patched, patch).unwrap();
    hs.update_node(patched_then_removed, empty.clone()).unwrap();
    hs.remove_node(patched_then_removed);

    let HindsightDiff {
        added_nodes,
        removed_nodes,
        modified_nodes,
    } = hs.diff().unwrap();

    assert!(added_nodes.contains_key(&hypothetical));
    assert_eq!(added_nodes.len(), 1);

    assert!(removed_nodes.contains(&removed_only));
    assert!(removed_nodes.contains(&patched_then_removed));
    assert_eq!(removed_nodes.len(), 2);

    assert!(modified_nodes.contains_key(&patched));
    assert!(!modified_nodes.contains_key(&patched_then_removed));
    assert_eq!(modified_nodes.len(), 1);
}
// A session pinned with `at` must score a node against the vector it held at
// the pinned time.
#[test]
fn test_hindsight_temporal_vector_search() {
let db = AletheiaDB::new().unwrap();
let hnsw_config = HnswConfig::new(2, DistanceMetric::Cosine);
let temporal_config = TemporalVectorConfig::default_with_hnsw(hnsw_config.clone());
db.enable_temporal_vector_index("vec", temporal_config)
.unwrap();
let props_v1 = PropertyMapBuilder::new()
.insert_vector("vec", &[1.0, 0.0])
.build();
let id = db.create_node("Node", props_v1).unwrap();
// t1 is sampled after the node is created with [1, 0] and before the update.
thread::sleep(Duration::from_millis(50));
let t1 = time::now();
thread::sleep(Duration::from_millis(50));
let props_v2 = PropertyMapBuilder::new()
.insert_vector("vec", &[0.0, 1.0])
.build();
db.write(|tx| tx.update_node(id, props_v2)).unwrap();
// NOTE(review): presumably the manual snapshot makes the updated vector
// visible to as-of searches at t2 — confirm against the index implementation.
if let Some(idx) = db.current.get_temporal_vector_index() {
idx.create_manual_snapshot().unwrap();
}
thread::sleep(Duration::from_millis(50));
// t2 is sampled after the update and the snapshot.
let t2 = time::now();
let hs_t1 = Hindsight::new(&db).at(t1, t1);
let results_t1 = hs_t1.find_similar("vec", &[1.0, 0.0], 1).unwrap();
assert!(!results_t1.is_empty());
assert_eq!(results_t1[0].0, id);
// At t1 the stored vector equals the query, so the score is ~1.
assert!((results_t1[0].1 - 1.0).abs() < 0.001);
let hs_t2 = Hindsight::new(&db).at(t2, t2);
let results_t2 = hs_t2.find_similar("vec", &[1.0, 0.0], 1).unwrap();
assert!(!results_t2.is_empty());
assert_eq!(results_t2[0].0, id);
// At t2 the stored vector [0, 1] is orthogonal to the query, so the score is ~0.
assert!(results_t2[0].1 < 0.001); }
/// Create, read, update and delete on a purely hypothetical node.
#[test]
fn test_hindsight_basic_crud() {
    let db = AletheiaDB::new().unwrap();
    let mut session = Hindsight::new(&db);

    // Create: the id is allocated above the valid id range.
    let ghost = session
        .add_node(
            "Spirit",
            PropertyMapBuilder::new().insert("name", "Ghost").build(),
        )
        .unwrap();
    assert!(ghost.as_u64() > MAX_VALID_ID);

    // Read.
    let created = session.get_node(ghost).unwrap();
    assert!(created.has_label_str("Spirit"));
    let name = created.get_property("name").unwrap().as_str().unwrap();
    assert_eq!(name, "Ghost");

    // Update: the new key is added and the old key is kept.
    let patch = PropertyMapBuilder::new().insert("age", 100).build();
    session.update_node(ghost, patch).unwrap();
    let updated = session.get_node(ghost).unwrap();
    let age = updated.get_property("age").unwrap().as_int().unwrap();
    assert_eq!(age, 100);
    let kept_name = updated.get_property("name").unwrap().as_str().unwrap();
    assert_eq!(kept_name, "Ghost");

    // Delete.
    session.remove_node(ghost);
    assert!(session.get_node(ghost).is_err());
}
/// BFS sees stored edges, hypothetical edges, and hypothetical removals.
#[test]
fn test_hindsight_pathfinding() {
    let db = AletheiaDB::new().unwrap();
    let empty = PropertyMapBuilder::new().build();
    let a = db.create_node("Node", empty.clone()).unwrap();
    let b = db.create_node("Node", empty.clone()).unwrap();
    let d = db.create_node("Node", empty.clone()).unwrap();
    db.create_edge(a, b, "NEXT", empty.clone()).unwrap();

    let mut session = Hindsight::new(&db);

    // Stored graph: a -> b exists, b -> d does not.
    let stored_path = session.find_path_bfs(a, b).unwrap();
    assert_eq!(stored_path.len(), 1);
    assert!(session.find_path_bfs(b, d).is_none());

    // A hypothetical b -> d edge makes d reachable from a in two hops.
    let _bridge = session.add_edge(b, d, "NEXT", empty.clone()).unwrap();
    let extended_path = session.find_path_bfs(a, d).unwrap();
    assert_eq!(extended_path.len(), 2);

    // Hypothetically removing the stored a -> b edge cuts the path again.
    let stored_edge = session.get_outgoing_edges(a).first().copied().unwrap();
    session.remove_edge(stored_edge);
    assert!(session.find_path_bfs(a, d).is_none());
}
/// Similarity search merges stored and hypothetical nodes and hides removed ones.
#[test]
fn test_hindsight_vector_search() {
    let db = AletheiaDB::new().unwrap();
    db.enable_vector_index("vec", HnswConfig::new(2, DistanceMetric::Cosine))
        .unwrap();

    // Small helper for building a node's property map from a 2-d vector.
    let with_vec = |v: [f32; 2]| PropertyMapBuilder::new().insert_vector("vec", &v).build();

    let stored_match = db.create_node("Node", with_vec([1.0, 0.0])).unwrap();
    let stored_removed = db.create_node("Node", with_vec([0.0, 1.0])).unwrap();

    let mut session = Hindsight::new(&db);
    let hypothetical = session.add_node("Node", with_vec([0.9, 0.1])).unwrap();
    session.remove_node(stored_removed);

    let results = session.find_similar("vec", &[1.0, 0.0], 5).unwrap();
    assert_eq!(results.len(), 2);

    let found = |wanted: NodeId| results.iter().any(|(id, _)| *id == wanted);
    assert!(found(stored_match));
    assert!(found(hypothetical));
    assert!(!found(stored_removed));
}
/// A node that was patched and then removed must not come back as a
/// candidate through its patch.
#[test]
fn test_hindsight_find_similar_excludes_removed_modified_nodes() {
    let db = AletheiaDB::new().unwrap();
    let mut session = Hindsight::new(&db);

    let stored_props = PropertyMapBuilder::new()
        .insert("name", "Zombie")
        .insert_vector("vec", &[1.0, 0.0])
        .build();
    let zombie = db.create_node("Node", stored_props.clone()).unwrap();

    // Patch the vector, then remove the node.
    let patch = PropertyMapBuilder::new()
        .insert_vector("vec", &[0.9, 0.1])
        .build();
    session.update_node(zombie, patch).unwrap();
    session.remove_node(zombie);

    let hits = session.find_similar("vec", &[1.0, 0.0], 10).unwrap();
    let resurrected = hits.iter().map(|(node_id, _)| *node_id).any(|n| n == zombie);
    assert!(
        !resurrected,
        "Removed node should not be found in similarity search, but it was found as a zombie candidate from modified_nodes"
    );
}
}