ai_memory/storage/reflect.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! Reflection family — the substrate-native recursive-learning
5//! primitive (v0.7.0 Tasks 4/5/6 of the recursive-learning epic).
6//! v0.7.0 L0.5-3 extracted `ReflectError`, `ReflectOutcome`,
7//! `ReflectHookDecision`, `ReflectHooks`, `ReflectInput`, `reflect`,
8//! `reflect_with_hooks`, `canonical_cbor_reflection_depth_exceeded`,
9//! and `emit_reflection_depth_exceeded_audit` out of `src/db.rs` into
10//! this sub-module. Pure refactor — semantics unchanged.
11
12use crate::identity::keypair::AgentKeypair;
13use crate::models::ConfidenceSource;
14use crate::models::field_names;
15use anyhow::Context;
16use chrono::Utc;
17use rusqlite::Connection;
18
19use crate::models::{GovernancePolicy, Memory, MemoryKind, Tier};
20
21use super::{
22 ConflictMode, create_link_signed, get, insert_with_conflict, resolve_governance_policy,
23};
24
/// Typed substrate-level error surface for [`reflect`] and
/// [`reflect_with_hooks`]. Kept distinct from
/// [`crate::errors::MemoryError`] so the SQLite substrate layer stays
/// free of HTTP-status concerns; the caller at the MCP / HTTP boundary
/// maps these into the wire-shaped variant. Task 5/8 matches on
/// `ReflectError::DepthExceeded` here (and the equivalent
/// `MemoryError::ReflectionDepthExceeded` variant) to emit the
/// `signed_events` audit record for the refusal decision.
#[derive(Debug)]
pub enum ReflectError {
    /// Input violated a validator. Carries the operator-readable
    /// reason; the MCP layer surfaces it verbatim.
    Validation(String),
    /// One of the requested source memories does not exist. Carries
    /// the offending id (and only the id) so the caller can name the
    /// missing source; the `Display` impl adds the human-readable
    /// prefix.
    SourceNotFound(String),
    /// Proposed reflection depth exceeds the resolved namespace cap.
    /// The triple is the structured payload attached to the audit row.
    DepthExceeded {
        /// Depth the rejected reflection would have been written at.
        attempted: u32,
        /// Effective cap resolved for `namespace`.
        cap: u32,
        /// Namespace the reflection was targeting.
        namespace: String,
    },
    /// v0.7.0 recursive-learning Task 6/8 — a `pre_reflect` hook
    /// callback returned [`ReflectHookDecision::Deny`], vetoing the
    /// reflection. Distinct from `DepthExceeded` because the substrate
    /// cap was NOT evaluated (the veto fires before the cap check) and
    /// because the Task 5 depth-cap audit row is NOT emitted on this
    /// path — hook vetoes are caller-policy refusals that carry their
    /// own provenance via the hook's own decision record (if any).
    HookVeto { reason: String, code: i32 },
    /// Database error during a source lookup or the atomic write.
    /// Carries the stringified underlying error.
    Database(String),
}

impl std::fmt::Display for ReflectError {
    /// Renders an operator-readable message.
    ///
    /// `Validation` and `Database` already carry a full message and are
    /// written verbatim. `SourceNotFound` carries only an id, so it is
    /// wrapped in a descriptive sentence — printing the bare id would
    /// give the operator no hint about what went wrong.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Validation(msg) | Self::Database(msg) => f.write_str(msg),
            Self::SourceNotFound(id) => write!(f, "source memory not found: {id}"),
            Self::DepthExceeded {
                attempted,
                cap,
                namespace,
            } => write!(
                f,
                "reflection depth {attempted} would exceed namespace \
                 max_reflection_depth {cap} (namespace='{namespace}')"
            ),
            Self::HookVeto { reason, code } => {
                write!(
                    f,
                    "pre_reflect hook vetoed reflection (code={code}): {reason}"
                )
            }
        }
    }
}

