Skip to main content

ai_memory/storage/
reflect.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! Reflection family — the substrate-native recursive-learning
5//! primitive (v0.7.0 Tasks 4/5/6 of the recursive-learning epic).
6//! v0.7.0 L0.5-3 extracted `ReflectError`, `ReflectOutcome`,
7//! `ReflectHookDecision`, `ReflectHooks`, `ReflectInput`, `reflect`,
8//! `reflect_with_hooks`, `canonical_cbor_reflection_depth_exceeded`,
9//! and `emit_reflection_depth_exceeded_audit` out of `src/db.rs` into
10//! this sub-module. Pure refactor — semantics unchanged.
11
12use crate::identity::keypair::AgentKeypair;
13use crate::models::ConfidenceSource;
14use crate::models::field_names;
15use anyhow::Context;
16use chrono::Utc;
17use rusqlite::Connection;
18
19use crate::models::{GovernancePolicy, Memory, MemoryKind, Tier};
20
21use super::{
22    ConflictMode, create_link_signed, get, insert_with_conflict, resolve_governance_policy,
23};
24
/// Failure modes of the substrate reflection entry-points
/// ([`reflect`] and [`reflect_with_hooks`]).
///
/// This type is intentionally separate from
/// [`crate::errors::MemoryError`]: the storage layer reports *what*
/// went wrong and leaves the translation into a wire-shaped (MCP /
/// HTTP) error to the caller at that boundary. The `DepthExceeded`
/// variant is the one the Task 5/8 audit path keys on when it records
/// a cap refusal in `signed_events`.
#[derive(Debug)]
pub enum ReflectError {
    /// A caller-supplied field was rejected by a validator (this also
    /// covers a title collision detected at insert time). The string
    /// is the operator-readable reason and is surfaced verbatim.
    Validation(String),
    /// A requested source memory could not be loaded. The payload is
    /// the id that was missing, so the caller can name it.
    SourceNotFound(String),
    /// The depth the new reflection would have exceeds the cap
    /// resolved for the target namespace. The three fields form the
    /// structured payload attached to the audit row.
    DepthExceeded {
        attempted: u32,
        cap: u32,
        namespace: String,
    },
    /// v0.7.0 recursive-learning Task 6/8 — the `pre_reflect` callback
    /// answered [`ReflectHookDecision::Deny`]. This is reported
    /// separately from `DepthExceeded` because the veto short-circuits
    /// before the cap comparison runs and because no depth-cap audit
    /// row is written for it; a hook that wants provenance records it
    /// itself.
    HookVeto { reason: String, code: i32 },
    /// The database layer failed (source lookup, transaction control,
    /// insert, or link write). Carries the stringified underlying
    /// error.
    Database(String),
}
60
61impl std::fmt::Display for ReflectError {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        match self {
64            Self::Validation(m) | Self::SourceNotFound(m) | Self::Database(m) => f.write_str(m),
65            Self::DepthExceeded {
66                attempted,
67                cap,
68                namespace,
69            } => write!(
70                f,
71                "reflection depth {attempted} would exceed namespace \
72                 max_reflection_depth {cap} (namespace='{namespace}')"
73            ),
74            Self::HookVeto { reason, code } => {
75                write!(
76                    f,
77                    "pre_reflect hook vetoed reflection (code={code}): {reason}"
78                )
79            }
80        }
81    }
82}
83
// Marker impl with no overridden methods: it lets `ReflectError` be
// used wherever a `std::error::Error` is expected, relying on the
// `Debug` derive and the `Display` impl defined in this module.
impl std::error::Error for ReflectError {}
85
/// What a successful [`reflect`] / [`reflect_with_hooks`] call reports
/// back. The shape follows the MCP `memory_reflect` response so the
/// dispatch layer only has to serialize it.
#[derive(Debug, Clone)]
pub struct ReflectOutcome {
    /// Id of the reflection memory that was just written.
    pub id: String,
    /// Depth stored on the new memory: one more than the deepest
    /// source (negative source depths are treated as zero).
    pub reflection_depth: i32,
    /// The source memory ids, in the order the caller supplied them.
    pub reflects_on: Vec<String>,
    /// Namespace the reflection was written to — the caller's choice,
    /// or the first source's namespace when none was given.
    pub namespace: String,
}
100
/// v0.7.0 recursive-learning Task 6/8 — verdict returned by an
/// in-substrate `pre_reflect` callback.
///
/// This is a reduced form of [`crate::hooks::HookDecision`]: the
/// `Modify` and `AskUser` outcomes are left out because the only thing
/// the substrate needs to know is whether to keep going (`Allow`) or
/// stop (`Deny`). Delta merging and operator prompts belong to the
/// wire-level [`crate::hooks::HookChain`] once the daemon's hook
/// pipeline is configured (G7+ wiring); until then this enum is how
/// the reflect codepath surfaces its `PreReflect` / `PostReflect`
/// events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReflectHookDecision {
    /// Let the reflect proceed to the depth-cap check.
    Allow,
    /// Stop the reflect. The substrate turns this into
    /// [`ReflectError::HookVeto`], forwarding the reason and the
    /// HTTP-style status code unchanged. The depth cap is never
    /// compared on this path and no depth-cap audit row is written:
    /// a veto is a caller-policy refusal, whereas the Task 5 audit
    /// covers substrate cap refusals only.
    Deny { reason: String, code: i32 },
}
125
126/// v0.7.0 recursive-learning Task 6/8 — optional in-substrate hook
127/// callbacks fired by [`reflect_with_hooks`]. Bundled into a single
128/// struct so the substrate signature stays compact and so future
129/// callbacks (e.g. on-rollback) can land without churning every
130/// call site.
131///
132/// Both callbacks are `Option<...>`; when `None`, the substrate
133/// behaves identically to the unhooked [`reflect`] entry-point. The
134/// callback type is `Box<dyn Fn(...)>` so the substrate stays
135/// allocator-friendly (one allocation per reflect call) and so test
136/// code can pass simple closures that capture observation state.
137pub struct ReflectHooks<'a> {
138    /// Fired BEFORE the cap check (step 4 of `reflect`). Receives a
139    /// read-only view of the in-flight [`ReflectInput`] (the
140    /// substrate-side equivalent of [`crate::hooks::events::ReflectDelta`]
141    /// — the in-process callback gets the typed input directly,
142    /// while the cross-process wire path serialises a `ReflectDelta`).
143    /// Returns [`ReflectHookDecision::Deny`] to veto.
144    pub pre_reflect: Option<Box<dyn Fn(&ReflectInput) -> ReflectHookDecision + Send + Sync + 'a>>,
145    /// Fired AFTER the transaction commits (step 7 of `reflect`).
146    /// Receives a read-only snapshot of the post-commit outcome
147    /// (mirrors [`crate::hooks::events::ReflectResult`]). Notify-class
148    /// — return value is ignored; the reflect already landed.
149    pub post_reflect: Option<Box<dyn Fn(&ReflectOutcome) + Send + Sync + 'a>>,
150    /// Issue #815 — signing keypair for the `reflects_on` edges
151    /// written inside the reflect transaction. When `Some`, each
152    /// edge is persisted via [`create_link_signed`] with this
153    /// keypair, producing `attest_level='self_signed'` rows with a
154    /// 64-byte Ed25519 signature. When `None`, edges land as
155    /// `attest_level='unsigned'` — the v0.6.x behaviour and the
156    /// state of the world before #815 fixed the storage::reflect
157    /// gap that #814 left behind.
158    pub active_keypair: Option<&'a AgentKeypair>,
159}
160
161impl<'a> ReflectHooks<'a> {
162    /// Empty bundle — both callbacks `None`, no signing keypair.
163    /// The default used by callers that don't want to register
164    /// hooks AND don't have a keypair to sign with (test harnesses,
165    /// the thin [`reflect`] shim that preserves pre-#815 behaviour).
166    #[must_use]
167    pub fn empty() -> Self {
168        Self {
169            pre_reflect: None,
170            post_reflect: None,
171            active_keypair: None,
172        }
173    }
174}
175
176impl<'a> Default for ReflectHooks<'a> {
177    fn default() -> Self {
178        Self::empty()
179    }
180}
181
182impl<'a> std::fmt::Debug for ReflectHooks<'a> {
183    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184        f.debug_struct("ReflectHooks")
185            .field("pre_reflect", &self.pre_reflect.as_ref().map(|_| "<fn>"))
186            .field("post_reflect", &self.post_reflect.as_ref().map(|_| "<fn>"))
187            .field(
188                "active_keypair",
189                &self.active_keypair.map(|k| k.agent_id.as_str()),
190            )
191            .finish()
192    }
193}
194
195/// Input bundle for [`reflect`]. Holds every caller-tunable field of the
196/// new reflection memory plus the source-id list. Defaults mirror the
197/// MCP tool schema (`tier=mid`, `priority=5`, `confidence=1.0`,
198/// `source=DEFAULT_NHI_SOURCE` per `crate::validate::DEFAULT_NHI_SOURCE`
199/// = `"nhi"` post-#1175 — pre-#1175 this defaulted to `"claude"`, a
200/// heterogeneous-NHI monoculture defect that #1175 closed) so the
201/// dispatch layer can build this from the raw JSON arguments without
202/// further fixup.
203#[derive(Debug, Clone)]
204pub struct ReflectInput {
205    pub source_ids: Vec<String>,
206    pub title: String,
207    pub content: String,
208    /// `None` → resolve to the namespace of the first source memory.
209    pub namespace: Option<String>,
210    pub tier: Tier,
211    pub tags: Vec<String>,
212    pub priority: i32,
213    pub confidence: f64,
214    pub source: String,
215    pub agent_id: String,
216    /// Caller-supplied metadata. The reflection writer merges system-
217    /// generated `reflection_metadata` keys underneath this object;
218    /// caller-supplied keys win on collision (the additive contract
219    /// documented on the MCP tool).
220    pub metadata: serde_json::Value,
221}
222
223/// v0.7.0 recursive-learning Task 4/8 (issue #655) — substrate-native
224/// reflection primitive.
225///
226/// Steps (matches the MCP tool contract):
227///
228/// 1. Validate inputs (`title`, `content`, namespace, tags, priority,
229///    confidence, agent_id, source_ids).
230/// 2. Load each source memory; bail with [`ReflectError::SourceNotFound`]
231///    on any missing id (no partial write).
232/// 3. Compute `new_depth = max(source.reflection_depth) + 1`.
233/// 4. Resolve the effective namespace cap via
234///    [`resolve_governance_policy`] (walks the ancestor chain leaf-
235///    first), fall back to [`GovernancePolicy::default`] when the chain
236///    has no policy at any level, then call
237///    [`GovernancePolicy::effective_max_reflection_depth`] on the
238///    resolved policy.
239/// 5. Refuse with [`ReflectError::DepthExceeded`] when
240///    `new_depth > max_dep`.
241/// 6. Insert the new reflection memory and write a `reflects_on` link
242///    from the new memory to each source — all inside a single
243///    `BEGIN IMMEDIATE` … `COMMIT` block. Any insert / link failure
244///    rolls back the entire write so a half-written reflection cannot
245///    survive.
246///
247/// The new memory's metadata is the caller-supplied object with a
248/// system-generated `reflection_metadata` key spliced in (recording
249/// the source-id list, the resolved depth, and the RFC3339 creation
250/// timestamp). **Caller-supplied keys win on collision** — if the
251/// caller already supplied `reflection_metadata` we honor their value
252/// and skip the system splice. This is the documented additive contract.
253///
254/// The `agent_id` field on the input bundle is stamped into
255/// `metadata.agent_id` before insert; the caller is responsible for
256/// resolving it via [`crate::identity::resolve_agent_id`].
257///
258/// # Errors
259///
260/// Returns one of the four [`ReflectError`] variants. The DB-error
261/// variant is the only one with no structured payload — every other
262/// variant carries enough information for the caller to render a clean
263/// operator-readable message and (for `DepthExceeded`) for Task 5/8 to
264/// emit a structured audit row.
265pub fn reflect(
266    conn: &Connection,
267    input: &ReflectInput,
268) -> std::result::Result<ReflectOutcome, ReflectError> {
269    // Thin shim over [`reflect_with_hooks`] with an empty hook bundle.
270    // Existing callers (MCP `memory_reflect`, the `tests/recursive_
271    // learning_task4_*` suite, the Postgres parity test) keep using
272    // this entry-point unchanged; the new in-substrate hook surface
273    // is opt-in via `reflect_with_hooks`.
274    reflect_with_hooks(conn, input, &ReflectHooks::empty())
275}
276
277/// v0.7.0 recursive-learning Task 6/8 — variant of [`reflect`] with
278/// in-substrate hook callbacks. See [`reflect`] for the full step
279/// list; the only deltas are:
280///
281///   * Between step 4 (depth + cap resolution) and step 5 (cap
282///     check), `hooks.pre_reflect` fires when configured. A
283///     [`ReflectHookDecision::Deny`] return propagates as
284///     [`ReflectError::HookVeto`]; the cap check is NOT evaluated and
285///     the Task 5 depth-cap audit is NOT emitted on this path.
286///   * After step 6 commits (transaction COMMIT succeeds, just before
287///     returning `ReflectOutcome`), `hooks.post_reflect` fires with
288///     the read-only outcome. Notify-class — return value is ignored.
289///
290/// Calling `reflect_with_hooks(conn, input, &ReflectHooks::empty())`
291/// is identical to calling `reflect(conn, input)`.
292///
293/// # Errors
294///
295/// Same five [`ReflectError`] variants as [`reflect`] plus
296/// [`ReflectError::HookVeto`] when a pre_reflect handler vetoes.
297#[allow(clippy::too_many_lines)]
298pub fn reflect_with_hooks(
299    conn: &Connection,
300    input: &ReflectInput,
301    hooks: &ReflectHooks<'_>,
302) -> std::result::Result<ReflectOutcome, ReflectError> {
303    use crate::validate;
304    // ─── 1. Validate inputs ──────────────────────────────────────────
305    validate::validate_title(&input.title).map_err(|e| ReflectError::Validation(e.to_string()))?;
306    validate::validate_content(&input.content)
307        .map_err(|e| ReflectError::Validation(e.to_string()))?;
308    validate::validate_tags(&input.tags).map_err(|e| ReflectError::Validation(e.to_string()))?;
309    validate::validate_priority(input.priority)
310        .map_err(|e| ReflectError::Validation(e.to_string()))?;
311    validate::validate_confidence(input.confidence)
312        .map_err(|e| ReflectError::Validation(e.to_string()))?;
313    validate::validate_source(&input.source)
314        .map_err(|e| ReflectError::Validation(e.to_string()))?;
315    validate::validate_agent_id(&input.agent_id)
316        .map_err(|e| ReflectError::Validation(e.to_string()))?;
317    if input.source_ids.is_empty() {
318        return Err(ReflectError::Validation(
319            "source_ids cannot be empty — a reflection must reflect on at least one source memory"
320                .into(),
321        ));
322    }
323    // Each source id must be well-formed before we hit the DB; this
324    // gives the caller a clean "bad id at index N" surface for free.
325    let mut seen = std::collections::HashSet::new();
326    for (i, id) in input.source_ids.iter().enumerate() {
327        validate::validate_id(id)
328            .map_err(|e| ReflectError::Validation(format!("source_ids[{i}]: {e}")))?;
329        if !seen.insert(id.as_str()) {
330            return Err(ReflectError::Validation(format!(
331                "source_ids[{i}]: duplicate id '{id}'"
332            )));
333        }
334    }
335    if let Some(ref ns) = input.namespace {
336        validate::validate_namespace(ns).map_err(|e| ReflectError::Validation(e.to_string()))?;
337    }
338    validate::validate_metadata(&input.metadata)
339        .map_err(|e| ReflectError::Validation(e.to_string()))?;
340
341    // ─── 2. Load each source memory; bail on any missing id ─────────
342    let mut sources = Vec::with_capacity(input.source_ids.len());
343    for id in &input.source_ids {
344        match get(conn, id).map_err(|e| ReflectError::Database(e.to_string()))? {
345            Some(m) => sources.push(m),
346            None => return Err(ReflectError::SourceNotFound(id.clone())),
347        }
348    }
349
350    // ─── 3. Compute new_depth = max(source depths) + 1 ──────────────
351    let max_src_depth = sources
352        .iter()
353        .map(|m| m.reflection_depth)
354        .max()
355        .unwrap_or(0);
356    // Clamp to non-negative before adding 1 (the column is i32 but the
357    // cap is u32; pre-v0.7.0 rows landed at 0 so `max < 0` can't happen
358    // in practice, but a `.max(0)` here is cheap belt-and-braces).
359    let new_depth_i32 = max_src_depth.max(0).saturating_add(1);
360    // u32 conversion: new_depth is at most i32::MAX which fits in u32.
361    #[allow(clippy::cast_sign_loss)]
362    let new_depth_u32: u32 = new_depth_i32 as u32;
363
364    // ─── 4. Resolve target namespace + governance cap ───────────────
365    let target_namespace = match input.namespace {
366        Some(ref ns) => ns.clone(),
367        // Default to the namespace of the FIRST source memory — matches
368        // the documented MCP schema default. Operators who want a
369        // different target namespace pass it explicitly.
370        None => sources[0].namespace.clone(),
371    };
372    // Carry-forward (Task 2 note): `resolve_governance_policy` returns
373    // `None` when no level of the ancestor chain has a policy at all.
374    // Treat that as "use the compiled default" — i.e. fall back to
375    // `GovernancePolicy::default()` which has `max_reflection_depth =
376    // None` and therefore yields the compiled-in cap of 3.
377    let policy = resolve_governance_policy(conn, &target_namespace)
378        .unwrap_or_else(GovernancePolicy::default);
379    let cap = policy.effective_max_reflection_depth();
380
381    // ─── 4.5 `pre_reflect` hook (v0.7.0 Task 6/8) ──────────────────
382    //
383    // Fires BEFORE the cap check so a hook handler may VETO the
384    // reflection by returning `ReflectHookDecision::Deny`. Vetoes
385    // from `pre_reflect` are distinct from the cap refusal —
386    // caller-policy refusals (e.g. "this agent is rate-limited",
387    // "this content type is policy-restricted") rather than
388    // depth-cap refusals. The Task 5 `reflection.depth_exceeded`
389    // audit row is NOT emitted on this path; the hook handler may
390    // emit its own audit if desired.
391    if let Some(pre) = hooks.pre_reflect.as_ref() {
392        match (pre)(input) {
393            ReflectHookDecision::Allow => {}
394            ReflectHookDecision::Deny { reason, code } => {
395                return Err(ReflectError::HookVeto { reason, code });
396            }
397        }
398    }
399
400    // ─── 5. Refuse if proposed depth exceeds cap ────────────────────
401    //
402    // Task 5/8 (v0.7.0): before propagating the refusal to the caller,
403    // append a `reflection.depth_exceeded` row to `signed_events`. The
404    // audit row is the cryptographic-provenance leg of the v0.7.0 cap
405    // contract — every cap refusal becomes part of the tamper-evident
406    // audit chain so a future operator can prove that the daemon
407    // honored the cap, not just "trusted the agent didn't try".
408    //
409    // Note: audit is fired only by this cap refusal; hook vetoes
410    // (Task 6/8 `pre_reflect`) carry their own provenance via the
411    // hook's own decision record (if any), so they are deliberately
412    // NOT emitted here.
413    if new_depth_u32 > cap {
414        // v0.7.0 L2-2 — cross-peer enforcement. If any source carries a
415        // `reflection_origin.peer_origin` stamp (it was imported via
416        // federation `sync_push`), surface the originating peer in the
417        // refusal so operators see the cross-peer provenance — not just
418        // "depth exceeded". Local cap is enforced regardless of source
419        // origin (territorial sovereignty), but the message distinguishes
420        // "remote reflection at depth N, local depth limit M" from a
421        // purely local cap breach.
422        let cross_peer_refusal =
423            crate::federation::reflection_bookkeeping::enforce_local_cap_on_derived(
424                new_depth_u32,
425                cap,
426                &sources,
427            );
428        let peer_origin: Option<String> = if let Err(ref r) = cross_peer_refusal {
429            if let Some(ref peer) = r.imported_peer {
430                tracing::warn!(
431                    target: "federation::reflection_bookkeeping",
432                    peer = %peer,
433                    attempted = new_depth_u32,
434                    local_cap = cap,
435                    namespace = %target_namespace,
436                    "L2-2: refusing derived reflection: {}",
437                    r,
438                );
439            }
440            r.imported_peer.clone()
441        } else {
442            None
443        };
444        emit_reflection_depth_exceeded_audit(
445            conn,
446            &input.agent_id,
447            new_depth_u32,
448            cap,
449            &target_namespace,
450            &input.source_ids,
451            &input.title,
452            peer_origin.as_deref(),
453        );
454        return Err(ReflectError::DepthExceeded {
455            attempted: new_depth_u32,
456            cap,
457            namespace: target_namespace,
458        });
459    }
460
461    // ─── 6. Atomic insert + N links inside a single transaction ─────
462    // Build the system-generated reflection_metadata block. The caller-
463    // supplied object wins on key collisions — if `reflection_metadata`
464    // is already set, we leave it alone.
465    let now = Utc::now().to_rfc3339();
466    let mut metadata = match input.metadata.clone() {
467        serde_json::Value::Object(map) => map,
468        _ => serde_json::Map::new(),
469    };
470    // Always stamp agent_id (the resolver already validated it).
471    metadata.insert(
472        "agent_id".to_string(),
473        serde_json::Value::String(input.agent_id.clone()),
474    );
475    // Splice reflection_metadata only when the caller didn't pre-set it.
476    if !metadata.contains_key(field_names::REFLECTION_METADATA) {
477        let reflection_meta = serde_json::json!({
478            "reflected_on_source_ids": input.source_ids,
479            (field_names::REFLECTION_DEPTH): new_depth_i32,
480            "reflection_created_at": now,
481        });
482        metadata.insert(
483            field_names::REFLECTION_METADATA.to_string(),
484            reflection_meta,
485        );
486    }
487    let metadata_value = serde_json::Value::Object(metadata);
488    // Re-validate the merged metadata so an oversized splice surfaces
489    // here (vs. a confusing DB constraint error later).
490    validate::validate_metadata(&metadata_value)
491        .map_err(|e| ReflectError::Validation(e.to_string()))?;
492
493    let new_mem = Memory {
494        id: uuid::Uuid::new_v4().to_string(),
495        tier: input.tier.clone(),
496        namespace: target_namespace.clone(),
497        title: input.title.clone(),
498        content: input.content.clone(),
499        tags: input.tags.clone(),
500        priority: input.priority.clamp(1, 10),
501        confidence: input.confidence.clamp(0.0, 1.0),
502        source: input.source.clone(),
503        access_count: 0,
504        created_at: now.clone(),
505        updated_at: now.clone(),
506        last_accessed_at: None,
507        expires_at: None,
508        metadata: metadata_value,
509        reflection_depth: new_depth_i32,
510        // L1-1: reflection memories are always typed as Reflection,
511        // regardless of what the caller passes in metadata.type (the
512        // back-compat path). This is the first-class typed counterpart
513        // to the metadata.type = 'reflection' splice above.
514        memory_kind: MemoryKind::Reflection,
515        entity_id: None,
516        persona_version: None,
517        citations: Vec::new(),
518        source_uri: None,
519        source_span: None,
520        confidence_source: ConfidenceSource::CallerProvided,
521        confidence_signals: None,
522        confidence_decayed_at: None,
523        version: 1,
524    };
525
526    // Atomic boundary: insert the reflection row + N `reflects_on`
527    // links inside a single BEGIN IMMEDIATE ... COMMIT block. If any
528    // link insert fails, ROLLBACK undoes the reflection row too.
529    // Matches the `consolidate` pattern earlier in this file.
530    conn.execute_batch(super::connection::SQL_BEGIN_IMMEDIATE)
531        .map_err(|e| ReflectError::Database(e.to_string()))?;
532
533    let txn_result = (|| -> std::result::Result<String, ReflectError> {
534        // v0.7.0 fix campaign R1-M3 (#690) — substrate-side reflections
535        // must NOT silently merge into an existing (title, namespace).
536        // If a row with the same title is already present in the
537        // reflection's namespace, the caller asked us to land a
538        // duplicate; that's a deduplication risk we surface as a
539        // validation error rather than smashing the existing row.
540        let actual_id = insert_with_conflict(conn, &new_mem, ConflictMode::Error).map_err(|e| {
541            if e.downcast_ref::<crate::storage::ConflictError>().is_some() {
542                ReflectError::Validation(format!(
543                    "reflection title collides with an existing memory in the same namespace: {e}"
544                ))
545            } else {
546                ReflectError::Database(e.to_string())
547            }
548        })?;
549        // Self-link rejection lives in `validate_link`; a self-link
550        // (source id appearing in the source list) would only happen
551        // via caller error, but we still surface it as a validation
552        // failure with the txn rolled back so the reflection never
553        // lands.
554        for src_id in &input.source_ids {
555            validate::validate_link(
556                &actual_id,
557                src_id,
558                crate::models::MemoryLinkRelation::ReflectsOn.as_str(),
559            )
560            .map_err(|e| ReflectError::Validation(e.to_string()))?;
561            // Issue #815 — the pre-#815 path called `create_link` here,
562            // which always produced `attest_level='unsigned'` rows for
563            // every reflects_on edge regardless of whether the caller
564            // had loaded a daemon keypair. Route through the signed
565            // helper instead so the keypair threaded through the
566            // hook bundle (MCP-tier handler, curator daemon) reaches
567            // the link insert and the edges land as `self_signed`
568            // with a 64-byte Ed25519 signature. Callers that pass
569            // `active_keypair: None` (the `reflect()` shim, the
570            // auto-export hook constructor's no-keypair test paths)
571            // get the previous unsigned behaviour — `create_link_signed`
572            // matches `create_link`'s output when the keypair is
573            // absent (verified by the existing
574            // `create_link_signed_without_keypair_is_unsigned` test in
575            // `src/storage/mod.rs`).
576            create_link_signed(
577                conn,
578                &actual_id,
579                src_id,
580                crate::models::MemoryLinkRelation::ReflectsOn.as_str(),
581                hooks.active_keypair,
582            )
583            .map_err(|e| ReflectError::Database(e.to_string()))?;
584        }
585        Ok(actual_id)
586    })();
587
588    match txn_result {
589        Ok(actual_id) => {
590            conn.execute_batch(super::connection::SQL_COMMIT)
591                .map_err(|e| ReflectError::Database(e.to_string()))?;
592            let outcome = ReflectOutcome {
593                id: actual_id,
594                reflection_depth: new_depth_i32,
595                reflects_on: input.source_ids.clone(),
596                namespace: target_namespace,
597            };
598            // ─── 7. `post_reflect` hook (v0.7.0 Task 6/8) ───────────
599            //
600            // Fires AFTER the transaction commits so the hook handler
601            // can read the new reflection memory + its `reflects_on`
602            // links via the same connection. Notify-class — the
603            // return value is ignored beyond logging (post-commit
604            // events cannot veto a side-effect that already
605            // happened).
606            if let Some(post) = hooks.post_reflect.as_ref() {
607                (post)(&outcome);
608            }
609            Ok(outcome)
610        }
611        Err(e) => {
612            if let Err(rb) = conn.execute_batch(super::connection::SQL_ROLLBACK) {
613                tracing::error!("ROLLBACK failed in reflect: {}", rb);
614            }
615            Err(e)
616        }
617    }
618}
619
620/// v0.7.0 recursive-learning Task 5/8 — canonical-CBOR encoding of the
621/// `reflection.depth_exceeded` audit payload.
622///
623/// Mirrors the deterministic encoding contract used by
624/// [`crate::identity::sign::canonical_cbor`] — map keys sorted
625/// lexicographically (`BTreeMap` iteration order), `Option::None`
626/// encoded as `Null`, integers in shortest-form. The same payload
627/// hashes to the same bytes on every host so a downstream auditor can
628/// re-derive the `payload_hash` from the four structured fields below.
629///
630/// Note that we deliberately do NOT include the rejected reflection's
631/// `content` body in the payload — that would balloon the audit row
632/// (and risk leaking PII into the chain). Title + source ids is the
633/// provenance hook; the body is not the audit's job.
634///
635/// v0.7.0 L2-2 — when `peer_origin` is `Some`, the encoded payload
636/// includes a `peer_origin` field naming the federation peer that
637/// delivered the imported source memory whose depth drove the cap
638/// breach. When `None` (purely local-source refusal) the field is
639/// omitted so existing-row payload hashes are unchanged on the
640/// pre-L2-2 codepath. The conditional-inclusion-vs-`Null` distinction
641/// matters: a presence-encoded `Null` would silently mutate every
642/// pre-L2-2 hash on every host the moment L2-2 ships, even where no
643/// federation is configured.
644///
645/// # Errors
646///
647/// Returns the underlying CBOR encoder error if encoding fails — in
648/// practice unreachable for the fixed-shape input above, surfaced as
649/// a `Result` so callers don't have to choose between panicking and
650/// silently logging an incomplete payload.
651pub fn canonical_cbor_reflection_depth_exceeded(
652    agent_id: &str,
653    attempted: u32,
654    cap: u32,
655    namespace: &str,
656    source_ids: &[String],
657    proposed_title: &str,
658    created_at: &str,
659    peer_origin: Option<&str>,
660) -> anyhow::Result<Vec<u8>> {
661    use std::collections::BTreeMap;
662    let mut map: BTreeMap<&str, ciborium::Value> = BTreeMap::new();
663    map.insert("agent_id", ciborium::Value::Text(agent_id.to_string()));
664    map.insert("attempted", ciborium::Value::Integer(attempted.into()));
665    map.insert("cap", ciborium::Value::Integer(cap.into()));
666    map.insert(
667        field_names::CREATED_AT,
668        ciborium::Value::Text(created_at.to_string()),
669    );
670    map.insert("namespace", ciborium::Value::Text(namespace.to_string()));
671    // v0.7.0 L2-2 — conditional inclusion preserves pre-L2-2 payload
672    // hashes on the purely-local refusal path (no `peer_origin` key
673    // present at all in the encoded map). Cross-peer refusals carry the
674    // peer claim as a tamper-evident structured field.
675    if let Some(peer) = peer_origin {
676        map.insert(
677            field_names::PEER_ORIGIN,
678            ciborium::Value::Text(peer.to_string()),
679        );
680    }
681    map.insert(
682        "proposed_title",
683        ciborium::Value::Text(proposed_title.to_string()),
684    );
685    map.insert(
686        field_names::SOURCE_IDS,
687        ciborium::Value::Array(
688            source_ids
689                .iter()
690                .map(|s| ciborium::Value::Text(s.clone()))
691                .collect(),
692        ),
693    );
694    let entries: Vec<(ciborium::Value, ciborium::Value)> = map
695        .into_iter()
696        .map(|(k, v)| (ciborium::Value::Text(k.to_string()), v))
697        .collect();
698    let value = ciborium::Value::Map(entries);
699    let mut out: Vec<u8> = Vec::with_capacity(256);
700    ciborium::ser::into_writer(&value, &mut out)
701        .context("CBOR encode reflection_depth_exceeded audit payload")?;
702    Ok(out)
703}
704
705/// v0.7.0 recursive-learning Task 5/8 — append a
706/// `reflection.depth_exceeded` row to `signed_events` for an in-flight
707/// cap refusal.
708///
709/// Mirrors the [`invalidate_link`] audit-emit pattern: best-effort —
710/// audit-write failure is logged via `tracing::warn!(target:
711/// "signed_events", ...)` but does NOT crater the refusal path. The
712/// refusal still propagates to the caller regardless of audit-write
713/// success, because (a) the refusal already happened and (b) crashing
714/// the legitimate caller for a substrate problem they cannot fix would
715/// be worse than a missed audit row.
716///
717/// `attest_level` is `"unsigned"` because the substrate emits this row
718/// itself (the caller did not sign it with their keypair). The
719/// `signature` column is `None`. The `payload_hash` is SHA-256 over
720/// the canonical-CBOR encoding of the structured fields, so a future
721/// auditor can re-derive the same hash from any honest source of the
722/// same fields.
723pub(crate) fn emit_reflection_depth_exceeded_audit(
724    conn: &Connection,
725    agent_id: &str,
726    attempted: u32,
727    cap: u32,
728    namespace: &str,
729    source_ids: &[String],
730    proposed_title: &str,
731    peer_origin: Option<&str>,
732) {
733    let created_at = Utc::now().to_rfc3339();
734    let cbor = match canonical_cbor_reflection_depth_exceeded(
735        agent_id,
736        attempted,
737        cap,
738        namespace,
739        source_ids,
740        proposed_title,
741        &created_at,
742        peer_origin,
743    ) {
744        Ok(b) => b,
745        Err(e) => {
746            tracing::warn!(
747                target: crate::signed_events::SIGNED_EVENTS_TRACE_TARGET,
748                agent_id, attempted, cap, namespace,
749                "failed to encode canonical CBOR for reflection_depth_exceeded audit: {e}"
750            );
751            return;
752        }
753    };
754    // v0.7.0 L2-2 — distinguish the audit row's `event_type` so
755    // operators (and downstream tooling) can filter the cross-peer
756    // refusal stream from the local-only stream without re-decoding
757    // the CBOR payload. The two-variant `event_type` does not change
758    // the audit-chain contract: payload_hash + signature + timestamp
759    // semantics remain identical; only the textual label differs.
760    let event_type = if peer_origin.is_some() {
761        "reflection.depth_exceeded.cross_peer"
762    } else {
763        "reflection.depth_exceeded"
764    };
765    let event = crate::signed_events::SignedEvent {
766        id: uuid::Uuid::new_v4().to_string(),
767        agent_id: agent_id.to_string(),
768        event_type: event_type.to_string(),
769        payload_hash: crate::signed_events::payload_hash(&cbor),
770        signature: None,
771        attest_level: crate::models::AttestLevel::Unsigned.as_str().to_string(),
772        timestamp: created_at,
773        ..crate::signed_events::SignedEvent::default()
774    };
775    if let Err(e) = crate::signed_events::append_signed_event(conn, &event) {
776        tracing::warn!(
777            target: crate::signed_events::SIGNED_EVENTS_TRACE_TARGET,
778            agent_id, attempted, cap, namespace,
779            "failed to append reflection_depth_exceeded audit row: {e}"
780        );
781    }
782}
783
#[cfg(test)]
mod l2_2_audit_tests {
    //! v0.7.0 L2-2 — lib-level unit tests pinning the cross-peer
    //! audit payload encoder. The end-to-end three-peer choreography
    //! lives in `tests/federation_reflection_replication.rs`; here we
    //! pin the structural-encoding invariants without touching the
    //! database substrate, so the lib's `payload_hash` contract is
    //! covered even when the integration test binary is excluded.

