1use crate::models::ConfidenceSource;
58use crate::models::{Memory, MemoryKind, Tier};
59use anyhow::Result;
60use rusqlite::{Connection, params};
61use serde_json::json;
62
63#[derive(Debug, Clone)]
73struct PendingNotification {
74 dependent_id: String,
75 dependent_namespace: String,
76 invalidated_id: String,
77 invalidating_id: String,
78 timestamp: String,
79}
80
81pub fn propagate_reflection_invalidation(
105 conn: &Connection,
106 invalidated_id: &str,
107 invalidating_id: &str,
108 signing_agent_id: &str,
109) -> Result<Vec<String>> {
110 let timestamp = chrono::Utc::now().to_rfc3339();
111 let dependents = list_dependents_of_invalidated_internal(conn, invalidated_id)?;
112 let mut notified_ids: Vec<String> = Vec::with_capacity(dependents.len());
113
114 for (dependent_id, dependent_namespace) in dependents {
115 let pending = PendingNotification {
116 dependent_id: dependent_id.clone(),
117 dependent_namespace,
118 invalidated_id: invalidated_id.to_string(),
119 invalidating_id: invalidating_id.to_string(),
120 timestamp: timestamp.clone(),
121 };
122 write_notification(conn, &pending, signing_agent_id)?;
123 notified_ids.push(dependent_id);
124 }
125
126 Ok(notified_ids)
127}
128
129pub fn list_dependents_of_invalidated(
143 conn: &Connection,
144 invalidated_id: &str,
145) -> Result<Vec<DependentRecord>> {
146 let rows = list_dependents_of_invalidated_internal(conn, invalidated_id)?;
147 Ok(rows
148 .into_iter()
149 .map(|(id, namespace)| DependentRecord { id, namespace })
150 .collect())
151}
152
153#[derive(Debug, Clone, PartialEq, Eq)]
155pub struct DependentRecord {
156 pub id: String,
157 pub namespace: String,
158}
159
160fn list_dependents_of_invalidated_internal(
163 conn: &Connection,
164 invalidated_id: &str,
165) -> Result<Vec<(String, String)>> {
166 let mut stmt = conn.prepare(
167 "SELECT m.id, m.namespace
168 FROM memory_links l
169 JOIN memories m ON m.id = l.source_id
170 WHERE l.target_id = ?1 AND l.relation = 'reflects_on'",
171 )?;
172 let rows = stmt.query_map(params![invalidated_id], |row| {
173 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
174 })?;
175 let mut out: Vec<(String, String)> = Vec::new();
176 for r in rows {
177 out.push(r?);
178 }
179 Ok(out)
180}
181
182fn invalidations_namespace_for(parent: &str) -> String {
188 format!("{parent}/_invalidations")
189}
190
191fn write_notification(
199 conn: &Connection,
200 pending: &PendingNotification,
201 signing_agent_id: &str,
202) -> Result<()> {
203 let now = pending.timestamp.clone();
204 let target_namespace = invalidations_namespace_for(&pending.dependent_namespace);
205
206 let title = format!(
211 "invalidation: {} -> {}",
212 pending.invalidated_id, pending.dependent_id
213 );
214
215 let metadata = json!({
216 "agent_id": signing_agent_id,
217 "notification_kind": "reflection_invalidation",
218 "dependent_id": pending.dependent_id,
219 "invalidated_id": pending.invalidated_id,
220 "invalidating_id": pending.invalidating_id,
221 "timestamp": pending.timestamp,
222 });
223
224 let mem = Memory {
225 id: uuid::Uuid::new_v4().to_string(),
226 tier: Tier::Mid,
227 namespace: target_namespace,
228 title,
229 content: format!(
230 "Reflection {invalidated} was superseded by {invalidating}. \
231 Memory {dependent} reflects_on the now-invalidated reflection \
232 and may need re-evaluation.",
233 invalidated = pending.invalidated_id,
234 invalidating = pending.invalidating_id,
235 dependent = pending.dependent_id,
236 ),
237 tags: vec!["_invalidation".to_string()],
238 priority: 7,
239 confidence: 1.0,
240 source: "notification".to_string(),
241 access_count: 0,
242 created_at: now.clone(),
243 updated_at: now,
244 last_accessed_at: None,
245 expires_at: None, metadata,
247 reflection_depth: 0,
248 memory_kind: MemoryKind::Observation,
249 entity_id: None,
250 persona_version: None,
251 citations: Vec::new(),
252 source_uri: None,
253 source_span: None,
254 confidence_source: ConfidenceSource::CallerProvided,
255 confidence_signals: None,
256 confidence_decayed_at: None,
257 version: 1,
258 };
259
260 crate::storage::insert(conn, &mem)?;
261
262 let payload_bytes = json!({
266 "event": "reflection.invalidation_notified",
267 "dependent_id": pending.dependent_id,
268 "invalidated_id": pending.invalidated_id,
269 "invalidating_id": pending.invalidating_id,
270 "timestamp": pending.timestamp,
271 })
272 .to_string()
273 .into_bytes();
274
275 let event = crate::signed_events::SignedEvent {
276 id: uuid::Uuid::new_v4().to_string(),
277 agent_id: signing_agent_id.to_string(),
278 event_type: crate::signed_events::event_types::REFLECTION_INVALIDATION_NOTIFIED.to_string(),
279 payload_hash: crate::signed_events::payload_hash(&payload_bytes),
280 signature: None,
281 attest_level: crate::models::AttestLevel::Unsigned.as_str().to_string(),
282 timestamp: pending.timestamp.clone(),
283 ..crate::signed_events::SignedEvent::default()
284 };
285 if let Err(e) = crate::signed_events::append_signed_event(conn, &event) {
286 tracing::warn!(
291 target: crate::signed_events::SIGNED_EVENTS_TRACE_TARGET,
292 dependent_id = %pending.dependent_id,
293 invalidated_id = %pending.invalidated_id,
294 "failed to append reflection.invalidation_notified row: {e}"
295 );
296 }
297
298 Ok(())
299}
300
301#[cfg(test)]
302mod tests {
303 use super::*;
304 use crate::models::Memory;
305 use crate::storage as db;
306
307 fn fresh_conn() -> Connection {
308 db::open(std::path::Path::new(":memory:")).expect("open in-memory db")
309 }
310
311 fn make_mem(title: &str, namespace: &str, kind: MemoryKind) -> Memory {
312 let now = chrono::Utc::now().to_rfc3339();
313 Memory {
314 id: uuid::Uuid::new_v4().to_string(),
315 tier: Tier::Mid,
316 namespace: namespace.to_string(),
317 title: title.to_string(),
318 content: format!("body {title}"),
319 tags: vec![],
320 priority: 5,
321 confidence: 1.0,
322 source: "test".to_string(),
323 access_count: 0,
324 created_at: now.clone(),
325 updated_at: now,
326 last_accessed_at: None,
327 expires_at: None,
328 metadata: json!({"agent_id": "ai:tester"}),
329 reflection_depth: if matches!(kind, MemoryKind::Reflection) {
330 1
331 } else {
332 0
333 },
334 memory_kind: kind,
335 entity_id: None,
336 persona_version: None,
337 citations: Vec::new(),
338 source_uri: None,
339 source_span: None,
340 confidence_source: ConfidenceSource::CallerProvided,
341 confidence_signals: None,
342 confidence_decayed_at: None,
343 version: 1,
344 }
345 }
346
347 #[test]
348 fn invalidations_namespace_appends_underscore_segment() {
349 assert_eq!(
350 invalidations_namespace_for("team/alpha"),
351 "team/alpha/_invalidations"
352 );
353 assert_eq!(
354 invalidations_namespace_for("global"),
355 "global/_invalidations"
356 );
357 }
358
359 #[test]
360 fn list_dependents_returns_inbound_reflects_on_only() {
361 let conn = fresh_conn();
362 let r1 = make_mem("R1", "ns-a", MemoryKind::Reflection);
365 let m1 = make_mem("M1", "ns-a", MemoryKind::Observation);
366 let m2 = make_mem("M2", "ns-b", MemoryKind::Observation);
367 let m3 = make_mem("M3", "ns-a", MemoryKind::Observation);
368 let r1_id = db::insert(&conn, &r1).expect("insert r1");
369 let m1_id = db::insert(&conn, &m1).expect("insert m1");
370 let m2_id = db::insert(&conn, &m2).expect("insert m2");
371 let m3_id = db::insert(&conn, &m3).expect("insert m3");
372 db::create_link(&conn, &m1_id, &r1_id, "reflects_on").expect("link m1");
373 db::create_link(&conn, &m2_id, &r1_id, "reflects_on").expect("link m2");
374 db::create_link(&conn, &m3_id, &r1_id, "related_to").expect("link m3 (noise)");
375
376 let deps = list_dependents_of_invalidated(&conn, &r1_id).expect("walk");
377 let ids: Vec<&str> = deps.iter().map(|d| d.id.as_str()).collect();
378 assert_eq!(ids.len(), 2, "only reflects_on edges count, got {ids:?}");
379 assert!(ids.contains(&m1_id.as_str()));
380 assert!(ids.contains(&m2_id.as_str()));
381 assert!(!ids.contains(&m3_id.as_str()), "related_to leaked through");
382 }
383
384 #[test]
385 fn propagate_writes_one_notification_per_dependent() {
386 let conn = fresh_conn();
387 let r1 = make_mem("R1", "ns-a", MemoryKind::Reflection);
388 let r2 = make_mem("R2", "ns-a", MemoryKind::Reflection);
389 let m1 = make_mem("M1", "ns-a", MemoryKind::Observation);
390 let m2 = make_mem("M2", "ns-b", MemoryKind::Observation);
391 let r1_id = db::insert(&conn, &r1).expect("insert r1");
392 let r2_id = db::insert(&conn, &r2).expect("insert r2");
393 let m1_id = db::insert(&conn, &m1).expect("insert m1");
394 let m2_id = db::insert(&conn, &m2).expect("insert m2");
395 db::create_link(&conn, &m1_id, &r1_id, "reflects_on").expect("m1→r1");
396 db::create_link(&conn, &m2_id, &r1_id, "reflects_on").expect("m2→r1");
397
398 let notified =
399 propagate_reflection_invalidation(&conn, &r1_id, &r2_id, "ai:tester").expect("walk");
400 assert_eq!(notified.len(), 2);
401
402 let count: i64 = conn
405 .query_row(
406 "SELECT COUNT(*) FROM memories WHERE namespace = ?1",
407 params!["ns-a/_invalidations"],
408 |r| r.get(0),
409 )
410 .unwrap();
411 assert_eq!(count, 1, "ns-a got 1 notification (for m1)");
412
413 let count_b: i64 = conn
414 .query_row(
415 "SELECT COUNT(*) FROM memories WHERE namespace = ?1",
416 params!["ns-b/_invalidations"],
417 |r| r.get(0),
418 )
419 .unwrap();
420 assert_eq!(count_b, 1, "ns-b got 1 notification (for m2)");
421 }
422
423 #[test]
424 fn propagate_records_signed_events_row_per_notification() {
425 let conn = fresh_conn();
426 let r1 = make_mem("R1", "ns-a", MemoryKind::Reflection);
427 let r2 = make_mem("R2", "ns-a", MemoryKind::Reflection);
428 let m1 = make_mem("M1", "ns-a", MemoryKind::Observation);
429 let r1_id = db::insert(&conn, &r1).expect("insert r1");
430 let r2_id = db::insert(&conn, &r2).expect("insert r2");
431 let m1_id = db::insert(&conn, &m1).expect("insert m1");
432 db::create_link(&conn, &m1_id, &r1_id, "reflects_on").expect("m1→r1");
433
434 let _ =
435 propagate_reflection_invalidation(&conn, &r1_id, &r2_id, "ai:tester").expect("walk");
436
437 let cnt: i64 = conn
438 .query_row(
439 "SELECT COUNT(*) FROM signed_events WHERE event_type = ?1",
440 params!["reflection.invalidation_notified"],
441 |r| r.get(0),
442 )
443 .unwrap_or(0);
444 assert_eq!(cnt, 1, "one signed_events row per notification");
445 }
446
447 #[test]
448 fn propagate_with_no_dependents_is_a_no_op() {
449 let conn = fresh_conn();
450 let r1 = make_mem("R1", "ns-a", MemoryKind::Reflection);
451 let r2 = make_mem("R2", "ns-a", MemoryKind::Reflection);
452 let r1_id = db::insert(&conn, &r1).expect("insert r1");
453 let r2_id = db::insert(&conn, &r2).expect("insert r2");
454 let notified =
455 propagate_reflection_invalidation(&conn, &r1_id, &r2_id, "ai:tester").expect("walk");
456 assert!(notified.is_empty());
457 let count: i64 = conn
458 .query_row(
459 "SELECT COUNT(*) FROM memories WHERE namespace LIKE '%_invalidations'",
460 [],
461 |r| r.get(0),
462 )
463 .unwrap();
464 assert_eq!(count, 0);
465 }
466
467 #[test]
468 fn metadata_carries_all_four_required_fields() {
469 let conn = fresh_conn();
470 let r1 = make_mem("R1", "ns-a", MemoryKind::Reflection);
471 let r2 = make_mem("R2", "ns-a", MemoryKind::Reflection);
472 let m1 = make_mem("M1", "ns-a", MemoryKind::Observation);
473 let r1_id = db::insert(&conn, &r1).expect("insert r1");
474 let r2_id = db::insert(&conn, &r2).expect("insert r2");
475 let m1_id = db::insert(&conn, &m1).expect("insert m1");
476 db::create_link(&conn, &m1_id, &r1_id, "reflects_on").expect("m1→r1");
477
478 let _ =
479 propagate_reflection_invalidation(&conn, &r1_id, &r2_id, "ai:tester").expect("walk");
480
481 let meta_str: String = conn
482 .query_row(
483 "SELECT metadata FROM memories WHERE namespace = ?1 LIMIT 1",
484 params!["ns-a/_invalidations"],
485 |r| r.get(0),
486 )
487 .unwrap();
488 let meta: serde_json::Value = serde_json::from_str(&meta_str).unwrap();
489 assert_eq!(meta["dependent_id"].as_str(), Some(m1_id.as_str()));
490 assert_eq!(meta["invalidated_id"].as_str(), Some(r1_id.as_str()));
491 assert_eq!(meta["invalidating_id"].as_str(), Some(r2_id.as_str()));
492 assert!(meta["timestamp"].is_string());
493 assert_eq!(
494 meta["notification_kind"].as_str(),
495 Some("reflection_invalidation")
496 );
497 }
498}