use crate::models::ConfidenceSource;
use crate::models::{Memory, MemoryKind, Tier};
use anyhow::Result;
use rusqlite::{Connection, params};
use serde_json::json;
/// Everything needed to emit one invalidation notification for a single
/// dependent memory.
///
/// `propagate_reflection_invalidation` builds one value per dependent and
/// hands it to `write_notification`.
#[derive(Clone, Debug)]
struct PendingNotification {
    /// Id of the memory that holds a `reflects_on` link to the invalidated one.
    dependent_id: String,
    /// Namespace of the dependent memory; the notification is written to the
    /// `_invalidations` child of this namespace.
    dependent_namespace: String,
    /// Id of the memory that was invalidated.
    invalidated_id: String,
    /// Id of the memory that replaced the invalidated one.
    invalidating_id: String,
    /// RFC 3339 timestamp, captured once per propagation run and shared by
    /// every notification of that run.
    timestamp: String,
}
/// Notifies every memory that `reflects_on` an invalidated memory.
///
/// For each dependent found via the `reflects_on` links pointing at
/// `invalidated_id`, one notification is written through
/// `write_notification`. All notifications of a single call share the same
/// RFC 3339 timestamp, captured before the dependents are looked up.
///
/// # Parameters
/// * `conn` - database connection used for the lookup and all writes.
/// * `invalidated_id` - id of the memory that was invalidated.
/// * `invalidating_id` - id of the memory that replaced it.
/// * `signing_agent_id` - agent id recorded on each notification and event.
///
/// # Returns
/// The ids of the dependents that were notified, in lookup order. The list is
/// empty when nothing reflects on `invalidated_id`.
///
/// # Errors
/// Returns the first error from the dependent lookup or from writing a
/// notification. This function does not open a transaction itself, so
/// notifications written before the failing one are not undone here.
pub fn propagate_reflection_invalidation(
    conn: &Connection,
    invalidated_id: &str,
    invalidating_id: &str,
    signing_agent_id: &str,
) -> Result<Vec<String>> {
    // Stamp once so every notification of this run carries the same time.
    let stamped_at = chrono::Utc::now().to_rfc3339();

    list_dependents_of_invalidated_internal(conn, invalidated_id)?
        .into_iter()
        .map(|(dependent_id, dependent_namespace)| -> Result<String> {
            let notice = PendingNotification {
                dependent_id,
                dependent_namespace,
                invalidated_id: invalidated_id.to_owned(),
                invalidating_id: invalidating_id.to_owned(),
                timestamp: stamped_at.clone(),
            };
            write_notification(conn, &notice, signing_agent_id)?;
            Ok(notice.dependent_id)
        })
        // Collecting into `Result` stops at the first failed write.
        .collect()
}
/// Lists the memories that hold a `reflects_on` link to `invalidated_id`.
///
/// This is the read-only counterpart of `propagate_reflection_invalidation`:
/// it runs the same lookup but writes nothing.
///
/// # Returns
/// One [`DependentRecord`] per matching link row, in lookup order.
///
/// # Errors
/// Propagates any error from the underlying query.
pub fn list_dependents_of_invalidated(
    conn: &Connection,
    invalidated_id: &str,
) -> Result<Vec<DependentRecord>> {
    let pairs = list_dependents_of_invalidated_internal(conn, invalidated_id)?;

    let mut records = Vec::with_capacity(pairs.len());
    for (id, namespace) in pairs {
        records.push(DependentRecord { id, namespace });
    }
    Ok(records)
}
/// A memory that depends on an invalidated one through a `reflects_on` link,
/// as returned by `list_dependents_of_invalidated`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DependentRecord {
    /// Id of the dependent memory.
    pub id: String,
    /// Namespace the dependent memory is stored in.
    pub namespace: String,
}
/// Fetches `(id, namespace)` for every memory that is the source of a
/// `reflects_on` link whose target is `invalidated_id`.
///
/// Links with any other relation are ignored by the `WHERE` clause. The query
/// has no `ORDER BY`, so row order is whatever the database returns.
///
/// # Errors
/// Fails if the statement cannot be prepared or executed, or if a row cannot
/// be read as two strings.
fn list_dependents_of_invalidated_internal(
    conn: &Connection,
    invalidated_id: &str,
) -> Result<Vec<(String, String)>> {
    let mut lookup = conn.prepare(
        "SELECT m.id, m.namespace\n\
         FROM memory_links l\n\
         JOIN memories m ON m.id = l.source_id\n\
         WHERE l.target_id = ?1 AND l.relation = 'reflects_on'",
    )?;

    // Bound to a local so the row iterator (which borrows `lookup`) is
    // dropped before the statement itself.
    let dependents = lookup
        .query_map(params![invalidated_id], |row| {
            let id: String = row.get(0)?;
            let namespace: String = row.get(1)?;
            Ok((id, namespace))
        })?
        .collect::<std::result::Result<Vec<_>, _>>()?;

    Ok(dependents)
}
/// Returns the namespace that holds invalidation notifications for `parent`:
/// `parent` followed by `/_invalidations`.
///
/// `parent` is used verbatim; no separator normalisation is performed.
fn invalidations_namespace_for(parent: &str) -> String {
    const SUFFIX: &str = "/_invalidations";

    let mut namespace = String::with_capacity(parent.len() + SUFFIX.len());
    namespace.push_str(parent);
    namespace.push_str(SUFFIX);
    namespace
}
/// Persists one invalidation notification and records a matching audit event.
///
/// Two steps run in order:
/// 1. A `Memory` describing the invalidation is inserted into the
///    `_invalidations` child of the dependent's namespace. It is a `Mid`-tier
///    observation tagged `_invalidation`, with priority 7 and confidence 1.0;
///    its metadata carries the agent id, the three memory ids, the timestamp
///    and the notification kind.
/// 2. An event of type `REFLECTION_INVALIDATION_NOTIFIED` is appended to the
///    signed-events log, with no signature and the `Unsigned` attest level.
///
/// # Errors
/// Fails if inserting the notification memory fails. A failure to append the
/// event is deliberately best-effort: it is logged with `tracing::warn!` and
/// the function still returns `Ok(())`.
fn write_notification(
    conn: &Connection,
    pending: &PendingNotification,
    signing_agent_id: &str,
) -> Result<()> {
    let dependent = pending.dependent_id.as_str();
    let invalidated = pending.invalidated_id.as_str();
    let invalidating = pending.invalidating_id.as_str();
    let stamped_at = pending.timestamp.as_str();

    // ---- Step 1: the notification memory --------------------------------
    let body = format!(
        "Reflection {invalidated} was superseded by {invalidating}. \
         Memory {dependent} reflects_on the now-invalidated reflection \
         and may need re-evaluation."
    );
    let details = json!({
        "agent_id": signing_agent_id,
        "notification_kind": "reflection_invalidation",
        "dependent_id": dependent,
        "invalidated_id": invalidated,
        "invalidating_id": invalidating,
        "timestamp": stamped_at,
    });
    let notification = Memory {
        id: uuid::Uuid::new_v4().to_string(),
        tier: Tier::Mid,
        namespace: invalidations_namespace_for(&pending.dependent_namespace),
        title: format!("invalidation: {invalidated} -> {dependent}"),
        content: body,
        tags: vec!["_invalidation".to_string()],
        priority: 7,
        confidence: 1.0,
        source: "notification".to_string(),
        access_count: 0,
        // Created and updated stamps both use the propagation timestamp.
        created_at: stamped_at.to_owned(),
        updated_at: stamped_at.to_owned(),
        last_accessed_at: None,
        expires_at: None,
        metadata: details,
        reflection_depth: 0,
        memory_kind: MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        confidence_source: ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };
    crate::storage::insert(conn, &notification)?;

    // ---- Step 2: the audit event -----------------------------------------
    // Only the hash of this payload is stored on the event.
    let payload_bytes = json!({
        "event": "reflection.invalidation_notified",
        "dependent_id": dependent,
        "invalidated_id": invalidated,
        "invalidating_id": invalidating,
        "timestamp": stamped_at,
    })
    .to_string()
    .into_bytes();
    let audit_event = crate::signed_events::SignedEvent {
        id: uuid::Uuid::new_v4().to_string(),
        agent_id: signing_agent_id.to_owned(),
        event_type: crate::signed_events::event_types::REFLECTION_INVALIDATION_NOTIFIED.to_string(),
        payload_hash: crate::signed_events::payload_hash(&payload_bytes),
        signature: None,
        attest_level: crate::models::AttestLevel::Unsigned.as_str().to_string(),
        timestamp: stamped_at.to_owned(),
        ..crate::signed_events::SignedEvent::default()
    };

    // Best-effort: the notification memory is already stored, so a failed
    // append is reported but does not fail the whole notification.
    match crate::signed_events::append_signed_event(conn, &audit_event) {
        Ok(_) => {}
        Err(e) => {
            tracing::warn!(
                target: crate::signed_events::SIGNED_EVENTS_TRACE_TARGET,
                dependent_id = %pending.dependent_id,
                invalidated_id = %pending.invalidated_id,
                "failed to append reflection.invalidation_notified row: {e}"
            );
        }
    }

    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::Memory;
    use crate::storage as db;

    /// Opens a fresh in-memory database through the storage layer.
    fn fresh_conn() -> Connection {
        let path = std::path::Path::new(":memory:");
        db::open(path).expect("open in-memory db")
    }

    /// Builds an unsaved `Mid`-tier memory fixture with the given title,
    /// namespace and kind. Reflections get `reflection_depth` 1, every other
    /// kind gets 0.
    fn make_mem(title: &str, namespace: &str, kind: MemoryKind) -> Memory {
        let stamp = chrono::Utc::now().to_rfc3339();
        let reflection_depth = match kind {
            MemoryKind::Reflection => 1,
            _ => 0,
        };
        Memory {
            id: uuid::Uuid::new_v4().to_string(),
            tier: Tier::Mid,
            namespace: namespace.to_string(),
            title: title.to_string(),
            content: format!("body {title}"),
            tags: vec![],
            priority: 5,
            confidence: 1.0,
            source: "test".to_string(),
            access_count: 0,
            created_at: stamp.clone(),
            updated_at: stamp,
            last_accessed_at: None,
            expires_at: None,
            metadata: json!({"agent_id": "ai:tester"}),
            reflection_depth,
            memory_kind: kind,
            entity_id: None,
            persona_version: None,
            citations: Vec::new(),
            source_uri: None,
            source_span: None,
            confidence_source: ConfidenceSource::CallerProvided,
            confidence_signals: None,
            confidence_decayed_at: None,
            version: 1,
        }
    }

    /// Counts the rows of `memories` stored in exactly `namespace`.
    fn count_in_namespace(conn: &Connection, namespace: &str) -> i64 {
        conn.query_row(
            "SELECT COUNT(*) FROM memories WHERE namespace = ?1",
            params![namespace],
            |row| row.get(0),
        )
        .unwrap()
    }

    #[test]
    fn invalidations_namespace_appends_underscore_segment() {
        let cases = [
            ("team/alpha", "team/alpha/_invalidations"),
            ("global", "global/_invalidations"),
        ];
        for (parent, expected) in cases {
            assert_eq!(invalidations_namespace_for(parent), expected);
        }
    }

    #[test]
    fn list_dependents_returns_inbound_reflects_on_only() {
        let conn = fresh_conn();
        let r1_id =
            db::insert(&conn, &make_mem("R1", "ns-a", MemoryKind::Reflection)).expect("insert r1");
        let m1_id =
            db::insert(&conn, &make_mem("M1", "ns-a", MemoryKind::Observation)).expect("insert m1");
        let m2_id =
            db::insert(&conn, &make_mem("M2", "ns-b", MemoryKind::Observation)).expect("insert m2");
        let m3_id =
            db::insert(&conn, &make_mem("M3", "ns-a", MemoryKind::Observation)).expect("insert m3");

        // Two real dependents plus one unrelated edge that must be ignored.
        let links = [
            (&m1_id, "reflects_on", "link m1"),
            (&m2_id, "reflects_on", "link m2"),
            (&m3_id, "related_to", "link m3 (noise)"),
        ];
        for (source, relation, what) in links {
            db::create_link(&conn, source, &r1_id, relation).expect(what);
        }

        let deps = list_dependents_of_invalidated(&conn, &r1_id).expect("walk");
        let ids: Vec<&str> = deps.iter().map(|d| d.id.as_str()).collect();

        assert_eq!(ids.len(), 2, "only reflects_on edges count, got {ids:?}");
        assert!(ids.contains(&m1_id.as_str()));
        assert!(ids.contains(&m2_id.as_str()));
        assert!(!ids.contains(&m3_id.as_str()), "related_to leaked through");
    }

    #[test]
    fn propagate_writes_one_notification_per_dependent() {
        let conn = fresh_conn();
        let r1_id =
            db::insert(&conn, &make_mem("R1", "ns-a", MemoryKind::Reflection)).expect("insert r1");
        let r2_id =
            db::insert(&conn, &make_mem("R2", "ns-a", MemoryKind::Reflection)).expect("insert r2");
        let m1_id =
            db::insert(&conn, &make_mem("M1", "ns-a", MemoryKind::Observation)).expect("insert m1");
        let m2_id =
            db::insert(&conn, &make_mem("M2", "ns-b", MemoryKind::Observation)).expect("insert m2");
        db::create_link(&conn, &m1_id, &r1_id, "reflects_on").expect("m1→r1");
        db::create_link(&conn, &m2_id, &r1_id, "reflects_on").expect("m2→r1");

        let notified =
            propagate_reflection_invalidation(&conn, &r1_id, &r2_id, "ai:tester").expect("walk");
        assert_eq!(notified.len(), 2);

        // Each dependent lives in a different namespace, so each
        // `_invalidations` child must hold exactly one notification.
        let count = count_in_namespace(&conn, "ns-a/_invalidations");
        assert_eq!(count, 1, "ns-a got 1 notification (for m1)");
        let count_b = count_in_namespace(&conn, "ns-b/_invalidations");
        assert_eq!(count_b, 1, "ns-b got 1 notification (for m2)");
    }

    #[test]
    fn propagate_records_signed_events_row_per_notification() {
        let conn = fresh_conn();
        let r1_id =
            db::insert(&conn, &make_mem("R1", "ns-a", MemoryKind::Reflection)).expect("insert r1");
        let r2_id =
            db::insert(&conn, &make_mem("R2", "ns-a", MemoryKind::Reflection)).expect("insert r2");
        let m1_id =
            db::insert(&conn, &make_mem("M1", "ns-a", MemoryKind::Observation)).expect("insert m1");
        db::create_link(&conn, &m1_id, &r1_id, "reflects_on").expect("m1→r1");

        let _ =
            propagate_reflection_invalidation(&conn, &r1_id, &r2_id, "ai:tester").expect("walk");

        let event_rows: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM signed_events WHERE event_type = ?1",
                params!["reflection.invalidation_notified"],
                |row| row.get(0),
            )
            .unwrap_or(0);
        assert_eq!(event_rows, 1, "one signed_events row per notification");
    }

    #[test]
    fn propagate_with_no_dependents_is_a_no_op() {
        let conn = fresh_conn();
        let r1_id =
            db::insert(&conn, &make_mem("R1", "ns-a", MemoryKind::Reflection)).expect("insert r1");
        let r2_id =
            db::insert(&conn, &make_mem("R2", "ns-a", MemoryKind::Reflection)).expect("insert r2");

        let notified =
            propagate_reflection_invalidation(&conn, &r1_id, &r2_id, "ai:tester").expect("walk");
        assert!(notified.is_empty());

        let stray: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM memories WHERE namespace LIKE '%_invalidations'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(stray, 0);
    }

    #[test]
    fn metadata_carries_all_four_required_fields() {
        let conn = fresh_conn();
        let r1_id =
            db::insert(&conn, &make_mem("R1", "ns-a", MemoryKind::Reflection)).expect("insert r1");
        let r2_id =
            db::insert(&conn, &make_mem("R2", "ns-a", MemoryKind::Reflection)).expect("insert r2");
        let m1_id =
            db::insert(&conn, &make_mem("M1", "ns-a", MemoryKind::Observation)).expect("insert m1");
        db::create_link(&conn, &m1_id, &r1_id, "reflects_on").expect("m1→r1");

        let _ =
            propagate_reflection_invalidation(&conn, &r1_id, &r2_id, "ai:tester").expect("walk");

        let raw: String = conn
            .query_row(
                "SELECT metadata FROM memories WHERE namespace = ?1 LIMIT 1",
                params!["ns-a/_invalidations"],
                |row| row.get(0),
            )
            .unwrap();
        let meta: serde_json::Value = serde_json::from_str(&raw).unwrap();

        assert_eq!(meta["dependent_id"].as_str(), Some(m1_id.as_str()));
        assert_eq!(meta["invalidated_id"].as_str(), Some(r1_id.as_str()));
        assert_eq!(meta["invalidating_id"].as_str(), Some(r2_id.as_str()));
        assert!(meta["timestamp"].is_string());
        assert_eq!(
            meta["notification_kind"].as_str(),
            Some("reflection_invalidation")
        );
    }
}