Skip to main content

ai_memory/handlers/
transport.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4use axum::{
5    Json,
6    extract::{FromRef, FromRequest, Request, State, rejection::JsonRejection},
7    http::StatusCode,
8    middleware::Next,
9    response::{IntoResponse, Response},
10};
11use serde::de::DeserializeOwned;
12use serde_json::json;
13use std::sync::Arc;
14use tokio::sync::{Mutex, RwLock};
15
16use crate::config::{ResolvedTtl, TierConfig};
17use crate::db;
18use crate::embeddings::{Embed, Embedder};
19use crate::hnsw::VectorIndex;
20use crate::profile::Family;
21
/// Shared handle to the daemon's database connection and its companion state.
///
/// Tuple fields, in order: `.0` the `rusqlite::Connection`, `.1` the database
/// path, `.2` the [`ResolvedTtl`], `.3` the SAL-enabled flag. The tuple sits
/// behind a `tokio::sync::Mutex`, so async code locks it with `.lock().await`
/// while [`db_op`] locks it with `blocking_lock()` from a blocking worker.
pub type Db = Arc<Mutex<(rusqlite::Connection, std::path::PathBuf, ResolvedTtl, bool)>>;
23
24/// v0.7.0 PERF-1 (FX-3) — `spawn_blocking` helper for HTTP handler DB I/O.
25///
26/// Wraps a synchronous `rusqlite` operation in `tokio::task::spawn_blocking`
27/// so it runs on the blocking pool instead of pinning a tokio worker thread.
28/// Pre-fix every HTTP handler held the `tokio::sync::Mutex` AND executed
29/// synchronous `rusqlite` calls (FTS5 scans, multi-row UPDATEs on touch,
30/// trigger fires) on the tokio worker that picked up the request. With
31/// the default multi-threaded runtime (`#tokio = ncpu`), N concurrent
32/// recalls serialised completely on the single-connection mutex AND stole
33/// worker slots from non-DB tasks (federation receive, webhook dispatch,
34/// metrics scrape). p99 floor under N concurrent recalls was
35/// `N × wall_time(FTS+touch)` rather than `max(wall_time)`.
36///
37/// Helper contract:
38///
39/// - Takes a `Db` clone (the `Arc<Mutex<...>>` extractor handle) and an
40///   `FnOnce(&mut (Connection, PathBuf, ResolvedTtl, bool)) -> T` closure
41///   so callers can access every field the existing pattern reads
42///   (`lock.0` = Connection, `lock.1` = DB path, `lock.2` = `ResolvedTtl`,
43///   `lock.3` = SAL-enabled flag).
44/// - Uses `Mutex::blocking_lock` inside `spawn_blocking` — the
45///   `tokio::sync::Mutex` API explicitly supports this from a
46///   spawn_blocking worker; the worker is OFF the tokio runtime threads
47///   so no await-deadlock risk.
48/// - Returns `T` directly. Join errors from `spawn_blocking` (panic
49///   propagation, runtime shutdown) surface via `expect`; a panic
50///   inside the closure unwinds the blocking worker and the join error
51///   is logged before the handler aborts with a 500 — the caller's
52///   `Result<T, _>` is the right shape to surface domain errors. Join
53///   failures are runtime bugs, not request-shape failures.
54///
55/// The helper deliberately does NOT take `headers: HeaderMap` /
56/// `caller: &str` etc. — every closure already captures whatever extra
57/// context it needs by move. The helper is the narrow waist: lock +
58/// run + drop, no business logic.
59///
60/// Limit-of-applicability: closures that hold `await` points inside
61/// CANNOT use this helper (the `spawn_blocking` worker is a sync
62/// context). Handlers that interleave SQL with vector-index
63/// `Mutex::lock().await` or federation `broadcast_*().await` must
64/// either restructure to drop the DB lock first (the common case),
65/// or keep the legacy `.lock().await` pattern when the interleave is
66/// load-bearing (e.g. `recall` keeps the lock across `decorate_memory`
67/// re-queries). The recall + create hot paths carry follow-up
68/// trackers (the in-tree `#982` docstring at `src/handlers/recall.rs:485`
69/// already calls out the deeper restructure).
70///
71/// Type parameter `T` requires `Send + 'static` because the closure's
72/// return value crosses the spawn_blocking boundary back to the tokio
73/// runtime.
74pub async fn db_op<T, F>(db: Db, op: F) -> T
75where
76    T: Send + 'static,
77    F: FnOnce(&mut (rusqlite::Connection, std::path::PathBuf, ResolvedTtl, bool)) -> T
78        + Send
79        + 'static,
80{
81    tokio::task::spawn_blocking(move || {
82        let mut guard = db.blocking_lock();
83        op(&mut guard)
84    })
85    .await
86    .expect("PERF-1: db_op spawn_blocking worker panicked or runtime shut down")
87}
88
/// v0.7.0 Wave-3 — which storage backend this daemon declared at startup.
///
/// Reported in the `/capabilities` payload so operators and clients can
/// tell whether the daemon runs on the bundled SQLite path (the historical
/// default) or on the SAL-routed Postgres adapter.
///
/// The value is resolved exactly once, at `serve()` startup, from the
/// `--store-url` flag when present and from the `--db` path otherwise; it
/// does not change for the lifetime of the process.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageBackend {
    /// Bundled SQLite — the production default. Handlers work on the `Db`
    /// connection directly, and the SAL handle in `AppState` wraps that
    /// same connection for parity tests and the v0.7.0 Wave-3
    /// trait-routed code paths.
    Sqlite,
    /// Postgres — chosen when `serve --store-url postgres://...` is given
    /// and the binary was built with `--features sal-postgres`. Handlers
    /// already migrated to the [`crate::store::MemoryStore`] trait run
    /// against the `PostgresStore` adapter; handlers not yet migrated
    /// answer `501 Not Implemented` with a clear `storage_backend` hint so
    /// operators can plan the rollout.
    Postgres,
}

