Skip to main content

ai_memory/curator/
reflection_pass.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! Reflection-pass curator mode — v0.7.0 Layer 2 Task L2-1 (issue #666).
5//!
6//! Implements [`ReflectionPass`], a [`CompactionPass`] that clusters
7//! Observation memories by recall co-occurrence + namespace + temporal
8//! proximity, asks an LLM to summarise the pattern, and persists the
9//! summary as a typed Reflection memory via the substrate
10//! [`crate::storage::reflect_with_hooks`] path — so every reflection
11//! lands with a `reflects_on` edge to every source, the
12//! `metadata.reflection_metadata` block stamped, and (via the
13//! atomic-write contract) inside a single `BEGIN IMMEDIATE` /
14//! `COMMIT` transaction.
15//!
16//! # Why a fresh `CompactionPass` (and not the v0.6.x consolidate path)
17//!
18//! Consolidation collapses near-duplicate memories into a single
19//! canonical body, soft-deleting the originals. **Reflection is
20//! additive** — the sources remain readable, and the new memory is a
21//! typed `Reflection` carrying provenance edges back. That difference
22//! shows up in three places in the impl:
23//!
24//! 1. `persist()` writes via [`crate::storage::reflect`], not via
25//!    `db::consolidate`. The substrate handles the depth cap, the
26//!    `reflects_on` link insert, and the atomic boundary.
27//! 2. `eligible()` requires every cluster member to be
28//!    [`crate::models::MemoryKind::Observation`]. Reflections never
29//!    fold into a parent reflection in this pass (the L2-1 acceptance
30//!    is one level of reflection at a time; multi-level chains form
31//!    naturally across passes if `max_depth >= 2`).
32//! 3. `cluster()` uses a hybrid signal — Jaccard pre-filter +
33//!    optional cosine — but constrains pairs to memories that have
34//!    been recalled together (`access_count >= 1`) within a sliding
35//!    7-day window. This is the "recall co-occurrence" proxy
36//!    documented in #666: we cannot directly observe recall
37//!    co-occurrence without a recall-event log (out of scope here),
38//!    so we use the substrate-visible signals — `access_count`,
39//!    `last_accessed_at`, `created_at` proximity — that approximate
40//!    it within the bounds of one SQLite read.
41//!
42//! # Visibility contract (R7)
43//!
//! Pass internals are at most `pub(crate)`. The externally visible
//! surface is the [`ReflectionPassConfig`] struct that the CLI flag
//! wiring (see `src/cli/curator.rs`) consumes, [`run_reflection_pass`],
//! which the CLI's `--reflect` mode invokes, and the
//! [`ReflectionPassReport`] / [`DryRunProposal`] types that function
//! reports its outcome with.
48
49// The pass internals run only through the SAL-gated curator path; in a
50// non-sal build only the config/report structs are live, so relax the
51// dead-code / unused-import lints there only (sal builds enforce fully).
52#![cfg_attr(not(feature = "sal"), allow(dead_code, unused_imports))]
53
54use std::collections::HashSet;
55
56use chrono::{DateTime, Utc};
57use serde::{Deserialize, Serialize};
58
59#[cfg(feature = "sal")]
60use anyhow::Context;
61use anyhow::Result;
62
63#[cfg(feature = "sal")]
64use crate::autonomy::AutonomyLlm;
65#[cfg(feature = "sal")]
66use crate::identity::keypair::AgentKeypair;
67use crate::models::{Memory, MemoryKind, Tier};
68#[cfg(feature = "sal")]
69use crate::storage::reflect::{ReflectError, ReflectInput};
70#[cfg(feature = "sal")]
71use crate::store::{CallerContext, Filter, MemoryStore, StoreError};
72
73#[cfg(feature = "sal")]
74use super::pipeline::MemoryId;
75
76#[cfg(any(feature = "sal", test))]
77use crate::models::ConfidenceSource;
78
/// Look up one memory by `id` through the SAL [`MemoryStore`] trait,
/// turning the store's "not found" verdict into `Ok(None)`.
///
/// This keeps the `Option`-shaped contract that call sites relied on
/// with the pre-#1548 `db::get` free function.
///
/// # Errors
///
/// Every [`StoreError`] other than `NotFound` is wrapped into an
/// `anyhow` error and returned.
///
/// The caller supplies `ctx`; the curator passes the operator-class
/// context built by [`curator_caller_context`], so the lookup keeps the
/// pre-trait raw-connection behaviour (no scope=private filtering).
#[cfg(feature = "sal")]
async fn store_get_opt(
    store: &dyn MemoryStore,
    ctx: &CallerContext,
    id: &str,
) -> Result<Option<Memory>> {
    let fetched = store.get(ctx, id).await;
    // A missing row is an expected outcome here, not a failure.
    if let Err(StoreError::NotFound { .. }) = fetched {
        return Ok(None);
    }
    fetched.map(Some).map_err(|err| anyhow::anyhow!(err))
}
100
/// Produce the caller context the curator uses for every store call.
///
/// The reflection pass is background substrate maintenance rather than
/// a tenant-facing request, so it uses the admin-class context from
/// [`CallerContext::for_admin`]. That mirrors the pre-#1548
/// raw-`rusqlite::Connection` path, whose `db::*` free functions
/// applied no SAL visibility filter and therefore saw every row
/// regardless of `metadata.scope`.
///
/// `agent_id` is the curator's resolved signing identity; it is handed
/// to the context so audit trails attribute the sweep correctly.
#[cfg(feature = "sal")]
fn curator_caller_context(agent_id: &str) -> CallerContext {
    CallerContext::for_admin(agent_id)
}
112
/// Fetch at most `limit` memories from `namespace` via
/// [`MemoryStore::list`].
///
/// This is the backend-agnostic replacement for the pre-#1548
/// `db::list(conn, Some(ns), None, limit, 0, …)` call. The reflection
/// pass only ever varied the namespace and limit; every other slot of
/// the legacy positional signature (offset / tier / since / until /
/// tags / agent_id) was always `0` / `None`, so those dimensions are
/// left at the [`Filter`] defaults.
///
/// # Errors
///
/// Any error returned by the store is wrapped into an `anyhow` error.
#[cfg(feature = "sal")]
async fn store_list_namespace(
    store: &dyn MemoryStore,
    ctx: &CallerContext,
    namespace: &str,
    limit: usize,
) -> Result<Vec<Memory>> {
    let query = Filter {
        limit,
        namespace: Some(namespace.to_string()),
        ..Filter::default()
    };
    match store.list(ctx, &query).await {
        Ok(rows) => Ok(rows),
        Err(err) => Err(anyhow::anyhow!(err)),
    }
}
137
138// ---------------------------------------------------------------------------
139// Constants — per #666 spec ("≥3 members", "7-day temporal window", …)
140// ---------------------------------------------------------------------------
141
/// Smallest cluster the pass will reflect on. Two observations are
/// only a pair, not a generalisable pattern, so anything below this
/// is refused by the eligibility gate.
pub(crate) const MIN_CLUSTER_SIZE: usize = 3;

/// Largest cluster the pass will build. The cap stops a pathological
/// mega-merge in which every observation of a namespace collapses
/// into one reflection.
pub(crate) const MAX_CLUSTER_SIZE: usize = 12;

/// Width, in days, of the temporal co-occurrence window. Observations
/// in the same namespace whose `created_at` values lie within this
/// many days of each other are clustering candidates. The value
/// follows the #666 spec ("temporal_proximity: 7-day window").
pub(crate) const TEMPORAL_WINDOW_DAYS: i64 = 7;

/// Jaccard keyword-similarity floor used by the cheap pair pre-filter.
/// It is deliberately looser than the consolidation threshold (0.55):
/// reflection wants *related* observations, not near-duplicates.
pub(crate) const REFLECTION_JACCARD_THRESHOLD: f64 = 0.30;

/// Lowest `access_count` at which an observation counts as
/// "co-recalled". There is no per-recall event log, so the touch
/// count that the recall pipeline bumps on every hit stands in for
/// the spec's "recall co-occurrence frequency" signal.
pub(crate) const MIN_RECALL_COUNT: i64 = 1;
170
171// ---------------------------------------------------------------------------
172// ReflectionPassConfig — per-namespace opt-in (defaults to `enabled = false`)
173// ---------------------------------------------------------------------------
174
175/// Per-namespace configuration for the reflection pass.
176///
177/// Defaults to `enabled = false` per #666 acceptance: reflection is
178/// opt-in because (a) it depends on the Ollama LLM being available
179/// at the time the pass runs, and (b) it writes new (typed) memories
180/// to the namespace, which operators may want to gate by namespace
181/// rather than enable globally.
182#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
183pub struct ReflectionPassConfig {
184    /// When `false` (default), the pass skips this namespace entirely.
185    #[serde(default)]
186    pub enabled: bool,
187    /// Per-namespace override of the operator-supplied `--max-depth`
188    /// flag. When `None`, the pass uses the resolved governance-policy
189    /// `max_reflection_depth` (default `3`) as its ceiling. When
190    /// `Some(N)`, the pass refuses to *propose* a reflection whose
191    /// new depth would exceed `N` (the substrate cap still applies
192    /// on top — this is a curator-side guard rail, not a substrate
193    /// override).
194    #[serde(default, skip_serializing_if = "Option::is_none")]
195    pub max_depth: Option<u32>,
196}
197
198impl Default for ReflectionPassConfig {
199    fn default() -> Self {
200        Self {
201            enabled: false,
202            max_depth: None,
203        }
204    }
205}
206
207// ---------------------------------------------------------------------------
208// ReflectionPass — SAL-backed reflection synthesis (issue #1548)
209// ---------------------------------------------------------------------------
210
/// Compaction pass that turns clusters of co-occurring Observations
/// into typed `Reflection` memories.
///
/// The pass talks to storage only through the SAL [`MemoryStore`]
/// trait (issue #1548), so it works against the SQLite *or* the
/// Postgres adapter. Because the trait is `sal`-gated, so is this
/// type.
///
/// Both the curator's `--reflect` CLI mode and the
/// `--store-url --daemon` upkeep sweep go through
/// [`run_reflection_pass`], so a one-shot CLI run and a daemon cycle
/// behave identically. The (non-`--store-url`) autonomy loop's
/// per-cycle sweep is NOT wired to it yet; that is a v0.7.1+ pivot,
/// once operators have run the pass manually and vetted the proposed
/// reflections.
#[cfg(feature = "sal")]
pub(crate) struct ReflectionPass<'a> {
    /// Backend-agnostic store handle (issue #1548). Reads go through
    /// [`MemoryStore::list`] / [`MemoryStore::get_links_for_anchor`],
    /// writes through [`MemoryStore::reflect`].
    pub(crate) store: &'a dyn MemoryStore,
    /// Operator-class caller context built by
    /// [`curator_caller_context`]. It carries the curator's `agent_id`
    /// and lets the background sweep read rows regardless of
    /// `metadata.scope`, as the pre-trait raw-connection path did.
    pub(crate) ctx: CallerContext,
    /// LLM used for summarisation, as a trait object. Tests inject a
    /// deterministic stub; production passes an `&OllamaClient`
    /// (which implements [`AutonomyLlm`]).
    pub(crate) llm: &'a dyn AutonomyLlm,
    /// The curator's signing keypair. Its agent id is stamped into the
    /// reflection's `metadata.agent_id`, so every `reflects_on` edge
    /// is attributable to one Ed25519 identity. `None` is meant only
    /// for tests of the no-keypair fallback; production callers must
    /// pass `Some(_)`.
    pub(crate) keypair: Option<&'a AgentKeypair>,
    /// Curator-side ceiling on the depth of a proposed reflection. It
    /// is checked in addition to the substrate's per-namespace
    /// `max_reflection_depth` policy: when set, the curator refuses a
    /// write whose depth exceeds it even if the substrate would allow
    /// it. `None` leaves the decision entirely to substrate policy.
    pub(crate) max_depth: Option<u32>,
    /// When `true`, no database write happens: `persist()` returns
    /// `Ok(())` without calling the store's reflect path, and the
    /// reflection is reported as a proposal in the
    /// [`ReflectionPassReport`] instead.
    pub(crate) dry_run: bool,
}
256
257#[cfg(feature = "sal")]
258impl<'a> ReflectionPass<'a> {
259    /// Construct a `ReflectionPass`. `keypair` is the curator's
260    /// signing identity — used for `metadata.agent_id` and for
261    /// `verify()`'s signature-trace check. `max_depth` is the optional
262    /// curator-side ceiling; `dry_run` suppresses writes.
263    pub(crate) fn new(
264        store: &'a dyn MemoryStore,
265        llm: &'a dyn AutonomyLlm,
266        keypair: Option<&'a AgentKeypair>,
267        max_depth: Option<u32>,
268        dry_run: bool,
269    ) -> Self {
270        let agent_id = keypair.map_or_else(
271            || crate::identity::sentinels::AI_CURATOR.to_string(),
272            |k| k.agent_id.clone(),
273        );
274        Self {
275            store,
276            ctx: curator_caller_context(&agent_id),
277            llm,
278            keypair,
279            max_depth,
280            dry_run,
281        }
282    }
283
284    /// Resolve the agent id stamped on every reflection this pass
285    /// writes. Falls back to `"ai:curator"` when the curator was
286    /// started without a keypair — the same fall-back the autonomy
287    /// `consolidate` path uses, kept consistent so a forensic walk of
288    /// `metadata.agent_id` finds curator-written rows under either
289    /// tag.
290    fn agent_id(&self) -> String {
291        self.keypair.map_or_else(
292            || crate::identity::sentinels::AI_CURATOR.to_string(),
293            |k| k.agent_id.clone(),
294        )
295    }
296
297    /// Human-readable pass name used in log messages and reports.
298    /// Currently exercised by the unit tests; the daemon log lines use
299    /// the report fields directly.
300    #[allow(dead_code)]
301    fn name(&self) -> &str {
302        "reflection"
303    }
304
305    /// Partition `memories` into clusters of co-occurring Observations.
306    ///
307    /// Algorithm (one pass per namespace):
308    ///
309    /// 1. Filter to typed `Observation` memories with `access_count >=
310    ///    MIN_RECALL_COUNT` — substrate proxy for "has been recalled
311    ///    recently enough to count as live".
312    /// 2. Within each namespace, walk pairs and seed a cluster when
313    ///    both: (a) the temporal distance between `created_at` is
314    ///    within [`TEMPORAL_WINDOW_DAYS`], and (b) the Jaccard
315    ///    similarity of contents is ≥ [`REFLECTION_JACCARD_THRESHOLD`].
316    /// 3. Cap each cluster at [`MAX_CLUSTER_SIZE`].
317    /// 4. Discard clusters below [`MIN_CLUSTER_SIZE`] (eligibility
318    ///    enforces this too, but discarding here keeps the API tight).
319    fn cluster(&self, memories: &[Memory]) -> Vec<Vec<MemoryId>> {
320        let mut by_ns: std::collections::HashMap<&str, Vec<&Memory>> =
321            std::collections::HashMap::new();
322        for m in memories {
323            if !is_clusterable_observation(m) {
324                continue;
325            }
326            by_ns.entry(&m.namespace).or_default().push(m);
327        }
328
329        let mut clusters: Vec<Vec<MemoryId>> = Vec::new();
330        for (_ns, group) in by_ns {
331            let mut used = vec![false; group.len()];
332            for i in 0..group.len() {
333                if used[i] {
334                    continue;
335                }
336                let mut cluster = vec![group[i].id.clone()];
337                used[i] = true;
338                for j in (i + 1)..group.len() {
339                    if used[j] {
340                        continue;
341                    }
342                    if cluster.len() >= MAX_CLUSTER_SIZE {
343                        break;
344                    }
345                    if pair_co_occurs(group[i], group[j]) {
346                        cluster.push(group[j].id.clone());
347                        used[j] = true;
348                    }
349                }
350                if cluster.len() >= MIN_CLUSTER_SIZE {
351                    clusters.push(cluster);
352                }
353            }
354        }
355        clusters
356    }
357
358    /// Secondary eligibility gate.
359    ///
360    /// A cluster passes when:
361    ///
362    /// * It has ≥ [`MIN_CLUSTER_SIZE`] and ≤ [`MAX_CLUSTER_SIZE`] members.
363    /// * Every member is `MemoryKind::Observation` — reflections that
364    ///   carry meta-pattern depth should be folded by a separate
365    ///   higher-depth pass, not this one.
366    /// * All members share the same (non-reserved) namespace.
367    /// * Every member is not soft-deleted (the substrate `list` call
368    ///   excludes soft-deleted rows but defensive recheck cheap).
369    fn eligible(&self, cluster: &[Memory]) -> bool {
370        if cluster.len() < MIN_CLUSTER_SIZE || cluster.len() > MAX_CLUSTER_SIZE {
371            return false;
372        }
373        let ns = &cluster[0].namespace;
374        if ns.starts_with('_') {
375            return false;
376        }
377        cluster.iter().all(|m| {
378            m.memory_kind == MemoryKind::Observation
379                && &m.namespace == ns
380                && m.access_count >= MIN_RECALL_COUNT
381        })
382    }
383
384    /// LLM-summarise the cluster into a single proposed Reflection
385    /// memory. Does NOT touch the database — the returned `Memory`
386    /// is a *proposal* that `persist()` (or the dry-run reporter)
387    /// consumes.
388    ///
389    /// The proposal carries:
390    ///
391    /// * Title prefixed with `[reflection]` so an operator inspecting
392    ///   the namespace immediately sees the synthetic origin.
393    /// * `memory_kind = Reflection`. The substrate `reflect` path
394    ///   will set this anyway; we set it here so the in-memory
395    ///   proposal is internally consistent.
396    /// * `reflection_depth` left at 0 — the substrate computes the
397    ///   real depth (`max(source.reflection_depth) + 1`) on insert.
398    /// * Tier = max of source tiers (never downgrade).
399    /// * Priority = max of source priorities (the reflection inherits
400    ///   the salience of its highest-priority source).
401    fn summarize(&self, cluster: &[Memory]) -> Result<Memory> {
402        if cluster.len() < MIN_CLUSTER_SIZE {
403            anyhow::bail!(
404                "summarize: cluster has {} members (< MIN_CLUSTER_SIZE = {})",
405                cluster.len(),
406                MIN_CLUSTER_SIZE
407            );
408        }
409
410        let input: Vec<(String, String)> = cluster
411            .iter()
412            .map(|m| (m.title.clone(), m.content.clone()))
413            .collect();
414        let summary_text = self
415            .llm
416            .summarize_memories(&input)
417            .context("ReflectionPass::summarize: LLM call failed")?;
418
419        let base_title = cluster
420            .iter()
421            .map(|m| m.title.as_str())
422            .next()
423            .unwrap_or("(reflection)");
424        let title = format!("[reflection] {base_title}");
425
426        let tier = cluster
427            .iter()
428            .map(|m| m.tier.clone())
429            .max_by_key(tier_rank)
430            .unwrap_or(Tier::Mid);
431        let priority = cluster.iter().map(|m| m.priority).max().unwrap_or(5);
432
433        let now = Utc::now().to_rfc3339();
434        Ok(Memory {
435            id: uuid::Uuid::new_v4().to_string(),
436            tier,
437            namespace: cluster[0].namespace.clone(),
438            title,
439            content: summary_text,
440            tags: vec![],
441            priority,
442            confidence: 1.0,
443            // Substrate `validate_source` accepts a closed set; "system"
444            // is the curator's canonical entry-point for autonomous
445            // writes (see consolidation pass, autonomy passes).
446            source: "system".to_string(),
447            access_count: 0,
448            created_at: now.clone(),
449            updated_at: now,
450            last_accessed_at: None,
451            expires_at: None,
452            metadata: serde_json::json!({}),
453            reflection_depth: 0,
454            memory_kind: MemoryKind::Reflection,
455            entity_id: None,
456            persona_version: None,
457            citations: Vec::new(),
458            source_uri: None,
459            source_span: None,
460            confidence_source: ConfidenceSource::CallerProvided,
461            confidence_signals: None,
462            confidence_decayed_at: None,
463            version: 1,
464        })
465    }
466
467    /// Persist `summary` plus a `reflects_on` link to each id in
468    /// `sources`, via the substrate [`MemoryStore::reflect`] path.
469    ///
470    /// The substrate enforces (a) input validation, (b) depth-cap
471    /// refusal with audit row, (c) transactional atomicity (all
472    /// `reflects_on` links land or none do), (d) no-cycle guarantee
473    /// inherited from L1-2's `reflects_on` invariant. The SQLite adapter
474    /// routes this through `storage::reflect_with_hooks`; the Postgres
475    /// adapter through its native sqlx `reflect` — both honour the same
476    /// [`ReflectInput`] contract (issue #1548).
477    ///
478    /// No-op when `self.dry_run = true`.
479    async fn persist(&self, summary: &Memory, sources: &[MemoryId]) -> Result<()> {
480        if self.dry_run || sources.is_empty() {
481            return Ok(());
482        }
483
484        // Curator-side max-depth guard. The substrate enforces the
485        // namespace policy cap on top — this is the operator-supplied
486        // belt-and-braces. We need to know the proposed depth before
487        // calling reflect(); pre-compute it the same way the substrate
488        // does (max source depth + 1) so the curator can refuse
489        // *before* burning an LLM round-trip in the next cycle.
490        if let Some(cap) = self.max_depth {
491            let mut max_src_depth: i32 = 0;
492            for id in sources {
493                if let Some(m) = store_get_opt(self.store, &self.ctx, id).await? {
494                    max_src_depth = max_src_depth.max(m.reflection_depth);
495                }
496            }
497            let new_depth =
498                u32::try_from(max_src_depth.max(0).saturating_add(1)).unwrap_or(u32::MAX);
499            if new_depth > cap {
500                anyhow::bail!(
501                    "ReflectionPass::persist: proposed depth {new_depth} exceeds \
502                     curator --max-depth {cap}"
503                );
504            }
505        }
506
507        let input = ReflectInput {
508            source_ids: sources.to_vec(),
509            title: summary.title.clone(),
510            content: summary.content.clone(),
511            namespace: Some(summary.namespace.clone()),
512            tier: summary.tier.clone(),
513            tags: summary.tags.clone(),
514            priority: summary.priority,
515            confidence: summary.confidence,
516            source: summary.source.clone(),
517            agent_id: self.agent_id(),
518            metadata: summary.metadata.clone(),
519        };
520
521        // Issue #815 — thread the curator's signing keypair into the
522        // reflect call so the substrate's signed-link path produces
523        // `attest_level='self_signed'` rows for every `reflects_on` edge
524        // this pass writes. The SAL `reflect(ctx, input, signing_key)`
525        // surface folds the keypair into the adapter's hook bundle (the
526        // SQLite adapter sets `ReflectHooks::active_keypair`; Postgres
527        // signs natively), so the wire shape is byte-identical across
528        // backends.
529        match self.store.reflect(&self.ctx, &input, self.keypair).await {
530            Ok(_outcome) => Ok(()),
531            Err(ReflectError::DepthExceeded {
532                attempted,
533                cap,
534                namespace,
535            }) => {
536                anyhow::bail!(
537                    "ReflectionPass::persist: substrate refused — proposed depth \
538                     {attempted} exceeds namespace cap {cap} in '{namespace}'"
539                )
540            }
541            Err(other) => Err(anyhow::anyhow!(other.to_string())),
542        }
543    }
544
545    /// Verify that the persisted reflection identified by `summary_id`
546    /// is readable, typed as Reflection, and that every `reflects_on`
547    /// edge points at an existing source.
548    ///
549    /// **Signature trace.** We deliberately do NOT call into
550    /// `identity::verify::verify_link` here — H2 link signing fills the
551    /// `signature` BLOB column on outbound writes, and `db::create_link`
552    /// (used by `storage::reflect`) goes through that path when the
553    /// daemon's keypair is wired in. The verify check here confirms
554    /// (a) the edge exists, (b) the target memory is alive, (c) the
555    /// `relation` is exactly `reflects_on`. Cryptographic signature
556    /// re-verification belongs at the federation `sync_push` boundary,
557    /// not the curator's verify step (the curator wrote the row
558    /// itself, so it trivially trusts its own signature).
559    ///
560    /// Exercised by the unit tests; the run driver uses the inlined
561    /// [`verify_recent`] check on the live driver path.
562    #[allow(dead_code)]
563    async fn verify(&self, summary_id: MemoryId) -> Result<()> {
564        let mem = store_get_opt(self.store, &self.ctx, &summary_id)
565            .await
566            .context("ReflectionPass::verify: store.get failed")?;
567        let mem = mem
568            .ok_or_else(|| anyhow::anyhow!("verify: reflection {} not found in DB", summary_id))?;
569        if mem.memory_kind != MemoryKind::Reflection {
570            anyhow::bail!(
571                "verify: memory {} is {:?}, expected Reflection",
572                summary_id,
573                mem.memory_kind
574            );
575        }
576
577        let links = self
578            .store
579            .get_links_for_anchor(&summary_id)
580            .await
581            .map_err(|e| anyhow::anyhow!(e))
582            .context("ReflectionPass::verify: store.get_links_for_anchor failed")?;
583        let mut saw_reflects_on = false;
584        for link in &links {
585            // Only check outbound `reflects_on` edges originated at this
586            // reflection. Inbound edges (other memories that reflect on
587            // ours) are not in this pass's scope.
588            if link.source_id != summary_id {
589                continue;
590            }
591            if link.relation != crate::models::MemoryLinkRelation::ReflectsOn {
592                continue;
593            }
594            saw_reflects_on = true;
595            // Confirm the target exists. Soft-deleted sources are still
596            // returned by store.get because the row is preserved; this is
597            // the same contract `storage::reflect` relies on.
598            let target = store_get_opt(self.store, &self.ctx, &link.target_id).await?;
599            if target.is_none() {
600                anyhow::bail!(
601                    "verify: reflects_on edge target {} not found",
602                    link.target_id
603                );
604            }
605        }
606        if !saw_reflects_on {
607            anyhow::bail!("verify: reflection {} has no reflects_on edge", summary_id);
608        }
609        Ok(())
610    }
611}
612
613// ---------------------------------------------------------------------------
614// Report + run helpers (consumed by CLI --reflect)
615// ---------------------------------------------------------------------------
616
617/// Structured per-namespace outcome of a single reflection-pass
618/// invocation.  Aggregated across namespaces by [`run_reflection_pass`].
619#[derive(Debug, Clone, Default, Serialize, Deserialize)]
620pub struct ReflectionPassReport {
621    /// RFC3339 timestamps; populated by `run_reflection_pass`.
622    pub started_at: String,
623    pub completed_at: String,
624    /// Number of namespaces visited (`--all-namespaces`) or `1`
625    /// when a single `--namespace` was supplied.
626    pub namespaces_visited: usize,
627    /// Eligible candidate Observations scanned across all visited
628    /// namespaces.
629    pub observations_scanned: usize,
630    /// Number of clusters formed (pre-eligibility).
631    pub clusters_formed: usize,
632    /// Number of clusters that survived the eligibility gate.
633    pub clusters_eligible: usize,
634    /// Number of reflections successfully persisted. Always `0` when
635    /// `dry_run = true`.
636    pub reflections_persisted: usize,
637    /// Number of refused-by-depth-cap clusters (substrate refusal or
638    /// curator `--max-depth` guard).
639    pub depth_refusals: usize,
640    /// LLM call failures, persist errors, and verify errors that
641    /// did NOT abort the pass.
642    pub errors: Vec<String>,
643    /// Dry-run proposals — populated when `dry_run = true`, empty
644    /// otherwise. Each entry is `(namespace, proposed_title,
645    /// source_ids)`.
646    #[serde(default)]
647    pub dry_run_proposals: Vec<DryRunProposal>,
648    /// `true` if the pass was a dry-run.
649    pub dry_run: bool,
650}
651
652/// Compact description of a proposed reflection when the pass runs
653/// in `--dry-run` mode. Re-serialised into the CLI's JSON output so
654/// operators can inspect proposed clusters before committing.
655#[derive(Debug, Clone, Serialize, Deserialize)]
656pub struct DryRunProposal {
657    pub namespace: String,
658    pub proposed_title: String,
659    pub source_ids: Vec<String>,
660}
661
/// Run one reflection pass and return a report of what happened.
///
/// Scope: `namespace = Some(ns)` processes just that namespace; `None`
/// processes every namespace returned by `store.list_namespaces()` whose
/// name does not start with `_`.
///
/// `enabled_check` is the operator-supplied gate for the per-namespace
/// [`ReflectionPassConfig::enabled`] flag. A namespace it rejects is
/// skipped before any memory is listed, so it contributes nothing to the
/// scan / cluster / persist counters or to the error list. It is still
/// included in `namespaces_visited`, which is taken before the gate runs.
///
/// With `dry_run` set, each eligible cluster is summarised and recorded
/// as a [`DryRunProposal`] instead of being persisted.
///
/// This is the CLI entry-point — `src/cli/curator.rs` calls into it
/// when the operator passes `--reflect`. The autonomy daemon's
/// per-cycle sweep does NOT call this today (manual-only for v0.7.0
/// per #666 acceptance).
///
/// # Errors
///
/// Fails only when enumerating namespaces fails. Per-namespace and
/// per-cluster failures are appended to `report.errors` and the pass
/// moves on; a persist error whose message mentions both "exceeds" and
/// "depth" is tallied in `depth_refusals` instead.
#[cfg(feature = "sal")]
pub async fn run_reflection_pass(
    store: &dyn MemoryStore,
    llm: &dyn AutonomyLlm,
    keypair: Option<&AgentKeypair>,
    namespace: Option<&str>,
    max_depth: Option<u32>,
    dry_run: bool,
    enabled_check: impl Fn(&str) -> bool,
) -> Result<ReflectionPassReport> {
    let mut report = ReflectionPassReport {
        started_at: Utc::now().to_rfc3339(),
        dry_run,
        ..Default::default()
    };

    let pass = ReflectionPass::new(store, llm, keypair, max_depth, dry_run);

    let targets: Vec<String> = if let Some(single) = namespace {
        vec![single.to_owned()]
    } else {
        // Enumerate what the store knows about and drop internal
        // (`_`-prefixed) namespaces; the operator's enabled_check is
        // applied per namespace inside the loop below.
        store
            .list_namespaces()
            .await
            .map_err(|e| anyhow::anyhow!(e))
            .context("run_reflection_pass: list_namespaces failed")?
            .into_iter()
            .map(|row| row.namespace)
            .filter(|name| !name.starts_with('_'))
            .collect()
    };
    report.namespaces_visited = targets.len();

    for ns in &targets {
        // Called inline (not via an iterator adaptor) so no borrow of the
        // predicate is kept alive across the awaits below.
        if !enabled_check(ns) {
            continue;
        }

        // Bounded load: at most MAX_CLUSTER_SIZE * 16 rows per namespace,
        // so a runaway namespace cannot exhaust memory during the pass.
        let listed =
            store_list_namespace(store, &pass.ctx, ns.as_str(), MAX_CLUSTER_SIZE * 16).await;
        let candidates = match listed {
            Ok(rows) => rows,
            Err(e) => {
                report
                    .errors
                    .push(format!("namespace '{ns}': store.list failed: {e}"));
                continue;
            }
        };
        report.observations_scanned += candidates.len();

        // Stage 1 — cluster.
        let clusters = pass.cluster(&candidates);
        report.clusters_formed += clusters.len();

        for member_ids in clusters {
            // Map the cluster's ids back onto the loaded rows; an id with
            // no matching row is silently dropped.
            let mut members: Vec<Memory> = member_ids
                .iter()
                .filter_map(|id| candidates.iter().find(|m| &m.id == id))
                .cloned()
                .collect();

            // Stage 2 — eligibility gate.
            if !pass.eligible(&members) {
                continue;
            }
            report.clusters_eligible += 1;

            // Sort by id so repeated runs over the same input feed the
            // summariser (and the persisted source list) in a stable order.
            members.sort_by(|lhs, rhs| lhs.id.cmp(&rhs.id));

            // Stage 3 — summarise.
            let summary = match pass.summarize(&members) {
                Ok(s) => s,
                Err(e) => {
                    report
                        .errors
                        .push(format!("namespace '{ns}': summarize failed: {e}"));
                    continue;
                }
            };

            let source_ids: Vec<String> = members.iter().map(|m| m.id.clone()).collect();

            if dry_run {
                // Record the proposal only; this branch never reaches
                // persist, so the id list can be moved into the report.
                report.dry_run_proposals.push(DryRunProposal {
                    namespace: ns.clone(),
                    proposed_title: summary.title.clone(),
                    source_ids,
                });
                continue;
            }

            // Stage 4 — persist.
            if let Err(e) = pass.persist(&summary, &source_ids).await {
                // Depth-cap refusals are recognised by their message text
                // and counted separately from genuine failures.
                let msg = e.to_string();
                if msg.contains("exceeds") && msg.contains("depth") {
                    report.depth_refusals += 1;
                } else {
                    report
                        .errors
                        .push(format!("namespace '{ns}': persist failed: {e}"));
                }
                continue;
            }
            report.reflections_persisted += 1;

            // Stage 5 — best-effort verify: re-list the namespace and look
            // for a Reflection whose `reflects_on` edges match this
            // cluster. A miss is reported but does not undo the persist.
            if let Err(e) = verify_recent(store, &pass.ctx, ns, &source_ids).await {
                report
                    .errors
                    .push(format!("namespace '{ns}': verify failed: {e}"));
            }
        }
    }

    report.completed_at = Utc::now().to_rfc3339();
    Ok(report)
}
810
/// Best-effort verify helper used by [`run_reflection_pass`].
///
/// Lists up to 16 rows from `namespace` and succeeds as soon as one of
/// the Reflection rows among them has outbound `reflects_on` edges whose
/// targets are exactly `source_ids` (compared as sets).
///
/// NOTE(review): this only finds the freshly written reflection if the
/// listing returns recent rows within its first 16 — ordering is decided
/// by `store_list_namespace`, which is not visible here; confirm.
///
/// # Errors
///
/// Fails when the listing or a link lookup fails, or when no listed
/// Reflection carries the expected edge set.
#[cfg(feature = "sal")]
async fn verify_recent(
    store: &dyn MemoryStore,
    ctx: &CallerContext,
    namespace: &str,
    source_ids: &[String],
) -> Result<()> {
    let listed = store_list_namespace(store, ctx, namespace, 16)
        .await
        .context("verify_recent: store.list failed")?;
    let expected: HashSet<&str> = source_ids.iter().map(String::as_str).collect();

    for memory in &listed {
        if memory.memory_kind != MemoryKind::Reflection {
            continue;
        }
        let links = store
            .get_links_for_anchor(&memory.id)
            .await
            .map_err(|e| anyhow::anyhow!(e))?;
        // Keep only edges that leave this reflection via `reflects_on`;
        // the lookup may also return other relations or inbound edges.
        let reflected: HashSet<&str> = links
            .iter()
            .filter(|l| {
                l.source_id == memory.id
                    && l.relation == crate::models::MemoryLinkRelation::ReflectsOn
            })
            .map(|l| l.target_id.as_str())
            .collect();
        // An exact set match on the edge targets is the whole check; no
        // further round-trip against the reflection is performed.
        if reflected == expected {
            return Ok(());
        }
    }

    anyhow::bail!(
        "verify_recent: no Reflection in namespace '{namespace}' carries the \
         expected reflects_on edge set"
    )
}
856
857// ---------------------------------------------------------------------------
858// Internal helpers
859// ---------------------------------------------------------------------------
860
861fn tier_rank(t: &Tier) -> u8 {
862    match t {
863        Tier::Short => 0,
864        Tier::Mid => 1,
865        Tier::Long => 2,
866    }
867}
868
869/// Returns `true` when `m` is a clusterable observation: typed as
870/// Observation, not in an internal namespace, and recalled at least
871/// `MIN_RECALL_COUNT` times. The recall threshold is the substrate
872/// proxy for the spec's "recall co-occurrence frequency" signal.
873fn is_clusterable_observation(m: &Memory) -> bool {
874    m.memory_kind == MemoryKind::Observation
875        && !m.namespace.starts_with('_')
876        && m.access_count >= MIN_RECALL_COUNT
877}
878
879/// Returns `true` when two observations co-occur enough to seed a
880/// reflection cluster: same namespace, created within
881/// [`TEMPORAL_WINDOW_DAYS`] of each other, Jaccard ≥
882/// [`REFLECTION_JACCARD_THRESHOLD`].
883fn pair_co_occurs(a: &Memory, b: &Memory) -> bool {
884    if a.namespace != b.namespace {
885        return false;
886    }
887    if let (Some(ta), Some(tb)) = (parse_rfc3339(&a.created_at), parse_rfc3339(&b.created_at)) {
888        let delta = (ta - tb).num_days().abs();
889        if delta > TEMPORAL_WINDOW_DAYS {
890            return false;
891        }
892    }
893    jaccard_similarity(&a.content, &b.content) >= REFLECTION_JACCARD_THRESHOLD
894}
895
896/// Parse an RFC3339 timestamp into a `DateTime<Utc>`. Returns `None`
897/// on parse failure (the caller treats that as "no temporal signal"
898/// and lets the Jaccard step decide).
899fn parse_rfc3339(s: &str) -> Option<DateTime<Utc>> {
900    DateTime::parse_from_rfc3339(s)
901        .ok()
902        .map(|d| d.with_timezone(&Utc))
903}
904
/// Jaccard similarity (|A ∩ B| / |A ∪ B|) between the token sets of two
/// strings.
///
/// Tokens are the runs between non-alphanumeric characters, lowercased,
/// keeping only those whose `len()` is at least 3. Returns `0.0` when
/// neither input yields a token. Mirror of the helper used by
/// `consolidate` — duplicated here so the reflection pass has zero
/// runtime dependency on the consolidate module ordering.
fn jaccard_similarity(a: &str, b: &str) -> f64 {
    fn token_set(text: &str) -> HashSet<String> {
        text.split(|c: char| !c.is_alphanumeric())
            .filter(|tok| tok.len() >= 3)
            .map(str::to_lowercase)
            .collect()
    }

    let left = token_set(a);
    let right = token_set(b);
    let shared = left.intersection(&right).count();
    // |A ∪ B| = |A| + |B| − |A ∩ B|; it is zero only when both sets are empty.
    let combined = left.len() + right.len() - shared;
    if combined == 0 {
        return 0.0;
    }
    // Token counts are far below the range where usize → f64 loses
    // precision. The attribute needs a statement to attach to, hence the
    // named binding.
    #[allow(clippy::cast_precision_loss)]
    let ratio = shared as f64 / combined as f64;
    ratio
}
931
/// Length of the co-occurrence window ([`TEMPORAL_WINDOW_DAYS`])
/// expressed in seconds. Compiled for tests only, where it backs the
/// window-size assertion.
#[cfg(test)]
pub(crate) fn temporal_window_seconds() -> i64 {
    let window = chrono::Duration::days(TEMPORAL_WINDOW_DAYS);
    window.num_seconds()
}
938
939// ---------------------------------------------------------------------------
940// Unit tests
941// ---------------------------------------------------------------------------
942
943// The reflection-pass suite exercises the SAL-gated `ReflectionPass` over a
944// `StubLlm: AutonomyLlm`; both are sal-only, so the whole module is gated
945// (non-sal builds have no ReflectionPass to test).
946#[cfg(all(test, feature = "sal"))]
947mod tests {
948    use super::*;
949    use crate::models::{Memory, MemoryKind, Tier};
950    use anyhow::Result;
951    use chrono::Duration;
952    use std::sync::Mutex;
953
954    // ---- Deterministic stub LLM ------------------------------------------
955
    /// Deterministic stub for `AutonomyLlm`. Tests use this in place of
    /// the production `OllamaClient` so the reflection-pass suite never
    /// touches the network. The stub records every prompt it receives
    /// so per-test assertions can pin "summarize was called for cluster
    /// N" without inspecting log output.
    pub(super) struct StubLlm {
        /// Canned text handed back by every `summarize_memories` call.
        pub(super) summary: String,
        /// Call log: one `summarize:<n>` entry per `summarize_memories`
        /// call, where `<n>` is the number of memories passed in.
        pub(super) calls: Mutex<Vec<String>>,
    }
965
966    impl StubLlm {
967        pub(super) fn new(summary: &str) -> Self {
968            Self {
969                summary: summary.to_string(),
970                calls: Mutex::new(Vec::new()),
971            }
972        }
973    }
974
975    impl AutonomyLlm for StubLlm {
976        fn auto_tag(&self, _title: &str, _content: &str) -> Result<Vec<String>> {
977            Ok(vec![])
978        }
979        fn detect_contradiction(&self, _a: &str, _b: &str) -> Result<bool> {
980            Ok(false)
981        }
982        fn summarize_memories(&self, memories: &[(String, String)]) -> Result<String> {
983            self.calls
984                .lock()
985                .unwrap()
986                .push(format!("summarize:{}", memories.len()));
987            Ok(self.summary.clone())
988        }
989    }
990
991    // ---- Memory factory --------------------------------------------------
992
993    fn make_obs(id: &str, ns: &str, title: &str, content: &str, access: i64) -> Memory {
994        let now = Utc::now().to_rfc3339();
995        Memory {
996            id: id.to_string(),
997            tier: Tier::Long,
998            namespace: ns.to_string(),
999            title: title.to_string(),
1000            content: content.to_string(),
1001            tags: vec![],
1002            priority: 5,
1003            confidence: 1.0,
1004            source: "test".to_string(),
1005            access_count: access,
1006            created_at: now.clone(),
1007            updated_at: now,
1008            last_accessed_at: None,
1009            expires_at: None,
1010            metadata: serde_json::json!({}),
1011            reflection_depth: 0,
1012            memory_kind: MemoryKind::Observation,
1013            entity_id: None,
1014            persona_version: None,
1015            citations: Vec::new(),
1016            source_uri: None,
1017            source_span: None,
1018            confidence_source: ConfidenceSource::CallerProvided,
1019            confidence_signals: None,
1020            confidence_decayed_at: None,
1021            version: 1,
1022        }
1023    }
1024
1025    // ---- Eligibility -----------------------------------------------------
1026
1027    #[test]
1028    fn eligible_rejects_below_min_cluster_size() {
1029        // The cluster-size invariant is checked at the eligibility gate
1030        // — pure data check, no DB / LLM dependency.
1031        let cluster: Vec<Memory> = (0..(MIN_CLUSTER_SIZE - 1))
1032            .map(|i| make_obs(&format!("m{i}"), "app", "t", "kubernetes deploy", 1))
1033            .collect();
1034        let result = cluster.len() >= MIN_CLUSTER_SIZE
1035            && cluster.len() <= MAX_CLUSTER_SIZE
1036            && !cluster[0].namespace.starts_with('_')
1037            && cluster.iter().all(|m| {
1038                m.memory_kind == MemoryKind::Observation
1039                    && m.namespace == cluster[0].namespace
1040                    && m.access_count >= MIN_RECALL_COUNT
1041            });
1042        assert!(!result, "below-MIN cluster must not be eligible");
1043    }
1044
1045    #[test]
1046    fn eligible_rejects_reflection_kind_member() {
1047        // Reflection-on-reflection chains form across passes (sequential
1048        // runs at depth=1 → depth=2). A single-pass cluster that already
1049        // contains a typed Reflection must NOT be eligible — that's the
1050        // job of a follow-up pass, not this one.
1051        let mut cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1052            .map(|i| make_obs(&format!("m{i}"), "app", "t", "kubernetes deploy", 1))
1053            .collect();
1054        cluster[0].memory_kind = MemoryKind::Reflection;
1055        let result = cluster
1056            .iter()
1057            .all(|m| m.memory_kind == MemoryKind::Observation);
1058        assert!(!result, "mixed-kind cluster must not be eligible");
1059    }
1060
1061    #[test]
1062    fn eligible_rejects_internal_namespace() {
1063        let cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1064            .map(|i| make_obs(&format!("m{i}"), "_curator", "t", "kubernetes deploy", 1))
1065            .collect();
1066        let result = !cluster[0].namespace.starts_with('_');
1067        assert!(!result, "internal-namespace cluster must not be eligible");
1068    }
1069
1070    // ---- Clustering ------------------------------------------------------
1071
1072    #[test]
1073    fn cluster_groups_three_co_occurring_observations() {
1074        // Three observations in the same namespace, all with shared
1075        // Jaccard tokens, access_count >= 1 → form a single cluster.
1076        let m1 = make_obs("a", "ns", "t1", "kubernetes rolling deploy strategy", 2);
1077        let m2 = make_obs("b", "ns", "t2", "kubernetes deploy canary strategy", 3);
1078        let m3 = make_obs("c", "ns", "t3", "kubernetes rolling deploy approach", 1);
1079
1080        // We can't construct a real ReflectionPass without a Connection,
1081        // so test the cluster() logic via the standalone pair_co_occurs
1082        // helper plus a manual seeded walk.
1083        let obs = [m1.clone(), m2.clone(), m3.clone()];
1084        let pairs = [
1085            pair_co_occurs(&m1, &m2),
1086            pair_co_occurs(&m1, &m3),
1087            pair_co_occurs(&m2, &m3),
1088        ];
1089        assert!(
1090            pairs.iter().all(|p| *p),
1091            "all three pairs must co-occur, got {pairs:?}"
1092        );
1093        assert_eq!(obs.len(), MIN_CLUSTER_SIZE);
1094    }
1095
1096    #[test]
1097    fn cluster_skips_observations_with_zero_access_count() {
1098        // access_count = 0 → not clusterable. This is the substrate
1099        // proxy for "no recall co-occurrence signal".
1100        let cold = make_obs("cold", "ns", "t", "kubernetes deploy", 0);
1101        assert!(!is_clusterable_observation(&cold));
1102    }
1103
1104    #[test]
1105    fn pair_co_occurs_rejects_cross_namespace() {
1106        let a = make_obs("a", "ns1", "t", "shared content tokens", 1);
1107        let b = make_obs("b", "ns2", "t", "shared content tokens", 1);
1108        assert!(!pair_co_occurs(&a, &b));
1109    }
1110
1111    #[test]
1112    fn pair_co_occurs_respects_temporal_window() {
1113        // Build two memories whose created_at straddle the 7-day
1114        // window. The pair must NOT co-occur.
1115        let mut a = make_obs("a", "ns", "t", "shared content tokens here", 1);
1116        let mut b = make_obs("b", "ns", "t", "shared content tokens here", 1);
1117        let now = Utc::now();
1118        a.created_at = now.to_rfc3339();
1119        b.created_at = (now - Duration::days(TEMPORAL_WINDOW_DAYS + 2)).to_rfc3339();
1120        assert!(
1121            !pair_co_occurs(&a, &b),
1122            "outside-window pair must not co-occur"
1123        );
1124    }
1125
1126    #[test]
1127    fn pair_co_occurs_below_jaccard_threshold_is_false() {
1128        let a = make_obs("a", "ns", "t", "kubernetes deploy strategy", 1);
1129        let b = make_obs(
1130            "b",
1131            "ns",
1132            "t",
1133            "completely unrelated quantum mechanics text",
1134            1,
1135        );
1136        assert!(!pair_co_occurs(&a, &b));
1137    }
1138
1139    // ---- Helpers ---------------------------------------------------------
1140
1141    #[test]
1142    fn jaccard_similarity_is_symmetric() {
1143        let a = "kubernetes rolling deploy canary";
1144        let b = "kubernetes canary rolling deploy strategy";
1145        let sim_ab = jaccard_similarity(a, b);
1146        let sim_ba = jaccard_similarity(b, a);
1147        assert!((sim_ab - sim_ba).abs() < 1e-9);
1148    }
1149
1150    #[test]
1151    fn jaccard_similarity_empty_strings_zero() {
1152        assert_eq!(jaccard_similarity("", ""), 0.0);
1153    }
1154
1155    #[test]
1156    fn temporal_window_is_7_days() {
1157        // 7 * 24 * 3600 = 604_800 seconds (named as
1158        // `crate::SECS_PER_WEEK` post PR #1174 PR3 — pm-v3.1).
1159        assert_eq!(temporal_window_seconds(), crate::SECS_PER_WEEK);
1160    }
1161
1162    #[test]
1163    fn config_default_is_disabled() {
1164        // Per spec acceptance — operators must opt in per namespace.
1165        let cfg = ReflectionPassConfig::default();
1166        assert!(!cfg.enabled);
1167        assert!(cfg.max_depth.is_none());
1168    }
1169
1170    #[test]
1171    fn config_round_trips_json() {
1172        let cfg = ReflectionPassConfig {
1173            enabled: true,
1174            max_depth: Some(2),
1175        };
1176        let json = serde_json::to_string(&cfg).unwrap();
1177        let back: ReflectionPassConfig = serde_json::from_str(&json).unwrap();
1178        assert_eq!(cfg, back);
1179    }
1180
1181    // ---- Stub LLM contract ----------------------------------------------
1182
1183    #[test]
1184    fn stub_llm_records_calls() {
1185        let stub = StubLlm::new("synthesised pattern");
1186        let out = stub
1187            .summarize_memories(&[("t1".into(), "c1".into()), ("t2".into(), "c2".into())])
1188            .unwrap();
1189        assert_eq!(out, "synthesised pattern");
1190        let calls = stub.calls.lock().unwrap();
1191        assert_eq!(calls.len(), 1);
1192        assert!(calls[0].starts_with("summarize:"));
1193    }
1194
1195    // ---- Report serialisation -------------------------------------------
1196
1197    #[test]
1198    fn report_serialises_to_json() {
1199        let r = ReflectionPassReport {
1200            started_at: "2026-01-01T00:00:00Z".into(),
1201            completed_at: "2026-01-01T00:00:01Z".into(),
1202            namespaces_visited: 1,
1203            observations_scanned: 30,
1204            clusters_formed: 3,
1205            clusters_eligible: 3,
1206            reflections_persisted: 3,
1207            depth_refusals: 0,
1208            errors: vec![],
1209            dry_run_proposals: vec![],
1210            dry_run: false,
1211        };
1212        let json = serde_json::to_string(&r).unwrap();
1213        assert!(json.contains("reflections_persisted"));
1214        assert!(json.contains("clusters_eligible"));
1215        let back: ReflectionPassReport = serde_json::from_str(&json).unwrap();
1216        assert_eq!(back.observations_scanned, 30);
1217    }
1218
1219    // ---- ReflectionPass trait coverage (with real DB) ---------------------
1220    //
1221    // Issue #1548 — the reflection pass now operates over the SAL
1222    // `MemoryStore` trait rather than a raw `rusqlite::Connection`. The
1223    // trait is `sal`-gated, so this whole block is too. Tests open a
1224    // `SqliteStore` (which thin-wraps `crate::db::open`) and pass
1225    // `&store` into the pass. Seeding + assertions still go through the
1226    // `crate::db::*` free functions over a raw connection opened at the
1227    // same path (`conn_of`) so the test bodies stay close to the
1228    // pre-trait shape — exercising the SAME underlying SQLite file the
1229    // store reads. The pass methods are now `async`, so the trait-driving
1230    // tests are `#[tokio::test]`.
1231    #[cfg(feature = "sal")]
1232    mod sal_pass_tests {
1233        use super::*;
1234
1235        use crate::store::sqlite::SqliteStore;
1236
1237        fn open_db() -> (SqliteStore, tempfile::TempDir) {
1238            let dir = tempfile::tempdir().expect("tempdir");
1239            let path = dir.path().join("test.db");
1240            let store = SqliteStore::open(&path).expect("SqliteStore::open");
1241            (store, dir)
1242        }
1243
1244        /// Open a raw `rusqlite::Connection` at the store's backing file for
1245        /// seeding + assertions via the legacy `crate::db::*` free functions.
1246        fn conn_of(store: &SqliteStore) -> rusqlite::Connection {
1247            crate::db::open(store.path()).expect("db::open at store path")
1248        }
1249
1250        fn insert_observation(
1251            conn: &rusqlite::Connection,
1252            ns: &str,
1253            title: &str,
1254            content: &str,
1255            access_count: i64,
1256        ) -> String {
1257            let now = chrono::Utc::now().to_rfc3339();
1258            let mut metadata = crate::models::default_metadata();
1259            if let Some(obj) = metadata.as_object_mut() {
1260                obj.insert(
1261                    "agent_id".to_string(),
1262                    serde_json::Value::String("test-agent".to_string()),
1263                );
1264            }
1265            let mem = Memory {
1266                id: uuid::Uuid::new_v4().to_string(),
1267                tier: Tier::Long,
1268                namespace: ns.to_string(),
1269                title: title.to_string(),
1270                content: content.to_string(),
1271                tags: vec![],
1272                priority: 5,
1273                confidence: 1.0,
1274                source: "test".to_string(),
1275                access_count,
1276                created_at: now.clone(),
1277                updated_at: now,
1278                last_accessed_at: None,
1279                expires_at: None,
1280                metadata,
1281                reflection_depth: 0,
1282                memory_kind: MemoryKind::Observation,
1283                entity_id: None,
1284                persona_version: None,
1285                citations: Vec::new(),
1286                source_uri: None,
1287                source_span: None,
1288                confidence_source: ConfidenceSource::CallerProvided,
1289                confidence_signals: None,
1290                confidence_decayed_at: None,
1291                version: 1,
1292            };
1293            crate::db::insert(conn, &mem).unwrap()
1294        }
1295
1296        #[test]
1297        fn pass_name_is_reflection() {
1298            let (store, _dir) = open_db();
1299            let llm = StubLlm::new("S");
1300            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1301            assert_eq!(pass.name(), "reflection");
1302        }
1303
1304        #[test]
1305        fn agent_id_falls_back_to_ai_curator_without_keypair() {
1306            let (store, _dir) = open_db();
1307            let llm = StubLlm::new("S");
1308            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1309            assert_eq!(pass.agent_id(), "ai:curator");
1310        }
1311
1312        #[test]
1313        fn agent_id_uses_keypair_when_provided() {
1314            let (store, _dir) = open_db();
1315            let llm = StubLlm::new("S");
1316            use ed25519_dalek::{SigningKey, VerifyingKey};
1317            let mut rng = rand_core::OsRng;
1318            let sk = SigningKey::generate(&mut rng);
1319            let vk: VerifyingKey = (&sk).into();
1320            let kp = AgentKeypair {
1321                agent_id: "test:agent-x".to_string(),
1322                public: vk,
1323                private: Some(sk),
1324            };
1325            let pass = ReflectionPass::new(&store, &llm, Some(&kp), None, false);
1326            assert_eq!(pass.agent_id(), "test:agent-x");
1327        }
1328
1329        #[test]
1330        fn cluster_excludes_zero_access_observations() {
1331            let (store, _dir) = open_db();
1332            let llm = StubLlm::new("S");
1333            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1334            let m1 = make_obs("a", "ns", "t", "shared keyword tokens here", 0); // 0 access → skipped
1335            let m2 = make_obs("b", "ns", "t", "shared keyword tokens here", 5);
1336            let m3 = make_obs("c", "ns", "t", "shared keyword tokens here", 5);
1337            let m4 = make_obs("d", "ns", "t", "shared keyword tokens here", 5);
1338            let clusters = pass.cluster(&[m1, m2, m3, m4]);
1339            assert_eq!(clusters.len(), 1);
1340            assert_eq!(clusters[0].len(), 3);
1341        }
1342
1343        #[test]
1344        fn cluster_caps_at_max_cluster_size() {
1345            let (store, _dir) = open_db();
1346            let llm = StubLlm::new("S");
1347            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1348            // 15 mems with shared tokens → cluster capped at MAX_CLUSTER_SIZE = 12.
1349            let mems: Vec<Memory> = (0..15)
1350                .map(|i| {
1351                    make_obs(
1352                        &format!("m{i:02}"),
1353                        "ns",
1354                        "t",
1355                        "shared keyword tokens here pattern",
1356                        1,
1357                    )
1358                })
1359                .collect();
1360            let clusters = pass.cluster(&mems);
1361            // First-seed cluster grows up to MAX_CLUSTER_SIZE.
1362            for c in &clusters {
1363                assert!(c.len() <= MAX_CLUSTER_SIZE);
1364            }
1365        }
1366
1367        #[test]
1368        fn eligible_pass_method_accepts_valid() {
1369            let (store, _dir) = open_db();
1370            let llm = StubLlm::new("S");
1371            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1372            let cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1373                .map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
1374                .collect();
1375            assert!(pass.eligible(&cluster));
1376        }
1377
1378        #[test]
1379        fn eligible_pass_method_rejects_oversize() {
1380            let (store, _dir) = open_db();
1381            let llm = StubLlm::new("S");
1382            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1383            let cluster: Vec<Memory> = (0..(MAX_CLUSTER_SIZE + 1))
1384                .map(|i| make_obs(&format!("m{i:02}"), "ns", "t", "c", 1))
1385                .collect();
1386            assert!(!pass.eligible(&cluster));
1387        }
1388
1389        #[test]
1390        fn eligible_pass_method_rejects_reflection_member() {
1391            let (store, _dir) = open_db();
1392            let llm = StubLlm::new("S");
1393            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1394            let mut cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1395                .map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
1396                .collect();
1397            cluster[0].memory_kind = MemoryKind::Reflection;
1398            assert!(!pass.eligible(&cluster));
1399        }
1400
1401        #[test]
1402        fn eligible_pass_method_rejects_zero_access() {
1403            let (store, _dir) = open_db();
1404            let llm = StubLlm::new("S");
1405            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1406            let mut cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1407                .map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
1408                .collect();
1409            cluster[1].access_count = 0;
1410            assert!(!pass.eligible(&cluster));
1411        }
1412
1413        #[test]
1414        fn summarize_below_min_errors() {
1415            let (store, _dir) = open_db();
1416            let llm = StubLlm::new("S");
1417            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1418            let cluster: Vec<Memory> = (0..(MIN_CLUSTER_SIZE - 1))
1419                .map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
1420                .collect();
1421            let err = pass.summarize(&cluster).unwrap_err().to_string();
1422            assert!(err.contains("< MIN_CLUSTER_SIZE"));
1423        }
1424
1425        #[test]
1426        fn summarize_returns_reflection_typed_memory() {
1427            let (store, _dir) = open_db();
1428            let llm = StubLlm::new("synth pattern");
1429            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1430            let cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1431                .map(|i| {
1432                    let mut m = make_obs(&format!("m{i}"), "ns", "Title-A", "shared content", 2);
1433                    m.tier = if i == 0 { Tier::Long } else { Tier::Mid };
1434                    m.priority = 5 + i32::try_from(i).unwrap();
1435                    m
1436                })
1437                .collect();
1438            let summary = pass.summarize(&cluster).unwrap();
1439            assert_eq!(summary.memory_kind, MemoryKind::Reflection);
1440            assert!(summary.title.starts_with("[reflection]"));
1441            assert_eq!(summary.content, "synth pattern");
1442            assert_eq!(summary.tier, Tier::Long);
1443            assert_eq!(summary.source, "system");
1444            assert_eq!(summary.namespace, "ns");
1445            // Priority = max of source priorities.
1446            assert_eq!(
1447                summary.priority,
1448                5 + i32::try_from(MIN_CLUSTER_SIZE - 1).unwrap()
1449            );
1450        }
1451
1452        #[tokio::test]
1453        async fn persist_dry_run_is_noop() {
1454            let (store, _dir) = open_db();
1455            let llm = StubLlm::new("S");
1456            let pass = ReflectionPass::new(&store, &llm, None, None, true);
1457            let summary = make_obs("s", "ns", "[reflection]", "c", 1);
1458            pass.persist(&summary, &["x".to_string()]).await.unwrap();
1459        }
1460
1461        #[tokio::test]
1462        async fn persist_empty_sources_is_noop() {
1463            let (store, _dir) = open_db();
1464            let llm = StubLlm::new("S");
1465            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1466            let summary = make_obs("s", "ns", "[reflection]", "c", 1);
1467            pass.persist(&summary, &[]).await.unwrap();
1468        }
1469
1470        #[tokio::test]
1471        async fn persist_refuses_when_max_depth_exceeded() {
1472            let (store, _dir) = open_db();
1473            let llm = StubLlm::new("S");
1474            // Pass max_depth=1; insert source at reflection_depth=1 → new depth = 2 → refused.
1475            let pass = ReflectionPass::new(&store, &llm, None, Some(1), false);
1476            let mut source = make_obs("src", "ns", "t", "c", 1);
1477            source.reflection_depth = 1;
1478            let src_id = crate::db::insert(&conn_of(&store), &source).unwrap();
1479            let summary = make_obs("s", "ns", "[reflection]", "c", 0);
1480            let err = pass
1481                .persist(&summary, &[src_id])
1482                .await
1483                .unwrap_err()
1484                .to_string();
1485            assert!(err.contains("exceeds"));
1486            assert!(err.contains("--max-depth"));
1487        }
1488
1489        #[tokio::test]
1490        async fn persist_writes_reflection_into_db() {
1491            // Full happy-path: real sources, real reflect insert.
1492            let (store, _dir) = open_db();
1493            let conn = conn_of(&store);
1494            let llm = StubLlm::new("synthesised pattern");
1495            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1496            // Seed three observations in namespace 'app'.
1497            let s1 = insert_observation(&conn, "app", "T1", "kubernetes deploy strategy notes", 2);
1498            let s2 =
1499                insert_observation(&conn, "app", "T2", "kubernetes rolling deploy approach", 3);
1500            let s3 = insert_observation(&conn, "app", "T3", "kubernetes canary deploy strategy", 1);
1501            let summary = pass
1502                .summarize(&[
1503                    crate::db::get(&conn, &s1).unwrap().unwrap(),
1504                    crate::db::get(&conn, &s2).unwrap().unwrap(),
1505                    crate::db::get(&conn, &s3).unwrap().unwrap(),
1506                ])
1507                .unwrap();
1508            pass.persist(&summary, &[s1.clone(), s2.clone(), s3.clone()])
1509                .await
1510                .unwrap();
1511
1512            // Find the freshly-written reflection in the 'app' namespace.
1513            let listed = crate::db::list(
1514                &conn,
1515                Some("app"),
1516                None,
1517                32,
1518                0,
1519                None,
1520                None,
1521                None,
1522                None,
1523                None,
1524            )
1525            .unwrap();
1526            let refl = listed
1527                .iter()
1528                .find(|m| m.memory_kind == MemoryKind::Reflection)
1529                .expect("expected one reflection");
1530            // verify() should succeed on it.
1531            pass.verify(refl.id.clone()).await.unwrap();
1532        }
1533
1534        #[tokio::test]
1535        async fn verify_missing_id_errors() {
1536            let (store, _dir) = open_db();
1537            let llm = StubLlm::new("S");
1538            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1539            let err = pass
1540                .verify("no-such".to_string())
1541                .await
1542                .unwrap_err()
1543                .to_string();
1544            assert!(err.contains("not found in DB"));
1545        }
1546
1547        #[tokio::test]
1548        async fn verify_wrong_kind_errors() {
1549            let (store, _dir) = open_db();
1550            let conn = conn_of(&store);
1551            let llm = StubLlm::new("S");
1552            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1553            // Insert an Observation and verify against its id — must error.
1554            let id = insert_observation(&conn, "ns", "T", "c", 1);
1555            let err = pass.verify(id).await.unwrap_err().to_string();
1556            assert!(err.contains("expected Reflection"));
1557        }
1558
1559        #[tokio::test]
1560        async fn verify_reflection_without_edges_errors() {
1561            // Insert a Reflection directly via db::insert (no reflects_on links)
1562            // and verify against it — must error with "no reflects_on edge".
1563            let (store, _dir) = open_db();
1564            let conn = conn_of(&store);
1565            let llm = StubLlm::new("S");
1566            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1567            let now = chrono::Utc::now().to_rfc3339();
1568            let mut metadata = crate::models::default_metadata();
1569            if let Some(obj) = metadata.as_object_mut() {
1570                obj.insert(
1571                    "agent_id".to_string(),
1572                    serde_json::Value::String("test-agent".to_string()),
1573                );
1574            }
1575            let m = Memory {
1576                id: uuid::Uuid::new_v4().to_string(),
1577                tier: Tier::Mid,
1578                namespace: "ns".to_string(),
1579                title: "[reflection] orphan".to_string(),
1580                content: "c".to_string(),
1581                tags: vec![],
1582                priority: 5,
1583                confidence: 1.0,
1584                source: "system".to_string(),
1585                access_count: 0,
1586                created_at: now.clone(),
1587                updated_at: now,
1588                last_accessed_at: None,
1589                expires_at: None,
1590                metadata,
1591                reflection_depth: 1,
1592                memory_kind: MemoryKind::Reflection,
1593                entity_id: None,
1594                persona_version: None,
1595                citations: Vec::new(),
1596                source_uri: None,
1597                source_span: None,
1598                confidence_source: ConfidenceSource::CallerProvided,
1599                confidence_signals: None,
1600                confidence_decayed_at: None,
1601                version: 1,
1602            };
1603            let id = crate::db::insert(&conn, &m).unwrap();
1604            let err = pass.verify(id).await.unwrap_err().to_string();
1605            assert!(err.contains("no reflects_on edge"));
1606        }
1607
1608        // ---- run_reflection_pass driver -------------------------------------
1609
1610        #[tokio::test]
1611        async fn run_reflection_pass_empty_db_dry_run_namespace() {
1612            let (store, _dir) = open_db();
1613            let llm = StubLlm::new("S");
1614            let report =
1615                run_reflection_pass(&store, &llm, None, Some("nope"), None, true, |_| true)
1616                    .await
1617                    .unwrap();
1618            assert!(report.dry_run);
1619            assert_eq!(report.namespaces_visited, 1);
1620            assert_eq!(report.clusters_formed, 0);
1621            assert_eq!(report.reflections_persisted, 0);
1622        }
1623
1624        #[tokio::test]
1625        async fn run_reflection_pass_all_namespaces_with_disabled_check() {
1626            let (store, _dir) = open_db();
1627            let conn = conn_of(&store);
1628            let llm = StubLlm::new("S");
1629            // Seed an observation so list_namespaces sees one ns.
1630            insert_observation(&conn, "ns1", "t", "shared content tokens here", 2);
1631            let report = run_reflection_pass(&store, &llm, None, None, None, true, |_| false)
1632                .await
1633                .unwrap();
1634            // namespace is enumerated but enabled_check returns false → no work.
1635            assert_eq!(report.observations_scanned, 0);
1636        }
1637
1638        #[tokio::test]
1639        async fn run_reflection_pass_dry_run_reports_proposals() {
1640            let (store, _dir) = open_db();
1641            let conn = conn_of(&store);
1642            let llm = StubLlm::new("synth");
1643            // Seed three observations that should cluster.
1644            insert_observation(
1645                &conn,
1646                "app",
1647                "T1",
1648                "kubernetes rolling deploy strategy notes",
1649                2,
1650            );
1651            insert_observation(
1652                &conn,
1653                "app",
1654                "T2",
1655                "kubernetes rolling deploy strategy canary",
1656                3,
1657            );
1658            insert_observation(
1659                &conn,
1660                "app",
1661                "T3",
1662                "kubernetes canary deploy strategy rolling",
1663                1,
1664            );
1665            let report = run_reflection_pass(&store, &llm, None, Some("app"), None, true, |_| true)
1666                .await
1667                .unwrap();
1668            assert!(report.dry_run);
1669            assert!(report.observations_scanned >= 3);
1670            assert!(report.clusters_eligible >= 1);
1671            assert!(!report.dry_run_proposals.is_empty());
1672            assert_eq!(report.reflections_persisted, 0);
1673        }
1674
1675        #[tokio::test]
1676        async fn run_reflection_pass_persists_reflections() {
1677            let (store, _dir) = open_db();
1678            let conn = conn_of(&store);
1679            let llm = StubLlm::new("persisted pattern");
1680            insert_observation(&conn, "app", "T1", "shared keyword token strategy notes", 2);
1681            insert_observation(&conn, "app", "T2", "shared keyword token strategy plan", 3);
1682            insert_observation(
1683                &conn,
1684                "app",
1685                "T3",
1686                "shared keyword token strategy canary",
1687                1,
1688            );
1689            let report =
1690                run_reflection_pass(&store, &llm, None, Some("app"), None, false, |_| true)
1691                    .await
1692                    .unwrap();
1693            assert_eq!(report.dry_run, false);
1694            assert!(report.reflections_persisted >= 1);
1695        }
1696
1697        #[tokio::test]
1698        async fn run_reflection_pass_depth_refusal_increments_counter() {
1699            let (store, _dir) = open_db();
1700            let conn = conn_of(&store);
1701            let llm = StubLlm::new("synth");
1702            // Seed observations at depth=2 so a new reflection (depth=3)
1703            // would exceed our curator max_depth=2.
1704            let now = chrono::Utc::now().to_rfc3339();
1705            for i in 0..3 {
1706                let mut metadata = crate::models::default_metadata();
1707                if let Some(obj) = metadata.as_object_mut() {
1708                    obj.insert(
1709                        "agent_id".to_string(),
1710                        serde_json::Value::String("test-agent".to_string()),
1711                    );
1712                }
1713                let m = Memory {
1714                    id: uuid::Uuid::new_v4().to_string(),
1715                    tier: Tier::Long,
1716                    namespace: "deep".to_string(),
1717                    title: format!("Tdeep-{i}"),
1718                    content: "shared keyword token deep strategy".to_string(),
1719                    tags: vec![],
1720                    priority: 5,
1721                    confidence: 1.0,
1722                    source: "test".to_string(),
1723                    access_count: 2,
1724                    created_at: now.clone(),
1725                    updated_at: now.clone(),
1726                    last_accessed_at: None,
1727                    expires_at: None,
1728                    metadata,
1729                    reflection_depth: 2,
1730                    memory_kind: MemoryKind::Observation,
1731                    entity_id: None,
1732                    persona_version: None,
1733                    citations: Vec::new(),
1734                    source_uri: None,
1735                    source_span: None,
1736                    confidence_source: ConfidenceSource::CallerProvided,
1737                    confidence_signals: None,
1738                    confidence_decayed_at: None,
1739                    version: 1,
1740                };
1741                crate::db::insert(&conn, &m).unwrap();
1742            }
1743            let report = run_reflection_pass(
1744                &store,
1745                &llm,
1746                None,
1747                Some("deep"),
1748                Some(2), // cap=2; new depth would be 3 → refuse
1749                false,
1750                |_| true,
1751            )
1752            .await
1753            .unwrap();
1754            assert!(report.depth_refusals >= 1);
1755            // No reflection persisted.
1756            assert_eq!(report.reflections_persisted, 0);
1757        }
1758
1759        #[test]
1760        fn dry_run_proposal_serialises() {
1761            let p = DryRunProposal {
1762                namespace: "ns".into(),
1763                proposed_title: "[reflection] x".into(),
1764                source_ids: vec!["a".into(), "b".into()],
1765            };
1766            let j = serde_json::to_string(&p).unwrap();
1767            assert!(j.contains("source_ids"));
1768            let back: DryRunProposal = serde_json::from_str(&j).unwrap();
1769            assert_eq!(back.namespace, "ns");
1770        }
1771
1772        // ---- pair_co_occurs unparseable timestamps fall through ----
1773
1774        #[test]
1775        fn pair_co_occurs_unparseable_timestamps_still_checks_jaccard() {
1776            let mut a = make_obs("a", "ns", "t", "shared content tokens here", 1);
1777            let mut b = make_obs("b", "ns", "t", "shared content tokens here", 1);
1778            a.created_at = "not-a-timestamp".to_string();
1779            b.created_at = "also-invalid".to_string();
1780            // With invalid timestamps, the temporal check is skipped — Jaccard
1781            // alone decides. These share tokens → co-occur returns true.
1782            assert!(pair_co_occurs(&a, &b));
1783        }
1784
1785        // ---- Additional coverage: trait impls + edge branches ----
1786
1787        #[test]
1788        fn stub_llm_auto_tag_and_contradiction_paths() {
1789            let stub = StubLlm::new("S");
1790            let tags = stub.auto_tag("t", "c").unwrap();
1791            assert!(tags.is_empty());
1792            let conflict = stub.detect_contradiction("a", "b").unwrap();
1793            assert!(!conflict);
1794        }
1795
1796        #[test]
1797        fn eligible_pass_rejects_internal_namespace_directly() {
1798            // Drives the internal-namespace early-return.
1799            let (store, _dir) = open_db();
1800            let llm = StubLlm::new("S");
1801            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1802            let cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1803                .map(|i| make_obs(&format!("m{i}"), "_curator", "t", "c", 1))
1804                .collect();
1805            assert!(!pass.eligible(&cluster));
1806        }
1807
1808        #[test]
1809        fn jaccard_similarity_zero_union_returns_zero() {
1810            // The else-branch where `union == 0`: ta non-empty but tb is — no.
1811            // Actually if either has >=3 chars words there will be union.
1812            // Build inputs where tokens are stripped to nothing.
1813            let a = "a b c"; // every token <3 chars → tokens set is empty
1814            let b = "x";
1815            assert_eq!(jaccard_similarity(a, b), 0.0);
1816        }
1817
1818        #[test]
1819        fn tier_rank_all_variants() {
1820            // Drives all three match arms.
1821            assert_eq!(tier_rank(&Tier::Short), 0);
1822            assert_eq!(tier_rank(&Tier::Mid), 1);
1823            assert_eq!(tier_rank(&Tier::Long), 2);
1824        }
1825
1826        #[test]
1827        fn parse_rfc3339_invalid_returns_none() {
1828            assert!(parse_rfc3339("garbage").is_none());
1829            assert!(parse_rfc3339("2026-01-01T00:00:00Z").is_some());
1830        }
1831
1832        #[tokio::test]
1833        async fn verify_skips_inbound_links() {
1834            // verify() walks all links but `continue`s when source_id != summary_id.
1835            // This exercises the inbound-link `continue` branch.
1836            let (store, _dir) = open_db();
1837            let conn = conn_of(&store);
1838            let llm = StubLlm::new("S");
1839            let pass = ReflectionPass::new(&store, &llm, None, None, false);
1840            // Seed the reflection target via the full persist path.
1841            let s1 =
1842                insert_observation(&conn, "vrf", "T1", "shared keyword pattern tokens here", 2);
1843            let s2 =
1844                insert_observation(&conn, "vrf", "T2", "shared keyword pattern tokens here", 2);
1845            let s3 =
1846                insert_observation(&conn, "vrf", "T3", "shared keyword pattern tokens here", 2);
1847            let summary = pass
1848                .summarize(&[
1849                    crate::db::get(&conn, &s1).unwrap().unwrap(),
1850                    crate::db::get(&conn, &s2).unwrap().unwrap(),
1851                    crate::db::get(&conn, &s3).unwrap().unwrap(),
1852                ])
1853                .unwrap();
1854            pass.persist(&summary, &[s1.clone(), s2.clone(), s3.clone()])
1855                .await
1856                .unwrap();
1857            let listed = crate::db::list(
1858                &conn,
1859                Some("vrf"),
1860                None,
1861                32,
1862                0,
1863                None,
1864                None,
1865                None,
1866                None,
1867                None,
1868            )
1869            .unwrap();
1870            let refl_id = listed
1871                .iter()
1872                .find(|m| m.memory_kind == MemoryKind::Reflection)
1873                .unwrap()
1874                .id
1875                .clone();
1876            // Create a foreign link with this reflection as TARGET (not source).
1877            // verify() should still succeed because it ignores inbound links.
1878            let _ = crate::db::create_link(&conn, &s1, &refl_id, "related_to");
1879            pass.verify(refl_id).await.unwrap();
1880        }
1881
1882        #[tokio::test]
1883        async fn run_reflection_pass_summarize_error_recorded() {
1884            // Drive the summarize failure path. Use a StubLlm that
1885            // fails summarize. We need it implementing AutonomyLlm with an
1886            // error return.
1887            struct FailingLlm;
1888            impl AutonomyLlm for FailingLlm {
1889                fn auto_tag(&self, _t: &str, _c: &str) -> Result<Vec<String>> {
1890                    Ok(vec![])
1891                }
1892                fn detect_contradiction(&self, _a: &str, _b: &str) -> Result<bool> {
1893                    Ok(false)
1894                }
1895                fn summarize_memories(&self, _m: &[(String, String)]) -> Result<String> {
1896                    anyhow::bail!("forced llm failure")
1897                }
1898            }
1899            let (store, _dir) = open_db();
1900            let conn = conn_of(&store);
1901            let llm = FailingLlm;
1902            insert_observation(&conn, "ns", "T1", "shared keyword pattern tokens here", 2);
1903            insert_observation(&conn, "ns", "T2", "shared keyword pattern tokens here", 2);
1904            insert_observation(&conn, "ns", "T3", "shared keyword pattern tokens here", 2);
1905            let report = run_reflection_pass(&store, &llm, None, Some("ns"), None, false, |_| true)
1906                .await
1907                .unwrap();
1908            // Summarize error was caught and recorded.
1909            assert!(report.errors.iter().any(|e| e.contains("summarize failed")));
1910            assert_eq!(report.reflections_persisted, 0);
1911        }
1912    } // mod sal_pass_tests
1913}