ai_memory/federation/reflection_bookkeeping.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.7.0 L2-2 — cross-peer `reflection_depth` bookkeeping.
5//!
6//! The recursive-learning primitive (Task 4/8) caps `reflection_depth`
7//! on the LOCAL host via `GovernancePolicy.effective_max_reflection_depth`.
8//! Without cross-peer bookkeeping a peer can sync a depth-N reflection
9//! into the substrate that would have been refused locally — and the
10//! local curator can then derive further reflections from it,
11//! laundering depth through federation.
12//!
13//! L2-2 closes that with three guarantees on the receive path:
14//!
15//! 1. **Origin recording.** When a reflection memory (a row whose
16//! `reflection_depth > 0` or whose `metadata.reflection_metadata`
//!    is present) lands via `sync_push`, the receiver stamps three
18//! fields under `metadata.reflection_origin`:
19//! - `peer_origin` — the `sender_agent_id` claim from the push
20//! envelope (the substrate-identity of the peer that pushed
21//! the row to us; not the original author, which is captured
22//! by `metadata.agent_id`).
23//! - `original_depth` — the depth the row carried in transit, so
24//! a later auditor can see what the source peer claimed.
25//! - `local_depth_at_arrival` — the local
26//! `effective_max_reflection_depth` at the moment the row
27//! arrived, so an after-the-fact tightening of the cap is
28//! visible on every imported row.
29//!
30//! The original `reflection_depth` column itself is **preserved**
31//! — federation never silently rewrites depth. The local cap is
32//! enforced on **derived** writes, not on the inbound import.
33//!
34//! 2. **Derived-write enforcement.** [`enforce_local_cap_on_derived`]
35//! is invoked by `storage::reflect` (Task 4/8) before any NEW
36//! reflection lands. It computes the proposed `new_depth` against
37//! the LOCAL cap regardless of whether one or more sources are
38//! imported rows — the cap is local territorial sovereignty, not
39//! a remote peer's permission. Already enforced by the existing
40//! `reflect` path; the function here is the named hook so the
41//! audit emitter can surface the cross-peer context.
42//!
43//! 3. **Inspection surface.** [`reflection_origin`] returns the
44//! structured record for any memory id so the MCP
45//! `memory_reflection_origin` tool (and operators) can answer
46//! "where did this reflection come from?".
47//!
48//! This module is a substrate-layer helper. The HTTP receive path
49//! (`handlers::federation_receive::sync_push`) calls
50//! [`stamp_reflection_origin`] on every applied reflection memory;
51//! the MCP `memory_reflection_origin` handler calls
52//! [`reflection_origin`] for read-side queries.
53
54use crate::models::field_names;
55use anyhow::{Context, Result};
56use rusqlite::Connection;
57use serde::{Deserialize, Serialize};
58use serde_json::{Map, Value};
59
60use crate::models::Memory;
61
/// v0.7.0 L2-2 — metadata sub-object key holding the imported-from-peer
/// provenance. Lives under `Memory.metadata.reflection_origin` so the
/// reflection's own `metadata.reflection_metadata` (Task 4/8 substrate
/// stamp) stays untouched.
///
/// Written by [`stamp_reflection_origin`]; read back by
/// [`reflection_origin_from_memory`] and [`enforce_local_cap_on_derived`].
pub const REFLECTION_ORIGIN_KEY: &str = "reflection_origin";
67
/// v0.7.0 L2-2 — structured record returned by [`reflection_origin`].
/// Mirrors the wire shape of the `memory_reflection_origin` MCP tool.
///
/// Built by [`reflection_origin_from_memory`] from a single [`Memory`];
/// every field is derived from that row's columns and metadata.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReflectionOrigin {
    /// The id of the memory the record describes.
    pub memory_id: String,
    /// `sender_agent_id` from the push envelope that delivered the row,
    /// read from `metadata.reflection_origin.peer_origin`.
    /// `None` for locally authored reflections (no peer_origin stamp).
    pub peer_origin: Option<String>,
    /// `metadata.agent_id` — the original signer (NHI). May differ from
    /// `peer_origin` when an intermediate peer re-broadcasts a row it
    /// itself received from upstream. `None` when the metadata carries
    /// no string `agent_id`.
    pub signing_agent: Option<String>,
    /// The row's `reflection_depth` column, which federation preserves
    /// as it arrived in transit. Copied verbatim for every row, so it is
    /// `0` (or lower) when the row is not a reflection.
    pub original_depth: i32,
    /// Snapshot of the local `effective_max_reflection_depth` at the
    /// moment this row was first imported. `None` when the row was
    /// authored locally (no import event to anchor against) or when the
    /// stamped value is missing or does not fit in a `u32`.
    pub local_depth_at_arrival: Option<u32>,
    /// `true` if the row is a reflection (depth > 0) regardless of
    /// import path; lets callers handle a non-reflection lookup with
    /// a clean response shape rather than a 404.
    pub is_reflection: bool,
}
93
94/// v0.7.0 L2-2 — non-destructive metadata patch: stamps
95/// `metadata.reflection_origin = { peer_origin, original_depth,
96/// local_depth_at_arrival }` onto an inbound reflection memory's
97/// metadata BEFORE it gets persisted via `insert_if_newer`.
98///
99/// Returns a mutated [`Memory`] with the patched metadata. The original
100/// `reflection_depth` column is untouched — federation never silently
101/// rewrites depth. The local cap enforcement happens later on derived
102/// writes (see [`enforce_local_cap_on_derived`]).
103///
104/// Idempotent: if the row already carries a `reflection_origin` block
105/// (e.g., we are processing the same push twice on a retry), the
106/// existing stamp is preserved. The first peer to deliver the row wins
107/// the origin record; downstream re-fans don't overwrite it.
108///
109/// Only reflection rows (`reflection_depth > 0`) get the stamp. Plain
110/// memory rows pass through unchanged so the metadata bloat is bounded
111/// to the reflection subgraph.
112#[must_use]
113pub fn stamp_reflection_origin(mem: &Memory, sender_agent_id: &str, local_cap: u32) -> Memory {
114 // Non-reflections: untouched.
115 if mem.reflection_depth <= 0 {
116 return mem.clone();
117 }
118 let mut out = mem.clone();
119 // Coerce metadata to an object (the canonical shape for memories);
120 // if a peer sent us something weird (array / scalar / null), replace
121 // it with a fresh object so the stamp lands cleanly.
122 let mut meta_map: Map<String, Value> = match out.metadata.take() {
123 Value::Object(m) => m,
124 _ => Map::new(),
125 };
126 // Idempotency: existing stamp wins. First delivery records the
127 // peer; later re-fans never overwrite the substrate-of-record.
128 if !meta_map.contains_key(REFLECTION_ORIGIN_KEY) {
129 let stamp = serde_json::json!({
130 (field_names::PEER_ORIGIN): sender_agent_id,
131 (field_names::ORIGINAL_DEPTH): mem.reflection_depth,
132 (field_names::LOCAL_DEPTH_AT_ARRIVAL): local_cap,
133 });
134 meta_map.insert(REFLECTION_ORIGIN_KEY.to_string(), stamp);
135 }
136 out.metadata = Value::Object(meta_map);
137 out
138}
139
140/// v0.7.0 L2-2 — read-side lookup for the [`ReflectionOrigin`] record
141/// of a memory by id. Backs the MCP `memory_reflection_origin` tool.
142///
143/// Returns `Ok(None)` when the id does not exist; returns a populated
144/// record (with `is_reflection = false`) when the id exists but is not
145/// a reflection — callers can then surface a clean "this memory is not
146/// a reflection" response rather than a 404, which keeps the MCP tool's
147/// shape uniform across reflection / non-reflection inputs.
148///
149/// # Errors
150///
151/// Wrapped rusqlite/SQL errors from the underlying `db::get` call.
152pub fn reflection_origin(conn: &Connection, id: &str) -> Result<Option<ReflectionOrigin>> {
153 let mem = match crate::storage::get(conn, id).context("storage::get for reflection_origin")? {
154 Some(m) => m,
155 None => return Ok(None),
156 };
157 Ok(Some(reflection_origin_from_memory(&mem)))
158}
159
160/// Pure derivation of a [`ReflectionOrigin`] from an already-fetched
161/// [`Memory`] — the storage-agnostic half of [`reflection_origin`].
162/// Shared by the sqlite path (which fetches via `storage::get`) and the
163/// postgres SAL path (which fetches via the `MemoryStore` trait) so the
164/// origin-metadata derivation lives in exactly one place.
165#[must_use]
166pub fn reflection_origin_from_memory(mem: &Memory) -> ReflectionOrigin {
167 let is_reflection = mem.reflection_depth > 0;
168 let signing_agent = mem
169 .metadata
170 .get("agent_id")
171 .and_then(Value::as_str)
172 .map(str::to_string);
173 let origin_obj = mem.metadata.get(REFLECTION_ORIGIN_KEY);
174 let peer_origin = origin_obj
175 .and_then(|v| v.get(field_names::PEER_ORIGIN))
176 .and_then(Value::as_str)
177 .map(str::to_string);
178 let local_depth_at_arrival = origin_obj
179 .and_then(|v| v.get(field_names::LOCAL_DEPTH_AT_ARRIVAL))
180 .and_then(Value::as_u64)
181 .and_then(|n| u32::try_from(n).ok());
182 ReflectionOrigin {
183 memory_id: mem.id.clone(),
184 peer_origin,
185 signing_agent,
186 original_depth: mem.reflection_depth,
187 local_depth_at_arrival,
188 is_reflection,
189 }
190}
191
192/// v0.7.0 L2-2 — enforcement hook for the LOCAL `max_reflection_depth`
193/// cap on derived reflections. Called from the storage `reflect` path
194/// before the new row commits, BUT the actual cap check already lives
195/// in `storage::reflect::reflect`; this function is the named explainer
196/// so the cross-peer context can be surfaced in audit + refusal text.
197///
198/// Given the source memories (including any imported ones) and the
199/// proposed new depth, returns:
200/// - `Ok(())` when the local cap is satisfied,
201/// - `Err(LocalCapRefusal { ... })` with a refusal reason that names
202/// the imported source's `peer_origin` when at least one source has
203/// a `reflection_origin` stamp (so the operator sees the cross-peer
204/// provenance in the refusal message).
205///
206/// # Errors
207///
208/// Returns [`LocalCapRefusal`] when `new_depth > local_cap`. Callers
209/// map this back to `MemoryError::ReflectionDepthExceeded` for the
210/// HTTP wire envelope.
211pub fn enforce_local_cap_on_derived(
212 new_depth: u32,
213 local_cap: u32,
214 sources: &[Memory],
215) -> std::result::Result<(), LocalCapRefusal> {
216 if new_depth <= local_cap {
217 return Ok(());
218 }
219 // Identify any source whose `reflection_origin.peer_origin` is set
220 // — those are the imported sources that drove the depth over the
221 // local cap. Surface the first such peer in the refusal reason so
222 // operators can see WHERE the depth came from at a glance.
223 let imported_peer = sources.iter().find_map(|m| {
224 m.metadata
225 .get(REFLECTION_ORIGIN_KEY)
226 .and_then(|v| v.get(field_names::PEER_ORIGIN))
227 .and_then(Value::as_str)
228 .map(str::to_string)
229 });
230 let max_source_depth = sources
231 .iter()
232 .map(|m| m.reflection_depth)
233 .max()
234 .unwrap_or(0)
235 .max(0);
236 Err(LocalCapRefusal {
237 attempted: new_depth,
238 local_cap,
239 max_source_depth: u32::try_from(max_source_depth).unwrap_or(u32::MAX),
240 imported_peer,
241 })
242}
243
/// v0.7.0 L2-2 — refusal record returned by [`enforce_local_cap_on_derived`].
///
/// Implements [`std::fmt::Display`] (operator-facing refusal text) and
/// [`std::error::Error`] so it can travel through `?` / error chains.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LocalCapRefusal {
    /// Depth the curator attempted to write.
    pub attempted: u32,
    /// Local namespace cap that gated the write.
    pub local_cap: u32,
    /// Max `reflection_depth` across the supplied sources.
    pub max_source_depth: u32,
    /// First imported source's `peer_origin`, if any. `None` when no
    /// source has an import stamp (purely local subgraph).
    pub imported_peer: Option<String>,
}