impl StorageBackend {
    /// Returns the stable lowercase tag used in log lines, in the
    /// `/capabilities` `storage_backend` field, and in the
    /// `ai-memory doctor` report.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            StorageBackend::Postgres => "postgres",
            StorageBackend::Sqlite => "sqlite",
        }
    }
}
126
/// Composite daemon state shared by every Axum handler (issue #219 / v0.7 prep).
///
/// The router used to hold only [`Db`]. Closing the HTTP embedding gap
/// (semantic recall silently missed HTTP-stored memories because the daemon
/// never generated embeddings) requires the embedder and the in-memory HNSW
/// index to be reachable from write handlers. `AppState` bundles them, and a
/// `FromRef` impl keeps every existing `State<Db>` handler working unchanged —
/// only the write paths opt into `State<AppState>` to pick up the embedder
/// and vector index.
///
/// The struct is `Clone`; shared members are held behind `Arc` so that
/// cloning it per request stays cheap.
#[derive(Clone)]
pub struct AppState {
    /// Shared database handle (connection, path, TTL config, SAL flag).
    pub db: Db,
    /// Embedder used to vectorise content and queries; the inner `Option`
    /// is `None` when no embedder is loaded.
    pub embedder: Arc<Option<Embedder>>,
    /// In-memory vector index behind an async mutex; the inner `Option`
    /// is `None` when no index is available.
    pub vector_index: Arc<Mutex<Option<VectorIndex>>>,
    /// v0.7 federation config — `Some` when `--quorum-writes N` +
    /// `--quorum-peers` are configured at serve time. When `Some`, writes
    /// fan out to peers via `FederationConfig::broadcast_store_quorum`.
    pub federation: Arc<Option<crate::federation::FederationConfig>>,
    /// Resolved [`TierConfig`] for this daemon. Exposed so HTTP endpoints
    /// that mirror MCP tools (notably `/capabilities`) can reuse the
    /// MCP-side report builder without re-parsing config.
    pub tier_config: Arc<TierConfig>,
    /// v0.6.2 (S18): resolved recall scoring config — tier half-lives and
    /// the legacy-scoring toggle. Exposed so `recall_memories_get` /
    /// `recall_memories_post` can call `db::recall_hybrid` (semantic blend)
    /// when the embedder is loaded, mirroring how the MCP `memory_recall`
    /// handler already wires it (`crate::mcp::handle_recall`). Before this,
    /// HTTP recall was keyword-only regardless of embedder availability —
    /// scenario-18 surfaced the gap.
    pub scoring: Arc<crate::config::ResolvedScoring>,
    /// v0.7.0 A5 — resolved tool [`crate::profile::Profile`] for this
    /// daemon. The HTTP `/capabilities` endpoint needs it to compute the v3
    /// `summary` / `to_describe_to_user` / `tools[].callable_now` fields,
    /// which reflect the profile the running server actually advertises in
    /// `tools/list`. Mirrors the MCP-dispatch threading at
    /// `crate::mcp::handle_search`.
    pub profile: Arc<crate::profile::Profile>,
    /// v0.7.0 A5 — resolved [`crate::config::McpConfig`] for this daemon.
    /// Carries the optional `[mcp.allowlist]` table that v3's per-tool
    /// `callable_now` and top-level `agent_permitted_families` honor.
    /// Stored as `Arc<Option<...>>` rather than `Option<Arc<...>>` so
    /// cloning the `AppState` stays cheap; an absent allowlist (the v0.6.4
    /// default) shows up as `Arc<None>`.
    pub mcp_config: Arc<Option<crate::config::McpConfig>>,
    /// v0.7 Track H — H2 outbound link signing. The keypair loaded at
    /// daemon startup, or `None` when the operator has not generated one
    /// yet. When `Some`, every `db::create_link_signed` call from HTTP
    /// handlers signs the link with this key and stamps
    /// `attest_level = "self_signed"`; when `None`, links are stored
    /// unsigned, preserving v0.6.4 behaviour for unmigrated deployments.
    /// H3 will reuse this handle for outbound writes that need to carry
    /// the same signing identity.
    pub active_keypair: Arc<Option<crate::identity::keypair::AgentKeypair>>,
    /// v0.7.0 B3 — pre-computed embeddings for each [`Family`] descriptor.
    /// Filled asynchronously after boot from [`family_descriptors`] and
    /// reused by B2's `memory_smart_load(intent)` to do a fast cosine match
    /// between an intent string and the eight family descriptors.
    ///
    /// **CI fix (v0.7 B3-fix)**: held behind `RwLock<Option<…>>` and filled
    /// by a detached `tokio::spawn` task launched from `bootstrap_serve`
    /// rather than synchronously on the serve startup path. The original
    /// synchronous precompute would block HTTP `/health` past the
    /// integration suite's 5 s `wait_for_health` budget on CI runners
    /// without a pre-warmed `hf-hub` model cache.
    ///
    /// State encoding: `None` means "not yet populated"; an empty inner
    /// `Vec` means "embedder unavailable, will never be populated". Either
    /// case makes `best_family_match` return `None`, and B2's smart loader
    /// degrades to its non-embedding match path.
    pub family_embeddings: Arc<RwLock<Option<Vec<(Family, Vec<f32>)>>>>,

    // ----- v0.7.0 Wave-3 — adapter selection ------------------------
    /// v0.7.0 Wave-3 — declared storage backend for this daemon.
    ///
    /// Resolved once from `--store-url` (or the `--db` fallback) at
    /// `serve()` startup; stable across the process lifetime. Surfaced
    /// through `/api/v1/capabilities.storage_backend` and consulted by
    /// trait-eligible handlers to decide whether to dispatch through
    /// `app.store` or fall back to the legacy `db::*` free-function code
    /// path.
    pub storage_backend: StorageBackend,
    /// v0.7.0 Wave-3 — polymorphic [`MemoryStore`] handle.
    ///
    /// Always populated when the field exists. For
    /// [`StorageBackend::Sqlite`] it wraps a `SqliteStore` opened against
    /// the same on-disk database as the [`AppState::db`] connection (the
    /// two views see the same rows). For [`StorageBackend::Postgres`] it
    /// wraps a `PostgresStore` connected to the operator-supplied URL.
    ///
    /// The field is compiled in only under `--features sal`. Standard
    /// builds keep the legacy `db::*` free-function path verbatim.
    ///
    /// [`MemoryStore`]: crate::store::MemoryStore
    #[cfg(feature = "sal")]
    pub store: Arc<dyn crate::store::MemoryStore>,

    // ----- v0.7.0 L5 — LLM client for autonomy hooks ----------------
    /// v0.7.0 L5 — optional LLM client used by the HTTP `create_memory`
    /// handler to fire the `auto_tag` autonomy hook on stores, matching the
    /// behaviour the MCP `handle_store` path has provided since v0.6.0.0
    /// (the auto-tag block in `crate::mcp::handle_store`). `None` when the
    /// daemon's configured `FeatureTier` does not request an LLM (keyword /
    /// semantic) or when Ollama is unreachable at startup; in either case
    /// the `create_memory` handler silently skips the hook so the store
    /// still succeeds.
    pub llm: Arc<Option<crate::llm::OllamaClient>>,

    /// v0.7.0 L15 — dedicated model id for `auto_tag` (and other short
    /// structured-output LLM calls). When `Some`, `maybe_auto_tag` passes
    /// the value as `OllamaClient::auto_tag(.., Some(model))` so the call
    /// hits a fast tag-friendly model (default config recommends
    /// `gemma3:4b`, ~0.7s p50) instead of the reasoning-tier `llm_model`
    /// (Gemma 4 thinking can take 15s to emit a 5-tag list). When `None`
    /// the call falls back to the client's configured model. Wrapped in
    /// `Arc<Option<...>>` so cloning the `AppState` stays cheap and the
    /// absent case (the v0.7.0.0 default) is a cheap `Arc<None>`.
    pub auto_tag_model: Arc<Option<String>>,

