Skip to main content

ai_memory/federation/
reflection_bookkeeping.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.7.0 L2-2 — cross-peer `reflection_depth` bookkeeping.
5//!
6//! The recursive-learning primitive (Task 4/8) caps `reflection_depth`
7//! on the LOCAL host via `GovernancePolicy.effective_max_reflection_depth`.
8//! Without cross-peer bookkeeping a peer can sync a depth-N reflection
9//! into the substrate that would have been refused locally — and the
10//! local curator can then derive further reflections from it,
11//! laundering depth through federation.
12//!
13//! L2-2 closes that with three guarantees on the receive path:
14//!
//! 1. **Origin recording.** When a reflection memory (a row whose
//!    `reflection_depth > 0`; depth-0 rows pass through unstamped even if
//!    they carry `metadata.reflection_metadata`) lands via `sync_push`,
//!    the receiver stamps three fields under `metadata.reflection_origin`:
19//!     - `peer_origin` — the `sender_agent_id` claim from the push
20//!       envelope (the substrate-identity of the peer that pushed
21//!       the row to us; not the original author, which is captured
22//!       by `metadata.agent_id`).
23//!     - `original_depth` — the depth the row carried in transit, so
24//!       a later auditor can see what the source peer claimed.
25//!     - `local_depth_at_arrival` — the local
26//!       `effective_max_reflection_depth` at the moment the row
27//!       arrived, so an after-the-fact tightening of the cap is
28//!       visible on every imported row.
29//!
30//!    The original `reflection_depth` column itself is **preserved**
31//!    — federation never silently rewrites depth. The local cap is
32//!    enforced on **derived** writes, not on the inbound import.
33//!
34//! 2. **Derived-write enforcement.** [`enforce_local_cap_on_derived`]
35//!    is invoked by `storage::reflect` (Task 4/8) before any NEW
//!    reflection lands. It checks the proposed `new_depth` against
//!    the LOCAL cap regardless of whether one or more sources are
//!    imported rows — the cap is local territorial sovereignty, not
//!    a remote peer's permission. The cap itself is already enforced
//!    by the existing `reflect` path; the function here is the named
//!    hook so the audit emitter can surface the cross-peer context.
42//!
43//! 3. **Inspection surface.** [`reflection_origin`] returns the
44//!    structured record for any memory id so the MCP
45//!    `memory_reflection_origin` tool (and operators) can answer
46//!    "where did this reflection come from?".
47//!
48//! This module is a substrate-layer helper. The HTTP receive path
49//! (`handlers::federation_receive::sync_push`) calls
50//! [`stamp_reflection_origin`] on every applied reflection memory;
51//! the MCP `memory_reflection_origin` handler calls
52//! [`reflection_origin`] for read-side queries.
53
54use crate::models::field_names;
55use anyhow::{Context, Result};
56use rusqlite::Connection;
57use serde::{Deserialize, Serialize};
58use serde_json::{Map, Value};
59
60use crate::models::Memory;
61
62/// v0.7.0 L2-2 — metadata sub-object key holding the imported-from-peer
63/// provenance. Lives under `Memory.metadata.reflection_origin` so the
64/// reflection's own `metadata.reflection_metadata` (Task 4/8 substrate
65/// stamp) stays untouched.
66pub const REFLECTION_ORIGIN_KEY: &str = "reflection_origin";
67
68/// v0.7.0 L2-2 — structured record returned by [`reflection_origin`].
69/// Mirrors the wire shape of the `memory_reflection_origin` MCP tool.
70#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
71pub struct ReflectionOrigin {
72    /// The id of the memory the record describes.
73    pub memory_id: String,
74    /// `sender_agent_id` from the push envelope that delivered the row.
75    /// `None` for locally authored reflections (no peer_origin stamp).
76    pub peer_origin: Option<String>,
77    /// `metadata.agent_id` — the original signer (NHI). May differ from
78    /// `peer_origin` when an intermediate peer re-broadcasts a row it
79    /// itself received from upstream.
80    pub signing_agent: Option<String>,
81    /// `reflection_depth` the row carried in transit. Always populated
82    /// when the row is a reflection (depth > 0).
83    pub original_depth: i32,
84    /// Snapshot of the local `effective_max_reflection_depth` at the
85    /// moment this row was first imported. `None` when the row was
86    /// authored locally (no import event to anchor against).
87    pub local_depth_at_arrival: Option<u32>,
88    /// `true` if the row is a reflection (depth > 0) regardless of
89    /// import path; lets callers handle a non-reflection lookup with
90    /// a clean response shape rather than a 404.
91    pub is_reflection: bool,
92}
93
94/// v0.7.0 L2-2 — non-destructive metadata patch: stamps
95/// `metadata.reflection_origin = { peer_origin, original_depth,
96/// local_depth_at_arrival }` onto an inbound reflection memory's
97/// metadata BEFORE it gets persisted via `insert_if_newer`.
98///
99/// Returns a mutated [`Memory`] with the patched metadata. The original
100/// `reflection_depth` column is untouched — federation never silently
101/// rewrites depth. The local cap enforcement happens later on derived
102/// writes (see [`enforce_local_cap_on_derived`]).
103///
104/// Idempotent: if the row already carries a `reflection_origin` block
105/// (e.g., we are processing the same push twice on a retry), the
106/// existing stamp is preserved. The first peer to deliver the row wins
107/// the origin record; downstream re-fans don't overwrite it.
108///
109/// Only reflection rows (`reflection_depth > 0`) get the stamp. Plain
110/// memory rows pass through unchanged so the metadata bloat is bounded
111/// to the reflection subgraph.
112#[must_use]
113pub fn stamp_reflection_origin(mem: &Memory, sender_agent_id: &str, local_cap: u32) -> Memory {
114    // Non-reflections: untouched.
115    if mem.reflection_depth <= 0 {
116        return mem.clone();
117    }
118    let mut out = mem.clone();
119    // Coerce metadata to an object (the canonical shape for memories);
120    // if a peer sent us something weird (array / scalar / null), replace
121    // it with a fresh object so the stamp lands cleanly.
122    let mut meta_map: Map<String, Value> = match out.metadata.take() {
123        Value::Object(m) => m,
124        _ => Map::new(),
125    };
126    // Idempotency: existing stamp wins. First delivery records the
127    // peer; later re-fans never overwrite the substrate-of-record.
128    if !meta_map.contains_key(REFLECTION_ORIGIN_KEY) {
129        let stamp = serde_json::json!({
130            (field_names::PEER_ORIGIN): sender_agent_id,
131            (field_names::ORIGINAL_DEPTH): mem.reflection_depth,
132            (field_names::LOCAL_DEPTH_AT_ARRIVAL): local_cap,
133        });
134        meta_map.insert(REFLECTION_ORIGIN_KEY.to_string(), stamp);
135    }
136    out.metadata = Value::Object(meta_map);
137    out
138}
139
140/// v0.7.0 L2-2 — read-side lookup for the [`ReflectionOrigin`] record
141/// of a memory by id. Backs the MCP `memory_reflection_origin` tool.
142///
143/// Returns `Ok(None)` when the id does not exist; returns a populated
144/// record (with `is_reflection = false`) when the id exists but is not
145/// a reflection — callers can then surface a clean "this memory is not
146/// a reflection" response rather than a 404, which keeps the MCP tool's
147/// shape uniform across reflection / non-reflection inputs.
148///
149/// # Errors
150///
151/// Wrapped rusqlite/SQL errors from the underlying `db::get` call.
152pub fn reflection_origin(conn: &Connection, id: &str) -> Result<Option<ReflectionOrigin>> {
153    let mem = match crate::storage::get(conn, id).context("storage::get for reflection_origin")? {
154        Some(m) => m,
155        None => return Ok(None),
156    };
157    Ok(Some(reflection_origin_from_memory(&mem)))
158}
159
160/// Pure derivation of a [`ReflectionOrigin`] from an already-fetched
161/// [`Memory`] — the storage-agnostic half of [`reflection_origin`].
162/// Shared by the sqlite path (which fetches via `storage::get`) and the
163/// postgres SAL path (which fetches via the `MemoryStore` trait) so the
164/// origin-metadata derivation lives in exactly one place.
165#[must_use]
166pub fn reflection_origin_from_memory(mem: &Memory) -> ReflectionOrigin {
167    let is_reflection = mem.reflection_depth > 0;
168    let signing_agent = mem
169        .metadata
170        .get("agent_id")
171        .and_then(Value::as_str)
172        .map(str::to_string);
173    let origin_obj = mem.metadata.get(REFLECTION_ORIGIN_KEY);
174    let peer_origin = origin_obj
175        .and_then(|v| v.get(field_names::PEER_ORIGIN))
176        .and_then(Value::as_str)
177        .map(str::to_string);
178    let local_depth_at_arrival = origin_obj
179        .and_then(|v| v.get(field_names::LOCAL_DEPTH_AT_ARRIVAL))
180        .and_then(Value::as_u64)
181        .and_then(|n| u32::try_from(n).ok());
182    ReflectionOrigin {
183        memory_id: mem.id.clone(),
184        peer_origin,
185        signing_agent,
186        original_depth: mem.reflection_depth,
187        local_depth_at_arrival,
188        is_reflection,
189    }
190}
191
192/// v0.7.0 L2-2 — enforcement hook for the LOCAL `max_reflection_depth`
193/// cap on derived reflections. Called from the storage `reflect` path
194/// before the new row commits, BUT the actual cap check already lives
195/// in `storage::reflect::reflect`; this function is the named explainer
196/// so the cross-peer context can be surfaced in audit + refusal text.
197///
198/// Given the source memories (including any imported ones) and the
199/// proposed new depth, returns:
200/// - `Ok(())` when the local cap is satisfied,
201/// - `Err(LocalCapRefusal { ... })` with a refusal reason that names
202///   the imported source's `peer_origin` when at least one source has
203///   a `reflection_origin` stamp (so the operator sees the cross-peer
204///   provenance in the refusal message).
205///
206/// # Errors
207///
208/// Returns [`LocalCapRefusal`] when `new_depth > local_cap`. Callers
209/// map this back to `MemoryError::ReflectionDepthExceeded` for the
210/// HTTP wire envelope.
211pub fn enforce_local_cap_on_derived(
212    new_depth: u32,
213    local_cap: u32,
214    sources: &[Memory],
215) -> std::result::Result<(), LocalCapRefusal> {
216    if new_depth <= local_cap {
217        return Ok(());
218    }
219    // Identify any source whose `reflection_origin.peer_origin` is set
220    // — those are the imported sources that drove the depth over the
221    // local cap. Surface the first such peer in the refusal reason so
222    // operators can see WHERE the depth came from at a glance.
223    let imported_peer = sources.iter().find_map(|m| {
224        m.metadata
225            .get(REFLECTION_ORIGIN_KEY)
226            .and_then(|v| v.get(field_names::PEER_ORIGIN))
227            .and_then(Value::as_str)
228            .map(str::to_string)
229    });
230    let max_source_depth = sources
231        .iter()
232        .map(|m| m.reflection_depth)
233        .max()
234        .unwrap_or(0)
235        .max(0);
236    Err(LocalCapRefusal {
237        attempted: new_depth,
238        local_cap,
239        max_source_depth: u32::try_from(max_source_depth).unwrap_or(u32::MAX),
240        imported_peer,
241    })
242}
243
/// v0.7.0 L2-2 — why [`enforce_local_cap_on_derived`] refused a derived
/// reflection.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LocalCapRefusal {
    /// Depth the caller tried to write.
    pub attempted: u32,
    /// Local namespace cap the write was checked against.
    pub local_cap: u32,
    /// Largest `reflection_depth` among the sources handed to the check.
    pub max_source_depth: u32,
    /// `peer_origin` of an imported (origin-stamped) source, named in the
    /// refusal text. `None` when no source carries an import stamp, i.e. a
    /// purely local subgraph.
    pub imported_peer: Option<String>,
}

