Skip to main content

ai_memory/
daemon_runtime.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! Daemon runtime — orchestration shell for the `ai-memory` binary.
5//!
6//! W6 lifted `serve()` and the top-level dispatch out of `main.rs` so the
7//! production HTTP daemon, the integration test harness, and the
8//! coverage-instrumented tests in this module all share one source of
9//! truth. `main.rs` keeps its `#[tokio::main]` entry point but immediately
10//! delegates here for every subcommand.
11//!
12//! ## Public surface (post-W6)
13//!
14//! - [`run`] — top-level CLI dispatch (called from `main()`).
15//! - [`serve`] — full HTTP daemon body (TLS or plain).
16//! - [`bootstrap_serve`] — testable struct-returning state builder.
17//! - [`build_router`] — composition wrapper around `lib::build_router`.
18//! - [`build_embedder`], [`build_vector_index`] — single canonical builders
19//!   used by both `serve()` and `cli::recall::run`.
20//! - [`spawn_gc_loop`], [`spawn_wal_checkpoint_loop`] — daemon background
21//!   tasks, returning a [`JoinHandle`] so callers can abort on shutdown.
22//! - [`is_write_command`] — write-command predicate driving the post-write
23//!   WAL checkpoint.
24//! - [`passphrase_from_file`], [`apply_anonymize_default`] — startup helpers.
25//!
26//! ## Pre-W6 helpers retained
27//!
28//! - [`serve_http_with_shutdown`], [`serve_http_with_shutdown_future`] —
29//!   the in-process HTTP harness the integration suite drives.
30//! - [`run_sync_daemon_with_shutdown`],
31//!   [`run_sync_daemon_with_shutdown_using_client`],
32//!   [`sync_cycle_once`] — the sync-daemon body.
33//! - [`run_curator_daemon_with_shutdown`],
34//!   [`run_curator_daemon_with_primitives`] — the curator-daemon body.
35
36use std::io::Write as _;
37use std::path::Path;
38use std::path::PathBuf;
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, Ordering};
41use std::time::{Duration, Instant};
42
43use anyhow::{Context, Result};
44use axum::Router;
45use clap::{Args, CommandFactory, Parser, Subcommand};
46use clap_complete::{Shell, generate};
47use rusqlite::Connection;
48use tokio::sync::{Mutex, Notify};
49use tokio::task::JoinHandle;
50use tracing_subscriber::EnvFilter;
51
52use crate::cli::agents::{AgentsArgs, PendingArgs};
53use crate::cli::archive::ArchiveArgs;
54use crate::cli::audit::AuditArgs;
55use crate::cli::backup::{BackupArgs, RestoreArgs};
56use crate::cli::boot::BootArgs;
57use crate::cli::consolidate::{AutoConsolidateArgs, ConsolidateArgs};
58use crate::cli::crud::{DeleteArgs, GetArgs, ListArgs};
59use crate::cli::curator::CuratorArgs;
60use crate::cli::forget::ForgetArgs;
61use crate::cli::install::InstallArgs;
62use crate::cli::io::{ImportArgs, MineArgs};
63use crate::cli::link::{LinkArgs, ResolveArgs};
64use crate::cli::logs::LogsArgs;
65use crate::cli::promote::PromoteArgs;
66use crate::cli::recall::RecallArgs;
67use crate::cli::search::SearchArgs;
68use crate::cli::store::StoreArgs;
69use crate::cli::sync::{SyncArgs, SyncDaemonArgs};
70use crate::cli::update::UpdateArgs;
71use crate::cli::wrap::WrapArgs;
72use crate::config::{AppConfig, FeatureTier};
73use crate::embeddings::Embedder;
74use crate::handlers::{ApiKeyState, AppState, Db};
75use crate::hnsw::VectorIndex;
76use crate::{bench, cli, db, embeddings, federation, hnsw, llm, mcp, tls};
77
78#[cfg(feature = "sal")]
79use crate::migrate;
80
/// Fallback database path for the global `--db` flag (the clap
/// `default_value` on `Cli::db`).
const DEFAULT_DB: &str = "ai-memory.db";
/// Fallback TCP port for `ai-memory serve --port`.
const DEFAULT_PORT: u16 = 9077;
/// Garbage-collection interval, in seconds (30 minutes).
// NOTE(review): the consumer is not visible in this chunk — presumably the
// daemon's `spawn_gc_loop`; confirm.
const GC_INTERVAL_SECS: u64 = 30 * 60;
/// WAL auto-checkpoint cadence in the HTTP daemon, in seconds (10 minutes).
/// Bounds `*-wal` file growth between `SQLite`'s internal page-count
/// checkpoints.
const WAL_CHECKPOINT_INTERVAL_SECS: u64 = 10 * 60;
87
88// ---------------------------------------------------------------------------
89// Clap-derived CLI surface
90// ---------------------------------------------------------------------------
91//
92// The clap structs live in the lib crate so `daemon_runtime::run` can
93// take them as parameters. `main.rs` re-exports `Cli` and immediately
94// delegates here.
95
96#[derive(Parser)]
97#[command(
98    name = "ai-memory",
99    version,
100    about = "AI-agnostic persistent memory — MCP server, HTTP API, and CLI for any AI platform"
101)]
102pub struct Cli {
103    #[command(subcommand)]
104    pub command: Command,
105    #[arg(long, env = "AI_MEMORY_DB", default_value = DEFAULT_DB, global = true)]
106    pub db: PathBuf,
107    /// Output as JSON (machine-parseable)
108    #[arg(long, global = true, default_value_t = false)]
109    pub json: bool,
110    /// Agent identifier used for store operations. If unset, an NHI-hardened
111    /// default is synthesized (see `ai-memory store --help`). Accepts the
112    /// `AI_MEMORY_AGENT_ID` environment variable as a fallback.
113    #[arg(long, env = "AI_MEMORY_AGENT_ID", global = true)]
114    pub agent_id: Option<String>,
115    /// v0.6.0.0: path to a file containing the `SQLCipher` passphrase.
116    /// Only meaningful when the binary was built with
117    /// `--features sqlcipher` (standard builds ignore this flag). File
118    /// must be root-readable (mode 0400 recommended). The passphrase is
119    /// read once at startup and exported as `AI_MEMORY_DB_PASSPHRASE`
120    /// for the duration of the process — passing the passphrase
121    /// directly as an env var or as a flag value leaks to the process
122    /// list (`ps -E`) and shell history.
123    #[arg(long, global = true, value_name = "PATH")]
124    pub db_passphrase_file: Option<PathBuf>,
125}
126
// Top-level subcommands; `run` dispatches on these with one match arm per
// variant. Every `///` comment below is rendered by clap as `--help` text
// for the subcommand or flag it precedes, so edits to those lines change
// user-facing CLI output. Use `//` for maintainer-only notes.
#[derive(Subcommand)]
pub enum Command {
    /// Start the HTTP memory daemon
    Serve(ServeArgs),
    /// Run as an MCP (Model Context Protocol) tool server over stdio
    Mcp {
        // NOTE(review): because of `default_value = "semantic"` this field is
        // always populated, and `run` passes `Some(&tier)` to
        // `AppConfig::effective_tier` unconditionally — confirm whether a
        // config-file tier is meant to apply when `--tier` is omitted.
        /// Feature tier: keyword (FTS only) or semantic (embeddings + FTS)
        #[arg(long, default_value = "semantic")]
        tier: String,
        // An invalid profile makes `run` print a diagnostic and exit(2).
        /// v0.6.4 — Tool surface profile. One of `core`, `graph`, `admin`,
        /// `power`, `full`, or a comma-separated custom list (e.g.,
        /// `core,graph,archive`). Default `core` (5 tools). Resolution
        /// order: this CLI flag > `AI_MEMORY_PROFILE` env > `[mcp].profile`
        /// in config.toml > `core`. Set `--profile full` to reproduce
        /// v0.6.3 surface 1:1 (43 tools).
        #[arg(long, env = "AI_MEMORY_PROFILE")]
        profile: Option<String>,
    },
    /// Store a new memory
    Store(StoreArgs),
    /// Update an existing memory by ID
    Update(UpdateArgs),
    /// Recall memories relevant to a context
    Recall(RecallArgs),
    /// Search memories by text
    Search(SearchArgs),
    /// Retrieve a memory by ID
    Get(GetArgs),
    /// List memories
    List(ListArgs),
    /// Delete a memory by ID
    Delete(DeleteArgs),
    /// Promote a memory to long-term
    Promote(PromoteArgs),
    /// Delete memories matching a pattern
    Forget(ForgetArgs),
    /// Link two memories
    Link(LinkArgs),
    /// Consolidate multiple memories into one
    Consolidate(ConsolidateArgs),
    // The next four subcommands take no arguments of their own; only the
    // global `Cli` flags apply.
    /// Run garbage collection
    Gc,
    /// Show statistics
    Stats,
    /// List all namespaces
    Namespaces,
    /// Export all memories as JSON
    Export,
    /// Import memories from JSON (stdin)
    Import(ImportArgs),
    /// Resolve a contradiction — mark one memory as superseding another
    Resolve(ResolveArgs),
    // `run` calls `cli::shell::run(&db_path)` directly, without the `--json`
    // or agent-id plumbing the other CLI arms receive.
    /// Interactive memory shell (REPL)
    Shell,
    /// Sync memories between two database files
    Sync(SyncArgs),
    // One of the arms `run` awaits (`cli::sync::run_daemon(..).await`).
    /// Run the peer-to-peer sync daemon — continuously exchange memories
    /// with one or more HTTP peers (Phase 3 Task 3b.1). The defining
    /// grand-slam capability: two agents on two machines form a live
    /// knowledge mesh with no cloud, no login, no `SaaS`.
    SyncDaemon(SyncDaemonArgs),
    /// Auto-consolidate short-term memories by namespace
    AutoConsolidate(AutoConsolidateArgs),
    /// Generate shell completions
    Completions(CompletionsArgs),
    /// Generate man page
    Man,
    /// Import memories from historical conversations (Claude, `ChatGPT`, Slack exports)
    Mine(MineArgs),
    /// Manage the memory archive (list, restore, purge, stats)
    Archive(ArchiveArgs),
    /// Register or list agents (Task 1.3)
    Agents(AgentsArgs),
    /// List / approve / reject governance-pending actions (Task 1.9)
    Pending(PendingArgs),
    /// v0.6.0.0: snapshot the `SQLite` database to a timestamped backup
    /// file. Uses `SQLite` `VACUUM INTO` which is hot-backup safe (no daemon
    /// stop required). Writes a `manifest.json` alongside (sha256 + version).
    Backup(BackupArgs),
    /// v0.6.0.0: restore the `SQLite` database from a backup file written
    /// by `ai-memory backup`. Verifies the manifest sha256 before
    /// replacing the current DB. The current DB is moved aside as a safety
    /// net before the replacement.
    Restore(RestoreArgs),
    /// v0.6.1: run the autonomous curator. `--once` runs a single sweep
    /// and prints a JSON report; `--daemon` loops with `--interval-secs`
    /// between cycles. Auto-tags memories without tags and flags
    /// contradictions against nearby siblings in the same namespace.
    Curator(CuratorArgs),
    /// v0.6.3 (Pillar 3 / Stream E): run the canonical performance
    /// workload and print measured p50/p95/p99 against the budgets in
    /// `PERFORMANCE.md`. Each invocation seeds a disposable temp DB so
    /// the user's main DB is untouched. Exits non-zero when any p95
    /// exceeds its budget by more than the published 10% tolerance.
    Bench(BenchArgs),
    // Variant only exists with `--features sal`; any `match` on `Command`
    // must cfg-gate its `Migrate` arm the same way.
    /// v0.7: migrate memories between SAL backends. Gated behind
    /// `--features sal`. Reads pages via `MemoryStore::list`, writes
    /// via `MemoryStore::store`. Idempotent: source ids are preserved
    /// and both adapters upsert on id.
    #[cfg(feature = "sal")]
    Migrate(MigrateArgs),
    /// v0.6.3.1 (P7 / R7): operator-visible health dashboard. Reads
    /// Capabilities v2 (P1) + data integrity surfaces (P2) + recall
    /// observability (P3). With `--remote <url>` becomes a fleet doctor
    /// at T3+. Read-only — never mutates the database. Exits 0 on a
    /// healthy report, 2 on critical findings, and 1 on warnings when
    /// `--fail-on-warn` is passed.
    Doctor(DoctorCliArgs),
    /// Issue #487: emit session-boot context. Universal primitive every
    /// AI-agent integration recipe (Claude Code SessionStart hook, Cursor /
    /// Cline / Continue / Windsurf system-message, Codex / Apps SDK /
    /// Agent SDK programmatic prepend, OpenClaw built-in, local models
    /// via LM Studio / Ollama / vLLM) calls before the agent's first turn.
    /// Read-only, fast, never blocks. With `--quiet` (recommended for
    /// hooks) a missing DB exits 0 with empty stdout.
    Boot(BootArgs),
    /// Issue #487 PR-2: wire `ai-memory boot` and the `ai-memory-mcp`
    /// server into AI agents' config files (Claude Code SessionStart hook,
    /// Cursor / Cline / Continue / Windsurf / OpenClaw MCP config). Default
    /// is `--dry-run` (prints the diff, writes nothing). Pass `--apply` to
    /// commit. Pass `--uninstall --apply` to remove a previously-installed
    /// managed block.
    Install(InstallArgs),
    /// Issue #487 PR-6: cross-platform Rust replacement for the bash /
    /// PowerShell wrappers PR-1 shipped in the integration recipes. Runs
    /// `ai-memory boot` in-process, builds a system message, then spawns
    /// the named agent CLI with the system message delivered via the
    /// strategy chosen by `default_strategy(<agent>)` (or an explicit
    /// `--system-flag` / `--system-env` / `--message-file-flag`
    /// override). Exit code is propagated from the wrapped agent.
    Wrap(WrapArgs),
    /// Issue #487 PR-5: operator-facing CLI for the operational logging
    /// facility (`tail`, `cat`, `archive`, `purge`). Default-OFF — emits
    /// nothing useful unless `[logging] enabled = true` is set in
    /// `config.toml`.
    Logs(LogsArgs),
    /// Issue #487 PR-5: operator-facing CLI for the security audit
    /// trail (`verify`, `tail`, `path`). Default-OFF — emits nothing
    /// useful unless `[audit] enabled = true` is set in `config.toml`.
    Audit(AuditArgs),
}
268
/// Arguments for the `doctor` subcommand. Lives next to `Cli` so clap
/// derives them automatically; the actual report logic lives in
/// `cli::doctor::run`.
// NOTE(review): the relationships the help text documents between flags
// (`--profile` "when used with `--tokens`", `--raw-table` "implies
// `--tokens`") are not enforced by clap here — there is no `requires`
// attribute — so presumably `cli::doctor::run` handles them; confirm.
#[derive(Args)]
pub struct DoctorCliArgs {
    /// Query a remote ai-memory daemon's HTTP capabilities + stats
    /// endpoints instead of opening the local DB. Sections that need
    /// raw SQL access render as N/A in this mode.
    #[arg(long, value_name = "URL")]
    pub remote: Option<String>,
    // NOTE(review): `Cli` also declares a global `--json` with the same arg
    // id; confirm how clap resolves the overlap so that both
    // `ai-memory --json doctor` and `ai-memory doctor --json` reach this field.
    /// Emit the report as JSON instead of human-readable text. Useful
    /// for CI consumers and for `jq`-style filtering.
    #[arg(long)]
    pub json: bool,
    /// Exit 1 when at least one section is at WARN severity. Without
    /// this flag, warnings keep exit 0; criticals always exit 2.
    #[arg(long)]
    pub fail_on_warn: bool,
    /// v0.6.4-004 — print per-tool, per-family, and per-profile token
    /// costs (`cl100k_base`) instead of the regular health report.
    /// Combined with `--json` returns a structured payload for CI.
    /// Combined with `--profile <name>` reports the cost under that
    /// hypothetical profile in addition to the active default.
    #[arg(long)]
    pub tokens: bool,
    /// v0.6.4-004 — when used with `--tokens`, evaluate cost under this
    /// hypothetical profile. Defaults to `core` (the v0.6.4 default).
    /// Accepts the same vocabulary as `ai-memory mcp --profile`.
    #[arg(long, value_name = "PROFILE")]
    pub profile: Option<String>,
    /// v0.6.4-004 — dump the full per-tool size table as JSON. Implies
    /// `--tokens`. Used by CI and benchmarks to capture the source-of-
    /// truth size data without parsing the rendered report.
    #[arg(long)]
    pub raw_table: bool,
}
305
// Arguments for the `bench` subcommand (`Command::Bench`). Every `///` line
// below is clap `--help` text. The clamping ranges it mentions are not
// expressed as clap value parsers here, so presumably the bench code
// applies them — confirm before relying on parse-time rejection.
#[derive(Args)]
pub struct BenchArgs {
    /// Measured iterations per operation. Clamped to `[1, 100_000]`.
    #[arg(long, default_value_t = bench::DEFAULT_ITERATIONS)]
    pub iterations: usize,
    /// Warmup iterations discarded from the percentile sample.
    /// Clamped to `[0, 10_000]`.
    #[arg(long, default_value_t = bench::DEFAULT_WARMUP)]
    pub warmup: usize,
    // NOTE(review): shares the `json` arg id with the global `Cli::json`;
    // confirm how clap resolves the overlap.
    /// Emit results as JSON instead of the human-readable table.
    #[arg(long)]
    pub json: bool,
    // NOTE(review): this path is a `String` while `--history` below is a
    // `PathBuf`. Unifying the types would change what the consumer
    // receives, so it is left as-is — check the consumer before changing it.
    /// Path to a previous `bench --json` payload. When supplied, the
    /// fresh run is compared per-operation against this baseline and
    /// the process exits non-zero if any measured p95 exceeds the
    /// baseline by more than `--regression-threshold` percent.
    /// Independent of the absolute-budget guard.
    #[arg(long, value_name = "PATH")]
    pub baseline: Option<String>,
    /// Allowed p95 growth (percent) over the `--baseline` reading
    /// before a row is flagged as a regression. Clamped to
    /// `[0.0, 1000.0]`. Has no effect without `--baseline`.
    #[arg(long, default_value_t = bench::DEFAULT_REGRESSION_THRESHOLD_PCT)]
    pub regression_threshold: f64,
    /// Append this run to a JSONL history file (one self-describing
    /// JSON object per line). Creates the file and any missing parent
    /// directories on first call. Each entry carries `captured_at`
    /// (RFC3339), `iterations`, `warmup`, and the same `results` array
    /// `--json` emits — long-running campaigns can build a regression
    /// dataset to feed downstream tooling. The CLI table / JSON output
    /// still prints; this flag only adds the append side effect.
    #[arg(long, value_name = "PATH")]
    pub history: Option<PathBuf>,
}
340
// Arguments for the SAL `migrate` subcommand. Compiled only with
// `--features sal`, matching the cfg on `Command::Migrate`. Every `///`
// line below is clap `--help` text.
#[cfg(feature = "sal")]
#[derive(Args)]
pub struct MigrateArgs {
    // NOTE(review): a `postgres://user:pass@...` URL passed on the command
    // line is visible in the process list and shell history — the same
    // leak `--db-passphrase-file` exists to avoid. Consider an env-var or
    // file-based way to supply credentials.
    /// Source URL. `sqlite:///path/to/file.db` or
    /// `postgres://user:pass@host:port/dbname`.
    #[arg(long)]
    pub from: String,
    /// Destination URL. Same URL shape as `--from`.
    #[arg(long)]
    pub to: String,
    /// Page size. Clamped to [1, 10000]. Default 1000.
    #[arg(long, default_value_t = 1000)]
    pub batch: usize,
    /// Only migrate memories in this namespace.
    #[arg(long)]
    pub namespace: Option<String>,
    /// Emit the report but do NOT write to the destination.
    #[arg(long)]
    pub dry_run: bool,
    // NOTE(review): shares the `json` arg id with the global `Cli::json`;
    // confirm how clap resolves the overlap.
    /// Emit the report as JSON rather than human-readable text.
    #[arg(long)]
    pub json: bool,
}
364
365#[derive(Args)]
366pub struct ServeArgs {
367    #[arg(long, default_value = "127.0.0.1")]
368    pub host: String,
369    #[arg(long, default_value_t = DEFAULT_PORT)]
370    pub port: u16,
371    /// Path to PEM-encoded TLS certificate (may include the full chain).
372    /// Passing both `--tls-cert` and `--tls-key` switches `serve` to
373    /// HTTPS. rustls under the hood — no OpenSSL dep. Absent both
374    /// flags = plain HTTP (same as every previous release).
375    #[arg(long, requires = "tls_key")]
376    pub tls_cert: Option<PathBuf>,
377    /// Path to PEM-encoded TLS private key (PKCS#8 or RSA).
378    #[arg(long, requires = "tls_cert")]
379    pub tls_key: Option<PathBuf>,
380    /// Path to a file containing SHA-256 fingerprints of trusted client
381    /// certificates, one per line (case-insensitive hex, optionally with
382    /// `:` separators; comments start with `#`). When set, `serve`
383    /// demands client-cert mTLS on every connection and refuses any peer
384    /// whose cert fingerprint is not on the list. Requires `--tls-cert`
385    /// and `--tls-key`. This is the peer-mesh identity gate — a peer
386    /// without an authorised cert can't even open a TCP connection, let
387    /// alone hit `/sync/push`. Layer 2 of the peer-mesh crypto stack;
388    /// attested `agent_id` extraction (Layer 2b) lands post-v0.6.0.
389    #[arg(long, requires = "tls_cert")]
390    pub mtls_allowlist: Option<PathBuf>,
391    /// Seconds to wait for in-flight requests to complete on graceful
392    /// shutdown (SIGINT). Default 30. Bumped from 10 in v0.6.0 because
393    /// large `/sync/push` batches can take longer than 10s under load
394    /// (red-team #233).
395    #[arg(long, default_value_t = 30)]
396    pub shutdown_grace_secs: u64,
397
398    // -------- v0.7 federation (ADR-0001) ---------------------------
399    /// W-of-N write quorum. When >=1 and `--quorum-peers` is non-empty,
400    /// every HTTP write fans out to every peer and returns OK only
401    /// after the local commit + W-1 peer acks land within
402    /// `--quorum-timeout-ms`. Default 0 = federation disabled, daemon
403    /// behaves exactly like v0.6.0.
404    #[arg(long, default_value_t = 0)]
405    pub quorum_writes: usize,
406    /// Comma-separated list of peer base URLs. Each peer is assumed to
407    /// expose `POST /api/v1/sync/push` — the same endpoint the
408    /// sync-daemon already uses.
409    #[arg(long, value_delimiter = ',')]
410    pub quorum_peers: Vec<String>,
411    /// Deadline for quorum-ack collection. After this many ms the
412    /// write returns 503 `quorum_not_met`. Default 2000.
413    #[arg(long, default_value_t = 2000)]
414    pub quorum_timeout_ms: u64,
415    /// Optional mTLS client cert for outbound federation POSTs. Same
416    /// cert material the sync-daemon's `--client-cert` accepts.
417    #[arg(long)]
418    pub quorum_client_cert: Option<PathBuf>,
419    /// Optional mTLS client key for outbound federation POSTs.
420    #[arg(long)]
421    pub quorum_client_key: Option<PathBuf>,
422    /// Optional root CA cert to trust for outbound federation HTTPS.
423    /// Required whenever peers present a cert NOT rooted in Mozilla's
424    /// `webpki-roots` bundle (self-signed, private CA, ephemeral test
425    /// CA, etc.) — without this, the reqwest rustls-tls client rejects
426    /// peer certs and every quorum write times out as `quorum_not_met`.
427    /// See #333.
428    #[arg(long)]
429    pub quorum_ca_cert: Option<PathBuf>,
430    /// v0.6.0.1 (#320) — how often, in seconds, the daemon pulls peers
431    /// for any updates it missed while offline or partitioned. 0 disables
432    /// the catchup loop entirely. Default 30s keeps a post-partition
433    /// node convergent within one interval after resume.
434    #[arg(long, default_value_t = 30)]
435    pub catchup_interval_secs: u64,
436}
437
438#[derive(Args)]
439pub struct CompletionsArgs {
440    pub shell: Shell,
441}
442
443// ---------------------------------------------------------------------------
444// Top-level dispatch
445// ---------------------------------------------------------------------------
446
447/// Top-level CLI dispatch. Called from `main()` after `Cli::parse()`.
448///
449/// Handles:
450/// - `--db-passphrase-file` → exports `AI_MEMORY_DB_PASSPHRASE`.
451/// - `is_write_command` → conditional post-run WAL checkpoint.
452/// - The match arm for every `Command` variant.
453#[allow(clippy::too_many_lines)]
454pub async fn run(cli: Cli, app_config: &AppConfig) -> Result<()> {
455    // v0.6.0.0: read the SQLCipher passphrase from a file and export it as
456    // AI_MEMORY_DB_PASSPHRASE for the duration of the process. File path
457    // comes from the --db-passphrase-file flag (global). No-op on standard
458    // SQLite builds (the env var is ignored unless the binary was built
459    // with --features sqlcipher).
460    if let Some(path) = &cli.db_passphrase_file {
461        let passphrase = passphrase_from_file(path)?;
462        // SAFETY: single-threaded startup before any worker threads spawn.
463        unsafe { std::env::set_var("AI_MEMORY_DB_PASSPHRASE", passphrase) };
464    }
465    let db_path = app_config.effective_db(&cli.db);
466    let j = cli.json;
467    let cli_agent_id: Option<String> = cli.agent_id.clone();
468    // Track whether command writes to DB (for WAL checkpoint)
469    let needs_checkpoint = is_write_command(&cli.command);
470    let db_path_for_checkpoint = if needs_checkpoint {
471        Some(db_path.clone())
472    } else {
473        None
474    };
475
476    let result = match cli.command {
477        Command::Serve(a) => serve(db_path, a, app_config).await,
478        Command::Mcp { tier, profile } => {
479            let feature_tier = app_config.effective_tier(Some(&tier));
480            // v0.6.4-001 — resolve profile (CLI/env > config > default core).
481            // Surface parse errors to stderr with the diagnostic that
482            // ProfileParseError already produces (lists valid profiles +
483            // valid families) before exiting.
484            let resolved_profile = match app_config.effective_profile(profile.as_deref()) {
485                Ok(p) => p,
486                Err(e) => {
487                    eprintln!("ai-memory mcp: invalid profile: {e}");
488                    std::process::exit(2);
489                }
490            };
491            mcp::run_mcp_server(&db_path, feature_tier, app_config, &resolved_profile)?;
492            Ok(())
493        }
494        Command::Store(a) => {
495            let stdout = std::io::stdout();
496            let stderr = std::io::stderr();
497            let mut so = stdout.lock();
498            let mut se = stderr.lock();
499            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
500            cli::store::run(
501                &db_path,
502                a,
503                j,
504                app_config,
505                cli_agent_id.as_deref(),
506                &mut out,
507            )
508        }
509        Command::Update(a) => {
510            let stdout = std::io::stdout();
511            let stderr = std::io::stderr();
512            let mut so = stdout.lock();
513            let mut se = stderr.lock();
514            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
515            cli::update::run(&db_path, &a, j, &mut out)
516        }
517        Command::Recall(a) => {
518            let stdout = std::io::stdout();
519            let stderr = std::io::stderr();
520            let mut so = stdout.lock();
521            let mut se = stderr.lock();
522            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
523            cli::recall::run(&db_path, &a, j, app_config, &mut out)
524        }
525        Command::Search(a) => {
526            let stdout = std::io::stdout();
527            let stderr = std::io::stderr();
528            let mut so = stdout.lock();
529            let mut se = stderr.lock();
530            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
531            cli::search::run(&db_path, &a, j, &mut out)
532        }
533        Command::Get(a) => {
534            let stdout = std::io::stdout();
535            let stderr = std::io::stderr();
536            let mut so = stdout.lock();
537            let mut se = stderr.lock();
538            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
539            cli::crud::cmd_get(&db_path, &a, j, &mut out)
540        }
541        Command::List(a) => {
542            let stdout = std::io::stdout();
543            let stderr = std::io::stderr();
544            let mut so = stdout.lock();
545            let mut se = stderr.lock();
546            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
547            cli::crud::cmd_list(&db_path, &a, j, app_config, &mut out)
548        }
549        Command::Delete(a) => {
550            let stdout = std::io::stdout();
551            let stderr = std::io::stderr();
552            let mut so = stdout.lock();
553            let mut se = stderr.lock();
554            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
555            cli::crud::cmd_delete(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
556        }
557        Command::Promote(a) => {
558            let stdout = std::io::stdout();
559            let stderr = std::io::stderr();
560            let mut so = stdout.lock();
561            let mut se = stderr.lock();
562            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
563            cli::promote::cmd_promote(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
564        }
565        Command::Forget(a) => {
566            let stdout = std::io::stdout();
567            let stderr = std::io::stderr();
568            let mut so = stdout.lock();
569            let mut se = stderr.lock();
570            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
571            cli::forget::cmd_forget(&db_path, &a, j, &mut out)
572        }
573        Command::Link(a) => {
574            let stdout = std::io::stdout();
575            let stderr = std::io::stderr();
576            let mut so = stdout.lock();
577            let mut se = stderr.lock();
578            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
579            cli::link::cmd_link(&db_path, &a, j, &mut out)
580        }
581        Command::Consolidate(a) => {
582            let stdout = std::io::stdout();
583            let stderr = std::io::stderr();
584            let mut so = stdout.lock();
585            let mut se = stderr.lock();
586            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
587            cli::consolidate::run(&db_path, a, j, cli_agent_id.as_deref(), &mut out)
588        }
589        Command::Resolve(a) => {
590            let stdout = std::io::stdout();
591            let stderr = std::io::stderr();
592            let mut so = stdout.lock();
593            let mut se = stderr.lock();
594            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
595            cli::link::cmd_resolve(&db_path, &a, j, &mut out)
596        }
597        Command::Shell => cli::shell::run(&db_path),
598        Command::Sync(a) => {
599            let stdout = std::io::stdout();
600            let stderr = std::io::stderr();
601            let mut so = stdout.lock();
602            let mut se = stderr.lock();
603            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
604            cli::sync::run(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
605        }
606        Command::SyncDaemon(a) => cli::sync::run_daemon(&db_path, a, cli_agent_id.as_deref()).await,
607        Command::AutoConsolidate(a) => {
608            let stdout = std::io::stdout();
609            let stderr = std::io::stderr();
610            let mut so = stdout.lock();
611            let mut se = stderr.lock();
612            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
613            cli::consolidate::run_auto(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
614        }
615        Command::Gc => {
616            let stdout = std::io::stdout();
617            let stderr = std::io::stderr();
618            let mut so = stdout.lock();
619            let mut se = stderr.lock();
620            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
621            cli::gc::run_gc(&db_path, j, app_config, &mut out)
622        }
623        Command::Stats => {
624            let stdout = std::io::stdout();
625            let stderr = std::io::stderr();
626            let mut so = stdout.lock();
627            let mut se = stderr.lock();
628            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
629            cli::gc::run_stats(&db_path, j, &mut out)
630        }
631        Command::Namespaces => {
632            let stdout = std::io::stdout();
633            let stderr = std::io::stderr();
634            let mut so = stdout.lock();
635            let mut se = stderr.lock();
636            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
637            cli::gc::run_namespaces(&db_path, j, &mut out)
638        }
639        Command::Export => {
640            let stdout = std::io::stdout();
641            let stderr = std::io::stderr();
642            let mut so = stdout.lock();
643            let mut se = stderr.lock();
644            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
645            cli::io::export(&db_path, &mut out)
646        }
647        Command::Import(a) => {
648            let stdout = std::io::stdout();
649            let stderr = std::io::stderr();
650            let mut so = stdout.lock();
651            let mut se = stderr.lock();
652            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
653            cli::io::import(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
654        }
655        Command::Completions(a) => {
656            generate(
657                a.shell,
658                &mut Cli::command(),
659                "ai-memory",
660                &mut std::io::stdout(),
661            );
662            Ok(())
663        }
664        Command::Man => {
665            let cmd = Cli::command();
666            let man = clap_mangen::Man::new(cmd);
667            man.render(&mut std::io::stdout())?;
668            Ok(())
669        }
670        Command::Mine(a) => {
671            let stdout = std::io::stdout();
672            let stderr = std::io::stderr();
673            let mut so = stdout.lock();
674            let mut se = stderr.lock();
675            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
676            cli::io::mine(
677                &db_path,
678                a,
679                j,
680                app_config,
681                cli_agent_id.as_deref(),
682                &mut out,
683            )
684        }
685        Command::Archive(a) => {
686            let stdout = std::io::stdout();
687            let stderr = std::io::stderr();
688            let mut so = stdout.lock();
689            let mut se = stderr.lock();
690            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
691            cli::archive::run(&db_path, a, j, &mut out)
692        }
693        Command::Agents(a) => {
694            let stdout = std::io::stdout();
695            let stderr = std::io::stderr();
696            let mut so = stdout.lock();
697            let mut se = stderr.lock();
698            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
699            cli::agents::run_agents(&db_path, a, j, &mut out)
700        }
701        Command::Pending(a) => {
702            let stdout = std::io::stdout();
703            let stderr = std::io::stderr();
704            let mut so = stdout.lock();
705            let mut se = stderr.lock();
706            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
707            cli::agents::run_pending(&db_path, a, j, cli_agent_id.as_deref(), &mut out)
708        }
709        Command::Backup(a) => {
710            let stdout = std::io::stdout();
711            let stderr = std::io::stderr();
712            let mut so = stdout.lock();
713            let mut se = stderr.lock();
714            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
715            cli::backup::run_backup(&db_path, &a, j, &mut out)
716        }
717        Command::Restore(a) => {
718            let stdout = std::io::stdout();
719            let stderr = std::io::stderr();
720            let mut so = stdout.lock();
721            let mut se = stderr.lock();
722            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
723            cli::backup::run_restore(&db_path, &a, j, &mut out)
724        }
725        Command::Curator(a) => {
726            let stdout = std::io::stdout();
727            let stderr = std::io::stderr();
728            let mut so = stdout.lock();
729            let mut se = stderr.lock();
730            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
731            cli::curator::run(&db_path, &a, app_config, &mut out).await
732        }
733        Command::Bench(a) => cmd_bench(&a),
734        #[cfg(feature = "sal")]
735        Command::Migrate(a) => cmd_migrate(&a).await,
736        Command::Doctor(a) => {
737            // P7 / R7. The doctor is read-only; it never sets
738            // `needs_checkpoint`. We compute the exit code from the
739            // overall severity and propagate it via the process-exit
740            // path below so callers (CI, ops scripts) can branch on it.
741            //
742            // The remote mode uses `reqwest::blocking::Client` which
743            // panics when dropped on a tokio runtime thread, so the
744            // entire doctor pass runs inside `spawn_blocking`.
745            let db_path_doctor = db_path.clone();
746            // v0.6.4-004 — `--tokens` (and its alias `--raw-table`) bypass
747            // the regular health pass. Routes to a dedicated tokens
748            // reporter that consumes `crate::sizes::tool_sizes()` and
749            // `crate::profile::Family::for_tool` to roll up cost.
750            if a.tokens || a.raw_table {
751                let stdout = std::io::stdout();
752                let stderr = std::io::stderr();
753                let mut so = stdout.lock();
754                let mut se = stderr.lock();
755                let mut out = cli::CliOutput::from_std(&mut so, &mut se);
756                let exit = cli::doctor::run_tokens(
757                    cli::doctor::TokensArgs {
758                        json: a.json,
759                        raw_table: a.raw_table,
760                        profile: a.profile,
761                    },
762                    &mut out,
763                )?;
764                std::process::exit(exit);
765            }
766            let args = cli::doctor::DoctorArgs {
767                remote: a.remote,
768                json: a.json,
769                fail_on_warn: a.fail_on_warn,
770            };
771            let join = tokio::task::spawn_blocking(move || {
772                let stdout = std::io::stdout();
773                let stderr = std::io::stderr();
774                let mut so = stdout.lock();
775                let mut se = stderr.lock();
776                let mut out = cli::CliOutput::from_std(&mut so, &mut se);
777                cli::doctor::run(&db_path_doctor, &args, &mut out)
778            })
779            .await;
780            match join {
781                Ok(Ok(0)) => Ok(()),
782                Ok(Ok(code)) => std::process::exit(code),
783                Ok(Err(e)) => Err(e),
784                Err(e) => Err(anyhow::anyhow!("doctor task join failed: {e}")),
785            }
786        }
787        Command::Boot(a) => {
788            // Issue #487. Read-only, fast, no embedder, no daemon. Suitable
789            // for invocation from any AI-agent integration (Claude Code
790            // SessionStart hook, Cursor / Cline / Continue / Windsurf
791            // system-message, programmatic prepend in Claude Agent SDK /
792            // OpenAI Apps SDK / Codex CLI, OpenClaw built-in, local models
793            // via LM Studio / Ollama / vLLM).
794            let stdout = std::io::stdout();
795            let stderr = std::io::stderr();
796            let mut so = stdout.lock();
797            let mut se = stderr.lock();
798            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
799            // PR-5: a `boot` invocation is itself an audit-worthy event.
800            // Emission is a no-op when audit is disabled.
801            crate::audit::emit(crate::audit::EventBuilder::new(
802                crate::audit::AuditAction::SessionBoot,
803                crate::audit::actor(
804                    cli_agent_id.as_deref().unwrap_or("anonymous"),
805                    "explicit_or_default",
806                    None,
807                ),
808                crate::audit::target_sweep(a.namespace.as_deref().unwrap_or("auto")),
809            ));
810            cli::boot::run(&db_path, &a, app_config, &mut out)
811        }
812        Command::Install(a) => {
813            // Issue #487 PR-2. Read-only filesystem op against the agent's
814            // config file (NOT the ai-memory DB). Default is dry-run; --apply
815            // is opt-in and writes a backup before mutating anything.
816            let stdout = std::io::stdout();
817            let stderr = std::io::stderr();
818            let mut so = stdout.lock();
819            let mut se = stderr.lock();
820            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
821            cli::install::run(&a, &mut out)
822        }
823        Command::Wrap(a) => {
824            // Issue #487 PR-6. Pure-Rust cross-platform replacement for
825            // the bash / PowerShell wrappers PR-1 shipped in the
826            // integration recipes. Runs boot in-process, builds the
827            // system message, spawns the wrapped agent, and propagates
828            // the agent's exit code via std::process::exit.
829            let stdout = std::io::stdout();
830            let stderr = std::io::stderr();
831            let mut so = stdout.lock();
832            let mut se = stderr.lock();
833            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
834            let code = cli::wrap::run(&db_path, &a, app_config, &mut out)?;
835            // Drop the locks/output before exit so any pending writes
836            // get flushed by the OS on process teardown.
837            drop(out);
838            drop(so);
839            drop(se);
840            if code == 0 {
841                Ok(())
842            } else {
843                std::process::exit(code);
844            }
845        }
846        Command::Logs(a) => {
847            let stdout = std::io::stdout();
848            let stderr = std::io::stderr();
849            let mut so = stdout.lock();
850            let mut se = stderr.lock();
851            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
852            cli::logs::run(a, app_config, &mut out)
853        }
854        Command::Audit(a) => {
855            let stdout = std::io::stdout();
856            let stderr = std::io::stderr();
857            let mut so = stdout.lock();
858            let mut se = stderr.lock();
859            let mut out = cli::CliOutput::from_std(&mut so, &mut se);
860            match cli::audit::run(a, app_config, &mut out)? {
861                0 => Ok(()),
862                code => std::process::exit(code),
863            }
864        }
865    };
866
867    // WAL checkpoint after write commands to prevent unbounded WAL growth
868    if result.is_ok()
869        && let Some(cp_path) = db_path_for_checkpoint
870        && let Ok(conn) = db::open(&cp_path)
871    {
872        let _ = db::checkpoint(&conn);
873    }
874
875    result
876}
877
878// ---------------------------------------------------------------------------
879// is_write_command — predicate for the post-run WAL checkpoint.
880// ---------------------------------------------------------------------------
881
882/// Returns true if `cmd` is a write-class subcommand. The post-run WAL
883/// checkpoint in [`run`] runs only when this returns `true`.
884#[must_use]
885pub fn is_write_command(cmd: &Command) -> bool {
886    matches!(
887        cmd,
888        Command::Store(_)
889            | Command::Update(_)
890            | Command::Delete(_)
891            | Command::Promote(_)
892            | Command::Forget(_)
893            | Command::Link(_)
894            | Command::Consolidate(_)
895            | Command::Resolve(_)
896            | Command::Sync(_)
897            | Command::SyncDaemon(_)
898            | Command::Import(_)
899            | Command::AutoConsolidate(_)
900            | Command::Gc
901    )
902}
903
904// ---------------------------------------------------------------------------
905// Startup helpers (passphrase, anonymize default)
906// ---------------------------------------------------------------------------
907
908/// Read the `SQLCipher` passphrase from `path`. Strips a single trailing
909/// newline / CRLF; rejects an empty passphrase (post-strip) with an error;
910/// preserves all other internal whitespace.
911///
912/// # Errors
913///
914/// - The file cannot be read (e.g. missing, permission denied).
915/// - The passphrase, after stripping the trailing newline, is empty.
916pub fn passphrase_from_file(path: &Path) -> Result<String> {
917    let raw = std::fs::read_to_string(path)
918        .with_context(|| format!("reading passphrase file {}", path.display()))?;
919    let passphrase = raw.trim_end_matches(['\n', '\r']).to_string();
920    if passphrase.is_empty() {
921        anyhow::bail!("passphrase file {} is empty", path.display());
922    }
923    Ok(passphrase)
924}
925
926/// Apply the configured `anonymize_default` to the runtime env: when the
927/// config asks for anonymization but the user hasn't already set
928/// `AI_MEMORY_ANONYMIZE`, set it to `"1"`. Idempotent — repeated calls are
929/// a no-op once the env var is set.
930///
931/// Note: this writes to the process environment; callers must invoke it
932/// from the single-threaded startup region (before any worker threads are
933/// spawned). The production binary calls it from `main()` for that reason.
934pub fn apply_anonymize_default(app_config: &AppConfig) {
935    // #198: config → env mapping for agent_id anonymization. Env var already
936    // set by the caller wins; config is only applied when the env is unset.
937    if app_config.effective_anonymize_default() && std::env::var("AI_MEMORY_ANONYMIZE").is_err() {
938        // SAFETY: single-threaded startup before any worker threads spawn.
939        unsafe { std::env::set_var("AI_MEMORY_ANONYMIZE", "1") };
940    }
941}
942
943// ---------------------------------------------------------------------------
944// Embedder / vector-index canonical builders
945// ---------------------------------------------------------------------------
946
947/// Construct the [`Embedder`] for a given tier. Returns `None` for the
948/// keyword tier (no embedder requested) and on load failure (caller
949/// degrades to keyword fallback). On failure the diagnostic is emitted
950/// via `tracing::error!` so operators see it in `journalctl`.
951///
952/// This is the single canonical embedder builder used by both `serve()`
953/// (HTTP daemon) and `cli::recall::run` (offline recall). Prior to W6
954/// each call site had its own copy, with subtly different fallback
955/// shapes — the bug at issue #322 was a direct consequence.
956pub async fn build_embedder(feature_tier: FeatureTier, app_config: &AppConfig) -> Option<Embedder> {
957    let tier_config = feature_tier.config();
958    let Some(emb_model) = tier_config.embedding_model else {
959        tracing::info!(
960            "embedder disabled — tier={} keyword-only (FTS5); semantic recall not wired",
961            feature_tier.as_str()
962        );
963        return None;
964    };
965    let embed_url = app_config.effective_embed_url().to_string();
966    // The HF-Hub sync API and candle model-load are blocking CPU work that
967    // internally spin their own tokio runtime. Running them directly in this
968    // async context panics with "Cannot drop a runtime in a context where
969    // blocking is not allowed." Move the whole construction onto the blocking
970    // pool so the inner runtime is owned by a dedicated thread.
971    let build = match tokio::task::spawn_blocking(move || {
972        let embed_client = llm::OllamaClient::new_with_url(&embed_url, "nomic-embed-text")
973            .ok()
974            .map(Arc::new);
975        embeddings::Embedder::for_model(emb_model, embed_client)
976    })
977    .await
978    {
979        Ok(b) => b,
980        Err(e) => {
981            tracing::error!("embedder spawn_blocking join failed: {e}");
982            return None;
983        }
984    };
985    match build {
986        Ok(emb) => {
987            tracing::info!(
988                "embedder loaded ({}) — tier={} semantic recall enabled",
989                emb.model_description(),
990                feature_tier.as_str()
991            );
992            Some(emb)
993        }
994        Err(e) => {
995            // v0.6.2 (#327): make embedder load failures loud. The
996            // prior WARN level was easy to miss in DO droplet logs,
997            // which led to scenario-18 black-holing (semantic recall
998            // falling back to keyword-only without the operator
999            // noticing). An ERROR-level log with an obvious marker
1000            // surfaces this immediately in `journalctl -u ai-memory`
1001            // or tail -f /var/log/ai-memory-serve.log.
1002            tracing::error!(
1003                "EMBEDDER LOAD FAILED — tier={} requested semantic features, \
1004                 but embedder init errored: {e}. Daemon falls back to keyword-only. \
1005                 Semantic recall, sync_push embedding refresh (#322), and HNSW index \
1006                 will be NO-OPS. Check network egress to HuggingFace Hub + available \
1007                 memory for model weights. To force keyword-only explicitly (silences \
1008                 this error), set `tier = \"keyword\"` in config.toml.",
1009                feature_tier.as_str()
1010            );
1011            None
1012        }
1013    }
1014}
1015
1016/// Build the in-memory [`VectorIndex`] from `conn`. When `embedder_present`
1017/// is false, returns `None` (the keyword-only path doesn't need an index).
1018/// When the embedder is present but the DB is empty (or query errors),
1019/// returns `Some(VectorIndex::empty())` so write paths can populate it
1020/// in-place.
1021#[must_use]
1022pub fn build_vector_index(conn: &Connection, embedder_present: bool) -> Option<VectorIndex> {
1023    if !embedder_present {
1024        return None;
1025    }
1026    match db::get_all_embeddings(conn) {
1027        Ok(entries) if !entries.is_empty() => Some(hnsw::VectorIndex::build(entries)),
1028        _ => Some(hnsw::VectorIndex::empty()),
1029    }
1030}
1031
1032// ---------------------------------------------------------------------------
1033// Background tasks (GC, WAL checkpoint)
1034// ---------------------------------------------------------------------------
1035
1036/// Spawn the periodic GC loop. Sleeps `interval`, then runs `db::gc` and
1037/// `db::auto_purge_archive` against the daemon's shared connection. The
1038/// returned [`JoinHandle`] is owned by the caller; `serve()` aborts it on
1039/// shutdown.
1040#[must_use]
1041pub fn spawn_gc_loop(
1042    state: Db,
1043    archive_max_days: Option<i64>,
1044    interval: Duration,
1045) -> JoinHandle<()> {
1046    tokio::spawn(async move {
1047        loop {
1048            tokio::time::sleep(interval).await;
1049            let lock = state.lock().await;
1050            match db::gc(&lock.0, lock.3) {
1051                Ok(n) if n > 0 => tracing::info!("gc: expired {n} memories"),
1052                _ => {}
1053            }
1054            // Auto-purge old archives if configured
1055            match db::auto_purge_archive(&lock.0, archive_max_days) {
1056                Ok(n) if n > 0 => tracing::info!("gc: purged {n} old archived memories"),
1057                _ => {}
1058            }
1059        }
1060    })
1061}
1062
1063/// Spawn the periodic WAL checkpoint loop. First checkpoint runs
1064/// `interval / 2` after start (staggered from the GC loop to avoid
1065/// lock-contention bursts on cold start), then on a fixed cadence.
1066#[must_use]
1067pub fn spawn_wal_checkpoint_loop(state: Db, interval: Duration) -> JoinHandle<()> {
1068    let half = interval / 2;
1069    tokio::spawn(async move {
1070        // First checkpoint runs halfway through the interval so the two
1071        // long-running maintenance tasks never overlap on cold start.
1072        tokio::time::sleep(half).await;
1073        loop {
1074            {
1075                let lock = state.lock().await;
1076                match db::checkpoint(&lock.0) {
1077                    Ok(()) => tracing::debug!("wal checkpoint: ok"),
1078                    Err(e) => tracing::warn!("wal checkpoint failed: {e}"),
1079                }
1080            }
1081            tokio::time::sleep(interval).await;
1082        }
1083    })
1084}
1085
1086// ---------------------------------------------------------------------------
1087// Router composition
1088// ---------------------------------------------------------------------------
1089
/// Compose the production HTTP router.
///
/// Thin wrapper around [`crate::build_router`] (the W3-vintage source of
/// truth for the route table). Note the argument order: this wrapper takes
/// `(app_state, api_key_state)` and forwards them to `crate::build_router`
/// as `(api_key_state, app_state)`.
///
/// `daemon_runtime::build_router` exists so test code in this module can
/// build the router without naming `crate::build_router` directly, and so
/// future router-composition logic (e.g. middleware reorder, custom layers)
/// lives in one place.
#[must_use]
pub fn build_router(app_state: AppState, api_key_state: ApiKeyState) -> Router {
    crate::build_router(api_key_state, app_state)
}
1100
1101// ---------------------------------------------------------------------------
1102// serve() — the HTTP daemon body, post-W6 split.
1103// ---------------------------------------------------------------------------
1104
/// Aggregated state produced by [`bootstrap_serve`]: everything the HTTP
/// daemon needs before it binds a socket.
///
/// Dropping a tokio `JoinHandle` detaches its task rather than cancelling
/// it, so the loops in `task_handles` keep running unless they are aborted
/// explicitly.
pub struct ServeBootstrap {
    /// Shared handler state for the router (DB handle, embedder, vector
    /// index, federation config, tier config, scoring).
    pub app_state: AppState,
    /// API-key auth state; its `key` is `None` when no API key is configured.
    pub api_key_state: ApiKeyState,
    /// The shared database handle. `app_state.db` is a clone of this same
    /// `Arc`; it is kept separately so callers can lock it directly (e.g.
    /// for the shutdown WAL checkpoint).
    pub db_state: Db,
    /// `archive_max_days` copied from the `AppConfig`.
    pub archive_max_days: Option<i64>,
    /// Handles for the GC loop and the WAL-checkpoint loop, in that order.
    /// NOTE(review): the federation catchup loop started by
    /// `bootstrap_serve` is not tracked here — confirm whether it should be.
    pub task_handles: Vec<JoinHandle<()>>,
}
1113
1114/// Build all daemon state and spawn background tasks. Returns the
1115/// aggregated state without binding any sockets — testable in isolation.
1116pub async fn bootstrap_serve(
1117    db_path: &Path,
1118    args: &ServeArgs,
1119    app_config: &AppConfig,
1120) -> Result<ServeBootstrap> {
1121    let resolved_ttl = app_config.effective_ttl();
1122    let archive_on_gc = app_config.effective_archive_on_gc();
1123    let conn = db::open(db_path)?;
1124
1125    // Issue #219: build the embedder + HNSW index up front so HTTP write
1126    // paths can populate them. Previously the daemon never constructed an
1127    // embedder, silently excluding every HTTP-authored memory from semantic
1128    // recall. Build only when the configured feature tier enables it —
1129    // keyword-only deployments keep their zero-dep, zero-RAM profile.
1130    // Daemon has no per-invocation tier override; honour the config tier.
1131    let feature_tier = app_config.effective_tier(None);
1132    let tier_config = feature_tier.config();
1133    let embedder = build_embedder(feature_tier, app_config).await;
1134    let vector_index = build_vector_index(&conn, embedder.is_some());
1135
1136    let db_state: Db = Arc::new(Mutex::new((
1137        conn,
1138        db_path.to_path_buf(),
1139        resolved_ttl,
1140        archive_on_gc,
1141    )));
1142
1143    // Federation: parsed from --quorum-writes / --quorum-peers. Disabled
1144    // entirely when either is absent — daemon behaves exactly like
1145    // v0.6.0 in that case.
1146    let federation = federation::FederationConfig::build(
1147        args.quorum_writes,
1148        &args.quorum_peers,
1149        std::time::Duration::from_millis(args.quorum_timeout_ms),
1150        args.quorum_client_cert.as_deref(),
1151        args.quorum_client_key.as_deref(),
1152        args.quorum_ca_cert.as_deref(),
1153        format!("host:{}", gethostname::gethostname().to_string_lossy()),
1154    )
1155    .context("federation config")?;
1156
1157    let mut task_handles: Vec<JoinHandle<()>> = Vec::new();
1158
1159    if let Some(ref fed) = federation {
1160        tracing::info!(
1161            "federation enabled: W={} over {} peer(s), timeout {}ms",
1162            fed.policy.w,
1163            fed.peer_count(),
1164            args.quorum_timeout_ms,
1165        );
1166        // v0.6.0.1 (#320) — post-partition catchup poller. Closes the gap
1167        // where a rejoining node only sees post-resume writes.
1168        if args.catchup_interval_secs > 0 {
1169            let interval = std::time::Duration::from_secs(args.catchup_interval_secs);
1170            tracing::info!(
1171                "catchup loop enabled: polling {} peer(s) every {}s",
1172                fed.peer_count(),
1173                args.catchup_interval_secs,
1174            );
1175            federation::spawn_catchup_loop(fed.clone(), db_state.clone(), interval);
1176        } else {
1177            tracing::info!("catchup loop disabled (--catchup-interval-secs=0)");
1178        }
1179    }
1180
1181    let app_state = AppState {
1182        db: db_state.clone(),
1183        embedder: Arc::new(embedder),
1184        vector_index: Arc::new(Mutex::new(vector_index)),
1185        federation: Arc::new(federation),
1186        tier_config: Arc::new(tier_config),
1187        scoring: Arc::new(app_config.effective_scoring()),
1188    };
1189
1190    // Automatic GC.
1191    task_handles.push(spawn_gc_loop(
1192        db_state.clone(),
1193        app_config.archive_max_days,
1194        Duration::from_secs(GC_INTERVAL_SECS),
1195    ));
1196
1197    // v0.6.0 GA: periodic WAL checkpoint. Under continuous writes the WAL
1198    // file grows until SQLite's auto-checkpoint fires (every 1000 pages by
1199    // default) — which is inconsistent timing and can leave the file at
1200    // hundreds of MB between auto-checkpoints. A dedicated task running on
1201    // a fixed cadence keeps the WAL bounded and makes operational storage
1202    // behaviour predictable. We stagger from GC to avoid lock-contention
1203    // bursts. See docs/ARCHITECTURAL_LIMITS.md for why this workaround is
1204    // necessary in a single-connection daemon.
1205    task_handles.push(spawn_wal_checkpoint_loop(
1206        db_state.clone(),
1207        Duration::from_secs(WAL_CHECKPOINT_INTERVAL_SECS),
1208    ));
1209
1210    let api_key_state = ApiKeyState {
1211        key: app_config.api_key.clone(),
1212    };
1213    if api_key_state.key.is_some() {
1214        tracing::info!("API key authentication enabled");
1215    }
1216
1217    Ok(ServeBootstrap {
1218        app_state,
1219        api_key_state,
1220        db_state,
1221        archive_max_days: app_config.archive_max_days,
1222        task_handles,
1223    })
1224}
1225
1226/// Init the tracing subscriber for the HTTP daemon. Idempotent at the
1227/// `tracing-subscriber` level — repeated calls log a warning and no-op
1228/// rather than panic. Split out from `serve()` so test code can opt out.
1229fn init_tracing() {
1230    let _ = tracing_subscriber::fmt()
1231        .with_env_filter(
1232            EnvFilter::from_default_env()
1233                .add_directive("ai_memory=info".parse().unwrap())
1234                .add_directive("tower_http=info".parse().unwrap()),
1235        )
1236        .try_init();
1237}
1238
1239/// Run the HTTP memory daemon. Loads TLS state, builds `AppState`, spawns
1240/// the GC + WAL-checkpoint loops, and binds a listener (TLS or plain HTTP).
1241///
1242/// Behaviour is preserved from the pre-W6 inline `main::serve` body — only
1243/// the structure has changed.
1244#[allow(clippy::too_many_lines)]
1245pub async fn serve(db_path: PathBuf, args: ServeArgs, app_config: &AppConfig) -> Result<()> {
1246    init_tracing();
1247
1248    let bootstrap = bootstrap_serve(&db_path, &args, app_config).await?;
1249
1250    let addr = format!("{}:{}", args.host, args.port);
1251    tracing::info!("database: {}", db_path.display());
1252
1253    // Graceful shutdown with WAL checkpoint
1254    let shutdown_state = bootstrap.db_state.clone();
1255    let shutdown = async move {
1256        let _ = tokio::signal::ctrl_c().await;
1257        tracing::info!("shutting down — checkpointing WAL");
1258        let lock = shutdown_state.lock().await;
1259        let _ = db::checkpoint(&lock.0);
1260    };
1261
1262    // Native TLS (Layer 1): if both --tls-cert and --tls-key are provided,
1263    // bind via axum-server + rustls. Plain HTTP otherwise — backward
1264    // compatible with every prior release. The `requires = …` clap
1265    // attributes prevent the half-configured case.
1266    if let (Some(cert), Some(key)) = (&args.tls_cert, &args.tls_key) {
1267        // rustls 0.23 needs an explicit CryptoProvider; install ring
1268        // before any TLS setup. Idempotent — second install is a
1269        // harmless no-op via ignore.
1270        let _ = rustls::crypto::ring::default_provider().install_default();
1271        // Load TLS / mTLS config BEFORE printing the "listening" log
1272        // so a misconfigured cert / key / allowlist surfaces the error
1273        // first (red-team #248).
1274        let tls_config = if let Some(allowlist_path) = &args.mtls_allowlist {
1275            tracing::info!(
1276                "mTLS enabled — client certs required. Allowlist: {}",
1277                allowlist_path.display()
1278            );
1279            tls::load_mtls_rustls_config(cert, key, allowlist_path).await?
1280        } else {
1281            tracing::warn!(
1282                "TLS enabled but mTLS NOT configured — sync endpoints \
1283                 (/api/v1/sync/push, /api/v1/sync/since) accept any client. \
1284                 Set --mtls-allowlist for production peer-mesh deployments \
1285                 (red-team #231)."
1286            );
1287            tls::load_rustls_config(cert, key).await?
1288        };
1289        let app = build_router(bootstrap.app_state, bootstrap.api_key_state);
1290        tracing::info!("ai-memory listening on https://{addr}");
1291        let socket_addr: std::net::SocketAddr = addr.parse()?;
1292        // axum-server doesn't have a direct graceful-shutdown on the
1293        // TLS builder yet; spawn the signal listener on the Handle
1294        // instead so ctrl_c triggers a graceful shutdown. Window is
1295        // operator-configurable via --shutdown-grace-secs (default 30,
1296        // bumped from 10 in v0.6.0 — red-team #233).
1297        let grace = std::time::Duration::from_secs(args.shutdown_grace_secs);
1298        let handle = axum_server::Handle::new();
1299        let handle_clone = handle.clone();
1300        tokio::spawn(async move {
1301            shutdown.await;
1302            handle_clone.graceful_shutdown(Some(grace));
1303        });
1304        axum_server::bind_rustls(socket_addr, tls_config)
1305            .handle(handle)
1306            .serve(app.into_make_service())
1307            .await?;
1308    } else {
1309        tracing::warn!(
1310            "TLS NOT enabled — sync endpoints (/api/v1/sync/push, \
1311             /api/v1/sync/since) accept any caller over plain HTTP. \
1312             Set --tls-cert + --tls-key + --mtls-allowlist for production \
1313             peer-mesh deployments (red-team #231)."
1314        );
1315        tracing::info!("ai-memory listening on http://{addr}");
1316        // Wave 3 (v0.6.3): the non-TLS path delegates to
1317        // `daemon_runtime::serve_http_with_shutdown_future`, which is the
1318        // same `build_router` + `TcpListener::bind` + `axum::serve` body
1319        // the integration tests drive in-process. Production threads its
1320        // WAL-checkpoint-on-shutdown future in directly so the cleanup
1321        // semantic is preserved verbatim.
1322        serve_http_with_shutdown_future(
1323            &addr,
1324            bootstrap.api_key_state,
1325            bootstrap.app_state,
1326            shutdown,
1327        )
1328        .await?;
1329    }
1330    Ok(())
1331}
1332
1333// ---------------------------------------------------------------------------
// cmd_bench / cmd_migrate (cmd_migrate is compiled only with the `sal` feature)
1335// ---------------------------------------------------------------------------
1336
1337fn cmd_bench(args: &BenchArgs) -> Result<()> {
1338    let iterations = args.iterations.clamp(1, 100_000);
1339    let warmup = args.warmup.min(10_000);
1340    let regression_threshold = args.regression_threshold.clamp(0.0, 1000.0);
1341    // Bench always seeds a disposable in-memory DB so the operator's
1342    // main DB (and disk) are untouched. SQLite's `:memory:` URL and
1343    // WAL-less mode keep the workload bounded by RAM and CPU.
1344    let conn = db::open(Path::new(":memory:"))?;
1345    let config = bench::BenchConfig {
1346        iterations,
1347        warmup,
1348        namespace: bench::BENCH_NAMESPACE.to_string(),
1349    };
1350    let results = bench::run(&conn, &config)?;
1351
1352    let regressions = if let Some(path) = &args.baseline {
1353        let baseline = bench::load_baseline(Path::new(path))?;
1354        Some(bench::compare_against_baseline(
1355            &results,
1356            &baseline,
1357            regression_threshold,
1358        ))
1359    } else {
1360        None
1361    };
1362
1363    if args.json {
1364        println!(
1365            "{}",
1366            serde_json::to_string_pretty(&serde_json::json!({
1367                "iterations": iterations,
1368                "warmup": warmup,
1369                "results": results,
1370                "regressions": regressions,
1371            }))?
1372        );
1373    } else {
1374        print!("{}", bench::render_table(&results));
1375        if let Some(rows) = &regressions {
1376            println!();
1377            print!("{}", bench::render_regression_table(rows));
1378        }
1379    }
1380
1381    if let Some(history_path) = &args.history {
1382        let captured_at = chrono::Utc::now().to_rfc3339();
1383        bench::append_history(history_path, &captured_at, iterations, warmup, &results)?;
1384        let mut stderr = std::io::stderr().lock();
1385        let _ = writeln!(
1386            stderr,
1387            "bench: appended run to history file {}",
1388            history_path.display()
1389        );
1390    }
1391
1392    let budget_failed = results
1393        .iter()
1394        .any(|r| matches!(r.status, bench::Status::Fail));
1395    let regression_failed = regressions
1396        .as_ref()
1397        .is_some_and(|rows| rows.iter().any(|r| r.regressed));
1398
1399    if budget_failed && regression_failed {
1400        anyhow::bail!(
1401            "bench: at least one operation exceeded its p95 budget by >10% AND regressed >{regression_threshold:.1}% vs baseline"
1402        );
1403    }
1404    if budget_failed {
1405        anyhow::bail!("bench: at least one operation exceeded its p95 budget by >10%");
1406    }
1407    if regression_failed {
1408        anyhow::bail!(
1409            "bench: at least one operation regressed >{regression_threshold:.1}% vs baseline"
1410        );
1411    }
1412    Ok(())
1413}
1414
/// `ai-memory migrate`: copy memories from one store URL to another.
///
/// Both stores are opened before any data moves. The migration itself
/// reports per-item problems in `report.errors` rather than failing fast.
/// The report is printed as JSON (`--json`) or as an aligned text summary.
///
/// # Errors
///
/// Fails if either store cannot be opened, if JSON serialization fails,
/// or if the migration recorded one or more errors (after the report has
/// been printed).
#[cfg(feature = "sal")]
async fn cmd_migrate(args: &MigrateArgs) -> Result<()> {
    let source = migrate::open_store(&args.from)
        .await
        .context("open source store")?;
    let target = migrate::open_store(&args.to)
        .await
        .context("open destination store")?;

    let report = migrate::migrate(
        source.as_ref(),
        target.as_ref(),
        args.batch,
        args.namespace.clone(),
        args.dry_run,
    )
    .await;
    let error_count = report.errors.len();

    if args.json {
        let summary = serde_json::json!({
            "from_url": args.from,
            "to_url": args.to,
            "memories_read": report.memories_read,
            "memories_written": report.memories_written,
            "batches": report.batches,
            "errors": report.errors,
            "dry_run": report.dry_run,
        });
        println!("{}", serde_json::to_string_pretty(&summary)?);
    } else {
        println!("migration report");
        println!("  from:              {}", args.from);
        println!("  to:                {}", args.to);
        println!("  memories_read:     {}", report.memories_read);
        println!("  memories_written:  {}", report.memories_written);
        println!("  batches:           {}", report.batches);
        println!("  dry_run:           {}", report.dry_run);
        println!("  errors:            {}", error_count);
        for err in &report.errors {
            println!("    - {err}");
        }
    }

    // Surface a non-zero exit only after the full report has been shown.
    anyhow::ensure!(
        error_count == 0,
        "migration completed with {} error(s)",
        error_count
    );
    Ok(())
}
1460
1461// ---------------------------------------------------------------------------
1462// Pre-W6 helpers — in-process HTTP harness, sync-daemon body, curator-daemon body.
1463// ---------------------------------------------------------------------------
1464
1465/// Run the HTTP daemon (plain HTTP, no TLS) with a programmable shutdown.
1466///
1467/// Mirrors the `else` branch of `serve()` in pre-W6 `main.rs` (the non-TLS
1468/// path). Builds the production `Router` via `build_router`, binds a
1469/// `TcpListener` to `addr`, and runs `axum::serve` with a graceful-shutdown
1470/// future that resolves when `shutdown.notify_one()` is called.
1471///
1472/// Tests pass a known port (pick one via `free_port()` and pass
1473/// `127.0.0.1:<port>`). The function returns when shutdown completes;
1474/// callers can `tokio::spawn` it and `notify` to stop.
1475pub async fn serve_http_with_shutdown(
1476    addr: &str,
1477    api_key_state: ApiKeyState,
1478    app_state: AppState,
1479    shutdown: Arc<Notify>,
1480) -> Result<()> {
1481    serve_http_with_shutdown_future(addr, api_key_state, app_state, async move {
1482        shutdown.notified().await;
1483    })
1484    .await
1485}
1486
1487/// Variant of [`serve_http_with_shutdown`] that takes an arbitrary
1488/// shutdown future. The production `serve()` needs to run a WAL
1489/// checkpoint after the OS signal but before tearing down the listener;
1490/// that cleanup work is awkward to express through a `Notify` alone.
1491/// Accepting a `Future` lets the caller embed any async cleanup into the
1492/// shutdown future itself, while the helper keeps the `build_router` +
1493/// `TcpListener::bind` + `axum::serve` body it already owns.
1494pub async fn serve_http_with_shutdown_future<F>(
1495    addr: &str,
1496    api_key_state: ApiKeyState,
1497    app_state: AppState,
1498    shutdown: F,
1499) -> Result<()>
1500where
1501    F: std::future::Future<Output = ()> + Send + 'static,
1502{
1503    let app = crate::build_router(api_key_state, app_state);
1504    let listener = tokio::net::TcpListener::bind(addr)
1505        .await
1506        .with_context(|| format!("bind {addr}"))?;
1507    axum::serve(listener, app)
1508        .with_graceful_shutdown(shutdown)
1509        .await
1510        .context("axum::serve")?;
1511    Ok(())
1512}
1513
1514/// Run a single sync cycle against one peer — pull then push.
1515///
1516/// Lifted verbatim (modulo path-of-Path-vs-PathBuf) from the pre-W6
1517/// `main.rs::sync_cycle_once` so the integration sync-daemon test can
1518/// drive it without subprocess. The signature matches the private
1519/// main.rs helper 1:1 to keep call sites identical.
1520pub async fn sync_cycle_once(
1521    client: &reqwest::Client,
1522    db_path: &Path,
1523    local_agent_id: &str,
1524    peer_url: &str,
1525    api_key: Option<&str>,
1526    batch_size: usize,
1527) -> Result<()> {
1528    let peer_url = peer_url.trim_end_matches('/');
1529
1530    // --- PULL --------------------------------------------------------
1531    let since = {
1532        let conn = db::open(db_path)?;
1533        db::sync_state_load(&conn, local_agent_id)?
1534            .entries
1535            .get(peer_url)
1536            .cloned()
1537    };
1538
1539    let mut pull_url = format!(
1540        "{peer_url}/api/v1/sync/since?limit={batch_size}&peer={}",
1541        urlencoding_minimal(local_agent_id)
1542    );
1543    if let Some(ref s) = since {
1544        pull_url.push_str("&since=");
1545        pull_url.push_str(&urlencoding_minimal(s));
1546    }
1547
1548    let mut req = client.get(&pull_url).header("x-agent-id", local_agent_id);
1549    if let Some(key) = api_key {
1550        req = req.header("x-api-key", key);
1551    }
1552    let resp = req.send().await?;
1553    if !resp.status().is_success() {
1554        anyhow::bail!("sync-daemon: pull status {}", resp.status());
1555    }
1556    let pulled: SyncSinceResponse = resp.json().await?;
1557    let pull_count = pulled.memories.len();
1558    let latest_pulled = pulled.memories.last().map(|m| m.updated_at.clone());
1559
1560    {
1561        let conn = db::open(db_path)?;
1562        for mem in &pulled.memories {
1563            if crate::validate::validate_memory(mem).is_ok() {
1564                let _ = db::insert_if_newer(&conn, mem);
1565            }
1566        }
1567        if let Some(ref at) = latest_pulled {
1568            db::sync_state_observe(&conn, local_agent_id, peer_url, at)?;
1569        }
1570    }
1571
1572    // --- PUSH --------------------------------------------------------
1573    let last_pushed = {
1574        let conn = db::open(db_path)?;
1575        db::sync_state_last_pushed(&conn, local_agent_id, peer_url)
1576    };
1577    let outgoing = {
1578        let conn = db::open(db_path)?;
1579        db::memories_updated_since(&conn, last_pushed.as_deref(), batch_size)?
1580    };
1581    let push_count = outgoing.len();
1582    let latest_pushed = outgoing.last().map(|m| m.updated_at.clone());
1583
1584    if !outgoing.is_empty() {
1585        let body = serde_json::json!({
1586            "sender_agent_id": local_agent_id,
1587            "sender_clock": { "entries": {} },
1588            "memories": outgoing,
1589            "dry_run": false,
1590        });
1591        let mut req = client
1592            .post(format!("{peer_url}/api/v1/sync/push"))
1593            .header("x-agent-id", local_agent_id)
1594            .header("content-type", "application/json")
1595            .json(&body);
1596        if let Some(key) = api_key {
1597            req = req.header("x-api-key", key);
1598        }
1599        let resp = req.send().await?;
1600        if !resp.status().is_success() {
1601            anyhow::bail!("sync-daemon: push status {}", resp.status());
1602        }
1603        if let Some(at) = latest_pushed {
1604            let conn = db::open(db_path)?;
1605            db::sync_state_record_push(&conn, local_agent_id, peer_url, &at)?;
1606        }
1607    }
1608
1609    tracing::info!("sync-daemon: peer={peer_url} pulled={pull_count} pushed={push_count}");
1610    Ok(())
1611}
1612
1613/// Run the sync-daemon main loop with a programmable shutdown.
1614///
1615/// Mirrors the body of the pre-W6 `cmd_sync_daemon()` in `main.rs`: for
1616/// each cycle, fan out a `JoinSet` across `peers`, then race a sleep
1617/// against the shutdown notify. Returns when the notify fires. The
1618/// integration test can build a one-cycle test by setting `interval_secs=1`
1619/// and notifying after a short tokio sleep.
1620pub async fn run_sync_daemon_with_shutdown(
1621    db_path: PathBuf,
1622    local_agent_id: String,
1623    peers: Vec<String>,
1624    api_key: Option<String>,
1625    interval_secs: u64,
1626    batch_size: usize,
1627    shutdown: Arc<Notify>,
1628) -> Result<()> {
1629    let client = reqwest::Client::builder()
1630        .timeout(Duration::from_secs(30))
1631        .build()?;
1632    run_sync_daemon_with_shutdown_using_client(
1633        client,
1634        db_path,
1635        local_agent_id,
1636        peers,
1637        api_key,
1638        interval_secs,
1639        batch_size,
1640        shutdown,
1641    )
1642    .await
1643}
1644
1645/// Variant of [`run_sync_daemon_with_shutdown`] that takes a caller-built
1646/// `reqwest::Client`. The production `cmd_sync_daemon()` constructs an
1647/// mTLS-aware client (via `build_rustls_client_config`) and threads it
1648/// in here so the helper drives the same loop body the test version
1649/// drives — keeping `daemon_runtime` as the single source of truth for
1650/// the sync-daemon loop while preserving the production TLS contract.
1651pub async fn run_sync_daemon_with_shutdown_using_client(
1652    client: reqwest::Client,
1653    db_path: PathBuf,
1654    local_agent_id: String,
1655    peers: Vec<String>,
1656    api_key: Option<String>,
1657    interval_secs: u64,
1658    batch_size: usize,
1659    shutdown: Arc<Notify>,
1660) -> Result<()> {
1661    let interval = interval_secs.max(1);
1662    let batch_size = batch_size.max(1);
1663
1664    let db_path_owned: Arc<Path> = Arc::from(db_path.as_path());
1665    let local_agent_id_arc: Arc<str> = Arc::from(local_agent_id.as_str());
1666    let api_key_arc: Option<Arc<str>> = api_key.as_deref().map(Arc::from);
1667    let peers_arc: Vec<Arc<str>> = peers.iter().map(|s| Arc::from(s.as_str())).collect();
1668    loop {
1669        let mut set: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();
1670        for peer_url in &peers_arc {
1671            let client = client.clone();
1672            let db_path = db_path_owned.clone();
1673            let local_agent_id = local_agent_id_arc.clone();
1674            let peer_url = peer_url.clone();
1675            let api_key = api_key_arc.clone();
1676            set.spawn(async move {
1677                if let Err(e) = sync_cycle_once(
1678                    &client,
1679                    &db_path,
1680                    &local_agent_id,
1681                    &peer_url,
1682                    api_key.as_deref(),
1683                    batch_size,
1684                )
1685                .await
1686                {
1687                    tracing::warn!("sync-daemon: peer {peer_url} cycle failed: {e}");
1688                }
1689            });
1690        }
1691        while set.join_next().await.is_some() {}
1692
1693        tokio::select! {
1694            () = tokio::time::sleep(Duration::from_secs(interval)) => {}
1695            () = shutdown.notified() => {
1696                tracing::info!("sync-daemon: shutdown signal received");
1697                return Ok(());
1698            }
1699        }
1700    }
1701}
1702
1703/// Run the curator daemon with a programmable shutdown.
1704///
1705/// Mirrors the daemon arm of the pre-W6 `cmd_curator()`. The inner work is
1706/// `curator::run_daemon` (a blocking, tight-loop-with-`AtomicBool` already
1707/// in lib code), which we drive from a `spawn_blocking`. Tests fire the
1708/// `Notify` to set the shutdown bool and the blocking task observes it
1709/// within ~500ms (`run_daemon`'s sleep tick).
1710pub async fn run_curator_daemon_with_shutdown(
1711    db_path: PathBuf,
1712    cfg: crate::curator::CuratorConfig,
1713    shutdown: Arc<Notify>,
1714) -> Result<()> {
1715    let shutdown_flag = Arc::new(AtomicBool::new(false));
1716    let shutdown_flag_for_signal = shutdown_flag.clone();
1717    tokio::spawn(async move {
1718        shutdown.notified().await;
1719        shutdown_flag_for_signal.store(true, Ordering::Relaxed);
1720    });
1721
1722    let llm_arc: Option<Arc<crate::llm::OllamaClient>> = None;
1723    let db_owned = db_path;
1724    tokio::task::spawn_blocking(move || {
1725        crate::curator::run_daemon(db_owned, llm_arc, cfg, shutdown_flag);
1726    })
1727    .await
1728    .map_err(|e| anyhow::anyhow!("curator daemon join: {e}"))?;
1729    Ok(())
1730}
1731
1732/// Curator-daemon loop body, primitive-arg flavour for the binary.
1733///
1734/// `ollama_model` of `None` disables the LLM (matching the pre-tiered
1735/// keyword-only path in `build_curator_llm`).
1736#[allow(clippy::too_many_arguments)]
1737pub async fn run_curator_daemon_with_primitives(
1738    db_path: PathBuf,
1739    interval_secs: u64,
1740    max_ops_per_cycle: usize,
1741    dry_run: bool,
1742    include_namespaces: Vec<String>,
1743    exclude_namespaces: Vec<String>,
1744    ollama_model: Option<String>,
1745    shutdown: Arc<Notify>,
1746) -> Result<()> {
1747    let cfg = crate::curator::CuratorConfig {
1748        interval_secs,
1749        max_ops_per_cycle,
1750        dry_run,
1751        include_namespaces,
1752        exclude_namespaces,
1753    };
1754    let llm: Option<Arc<crate::llm::OllamaClient>> =
1755        ollama_model.and_then(|m| crate::llm::OllamaClient::new(&m).ok().map(Arc::new));
1756
1757    let shutdown_flag = Arc::new(AtomicBool::new(false));
1758    let shutdown_flag_for_signal = shutdown_flag.clone();
1759    tokio::spawn(async move {
1760        shutdown.notified().await;
1761        shutdown_flag_for_signal.store(true, Ordering::Relaxed);
1762    });
1763
1764    tokio::task::spawn_blocking(move || {
1765        crate::curator::run_daemon(db_path, llm, cfg, shutdown_flag);
1766    })
1767    .await
1768    .map_err(|e| anyhow::anyhow!("curator daemon join: {e}"))?;
1769    Ok(())
1770}
1771
1772// -----------------------------------------------------------------------
1773// helpers
1774// -----------------------------------------------------------------------
1775
/// Percent-encode `s` for use as a URL query-component value.
///
/// RFC 3986 "unreserved" bytes (ASCII letters, digits, `-`, `_`, `.`,
/// `~`) pass through unchanged. Every other byte of the UTF-8 encoding
/// becomes `%XX` with uppercase hex. That covers what the sync daemon
/// actually sends: RFC 3339 timestamps (`:`, `+`) and agent ids
/// (`:`, `@`, `/`). Equivalent to the pre-W6 `main.rs::urlencoding_minimal`.
fn urlencoding_minimal(s: &str) -> String {
    use std::fmt::Write as _;
    s.bytes()
        .fold(String::with_capacity(s.len()), |mut encoded, byte| {
            let unreserved =
                byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~');
            if unreserved {
                encoded.push(char::from(byte));
            } else {
                // Writing into a `String` cannot fail.
                let _ = write!(encoded, "%{byte:02X}");
            }
            encoded
        })
}
1795
1796/// Mirrors the pre-W6 `main.rs::SyncSinceResponse` — the fields we
1797/// deserialize from the peer's `/api/v1/sync/since` body. `count` and
1798/// `limit` are present in the wire payload but unused on the receive
1799/// side; allowed to be dead so `clippy::pedantic` doesn't trip.
1800#[derive(serde::Deserialize)]
1801struct SyncSinceResponse {
1802    #[allow(dead_code)]
1803    count: usize,
1804    #[allow(dead_code)]
1805    limit: usize,
1806    memories: Vec<crate::models::Memory>,
1807}
1808
/// Never called: keeps the file-level `Instant` / `Duration` imports used.
///
/// This private function does not re-export anything. It only references
/// `Instant` and `Duration` so their `use` line does not trip
/// `unused_imports`. NOTE(review): presumably `Instant` has no other use
/// in some build configurations; confirm before removing this shim.
#[allow(dead_code)]
fn _imports_in_use(_: Instant, _: Duration) {}
1813
1814// ===========================================================================
1815// Tests
1816// ===========================================================================
1817
1818#[cfg(test)]
1819mod tests {
1820    use super::*;
1821    use crate::cli::test_utils::TestEnv;
1822    use crate::config::ResolvedTtl;
1823    use axum::body::Body;
1824    use axum::http::{Request, StatusCode};
1825    use tower::ServiceExt as _;
1826
1827    // ----- helpers -------------------------------------------------------
1828
1829    fn args_with_db(_db: &Path) -> ServeArgs {
1830        ServeArgs {
1831            host: "127.0.0.1".to_string(),
1832            port: 0,
1833            tls_cert: None,
1834            tls_key: None,
1835            mtls_allowlist: None,
1836            shutdown_grace_secs: 30,
1837            quorum_writes: 0,
1838            quorum_peers: vec![],
1839            quorum_timeout_ms: 2000,
1840            quorum_client_cert: None,
1841            quorum_client_key: None,
1842            quorum_ca_cert: None,
1843            catchup_interval_secs: 0,
1844        }
1845    }
1846
1847    fn keyword_app_state(db_path: &Path) -> AppState {
1848        let conn = db::open(db_path).unwrap();
1849        let db_state: Db = Arc::new(Mutex::new((
1850            conn,
1851            db_path.to_path_buf(),
1852            ResolvedTtl::default(),
1853            true,
1854        )));
1855        AppState {
1856            db: db_state,
1857            embedder: Arc::new(None),
1858            vector_index: Arc::new(Mutex::new(None)),
1859            federation: Arc::new(None),
1860            tier_config: Arc::new(FeatureTier::Keyword.config()),
1861            scoring: Arc::new(crate::config::ResolvedScoring::default()),
1862        }
1863    }
1864
1865    /// Mutex env-var guard. Tests that flip env vars must serialize to
1866    /// avoid clobbering each other; `cargo test --test-threads=2` is the
1867    /// upstream gate but a per-test mutex keeps the tests honest.
1868    fn env_var_lock() -> std::sync::MutexGuard<'static, ()> {
1869        use std::sync::OnceLock;
1870        static LOCK: OnceLock<std::sync::Mutex<()>> = OnceLock::new();
1871        LOCK.get_or_init(|| std::sync::Mutex::new(()))
1872            .lock()
1873            .unwrap_or_else(|e| e.into_inner())
1874    }
1875
1876    // ----- is_write_command ---------------------------------------------
1877
1878    #[test]
1879    fn test_is_write_command_all_variants() {
1880        // Use clap's parser to build every Command variant. This avoids
1881        // having to know each Args struct's required-field set by name —
1882        // we just feed the same argv form an operator would use, and
1883        // assert the predicate returns the right answer.
1884        //
1885        // Writes (post-run WAL checkpoint expected):
1886        let writes: &[&[&str]] = &[
1887            &["ai-memory", "store", "title", "content"],
1888            &["ai-memory", "update", "id123", "--title", "t"],
1889            &["ai-memory", "delete", "id123"],
1890            &["ai-memory", "promote", "id123"],
1891            &["ai-memory", "forget", "pattern"],
1892            &["ai-memory", "link", "a", "b"],
1893            &["ai-memory", "consolidate", "ids"],
1894            &["ai-memory", "resolve", "a", "b"],
1895            &["ai-memory", "sync", "--peer", "/tmp/peer.db"],
1896            &[
1897                "ai-memory",
1898                "sync-daemon",
1899                "--peers",
1900                "http://x",
1901                "--interval-secs",
1902                "60",
1903            ],
1904            &["ai-memory", "import"],
1905            &["ai-memory", "auto-consolidate"],
1906            &["ai-memory", "gc"],
1907        ];
1908        let mut writes_checked = 0;
1909        for argv in writes {
1910            // Skip a variant whose required-field set our argv doesn't
1911            // match (clap will reject it). We still get coverage from the
1912            // variants that parse cleanly, which is the bulk.
1913            if let Ok(cli) = Cli::try_parse_from(*argv) {
1914                assert!(
1915                    is_write_command(&cli.command),
1916                    "expected write for {argv:?}"
1917                );
1918                writes_checked += 1;
1919            }
1920        }
1921        assert!(
1922            writes_checked >= 5,
1923            "expected at least 5 write variants checked, got {writes_checked}"
1924        );
1925
1926        // Reads / no-ops (no checkpoint expected):
1927        let reads: &[&[&str]] = &[
1928            &["ai-memory", "mcp"],
1929            &["ai-memory", "recall", "context"],
1930            &["ai-memory", "search", "query"],
1931            &["ai-memory", "get", "id"],
1932            &["ai-memory", "list"],
1933            &["ai-memory", "stats"],
1934            &["ai-memory", "namespaces"],
1935            &["ai-memory", "export"],
1936            &["ai-memory", "shell"],
1937            &["ai-memory", "man"],
1938            &["ai-memory", "completions", "bash"],
1939            &["ai-memory", "archive", "list"],
1940            &["ai-memory", "agents", "list"],
1941            &["ai-memory", "pending", "list"],
1942            &["ai-memory", "bench"],
1943            &["ai-memory", "serve", "--host", "127.0.0.1", "--port", "0"],
1944        ];
1945        let mut reads_checked = 0;
1946        for argv in reads {
1947            if let Ok(cli) = Cli::try_parse_from(*argv) {
1948                assert!(
1949                    !is_write_command(&cli.command),
1950                    "expected read for {argv:?}"
1951                );
1952                reads_checked += 1;
1953            }
1954        }
1955        assert!(
1956            reads_checked >= 8,
1957            "expected at least 8 read variants checked, got {reads_checked}"
1958        );
1959
1960        // Direct construction of the Args-less variants (10 variants
1961        // covered programmatically by clap above; pin the no-Args ones
1962        // here too for explicitness):
1963        assert!(is_write_command(&Command::Gc));
1964        assert!(!is_write_command(&Command::Stats));
1965        assert!(!is_write_command(&Command::Namespaces));
1966        assert!(!is_write_command(&Command::Export));
1967        assert!(!is_write_command(&Command::Shell));
1968        assert!(!is_write_command(&Command::Man));
1969        assert!(!is_write_command(&Command::Mcp {
1970            tier: "keyword".to_string(),
1971            profile: None,
1972        }));
1973    }
1974
1975    // ----- build_router via lib::build_router ---------------------------
1976
1977    #[tokio::test]
1978    async fn test_router_has_health_endpoint() {
1979        let env = TestEnv::fresh();
1980        let app_state = keyword_app_state(&env.db_path);
1981        let api_key_state = ApiKeyState { key: None };
1982        let router = build_router(app_state, api_key_state);
1983        let resp = router
1984            .oneshot(
1985                Request::builder()
1986                    .method("GET")
1987                    .uri("/api/v1/health")
1988                    .body(Body::empty())
1989                    .unwrap(),
1990            )
1991            .await
1992            .unwrap();
1993        assert_eq!(resp.status(), StatusCode::OK);
1994    }
1995
1996    #[tokio::test]
1997    async fn test_router_has_metrics_at_both_paths() {
1998        let env = TestEnv::fresh();
1999        let app_state = keyword_app_state(&env.db_path);
2000        let api_key_state = ApiKeyState { key: None };
2001        // /metrics
2002        let r1 = build_router(app_state.clone(), api_key_state.clone())
2003            .oneshot(
2004                Request::builder()
2005                    .method("GET")
2006                    .uri("/metrics")
2007                    .body(Body::empty())
2008                    .unwrap(),
2009            )
2010            .await
2011            .unwrap();
2012        assert_eq!(r1.status(), StatusCode::OK);
2013        // /api/v1/metrics
2014        let r2 = build_router(app_state, api_key_state)
2015            .oneshot(
2016                Request::builder()
2017                    .method("GET")
2018                    .uri("/api/v1/metrics")
2019                    .body(Body::empty())
2020                    .unwrap(),
2021            )
2022            .await
2023            .unwrap();
2024        assert_eq!(r2.status(), StatusCode::OK);
2025    }
2026
2027    #[tokio::test]
2028    async fn test_router_lists_all_v1_memory_routes() {
2029        let env = TestEnv::fresh();
2030        let app_state = keyword_app_state(&env.db_path);
2031        let api_key_state = ApiKeyState { key: None };
2032        let router = build_router(app_state, api_key_state);
2033        let resp = router
2034            .oneshot(
2035                Request::builder()
2036                    .method("GET")
2037                    .uri("/api/v1/memories")
2038                    .body(Body::empty())
2039                    .unwrap(),
2040            )
2041            .await
2042            .unwrap();
2043        // Empty DB returns 200 with an empty list — anything non-error
2044        // proves the route is wired in.
2045        assert!(resp.status().is_success(), "got {}", resp.status());
2046    }
2047
2048    #[tokio::test]
2049    async fn test_router_applies_api_key_middleware_when_key_set() {
2050        let env = TestEnv::fresh();
2051        let app_state = keyword_app_state(&env.db_path);
2052        let api_key_state = ApiKeyState {
2053            key: Some("s3cret".to_string()),
2054        };
2055        let router = build_router(app_state, api_key_state);
2056        let resp = router
2057            .oneshot(
2058                Request::builder()
2059                    .method("GET")
2060                    .uri("/api/v1/memories")
2061                    .body(Body::empty())
2062                    .unwrap(),
2063            )
2064            .await
2065            .unwrap();
2066        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
2067    }
2068
2069    #[tokio::test]
2070    async fn test_router_skips_api_key_middleware_when_key_none() {
2071        let env = TestEnv::fresh();
2072        let app_state = keyword_app_state(&env.db_path);
2073        let api_key_state = ApiKeyState { key: None };
2074        let router = build_router(app_state, api_key_state);
2075        let resp = router
2076            .oneshot(
2077                Request::builder()
2078                    .method("GET")
2079                    .uri("/api/v1/memories")
2080                    .body(Body::empty())
2081                    .unwrap(),
2082            )
2083            .await
2084            .unwrap();
2085        assert_eq!(resp.status(), StatusCode::OK);
2086    }
2087
2088    // ----- build_embedder ------------------------------------------------
2089
2090    #[tokio::test]
2091    async fn test_build_embedder_keyword_tier_returns_none() {
2092        let cfg = AppConfig::default();
2093        let emb = build_embedder(FeatureTier::Keyword, &cfg).await;
2094        assert!(emb.is_none());
2095    }
2096
2097    #[tokio::test]
2098    async fn test_build_embedder_load_failure_returns_none() {
2099        // Can't easily induce a load failure without network — skip here.
2100        // Keyword tier covers the None branch; the ERROR-level fallback
2101        // path requires a live HF-hub-style mock, which is out of scope
2102        // for a unit test. The semantic-tier success/failure path is
2103        // exercised under `feature = "test-with-models"` in the
2104        // recall integration tests.
2105        // This test stays as a smoke check — it doesn't attempt to load.
2106    }
2107
2108    // ----- build_vector_index -------------------------------------------
2109
2110    #[test]
2111    fn test_build_vector_index_no_embedder_returns_none() {
2112        let env = TestEnv::fresh();
2113        let conn = db::open(&env.db_path).unwrap();
2114        assert!(build_vector_index(&conn, false).is_none());
2115    }
2116
2117    #[test]
2118    fn test_build_vector_index_empty_db_returns_empty_index() {
2119        let env = TestEnv::fresh();
2120        let conn = db::open(&env.db_path).unwrap();
2121        let idx = build_vector_index(&conn, true);
2122        assert!(
2123            idx.is_some(),
2124            "empty DB with embedder must yield empty index"
2125        );
2126        assert_eq!(idx.unwrap().len(), 0);
2127    }
2128
2129    // ----- spawn_gc_loop / spawn_wal_checkpoint_loop --------------------
2130
2131    #[tokio::test(start_paused = true)]
2132    async fn test_spawn_gc_loop_runs_and_can_be_aborted() {
2133        let env = TestEnv::fresh();
2134        let conn = db::open(&env.db_path).unwrap();
2135        let state: Db = Arc::new(Mutex::new((
2136            conn,
2137            env.db_path.clone(),
2138            ResolvedTtl::default(),
2139            true,
2140        )));
2141        let h = spawn_gc_loop(state, None, Duration::from_secs(60));
2142        // Advance past the first sleep — the loop should now have ticked at
2143        // least once (its sleep arm has resolved). We can't easily observe
2144        // a side effect on an empty DB, so just abort and confirm the
2145        // handle is well-behaved.
2146        tokio::time::advance(Duration::from_secs(61)).await;
2147        // Yield once so the background task can see the tick.
2148        tokio::task::yield_now().await;
2149        h.abort();
2150        // Joining an aborted handle returns `JoinError` with cancelled() == true.
2151        let err = h.await.unwrap_err();
2152        assert!(err.is_cancelled());
2153    }
2154
2155    #[tokio::test(start_paused = true)]
2156    async fn test_spawn_wal_checkpoint_loop_runs_and_can_be_aborted() {
2157        let env = TestEnv::fresh();
2158        let conn = db::open(&env.db_path).unwrap();
2159        let state: Db = Arc::new(Mutex::new((
2160            conn,
2161            env.db_path.clone(),
2162            ResolvedTtl::default(),
2163            true,
2164        )));
2165        let h = spawn_wal_checkpoint_loop(state, Duration::from_secs(60));
2166        // First sleep is interval/2 = 30s. Advance past that + one full
2167        // interval to ensure at least one checkpoint cycle ran.
2168        tokio::time::advance(Duration::from_secs(31)).await;
2169        tokio::task::yield_now().await;
2170        tokio::time::advance(Duration::from_secs(60)).await;
2171        tokio::task::yield_now().await;
2172        h.abort();
2173        let err = h.await.unwrap_err();
2174        assert!(err.is_cancelled());
2175    }
2176
2177    // ----- passphrase_from_file -----------------------------------------
2178
2179    #[test]
2180    fn test_passphrase_strips_trailing_newline() {
2181        let dir = tempfile::tempdir().unwrap();
2182        let p = dir.path().join("pass");
2183        std::fs::write(&p, "secret\n").unwrap();
2184        assert_eq!(passphrase_from_file(&p).unwrap(), "secret");
2185    }
2186
2187    #[test]
2188    fn test_passphrase_strips_trailing_crlf() {
2189        let dir = tempfile::tempdir().unwrap();
2190        let p = dir.path().join("pass");
2191        std::fs::write(&p, "secret\r\n").unwrap();
2192        assert_eq!(passphrase_from_file(&p).unwrap(), "secret");
2193    }
2194
2195    #[test]
2196    fn test_passphrase_empty_file_errors() {
2197        let dir = tempfile::tempdir().unwrap();
2198        let p = dir.path().join("empty");
2199        std::fs::write(&p, "").unwrap();
2200        let err = passphrase_from_file(&p).unwrap_err();
2201        assert!(
2202            err.to_string().contains("empty"),
2203            "expected 'empty' error, got: {err}"
2204        );
2205    }
2206
2207    #[test]
2208    fn test_passphrase_empty_after_trim_errors() {
2209        // File contains only whitespace lines — after trim_end_matches
2210        // it remains "  \t" (internal whitespace preserved). Only "\n"
2211        // / "\r" alone would trigger the empty-after-strip case.
2212        let dir = tempfile::tempdir().unwrap();
2213        let p = dir.path().join("nl-only");
2214        std::fs::write(&p, "\n").unwrap();
2215        let err = passphrase_from_file(&p).unwrap_err();
2216        assert!(err.to_string().contains("empty"));
2217    }
2218
2219    #[test]
2220    fn test_passphrase_nonexistent_file_errors() {
2221        let dir = tempfile::tempdir().unwrap();
2222        let p = dir.path().join("does-not-exist");
2223        let err = passphrase_from_file(&p).unwrap_err();
2224        assert!(
2225            err.to_string().contains("reading passphrase file")
2226                || err.chain().any(|e| e.to_string().contains("No such file"))
2227                || err.chain().any(|e| e.to_string().contains("cannot find")),
2228            "got: {err:#}"
2229        );
2230    }
2231
2232    #[test]
2233    fn test_passphrase_preserves_internal_whitespace() {
2234        let dir = tempfile::tempdir().unwrap();
2235        let p = dir.path().join("pass");
2236        std::fs::write(&p, "my pass phrase\n").unwrap();
2237        assert_eq!(passphrase_from_file(&p).unwrap(), "my pass phrase");
2238    }
2239
2240    // ----- apply_anonymize_default --------------------------------------
2241
2242    #[test]
2243    fn test_anonymize_set_when_config_true_and_env_unset() {
2244        let _g = env_var_lock();
2245        // SAFETY: serialized via env_var_lock.
2246        unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
2247        let mut cfg = AppConfig::default();
2248        cfg.identity = Some(crate::config::IdentityConfig {
2249            anonymize_default: true,
2250        });
2251        apply_anonymize_default(&cfg);
2252        assert_eq!(std::env::var("AI_MEMORY_ANONYMIZE").unwrap(), "1");
2253        // SAFETY: serialized via env_var_lock.
2254        unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
2255    }
2256
2257    #[test]
2258    fn test_anonymize_unchanged_when_env_already_set() {
2259        let _g = env_var_lock();
2260        // SAFETY: serialized via env_var_lock.
2261        unsafe { std::env::set_var("AI_MEMORY_ANONYMIZE", "0") };
2262        let mut cfg = AppConfig::default();
2263        cfg.identity = Some(crate::config::IdentityConfig {
2264            anonymize_default: true,
2265        });
2266        apply_anonymize_default(&cfg);
2267        // Env var is left alone — caller-set value wins.
2268        assert_eq!(std::env::var("AI_MEMORY_ANONYMIZE").unwrap(), "0");
2269        // SAFETY: serialized via env_var_lock.
2270        unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
2271    }
2272
2273    #[test]
2274    fn test_anonymize_unchanged_when_config_false() {
2275        let _g = env_var_lock();
2276        // SAFETY: serialized via env_var_lock.
2277        unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
2278        let cfg = AppConfig::default();
2279        // Default config is false / None for identity.anonymize_default.
2280        apply_anonymize_default(&cfg);
2281        assert!(std::env::var("AI_MEMORY_ANONYMIZE").is_err());
2282    }
2283
2284    // ----- bootstrap_serve ----------------------------------------------
2285
2286    #[tokio::test]
2287    async fn test_bootstrap_serve_keyword_tier_no_embedder() {
2288        let env = TestEnv::fresh();
2289        let mut cfg = AppConfig::default();
2290        cfg.tier = Some("keyword".to_string());
2291        let args = args_with_db(&env.db_path);
2292        let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2293        // Keyword tier => no embedder, no vector index.
2294        assert!(bs.app_state.embedder.is_none());
2295        let vi = bs.app_state.vector_index.lock().await;
2296        assert!(vi.is_none());
2297        // Two task handles spawned (gc + wal_checkpoint).
2298        assert_eq!(bs.task_handles.len(), 2);
2299        // Cleanly abort the spawned tasks so they don't leak across tests.
2300        for h in bs.task_handles {
2301            h.abort();
2302        }
2303    }
2304
2305    #[tokio::test]
2306    async fn test_bootstrap_serve_with_api_key_logs_enabled() {
2307        let env = TestEnv::fresh();
2308        let mut cfg = AppConfig::default();
2309        cfg.tier = Some("keyword".to_string());
2310        cfg.api_key = Some("test-key".to_string());
2311        let args = args_with_db(&env.db_path);
2312        let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2313        assert_eq!(bs.api_key_state.key.as_deref(), Some("test-key"));
2314        for h in bs.task_handles {
2315            h.abort();
2316        }
2317    }
2318
2319    #[tokio::test]
2320    async fn test_bootstrap_serve_federation_disabled_when_quorum_zero() {
2321        let env = TestEnv::fresh();
2322        let mut cfg = AppConfig::default();
2323        cfg.tier = Some("keyword".to_string());
2324        let args = args_with_db(&env.db_path);
2325        let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2326        assert!(bs.app_state.federation.is_none());
2327        for h in bs.task_handles {
2328            h.abort();
2329        }
2330    }
2331
2332    // ----- W12-F: deeper coverage --------------------------------------
2333    //
2334    // Targets the gaps left after W6 + W7 + D6: `bootstrap_serve` variants
2335    // that require a populated DB or federation, the `run` dispatch arms
2336    // not yet exercised, `cmd_bench` end-to-end with a tiny workload,
2337    // `cmd_migrate` (sal feature), `urlencoding_minimal` direct test,
2338    // and the gc / wal-checkpoint loop bodies executing through one
2339    // tick with a measurable side effect.
2340
2341    // ----- bootstrap_serve federation enabled ---------------------------
2342
2343    #[tokio::test]
2344    async fn test_bootstrap_serve_federation_enabled_attaches_config() {
2345        // quorum_writes=1 + one peer → FederationConfig::build returns
2346        // Some, so app_state.federation is wired in. Catchup loop is
2347        // disabled (catchup_interval_secs=0) — the spawn-catchup branch
2348        // is exercised by federation tests; we only verify wiring here.
2349        let env = TestEnv::fresh();
2350        let mut cfg = AppConfig::default();
2351        cfg.tier = Some("keyword".to_string());
2352        let mut args = args_with_db(&env.db_path);
2353        args.quorum_writes = 1;
2354        args.quorum_peers = vec!["http://127.0.0.1:65530".to_string()];
2355        args.quorum_timeout_ms = 100;
2356        args.catchup_interval_secs = 0;
2357        let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2358        assert!(bs.app_state.federation.is_some());
2359        for h in bs.task_handles {
2360            h.abort();
2361        }
2362    }
2363
2364    #[tokio::test]
2365    async fn test_bootstrap_serve_federation_enabled_with_catchup_loop() {
2366        // catchup_interval_secs > 0 → spawn_catchup_loop is invoked.
2367        // We can't directly observe the catchup loop's internal handle
2368        // (federation::spawn_catchup_loop returns a JoinHandle owned
2369        // privately by the federation module), but the side branch
2370        // "catchup loop enabled" runs and the bootstrap completes.
2371        let env = TestEnv::fresh();
2372        let mut cfg = AppConfig::default();
2373        cfg.tier = Some("keyword".to_string());
2374        let mut args = args_with_db(&env.db_path);
2375        args.quorum_writes = 1;
2376        args.quorum_peers = vec!["http://127.0.0.1:65531".to_string()];
2377        args.quorum_timeout_ms = 100;
2378        args.catchup_interval_secs = 3600; // long enough not to fire
2379        let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2380        assert!(bs.app_state.federation.is_some());
2381        for h in bs.task_handles {
2382            h.abort();
2383        }
2384    }
2385
2386    #[tokio::test]
2387    async fn test_bootstrap_serve_federation_invalid_peer_errors() {
2388        // FederationConfig::build returns Err on duplicate peer URLs
2389        // (#341). The bootstrap_serve `.context("federation config")`
2390        // wrap turns it into a daemon-startup error.
2391        let env = TestEnv::fresh();
2392        let mut cfg = AppConfig::default();
2393        cfg.tier = Some("keyword".to_string());
2394        let mut args = args_with_db(&env.db_path);
2395        args.quorum_writes = 1;
2396        args.quorum_peers = vec![
2397            "http://127.0.0.1:65532".to_string(),
2398            "http://127.0.0.1:65532/".to_string(), // duplicate after trim
2399        ];
2400        let res = bootstrap_serve(&env.db_path, &args, &cfg).await;
2401        let err = match res {
2402            Ok(_) => panic!("expected error from duplicate peer URLs"),
2403            Err(e) => e,
2404        };
2405        let s = format!("{err:#}");
2406        assert!(
2407            s.contains("federation") || s.contains("duplicate"),
2408            "got: {s}"
2409        );
2410    }
2411
2412    // ----- build_vector_index populated DB ------------------------------
2413
2414    #[test]
2415    fn test_build_vector_index_populated_db_returns_built_index() {
2416        // When the DB has stored embeddings AND the embedder is present,
2417        // `build_vector_index` should return Some(VectorIndex) populated
2418        // with those embeddings rather than an empty one.
2419        let env = TestEnv::fresh();
2420        let conn = db::open(&env.db_path).unwrap();
2421        // Insert one memory + an embedding via the public db helpers.
2422        let now = chrono::Utc::now().to_rfc3339();
2423        let mem = crate::models::Memory {
2424            id: uuid::Uuid::new_v4().to_string(),
2425            tier: crate::models::Tier::Mid,
2426            namespace: "ns".to_string(),
2427            title: "t".to_string(),
2428            content: "c".to_string(),
2429            tags: vec![],
2430            priority: 5,
2431            confidence: 1.0,
2432            source: "test".to_string(),
2433            access_count: 0,
2434            created_at: now.clone(),
2435            updated_at: now,
2436            last_accessed_at: None,
2437            expires_at: None,
2438            metadata: crate::models::default_metadata(),
2439        };
2440        let id = db::insert(&conn, &mem).unwrap();
2441        db::set_embedding(&conn, &id, &[1.0, 0.0, 0.0]).unwrap();
2442        let idx = build_vector_index(&conn, true).expect("populated index");
2443        assert!(
2444            idx.len() >= 1,
2445            "expected non-empty index, got len={}",
2446            idx.len()
2447        );
2448    }
2449
2450    // ----- gc loop with non-empty side effect ---------------------------
2451    //
2452    // The existing `test_spawn_gc_loop_runs_and_can_be_aborted` only
2453    // covers the empty-DB path where db::gc returns 0. Seeding an expired
2454    // memory and pointing the gc loop at it lets the `Ok(n) if n > 0`
2455    // arm fire.
2456
2457    #[tokio::test(start_paused = true)]
2458    async fn test_spawn_gc_loop_purges_expired_memories() {
2459        let env = TestEnv::fresh();
2460        let conn = db::open(&env.db_path).unwrap();
2461        // Insert an expired memory (expires_at in the past).
2462        let past = (chrono::Utc::now() - chrono::Duration::days(1)).to_rfc3339();
2463        let now = chrono::Utc::now().to_rfc3339();
2464        let mem = crate::models::Memory {
2465            id: uuid::Uuid::new_v4().to_string(),
2466            tier: crate::models::Tier::Short,
2467            namespace: "ns-gc".to_string(),
2468            title: "stale".to_string(),
2469            content: "stale".to_string(),
2470            tags: vec![],
2471            priority: 1,
2472            confidence: 1.0,
2473            source: "test".to_string(),
2474            access_count: 0,
2475            created_at: now.clone(),
2476            updated_at: now,
2477            last_accessed_at: None,
2478            expires_at: Some(past),
2479            metadata: crate::models::default_metadata(),
2480        };
2481        db::insert(&conn, &mem).unwrap();
2482        drop(conn);
2483
2484        let conn = db::open(&env.db_path).unwrap();
2485        let state: Db = Arc::new(Mutex::new((
2486            conn,
2487            env.db_path.clone(),
2488            ResolvedTtl::default(),
2489            true,
2490        )));
2491        // archive_max_days=Some(1) lets the auto_purge_archive arm
2492        // execute too (covers the second match in the loop body).
2493        let h = spawn_gc_loop(state.clone(), Some(1), Duration::from_secs(60));
2494        // Advance past two full intervals to give both branches multiple
2495        // chances to log under paused time.
2496        tokio::time::advance(Duration::from_secs(61)).await;
2497        tokio::task::yield_now().await;
2498        tokio::time::advance(Duration::from_secs(61)).await;
2499        tokio::task::yield_now().await;
2500        h.abort();
2501        let _ = h.await;
2502    }
2503
2504    // ----- WAL checkpoint loop with measurable cycle --------------------
2505
2506    #[tokio::test(start_paused = true)]
2507    async fn test_spawn_wal_checkpoint_loop_runs_multiple_cycles() {
2508        let env = TestEnv::fresh();
2509        let conn = db::open(&env.db_path).unwrap();
2510        let state: Db = Arc::new(Mutex::new((
2511            conn,
2512            env.db_path.clone(),
2513            ResolvedTtl::default(),
2514            true,
2515        )));
2516        let h = spawn_wal_checkpoint_loop(state, Duration::from_secs(2));
2517        // First sleep is 1s (interval/2), then 2s per cycle. Advance
2518        // past three cycles.
2519        for _ in 0..4 {
2520            tokio::time::advance(Duration::from_secs(2)).await;
2521            tokio::task::yield_now().await;
2522        }
2523        h.abort();
2524        let _ = h.await;
2525    }
2526
2527    // ----- urlencoding_minimal -----------------------------------------
2528
2529    #[test]
2530    fn test_urlencoding_minimal_round_trip() {
2531        // Unreserved characters pass through unchanged.
2532        assert_eq!(urlencoding_minimal("abcXYZ-_.~"), "abcXYZ-_.~");
2533        assert_eq!(urlencoding_minimal("0123456789"), "0123456789");
2534        // Reserved / unsafe characters are percent-encoded.
2535        assert_eq!(urlencoding_minimal("a:b"), "a%3Ab");
2536        assert_eq!(urlencoding_minimal("a/b"), "a%2Fb");
2537        assert_eq!(urlencoding_minimal("a@b"), "a%40b");
2538        assert_eq!(urlencoding_minimal("a+b"), "a%2Bb");
2539        assert_eq!(urlencoding_minimal(" "), "%20");
2540        // Empty string is empty.
2541        assert_eq!(urlencoding_minimal(""), "");
2542        // RFC3339 timestamp shape (sync-daemon real input).
2543        assert_eq!(
2544            urlencoding_minimal("2024-01-02T03:04:05+00:00"),
2545            "2024-01-02T03%3A04%3A05%2B00%3A00"
2546        );
2547    }
2548
2549    // ----- run() dispatch for read-only commands ------------------------
2550    //
2551    // Each test parses a CLI argv via clap, hands the resulting `Cli`
2552    // to `daemon_runtime::run`, and asserts the dispatch path returned
2553    // Ok. We don't assert on stdout because run() writes to the
2554    // process stdout directly — what we care about for coverage is
2555    // that the match arm executed and the inner cli handler returned.
2556
2557    fn no_config_env() -> std::sync::MutexGuard<'static, ()> {
2558        // run() reads `AI_MEMORY_NO_CONFIG` indirectly via the AppConfig
2559        // we pass. We don't rely on the env directly here, but holding
2560        // env_var_lock keeps run() tests serialized so they don't race
2561        // on stdout / global subscribers.
2562        env_var_lock()
2563    }
2564
2565    #[tokio::test]
2566    async fn test_run_dispatch_stats_command() {
2567        let _g = no_config_env();
2568        let env = TestEnv::fresh();
2569        let cfg = AppConfig::default();
2570        let cli =
2571            Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "stats"])
2572                .unwrap();
2573        run(cli, &cfg).await.unwrap();
2574    }
2575
2576    #[tokio::test]
2577    async fn test_run_dispatch_namespaces_command() {
2578        let _g = no_config_env();
2579        let env = TestEnv::fresh();
2580        let cfg = AppConfig::default();
2581        let cli = Cli::try_parse_from([
2582            "ai-memory",
2583            "--db",
2584            env.db_path.to_str().unwrap(),
2585            "namespaces",
2586        ])
2587        .unwrap();
2588        run(cli, &cfg).await.unwrap();
2589    }
2590
2591    #[tokio::test]
2592    async fn test_run_dispatch_export_command() {
2593        let _g = no_config_env();
2594        let env = TestEnv::fresh();
2595        let cfg = AppConfig::default();
2596        let cli =
2597            Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "export"])
2598                .unwrap();
2599        run(cli, &cfg).await.unwrap();
2600    }
2601
2602    #[tokio::test]
2603    async fn test_run_dispatch_list_command() {
2604        let _g = no_config_env();
2605        let env = TestEnv::fresh();
2606        let cfg = AppConfig::default();
2607        let cli = Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "list"])
2608            .unwrap();
2609        run(cli, &cfg).await.unwrap();
2610    }
2611
2612    #[tokio::test]
2613    async fn test_run_dispatch_search_command() {
2614        let _g = no_config_env();
2615        let env = TestEnv::fresh();
2616        let cfg = AppConfig::default();
2617        let cli = Cli::try_parse_from([
2618            "ai-memory",
2619            "--db",
2620            env.db_path.to_str().unwrap(),
2621            "search",
2622            "anyq",
2623        ])
2624        .unwrap();
2625        run(cli, &cfg).await.unwrap();
2626    }
2627
2628    #[tokio::test]
2629    async fn test_run_dispatch_archive_list_command() {
2630        let _g = no_config_env();
2631        let env = TestEnv::fresh();
2632        let cfg = AppConfig::default();
2633        let cli = Cli::try_parse_from([
2634            "ai-memory",
2635            "--db",
2636            env.db_path.to_str().unwrap(),
2637            "archive",
2638            "list",
2639        ])
2640        .unwrap();
2641        run(cli, &cfg).await.unwrap();
2642    }
2643
2644    #[tokio::test]
2645    async fn test_run_dispatch_agents_list_command() {
2646        let _g = no_config_env();
2647        let env = TestEnv::fresh();
2648        let cfg = AppConfig::default();
2649        let cli = Cli::try_parse_from([
2650            "ai-memory",
2651            "--db",
2652            env.db_path.to_str().unwrap(),
2653            "agents",
2654            "list",
2655        ])
2656        .unwrap();
2657        run(cli, &cfg).await.unwrap();
2658    }
2659
2660    #[tokio::test]
2661    async fn test_run_dispatch_pending_list_command() {
2662        let _g = no_config_env();
2663        let env = TestEnv::fresh();
2664        let cfg = AppConfig::default();
2665        let cli = Cli::try_parse_from([
2666            "ai-memory",
2667            "--db",
2668            env.db_path.to_str().unwrap(),
2669            "pending",
2670            "list",
2671        ])
2672        .unwrap();
2673        run(cli, &cfg).await.unwrap();
2674    }
2675
2676    #[tokio::test]
2677    async fn test_run_dispatch_completions_command() {
2678        let _g = no_config_env();
2679        let env = TestEnv::fresh();
2680        let cfg = AppConfig::default();
2681        let cli = Cli::try_parse_from([
2682            "ai-memory",
2683            "--db",
2684            env.db_path.to_str().unwrap(),
2685            "completions",
2686            "bash",
2687        ])
2688        .unwrap();
2689        run(cli, &cfg).await.unwrap();
2690    }
2691
2692    #[tokio::test]
2693    async fn test_run_dispatch_man_command() {
2694        let _g = no_config_env();
2695        let env = TestEnv::fresh();
2696        let cfg = AppConfig::default();
2697        let cli = Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "man"])
2698            .unwrap();
2699        run(cli, &cfg).await.unwrap();
2700    }
2701
2702    #[tokio::test]
2703    async fn test_run_dispatch_gc_triggers_post_run_checkpoint() {
2704        // `Gc` is in is_write_command, so result.is_ok() && Some path
2705        // takes the post-run WAL checkpoint branch (lines 638-644).
2706        let _g = no_config_env();
2707        let env = TestEnv::fresh();
2708        let cfg = AppConfig::default();
2709        let cli = Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "gc"])
2710            .unwrap();
2711        run(cli, &cfg).await.unwrap();
2712    }
2713
2714    #[tokio::test]
2715    async fn test_run_dispatch_resolve_command() {
2716        // Seed two memories, then resolve one as superseding the other.
2717        let _g = no_config_env();
2718        let env = TestEnv::fresh();
2719        let id_a = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "old", "old fact");
2720        let id_b = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "new", "new fact");
2721        let cfg = AppConfig::default();
2722        let cli = Cli::try_parse_from([
2723            "ai-memory",
2724            "--db",
2725            env.db_path.to_str().unwrap(),
2726            "resolve",
2727            &id_a,
2728            &id_b,
2729        ])
2730        .unwrap();
2731        run(cli, &cfg).await.unwrap();
2732    }
2733
2734    #[tokio::test]
2735    async fn test_run_dispatch_get_command() {
2736        let _g = no_config_env();
2737        let env = TestEnv::fresh();
2738        let id = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "t", "c");
2739        let cfg = AppConfig::default();
2740        let cli = Cli::try_parse_from([
2741            "ai-memory",
2742            "--db",
2743            env.db_path.to_str().unwrap(),
2744            "get",
2745            &id,
2746        ])
2747        .unwrap();
2748        run(cli, &cfg).await.unwrap();
2749    }
2750
2751    #[tokio::test]
2752    async fn test_run_dispatch_promote_triggers_write_checkpoint() {
2753        // `Promote` is in is_write_command — covers the post-run
2754        // checkpoint branch on a different command.
2755        let _g = no_config_env();
2756        let env = TestEnv::fresh();
2757        let id = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "t", "c");
2758        let cfg = AppConfig::default();
2759        let cli = Cli::try_parse_from([
2760            "ai-memory",
2761            "--db",
2762            env.db_path.to_str().unwrap(),
2763            "promote",
2764            &id,
2765        ])
2766        .unwrap();
2767        run(cli, &cfg).await.unwrap();
2768    }
2769
2770    // ----- run() dispatch for bench (cmd_bench end-to-end) --------------
2771
2772    #[tokio::test]
2773    async fn test_run_dispatch_bench_smoke_runs_one_iteration() {
2774        // iterations=1, warmup=0 keeps the workload tiny. The bench
2775        // body builds an in-memory DB internally — no on-disk side
2776        // effects. Covers cmd_bench from top to bottom on the
2777        // human-readable, no-baseline, no-history path.
2778        let _g = no_config_env();
2779        let env = TestEnv::fresh();
2780        let cfg = AppConfig::default();
2781        let cli = Cli::try_parse_from([
2782            "ai-memory",
2783            "--db",
2784            env.db_path.to_str().unwrap(),
2785            "bench",
2786            "--iterations",
2787            "1",
2788            "--warmup",
2789            "0",
2790        ])
2791        .unwrap();
2792        // Bench may fail the budget on a paused-time iter=1 run; we
2793        // accept either Ok or Err here — coverage is the goal.
2794        let _ = run(cli, &cfg).await;
2795    }
2796
2797    #[tokio::test]
2798    async fn test_run_dispatch_bench_json_with_history() {
2799        // Covers --json branch + --history append branch of cmd_bench.
2800        let _g = no_config_env();
2801        let env = TestEnv::fresh();
2802        let history = env.db_path.with_file_name("hist.jsonl");
2803        let cfg = AppConfig::default();
2804        let cli = Cli::try_parse_from([
2805            "ai-memory",
2806            "--db",
2807            env.db_path.to_str().unwrap(),
2808            "bench",
2809            "--iterations",
2810            "1",
2811            "--warmup",
2812            "0",
2813            "--json",
2814            "--history",
2815            history.to_str().unwrap(),
2816        ])
2817        .unwrap();
2818        let _ = run(cli, &cfg).await;
2819        // History file should now exist with at least one line.
2820        if history.exists() {
2821            let content = std::fs::read_to_string(&history).unwrap();
2822            assert!(content.contains("captured_at") || !content.is_empty());
2823        }
2824    }
2825
2826    // ----- run() dispatch for migrate (sal feature) --------------------
2827
2828    #[cfg(feature = "sal")]
2829    #[tokio::test]
2830    async fn test_run_dispatch_migrate_sqlite_to_sqlite_dry_run() {
2831        // Covers cmd_migrate happy path + dry-run / human-output branch.
2832        let _g = no_config_env();
2833        let src_env = TestEnv::fresh();
2834        let dst_env = TestEnv::fresh();
2835        // Seed source so migrate has work to do.
2836        crate::cli::test_utils::seed_memory(&src_env.db_path, "ns-mig", "t", "c");
2837        let from = format!("sqlite://{}", src_env.db_path.display());
2838        let to = format!("sqlite://{}", dst_env.db_path.display());
2839        let cfg = AppConfig::default();
2840        let cli = Cli::try_parse_from([
2841            "ai-memory",
2842            "--db",
2843            src_env.db_path.to_str().unwrap(),
2844            "migrate",
2845            "--from",
2846            &from,
2847            "--to",
2848            &to,
2849            "--dry-run",
2850        ])
2851        .unwrap();
2852        run(cli, &cfg).await.unwrap();
2853    }
2854
2855    #[cfg(feature = "sal")]
2856    #[tokio::test]
2857    async fn test_run_dispatch_migrate_json_output() {
2858        // Covers cmd_migrate --json branch.
2859        let _g = no_config_env();
2860        let src_env = TestEnv::fresh();
2861        let dst_env = TestEnv::fresh();
2862        crate::cli::test_utils::seed_memory(&src_env.db_path, "ns-mig", "t", "c");
2863        let from = format!("sqlite://{}", src_env.db_path.display());
2864        let to = format!("sqlite://{}", dst_env.db_path.display());
2865        let cfg = AppConfig::default();
2866        let cli = Cli::try_parse_from([
2867            "ai-memory",
2868            "--db",
2869            src_env.db_path.to_str().unwrap(),
2870            "migrate",
2871            "--from",
2872            &from,
2873            "--to",
2874            &to,
2875            "--json",
2876        ])
2877        .unwrap();
2878        run(cli, &cfg).await.unwrap();
2879    }
2880
2881    // ----- run() with passphrase file (covers lines 372-374) ------------
2882
2883    #[tokio::test]
2884    async fn test_run_with_db_passphrase_file_exports_env() {
2885        // Covers the `--db-passphrase-file` branch in run() (lines
2886        // 371-375) which calls passphrase_from_file then sets
2887        // AI_MEMORY_DB_PASSPHRASE in the environment.
2888        let _g = env_var_lock();
2889        // SAFETY: serialized via env_var_lock.
2890        unsafe { std::env::remove_var("AI_MEMORY_DB_PASSPHRASE") };
2891        let env = TestEnv::fresh();
2892        let pass_path = env.db_path.with_file_name("pass");
2893        std::fs::write(&pass_path, "test-passphrase\n").unwrap();
2894        let cfg = AppConfig::default();
2895        let cli = Cli::try_parse_from([
2896            "ai-memory",
2897            "--db",
2898            env.db_path.to_str().unwrap(),
2899            "--db-passphrase-file",
2900            pass_path.to_str().unwrap(),
2901            "stats",
2902        ])
2903        .unwrap();
2904        run(cli, &cfg).await.unwrap();
2905        // Env var is now set.
2906        assert_eq!(
2907            std::env::var("AI_MEMORY_DB_PASSPHRASE").unwrap(),
2908            "test-passphrase"
2909        );
2910        // SAFETY: serialized via env_var_lock.
2911        unsafe { std::env::remove_var("AI_MEMORY_DB_PASSPHRASE") };
2912    }
2913
2914    // ----- init_tracing idempotence ------------------------------------
2915
2916    #[test]
2917    fn test_init_tracing_is_idempotent() {
2918        // Covers init_tracing — second call is a harmless no-op
2919        // (try_init returns Err which we ignore). Calling twice from
2920        // the same test exercises the second-call path on a process
2921        // that may or may not already have a global subscriber.
2922        init_tracing();
2923        init_tracing();
2924    }
2925
2926    // ----- serve_http_with_shutdown_future smoke -----------------------
2927    //
2928    // The non-TLS branch of `serve()` delegates here; cover the body
2929    // by binding to a free port, requesting /health, then shutting
2930    // down. This also covers the production code path that
2931    // `daemon_runtime::serve()` uses for the non-TLS case.
2932
2933    #[tokio::test]
2934    async fn test_serve_http_with_shutdown_future_serves_then_stops() {
2935        let env = TestEnv::fresh();
2936        let app_state = keyword_app_state(&env.db_path);
2937        let api_key_state = ApiKeyState { key: None };
2938        // Pick a free port via a transient bind.
2939        let port = {
2940            let l = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
2941            let p = l.local_addr().unwrap().port();
2942            drop(l);
2943            p
2944        };
2945        let addr = format!("127.0.0.1:{port}");
2946        let shutdown = Arc::new(Notify::new());
2947        let shutdown_clone = shutdown.clone();
2948        let handle = tokio::spawn(async move {
2949            serve_http_with_shutdown_future(&addr, api_key_state, app_state, async move {
2950                shutdown_clone.notified().await;
2951            })
2952            .await
2953        });
2954        // Give the server a moment to bind, then poke /health.
2955        for _ in 0..40 {
2956            if let Ok(client) = reqwest::Client::builder()
2957                .timeout(Duration::from_millis(200))
2958                .build()
2959                && client
2960                    .get(format!("http://127.0.0.1:{port}/api/v1/health"))
2961                    .send()
2962                    .await
2963                    .is_ok()
2964            {
2965                break;
2966            }
2967            tokio::time::sleep(Duration::from_millis(50)).await;
2968        }
2969        shutdown.notify_one();
2970        let res = handle.await.unwrap();
2971        assert!(res.is_ok(), "serve future returned: {res:?}");
2972    }
2973
2974    // ----- bind error surfacing ----------------------------------------
2975
2976    #[tokio::test]
2977    async fn test_serve_http_with_shutdown_future_bind_failure_errors() {
2978        // An unbindable address (port 1 on Linux/macOS without root)
2979        // should return an Err with the bind context. This covers the
2980        // `with_context` path on the TcpListener::bind line.
2981        let env = TestEnv::fresh();
2982        let app_state = keyword_app_state(&env.db_path);
2983        let api_key_state = ApiKeyState { key: None };
2984        // 0.0.0.0:0 succeeds; we want a guaranteed failure. Bind to
2985        // port 1 which requires privileged perms — except on macOS in
2986        // some configs that may succeed. Use a clearly invalid address
2987        // form instead to force a bind-time error.
2988        let res = serve_http_with_shutdown_future(
2989            "definitely-not-an-address:99999",
2990            api_key_state,
2991            app_state,
2992            async {},
2993        )
2994        .await;
2995        assert!(res.is_err(), "expected bind error, got: {res:?}");
2996    }
2997}