impl std::error::Error for ReflectError {}
85
/// Outcome of a successful [`reflect`] / [`reflect_with_hooks`] write.
/// Mirrors the MCP `memory_reflect` wire shape so the dispatch layer is
/// a thin serialization wrapper. This is also the value handed to the
/// `post_reflect` hook after the transaction commits.
#[derive(Debug, Clone)]
pub struct ReflectOutcome {
    /// Id of the newly written reflection memory, as returned by the
    /// insert inside the reflect transaction.
    pub id: String,
    /// Depth assigned to the new memory: the largest source depth
    /// (negative values treated as 0) plus one, saturating.
    pub reflection_depth: i32,
    /// Source memory ids the new memory reflects on, in input order.
    pub reflects_on: Vec<String>,
    /// Namespace the reflection landed in (resolved to the first source's
    /// namespace when the caller omitted the field).
    pub namespace: String,
}
100
/// v0.7.0 recursive-learning Task 6/8 — substrate-level decision
/// surface returned by a `pre_reflect` hook callback.
///
/// Mirrors the shape of [`crate::hooks::HookDecision`] minus the
/// `Modify` and `AskUser` variants — the substrate hook surface only
/// exposes the two outcomes that affect the reflect control flow:
/// continue (`Allow`) or veto (`Deny`). Hook-supplied delta merging
/// and operator prompts are handled by the wire-level
/// [`crate::hooks::HookChain`] when the daemon's hook pipeline is
/// configured (G7+ wiring); this in-substrate variant is the path
/// the substrate uses today to fire `PreReflect` / `PostReflect`
/// events on the reflect codepath.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReflectHookDecision {
    /// Continue evaluating the reflect — proceed to the cap check.
    Allow,
    /// Veto the reflect. The substrate returns
    /// [`ReflectError::HookVeto`] carrying the supplied `reason` and
    /// `code` unchanged (the code is described as an HTTP-style status
    /// code); the cap check is NOT evaluated and the depth-cap audit
    /// row is NOT emitted (this is a caller-policy refusal, not a
    /// substrate cap refusal — Task 5 audits the latter; hook vetoes
    /// carry their own provenance).
    Deny { reason: String, code: i32 },
}
125
/// v0.7.0 recursive-learning Task 6/8 — optional in-substrate hook
/// callbacks fired by [`reflect_with_hooks`]. Bundled into a single
/// struct so the substrate signature stays compact and so future
/// callbacks (e.g. on-rollback) can land without churning every
/// call site.
///
/// Both callbacks are `Option<...>`; when both are `None` and no
/// keypair is supplied, [`reflect_with_hooks`] behaves identically to
/// the unhooked [`reflect`] entry-point. The callbacks are boxed
/// `dyn Fn` trait objects so test code can pass simple closures that
/// capture observation state. The `'a` lifetime bounds both the
/// closures' captures and the borrowed keypair.
pub struct ReflectHooks<'a> {
    /// Fired BEFORE the cap check (marked step 4.5 in
    /// [`reflect_with_hooks`]). Receives a read-only view of the
    /// in-flight [`ReflectInput`] (the substrate-side equivalent of
    /// [`crate::hooks::events::ReflectDelta`] — the in-process callback
    /// gets the typed input directly, while the cross-process wire path
    /// serialises a `ReflectDelta`).
    /// Returns [`ReflectHookDecision::Deny`] to veto.
    pub pre_reflect: Option<Box<dyn Fn(&ReflectInput) -> ReflectHookDecision + Send + Sync + 'a>>,
    /// Fired AFTER the transaction commits (marked step 7 in
    /// [`reflect_with_hooks`]). Receives a read-only snapshot of the
    /// post-commit outcome (mirrors
    /// [`crate::hooks::events::ReflectResult`]). Notify-class — the
    /// callback returns nothing; the reflect already landed.
    pub post_reflect: Option<Box<dyn Fn(&ReflectOutcome) + Send + Sync + 'a>>,
    /// Issue #815 — signing keypair for the `reflects_on` edges
    /// written inside the reflect transaction. When `Some`, each
    /// edge is persisted via [`create_link_signed`] with this
    /// keypair, producing `attest_level='self_signed'` rows with a
    /// 64-byte Ed25519 signature. When `None`, edges land as
    /// `attest_level='unsigned'` — the v0.6.x behaviour and the
    /// state of the world before #815 fixed the storage::reflect
    /// gap that #814 left behind.
    pub active_keypair: Option<&'a AgentKeypair>,
}
160
161impl<'a> ReflectHooks<'a> {
162 /// Empty bundle — both callbacks `None`, no signing keypair.
163 /// The default used by callers that don't want to register
164 /// hooks AND don't have a keypair to sign with (test harnesses,
165 /// the thin [`reflect`] shim that preserves pre-#815 behaviour).
166 #[must_use]
167 pub fn empty() -> Self {
168 Self {
169 pre_reflect: None,
170 post_reflect: None,
171 active_keypair: None,
172 }
173 }
174}
175
176impl<'a> Default for ReflectHooks<'a> {
177 fn default() -> Self {
178 Self::empty()
179 }
180}
181
182impl<'a> std::fmt::Debug for ReflectHooks<'a> {
183 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184 f.debug_struct("ReflectHooks")
185 .field("pre_reflect", &self.pre_reflect.as_ref().map(|_| "<fn>"))
186 .field("post_reflect", &self.post_reflect.as_ref().map(|_| "<fn>"))
187 .field(
188 "active_keypair",
189 &self.active_keypair.map(|k| k.agent_id.as_str()),
190 )
191 .finish()
192 }
193}
194
/// Input bundle for [`reflect`] / [`reflect_with_hooks`]. Holds every
/// caller-tunable field of the new reflection memory plus the
/// source-id list. Defaults mirror the MCP tool schema (`tier=mid`,
/// `priority=5`, `confidence=1.0`, `source=DEFAULT_NHI_SOURCE` per
/// `crate::validate::DEFAULT_NHI_SOURCE` = `"nhi"` post-#1175 —
/// pre-#1175 this defaulted to `"claude"`, a heterogeneous-NHI
/// monoculture defect that #1175 closed) so the dispatch layer can
/// build this from the raw JSON arguments without further fixup.
///
/// Note that this struct has no `Default` impl of its own; the
/// defaults above are applied by whoever constructs it.
#[derive(Debug, Clone)]
pub struct ReflectInput {
    /// Ids of the memories being reflected on. Must be non-empty,
    /// individually well-formed, and free of duplicates — all three are
    /// checked by [`reflect_with_hooks`] before any DB access.
    pub source_ids: Vec<String>,
    /// Title of the new reflection memory.
    pub title: String,
    /// Body of the new reflection memory.
    pub content: String,
    /// `None` → resolve to the namespace of the first source memory.
    pub namespace: Option<String>,
    /// Storage tier for the new memory.
    pub tier: Tier,
    /// Tags for the new memory.
    pub tags: Vec<String>,
    /// Priority; validated, then clamped to `1..=10` on write.
    pub priority: i32,
    /// Confidence; validated, then clamped to `0.0..=1.0` on write.
    pub confidence: f64,
    /// Source label stored on the new memory.
    pub source: String,
    /// Already-resolved agent id; stamped into `metadata.agent_id`
    /// on write.
    pub agent_id: String,
    /// Caller-supplied metadata. The reflection writer merges system-
    /// generated `reflection_metadata` keys underneath this object;
    /// caller-supplied keys win on collision (the additive contract
    /// documented on the MCP tool). A non-object value is treated as
    /// an empty object by the writer.
    pub metadata: serde_json::Value,
}
222
223/// v0.7.0 recursive-learning Task 4/8 (issue #655) — substrate-native
224/// reflection primitive.
225///
226/// Steps (matches the MCP tool contract):
227///
228/// 1. Validate inputs (`title`, `content`, namespace, tags, priority,
229/// confidence, agent_id, source_ids).
230/// 2. Load each source memory; bail with [`ReflectError::SourceNotFound`]
231/// on any missing id (no partial write).
232/// 3. Compute `new_depth = max(source.reflection_depth) + 1`.
233/// 4. Resolve the effective namespace cap via
234/// [`resolve_governance_policy`] (walks the ancestor chain leaf-
235/// first), fall back to [`GovernancePolicy::default`] when the chain
236/// has no policy at any level, then call
237/// [`GovernancePolicy::effective_max_reflection_depth`] on the
238/// resolved policy.
239/// 5. Refuse with [`ReflectError::DepthExceeded`] when
240/// `new_depth > max_dep`.
241/// 6. Insert the new reflection memory and write a `reflects_on` link
242/// from the new memory to each source — all inside a single
243/// `BEGIN IMMEDIATE` … `COMMIT` block. Any insert / link failure
244/// rolls back the entire write so a half-written reflection cannot
245/// survive.
246///
247/// The new memory's metadata is the caller-supplied object with a
248/// system-generated `reflection_metadata` key spliced in (recording
249/// the source-id list, the resolved depth, and the RFC3339 creation
250/// timestamp). **Caller-supplied keys win on collision** — if the
251/// caller already supplied `reflection_metadata` we honor their value
252/// and skip the system splice. This is the documented additive contract.
253///
254/// The `agent_id` field on the input bundle is stamped into
255/// `metadata.agent_id` before insert; the caller is responsible for
256/// resolving it via [`crate::identity::resolve_agent_id`].
257///
258/// # Errors
259///
260/// Returns one of the four [`ReflectError`] variants. The DB-error
261/// variant is the only one with no structured payload — every other
262/// variant carries enough information for the caller to render a clean
263/// operator-readable message and (for `DepthExceeded`) for Task 5/8 to
264/// emit a structured audit row.
265pub fn reflect(
266 conn: &Connection,
267 input: &ReflectInput,
268) -> std::result::Result<ReflectOutcome, ReflectError> {
269 // Thin shim over [`reflect_with_hooks`] with an empty hook bundle.
270 // Existing callers (MCP `memory_reflect`, the `tests/recursive_
271 // learning_task4_*` suite, the Postgres parity test) keep using
272 // this entry-point unchanged; the new in-substrate hook surface
273 // is opt-in via `reflect_with_hooks`.
274 reflect_with_hooks(conn, input, &ReflectHooks::empty())
275}
276
277/// v0.7.0 recursive-learning Task 6/8 — variant of [`reflect`] with
278/// in-substrate hook callbacks. See [`reflect`] for the full step
279/// list; the only deltas are:
280///
281/// * Between step 4 (depth + cap resolution) and step 5 (cap
282/// check), `hooks.pre_reflect` fires when configured. A
283/// [`ReflectHookDecision::Deny`] return propagates as
284/// [`ReflectError::HookVeto`]; the cap check is NOT evaluated and
285/// the Task 5 depth-cap audit is NOT emitted on this path.
286/// * After step 6 commits (transaction COMMIT succeeds, just before
287/// returning `ReflectOutcome`), `hooks.post_reflect` fires with
288/// the read-only outcome. Notify-class — return value is ignored.
289///
290/// Calling `reflect_with_hooks(conn, input, &ReflectHooks::empty())`
291/// is identical to calling `reflect(conn, input)`.
292///
293/// # Errors
294///
295/// Same five [`ReflectError`] variants as [`reflect`] plus
296/// [`ReflectError::HookVeto`] when a pre_reflect handler vetoes.
297#[allow(clippy::too_many_lines)]
298pub fn reflect_with_hooks(
299 conn: &Connection,
300 input: &ReflectInput,
301 hooks: &ReflectHooks<'_>,
302) -> std::result::Result<ReflectOutcome, ReflectError> {
303 use crate::validate;
304 // ─── 1. Validate inputs ──────────────────────────────────────────
305 validate::validate_title(&input.title).map_err(|e| ReflectError::Validation(e.to_string()))?;
306 validate::validate_content(&input.content)
307 .map_err(|e| ReflectError::Validation(e.to_string()))?;
308 validate::validate_tags(&input.tags).map_err(|e| ReflectError::Validation(e.to_string()))?;
309 validate::validate_priority(input.priority)
310 .map_err(|e| ReflectError::Validation(e.to_string()))?;
311 validate::validate_confidence(input.confidence)
312 .map_err(|e| ReflectError::Validation(e.to_string()))?;
313 validate::validate_source(&input.source)
314 .map_err(|e| ReflectError::Validation(e.to_string()))?;
315 validate::validate_agent_id(&input.agent_id)
316 .map_err(|e| ReflectError::Validation(e.to_string()))?;
317 if input.source_ids.is_empty() {
318 return Err(ReflectError::Validation(
319 "source_ids cannot be empty — a reflection must reflect on at least one source memory"
320 .into(),
321 ));
322 }
323 // Each source id must be well-formed before we hit the DB; this
324 // gives the caller a clean "bad id at index N" surface for free.
325 let mut seen = std::collections::HashSet::new();
326 for (i, id) in input.source_ids.iter().enumerate() {
327 validate::validate_id(id)
328 .map_err(|e| ReflectError::Validation(format!("source_ids[{i}]: {e}")))?;
329 if !seen.insert(id.as_str()) {
330 return Err(ReflectError::Validation(format!(
331 "source_ids[{i}]: duplicate id '{id}'"
332 )));
333 }
334 }
335 if let Some(ref ns) = input.namespace {
336 validate::validate_namespace(ns).map_err(|e| ReflectError::Validation(e.to_string()))?;
337 }
338 validate::validate_metadata(&input.metadata)
339 .map_err(|e| ReflectError::Validation(e.to_string()))?;
340
341 // ─── 2. Load each source memory; bail on any missing id ─────────
342 let mut sources = Vec::with_capacity(input.source_ids.len());
343 for id in &input.source_ids {
344 match get(conn, id).map_err(|e| ReflectError::Database(e.to_string()))? {
345 Some(m) => sources.push(m),
346 None => return Err(ReflectError::SourceNotFound(id.clone())),
347 }
348 }
349
350 // ─── 3. Compute new_depth = max(source depths) + 1 ──────────────
351 let max_src_depth = sources
352 .iter()
353 .map(|m| m.reflection_depth)
354 .max()
355 .unwrap_or(0);
356 // Clamp to non-negative before adding 1 (the column is i32 but the
357 // cap is u32; pre-v0.7.0 rows landed at 0 so `max < 0` can't happen
358 // in practice, but a `.max(0)` here is cheap belt-and-braces).
359 let new_depth_i32 = max_src_depth.max(0).saturating_add(1);
360 // u32 conversion: new_depth is at most i32::MAX which fits in u32.
361 #[allow(clippy::cast_sign_loss)]
362 let new_depth_u32: u32 = new_depth_i32 as u32;
363
364 // ─── 4. Resolve target namespace + governance cap ───────────────
365 let target_namespace = match input.namespace {
366 Some(ref ns) => ns.clone(),
367 // Default to the namespace of the FIRST source memory — matches
368 // the documented MCP schema default. Operators who want a
369 // different target namespace pass it explicitly.
370 None => sources[0].namespace.clone(),
371 };
372 // Carry-forward (Task 2 note): `resolve_governance_policy` returns
373 // `None` when no level of the ancestor chain has a policy at all.
374 // Treat that as "use the compiled default" — i.e. fall back to
375 // `GovernancePolicy::default()` which has `max_reflection_depth =
376 // None` and therefore yields the compiled-in cap of 3.
377 let policy = resolve_governance_policy(conn, &target_namespace)
378 .unwrap_or_else(GovernancePolicy::default);
379 let cap = policy.effective_max_reflection_depth();
380
381 // ─── 4.5 `pre_reflect` hook (v0.7.0 Task 6/8) ──────────────────
382 //
383 // Fires BEFORE the cap check so a hook handler may VETO the
384 // reflection by returning `ReflectHookDecision::Deny`. Vetoes
385 // from `pre_reflect` are distinct from the cap refusal —
386 // caller-policy refusals (e.g. "this agent is rate-limited",
387 // "this content type is policy-restricted") rather than
388 // depth-cap refusals. The Task 5 `reflection.depth_exceeded`
389 // audit row is NOT emitted on this path; the hook handler may
390 // emit its own audit if desired.
391 if let Some(pre) = hooks.pre_reflect.as_ref() {
392 match (pre)(input) {
393 ReflectHookDecision::Allow => {}
394 ReflectHookDecision::Deny { reason, code } => {
395 return Err(ReflectError::HookVeto { reason, code });
396 }
397 }
398 }
399
400 // ─── 5. Refuse if proposed depth exceeds cap ────────────────────
401 //
402 // Task 5/8 (v0.7.0): before propagating the refusal to the caller,
403 // append a `reflection.depth_exceeded` row to `signed_events`. The
404 // audit row is the cryptographic-provenance leg of the v0.7.0 cap
405 // contract — every cap refusal becomes part of the tamper-evident
406 // audit chain so a future operator can prove that the daemon
407 // honored the cap, not just "trusted the agent didn't try".
408 //
409 // Note: audit is fired only by this cap refusal; hook vetoes
410 // (Task 6/8 `pre_reflect`) carry their own provenance via the
411 // hook's own decision record (if any), so they are deliberately
412 // NOT emitted here.
413 if new_depth_u32 > cap {
414 // v0.7.0 L2-2 — cross-peer enforcement. If any source carries a
415 // `reflection_origin.peer_origin` stamp (it was imported via
416 // federation `sync_push`), surface the originating peer in the
417 // refusal so operators see the cross-peer provenance — not just
418 // "depth exceeded". Local cap is enforced regardless of source
419 // origin (territorial sovereignty), but the message distinguishes
420 // "remote reflection at depth N, local depth limit M" from a
421 // purely local cap breach.
422 let cross_peer_refusal =
423 crate::federation::reflection_bookkeeping::enforce_local_cap_on_derived(
424 new_depth_u32,
425 cap,
426 &sources,
427 );
428 let peer_origin: Option<String> = if let Err(ref r) = cross_peer_refusal {
429 if let Some(ref peer) = r.imported_peer {
430 tracing::warn!(
431 target: "federation::reflection_bookkeeping",
432 peer = %peer,
433 attempted = new_depth_u32,
434 local_cap = cap,
435 namespace = %target_namespace,
436 "L2-2: refusing derived reflection: {}",
437 r,
438 );
439 }
440 r.imported_peer.clone()
441 } else {
442 None
443 };
444 emit_reflection_depth_exceeded_audit(
445 conn,
446 &input.agent_id,
447 new_depth_u32,
448 cap,
449 &target_namespace,
450 &input.source_ids,
451 &input.title,
452 peer_origin.as_deref(),
453 );
454 return Err(ReflectError::DepthExceeded {
455 attempted: new_depth_u32,
456 cap,
457 namespace: target_namespace,
458 });
459 }
460
461 // ─── 6. Atomic insert + N links inside a single transaction ─────
462 // Build the system-generated reflection_metadata block. The caller-
463 // supplied object wins on key collisions — if `reflection_metadata`
464 // is already set, we leave it alone.
465 let now = Utc::now().to_rfc3339();
466 let mut metadata = match input.metadata.clone() {
467 serde_json::Value::Object(map) => map,
468 _ => serde_json::Map::new(),
469 };
470 // Always stamp agent_id (the resolver already validated it).
471 metadata.insert(
472 "agent_id".to_string(),
473 serde_json::Value::String(input.agent_id.clone()),
474 );
475 // Splice reflection_metadata only when the caller didn't pre-set it.
476 if !metadata.contains_key(field_names::REFLECTION_METADATA) {
477 let reflection_meta = serde_json::json!({
478 "reflected_on_source_ids": input.source_ids,
479 (field_names::REFLECTION_DEPTH): new_depth_i32,
480 "reflection_created_at": now,
481 });
482 metadata.insert(
483 field_names::REFLECTION_METADATA.to_string(),
484 reflection_meta,
485 );
486 }
487 let metadata_value = serde_json::Value::Object(metadata);
488 // Re-validate the merged metadata so an oversized splice surfaces
489 // here (vs. a confusing DB constraint error later).
490 validate::validate_metadata(&metadata_value)
491 .map_err(|e| ReflectError::Validation(e.to_string()))?;
492
493 let new_mem = Memory {
494 id: uuid::Uuid::new_v4().to_string(),
495 tier: input.tier.clone(),
496 namespace: target_namespace.clone(),
497 title: input.title.clone(),
498 content: input.content.clone(),
499 tags: input.tags.clone(),
500 priority: input.priority.clamp(1, 10),
501 confidence: input.confidence.clamp(0.0, 1.0),
502 source: input.source.clone(),
503 access_count: 0,
504 created_at: now.clone(),
505 updated_at: now.clone(),
506 last_accessed_at: None,
507 expires_at: None,
508 metadata: metadata_value,
509 reflection_depth: new_depth_i32,
510 // L1-1: reflection memories are always typed as Reflection,
511 // regardless of what the caller passes in metadata.type (the
512 // back-compat path). This is the first-class typed counterpart
513 // to the metadata.type = 'reflection' splice above.
514 memory_kind: MemoryKind::Reflection,
515 entity_id: None,
516 persona_version: None,
517 citations: Vec::new(),
518 source_uri: None,
519 source_span: None,
520 confidence_source: ConfidenceSource::CallerProvided,
521 confidence_signals: None,
522 confidence_decayed_at: None,
523 version: 1,
524 };
525
526 // Atomic boundary: insert the reflection row + N `reflects_on`
527 // links inside a single BEGIN IMMEDIATE ... COMMIT block. If any
528 // link insert fails, ROLLBACK undoes the reflection row too.
529 // Matches the `consolidate` pattern earlier in this file.
530 conn.execute_batch(super::connection::SQL_BEGIN_IMMEDIATE)
531 .map_err(|e| ReflectError::Database(e.to_string()))?;
532
533 let txn_result = (|| -> std::result::Result<String, ReflectError> {
534 // v0.7.0 fix campaign R1-M3 (#690) — substrate-side reflections
535 // must NOT silently merge into an existing (title, namespace).
536 // If a row with the same title is already present in the
537 // reflection's namespace, the caller asked us to land a
538 // duplicate; that's a deduplication risk we surface as a
539 // validation error rather than smashing the existing row.
540 let actual_id = insert_with_conflict(conn, &new_mem, ConflictMode::Error).map_err(|e| {
541 if e.downcast_ref::<crate::storage::ConflictError>().is_some() {
542 ReflectError::Validation(format!(
543 "reflection title collides with an existing memory in the same namespace: {e}"
544 ))
545 } else {
546 ReflectError::Database(e.to_string())
547 }
548 })?;
549 // Self-link rejection lives in `validate_link`; a self-link
550 // (source id appearing in the source list) would only happen
551 // via caller error, but we still surface it as a validation
552 // failure with the txn rolled back so the reflection never
553 // lands.
554 for src_id in &input.source_ids {
555 validate::validate_link(
556 &actual_id,
557 src_id,
558 crate::models::MemoryLinkRelation::ReflectsOn.as_str(),
559 )
560 .map_err(|e| ReflectError::Validation(e.to_string()))?;
561 // Issue #815 — the pre-#815 path called `create_link` here,
562 // which always produced `attest_level='unsigned'` rows for
563 // every reflects_on edge regardless of whether the caller
564 // had loaded a daemon keypair. Route through the signed
565 // helper instead so the keypair threaded through the
566 // hook bundle (MCP-tier handler, curator daemon) reaches
567 // the link insert and the edges land as `self_signed`
568 // with a 64-byte Ed25519 signature. Callers that pass
569 // `active_keypair: None` (the `reflect()` shim, the
570 // auto-export hook constructor's no-keypair test paths)
571 // get the previous unsigned behaviour — `create_link_signed`
572 // matches `create_link`'s output when the keypair is
573 // absent (verified by the existing
574 // `create_link_signed_without_keypair_is_unsigned` test in
575 // `src/storage/mod.rs`).
576 create_link_signed(
577 conn,
578 &actual_id,
579 src_id,
580 crate::models::MemoryLinkRelation::ReflectsOn.as_str(),
581 hooks.active_keypair,
582 )
583 .map_err(|e| ReflectError::Database(e.to_string()))?;
584 }
585 Ok(actual_id)
586 })();
587
588 match txn_result {
589 Ok(actual_id) => {
590 conn.execute_batch(super::connection::SQL_COMMIT)
591 .map_err(|e| ReflectError::Database(e.to_string()))?;
592 let outcome = ReflectOutcome {
593 id: actual_id,
594 reflection_depth: new_depth_i32,
595 reflects_on: input.source_ids.clone(),
596 namespace: target_namespace,
597 };
598 // ─── 7. `post_reflect` hook (v0.7.0 Task 6/8) ───────────
599 //
600 // Fires AFTER the transaction commits so the hook handler
601 // can read the new reflection memory + its `reflects_on`
602 // links via the same connection. Notify-class — the
603 // return value is ignored beyond logging (post-commit
604 // events cannot veto a side-effect that already
605 // happened).
606 if let Some(post) = hooks.post_reflect.as_ref() {
607 (post)(&outcome);
608 }
609 Ok(outcome)
610 }
611 Err(e) => {
612 if let Err(rb) = conn.execute_batch(super::connection::SQL_ROLLBACK) {
613 tracing::error!("ROLLBACK failed in reflect: {}", rb);
614 }
615 Err(e)
616 }
617 }
618}
619
620/// v0.7.0 recursive-learning Task 5/8 — canonical-CBOR encoding of the
621/// `reflection.depth_exceeded` audit payload.
622///
623/// Mirrors the deterministic encoding contract used by
624/// [`crate::identity::sign::canonical_cbor`] — map keys sorted
625/// lexicographically (`BTreeMap` iteration order), `Option::None`
626/// encoded as `Null`, integers in shortest-form. The same payload
627/// hashes to the same bytes on every host so a downstream auditor can
628/// re-derive the `payload_hash` from the four structured fields below.
629///
630/// Note that we deliberately do NOT include the rejected reflection's
631/// `content` body in the payload — that would balloon the audit row
632/// (and risk leaking PII into the chain). Title + source ids is the
633/// provenance hook; the body is not the audit's job.
634///
635/// v0.7.0 L2-2 — when `peer_origin` is `Some`, the encoded payload
636/// includes a `peer_origin` field naming the federation peer that
637/// delivered the imported source memory whose depth drove the cap
638/// breach. When `None` (purely local-source refusal) the field is
639/// omitted so existing-row payload hashes are unchanged on the
640/// pre-L2-2 codepath. The conditional-inclusion-vs-`Null` distinction
641/// matters: a presence-encoded `Null` would silently mutate every
642/// pre-L2-2 hash on every host the moment L2-2 ships, even where no
643/// federation is configured.
644///
645/// # Errors
646///
647/// Returns the underlying CBOR encoder error if encoding fails — in
648/// practice unreachable for the fixed-shape input above, surfaced as
649/// a `Result` so callers don't have to choose between panicking and
650/// silently logging an incomplete payload.
651pub fn canonical_cbor_reflection_depth_exceeded(
652 agent_id: &str,
653 attempted: u32,
654 cap: u32,
655 namespace: &str,
656 source_ids: &[String],
657 proposed_title: &str,
658 created_at: &str,
659 peer_origin: Option<&str>,
660) -> anyhow::Result<Vec<u8>> {
661 use std::collections::BTreeMap;
662 let mut map: BTreeMap<&str, ciborium::Value> = BTreeMap::new();
663 map.insert("agent_id", ciborium::Value::Text(agent_id.to_string()));
664 map.insert("attempted", ciborium::Value::Integer(attempted.into()));
665 map.insert("cap", ciborium::Value::Integer(cap.into()));
666 map.insert(
667 field_names::CREATED_AT,
668 ciborium::Value::Text(created_at.to_string()),
669 );
670 map.insert("namespace", ciborium::Value::Text(namespace.to_string()));
671 // v0.7.0 L2-2 — conditional inclusion preserves pre-L2-2 payload
672 // hashes on the purely-local refusal path (no `peer_origin` key
673 // present at all in the encoded map). Cross-peer refusals carry the
674 // peer claim as a tamper-evident structured field.
675 if let Some(peer) = peer_origin {
676 map.insert(
677 field_names::PEER_ORIGIN,
678 ciborium::Value::Text(peer.to_string()),
679 );
680 }
681 map.insert(
682 "proposed_title",
683 ciborium::Value::Text(proposed_title.to_string()),
684 );
685 map.insert(
686 field_names::SOURCE_IDS,
687 ciborium::Value::Array(
688 source_ids
689 .iter()
690 .map(|s| ciborium::Value::Text(s.clone()))
691 .collect(),
692 ),
693 );
694 let entries: Vec<(ciborium::Value, ciborium::Value)> = map
695 .into_iter()
696 .map(|(k, v)| (ciborium::Value::Text(k.to_string()), v))
697 .collect();
698 let value = ciborium::Value::Map(entries);
699 let mut out: Vec<u8> = Vec::with_capacity(256);
700 ciborium::ser::into_writer(&value, &mut out)
701 .context("CBOR encode reflection_depth_exceeded audit payload")?;
702 Ok(out)
703}
704
705/// v0.7.0 recursive-learning Task 5/8 — append a
706/// `reflection.depth_exceeded` row to `signed_events` for an in-flight
707/// cap refusal.
708///
709/// Mirrors the [`invalidate_link`] audit-emit pattern: best-effort —
710/// audit-write failure is logged via `tracing::warn!(target:
711/// "signed_events", ...)` but does NOT crater the refusal path. The
712/// refusal still propagates to the caller regardless of audit-write
713/// success, because (a) the refusal already happened and (b) crashing
714/// the legitimate caller for a substrate problem they cannot fix would
715/// be worse than a missed audit row.
716///
717/// `attest_level` is `"unsigned"` because the substrate emits this row
718/// itself (the caller did not sign it with their keypair). The
719/// `signature` column is `None`. The `payload_hash` is SHA-256 over
720/// the canonical-CBOR encoding of the structured fields, so a future
721/// auditor can re-derive the same hash from any honest source of the
722/// same fields.
723pub(crate) fn emit_reflection_depth_exceeded_audit(
724 conn: &Connection,
725 agent_id: &str,
726 attempted: u32,
727 cap: u32,
728 namespace: &str,
729 source_ids: &[String],
730 proposed_title: &str,
731 peer_origin: Option<&str>,
732) {
733 let created_at = Utc::now().to_rfc3339();
734 let cbor = match canonical_cbor_reflection_depth_exceeded(
735 agent_id,
736 attempted,
737 cap,
738 namespace,
739 source_ids,
740 proposed_title,
741 &created_at,
742 peer_origin,
743 ) {
744 Ok(b) => b,
745 Err(e) => {
746 tracing::warn!(
747 target: crate::signed_events::SIGNED_EVENTS_TRACE_TARGET,
748 agent_id, attempted, cap, namespace,
749 "failed to encode canonical CBOR for reflection_depth_exceeded audit: {e}"
750 );
751 return;
752 }
753 };
754 // v0.7.0 L2-2 — distinguish the audit row's `event_type` so
755 // operators (and downstream tooling) can filter the cross-peer
756 // refusal stream from the local-only stream without re-decoding
757 // the CBOR payload. The two-variant `event_type` does not change
758 // the audit-chain contract: payload_hash + signature + timestamp
759 // semantics remain identical; only the textual label differs.
760 let event_type = if peer_origin.is_some() {
761 "reflection.depth_exceeded.cross_peer"
762 } else {
763 "reflection.depth_exceeded"
764 };
765 let event = crate::signed_events::SignedEvent {
766 id: uuid::Uuid::new_v4().to_string(),
767 agent_id: agent_id.to_string(),
768 event_type: event_type.to_string(),
769 payload_hash: crate::signed_events::payload_hash(&cbor),
770 signature: None,
771 attest_level: crate::models::AttestLevel::Unsigned.as_str().to_string(),
772 timestamp: created_at,
773 ..crate::signed_events::SignedEvent::default()
774 };
775 if let Err(e) = crate::signed_events::append_signed_event(conn, &event) {
776 tracing::warn!(
777 target: crate::signed_events::SIGNED_EVENTS_TRACE_TARGET,
778 agent_id, attempted, cap, namespace,
779 "failed to append reflection_depth_exceeded audit row: {e}"
780 );
781 }
782}
783
#[cfg(test)]
mod l2_2_audit_tests {
    //! v0.7.0 L2-2 — lib-level unit tests pinning the cross-peer
    //! audit payload encoder. The end-to-end three-peer choreography
    //! lives in `tests/federation_reflection_replication.rs`; here we
    //! pin the structural-encoding invariants without touching the
    //! database substrate, so the lib's `payload_hash` contract is
    //! covered even when the integration test binary is excluded.