    /// v0.7.0 H8 (round-2) — per-LLM-call wall-clock timeout. Wraps every
    /// `tokio::task::spawn_blocking` invocation of an Ollama call
    /// (`auto_tag`, `expand_query`, `summarize_memories`, ...) in
    /// `tokio::time::timeout`. On timeout the handler logs at `warn` and
    /// continues on the LLM-absent fallback path (which already exists per
    /// L5/L7). Resolved at boot from
    /// `AppConfig::effective_llm_call_timeout_secs` (default 30s).
    pub llm_call_timeout: std::time::Duration,

    /// v0.7.0 H5 (round-2) — bounded in-memory LRU keyed on
    /// `(link_id, signature, verification_nonce)`. Consulted by
    /// `verify_link_handler` to reject exact-repeat verify requests with
    /// 409 Conflict. See [`crate::identity::replay::ReplayCache`] for the
    /// memory bound (~512 KB at the 10 000-entry capacity) and the threat
    /// model.
    pub replay_cache: Arc<crate::identity::replay::ReplayCache>,

    /// v0.7.0 H5 (round-2) — strict mode for the verify replay guard.
    /// When `true`, every `POST /api/v1/links/verify` request body MUST
    /// include a `verification_nonce` field; missing or empty nonces
    /// produce 400 Bad Request. The default `false` keeps the v0.6.x
    /// verify-anytime semantics and logs a deprecation WARN on the
    /// missing-nonce path instead. Operators opt into strict mode via
    /// `[verify] require_nonce = true` in `config.toml`.
    pub verify_require_nonce: bool,

    /// v0.7.0 #922 — per-peer LRU keyed on `(peer_id, X-Memory-Nonce)`.
    pub federation_nonce_cache: Arc<crate::identity::replay::FederationNonceCache>,

    /// v0.7.0 (issue #519) — resolved `autonomous_hooks` flag (from
    /// `config.toml` + the `AI_MEMORY_AUTONOMOUS_HOOKS` env var). Consulted
    /// by the HTTP `create_memory` path's `maybe_detect_conflicts` helper
    /// as the global default when a request omits the per-call
    /// `detect_conflicts` override. `false` preserves the v0.6.x
    /// post-hoc-only contradiction surface.
    pub autonomous_hooks: bool,

    /// v0.7.0 (issue #518) — resolved `[agents.defaults.recall_scope]`
    /// block. `Some` carries the session-default namespace / since / tier /
    /// limit filters spliced into recall requests that pass
    /// `session_default=true` and omit one or more filter fields. `None`
    /// (the default for existing single-tenant deployments) preserves
    /// v0.6.x recall semantics — every cross-session recall must spell its
    /// filters out explicitly.
    ///
    /// Wrapped in `Arc<Option<...>>` so cloning the `AppState` stays cheap
    /// and the absent case (every deployment that has not opted in yet) is
    /// a single `Arc<None>`.
    pub recall_scope: Arc<Option<crate::config::RecallScope>>,

    /// v0.7.0 Policy-Engine Item 3 (2026-05-14) — deferred-audit queue
    /// handle. Captures every `governance.refusal` event from the storage
    /// `GOVERNANCE_PRE_WRITE` hook and submits it to a background drainer
    /// task that chain-logs the refusal to `signed_events` on a FRESH
    /// `Connection` (separate from the substrate writer's connection —
    /// this closes the re-entrant-deadlock gap for which the old
    /// `_no_audit` variant traded away the chain-log property).
    ///
    /// The queue is `Clone` (cheap `Arc` semantics over an mpsc sender) so
    /// each callsite (storage hook closure, future MCP `governance_state`
    /// tool, future Prometheus scrape) can hold its own producer handle
    /// without contention.
    ///
    /// Always present on `bootstrap_serve` — the drainer is spawned
    /// unconditionally before the storage hook installs. The `Option<...>`
    /// shape lets tests inject `None` in scaffolds that do not need the
    /// audit chain.
    pub deferred_audit_queue: Arc<Option<crate::governance::deferred_audit::DeferredAuditQueue>>,

    /// v0.7.0 SHIP cluster (#946 / #957 / #960 / #961, 2026-05-20) —
    /// resolved `[admin].agent_ids` allowlist from `config.toml`. The
    /// shared admin-role gate (see [`crate::handlers::admin_role`])
    /// consults this list before any admin-class endpoint
    /// (`/api/v1/export`, `/api/v1/agents`, `/api/v1/stats`, the
    /// `/api/v1/quota/status` list path) honors the request.
    ///
    /// Default-empty closes those endpoints to all callers, matching the
    /// `pm-v3` safe-by-default posture. Operators opt callers in via
    /// `[admin] agent_ids = [...]` in `config.toml`.
    ///
    /// `Arc<Vec<String>>` rather than `Arc<HashSet<String>>` so the shape
    /// stays cheap to clone (per the `AppState` contract); the list is
    /// short by design — admin-role allowlists are operator-curated,
    /// typically <10 entries.
    pub admin_agent_ids: Arc<Vec<String>>,

    /// v0.7.0 #991 — per-instance enabled-rule cache. Owned by this
    /// `AppState`; cloned by reference (`Arc<RuleCache>`) into the
    /// substrate `GOVERNANCE_PRE_WRITE` storage hook closure and the
    /// `wire_check::GOVERNANCE_PRE_ACTION` action hook closure, so every
    /// governance read on the hot write path (and every action wire-point
    /// in the daemon) shares ONE cache for the lifetime of this daemon.
    /// The cache is per-instance (not a process-wide singleton) so
    /// multi-`AppState` test fixtures do not cross-pollute — the same
    /// isolation contract that the post-#990 revert restored in the test
    /// suite. See `governance/rule_cache.rs` for the design rationale and
    /// the cross-instance isolation regression pinning.
    pub rule_cache: Arc<crate::governance::rule_cache::RuleCache>,

    /// v0.7.x (issue #1168) — operator-resolved LLM / embeddings /
    /// reranker triple. Threaded into the HTTP `/api/v1/capabilities`
    /// handler so the wire-reported `models.*` block mirrors the running
    /// daemon's actual model wiring (matching the boot banner) instead of
    /// the compiled tier preset. Built once at `bootstrap_serve` via
    /// [`crate::config::AppConfig::resolve_models`] and reused for every
    /// request — the resolver folds CLI / env / `[llm]` / legacy /
    /// compiled-default precedence, so the resulting triple is
    /// process-stable.
    pub resolved_models: Arc<crate::config::ResolvedModels>,

