Skip to main content

ai_memory/notification/
invalidation.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.7.0 L2-3 (issue #668) — Reflection invalidation propagation
5//! (notification, not cascade).
6//!
7//! ## Wire contract
8//!
9//! When a `supersedes` edge lands with both endpoints carrying
10//! `memory_kind = 'reflection'`, the substrate
11//!
12//! 1. Walks every memory `Mi` whose row satisfies
13//!    `(Mi.id, invalidated_reflection_id, "reflects_on")` in
14//!    `memory_links`. That is the set of memories which used the
15//!    now-invalidated reflection as a reasoning source.
16//! 2. For each `Mi`, writes one **notification memory** into the
17//!    namespace `<Mi.namespace>/_invalidations`. The notification's
18//!    `metadata` carries the four identifiers a curator/operator
19//!    needs to triage: `dependent_id`, `invalidated_id`,
20//!    `invalidating_id`, and an RFC3339 `timestamp`.
21//! 3. Appends one `reflection.invalidation_notified` row to
22//!    `signed_events` per notification so an auditor can replay the
23//!    exact set of dependents that were flagged for review.
24//!
25//! ## Why notification, not cascade
26//!
27//! Dependents are **not** auto-superseded. A reflection that pointed
28//! at the invalidated reflection may still be valid (the new winner
29//! could be a narrower restatement, a rephrasing, or a strictly
30//! stronger claim that the dependent should adopt unchanged).
31//! Auto-cascading the invalidation would
32//!
33//! * destroy curator/operator judgment on whether the dependent
34//!   chain is genuinely affected, and
35//! * burn the trust budget the substrate has built: a single bad
36//!   supersession would silently nuke an arbitrarily large reflection
37//!   sub-graph.
38//!
39//! The notification memory is the explicit hand-off — operators (via
40//! the new `memory_dependents_of_invalidated` MCP tool) or the
41//! curator pass surface the flagged set and the human/agent decides
42//! per-dependent.
43//!
44//! ## Idempotency
45//!
46//! The walker is **not** internally idempotent in v0.7.0 — calling
47//! it twice on the same invalidation produces two notification
48//! memories per dependent (they upsert on `(title, namespace)` so
49//! the row count stays bounded by the namespace+title combinatorics,
50//! but each call still attempts the insert). The MCP-side caller in
51//! `mcp::tools::link::handle_link` only fires the walker once per
52//! successful supersedes write, so duplicates require a deliberate
53//! re-invocation. This keeps the helper simple; the v0.8.0 backlog
54//! tracks moving idempotency into the walker itself if the
55//! cross-peer federation case demands it.
56
57use crate::models::ConfidenceSource;
58use crate::models::{Memory, MemoryKind, Tier};
59use anyhow::Result;
60use rusqlite::{Connection, params};
61use serde_json::json;
62
63/// One namespaced notification row to be written into
64/// `<namespace>/_invalidations`.
65///
66/// Internal-only struct: callers consume the higher-level entry
67/// points (`propagate_reflection_invalidation`,
68/// `list_dependents_of_invalidated`). Kept distinct from the wire
69/// `Memory` so the walker can stage all rows before any DB write
70/// (the write loop short-circuits on the first error so a partial
71/// fan-out leaves a deterministic prefix).
72#[derive(Debug, Clone)]
73struct PendingNotification {
74    dependent_id: String,
75    dependent_namespace: String,
76    invalidated_id: String,
77    invalidating_id: String,
78    timestamp: String,
79}
80
81/// Public entry point for the substrate-side walker.
82///
83/// Called by `mcp::tools::link::handle_link` exactly once per
84/// successful Reflection→Reflection `supersedes` write. The caller
85/// has already verified
86///
87/// * the edge relation is `"supersedes"`,
88/// * both `invalidated_id` and `invalidating_id` resolve to
89///   memories whose `memory_kind == MemoryKind::Reflection`.
90///
91/// Returns the list of dependent memory ids that were notified —
92/// useful both for the `memory_link` wire response (so the caller
93/// can log how many dependents were flagged) and for the test
94/// suite's acceptance checks.
95///
96/// # Errors
97///
98/// Returns the first SQL error encountered. On error, any
99/// notifications already written before the failure are left in
100/// place — the substrate prefers eventual consistency to atomic
101/// rollback here because (a) each notification is independently
102/// useful, and (b) the `signed_events` companion row gives the
103/// auditor the exact partial-prefix for forensic replay.
104pub fn propagate_reflection_invalidation(
105    conn: &Connection,
106    invalidated_id: &str,
107    invalidating_id: &str,
108    signing_agent_id: &str,
109) -> Result<Vec<String>> {
110    let timestamp = chrono::Utc::now().to_rfc3339();
111    let dependents = list_dependents_of_invalidated_internal(conn, invalidated_id)?;
112    let mut notified_ids: Vec<String> = Vec::with_capacity(dependents.len());
113
114    for (dependent_id, dependent_namespace) in dependents {
115        let pending = PendingNotification {
116            dependent_id: dependent_id.clone(),
117            dependent_namespace,
118            invalidated_id: invalidated_id.to_string(),
119            invalidating_id: invalidating_id.to_string(),
120            timestamp: timestamp.clone(),
121        };
122        write_notification(conn, &pending, signing_agent_id)?;
123        notified_ids.push(dependent_id);
124    }
125
126    Ok(notified_ids)
127}
128
129/// List the dependents of an invalidated reflection — every memory
130/// whose row writes `(self → invalidated_id, "reflects_on")` into
131/// `memory_links`. Returned as `(dependent_id, dependent_namespace)`
132/// so the caller can shape the notification's target namespace
133/// without a second DB round-trip.
134///
135/// Public via the parent module so the
136/// `memory_dependents_of_invalidated` MCP tool can call it directly
137/// without re-running the walker.
138///
139/// # Errors
140///
141/// Bubbles up rusqlite errors from the inner JOIN.
142pub fn list_dependents_of_invalidated(
143    conn: &Connection,
144    invalidated_id: &str,
145) -> Result<Vec<DependentRecord>> {
146    let rows = list_dependents_of_invalidated_internal(conn, invalidated_id)?;
147    Ok(rows
148        .into_iter()
149        .map(|(id, namespace)| DependentRecord { id, namespace })
150        .collect())
151}
152
153/// Wire shape for the `memory_dependents_of_invalidated` MCP tool.
154#[derive(Debug, Clone, PartialEq, Eq)]
155pub struct DependentRecord {
156    pub id: String,
157    pub namespace: String,
158}
159
160/// SQL helper: pull `(dependent_id, dependent_namespace)` for every
161/// inbound `reflects_on` edge pointed at `invalidated_id`.
162fn list_dependents_of_invalidated_internal(
163    conn: &Connection,
164    invalidated_id: &str,
165) -> Result<Vec<(String, String)>> {
166    let mut stmt = conn.prepare(
167        "SELECT m.id, m.namespace
168           FROM memory_links l
169           JOIN memories m ON m.id = l.source_id
170          WHERE l.target_id = ?1 AND l.relation = 'reflects_on'",
171    )?;
172    let rows = stmt.query_map(params![invalidated_id], |row| {
173        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
174    })?;
175    let mut out: Vec<(String, String)> = Vec::new();
176    for r in rows {
177        out.push(r?);
178    }
179    Ok(out)
180}
181
182/// Compose `<namespace>/_invalidations` for the notification target.
183///
184/// Hierarchical namespaces (e.g. `team/foo`) get the
185/// `_invalidations` suffix appended after the deepest segment so the
186/// dependent's parent scope still owns the notification.
187fn invalidations_namespace_for(parent: &str) -> String {
188    format!("{parent}/_invalidations")
189}
190
191/// Persist one notification memory + one `signed_events` row.
192///
193/// The notification memory is `Tier::Mid` (7-day TTL) — long enough
194/// for a weekly curator pass to surface it, short enough that an
195/// abandoned notification doesn't permanently bloat the namespace.
196/// Operators that want the audit trail forever can re-promote a
197/// notification to `Long` tier via `memory_promote` after triage.
198fn write_notification(
199    conn: &Connection,
200    pending: &PendingNotification,
201    signing_agent_id: &str,
202) -> Result<()> {
203    let now = pending.timestamp.clone();
204    let target_namespace = invalidations_namespace_for(&pending.dependent_namespace);
205
206    // Title carries the dependent + invalidated pair so the
207    // namespace upsert is idempotent on the (dependent, invalidated)
208    // pair — re-invoking the walker for the same pair doesn't
209    // multiply rows.
210    let title = format!(
211        "invalidation: {} -> {}",
212        pending.invalidated_id, pending.dependent_id
213    );
214
215    let metadata = json!({
216        "agent_id": signing_agent_id,
217        "notification_kind": "reflection_invalidation",
218        "dependent_id": pending.dependent_id,
219        "invalidated_id": pending.invalidated_id,
220        "invalidating_id": pending.invalidating_id,
221        "timestamp": pending.timestamp,
222    });
223
224    let mem = Memory {
225        id: uuid::Uuid::new_v4().to_string(),
226        tier: Tier::Mid,
227        namespace: target_namespace,
228        title,
229        content: format!(
230            "Reflection {invalidated} was superseded by {invalidating}. \
231             Memory {dependent} reflects_on the now-invalidated reflection \
232             and may need re-evaluation.",
233            invalidated = pending.invalidated_id,
234            invalidating = pending.invalidating_id,
235            dependent = pending.dependent_id,
236        ),
237        tags: vec!["_invalidation".to_string()],
238        priority: 7,
239        confidence: 1.0,
240        source: "notification".to_string(),
241        access_count: 0,
242        created_at: now.clone(),
243        updated_at: now,
244        last_accessed_at: None,
245        expires_at: None, // filled in by storage::insert via tier default
246        metadata,
247        reflection_depth: 0,
248        memory_kind: MemoryKind::Observation,
249        entity_id: None,
250        persona_version: None,
251        citations: Vec::new(),
252        source_uri: None,
253        source_span: None,
254        confidence_source: ConfidenceSource::CallerProvided,
255        confidence_signals: None,
256        confidence_decayed_at: None,
257        version: 1,
258    };
259
260    crate::storage::insert(conn, &mem)?;
261
262    // Audit row: lets a downstream auditor replay every dependent
263    // that was flagged for a given invalidation without scanning
264    // the namespace.
265    let payload_bytes = json!({
266        "event": "reflection.invalidation_notified",
267        "dependent_id": pending.dependent_id,
268        "invalidated_id": pending.invalidated_id,
269        "invalidating_id": pending.invalidating_id,
270        "timestamp": pending.timestamp,
271    })
272    .to_string()
273    .into_bytes();
274
275    let event = crate::signed_events::SignedEvent {
276        id: uuid::Uuid::new_v4().to_string(),
277        agent_id: signing_agent_id.to_string(),
278        event_type: crate::signed_events::event_types::REFLECTION_INVALIDATION_NOTIFIED.to_string(),
279        payload_hash: crate::signed_events::payload_hash(&payload_bytes),
280        signature: None,
281        attest_level: crate::models::AttestLevel::Unsigned.as_str().to_string(),
282        timestamp: pending.timestamp.clone(),
283        ..crate::signed_events::SignedEvent::default()
284    };
285    if let Err(e) = crate::signed_events::append_signed_event(conn, &event) {
286        // Best-effort — the notification memory itself is the
287        // load-bearing artifact. Log loudly so the operator catches
288        // a torn write but don't fail the walker (other dependents
289        // still need their notifications).
290        tracing::warn!(
291            target: crate::signed_events::SIGNED_EVENTS_TRACE_TARGET,
292            dependent_id = %pending.dependent_id,
293            invalidated_id = %pending.invalidated_id,
294            "failed to append reflection.invalidation_notified row: {e}"
295        );
296    }
297
298    Ok(())
299}
300
301#[cfg(test)]
302mod tests {
303    use super::*;
304    use crate::models::Memory;
305    use crate::storage as db;
306
307    fn fresh_conn() -> Connection {
308        db::open(std::path::Path::new(":memory:")).expect("open in-memory db")
309    }
310
311    fn make_mem(title: &str, namespace: &str, kind: MemoryKind) -> Memory {
312        let now = chrono::Utc::now().to_rfc3339();
313        Memory {
314            id: uuid::Uuid::new_v4().to_string(),
315            tier: Tier::Mid,
316            namespace: namespace.to_string(),
317            title: title.to_string(),
318            content: format!("body {title}"),
319            tags: vec![],
320            priority: 5,
321            confidence: 1.0,
322            source: "test".to_string(),
323            access_count: 0,
324            created_at: now.clone(),
325            updated_at: now,
326            last_accessed_at: None,
327            expires_at: None,
328            metadata: json!({"agent_id": "ai:tester"}),
329            reflection_depth: if matches!(kind, MemoryKind::Reflection) {
330                1
331            } else {
332                0
333            },
334            memory_kind: kind,
335            entity_id: None,
336            persona_version: None,
337            citations: Vec::new(),
338            source_uri: None,
339            source_span: None,
340            confidence_source: ConfidenceSource::CallerProvided,
341            confidence_signals: None,
342            confidence_decayed_at: None,
343            version: 1,
344        }
345    }
346
347    #[test]
348    fn invalidations_namespace_appends_underscore_segment() {
349        assert_eq!(
350            invalidations_namespace_for("team/alpha"),
351            "team/alpha/_invalidations"
352        );
353        assert_eq!(
354            invalidations_namespace_for("global"),
355            "global/_invalidations"
356        );
357    }
358
359    #[test]
360    fn list_dependents_returns_inbound_reflects_on_only() {
361        let conn = fresh_conn();
362        // R1 (reflection) is the target of two reflects_on edges and one
363        // related_to edge. Only the reflects_on rows should surface.
364        let r1 = make_mem("R1", "ns-a", MemoryKind::Reflection);
365        let m1 = make_mem("M1", "ns-a", MemoryKind::Observation);
366        let m2 = make_mem("M2", "ns-b", MemoryKind::Observation);
367        let m3 = make_mem("M3", "ns-a", MemoryKind::Observation);
368        let r1_id = db::insert(&conn, &r1).expect("insert r1");
369        let m1_id = db::insert(&conn, &m1).expect("insert m1");
370        let m2_id = db::insert(&conn, &m2).expect("insert m2");
371        let m3_id = db::insert(&conn, &m3).expect("insert m3");
372        db::create_link(&conn, &m1_id, &r1_id, "reflects_on").expect("link m1");
373        db::create_link(&conn, &m2_id, &r1_id, "reflects_on").expect("link m2");
374        db::create_link(&conn, &m3_id, &r1_id, "related_to").expect("link m3 (noise)");
375
376        let deps = list_dependents_of_invalidated(&conn, &r1_id).expect("walk");
377        let ids: Vec<&str> = deps.iter().map(|d| d.id.as_str()).collect();
378        assert_eq!(ids.len(), 2, "only reflects_on edges count, got {ids:?}");
379        assert!(ids.contains(&m1_id.as_str()));
380        assert!(ids.contains(&m2_id.as_str()));
381        assert!(!ids.contains(&m3_id.as_str()), "related_to leaked through");
382    }
383
384    #[test]
385    fn propagate_writes_one_notification_per_dependent() {
386        let conn = fresh_conn();
387        let r1 = make_mem("R1", "ns-a", MemoryKind::Reflection);
388        let r2 = make_mem("R2", "ns-a", MemoryKind::Reflection);
389        let m1 = make_mem("M1", "ns-a", MemoryKind::Observation);
390        let m2 = make_mem("M2", "ns-b", MemoryKind::Observation);
391        let r1_id = db::insert(&conn, &r1).expect("insert r1");
392        let r2_id = db::insert(&conn, &r2).expect("insert r2");
393        let m1_id = db::insert(&conn, &m1).expect("insert m1");
394        let m2_id = db::insert(&conn, &m2).expect("insert m2");
395        db::create_link(&conn, &m1_id, &r1_id, "reflects_on").expect("m1→r1");
396        db::create_link(&conn, &m2_id, &r1_id, "reflects_on").expect("m2→r1");
397
398        let notified =
399            propagate_reflection_invalidation(&conn, &r1_id, &r2_id, "ai:tester").expect("walk");
400        assert_eq!(notified.len(), 2);
401
402        // Notification rows landed in the dependent's namespace under
403        // /_invalidations.
404        let count: i64 = conn
405            .query_row(
406                "SELECT COUNT(*) FROM memories WHERE namespace = ?1",
407                params!["ns-a/_invalidations"],
408                |r| r.get(0),
409            )
410            .unwrap();
411        assert_eq!(count, 1, "ns-a got 1 notification (for m1)");
412
413        let count_b: i64 = conn
414            .query_row(
415                "SELECT COUNT(*) FROM memories WHERE namespace = ?1",
416                params!["ns-b/_invalidations"],
417                |r| r.get(0),
418            )
419            .unwrap();
420        assert_eq!(count_b, 1, "ns-b got 1 notification (for m2)");
421    }
422
423    #[test]
424    fn propagate_records_signed_events_row_per_notification() {
425        let conn = fresh_conn();
426        let r1 = make_mem("R1", "ns-a", MemoryKind::Reflection);
427        let r2 = make_mem("R2", "ns-a", MemoryKind::Reflection);
428        let m1 = make_mem("M1", "ns-a", MemoryKind::Observation);
429        let r1_id = db::insert(&conn, &r1).expect("insert r1");
430        let r2_id = db::insert(&conn, &r2).expect("insert r2");
431        let m1_id = db::insert(&conn, &m1).expect("insert m1");
432        db::create_link(&conn, &m1_id, &r1_id, "reflects_on").expect("m1→r1");
433
434        let _ =
435            propagate_reflection_invalidation(&conn, &r1_id, &r2_id, "ai:tester").expect("walk");
436
437        let cnt: i64 = conn
438            .query_row(
439                "SELECT COUNT(*) FROM signed_events WHERE event_type = ?1",
440                params!["reflection.invalidation_notified"],
441                |r| r.get(0),
442            )
443            .unwrap_or(0);
444        assert_eq!(cnt, 1, "one signed_events row per notification");
445    }
446
447    #[test]
448    fn propagate_with_no_dependents_is_a_no_op() {
449        let conn = fresh_conn();
450        let r1 = make_mem("R1", "ns-a", MemoryKind::Reflection);
451        let r2 = make_mem("R2", "ns-a", MemoryKind::Reflection);
452        let r1_id = db::insert(&conn, &r1).expect("insert r1");
453        let r2_id = db::insert(&conn, &r2).expect("insert r2");
454        let notified =
455            propagate_reflection_invalidation(&conn, &r1_id, &r2_id, "ai:tester").expect("walk");
456        assert!(notified.is_empty());
457        let count: i64 = conn
458            .query_row(
459                "SELECT COUNT(*) FROM memories WHERE namespace LIKE '%_invalidations'",
460                [],
461                |r| r.get(0),
462            )
463            .unwrap();
464        assert_eq!(count, 0);
465    }
466
467    #[test]
468    fn metadata_carries_all_four_required_fields() {
469        let conn = fresh_conn();
470        let r1 = make_mem("R1", "ns-a", MemoryKind::Reflection);
471        let r2 = make_mem("R2", "ns-a", MemoryKind::Reflection);
472        let m1 = make_mem("M1", "ns-a", MemoryKind::Observation);
473        let r1_id = db::insert(&conn, &r1).expect("insert r1");
474        let r2_id = db::insert(&conn, &r2).expect("insert r2");
475        let m1_id = db::insert(&conn, &m1).expect("insert m1");
476        db::create_link(&conn, &m1_id, &r1_id, "reflects_on").expect("m1→r1");
477
478        let _ =
479            propagate_reflection_invalidation(&conn, &r1_id, &r2_id, "ai:tester").expect("walk");
480
481        let meta_str: String = conn
482            .query_row(
483                "SELECT metadata FROM memories WHERE namespace = ?1 LIMIT 1",
484                params!["ns-a/_invalidations"],
485                |r| r.get(0),
486            )
487            .unwrap();
488        let meta: serde_json::Value = serde_json::from_str(&meta_str).unwrap();
489        assert_eq!(meta["dependent_id"].as_str(), Some(m1_id.as_str()));
490        assert_eq!(meta["invalidated_id"].as_str(), Some(r1_id.as_str()));
491        assert_eq!(meta["invalidating_id"].as_str(), Some(r2_id.as_str()));
492        assert!(meta["timestamp"].is_string());
493        assert_eq!(
494            meta["notification_kind"].as_str(),
495            Some("reflection_invalidation")
496        );
497    }
498}