Skip to main content

ai_memory/
daemon_runtime.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! Daemon runtime — orchestration shell for the `ai-memory` binary.
5//!
6//! W6 lifted `serve()` and the top-level dispatch out of `main.rs` so the
7//! production HTTP daemon, the integration test harness, and the
8//! coverage-instrumented tests in this module all share one source of
9//! truth. `main.rs` keeps its `#[tokio::main]` entry point but immediately
10//! delegates here for every subcommand.
11//!
12//! ## Public surface (post-W6)
13//!
14//! - [`run`] — top-level CLI dispatch (called from `main()`).
15//! - [`serve`] — full HTTP daemon body (TLS or plain).
16//! - [`bootstrap_serve`] — testable struct-returning state builder.
17//! - [`build_router`] — composition wrapper around `lib::build_router`.
18//! - [`build_embedder`], [`build_vector_index`] — single canonical builders
19//!   used by both `serve()` and `cli::recall::run`.
20//! - [`spawn_gc_loop`], [`spawn_wal_checkpoint_loop`] — daemon background
21//!   tasks, returning a [`JoinHandle`] so callers can abort on shutdown.
22//! - [`is_write_command`] — write-command predicate driving the post-write
23//!   WAL checkpoint.
24//! - [`passphrase_from_file`], [`apply_anonymize_default`] — startup helpers.
25//!
26//! ## Pre-W6 helpers retained
27//!
28//! - [`serve_http_with_shutdown`], [`serve_http_with_shutdown_future`] —
29//!   the in-process HTTP harness the integration suite drives.
30//! - [`run_sync_daemon_with_shutdown`],
31//!   [`run_sync_daemon_with_shutdown_using_client`],
32//!   [`sync_cycle_once`] — the sync-daemon body.
33//! - [`run_curator_daemon_with_shutdown`],
34//!   [`run_curator_daemon_with_primitives`] — the curator-daemon body.
35
36use std::io::Write as _;
37use std::path::Path;
38use std::path::PathBuf;
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, Ordering};
41use std::time::{Duration, Instant};
42
43use anyhow::{Context, Result};
44use axum::Router;
45use clap::{Args, CommandFactory, Parser, Subcommand};
46use clap_complete::{Shell, generate};
47use rusqlite::Connection;
48use tokio::sync::{Mutex, Notify};
49use tokio::task::JoinHandle;
50use tracing_subscriber::EnvFilter;
51
52use crate::cli::agents::{AgentsArgs, PendingArgs};
53use crate::cli::archive::ArchiveArgs;
54use crate::cli::backup::{BackupArgs, RestoreArgs};
55use crate::cli::consolidate::{AutoConsolidateArgs, ConsolidateArgs};
56use crate::cli::crud::{DeleteArgs, GetArgs, ListArgs};
57use crate::cli::curator::CuratorArgs;
58use crate::cli::forget::ForgetArgs;
59use crate::cli::io::{ImportArgs, MineArgs};
60use crate::cli::link::{LinkArgs, ResolveArgs};
61use crate::cli::promote::PromoteArgs;
62use crate::cli::recall::RecallArgs;
63use crate::cli::search::SearchArgs;
64use crate::cli::store::StoreArgs;
65use crate::cli::sync::{SyncArgs, SyncDaemonArgs};
66use crate::cli::update::UpdateArgs;
67use crate::config::{AppConfig, FeatureTier};
68use crate::embeddings::Embedder;
69use crate::handlers::{ApiKeyState, AppState, Db};
70use crate::hnsw::VectorIndex;
71use crate::{bench, cli, db, embeddings, federation, hnsw, llm, mcp, tls};
72
73#[cfg(feature = "sal")]
74use crate::migrate;
75
/// Default value of the global `--db` flag (overridable via `AI_MEMORY_DB`).
const DEFAULT_DB: &str = "ai-memory.db";
/// Default listen port for `serve --port`.
const DEFAULT_PORT: u16 = 9077;
/// Garbage-collection cadence, in seconds (30 minutes).
const GC_INTERVAL_SECS: u64 = 30 * 60;
/// WAL auto-checkpoint cadence in the HTTP daemon, in seconds (10 minutes).
/// Bounds `*-wal` file growth between `SQLite`'s internal page-count
/// checkpoints.
const WAL_CHECKPOINT_INTERVAL_SECS: u64 = 10 * 60;
82
83// ---------------------------------------------------------------------------
84// Clap-derived CLI surface
85// ---------------------------------------------------------------------------
86//
87// The clap structs live in the lib crate so `daemon_runtime::run` can
88// take them as parameters. `main.rs` re-exports `Cli` and immediately
89// delegates here.
90
91#[derive(Parser)]
92#[command(
93    name = "ai-memory",
94    version,
95    about = "AI-agnostic persistent memory — MCP server, HTTP API, and CLI for any AI platform"
96)]
97pub struct Cli {
98    #[command(subcommand)]
99    pub command: Command,
100    #[arg(long, env = "AI_MEMORY_DB", default_value = DEFAULT_DB, global = true)]
101    pub db: PathBuf,
102    /// Output as JSON (machine-parseable)
103    #[arg(long, global = true, default_value_t = false)]
104    pub json: bool,
105    /// Agent identifier used for store operations. If unset, an NHI-hardened
106    /// default is synthesized (see `ai-memory store --help`). Accepts the
107    /// `AI_MEMORY_AGENT_ID` environment variable as a fallback.
108    #[arg(long, env = "AI_MEMORY_AGENT_ID", global = true)]
109    pub agent_id: Option<String>,
110    /// v0.6.0.0: path to a file containing the `SQLCipher` passphrase.
111    /// Only meaningful when the binary was built with
112    /// `--features sqlcipher` (standard builds ignore this flag). File
113    /// must be root-readable (mode 0400 recommended). The passphrase is
114    /// read once at startup and exported as `AI_MEMORY_DB_PASSPHRASE`
115    /// for the duration of the process — passing the passphrase
116    /// directly as an env var or as a flag value leaks to the process
117    /// list (`ps -E`) and shell history.
118    #[arg(long, global = true, value_name = "PATH")]
119    pub db_passphrase_file: Option<PathBuf>,
120}
121
// Subcommand surface of the CLI. Clap renders each variant's `///` doc
// comment as that subcommand's `--help` text, so wording changes in those
// lines are user-visible. Every variant is dispatched by the `match` in
// `run`; `is_write_command` decides which variants get the post-run WAL
// checkpoint.
#[derive(Subcommand)]
pub enum Command {
    /// Start the HTTP memory daemon
    Serve(ServeArgs),
    // `tier` is a free-form `String` (not a clap `ValueEnum`), resolved via
    // `AppConfig::effective_tier` in `run`, so clap does not reject unknown
    // tier names at parse time.
    /// Run as an MCP (Model Context Protocol) tool server over stdio
    Mcp {
        /// Feature tier: keyword (FTS only) or semantic (embeddings + FTS)
        #[arg(long, default_value = "semantic")]
        tier: String,
    },
    /// Store a new memory
    Store(StoreArgs),
    /// Update an existing memory by ID
    Update(UpdateArgs),
    /// Recall memories relevant to a context
    Recall(RecallArgs),
    /// Search memories by text
    Search(SearchArgs),
    /// Retrieve a memory by ID
    Get(GetArgs),
    /// List memories
    List(ListArgs),
    /// Delete a memory by ID
    Delete(DeleteArgs),
    /// Promote a memory to long-term
    Promote(PromoteArgs),
    /// Delete memories matching a pattern
    Forget(ForgetArgs),
    /// Link two memories
    Link(LinkArgs),
    /// Consolidate multiple memories into one
    Consolidate(ConsolidateArgs),
    /// Run garbage collection
    Gc,
    /// Show statistics
    Stats,
    /// List all namespaces
    Namespaces,
    /// Export all memories as JSON
    Export,
    /// Import memories from JSON (stdin)
    Import(ImportArgs),
    /// Resolve a contradiction — mark one memory as superseding another
    Resolve(ResolveArgs),
    /// Interactive memory shell (REPL)
    Shell,
    /// Sync memories between two database files
    Sync(SyncArgs),
    /// Run the peer-to-peer sync daemon — continuously exchange memories
    /// with one or more HTTP peers (Phase 3 Task 3b.1). The defining
    /// grand-slam capability: two agents on two machines form a live
    /// knowledge mesh with no cloud, no login, no `SaaS`.
    SyncDaemon(SyncDaemonArgs),
    /// Auto-consolidate short-term memories by namespace
    AutoConsolidate(AutoConsolidateArgs),
    // Handled in `run` without opening the database: writes the script for
    // the requested shell to stdout.
    /// Generate shell completions
    Completions(CompletionsArgs),
    // Handled in `run` without opening the database: renders the man page
    // to stdout via `clap_mangen`.
    /// Generate man page
    Man,
    /// Import memories from historical conversations (Claude, `ChatGPT`, Slack exports)
    Mine(MineArgs),
    /// Manage the memory archive (list, restore, purge, stats)
    Archive(ArchiveArgs),
    /// Register or list agents (Task 1.3)
    Agents(AgentsArgs),
    /// List / approve / reject governance-pending actions (Task 1.9)
    Pending(PendingArgs),
    /// v0.6.0.0: snapshot the `SQLite` database to a timestamped backup
    /// file. Uses `SQLite` `VACUUM INTO` which is hot-backup safe (no daemon
    /// stop required). Writes a `manifest.json` alongside (sha256 + version).
    Backup(BackupArgs),
    /// v0.6.0.0: restore the `SQLite` database from a backup file written
    /// by `ai-memory backup`. Verifies the manifest sha256 before
    /// replacing the current DB. The current DB is moved aside as a safety
    /// net before the replacement.
    Restore(RestoreArgs),
    /// v0.6.1: run the autonomous curator. `--once` runs a single sweep
    /// and prints a JSON report; `--daemon` loops with `--interval-secs`
    /// between cycles. Auto-tags memories without tags and flags
    /// contradictions against nearby siblings in the same namespace.
    Curator(CuratorArgs),
    /// v0.6.3 (Pillar 3 / Stream E): run the canonical performance
    /// workload and print measured p50/p95/p99 against the budgets in
    /// `PERFORMANCE.md`. Each invocation seeds a disposable temp DB so
    /// the user's main DB is untouched. Exits non-zero when any p95
    /// exceeds its budget by more than the published 10% tolerance.
    Bench(BenchArgs),
    // Only compiled with `--features sal`; the matching arm in `run` carries
    // the same `cfg`, so the `match` stays exhaustive in both builds.
    /// v0.7: migrate memories between SAL backends. Gated behind
    /// `--features sal`. Reads pages via `MemoryStore::list`, writes
    /// via `MemoryStore::store`. Idempotent: source ids are preserved
    /// and both adapters upsert on id.
    #[cfg(feature = "sal")]
    Migrate(MigrateArgs),
}
216
// Arguments for `ai-memory bench`; `run` passes them to `cmd_bench`. The
// `///` lines are clap help text. The clamping ranges they quote are NOT
// enforced by clap (no range `value_parser` is attached here) — presumably
// the bench code clamps at run time; confirm there.
#[derive(Args)]
pub struct BenchArgs {
    /// Measured iterations per operation. Clamped to `[1, 100_000]`.
    #[arg(long, default_value_t = bench::DEFAULT_ITERATIONS)]
    pub iterations: usize,
    /// Warmup iterations discarded from the percentile sample.
    /// Clamped to `[0, 10_000]`.
    #[arg(long, default_value_t = bench::DEFAULT_WARMUP)]
    pub warmup: usize,
    // NOTE(review): this local `--json` shares the arg id `json` with the
    // global `Cli::json` flag. Confirm clap's global-arg propagation gives the
    // intended result for both `ai-memory --json bench` and
    // `ai-memory bench --json`.
    /// Emit results as JSON instead of the human-readable table.
    #[arg(long)]
    pub json: bool,
    // NOTE(review): `baseline` is an `Option<String>` while `history` is an
    // `Option<PathBuf>`, although both name files on disk.
    /// Path to a previous `bench --json` payload. When supplied, the
    /// fresh run is compared per-operation against this baseline and
    /// the process exits non-zero if any measured p95 exceeds the
    /// baseline by more than `--regression-threshold` percent.
    /// Independent of the absolute-budget guard.
    #[arg(long, value_name = "PATH")]
    pub baseline: Option<String>,
    /// Allowed p95 growth (percent) over the `--baseline` reading
    /// before a row is flagged as a regression. Clamped to
    /// `[0.0, 1000.0]`. Has no effect without `--baseline`.
    #[arg(long, default_value_t = bench::DEFAULT_REGRESSION_THRESHOLD_PCT)]
    pub regression_threshold: f64,
    /// Append this run to a JSONL history file (one self-describing
    /// JSON object per line). Creates the file and any missing parent
    /// directories on first call. Each entry carries `captured_at`
    /// (RFC3339), `iterations`, `warmup`, and the same `results` array
    /// `--json` emits — long-running campaigns can build a regression
    /// dataset to feed downstream tooling. The CLI table / JSON output
    /// still prints; this flag only adds the append side effect.
    #[arg(long, value_name = "PATH")]
    pub history: Option<PathBuf>,
}
251
// Arguments for `ai-memory migrate` (only compiled with `--features sal`);
// `run` passes them to `cmd_migrate`. The `///` lines are clap help text.
// `--from` / `--to` are accepted as plain strings; URL-scheme validation is
// not done by clap here, so presumably `cmd_migrate` parses them — confirm.
#[cfg(feature = "sal")]
#[derive(Args)]
pub struct MigrateArgs {
    /// Source URL. `sqlite:///path/to/file.db` or
    /// `postgres://user:pass@host:port/dbname`.
    #[arg(long)]
    pub from: String,
    /// Destination URL. Same URL shape as `--from`.
    #[arg(long)]
    pub to: String,
    // No clap range validator is attached: the [1, 10000] clamp quoted in
    // the help text presumably happens in the migrate code.
    /// Page size. Clamped to [1, 10000]. Default 1000.
    #[arg(long, default_value_t = 1000)]
    pub batch: usize,
    /// Only migrate memories in this namespace.
    #[arg(long)]
    pub namespace: Option<String>,
    /// Emit the report but do NOT write to the destination.
    #[arg(long)]
    pub dry_run: bool,
    // NOTE(review): like `BenchArgs::json`, this shares the arg id `json`
    // with the global `Cli::json` flag.
    /// Emit the report as JSON rather than human-readable text.
    #[arg(long)]
    pub json: bool,
}
275
276#[derive(Args)]
277pub struct ServeArgs {
278    #[arg(long, default_value = "127.0.0.1")]
279    pub host: String,
280    #[arg(long, default_value_t = DEFAULT_PORT)]
281    pub port: u16,
282    /// Path to PEM-encoded TLS certificate (may include the full chain).
283    /// Passing both `--tls-cert` and `--tls-key` switches `serve` to
284    /// HTTPS. rustls under the hood — no OpenSSL dep. Absent both
285    /// flags = plain HTTP (same as every previous release).
286    #[arg(long, requires = "tls_key")]
287    pub tls_cert: Option<PathBuf>,
288    /// Path to PEM-encoded TLS private key (PKCS#8 or RSA).
289    #[arg(long, requires = "tls_cert")]
290    pub tls_key: Option<PathBuf>,
291    /// Path to a file containing SHA-256 fingerprints of trusted client
292    /// certificates, one per line (case-insensitive hex, optionally with
293    /// `:` separators; comments start with `#`). When set, `serve`
294    /// demands client-cert mTLS on every connection and refuses any peer
295    /// whose cert fingerprint is not on the list. Requires `--tls-cert`
296    /// and `--tls-key`. This is the peer-mesh identity gate — a peer
297    /// without an authorised cert can't even open a TCP connection, let
298    /// alone hit `/sync/push`. Layer 2 of the peer-mesh crypto stack;
299    /// attested `agent_id` extraction (Layer 2b) lands post-v0.6.0.
300    #[arg(long, requires = "tls_cert")]
301    pub mtls_allowlist: Option<PathBuf>,
302    /// Seconds to wait for in-flight requests to complete on graceful
303    /// shutdown (SIGINT). Default 30. Bumped from 10 in v0.6.0 because
304    /// large `/sync/push` batches can take longer than 10s under load
305    /// (red-team #233).
306    #[arg(long, default_value_t = 30)]
307    pub shutdown_grace_secs: u64,
308
309    // -------- v0.7 federation (ADR-0001) ---------------------------
310    /// W-of-N write quorum. When >=1 and `--quorum-peers` is non-empty,
311    /// every HTTP write fans out to every peer and returns OK only
312    /// after the local commit + W-1 peer acks land within
313    /// `--quorum-timeout-ms`. Default 0 = federation disabled, daemon
314    /// behaves exactly like v0.6.0.
315    #[arg(long, default_value_t = 0)]
316    pub quorum_writes: usize,
317    /// Comma-separated list of peer base URLs. Each peer is assumed to
318    /// expose `POST /api/v1/sync/push` — the same endpoint the
319    /// sync-daemon already uses.
320    #[arg(long, value_delimiter = ',')]
321    pub quorum_peers: Vec<String>,
322    /// Deadline for quorum-ack collection. After this many ms the
323    /// write returns 503 `quorum_not_met`. Default 2000.
324    #[arg(long, default_value_t = 2000)]
325    pub quorum_timeout_ms: u64,
326    /// Optional mTLS client cert for outbound federation POSTs. Same
327    /// cert material the sync-daemon's `--client-cert` accepts.
328    #[arg(long)]
329    pub quorum_client_cert: Option<PathBuf>,
330    /// Optional mTLS client key for outbound federation POSTs.
331    #[arg(long)]
332    pub quorum_client_key: Option<PathBuf>,
333    /// Optional root CA cert to trust for outbound federation HTTPS.
334    /// Required whenever peers present a cert NOT rooted in Mozilla's
335    /// `webpki-roots` bundle (self-signed, private CA, ephemeral test
336    /// CA, etc.) — without this, the reqwest rustls-tls client rejects
337    /// peer certs and every quorum write times out as `quorum_not_met`.
338    /// See #333.
339    #[arg(long)]
340    pub quorum_ca_cert: Option<PathBuf>,
341    /// v0.6.0.1 (#320) — how often, in seconds, the daemon pulls peers
342    /// for any updates it missed while offline or partitioned. 0 disables
343    /// the catchup loop entirely. Default 30s keeps a post-partition
344    /// node convergent within one interval after resume.
345    #[arg(long, default_value_t = 30)]
346    pub catchup_interval_secs: u64,
347}
348
349#[derive(Args)]
350pub struct CompletionsArgs {
351    pub shell: Shell,
352}
353
354// ---------------------------------------------------------------------------
355// Top-level dispatch
356// ---------------------------------------------------------------------------
357
358/// Top-level CLI dispatch. Called from `main()` after `Cli::parse()`.
359///
360/// Handles:
361/// - `--db-passphrase-file` → exports `AI_MEMORY_DB_PASSPHRASE`.
362/// - `is_write_command` → conditional post-run WAL checkpoint.
363/// - The match arm for every `Command` variant.
364#[allow(clippy::too_many_lines)]
365pub async fn run(cli: Cli, app_config: &AppConfig) -> Result<()> {
366    // v0.6.0.0: read the SQLCipher passphrase from a file and export it as
367    // AI_MEMORY_DB_PASSPHRASE for the duration of the process. File path
368    // comes from the --db-passphrase-file flag (global). No-op on standard
369    // SQLite builds (the env var is ignored unless the binary was built
370    // with --features sqlcipher).
371    if let Some(path) = &cli.db_passphrase_file {
372        let passphrase = passphrase_from_file(path)?;
373        // SAFETY: single-threaded startup before any worker threads spawn.
374        unsafe { std::env::set_var("AI_MEMORY_DB_PASSPHRASE", passphrase) };
375    }
376    let db_path = app_config.effective_db(&cli.db);
377    let j = cli.json;
378    let cli_agent_id: Option<String> = cli.agent_id.clone();
379    // Track whether command writes to DB (for WAL checkpoint)
380    let needs_checkpoint = is_write_command(&cli.command);
381    let db_path_for_checkpoint = if needs_checkpoint {
382        Some(db_path.clone())
383    } else {
384        None
385    };
386
387    let result = match cli.command {
388        Command::Serve(a) => serve(db_path, a, app_config).await,
389        Command::Mcp { tier } => {
390            let feature_tier = app_config.effective_tier(Some(&tier));
391            mcp::run_mcp_server(&db_path, feature_tier, app_config)?;
392            Ok(())
393        }
394        Command::Store(a) => {
395            let stdout = std::io::stdout();
396            let stderr = std::io::stderr();
397            let mut so = stdout.lock();
398            let mut se = stderr.lock();
399            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
400            cli::store::run(
401                &db_path,
402                a,
403                j,
404                app_config,
405                cli_agent_id.as_deref(),
406                &mut out,
407            )
408        }
409        Command::Update(a) => {
410            let stdout = std::io::stdout();
411            let stderr = std::io::stderr();
412            let mut so = stdout.lock();
413            let mut se = stderr.lock();
414            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
415            cli::update::run(&db_path, &a, j, &mut out)
416        }
417        Command::Recall(a) => {
418            let stdout = std::io::stdout();
419            let stderr = std::io::stderr();
420            let mut so = stdout.lock();
421            let mut se = stderr.lock();
422            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
423            cli::recall::run(&db_path, &a, j, app_config, &mut out)
424        }
425        Command::Search(a) => {
426            let stdout = std::io::stdout();
427            let stderr = std::io::stderr();
428            let mut so = stdout.lock();
429            let mut se = stderr.lock();
430            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
431            cli::search::run(&db_path, &a, j, &mut out)
432        }
433        Command::Get(a) => {
434            let stdout = std::io::stdout();
435            let stderr = std::io::stderr();
436            let mut so = stdout.lock();
437            let mut se = stderr.lock();
438            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
439            cli::crud::cmd_get(&db_path, &a, j, &mut out)
440        }
441        Command::List(a) => {
442            let stdout = std::io::stdout();
443            let stderr = std::io::stderr();
444            let mut so = stdout.lock();
445            let mut se = stderr.lock();
446            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
447            cli::crud::cmd_list(&db_path, &a, j, app_config, &mut out)
448        }
449        Command::Delete(a) => {
450            let stdout = std::io::stdout();
451            let stderr = std::io::stderr();
452            let mut so = stdout.lock();
453            let mut se = stderr.lock();
454            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
455            cli::crud::cmd_delete(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
456        }
457        Command::Promote(a) => {
458            let stdout = std::io::stdout();
459            let stderr = std::io::stderr();
460            let mut so = stdout.lock();
461            let mut se = stderr.lock();
462            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
463            cli::promote::cmd_promote(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
464        }
465        Command::Forget(a) => {
466            let stdout = std::io::stdout();
467            let stderr = std::io::stderr();
468            let mut so = stdout.lock();
469            let mut se = stderr.lock();
470            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
471            cli::forget::cmd_forget(&db_path, &a, j, &mut out)
472        }
473        Command::Link(a) => {
474            let stdout = std::io::stdout();
475            let stderr = std::io::stderr();
476            let mut so = stdout.lock();
477            let mut se = stderr.lock();
478            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
479            cli::link::cmd_link(&db_path, &a, j, &mut out)
480        }
481        Command::Consolidate(a) => {
482            let stdout = std::io::stdout();
483            let stderr = std::io::stderr();
484            let mut so = stdout.lock();
485            let mut se = stderr.lock();
486            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
487            cli::consolidate::run(&db_path, a, j, cli_agent_id.as_deref(), &mut out)
488        }
489        Command::Resolve(a) => {
490            let stdout = std::io::stdout();
491            let stderr = std::io::stderr();
492            let mut so = stdout.lock();
493            let mut se = stderr.lock();
494            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
495            cli::link::cmd_resolve(&db_path, &a, j, &mut out)
496        }
497        Command::Shell => cli::shell::run(&db_path),
498        Command::Sync(a) => {
499            let stdout = std::io::stdout();
500            let stderr = std::io::stderr();
501            let mut so = stdout.lock();
502            let mut se = stderr.lock();
503            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
504            cli::sync::run(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
505        }
506        Command::SyncDaemon(a) => cli::sync::run_daemon(&db_path, a, cli_agent_id.as_deref()).await,
507        Command::AutoConsolidate(a) => {
508            let stdout = std::io::stdout();
509            let stderr = std::io::stderr();
510            let mut so = stdout.lock();
511            let mut se = stderr.lock();
512            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
513            cli::consolidate::run_auto(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
514        }
515        Command::Gc => {
516            let stdout = std::io::stdout();
517            let stderr = std::io::stderr();
518            let mut so = stdout.lock();
519            let mut se = stderr.lock();
520            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
521            cli::gc::run_gc(&db_path, j, app_config, &mut out)
522        }
523        Command::Stats => {
524            let stdout = std::io::stdout();
525            let stderr = std::io::stderr();
526            let mut so = stdout.lock();
527            let mut se = stderr.lock();
528            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
529            cli::gc::run_stats(&db_path, j, &mut out)
530        }
531        Command::Namespaces => {
532            let stdout = std::io::stdout();
533            let stderr = std::io::stderr();
534            let mut so = stdout.lock();
535            let mut se = stderr.lock();
536            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
537            cli::gc::run_namespaces(&db_path, j, &mut out)
538        }
539        Command::Export => {
540            let stdout = std::io::stdout();
541            let stderr = std::io::stderr();
542            let mut so = stdout.lock();
543            let mut se = stderr.lock();
544            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
545            cli::io::export(&db_path, &mut out)
546        }
547        Command::Import(a) => {
548            let stdout = std::io::stdout();
549            let stderr = std::io::stderr();
550            let mut so = stdout.lock();
551            let mut se = stderr.lock();
552            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
553            cli::io::import(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
554        }
555        Command::Completions(a) => {
556            generate(
557                a.shell,
558                &mut Cli::command(),
559                "ai-memory",
560                &mut std::io::stdout(),
561            );
562            Ok(())
563        }
564        Command::Man => {
565            let cmd = Cli::command();
566            let man = clap_mangen::Man::new(cmd);
567            man.render(&mut std::io::stdout())?;
568            Ok(())
569        }
570        Command::Mine(a) => {
571            let stdout = std::io::stdout();
572            let stderr = std::io::stderr();
573            let mut so = stdout.lock();
574            let mut se = stderr.lock();
575            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
576            cli::io::mine(
577                &db_path,
578                a,
579                j,
580                app_config,
581                cli_agent_id.as_deref(),
582                &mut out,
583            )
584        }
585        Command::Archive(a) => {
586            let stdout = std::io::stdout();
587            let stderr = std::io::stderr();
588            let mut so = stdout.lock();
589            let mut se = stderr.lock();
590            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
591            cli::archive::run(&db_path, a, j, &mut out)
592        }
593        Command::Agents(a) => {
594            let stdout = std::io::stdout();
595            let stderr = std::io::stderr();
596            let mut so = stdout.lock();
597            let mut se = stderr.lock();
598            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
599            cli::agents::run_agents(&db_path, a, j, &mut out)
600        }
601        Command::Pending(a) => {
602            let stdout = std::io::stdout();
603            let stderr = std::io::stderr();
604            let mut so = stdout.lock();
605            let mut se = stderr.lock();
606            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
607            cli::agents::run_pending(&db_path, a, j, cli_agent_id.as_deref(), &mut out)
608        }
609        Command::Backup(a) => {
610            let stdout = std::io::stdout();
611            let stderr = std::io::stderr();
612            let mut so = stdout.lock();
613            let mut se = stderr.lock();
614            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
615            cli::backup::run_backup(&db_path, &a, j, &mut out)
616        }
617        Command::Restore(a) => {
618            let stdout = std::io::stdout();
619            let stderr = std::io::stderr();
620            let mut so = stdout.lock();
621            let mut se = stderr.lock();
622            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
623            cli::backup::run_restore(&db_path, &a, j, &mut out)
624        }
625        Command::Curator(a) => {
626            let stdout = std::io::stdout();
627            let stderr = std::io::stderr();
628            let mut so = stdout.lock();
629            let mut se = stderr.lock();
630            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
631            cli::curator::run(&db_path, &a, app_config, &mut out).await
632        }
633        Command::Bench(a) => cmd_bench(&a),
634        #[cfg(feature = "sal")]
635        Command::Migrate(a) => cmd_migrate(&a).await,
636    };
637
638    // WAL checkpoint after write commands to prevent unbounded WAL growth
639    if result.is_ok()
640        && let Some(cp_path) = db_path_for_checkpoint
641        && let Ok(conn) = db::open(&cp_path)
642    {
643        let _ = db::checkpoint(&conn);
644    }
645
646    result
647}
648
649// ---------------------------------------------------------------------------
650// is_write_command — predicate for the post-run WAL checkpoint.
651// ---------------------------------------------------------------------------
652
653/// Returns true if `cmd` is a write-class subcommand. The post-run WAL
654/// checkpoint in [`run`] runs only when this returns `true`.
655#[must_use]
656pub fn is_write_command(cmd: &Command) -> bool {
657    matches!(
658        cmd,
659        Command::Store(_)
660            | Command::Update(_)
661            | Command::Delete(_)
662            | Command::Promote(_)
663            | Command::Forget(_)
664            | Command::Link(_)
665            | Command::Consolidate(_)
666            | Command::Resolve(_)
667            | Command::Sync(_)
668            | Command::SyncDaemon(_)
669            | Command::Import(_)
670            | Command::AutoConsolidate(_)
671            | Command::Gc
672    )
673}
674
675// ---------------------------------------------------------------------------
676// Startup helpers (passphrase, anonymize default)
677// ---------------------------------------------------------------------------
678
679/// Read the `SQLCipher` passphrase from `path`. Strips a single trailing
680/// newline / CRLF; rejects an empty passphrase (post-strip) with an error;
681/// preserves all other internal whitespace.
682///
683/// # Errors
684///
685/// - The file cannot be read (e.g. missing, permission denied).
686/// - The passphrase, after stripping the trailing newline, is empty.
687pub fn passphrase_from_file(path: &Path) -> Result<String> {
688    let raw = std::fs::read_to_string(path)
689        .with_context(|| format!("reading passphrase file {}", path.display()))?;
690    let passphrase = raw.trim_end_matches(['\n', '\r']).to_string();
691    if passphrase.is_empty() {
692        anyhow::bail!("passphrase file {} is empty", path.display());
693    }
694    Ok(passphrase)
695}
696
697/// Apply the configured `anonymize_default` to the runtime env: when the
698/// config asks for anonymization but the user hasn't already set
699/// `AI_MEMORY_ANONYMIZE`, set it to `"1"`. Idempotent — repeated calls are
700/// a no-op once the env var is set.
701///
702/// Note: this writes to the process environment; callers must invoke it
703/// from the single-threaded startup region (before any worker threads are
704/// spawned). The production binary calls it from `main()` for that reason.
705pub fn apply_anonymize_default(app_config: &AppConfig) {
706    // #198: config → env mapping for agent_id anonymization. Env var already
707    // set by the caller wins; config is only applied when the env is unset.
708    if app_config.effective_anonymize_default() && std::env::var("AI_MEMORY_ANONYMIZE").is_err() {
709        // SAFETY: single-threaded startup before any worker threads spawn.
710        unsafe { std::env::set_var("AI_MEMORY_ANONYMIZE", "1") };
711    }
712}
713
714// ---------------------------------------------------------------------------
715// Embedder / vector-index canonical builders
716// ---------------------------------------------------------------------------
717
718/// Construct the [`Embedder`] for a given tier. Returns `None` for the
719/// keyword tier (no embedder requested) and on load failure (caller
720/// degrades to keyword fallback). On failure the diagnostic is emitted
721/// via `tracing::error!` so operators see it in `journalctl`.
722///
723/// This is the single canonical embedder builder used by both `serve()`
724/// (HTTP daemon) and `cli::recall::run` (offline recall). Prior to W6
725/// each call site had its own copy, with subtly different fallback
726/// shapes — the bug at issue #322 was a direct consequence.
727pub async fn build_embedder(feature_tier: FeatureTier, app_config: &AppConfig) -> Option<Embedder> {
728    let tier_config = feature_tier.config();
729    let Some(emb_model) = tier_config.embedding_model else {
730        tracing::info!(
731            "embedder disabled — tier={} keyword-only (FTS5); semantic recall not wired",
732            feature_tier.as_str()
733        );
734        return None;
735    };
736    let embed_url = app_config.effective_embed_url().to_string();
737    // The HF-Hub sync API and candle model-load are blocking CPU work that
738    // internally spin their own tokio runtime. Running them directly in this
739    // async context panics with "Cannot drop a runtime in a context where
740    // blocking is not allowed." Move the whole construction onto the blocking
741    // pool so the inner runtime is owned by a dedicated thread.
742    let build = match tokio::task::spawn_blocking(move || {
743        let embed_client = llm::OllamaClient::new_with_url(&embed_url, "nomic-embed-text")
744            .ok()
745            .map(Arc::new);
746        embeddings::Embedder::for_model(emb_model, embed_client)
747    })
748    .await
749    {
750        Ok(b) => b,
751        Err(e) => {
752            tracing::error!("embedder spawn_blocking join failed: {e}");
753            return None;
754        }
755    };
756    match build {
757        Ok(emb) => {
758            tracing::info!(
759                "embedder loaded ({}) — tier={} semantic recall enabled",
760                emb.model_description(),
761                feature_tier.as_str()
762            );
763            Some(emb)
764        }
765        Err(e) => {
766            // v0.6.2 (#327): make embedder load failures loud. The
767            // prior WARN level was easy to miss in DO droplet logs,
768            // which led to scenario-18 black-holing (semantic recall
769            // falling back to keyword-only without the operator
770            // noticing). An ERROR-level log with an obvious marker
771            // surfaces this immediately in `journalctl -u ai-memory`
772            // or tail -f /var/log/ai-memory-serve.log.
773            tracing::error!(
774                "EMBEDDER LOAD FAILED — tier={} requested semantic features, \
775                 but embedder init errored: {e}. Daemon falls back to keyword-only. \
776                 Semantic recall, sync_push embedding refresh (#322), and HNSW index \
777                 will be NO-OPS. Check network egress to HuggingFace Hub + available \
778                 memory for model weights. To force keyword-only explicitly (silences \
779                 this error), set `tier = \"keyword\"` in config.toml.",
780                feature_tier.as_str()
781            );
782            None
783        }
784    }
785}
786
787/// Build the in-memory [`VectorIndex`] from `conn`. When `embedder_present`
788/// is false, returns `None` (the keyword-only path doesn't need an index).
789/// When the embedder is present but the DB is empty (or query errors),
790/// returns `Some(VectorIndex::empty())` so write paths can populate it
791/// in-place.
792#[must_use]
793pub fn build_vector_index(conn: &Connection, embedder_present: bool) -> Option<VectorIndex> {
794    if !embedder_present {
795        return None;
796    }
797    match db::get_all_embeddings(conn) {
798        Ok(entries) if !entries.is_empty() => Some(hnsw::VectorIndex::build(entries)),
799        _ => Some(hnsw::VectorIndex::empty()),
800    }
801}
802
803// ---------------------------------------------------------------------------
804// Background tasks (GC, WAL checkpoint)
805// ---------------------------------------------------------------------------
806
807/// Spawn the periodic GC loop. Sleeps `interval`, then runs `db::gc` and
808/// `db::auto_purge_archive` against the daemon's shared connection. The
809/// returned [`JoinHandle`] is owned by the caller; `serve()` aborts it on
810/// shutdown.
811#[must_use]
812pub fn spawn_gc_loop(
813    state: Db,
814    archive_max_days: Option<i64>,
815    interval: Duration,
816) -> JoinHandle<()> {
817    tokio::spawn(async move {
818        loop {
819            tokio::time::sleep(interval).await;
820            let lock = state.lock().await;
821            match db::gc(&lock.0, lock.3) {
822                Ok(n) if n > 0 => tracing::info!("gc: expired {n} memories"),
823                _ => {}
824            }
825            // Auto-purge old archives if configured
826            match db::auto_purge_archive(&lock.0, archive_max_days) {
827                Ok(n) if n > 0 => tracing::info!("gc: purged {n} old archived memories"),
828                _ => {}
829            }
830        }
831    })
832}
833
834/// Spawn the periodic WAL checkpoint loop. First checkpoint runs
835/// `interval / 2` after start (staggered from the GC loop to avoid
836/// lock-contention bursts on cold start), then on a fixed cadence.
837#[must_use]
838pub fn spawn_wal_checkpoint_loop(state: Db, interval: Duration) -> JoinHandle<()> {
839    let half = interval / 2;
840    tokio::spawn(async move {
841        // First checkpoint runs halfway through the interval so the two
842        // long-running maintenance tasks never overlap on cold start.
843        tokio::time::sleep(half).await;
844        loop {
845            {
846                let lock = state.lock().await;
847                match db::checkpoint(&lock.0) {
848                    Ok(()) => tracing::debug!("wal checkpoint: ok"),
849                    Err(e) => tracing::warn!("wal checkpoint failed: {e}"),
850                }
851            }
852            tokio::time::sleep(interval).await;
853        }
854    })
855}
856
857// ---------------------------------------------------------------------------
858// Router composition
859// ---------------------------------------------------------------------------
860
861/// Compose the production HTTP router. Thin wrapper around
862/// [`crate::build_router`] (the W3-vintage source of truth for the
863/// route table). `daemon_runtime::build_router` exists so test code in
864/// this module can build the router without naming `crate::build_router`
865/// directly, and so future router-composition logic (e.g. middleware
866/// reorder, custom layers) lives in one place.
867#[must_use]
868pub fn build_router(app_state: AppState, api_key_state: ApiKeyState) -> Router {
869    crate::build_router(api_key_state, app_state)
870}
871
872// ---------------------------------------------------------------------------
873// serve() — the HTTP daemon body, post-W6 split.
874// ---------------------------------------------------------------------------
875
876/// Aggregated state produced by [`bootstrap_serve`].
877pub struct ServeBootstrap {
878    pub app_state: AppState,
879    pub api_key_state: ApiKeyState,
880    pub db_state: Db,
881    pub archive_max_days: Option<i64>,
882    pub task_handles: Vec<JoinHandle<()>>,
883}
884
885/// Build all daemon state and spawn background tasks. Returns the
886/// aggregated state without binding any sockets — testable in isolation.
887pub async fn bootstrap_serve(
888    db_path: &Path,
889    args: &ServeArgs,
890    app_config: &AppConfig,
891) -> Result<ServeBootstrap> {
892    let resolved_ttl = app_config.effective_ttl();
893    let archive_on_gc = app_config.effective_archive_on_gc();
894    let conn = db::open(db_path)?;
895
896    // Issue #219: build the embedder + HNSW index up front so HTTP write
897    // paths can populate them. Previously the daemon never constructed an
898    // embedder, silently excluding every HTTP-authored memory from semantic
899    // recall. Build only when the configured feature tier enables it —
900    // keyword-only deployments keep their zero-dep, zero-RAM profile.
901    // Daemon has no per-invocation tier override; honour the config tier.
902    let feature_tier = app_config.effective_tier(None);
903    let tier_config = feature_tier.config();
904    let embedder = build_embedder(feature_tier, app_config).await;
905    let vector_index = build_vector_index(&conn, embedder.is_some());
906
907    let db_state: Db = Arc::new(Mutex::new((
908        conn,
909        db_path.to_path_buf(),
910        resolved_ttl,
911        archive_on_gc,
912    )));
913
914    // Federation: parsed from --quorum-writes / --quorum-peers. Disabled
915    // entirely when either is absent — daemon behaves exactly like
916    // v0.6.0 in that case.
917    let federation = federation::FederationConfig::build(
918        args.quorum_writes,
919        &args.quorum_peers,
920        std::time::Duration::from_millis(args.quorum_timeout_ms),
921        args.quorum_client_cert.as_deref(),
922        args.quorum_client_key.as_deref(),
923        args.quorum_ca_cert.as_deref(),
924        format!("host:{}", gethostname::gethostname().to_string_lossy()),
925    )
926    .context("federation config")?;
927
928    let mut task_handles: Vec<JoinHandle<()>> = Vec::new();
929
930    if let Some(ref fed) = federation {
931        tracing::info!(
932            "federation enabled: W={} over {} peer(s), timeout {}ms",
933            fed.policy.w,
934            fed.peer_count(),
935            args.quorum_timeout_ms,
936        );
937        // v0.6.0.1 (#320) — post-partition catchup poller. Closes the gap
938        // where a rejoining node only sees post-resume writes.
939        if args.catchup_interval_secs > 0 {
940            let interval = std::time::Duration::from_secs(args.catchup_interval_secs);
941            tracing::info!(
942                "catchup loop enabled: polling {} peer(s) every {}s",
943                fed.peer_count(),
944                args.catchup_interval_secs,
945            );
946            federation::spawn_catchup_loop(fed.clone(), db_state.clone(), interval);
947        } else {
948            tracing::info!("catchup loop disabled (--catchup-interval-secs=0)");
949        }
950    }
951
952    let app_state = AppState {
953        db: db_state.clone(),
954        embedder: Arc::new(embedder),
955        vector_index: Arc::new(Mutex::new(vector_index)),
956        federation: Arc::new(federation),
957        tier_config: Arc::new(tier_config),
958        scoring: Arc::new(app_config.effective_scoring()),
959    };
960
961    // Automatic GC.
962    task_handles.push(spawn_gc_loop(
963        db_state.clone(),
964        app_config.archive_max_days,
965        Duration::from_secs(GC_INTERVAL_SECS),
966    ));
967
968    // v0.6.0 GA: periodic WAL checkpoint. Under continuous writes the WAL
969    // file grows until SQLite's auto-checkpoint fires (every 1000 pages by
970    // default) — which is inconsistent timing and can leave the file at
971    // hundreds of MB between auto-checkpoints. A dedicated task running on
972    // a fixed cadence keeps the WAL bounded and makes operational storage
973    // behaviour predictable. We stagger from GC to avoid lock-contention
974    // bursts. See docs/ARCHITECTURAL_LIMITS.md for why this workaround is
975    // necessary in a single-connection daemon.
976    task_handles.push(spawn_wal_checkpoint_loop(
977        db_state.clone(),
978        Duration::from_secs(WAL_CHECKPOINT_INTERVAL_SECS),
979    ));
980
981    let api_key_state = ApiKeyState {
982        key: app_config.api_key.clone(),
983    };
984    if api_key_state.key.is_some() {
985        tracing::info!("API key authentication enabled");
986    }
987
988    Ok(ServeBootstrap {
989        app_state,
990        api_key_state,
991        db_state,
992        archive_max_days: app_config.archive_max_days,
993        task_handles,
994    })
995}
996
997/// Init the tracing subscriber for the HTTP daemon. Idempotent at the
998/// `tracing-subscriber` level — repeated calls log a warning and no-op
999/// rather than panic. Split out from `serve()` so test code can opt out.
1000fn init_tracing() {
1001    let _ = tracing_subscriber::fmt()
1002        .with_env_filter(
1003            EnvFilter::from_default_env()
1004                .add_directive("ai_memory=info".parse().unwrap())
1005                .add_directive("tower_http=info".parse().unwrap()),
1006        )
1007        .try_init();
1008}
1009
1010/// Run the HTTP memory daemon. Loads TLS state, builds `AppState`, spawns
1011/// the GC + WAL-checkpoint loops, and binds a listener (TLS or plain HTTP).
1012///
1013/// Behaviour is preserved from the pre-W6 inline `main::serve` body — only
1014/// the structure has changed.
1015#[allow(clippy::too_many_lines)]
1016pub async fn serve(db_path: PathBuf, args: ServeArgs, app_config: &AppConfig) -> Result<()> {
1017    init_tracing();
1018
1019    let bootstrap = bootstrap_serve(&db_path, &args, app_config).await?;
1020
1021    let addr = format!("{}:{}", args.host, args.port);
1022    tracing::info!("database: {}", db_path.display());
1023
1024    // Graceful shutdown with WAL checkpoint
1025    let shutdown_state = bootstrap.db_state.clone();
1026    let shutdown = async move {
1027        let _ = tokio::signal::ctrl_c().await;
1028        tracing::info!("shutting down — checkpointing WAL");
1029        let lock = shutdown_state.lock().await;
1030        let _ = db::checkpoint(&lock.0);
1031    };
1032
1033    // Native TLS (Layer 1): if both --tls-cert and --tls-key are provided,
1034    // bind via axum-server + rustls. Plain HTTP otherwise — backward
1035    // compatible with every prior release. The `requires = …` clap
1036    // attributes prevent the half-configured case.
1037    if let (Some(cert), Some(key)) = (&args.tls_cert, &args.tls_key) {
1038        // rustls 0.23 needs an explicit CryptoProvider; install ring
1039        // before any TLS setup. Idempotent — second install is a
1040        // harmless no-op via ignore.
1041        let _ = rustls::crypto::ring::default_provider().install_default();
1042        // Load TLS / mTLS config BEFORE printing the "listening" log
1043        // so a misconfigured cert / key / allowlist surfaces the error
1044        // first (red-team #248).
1045        let tls_config = if let Some(allowlist_path) = &args.mtls_allowlist {
1046            tracing::info!(
1047                "mTLS enabled — client certs required. Allowlist: {}",
1048                allowlist_path.display()
1049            );
1050            tls::load_mtls_rustls_config(cert, key, allowlist_path).await?
1051        } else {
1052            tracing::warn!(
1053                "TLS enabled but mTLS NOT configured — sync endpoints \
1054                 (/api/v1/sync/push, /api/v1/sync/since) accept any client. \
1055                 Set --mtls-allowlist for production peer-mesh deployments \
1056                 (red-team #231)."
1057            );
1058            tls::load_rustls_config(cert, key).await?
1059        };
1060        let app = build_router(bootstrap.app_state, bootstrap.api_key_state);
1061        tracing::info!("ai-memory listening on https://{addr}");
1062        let socket_addr: std::net::SocketAddr = addr.parse()?;
1063        // axum-server doesn't have a direct graceful-shutdown on the
1064        // TLS builder yet; spawn the signal listener on the Handle
1065        // instead so ctrl_c triggers a graceful shutdown. Window is
1066        // operator-configurable via --shutdown-grace-secs (default 30,
1067        // bumped from 10 in v0.6.0 — red-team #233).
1068        let grace = std::time::Duration::from_secs(args.shutdown_grace_secs);
1069        let handle = axum_server::Handle::new();
1070        let handle_clone = handle.clone();
1071        tokio::spawn(async move {
1072            shutdown.await;
1073            handle_clone.graceful_shutdown(Some(grace));
1074        });
1075        axum_server::bind_rustls(socket_addr, tls_config)
1076            .handle(handle)
1077            .serve(app.into_make_service())
1078            .await?;
1079    } else {
1080        tracing::warn!(
1081            "TLS NOT enabled — sync endpoints (/api/v1/sync/push, \
1082             /api/v1/sync/since) accept any caller over plain HTTP. \
1083             Set --tls-cert + --tls-key + --mtls-allowlist for production \
1084             peer-mesh deployments (red-team #231)."
1085        );
1086        tracing::info!("ai-memory listening on http://{addr}");
1087        // Wave 3 (v0.6.3): the non-TLS path delegates to
1088        // `daemon_runtime::serve_http_with_shutdown_future`, which is the
1089        // same `build_router` + `TcpListener::bind` + `axum::serve` body
1090        // the integration tests drive in-process. Production threads its
1091        // WAL-checkpoint-on-shutdown future in directly so the cleanup
1092        // semantic is preserved verbatim.
1093        serve_http_with_shutdown_future(
1094            &addr,
1095            bootstrap.api_key_state,
1096            bootstrap.app_state,
1097            shutdown,
1098        )
1099        .await?;
1100    }
1101    Ok(())
1102}
1103
1104// ---------------------------------------------------------------------------
1105// cmd_bench / cmd_migrate (no-op for non-sal builds)
1106// ---------------------------------------------------------------------------
1107
1108fn cmd_bench(args: &BenchArgs) -> Result<()> {
1109    let iterations = args.iterations.clamp(1, 100_000);
1110    let warmup = args.warmup.min(10_000);
1111    let regression_threshold = args.regression_threshold.clamp(0.0, 1000.0);
1112    // Bench always seeds a disposable in-memory DB so the operator's
1113    // main DB (and disk) are untouched. SQLite's `:memory:` URL and
1114    // WAL-less mode keep the workload bounded by RAM and CPU.
1115    let conn = db::open(Path::new(":memory:"))?;
1116    let config = bench::BenchConfig {
1117        iterations,
1118        warmup,
1119        namespace: bench::BENCH_NAMESPACE.to_string(),
1120    };
1121    let results = bench::run(&conn, &config)?;
1122
1123    let regressions = if let Some(path) = &args.baseline {
1124        let baseline = bench::load_baseline(Path::new(path))?;
1125        Some(bench::compare_against_baseline(
1126            &results,
1127            &baseline,
1128            regression_threshold,
1129        ))
1130    } else {
1131        None
1132    };
1133
1134    if args.json {
1135        println!(
1136            "{}",
1137            serde_json::to_string_pretty(&serde_json::json!({
1138                "iterations": iterations,
1139                "warmup": warmup,
1140                "results": results,
1141                "regressions": regressions,
1142            }))?
1143        );
1144    } else {
1145        print!("{}", bench::render_table(&results));
1146        if let Some(rows) = &regressions {
1147            println!();
1148            print!("{}", bench::render_regression_table(rows));
1149        }
1150    }
1151
1152    if let Some(history_path) = &args.history {
1153        let captured_at = chrono::Utc::now().to_rfc3339();
1154        bench::append_history(history_path, &captured_at, iterations, warmup, &results)?;
1155        let mut stderr = std::io::stderr().lock();
1156        let _ = writeln!(
1157            stderr,
1158            "bench: appended run to history file {}",
1159            history_path.display()
1160        );
1161    }
1162
1163    let budget_failed = results
1164        .iter()
1165        .any(|r| matches!(r.status, bench::Status::Fail));
1166    let regression_failed = regressions
1167        .as_ref()
1168        .is_some_and(|rows| rows.iter().any(|r| r.regressed));
1169
1170    if budget_failed && regression_failed {
1171        anyhow::bail!(
1172            "bench: at least one operation exceeded its p95 budget by >10% AND regressed >{regression_threshold:.1}% vs baseline"
1173        );
1174    }
1175    if budget_failed {
1176        anyhow::bail!("bench: at least one operation exceeded its p95 budget by >10%");
1177    }
1178    if regression_failed {
1179        anyhow::bail!(
1180            "bench: at least one operation regressed >{regression_threshold:.1}% vs baseline"
1181        );
1182    }
1183    Ok(())
1184}
1185
1186#[cfg(feature = "sal")]
1187async fn cmd_migrate(args: &MigrateArgs) -> Result<()> {
1188    let src = migrate::open_store(&args.from)
1189        .await
1190        .context("open source store")?;
1191    let dst = migrate::open_store(&args.to)
1192        .await
1193        .context("open destination store")?;
1194    let report = migrate::migrate(
1195        src.as_ref(),
1196        dst.as_ref(),
1197        args.batch,
1198        args.namespace.clone(),
1199        args.dry_run,
1200    )
1201    .await;
1202    if args.json {
1203        let value = serde_json::json!({
1204            "from_url": args.from,
1205            "to_url": args.to,
1206            "memories_read": report.memories_read,
1207            "memories_written": report.memories_written,
1208            "batches": report.batches,
1209            "errors": report.errors,
1210            "dry_run": report.dry_run,
1211        });
1212        println!("{}", serde_json::to_string_pretty(&value)?);
1213    } else {
1214        println!("migration report");
1215        println!("  from:              {}", args.from);
1216        println!("  to:                {}", args.to);
1217        println!("  memories_read:     {}", report.memories_read);
1218        println!("  memories_written:  {}", report.memories_written);
1219        println!("  batches:           {}", report.batches);
1220        println!("  dry_run:           {}", report.dry_run);
1221        println!("  errors:            {}", report.errors.len());
1222        for e in &report.errors {
1223            println!("    - {e}");
1224        }
1225    }
1226    if !report.errors.is_empty() {
1227        anyhow::bail!("migration completed with {} error(s)", report.errors.len());
1228    }
1229    Ok(())
1230}
1231
1232// ---------------------------------------------------------------------------
1233// Pre-W6 helpers — in-process HTTP harness, sync-daemon body, curator-daemon body.
1234// ---------------------------------------------------------------------------
1235
1236/// Run the HTTP daemon (plain HTTP, no TLS) with a programmable shutdown.
1237///
1238/// Mirrors the `else` branch of `serve()` in pre-W6 `main.rs` (the non-TLS
1239/// path). Builds the production `Router` via `build_router`, binds a
1240/// `TcpListener` to `addr`, and runs `axum::serve` with a graceful-shutdown
1241/// future that resolves when `shutdown.notify_one()` is called.
1242///
1243/// Tests pass a known port (pick one via `free_port()` and pass
1244/// `127.0.0.1:<port>`). The function returns when shutdown completes;
1245/// callers can `tokio::spawn` it and `notify` to stop.
1246pub async fn serve_http_with_shutdown(
1247    addr: &str,
1248    api_key_state: ApiKeyState,
1249    app_state: AppState,
1250    shutdown: Arc<Notify>,
1251) -> Result<()> {
1252    serve_http_with_shutdown_future(addr, api_key_state, app_state, async move {
1253        shutdown.notified().await;
1254    })
1255    .await
1256}
1257
1258/// Variant of [`serve_http_with_shutdown`] that takes an arbitrary
1259/// shutdown future. The production `serve()` needs to run a WAL
1260/// checkpoint after the OS signal but before tearing down the listener;
1261/// that cleanup work is awkward to express through a `Notify` alone.
1262/// Accepting a `Future` lets the caller embed any async cleanup into the
1263/// shutdown future itself, while the helper keeps the `build_router` +
1264/// `TcpListener::bind` + `axum::serve` body it already owns.
1265pub async fn serve_http_with_shutdown_future<F>(
1266    addr: &str,
1267    api_key_state: ApiKeyState,
1268    app_state: AppState,
1269    shutdown: F,
1270) -> Result<()>
1271where
1272    F: std::future::Future<Output = ()> + Send + 'static,
1273{
1274    let app = crate::build_router(api_key_state, app_state);
1275    let listener = tokio::net::TcpListener::bind(addr)
1276        .await
1277        .with_context(|| format!("bind {addr}"))?;
1278    axum::serve(listener, app)
1279        .with_graceful_shutdown(shutdown)
1280        .await
1281        .context("axum::serve")?;
1282    Ok(())
1283}
1284
1285/// Run a single sync cycle against one peer — pull then push.
1286///
1287/// Lifted verbatim (modulo path-of-Path-vs-PathBuf) from the pre-W6
1288/// `main.rs::sync_cycle_once` so the integration sync-daemon test can
1289/// drive it without subprocess. The signature matches the private
1290/// main.rs helper 1:1 to keep call sites identical.
1291pub async fn sync_cycle_once(
1292    client: &reqwest::Client,
1293    db_path: &Path,
1294    local_agent_id: &str,
1295    peer_url: &str,
1296    api_key: Option<&str>,
1297    batch_size: usize,
1298) -> Result<()> {
1299    let peer_url = peer_url.trim_end_matches('/');
1300
1301    // --- PULL --------------------------------------------------------
1302    let since = {
1303        let conn = db::open(db_path)?;
1304        db::sync_state_load(&conn, local_agent_id)?
1305            .entries
1306            .get(peer_url)
1307            .cloned()
1308    };
1309
1310    let mut pull_url = format!(
1311        "{peer_url}/api/v1/sync/since?limit={batch_size}&peer={}",
1312        urlencoding_minimal(local_agent_id)
1313    );
1314    if let Some(ref s) = since {
1315        pull_url.push_str("&since=");
1316        pull_url.push_str(&urlencoding_minimal(s));
1317    }
1318
1319    let mut req = client.get(&pull_url).header("x-agent-id", local_agent_id);
1320    if let Some(key) = api_key {
1321        req = req.header("x-api-key", key);
1322    }
1323    let resp = req.send().await?;
1324    if !resp.status().is_success() {
1325        anyhow::bail!("sync-daemon: pull status {}", resp.status());
1326    }
1327    let pulled: SyncSinceResponse = resp.json().await?;
1328    let pull_count = pulled.memories.len();
1329    let latest_pulled = pulled.memories.last().map(|m| m.updated_at.clone());
1330
1331    {
1332        let conn = db::open(db_path)?;
1333        for mem in &pulled.memories {
1334            if crate::validate::validate_memory(mem).is_ok() {
1335                let _ = db::insert_if_newer(&conn, mem);
1336            }
1337        }
1338        if let Some(ref at) = latest_pulled {
1339            db::sync_state_observe(&conn, local_agent_id, peer_url, at)?;
1340        }
1341    }
1342
1343    // --- PUSH --------------------------------------------------------
1344    let last_pushed = {
1345        let conn = db::open(db_path)?;
1346        db::sync_state_last_pushed(&conn, local_agent_id, peer_url)
1347    };
1348    let outgoing = {
1349        let conn = db::open(db_path)?;
1350        db::memories_updated_since(&conn, last_pushed.as_deref(), batch_size)?
1351    };
1352    let push_count = outgoing.len();
1353    let latest_pushed = outgoing.last().map(|m| m.updated_at.clone());
1354
1355    if !outgoing.is_empty() {
1356        let body = serde_json::json!({
1357            "sender_agent_id": local_agent_id,
1358            "sender_clock": { "entries": {} },
1359            "memories": outgoing,
1360            "dry_run": false,
1361        });
1362        let mut req = client
1363            .post(format!("{peer_url}/api/v1/sync/push"))
1364            .header("x-agent-id", local_agent_id)
1365            .header("content-type", "application/json")
1366            .json(&body);
1367        if let Some(key) = api_key {
1368            req = req.header("x-api-key", key);
1369        }
1370        let resp = req.send().await?;
1371        if !resp.status().is_success() {
1372            anyhow::bail!("sync-daemon: push status {}", resp.status());
1373        }
1374        if let Some(at) = latest_pushed {
1375            let conn = db::open(db_path)?;
1376            db::sync_state_record_push(&conn, local_agent_id, peer_url, &at)?;
1377        }
1378    }
1379
1380    tracing::info!("sync-daemon: peer={peer_url} pulled={pull_count} pushed={push_count}");
1381    Ok(())
1382}
1383
1384/// Run the sync-daemon main loop with a programmable shutdown.
1385///
1386/// Mirrors the body of the pre-W6 `cmd_sync_daemon()` in `main.rs`: for
1387/// each cycle, fan out a `JoinSet` across `peers`, then race a sleep
1388/// against the shutdown notify. Returns when the notify fires. The
1389/// integration test can build a one-cycle test by setting `interval_secs=1`
1390/// and notifying after a short tokio sleep.
1391pub async fn run_sync_daemon_with_shutdown(
1392    db_path: PathBuf,
1393    local_agent_id: String,
1394    peers: Vec<String>,
1395    api_key: Option<String>,
1396    interval_secs: u64,
1397    batch_size: usize,
1398    shutdown: Arc<Notify>,
1399) -> Result<()> {
1400    let client = reqwest::Client::builder()
1401        .timeout(Duration::from_secs(30))
1402        .build()?;
1403    run_sync_daemon_with_shutdown_using_client(
1404        client,
1405        db_path,
1406        local_agent_id,
1407        peers,
1408        api_key,
1409        interval_secs,
1410        batch_size,
1411        shutdown,
1412    )
1413    .await
1414}
1415
1416/// Variant of [`run_sync_daemon_with_shutdown`] that takes a caller-built
1417/// `reqwest::Client`. The production `cmd_sync_daemon()` constructs an
1418/// mTLS-aware client (via `build_rustls_client_config`) and threads it
1419/// in here so the helper drives the same loop body the test version
1420/// drives — keeping `daemon_runtime` as the single source of truth for
1421/// the sync-daemon loop while preserving the production TLS contract.
1422pub async fn run_sync_daemon_with_shutdown_using_client(
1423    client: reqwest::Client,
1424    db_path: PathBuf,
1425    local_agent_id: String,
1426    peers: Vec<String>,
1427    api_key: Option<String>,
1428    interval_secs: u64,
1429    batch_size: usize,
1430    shutdown: Arc<Notify>,
1431) -> Result<()> {
1432    let interval = interval_secs.max(1);
1433    let batch_size = batch_size.max(1);
1434
1435    let db_path_owned: Arc<Path> = Arc::from(db_path.as_path());
1436    let local_agent_id_arc: Arc<str> = Arc::from(local_agent_id.as_str());
1437    let api_key_arc: Option<Arc<str>> = api_key.as_deref().map(Arc::from);
1438    let peers_arc: Vec<Arc<str>> = peers.iter().map(|s| Arc::from(s.as_str())).collect();
1439    loop {
1440        let mut set: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();
1441        for peer_url in &peers_arc {
1442            let client = client.clone();
1443            let db_path = db_path_owned.clone();
1444            let local_agent_id = local_agent_id_arc.clone();
1445            let peer_url = peer_url.clone();
1446            let api_key = api_key_arc.clone();
1447            set.spawn(async move {
1448                if let Err(e) = sync_cycle_once(
1449                    &client,
1450                    &db_path,
1451                    &local_agent_id,
1452                    &peer_url,
1453                    api_key.as_deref(),
1454                    batch_size,
1455                )
1456                .await
1457                {
1458                    tracing::warn!("sync-daemon: peer {peer_url} cycle failed: {e}");
1459                }
1460            });
1461        }
1462        while set.join_next().await.is_some() {}
1463
1464        tokio::select! {
1465            () = tokio::time::sleep(Duration::from_secs(interval)) => {}
1466            () = shutdown.notified() => {
1467                tracing::info!("sync-daemon: shutdown signal received");
1468                return Ok(());
1469            }
1470        }
1471    }
1472}
1473
1474/// Run the curator daemon with a programmable shutdown.
1475///
1476/// Mirrors the daemon arm of the pre-W6 `cmd_curator()`. The inner work is
1477/// `curator::run_daemon` (a blocking, tight-loop-with-`AtomicBool` already
1478/// in lib code), which we drive from a `spawn_blocking`. Tests fire the
1479/// `Notify` to set the shutdown bool and the blocking task observes it
1480/// within ~500ms (`run_daemon`'s sleep tick).
1481pub async fn run_curator_daemon_with_shutdown(
1482    db_path: PathBuf,
1483    cfg: crate::curator::CuratorConfig,
1484    shutdown: Arc<Notify>,
1485) -> Result<()> {
1486    let shutdown_flag = Arc::new(AtomicBool::new(false));
1487    let shutdown_flag_for_signal = shutdown_flag.clone();
1488    tokio::spawn(async move {
1489        shutdown.notified().await;
1490        shutdown_flag_for_signal.store(true, Ordering::Relaxed);
1491    });
1492
1493    let llm_arc: Option<Arc<crate::llm::OllamaClient>> = None;
1494    let db_owned = db_path;
1495    tokio::task::spawn_blocking(move || {
1496        crate::curator::run_daemon(db_owned, llm_arc, cfg, shutdown_flag);
1497    })
1498    .await
1499    .map_err(|e| anyhow::anyhow!("curator daemon join: {e}"))?;
1500    Ok(())
1501}
1502
1503/// Curator-daemon loop body, primitive-arg flavour for the binary.
1504///
1505/// `ollama_model` of `None` disables the LLM (matching the pre-tiered
1506/// keyword-only path in `build_curator_llm`).
1507#[allow(clippy::too_many_arguments)]
1508pub async fn run_curator_daemon_with_primitives(
1509    db_path: PathBuf,
1510    interval_secs: u64,
1511    max_ops_per_cycle: usize,
1512    dry_run: bool,
1513    include_namespaces: Vec<String>,
1514    exclude_namespaces: Vec<String>,
1515    ollama_model: Option<String>,
1516    shutdown: Arc<Notify>,
1517) -> Result<()> {
1518    let cfg = crate::curator::CuratorConfig {
1519        interval_secs,
1520        max_ops_per_cycle,
1521        dry_run,
1522        include_namespaces,
1523        exclude_namespaces,
1524    };
1525    let llm: Option<Arc<crate::llm::OllamaClient>> =
1526        ollama_model.and_then(|m| crate::llm::OllamaClient::new(&m).ok().map(Arc::new));
1527
1528    let shutdown_flag = Arc::new(AtomicBool::new(false));
1529    let shutdown_flag_for_signal = shutdown_flag.clone();
1530    tokio::spawn(async move {
1531        shutdown.notified().await;
1532        shutdown_flag_for_signal.store(true, Ordering::Relaxed);
1533    });
1534
1535    tokio::task::spawn_blocking(move || {
1536        crate::curator::run_daemon(db_path, llm, cfg, shutdown_flag);
1537    })
1538    .await
1539    .map_err(|e| anyhow::anyhow!("curator daemon join: {e}"))?;
1540    Ok(())
1541}
1542
1543// -----------------------------------------------------------------------
1544// helpers
1545// -----------------------------------------------------------------------
1546
/// Percent-encodes `s` for use as a URL query component.
///
/// Every byte outside RFC 3986's unreserved set (`A-Z a-z 0-9 - _ . ~`) is
/// emitted as `%XX` in uppercase hex. Encoding works on the UTF-8 bytes, so
/// non-ASCII text is escaped byte-by-byte. This covers everything the
/// sync-daemon queries emit: RFC3339 timestamps with `:`/`+`, and agent ids
/// with `:`/`@`/`/`. Mirror of the pre-W6 `main.rs::urlencoding_minimal`.
fn urlencoding_minimal(s: &str) -> String {
    use std::fmt::Write as _;
    s.bytes()
        .fold(String::with_capacity(s.len()), |mut encoded, byte| {
            let unreserved =
                byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~');
            if unreserved {
                encoded.push(char::from(byte));
            } else {
                // Writing into a `String` cannot fail.
                let _ = write!(encoded, "%{byte:02X}");
            }
            encoded
        })
}
1566
/// Mirrors the pre-W6 `main.rs::SyncSinceResponse`: the fields we
/// deserialize from the peer's `/api/v1/sync/since` body.
///
/// `count` and `limit` are present in the wire payload but never read on the
/// receive side. Because they are declared without `#[serde(default)]`, a
/// body missing either one fails to deserialize. They carry
/// `#[allow(dead_code)]` because rustc's `dead_code` lint (not a clippy
/// lint) flags fields that only the derived `Deserialize` impl writes.
#[derive(serde::Deserialize)]
struct SyncSinceResponse {
    /// Wire-level `count` field; populated by serde, not read by this module.
    #[allow(dead_code)]
    count: usize,
    /// Wire-level `limit` field; populated by serde, not read by this module.
    #[allow(dead_code)]
    limit: usize,
    /// Memories returned by the peer.
    memories: Vec<crate::models::Memory>,
}
1579
/// Anchors the `Instant` and `Duration` imports of this module.
///
/// This is not a re-export. It is a private, not-meant-to-be-called function
/// whose signature names both types, so their `use` line cannot trip
/// `unused_imports` in builds where no other code names them. The test
/// module reaches them through `use super::*`. `dead_code` is allowed
/// because the function exists only for its signature.
#[allow(dead_code)]
fn _imports_in_use(_instant: Instant, _duration: Duration) {}
1584
1585// ===========================================================================
1586// Tests
1587// ===========================================================================
1588
1589#[cfg(test)]
1590mod tests {
1591    use super::*;
1592    use crate::cli::test_utils::TestEnv;
1593    use crate::config::ResolvedTtl;
1594    use axum::body::Body;
1595    use axum::http::{Request, StatusCode};
1596    use tower::ServiceExt as _;
1597
1598    // ----- helpers -------------------------------------------------------
1599
1600    fn args_with_db(_db: &Path) -> ServeArgs {
1601        ServeArgs {
1602            host: "127.0.0.1".to_string(),
1603            port: 0,
1604            tls_cert: None,
1605            tls_key: None,
1606            mtls_allowlist: None,
1607            shutdown_grace_secs: 30,
1608            quorum_writes: 0,
1609            quorum_peers: vec![],
1610            quorum_timeout_ms: 2000,
1611            quorum_client_cert: None,
1612            quorum_client_key: None,
1613            quorum_ca_cert: None,
1614            catchup_interval_secs: 0,
1615        }
1616    }
1617
1618    fn keyword_app_state(db_path: &Path) -> AppState {
1619        let conn = db::open(db_path).unwrap();
1620        let db_state: Db = Arc::new(Mutex::new((
1621            conn,
1622            db_path.to_path_buf(),
1623            ResolvedTtl::default(),
1624            true,
1625        )));
1626        AppState {
1627            db: db_state,
1628            embedder: Arc::new(None),
1629            vector_index: Arc::new(Mutex::new(None)),
1630            federation: Arc::new(None),
1631            tier_config: Arc::new(FeatureTier::Keyword.config()),
1632            scoring: Arc::new(crate::config::ResolvedScoring::default()),
1633        }
1634    }
1635
1636    /// Mutex env-var guard. Tests that flip env vars must serialize to
1637    /// avoid clobbering each other; `cargo test --test-threads=2` is the
1638    /// upstream gate but a per-test mutex keeps the tests honest.
1639    fn env_var_lock() -> std::sync::MutexGuard<'static, ()> {
1640        use std::sync::OnceLock;
1641        static LOCK: OnceLock<std::sync::Mutex<()>> = OnceLock::new();
1642        LOCK.get_or_init(|| std::sync::Mutex::new(()))
1643            .lock()
1644            .unwrap_or_else(|e| e.into_inner())
1645    }
1646
1647    // ----- is_write_command ---------------------------------------------
1648
1649    #[test]
1650    fn test_is_write_command_all_variants() {
1651        // Use clap's parser to build every Command variant. This avoids
1652        // having to know each Args struct's required-field set by name —
1653        // we just feed the same argv form an operator would use, and
1654        // assert the predicate returns the right answer.
1655        //
1656        // Writes (post-run WAL checkpoint expected):
1657        let writes: &[&[&str]] = &[
1658            &["ai-memory", "store", "title", "content"],
1659            &["ai-memory", "update", "id123", "--title", "t"],
1660            &["ai-memory", "delete", "id123"],
1661            &["ai-memory", "promote", "id123"],
1662            &["ai-memory", "forget", "pattern"],
1663            &["ai-memory", "link", "a", "b"],
1664            &["ai-memory", "consolidate", "ids"],
1665            &["ai-memory", "resolve", "a", "b"],
1666            &["ai-memory", "sync", "--peer", "/tmp/peer.db"],
1667            &[
1668                "ai-memory",
1669                "sync-daemon",
1670                "--peers",
1671                "http://x",
1672                "--interval-secs",
1673                "60",
1674            ],
1675            &["ai-memory", "import"],
1676            &["ai-memory", "auto-consolidate"],
1677            &["ai-memory", "gc"],
1678        ];
1679        let mut writes_checked = 0;
1680        for argv in writes {
1681            // Skip a variant whose required-field set our argv doesn't
1682            // match (clap will reject it). We still get coverage from the
1683            // variants that parse cleanly, which is the bulk.
1684            if let Ok(cli) = Cli::try_parse_from(*argv) {
1685                assert!(
1686                    is_write_command(&cli.command),
1687                    "expected write for {argv:?}"
1688                );
1689                writes_checked += 1;
1690            }
1691        }
1692        assert!(
1693            writes_checked >= 5,
1694            "expected at least 5 write variants checked, got {writes_checked}"
1695        );
1696
1697        // Reads / no-ops (no checkpoint expected):
1698        let reads: &[&[&str]] = &[
1699            &["ai-memory", "mcp"],
1700            &["ai-memory", "recall", "context"],
1701            &["ai-memory", "search", "query"],
1702            &["ai-memory", "get", "id"],
1703            &["ai-memory", "list"],
1704            &["ai-memory", "stats"],
1705            &["ai-memory", "namespaces"],
1706            &["ai-memory", "export"],
1707            &["ai-memory", "shell"],
1708            &["ai-memory", "man"],
1709            &["ai-memory", "completions", "bash"],
1710            &["ai-memory", "archive", "list"],
1711            &["ai-memory", "agents", "list"],
1712            &["ai-memory", "pending", "list"],
1713            &["ai-memory", "bench"],
1714            &["ai-memory", "serve", "--host", "127.0.0.1", "--port", "0"],
1715        ];
1716        let mut reads_checked = 0;
1717        for argv in reads {
1718            if let Ok(cli) = Cli::try_parse_from(*argv) {
1719                assert!(
1720                    !is_write_command(&cli.command),
1721                    "expected read for {argv:?}"
1722                );
1723                reads_checked += 1;
1724            }
1725        }
1726        assert!(
1727            reads_checked >= 8,
1728            "expected at least 8 read variants checked, got {reads_checked}"
1729        );
1730
1731        // Direct construction of the Args-less variants (10 variants
1732        // covered programmatically by clap above; pin the no-Args ones
1733        // here too for explicitness):
1734        assert!(is_write_command(&Command::Gc));
1735        assert!(!is_write_command(&Command::Stats));
1736        assert!(!is_write_command(&Command::Namespaces));
1737        assert!(!is_write_command(&Command::Export));
1738        assert!(!is_write_command(&Command::Shell));
1739        assert!(!is_write_command(&Command::Man));
1740        assert!(!is_write_command(&Command::Mcp {
1741            tier: "keyword".to_string()
1742        }));
1743    }
1744
1745    // ----- build_router via lib::build_router ---------------------------
1746
1747    #[tokio::test]
1748    async fn test_router_has_health_endpoint() {
1749        let env = TestEnv::fresh();
1750        let app_state = keyword_app_state(&env.db_path);
1751        let api_key_state = ApiKeyState { key: None };
1752        let router = build_router(app_state, api_key_state);
1753        let resp = router
1754            .oneshot(
1755                Request::builder()
1756                    .method("GET")
1757                    .uri("/api/v1/health")
1758                    .body(Body::empty())
1759                    .unwrap(),
1760            )
1761            .await
1762            .unwrap();
1763        assert_eq!(resp.status(), StatusCode::OK);
1764    }
1765
1766    #[tokio::test]
1767    async fn test_router_has_metrics_at_both_paths() {
1768        let env = TestEnv::fresh();
1769        let app_state = keyword_app_state(&env.db_path);
1770        let api_key_state = ApiKeyState { key: None };
1771        // /metrics
1772        let r1 = build_router(app_state.clone(), api_key_state.clone())
1773            .oneshot(
1774                Request::builder()
1775                    .method("GET")
1776                    .uri("/metrics")
1777                    .body(Body::empty())
1778                    .unwrap(),
1779            )
1780            .await
1781            .unwrap();
1782        assert_eq!(r1.status(), StatusCode::OK);
1783        // /api/v1/metrics
1784        let r2 = build_router(app_state, api_key_state)
1785            .oneshot(
1786                Request::builder()
1787                    .method("GET")
1788                    .uri("/api/v1/metrics")
1789                    .body(Body::empty())
1790                    .unwrap(),
1791            )
1792            .await
1793            .unwrap();
1794        assert_eq!(r2.status(), StatusCode::OK);
1795    }
1796
1797    #[tokio::test]
1798    async fn test_router_lists_all_v1_memory_routes() {
1799        let env = TestEnv::fresh();
1800        let app_state = keyword_app_state(&env.db_path);
1801        let api_key_state = ApiKeyState { key: None };
1802        let router = build_router(app_state, api_key_state);
1803        let resp = router
1804            .oneshot(
1805                Request::builder()
1806                    .method("GET")
1807                    .uri("/api/v1/memories")
1808                    .body(Body::empty())
1809                    .unwrap(),
1810            )
1811            .await
1812            .unwrap();
1813        // Empty DB returns 200 with an empty list — anything non-error
1814        // proves the route is wired in.
1815        assert!(resp.status().is_success(), "got {}", resp.status());
1816    }
1817
1818    #[tokio::test]
1819    async fn test_router_applies_api_key_middleware_when_key_set() {
1820        let env = TestEnv::fresh();
1821        let app_state = keyword_app_state(&env.db_path);
1822        let api_key_state = ApiKeyState {
1823            key: Some("s3cret".to_string()),
1824        };
1825        let router = build_router(app_state, api_key_state);
1826        let resp = router
1827            .oneshot(
1828                Request::builder()
1829                    .method("GET")
1830                    .uri("/api/v1/memories")
1831                    .body(Body::empty())
1832                    .unwrap(),
1833            )
1834            .await
1835            .unwrap();
1836        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
1837    }
1838
1839    #[tokio::test]
1840    async fn test_router_skips_api_key_middleware_when_key_none() {
1841        let env = TestEnv::fresh();
1842        let app_state = keyword_app_state(&env.db_path);
1843        let api_key_state = ApiKeyState { key: None };
1844        let router = build_router(app_state, api_key_state);
1845        let resp = router
1846            .oneshot(
1847                Request::builder()
1848                    .method("GET")
1849                    .uri("/api/v1/memories")
1850                    .body(Body::empty())
1851                    .unwrap(),
1852            )
1853            .await
1854            .unwrap();
1855        assert_eq!(resp.status(), StatusCode::OK);
1856    }
1857
1858    // ----- build_embedder ------------------------------------------------
1859
1860    #[tokio::test]
1861    async fn test_build_embedder_keyword_tier_returns_none() {
1862        let cfg = AppConfig::default();
1863        let emb = build_embedder(FeatureTier::Keyword, &cfg).await;
1864        assert!(emb.is_none());
1865    }
1866
1867    #[tokio::test]
1868    async fn test_build_embedder_load_failure_returns_none() {
1869        // Can't easily induce a load failure without network — skip here.
1870        // Keyword tier covers the None branch; the ERROR-level fallback
1871        // path requires a live HF-hub-style mock, which is out of scope
1872        // for a unit test. The semantic-tier success/failure path is
1873        // exercised under `feature = "test-with-models"` in the
1874        // recall integration tests.
1875        // This test stays as a smoke check — it doesn't attempt to load.
1876    }
1877
1878    // ----- build_vector_index -------------------------------------------
1879
1880    #[test]
1881    fn test_build_vector_index_no_embedder_returns_none() {
1882        let env = TestEnv::fresh();
1883        let conn = db::open(&env.db_path).unwrap();
1884        assert!(build_vector_index(&conn, false).is_none());
1885    }
1886
1887    #[test]
1888    fn test_build_vector_index_empty_db_returns_empty_index() {
1889        let env = TestEnv::fresh();
1890        let conn = db::open(&env.db_path).unwrap();
1891        let idx = build_vector_index(&conn, true);
1892        assert!(
1893            idx.is_some(),
1894            "empty DB with embedder must yield empty index"
1895        );
1896        assert_eq!(idx.unwrap().len(), 0);
1897    }
1898
1899    // ----- spawn_gc_loop / spawn_wal_checkpoint_loop --------------------
1900
1901    #[tokio::test(start_paused = true)]
1902    async fn test_spawn_gc_loop_runs_and_can_be_aborted() {
1903        let env = TestEnv::fresh();
1904        let conn = db::open(&env.db_path).unwrap();
1905        let state: Db = Arc::new(Mutex::new((
1906            conn,
1907            env.db_path.clone(),
1908            ResolvedTtl::default(),
1909            true,
1910        )));
1911        let h = spawn_gc_loop(state, None, Duration::from_secs(60));
1912        // Advance past the first sleep — the loop should now have ticked at
1913        // least once (its sleep arm has resolved). We can't easily observe
1914        // a side effect on an empty DB, so just abort and confirm the
1915        // handle is well-behaved.
1916        tokio::time::advance(Duration::from_secs(61)).await;
1917        // Yield once so the background task can see the tick.
1918        tokio::task::yield_now().await;
1919        h.abort();
1920        // Joining an aborted handle returns `JoinError` with cancelled() == true.
1921        let err = h.await.unwrap_err();
1922        assert!(err.is_cancelled());
1923    }
1924
1925    #[tokio::test(start_paused = true)]
1926    async fn test_spawn_wal_checkpoint_loop_runs_and_can_be_aborted() {
1927        let env = TestEnv::fresh();
1928        let conn = db::open(&env.db_path).unwrap();
1929        let state: Db = Arc::new(Mutex::new((
1930            conn,
1931            env.db_path.clone(),
1932            ResolvedTtl::default(),
1933            true,
1934        )));
1935        let h = spawn_wal_checkpoint_loop(state, Duration::from_secs(60));
1936        // First sleep is interval/2 = 30s. Advance past that + one full
1937        // interval to ensure at least one checkpoint cycle ran.
1938        tokio::time::advance(Duration::from_secs(31)).await;
1939        tokio::task::yield_now().await;
1940        tokio::time::advance(Duration::from_secs(60)).await;
1941        tokio::task::yield_now().await;
1942        h.abort();
1943        let err = h.await.unwrap_err();
1944        assert!(err.is_cancelled());
1945    }
1946
1947    // ----- passphrase_from_file -----------------------------------------
1948
1949    #[test]
1950    fn test_passphrase_strips_trailing_newline() {
1951        let dir = tempfile::tempdir().unwrap();
1952        let p = dir.path().join("pass");
1953        std::fs::write(&p, "secret\n").unwrap();
1954        assert_eq!(passphrase_from_file(&p).unwrap(), "secret");
1955    }
1956
1957    #[test]
1958    fn test_passphrase_strips_trailing_crlf() {
1959        let dir = tempfile::tempdir().unwrap();
1960        let p = dir.path().join("pass");
1961        std::fs::write(&p, "secret\r\n").unwrap();
1962        assert_eq!(passphrase_from_file(&p).unwrap(), "secret");
1963    }
1964
1965    #[test]
1966    fn test_passphrase_empty_file_errors() {
1967        let dir = tempfile::tempdir().unwrap();
1968        let p = dir.path().join("empty");
1969        std::fs::write(&p, "").unwrap();
1970        let err = passphrase_from_file(&p).unwrap_err();
1971        assert!(
1972            err.to_string().contains("empty"),
1973            "expected 'empty' error, got: {err}"
1974        );
1975    }
1976
1977    #[test]
1978    fn test_passphrase_empty_after_trim_errors() {
1979        // File contains only whitespace lines — after trim_end_matches
1980        // it remains "  \t" (internal whitespace preserved). Only "\n"
1981        // / "\r" alone would trigger the empty-after-strip case.
1982        let dir = tempfile::tempdir().unwrap();
1983        let p = dir.path().join("nl-only");
1984        std::fs::write(&p, "\n").unwrap();
1985        let err = passphrase_from_file(&p).unwrap_err();
1986        assert!(err.to_string().contains("empty"));
1987    }
1988
1989    #[test]
1990    fn test_passphrase_nonexistent_file_errors() {
1991        let dir = tempfile::tempdir().unwrap();
1992        let p = dir.path().join("does-not-exist");
1993        let err = passphrase_from_file(&p).unwrap_err();
1994        assert!(
1995            err.to_string().contains("reading passphrase file")
1996                || err.chain().any(|e| e.to_string().contains("No such file"))
1997                || err.chain().any(|e| e.to_string().contains("cannot find")),
1998            "got: {err:#}"
1999        );
2000    }
2001
2002    #[test]
2003    fn test_passphrase_preserves_internal_whitespace() {
2004        let dir = tempfile::tempdir().unwrap();
2005        let p = dir.path().join("pass");
2006        std::fs::write(&p, "my pass phrase\n").unwrap();
2007        assert_eq!(passphrase_from_file(&p).unwrap(), "my pass phrase");
2008    }
2009
2010    // ----- apply_anonymize_default --------------------------------------
2011
2012    #[test]
2013    fn test_anonymize_set_when_config_true_and_env_unset() {
2014        let _g = env_var_lock();
2015        // SAFETY: serialized via env_var_lock.
2016        unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
2017        let mut cfg = AppConfig::default();
2018        cfg.identity = Some(crate::config::IdentityConfig {
2019            anonymize_default: true,
2020        });
2021        apply_anonymize_default(&cfg);
2022        assert_eq!(std::env::var("AI_MEMORY_ANONYMIZE").unwrap(), "1");
2023        // SAFETY: serialized via env_var_lock.
2024        unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
2025    }
2026
2027    #[test]
2028    fn test_anonymize_unchanged_when_env_already_set() {
2029        let _g = env_var_lock();
2030        // SAFETY: serialized via env_var_lock.
2031        unsafe { std::env::set_var("AI_MEMORY_ANONYMIZE", "0") };
2032        let mut cfg = AppConfig::default();
2033        cfg.identity = Some(crate::config::IdentityConfig {
2034            anonymize_default: true,
2035        });
2036        apply_anonymize_default(&cfg);
2037        // Env var is left alone — caller-set value wins.
2038        assert_eq!(std::env::var("AI_MEMORY_ANONYMIZE").unwrap(), "0");
2039        // SAFETY: serialized via env_var_lock.
2040        unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
2041    }
2042
2043    #[test]
2044    fn test_anonymize_unchanged_when_config_false() {
2045        let _g = env_var_lock();
2046        // SAFETY: serialized via env_var_lock.
2047        unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
2048        let cfg = AppConfig::default();
2049        // Default config is false / None for identity.anonymize_default.
2050        apply_anonymize_default(&cfg);
2051        assert!(std::env::var("AI_MEMORY_ANONYMIZE").is_err());
2052    }
2053
2054    // ----- bootstrap_serve ----------------------------------------------
2055
2056    #[tokio::test]
2057    async fn test_bootstrap_serve_keyword_tier_no_embedder() {
2058        let env = TestEnv::fresh();
2059        let mut cfg = AppConfig::default();
2060        cfg.tier = Some("keyword".to_string());
2061        let args = args_with_db(&env.db_path);
2062        let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2063        // Keyword tier => no embedder, no vector index.
2064        assert!(bs.app_state.embedder.is_none());
2065        let vi = bs.app_state.vector_index.lock().await;
2066        assert!(vi.is_none());
2067        // Two task handles spawned (gc + wal_checkpoint).
2068        assert_eq!(bs.task_handles.len(), 2);
2069        // Cleanly abort the spawned tasks so they don't leak across tests.
2070        for h in bs.task_handles {
2071            h.abort();
2072        }
2073    }
2074
2075    #[tokio::test]
2076    async fn test_bootstrap_serve_with_api_key_logs_enabled() {
2077        let env = TestEnv::fresh();
2078        let mut cfg = AppConfig::default();
2079        cfg.tier = Some("keyword".to_string());
2080        cfg.api_key = Some("test-key".to_string());
2081        let args = args_with_db(&env.db_path);
2082        let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2083        assert_eq!(bs.api_key_state.key.as_deref(), Some("test-key"));
2084        for h in bs.task_handles {
2085            h.abort();
2086        }
2087    }
2088
2089    #[tokio::test]
2090    async fn test_bootstrap_serve_federation_disabled_when_quorum_zero() {
2091        let env = TestEnv::fresh();
2092        let mut cfg = AppConfig::default();
2093        cfg.tier = Some("keyword".to_string());
2094        let args = args_with_db(&env.db_path);
2095        let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2096        assert!(bs.app_state.federation.is_none());
2097        for h in bs.task_handles {
2098            h.abort();
2099        }
2100    }
2101
2102    // ----- W12-F: deeper coverage --------------------------------------
2103    //
2104    // Targets the gaps left after W6 + W7 + D6: `bootstrap_serve` variants
2105    // that require a populated DB or federation, the `run` dispatch arms
2106    // not yet exercised, `cmd_bench` end-to-end with a tiny workload,
2107    // `cmd_migrate` (sal feature), `urlencoding_minimal` direct test,
2108    // and the gc / wal-checkpoint loop bodies executing through one
2109    // tick with a measurable side effect.
2110
2111    // ----- bootstrap_serve federation enabled ---------------------------
2112
2113    #[tokio::test]
2114    async fn test_bootstrap_serve_federation_enabled_attaches_config() {
2115        // quorum_writes=1 + one peer → FederationConfig::build returns
2116        // Some, so app_state.federation is wired in. Catchup loop is
2117        // disabled (catchup_interval_secs=0) — the spawn-catchup branch
2118        // is exercised by federation tests; we only verify wiring here.
2119        let env = TestEnv::fresh();
2120        let mut cfg = AppConfig::default();
2121        cfg.tier = Some("keyword".to_string());
2122        let mut args = args_with_db(&env.db_path);
2123        args.quorum_writes = 1;
2124        args.quorum_peers = vec!["http://127.0.0.1:65530".to_string()];
2125        args.quorum_timeout_ms = 100;
2126        args.catchup_interval_secs = 0;
2127        let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2128        assert!(bs.app_state.federation.is_some());
2129        for h in bs.task_handles {
2130            h.abort();
2131        }
2132    }
2133
2134    #[tokio::test]
2135    async fn test_bootstrap_serve_federation_enabled_with_catchup_loop() {
2136        // catchup_interval_secs > 0 → spawn_catchup_loop is invoked.
2137        // We can't directly observe the catchup loop's internal handle
2138        // (federation::spawn_catchup_loop returns a JoinHandle owned
2139        // privately by the federation module), but the side branch
2140        // "catchup loop enabled" runs and the bootstrap completes.
2141        let env = TestEnv::fresh();
2142        let mut cfg = AppConfig::default();
2143        cfg.tier = Some("keyword".to_string());
2144        let mut args = args_with_db(&env.db_path);
2145        args.quorum_writes = 1;
2146        args.quorum_peers = vec!["http://127.0.0.1:65531".to_string()];
2147        args.quorum_timeout_ms = 100;
2148        args.catchup_interval_secs = 3600; // long enough not to fire
2149        let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2150        assert!(bs.app_state.federation.is_some());
2151        for h in bs.task_handles {
2152            h.abort();
2153        }
2154    }
2155
2156    #[tokio::test]
2157    async fn test_bootstrap_serve_federation_invalid_peer_errors() {
2158        // FederationConfig::build returns Err on duplicate peer URLs
2159        // (#341). The bootstrap_serve `.context("federation config")`
2160        // wrap turns it into a daemon-startup error.
2161        let env = TestEnv::fresh();
2162        let mut cfg = AppConfig::default();
2163        cfg.tier = Some("keyword".to_string());
2164        let mut args = args_with_db(&env.db_path);
2165        args.quorum_writes = 1;
2166        args.quorum_peers = vec![
2167            "http://127.0.0.1:65532".to_string(),
2168            "http://127.0.0.1:65532/".to_string(), // duplicate after trim
2169        ];
2170        let res = bootstrap_serve(&env.db_path, &args, &cfg).await;
2171        let err = match res {
2172            Ok(_) => panic!("expected error from duplicate peer URLs"),
2173            Err(e) => e,
2174        };
2175        let s = format!("{err:#}");
2176        assert!(
2177            s.contains("federation") || s.contains("duplicate"),
2178            "got: {s}"
2179        );
2180    }
2181
2182    // ----- build_vector_index populated DB ------------------------------
2183
2184    #[test]
2185    fn test_build_vector_index_populated_db_returns_built_index() {
2186        // When the DB has stored embeddings AND the embedder is present,
2187        // `build_vector_index` should return Some(VectorIndex) populated
2188        // with those embeddings rather than an empty one.
2189        let env = TestEnv::fresh();
2190        let conn = db::open(&env.db_path).unwrap();
2191        // Insert one memory + an embedding via the public db helpers.
2192        let now = chrono::Utc::now().to_rfc3339();
2193        let mem = crate::models::Memory {
2194            id: uuid::Uuid::new_v4().to_string(),
2195            tier: crate::models::Tier::Mid,
2196            namespace: "ns".to_string(),
2197            title: "t".to_string(),
2198            content: "c".to_string(),
2199            tags: vec![],
2200            priority: 5,
2201            confidence: 1.0,
2202            source: "test".to_string(),
2203            access_count: 0,
2204            created_at: now.clone(),
2205            updated_at: now,
2206            last_accessed_at: None,
2207            expires_at: None,
2208            metadata: crate::models::default_metadata(),
2209        };
2210        let id = db::insert(&conn, &mem).unwrap();
2211        db::set_embedding(&conn, &id, &[1.0, 0.0, 0.0]).unwrap();
2212        let idx = build_vector_index(&conn, true).expect("populated index");
2213        assert!(
2214            idx.len() >= 1,
2215            "expected non-empty index, got len={}",
2216            idx.len()
2217        );
2218    }
2219
2220    // ----- gc loop with non-empty side effect ---------------------------
2221    //
2222    // The existing `test_spawn_gc_loop_runs_and_can_be_aborted` only
2223    // covers the empty-DB path where db::gc returns 0. Seeding an expired
2224    // memory and pointing the gc loop at it lets the `Ok(n) if n > 0`
2225    // arm fire.
2226
2227    #[tokio::test(start_paused = true)]
2228    async fn test_spawn_gc_loop_purges_expired_memories() {
2229        let env = TestEnv::fresh();
2230        let conn = db::open(&env.db_path).unwrap();
2231        // Insert an expired memory (expires_at in the past).
2232        let past = (chrono::Utc::now() - chrono::Duration::days(1)).to_rfc3339();
2233        let now = chrono::Utc::now().to_rfc3339();
2234        let mem = crate::models::Memory {
2235            id: uuid::Uuid::new_v4().to_string(),
2236            tier: crate::models::Tier::Short,
2237            namespace: "ns-gc".to_string(),
2238            title: "stale".to_string(),
2239            content: "stale".to_string(),
2240            tags: vec![],
2241            priority: 1,
2242            confidence: 1.0,
2243            source: "test".to_string(),
2244            access_count: 0,
2245            created_at: now.clone(),
2246            updated_at: now,
2247            last_accessed_at: None,
2248            expires_at: Some(past),
2249            metadata: crate::models::default_metadata(),
2250        };
2251        db::insert(&conn, &mem).unwrap();
2252        drop(conn);
2253
2254        let conn = db::open(&env.db_path).unwrap();
2255        let state: Db = Arc::new(Mutex::new((
2256            conn,
2257            env.db_path.clone(),
2258            ResolvedTtl::default(),
2259            true,
2260        )));
2261        // archive_max_days=Some(1) lets the auto_purge_archive arm
2262        // execute too (covers the second match in the loop body).
2263        let h = spawn_gc_loop(state.clone(), Some(1), Duration::from_secs(60));
2264        // Advance past two full intervals to give both branches multiple
2265        // chances to log under paused time.
2266        tokio::time::advance(Duration::from_secs(61)).await;
2267        tokio::task::yield_now().await;
2268        tokio::time::advance(Duration::from_secs(61)).await;
2269        tokio::task::yield_now().await;
2270        h.abort();
2271        let _ = h.await;
2272    }
2273
2274    // ----- WAL checkpoint loop with measurable cycle --------------------
2275
2276    #[tokio::test(start_paused = true)]
2277    async fn test_spawn_wal_checkpoint_loop_runs_multiple_cycles() {
2278        let env = TestEnv::fresh();
2279        let conn = db::open(&env.db_path).unwrap();
2280        let state: Db = Arc::new(Mutex::new((
2281            conn,
2282            env.db_path.clone(),
2283            ResolvedTtl::default(),
2284            true,
2285        )));
2286        let h = spawn_wal_checkpoint_loop(state, Duration::from_secs(2));
2287        // First sleep is 1s (interval/2), then 2s per cycle. Advance
2288        // past three cycles.
2289        for _ in 0..4 {
2290            tokio::time::advance(Duration::from_secs(2)).await;
2291            tokio::task::yield_now().await;
2292        }
2293        h.abort();
2294        let _ = h.await;
2295    }
2296
2297    // ----- urlencoding_minimal -----------------------------------------
2298
2299    #[test]
2300    fn test_urlencoding_minimal_round_trip() {
2301        // Unreserved characters pass through unchanged.
2302        assert_eq!(urlencoding_minimal("abcXYZ-_.~"), "abcXYZ-_.~");
2303        assert_eq!(urlencoding_minimal("0123456789"), "0123456789");
2304        // Reserved / unsafe characters are percent-encoded.
2305        assert_eq!(urlencoding_minimal("a:b"), "a%3Ab");
2306        assert_eq!(urlencoding_minimal("a/b"), "a%2Fb");
2307        assert_eq!(urlencoding_minimal("a@b"), "a%40b");
2308        assert_eq!(urlencoding_minimal("a+b"), "a%2Bb");
2309        assert_eq!(urlencoding_minimal(" "), "%20");
2310        // Empty string is empty.
2311        assert_eq!(urlencoding_minimal(""), "");
2312        // RFC3339 timestamp shape (sync-daemon real input).
2313        assert_eq!(
2314            urlencoding_minimal("2024-01-02T03:04:05+00:00"),
2315            "2024-01-02T03%3A04%3A05%2B00%3A00"
2316        );
2317    }
2318
2319    // ----- run() dispatch for read-only commands ------------------------
2320    //
2321    // Each test parses a CLI argv via clap, hands the resulting `Cli`
2322    // to `daemon_runtime::run`, and asserts the dispatch path returned
2323    // Ok. We don't assert on stdout because run() writes to the
2324    // process stdout directly — what we care about for coverage is
2325    // that the match arm executed and the inner cli handler returned.
2326
2327    fn no_config_env() -> std::sync::MutexGuard<'static, ()> {
2328        // run() reads `AI_MEMORY_NO_CONFIG` indirectly via the AppConfig
2329        // we pass. We don't rely on the env directly here, but holding
2330        // env_var_lock keeps run() tests serialized so they don't race
2331        // on stdout / global subscribers.
2332        env_var_lock()
2333    }
2334
2335    #[tokio::test]
2336    async fn test_run_dispatch_stats_command() {
2337        let _g = no_config_env();
2338        let env = TestEnv::fresh();
2339        let cfg = AppConfig::default();
2340        let cli =
2341            Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "stats"])
2342                .unwrap();
2343        run(cli, &cfg).await.unwrap();
2344    }
2345
2346    #[tokio::test]
2347    async fn test_run_dispatch_namespaces_command() {
2348        let _g = no_config_env();
2349        let env = TestEnv::fresh();
2350        let cfg = AppConfig::default();
2351        let cli = Cli::try_parse_from([
2352            "ai-memory",
2353            "--db",
2354            env.db_path.to_str().unwrap(),
2355            "namespaces",
2356        ])
2357        .unwrap();
2358        run(cli, &cfg).await.unwrap();
2359    }
2360
2361    #[tokio::test]
2362    async fn test_run_dispatch_export_command() {
2363        let _g = no_config_env();
2364        let env = TestEnv::fresh();
2365        let cfg = AppConfig::default();
2366        let cli =
2367            Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "export"])
2368                .unwrap();
2369        run(cli, &cfg).await.unwrap();
2370    }
2371
2372    #[tokio::test]
2373    async fn test_run_dispatch_list_command() {
2374        let _g = no_config_env();
2375        let env = TestEnv::fresh();
2376        let cfg = AppConfig::default();
2377        let cli = Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "list"])
2378            .unwrap();
2379        run(cli, &cfg).await.unwrap();
2380    }
2381
2382    #[tokio::test]
2383    async fn test_run_dispatch_search_command() {
2384        let _g = no_config_env();
2385        let env = TestEnv::fresh();
2386        let cfg = AppConfig::default();
2387        let cli = Cli::try_parse_from([
2388            "ai-memory",
2389            "--db",
2390            env.db_path.to_str().unwrap(),
2391            "search",
2392            "anyq",
2393        ])
2394        .unwrap();
2395        run(cli, &cfg).await.unwrap();
2396    }
2397
2398    #[tokio::test]
2399    async fn test_run_dispatch_archive_list_command() {
2400        let _g = no_config_env();
2401        let env = TestEnv::fresh();
2402        let cfg = AppConfig::default();
2403        let cli = Cli::try_parse_from([
2404            "ai-memory",
2405            "--db",
2406            env.db_path.to_str().unwrap(),
2407            "archive",
2408            "list",
2409        ])
2410        .unwrap();
2411        run(cli, &cfg).await.unwrap();
2412    }
2413
2414    #[tokio::test]
2415    async fn test_run_dispatch_agents_list_command() {
2416        let _g = no_config_env();
2417        let env = TestEnv::fresh();
2418        let cfg = AppConfig::default();
2419        let cli = Cli::try_parse_from([
2420            "ai-memory",
2421            "--db",
2422            env.db_path.to_str().unwrap(),
2423            "agents",
2424            "list",
2425        ])
2426        .unwrap();
2427        run(cli, &cfg).await.unwrap();
2428    }
2429
2430    #[tokio::test]
2431    async fn test_run_dispatch_pending_list_command() {
2432        let _g = no_config_env();
2433        let env = TestEnv::fresh();
2434        let cfg = AppConfig::default();
2435        let cli = Cli::try_parse_from([
2436            "ai-memory",
2437            "--db",
2438            env.db_path.to_str().unwrap(),
2439            "pending",
2440            "list",
2441        ])
2442        .unwrap();
2443        run(cli, &cfg).await.unwrap();
2444    }
2445
2446    #[tokio::test]
2447    async fn test_run_dispatch_completions_command() {
2448        let _g = no_config_env();
2449        let env = TestEnv::fresh();
2450        let cfg = AppConfig::default();
2451        let cli = Cli::try_parse_from([
2452            "ai-memory",
2453            "--db",
2454            env.db_path.to_str().unwrap(),
2455            "completions",
2456            "bash",
2457        ])
2458        .unwrap();
2459        run(cli, &cfg).await.unwrap();
2460    }
2461
2462    #[tokio::test]
2463    async fn test_run_dispatch_man_command() {
2464        let _g = no_config_env();
2465        let env = TestEnv::fresh();
2466        let cfg = AppConfig::default();
2467        let cli = Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "man"])
2468            .unwrap();
2469        run(cli, &cfg).await.unwrap();
2470    }
2471
2472    #[tokio::test]
2473    async fn test_run_dispatch_gc_triggers_post_run_checkpoint() {
2474        // `Gc` is in is_write_command, so result.is_ok() && Some path
2475        // takes the post-run WAL checkpoint branch (lines 638-644).
2476        let _g = no_config_env();
2477        let env = TestEnv::fresh();
2478        let cfg = AppConfig::default();
2479        let cli = Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "gc"])
2480            .unwrap();
2481        run(cli, &cfg).await.unwrap();
2482    }
2483
2484    #[tokio::test]
2485    async fn test_run_dispatch_resolve_command() {
2486        // Seed two memories, then resolve one as superseding the other.
2487        let _g = no_config_env();
2488        let env = TestEnv::fresh();
2489        let id_a = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "old", "old fact");
2490        let id_b = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "new", "new fact");
2491        let cfg = AppConfig::default();
2492        let cli = Cli::try_parse_from([
2493            "ai-memory",
2494            "--db",
2495            env.db_path.to_str().unwrap(),
2496            "resolve",
2497            &id_a,
2498            &id_b,
2499        ])
2500        .unwrap();
2501        run(cli, &cfg).await.unwrap();
2502    }
2503
2504    #[tokio::test]
2505    async fn test_run_dispatch_get_command() {
2506        let _g = no_config_env();
2507        let env = TestEnv::fresh();
2508        let id = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "t", "c");
2509        let cfg = AppConfig::default();
2510        let cli = Cli::try_parse_from([
2511            "ai-memory",
2512            "--db",
2513            env.db_path.to_str().unwrap(),
2514            "get",
2515            &id,
2516        ])
2517        .unwrap();
2518        run(cli, &cfg).await.unwrap();
2519    }
2520
2521    #[tokio::test]
2522    async fn test_run_dispatch_promote_triggers_write_checkpoint() {
2523        // `Promote` is in is_write_command — covers the post-run
2524        // checkpoint branch on a different command.
2525        let _g = no_config_env();
2526        let env = TestEnv::fresh();
2527        let id = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "t", "c");
2528        let cfg = AppConfig::default();
2529        let cli = Cli::try_parse_from([
2530            "ai-memory",
2531            "--db",
2532            env.db_path.to_str().unwrap(),
2533            "promote",
2534            &id,
2535        ])
2536        .unwrap();
2537        run(cli, &cfg).await.unwrap();
2538    }
2539
2540    // ----- run() dispatch for bench (cmd_bench end-to-end) --------------
2541
2542    #[tokio::test]
2543    async fn test_run_dispatch_bench_smoke_runs_one_iteration() {
2544        // iterations=1, warmup=0 keeps the workload tiny. The bench
2545        // body builds an in-memory DB internally — no on-disk side
2546        // effects. Covers cmd_bench from top to bottom on the
2547        // human-readable, no-baseline, no-history path.
2548        let _g = no_config_env();
2549        let env = TestEnv::fresh();
2550        let cfg = AppConfig::default();
2551        let cli = Cli::try_parse_from([
2552            "ai-memory",
2553            "--db",
2554            env.db_path.to_str().unwrap(),
2555            "bench",
2556            "--iterations",
2557            "1",
2558            "--warmup",
2559            "0",
2560        ])
2561        .unwrap();
2562        // Bench may fail the budget on a paused-time iter=1 run; we
2563        // accept either Ok or Err here — coverage is the goal.
2564        let _ = run(cli, &cfg).await;
2565    }
2566
2567    #[tokio::test]
2568    async fn test_run_dispatch_bench_json_with_history() {
2569        // Covers --json branch + --history append branch of cmd_bench.
2570        let _g = no_config_env();
2571        let env = TestEnv::fresh();
2572        let history = env.db_path.with_file_name("hist.jsonl");
2573        let cfg = AppConfig::default();
2574        let cli = Cli::try_parse_from([
2575            "ai-memory",
2576            "--db",
2577            env.db_path.to_str().unwrap(),
2578            "bench",
2579            "--iterations",
2580            "1",
2581            "--warmup",
2582            "0",
2583            "--json",
2584            "--history",
2585            history.to_str().unwrap(),
2586        ])
2587        .unwrap();
2588        let _ = run(cli, &cfg).await;
2589        // History file should now exist with at least one line.
2590        if history.exists() {
2591            let content = std::fs::read_to_string(&history).unwrap();
2592            assert!(content.contains("captured_at") || !content.is_empty());
2593        }
2594    }
2595
2596    // ----- run() dispatch for migrate (sal feature) --------------------
2597
2598    #[cfg(feature = "sal")]
2599    #[tokio::test]
2600    async fn test_run_dispatch_migrate_sqlite_to_sqlite_dry_run() {
2601        // Covers cmd_migrate happy path + dry-run / human-output branch.
2602        let _g = no_config_env();
2603        let src_env = TestEnv::fresh();
2604        let dst_env = TestEnv::fresh();
2605        // Seed source so migrate has work to do.
2606        crate::cli::test_utils::seed_memory(&src_env.db_path, "ns-mig", "t", "c");
2607        let from = format!("sqlite://{}", src_env.db_path.display());
2608        let to = format!("sqlite://{}", dst_env.db_path.display());
2609        let cfg = AppConfig::default();
2610        let cli = Cli::try_parse_from([
2611            "ai-memory",
2612            "--db",
2613            src_env.db_path.to_str().unwrap(),
2614            "migrate",
2615            "--from",
2616            &from,
2617            "--to",
2618            &to,
2619            "--dry-run",
2620        ])
2621        .unwrap();
2622        run(cli, &cfg).await.unwrap();
2623    }
2624
2625    #[cfg(feature = "sal")]
2626    #[tokio::test]
2627    async fn test_run_dispatch_migrate_json_output() {
2628        // Covers cmd_migrate --json branch.
2629        let _g = no_config_env();
2630        let src_env = TestEnv::fresh();
2631        let dst_env = TestEnv::fresh();
2632        crate::cli::test_utils::seed_memory(&src_env.db_path, "ns-mig", "t", "c");
2633        let from = format!("sqlite://{}", src_env.db_path.display());
2634        let to = format!("sqlite://{}", dst_env.db_path.display());
2635        let cfg = AppConfig::default();
2636        let cli = Cli::try_parse_from([
2637            "ai-memory",
2638            "--db",
2639            src_env.db_path.to_str().unwrap(),
2640            "migrate",
2641            "--from",
2642            &from,
2643            "--to",
2644            &to,
2645            "--json",
2646        ])
2647        .unwrap();
2648        run(cli, &cfg).await.unwrap();
2649    }
2650
    // ----- run() with passphrase file (--db-passphrase-file branch) ----