    use super::canonical_cbor_reflection_depth_exceeded;

    /// Byte offset of the first occurrence of `needle` in `haystack`.
    ///
    /// CBOR text strings carry their UTF-8 bytes verbatim after the
    /// length header, so a map key can be located in the encoded
    /// output without pulling in a CBOR decoder.
    fn find_bytes(haystack: &[u8], needle: &[u8]) -> Option<usize> {
        if needle.is_empty() {
            // `windows(0)` panics; an empty needle trivially matches at 0.
            return Some(0);
        }
        haystack.windows(needle.len()).position(|w| w == needle)
    }

    /// `peer_origin = None` and `peer_origin = Some(_)` MUST encode to
    /// different byte sequences. This is the load-bearing invariant:
    /// if both encoded identically, the audit row's payload_hash
    /// wouldn't actually bind the cross-peer claim, and a tampered
    /// `event_type` could orphan the structured field.
    #[test]
    fn peer_origin_some_vs_none_yields_distinct_bytes() {
        let base = (
            "ai:test",
            3_u32,
            2_u32,
            "ns/l2-2",
            vec!["src-1".to_string()],
            "title",
            "2026-05-13T00:00:00+00:00",
        );
        let local = canonical_cbor_reflection_depth_exceeded(
            base.0, base.1, base.2, base.3, &base.4, base.5, base.6, None,
        )
        .expect("encode None");
        let cross = canonical_cbor_reflection_depth_exceeded(
            base.0,
            base.1,
            base.2,
            base.3,
            &base.4,
            base.5,
            base.6,
            Some("ai:peer-x"),
        )
        .expect("encode Some");
        assert_ne!(local, cross, "peer_origin claim must be byte-load-bearing");
        // Two different peer_origin claims also yield different bytes.
        let cross_y = canonical_cbor_reflection_depth_exceeded(
            base.0,
            base.1,
            base.2,
            base.3,
            &base.4,
            base.5,
            base.6,
            Some("ai:peer-y"),
        )
        .expect("encode Some(other)");
        assert_ne!(
            cross, cross_y,
            "swapping the peer_origin string must change the bytes"
        );
    }