    /// v0.7.x (issue #1174 follow-up #1192 / #1196) — cross-surface
    /// [`crate::runtime_context::RuntimeContext`] handle. Holds the
    /// process-wide K7 HMAC override, I1 decompression cap, V-4 audit
    /// chain state, session-recall tracker, and X25519 keypair cache —
    /// i.e. every substrate static that the HTTP daemon, MCP stdio binary,
    /// and CLI need to observe identically.
    ///
    /// Always populated. Cloned by reference (`Arc::clone`), so storing it
    /// on `AppState` is cheap and the wire / chain / cache semantics
    /// across surfaces stay byte-equivalent: every accessor
    /// (`crate::config::active_hooks_hmac_secret`, `crate::audit::emit`,
    /// `crate::reranker::global_session_recall_tracker`,
    /// `crate::encryption::get_or_create_keypair`) delegates to the same
    /// `RuntimeContext::global()` singleton.
    pub runtime: Arc<crate::runtime_context::RuntimeContext>,

    /// Operator-resolved per-request page-size / bulk-materialization cap
    /// (the `[limits].max_page_size` knob, env `AI_MEMORY_MAX_PAGE_SIZE`).
    /// Bounds how many rows a single list / search response page and a
    /// single bulk-create / federation-sync request may materialize in
    /// memory at once — it is NOT a rate limit. Resolved once at
    /// `bootstrap_serve` from [`crate::config::AppConfig::resolve_limits`];
    /// falls back to the compiled [`MAX_BULK_SIZE`] default when unset.
    /// Operators with genuinely large per-request payloads can raise this
    /// knob, but the correct tool for large datasets is pagination
    /// (`offset` / `since`), not an unbounded page size — a single
    /// unbounded request would materialize the whole result set in RAM.
    pub max_page_size: usize,
}
390
391/// v0.7.0 B3 — canonical 1-2 sentence English descriptors for each
392/// [`Family`]. Used at boot to pre-compute embeddings that B2's
393/// `memory_smart_load(intent)` cosine-matches against an intent
394/// string. Order tracks [`Family::all()`] (declaration order) so the
395/// returned slice is stable across releases. Wording is chosen to
396/// reflect the *user-facing* purpose of each family, not its tool
397/// names — the embedder needs natural-language signal, not enum
398/// labels, for the cosine match to be meaningful.
399#[must_use]
400pub fn family_descriptors() -> &'static [(Family, &'static str)] {
401    &[
402        (
403            Family::Core,
404            "Store, recall, list, get, and search memories. The basic \
405             read and write operations for saving facts and looking \
406             them up later.",
407        ),
408        (
409            Family::Lifecycle,
410            "Update, delete, forget, garbage-collect, and promote \
411             memories. Operations that change a memory's state, tier, \
412             or visibility over time.",
413        ),
414        (
415            Family::Graph,
416            "Knowledge-graph queries, timelines, links between \
417             memories, entity registration, taxonomy lookup, and \
418             replay or verification of stored relationships.",
419        ),
420        (
421            Family::Governance,
422            "Approval workflows, namespace standards, and \
423             subscriptions. Operations that gate or shape what other \
424             agents are allowed to write or see.",
425        ),
426        (
427            Family::Power,
428            "Advanced reasoning helpers: consolidate duplicates, \
429             detect contradictions, check for duplicates, auto-tag, \
430             expand a query, and inspect the inbox.",
431        ),
432        (
433            Family::Meta,
434            "Server capabilities, agent registration and listing, \
435             session bootstrap, and aggregate stats. Operations that \
436             describe the memory system itself rather than its \
437             contents.",
438        ),
439        (
440            Family::Archive,
441            "List, restore, purge, and report stats on archived \
442             memories. The cold-storage tier where forgotten or aged-out \
443             memories live until they are pruned.",
444        ),
445        (
446            Family::Other,
447            "Subscription listing and out-of-band notifications. \
448             Auxiliary operations that don't fit the other families.",
449        ),
450    ]
451}
452
453impl AppState {
454    /// v0.7.0 B3 — pre-compute the family-descriptor embedding cache.
455    /// Iterates the eight descriptors from [`family_descriptors`] and
456    /// runs each through the embedder once. Returns an empty vector
457    /// when the embedder is `None` (keyword-only deployments) or when
458    /// any single descriptor fails to embed — the latter is logged at
459    /// `warn` and the cache is still returned empty so boot stays
460    /// fault-tolerant. The returned vector is intended to be wrapped
461    /// in `Arc::new(...)` and stored in [`AppState::family_embeddings`].
462    #[must_use]
463    pub fn precompute_family_embeddings(embedder: Option<&dyn Embed>) -> Vec<(Family, Vec<f32>)> {
464        let Some(embedder) = embedder else {
465            return Vec::new();
466        };
467        let descriptors = family_descriptors();
468        let mut out: Vec<(Family, Vec<f32>)> = Vec::with_capacity(descriptors.len());
469        for (family, descriptor) in descriptors {
470            match embedder.embed(descriptor) {
471                Ok(v) => out.push((*family, v)),
472                Err(e) => {
473                    tracing::warn!(
474                        family = family.name(),
475                        error = %e,
476                        "B3: failed to embed family descriptor; \
477                         family_embeddings will be empty",
478                    );
479                    return Vec::new();
480                }
481            }
482        }
483        out
484    }
485
486    /// v0.7.0 B3 — embed `intent` and return the family-descriptor
487    /// with the highest cosine similarity, paired with its score.
488    /// Returns `None` if the cache is not yet populated (the
489    /// asynchronous precompute task has not finished, or the
490    /// embedder is unavailable so the cache will never populate) or
491    /// if the embedder is unavailable now. This is the entry point
492    /// B2's `memory_smart_load(intent)` uses to pick which family to
493    /// load.
494    ///
495    /// Uses `try_read()` so a slow concurrent writer (the boot-time
496    /// precompute task still finalising its write) cannot block the
497    /// caller — on contention we degrade to `None` and the smart
498    /// loader's non-embedding fallback path takes over.
499    #[must_use]
500    pub fn best_family_match(&self, intent: &str) -> Option<(Family, f32)> {
501        let guard = self.family_embeddings.try_read().ok()?;
502        let cache = guard.as_ref()?;
503        if cache.is_empty() {
504            return None;
505        }
506        let embedder = self.embedder.as_ref().as_ref()?;
507        let intent_vec = embedder.embed_query(intent).ok()?;
508        let mut best: Option<(Family, f32)> = None;
509        for (family, descriptor_vec) in cache.iter() {
510            let score = Embedder::cosine_similarity(&intent_vec, descriptor_vec);
511            match best {
512                Some((_, prev)) if prev >= score => {}
513                _ => best = Some((*family, score)),
514            }
515        }
516        best
517    }
518}
519
520impl FromRef<AppState> for Db {
521    fn from_ref(app: &AppState) -> Self {
522        app.db.clone()
523    }
524}
525
/// Compiled-in default for the per-request page / bulk-materialization cap.
///
/// Serves as the fallback for the operator-tunable
/// [`AppState::max_page_size`] knob (`[limits].max_page_size` /
/// `AI_MEMORY_MAX_PAGE_SIZE`). The cap limits how many rows one list /
/// search page, or one bulk-create / federation-sync request, may
/// materialize in memory at a time — it is NOT a rate limit. It is `pub`
/// so integration-test `AppState` scaffolds can seed `max_page_size` from
/// this named constant rather than from a magic literal.
pub const MAX_BULK_SIZE: usize = 1_000;
537
538// ---------------------------------------------------------------------------
539// v0.7.0 Round-2 F9 — JSON body extractor that returns 400 (not axum's
540// default 422) for missing/malformed fields, with a sanitized response
541// envelope `{ "error": "...", "fields": ["..."] }` so callers can switch
542// on the field name without parsing a free-form serde message.
543// ---------------------------------------------------------------------------
544
545/// Wrapping extractor that delegates to `axum::Json<T>` but rewrites
546/// every rejection to `400 Bad Request` with a structured body shaped
547/// like the rest of the daemon's error envelopes
548/// (`{"error": ..., "fields": [...]}`).
549///
550/// Applied to the HTTP store path so a body missing `content` (or any
551/// other required field) returns 400 + a field-name hint instead of
552/// axum's default 422 Unprocessable Entity. The 422 default leaks the
553/// raw serde error string ("Failed to deserialize the JSON body...
554/// missing field `content` at line 1 column 14"), which forces clients
555/// into substring matching on a non-stable diagnostic message; the
556/// `fields` array is the structured replacement.
557pub struct JsonOrBadRequest<T>(pub T);
558
559impl<S, T> FromRequest<S> for JsonOrBadRequest<T>
560where
561    S: Send + Sync,
562    T: DeserializeOwned,
563    Json<T>: FromRequest<S, Rejection = JsonRejection>,
564{
565    type Rejection = Response;
566
567    async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
568        match Json::<T>::from_request(req, state).await {
569            Ok(Json(value)) => Ok(Self(value)),
570            Err(rej) => Err(json_rejection_to_400(&rej)),
571        }
572    }
573}
574
575/// Convert an axum `JsonRejection` into a `400 Bad Request` response
576/// with the daemon's standard `{"error": ..., "fields": [...]}` shape.
577/// The `fields` array best-effort-extracts missing field names from
578/// the underlying serde error message; on parse failure it is left
579/// empty so callers can still rely on the envelope shape.
580fn json_rejection_to_400(rej: &JsonRejection) -> Response {
581    let raw_msg = rej.body_text();
582    // serde_json's "missing field" diagnostic: `missing field \`<name>\``.
583    // We extract the backtick-quoted identifier and surface it both as
584    // a sanitized human message and as the structured `fields` array.
585    let fields = extract_missing_fields(&raw_msg);
586    let error_msg = if let Some(first) = fields.first() {
587        format!("missing required field: {first}")
588    } else {
589        // Generic malformed-body fallback (syntax error, type error,
590        // etc.). Sanitized to avoid leaking the raw serde diagnostic
591        // (which can include positional info from the request body).
592        match rej {
593            JsonRejection::JsonSyntaxError(_) => "malformed JSON body".to_string(),
594            JsonRejection::MissingJsonContentType(_) => {
595                "expected Content-Type: application/json".to_string()
596            }
597            _ => "invalid request body".to_string(),
598        }
599    };
600    (
601        StatusCode::BAD_REQUEST,
602        Json(json!({
603            "error": error_msg,
604            "fields": fields,
605        })),
606    )
607        .into_response()
608}
609
/// Scan a serde error message for `missing field \`<name>\`` fragments
/// and return the field names found, de-duplicated and in order of first
/// appearance.
///
/// An empty vector means nothing matched (for instance a type error or a
/// syntax error); the caller then uses its generic "invalid request body"
/// message. Scanning stops at the first fragment whose closing backtick is
/// missing.
fn extract_missing_fields(msg: &str) -> Vec<String> {
    // #1022 (LOW, 2026-05-21): never return more than 16 names. Before
    // #1022, an error text carrying N `missing field` patterns produced
    // an O(N) Vec<String>. Serde's diagnostics are short in practice, so
    // the cap is belt-and-suspenders against a future serde release
    // changing the diagnostic shape, or a hostile body engineered to
    // trigger many missing-field reports. 16 is already more than any
    // caller needs in a 400-Bad-Request envelope.
    const MAX_MISSING_FIELDS: usize = 16;
    const NEEDLE: &str = "missing field `";

    // Light validation: only text that looks like a serde field identifier
    // is accepted, so a hostile body cannot smuggle arbitrary content into
    // the response envelope.
    fn looks_like_field_name(candidate: &str) -> bool {
        !candidate.is_empty()
            && candidate
                .chars()
                .all(|ch| ch == '_' || ch == '-' || ch.is_ascii_alphanumeric())
    }

    let mut names: Vec<String> = Vec::new();
    let mut remaining = msg;
    while names.len() < MAX_MISSING_FIELDS {
        let Some((_, after_needle)) = remaining.split_once(NEEDLE) else {
            break;
        };
        let Some((candidate, tail)) = after_needle.split_once('`') else {
            break;
        };
        if looks_like_field_name(candidate) && !names.iter().any(|seen| seen == candidate) {
            names.push(candidate.to_owned());
        }
        remaining = tail;
    }
    names
}
654
655// ---------------------------------------------------------------------------
656// v0.7.0 Round-2 F10 — embed-status surface for the HTTP store path.
657//
658// When the embedder times out / refuses oversized content / otherwise
659// fails to produce a vector, the row still commits (correct — embeddings
660// are an enhancement layer, not a write-path gate) but the HTTP response
661// must surface that fact so the caller can tell semantic recall will
662// silently miss this memory until a re-index. Prior to F10 the daemon
663// returned 201 with no signal whatsoever.
664//
665// The canonical [`crate::embeddings::EmbedStatus`] enum + the
666// [`crate::embeddings::Embedder::embed_with_status`] producer were
667// landed by Fix-Agent α (Round-2 F6); the HTTP wiring below is the
668// F10 consumer side that turns the producer's signal into a response
669// field on non-`Indexed` outcomes.
670// ---------------------------------------------------------------------------
671
/// v0.6.2 (S40): upper bound on concurrently in-flight per-row
/// `broadcast_store_quorum` fanouts while `bulk_create` is running.
///
/// Before this bound existed the fanout was a sequential for-loop that
/// cost roughly 100ms × N rows of wall time and overran the testbook's
/// 20s settle window at N=500. Bounded concurrency trades that speedup
/// off against peer-side `SQLite` Mutex contention and the leader-side
/// reqwest connection-pool / ephemeral-port envelope. The comment above
/// the loop in `bulk_create` carries the full rationale.
pub(crate) const BULK_FANOUT_CONCURRENCY: usize = 8;
680
/// State shared with the API-key authentication middleware.
///
/// v0.7.0 fold-A2A1.4 (#702) — besides the key itself, the state records
/// whether the listener it is mounted on enforces mTLS at the rustls
/// layer (i.e. `--tls-cert + --tls-key + --mtls-allowlist`). On such a
/// listener the federation endpoints (`/api/v1/sync/*`) are served
/// without an `x-api-key` header: the rustls server has already checked
/// the client cert against the operator-pinned allowlist, and requiring
/// the api-key as well would force every peer to carry the shared
/// secret too. That was the auth-matrix gap procurement deployments hit
/// (a peer with valid mTLS but no `x-api-key` got 401 and quorum never
/// converged across hosts). Non-federation paths keep demanding the
/// api-key whenever one is configured.
#[derive(Clone, Default)]
pub struct ApiKeyState {
    /// Configured API key; `None` means no key is configured.
    pub key: Option<String>,
    /// `true` when the listener enforces mTLS at the rustls layer.
    pub mtls_enforced: bool,
}
699
/// Percent-decode a URL-encoded query value into a new `String`.
///
/// Only `%XX` escapes (two hex digits, either case) are decoded; every
/// other byte — including `+` — is copied through unchanged. Invalid or
/// truncated `%` escapes are passed through verbatim, and decoded byte
/// sequences that are not valid UTF-8 are replaced with U+FFFD (lossy).
/// Ultrareview #337.
#[inline]
pub(crate) fn percent_decode_lossy(input: &str) -> String {
    let bytes = input.as_bytes();
    // Value of the hex digit at `idx`, or `None` when `idx` is past the
    // end of the input or the byte there is not a hex digit.
    let hex_at = |idx: usize| bytes.get(idx).and_then(|&b| (b as char).to_digit(16));

    let mut out: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' {
            if let (Some(h), Some(l)) = (hex_at(i + 1), hex_at(i + 2)) {
                // h and l are single hex digits (0..=15), so h*16 + l
                // is always in 0..=255 and the conversion cannot fail.
                out.push(u8::try_from(h * 16 + l).unwrap_or(0));
                i += 3;
                continue;
            }
        }
        // Ordinary byte, or a `%` that does not start a valid escape.
        out.push(bytes[i]);
        i += 1;
    }
    String::from_utf8_lossy(&out).into_owned()
}