    use super::canonical_cbor_reflection_depth_exceeded;

    /// `peer_origin = None` and `peer_origin = Some(_)` MUST encode to
    /// different byte sequences. This is the load-bearing invariant:
    /// if both encoded identically, the audit row's payload_hash
    /// wouldn't actually bind the cross-peer claim, and a tampered
    /// `event_type` could orphan the structured field.
    #[test]
    fn peer_origin_some_vs_none_yields_distinct_bytes() {
        // Shared fixture: every field except `peer_origin` is held
        // constant so any byte difference is attributable to it alone.
        let base = (
            "ai:test",
            3_u32,
            2_u32,
            "ns/l2-2",
            vec!["src-1".to_string()],
            "title",
            "2026-05-13T00:00:00+00:00",
        );
        let local = canonical_cbor_reflection_depth_exceeded(
            base.0, base.1, base.2, base.3, &base.4, base.5, base.6, None,
        )
        .expect("encode None");
        let cross = canonical_cbor_reflection_depth_exceeded(
            base.0,
            base.1,
            base.2,
            base.3,
            &base.4,
            base.5,
            base.6,
            Some("ai:peer-x"),
        )
        .expect("encode Some");
        assert_ne!(local, cross, "peer_origin claim must be byte-load-bearing");
        // Two different peer_origin claims also yield different bytes.
        let cross_y = canonical_cbor_reflection_depth_exceeded(
            base.0,
            base.1,
            base.2,
            base.3,
            &base.4,
            base.5,
            base.6,
            Some("ai:peer-y"),
        )
        .expect("encode Some(other)");
        assert_ne!(
            cross, cross_y,
            "swapping the peer_origin string must change the bytes"
        );
    }

