Skip to main content

ai_memory/hooks/post_reflect/
auto_export.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.7.0 QW-1 — auto-export-on-reflect substrate hook.
5//!
//! When the namespace policy
//! [`GovernancePolicy::auto_export_reflections_to_filesystem`] resolves
//! to `Some(true)` for the reflection's target namespace, the
//! substrate-side `post_reflect` hook spawns a detached worker that
//! writes the rendered reflection (markdown by default, JSON when the
//! hook is configured for it) to `<out_dir>/<namespace>/<id>.<ext>`,
//! where `<ext>` is `md` or `json`.
12//!
13//! # Hard guarantees
14//!
15//! 1. **Non-blocking.** The hook returns synchronously; the disk write
16//!    happens on a detached `std::thread::spawn`. The reflect response
17//!    must not regress in latency when the policy is `Some(true)`.
18//! 2. **Notify-class.** Failure during the disk write is logged via
19//!    `tracing::warn!(target: "post_reflect.auto_export", ...)` and
20//!    NEVER propagated back to the caller. The reflection is already
21//!    committed; making the operator chase a transient disk error is
22//!    worse than a missed file.
23//!    *v0.7-polish SEC-15 / COR-11 (issue #780):* failure is also
24//!    counted via [`crate::metrics::record_auto_export_spawn_failed`]
25//!    and mirrored onto the capabilities-v3
26//!    `hooks.auto_export_spawn_failed_total` field so operators can
27//!    alert on the otherwise-silent disk-write loss without scraping
28//!    `/metrics` directly. The detached worker closure is wrapped in
29//!    `catch_unwind` so a panic inside the closure (e.g., a poisoned
30//!    DB handle or a corrupt JSON column) is converted to the same
31//!    counter+warn path rather than being swallowed by the runtime's
32//!    detached-thread default.
33//! 3. **Capability isolation.** This code runs inside the substrate
34//!    process (CLI, MCP, HTTP daemon). It is gated by the namespace
35//!    policy — an operator who has not explicitly opted in to
36//!    `auto_export_reflections_to_filesystem` will see no disk writes
37//!    from this module ever.
38
39use std::panic::AssertUnwindSafe;
40use std::path::PathBuf;
41use std::sync::Arc;
42
43use crate::cli::commands::export_reflections::{self, ExportFormat};
44use crate::db;
45use crate::storage::reflect::{ReflectHooks, ReflectOutcome};
46
/// v0.7-polish SEC-15 / COR-11 (issue #780) — test-only panic-injection
/// env-var. While it is set to exactly `1` (plain string comparison, no
/// trimming or parsing), every auto-export worker that starts panics
/// inside its closure body so the panic-recovery path
/// (`catch_unwind` → counter increment → `tracing::warn!`) can be
/// exercised by an in-process integration test without an unsafe
/// runtime hack. Both this constant and its read-site are compiled
/// only under `cfg(any(test, debug_assertions))`, so builds without
/// either flag never consult the environment and the hot path stays
/// uninstrumented.
#[cfg(any(test, debug_assertions))]
const AUTO_EXPORT_INJECT_PANIC_ENV: &str = "AI_MEMORY_AUTO_EXPORT_INJECT_PANIC";
57
/// Static configuration for the auto-export hook.
///
/// [`build_post_reflect_hook`] wraps one instance in an `Arc` and hands
/// a clone of that handle to the worker thread spawned for each
/// reflection write, so the type has to be `Send + Sync`. Defaults
/// match the CLI's `--out-dir` / `--format` defaults so on-disk
/// artefacts produced by the substrate are interchangeable with those
/// the operator would have produced with
/// `ai-memory export-reflections`.
#[derive(Debug, Clone)]
pub struct AutoExportConfig {
    /// Root directory the substrate writes reflections under; each
    /// namespace gets its own sub-directory beneath it.
    /// Defaults to `<HOME>/.ai-memory/reflections/`.
    pub out_dir: PathBuf,
    /// `md` (default) or `json`. Mirrors `--format`. Also decides the
    /// file extension of the exported artefact.
    pub format: ExportFormat,
}
73
74impl AutoExportConfig {
75    /// Construct with the canonical default `out_dir`.
76    #[must_use]
77    pub fn default_for_home() -> Self {
78        let out_dir = export_reflections::resolve_out_dir(None).unwrap_or_else(|_| {
79            PathBuf::from(crate::AI_MEMORY_HOME_DIR_NAME)
80                .join(export_reflections::REFLECTIONS_SUBDIR)
81        });
82        Self {
83            out_dir,
84            format: ExportFormat::Markdown,
85        }
86    }
87}
88
89impl Default for AutoExportConfig {
90    fn default() -> Self {
91        Self::default_for_home()
92    }
93}
94
95/// Build a [`ReflectHooks`] bundle whose `post_reflect` callback is
96/// the auto-export hook.
97///
98/// The caller passes the database path so the hook can re-open a
99/// read-only connection on the worker thread — the original
100/// connection isn't `Send` (rusqlite). This trade-off matches every
101/// other post-write side-effect in the substrate (subscriptions,
102/// notify, etc.) — each spawns its own thread + opens its own
103/// connection rather than crossing the connection across thread
104/// boundaries.
105#[must_use]
106pub fn build_post_reflect_hook(
107    db_path: PathBuf,
108    config: AutoExportConfig,
109) -> ReflectHooks<'static> {
110    let cfg = Arc::new(config);
111    let dbp = Arc::new(db_path);
112    let cb: Box<dyn Fn(&ReflectOutcome) + Send + Sync + 'static> = Box::new(move |outcome| {
113        let cfg = cfg.clone();
114        let dbp = dbp.clone();
115        let outcome_id = outcome.id.clone();
116        let namespace = outcome.namespace.clone();
117        // Detached worker thread. Notify-class: any failure stays
118        // inside this thread, never reaches the caller.
119        //
120        // v0.7-polish SEC-15 / COR-11 (issue #780): the closure body
121        // is wrapped in `catch_unwind` so a panic inside
122        // `run_auto_export` (poisoned DB handle, corrupt JSON column,
123        // an `unwrap` deep in the export-rendering chain, etc.) is
124        // caught and converted to the same counter+warn path that an
125        // `Err` return takes. Without this, a panic would silently
126        // unwind the detached thread — the reflection would be
127        // committed in the DB but no on-disk artefact would land and
128        // no operator-visible signal would fire.
129        std::thread::spawn(move || {
130            let outcome_id_for_log = outcome_id.clone();
131            let namespace_for_log = namespace.clone();
132            let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
133                // v0.7-polish #780 — debug-only panic injection so the
134                // catch_unwind path is reachable from an in-process
135                // integration test. Release builds never read the
136                // env-var (cfg-gated below).
137                #[cfg(any(test, debug_assertions))]
138                {
139                    if std::env::var(AUTO_EXPORT_INJECT_PANIC_ENV)
140                        .ok()
141                        .is_some_and(|v| v == "1")
142                    {
143                        panic!("auto_export panic injected via {AUTO_EXPORT_INJECT_PANIC_ENV}=1");
144                    }
145                }
146                run_auto_export(&dbp, &outcome_id, &namespace, &cfg)
147            }));
148            match result {
149                Ok(Ok(())) => {}
150                Ok(Err(e)) => {
151                    crate::metrics::record_auto_export_spawn_failed();
152                    tracing::warn!(
153                        target: "post_reflect.auto_export",
154                        "auto-export of reflection {} (ns={}) failed: {}",
155                        outcome_id_for_log,
156                        namespace_for_log,
157                        e,
158                    );
159                }
160                Err(panic_payload) => {
161                    crate::metrics::record_auto_export_spawn_failed();
162                    let panic_msg = panic_payload
163                        .downcast_ref::<String>()
164                        .cloned()
165                        .or_else(|| {
166                            panic_payload
167                                .downcast_ref::<&'static str>()
168                                .map(|s| (*s).to_string())
169                        })
170                        .unwrap_or_else(|| "<non-string panic payload>".to_string());
171                    tracing::warn!(
172                        target: "post_reflect.auto_export",
173                        "auto-export of reflection {} (ns={}) panicked: {}",
174                        outcome_id_for_log,
175                        namespace_for_log,
176                        panic_msg,
177                    );
178                }
179            }
180        });
181    });
182    ReflectHooks {
183        pre_reflect: None,
184        post_reflect: Some(cb),
185        // Issue #815 — auto-export does not need a signing keypair;
186        // signing is owned by the reflect handler that built this hook
187        // bundle. The handler-side construction in
188        // `mcp::tools::reflect::handle_reflect` overrides this field
189        // when an active keypair is available, so the field is left
190        // None here and re-assigned by the caller.
191        active_keypair: None,
192    }
193}
194
195/// Worker-thread entry-point. Encapsulated as a free function so the
196/// hook code path stays one statement (`std::thread::spawn`) and so
197/// unit tests can exercise the write logic without spawning a
198/// thread.
199///
200/// # Errors
201///
202/// Bubbles up DB / I/O errors. The caller in [`build_post_reflect_hook`]
203/// logs + swallows them — this function does NOT decide to swallow.
204pub fn run_auto_export(
205    db_path: &std::path::Path,
206    memory_id: &str,
207    namespace: &str,
208    config: &AutoExportConfig,
209) -> anyhow::Result<()> {
210    let conn = db::open(db_path)?;
211    let policy = db::resolve_governance_policy(&conn, namespace).unwrap_or_default();
212    if !policy.effective_auto_export_reflections_to_filesystem() {
213        // Defence-in-depth: the MCP handler also checks the policy
214        // before installing the hook, but the substrate refuses to
215        // touch the filesystem unless the policy itself blesses it.
216        return Ok(());
217    }
218    let mem = match db::get(&conn, memory_id)? {
219        Some(m) => m,
220        None => {
221            // Race: the reflection was deleted between commit and
222            // hook fire. Nothing to write.
223            return Ok(());
224        }
225    };
226    let edges = collect_outbound_reflects_on(&conn, memory_id)?;
227    let attest_level = export_reflections::summarise_attest_level(&edges);
228    let payload = export_reflections::render_payload(&mem, &edges, attest_level, config.format);
229
230    let ns_dir = config
231        .out_dir
232        .join(export_reflections::sanitise_namespace_for_path(
233            &mem.namespace,
234        ));
235    std::fs::create_dir_all(&ns_dir)?;
236    let path = ns_dir.join(format!("{}.{}", mem.id, config.format.extension()));
237    std::fs::write(&path, payload)?;
238    Ok(())
239}
240
241fn collect_outbound_reflects_on(
242    conn: &rusqlite::Connection,
243    memory_id: &str,
244) -> anyhow::Result<Vec<export_reflections::ReflectsOnEdge>> {
245    let mut stmt = conn.prepare(
246        "SELECT target_id, COALESCE(attest_level, 'unsigned'), created_at \
247         FROM memory_links \
248         WHERE source_id = ?1 AND relation = 'reflects_on' \
249         ORDER BY created_at ASC",
250    )?;
251    let rows = stmt.query_map(rusqlite::params![memory_id], |row| {
252        Ok(export_reflections::ReflectsOnEdge {
253            target_id: row.get(0)?,
254            attest_level: row.get(1)?,
255            created_at: row.get(2)?,
256        })
257    })?;
258    Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
259}
260
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{
        ApproverType, CorePolicy, ExportPolicy, GovernanceLevel, GovernancePolicy, Memory, Tier,
    };
    use crate::storage::reflect::ReflectInput;
    use chrono::Utc;
    use std::sync::{Mutex, MutexGuard, PoisonError};
    use std::time::{Duration, Instant};
    use tempfile::TempDir;

    /// Serialises every test in this module that spawns the detached
    /// auto-export worker. The panic-injection env-var is
    /// process-global, so a worker started by one test must never
    /// overlap with another test that has the variable set.
    static WORKER_ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Take [`WORKER_ENV_LOCK`]. A poisoned lock is still usable — the
    /// mutex guards no data, only mutual exclusion.
    fn lock_worker_env() -> MutexGuard<'static, ()> {
        WORKER_ENV_LOCK
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
    }

    /// Sets the panic-injection env-var on construction and removes it
    /// on drop, so the variable cannot leak into later tests when the
    /// owning test fails half-way through.
    struct InjectedPanicEnv;

    impl InjectedPanicEnv {
        fn set() -> Self {
            // SAFETY: env-var mutation is process-global. Callers hold
            // `WORKER_ENV_LOCK`, which serialises against every other
            // test in this module that spawns a worker reading this
            // key. `unsafe` is required because `set_var` is `unsafe`
            // on edition-2024.
            unsafe {
                std::env::set_var(AUTO_EXPORT_INJECT_PANIC_ENV, "1");
            }
            Self
        }
    }

    impl Drop for InjectedPanicEnv {
        fn drop(&mut self) {
            // SAFETY: same as `set` — the guard is dropped while the
            // owning test still holds `WORKER_ENV_LOCK`.
            unsafe {
                std::env::remove_var(AUTO_EXPORT_INJECT_PANIC_ENV);
            }
        }
    }

    /// Poll `ready` every 25 ms until it returns `true` or `timeout`
    /// has elapsed. Returns whether the condition was met.
    fn wait_until(timeout: Duration, mut ready: impl FnMut() -> bool) -> bool {
        let deadline = Instant::now() + timeout;
        loop {
            if ready() {
                return true;
            }
            if Instant::now() >= deadline {
                return false;
            }
            std::thread::sleep(Duration::from_millis(25));
        }
    }

    /// Fresh on-disk database inside its own temp dir. The `TempDir`
    /// must be kept alive for as long as the database is in use.
    fn fresh_db() -> (rusqlite::Connection, TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("ai-memory.db");
        let conn = db::open(&path).unwrap();
        (conn, dir, path)
    }

    /// Insert a plain mid-tier memory into `ns` and return its id.
    fn seed_observation(conn: &rusqlite::Connection, ns: &str) -> String {
        let now = Utc::now().to_rfc3339();
        let mem = Memory {
            id: uuid::Uuid::new_v4().to_string(),
            tier: Tier::Mid,
            namespace: ns.to_string(),
            title: "obs".into(),
            content: "obs body".into(),
            created_at: now.clone(),
            updated_at: now,
            ..Default::default()
        };
        db::insert(conn, &mem).unwrap()
    }

    /// Install a namespace standard on `ns` whose governance policy
    /// sets `auto_export_reflections_to_filesystem` to `Some(true)`.
    fn enable_auto_export_on_namespace(conn: &rusqlite::Connection, ns: &str) {
        let policy = GovernancePolicy {
            core: CorePolicy {
                write: GovernanceLevel::Any,
                promote: GovernanceLevel::Any,
                delete: GovernanceLevel::Owner,
                approver: ApproverType::Human,
                inherit: true,
                max_reflection_depth: None,
            },
            export: ExportPolicy {
                auto_export_reflections_to_filesystem: Some(true),
            },
            ..Default::default()
        };
        let gov_metadata = serde_json::json!({
            "agent_id": "ai:test",
            "governance": serde_json::to_value(&policy).unwrap(),
        });
        let now = Utc::now().to_rfc3339();
        let std_mem = Memory {
            id: uuid::Uuid::new_v4().to_string(),
            tier: Tier::Long,
            namespace: ns.to_string(),
            title: format!("__standard_{ns}"),
            content: "standard".into(),
            created_at: now.clone(),
            updated_at: now,
            metadata: gov_metadata,
            ..Default::default()
        };
        let std_id = db::insert(conn, &std_mem).unwrap();
        db::set_namespace_standard(conn, ns, &std_id, None).unwrap();
    }

    /// Reflection request over a single source memory, using the fixed
    /// field values every test in this module shares.
    fn reflect_input(ns: &str, source_id: &str, content: &str) -> ReflectInput {
        ReflectInput {
            source_ids: vec![source_id.to_string()],
            title: "rfl".into(),
            content: content.into(),
            namespace: Some(ns.into()),
            tier: Tier::Mid,
            tags: vec![],
            priority: 5,
            confidence: 1.0,
            source: "cli".into(),
            agent_id: "ai:test".into(),
            metadata: serde_json::json!({}),
        }
    }

    #[test]
    fn run_auto_export_skips_when_policy_disabled() {
        let (conn, dir, db_path) = fresh_db();
        let src = seed_observation(&conn, "skip-ns");
        let input = reflect_input("skip-ns", &src, "rfl body");
        let outcome = crate::storage::reflect::reflect(&conn, &input).unwrap();
        let cfg = AutoExportConfig {
            out_dir: dir.path().join("out"),
            format: ExportFormat::Markdown,
        };
        run_auto_export(&db_path, &outcome.id, &outcome.namespace, &cfg).unwrap();
        // Out dir must not have been populated.
        assert!(
            !dir.path().join("out").join("skip-ns").exists(),
            "auto-export must not fire when policy is disabled"
        );
    }

    #[test]
    fn run_auto_export_writes_md_when_policy_enabled() {
        let (conn, dir, db_path) = fresh_db();
        enable_auto_export_on_namespace(&conn, "write-ns");
        let src = seed_observation(&conn, "write-ns");
        let input = reflect_input("write-ns", &src, "rfl body line");
        let outcome = crate::storage::reflect::reflect(&conn, &input).unwrap();
        let cfg = AutoExportConfig {
            out_dir: dir.path().join("out"),
            format: ExportFormat::Markdown,
        };
        run_auto_export(&db_path, &outcome.id, &outcome.namespace, &cfg).unwrap();
        let f = dir
            .path()
            .join("out")
            .join("write-ns")
            .join(format!("{}.md", outcome.id));
        assert!(f.exists(), "expected exported file at {}", f.display());
        let body = std::fs::read_to_string(&f).unwrap();
        assert!(body.contains(&format!("memory_id: {}\n", outcome.id)));
        assert!(body.contains("namespace: write-ns\n"));
        assert!(body.contains("reflection_depth: 1\n"));
        assert!(body.contains("rfl body line"));
    }

    #[test]
    fn run_auto_export_writes_json_when_format_json() {
        let (conn, dir, db_path) = fresh_db();
        enable_auto_export_on_namespace(&conn, "json-ns");
        let src = seed_observation(&conn, "json-ns");
        let input = reflect_input("json-ns", &src, "rfl json body");
        let outcome = crate::storage::reflect::reflect(&conn, &input).unwrap();
        let cfg = AutoExportConfig {
            out_dir: dir.path().join("out"),
            format: ExportFormat::Json,
        };
        run_auto_export(&db_path, &outcome.id, &outcome.namespace, &cfg).unwrap();
        let f = dir
            .path()
            .join("out")
            .join("json-ns")
            .join(format!("{}.json", outcome.id));
        assert!(f.exists());
        let parsed: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&f).unwrap()).unwrap();
        assert_eq!(parsed["memory_id"].as_str().unwrap(), outcome.id);
    }

    /// An id that does not resolve to a memory must be a silent no-op,
    /// both when the policy check already short-circuits and when the
    /// policy is enabled and the lookup itself comes back empty.
    #[test]
    fn run_auto_export_swallows_missing_memory() {
        let (conn, dir, db_path) = fresh_db();
        let cfg = AutoExportConfig {
            out_dir: dir.path().join("out"),
            format: ExportFormat::Markdown,
        };

        // Policy disabled (the default): returns before the lookup.
        let res = run_auto_export(&db_path, "no-such-id", "no-such-ns", &cfg);
        assert!(res.is_ok(), "disabled policy + unknown id errored: {res:?}");

        // Policy enabled: the export proceeds to the memory lookup,
        // which is the branch this test is named after.
        enable_auto_export_on_namespace(&conn, "missing-ns");
        let res = run_auto_export(&db_path, "no-such-id", "missing-ns", &cfg);
        assert!(res.is_ok(), "enabled policy + unknown id errored: {res:?}");

        assert!(
            !dir.path().join("out").exists(),
            "nothing may be written for a memory that does not exist"
        );
    }

    #[test]
    fn build_post_reflect_hook_does_not_block_reflect_response() {
        // The acceptance bar: reflect_with_hooks returns within the
        // same latency envelope as reflect. We don't assert a tight ms
        // number (hosts vary); we assert the hook returns
        // synchronously and that the background worker then delivers
        // the file on its own.
        //
        // The lock keeps the panic-injection test from setting its
        // env-var while this test's worker is alive.
        let _worker_lock = lock_worker_env();

        let (conn, dir, db_path) = fresh_db();
        enable_auto_export_on_namespace(&conn, "block-ns");
        let src = seed_observation(&conn, "block-ns");
        let out_dir = dir.path().join("out");
        let hooks = build_post_reflect_hook(
            db_path,
            AutoExportConfig {
                out_dir: out_dir.clone(),
                format: ExportFormat::Markdown,
            },
        );
        let input = reflect_input("block-ns", &src, "rfl body");

        let started = Instant::now();
        let outcome = crate::storage::reflect::reflect_with_hooks(&conn, &input, &hooks).unwrap();
        let elapsed = started.elapsed();
        // The hook spawns a background thread; the reflect call must
        // return well under the disk-write budget. We use a generous
        // 500ms ceiling to keep the assertion robust on slow CI
        // hardware — the point is that the hook doesn't block on
        // its own disk write.
        assert!(
            elapsed < Duration::from_millis(500),
            "reflect_with_hooks should not block on auto-export disk write (took {elapsed:?})"
        );
        assert_eq!(outcome.namespace, "block-ns");

        // Wait for the detached worker before the temp dir is dropped:
        // otherwise its `create_dir_all` could recreate the directory
        // after cleanup and leave it behind, and the worker could still
        // be alive when the next lock holder sets the env-var.
        let exported = out_dir
            .join("block-ns")
            .join(format!("{}.md", outcome.id));
        assert!(
            wait_until(Duration::from_secs(5), || exported.exists()),
            "background auto-export never produced {}",
            exported.display()
        );
    }

    /// v0.7-polish SEC-15 / COR-11 (issue #780) — when the detached
    /// `auto_export` worker panics, the `catch_unwind` wrapper must
    /// (a) keep the panic from unwinding the runtime, (b) increment
    /// `crate::metrics::auto_export_spawn_failed_total` so operators
    /// see the loss, and (c) leave the reflect path itself unaffected
    /// (it had already returned). The test induces the panic via the
    /// debug-only `AI_MEMORY_AUTO_EXPORT_INJECT_PANIC=1` env-var hook
    /// in the spawn closure, then polls the counter for the increment.
    ///
    /// Serialised via the module-level `WORKER_ENV_LOCK` because the
    /// env-var is process-global — without it, a worker spawned by
    /// another test in this module could observe the injection. The
    /// env-var itself is owned by an RAII guard so it is removed even
    /// if this test fails before reaching its final assertion.
    #[test]
    fn auto_export_worker_panic_increments_spawn_failed_counter() {
        let _worker_lock = lock_worker_env();

        let (conn, dir, db_path) = fresh_db();
        enable_auto_export_on_namespace(&conn, "panic-ns");
        let src = seed_observation(&conn, "panic-ns");
        let hooks = build_post_reflect_hook(
            db_path,
            AutoExportConfig {
                out_dir: dir.path().join("out"),
                format: ExportFormat::Markdown,
            },
        );
        let input = reflect_input("panic-ns", &src, "rfl body");

        let before = crate::metrics::auto_export_spawn_failed_count();
        let injected = InjectedPanicEnv::set();
        let _outcome = crate::storage::reflect::reflect_with_hooks(&conn, &input, &hooks).unwrap();

        // Poll for the counter to advance. The worker is detached;
        // its thread may not have run by the time reflect_with_hooks
        // returns. Bound the wait at 5s — generous for any host.
        let advanced = wait_until(Duration::from_secs(5), || {
            crate::metrics::auto_export_spawn_failed_count() > before
        });
        drop(injected);

        let after = crate::metrics::auto_export_spawn_failed_count();
        assert!(
            advanced,
            "auto_export_spawn_failed_total did not advance after panic injection \
             (before={before}, after={after})"
        );
    }

    #[test]
    fn auto_export_config_default_for_home_picks_dot_ai_memory() {
        let cfg = AutoExportConfig::default_for_home();
        // Either `<HOME>/.ai-memory/reflections` or
        // `.ai-memory/reflections` (HOME-less fallback). We don't pin
        // which — the test harness can run in either environment.
        assert!(
            cfg.out_dir.ends_with("reflections"),
            "default out_dir should end in 'reflections', got {}",
            cfg.out_dir.display()
        );
        assert_eq!(cfg.format, ExportFormat::Markdown);
    }
}