/// Constant-time byte-slice equality. Doesn't short-circuit on the
/// first mismatched byte, preventing timing-oracle leaks of secret
/// material. Used for API-key comparison (#301 hardening item 3).
///
/// v0.7.0 #1060 (Agent-2 #7) — earlier versions returned early on a
/// length mismatch, which leaked `len(a) == len(b)` via timing: an
/// attacker timing many requests with varying-length `X-API-Key`
/// headers could learn the configured key's exact byte length and
/// shrink the brute-force search space.
///
/// The leak is closed by running the compare over
/// `max(a.len(), b.len())` bytes regardless of length match. Positions
/// past the end of the shorter slice are treated as zero, and a
/// separate length-mismatch flag is OR'd into the diff accumulator so
/// the final `diff == 0` test succeeds only when both the lengths match
/// AND every byte matches. The loop length is set by the longer of the
/// two slices, not by where the first difference occurs.
///
/// Returns `true` only when `a` and `b` have the same length and
/// identical contents.
#[inline]
pub(crate) fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    let len_a = a.len();
    let len_b = b.len();
    let max_len = len_a.max(len_b);
    let mut diff: u8 = 0;
    // OR a length-mismatch flag into the diff so zero-padding the
    // shorter side can't accidentally produce diff=0 when the lengths
    // differ (e.g. `b"a\0"` vs `b"a"`). `u8::from(bool)` is 0 or 1.
    diff |= u8::from(len_a != len_b);
    for i in 0..max_len {
        // Out-of-range positions read as zero instead of ending the loop.
        let x = a.get(i).copied().unwrap_or(0);
        let y = b.get(i).copied().unwrap_or(0);
        diff |= x ^ y;
    }
    diff == 0
}
761
762/// Middleware: reject requests with 401 if `api_key` is configured and request
763/// doesn't provide a matching `X-API-Key` header or `?api_key=` query param.
764/// The `/api/v1/health` endpoint is exempt.
765pub async fn api_key_auth(
766    State(auth): State<ApiKeyState>,
767    req: Request,
768    next: Next,
769) -> impl IntoResponse {
770    let Some(ref expected) = auth.key else {
771        // No API key configured — allow all requests
772        return next.run(req).await.into_response();
773    };
774
775    // Exempt health endpoint
776    if req.uri().path() == super::routes::HEALTH {
777        return next.run(req).await.into_response();
778    }
779
780    // v0.7.0 fold-A2A1.4 (#702) — mTLS bypass for federation endpoints.
781    //
782    // The federation peer mesh authenticates via mTLS cert-fingerprint
783    // pinning (see `tls::FingerprintAllowlistVerifier` — rustls rejects
784    // any TLS connect whose client cert isn't on the operator's
785    // allowlist). When that's enforced, a request reaching this
786    // middleware has already cleared a stronger authentication step
787    // than `x-api-key`. Demanding the api-key on top forces every peer
788    // to ALSO carry the shared secret, which causes the cross-host
789    // quorum gap procurement-grade deployments hit (the peer's
790    // outbound forgets the header → 401 → quorum_not_met). The
791    // bypass is scoped to `/api/v1/sync/*` so non-federation surfaces
792    // still require the api-key when configured (defense in depth).
793    //
794    // v0.7.0 #1040 (Agent-5 #7) — the bypass is signature-gated
795    // downstream:
796    //
797    //   - `/api/v1/sync/push` requires `X-Memory-Sig` over the body
798    //     under `AI_MEMORY_FED_REQUIRE_SIG=1` (#791 default).
799    //   - `/api/v1/sync/since` requires `X-Memory-Sig` over canonical
800    //     GET bytes (`method || path || query`) under the same env
801    //     gate (#1031, v0.7.0).
802    //
803    // So with the v0.7.0 secure defaults (`AI_MEMORY_FED_REQUIRE_SIG=1`),
804    // an mTLS peer cannot spoof `X-Peer-Id` because the signed-message
805    // gate downstream verifies the sig against the claimed peer-id's
806    // enrolled key — the claim is bound to a cryptographic identity
807    // separate from the cert fingerprint. Operators running with
808    // `AI_MEMORY_FED_REQUIRE_SIG=0` (legacy peer-rollout posture) lose
809    // this defense and trust X-Peer-Id verbatim; that mode is
810    // explicitly documented as UNSAFE in the CLAUDE.md env-var table.
811    //
812    // A deeper hardening — extract peer-id from the client cert's
813    // Subject CN / SAN and cross-check against X-Peer-Id — requires
814    // axum to expose the peer certificate via request extensions; a
815    // focused follow-up tracks that v0.8 surface change. For v0.7.0
816    // the #1031 signed-GET gate + the env-default-secure posture
817    // close the acute exploitability surface.
818    let path = req.uri().path();
819    if auth.mtls_enforced && path.starts_with("/api/v1/sync/") {
820        return next.run(req).await.into_response();
821    }
822
823    // Check X-API-Key header
824    if let Some(header_val) = req.headers().get(crate::HEADER_API_KEY)
825        && let Ok(val) = header_val.to_str()
826        && constant_time_eq(val.as_bytes(), expected.as_bytes())
827    {
828        return next.run(req).await.into_response();
829    }
830
831    // Check ?api_key= query param (ultrareview #337: URL-decode
832    // before comparison. A key with reserved chars like `+`, `%`,
833    // `&` must be percent-encoded by the caller per RFC 3986; the
834    // previous raw-compare path silently mismatched those keys and
835    // opened an encoded-bypass surface where a key containing `%2B`
836    // would compare against `%2B` rather than `+`, producing a
837    // different trust decision depending on caller quoting.)
838    if let Some(query) = req.uri().query() {
839        for pair in query.split('&') {
840            if let Some(val) = pair.strip_prefix("api_key=") {
841                let decoded = percent_decode_lossy(val);
842                if constant_time_eq(decoded.as_bytes(), expected.as_bytes()) {
843                    // v0.7.0 de-silencing: a credential in the URL query
844                    // string leaks into access logs, the Referer header,
845                    // and proxy logs. Accept it (the v0.7.0 back-compat
846                    // contract) but emit a once-per-process
847                    // operator-visible warn naming the header
848                    // alternative + the deprecation intent (#1574).
849                    static QUERY_KEY_WARN_ONCE: std::sync::Once = std::sync::Once::new();
850                    QUERY_KEY_WARN_ONCE.call_once(|| {
851                        tracing::warn!(
852                            target: "http::auth",
853                            "a request authenticated via the `?api_key=` query \
854                             parameter; URL-embedded credentials leak into access \
855                             logs, Referer headers, and proxy logs. Migrate callers \
856                             to the `x-api-key` request header — the `?api_key=` \
857                             query form is DEPRECATED and will be removed in a \
858                             future release (still accepted for the v0.7.0 \
859                             back-compat contract)."
860                        );
861                    });
862                    return next.run(req).await.into_response();
863                }
864            }
865        }
866    }
867
868    (
869        StatusCode::UNAUTHORIZED,
870        Json(json!({"error": "missing or invalid API key"})),
871    )
872        .into_response()
873}
874
875pub async fn health(State(app): State<AppState>) -> impl IntoResponse {
876    // v0.7.0 ARCH-2 followup (FX-C2-batch3) — Postgres-backed daemons
877    // ride the new `MemoryStore::health_check` trait method which is
878    // natively async (sqlx round-trip), so we can skip the blocking
879    // pool for that path entirely. SQLite-backed daemons stay on the
880    // `db_op` blocking-pool route per PERF-1 (FX-3); /health is the
881    // most-frequently scraped endpoint and pinning a tokio worker on
882    // a sync sqlite PRAGMA query would starve the runtime under
883    // concurrent scrape load.
884    #[cfg(feature = "sal-postgres")]
885    let ok = if matches!(app.storage_backend, StorageBackend::Postgres) {
886        app.store.health_check().await.unwrap_or(false)
887    } else {
888        db_op(app.db.clone(), |guard| {
889            db::health_check(&guard.0).unwrap_or(false)
890        })
891        .await
892    };
893    #[cfg(not(feature = "sal-postgres"))]
894    let ok = db_op(app.db.clone(), |guard| {
895        db::health_check(&guard.0).unwrap_or(false)
896    })
897    .await;
898    let embedder_ready = app.embedder.as_ref().is_some();
899    let federation_enabled = app.federation.as_ref().is_some();
900    let code = if ok {
901        StatusCode::OK
902    } else {
903        StatusCode::SERVICE_UNAVAILABLE
904    };
905    // v0.6.2 (#327): expose embedder status so operators can tell from
906    // /health alone whether semantic recall is wired up on this node.
907    (
908        code,
909        Json(json!({
910            "status": if ok { "ok" } else { "error" },
911            "service": "ai-memory",
912            "version": crate::PKG_VERSION,
913            "embedder_ready": embedder_ready,
914            "federation_enabled": federation_enabled,
915        })),
916    )
917        .into_response()
918}
919
920/// v0.6.0.0 — Prometheus scrape endpoint. Refreshes gauge samples
921/// (`ai_memory_memories`) against the current DB before rendering so
922/// scrapers see up-to-date counts without needing a background refresh
923/// task.
924pub async fn prometheus_metrics(State(state): State<Db>) -> impl IntoResponse {
925    // PERF-1 (FX-3): route the rusqlite stats query through `db_op`.
926    // The stats query touches `memories` + `archived_memories` for COUNTs
927    // and can take 10-50ms on a populated DB; scrape cadence is every
928    // 10-30s, so without spawn_blocking this would periodically pin a
929    // tokio worker mid-scrape.
930    db_op(state, |guard| {
931        if let Ok(stats) = db::stats(&guard.0, &guard.1) {
932            crate::metrics::registry()
933                .memories_gauge
934                .set(stats.total.try_into().unwrap_or(i64::MAX));
935        }
936    })
937    .await;
938    let body = crate::metrics::render();
939    (
940        StatusCode::OK,
941        [(
942            axum::http::header::CONTENT_TYPE,
943            "text/plain; version=0.0.4; charset=utf-8",
944        )],
945        body,
946    )
947        .into_response()
948}
/// Unit tests for the pure helpers in this module: percent-decoding,
/// constant-time comparison, missing-field extraction, and a few
/// small contracts on `StorageBackend` / `AppState`.
#[cfg(test)]
mod transport_helpers_tests {
    use super::*;