    /// The encoder is deterministic — two encodes of the same Some
    /// peer_origin produce the same bytes. Mirrors the
    /// `canonical_cbor_is_deterministic_across_encodes` invariant on
    /// the local-only encoder.
    #[test]
    fn cross_peer_encoding_is_deterministic() {
        let a = canonical_cbor_reflection_depth_exceeded(
            "ai:a",
            7,
            3,
            "ns",
            &["s1".to_string(), "s2".to_string()],
            "t",
            "2026-05-13T00:00:00+00:00",
            Some("peer-A"),
        )
        .expect("encode 1");
        let b = canonical_cbor_reflection_depth_exceeded(
            "ai:a",
            7,
            3,
            "ns",
            &["s1".to_string(), "s2".to_string()],
            "t",
            "2026-05-13T00:00:00+00:00",
            Some("peer-A"),
        )
        .expect("encode 2");
        assert_eq!(a, b, "cross-peer encoding must be byte-stable");
    }

    /// The encoded map's key ordering is lexicographic — `peer_origin`
    /// sorts between `namespace` and `proposed_title` in the canonical
    /// `BTreeMap`. There is no CBOR decoder on this test path, so the
    /// ordering is pinned by locating each key's raw UTF-8 bytes in
    /// the encoded output and comparing their offsets.
    ///
    /// The test also pins the surrounding invariants:
    /// * encoding is byte-stable both without and with `peer_origin`
    ///   (each case is encoded twice and compared);
    /// * the `peer_origin` key is absent from the local-only payload
    ///   (conditional inclusion, not a presence-encoded `Null`);
    /// * supplying `peer_origin` changes the bytes.
    #[test]
    fn key_ordering_is_lexicographic_via_btreemap() {
        let encode = |peer: Option<&str>| {
            canonical_cbor_reflection_depth_exceeded(
                "ai:test",
                4,
                3,
                "ns",
                &["s1".to_string()],
                "title",
                "2026-05-13T00:00:00+00:00",
                peer,
            )
            .expect("encode")
        };

        // Pair 1: local-only payload, encoded twice.
        let no_peer = encode(None);
        let no_peer2 = encode(None);
        assert_eq!(no_peer, no_peer2, "local-only encoding must be byte-stable");

        // Pair 2: cross-peer payload, encoded twice.
        let with_peer = encode(Some("ai:peer-x"));
        let with_peer2 = encode(Some("ai:peer-x"));
        assert_eq!(
            with_peer, with_peer2,
            "cross-peer encoding must be byte-stable"
        );

        assert_ne!(
            no_peer, with_peer,
            "adding peer_origin must change the encoded bytes"
        );

        let peer_key = crate::models::field_names::PEER_ORIGIN.as_bytes();
        assert!(
            find_bytes(&no_peer, peer_key).is_none(),
            "local-only payload must omit the peer_origin key entirely"
        );

        let namespace_at =
            find_bytes(&with_peer, b"namespace").expect("namespace key present");
        let peer_at = find_bytes(&with_peer, peer_key).expect("peer_origin key present");
        let title_at =
            find_bytes(&with_peer, b"proposed_title").expect("proposed_title key present");
        assert!(
            namespace_at < peer_at && peer_at < title_at,
            "peer_origin must be encoded between namespace and proposed_title"
        );
    }
}