    /// The encoder is deterministic — two encodes of the same Some
    /// peer_origin produce the same bytes. Mirrors the
    /// `canonical_cbor_is_deterministic_across_encodes` invariant on
    /// the local-only encoder.
    #[test]
    fn cross_peer_encoding_is_deterministic() {
        let a = canonical_cbor_reflection_depth_exceeded(
            "ai:a",
            7,
            3,
            "ns",
            &["s1".to_string(), "s2".to_string()],
            "t",
            "2026-05-13T00:00:00+00:00",
            Some("peer-A"),
        )
        .expect("encode 1");
        let b = canonical_cbor_reflection_depth_exceeded(
            "ai:a",
            7,
            3,
            "ns",
            &["s1".to_string(), "s2".to_string()],
            "t",
            "2026-05-13T00:00:00+00:00",
            Some("peer-A"),
        )
        .expect("encode 2");
        assert_eq!(a, b, "cross-peer encoding must be byte-stable");
    }

    /// Observable proxy for the canonical map's stable key ordering.
    /// We can't easily inspect the raw key order without a CBOR decode
    /// dependency on this test path, so we pin what is observable from
    /// the bytes alone:
    ///
    /// 1. encoding without `peer_origin` is byte-stable across encodes;
    /// 2. encoding with `peer_origin` is byte-stable across encodes;
    /// 3. the two shapes differ from each other.
    ///
    /// This test does NOT prove the key order itself is lexicographic —
    /// only that the order is stable for both map shapes.
    #[test]
    fn key_ordering_is_lexicographic_via_btreemap() {
        // Same fixture for every encode; only `peer_origin` varies.
        let encode = |peer_origin: Option<&str>| {
            canonical_cbor_reflection_depth_exceeded(
                "ai:test",
                4,
                3,
                "ns",
                &["s1".to_string()],
                "title",
                "2026-05-13T00:00:00+00:00",
                peer_origin,
            )
            .expect("encode")
        };

        let no_peer = encode(None);
        let no_peer2 = encode(None);
        assert_eq!(
            no_peer, no_peer2,
            "encoding without peer_origin must be byte-stable"
        );

        // The pair the original body omitted: the map that includes
        // `peer_origin` must be just as stable as the one without it.
        let with_peer = encode(Some("ai:peer-x"));
        let with_peer2 = encode(Some("ai:peer-x"));
        assert_eq!(
            with_peer, with_peer2,
            "encoding with peer_origin must be byte-stable"
        );

        assert_ne!(
            no_peer, with_peer,
            "adding peer_origin must change the encoded bytes"
        );
    }
}