use crate::AletheiaDB;
use crate::core::error::Result;
use crate::core::id::{NodeId, VersionId};
use crate::core::property::PropertyMap;
use crate::core::vector::ops::euclidean_distance;
#[derive(Debug, Clone)]
pub struct MemoryFrame {
pub timestamp: i64,
pub version_id: VersionId,
pub reason: String,
pub properties: PropertyMap,
}
pub struct Mnemosyne<'a> {
db: &'a AletheiaDB,
}
impl<'a> Mnemosyne<'a> {
pub fn new(db: &'a AletheiaDB) -> Self {
Self { db }
}
pub fn consolidate_memory(
&self,
node_id: NodeId,
vector_prop: &str,
threshold: f32,
) -> Result<Vec<MemoryFrame>> {
let history = self.db.get_node_history(node_id)?;
let mut frames = Vec::new();
let mut last_kept_vector: Option<Vec<f32>> = None;
let mut last_kept_props: Option<PropertyMap> = None;
for (i, version) in history.versions.iter().enumerate() {
let current_props = &version.properties;
let current_vector = current_props
.get(vector_prop)
.and_then(|v| v.as_vector())
.map(|v| v.to_vec());
let timestamp = version.temporal.transaction_time().start().wallclock();
if i == 0 {
frames.push(MemoryFrame {
timestamp,
version_id: version.version_id,
reason: "Initial State".to_string(),
properties: current_props.clone(),
});
last_kept_vector = current_vector;
last_kept_props = Some(current_props.clone());
continue;
}
let mut semantic_change = false;
let mut distance = 0.0;
if let (Some(last_vec), Some(curr_vec)) = (&last_kept_vector, ¤t_vector) {
match euclidean_distance(last_vec, curr_vec) {
Ok(d) => {
distance = d;
if distance > threshold {
semantic_change = true;
}
}
Err(_) => {
semantic_change = true;
distance = f32::INFINITY;
}
}
} else if last_kept_vector.is_some() != current_vector.is_some() {
semantic_change = true;
distance = f32::INFINITY; }
let mut property_change = false;
if let Some(last_props) = &last_kept_props {
for (k, v) in current_props.iter() {
if let Some(key_str) =
crate::core::GLOBAL_INTERNER.resolve_with(*k, |s| s.to_string())
{
if key_str == vector_prop {
continue;
}
match last_props.get(key_str.as_str()) {
Some(last_val) => {
if !last_val.semantically_equal(v) {
property_change = true;
break;
}
}
None => {
property_change = true; break;
}
}
}
}
if !property_change {
for (k, _) in last_props.iter() {
if let Some(key_str) =
crate::core::GLOBAL_INTERNER.resolve_with(*k, |s| s.to_string())
{
if key_str == vector_prop {
continue;
}
if !current_props.contains_key(&key_str) {
property_change = true; break;
}
}
}
}
} else {
property_change = true; }
let reason = if semantic_change && property_change {
format!("Vector Shift ({:.4}) + Property Change", distance)
} else if semantic_change {
format!("Vector Shift ({:.4})", distance)
} else if property_change {
"Property Change".to_string()
} else {
String::new()
};
if !reason.is_empty() {
frames.push(MemoryFrame {
timestamp,
version_id: version.version_id,
reason,
properties: current_props.clone(),
});
last_kept_vector = current_vector;
last_kept_props = Some(current_props.clone());
}
}
Ok(frames)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::api::transaction::WriteOps;
use crate::core::property::PropertyMapBuilder;
use crate::index::vector::{DistanceMetric, HnswConfig};
#[test]
fn test_mnemosyne_drift_consolidation() {
let db = AletheiaDB::new().unwrap();
let config = HnswConfig::new(2, DistanceMetric::Euclidean);
db.enable_vector_index("vec", config).unwrap();
let props = PropertyMapBuilder::new()
.insert_vector("vec", &[0.0, 0.0])
.build();
let node = db.create_node("Node", props).unwrap();
std::thread::sleep(std::time::Duration::from_millis(1));
db.write(|tx| {
tx.update_node(
node,
PropertyMapBuilder::new()
.insert_vector("vec", &[0.1, 0.0])
.build(),
)
})
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(1));
db.write(|tx| {
tx.update_node(
node,
PropertyMapBuilder::new()
.insert_vector("vec", &[0.2, 0.0])
.build(),
)
})
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(1));
db.write(|tx| {
tx.update_node(
node,
PropertyMapBuilder::new()
.insert_vector("vec", &[1.0, 0.0])
.build(),
)
})
.unwrap();
let mnemosyne = Mnemosyne::new(&db);
let frames = mnemosyne.consolidate_memory(node, "vec", 0.5).unwrap();
assert_eq!(frames.len(), 2, "Should consolidate to 2 frames");
assert_eq!(frames[0].reason, "Initial State");
assert!(frames[1].reason.contains("Vector Shift"));
let last_vec = frames[1]
.properties
.get("vec")
.unwrap()
.as_vector()
.unwrap();
assert_eq!(last_vec, &[1.0, 0.0]);
}
#[test]
fn test_mnemosyne_accumulation() {
let db = AletheiaDB::new().unwrap();
let config = HnswConfig::new(2, DistanceMetric::Euclidean);
db.enable_vector_index("vec", config).unwrap();
let props = PropertyMapBuilder::new()
.insert_vector("vec", &[0.0, 0.0])
.build();
let node = db.create_node("Node", props).unwrap();
for i in 1..=5 {
std::thread::sleep(std::time::Duration::from_millis(1));
let val = i as f32 * 0.2;
db.write(|tx| {
tx.update_node(
node,
PropertyMapBuilder::new()
.insert_vector("vec", &[val, 0.0])
.build(),
)
})
.unwrap();
}
let mnemosyne = Mnemosyne::new(&db);
let frames = mnemosyne.consolidate_memory(node, "vec", 0.5).unwrap();
assert_eq!(frames.len(), 2);
let vec_t3 = frames[1]
.properties
.get("vec")
.unwrap()
.as_vector()
.unwrap();
assert!((vec_t3[0] - 0.6).abs() < 1e-5);
}
#[test]
fn test_mnemosyne_property_change() {
let db = AletheiaDB::new().unwrap();
let config = HnswConfig::new(2, DistanceMetric::Euclidean);
db.enable_vector_index("vec", config).unwrap();
let props = PropertyMapBuilder::new()
.insert_vector("vec", &[0.0, 0.0])
.insert("status", "ok")
.build();
let node = db.create_node("Node", props).unwrap();
std::thread::sleep(std::time::Duration::from_millis(1));
db.write(|tx| {
tx.update_node(
node,
PropertyMapBuilder::new()
.insert_vector("vec", &[0.0, 0.0])
.insert("status", "error")
.build(),
)
})
.unwrap();
let mnemosyne = Mnemosyne::new(&db);
let frames = mnemosyne.consolidate_memory(node, "vec", 0.5).unwrap();
assert_eq!(frames.len(), 2);
assert_eq!(frames[1].reason, "Property Change");
assert_eq!(
frames[1].properties.get("status").unwrap().as_str(),
Some("error")
);
}
}