Skip to main content

ai_memory/confidence/
shadow.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.7.0 Form 5 — shadow-mode telemetry pipeline.
5//!
6//! Per-recall observations land in `confidence_shadow_observations`
7//! when `AI_MEMORY_CONFIDENCE_SHADOW=1`, sampled at the rate carried
8//! by `AI_MEMORY_CONFIDENCE_SHADOW_SAMPLE_RATE` (0.0..=1.0; default
9//! 1.0 when shadow is enabled).
10//!
11//! Audit-honest contract: shadow mode **never silently overrides** the
12//! caller's confidence. The recall ranker still uses the caller value
13//! downstream; the derived value is only persisted for later
14//! calibration. This is the load-bearing property that lets operators
15//! safely turn the engine on in production.
16//!
17//! # Surface
18//!
19//! * [`observe`] — write a single shadow row.
20//! * [`should_sample`] — gate helper that consults the cached config
21//!   (enabled flag + sample rate). The first call captures the env-var
22//!   pair into a process-wide [`OnceLock`]; subsequent calls return the
23//!   cached value without hitting the `std::env` syscall. This is the
24//!   PERF-9 fix (Cluster G, issue #767): pre-Cluster-G, every recall
25//!   touch re-read both env vars on the hot path.
26//! * [`gc_observations`] — periodic GC sweep deleting rows older than
27//!   the configured retention window. Wired into the daemon's
28//!   `spawn_gc_loop` from `daemon_runtime.rs`. PERF-4 fix.
29
30use std::sync::OnceLock;
31
32use anyhow::{Context, Result};
33use rusqlite::{Connection, params};
34
35use crate::models::ConfidenceSignals;
36
/// Environment-variable opt-in for shadow mode. Only the exact value
/// `"1"` enables it; any other value, or an unset variable, leaves
/// shadow mode off.
pub const ENV_SHADOW: &str = "AI_MEMORY_CONFIDENCE_SHADOW";
/// Optional sample rate (0.0..=1.0). When unset or unparseable it
/// defaults to 1.0 while shadow is enabled — every recall touch lands a
/// row.
pub const ENV_SHADOW_SAMPLE_RATE: &str = "AI_MEMORY_CONFIDENCE_SHADOW_SAMPLE_RATE";

/// Default retention window for the periodic GC sweep on
/// `confidence_shadow_observations`. 30 days mirrors the Form 5
/// calibration window: the sweep runs against the table that the
/// calibration sweep reads from, so an aligned default keeps the
/// pipeline "what you see in the report is what you have on disk."
///
/// Operators tune this per `[confidence] shadow_retention_days = N` in
/// `config.toml`. The sweep deletes rows whose `observed_at` is older
/// than `now - N days`.
pub const DEFAULT_SHADOW_RETENTION_DAYS: i64 = 30;

/// Cached shadow config — captured on first access from the env-var
/// pair. The recall hot path reads a process-wide `OnceLock` holding
/// this value instead of calling `std::env::var` per touch (PERF-9).
///
/// The cache cannot be reset once populated, so the first capture in a
/// process is the load-bearing one. Tests that need to exercise
/// particular env-var values should call `ShadowConfig::from_env`
/// directly — it bypasses the cache — rather than expecting a
/// mid-process re-capture.
#[derive(Debug, Clone, Copy)]
pub struct ShadowConfig {
    /// `true` when `AI_MEMORY_CONFIDENCE_SHADOW=1` at capture time.
    /// Later env-var mutations are not honored (cached).
    pub enabled: bool,
    /// Sample rate in `[0.0, 1.0]`. Captured from
    /// `AI_MEMORY_CONFIDENCE_SHADOW_SAMPLE_RATE` at capture time;
    /// defaults to 1.0 when unset, unparseable, or NaN.
    pub sample_rate: f64,
}