    #[test]
    fn percent_decode_handles_typical_keys() {
        assert_eq!(percent_decode_lossy("abc"), "abc");
        assert_eq!(percent_decode_lossy("a%2Bb"), "a+b");
        assert_eq!(percent_decode_lossy("hello%20world"), "hello world");
        assert_eq!(percent_decode_lossy("%2F%3D%3F"), "/=?");
    }

    #[test]
    fn percent_decode_passes_through_invalid_escapes() {
        // Invalid hex digits => pass through verbatim.
        assert_eq!(percent_decode_lossy("a%ZZb"), "a%ZZb");
        // Truncated escape at end => verbatim.
        assert_eq!(percent_decode_lossy("a%2"), "a%2");
    }

    #[test]
    fn percent_decode_handles_boundary_and_non_utf8_escapes() {
        // A lone `%` with nothing after it is copied through.
        assert_eq!(percent_decode_lossy("%"), "%");
        // A valid escape that ends exactly at the end of the input is
        // still decoded (guards the bounds check against off-by-one).
        assert_eq!(percent_decode_lossy("key%2B"), "key+");
        // `+` is NOT form-decoded to a space; only `%XX` is handled.
        assert_eq!(percent_decode_lossy("a+b"), "a+b");
        // A decoded byte that is not valid UTF-8 becomes U+FFFD.
        assert_eq!(percent_decode_lossy("%FF"), "\u{FFFD}");
    }

