ai_memory/confidence/shadow.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.7.0 Form 5 — shadow-mode telemetry pipeline.
5//!
6//! Per-recall observations land in `confidence_shadow_observations`
7//! when `AI_MEMORY_CONFIDENCE_SHADOW=1`, sampled at the rate carried
8//! by `AI_MEMORY_CONFIDENCE_SHADOW_SAMPLE_RATE` (0.0..=1.0; default
9//! 1.0 when shadow is enabled).
10//!
11//! Audit-honest contract: shadow mode **never silently overrides** the
12//! caller's confidence. The recall ranker still uses the caller value
13//! downstream; the derived value is only persisted for later
14//! calibration. This is the load-bearing property that lets operators
15//! safely turn the engine on in production.
16//!
17//! # Surface
18//!
19//! * [`observe`] — write a single shadow row.
//! * [`should_sample`] — gate helper that consults the cached config
//!   (enabled flag + sample rate). The first call captures the env-var
//!   pair into a process-wide [`OnceLock`]; subsequent calls return the
//!   cached value without re-reading the environment through `std::env`.
//!   This is the PERF-9 fix (Cluster G, issue #767): before Cluster G,
//!   every recall touch re-read both env vars on the hot path.
26//! * [`gc_observations`] — periodic GC sweep deleting rows older than
27//! the configured retention window. Wired into the daemon's
28//! `spawn_gc_loop` from `daemon_runtime.rs`. PERF-4 fix.
29
30use std::sync::OnceLock;
31
32use anyhow::{Context, Result};
33use rusqlite::{Connection, params};
34
35use crate::models::ConfidenceSignals;
36
/// Name of the env var that opts the process into shadow mode.
///
/// Only the exact value `"1"` enables it; any other value, or the
/// variable being absent, leaves shadow mode off.
pub const ENV_SHADOW: &str = "AI_MEMORY_CONFIDENCE_SHADOW";

/// Name of the env var carrying the optional sample rate.
///
/// Parsed as an `f64` and clamped to `0.0..=1.0`. When unset or
/// unparsable the rate is 1.0, so every recall touch lands a row while
/// shadow mode is enabled.
pub const ENV_SHADOW_SAMPLE_RATE: &str = "AI_MEMORY_CONFIDENCE_SHADOW_SAMPLE_RATE";