impl ShadowConfig {
    /// Build a config snapshot by reading both env vars once.
    ///
    /// The sample rate is trimmed, parsed as `f64`, and clamped to
    /// `[0.0, 1.0]`. Unset, unparseable, or NaN values fall back to 1.0.
    fn from_env() -> Self {
        let enabled = std::env::var(ENV_SHADOW).is_ok_and(|v| v == "1");
        let sample_rate = std::env::var(ENV_SHADOW_SAMPLE_RATE)
            .ok()
            .and_then(|s| s.trim().parse::<f64>().ok())
            // `"NaN"` parses successfully and `f64::clamp` returns NaN
            // unchanged; since `u < NaN` is always false, a NaN rate would
            // silently disable sampling instead of using the 1.0 default.
            .filter(|v| !v.is_nan())
            .map(|v| v.clamp(0.0, 1.0))
            .unwrap_or(1.0);
        Self {
            enabled,
            sample_rate,
        }
    }
}
87
88static SHADOW_CONFIG: OnceLock<ShadowConfig> = OnceLock::new();
89
90/// Returns the cached shadow config, capturing it from env vars on the
91/// first call. PERF-9: subsequent calls do NOT touch `std::env` — the
92/// returned reference points into a process-wide [`OnceLock`].
93#[must_use]
94pub fn shadow_config() -> &'static ShadowConfig {
95    SHADOW_CONFIG.get_or_init(ShadowConfig::from_env)
96}
97
98/// Returns true when [`ENV_SHADOW`] was set to `"1"` at first-access
99/// time. Reads the cached [`ShadowConfig`] — no env syscall on the
100/// hot path.
101#[must_use]
102pub fn shadow_enabled() -> bool {
103    shadow_config().enabled
104}
105
106/// Resolve the configured sample rate. Reads the cached
107/// [`ShadowConfig`] — no env syscall on the hot path.
108#[must_use]
109pub fn sample_rate() -> f64 {
110    shadow_config().sample_rate
111}
112
113/// Decide whether to sample a recall touch for shadow observation.
114///
115/// Combines [`shadow_enabled`] with [`sample_rate`] and a caller-
116/// supplied uniform-`[0, 1)` random number. Pulled apart so tests can
117/// inject a deterministic value without touching the global RNG.
118#[must_use]
119pub fn should_sample(uniform_0_1: f64) -> bool {
120    let cfg = shadow_config();
121    if !cfg.enabled {
122        return false;
123    }
124    uniform_0_1 < cfg.sample_rate
125}
126
/// Test-only placeholder for resetting the cached shadow config.
///
/// This function is intentionally a **no-op**. The cache is a
/// `std::sync::OnceLock` static, and `OnceLock` offers no way to clear
/// a `static` once it has been initialised. The first [`shadow_config`]
/// call in a process therefore fixes the cached value for the rest of
/// that process, test binaries included.
///
/// Tests that need specific env-var values should call
/// `ShadowConfig::from_env` directly, which bypasses the cache, instead
/// of expecting a re-capture. Supporting a real reset would require
/// replacing the `OnceLock` with a resettable cell (for example a
/// lock-guarded `Option<ShadowConfig>`).
///
/// Compiled only under `#[cfg(test)]` so production code cannot call
/// it.
#[cfg(test)]
#[doc(hidden)]
pub fn reset_shadow_config_for_tests() {
    // Deliberately empty: there is no safe way to clear a `static OnceLock`.
}
155
156/// Append one row to `confidence_shadow_observations`.
157///
158/// Returns the substrate-generated row id on success. The caller is
159/// expected to have already gated on [`should_sample`]; this function
160/// always writes when called (so a deterministic test can exercise the
161/// table without env-var dance).
162///
163/// `source` is the `memories.source` role label denormalised onto the
164/// observation row so the calibration sweep can stream a single-table
165/// SQL aggregation without joining back to `memories`. Added in
166/// schema v40 (sqlite) / v39 (postgres) under Cluster G (PERF-12).
167///
168/// `recall_outcome` is `Some("recalled")` / `Some("skipped")` /
169/// `None`. The recall ranker stamps the value once it knows whether
170/// the candidate landed in the top-K or was dropped.
171///
172/// # Errors
173///
174/// Returns the underlying `rusqlite` error if the INSERT fails.
175#[allow(clippy::too_many_arguments)]
176pub fn observe(
177    conn: &Connection,
178    memory_id: &str,
179    namespace: &str,
180    source: &str,
181    caller_confidence: f64,
182    derived_confidence: f64,
183    signals: &ConfidenceSignals,
184    recall_outcome: Option<&str>,
185) -> Result<i64> {
186    let signals_json =
187        serde_json::to_string(signals).context("serialise ConfidenceSignals envelope")?;
188    let observed_at = chrono::Utc::now().to_rfc3339();
189    conn.execute(
190        "INSERT INTO confidence_shadow_observations
191            (memory_id, namespace, source, caller_confidence, derived_confidence,
192             signals, recall_outcome, observed_at)
193         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
194        params![
195            memory_id,
196            namespace,
197            source,
198            caller_confidence,
199            derived_confidence,
200            signals_json,
201            recall_outcome,
202            observed_at,
203        ],
204    )?;
205    Ok(conn.last_insert_rowid())
206}
207
208/// Pull every shadow observation in `namespace` newer than `since`
209/// (RFC3339). When `since` is `None`, returns all rows. Used by tests
210/// and ad-hoc debugging; the calibration sweep itself uses
211/// [`crate::confidence::calibrate::calibrate_from_shadow`] which
212/// streams a SQL-side aggregation instead of materialising every row.
213/// The result vector is ordered by `observed_at ASC` for stable replays.
214///
215/// # Errors
216///
217/// Returns the underlying `rusqlite` error if the SELECT fails.
218pub fn observations_since(
219    conn: &Connection,
220    namespace: Option<&str>,
221    since: Option<&str>,
222) -> Result<Vec<ShadowObservation>> {
223    let sql = "SELECT id, memory_id, namespace, source, caller_confidence, derived_confidence,
224                      signals, recall_outcome, observed_at
225               FROM confidence_shadow_observations
226               WHERE (?1 IS NULL OR namespace = ?1)
227                 AND (?2 IS NULL OR observed_at >= ?2)
228               ORDER BY observed_at ASC, id ASC";
229    let mut stmt = conn.prepare(sql)?;
230    let rows = stmt.query_map(params![namespace, since], |row| {
231        Ok(ShadowObservation {
232            id: row.get(0)?,
233            memory_id: row.get(1)?,
234            namespace: row.get(2)?,
235            source: row.get(3)?,
236            caller_confidence: row.get(4)?,
237            derived_confidence: row.get(5)?,
238            signals_json: row.get(6)?,
239            recall_outcome: row.get(7)?,
240            observed_at: row.get(8)?,
241        })
242    })?;
243    Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
244}
245
246/// Delete `confidence_shadow_observations` rows whose `observed_at` is
247/// older than `now - retention_days`. Returns the number of rows
248/// removed. Called periodically from the daemon GC loop
249/// (`daemon_runtime::spawn_gc_loop`) to close PERF-4 (unbounded table
250/// growth on long-running shadow-mode deployments).
251///
252/// `retention_days <= 0` is treated as "retain forever" and returns
253/// `Ok(0)` without touching the table (matches the audit-honest
254/// "do-nothing-on-zero" convention used elsewhere in this codebase,
255/// e.g. `archive_max_days`).
256///
257/// # Errors
258///
259/// Returns the underlying `rusqlite` error if the DELETE fails.
260pub fn gc_observations(conn: &Connection, retention_days: i64) -> Result<usize> {
261    if retention_days <= 0 {
262        return Ok(0);
263    }
264    let cutoff = (chrono::Utc::now() - chrono::Duration::days(retention_days)).to_rfc3339();
265    let n = conn.execute(
266        "DELETE FROM confidence_shadow_observations WHERE observed_at < ?1",
267        params![cutoff],
268    )?;
269    Ok(n)
270}
271
/// A single `confidence_shadow_observations` row, as handed back to
/// readers such as [`observations_since`].
///
/// The signals envelope is kept as raw JSON text in `signals_json`. The
/// calibration sweep generally keys on `(namespace, source)` and does
/// not need the typed form. Callers that do can decode it with
/// `serde_json::from_str::<ConfidenceSignals>`.
#[derive(Debug, Clone)]
pub struct ShadowObservation {
    /// Row id assigned by the database when the row was inserted.
    pub id: i64,
    /// Id of the memory this observation was taken for.
    pub memory_id: String,
    /// Namespace the observation was recorded under.
    pub namespace: String,
    /// Denormalised `memories.source` role label. Introduced in schema
    /// v40 (sqlite) / v39 (postgres) under Cluster G so the calibration
    /// sweep can stream a single-table SQL aggregation. Legacy rows
    /// backfill to the joined `memories.source` value or `'unknown'`.
    pub source: String,
    /// Confidence value supplied by the caller.
    pub caller_confidence: f64,
    /// Confidence value derived by the engine, persisted for calibration.
    pub derived_confidence: f64,
    /// The `ConfidenceSignals` envelope serialised as JSON.
    pub signals_json: String,
    /// `Some("recalled")`, `Some("skipped")`, or `None` when not stamped.
    pub recall_outcome: Option<String>,
    /// Timestamp text of when the row was written.
    pub observed_at: String,
}
294
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::ConfidenceSignals;
    use crate::storage::open as open_storage;

    /// Open a throwaway database file. `open_storage` runs first (its
    /// handle is dropped immediately), presumably so the crate's schema
    /// exists on disk before the raw `Connection` used by the tests is
    /// opened. The `TempDir` must be kept alive by the caller; dropping it
    /// deletes the directory.
    fn open_tmp() -> (Connection, tempfile::TempDir) {
        let dir = tempfile::tempdir().expect("tmpdir");
        let path = dir.path().join("test.db");
        let _ = open_storage(&path).expect("open storage");
        let conn = Connection::open(&path).expect("open conn");
        (conn, dir)
    }

    /// A fixed, fully populated signals envelope for insert tests.
    fn signals_fixture() -> ConfidenceSignals {
        ConfidenceSignals {
            source_age_days: 7.0,
            atom_derivation: false,
            prior_corroboration_count: 2,
            freshness_factor: 0.84,
            baseline_per_source: 0.5,
        }
    }

    /// Snapshot of selected env vars that is written back on drop, including
    /// when an assertion panics mid-test. This keeps a test's env mutation
    /// from leaking into the rest of the test binary or clobbering values
    /// the developer exported.
    struct EnvRestore(Vec<(&'static str, Option<std::ffi::OsString>)>);

    impl EnvRestore {
        fn capture(keys: &[&'static str]) -> Self {
            Self(keys.iter().map(|&k| (k, std::env::var_os(k))).collect())
        }
    }

    impl Drop for EnvRestore {
        fn drop(&mut self) {
            for (key, saved) in &self.0 {
                // SAFETY: test-only env mutation; see the SAFETY note in
                // `shadow_config_from_env_reads_both_vars`.
                match saved {
                    Some(value) => unsafe { std::env::set_var(*key, value) },
                    None => unsafe { std::env::remove_var(*key) },
                }
            }
        }
    }

    #[test]
    fn observe_appends_row() {
        let (conn, _dir) = open_tmp();
        // Seed the memory row the observation points at, so any memory_id
        // foreign key on the observations table is satisfied.
        conn.execute(
            "INSERT INTO memories (id, tier, namespace, title, content, created_at, updated_at)
             VALUES ('m1', 'mid', 'ns', 't', 'c', '2026-05-15T00:00:00Z', '2026-05-15T00:00:00Z')",
            [],
        )
        .expect("seed mem");
        let id = observe(
            &conn,
            "m1",
            "ns",
            "user",
            0.9,
            0.6,
            &signals_fixture(),
            None,
        )
        .expect("observe ok");
        assert!(id > 0);
        let rows = observations_since(&conn, Some("ns"), None).expect("read back");
        assert_eq!(rows.len(), 1);
        assert!((rows[0].caller_confidence - 0.9).abs() < f64::EPSILON);
        assert!((rows[0].derived_confidence - 0.6).abs() < f64::EPSILON);
        assert_eq!(rows[0].source, "user");
    }

    #[test]
    fn observations_filter_by_namespace() {
        let (conn, _dir) = open_tmp();
        conn.execute(
            "INSERT INTO memories (id, tier, namespace, title, content, created_at, updated_at)
             VALUES ('m1', 'mid', 'ns_a', 't1', 'c', '2026-05-15T00:00:00Z', '2026-05-15T00:00:00Z')",
            [],
        )
        .expect("seed mem a");
        conn.execute(
            "INSERT INTO memories (id, tier, namespace, title, content, created_at, updated_at)
             VALUES ('m2', 'mid', 'ns_b', 't2', 'c', '2026-05-15T00:00:00Z', '2026-05-15T00:00:00Z')",
            [],
        )
        .expect("seed mem b");
        observe(
            &conn,
            "m1",
            "ns_a",
            "user",
            0.9,
            0.6,
            &signals_fixture(),
            None,
        )
        .unwrap();
        observe(
            &conn,
            "m2",
            "ns_b",
            "user",
            0.8,
            0.5,
            &signals_fixture(),
            None,
        )
        .unwrap();
        let a = observations_since(&conn, Some("ns_a"), None).expect("read ns_a");
        assert_eq!(a.len(), 1);
        assert_eq!(a[0].namespace, "ns_a");
        let all = observations_since(&conn, None, None).expect("read all");
        assert_eq!(all.len(), 2);
    }

    #[test]
    fn gc_observations_drops_old_rows_only() {
        let (conn, _dir) = open_tmp();
        conn.execute(
            "INSERT INTO memories (id, tier, namespace, title, content, created_at, updated_at)
             VALUES ('m1', 'mid', 'ns', 't', 'c', '2026-05-15T00:00:00Z', '2026-05-15T00:00:00Z')",
            [],
        )
        .unwrap();
        // 50 fresh rows (today) + 50 ancient rows (year 2020). With a
        // 30-day retention window, only the ancient ones drop.
        for _ in 0..50 {
            observe(
                &conn,
                "m1",
                "ns",
                "user",
                0.9,
                0.5,
                &signals_fixture(),
                None,
            )
            .unwrap();
        }
        for _ in 0..50 {
            conn.execute(
                "INSERT INTO confidence_shadow_observations
                    (memory_id, namespace, source, caller_confidence,
                     derived_confidence, signals, recall_outcome, observed_at)
                 VALUES ('m1', 'ns', 'user', 0.9, 0.5, '{}', NULL, '2020-01-01T00:00:00Z')",
                [],
            )
            .unwrap();
        }
        let dropped = gc_observations(&conn, 30).expect("gc");
        assert_eq!(dropped, 50);
        let remaining: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM confidence_shadow_observations",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(remaining, 50);
    }

    #[test]
    fn gc_observations_zero_retention_is_noop() {
        let (conn, _dir) = open_tmp();
        conn.execute(
            "INSERT INTO memories (id, tier, namespace, title, content, created_at, updated_at)
             VALUES ('m1', 'mid', 'ns', 't', 'c', '2026-05-15T00:00:00Z', '2026-05-15T00:00:00Z')",
            [],
        )
        .unwrap();
        for _ in 0..10 {
            conn.execute(
                "INSERT INTO confidence_shadow_observations
                    (memory_id, namespace, source, caller_confidence,
                     derived_confidence, signals, recall_outcome, observed_at)
                 VALUES ('m1', 'ns', 'user', 0.9, 0.5, '{}', NULL, '2020-01-01T00:00:00Z')",
                [],
            )
            .unwrap();
        }
        assert_eq!(gc_observations(&conn, 0).expect("gc 0"), 0);
        assert_eq!(gc_observations(&conn, -5).expect("gc -5"), 0);
        let remaining: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM confidence_shadow_observations",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(remaining, 10);
    }

    #[test]
    fn shadow_config_caches_on_first_call() {
        // Cannot reliably reset the OnceLock mid-process; we just
        // assert that the cached value is identical across two reads.
        let a = shadow_config();
        let b = shadow_config();
        assert_eq!(a.enabled, b.enabled);
        assert!((a.sample_rate - b.sample_rate).abs() < f64::EPSILON);
        // And that the cache pointer is identity-equal — proves it's
        // actually a cache, not a re-read.
        assert!(std::ptr::eq(a, b));
    }

    #[test]
    fn shadow_config_from_env_reads_both_vars() {
        // Pin the process-wide cache BEFORE touching the environment. Tests
        // run on parallel threads; without this, the binary's first
        // `shadow_config()` call could land while ENV_SHADOW=1 is set
        // below and cache `enabled = true` for every later test.
        let _ = shadow_config();
        // Put the developer's original values back afterwards, even if an
        // assertion below panics.
        let _restore = EnvRestore::capture(&[ENV_SHADOW, ENV_SHADOW_SAMPLE_RATE]);

        // SAFETY (covers every env mutation in this test): env mutation is
        // `unsafe` because another thread may read the environment at the
        // same time. No other test in this module mutates these variables.
        // The remaining risk is a concurrent raw libc `getenv` from another
        // thread, which is accepted here as a test-only hazard.
        unsafe { std::env::remove_var(ENV_SHADOW) };
        unsafe { std::env::remove_var(ENV_SHADOW_SAMPLE_RATE) };
        // Direct unit test of the env-reading helper; bypasses the cache.
        let cfg = ShadowConfig::from_env();
        assert!(!cfg.enabled);
        assert!((cfg.sample_rate - 1.0).abs() < f64::EPSILON);

        unsafe { std::env::set_var(ENV_SHADOW, "1") };
        unsafe { std::env::set_var(ENV_SHADOW_SAMPLE_RATE, "0.5") };
        let cfg = ShadowConfig::from_env();
        assert!(cfg.enabled);
        assert!((cfg.sample_rate - 0.5).abs() < f64::EPSILON);

        unsafe { std::env::set_var(ENV_SHADOW_SAMPLE_RATE, "2.0") };
        let cfg = ShadowConfig::from_env();
        assert!((cfg.sample_rate - 1.0).abs() < f64::EPSILON);

        unsafe { std::env::set_var(ENV_SHADOW_SAMPLE_RATE, "-1.0") };
        let cfg = ShadowConfig::from_env();
        assert!((cfg.sample_rate - 0.0).abs() < f64::EPSILON);

        unsafe { std::env::set_var(ENV_SHADOW_SAMPLE_RATE, "garbage") };
        let cfg = ShadowConfig::from_env();
        assert!((cfg.sample_rate - 1.0).abs() < f64::EPSILON);
        // `_restore` drops here and reinstates the original env values.
    }
}