    #[test]
    fn constant_time_eq_handles_equal_and_diff_inputs() {
        assert!(constant_time_eq(b"abc", b"abc"));
        assert!(!constant_time_eq(b"abc", b"abd"));
        assert!(!constant_time_eq(b"abc", b"abcd"));
        assert!(constant_time_eq(b"", b""));
    }

    #[test]
    fn constant_time_eq_no_length_short_circuit_1060() {
        // v0.7.0 #1060 (Agent-2 #7) — pin the post-fix invariant:
        // length-mismatch comparison must NOT short-circuit on len
        // alone. Pre-#1060 the function returned `false` immediately
        // when `a.len() != b.len()`, leaking the configured key's
        // exact byte length via timing. Post-#1060 the compare runs
        // over `max(a.len(), b.len())` bytes regardless, and the
        // length mismatch is OR'd into the diff accumulator.
        //
        // We pin the algorithmic shape by asserting the structural
        // properties:
        //
        // - `("abc", "abcd")` and `("abcd", "abc")` both return false
        //   (length mismatch detected).
        // - `("abc", "abc")` returns true (no diff).
        // - Empty vs empty returns true.
        // - Empty vs non-empty returns false (len mismatch).
        // - Differing length AND differing bytes returns false.
        assert!(!constant_time_eq(b"abc", b"abcd"));
        assert!(!constant_time_eq(b"abcd", b"abc"));
        assert!(constant_time_eq(b"abc", b"abc"));
        assert!(constant_time_eq(b"", b""));
        assert!(!constant_time_eq(b"", b"x"));
        assert!(!constant_time_eq(b"xxxx", b"yy"));
        // Edge case: same byte sequence ends in same byte but
        // shorter slice — must still detect the mismatch via the
        // zero-fill XOR.
        assert!(!constant_time_eq(b"aa", b"aaaa"));
        // Edge case: the longer slice is the shorter one padded with
        // NUL bytes, so the zero-fill XOR alone yields no difference;
        // only the length-mismatch flag catches it.
        assert!(!constant_time_eq(b"a\0", b"a"));
    }