2652
2653    #[tokio::test]
2654    async fn test_run_with_db_passphrase_file_exports_env() {
2655        // Covers the `--db-passphrase-file` branch in run() (lines
2656        // 371-375) which calls passphrase_from_file then sets
2657        // AI_MEMORY_DB_PASSPHRASE in the environment.
2658        let _g = env_var_lock();
2659        // SAFETY: serialized via env_var_lock.
2660        unsafe { std::env::remove_var("AI_MEMORY_DB_PASSPHRASE") };
2661        let env = TestEnv::fresh();
2662        let pass_path = env.db_path.with_file_name("pass");
2663        std::fs::write(&pass_path, "test-passphrase\n").unwrap();
2664        let cfg = AppConfig::default();
2665        let cli = Cli::try_parse_from([
2666            "ai-memory",
2667            "--db",
2668            env.db_path.to_str().unwrap(),
2669            "--db-passphrase-file",
2670            pass_path.to_str().unwrap(),
2671            "stats",
2672        ])
2673        .unwrap();
2674        run(cli, &cfg).await.unwrap();
2675        // Env var is now set.
2676        assert_eq!(
2677            std::env::var("AI_MEMORY_DB_PASSPHRASE").unwrap(),
2678            "test-passphrase"
2679        );
2680        // SAFETY: serialized via env_var_lock.
2681        unsafe { std::env::remove_var("AI_MEMORY_DB_PASSPHRASE") };
2682    }
2683
2684    // ----- init_tracing idempotence ------------------------------------
2685
2686    #[test]
2687    fn test_init_tracing_is_idempotent() {
2688        // Covers init_tracing — second call is a harmless no-op
2689        // (try_init returns Err which we ignore). Calling twice from
2690        // the same test exercises the second-call path on a process
2691        // that may or may not already have a global subscriber.
2692        init_tracing();
2693        init_tracing();
2694    }
2695
2696    // ----- serve_http_with_shutdown_future smoke -----------------------
2697    //
2698    // The non-TLS branch of `serve()` delegates here; cover the body
2699    // by binding to a free port, requesting /health, then shutting
2700    // down. This also covers the production code path that
2701    // `daemon_runtime::serve()` uses for the non-TLS case.
2702
2703    #[tokio::test]
2704    async fn test_serve_http_with_shutdown_future_serves_then_stops() {
2705        let env = TestEnv::fresh();
2706        let app_state = keyword_app_state(&env.db_path);
2707        let api_key_state = ApiKeyState { key: None };
2708        // Pick a free port via a transient bind.
2709        let port = {
2710            let l = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
2711            let p = l.local_addr().unwrap().port();
2712            drop(l);
2713            p
2714        };
2715        let addr = format!("127.0.0.1:{port}");
2716        let shutdown = Arc::new(Notify::new());
2717        let shutdown_clone = shutdown.clone();
2718        let handle = tokio::spawn(async move {
2719            serve_http_with_shutdown_future(&addr, api_key_state, app_state, async move {
2720                shutdown_clone.notified().await;
2721            })
2722            .await
2723        });
2724        // Give the server a moment to bind, then poke /health.
2725        for _ in 0..40 {
2726            if let Ok(client) = reqwest::Client::builder()
2727                .timeout(Duration::from_millis(200))
2728                .build()
2729                && client
2730                    .get(format!("http://127.0.0.1:{port}/api/v1/health"))
2731                    .send()
2732                    .await
2733                    .is_ok()
2734            {
2735                break;
2736            }
2737            tokio::time::sleep(Duration::from_millis(50)).await;
2738        }
2739        shutdown.notify_one();
2740        let res = handle.await.unwrap();
2741        assert!(res.is_ok(), "serve future returned: {res:?}");
2742    }
2743
2744    // ----- bind error surfacing ----------------------------------------
2745
2746    #[tokio::test]
2747    async fn test_serve_http_with_shutdown_future_bind_failure_errors() {
2748        // An unbindable address (port 1 on Linux/macOS without root)
2749        // should return an Err with the bind context. This covers the
2750        // `with_context` path on the TcpListener::bind line.
2751        let env = TestEnv::fresh();
2752        let app_state = keyword_app_state(&env.db_path);
2753        let api_key_state = ApiKeyState { key: None };
2754        // 0.0.0.0:0 succeeds; we want a guaranteed failure. Bind to
2755        // port 1 which requires privileged perms — except on macOS in
2756        // some configs that may succeed. Use a clearly invalid address
2757        // form instead to force a bind-time error.
2758        let res = serve_http_with_shutdown_future(
2759            "definitely-not-an-address:99999",
2760            api_key_state,
2761            app_state,
2762            async {},
2763        )
2764        .await;
2765        assert!(res.is_err(), "expected bind error, got: {res:?}");
2766    }
2767}