impl std::fmt::Display for LocalCapRefusal {
    /// Emits one of two messages: a cross-peer message naming the imported
    /// peer, or a purely local message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if let Some(peer) = &self.imported_peer {
            write!(
                f,
                "remote reflection at depth {} (from peer {}), local depth limit {}",
                self.max_source_depth, peer, self.local_cap,
            )
        } else {
            write!(
                f,
                "reflection depth {} would exceed local cap {}",
                self.attempted, self.local_cap,
            )
        }
    }
}

impl std::error::Error for LocalCapRefusal {}
276
#[cfg(test)]
mod tests {
    // Unit tests for the L2-2 bookkeeping helpers: origin stamping, origin
    // derivation, and local-cap refusal reporting.
    use super::*;
    use crate::models::Tier;
    use chrono::Utc;

    /// Minimal `Memory` fixture at `reflection_depth = depth`, with
    /// metadata `{"agent_id": "ai:test"}` and no origin stamp.
    fn reflection_memory(id: &str, depth: i32) -> Memory {
        let now = Utc::now().to_rfc3339();
        Memory {
            id: id.to_string(),
            tier: Tier::Mid,
            namespace: "test".to_string(),
            title: format!("reflection-{id}"),
            content: "body".to_string(),
            tags: vec![],
            priority: 5,
            confidence: 1.0,
            source: "test".to_string(),
            access_count: 0,
            created_at: now.clone(),
            updated_at: now,
            last_accessed_at: None,
            expires_at: None,
            metadata: serde_json::json!({"agent_id": "ai:test"}),
            reflection_depth: depth,
            memory_kind: crate::models::MemoryKind::Observation,
            entity_id: None,
            persona_version: None,
            citations: Vec::new(),
            source_uri: None,
            source_span: None,
            confidence_source: crate::models::ConfidenceSource::CallerProvided,
            confidence_signals: None,
            confidence_decayed_at: None,
            version: 1,
        }
    }