/// Default retention window, in days, for the periodic GC sweep over
/// `confidence_shadow_observations`.
///
/// 30 days mirrors the Form 5 calibration window. The GC sweep prunes
/// the same table the calibration sweep reads from, so keeping the two
/// aligned means "what you see in the report is what you have on disk."
///
/// Operators override it with `[confidence] shadow_retention_days = N`
/// in `config.toml`. The sweep then deletes rows whose `observed_at` is
/// older than `now - N days`; `gc_observations` treats `N <= 0` as
/// "retain forever".
pub const DEFAULT_SHADOW_RETENTION_DAYS: i64 = 30;
53
/// Cached shadow config, captured on first access from the env-var pair.
///
/// The recall hot path reads this snapshot through [`shadow_config`]
/// instead of calling `std::env::var` per touch (PERF-9).
///
/// The snapshot is taken once per process and never refreshed: env-var
/// mutations made after the first [`shadow_config`] call are not observed.
/// The `#[cfg(test)]` hook `reset_shadow_config_for_tests` is a no-op and
/// cannot force a re-capture.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ShadowConfig {
    /// `true` when `AI_MEMORY_CONFIDENCE_SHADOW` was exactly `"1"` at
    /// first-access time. Later env-var mutations are not honored (cached).
    pub enabled: bool,
    /// Sample rate in `[0.0, 1.0]`, captured from
    /// `AI_MEMORY_CONFIDENCE_SHADOW_SAMPLE_RATE` at first-access time.
    /// Out-of-range values are clamped; defaults to 1.0 when unset or
    /// malformed.
    pub sample_rate: f64,
}
71
72impl ShadowConfig {
73 /// Build a config snapshot by reading both env vars once.
74 fn from_env() -> Self {
75 let enabled = std::env::var(ENV_SHADOW).is_ok_and(|v| v == "1");
76 let sample_rate = std::env::var(ENV_SHADOW_SAMPLE_RATE)
77 .ok()
78 .and_then(|s| s.parse::<f64>().ok())
79 .map(|v| v.clamp(0.0, 1.0))
80 .unwrap_or(1.0);
81 Self {
82 enabled,
83 sample_rate,
84 }
85 }
86}
87
88static SHADOW_CONFIG: OnceLock<ShadowConfig> = OnceLock::new();
89
90/// Returns the cached shadow config, capturing it from env vars on the
91/// first call. PERF-9: subsequent calls do NOT touch `std::env` — the
92/// returned reference points into a process-wide [`OnceLock`].
93#[must_use]
94pub fn shadow_config() -> &'static ShadowConfig {
95 SHADOW_CONFIG.get_or_init(ShadowConfig::from_env)
96}
97
98/// Returns true when [`ENV_SHADOW`] was set to `"1"` at first-access
99/// time. Reads the cached [`ShadowConfig`] — no env syscall on the
100/// hot path.
101#[must_use]
102pub fn shadow_enabled() -> bool {
103 shadow_config().enabled
104}
105
106/// Resolve the configured sample rate. Reads the cached
107/// [`ShadowConfig`] — no env syscall on the hot path.
108#[must_use]
109pub fn sample_rate() -> f64 {
110 shadow_config().sample_rate
111}
112
113/// Decide whether to sample a recall touch for shadow observation.
114///
115/// Combines [`shadow_enabled`] with [`sample_rate`] and a caller-
116/// supplied uniform-`[0, 1)` random number. Pulled apart so tests can
117/// inject a deterministic value without touching the global RNG.
118#[must_use]
119pub fn should_sample(uniform_0_1: f64) -> bool {
120 let cfg = shadow_config();
121 if !cfg.enabled {
122 return false;
123 }
124 uniform_0_1 < cfg.sample_rate
125}
126
/// Test-only hook kept for source compatibility. It is currently a
/// **no-op**.
///
/// The cached [`ShadowConfig`] lives in a `std::sync::OnceLock`, which
/// cannot be cleared through a shared reference. Calling this therefore
/// does NOT make the next [`shadow_config`] call re-read the env vars.
///
/// Tests that need to exercise env parsing should call
/// `ShadowConfig::from_env` directly, which bypasses the cache. Tests of
/// the cache itself should only assert that repeated reads return the
/// same value.
///
/// Compiled only under `#[cfg(test)]`, so production binaries cannot
/// call it.
#[cfg(test)]
#[doc(hidden)]
pub fn reset_shadow_config_for_tests() {
    // Intentionally empty. A real reset would need the cache behind
    // interior mutability (e.g. a `Mutex`) rather than a bare `OnceLock`.
    // That is incompatible with `shadow_config` returning
    // `&'static ShadowConfig` unless each re-capture leaks a new value.
}
155
156/// Append one row to `confidence_shadow_observations`.
157///
158/// Returns the substrate-generated row id on success. The caller is
159/// expected to have already gated on [`should_sample`]; this function
160/// always writes when called (so a deterministic test can exercise the
161/// table without env-var dance).
162///
163/// `source` is the `memories.source` role label denormalised onto the
164/// observation row so the calibration sweep can stream a single-table
165/// SQL aggregation without joining back to `memories`. Added in
166/// schema v40 (sqlite) / v39 (postgres) under Cluster G (PERF-12).
167///
168/// `recall_outcome` is `Some("recalled")` / `Some("skipped")` /
169/// `None`. The recall ranker stamps the value once it knows whether
170/// the candidate landed in the top-K or was dropped.
171///
172/// # Errors
173///
174/// Returns the underlying `rusqlite` error if the INSERT fails.
175#[allow(clippy::too_many_arguments)]
176pub fn observe(
177 conn: &Connection,
178 memory_id: &str,
179 namespace: &str,
180 source: &str,
181 caller_confidence: f64,
182 derived_confidence: f64,
183 signals: &ConfidenceSignals,
184 recall_outcome: Option<&str>,
185) -> Result<i64> {
186 let signals_json =
187 serde_json::to_string(signals).context("serialise ConfidenceSignals envelope")?;
188 let observed_at = chrono::Utc::now().to_rfc3339();
189 conn.execute(
190 "INSERT INTO confidence_shadow_observations
191 (memory_id, namespace, source, caller_confidence, derived_confidence,
192 signals, recall_outcome, observed_at)
193 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
194 params![
195 memory_id,
196 namespace,
197 source,
198 caller_confidence,
199 derived_confidence,
200 signals_json,
201 recall_outcome,
202 observed_at,
203 ],
204 )?;
205 Ok(conn.last_insert_rowid())
206}
207
208/// Pull every shadow observation in `namespace` newer than `since`
209/// (RFC3339). When `since` is `None`, returns all rows. Used by tests
210/// and ad-hoc debugging; the calibration sweep itself uses
211/// [`crate::confidence::calibrate::calibrate_from_shadow`] which
212/// streams a SQL-side aggregation instead of materialising every row.
213/// The result vector is ordered by `observed_at ASC` for stable replays.
214///
215/// # Errors
216///
217/// Returns the underlying `rusqlite` error if the SELECT fails.
218pub fn observations_since(
219 conn: &Connection,
220 namespace: Option<&str>,
221 since: Option<&str>,
222) -> Result<Vec<ShadowObservation>> {
223 let sql = "SELECT id, memory_id, namespace, source, caller_confidence, derived_confidence,
224 signals, recall_outcome, observed_at
225 FROM confidence_shadow_observations
226 WHERE (?1 IS NULL OR namespace = ?1)
227 AND (?2 IS NULL OR observed_at >= ?2)
228 ORDER BY observed_at ASC, id ASC";
229 let mut stmt = conn.prepare(sql)?;
230 let rows = stmt.query_map(params![namespace, since], |row| {
231 Ok(ShadowObservation {
232 id: row.get(0)?,
233 memory_id: row.get(1)?,
234 namespace: row.get(2)?,
235 source: row.get(3)?,
236 caller_confidence: row.get(4)?,
237 derived_confidence: row.get(5)?,
238 signals_json: row.get(6)?,
239 recall_outcome: row.get(7)?,
240 observed_at: row.get(8)?,
241 })
242 })?;
243 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
244}
245
246/// Delete `confidence_shadow_observations` rows whose `observed_at` is
247/// older than `now - retention_days`. Returns the number of rows
248/// removed. Called periodically from the daemon GC loop
249/// (`daemon_runtime::spawn_gc_loop`) to close PERF-4 (unbounded table
250/// growth on long-running shadow-mode deployments).
251///
252/// `retention_days <= 0` is treated as "retain forever" and returns
253/// `Ok(0)` without touching the table (matches the audit-honest
254/// "do-nothing-on-zero" convention used elsewhere in this codebase,
255/// e.g. `archive_max_days`).
256///
257/// # Errors
258///
259/// Returns the underlying `rusqlite` error if the DELETE fails.
260pub fn gc_observations(conn: &Connection, retention_days: i64) -> Result<usize> {
261 if retention_days <= 0 {
262 return Ok(0);
263 }
264 let cutoff = (chrono::Utc::now() - chrono::Duration::days(retention_days)).to_rfc3339();
265 let n = conn.execute(
266 "DELETE FROM confidence_shadow_observations WHERE observed_at < ?1",
267 params![cutoff],
268 )?;
269 Ok(n)
270}
271
/// One row of `confidence_shadow_observations` as exposed to readers
/// (see [`observations_since`]).
///
/// `signals_json` stays a raw string because the calibration sweep
/// usually doesn't need to deserialise it; the `(namespace, source)` key
/// is enough. Callers that want the typed envelope can parse it with
/// `serde_json::from_str::<ConfidenceSignals>`.
#[derive(Debug, Clone, PartialEq)]
pub struct ShadowObservation {
    /// Row id of the observation (the `id` column).
    pub id: i64,
    /// Id of the observed memory row.
    pub memory_id: String,
    /// Namespace recorded with the observation.
    pub namespace: String,
    /// Denormalised `memories.source` role label. Added in schema v40
    /// (sqlite) / v39 (postgres) under Cluster G so the calibration
    /// sweep can stream a single-table SQL aggregation. Legacy rows
    /// backfill to the joined `memories.source` value or `'unknown'`.
    pub source: String,
    /// Confidence supplied by the caller; shadow mode never overrides it.
    pub caller_confidence: f64,
    /// Confidence derived by the engine; persisted for calibration only.
    pub derived_confidence: f64,
    /// JSON-encoded `ConfidenceSignals` envelope.
    pub signals_json: String,
    /// `Some("recalled")`, `Some("skipped")`, or `None`.
    pub recall_outcome: Option<String>,
    /// RFC 3339 timestamp stored with the row.
    pub observed_at: String,
}
294
#[cfg(test)]
mod tests {
    use std::ffi::OsString;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    use rusqlite::params;

