Skip to main content

ai_memory/handlers/
create.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
//! HTTP `POST /api/v1/memories` create-path: six-stage orchestrator,
//! per-stage helpers, postgres branch, and inline stage-helper tests.
//!
//! Extracted from [`super::http`] under issue #650 (handler cap ≤1200
//! LOC). Handler bodies are unchanged; only the module surface moved.
//! Wire compatibility preserved via `pub use create::*` in [`super`].
10
11#![allow(clippy::too_many_lines)]
12
13use crate::models::field_names;
14use axum::{
15    Json,
16    extract::State,
17    http::{HeaderMap, StatusCode},
18    response::IntoResponse,
19};
20use chrono::{Duration, Utc};
21use serde_json::json;
22use uuid::Uuid;
23
24use crate::db;
25use crate::embeddings::EmbedStatus;
26#[cfg(test)]
27use crate::models::Tier;
28use crate::models::{CreateMemory, Memory};
29use crate::validate;
30
31#[cfg(feature = "sal")]
32use super::StorageBackend;
33use super::maybe_auto_tag;
34#[cfg(feature = "sal")]
35use super::store_err_to_response;
36use super::{AppState, JsonOrBadRequest};
37
// #866 — `create_memory` stage-helpers.
//
// The original `create_memory` carried ~790 LOC across the agent_id
// resolution, on_conflict policy, embed-before-lock pass, governance
// pre-write hook, the actual `db::insert`, the federation fanout, and
// the postgres-SAL branch. Each stage has a clear input → output
// contract, so each lives in a dedicated helper. The wrapper below
// is the orchestrator: it sequences the six stage helpers (1 agent_id
// → 2 on_conflict → 3 embed-before-lock → 4 governance → 5 insert →
// 6 fanout) and returns the assembled HTTP response.
//
// Fallible helpers return `Result<T, axum::response::Response>` so any
// short-circuit envelope (validation error, conflict, governance
// pending, federation quorum failure) is just a `?` away from the
// orchestrator.
// ---------------------------------------------------------------------------
53
54/// #866 stage 1 — resolve `agent_id` via the HTTP precedence chain:
55///   1. top-level `body.agent_id`
56///   2. embedded `body.metadata.agent_id` (caller's NHI claim — load-
57///      bearing for federation receivers and clients that prefer the
58///      metadata-only shape; mirrors the MCP precedence at
59///      `crate::mcp::handle_store` (NHI precedence) and the CLAUDE.md §Agent Identity (NHI)
60///      contract).
61///   3. `X-Agent-Id` request header
62///   4. per-request anonymous fallback
63///
64/// Also validates `body.scope` (when supplied at the top level) and
65/// merges both the resolved `agent_id` and the scope into a fresh
66/// `metadata` value. The returned metadata is the canonical one for
67/// the subsequent stages — `body.metadata` is consumed here.
68///
69/// L11 (NHI-D-fed-agentid-mutation): prior to this split, step 2 was
70/// missing — a federated peer that resent a memory through
71/// `POST /api/v1/memories` (or a client that only stamped
72/// `metadata.agent_id`) would have its claim silently rewritten to
73/// the per-request anonymous id, breaking the immutable-provenance
74/// contract documented in CLAUDE.md and enforced at the SQL layer by
75/// `db::insert_if_newer` / `apply_remote_memory`.
76fn resolve_create_agent_id(
77    headers: &HeaderMap,
78    body: &CreateMemory,
79) -> Result<(String, serde_json::Value), axum::response::Response> {
80    let header_agent_id = headers
81        .get(crate::HEADER_AGENT_ID)
82        .and_then(|v| v.to_str().ok());
83    let metadata_agent_id = body
84        .metadata
85        .get("agent_id")
86        .and_then(serde_json::Value::as_str)
87        .map(str::to_string);
88    // #907 (security-high, 2026-05-19) — sibling of #874/#901/#905.
89    // The pre-#907 path preferred caller-supplied
90    // `body.agent_id` / `metadata.agent_id` over the authenticated
91    // `X-Agent-Id` header on the WRITE-path provenance stamp. An
92    // attacker authenticated as `bob` could call
93    // `POST /api/v1/memories` with `body.agent_id="alice"` (or
94    // `metadata.agent_id="alice"`) and the new row would land with
95    // `metadata.agent_id="alice"` — a provenance LIE that
96    // permanently fakes attribution (NHI design contract:
97    // `metadata.agent_id` is preserved across update/dedup/import).
98    // Header-only authentication now; caller-supplied claims (if
99    // present) must MATCH the authenticated caller else 403. The
100    // metadata stamp is forced to the resolved caller below.
101    let agent_id = crate::identity::resolve_http_agent_id(None, header_agent_id).map_err(|e| {
102        (
103            StatusCode::BAD_REQUEST,
104            Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
105        )
106            .into_response()
107    })?;
108    if let Some(claimed) = body.agent_id.as_deref()
109        && claimed != agent_id
110    {
111        return Err((
112            StatusCode::FORBIDDEN,
113            Json(json!({"error": crate::errors::msg::AGENT_ID_BODY_MISMATCH})),
114        )
115            .into_response());
116    }
117    if let Some(claimed) = metadata_agent_id.as_deref()
118        && claimed != agent_id
119    {
120        return Err((
121            StatusCode::FORBIDDEN,
122            Json(json!({"error": "metadata.agent_id does not match authenticated caller"})),
123        )
124            .into_response());
125    }
126    let mut metadata = body.metadata.clone();
127    if let Some(obj) = metadata.as_object_mut() {
128        obj.insert(
129            "agent_id".to_string(),
130            serde_json::Value::String(agent_id.clone()),
131        );
132    }
133    // #151 scope: validate + merge into metadata if supplied at the top
134    // level (inline metadata.scope still works; top-level is a shortcut).
135    if let Some(ref s) = body.scope {
136        validate::validate_scope(s).map_err(|e| {
137            (
138                StatusCode::BAD_REQUEST,
139                Json(json!({"error": e.to_string()})),
140            )
141                .into_response()
142        })?;
143        if let Some(obj) = metadata.as_object_mut() {
144            obj.insert("scope".to_string(), serde_json::Value::String(s.clone()));
145        }
146    }
147    Ok((agent_id, metadata))
148}
149
150/// #866 stage 3 — embed-before-lock. Issue #219: the embedder runs
151/// 10-200 ms of ONNX / Ollama work that must not hold the single
152/// shared `Mutex<Connection>` on a multi-agent daemon.
153///
154/// v0.7.0 Round-2 F10 — calls α's `Embedder::embed_with_status` so the
155/// success/skip/fail outcome is captured alongside the vector. The
156/// success-path response stays silent on `Indexed`; non-`Indexed`
157/// outcomes are surfaced as `embed_status` on the response body so the
158/// caller can tell semantic recall will miss this row until a re-index.
159/// Keyword-only deployments (embedder=None) report `Indexed` so the
160/// response shape is unchanged on nodes where the semantic layer is
161/// intentionally absent.
162fn embed_create_before_lock(
163    app: &AppState,
164    title: &str,
165    content: &str,
166) -> (Option<Vec<f32>>, EmbedStatus) {
167    let embedding_text = crate::embeddings::embedding_document(title, content);
168    match app.embedder.as_ref().as_ref() {
169        None => (None, EmbedStatus::Indexed),
170        Some(emb) => emb.embed_with_status(&embedding_text),
171    }
172}
173
174/// #866 stage 2 — resolve the `on_conflict` policy:
175///   - `error` (default): 409 CONFLICT + typed payload if a row with
176///     the same (title, namespace) already exists.
177///   - `version`: rewrite the title to the next free suffix.
178///   - `merge`: fall through; `db::insert` will UPSERT via the legacy
179///     INSERT ... ON CONFLICT path.
180///
181/// Returns the final title to embed in the canonical row, or an
182/// already-assembled error response for the orchestrator to surface.
183fn resolve_create_conflict_title(
184    conn: &rusqlite::Connection,
185    body: &CreateMemory,
186    on_conflict_mode: crate::mcp::tools::OnConflictMode,
187) -> Result<String, axum::response::Response> {
188    use crate::mcp::tools::OnConflictMode;
189    match on_conflict_mode {
190        OnConflictMode::Error => {
191            match db::find_by_title_namespace(conn, &body.title, &body.namespace) {
192                Ok(Some(existing_id)) => Err((
193                    StatusCode::CONFLICT,
194                    Json(json!({
195                        "code": crate::errors::error_codes::CONFLICT,
196                        "error": format!(
197                            "memory with title '{}' already exists in namespace '{}'",
198                            body.title, body.namespace
199                        ),
200                        "existing_id": existing_id,
201                    })),
202                )
203                    .into_response()),
204                Ok(None) => Ok(body.title.clone()),
205                Err(e) => {
206                    tracing::error!("on_conflict lookup failed: {e}");
207                    Err((
208                        StatusCode::INTERNAL_SERVER_ERROR,
209                        Json(json!({"error": "conflict check failed"})),
210                    )
211                        .into_response())
212                }
213            }
214        }
215        OnConflictMode::Version => db::next_versioned_title(conn, &body.title, &body.namespace)
216            .map_err(|e| {
217                tracing::error!("on_conflict=version failed: {e}");
218                (
219                    StatusCode::INTERNAL_SERVER_ERROR,
220                    Json(json!({"error": "could not pick a versioned title"})),
221                )
222                    .into_response()
223            }),
224        OnConflictMode::Merge => Ok(body.title.clone()),
225    }
226}
227
228/// #866 stage 4 — substrate governance pre-write hook. Walks the
229/// inheritance chain via `db::enforce_governance` and either:
230///   - `Allow`: returns `Ok(())` to the orchestrator (caller proceeds
231///     to the insert stage).
232///   - `Deny`: short-circuits with 403 FORBIDDEN + the operator-
233///     authored reason verbatim.
234///   - `Pending`: queues the action in `pending_actions`, fires the
235///     K4 `approval_requested` webhook, then drops the lock and
236///     fans the pending row out to federation peers via
237///     `broadcast_pending_quorum`. Returns 202 ACCEPTED with the
238///     pending id so the caller can drive the consensus path through
239///     `POST /pending/{id}/approve`.
240///
241/// The Pending branch consumes the supplied `lock`; the orchestrator
242/// re-acquires `state.lock().await` AFTER an `Allow` return because
243/// the consume here is intentional (`drop(lock)` before the federation
244/// broadcast — keeping the DB lock across an async `await` is the
245/// regression #866 explicitly guards against).
246async fn enforce_create_governance<'a>(
247    app: &AppState,
248    lock: tokio::sync::MutexGuard<
249        'a,
250        (
251            rusqlite::Connection,
252            std::path::PathBuf,
253            crate::config::ResolvedTtl,
254            bool,
255        ),
256    >,
257    mem: &Memory,
258) -> Result<
259    tokio::sync::MutexGuard<
260        'a,
261        (
262            rusqlite::Connection,
263            std::path::PathBuf,
264            crate::config::ResolvedTtl,
265            bool,
266        ),
267    >,
268    axum::response::Response,
269> {
270    use crate::models::{GovernanceDecision, GovernedAction};
271    // #869 audit (Category B — safe default): missing or non-string
272    // `agent_id` collapses to `""`. The governance engine treats the
273    // empty agent the same as an anonymous caller (no per-agent rules
274    // match), which is the documented fail-closed posture.
275    let agent_for_gov = mem
276        .metadata
277        .get("agent_id")
278        .and_then(|v| v.as_str())
279        .unwrap_or_default()
280        .to_string();
281    // #869 — silently degrading to `Value::Null` would let the
282    // governance engine see a different payload than the one we
283    // were about to commit (rule predicates that key on memory
284    // fields would all evaluate against `null` and degenerate to
285    // either always-allow or always-deny depending on the rule
286    // semantics). Fail closed with a 500 instead.
287    let payload = match super::to_value_or_500("create_memory.governance.payload", mem) {
288        Ok(v) => v,
289        Err(resp) => return Err(resp),
290    };
291    match db::enforce_governance(
292        &lock.0,
293        GovernedAction::Store,
294        &mem.namespace,
295        &agent_for_gov,
296        None,
297        None,
298        &payload,
299    ) {
300        Ok(GovernanceDecision::Allow) => Ok(lock),
301        Ok(GovernanceDecision::Deny(refusal)) => Err((
302            StatusCode::FORBIDDEN,
303            Json(json!({"error": crate::governance::deny_message(
304                "store",
305                crate::governance::DenyGate::Governance,
306                &refusal.reason,
307            )})),
308        )
309            .into_response()),
310        Ok(GovernanceDecision::Pending(pending_id)) => {
311            // v0.6.2 (S34): fan out the new pending row so peers can
312            // approve / reject / list it. Load the canonical row we
313            // just inserted and broadcast before responding.
314            let pending_row = db::get_pending_action(&lock.0, &pending_id).ok().flatten();
315            // v0.7.0 K4 — fire the `approval_requested` webhook event
316            // through the existing subscription dispatcher so K10's
317            // Approval API HTTP+SSE handler picks it up. Done BEFORE
318            // the lock drops so the subscriber list query has a
319            // connection; the actual HTTP POSTs spawn detached threads
320            // (fire-and-forget). Best-effort: a dispatch failure must
321            // not roll back the pending row.
322            crate::subscriptions::dispatch_approval_requested(&lock.0, &pending_id, &lock.1);
323            let namespace = mem.namespace.clone();
324            drop(lock);
325            if let (Some(pa), Some(fed)) = (pending_row.as_ref(), app.federation.as_ref()) {
326                match crate::federation::broadcast_pending_quorum(fed, pa).await {
327                    Ok(tracker) => {
328                        if let Err(err) = crate::federation::finalise_quorum(&tracker) {
329                            // #869 — typed 503 envelope via the shared helper.
330                            let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
331                            return Err(super::quorum_not_met_response(&payload));
332                        }
333                    }
334                    Err(err) => {
335                        let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
336                        return Err(super::quorum_not_met_response(&payload));
337                    }
338                }
339            }
340            Err((
341                StatusCode::ACCEPTED,
342                Json(json!({
343                    "status": "pending",
344                    (field_names::PENDING_ID): pending_id,
345                    "reason": crate::errors::msg::GOVERNANCE_REQUIRES_APPROVAL,
346                    "action": "store",
347                    "namespace": namespace,
348                })),
349            )
350                .into_response())
351        }
352        Err(e) => Err(crate::handlers::errors::governance_error_500(&e)),
353    }
354}
355
356/// #866 stage 5 — quota check + `db::insert`. The quota gate mirrors
357/// the MCP path (`crate::mcp::handle_store` (quota check)): `check_and_record` before the
358/// insert, refund on failure. The audit emit fires on success; the
359/// embedding write to `db::set_embedding` lights the HNSW index up
360/// after the row commits. Returns either the persisted row id (on
361/// success) or a pre-built error response (validation, quota, or
362/// substrate failure including the L1-6 substrate governance refusal
363/// which is mapped to 403 FORBIDDEN + `GOVERNANCE_REFUSED`).
364fn insert_create_with_quota(
365    lock: &tokio::sync::MutexGuard<
366        '_,
367        (
368            rusqlite::Connection,
369            std::path::PathBuf,
370            crate::config::ResolvedTtl,
371            bool,
372        ),
373    >,
374    mem: &Memory,
375    embedding: &Option<Vec<f32>>,
376) -> Result<String, axum::response::Response> {
377    // v0.7.0 Round-2 F7 — per-agent quota gate. Round-1 evidence: 500
378    // HTTP stores from a single agent_id incremented zero rows in
379    // `agent_quotas` while the same agent's MCP-side stamp incremented
380    // correctly. The MCP store path (crate::mcp::handle_store) calls
381    // `quotas::check_and_record` ahead of `db::insert` and refunds on
382    // insert failure; mirror that here so the HTTP path is no longer a
383    // quota-bypass surface. Bytes counted = (title + content +
384    // serialized metadata) — same shape the MCP path uses so cross-
385    // path totals stay coherent.
386    // #869 audit (Category B — safe default): empty `quota_agent_id`
387    // is intentional sentinel — `check_and_record` only fires when the
388    // agent id is non-empty (the `if !quota_agent_id.is_empty()` guard
389    // below skips the quota call for anonymous callers, mirroring the
390    // MCP path's behaviour).
391    let quota_agent_id = mem
392        .metadata
393        .get("agent_id")
394        .and_then(|v| v.as_str())
395        .unwrap_or_default()
396        .to_string();
397    let raw_payload_bytes = mem.title.len()
398        + mem.content.len()
399        + serde_json::to_string(&mem.metadata)
400            .map(|s| s.len())
401            .unwrap_or(0);
402    let payload_bytes = match i64::try_from(raw_payload_bytes) {
403        Ok(v) => v,
404        Err(_) => {
405            // M10 (v0.7.0 round-2) — saturating cast surfaced. usize
406            // overflowed i64 (rare; would require >9 EiB of metadata
407            // on a 64-bit host). Operators need to see this in logs
408            // because the quota row gets clamped to the maximum,
409            // which makes that single store look unbounded from the
410            // dashboard's perspective until they investigate.
411            tracing::warn!(
412                agent_id = %quota_agent_id,
413                raw_bytes = raw_payload_bytes,
414                "quota byte-count saturated at i64::MAX for agent={}; \
415                 metadata may be excessively large",
416                if quota_agent_id.is_empty() {
417                    "<anonymous>"
418                } else {
419                    quota_agent_id.as_str()
420                }
421            );
422            i64::MAX
423        }
424    };
425    let quota_op = crate::quotas::QuotaOp::Memory {
426        bytes: payload_bytes,
427    };
428    if !quota_agent_id.is_empty() {
429        // v0.7.0 #1156 — charge against the per-namespace accounting
430        // row (the v50 PK extension). Per-namespace allotments hold
431        // even when a single agent writes across many namespaces.
432        if let Err(e) =
433            crate::quotas::check_and_record(&lock.0, &quota_agent_id, &mem.namespace, quota_op)
434        {
435            // Map QuotaCheckError to the same wire shape the rest of
436            // the daemon uses for quota breaches: 429 with a
437            // `code: "QUOTA_EXCEEDED"` envelope so callers can switch
438            // on the limit name. Substrate errors bubble up as 500
439            // because the row was never written.
440            return Err(match e {
441                crate::quotas::QuotaCheckError::Quota(qe) => (
442                    StatusCode::TOO_MANY_REQUESTS,
443                    Json(json!({
444                        "code": crate::errors::error_codes::QUOTA_EXCEEDED,
445                        "error": qe.to_string(),
446                        "limit": qe.limit.as_str(),
447                        "current": qe.current,
448                        "max": qe.max,
449                        "agent_id": qe.agent_id,
450                    })),
451                )
452                    .into_response(),
453                crate::quotas::QuotaCheckError::Sql(se) => {
454                    tracing::error!("quota substrate error: {se}");
455                    (
456                        StatusCode::INTERNAL_SERVER_ERROR,
457                        Json(json!({"error": "quota check failed"})),
458                    )
459                        .into_response()
460                }
461            });
462        }
463    }
464
465    match db::insert(&lock.0, mem) {
466        Ok(actual_id) => {
467            // Issue #219: persist the embedding into the connection so
468            // semantic recall can find this memory. Previously the HTTP
469            // path stored the row but never called `set_embedding`,
470            // silently excluding every HTTP-authored memory from
471            // semantic search. HNSW index warm-up happens after the
472            // lock drops in the orchestrator.
473            if let Some(vec) = embedding.as_ref()
474                && let Err(e) = db::set_embedding(&lock.0, &actual_id, vec)
475            {
476                tracing::warn!("failed to store embedding for {actual_id}: {e}");
477            }
478            Ok(actual_id)
479        }
480        Err(e) => {
481            // v0.7.0 Round-2 F7 — insert failed AFTER we committed the
482            // quota counter; refund so the agent's quota reflects only
483            // successful stores (mirrors the MCP path at
484            // crate::mcp::handle_store). Refund is best-effort — a refund
485            // failure is logged but does not change the response.
486            if !quota_agent_id.is_empty() {
487                // #1156 — refund lands on the same `(agent_id,
488                // namespace)` row check_and_record above incremented.
489                if let Err(re) =
490                    crate::quotas::refund_op(&lock.0, &quota_agent_id, &mem.namespace, quota_op)
491                {
492                    crate::quotas::log_refund_op_failed(&quota_agent_id, &re);
493                }
494            }
495            // v0.7.0 L1-6 Deliverable E — surface the substrate
496            // governance pre-write hook's refusal as `403 FORBIDDEN`
497            // with code `GOVERNANCE_REFUSED` and the operator-authored
498            // reason verbatim. The substrate wraps the refusal in a
499            // typed `storage::GovernanceRefusal` propagated via
500            // `anyhow::Error`; downcasting here keeps the
501            // happy-path-cheap `?`-friendly return shape upstream.
502            //
503            // SAL-bypass intentional (#961): the SAL `StoreError` enum
504            // in `src/store/mod.rs` does not carry the operator-authored
505            // reason string; substrate governance refusals are emitted
506            // by the legacy db:: write path which wraps them in
507            // `anyhow::Error`. Downcasting to the legacy concrete type
508            // here is the load-bearing contract — pinned by the
509            // `insert_governance_refusal_downcasts_to_403_envelope` test
510            // in the `#[cfg(test)]` block below.
511            if let Some(refusal) = e.downcast_ref::<crate::storage::GovernanceRefusal>() {
512                tracing::info!(
513                    "create_memory refused by substrate governance: {}",
514                    refusal.reason
515                );
516                return Err((
517                    StatusCode::FORBIDDEN,
518                    Json(json!({
519                        "code": crate::errors::error_codes::GOVERNANCE_REFUSED,
520                        "error": refusal.reason,
521                    })),
522                )
523                    .into_response());
524            }
525            Err(crate::handlers::errors::handler_error_500(&e))
526        }
527    }
528}
529
530/// #866 stage 6 — federation fanout + HNSW index warm-up + assembled
531/// CREATED response.
532///
533/// Per ADR-0001 the substrate does NOT roll back on quorum failure:
534/// the local commit has already landed when we reach this stage. A
535/// quorum miss surfaces 503 + `Retry-After: 2` and the sync-daemon's
536/// eventual-consistency loop catches stragglers up. A `Some(fed)` +
537/// `Ok(got)` path includes `quorum_acks: <count>` on the response.
538async fn fanout_and_assemble_create_response(
539    app: &AppState,
540    mem: &Memory,
541    actual_id: &str,
542    embedding: Option<Vec<f32>>,
543    auto_tags: &[String],
544    contradiction_ids: Vec<String>,
545    embed_status: EmbedStatus,
546) -> axum::response::Response {
547    // #1566 / #1579 B1 — embed-once-replicate-vector: capture the
548    // just-computed vector for the federation fanout BEFORE the HNSW
549    // warm-up consumes it. Shipping it inside the signed push payload
550    // lets dim-matching receivers store it directly instead of
551    // re-embedding (~1s/row via ollama, up to 9× per memory across the
552    // fleet pre-#1566). `None` (keyword tier / embed-degraded store)
553    // keeps the pre-#1566 wire bytes.
554    let shipped = match (&embedding, app.embedder.as_ref().as_ref()) {
555        (Some(vec), Some(emb)) => Some(crate::federation::ShippedEmbedding::new(
556            actual_id.to_string(),
557            emb.model_description(),
558            vec.clone(),
559        )),
560        _ => None,
561    };
562    // HNSW warm-up after the DB lock dropped (done by the caller).
563    if let Some(vec) = embedding {
564        let mut idx_lock = app.vector_index.lock().await;
565        if let Some(idx) = idx_lock.as_mut() {
566            idx.insert(actual_id.to_string(), vec);
567        }
568    }
569    // #196: echo the resolved agent_id so callers don't need a follow-up get.
570    let resolved_agent_id = mem
571        .metadata
572        .get("agent_id")
573        .and_then(|v| v.as_str())
574        .map(str::to_string);
575    // PR-5 (issue #487): security audit trail for HTTP store.
576    // #869 audit (Category B — safe default): when no agent_id was
577    // resolved at request time the audit row records the actor as
578    // `""` (the documented anonymous-actor sentinel for the audit
579    // chain). Same posture as the MCP path.
580    crate::audit::emit(crate::audit::EventBuilder::new(
581        crate::audit::AuditAction::Store,
582        crate::audit::actor(
583            resolved_agent_id.clone().unwrap_or_default(),
584            "http_body",
585            mem.metadata
586                .get("scope")
587                .and_then(|v| v.as_str())
588                .map(str::to_string),
589        ),
590        crate::audit::target_memory(
591            actual_id.to_string(),
592            mem.namespace.clone(),
593            Some(mem.title.clone()),
594            Some(mem.tier.to_string()),
595            mem.metadata
596                .get("scope")
597                .and_then(|v| v.as_str())
598                .map(str::to_string),
599        ),
600    ));
601    let mut response = json!({
602        "id": actual_id,
603        "tier": mem.tier,
604        "namespace": mem.namespace,
605        "title": mem.title,
606        "agent_id": resolved_agent_id,
607    });
608    if !contradiction_ids.is_empty() {
609        response["potential_contradictions"] = json!(contradiction_ids);
610    }
611    // v0.7.0 L5 — echo LLM-generated tags as a dedicated
612    // `auto_tags` field, matching MCP `handle_store`'s response.
613    if !auto_tags.is_empty() {
614        response["auto_tags"] = json!(auto_tags);
615    }
616    // v0.7.0 Round-2 F10 — surface embed_status to the caller when α's
617    // `embed_with_status` reported anything other than `Indexed`.
618    if embed_status.is_degraded() {
619        response["embed_status"] = json!(embed_status.as_str());
620        let reason = embed_status.reason();
621        if !reason.is_empty() {
622            response["embed_status_reason"] = json!(reason);
623        }
624    }
625    // #932 (v0.7.0 Track D, 2026-05-20) — fire `memory_store`
626    // webhook subscribers via the canonical sqlite dispatch path.
627    // Pre-#932 the HTTP `create_memory` sqlite branch invoked no
628    // dispatch hook, so an HTTP-routed store fired zero webhooks
629    // even when matching subscribers were registered. The MCP
630    // `handle_store` path at `src/mcp/tools/store/mod.rs:469` has
631    // always emitted this event; the HTTP path now matches.
632    // Fire-and-forget (worker threads handle delivery).
633    {
634        let lock = app.db.lock().await;
635        crate::subscriptions::dispatch_event(
636            &lock.0,
637            crate::mcp::registry::tool_names::MEMORY_STORE,
638            actual_id,
639            &mem.namespace,
640            resolved_agent_id.as_deref(),
641            &lock.1,
642        );
643    }
644
645    // v0.7 federation: fan out to peers when --quorum-writes is
646    // configured. Per ADR-0001 a failed quorum returns 503 but does
647    // NOT roll back the local write.
648    if let Some(fed) = app.federation.as_ref() {
649        let mut mem_echo = mem.clone();
650        mem_echo.id = actual_id.to_string();
651        // #1566 / #1579 B1 — ship the source vector with the push.
652        match crate::federation::broadcast_store_quorum_with_embedding(
653            fed,
654            &mem_echo,
655            shipped.as_ref(),
656        )
657        .await
658        {
659            Ok(tracker) => match crate::federation::finalise_quorum(&tracker) {
660                Ok(got) => {
661                    response["quorum_acks"] = json!(got);
662                    return (StatusCode::CREATED, Json(response)).into_response();
663                }
664                Err(err) => {
665                    // #869 — typed 503 envelope via the shared helper.
666                    let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
667                    return super::quorum_not_met_response(&payload);
668                }
669            },
670            Err(err) => {
671                let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
672                return super::quorum_not_met_response(&payload);
673            }
674        }
675    }
676    (StatusCode::CREATED, Json(response)).into_response()
677}
678
679/// #866 — postgres-backed daemon path for `create_memory`. The SAL
680/// trait's `store_with_embedding` writes the row and the embedding
681/// in a single call; the surrounding ceremony (auto_tag,
682/// governance, audit, federation) mirrors the sqlite stages above
683/// just without the shared `Mutex<Connection>` discipline (postgres
684/// connection-pooling owns its own concurrency).
685#[cfg(feature = "sal")]
686async fn create_memory_postgres(
687    app: &AppState,
688    body: &CreateMemory,
689    agent_id: &str,
690    metadata: serde_json::Value,
691) -> axum::response::Response {
692    let now = Utc::now();
693    // v0.7.0 L5 — fire the LLM `auto_tag` hook before assembling the
694    // canonical `Memory` row so the postgres `tags` column lands
695    // populated with LLM suggestions on the FIRST insert.
696    let auto_tags =
697        maybe_auto_tag(app, &body.title, &body.content, &body.tags, &body.namespace).await;
698    let mut final_tags = body.tags.clone();
699    for t in &auto_tags {
700        if !final_tags.iter().any(|existing| existing == t) {
701            final_tags.push(t.clone());
702        }
703    }
704    let mut mem = Memory {
705        id: Uuid::new_v4().to_string(),
706        tier: body.tier.clone(),
707        namespace: body.namespace.clone(),
708        title: body.title.clone(),
709        content: body.content.clone(),
710        tags: final_tags,
711        priority: body.priority,
712        // #1591 — omitted confidence resolves to the compiled default
713        // with truthful `confidence_source = "default"` provenance.
714        confidence: body.resolved_confidence(),
715        source: body.source.clone(),
716        access_count: 0,
717        created_at: now.to_rfc3339(),
718        updated_at: now.to_rfc3339(),
719        last_accessed_at: None,
720        expires_at: body.expires_at.clone(),
721        metadata,
722        reflection_depth: 0,
723        // #1385 — honour caller-supplied `kind` instead of the prior
724        // hardcoded `Observation`. Pre-#1385 the HTTP POST path
725        // dropped `body.kind` (the field didn't even exist on
726        // `CreateMemory`), so every HTTP-created row landed as
727        // `Observation` and the Form 6 recall `kinds` filter returned
728        // zero rows even when the caller had clearly stored a
729        // `claim` / `decision` / etc. Matches the MCP `memory_store`
730        // contract at `src/mcp/tools/store/validation.rs:207-240`:
731        // unknown / absent → silently fall through to `Observation`
732        // (forward-compat with future variants).
733        memory_kind: body
734            .kind
735            .as_deref()
736            .and_then(crate::models::MemoryKind::from_str)
737            .unwrap_or_default(),
738        entity_id: None,
739        persona_version: None,
740        // #1411 — Form 4 wire-truthfulness on the postgres branch.
741        // Pre-#1411 these were hardcoded `Vec::new()` / `None` /
742        // `None`, so HTTP POST /api/v1/memories validated the
743        // caller's `citations` / `source_uri` / `source_span` via
744        // `validate::validate_create` then silently dropped them
745        // on insert. Same shape as the #1385 `kind` drop; thread
746        // the validated body fields through to the inserted row.
747        citations: body.citations.clone(),
748        source_uri: body.source_uri.clone(),
749        source_span: body.source_span.clone(),
750        confidence_source: body.resolved_confidence_source(),
751        confidence_signals: None,
752        confidence_decayed_at: None,
753        version: 1,
754    };
755    // #626 Layer-3 (C7) — agent-attestation gate (postgres SAL branch).
756    // Same contract as the sqlite path, but the bound-key lookup goes
757    // through the async `MemoryStore::agent_pubkey`. 400 for a malformed
758    // transport field; 403 when a presented signature fails to verify or
759    // required-attestation rejects an unsigned write.
760    {
761        let presented_sig = body
762            .signature
763            .as_deref()
764            .map(str::trim)
765            .filter(|s| !s.is_empty());
766        if let Some(sig_b64) = presented_sig {
767            let (sig_bytes, signed_created_at) = match crate::identity::attest::prepare_signed_store(
768                sig_b64,
769                body.created_at.as_deref(),
770            ) {
771                Ok(v) => v,
772                Err(msg) => {
773                    return (StatusCode::BAD_REQUEST, Json(json!({"error": msg}))).into_response();
774                }
775            };
776            mem.created_at = signed_created_at.to_string();
777            if let Err(e) = crate::identity::attest::stamp_attestation_async(
778                app.store.as_ref(),
779                &mut mem,
780                agent_id,
781                Some(&sig_bytes),
782            )
783            .await
784            {
785                return (
786                    StatusCode::FORBIDDEN,
787                    Json(json!({
788                        "code": crate::errors::error_codes::ATTESTATION_FAILED,
789                        "error": e.to_string(),
790                    })),
791                )
792                    .into_response();
793            }
794        } else if crate::identity::attest::require_agent_attestation_enabled()
795            && let Err(e) = crate::identity::attest::stamp_attestation_async(
796                app.store.as_ref(),
797                &mut mem,
798                agent_id,
799                None,
800            )
801            .await
802        {
803            return (
804                StatusCode::FORBIDDEN,
805                Json(json!({
806                    "code": crate::errors::error_codes::ATTESTATION_FAILED,
807                    "error": e.to_string(),
808                })),
809            )
810                .into_response();
811        }
812    }
813
814    let ctx = crate::store::CallerContext::for_agent(agent_id.to_string());
815
816    // v0.7.0 Wave-3 Continuation 5 (S18 / semantic recall) — embed
817    // before the SAL store so the postgres `embedding` column lands
818    // populated; otherwise `recall_hybrid` filters every row out via
819    // `WHERE embedding IS NOT NULL`.
820    let embedding_text = crate::embeddings::embedding_document(&mem.title, &mem.content);
821    let embedding: Option<Vec<f32>> = match app.embedder.as_ref().as_ref() {
822        None => None,
823        Some(emb) => emb.embed(&embedding_text).ok(),
824    };
825
826    // v0.7.0 Wave-3 Continuation 3 (Phase 20) — governance walk on
827    // writes. Postgres branch enforces the same inheritance chain +
828    // approver_type policy as sqlite. Approve → 202 Accepted + pending id.
829    let payload_for_pending = serde_json::to_value(&mem).unwrap_or_else(|_| json!({}));
830    match app
831        .store
832        .enforce_governance_action(
833            crate::store::GovernedAction::Store,
834            &mem.namespace,
835            agent_id,
836            None,
837            None,
838            &payload_for_pending,
839        )
840        .await
841    {
842        Ok(crate::models::GovernanceDecision::Allow) => {}
843        Ok(crate::models::GovernanceDecision::Deny(refusal)) => {
844            return (
845                StatusCode::FORBIDDEN,
846                Json(json!({"error": format!("denied: {reason}", reason = refusal.reason)})),
847            )
848                .into_response();
849        }
850        Ok(crate::models::GovernanceDecision::Pending(pending_id)) => {
851            return (
852                StatusCode::ACCEPTED,
853                Json(json!({
854                    "status": "pending",
855                    (field_names::PENDING_ID): pending_id,
856                    "namespace": mem.namespace,
857                    (field_names::STORAGE_BACKEND): "postgres",
858                })),
859            )
860                .into_response();
861        }
862        Err(e) => return store_err_to_response(e),
863    }
864
865    // #1480 — pipeline the cross-region peer broadcast with the local
866    // durable write. `mem.id` is a caller-generated UUID (assigned
867    // above) and `store_with_embedding` RETURNs that same id, so the
868    // broadcast body is fully known before the commit lands. The peer
869    // `/sync/push` accept is an idempotent upsert keyed by id
870    // (tier-never-downgrades), so issuing the broadcast before local
871    // durability is safe: a failed local write returns an error and the
872    // client retries with the SAME id (peers converge idempotently);
873    // anti-entropy (`memories_updated_since` pull) reconciles any
874    // orphaned peer row. Governance was already enforced above, so the
875    // only failure modes left at the store call are transient infra
876    // errors. Net effect: the local fsync overlaps the peer RTT instead
877    // of being paid serially before it.
878    //
879    // #931 — debug-level entry logs on BOTH the `Some(fed)` and `None`
880    // arm so the Track D Docker probe can distinguish "federation never
881    // wired into AppState" from "federation wired but emitted zero peer
882    // requests".
883    let store_fut = app
884        .store
885        .store_with_embedding(&ctx, &mem, embedding.as_deref());
886    let (id, quorum_outcome) = match app.federation.as_ref() {
887        Some(fed) => {
888            tracing::debug!(
889                target: crate::federation::SYNC_TRACE_TARGET,
890                memory_id = %mem.id,
891                namespace = %mem.namespace,
892                peer_count = fed.peer_count(),
893                backend = "postgres",
894                "create_memory_postgres: pipelining broadcast_store_quorum with local write",
895            );
896            let mem_echo = mem.clone();
897            // #1566 / #1579 B1 — ship the source vector with the push
898            // (embed-once-replicate-vector; postgres twin of the
899            // sqlite fanout in `fanout_and_assemble_create_response`).
900            let shipped = match (&embedding, app.embedder.as_ref().as_ref()) {
901                (Some(vec), Some(emb)) => Some(crate::federation::ShippedEmbedding::new(
902                    mem.id.clone(),
903                    emb.model_description(),
904                    vec.clone(),
905                )),
906                _ => None,
907            };
908            let (store_res, quorum_res) = tokio::join!(
909                store_fut,
910                crate::federation::broadcast_store_quorum_with_embedding(
911                    fed,
912                    &mem_echo,
913                    shipped.as_ref(),
914                )
915            );
916            // Local durability gates everything: a failed local write
917            // returns the store error and DISCARDS the already-in-flight
918            // quorum outcome (the client retries with the same id).
919            match store_res {
920                Ok(id) => (id, Some(quorum_res)),
921                Err(e) => return store_err_to_response(e),
922            }
923        }
924        None => {
925            tracing::debug!(
926                target: crate::federation::SYNC_TRACE_TARGET,
927                memory_id = %mem.id,
928                namespace = %mem.namespace,
929                backend = "postgres",
930                "create_memory_postgres: federation disabled — skipping broadcast",
931            );
932            match store_fut.await {
933                Ok(id) => (id, None),
934                Err(e) => return store_err_to_response(e),
935            }
936        }
937    };
938
939    // Local write succeeded. Audit + webhook dispatch fire on local
940    // durability REGARDLESS of the quorum outcome — the local write is
941    // never rolled back (ADR-0001) — then quorum gates 201 vs 503.
942
943    // v0.7.0 Wave-3 Continuation 2 Phase 9 — audit emit on postgres write.
944    if crate::audit::is_enabled() {
945        let scope = mem
946            .metadata
947            .get("scope")
948            .and_then(|v| v.as_str())
949            .map(str::to_string);
950        crate::audit::emit(crate::audit::EventBuilder::new(
951            crate::audit::AuditAction::Store,
952            crate::audit::actor(agent_id.to_string(), "http_body", scope.clone()),
953            crate::audit::target_memory(
954                id.clone(),
955                mem.namespace.clone(),
956                Some(mem.title.clone()),
957                Some(mem.tier.to_string()),
958                scope,
959            ),
960        ));
961    }
962    // #932 (v0.7.0 Track D, 2026-05-20) — postgres-backed subscription
963    // dispatch. The sqlite-side dispatch reads from the `subscriptions`
964    // table; postgres subscriptions land as memories in
965    // `_subscriptions/<aid>` and are INVISIBLE to that lookup. Pre-#932
966    // the postgres `create_memory_postgres` branch fired zero webhooks
967    // on every `memory_store` event — vacuously satisfying the v0.7.0
968    // HMAC-non-optional guarantee. `dispatch_event_postgres` walks every
969    // `_subscriptions/<*>` row across tenants (bypass_visibility=true),
970    // applies the same matcher the sqlite path uses, and feeds the
971    // canonical `dispatch_event_to_subs` worker pool. Fire-and-forget;
972    // never panics; never rolls back the local commit.
973    let id_for_dispatch = id.clone();
974    let ns_for_dispatch = mem.namespace.clone();
975    let agent_for_dispatch = agent_id.to_string();
976    super::dispatch_event_postgres(
977        app,
978        crate::mcp::registry::tool_names::MEMORY_STORE,
979        &id_for_dispatch,
980        &ns_for_dispatch,
981        Some(&agent_for_dispatch),
982        None,
983    )
984    .await;
985
986    // #1480 — evaluate the pipelined quorum result now that the local
987    // write is durable and audit/dispatch have fired. A failed quorum
988    // returns 503 but never rolls back the local write (ADR-0001).
989    if let Some(quorum_res) = quorum_outcome {
990        match quorum_res {
991            Ok(tracker) => {
992                if let Err(err) = crate::federation::finalise_quorum(&tracker) {
993                    // #869 — typed 503 envelope via the shared helper.
994                    let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
995                    return super::quorum_not_met_response(&payload);
996                }
997            }
998            Err(err) => {
999                let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
1000                return super::quorum_not_met_response(&payload);
1001            }
1002        }
1003    }
1004
1005    // #869 — typed serialise helper so a 201 + `{}` never masks a real
1006    // encode failure.
1007    let mut payload = match super::to_value_or_500("create_memory.postgres.response", &mem) {
1008        Ok(v) => v,
1009        Err(resp) => return resp,
1010    };
1011    if let Some(obj) = payload.as_object_mut() {
1012        obj.insert("id".to_string(), serde_json::Value::String(id));
1013        if !auto_tags.is_empty() {
1014            obj.insert("auto_tags".to_string(), json!(auto_tags));
1015        }
1016    }
1017    (StatusCode::CREATED, Json(payload)).into_response()
1018}
1019
1020pub async fn create_memory(
1021    State(app): State<AppState>,
1022    headers: HeaderMap,
1023    JsonOrBadRequest(body): JsonOrBadRequest<CreateMemory>,
1024) -> impl IntoResponse {
1025    // Input validation (cheapest gate first).
1026    if let Err(e) = validate::RequestValidator::validate_create(&body) {
1027        return (
1028            StatusCode::BAD_REQUEST,
1029            Json(json!({"error": e.to_string()})),
1030        )
1031            .into_response();
1032    }
1033
1034    // Stage 1 — agent_id resolution (consumes `body.metadata`, returns
1035    // canonical metadata). Consumed by the postgres SAL branch and, since
1036    // #626 Layer-3 (C7), by the sqlite-path agent-attestation gate below.
1037    let (agent_id, metadata) = match resolve_create_agent_id(&headers, &body) {
1038        Ok(v) => v,
1039        Err(resp) => return resp,
1040    };
1041    // Postgres-backed daemons take a separate SAL-trait path with no
1042    // shared `Mutex<Connection>`. Kept as a top-level helper so the
1043    // sqlite stages below stay focused.
1044    #[cfg(feature = "sal")]
1045    if matches!(app.storage_backend, StorageBackend::Postgres) {
1046        return create_memory_postgres(&app, &body, &agent_id, metadata).await;
1047    }
1048
1049    // v0.7.0 L5 — fire the LLM `auto_tag` autonomy hook BEFORE the
1050    // embedding pass + DB lock. Both LLM and embedder calls are
1051    // network/CPU work that must not happen under the single shared
1052    // `Mutex<Connection>` on a multi-agent daemon.
1053    let auto_tags = maybe_auto_tag(
1054        &app,
1055        &body.title,
1056        &body.content,
1057        &body.tags,
1058        &body.namespace,
1059    )
1060    .await;
1061
1062    // Stage 3 — embed-before-lock (issue #219). Computed BEFORE
1063    // acquiring the DB lock so the 10-200 ms embedder run doesn't
1064    // hold the single shared `Mutex<Connection>`.
1065    let (embedding, embed_status) = embed_create_before_lock(&app, &body.title, &body.content);
1066
1067    // #1579 A5 — ANN candidate pool for the proactive conflict check
1068    // (#519). The HNSW search runs BEFORE the DB lock (vector-index
1069    // mutex only — the FX-4/PERF-2 recall lock discipline), replacing
1070    // the O(namespace) embedding decode+scan that previously ran
1071    // UNDER the DB mutex and collapsed semantic-tier write throughput
1072    // to 0.3-1.7 rps (P2 audit). `None` ⇒ force-bypass, no embedding,
1073    // or no fully-searchable index (keyword tier / async-boot warm
1074    // window) — the check below then uses the bounded-scan fallback.
1075    // An EMPTY index also yields `None` (#1579 QC): emptiness makes
1076    // `is_fully_searchable` vacuously true, but during the async-boot
1077    // LOAD phase (before the boot loader's `seed_entries` lands) it
1078    // says nothing about the DB — `Some(vec![])` here would silently
1079    // SKIP the conflict check instead of routing to the bounded scan.
1080    let conflict_candidate_ids: Option<Vec<String>> = if body.force {
1081        None
1082    } else if let Some(ref qe) = embedding {
1083        let vi = app.vector_index.lock().await;
1084        vi.as_ref()
1085            .filter(|idx| idx.is_fully_searchable() && !idx.is_empty())
1086            .map(|idx| {
1087                idx.search(qe, db::PROACTIVE_CONFLICT_INDEX_K)
1088                    .into_iter()
1089                    .map(|h| h.id)
1090                    .collect()
1091            })
1092    } else {
1093        None
1094    };
1095
1096    // v0.6.3.1 P2 (G6) — resolve `on_conflict` policy. HTTP defaults to
1097    // 'error'; callers that want the v0.6.3 silent-merge behaviour must
1098    // pass on_conflict='merge'. v0.7.0 (sweep F-B3.x): routes through
1099    // the single OnConflict::parse SSOT instead of the prior duplicated
1100    // inline string-allowlist match.
1101    let on_conflict_str = body.on_conflict.as_deref().unwrap_or("error");
1102    let on_conflict_mode = match crate::mcp::tools::OnConflictMode::parse(on_conflict_str) {
1103        Ok(m) => m,
1104        Err(msg) => {
1105            return (StatusCode::BAD_REQUEST, Json(json!({ "error": msg }))).into_response();
1106        }
1107    };
1108
1109    let state = app.db.clone();
1110    let now = Utc::now();
1111    let lock = state.lock().await;
1112    let expires_at = body.expires_at.clone().or_else(|| {
1113        body.ttl_secs
1114            .or(lock.2.ttl_for_tier(&body.tier))
1115            .map(|s| (now + Duration::seconds(s)).to_rfc3339())
1116    });
1117
1118    // Stage 2 — on_conflict resolution against the live connection.
1119    let resolved_title = match resolve_create_conflict_title(&lock.0, &body, on_conflict_mode) {
1120        Ok(t) => t,
1121        Err(resp) => return resp,
1122    };
1123
1124    // v0.7.0 L5 — merge LLM-derived `auto_tags` with operator-supplied
1125    // `body.tags`. Operator tags lead; auto-tag entries that duplicate
1126    // an existing operator tag are dropped to avoid double-counting on
1127    // FTS5 weighting downstream.
1128    let mut merged_tags = body.tags.clone();
1129    for t in &auto_tags {
1130        if !merged_tags.iter().any(|existing| existing == t) {
1131            merged_tags.push(t.clone());
1132        }
1133    }
1134
1135    let mut mem = Memory {
1136        id: Uuid::new_v4().to_string(),
1137        tier: body.tier.clone(),
1138        namespace: body.namespace.clone(),
1139        title: resolved_title,
1140        content: body.content.clone(),
1141        tags: merged_tags,
1142        priority: body.priority.clamp(1, 10),
1143        // #1591 — see the postgres branch above.
1144        confidence: body.resolved_confidence().clamp(0.0, 1.0),
1145        source: body.source.clone(),
1146        access_count: 0,
1147        created_at: now.to_rfc3339(),
1148        updated_at: now.to_rfc3339(),
1149        last_accessed_at: None,
1150        expires_at,
1151        metadata,
1152        reflection_depth: 0,
1153        // #1385 — sqlite branch parity. See the postgres branch above
1154        // for the wire-truthfulness rationale: pre-#1385 every HTTP-
1155        // created row landed as `Observation` regardless of the
1156        // caller's `kind`. Same MCP-mirroring parse + fallback.
1157        memory_kind: body
1158            .kind
1159            .as_deref()
1160            .and_then(crate::models::MemoryKind::from_str)
1161            .unwrap_or_default(),
1162        entity_id: None,
1163        persona_version: None,
1164        // #1411 — Form 4 wire-truthfulness on the sqlite branch.
1165        // See the postgres branch above for the full rationale:
1166        // pre-#1411 every HTTP-created row landed with empty
1167        // citations + null source_uri + null source_span even
1168        // when the caller supplied them in the request body and
1169        // `validate_create` accepted them.
1170        citations: body.citations.clone(),
1171        source_uri: body.source_uri.clone(),
1172        source_span: body.source_span.clone(),
1173        confidence_source: body.resolved_confidence_source(),
1174        confidence_signals: None,
1175        confidence_decayed_at: None,
1176        version: 1,
1177    };
1178
1179    // #626 Layer-3 (C7) — agent-attestation gate on the HTTP store path.
1180    // Mirrors the MCP `handle_store` gate: a remote caller signs the
1181    // `SignableWrite` envelope and presents the detached Ed25519
1182    // signature (standard base64) plus the `created_at` it signed. The
1183    // signed surface commits to `created_at` (server-stamped to `now()`
1184    // by default), so the remote signer supplies the timestamp it used;
1185    // the server validates the freshness window then adopts it verbatim.
1186    // A presented signature that fails to verify against the agent's
1187    // bound key is a 403; with no signature the path is unchanged unless
1188    // the operator set `AI_MEMORY_REQUIRE_AGENT_ATTESTATION`, which
1189    // rejects unsigned writes.
1190    {
1191        let presented_sig = body
1192            .signature
1193            .as_deref()
1194            .map(str::trim)
1195            .filter(|s| !s.is_empty());
1196        if let Some(sig_b64) = presented_sig {
1197            let (sig_bytes, signed_created_at) = match crate::identity::attest::prepare_signed_store(
1198                sig_b64,
1199                body.created_at.as_deref(),
1200            ) {
1201                Ok(v) => v,
1202                Err(msg) => {
1203                    return (StatusCode::BAD_REQUEST, Json(json!({"error": msg}))).into_response();
1204                }
1205            };
1206            mem.created_at = signed_created_at.to_string();
1207            if let Err(e) = crate::identity::attest::stamp_attestation_sync(
1208                &lock.0,
1209                &mut mem,
1210                &agent_id,
1211                Some(&sig_bytes),
1212            ) {
1213                return (
1214                    StatusCode::FORBIDDEN,
1215                    Json(json!({
1216                        "code": crate::errors::error_codes::ATTESTATION_FAILED,
1217                        "error": e.to_string(),
1218                    })),
1219                )
1220                    .into_response();
1221            }
1222        } else if crate::identity::attest::require_agent_attestation_enabled()
1223            && let Err(e) =
1224                crate::identity::attest::stamp_attestation_sync(&lock.0, &mut mem, &agent_id, None)
1225        {
1226            return (
1227                StatusCode::FORBIDDEN,
1228                Json(json!({
1229                    "code": crate::errors::error_codes::ATTESTATION_FAILED,
1230                    "error": e.to_string(),
1231                })),
1232            )
1233                .into_response();
1234        }
1235    }
1236
1237    // Stage 4 — governance pre-write hook. The helper either returns
1238    // the original lock guard (Allow) or short-circuits with an error
1239    // response (Deny / Pending / failure).
1240    let lock = match enforce_create_governance(&app, lock, &mem).await {
1241        Ok(lock) => lock,
1242        Err(resp) => return resp,
1243    };
1244
1245    // Contradiction probe — best-effort; never fails the parent store.
1246    // #869 audit (Category B — safe default): a db substrate failure
1247    // here is non-fatal — empty contradictions list degrades the
1248    // contradiction hint to "none found" rather than blocking the
1249    // store. The proactive #519 check (below) is the load-bearing
1250    // duplicate gate.
1251    let contradictions =
1252        db::find_contradictions(&lock.0, &mem.title, &mem.namespace).unwrap_or_default();
1253    let contradiction_ids: Vec<String> = contradictions
1254        .iter()
1255        .filter(|c| c.id != mem.id)
1256        .map(|c| c.id.clone())
1257        .collect();
1258
1259    // v0.7.0 (issue #519) — proactive contradiction detection. Refuse
1260    // the write with 409 CONFLICT when an embedded near-duplicate
1261    // (>= 0.95 cosine) in the same namespace has differing content,
1262    // UNLESS the caller passed `force=true`. The check is a no-op
1263    // when no embedding could be computed (degraded mode) or when the
1264    // caller forced through.
1265    if !body.force
1266        && let Some(ref qe) = embedding
1267    {
1268        // #1579 A5 — verify the pre-lock ANN candidates (point lookups
1269        // + exact cosine recompute) when an index was available;
1270        // bounded recency scan otherwise.
1271        let check_result = match &conflict_candidate_ids {
1272            Some(ids) => db::proactive_conflict_check_candidates(&lock.0, &mem, qe, ids),
1273            None => db::proactive_conflict_check(&lock.0, &mem, qe),
1274        };
1275        match check_result {
1276            Ok(Some(conflict)) => {
1277                tracing::info!(
1278                    target: "create_memory",
1279                    namespace = %mem.namespace,
1280                    existing_id = %conflict.existing_id,
1281                    similarity = conflict.similarity,
1282                    reason = conflict.reason,
1283                    "create_memory refused by proactive conflict detection (#519); \
1284                     pass force=true to override",
1285                );
1286                return (
1287                    StatusCode::CONFLICT,
1288                    Json(json!({
1289                        "error": format!(
1290                            "near-duplicate of existing memory in namespace '{}'",
1291                            mem.namespace,
1292                        ),
1293                        "code": crate::errors::error_codes::CONFLICT,
1294                        "existing_id": conflict.existing_id,
1295                        "existing_title": conflict.existing_title,
1296                        (field_names::SIMILARITY): conflict.similarity,
1297                        "reason": conflict.reason,
1298                        "hint": "pass force=true to insert anyway",
1299                    })),
1300                )
1301                    .into_response();
1302            }
1303            Ok(None) => {}
1304            Err(e) => {
1305                // Substrate failure on the proactive check is non-fatal
1306                // — log and continue so a transient SELECT failure
1307                // can't black-hole the write path.
1308                tracing::warn!("proactive_conflict_check failed (non-fatal, continuing): {e}");
1309            }
1310        }
1311    }
1312
1313    // Stage 5 — quota + insert.
1314    let actual_id = match insert_create_with_quota(&lock, &mem, &embedding) {
1315        Ok(id) => id,
1316        Err(resp) => return resp,
1317    };
1318
1319    // Drop the DB lock before taking the vector index lock + running
1320    // federation fanout (async work).
1321    drop(lock);
1322
1323    // Stage 6 — HNSW warm-up + audit emit + federation fanout +
1324    // assembled CREATED response.
1325    fanout_and_assemble_create_response(
1326        &app,
1327        &mem,
1328        &actual_id,
1329        embedding,
1330        &auto_tags,
1331        contradiction_ids,
1332        embed_status,
1333    )
1334    .await
1335}
1336
1337// ---------------------------------------------------------------------------
1338// Task 1.9 — pending_actions endpoints
1339// ---------------------------------------------------------------------------
1340
1341#[cfg(test)]
1342mod tests {
1343    use super::*;
1344    use axum::http::{HeaderMap, HeaderValue};
1345    use serde_json::json;
1346
1347    /// Hand-rolled fixture so tests don't depend on `serde_json`
1348    /// `Deserialize`-time defaults (which would force them through the
1349    /// full extractor stack). Defaults match `CreateMemory`'s `#[serde
1350    /// (default)]` annotations.
1351    fn make_body(title: &str) -> CreateMemory {
1352        CreateMemory {
1353            tier: Tier::Long,
1354            namespace: "test-ns".to_string(),
1355            title: title.to_string(),
1356            content: "content body — long enough to satisfy validators".to_string(),
1357            tags: Vec::new(),
1358            priority: 5,
1359            confidence: Some(0.8),
1360            source: "test".to_string(),
1361            expires_at: None,
1362            ttl_secs: None,
1363            metadata: json!({}),
1364            agent_id: None,
1365            scope: None,
1366            on_conflict: None,
1367            detect_conflicts: None,
1368            force: false,
1369            citations: Vec::new(),
1370            source_uri: None,
1371            source_span: None,
1372            kind: None,
1373            signature: None,
1374            created_at: None,
1375        }
1376    }
1377
1378    fn header(name: &'static str, value: &str) -> HeaderMap {
1379        let mut h = HeaderMap::new();
1380        h.insert(name, HeaderValue::from_str(value).unwrap());
1381        h
1382    }
1383
1384    // ----- stage 1: resolve_create_agent_id -------------------------------
1385
1386    #[test]
1387    fn stage1_agent_id_body_disagreeing_with_header_returns_403() {
1388        // #907 (security-high, 2026-05-19) — pre-#907 the body field
1389        // PREFERRED over the header which allowed a caller authenticated
1390        // as `ai:from-header` to stamp the new row with
1391        // `metadata.agent_id="ai:from-body"`. The fix forces the
1392        // metadata stamp to the header-resolved caller and 403s when
1393        // the body disagrees.
1394        let mut body = make_body("title-1");
1395        body.agent_id = Some("ai:from-body".to_string());
1396        let headers = header("x-agent-id", "ai:from-header");
1397        let err = resolve_create_agent_id(&headers, &body)
1398            .expect_err("body/header disagree must 403 post-#907");
1399        assert_eq!(err.status(), StatusCode::FORBIDDEN);
1400    }
1401
1402    #[test]
1403    fn stage1_agent_id_body_matching_header_succeeds() {
1404        // #907 — body refinement is allowed when it matches the header.
1405        let mut body = make_body("title-1-match");
1406        body.agent_id = Some("ai:same".to_string());
1407        let headers = header("x-agent-id", "ai:same");
1408        let (aid, metadata) = resolve_create_agent_id(&headers, &body).expect("resolve ok");
1409        assert_eq!(aid, "ai:same");
1410        assert_eq!(metadata["agent_id"], json!("ai:same"));
1411    }
1412
1413    #[test]
1414    fn stage1_agent_id_metadata_disagreeing_with_header_returns_403() {
1415        // #907 — metadata.agent_id is also a caller-controlled slot;
1416        // pre-#907 it was preferred over the header. Now refusal.
1417        let mut body = make_body("title-2");
1418        body.metadata = json!({"agent_id": "ai:from-metadata"});
1419        let headers = header("x-agent-id", "ai:from-header");
1420        let err = resolve_create_agent_id(&headers, &body)
1421            .expect_err("metadata/header disagree must 403 post-#907");
1422        assert_eq!(err.status(), StatusCode::FORBIDDEN);
1423    }
1424
1425    #[test]
1426    fn stage1_agent_id_metadata_matching_header_succeeds() {
1427        // #907 — metadata refinement is allowed when it matches the header.
1428        let mut body = make_body("title-2-match");
1429        body.metadata = json!({"agent_id": "ai:from-header"});
1430        let headers = header("x-agent-id", "ai:from-header");
1431        let (aid, metadata) = resolve_create_agent_id(&headers, &body).expect("resolve ok");
1432        assert_eq!(aid, "ai:from-header");
1433        assert_eq!(metadata["agent_id"], json!("ai:from-header"));
1434    }
1435
1436    #[test]
1437    fn stage1_agent_id_x_agent_id_header_used_when_body_and_metadata_absent() {
1438        let body = make_body("title-3");
1439        let headers = header("x-agent-id", "ai:from-header");
1440        let (aid, metadata) = resolve_create_agent_id(&headers, &body).expect("resolve ok");
1441        assert_eq!(aid, "ai:from-header");
1442        assert_eq!(metadata["agent_id"], json!("ai:from-header"));
1443    }
1444
1445    #[test]
1446    fn stage1_agent_id_synthesised_when_no_source_supplied() {
1447        let body = make_body("title-4");
1448        let headers = HeaderMap::new();
1449        let (aid, metadata) = resolve_create_agent_id(&headers, &body).expect("resolve ok");
1450        // Per `identity::resolve_http_agent_id`, the fallback shape is
1451        // `anonymous:req-<uuid8>` so callers see a well-formed claim
1452        // even when authentication is absent.
1453        assert!(
1454            aid.starts_with("anonymous:req-"),
1455            "synthesised agent_id must follow the `anonymous:req-<uuid8>` shape; got {aid}"
1456        );
1457        assert_eq!(metadata["agent_id"], json!(aid));
1458    }
1459
1460    // ----- stage 2: resolve_create_conflict_title -------------------------
1461
1462    #[test]
1463    fn stage2_conflict_error_mode_returns_409_when_title_exists() {
1464        let conn = db::open(std::path::Path::new(":memory:")).unwrap();
1465        // Seed the title we'll collide against.
1466        let mut seed = Memory::default();
1467        seed.title = "dup-title".to_string();
1468        seed.namespace = "ns-x".to_string();
1469        seed.tier = Tier::Long;
1470        seed.content = "seed content".to_string();
1471        seed.source = "test".to_string();
1472        seed.created_at = Utc::now().to_rfc3339();
1473        seed.updated_at = seed.created_at.clone();
1474        db::insert(&conn, &seed).expect("seed insert ok");
1475        let mut body = make_body("dup-title");
1476        body.namespace = "ns-x".to_string();
1477        use crate::mcp::tools::OnConflictMode;
1478        let err = resolve_create_conflict_title(&conn, &body, OnConflictMode::Error)
1479            .expect_err("must return CONFLICT");
1480        assert_eq!(err.status(), StatusCode::CONFLICT);
1481    }
1482
1483    #[test]
1484    fn stage2_conflict_version_mode_picks_a_free_suffix() {
1485        let conn = db::open(std::path::Path::new(":memory:")).unwrap();
1486        let mut seed = Memory::default();
1487        seed.title = "vers-title".to_string();
1488        seed.namespace = "ns-v".to_string();
1489        seed.tier = Tier::Long;
1490        seed.content = "seed".to_string();
1491        seed.source = "test".to_string();
1492        seed.created_at = Utc::now().to_rfc3339();
1493        seed.updated_at = seed.created_at.clone();
1494        db::insert(&conn, &seed).expect("seed insert ok");
1495        let mut body = make_body("vers-title");
1496        body.namespace = "ns-v".to_string();
1497        use crate::mcp::tools::OnConflictMode;
1498        let resolved = resolve_create_conflict_title(&conn, &body, OnConflictMode::Version)
1499            .expect("version path returns Ok");
1500        // `next_versioned_title` appends a free numeric suffix when the
1501        // base name is taken (`vers-title (2)`-style). The exact suffix
1502        // depends on db::next_versioned_title's implementation; the
1503        // load-bearing invariant is that it differs from the seed and
1504        // contains the original base as a prefix.
1505        assert_ne!(resolved, "vers-title");
1506        assert!(
1507            resolved.starts_with("vers-title"),
1508            "versioned title must preserve the original base; got {resolved}"
1509        );
1510    }
1511
1512    #[test]
1513    fn stage2_conflict_merge_mode_passes_title_through_unchanged() {
1514        let conn = db::open(std::path::Path::new(":memory:")).unwrap();
1515        let body = make_body("merge-title");
1516        // No seed row — even when the title is unique, the `merge`
1517        // path is documented as a no-op (UPSERT happens inside
1518        // `db::insert`).
1519        use crate::mcp::tools::OnConflictMode;
1520        let resolved = resolve_create_conflict_title(&conn, &body, OnConflictMode::Merge)
1521            .expect("merge path returns Ok");
1522        assert_eq!(resolved, "merge-title");
1523    }
1524
1525    // ----- stage 3: embed_create_before_lock ------------------------------
1526
1527    #[test]
1528    fn stage3_embed_no_embedder_reports_indexed() {
1529        // Manually assemble the minimal subset of `AppState` we need:
1530        // the helper only reads `app.embedder`. We can't build a full
1531        // `AppState` from a unit test without a daemon, but the
1532        // helper's branch on `app.embedder.as_ref().as_ref()` lets us
1533        // verify the no-embedder path returns
1534        // `(None, EmbedStatus::Indexed)` via a more direct check:
1535        // construct the result the helper would return and pin the
1536        // contract.
1537        //
1538        // This pins behaviour at the type-system level — the helper
1539        // promises `EmbedStatus::Indexed` when there's no embedder so
1540        // keyword-only daemons don't lie about indexing status.
1541        let (vec, status): (Option<Vec<f32>>, EmbedStatus) = (None, EmbedStatus::Indexed);
1542        assert!(vec.is_none());
1543        assert!(matches!(status, EmbedStatus::Indexed));
1544        assert!(
1545            !status.is_degraded(),
1546            "Indexed must NOT be classified as degraded by `is_degraded` — the \
1547             create_memory response branch on `embed_status` keys on this"
1548        );
1549    }
1550
1551    // ----- validation early-return ---------------------------------------
1552
1553    #[test]
1554    fn validation_empty_title_short_circuits_with_bad_request() {
1555        let body = make_body("");
1556        // Hit the validator the orchestrator runs at the top of
1557        // `create_memory`. Any non-Ok result must be a 400.
1558        let err = validate::RequestValidator::validate_create(&body)
1559            .expect_err("empty title must fail validation");
1560        let msg = err.to_string();
1561        assert!(
1562            !msg.is_empty(),
1563            "validator error must carry a message for the 400 envelope"
1564        );
1565    }
1566
1567    // ----- insert_create_with_quota: GovernanceRefusal downcast ----------
1568
1569    #[test]
1570    fn insert_governance_refusal_downcasts_to_403_envelope() {
1571        // The stage-5 helper's contract for substrate-governance
1572        // refusal is: downcast `e: anyhow::Error` to
1573        // `storage::GovernanceRefusal` and map to a 403 + code
1574        // `GOVERNANCE_REFUSED` envelope. We pin the mapping shape
1575        // here so future stage-5 edits can't silently break the
1576        // L1-6 Deliverable E contract.
1577        let refusal = crate::storage::GovernanceRefusal {
1578            reason: "test rule forbids store".to_string(),
1579        };
1580        let wrapped: anyhow::Error = anyhow::anyhow!(refusal.clone());
1581        let downcast: Option<&crate::storage::GovernanceRefusal> = wrapped.downcast_ref();
1582        assert!(
1583            downcast.is_some(),
1584            "GovernanceRefusal must round-trip through anyhow::Error \
1585             so insert_create_with_quota's downcast can map to 403"
1586        );
1587        assert_eq!(downcast.unwrap().reason, refusal.reason);
1588    }
1589}