    // ── #1549 reflection_origin_from_memory coverage ─────────────────
    #[test]
    fn reflection_origin_from_memory_derives_all_fields() {
        let mut mem = reflection_memory("m-derive", 2);
        let mut meta = serde_json::Map::new();
        meta.insert("agent_id".to_string(), serde_json::json!("ai:signer@host"));
        meta.insert(
            REFLECTION_ORIGIN_KEY.to_string(),
            serde_json::json!({ "peer_origin": "peer-x", "local_depth_at_arrival": 5 }),
        );
        mem.metadata = serde_json::Value::Object(meta);
        let origin = reflection_origin_from_memory(&mem);
        assert_eq!(origin.memory_id, "m-derive");
        assert!(origin.is_reflection);
        assert_eq!(origin.original_depth, 2);
        assert_eq!(origin.signing_agent.as_deref(), Some("ai:signer@host"));
        assert_eq!(origin.peer_origin.as_deref(), Some("peer-x"));
        assert_eq!(origin.local_depth_at_arrival, Some(5));
    }

    #[test]
    fn reflection_origin_from_memory_non_reflection_is_flagged_false() {
        let origin = reflection_origin_from_memory(&reflection_memory("m-base", 0));
        assert!(!origin.is_reflection);
        assert_eq!(origin.original_depth, 0);
        assert!(origin.peer_origin.is_none());
        assert!(origin.local_depth_at_arrival.is_none());
    }