    use super::*;
    use crate::models::ConfidenceSignals;
    use crate::storage::open as open_storage;

    /// Serialises the tests in this module that read or mutate the shadow
    /// env vars.
    ///
    /// The test harness runs tests on parallel threads. Mutating the
    /// process environment while another thread reads it is undefined
    /// behaviour on several platforms, which is why `std::env::set_var`
    /// is `unsafe`. This lock only orders tests in this module; env reads
    /// elsewhere in the test binary are not covered.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Acquire [`ENV_LOCK`], recovering from poisoning. A failed assertion
    /// in another env test poisons the lock, but the guarded value is `()`,
    /// so there is no state to corrupt.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Set an env var. Callers must hold [`ENV_LOCK`].
    fn set_env(key: &str, value: &str) {
        // SAFETY: the caller holds `ENV_LOCK`, so no other test in this
        // module reads or writes the environment concurrently.
        unsafe { std::env::set_var(key, value) };
    }

    /// Remove an env var. Callers must hold [`ENV_LOCK`].
    fn remove_env(key: &str) {
        // SAFETY: the caller holds `ENV_LOCK` (same argument as `set_env`).
        unsafe { std::env::remove_var(key) };
    }

    /// Restores an env var to its pre-test value on drop, so a failed
    /// assertion cannot leak a mutated value into later tests.
    ///
    /// Must be created after the [`ENV_LOCK`] guard so it is dropped (and
    /// restores the variable) while the lock is still held.
    struct EnvRestore {
        key: &'static str,
        saved: Option<OsString>,
    }

