Skip to main content

ai_memory/curator/
reflection_pass.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! Reflection-pass curator mode — v0.7.0 Layer 2 Task L2-1 (issue #666).
5//!
6//! Implements [`ReflectionPass`], a [`CompactionPass`] that clusters
7//! Observation memories by recall co-occurrence + namespace + temporal
8//! proximity, asks an LLM to summarise the pattern, and persists the
9//! summary as a typed Reflection memory via the substrate
10//! [`crate::storage::reflect_with_hooks`] path — so every reflection
11//! lands with a `reflects_on` edge to every source, the
12//! `metadata.reflection_metadata` block stamped, and (via the
13//! atomic-write contract) inside a single `BEGIN IMMEDIATE` /
14//! `COMMIT` transaction.
15//!
16//! # Why a fresh `CompactionPass` (and not the v0.6.x consolidate path)
17//!
18//! Consolidation collapses near-duplicate memories into a single
19//! canonical body, soft-deleting the originals. **Reflection is
20//! additive** — the sources remain readable, and the new memory is a
21//! typed `Reflection` carrying provenance edges back. That difference
22//! shows up in three places in the impl:
23//!
24//! 1. `persist()` writes via [`crate::storage::reflect`], not via
25//!    `db::consolidate`. The substrate handles the depth cap, the
26//!    `reflects_on` link insert, and the atomic boundary.
27//! 2. `eligible()` requires every cluster member to be
28//!    [`crate::models::MemoryKind::Observation`]. Reflections never
29//!    fold into a parent reflection in this pass (the L2-1 acceptance
30//!    is one level of reflection at a time; multi-level chains form
31//!    naturally across passes if `max_depth >= 2`).
32//! 3. `cluster()` uses a hybrid signal — Jaccard pre-filter +
33//!    optional cosine — but constrains pairs to memories that have
34//!    been recalled together (`access_count >= 1`) within a sliding
35//!    7-day window. This is the "recall co-occurrence" proxy
36//!    documented in #666: we cannot directly observe recall
37//!    co-occurrence without a recall-event log (out of scope here),
38//!    so we use the substrate-visible signals — `access_count`,
39//!    `last_accessed_at`, `created_at` proximity — that approximate
40//!    it within the bounds of one SQLite read.
41//!
42//! # Visibility contract (R7)
43//!
44//! All items are at most `pub(crate)`. The only externally-visible
45//! re-export is the [`ReflectionPassConfig`] struct that the CLI
46//! flag wiring (see `src/cli/curator.rs`) consumes, plus
47//! [`run_reflection_pass`] which the CLI's `--reflect` mode invokes.
48
49// The pass internals run only through the SAL-gated curator path; in a
50// non-sal build only the config/report structs are live, so relax the
51// dead-code / unused-import lints there only (sal builds enforce fully).
52#![cfg_attr(not(feature = "sal"), allow(dead_code, unused_imports))]
53
54use std::collections::HashSet;
55
56use chrono::{DateTime, Utc};
57use serde::{Deserialize, Serialize};
58
59#[cfg(feature = "sal")]
60use anyhow::Context;
61use anyhow::Result;
62
63#[cfg(feature = "sal")]
64use crate::autonomy::AutonomyLlm;
65#[cfg(feature = "sal")]
66use crate::identity::keypair::AgentKeypair;
67use crate::models::{Memory, MemoryKind, Tier};
68#[cfg(feature = "sal")]
69use crate::storage::reflect::{ReflectError, ReflectInput};
70#[cfg(feature = "sal")]
71use crate::store::{CallerContext, Filter, MemoryStore, StoreError};
72
73#[cfg(feature = "sal")]
74use super::pipeline::MemoryId;
75
76#[cfg(any(feature = "sal", test))]
77use crate::models::ConfidenceSource;
78
/// Look up one memory by `id` via the SAL [`MemoryStore`] trait and
/// return it in `Option` form.
///
/// * `Ok(Some(_))` — the store returned the row.
/// * `Ok(None)` — the store answered [`StoreError::NotFound`]; this keeps
///   the `Option`-shaped contract the pre-#1548 `db::get` free function
///   offered to its call sites.
/// * `Err(_)` — every other [`StoreError`], wrapped as an `anyhow` error.
///
/// The caller supplies `ctx`; the curator builds it with
/// [`curator_caller_context`] so the background sweep is not subject to
/// the scope=private visibility filter (matching the pre-trait
/// raw-connection behaviour).
#[cfg(feature = "sal")]
async fn store_get_opt(
    store: &dyn MemoryStore,
    ctx: &CallerContext,
    id: &str,
) -> Result<Option<Memory>> {
    let fetched = store.get(ctx, id).await;
    // A missing row is an expected outcome, not a failure.
    if matches!(fetched, Err(StoreError::NotFound { .. })) {
        return Ok(None);
    }
    fetched.map(Some).map_err(|e| anyhow::anyhow!(e))
}
100
/// Build the curator's operator-class caller context. The reflection
/// pass is a background substrate-maintenance sweep, not a tenant-facing
/// request, so it reads every row regardless of `metadata.scope` —
/// exactly the posture the pre-#1548 raw-`rusqlite::Connection` path had
/// (the legacy `db::*` free functions applied no SAL visibility filter).
/// `agent_id` carries the curator's resolved signing identity so audit
/// trails attribute the sweep correctly.
///
/// This is a thin wrapper over [`CallerContext::for_admin`]; the
/// visibility-bypass behaviour described above is whatever that
/// constructor provides (NOTE(review): confirm against `CallerContext`).
#[cfg(feature = "sal")]
fn curator_caller_context(agent_id: &str) -> CallerContext {
    CallerContext::for_admin(agent_id)
}
112
/// Fetch at most `limit` memories belonging to `namespace` through the
/// backend-agnostic [`MemoryStore::list`] surface.
///
/// This stands in for the pre-#1548
/// `db::list(conn, Some(ns), None, limit, 0, …)` free-function call. Only
/// the namespace and limit dimensions are populated on the [`Filter`];
/// every other field is left at its `Default` value, mirroring the
/// `0` / `None` arguments the legacy positional call always passed.
///
/// Any store error is converted into an `anyhow` error.
#[cfg(feature = "sal")]
async fn store_list_namespace(
    store: &dyn MemoryStore,
    ctx: &CallerContext,
    namespace: &str,
    limit: usize,
) -> Result<Vec<Memory>> {
    let listed = store
        .list(
            ctx,
            &Filter {
                namespace: Some(namespace.to_string()),
                limit,
                ..Default::default()
            },
        )
        .await;
    match listed {
        Ok(rows) => Ok(rows),
        Err(e) => Err(anyhow::anyhow!(e)),
    }
}
137
138// ---------------------------------------------------------------------------
139// Constants — per #666 spec ("≥3 members", "7-day temporal window", …)
140// ---------------------------------------------------------------------------
141
/// Minimum members per reflection cluster. Below this the eligibility
/// gate refuses — a "pattern" derived from two observations is just a
/// pair, not a generalisation.
///
/// Enforced in three places: `cluster()` discards smaller groups,
/// `eligible()` rejects them, and `summarize()` bails on them.
pub(crate) const MIN_CLUSTER_SIZE: usize = 3;

/// Maximum members per reflection cluster — prevents pathological
/// mega-merges where every observation in a namespace folds into one
/// reflection.
///
/// Also sizes the per-namespace candidate fetch in the run driver
/// (`MAX_CLUSTER_SIZE * 16` rows).
pub(crate) const MAX_CLUSTER_SIZE: usize = 12;

/// Sliding window for temporal co-occurrence. Two observations within
/// this many days of each other (by `created_at`) and both in the
/// same namespace are candidates for clustering. 7 days matches the
/// spec in #666 ("temporal_proximity: 7-day window").
pub(crate) const TEMPORAL_WINDOW_DAYS: i64 = 7;

/// Jaccard-keyword similarity threshold for the cheap pre-filter that
/// gates pairs into the cluster. Looser than the consolidation
/// threshold (0.55) because reflection looks for *related* — not
/// near-duplicate — observations.
pub(crate) const REFLECTION_JACCARD_THRESHOLD: f64 = 0.30;

/// Minimum `access_count` for an observation to qualify as
/// "co-recalled". Substrate proxy for the spec's "recall
/// co-occurrence frequency" signal — without a per-recall event log
/// we approximate via touch-count on the source row, which the recall
/// pipeline bumps on every hit.
///
/// `eligible()` re-checks this bound on every cluster member.
pub(crate) const MIN_RECALL_COUNT: i64 = 1;
170
171// ---------------------------------------------------------------------------
172// ReflectionPassConfig — per-namespace opt-in (defaults to `enabled = false`)
173// ---------------------------------------------------------------------------
174
175/// Per-namespace configuration for the reflection pass.
176///
177/// Defaults to `enabled = false` per #666 acceptance: reflection is
178/// opt-in because (a) it depends on the Ollama LLM being available
179/// at the time the pass runs, and (b) it writes new (typed) memories
180/// to the namespace, which operators may want to gate by namespace
181/// rather than enable globally.
182#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
183pub struct ReflectionPassConfig {
184    /// When `false` (default), the pass skips this namespace entirely.
185    #[serde(default)]
186    pub enabled: bool,
187    /// Per-namespace override of the operator-supplied `--max-depth`
188    /// flag. When `None`, the pass uses the resolved governance-policy
189    /// `max_reflection_depth` (default `3`) as its ceiling. When
190    /// `Some(N)`, the pass refuses to *propose* a reflection whose
191    /// new depth would exceed `N` (the substrate cap still applies
192    /// on top — this is a curator-side guard rail, not a substrate
193    /// override).
194    #[serde(default, skip_serializing_if = "Option::is_none")]
195    pub max_depth: Option<u32>,
196}
197
198impl Default for ReflectionPassConfig {
199    fn default() -> Self {
200        Self {
201            enabled: false,
202            max_depth: None,
203        }
204    }
205}
206
207// ---------------------------------------------------------------------------
208// ReflectionPass — SAL-backed reflection synthesis (issue #1548)
209// ---------------------------------------------------------------------------
210
/// Compaction pass that synthesises typed `Reflection` memories from
/// clusters of co-occurring Observations.
///
/// Operates over the SAL [`MemoryStore`] trait (issue #1548) so it runs
/// against the SQLite *or* Postgres adapter; gated on the `sal` feature
/// (the trait itself is `sal`-gated). Wired into the curator's
/// `--reflect` CLI mode via [`run_reflection_pass`] and the
/// `--store-url --daemon` upkeep sweep; not yet wired into the autonomy
/// loop's per-cycle sweep (that's a v0.7.1+ pivot once the operator has
/// had a chance to run the pass manually and vet the proposed
/// reflections).
///
/// The struct only borrows its collaborators (`store`, `llm`, `keypair`)
/// for the lifetime `'a`; it owns just the caller context and the two
/// scalar knobs.
#[cfg(feature = "sal")]
pub(crate) struct ReflectionPass<'a> {
    /// SAL store handle. Reads through [`MemoryStore::list`] /
    /// [`MemoryStore::get_links_for_anchor`]; writes through
    /// [`MemoryStore::reflect`]. Works against the SQLite *or* Postgres
    /// adapter (issue #1548) — the pass is backend-agnostic.
    pub(crate) store: &'a dyn MemoryStore,
    /// Operator-class caller context (see [`curator_caller_context`]).
    /// Carries the curator's `agent_id` and `bypass_visibility` so the
    /// background sweep reads every row regardless of `metadata.scope`,
    /// matching the pre-trait raw-connection behaviour.
    pub(crate) ctx: CallerContext,
    /// LLM trait object. Tests inject a deterministic stub; production
    /// passes an `&OllamaClient` (which implements [`AutonomyLlm`]).
    pub(crate) llm: &'a dyn AutonomyLlm,
    /// Curator's signing keypair. Stamped into the reflection's
    /// `metadata.agent_id` so every `reflects_on` edge is attributable
    /// to the same Ed25519 identity. `None` only in tests that exercise
    /// the no-keypair fallback; production callers must pass `Some(_)`.
    pub(crate) keypair: Option<&'a AgentKeypair>,
    /// Curator-side cap on proposed reflection depth. Belt-and-braces
    /// guard on top of the substrate's per-namespace
    /// `max_reflection_depth` policy: even when the substrate would
    /// allow the write, the curator refuses if `max_depth` is set
    /// and the proposed depth exceeds it. `None` defers entirely to
    /// substrate policy.
    pub(crate) max_depth: Option<u32>,
    /// Suppress every DB write. When `true`, `persist()` returns
    /// `Ok(())` without calling [`MemoryStore::reflect`] and the
    /// reflection memory is reported as a proposal in the
    /// [`ReflectionPassReport`].
    pub(crate) dry_run: bool,
}
255
256#[cfg(feature = "sal")]
257impl<'a> ReflectionPass<'a> {
258    /// Construct a `ReflectionPass`. `keypair` is the curator's
259    /// signing identity — used for `metadata.agent_id` and for
260    /// `verify()`'s signature-trace check. `max_depth` is the optional
261    /// curator-side ceiling; `dry_run` suppresses writes.
262    pub(crate) fn new(
263        store: &'a dyn MemoryStore,
264        llm: &'a dyn AutonomyLlm,
265        keypair: Option<&'a AgentKeypair>,
266        max_depth: Option<u32>,
267        dry_run: bool,
268    ) -> Self {
269        let agent_id = keypair.map_or_else(
270            || crate::identity::sentinels::AI_CURATOR.to_string(),
271            |k| k.agent_id.clone(),
272        );
273        Self {
274            store,
275            ctx: curator_caller_context(&agent_id),
276            llm,
277            keypair,
278            max_depth,
279            dry_run,
280        }
281    }
282
283    /// Resolve the agent id stamped on every reflection this pass
284    /// writes. Falls back to `"ai:curator"` when the curator was
285    /// started without a keypair — the same fall-back the autonomy
286    /// `consolidate` path uses, kept consistent so a forensic walk of
287    /// `metadata.agent_id` finds curator-written rows under either
288    /// tag.
289    fn agent_id(&self) -> String {
290        self.keypair.map_or_else(
291            || crate::identity::sentinels::AI_CURATOR.to_string(),
292            |k| k.agent_id.clone(),
293        )
294    }
295
    /// Human-readable pass name used in log messages and reports.
    /// Currently exercised by the unit tests; the daemon log lines use
    /// the report fields directly.
    ///
    /// Always returns the literal `"reflection"`.
    // `dead_code` is allowed because, per the note above, only the unit
    // tests call this today.
    #[allow(dead_code)]
    fn name(&self) -> &str {
        "reflection"
    }
303
304    /// Partition `memories` into clusters of co-occurring Observations.
305    ///
306    /// Algorithm (one pass per namespace):
307    ///
308    /// 1. Filter to typed `Observation` memories with `access_count >=
309    ///    MIN_RECALL_COUNT` — substrate proxy for "has been recalled
310    ///    recently enough to count as live".
311    /// 2. Within each namespace, walk pairs and seed a cluster when
312    ///    both: (a) the temporal distance between `created_at` is
313    ///    within [`TEMPORAL_WINDOW_DAYS`], and (b) the Jaccard
314    ///    similarity of contents is ≥ [`REFLECTION_JACCARD_THRESHOLD`].
315    /// 3. Cap each cluster at [`MAX_CLUSTER_SIZE`].
316    /// 4. Discard clusters below [`MIN_CLUSTER_SIZE`] (eligibility
317    ///    enforces this too, but discarding here keeps the API tight).
318    fn cluster(&self, memories: &[Memory]) -> Vec<Vec<MemoryId>> {
319        let mut by_ns: std::collections::HashMap<&str, Vec<&Memory>> =
320            std::collections::HashMap::new();
321        for m in memories {
322            if !is_clusterable_observation(m) {
323                continue;
324            }
325            by_ns.entry(&m.namespace).or_default().push(m);
326        }
327
328        let mut clusters: Vec<Vec<MemoryId>> = Vec::new();
329        for (_ns, group) in by_ns {
330            let mut used = vec![false; group.len()];
331            for i in 0..group.len() {
332                if used[i] {
333                    continue;
334                }
335                let mut cluster = vec![group[i].id.clone()];
336                used[i] = true;
337                for j in (i + 1)..group.len() {
338                    if used[j] {
339                        continue;
340                    }
341                    if cluster.len() >= MAX_CLUSTER_SIZE {
342                        break;
343                    }
344                    if pair_co_occurs(group[i], group[j]) {
345                        cluster.push(group[j].id.clone());
346                        used[j] = true;
347                    }
348                }
349                if cluster.len() >= MIN_CLUSTER_SIZE {
350                    clusters.push(cluster);
351                }
352            }
353        }
354        clusters
355    }
356
357    /// Secondary eligibility gate.
358    ///
359    /// A cluster passes when:
360    ///
361    /// * It has ≥ [`MIN_CLUSTER_SIZE`] and ≤ [`MAX_CLUSTER_SIZE`] members.
362    /// * Every member is `MemoryKind::Observation` — reflections that
363    ///   carry meta-pattern depth should be folded by a separate
364    ///   higher-depth pass, not this one.
365    /// * All members share the same (non-reserved) namespace.
366    /// * Every member is not soft-deleted (the substrate `list` call
367    ///   excludes soft-deleted rows but defensive recheck cheap).
368    fn eligible(&self, cluster: &[Memory]) -> bool {
369        if cluster.len() < MIN_CLUSTER_SIZE || cluster.len() > MAX_CLUSTER_SIZE {
370            return false;
371        }
372        let ns = &cluster[0].namespace;
373        if ns.starts_with('_') {
374            return false;
375        }
376        cluster.iter().all(|m| {
377            m.memory_kind == MemoryKind::Observation
378                && &m.namespace == ns
379                && m.access_count >= MIN_RECALL_COUNT
380        })
381    }
382
383    /// LLM-summarise the cluster into a single proposed Reflection
384    /// memory. Does NOT touch the database — the returned `Memory`
385    /// is a *proposal* that `persist()` (or the dry-run reporter)
386    /// consumes.
387    ///
388    /// The proposal carries:
389    ///
390    /// * Title prefixed with `[reflection]` so an operator inspecting
391    ///   the namespace immediately sees the synthetic origin.
392    /// * `memory_kind = Reflection`. The substrate `reflect` path
393    ///   will set this anyway; we set it here so the in-memory
394    ///   proposal is internally consistent.
395    /// * `reflection_depth` left at 0 — the substrate computes the
396    ///   real depth (`max(source.reflection_depth) + 1`) on insert.
397    /// * Tier = max of source tiers (never downgrade).
398    /// * Priority = max of source priorities (the reflection inherits
399    ///   the salience of its highest-priority source).
400    fn summarize(&self, cluster: &[Memory]) -> Result<Memory> {
401        if cluster.len() < MIN_CLUSTER_SIZE {
402            anyhow::bail!(
403                "summarize: cluster has {} members (< MIN_CLUSTER_SIZE = {})",
404                cluster.len(),
405                MIN_CLUSTER_SIZE
406            );
407        }
408
409        let input: Vec<(String, String)> = cluster
410            .iter()
411            .map(|m| (m.title.clone(), m.content.clone()))
412            .collect();
413        let summary_text = self
414            .llm
415            .summarize_memories(&input)
416            .context("ReflectionPass::summarize: LLM call failed")?;
417
418        let base_title = cluster
419            .iter()
420            .map(|m| m.title.as_str())
421            .next()
422            .unwrap_or("(reflection)");
423        let title = format!("[reflection] {base_title}");
424
425        let tier = cluster
426            .iter()
427            .map(|m| m.tier.clone())
428            .max_by_key(tier_rank)
429            .unwrap_or(Tier::Mid);
430        let priority = cluster.iter().map(|m| m.priority).max().unwrap_or(5);
431
432        let now = Utc::now().to_rfc3339();
433        Ok(Memory {
434            id: uuid::Uuid::new_v4().to_string(),
435            tier,
436            namespace: cluster[0].namespace.clone(),
437            title,
438            content: summary_text,
439            tags: vec![],
440            priority,
441            confidence: 1.0,
442            // Substrate `validate_source` accepts a closed set; "system"
443            // is the curator's canonical entry-point for autonomous
444            // writes (see consolidation pass, autonomy passes).
445            source: "system".to_string(),
446            access_count: 0,
447            created_at: now.clone(),
448            updated_at: now,
449            last_accessed_at: None,
450            expires_at: None,
451            metadata: serde_json::json!({}),
452            reflection_depth: 0,
453            memory_kind: MemoryKind::Reflection,
454            entity_id: None,
455            persona_version: None,
456            citations: Vec::new(),
457            source_uri: None,
458            source_span: None,
459            confidence_source: ConfidenceSource::CallerProvided,
460            confidence_signals: None,
461            confidence_decayed_at: None,
462            version: 1,
463        })
464    }
465
466    /// Persist `summary` plus a `reflects_on` link to each id in
467    /// `sources`, via the substrate [`MemoryStore::reflect`] path.
468    ///
469    /// The substrate enforces (a) input validation, (b) depth-cap
470    /// refusal with audit row, (c) transactional atomicity (all
471    /// `reflects_on` links land or none do), (d) no-cycle guarantee
472    /// inherited from L1-2's `reflects_on` invariant. The SQLite adapter
473    /// routes this through `storage::reflect_with_hooks`; the Postgres
474    /// adapter through its native sqlx `reflect` — both honour the same
475    /// [`ReflectInput`] contract (issue #1548).
476    ///
477    /// No-op when `self.dry_run = true`.
478    async fn persist(&self, summary: &Memory, sources: &[MemoryId]) -> Result<()> {
479        if self.dry_run || sources.is_empty() {
480            return Ok(());
481        }
482
483        // Curator-side max-depth guard. The substrate enforces the
484        // namespace policy cap on top — this is the operator-supplied
485        // belt-and-braces. We need to know the proposed depth before
486        // calling reflect(); pre-compute it the same way the substrate
487        // does (max source depth + 1) so the curator can refuse
488        // *before* burning an LLM round-trip in the next cycle.
489        if let Some(cap) = self.max_depth {
490            let mut max_src_depth: i32 = 0;
491            for id in sources {
492                if let Some(m) = store_get_opt(self.store, &self.ctx, id).await? {
493                    max_src_depth = max_src_depth.max(m.reflection_depth);
494                }
495            }
496            let new_depth =
497                u32::try_from(max_src_depth.max(0).saturating_add(1)).unwrap_or(u32::MAX);
498            if new_depth > cap {
499                anyhow::bail!(
500                    "ReflectionPass::persist: proposed depth {new_depth} exceeds \
501                     curator --max-depth {cap}"
502                );
503            }
504        }
505
506        let input = ReflectInput {
507            source_ids: sources.to_vec(),
508            title: summary.title.clone(),
509            content: summary.content.clone(),
510            namespace: Some(summary.namespace.clone()),
511            tier: summary.tier.clone(),
512            tags: summary.tags.clone(),
513            priority: summary.priority,
514            confidence: summary.confidence,
515            source: summary.source.clone(),
516            agent_id: self.agent_id(),
517            metadata: summary.metadata.clone(),
518        };
519
520        // Issue #815 — thread the curator's signing keypair into the
521        // reflect call so the substrate's signed-link path produces
522        // `attest_level='self_signed'` rows for every `reflects_on` edge
523        // this pass writes. The SAL `reflect(ctx, input, signing_key)`
524        // surface folds the keypair into the adapter's hook bundle (the
525        // SQLite adapter sets `ReflectHooks::active_keypair`; Postgres
526        // signs natively), so the wire shape is byte-identical across
527        // backends.
528        match self.store.reflect(&self.ctx, &input, self.keypair).await {
529            Ok(_outcome) => Ok(()),
530            Err(ReflectError::DepthExceeded {
531                attempted,
532                cap,
533                namespace,
534            }) => {
535                anyhow::bail!(
536                    "ReflectionPass::persist: substrate refused — proposed depth \
537                     {attempted} exceeds namespace cap {cap} in '{namespace}'"
538                )
539            }
540            Err(other) => Err(anyhow::anyhow!(other.to_string())),
541        }
542    }
543
544    /// Verify that the persisted reflection identified by `summary_id`
545    /// is readable, typed as Reflection, and that every `reflects_on`
546    /// edge points at an existing source.
547    ///
548    /// **Signature trace.** We deliberately do NOT call into
549    /// `identity::verify::verify_link` here — H2 link signing fills the
550    /// `signature` BLOB column on outbound writes, and `db::create_link`
551    /// (used by `storage::reflect`) goes through that path when the
552    /// daemon's keypair is wired in. The verify check here confirms
553    /// (a) the edge exists, (b) the target memory is alive, (c) the
554    /// `relation` is exactly `reflects_on`. Cryptographic signature
555    /// re-verification belongs at the federation `sync_push` boundary,
556    /// not the curator's verify step (the curator wrote the row
557    /// itself, so it trivially trusts its own signature).
558    ///
559    /// Exercised by the unit tests; the run driver uses the inlined
560    /// [`verify_recent`] check on the live driver path.
561    #[allow(dead_code)]
562    async fn verify(&self, summary_id: MemoryId) -> Result<()> {
563        let mem = store_get_opt(self.store, &self.ctx, &summary_id)
564            .await
565            .context("ReflectionPass::verify: store.get failed")?;
566        let mem = mem
567            .ok_or_else(|| anyhow::anyhow!("verify: reflection {} not found in DB", summary_id))?;
568        if mem.memory_kind != MemoryKind::Reflection {
569            anyhow::bail!(
570                "verify: memory {} is {:?}, expected Reflection",
571                summary_id,
572                mem.memory_kind
573            );
574        }
575
576        let links = self
577            .store
578            .get_links_for_anchor(&summary_id)
579            .await
580            .map_err(|e| anyhow::anyhow!(e))
581            .context("ReflectionPass::verify: store.get_links_for_anchor failed")?;
582        let mut saw_reflects_on = false;
583        for link in &links {
584            // Only check outbound `reflects_on` edges originated at this
585            // reflection. Inbound edges (other memories that reflect on
586            // ours) are not in this pass's scope.
587            if link.source_id != summary_id {
588                continue;
589            }
590            if link.relation != crate::models::MemoryLinkRelation::ReflectsOn {
591                continue;
592            }
593            saw_reflects_on = true;
594            // Confirm the target exists. Soft-deleted sources are still
595            // returned by store.get because the row is preserved; this is
596            // the same contract `storage::reflect` relies on.
597            let target = store_get_opt(self.store, &self.ctx, &link.target_id).await?;
598            if target.is_none() {
599                anyhow::bail!(
600                    "verify: reflects_on edge target {} not found",
601                    link.target_id
602                );
603            }
604        }
605        if !saw_reflects_on {
606            anyhow::bail!("verify: reflection {} has no reflects_on edge", summary_id);
607        }
608        Ok(())
609    }
610}
611
612// ---------------------------------------------------------------------------
613// Report + run helpers (consumed by CLI --reflect)
614// ---------------------------------------------------------------------------
615
/// Structured outcome of a single reflection-pass invocation.
/// Counters are aggregated across every visited namespace by
/// [`run_reflection_pass`]. All fields start at their `Default` value
/// (empty string / `0` / empty `Vec` / `false`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReflectionPassReport {
    /// RFC3339 timestamps; populated by `run_reflection_pass`.
    pub started_at: String,
    pub completed_at: String,
    /// Number of namespaces visited (`--all-namespaces`) or `1`
    /// when a single `--namespace` was supplied.
    ///
    /// Set from the resolved namespace list before the per-namespace
    /// `enabled_check` is applied, so namespaces skipped as disabled are
    /// still counted here.
    pub namespaces_visited: usize,
    /// Eligible candidate Observations scanned across all visited
    /// namespaces.
    ///
    /// NOTE(review): the driver adds the length of the raw namespace
    /// listing, which is filtered by namespace and limit only — so this
    /// counts every listed row, not just clusterable Observations.
    pub observations_scanned: usize,
    /// Number of clusters formed (pre-eligibility).
    pub clusters_formed: usize,
    /// Number of clusters that survived the eligibility gate.
    pub clusters_eligible: usize,
    /// Number of reflections successfully persisted. Always `0` when
    /// `dry_run = true`.
    pub reflections_persisted: usize,
    /// Number of refused-by-depth-cap clusters (substrate refusal or
    /// curator `--max-depth` guard).
    pub depth_refusals: usize,
    /// LLM call failures, persist errors, and verify errors that
    /// did NOT abort the pass.
    pub errors: Vec<String>,
    /// Dry-run proposals — populated when `dry_run = true`, empty
    /// otherwise. Each entry is `(namespace, proposed_title,
    /// source_ids)`.
    // `serde(default)` lets reports serialised without this field still
    // deserialise (the field falls back to an empty list).
    #[serde(default)]
    pub dry_run_proposals: Vec<DryRunProposal>,
    /// `true` if the pass was a dry-run.
    pub dry_run: bool,
}
650
/// Compact description of a proposed reflection when the pass runs
/// in `--dry-run` mode. Re-serialised into the CLI's JSON output so
/// operators can inspect proposed clusters before committing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DryRunProposal {
    /// Namespace of the proposal — presumably the namespace the
    /// reflection would be written to; confirm against the run driver.
    pub namespace: String,
    /// Title the reflection would carry — presumably the
    /// `[reflection] …` title built by `summarize()`; confirm against
    /// the run driver.
    pub proposed_title: String,
    /// Ids of the source memories in the proposed cluster — presumably
    /// the memories the reflection would link to via `reflects_on`.
    pub source_ids: Vec<String>,
}
660
/// Drive a single reflection-pass invocation over `namespace` (when
/// `Some`) or every observable namespace (when `None`).
///
/// For each visited namespace the pass lists candidates, clusters them,
/// filters the clusters through `eligible`, summarises each eligible
/// cluster, and then either records a `DryRunProposal` (`dry_run`) or
/// persists the reflection and verifies its `reflects_on` edges.
///
/// # Arguments
///
/// * `store`, `llm`, `keypair`, `max_depth`, `dry_run` — forwarded to
///   `ReflectionPass::new`.
/// * `namespace` — restricts the pass to one namespace. With `None`,
///   every namespace returned by `list_namespaces` whose name does not
///   start with `_` is visited.
/// * `enabled_check` — the operator-supplied predicate that consults
///   the per-namespace [`ReflectionPassConfig::enabled`] flag. When it
///   returns `false` the namespace is skipped entirely: it still counts
///   towards `namespaces_visited` but contributes nothing else to the
///   report.
///
/// # Errors
///
/// Returns `Err` only when namespace enumeration fails. Per-namespace
/// and per-cluster failures are appended to `report.errors` (or counted
/// in `report.depth_refusals`) and the pass moves on.
///
/// This is the CLI entry-point — `src/cli/curator.rs` calls into it
/// when the operator passes `--reflect`. The autonomy daemon's
/// per-cycle sweep does NOT call this today (manual-only for v0.7.0
/// per #666 acceptance).
#[cfg(feature = "sal")]
pub async fn run_reflection_pass(
    store: &dyn MemoryStore,
    llm: &dyn AutonomyLlm,
    keypair: Option<&AgentKeypair>,
    namespace: Option<&str>,
    max_depth: Option<u32>,
    dry_run: bool,
    enabled_check: impl Fn(&str) -> bool,
) -> Result<ReflectionPassReport> {
    let mut report = ReflectionPassReport {
        started_at: Utc::now().to_rfc3339(),
        dry_run,
        ..Default::default()
    };

    let pass = ReflectionPass::new(store, llm, keypair, max_depth, dry_run);

    let namespaces: Vec<String> = match namespace {
        Some(ns) => vec![ns.to_string()],
        None => {
            // Enumerate every namespace with at least one row and drop
            // the internal (`_`-prefixed) ones. The operator's
            // `enabled_check` is applied per namespace in the loop below.
            let counts = store
                .list_namespaces()
                .await
                .map_err(|e| anyhow::anyhow!(e))
                .context("run_reflection_pass: list_namespaces failed")?;
            counts
                .into_iter()
                .map(|nc| nc.namespace)
                .filter(|ns| !ns.starts_with('_'))
                .collect()
        }
    };
    report.namespaces_visited = namespaces.len();

    for ns in &namespaces {
        if !enabled_check(ns) {
            continue;
        }

        // Pull candidate Observations from this namespace. Cap at
        // MAX_CLUSTER_SIZE * 16 so a runaway namespace doesn't OOM the
        // pass; the per-namespace load is bounded by the curator's
        // existing batch contract.
        let candidates = match store_list_namespace(
            store,
            &pass.ctx,
            ns.as_str(),
            MAX_CLUSTER_SIZE * 16,
        )
        .await
        {
            Ok(v) => v,
            Err(e) => {
                // `{e:#}` keeps the full cause chain in the report; plain
                // `{e}` would print only the outermost context.
                report
                    .errors
                    .push(format!("namespace '{ns}': store.list failed: {e:#}"));
                continue;
            }
        };
        report.observations_scanned += candidates.len();

        // Stage 1 — cluster.
        let clusters = pass.cluster(&candidates);
        report.clusters_formed += clusters.len();

        // Index candidates by id once so resolving each cluster is a hash
        // lookup instead of a linear scan per id. `or_insert` keeps the
        // first occurrence, matching what a front-to-back `find` returns.
        let mut by_id: std::collections::HashMap<&str, &Memory> =
            std::collections::HashMap::with_capacity(candidates.len());
        for m in &candidates {
            by_id.entry(m.id.as_str()).or_insert(m);
        }

        for cluster_ids in clusters {
            // Resolve cluster ids back to Memory for eligibility check.
            // Ids that are not among the candidates are silently dropped.
            let mut cluster: Vec<Memory> = cluster_ids
                .iter()
                .filter_map(|id| by_id.get(id.as_str()).copied().cloned())
                .collect();

            if !pass.eligible(&cluster) {
                continue;
            }
            report.clusters_eligible += 1;

            // Deterministic ordering so the produced reflection ids are
            // stable across re-runs on the same input (helps debugging).
            cluster.sort_by(|a, b| a.id.cmp(&b.id));

            let summary = match pass.summarize(&cluster) {
                Ok(s) => s,
                Err(e) => {
                    report
                        .errors
                        .push(format!("namespace '{ns}': summarize failed: {e:#}"));
                    continue;
                }
            };

            let source_ids: Vec<String> = cluster.iter().map(|m| m.id.clone()).collect();

            if dry_run {
                // Nothing is written in dry-run mode; the proposal takes
                // ownership of `source_ids` because this branch never
                // reaches the persist step.
                report.dry_run_proposals.push(DryRunProposal {
                    namespace: ns.clone(),
                    proposed_title: summary.title.clone(),
                    source_ids,
                });
                continue;
            }

            match pass.persist(&summary, &source_ids).await {
                Ok(()) => {
                    report.reflections_persisted += 1;
                    // Best-effort verify: a failure here is reported but
                    // does not undo the persisted-reflection count.
                    if let Err(e) = verify_recent(store, &pass.ctx, ns, &source_ids).await {
                        report
                            .errors
                            .push(format!("namespace '{ns}': verify failed: {e:#}"));
                    }
                }
                Err(e) => {
                    // Depth-cap refusals are recognised by their message
                    // text and counted separately from real failures.
                    let msg = e.to_string();
                    if msg.contains("exceeds") && msg.contains("depth") {
                        report.depth_refusals += 1;
                    } else {
                        report
                            .errors
                            .push(format!("namespace '{ns}': persist failed: {e:#}"));
                    }
                }
            }
        }
    }

    report.completed_at = Utc::now().to_rfc3339();
    Ok(report)
}
809
/// Best-effort verify helper used by [`run_reflection_pass`].
///
/// Lists up to 16 memories in `namespace` and succeeds as soon as one
/// of the listed Reflections has a set of outbound `reflects_on`
/// targets equal to `source_ids`.
///
/// # Errors
///
/// Fails when the listing or a link lookup fails, or when none of the
/// listed Reflections carries the expected edge set.
// NOTE(review): whether the 16 listed rows are the most recent ones
// depends on `store_list_namespace`'s ordering — confirm.
#[cfg(feature = "sal")]
async fn verify_recent(
    store: &dyn MemoryStore,
    ctx: &CallerContext,
    namespace: &str,
    source_ids: &[String],
) -> Result<()> {
    let listed = store_list_namespace(store, ctx, namespace, 16)
        .await
        .context("verify_recent: store.list failed")?;
    let expected: HashSet<&str> = source_ids.iter().map(String::as_str).collect();

    for reflection in listed.iter() {
        if reflection.memory_kind != MemoryKind::Reflection {
            continue;
        }

        let links = store
            .get_links_for_anchor(&reflection.id)
            .await
            .map_err(|e| anyhow::anyhow!(e))?;

        // Keep only the edges that leave this reflection with the
        // `ReflectsOn` relation, and collect their targets.
        let reflects_on: HashSet<&str> = links
            .iter()
            .filter_map(|link| {
                let is_outbound_reflects_on = link.source_id == reflection.id
                    && link.relation == crate::models::MemoryLinkRelation::ReflectsOn;
                is_outbound_reflects_on.then(|| link.target_id.as_str())
            })
            .collect();

        if reflects_on == expected {
            return Ok(());
        }
    }

    anyhow::bail!(
        "verify_recent: no Reflection in namespace '{namespace}' carries the \
         expected reflects_on edge set"
    )
}
855
856// ---------------------------------------------------------------------------
857// Internal helpers
858// ---------------------------------------------------------------------------
859
860fn tier_rank(t: &Tier) -> u8 {
861    match t {
862        Tier::Short => 0,
863        Tier::Mid => 1,
864        Tier::Long => 2,
865    }
866}
867
868/// Returns `true` when `m` is a clusterable observation: typed as
869/// Observation, not in an internal namespace, and recalled at least
870/// `MIN_RECALL_COUNT` times. The recall threshold is the substrate
871/// proxy for the spec's "recall co-occurrence frequency" signal.
872fn is_clusterable_observation(m: &Memory) -> bool {
873    m.memory_kind == MemoryKind::Observation
874        && !m.namespace.starts_with('_')
875        && m.access_count >= MIN_RECALL_COUNT
876}
877
878/// Returns `true` when two observations co-occur enough to seed a
879/// reflection cluster: same namespace, created within
880/// [`TEMPORAL_WINDOW_DAYS`] of each other, Jaccard ≥
881/// [`REFLECTION_JACCARD_THRESHOLD`].
882fn pair_co_occurs(a: &Memory, b: &Memory) -> bool {
883    if a.namespace != b.namespace {
884        return false;
885    }
886    if let (Some(ta), Some(tb)) = (parse_rfc3339(&a.created_at), parse_rfc3339(&b.created_at)) {
887        let delta = (ta - tb).num_days().abs();
888        if delta > TEMPORAL_WINDOW_DAYS {
889            return false;
890        }
891    }
892    jaccard_similarity(&a.content, &b.content) >= REFLECTION_JACCARD_THRESHOLD
893}
894
895/// Parse an RFC3339 timestamp into a `DateTime<Utc>`. Returns `None`
896/// on parse failure (the caller treats that as "no temporal signal"
897/// and lets the Jaccard step decide).
898fn parse_rfc3339(s: &str) -> Option<DateTime<Utc>> {
899    DateTime::parse_from_rfc3339(s)
900        .ok()
901        .map(|d| d.with_timezone(&Utc))
902}
903
/// Jaccard similarity of the token sets of `a` and `b`.
///
/// Tokens are the lowercased runs of alphanumeric characters whose
/// byte length (`str::len`) is at least 3. The result is
/// `|A ∩ B| / |A ∪ B|`, or `0.0` when neither input yields a token.
///
/// Kept as a module-local copy (the original note describes it as a
/// mirror of the `consolidate` helper) so the reflection pass has zero
/// runtime dependency on the consolidate module ordering.
fn jaccard_similarity(a: &str, b: &str) -> f64 {
    /// Split `text` on non-alphanumeric characters and keep the
    /// lowercased pieces that are at least 3 bytes long.
    fn token_set(text: &str) -> HashSet<String> {
        let mut set = HashSet::new();
        for word in text.split(|c: char| !c.is_alphanumeric()) {
            if word.len() >= 3 {
                set.insert(word.to_lowercase());
            }
        }
        set
    }

    let left = token_set(a);
    let right = token_set(b);

    let shared = left.intersection(&right).count();
    // |A ∪ B| = |A| + |B| - |A ∩ B|
    let combined = left.len() + right.len() - shared;
    if combined == 0 {
        return 0.0;
    }

    #[allow(clippy::cast_precision_loss)]
    let ratio = shared as f64 / combined as f64;
    ratio
}
930
/// Length of the temporal window, expressed in seconds.
/// Test-only helper for assertions; not used outside this module.
#[cfg(test)]
pub(crate) fn temporal_window_seconds() -> i64 {
    let window = chrono::Duration::days(TEMPORAL_WINDOW_DAYS);
    window.num_seconds()
}
937
938// ---------------------------------------------------------------------------
939// Unit tests
940// ---------------------------------------------------------------------------
941
942// The reflection-pass suite exercises the SAL-gated `ReflectionPass` over a
943// `StubLlm: AutonomyLlm`; both are sal-only, so the whole module is gated
944// (non-sal builds have no ReflectionPass to test).
945#[cfg(all(test, feature = "sal"))]
946mod tests {
947    use super::*;
948    use crate::models::{Memory, MemoryKind, Tier};
949    use anyhow::Result;
950    use chrono::Duration;
951    use std::sync::Mutex;
952
953    // ---- Deterministic stub LLM ------------------------------------------
954
    /// Deterministic stub for `AutonomyLlm`. Tests use this in place of
    /// the production `OllamaClient` so the reflection-pass suite never
    /// touches the network. The stub records every summarize call it
    /// receives so per-test assertions can pin "summarize was called
    /// for cluster N" without inspecting log output.
    pub(super) struct StubLlm {
        /// Canned text returned by every `summarize_memories` call.
        pub(super) summary: String,
        /// Log of received calls, one `summarize:<count>` entry per
        /// `summarize_memories` invocation.
        pub(super) calls: Mutex<Vec<String>>,
    }
964
965    impl StubLlm {
966        pub(super) fn new(summary: &str) -> Self {
967            Self {
968                summary: summary.to_string(),
969                calls: Mutex::new(Vec::new()),
970            }
971        }
972    }
973
974    impl AutonomyLlm for StubLlm {
975        fn auto_tag(&self, _title: &str, _content: &str) -> Result<Vec<String>> {
976            Ok(vec![])
977        }
978        fn detect_contradiction(&self, _a: &str, _b: &str) -> Result<bool> {
979            Ok(false)
980        }
981        fn summarize_memories(&self, memories: &[(String, String)]) -> Result<String> {
982            self.calls
983                .lock()
984                .unwrap()
985                .push(format!("summarize:{}", memories.len()));
986            Ok(self.summary.clone())
987        }
988    }
989
990    // ---- Memory factory --------------------------------------------------
991
992    fn make_obs(id: &str, ns: &str, title: &str, content: &str, access: i64) -> Memory {
993        let now = Utc::now().to_rfc3339();
994        Memory {
995            id: id.to_string(),
996            tier: Tier::Long,
997            namespace: ns.to_string(),
998            title: title.to_string(),
999            content: content.to_string(),
1000            tags: vec![],
1001            priority: 5,
1002            confidence: 1.0,
1003            source: "test".to_string(),
1004            access_count: access,
1005            created_at: now.clone(),
1006            updated_at: now,
1007            last_accessed_at: None,
1008            expires_at: None,
1009            metadata: serde_json::json!({}),
1010            reflection_depth: 0,
1011            memory_kind: MemoryKind::Observation,
1012            entity_id: None,
1013            persona_version: None,
1014            citations: Vec::new(),
1015            source_uri: None,
1016            source_span: None,
1017            confidence_source: ConfidenceSource::CallerProvided,
1018            confidence_signals: None,
1019            confidence_decayed_at: None,
1020            version: 1,
1021        }
1022    }
1023
1024    // ---- Eligibility -----------------------------------------------------
1025
1026    #[test]
1027    fn eligible_rejects_below_min_cluster_size() {
1028        // The cluster-size invariant is checked at the eligibility gate
1029        // — pure data check, no DB / LLM dependency.
1030        let cluster: Vec<Memory> = (0..(MIN_CLUSTER_SIZE - 1))
1031            .map(|i| make_obs(&format!("m{i}"), "app", "t", "kubernetes deploy", 1))
1032            .collect();
1033        let result = cluster.len() >= MIN_CLUSTER_SIZE
1034            && cluster.len() <= MAX_CLUSTER_SIZE
1035            && !cluster[0].namespace.starts_with('_')
1036            && cluster.iter().all(|m| {
1037                m.memory_kind == MemoryKind::Observation
1038                    && m.namespace == cluster[0].namespace
1039                    && m.access_count >= MIN_RECALL_COUNT
1040            });
1041        assert!(!result, "below-MIN cluster must not be eligible");
1042    }
1043
1044    #[test]
1045    fn eligible_rejects_reflection_kind_member() {
1046        // Reflection-on-reflection chains form across passes (sequential
1047        // runs at depth=1 → depth=2). A single-pass cluster that already
1048        // contains a typed Reflection must NOT be eligible — that's the
1049        // job of a follow-up pass, not this one.
1050        let mut cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1051            .map(|i| make_obs(&format!("m{i}"), "app", "t", "kubernetes deploy", 1))
1052            .collect();
1053        cluster[0].memory_kind = MemoryKind::Reflection;
1054        let result = cluster
1055            .iter()
1056            .all(|m| m.memory_kind == MemoryKind::Observation);
1057        assert!(!result, "mixed-kind cluster must not be eligible");
1058    }
1059
1060    #[test]
1061    fn eligible_rejects_internal_namespace() {
1062        let cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1063            .map(|i| make_obs(&format!("m{i}"), "_curator", "t", "kubernetes deploy", 1))
1064            .collect();
1065        let result = !cluster[0].namespace.starts_with('_');
1066        assert!(!result, "internal-namespace cluster must not be eligible");
1067    }
1068
1069    // ---- Clustering ------------------------------------------------------
1070
1071    #[test]
1072    fn cluster_groups_three_co_occurring_observations() {
1073        // Three observations in the same namespace, all with shared
1074        // Jaccard tokens, access_count >= 1 → form a single cluster.
1075        let m1 = make_obs("a", "ns", "t1", "kubernetes rolling deploy strategy", 2);
1076        let m2 = make_obs("b", "ns", "t2", "kubernetes deploy canary strategy", 3);
1077        let m3 = make_obs("c", "ns", "t3", "kubernetes rolling deploy approach", 1);
1078
1079        // We can't construct a real ReflectionPass without a Connection,
1080        // so test the cluster() logic via the standalone pair_co_occurs
1081        // helper plus a manual seeded walk.
1082        let obs = [m1.clone(), m2.clone(), m3.clone()];
1083        let pairs = [
1084            pair_co_occurs(&m1, &m2),
1085            pair_co_occurs(&m1, &m3),
1086            pair_co_occurs(&m2, &m3),
1087        ];
1088        assert!(
1089            pairs.iter().all(|p| *p),
1090            "all three pairs must co-occur, got {pairs:?}"
1091        );
1092        assert_eq!(obs.len(), MIN_CLUSTER_SIZE);
1093    }
1094
1095    #[test]
1096    fn cluster_skips_observations_with_zero_access_count() {
1097        // access_count = 0 → not clusterable. This is the substrate
1098        // proxy for "no recall co-occurrence signal".
1099        let cold = make_obs("cold", "ns", "t", "kubernetes deploy", 0);
1100        assert!(!is_clusterable_observation(&cold));
1101    }
1102
1103    #[test]
1104    fn pair_co_occurs_rejects_cross_namespace() {
1105        let a = make_obs("a", "ns1", "t", "shared content tokens", 1);
1106        let b = make_obs("b", "ns2", "t", "shared content tokens", 1);
1107        assert!(!pair_co_occurs(&a, &b));
1108    }
1109
1110    #[test]
1111    fn pair_co_occurs_respects_temporal_window() {
1112        // Build two memories whose created_at straddle the 7-day
1113        // window. The pair must NOT co-occur.
1114        let mut a = make_obs("a", "ns", "t", "shared content tokens here", 1);
1115        let mut b = make_obs("b", "ns", "t", "shared content tokens here", 1);
1116        let now = Utc::now();
1117        a.created_at = now.to_rfc3339();
1118        b.created_at = (now - Duration::days(TEMPORAL_WINDOW_DAYS + 2)).to_rfc3339();
1119        assert!(
1120            !pair_co_occurs(&a, &b),
1121            "outside-window pair must not co-occur"
1122        );
1123    }
1124
1125    #[test]
1126    fn pair_co_occurs_below_jaccard_threshold_is_false() {
1127        let a = make_obs("a", "ns", "t", "kubernetes deploy strategy", 1);
1128        let b = make_obs(
1129            "b",
1130            "ns",
1131            "t",
1132            "completely unrelated quantum mechanics text",
1133            1,
1134        );
1135        assert!(!pair_co_occurs(&a, &b));
1136    }
1137
1138    // ---- Helpers ---------------------------------------------------------
1139
1140    #[test]
1141    fn jaccard_similarity_is_symmetric() {
1142        let a = "kubernetes rolling deploy canary";
1143        let b = "kubernetes canary rolling deploy strategy";
1144        let sim_ab = jaccard_similarity(a, b);
1145        let sim_ba = jaccard_similarity(b, a);
1146        assert!((sim_ab - sim_ba).abs() < 1e-9);
1147    }
1148
1149    #[test]
1150    fn jaccard_similarity_empty_strings_zero() {
1151        assert_eq!(jaccard_similarity("", ""), 0.0);
1152    }
1153
1154    #[test]
1155    fn temporal_window_is_7_days() {
1156        // 7 * 24 * 3600 = 604_800 seconds (named as
1157        // `crate::SECS_PER_WEEK` post PR #1174 PR3 — pm-v3.1).
1158        assert_eq!(temporal_window_seconds(), crate::SECS_PER_WEEK);
1159    }
1160
1161    #[test]
1162    fn config_default_is_disabled() {
1163        // Per spec acceptance — operators must opt in per namespace.
1164        let cfg = ReflectionPassConfig::default();
1165        assert!(!cfg.enabled);
1166        assert!(cfg.max_depth.is_none());
1167    }
1168
1169    #[test]
1170    fn config_round_trips_json() {
1171        let cfg = ReflectionPassConfig {
1172            enabled: true,
1173            max_depth: Some(2),
1174        };
1175        let json = serde_json::to_string(&cfg).unwrap();
1176        let back: ReflectionPassConfig = serde_json::from_str(&json).unwrap();
1177        assert_eq!(cfg, back);
1178    }
1179
1180    // ---- Stub LLM contract ----------------------------------------------
1181
1182    #[test]
1183    fn stub_llm_records_calls() {
1184        let stub = StubLlm::new("synthesised pattern");
1185        let out = stub
1186            .summarize_memories(&[("t1".into(), "c1".into()), ("t2".into(), "c2".into())])
1187            .unwrap();
1188        assert_eq!(out, "synthesised pattern");
1189        let calls = stub.calls.lock().unwrap();
1190        assert_eq!(calls.len(), 1);
1191        assert!(calls[0].starts_with("summarize:"));
1192    }
1193
1194    // ---- Report serialisation -------------------------------------------
1195
1196    #[test]
1197    fn report_serialises_to_json() {
1198        let r = ReflectionPassReport {
1199            started_at: "2026-01-01T00:00:00Z".into(),
1200            completed_at: "2026-01-01T00:00:01Z".into(),
1201            namespaces_visited: 1,
1202            observations_scanned: 30,
1203            clusters_formed: 3,
1204            clusters_eligible: 3,
1205            reflections_persisted: 3,
1206            depth_refusals: 0,
1207            errors: vec![],
1208            dry_run_proposals: vec![],
1209            dry_run: false,
1210        };
1211        let json = serde_json::to_string(&r).unwrap();
1212        assert!(json.contains("reflections_persisted"));
1213        assert!(json.contains("clusters_eligible"));
1214        let back: ReflectionPassReport = serde_json::from_str(&json).unwrap();
1215        assert_eq!(back.observations_scanned, 30);
1216    }
1217
1218    // ---- ReflectionPass trait coverage (with real DB) ---------------------
1219    //
1220    // Issue #1548 — the reflection pass now operates over the SAL
1221    // `MemoryStore` trait rather than a raw `rusqlite::Connection`. The
1222    // trait is `sal`-gated, so this whole block is too. Tests open a
1223    // `SqliteStore` (which thin-wraps `crate::db::open`) and pass
1224    // `&store` into the pass. Seeding + assertions still go through the
1225    // `crate::db::*` free functions over a raw connection opened at the
1226    // same path (`conn_of`) so the test bodies stay close to the
1227    // pre-trait shape — exercising the SAME underlying SQLite file the
1228    // store reads. The pass methods are now `async`, so the trait-driving
1229    // tests are `#[tokio::test]`.
1230    #[cfg(feature = "sal")]
1231    mod sal_pass_tests {
1232        use super::*;
1233
1234        use crate::store::sqlite::SqliteStore;
1235
1236        fn open_db() -> (SqliteStore, tempfile::TempDir) {
1237            let dir = tempfile::tempdir().expect("tempdir");
1238            let path = dir.path().join("test.db");
1239            let store = SqliteStore::open(&path).expect("SqliteStore::open");
1240            (store, dir)
1241        }
1242
1243        /// Open a raw `rusqlite::Connection` at the store's backing file for
1244        /// seeding + assertions via the legacy `crate::db::*` free functions.
1245        fn conn_of(store: &SqliteStore) -> rusqlite::Connection {
1246            crate::db::open(store.path()).expect("db::open at store path")
1247        }
1248
1249        fn insert_observation(
1250            conn: &rusqlite::Connection,
1251            ns: &str,
1252            title: &str,
1253            content: &str,
1254            access_count: i64,
1255        ) -> String {
1256            let now = chrono::Utc::now().to_rfc3339();
1257            let mut metadata = crate::models::default_metadata();
1258            if let Some(obj) = metadata.as_object_mut() {
1259                obj.insert(
1260                    "agent_id".to_string(),
1261                    serde_json::Value::String("test-agent".to_string()),
1262                );
1263            }
1264            let mem = Memory {
1265                id: uuid::Uuid::new_v4().to_string(),
1266                tier: Tier::Long,
1267                namespace: ns.to_string(),
1268                title: title.to_string(),
1269                content: content.to_string(),
1270                tags: vec![],
1271                priority: 5,
1272                confidence: 1.0,
1273                source: "test".to_string(),
1274                access_count,
1275                created_at: now.clone(),
1276                updated_at: now,
1277                last_accessed_at: None,
1278                expires_at: None,
1279                metadata,
1280                reflection_depth: 0,
1281                memory_kind: MemoryKind::Observation,
1282                entity_id: None,
1283                persona_version: None,
1284                citations: Vec::new(),
1285                source_uri: None,
1286                source_span: None,
1287                confidence_source: ConfidenceSource::CallerProvided,
1288                confidence_signals: None,
1289                confidence_decayed_at: None,
1290                version: 1,
1291            };
1292            crate::db::insert(conn, &mem).unwrap()
1293        }
1294
1295        #[test]
1296        fn pass_name_is_reflection() {
1297            let (store, _dir) = open_db();
1298            let llm = StubLlm::new("S");
1299            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1300            assert_eq!(pass.name(), "reflection");
1301        }
1302
1303        #[test]
1304        fn agent_id_falls_back_to_ai_curator_without_keypair() {
1305            let (store, _dir) = open_db();
1306            let llm = StubLlm::new("S");
1307            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1308            assert_eq!(pass.agent_id(), "ai:curator");
1309        }
1310
1311        #[test]
1312        fn agent_id_uses_keypair_when_provided() {
1313            let (store, _dir) = open_db();
1314            let llm = StubLlm::new("S");
1315            use ed25519_dalek::{SigningKey, VerifyingKey};
1316            let mut rng = rand_core::OsRng;
1317            let sk = SigningKey::generate(&mut rng);
1318            let vk: VerifyingKey = (&sk).into();
1319            let kp = AgentKeypair {
1320                agent_id: "test:agent-x".to_string(),
1321                public: vk,
1322                private: Some(sk),
1323            };
1324            let pass = ReflectionPass::new(&store, &llm, Some(&kp), None, false);
1325            assert_eq!(pass.agent_id(), "test:agent-x");
1326        }
1327
1328        #[test]
1329        fn cluster_excludes_zero_access_observations() {
1330            let (store, _dir) = open_db();
1331            let llm = StubLlm::new("S");
1332            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1333            let m1 = make_obs("a", "ns", "t", "shared keyword tokens here", 0); // 0 access → skipped
1334            let m2 = make_obs("b", "ns", "t", "shared keyword tokens here", 5);
1335            let m3 = make_obs("c", "ns", "t", "shared keyword tokens here", 5);
1336            let m4 = make_obs("d", "ns", "t", "shared keyword tokens here", 5);
1337            let clusters = pass.cluster(&[m1, m2, m3, m4]);
1338            assert_eq!(clusters.len(), 1);
1339            assert_eq!(clusters[0].len(), 3);
1340        }
1341
1342        #[test]
1343        fn cluster_caps_at_max_cluster_size() {
1344            let (store, _dir) = open_db();
1345            let llm = StubLlm::new("S");
1346            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1347            // 15 mems with shared tokens → cluster capped at MAX_CLUSTER_SIZE = 12.
1348            let mems: Vec<Memory> = (0..15)
1349                .map(|i| {
1350                    make_obs(
1351                        &format!("m{i:02}"),
1352                        "ns",
1353                        "t",
1354                        "shared keyword tokens here pattern",
1355                        1,
1356                    )
1357                })
1358                .collect();
1359            let clusters = pass.cluster(&mems);
1360            // First-seed cluster grows up to MAX_CLUSTER_SIZE.
1361            for c in &clusters {
1362                assert!(c.len() <= MAX_CLUSTER_SIZE);
1363            }
1364        }
1365
1366        #[test]
1367        fn eligible_pass_method_accepts_valid() {
1368            let (store, _dir) = open_db();
1369            let llm = StubLlm::new("S");
1370            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1371            let cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1372                .map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
1373                .collect();
1374            assert!(pass.eligible(&cluster));
1375        }
1376
1377        #[test]
1378        fn eligible_pass_method_rejects_oversize() {
1379            let (store, _dir) = open_db();
1380            let llm = StubLlm::new("S");
1381            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1382            let cluster: Vec<Memory> = (0..(MAX_CLUSTER_SIZE + 1))
1383                .map(|i| make_obs(&format!("m{i:02}"), "ns", "t", "c", 1))
1384                .collect();
1385            assert!(!pass.eligible(&cluster));
1386        }
1387
1388        #[test]
1389        fn eligible_pass_method_rejects_reflection_member() {
1390            let (store, _dir) = open_db();
1391            let llm = StubLlm::new("S");
1392            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1393            let mut cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1394                .map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
1395                .collect();
1396            cluster[0].memory_kind = MemoryKind::Reflection;
1397            assert!(!pass.eligible(&cluster));
1398        }
1399
1400        #[test]
1401        fn eligible_pass_method_rejects_zero_access() {
1402            let (store, _dir) = open_db();
1403            let llm = StubLlm::new("S");
1404            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1405            let mut cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1406                .map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
1407                .collect();
1408            cluster[1].access_count = 0;
1409            assert!(!pass.eligible(&cluster));
1410        }
1411
1412        #[test]
1413        fn summarize_below_min_errors() {
1414            let (store, _dir) = open_db();
1415            let llm = StubLlm::new("S");
1416            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1417            let cluster: Vec<Memory> = (0..(MIN_CLUSTER_SIZE - 1))
1418                .map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
1419                .collect();
1420            let err = pass.summarize(&cluster).unwrap_err().to_string();
1421            assert!(err.contains("< MIN_CLUSTER_SIZE"));
1422        }
1423
1424        #[test]
1425        fn summarize_returns_reflection_typed_memory() {
1426            let (store, _dir) = open_db();
1427            let llm = StubLlm::new("synth pattern");
1428            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1429            let cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1430                .map(|i| {
1431                    let mut m = make_obs(&format!("m{i}"), "ns", "Title-A", "shared content", 2);
1432                    m.tier = if i == 0 { Tier::Long } else { Tier::Mid };
1433                    m.priority = 5 + i32::try_from(i).unwrap();
1434                    m
1435                })
1436                .collect();
1437            let summary = pass.summarize(&cluster).unwrap();
1438            assert_eq!(summary.memory_kind, MemoryKind::Reflection);
1439            assert!(summary.title.starts_with("[reflection]"));
1440            assert_eq!(summary.content, "synth pattern");
1441            assert_eq!(summary.tier, Tier::Long);
1442            assert_eq!(summary.source, "system");
1443            assert_eq!(summary.namespace, "ns");
1444            // Priority = max of source priorities.
1445            assert_eq!(
1446                summary.priority,
1447                5 + i32::try_from(MIN_CLUSTER_SIZE - 1).unwrap()
1448            );
1449        }
1450
1451        #[tokio::test]
1452        async fn persist_dry_run_is_noop() {
1453            let (store, _dir) = open_db();
1454            let llm = StubLlm::new("S");
1455            let pass = ReflectionPass::new(&store, &llm, None, None, true);
1456            let summary = make_obs("s", "ns", "[reflection]", "c", 1);
1457            pass.persist(&summary, &["x".to_string()]).await.unwrap();
1458        }
1459
1460        #[tokio::test]
1461        async fn persist_empty_sources_is_noop() {
1462            let (store, _dir) = open_db();
1463            let llm = StubLlm::new("S");
1464            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1465            let summary = make_obs("s", "ns", "[reflection]", "c", 1);
1466            pass.persist(&summary, &[]).await.unwrap();
1467        }
1468
1469        #[tokio::test]
1470        async fn persist_refuses_when_max_depth_exceeded() {
1471            let (store, _dir) = open_db();
1472            let llm = StubLlm::new("S");
1473            // Pass max_depth=1; insert source at reflection_depth=1 → new depth = 2 → refused.
1474            let pass = ReflectionPass::new(&store, &llm, None, Some(1), false);
1475            let mut source = make_obs("src", "ns", "t", "c", 1);
1476            source.reflection_depth = 1;
1477            let src_id = crate::db::insert(&conn_of(&store), &source).unwrap();
1478            let summary = make_obs("s", "ns", "[reflection]", "c", 0);
1479            let err = pass
1480                .persist(&summary, &[src_id])
1481                .await
1482                .unwrap_err()
1483                .to_string();
1484            assert!(err.contains("exceeds"));
1485            assert!(err.contains("--max-depth"));
1486        }
1487
1488        #[tokio::test]
1489        async fn persist_writes_reflection_into_db() {
1490            // Full happy-path: real sources, real reflect insert.
1491            let (store, _dir) = open_db();
1492            let conn = conn_of(&store);
1493            let llm = StubLlm::new("synthesised pattern");
1494            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1495            // Seed three observations in namespace 'app'.
1496            let s1 = insert_observation(&conn, "app", "T1", "kubernetes deploy strategy notes", 2);
1497            let s2 =
1498                insert_observation(&conn, "app", "T2", "kubernetes rolling deploy approach", 3);
1499            let s3 = insert_observation(&conn, "app", "T3", "kubernetes canary deploy strategy", 1);
1500            let summary = pass
1501                .summarize(&[
1502                    crate::db::get(&conn, &s1).unwrap().unwrap(),
1503                    crate::db::get(&conn, &s2).unwrap().unwrap(),
1504                    crate::db::get(&conn, &s3).unwrap().unwrap(),
1505                ])
1506                .unwrap();
1507            pass.persist(&summary, &[s1.clone(), s2.clone(), s3.clone()])
1508                .await
1509                .unwrap();
1510
1511            // Find the freshly-written reflection in the 'app' namespace.
1512            let listed = crate::db::list(
1513                &conn,
1514                Some("app"),
1515                None,
1516                32,
1517                0,
1518                None,
1519                None,
1520                None,
1521                None,
1522                None,
1523            )
1524            .unwrap();
1525            let refl = listed
1526                .iter()
1527                .find(|m| m.memory_kind == MemoryKind::Reflection)
1528                .expect("expected one reflection");
1529            // verify() should succeed on it.
1530            pass.verify(refl.id.clone()).await.unwrap();
1531        }
1532
1533        #[tokio::test]
1534        async fn verify_missing_id_errors() {
1535            let (store, _dir) = open_db();
1536            let llm = StubLlm::new("S");
1537            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1538            let err = pass
1539                .verify("no-such".to_string())
1540                .await
1541                .unwrap_err()
1542                .to_string();
1543            assert!(err.contains("not found in DB"));
1544        }
1545
1546        #[tokio::test]
1547        async fn verify_wrong_kind_errors() {
1548            let (store, _dir) = open_db();
1549            let conn = conn_of(&store);
1550            let llm = StubLlm::new("S");
1551            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1552            // Insert an Observation and verify against its id — must error.
1553            let id = insert_observation(&conn, "ns", "T", "c", 1);
1554            let err = pass.verify(id).await.unwrap_err().to_string();
1555            assert!(err.contains("expected Reflection"));
1556        }
1557
1558        #[tokio::test]
1559        async fn verify_reflection_without_edges_errors() {
1560            // Insert a Reflection directly via db::insert (no reflects_on links)
1561            // and verify against it — must error with "no reflects_on edge".
1562            let (store, _dir) = open_db();
1563            let conn = conn_of(&store);
1564            let llm = StubLlm::new("S");
1565            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1566            let now = chrono::Utc::now().to_rfc3339();
1567            let mut metadata = crate::models::default_metadata();
1568            if let Some(obj) = metadata.as_object_mut() {
1569                obj.insert(
1570                    "agent_id".to_string(),
1571                    serde_json::Value::String("test-agent".to_string()),
1572                );
1573            }
1574            let m = Memory {
1575                id: uuid::Uuid::new_v4().to_string(),
1576                tier: Tier::Mid,
1577                namespace: "ns".to_string(),
1578                title: "[reflection] orphan".to_string(),
1579                content: "c".to_string(),
1580                tags: vec![],
1581                priority: 5,
1582                confidence: 1.0,
1583                source: "system".to_string(),
1584                access_count: 0,
1585                created_at: now.clone(),
1586                updated_at: now,
1587                last_accessed_at: None,
1588                expires_at: None,
1589                metadata,
1590                reflection_depth: 1,
1591                memory_kind: MemoryKind::Reflection,
1592                entity_id: None,
1593                persona_version: None,
1594                citations: Vec::new(),
1595                source_uri: None,
1596                source_span: None,
1597                confidence_source: ConfidenceSource::CallerProvided,
1598                confidence_signals: None,
1599                confidence_decayed_at: None,
1600                version: 1,
1601            };
1602            let id = crate::db::insert(&conn, &m).unwrap();
1603            let err = pass.verify(id).await.unwrap_err().to_string();
1604            assert!(err.contains("no reflects_on edge"));
1605        }
1606
1607        // ---- run_reflection_pass driver -------------------------------------
1608
1609        #[tokio::test]
1610        async fn run_reflection_pass_empty_db_dry_run_namespace() {
1611            let (store, _dir) = open_db();
1612            let llm = StubLlm::new("S");
1613            let report =
1614                run_reflection_pass(&store, &llm, None, Some("nope"), None, true, |_| true)
1615                    .await
1616                    .unwrap();
1617            assert!(report.dry_run);
1618            assert_eq!(report.namespaces_visited, 1);
1619            assert_eq!(report.clusters_formed, 0);
1620            assert_eq!(report.reflections_persisted, 0);
1621        }
1622
1623        #[tokio::test]
1624        async fn run_reflection_pass_all_namespaces_with_disabled_check() {
1625            let (store, _dir) = open_db();
1626            let conn = conn_of(&store);
1627            let llm = StubLlm::new("S");
1628            // Seed an observation so list_namespaces sees one ns.
1629            insert_observation(&conn, "ns1", "t", "shared content tokens here", 2);
1630            let report = run_reflection_pass(&store, &llm, None, None, None, true, |_| false)
1631                .await
1632                .unwrap();
1633            // namespace is enumerated but enabled_check returns false → no work.
1634            assert_eq!(report.observations_scanned, 0);
1635        }
1636
1637        #[tokio::test]
1638        async fn run_reflection_pass_dry_run_reports_proposals() {
1639            let (store, _dir) = open_db();
1640            let conn = conn_of(&store);
1641            let llm = StubLlm::new("synth");
1642            // Seed three observations that should cluster.
1643            insert_observation(
1644                &conn,
1645                "app",
1646                "T1",
1647                "kubernetes rolling deploy strategy notes",
1648                2,
1649            );
1650            insert_observation(
1651                &conn,
1652                "app",
1653                "T2",
1654                "kubernetes rolling deploy strategy canary",
1655                3,
1656            );
1657            insert_observation(
1658                &conn,
1659                "app",
1660                "T3",
1661                "kubernetes canary deploy strategy rolling",
1662                1,
1663            );
1664            let report = run_reflection_pass(&store, &llm, None, Some("app"), None, true, |_| true)
1665                .await
1666                .unwrap();
1667            assert!(report.dry_run);
1668            assert!(report.observations_scanned >= 3);
1669            assert!(report.clusters_eligible >= 1);
1670            assert!(!report.dry_run_proposals.is_empty());
1671            assert_eq!(report.reflections_persisted, 0);
1672        }
1673
1674        #[tokio::test]
1675        async fn run_reflection_pass_persists_reflections() {
1676            let (store, _dir) = open_db();
1677            let conn = conn_of(&store);
1678            let llm = StubLlm::new("persisted pattern");
1679            insert_observation(&conn, "app", "T1", "shared keyword token strategy notes", 2);
1680            insert_observation(&conn, "app", "T2", "shared keyword token strategy plan", 3);
1681            insert_observation(
1682                &conn,
1683                "app",
1684                "T3",
1685                "shared keyword token strategy canary",
1686                1,
1687            );
1688            let report =
1689                run_reflection_pass(&store, &llm, None, Some("app"), None, false, |_| true)
1690                    .await
1691                    .unwrap();
1692            assert_eq!(report.dry_run, false);
1693            assert!(report.reflections_persisted >= 1);
1694        }
1695
1696        #[tokio::test]
1697        async fn run_reflection_pass_depth_refusal_increments_counter() {
1698            let (store, _dir) = open_db();
1699            let conn = conn_of(&store);
1700            let llm = StubLlm::new("synth");
1701            // Seed observations at depth=2 so a new reflection (depth=3)
1702            // would exceed our curator max_depth=2.
1703            let now = chrono::Utc::now().to_rfc3339();
1704            for i in 0..3 {
1705                let mut metadata = crate::models::default_metadata();
1706                if let Some(obj) = metadata.as_object_mut() {
1707                    obj.insert(
1708                        "agent_id".to_string(),
1709                        serde_json::Value::String("test-agent".to_string()),
1710                    );
1711                }
1712                let m = Memory {
1713                    id: uuid::Uuid::new_v4().to_string(),
1714                    tier: Tier::Long,
1715                    namespace: "deep".to_string(),
1716                    title: format!("Tdeep-{i}"),
1717                    content: "shared keyword token deep strategy".to_string(),
1718                    tags: vec![],
1719                    priority: 5,
1720                    confidence: 1.0,
1721                    source: "test".to_string(),
1722                    access_count: 2,
1723                    created_at: now.clone(),
1724                    updated_at: now.clone(),
1725                    last_accessed_at: None,
1726                    expires_at: None,
1727                    metadata,
1728                    reflection_depth: 2,
1729                    memory_kind: MemoryKind::Observation,
1730                    entity_id: None,
1731                    persona_version: None,
1732                    citations: Vec::new(),
1733                    source_uri: None,
1734                    source_span: None,
1735                    confidence_source: ConfidenceSource::CallerProvided,
1736                    confidence_signals: None,
1737                    confidence_decayed_at: None,
1738                    version: 1,
1739                };
1740                crate::db::insert(&conn, &m).unwrap();
1741            }
1742            let report = run_reflection_pass(
1743                &store,
1744                &llm,
1745                None,
1746                Some("deep"),
1747                Some(2), // cap=2; new depth would be 3 → refuse
1748                false,
1749                |_| true,
1750            )
1751            .await
1752            .unwrap();
1753            assert!(report.depth_refusals >= 1);
1754            // No reflection persisted.
1755            assert_eq!(report.reflections_persisted, 0);
1756        }
1757
1758        #[test]
1759        fn dry_run_proposal_serialises() {
1760            let p = DryRunProposal {
1761                namespace: "ns".into(),
1762                proposed_title: "[reflection] x".into(),
1763                source_ids: vec!["a".into(), "b".into()],
1764            };
1765            let j = serde_json::to_string(&p).unwrap();
1766            assert!(j.contains("source_ids"));
1767            let back: DryRunProposal = serde_json::from_str(&j).unwrap();
1768            assert_eq!(back.namespace, "ns");
1769        }
1770
1771        // ---- pair_co_occurs unparseable timestamps fall through ----
1772
1773        #[test]
1774        fn pair_co_occurs_unparseable_timestamps_still_checks_jaccard() {
1775            let mut a = make_obs("a", "ns", "t", "shared content tokens here", 1);
1776            let mut b = make_obs("b", "ns", "t", "shared content tokens here", 1);
1777            a.created_at = "not-a-timestamp".to_string();
1778            b.created_at = "also-invalid".to_string();
1779            // With invalid timestamps, the temporal check is skipped — Jaccard
1780            // alone decides. These share tokens → co-occur returns true.
1781            assert!(pair_co_occurs(&a, &b));
1782        }
1783
1784        // ---- Additional coverage: trait impls + edge branches ----
1785
1786        #[test]
1787        fn stub_llm_auto_tag_and_contradiction_paths() {
1788            let stub = StubLlm::new("S");
1789            let tags = stub.auto_tag("t", "c").unwrap();
1790            assert!(tags.is_empty());
1791            let conflict = stub.detect_contradiction("a", "b").unwrap();
1792            assert!(!conflict);
1793        }
1794
1795        #[test]
1796        fn eligible_pass_rejects_internal_namespace_directly() {
1797            // Drives the internal-namespace early-return.
1798            let (store, _dir) = open_db();
1799            let llm = StubLlm::new("S");
1800            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1801            let cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1802                .map(|i| make_obs(&format!("m{i}"), "_curator", "t", "c", 1))
1803                .collect();
1804            assert!(!pass.eligible(&cluster));
1805        }
1806
1807        #[test]
1808        fn jaccard_similarity_zero_union_returns_zero() {
1809            // The else-branch where `union == 0`: ta non-empty but tb is — no.
1810            // Actually if either has >=3 chars words there will be union.
1811            // Build inputs where tokens are stripped to nothing.
1812            let a = "a b c"; // every token <3 chars → tokens set is empty
1813            let b = "x";
1814            assert_eq!(jaccard_similarity(a, b), 0.0);
1815        }
1816
1817        #[test]
1818        fn tier_rank_all_variants() {
1819            // Drives all three match arms.
1820            assert_eq!(tier_rank(&Tier::Short), 0);
1821            assert_eq!(tier_rank(&Tier::Mid), 1);
1822            assert_eq!(tier_rank(&Tier::Long), 2);
1823        }
1824
1825        #[test]
1826        fn parse_rfc3339_invalid_returns_none() {
1827            assert!(parse_rfc3339("garbage").is_none());
1828            assert!(parse_rfc3339("2026-01-01T00:00:00Z").is_some());
1829        }
1830
1831        #[tokio::test]
1832        async fn verify_skips_inbound_links() {
1833            // verify() walks all links but `continue`s when source_id != summary_id.
1834            // This exercises the inbound-link `continue` branch.
1835            let (store, _dir) = open_db();
1836            let conn = conn_of(&store);
1837            let llm = StubLlm::new("S");
1838            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1839            // Seed the reflection target via the full persist path.
1840            let s1 =
1841                insert_observation(&conn, "vrf", "T1", "shared keyword pattern tokens here", 2);
1842            let s2 =
1843                insert_observation(&conn, "vrf", "T2", "shared keyword pattern tokens here", 2);
1844            let s3 =
1845                insert_observation(&conn, "vrf", "T3", "shared keyword pattern tokens here", 2);
1846            let summary = pass
1847                .summarize(&[
1848                    crate::db::get(&conn, &s1).unwrap().unwrap(),
1849                    crate::db::get(&conn, &s2).unwrap().unwrap(),
1850                    crate::db::get(&conn, &s3).unwrap().unwrap(),
1851                ])
1852                .unwrap();
1853            pass.persist(&summary, &[s1.clone(), s2.clone(), s3.clone()])
1854                .await
1855                .unwrap();
1856            let listed = crate::db::list(
1857                &conn,
1858                Some("vrf"),
1859                None,
1860                32,
1861                0,
1862                None,
1863                None,
1864                None,
1865                None,
1866                None,
1867            )
1868            .unwrap();
1869            let refl_id = listed
1870                .iter()
1871                .find(|m| m.memory_kind == MemoryKind::Reflection)
1872                .unwrap()
1873                .id
1874                .clone();
1875            // Create a foreign link with this reflection as TARGET (not source).
1876            // verify() should still succeed because it ignores inbound links.
1877            let _ = crate::db::create_link(&conn, &s1, &refl_id, "related_to");
1878            pass.verify(refl_id).await.unwrap();
1879        }
1880
1881        #[tokio::test]
1882        async fn run_reflection_pass_summarize_error_recorded() {
1883            // Drive the summarize failure path. Use a StubLlm that
1884            // fails summarize. We need it implementing AutonomyLlm with an
1885            // error return.
1886            struct FailingLlm;
1887            impl AutonomyLlm for FailingLlm {
1888                fn auto_tag(&self, _t: &str, _c: &str) -> Result<Vec<String>> {
1889                    Ok(vec![])
1890                }
1891                fn detect_contradiction(&self, _a: &str, _b: &str) -> Result<bool> {
1892                    Ok(false)
1893                }
1894                fn summarize_memories(&self, _m: &[(String, String)]) -> Result<String> {
1895                    anyhow::bail!("forced llm failure")
1896                }
1897            }
1898            let (store, _dir) = open_db();
1899            let conn = conn_of(&store);
1900            let llm = FailingLlm;
1901            insert_observation(&conn, "ns", "T1", "shared keyword pattern tokens here", 2);
1902            insert_observation(&conn, "ns", "T2", "shared keyword pattern tokens here", 2);
1903            insert_observation(&conn, "ns", "T3", "shared keyword pattern tokens here", 2);
1904            let report = run_reflection_pass(&store, &llm, None, Some("ns"), None, false, |_| true)
1905                .await
1906                .unwrap();
1907            // Summarize error was caught and recorded.
1908            assert!(report.errors.iter().any(|e| e.contains("summarize failed")));
1909            assert_eq!(report.reflections_persisted, 0);
1910        }
1911    } // mod sal_pass_tests
1912}