    // ── stamp_reflection_origin ──────────────────────────────────────
    #[test]
    fn stamp_skips_non_reflection() {
        let mem = reflection_memory("m1", 0);
        let before = mem.metadata.clone();
        let stamped = stamp_reflection_origin(&mem, "peer-A", 3);
        assert_eq!(stamped.metadata, before);
        assert!(stamped.metadata.get(REFLECTION_ORIGIN_KEY).is_none());
    }

    #[test]
    fn stamp_skips_negative_depth() {
        // Depth <= 0 is "not a reflection"; negative values must not be stamped.
        let mem = reflection_memory("m-neg", -1);
        let stamped = stamp_reflection_origin(&mem, "peer-A", 3);
        assert_eq!(stamped.metadata, mem.metadata);
        assert_eq!(stamped.reflection_depth, -1);
    }

    #[test]
    fn stamp_records_peer_and_local_cap() {
        let mem = reflection_memory("m1", 2);
        let stamped = stamp_reflection_origin(&mem, "peer-A", 3);
        let origin = stamped
            .metadata
            .get(REFLECTION_ORIGIN_KEY)
            .expect("origin stamped");
        assert_eq!(origin["peer_origin"].as_str(), Some("peer-A"));
        assert_eq!(origin["original_depth"].as_i64(), Some(2));
        assert_eq!(origin["local_depth_at_arrival"].as_u64(), Some(3));
        // Federation never rewrites the depth column itself.
        assert_eq!(stamped.reflection_depth, 2);
    }