    impl EnvRestore {
        fn capture(key: &'static str) -> Self {
            Self {
                key,
                saved: std::env::var_os(key),
            }
        }
    }

    impl Drop for EnvRestore {
        fn drop(&mut self) {
            match self.saved.take() {
                // SAFETY: `EnvRestore` values live strictly inside an
                // `ENV_LOCK` critical section (see the type docs).
                Some(value) => unsafe { std::env::set_var(self.key, value) },
                // SAFETY: as above.
                None => unsafe { std::env::remove_var(self.key) },
            }
        }
    }

    fn open_tmp() -> (Connection, tempfile::TempDir) {
        let dir = tempfile::tempdir().expect("tmpdir");
        let path = dir.path().join("test.db");
        // Open through the storage layer first (presumably this creates /
        // migrates the schema the tests below rely on), then work through
        // a plain connection.
        let _ = open_storage(&path).expect("open storage");
        let conn = Connection::open(&path).expect("open conn");
        (conn, dir)
    }

    fn signals_fixture() -> ConfidenceSignals {
        ConfidenceSignals {
            source_age_days: 7.0,
            atom_derivation: false,
            prior_corroboration_count: 2,
            freshness_factor: 0.84,
            baseline_per_source: 0.5,
        }
    }

    /// Insert a minimal `memories` row so observations referencing `id`
    /// satisfy the FK constraint.
    fn seed_memory(conn: &Connection, id: &str, namespace: &str, title: &str) {
        conn.execute(
            "INSERT INTO memories (id, tier, namespace, title, content, created_at, updated_at)
             VALUES (?1, 'mid', ?2, ?3, 'c', '2026-05-15T00:00:00Z', '2026-05-15T00:00:00Z')",
            params![id, namespace, title],
        )
        .expect("seed memory");
    }