impl std::fmt::Display for LocalCapRefusal {
    /// Renders one of two messages:
    /// - with an imported peer: the max source depth, the peer, and the
    ///   local cap;
    /// - without one: the attempted depth and the local cap.
    ///
    /// NOTE(review): in the peer branch the depth shown is the max over
    /// ALL sources while the peer is the FIRST stamped source, so the two
    /// may describe different rows — confirm that wording is intended.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if let Some(peer) = &self.imported_peer {
            return write!(
                f,
                "remote reflection at depth {} (from peer {}), local depth limit {}",
                self.max_source_depth, peer, self.local_cap,
            );
        }
        write!(
            f,
            "reflection depth {} would exceed local cap {}",
            self.attempted, self.local_cap,
        )
    }
}

impl std::error::Error for LocalCapRefusal {}
276
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::Tier;
    use chrono::Utc;

    /// Minimal [`Memory`] fixture: only `id` and `reflection_depth` vary;
    /// metadata starts as `{"agent_id": "ai:test"}`.
    fn reflection_memory(id: &str, depth: i32) -> Memory {
        let now = Utc::now().to_rfc3339();
        Memory {
            id: id.to_string(),
            tier: Tier::Mid,
            namespace: "test".to_string(),
            title: format!("reflection-{id}"),
            content: "body".to_string(),
            tags: vec![],
            priority: 5,
            confidence: 1.0,
            source: "test".to_string(),
            access_count: 0,
            created_at: now.clone(),
            updated_at: now,
            last_accessed_at: None,
            expires_at: None,
            metadata: serde_json::json!({"agent_id": "ai:test"}),
            reflection_depth: depth,
            memory_kind: crate::models::MemoryKind::Observation,
            entity_id: None,
            persona_version: None,
            citations: Vec::new(),
            source_uri: None,
            source_span: None,
            confidence_source: crate::models::ConfidenceSource::CallerProvided,
            confidence_signals: None,
            confidence_decayed_at: None,
            version: 1,
        }
    }

    // ── #1549 reflection_origin_from_memory coverage ─────────────────
    #[test]
    fn reflection_origin_from_memory_derives_all_fields() {
        let mut mem = reflection_memory("m-derive", 2);
        let mut meta = serde_json::Map::new();
        meta.insert("agent_id".to_string(), serde_json::json!("ai:signer@host"));
        meta.insert(
            REFLECTION_ORIGIN_KEY.to_string(),
            serde_json::json!({ "peer_origin": "peer-x", "local_depth_at_arrival": 5 }),
        );
        mem.metadata = serde_json::Value::Object(meta);
        let origin = reflection_origin_from_memory(&mem);
        assert_eq!(origin.memory_id, "m-derive");
        assert!(origin.is_reflection);
        assert_eq!(origin.original_depth, 2);
        assert_eq!(origin.signing_agent.as_deref(), Some("ai:signer@host"));
        assert_eq!(origin.peer_origin.as_deref(), Some("peer-x"));
        assert_eq!(origin.local_depth_at_arrival, Some(5));
    }

    #[test]
    fn reflection_origin_from_memory_non_reflection_is_flagged_false() {
        let origin = reflection_origin_from_memory(&reflection_memory("m-base", 0));
        assert!(!origin.is_reflection);
        assert_eq!(origin.original_depth, 0);
        assert!(origin.peer_origin.is_none());
        assert!(origin.local_depth_at_arrival.is_none());
    }

    #[test]
    fn stamp_skips_non_reflection() {
        // Depth 0 is not a reflection: metadata must come back unchanged
        // and without an origin stamp.
        let mem = reflection_memory("m1", 0);
        let stamped = stamp_reflection_origin(&mem, "peer-A", 3);
        assert_eq!(stamped.metadata, mem.metadata);
        assert!(stamped.metadata.get(REFLECTION_ORIGIN_KEY).is_none());
    }

    #[test]
    fn stamp_records_peer_and_local_cap() {
        let mem = reflection_memory("m1", 2);
        let stamped = stamp_reflection_origin(&mem, "peer-A", 3);
        let origin = stamped
            .metadata
            .get(REFLECTION_ORIGIN_KEY)
            .expect("origin stamped");
        assert_eq!(origin["peer_origin"].as_str(), Some("peer-A"));
        assert_eq!(origin["original_depth"].as_i64(), Some(2));
        assert_eq!(origin["local_depth_at_arrival"].as_u64(), Some(3));
    }

    #[test]
    fn stamp_is_idempotent_first_writer_wins() {
        let mem = reflection_memory("m1", 2);
        let first = stamp_reflection_origin(&mem, "peer-A", 3);
        let second = stamp_reflection_origin(&first, "peer-B", 5);
        let origin = second
            .metadata
            .get(REFLECTION_ORIGIN_KEY)
            .expect("origin preserved");
        // peer-A wins; peer-B's re-fan didn't overwrite.
        assert_eq!(origin["peer_origin"].as_str(), Some("peer-A"));
        assert_eq!(origin["local_depth_at_arrival"].as_u64(), Some(3));
    }

    #[test]
    fn stamp_replaces_non_object_metadata() {
        // A peer sending array/scalar/null metadata must not block the
        // stamp: the metadata becomes a fresh object holding only it.
        let mut mem = reflection_memory("m1", 1);
        mem.metadata = serde_json::json!(["weird"]);
        let stamped = stamp_reflection_origin(&mem, "peer-A", 3);
        let fields = stamped.metadata.as_object().expect("coerced to object");
        assert_eq!(fields.len(), 1);
        assert_eq!(
            fields[REFLECTION_ORIGIN_KEY]["peer_origin"].as_str(),
            Some("peer-A")
        );
        // The depth column itself is never rewritten.
        assert_eq!(stamped.reflection_depth, 1);
    }

    #[test]
    fn enforce_local_cap_allows_when_under_limit() {
        let sources = vec![reflection_memory("s1", 1)];
        assert!(enforce_local_cap_on_derived(2, 3, &sources).is_ok());
    }

    #[test]
    fn enforce_local_cap_refuses_with_imported_peer_named() {
        let mut imported = reflection_memory("s1", 2);
        imported.metadata = serde_json::json!({
            "agent_id": "ai:test",
            REFLECTION_ORIGIN_KEY: {
                "peer_origin": "peer-A",
                "original_depth": 2,
                "local_depth_at_arrival": 3,
            },
        });
        let refusal = enforce_local_cap_on_derived(3, 2, &[imported]).unwrap_err();
        let msg = refusal.to_string();
        assert!(
            msg.contains("peer-A") && msg.contains("local depth limit 2"),
            "refusal msg should name peer + local cap: {msg}"
        );
    }

    #[test]
    fn enforce_local_cap_refuses_without_imported_peer() {
        // Purely local subgraph: no stamp on any source, so the refusal
        // carries no peer and uses the local-only wording.
        let refusal =
            enforce_local_cap_on_derived(3, 2, &[reflection_memory("s1", 2)]).unwrap_err();
        assert_eq!(
            refusal,
            LocalCapRefusal {
                attempted: 3,
                local_cap: 2,
                max_source_depth: 2,
                imported_peer: None,
            }
        );
        assert_eq!(
            refusal.to_string(),
            "reflection depth 3 would exceed local cap 2"
        );
    }
}
405}