    #[test]
    fn stamp_replaces_non_object_metadata() {
        let mut mem = reflection_memory("m-arr", 1);
        mem.metadata = serde_json::json!(["not", "an", "object"]);
        let stamped = stamp_reflection_origin(&mem, "peer-A", 3);
        assert!(stamped.metadata.is_object());
        assert_eq!(
            stamped.metadata[REFLECTION_ORIGIN_KEY]["peer_origin"].as_str(),
            Some("peer-A")
        );
    }

    #[test]
    fn stamp_is_idempotent_first_writer_wins() {
        let mem = reflection_memory("m1", 2);
        let first = stamp_reflection_origin(&mem, "peer-A", 3);
        let second = stamp_reflection_origin(&first, "peer-B", 5);
        let origin = second
            .metadata
            .get(REFLECTION_ORIGIN_KEY)
            .expect("origin preserved");
        // peer-A wins; peer-B's re-fan didn't overwrite.
        assert_eq!(origin["peer_origin"].as_str(), Some("peer-A"));
        assert_eq!(origin["local_depth_at_arrival"].as_u64(), Some(3));
    }

    // ── enforce_local_cap_on_derived ─────────────────────────────────
    #[test]
    fn enforce_local_cap_allows_when_under_limit() {
        let sources = [reflection_memory("s1", 1)];
        assert!(enforce_local_cap_on_derived(2, 3, &sources).is_ok());
    }

    #[test]
    fn enforce_local_cap_allows_at_exact_limit() {
        let sources = [reflection_memory("s1", 2)];
        assert!(enforce_local_cap_on_derived(3, 3, &sources).is_ok());
    }

    #[test]
    fn enforce_local_cap_refuses_with_imported_peer_named() {
        let mut imported = reflection_memory("s1", 2);
        imported.metadata = serde_json::json!({
            "agent_id": "ai:test",
            REFLECTION_ORIGIN_KEY: {
                "peer_origin": "peer-A",
                "original_depth": 2,
                "local_depth_at_arrival": 3,
            },
        });
        let refusal = enforce_local_cap_on_derived(3, 2, &[imported]).unwrap_err();
        let msg = refusal.to_string();
        assert!(
            msg.contains("peer-A") && msg.contains("local depth limit 2"),
            "refusal msg should name peer + local cap: {msg}"
        );
    }

    #[test]
    fn enforce_local_cap_refuses_local_subgraph_without_peer() {
        let sources = [reflection_memory("s1", 3), reflection_memory("s2", 1)];
        let refusal = enforce_local_cap_on_derived(4, 3, &sources).unwrap_err();
        assert_eq!(
            refusal,
            LocalCapRefusal {
                attempted: 4,
                local_cap: 3,
                max_source_depth: 3,
                imported_peer: None,
            }
        );
        assert_eq!(
            refusal.to_string(),
            "reflection depth 4 would exceed local cap 3"
        );
    }

    #[test]
    fn enforce_local_cap_refusal_without_sources_reports_zero_depth() {
        let refusal = enforce_local_cap_on_derived(1, 0, &[]).unwrap_err();
        assert_eq!(refusal.max_source_depth, 0);
        assert!(refusal.imported_peer.is_none());
    }
}