    /// Insert an observation in namespace `ns` with a caller-chosen
    /// `observed_at`, bypassing `observe`'s current-time stamp.
    fn insert_raw_observation(conn: &Connection, memory_id: &str, observed_at: &str) {
        conn.execute(
            "INSERT INTO confidence_shadow_observations
                 (memory_id, namespace, source, caller_confidence,
                  derived_confidence, signals, recall_outcome, observed_at)
             VALUES (?1, 'ns', 'user', 0.9, 0.5, '{}', NULL, ?2)",
            params![memory_id, observed_at],
        )
        .expect("insert raw observation");
    }

    fn count_observations(conn: &Connection) -> i64 {
        conn.query_row(
            "SELECT COUNT(*) FROM confidence_shadow_observations",
            [],
            |r| r.get(0),
        )
        .expect("count observations")
    }

    #[test]
    fn observe_appends_row() {
        let (conn, _dir) = open_tmp();
        seed_memory(&conn, "m1", "ns", "t");
        let id = observe(&conn, "m1", "ns", "user", 0.9, 0.6, &signals_fixture(), None)
            .expect("observe ok");
        assert!(id > 0);
        let rows = observations_since(&conn, Some("ns"), None).expect("read back");
        assert_eq!(rows.len(), 1);
        assert!((rows[0].caller_confidence - 0.9).abs() < f64::EPSILON);
        assert!((rows[0].derived_confidence - 0.6).abs() < f64::EPSILON);
        assert_eq!(rows[0].source, "user");
        assert!(rows[0].recall_outcome.is_none());
    }

    #[test]
    fn observations_filter_by_namespace() {
        let (conn, _dir) = open_tmp();
        seed_memory(&conn, "m1", "ns_a", "t1");
        seed_memory(&conn, "m2", "ns_b", "t2");
        observe(&conn, "m1", "ns_a", "user", 0.9, 0.6, &signals_fixture(), None)
            .expect("observe ns_a");
        observe(&conn, "m2", "ns_b", "user", 0.8, 0.5, &signals_fixture(), None)
            .expect("observe ns_b");
        let a = observations_since(&conn, Some("ns_a"), None).expect("read ns_a");
        assert_eq!(a.len(), 1);
        assert_eq!(a[0].namespace, "ns_a");
        let all = observations_since(&conn, None, None).expect("read all");
        assert_eq!(all.len(), 2);
    }

