ai_memory/handlers/create.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! HTTP `POST /api/v1/memories` create-path: six-stage orchestrator,
5//! per-stage helpers, postgres branch, and inline stage-helper tests.
6//!
7//! Extracted from [`super::http`] under issue #650 (handler cap ≤1200
8//! LOC). Handler bodies are unchanged; only the module surface moved.
9//! Wire compatibility preserved via `pub use create::*` in [`super`].
10
11#![allow(clippy::too_many_lines)]
12
13use crate::models::field_names;
14use axum::{
15 Json,
16 extract::State,
17 http::{HeaderMap, StatusCode},
18 response::IntoResponse,
19};
20use chrono::{Duration, Utc};
21use serde_json::json;
22use uuid::Uuid;
23
24use crate::db;
25use crate::embeddings::EmbedStatus;
26#[cfg(test)]
27use crate::models::Tier;
28use crate::models::{CreateMemory, Memory};
29use crate::validate;
30
31#[cfg(feature = "sal")]
32use super::StorageBackend;
33use super::maybe_auto_tag;
34#[cfg(feature = "sal")]
35use super::store_err_to_response;
36use super::{AppState, JsonOrBadRequest};
37
38// #866 — `create_memory` stage-helpers.
39//
40// The original `create_memory` carried ~790 LOC across the agent_id
41// resolution, on_conflict policy, embed-before-lock pass, governance
42// pre-write hook, the actual `db::insert`, the federation fanout, and
43// the postgres-SAL branch. Each stage has a clear input → output
44// contract, so each lives in a dedicated helper. The wrapper below
45// is the orchestrator: it sequences the six stage helpers (1 agent_id
46// → 2 on_conflict → 3 embed-before-lock → 4 governance → 5 insert →
47// 6 fanout) and returns the assembled HTTP response.
48//
49// Helpers return `Result<T, axum::response::Response>` so any short-
50// circuit envelope (validation error, conflict, governance pending,
51// federation quorum failure) is just an `?` away from the orchestrator.
52// ---------------------------------------------------------------------------
53
54/// #866 stage 1 — resolve `agent_id` via the HTTP precedence chain:
55/// 1. top-level `body.agent_id`
56/// 2. embedded `body.metadata.agent_id` (caller's NHI claim — load-
57/// bearing for federation receivers and clients that prefer the
58/// metadata-only shape; mirrors the MCP precedence at
59/// `crate::mcp::handle_store` (NHI precedence) and the CLAUDE.md §Agent Identity (NHI)
60/// contract).
61/// 3. `X-Agent-Id` request header
62/// 4. per-request anonymous fallback
63///
64/// Also validates `body.scope` (when supplied at the top level) and
65/// merges both the resolved `agent_id` and the scope into a fresh
66/// `metadata` value. The returned metadata is the canonical one for
67/// the subsequent stages — `body.metadata` is consumed here.
68///
69/// L11 (NHI-D-fed-agentid-mutation): prior to this split, step 2 was
70/// missing — a federated peer that resent a memory through
71/// `POST /api/v1/memories` (or a client that only stamped
72/// `metadata.agent_id`) would have its claim silently rewritten to
73/// the per-request anonymous id, breaking the immutable-provenance
74/// contract documented in CLAUDE.md and enforced at the SQL layer by
75/// `db::insert_if_newer` / `apply_remote_memory`.
76fn resolve_create_agent_id(
77 headers: &HeaderMap,
78 body: &CreateMemory,
79) -> Result<(String, serde_json::Value), axum::response::Response> {
80 let header_agent_id = headers
81 .get(crate::HEADER_AGENT_ID)
82 .and_then(|v| v.to_str().ok());
83 let metadata_agent_id = body
84 .metadata
85 .get("agent_id")
86 .and_then(serde_json::Value::as_str)
87 .map(str::to_string);
88 // #907 (security-high, 2026-05-19) — sibling of #874/#901/#905.
89 // The pre-#907 path preferred caller-supplied
90 // `body.agent_id` / `metadata.agent_id` over the authenticated
91 // `X-Agent-Id` header on the WRITE-path provenance stamp. An
92 // attacker authenticated as `bob` could call
93 // `POST /api/v1/memories` with `body.agent_id="alice"` (or
94 // `metadata.agent_id="alice"`) and the new row would land with
95 // `metadata.agent_id="alice"` — a provenance LIE that
96 // permanently fakes attribution (NHI design contract:
97 // `metadata.agent_id` is preserved across update/dedup/import).
98 // Header-only authentication now; caller-supplied claims (if
99 // present) must MATCH the authenticated caller else 403. The
100 // metadata stamp is forced to the resolved caller below.
101 let agent_id = crate::identity::resolve_http_agent_id(None, header_agent_id).map_err(|e| {
102 (
103 StatusCode::BAD_REQUEST,
104 Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
105 )
106 .into_response()
107 })?;
108 if let Some(claimed) = body.agent_id.as_deref()
109 && claimed != agent_id
110 {
111 return Err((
112 StatusCode::FORBIDDEN,
113 Json(json!({"error": crate::errors::msg::AGENT_ID_BODY_MISMATCH})),
114 )
115 .into_response());
116 }
117 if let Some(claimed) = metadata_agent_id.as_deref()
118 && claimed != agent_id
119 {
120 return Err((
121 StatusCode::FORBIDDEN,
122 Json(json!({"error": "metadata.agent_id does not match authenticated caller"})),
123 )
124 .into_response());
125 }
126 let mut metadata = body.metadata.clone();
127 if let Some(obj) = metadata.as_object_mut() {
128 obj.insert(
129 "agent_id".to_string(),
130 serde_json::Value::String(agent_id.clone()),
131 );
132 }
133 // #151 scope: validate + merge into metadata if supplied at the top
134 // level (inline metadata.scope still works; top-level is a shortcut).
135 if let Some(ref s) = body.scope {
136 validate::validate_scope(s).map_err(|e| {
137 (
138 StatusCode::BAD_REQUEST,
139 Json(json!({"error": e.to_string()})),
140 )
141 .into_response()
142 })?;
143 if let Some(obj) = metadata.as_object_mut() {
144 obj.insert("scope".to_string(), serde_json::Value::String(s.clone()));
145 }
146 }
147 Ok((agent_id, metadata))
148}
149
150/// #866 stage 3 — embed-before-lock. Issue #219: the embedder runs
151/// 10-200 ms of ONNX / Ollama work that must not hold the single
152/// shared `Mutex<Connection>` on a multi-agent daemon.
153///
154/// v0.7.0 Round-2 F10 — calls α's `Embedder::embed_with_status` so the
155/// success/skip/fail outcome is captured alongside the vector. The
156/// success-path response stays silent on `Indexed`; non-`Indexed`
157/// outcomes are surfaced as `embed_status` on the response body so the
158/// caller can tell semantic recall will miss this row until a re-index.
159/// Keyword-only deployments (embedder=None) report `Indexed` so the
160/// response shape is unchanged on nodes where the semantic layer is
161/// intentionally absent.
162fn embed_create_before_lock(
163 app: &AppState,
164 title: &str,
165 content: &str,
166) -> (Option<Vec<f32>>, EmbedStatus) {
167 let embedding_text = crate::embeddings::embedding_document(title, content);
168 match app.embedder.as_ref().as_ref() {
169 None => (None, EmbedStatus::Indexed),
170 Some(emb) => emb.embed_with_status(&embedding_text),
171 }
172}
173
174/// #866 stage 2 — resolve the `on_conflict` policy:
175/// - `error` (default): 409 CONFLICT + typed payload if a row with
176/// the same (title, namespace) already exists.
177/// - `version`: rewrite the title to the next free suffix.
178/// - `merge`: fall through; `db::insert` will UPSERT via the legacy
179/// INSERT ... ON CONFLICT path.
180///
181/// Returns the final title to embed in the canonical row, or an
182/// already-assembled error response for the orchestrator to surface.
183fn resolve_create_conflict_title(
184 conn: &rusqlite::Connection,
185 body: &CreateMemory,
186 on_conflict_mode: crate::mcp::tools::OnConflictMode,
187) -> Result<String, axum::response::Response> {
188 use crate::mcp::tools::OnConflictMode;
189 match on_conflict_mode {
190 OnConflictMode::Error => {
191 match db::find_by_title_namespace(conn, &body.title, &body.namespace) {
192 Ok(Some(existing_id)) => Err((
193 StatusCode::CONFLICT,
194 Json(json!({
195 "code": crate::errors::error_codes::CONFLICT,
196 "error": format!(
197 "memory with title '{}' already exists in namespace '{}'",
198 body.title, body.namespace
199 ),
200 "existing_id": existing_id,
201 })),
202 )
203 .into_response()),
204 Ok(None) => Ok(body.title.clone()),
205 Err(e) => {
206 tracing::error!("on_conflict lookup failed: {e}");
207 Err((
208 StatusCode::INTERNAL_SERVER_ERROR,
209 Json(json!({"error": "conflict check failed"})),
210 )
211 .into_response())
212 }
213 }
214 }
215 OnConflictMode::Version => db::next_versioned_title(conn, &body.title, &body.namespace)
216 .map_err(|e| {
217 tracing::error!("on_conflict=version failed: {e}");
218 (
219 StatusCode::INTERNAL_SERVER_ERROR,
220 Json(json!({"error": "could not pick a versioned title"})),
221 )
222 .into_response()
223 }),
224 OnConflictMode::Merge => Ok(body.title.clone()),
225 }
226}
227
228/// #866 stage 4 — substrate governance pre-write hook. Walks the
229/// inheritance chain via `db::enforce_governance` and either:
230/// - `Allow`: returns `Ok(())` to the orchestrator (caller proceeds
231/// to the insert stage).
232/// - `Deny`: short-circuits with 403 FORBIDDEN + the operator-
233/// authored reason verbatim.
234/// - `Pending`: queues the action in `pending_actions`, fires the
235/// K4 `approval_requested` webhook, then drops the lock and
236/// fans the pending row out to federation peers via
237/// `broadcast_pending_quorum`. Returns 202 ACCEPTED with the
238/// pending id so the caller can drive the consensus path through
239/// `POST /pending/{id}/approve`.
240///
241/// The Pending branch consumes the supplied `lock`; the orchestrator
242/// re-acquires `state.lock().await` AFTER an `Allow` return because
243/// the consume here is intentional (`drop(lock)` before the federation
244/// broadcast — keeping the DB lock across an async `await` is the
245/// regression #866 explicitly guards against).
246async fn enforce_create_governance<'a>(
247 app: &AppState,
248 lock: tokio::sync::MutexGuard<
249 'a,
250 (
251 rusqlite::Connection,
252 std::path::PathBuf,
253 crate::config::ResolvedTtl,
254 bool,
255 ),
256 >,
257 mem: &Memory,
258) -> Result<
259 tokio::sync::MutexGuard<
260 'a,
261 (
262 rusqlite::Connection,
263 std::path::PathBuf,
264 crate::config::ResolvedTtl,
265 bool,
266 ),
267 >,
268 axum::response::Response,
269> {
270 use crate::models::{GovernanceDecision, GovernedAction};
271 // #869 audit (Category B — safe default): missing or non-string
272 // `agent_id` collapses to `""`. The governance engine treats the
273 // empty agent the same as an anonymous caller (no per-agent rules
274 // match), which is the documented fail-closed posture.
275 let agent_for_gov = mem
276 .metadata
277 .get("agent_id")
278 .and_then(|v| v.as_str())
279 .unwrap_or_default()
280 .to_string();
281 // #869 — silently degrading to `Value::Null` would let the
282 // governance engine see a different payload than the one we
283 // were about to commit (rule predicates that key on memory
284 // fields would all evaluate against `null` and degenerate to
285 // either always-allow or always-deny depending on the rule
286 // semantics). Fail closed with a 500 instead.
287 let payload = match super::to_value_or_500("create_memory.governance.payload", mem) {
288 Ok(v) => v,
289 Err(resp) => return Err(resp),
290 };
291 match db::enforce_governance(
292 &lock.0,
293 GovernedAction::Store,
294 &mem.namespace,
295 &agent_for_gov,
296 None,
297 None,
298 &payload,
299 ) {
300 Ok(GovernanceDecision::Allow) => Ok(lock),
301 Ok(GovernanceDecision::Deny(refusal)) => Err((
302 StatusCode::FORBIDDEN,
303 Json(json!({"error": crate::governance::deny_message(
304 "store",
305 crate::governance::DenyGate::Governance,
306 &refusal.reason,
307 )})),
308 )
309 .into_response()),
310 Ok(GovernanceDecision::Pending(pending_id)) => {
311 // v0.6.2 (S34): fan out the new pending row so peers can
312 // approve / reject / list it. Load the canonical row we
313 // just inserted and broadcast before responding.
314 let pending_row = db::get_pending_action(&lock.0, &pending_id).ok().flatten();
315 // v0.7.0 K4 — fire the `approval_requested` webhook event
316 // through the existing subscription dispatcher so K10's
317 // Approval API HTTP+SSE handler picks it up. Done BEFORE
318 // the lock drops so the subscriber list query has a
319 // connection; the actual HTTP POSTs spawn detached threads
320 // (fire-and-forget). Best-effort: a dispatch failure must
321 // not roll back the pending row.
322 crate::subscriptions::dispatch_approval_requested(&lock.0, &pending_id, &lock.1);
323 let namespace = mem.namespace.clone();
324 drop(lock);
325 if let (Some(pa), Some(fed)) = (pending_row.as_ref(), app.federation.as_ref()) {
326 match crate::federation::broadcast_pending_quorum(fed, pa).await {
327 Ok(tracker) => {
328 if let Err(err) = crate::federation::finalise_quorum(&tracker) {
329 // #869 — typed 503 envelope via the shared helper.
330 let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
331 return Err(super::quorum_not_met_response(&payload));
332 }
333 }
334 Err(err) => {
335 let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
336 return Err(super::quorum_not_met_response(&payload));
337 }
338 }
339 }
340 Err((
341 StatusCode::ACCEPTED,
342 Json(json!({
343 "status": "pending",
344 (field_names::PENDING_ID): pending_id,
345 "reason": crate::errors::msg::GOVERNANCE_REQUIRES_APPROVAL,
346 "action": "store",
347 "namespace": namespace,
348 })),
349 )
350 .into_response())
351 }
352 Err(e) => Err(crate::handlers::errors::governance_error_500(&e)),
353 }
354}
355
356/// #866 stage 5 — quota check + `db::insert`. The quota gate mirrors
357/// the MCP path (`crate::mcp::handle_store` (quota check)): `check_and_record` before the
358/// insert, refund on failure. The audit emit fires on success; the
359/// embedding write to `db::set_embedding` lights the HNSW index up
360/// after the row commits. Returns either the persisted row id (on
361/// success) or a pre-built error response (validation, quota, or
362/// substrate failure including the L1-6 substrate governance refusal
363/// which is mapped to 403 FORBIDDEN + `GOVERNANCE_REFUSED`).
364fn insert_create_with_quota(
365 lock: &tokio::sync::MutexGuard<
366 '_,
367 (
368 rusqlite::Connection,
369 std::path::PathBuf,
370 crate::config::ResolvedTtl,
371 bool,
372 ),
373 >,
374 mem: &Memory,
375 embedding: &Option<Vec<f32>>,
376) -> Result<String, axum::response::Response> {
377 // v0.7.0 Round-2 F7 — per-agent quota gate. Round-1 evidence: 500
378 // HTTP stores from a single agent_id incremented zero rows in
379 // `agent_quotas` while the same agent's MCP-side stamp incremented
380 // correctly. The MCP store path (crate::mcp::handle_store) calls
381 // `quotas::check_and_record` ahead of `db::insert` and refunds on
382 // insert failure; mirror that here so the HTTP path is no longer a
383 // quota-bypass surface. Bytes counted = (title + content +
384 // serialized metadata) — same shape the MCP path uses so cross-
385 // path totals stay coherent.
386 // #869 audit (Category B — safe default): empty `quota_agent_id`
387 // is intentional sentinel — `check_and_record` only fires when the
388 // agent id is non-empty (the `if !quota_agent_id.is_empty()` guard
389 // below skips the quota call for anonymous callers, mirroring the
390 // MCP path's behaviour).
391 let quota_agent_id = mem
392 .metadata
393 .get("agent_id")
394 .and_then(|v| v.as_str())
395 .unwrap_or_default()
396 .to_string();
397 let raw_payload_bytes = mem.title.len()
398 + mem.content.len()
399 + serde_json::to_string(&mem.metadata)
400 .map(|s| s.len())
401 .unwrap_or(0);
402 let payload_bytes = match i64::try_from(raw_payload_bytes) {
403 Ok(v) => v,
404 Err(_) => {
405 // M10 (v0.7.0 round-2) — saturating cast surfaced. usize
406 // overflowed i64 (rare; would require >9 EiB of metadata
407 // on a 64-bit host). Operators need to see this in logs
408 // because the quota row gets clamped to the maximum,
409 // which makes that single store look unbounded from the
410 // dashboard's perspective until they investigate.
411 tracing::warn!(
412 agent_id = %quota_agent_id,
413 raw_bytes = raw_payload_bytes,
414 "quota byte-count saturated at i64::MAX for agent={}; \
415 metadata may be excessively large",
416 if quota_agent_id.is_empty() {
417 "<anonymous>"
418 } else {
419 quota_agent_id.as_str()
420 }
421 );
422 i64::MAX
423 }
424 };
425 let quota_op = crate::quotas::QuotaOp::Memory {
426 bytes: payload_bytes,
427 };
428 if !quota_agent_id.is_empty() {
429 // v0.7.0 #1156 — charge against the per-namespace accounting
430 // row (the v50 PK extension). Per-namespace allotments hold
431 // even when a single agent writes across many namespaces.
432 if let Err(e) =
433 crate::quotas::check_and_record(&lock.0, "a_agent_id, &mem.namespace, quota_op)
434 {
435 // Map QuotaCheckError to the same wire shape the rest of
436 // the daemon uses for quota breaches: 429 with a
437 // `code: "QUOTA_EXCEEDED"` envelope so callers can switch
438 // on the limit name. Substrate errors bubble up as 500
439 // because the row was never written.
440 return Err(match e {
441 crate::quotas::QuotaCheckError::Quota(qe) => (
442 StatusCode::TOO_MANY_REQUESTS,
443 Json(json!({
444 "code": crate::errors::error_codes::QUOTA_EXCEEDED,
445 "error": qe.to_string(),
446 "limit": qe.limit.as_str(),
447 "current": qe.current,
448 "max": qe.max,
449 "agent_id": qe.agent_id,
450 })),
451 )
452 .into_response(),
453 crate::quotas::QuotaCheckError::Sql(se) => {
454 tracing::error!("quota substrate error: {se}");
455 (
456 StatusCode::INTERNAL_SERVER_ERROR,
457 Json(json!({"error": "quota check failed"})),
458 )
459 .into_response()
460 }
461 });
462 }
463 }
464
465 match db::insert(&lock.0, mem) {
466 Ok(actual_id) => {
467 // Issue #219: persist the embedding into the connection so
468 // semantic recall can find this memory. Previously the HTTP
469 // path stored the row but never called `set_embedding`,
470 // silently excluding every HTTP-authored memory from
471 // semantic search. HNSW index warm-up happens after the
472 // lock drops in the orchestrator.
473 if let Some(vec) = embedding.as_ref()
474 && let Err(e) = db::set_embedding(&lock.0, &actual_id, vec)
475 {
476 tracing::warn!("failed to store embedding for {actual_id}: {e}");
477 }
478 Ok(actual_id)
479 }
480 Err(e) => {
481 // v0.7.0 Round-2 F7 — insert failed AFTER we committed the
482 // quota counter; refund so the agent's quota reflects only
483 // successful stores (mirrors the MCP path at
484 // crate::mcp::handle_store). Refund is best-effort — a refund
485 // failure is logged but does not change the response.
486 if !quota_agent_id.is_empty() {
487 // #1156 — refund lands on the same `(agent_id,
488 // namespace)` row check_and_record above incremented.
489 if let Err(re) =
490 crate::quotas::refund_op(&lock.0, "a_agent_id, &mem.namespace, quota_op)
491 {
492 crate::quotas::log_refund_op_failed("a_agent_id, &re);
493 }
494 }
495 // v0.7.0 L1-6 Deliverable E — surface the substrate
496 // governance pre-write hook's refusal as `403 FORBIDDEN`
497 // with code `GOVERNANCE_REFUSED` and the operator-authored
498 // reason verbatim. The substrate wraps the refusal in a
499 // typed `storage::GovernanceRefusal` propagated via
500 // `anyhow::Error`; downcasting here keeps the
501 // happy-path-cheap `?`-friendly return shape upstream.
502 //
503 // SAL-bypass intentional (#961): the SAL `StoreError` enum
504 // in `src/store/mod.rs` does not carry the operator-authored
505 // reason string; substrate governance refusals are emitted
506 // by the legacy db:: write path which wraps them in
507 // `anyhow::Error`. Downcasting to the legacy concrete type
508 // here is the load-bearing contract — pinned by the
509 // `insert_governance_refusal_downcasts_to_403_envelope` test
510 // in the `#[cfg(test)]` block below.
511 if let Some(refusal) = e.downcast_ref::<crate::storage::GovernanceRefusal>() {
512 tracing::info!(
513 "create_memory refused by substrate governance: {}",
514 refusal.reason
515 );
516 return Err((
517 StatusCode::FORBIDDEN,
518 Json(json!({
519 "code": crate::errors::error_codes::GOVERNANCE_REFUSED,
520 "error": refusal.reason,
521 })),
522 )
523 .into_response());
524 }
525 Err(crate::handlers::errors::handler_error_500(&e))
526 }
527 }
528}
529
530/// #866 stage 6 — federation fanout + HNSW index warm-up + assembled
531/// CREATED response.
532///
533/// Per ADR-0001 the substrate does NOT roll back on quorum failure:
534/// the local commit has already landed when we reach this stage. A
535/// quorum miss surfaces 503 + `Retry-After: 2` and the sync-daemon's
536/// eventual-consistency loop catches stragglers up. A `Some(fed)` +
537/// `Ok(got)` path includes `quorum_acks: <count>` on the response.
538async fn fanout_and_assemble_create_response(
539 app: &AppState,
540 mem: &Memory,
541 actual_id: &str,
542 embedding: Option<Vec<f32>>,
543 auto_tags: &[String],
544 contradiction_ids: Vec<String>,
545 embed_status: EmbedStatus,
546) -> axum::response::Response {
547 // #1566 / #1579 B1 — embed-once-replicate-vector: capture the
548 // just-computed vector for the federation fanout BEFORE the HNSW
549 // warm-up consumes it. Shipping it inside the signed push payload
550 // lets dim-matching receivers store it directly instead of
551 // re-embedding (~1s/row via ollama, up to 9× per memory across the
552 // fleet pre-#1566). `None` (keyword tier / embed-degraded store)
553 // keeps the pre-#1566 wire bytes.
554 let shipped = match (&embedding, app.embedder.as_ref().as_ref()) {
555 (Some(vec), Some(emb)) => Some(crate::federation::ShippedEmbedding::new(
556 actual_id.to_string(),
557 emb.model_description(),
558 vec.clone(),
559 )),
560 _ => None,
561 };
562 // HNSW warm-up after the DB lock dropped (done by the caller).
563 if let Some(vec) = embedding {
564 let mut idx_lock = app.vector_index.lock().await;
565 if let Some(idx) = idx_lock.as_mut() {
566 idx.insert(actual_id.to_string(), vec);
567 }
568 }
569 // #196: echo the resolved agent_id so callers don't need a follow-up get.
570 let resolved_agent_id = mem
571 .metadata
572 .get("agent_id")
573 .and_then(|v| v.as_str())
574 .map(str::to_string);
575 // PR-5 (issue #487): security audit trail for HTTP store.
576 // #869 audit (Category B — safe default): when no agent_id was
577 // resolved at request time the audit row records the actor as
578 // `""` (the documented anonymous-actor sentinel for the audit
579 // chain). Same posture as the MCP path.
580 crate::audit::emit(crate::audit::EventBuilder::new(
581 crate::audit::AuditAction::Store,
582 crate::audit::actor(
583 resolved_agent_id.clone().unwrap_or_default(),
584 "http_body",
585 mem.metadata
586 .get("scope")
587 .and_then(|v| v.as_str())
588 .map(str::to_string),
589 ),
590 crate::audit::target_memory(
591 actual_id.to_string(),
592 mem.namespace.clone(),
593 Some(mem.title.clone()),
594 Some(mem.tier.to_string()),
595 mem.metadata
596 .get("scope")
597 .and_then(|v| v.as_str())
598 .map(str::to_string),
599 ),
600 ));
601 let mut response = json!({
602 "id": actual_id,
603 "tier": mem.tier,
604 "namespace": mem.namespace,
605 "title": mem.title,
606 "agent_id": resolved_agent_id,
607 });
608 if !contradiction_ids.is_empty() {
609 response["potential_contradictions"] = json!(contradiction_ids);
610 }
611 // v0.7.0 L5 — echo LLM-generated tags as a dedicated
612 // `auto_tags` field, matching MCP `handle_store`'s response.
613 if !auto_tags.is_empty() {
614 response["auto_tags"] = json!(auto_tags);
615 }
616 // v0.7.0 Round-2 F10 — surface embed_status to the caller when α's
617 // `embed_with_status` reported anything other than `Indexed`.
618 if embed_status.is_degraded() {
619 response["embed_status"] = json!(embed_status.as_str());
620 let reason = embed_status.reason();
621 if !reason.is_empty() {
622 response["embed_status_reason"] = json!(reason);
623 }
624 }
625 // #932 (v0.7.0 Track D, 2026-05-20) — fire `memory_store`
626 // webhook subscribers via the canonical sqlite dispatch path.
627 // Pre-#932 the HTTP `create_memory` sqlite branch invoked no
628 // dispatch hook, so an HTTP-routed store fired zero webhooks
629 // even when matching subscribers were registered. The MCP
630 // `handle_store` path at `src/mcp/tools/store/mod.rs:469` has
631 // always emitted this event; the HTTP path now matches.
632 // Fire-and-forget (worker threads handle delivery).
633 {
634 let lock = app.db.lock().await;
635 crate::subscriptions::dispatch_event(
636 &lock.0,
637 crate::mcp::registry::tool_names::MEMORY_STORE,
638 actual_id,
639 &mem.namespace,
640 resolved_agent_id.as_deref(),
641 &lock.1,
642 );
643 }
644
645 // v0.7 federation: fan out to peers when --quorum-writes is
646 // configured. Per ADR-0001 a failed quorum returns 503 but does
647 // NOT roll back the local write.
648 if let Some(fed) = app.federation.as_ref() {
649 let mut mem_echo = mem.clone();
650 mem_echo.id = actual_id.to_string();
651 // #1566 / #1579 B1 — ship the source vector with the push.
652 match crate::federation::broadcast_store_quorum_with_embedding(
653 fed,
654 &mem_echo,
655 shipped.as_ref(),
656 )
657 .await
658 {
659 Ok(tracker) => match crate::federation::finalise_quorum(&tracker) {
660 Ok(got) => {
661 response["quorum_acks"] = json!(got);
662 return (StatusCode::CREATED, Json(response)).into_response();
663 }
664 Err(err) => {
665 // #869 — typed 503 envelope via the shared helper.
666 let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
667 return super::quorum_not_met_response(&payload);
668 }
669 },
670 Err(err) => {
671 let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
672 return super::quorum_not_met_response(&payload);
673 }
674 }
675 }
676 (StatusCode::CREATED, Json(response)).into_response()
677}
678
679/// #866 — postgres-backed daemon path for `create_memory`. The SAL
680/// trait's `store_with_embedding` writes the row and the embedding
681/// in a single call; the surrounding ceremony (auto_tag,
682/// governance, audit, federation) mirrors the sqlite stages above
683/// just without the shared `Mutex<Connection>` discipline (postgres
684/// connection-pooling owns its own concurrency).
685#[cfg(feature = "sal")]
686async fn create_memory_postgres(
687 app: &AppState,
688 body: &CreateMemory,
689 agent_id: &str,
690 metadata: serde_json::Value,
691) -> axum::response::Response {
692 let now = Utc::now();
693 // v0.7.0 L5 — fire the LLM `auto_tag` hook before assembling the
694 // canonical `Memory` row so the postgres `tags` column lands
695 // populated with LLM suggestions on the FIRST insert.
696 let auto_tags =
697 maybe_auto_tag(app, &body.title, &body.content, &body.tags, &body.namespace).await;
698 let mut final_tags = body.tags.clone();
699 for t in &auto_tags {
700 if !final_tags.iter().any(|existing| existing == t) {
701 final_tags.push(t.clone());
702 }
703 }
704 let mut mem = Memory {
705 id: Uuid::new_v4().to_string(),
706 tier: body.tier.clone(),
707 namespace: body.namespace.clone(),
708 title: body.title.clone(),
709 content: body.content.clone(),
710 tags: final_tags,
711 priority: body.priority,
712 // #1591 — omitted confidence resolves to the compiled default
713 // with truthful `confidence_source = "default"` provenance.
714 confidence: body.resolved_confidence(),
715 source: body.source.clone(),
716 access_count: 0,
717 created_at: now.to_rfc3339(),
718 updated_at: now.to_rfc3339(),
719 last_accessed_at: None,
720 expires_at: body.expires_at.clone(),
721 metadata,
722 reflection_depth: 0,
723 // #1385 — honour caller-supplied `kind` instead of the prior
724 // hardcoded `Observation`. Pre-#1385 the HTTP POST path
725 // dropped `body.kind` (the field didn't even exist on
726 // `CreateMemory`), so every HTTP-created row landed as
727 // `Observation` and the Form 6 recall `kinds` filter returned
728 // zero rows even when the caller had clearly stored a
729 // `claim` / `decision` / etc. Matches the MCP `memory_store`
730 // contract at `src/mcp/tools/store/validation.rs:207-240`:
731 // unknown / absent → silently fall through to `Observation`
732 // (forward-compat with future variants).
733 memory_kind: body
734 .kind
735 .as_deref()
736 .and_then(crate::models::MemoryKind::from_str)
737 .unwrap_or_default(),
738 entity_id: None,
739 persona_version: None,
740 // #1411 — Form 4 wire-truthfulness on the postgres branch.
741 // Pre-#1411 these were hardcoded `Vec::new()` / `None` /
742 // `None`, so HTTP POST /api/v1/memories validated the
743 // caller's `citations` / `source_uri` / `source_span` via
744 // `validate::validate_create` then silently dropped them
745 // on insert. Same shape as the #1385 `kind` drop; thread
746 // the validated body fields through to the inserted row.
747 citations: body.citations.clone(),
748 source_uri: body.source_uri.clone(),
749 source_span: body.source_span.clone(),
750 confidence_source: body.resolved_confidence_source(),
751 confidence_signals: None,
752 confidence_decayed_at: None,
753 version: 1,
754 };
755 // #626 Layer-3 (C7) — agent-attestation gate (postgres SAL branch).
756 // Same contract as the sqlite path, but the bound-key lookup goes
757 // through the async `MemoryStore::agent_pubkey`. 400 for a malformed
758 // transport field; 403 when a presented signature fails to verify or
759 // required-attestation rejects an unsigned write.
760 {
761 let presented_sig = body
762 .signature
763 .as_deref()
764 .map(str::trim)
765 .filter(|s| !s.is_empty());
766 if let Some(sig_b64) = presented_sig {
767 let (sig_bytes, signed_created_at) = match crate::identity::attest::prepare_signed_store(
768 sig_b64,
769 body.created_at.as_deref(),
770 ) {
771 Ok(v) => v,
772 Err(msg) => {
773 return (StatusCode::BAD_REQUEST, Json(json!({"error": msg}))).into_response();
774 }
775 };
776 mem.created_at = signed_created_at.to_string();
777 if let Err(e) = crate::identity::attest::stamp_attestation_async(
778 app.store.as_ref(),
779 &mut mem,
780 agent_id,
781 Some(&sig_bytes),
782 )
783 .await
784 {
785 return (
786 StatusCode::FORBIDDEN,
787 Json(json!({
788 "code": crate::errors::error_codes::ATTESTATION_FAILED,
789 "error": e.to_string(),
790 })),
791 )
792 .into_response();
793 }
794 } else if crate::identity::attest::require_agent_attestation_enabled()
795 && let Err(e) = crate::identity::attest::stamp_attestation_async(
796 app.store.as_ref(),
797 &mut mem,
798 agent_id,
799 None,
800 )
801 .await
802 {
803 return (
804 StatusCode::FORBIDDEN,
805 Json(json!({
806 "code": crate::errors::error_codes::ATTESTATION_FAILED,
807 "error": e.to_string(),
808 })),
809 )
810 .into_response();
811 }
812 }
813
814 let ctx = crate::store::CallerContext::for_agent(agent_id.to_string());
815
816 // v0.7.0 Wave-3 Continuation 5 (S18 / semantic recall) — embed
817 // before the SAL store so the postgres `embedding` column lands
818 // populated; otherwise `recall_hybrid` filters every row out via
819 // `WHERE embedding IS NOT NULL`.
820 let embedding_text = crate::embeddings::embedding_document(&mem.title, &mem.content);
821 let embedding: Option<Vec<f32>> = match app.embedder.as_ref().as_ref() {
822 None => None,
823 Some(emb) => emb.embed(&embedding_text).ok(),
824 };
825
826 // v0.7.0 Wave-3 Continuation 3 (Phase 20) — governance walk on
827 // writes. Postgres branch enforces the same inheritance chain +
828 // approver_type policy as sqlite. Approve → 202 Accepted + pending id.
829 let payload_for_pending = serde_json::to_value(&mem).unwrap_or_else(|_| json!({}));
830 match app
831 .store
832 .enforce_governance_action(
833 crate::store::GovernedAction::Store,
834 &mem.namespace,
835 agent_id,
836 None,
837 None,
838 &payload_for_pending,
839 )
840 .await
841 {
842 Ok(crate::models::GovernanceDecision::Allow) => {}
843 Ok(crate::models::GovernanceDecision::Deny(refusal)) => {
844 return (
845 StatusCode::FORBIDDEN,
846 Json(json!({"error": format!("denied: {reason}", reason = refusal.reason)})),
847 )
848 .into_response();
849 }
850 Ok(crate::models::GovernanceDecision::Pending(pending_id)) => {
851 return (
852 StatusCode::ACCEPTED,
853 Json(json!({
854 "status": "pending",
855 (field_names::PENDING_ID): pending_id,
856 "namespace": mem.namespace,
857 (field_names::STORAGE_BACKEND): "postgres",
858 })),
859 )
860 .into_response();
861 }
862 Err(e) => return store_err_to_response(e),
863 }
864
865 // #1480 — pipeline the cross-region peer broadcast with the local
866 // durable write. `mem.id` is a caller-generated UUID (assigned
867 // above) and `store_with_embedding` RETURNs that same id, so the
868 // broadcast body is fully known before the commit lands. The peer
869 // `/sync/push` accept is an idempotent upsert keyed by id
870 // (tier-never-downgrades), so issuing the broadcast before local
871 // durability is safe: a failed local write returns an error and the
872 // client retries with the SAME id (peers converge idempotently);
873 // anti-entropy (`memories_updated_since` pull) reconciles any
874 // orphaned peer row. Governance was already enforced above, so the
875 // only failure modes left at the store call are transient infra
876 // errors. Net effect: the local fsync overlaps the peer RTT instead
877 // of being paid serially before it.
878 //
879 // #931 — debug-level entry logs on BOTH the `Some(fed)` and `None`
880 // arm so the Track D Docker probe can distinguish "federation never
881 // wired into AppState" from "federation wired but emitted zero peer
882 // requests".
883 let store_fut = app
884 .store
885 .store_with_embedding(&ctx, &mem, embedding.as_deref());
886 let (id, quorum_outcome) = match app.federation.as_ref() {
887 Some(fed) => {
888 tracing::debug!(
889 target: crate::federation::SYNC_TRACE_TARGET,
890 memory_id = %mem.id,
891 namespace = %mem.namespace,
892 peer_count = fed.peer_count(),
893 backend = "postgres",
894 "create_memory_postgres: pipelining broadcast_store_quorum with local write",
895 );
896 let mem_echo = mem.clone();
897 // #1566 / #1579 B1 — ship the source vector with the push
898 // (embed-once-replicate-vector; postgres twin of the
899 // sqlite fanout in `fanout_and_assemble_create_response`).
900 let shipped = match (&embedding, app.embedder.as_ref().as_ref()) {
901 (Some(vec), Some(emb)) => Some(crate::federation::ShippedEmbedding::new(
902 mem.id.clone(),
903 emb.model_description(),
904 vec.clone(),
905 )),
906 _ => None,
907 };
908 let (store_res, quorum_res) = tokio::join!(
909 store_fut,
910 crate::federation::broadcast_store_quorum_with_embedding(
911 fed,
912 &mem_echo,
913 shipped.as_ref(),
914 )
915 );
916 // Local durability gates everything: a failed local write
917 // returns the store error and DISCARDS the already-in-flight
918 // quorum outcome (the client retries with the same id).
919 match store_res {
920 Ok(id) => (id, Some(quorum_res)),
921 Err(e) => return store_err_to_response(e),
922 }
923 }
924 None => {
925 tracing::debug!(
926 target: crate::federation::SYNC_TRACE_TARGET,
927 memory_id = %mem.id,
928 namespace = %mem.namespace,
929 backend = "postgres",
930 "create_memory_postgres: federation disabled — skipping broadcast",
931 );
932 match store_fut.await {
933 Ok(id) => (id, None),
934 Err(e) => return store_err_to_response(e),
935 }
936 }
937 };
938
939 // Local write succeeded. Audit + webhook dispatch fire on local
940 // durability REGARDLESS of the quorum outcome — the local write is
941 // never rolled back (ADR-0001) — then quorum gates 201 vs 503.
942
943 // v0.7.0 Wave-3 Continuation 2 Phase 9 — audit emit on postgres write.
944 if crate::audit::is_enabled() {
945 let scope = mem
946 .metadata
947 .get("scope")
948 .and_then(|v| v.as_str())
949 .map(str::to_string);
950 crate::audit::emit(crate::audit::EventBuilder::new(
951 crate::audit::AuditAction::Store,
952 crate::audit::actor(agent_id.to_string(), "http_body", scope.clone()),
953 crate::audit::target_memory(
954 id.clone(),
955 mem.namespace.clone(),
956 Some(mem.title.clone()),
957 Some(mem.tier.to_string()),
958 scope,
959 ),
960 ));
961 }
962 // #932 (v0.7.0 Track D, 2026-05-20) — postgres-backed subscription
963 // dispatch. The sqlite-side dispatch reads from the `subscriptions`
964 // table; postgres subscriptions land as memories in
965 // `_subscriptions/<aid>` and are INVISIBLE to that lookup. Pre-#932
966 // the postgres `create_memory_postgres` branch fired zero webhooks
967 // on every `memory_store` event — vacuously satisfying the v0.7.0
968 // HMAC-non-optional guarantee. `dispatch_event_postgres` walks every
969 // `_subscriptions/<*>` row across tenants (bypass_visibility=true),
970 // applies the same matcher the sqlite path uses, and feeds the
971 // canonical `dispatch_event_to_subs` worker pool. Fire-and-forget;
972 // never panics; never rolls back the local commit.
973 let id_for_dispatch = id.clone();
974 let ns_for_dispatch = mem.namespace.clone();
975 let agent_for_dispatch = agent_id.to_string();
976 super::dispatch_event_postgres(
977 app,
978 crate::mcp::registry::tool_names::MEMORY_STORE,
979 &id_for_dispatch,
980 &ns_for_dispatch,
981 Some(&agent_for_dispatch),
982 None,
983 )
984 .await;
985
986 // #1480 — evaluate the pipelined quorum result now that the local
987 // write is durable and audit/dispatch have fired. A failed quorum
988 // returns 503 but never rolls back the local write (ADR-0001).
989 if let Some(quorum_res) = quorum_outcome {
990 match quorum_res {
991 Ok(tracker) => {
992 if let Err(err) = crate::federation::finalise_quorum(&tracker) {
993 // #869 — typed 503 envelope via the shared helper.
994 let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
995 return super::quorum_not_met_response(&payload);
996 }
997 }
998 Err(err) => {
999 let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
1000 return super::quorum_not_met_response(&payload);
1001 }
1002 }
1003 }
1004
1005 // #869 — typed serialise helper so a 201 + `{}` never masks a real
1006 // encode failure.
1007 let mut payload = match super::to_value_or_500("create_memory.postgres.response", &mem) {
1008 Ok(v) => v,
1009 Err(resp) => return resp,
1010 };
1011 if let Some(obj) = payload.as_object_mut() {
1012 obj.insert("id".to_string(), serde_json::Value::String(id));
1013 if !auto_tags.is_empty() {
1014 obj.insert("auto_tags".to_string(), json!(auto_tags));
1015 }
1016 }
1017 (StatusCode::CREATED, Json(payload)).into_response()
1018}
1019
1020pub async fn create_memory(
1021 State(app): State<AppState>,
1022 headers: HeaderMap,
1023 JsonOrBadRequest(body): JsonOrBadRequest<CreateMemory>,
1024) -> impl IntoResponse {
1025 // Input validation (cheapest gate first).
1026 if let Err(e) = validate::RequestValidator::validate_create(&body) {
1027 return (
1028 StatusCode::BAD_REQUEST,
1029 Json(json!({"error": e.to_string()})),
1030 )
1031 .into_response();
1032 }
1033
1034 // Stage 1 — agent_id resolution (consumes `body.metadata`, returns
1035 // canonical metadata). Consumed by the postgres SAL branch and, since
1036 // #626 Layer-3 (C7), by the sqlite-path agent-attestation gate below.
1037 let (agent_id, metadata) = match resolve_create_agent_id(&headers, &body) {
1038 Ok(v) => v,
1039 Err(resp) => return resp,
1040 };
1041 // Postgres-backed daemons take a separate SAL-trait path with no
1042 // shared `Mutex<Connection>`. Kept as a top-level helper so the
1043 // sqlite stages below stay focused.
1044 #[cfg(feature = "sal")]
1045 if matches!(app.storage_backend, StorageBackend::Postgres) {
1046 return create_memory_postgres(&app, &body, &agent_id, metadata).await;
1047 }
1048
1049 // v0.7.0 L5 — fire the LLM `auto_tag` autonomy hook BEFORE the
1050 // embedding pass + DB lock. Both LLM and embedder calls are
1051 // network/CPU work that must not happen under the single shared
1052 // `Mutex<Connection>` on a multi-agent daemon.
1053 let auto_tags = maybe_auto_tag(
1054 &app,
1055 &body.title,
1056 &body.content,
1057 &body.tags,
1058 &body.namespace,
1059 )
1060 .await;
1061
1062 // Stage 3 — embed-before-lock (issue #219). Computed BEFORE
1063 // acquiring the DB lock so the 10-200 ms embedder run doesn't
1064 // hold the single shared `Mutex<Connection>`.
1065 let (embedding, embed_status) = embed_create_before_lock(&app, &body.title, &body.content);
1066
1067 // #1579 A5 — ANN candidate pool for the proactive conflict check
1068 // (#519). The HNSW search runs BEFORE the DB lock (vector-index
1069 // mutex only — the FX-4/PERF-2 recall lock discipline), replacing
1070 // the O(namespace) embedding decode+scan that previously ran
1071 // UNDER the DB mutex and collapsed semantic-tier write throughput
1072 // to 0.3-1.7 rps (P2 audit). `None` ⇒ force-bypass, no embedding,
1073 // or no fully-searchable index (keyword tier / async-boot warm
1074 // window) — the check below then uses the bounded-scan fallback.
1075 // An EMPTY index also yields `None` (#1579 QC): emptiness makes
1076 // `is_fully_searchable` vacuously true, but during the async-boot
1077 // LOAD phase (before the boot loader's `seed_entries` lands) it
1078 // says nothing about the DB — `Some(vec![])` here would silently
1079 // SKIP the conflict check instead of routing to the bounded scan.
1080 let conflict_candidate_ids: Option<Vec<String>> = if body.force {
1081 None
1082 } else if let Some(ref qe) = embedding {
1083 let vi = app.vector_index.lock().await;
1084 vi.as_ref()
1085 .filter(|idx| idx.is_fully_searchable() && !idx.is_empty())
1086 .map(|idx| {
1087 idx.search(qe, db::PROACTIVE_CONFLICT_INDEX_K)
1088 .into_iter()
1089 .map(|h| h.id)
1090 .collect()
1091 })
1092 } else {
1093 None
1094 };
1095
1096 // v0.6.3.1 P2 (G6) — resolve `on_conflict` policy. HTTP defaults to
1097 // 'error'; callers that want the v0.6.3 silent-merge behaviour must
1098 // pass on_conflict='merge'. v0.7.0 (sweep F-B3.x): routes through
1099 // the single OnConflict::parse SSOT instead of the prior duplicated
1100 // inline string-allowlist match.
1101 let on_conflict_str = body.on_conflict.as_deref().unwrap_or("error");
1102 let on_conflict_mode = match crate::mcp::tools::OnConflictMode::parse(on_conflict_str) {
1103 Ok(m) => m,
1104 Err(msg) => {
1105 return (StatusCode::BAD_REQUEST, Json(json!({ "error": msg }))).into_response();
1106 }
1107 };
1108
1109 let state = app.db.clone();
1110 let now = Utc::now();
1111 let lock = state.lock().await;
1112 let expires_at = body.expires_at.clone().or_else(|| {
1113 body.ttl_secs
1114 .or(lock.2.ttl_for_tier(&body.tier))
1115 .map(|s| (now + Duration::seconds(s)).to_rfc3339())
1116 });
1117
1118 // Stage 2 — on_conflict resolution against the live connection.
1119 let resolved_title = match resolve_create_conflict_title(&lock.0, &body, on_conflict_mode) {
1120 Ok(t) => t,
1121 Err(resp) => return resp,
1122 };
1123
1124 // v0.7.0 L5 — merge LLM-derived `auto_tags` with operator-supplied
1125 // `body.tags`. Operator tags lead; auto-tag entries that duplicate
1126 // an existing operator tag are dropped to avoid double-counting on
1127 // FTS5 weighting downstream.
1128 let mut merged_tags = body.tags.clone();
1129 for t in &auto_tags {
1130 if !merged_tags.iter().any(|existing| existing == t) {
1131 merged_tags.push(t.clone());
1132 }
1133 }
1134
1135 let mut mem = Memory {
1136 id: Uuid::new_v4().to_string(),
1137 tier: body.tier.clone(),
1138 namespace: body.namespace.clone(),
1139 title: resolved_title,
1140 content: body.content.clone(),
1141 tags: merged_tags,
1142 priority: body.priority.clamp(1, 10),
1143 // #1591 — see the postgres branch above.
1144 confidence: body.resolved_confidence().clamp(0.0, 1.0),
1145 source: body.source.clone(),
1146 access_count: 0,
1147 created_at: now.to_rfc3339(),
1148 updated_at: now.to_rfc3339(),
1149 last_accessed_at: None,
1150 expires_at,
1151 metadata,
1152 reflection_depth: 0,
1153 // #1385 — sqlite branch parity. See the postgres branch above
1154 // for the wire-truthfulness rationale: pre-#1385 every HTTP-
1155 // created row landed as `Observation` regardless of the
1156 // caller's `kind`. Same MCP-mirroring parse + fallback.
1157 memory_kind: body
1158 .kind
1159 .as_deref()
1160 .and_then(crate::models::MemoryKind::from_str)
1161 .unwrap_or_default(),
1162 entity_id: None,
1163 persona_version: None,
1164 // #1411 — Form 4 wire-truthfulness on the sqlite branch.
1165 // See the postgres branch above for the full rationale:
1166 // pre-#1411 every HTTP-created row landed with empty
1167 // citations + null source_uri + null source_span even
1168 // when the caller supplied them in the request body and
1169 // `validate_create` accepted them.
1170 citations: body.citations.clone(),
1171 source_uri: body.source_uri.clone(),
1172 source_span: body.source_span.clone(),
1173 confidence_source: body.resolved_confidence_source(),
1174 confidence_signals: None,
1175 confidence_decayed_at: None,
1176 version: 1,
1177 };
1178
1179 // #626 Layer-3 (C7) — agent-attestation gate on the HTTP store path.
1180 // Mirrors the MCP `handle_store` gate: a remote caller signs the
1181 // `SignableWrite` envelope and presents the detached Ed25519
1182 // signature (standard base64) plus the `created_at` it signed. The
1183 // signed surface commits to `created_at` (server-stamped to `now()`
1184 // by default), so the remote signer supplies the timestamp it used;
1185 // the server validates the freshness window then adopts it verbatim.
1186 // A presented signature that fails to verify against the agent's
1187 // bound key is a 403; with no signature the path is unchanged unless
1188 // the operator set `AI_MEMORY_REQUIRE_AGENT_ATTESTATION`, which
1189 // rejects unsigned writes.
1190 {
1191 let presented_sig = body
1192 .signature
1193 .as_deref()
1194 .map(str::trim)
1195 .filter(|s| !s.is_empty());
1196 if let Some(sig_b64) = presented_sig {
1197 let (sig_bytes, signed_created_at) = match crate::identity::attest::prepare_signed_store(
1198 sig_b64,
1199 body.created_at.as_deref(),
1200 ) {
1201 Ok(v) => v,
1202 Err(msg) => {
1203 return (StatusCode::BAD_REQUEST, Json(json!({"error": msg}))).into_response();
1204 }
1205 };
1206 mem.created_at = signed_created_at.to_string();
1207 if let Err(e) = crate::identity::attest::stamp_attestation_sync(
1208 &lock.0,
1209 &mut mem,
1210 &agent_id,
1211 Some(&sig_bytes),
1212 ) {
1213 return (
1214 StatusCode::FORBIDDEN,
1215 Json(json!({
1216 "code": crate::errors::error_codes::ATTESTATION_FAILED,
1217 "error": e.to_string(),
1218 })),
1219 )
1220 .into_response();
1221 }
1222 } else if crate::identity::attest::require_agent_attestation_enabled()
1223 && let Err(e) =
1224 crate::identity::attest::stamp_attestation_sync(&lock.0, &mut mem, &agent_id, None)
1225 {
1226 return (
1227 StatusCode::FORBIDDEN,
1228 Json(json!({
1229 "code": crate::errors::error_codes::ATTESTATION_FAILED,
1230 "error": e.to_string(),
1231 })),
1232 )
1233 .into_response();
1234 }
1235 }
1236
1237 // Stage 4 — governance pre-write hook. The helper either returns
1238 // the original lock guard (Allow) or short-circuits with an error
1239 // response (Deny / Pending / failure).
1240 let lock = match enforce_create_governance(&app, lock, &mem).await {
1241 Ok(lock) => lock,
1242 Err(resp) => return resp,
1243 };
1244
1245 // Contradiction probe — best-effort; never fails the parent store.
1246 // #869 audit (Category B — safe default): a db substrate failure
1247 // here is non-fatal — empty contradictions list degrades the
1248 // contradiction hint to "none found" rather than blocking the
1249 // store. The proactive #519 check (below) is the load-bearing
1250 // duplicate gate.
1251 let contradictions =
1252 db::find_contradictions(&lock.0, &mem.title, &mem.namespace).unwrap_or_default();
1253 let contradiction_ids: Vec<String> = contradictions
1254 .iter()
1255 .filter(|c| c.id != mem.id)
1256 .map(|c| c.id.clone())
1257 .collect();
1258
1259 // v0.7.0 (issue #519) — proactive contradiction detection. Refuse
1260 // the write with 409 CONFLICT when an embedded near-duplicate
1261 // (>= 0.95 cosine) in the same namespace has differing content,
1262 // UNLESS the caller passed `force=true`. The check is a no-op
1263 // when no embedding could be computed (degraded mode) or when the
1264 // caller forced through.
1265 if !body.force
1266 && let Some(ref qe) = embedding
1267 {
1268 // #1579 A5 — verify the pre-lock ANN candidates (point lookups
1269 // + exact cosine recompute) when an index was available;
1270 // bounded recency scan otherwise.
1271 let check_result = match &conflict_candidate_ids {
1272 Some(ids) => db::proactive_conflict_check_candidates(&lock.0, &mem, qe, ids),
1273 None => db::proactive_conflict_check(&lock.0, &mem, qe),
1274 };
1275 match check_result {
1276 Ok(Some(conflict)) => {
1277 tracing::info!(
1278 target: "create_memory",
1279 namespace = %mem.namespace,
1280 existing_id = %conflict.existing_id,
1281 similarity = conflict.similarity,
1282 reason = conflict.reason,
1283 "create_memory refused by proactive conflict detection (#519); \
1284 pass force=true to override",
1285 );
1286 return (
1287 StatusCode::CONFLICT,
1288 Json(json!({
1289 "error": format!(
1290 "near-duplicate of existing memory in namespace '{}'",
1291 mem.namespace,
1292 ),
1293 "code": crate::errors::error_codes::CONFLICT,
1294 "existing_id": conflict.existing_id,
1295 "existing_title": conflict.existing_title,
1296 (field_names::SIMILARITY): conflict.similarity,
1297 "reason": conflict.reason,
1298 "hint": "pass force=true to insert anyway",
1299 })),
1300 )
1301 .into_response();
1302 }
1303 Ok(None) => {}
1304 Err(e) => {
1305 // Substrate failure on the proactive check is non-fatal
1306 // — log and continue so a transient SELECT failure
1307 // can't black-hole the write path.
1308 tracing::warn!("proactive_conflict_check failed (non-fatal, continuing): {e}");
1309 }
1310 }
1311 }
1312
1313 // Stage 5 — quota + insert.
1314 let actual_id = match insert_create_with_quota(&lock, &mem, &embedding) {
1315 Ok(id) => id,
1316 Err(resp) => return resp,
1317 };
1318
1319 // Drop the DB lock before taking the vector index lock + running
1320 // federation fanout (async work).
1321 drop(lock);
1322
1323 // Stage 6 — HNSW warm-up + audit emit + federation fanout +
1324 // assembled CREATED response.
1325 fanout_and_assemble_create_response(
1326 &app,
1327 &mem,
1328 &actual_id,
1329 embedding,
1330 &auto_tags,
1331 contradiction_ids,
1332 embed_status,
1333 )
1334 .await
1335}
1336
// ---------------------------------------------------------------------------
// Inline stage-helper tests for the create path
// ---------------------------------------------------------------------------
1340
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mcp::tools::OnConflictMode;
    use axum::http::{HeaderMap, HeaderValue};
    use serde_json::json;

    /// Builds a `CreateMemory` by hand so the tests never go through the
    /// `Deserialize` path (and therefore never need the extractor stack).
    /// Only `title` varies; every other field is a fixed fixture value.
    fn make_body(title: &str) -> CreateMemory {
        CreateMemory {
            title: title.to_owned(),
            namespace: String::from("test-ns"),
            content: String::from("content body — long enough to satisfy validators"),
            source: String::from("test"),
            tier: Tier::Long,
            priority: 5,
            confidence: Some(0.8),
            metadata: json!({}),
            tags: Vec::new(),
            citations: Vec::new(),
            force: false,
            expires_at: None,
            ttl_secs: None,
            agent_id: None,
            scope: None,
            on_conflict: None,
            detect_conflicts: None,
            source_uri: None,
            source_span: None,
            kind: None,
            signature: None,
            created_at: None,
        }
    }

    /// Single-entry header map: `name: value`.
    fn header(name: &'static str, value: &str) -> HeaderMap {
        let mut map = HeaderMap::with_capacity(1);
        map.insert(name, HeaderValue::from_str(value).unwrap());
        map
    }

    /// Seed row for the stage-2 collision tests: a `Tier::Long` memory with
    /// matching `created_at` / `updated_at`, everything else defaulted.
    fn seed_memory(title: &str, namespace: &str, content: &str) -> Memory {
        let stamp = Utc::now().to_rfc3339();
        Memory {
            title: title.to_string(),
            namespace: namespace.to_string(),
            tier: Tier::Long,
            content: content.to_string(),
            source: "test".to_string(),
            created_at: stamp.clone(),
            updated_at: stamp,
            ..Memory::default()
        }
    }

    // ----- stage 1: resolve_create_agent_id -------------------------------

    #[test]
    fn stage1_agent_id_body_disagreeing_with_header_returns_403() {
        // #907 (security-high, 2026-05-19): the body `agent_id` used to win
        // over the header, letting a caller authenticated as one agent
        // stamp the row as another. A body/header mismatch is now refused.
        let request = CreateMemory {
            agent_id: Some("ai:from-body".to_string()),
            ..make_body("title-1")
        };
        let hdrs = header("x-agent-id", "ai:from-header");
        let refusal = resolve_create_agent_id(&hdrs, &request)
            .expect_err("body/header disagree must 403 post-#907");
        assert_eq!(refusal.status(), StatusCode::FORBIDDEN);
    }

    #[test]
    fn stage1_agent_id_body_matching_header_succeeds() {
        // #907: a body `agent_id` equal to the header value is accepted.
        let request = CreateMemory {
            agent_id: Some("ai:same".to_string()),
            ..make_body("title-1-match")
        };
        let hdrs = header("x-agent-id", "ai:same");
        let (resolved, stamped) = resolve_create_agent_id(&hdrs, &request).expect("resolve ok");
        assert_eq!(resolved, "ai:same");
        assert_eq!(stamped["agent_id"], json!("ai:same"));
    }

    #[test]
    fn stage1_agent_id_metadata_disagreeing_with_header_returns_403() {
        // #907: `metadata.agent_id` is caller-controlled too; a mismatch
        // with the header is refused the same way as the body field.
        let request = CreateMemory {
            metadata: json!({"agent_id": "ai:from-metadata"}),
            ..make_body("title-2")
        };
        let hdrs = header("x-agent-id", "ai:from-header");
        let refusal = resolve_create_agent_id(&hdrs, &request)
            .expect_err("metadata/header disagree must 403 post-#907");
        assert_eq!(refusal.status(), StatusCode::FORBIDDEN);
    }

    #[test]
    fn stage1_agent_id_metadata_matching_header_succeeds() {
        // #907: `metadata.agent_id` equal to the header value is accepted.
        let request = CreateMemory {
            metadata: json!({"agent_id": "ai:from-header"}),
            ..make_body("title-2-match")
        };
        let hdrs = header("x-agent-id", "ai:from-header");
        let (resolved, stamped) = resolve_create_agent_id(&hdrs, &request).expect("resolve ok");
        assert_eq!(resolved, "ai:from-header");
        assert_eq!(stamped["agent_id"], json!("ai:from-header"));
    }

    #[test]
    fn stage1_agent_id_x_agent_id_header_used_when_body_and_metadata_absent() {
        let request = make_body("title-3");
        let hdrs = header("x-agent-id", "ai:from-header");
        let (resolved, stamped) = resolve_create_agent_id(&hdrs, &request).expect("resolve ok");
        assert_eq!(resolved, "ai:from-header");
        assert_eq!(stamped["agent_id"], json!("ai:from-header"));
    }

    #[test]
    fn stage1_agent_id_synthesised_when_no_source_supplied() {
        let request = make_body("title-4");
        let no_headers = HeaderMap::new();
        let (aid, stamped) = resolve_create_agent_id(&no_headers, &request).expect("resolve ok");
        // With no header, body field or metadata entry, the resolver is
        // expected to mint an `anonymous:req-…` id and stamp it verbatim.
        assert!(
            aid.starts_with("anonymous:req-"),
            "synthesised agent_id must follow the `anonymous:req-<uuid8>` shape; got {aid}"
        );
        assert_eq!(stamped["agent_id"], json!(aid));
    }

    // ----- stage 2: resolve_create_conflict_title -------------------------

    #[test]
    fn stage2_conflict_error_mode_returns_409_when_title_exists() {
        let conn = db::open(std::path::Path::new(":memory:")).unwrap();
        // Plant the row the request below collides with.
        db::insert(&conn, &seed_memory("dup-title", "ns-x", "seed content"))
            .expect("seed insert ok");
        let request = CreateMemory {
            namespace: "ns-x".to_string(),
            ..make_body("dup-title")
        };
        let refusal = resolve_create_conflict_title(&conn, &request, OnConflictMode::Error)
            .expect_err("must return CONFLICT");
        assert_eq!(refusal.status(), StatusCode::CONFLICT);
    }

    #[test]
    fn stage2_conflict_version_mode_picks_a_free_suffix() {
        let conn = db::open(std::path::Path::new(":memory:")).unwrap();
        db::insert(&conn, &seed_memory("vers-title", "ns-v", "seed")).expect("seed insert ok");
        let request = CreateMemory {
            namespace: "ns-v".to_string(),
            ..make_body("vers-title")
        };
        let versioned = resolve_create_conflict_title(&conn, &request, OnConflictMode::Version)
            .expect("version path returns Ok");
        // The exact suffix format is left to the versioning helper; what is
        // pinned here is (a) the title changed and (b) the original base
        // survives as a prefix.
        assert_ne!(versioned, "vers-title");
        assert!(
            versioned.starts_with("vers-title"),
            "versioned title must preserve the original base; got {versioned}"
        );
    }

    #[test]
    fn stage2_conflict_merge_mode_passes_title_through_unchanged() {
        let conn = db::open(std::path::Path::new(":memory:")).unwrap();
        // Deliberately no seed row: `merge` must hand the title back
        // untouched even when nothing collides.
        let request = make_body("merge-title");
        let passthrough = resolve_create_conflict_title(&conn, &request, OnConflictMode::Merge)
            .expect("merge path returns Ok");
        assert_eq!(passthrough, "merge-title");
    }

    // ----- stage 3: embed_create_before_lock ------------------------------

    #[test]
    fn stage3_embed_no_embedder_reports_indexed() {
        // NOTE: this test does NOT call `embed_create_before_lock` — a full
        // `AppState` cannot be assembled from a unit test. It spells out
        // the `(None, EmbedStatus::Indexed)` pair the no-embedder branch is
        // meant to produce and pins that `Indexed` is not a degraded
        // status, so keyword-only daemons are not reported as degraded.
        let vector: Option<Vec<f32>> = None;
        let status = EmbedStatus::Indexed;
        assert!(vector.is_none());
        assert!(matches!(status, EmbedStatus::Indexed));
        assert!(
            !status.is_degraded(),
            "Indexed must NOT be classified as degraded by `is_degraded` — the \
             create_memory response branch on `embed_status` keys on this"
        );
    }

    // ----- validation early-return ---------------------------------------

    #[test]
    fn validation_empty_title_short_circuits_with_bad_request() {
        // Same validator call `create_memory` makes first; an empty title
        // must be rejected with a non-empty message for the 400 body.
        let request = make_body("");
        let rejection = validate::RequestValidator::validate_create(&request)
            .expect_err("empty title must fail validation");
        let text = rejection.to_string();
        assert!(
            !text.is_empty(),
            "validator error must carry a message for the 400 envelope"
        );
    }

    // ----- insert_create_with_quota: GovernanceRefusal downcast ----------

    #[test]
    fn insert_governance_refusal_downcasts_to_403_envelope() {
        // Stage 5 maps a substrate governance refusal to 403 by downcasting
        // the `anyhow::Error` to `storage::GovernanceRefusal`. This pins
        // the prerequisite: the refusal survives the `anyhow` round-trip
        // with its `reason` intact.
        let refusal = crate::storage::GovernanceRefusal {
            reason: "test rule forbids store".to_string(),
        };
        let wrapped: anyhow::Error = anyhow::anyhow!(refusal.clone());
        let recovered = wrapped.downcast_ref::<crate::storage::GovernanceRefusal>();
        assert!(
            recovered.is_some(),
            "GovernanceRefusal must round-trip through anyhow::Error \
             so insert_create_with_quota's downcast can map to 403"
        );
        assert_eq!(recovered.unwrap().reason, refusal.reason);
    }
}