    #[test]
    fn storage_backend_as_str_round_trip() {
        assert_eq!(StorageBackend::Sqlite.as_str(), "sqlite");
        assert_eq!(StorageBackend::Postgres.as_str(), "postgres");
    }

    #[test]
    fn family_descriptors_returns_eight_entries() {
        // Order must match Family::all() declaration order — see the
        // upstream `family_descriptors` doc comment.
        let d = family_descriptors();
        assert_eq!(d.len(), 8, "expected 8 family descriptors, got {}", d.len());
        // Every descriptor is a non-empty English sentence.
        for (family, text) in d {
            assert!(!text.is_empty(), "descriptor for {family:?} is empty");
            assert!(
                text.len() > 20,
                "descriptor for {family:?} too short: {text}"
            );
        }
    }

    #[test]
    fn precompute_family_embeddings_no_embedder_returns_empty() {
        // The fast path of `precompute_family_embeddings`: when the
        // embedder is `None` (keyword tier or load failure) the
        // function returns an empty vector and never touches the
        // descriptor list. Pin the contract here so a future refactor
        // that swaps the early return for a panic catches the test.
        let out = AppState::precompute_family_embeddings(None);
        assert!(out.is_empty());
    }

    #[test]
    fn extract_missing_fields_finds_single_field() {
        let msg =
            "Failed to deserialize the JSON body: missing field `content` at line 1 column 14";
        let fields = extract_missing_fields(msg);
        assert_eq!(fields, vec!["content".to_string()]);
    }

    #[test]
    fn extract_missing_fields_finds_multiple_fields() {
        let msg = "missing field `title` and missing field `content`";
        let fields = extract_missing_fields(msg);
        assert_eq!(fields, vec!["title".to_string(), "content".to_string()]);
    }

    #[test]
    fn extract_missing_fields_dedups_repeats() {
        let msg = "missing field `name` ... missing field `name` again";
        let fields = extract_missing_fields(msg);
        assert_eq!(fields, vec!["name".to_string()]);
    }

    #[test]
    fn extract_missing_fields_returns_empty_for_clean_message() {
        assert!(extract_missing_fields("no missing fields here").is_empty());
    }

    #[test]
    fn extract_missing_fields_rejects_non_identifier_content() {
        // The function light-validates so a hostile body cannot smuggle
        // arbitrary content into the response envelope.
        let msg = "missing field `<script>` injection attempt";
        let fields = extract_missing_fields(msg);
        // The `<script>` payload contains `<` and `>` which are not
        // ascii_alphanumeric / _ / - so the field is dropped.
        assert!(fields.is_empty(), "non-ident content must be rejected");
    }

    #[test]
    fn extract_missing_fields_accepts_underscores_and_dashes() {
        let msg = "missing field `agent_id-x` here";
        let fields = extract_missing_fields(msg);
        assert_eq!(fields, vec!["agent_id-x".to_string()]);
    }

    #[test]
    fn extract_missing_fields_handles_unterminated_backtick() {
        // No trailing backtick → break the loop without panicking.
        let msg = "missing field `unterminated";
        let fields = extract_missing_fields(msg);
        assert!(fields.is_empty());
    }

    #[test]
    fn extract_missing_fields_caps_result_at_sixteen_1022() {
        // #1022 — a diagnostic naming more than 16 distinct fields is
        // truncated to the first 16, in order of appearance.
        let msg: String = (0..20)
            .map(|i| format!("missing field `f{i}` "))
            .collect();
        let fields = extract_missing_fields(&msg);
        assert_eq!(fields.len(), 16);
        assert_eq!(fields.first().map(String::as_str), Some("f0"));
        assert_eq!(fields.last().map(String::as_str), Some("f15"));
    }
}