    #[test]
    fn observations_since_is_inclusive_and_ordered() {
        let (conn, _dir) = open_tmp();
        seed_memory(&conn, "m1", "ns", "t");
        // Insert out of chronological order to prove the ORDER BY, not
        // insertion order, drives the result.
        insert_raw_observation(&conn, "m1", "2021-01-01T00:00:00Z");
        insert_raw_observation(&conn, "m1", "2020-01-01T00:00:00Z");

        let all = observations_since(&conn, Some("ns"), None).expect("read all");
        let stamps: Vec<&str> = all.iter().map(|o| o.observed_at.as_str()).collect();
        assert_eq!(stamps, ["2020-01-01T00:00:00Z", "2021-01-01T00:00:00Z"]);

        // `since` is an inclusive lower bound.
        let recent = observations_since(&conn, Some("ns"), Some("2021-01-01T00:00:00Z"))
            .expect("read since");
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].observed_at, "2021-01-01T00:00:00Z");
    }

    #[test]
    fn gc_observations_drops_old_rows_only() {
        let (conn, _dir) = open_tmp();
        seed_memory(&conn, "m1", "ns", "t");
        // 50 fresh rows (stamped now by `observe`) + 50 ancient rows
        // (year 2020). With a 30-day retention window only the ancient
        // ones drop.
        for _ in 0..50 {
            observe(&conn, "m1", "ns", "user", 0.9, 0.5, &signals_fixture(), None)
                .expect("observe fresh row");
        }
        for _ in 0..50 {
            insert_raw_observation(&conn, "m1", "2020-01-01T00:00:00Z");
        }
        let dropped = gc_observations(&conn, 30).expect("gc");
        assert_eq!(dropped, 50);
        assert_eq!(count_observations(&conn), 50);
    }

    #[test]
    fn gc_observations_zero_retention_is_noop() {
        let (conn, _dir) = open_tmp();
        seed_memory(&conn, "m1", "ns", "t");
        for _ in 0..10 {
            insert_raw_observation(&conn, "m1", "2020-01-01T00:00:00Z");
        }
        assert_eq!(gc_observations(&conn, 0).expect("gc 0"), 0);
        assert_eq!(gc_observations(&conn, -5).expect("gc -5"), 0);
        assert_eq!(count_observations(&conn), 10);
    }

    #[test]
    fn shadow_config_caches_on_first_call() {
        // Hold the env lock so the first-call capture cannot race the env
        // mutations in `shadow_config_from_env_reads_both_vars`.
        let _env = lock_env();
        // The OnceLock cannot be reset mid-process, so only assert that
        // repeated reads agree.
        let a = shadow_config();
        let b = shadow_config();
        assert_eq!(a.enabled, b.enabled);
        assert!((a.sample_rate - b.sample_rate).abs() < f64::EPSILON);
        // Identity-equal pointers prove it's a cache, not a re-read.
        assert!(std::ptr::eq(a, b));
    }

    #[test]
    fn shadow_config_from_env_reads_both_vars() {
        // Direct unit test of the env-reading helper; `from_env` bypasses
        // the OnceLock cache. Locals drop in reverse declaration order, so
        // both variables are restored before the lock is released.
        let _env = lock_env();
        let _restore_shadow = EnvRestore::capture(ENV_SHADOW);
        let _restore_rate = EnvRestore::capture(ENV_SHADOW_SAMPLE_RATE);

        remove_env(ENV_SHADOW);
        remove_env(ENV_SHADOW_SAMPLE_RATE);
        let cfg = ShadowConfig::from_env();
        assert!(!cfg.enabled);
        assert!((cfg.sample_rate - 1.0).abs() < f64::EPSILON);

        set_env(ENV_SHADOW, "1");
        set_env(ENV_SHADOW_SAMPLE_RATE, "0.5");
        let cfg = ShadowConfig::from_env();
        assert!(cfg.enabled);
        assert!((cfg.sample_rate - 0.5).abs() < f64::EPSILON);

        set_env(ENV_SHADOW_SAMPLE_RATE, "2.0");
        let cfg = ShadowConfig::from_env();
        assert!((cfg.sample_rate - 1.0).abs() < f64::EPSILON);

        set_env(ENV_SHADOW_SAMPLE_RATE, "-1.0");
        let cfg = ShadowConfig::from_env();
        assert!((cfg.sample_rate - 0.0).abs() < f64::EPSILON);

        set_env(ENV_SHADOW_SAMPLE_RATE, "garbage");
        let cfg = ShadowConfig::from_env();
        assert!((cfg.sample_rate - 1.0).abs() < f64::EPSILON);
    }
}