Skip to main content

ai_memory/handlers/
hook_subscribers.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::models::ConfidenceSource;
5use crate::models::field_names;
6use axum::{
7    Json,
8    extract::{Path, Query, State},
9    http::{HeaderMap, StatusCode},
10    response::IntoResponse,
11};
12use chrono::Utc;
13use serde::Deserialize;
14use serde_json::json;
15use uuid::Uuid;
16
17use crate::db;
18use crate::identity::sentinels;
19use crate::models::{Memory, Tier};
20use crate::validate;
21
22#[cfg(feature = "sal")]
23use super::StorageBackend;
24#[cfg(feature = "sal")]
25use super::store_err_to_response;
26use super::{AppState, Db};
27use super::{fanout_or_503, list_namespaces, resolve_caller_agent_id};
28
/// Tag identifying a namespace-standard row (#1558 batch 6).
///
/// Stamped on the placeholder memory that is auto-seeded for a namespace
/// standard, and matched against `tags` when looking that placeholder up
/// again.
const NAMESPACE_STANDARD_TAG: &str = "_namespace_standard";
31
32#[derive(Deserialize)]
33pub struct InboxQuery {
34    #[serde(default)]
35    pub agent_id: Option<String>,
36    #[serde(default)]
37    pub unread_only: Option<bool>,
38    #[serde(default)]
39    pub limit: Option<u64>,
40}
41
42pub async fn get_inbox(
43    State(app): State<AppState>,
44    headers: HeaderMap,
45    Query(q): Query<InboxQuery>,
46) -> impl IntoResponse {
47    // #901 (security-high, 2026-05-19) — sibling of #874. The pre-#901
48    // path TRUSTED `?agent_id=` query as identity, allowing any caller
49    // to read any agent's inbox by passing `?agent_id=victim`. Header
50    // is now the only trusted source; the query value (if present)
51    // must match the authenticated caller, else 403.
52    let owner = match resolve_caller_agent_id(None, &headers, None) {
53        Ok(id) => id,
54        Err(e) => {
55            return (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response();
56        }
57    };
58    if let Some(claimed) = q.agent_id.as_deref()
59        && claimed != owner
60    {
61        return (
62            StatusCode::FORBIDDEN,
63            Json(json!({"error": crate::errors::msg::AGENT_ID_QUERY_MISMATCH})),
64        )
65            .into_response();
66    }
67
68    // v0.7.0 Wave-3 Continuation 4 (Bucket B / S32+S58) — postgres
69    // inbox now reads from the `_inbox/<owner>` namespace via the SAL
70    // `list` projection, matching what `notify` (Phase 16) already
71    // writes. The handler walks the namespace and projects each row
72    // into the inbox-message wire shape. Subscriptions still ride the
73    // legacy sqlite `subscriptions` table; the inbox itself does not
74    // need that surface — `notify` lands the message directly under
75    // `_inbox/<target>` and the inbox is a straight namespace read.
76    #[cfg(feature = "sal")]
77    if matches!(app.storage_backend, StorageBackend::Postgres) {
78        let ns = crate::inbox_namespace(&owner);
79        let ctx = crate::store::CallerContext::for_agent(&owner);
80        let cap = q
81            .limit
82            .and_then(|n| usize::try_from(n).ok())
83            .unwrap_or(100)
84            .clamp(1, 1000);
85        let filter = crate::store::Filter {
86            namespace: Some(ns),
87            limit: cap,
88            ..Default::default()
89        };
90        return match app.store.list(&ctx, &filter).await {
91            Ok(rows) => {
92                let messages: Vec<serde_json::Value> = rows
93                    .into_iter()
94                    .filter(|m| {
95                        // Honour `unread_only` when set: any row whose
96                        // metadata explicitly carries `read=true` is
97                        // filtered out. The default state (no key) is
98                        // treated as unread, mirroring the SQLite
99                        // contract.
100                        if q.unread_only.unwrap_or(false) {
101                            m.metadata.get("read").and_then(serde_json::Value::as_bool)
102                                != Some(true)
103                        } else {
104                            true
105                        }
106                    })
107                    .map(|m| {
108                        json!({
109                            "id": m.id,
110                            "title": m.title,
111                            "payload": m.content,
112                            "content": m.content,
113                            "priority": m.priority,
114                            "tier": m.tier.as_str(),
115                            "namespace": m.namespace,
116                            "metadata": m.metadata,
117                            (field_names::CREATED_AT): m.created_at,
118                            (field_names::UPDATED_AT): m.updated_at,
119                            "agent_id": m.metadata
120                                .get("agent_id")
121                                .and_then(|v| v.as_str())
122                                .unwrap_or(""),
123                            (field_names::FROM_AGENT_ID): m.metadata
124                                .get(field_names::FROM_AGENT_ID)
125                                .and_then(|v| v.as_str())
126                                .unwrap_or(""),
127                            (field_names::TARGET_AGENT_ID): m.metadata
128                                .get(field_names::TARGET_AGENT_ID)
129                                .and_then(|v| v.as_str())
130                                .unwrap_or(""),
131                        })
132                    })
133                    .collect();
134                let unread_count = messages
135                    .iter()
136                    .filter(|m| {
137                        m.get("metadata")
138                            .and_then(|v| v.get("read"))
139                            .and_then(serde_json::Value::as_bool)
140                            != Some(true)
141                    })
142                    .count();
143                (
144                    StatusCode::OK,
145                    Json(json!({
146                        "agent_id": owner,
147                        "messages": messages,
148                        "unread_count": unread_count,
149                        (field_names::STORAGE_BACKEND): "postgres",
150                    })),
151                )
152                    .into_response()
153            }
154            Err(e) => store_err_to_response(e),
155        };
156    }
157
158    let mut params = json!({"agent_id": owner});
159    if let Some(u) = q.unread_only {
160        params[field_names::UNREAD_ONLY] = json!(u);
161    }
162    if let Some(l) = q.limit {
163        params["limit"] = json!(l);
164    }
165    let lock = app.db.lock().await;
166    // #1557 — pass the authenticated, already-403-checked `owner` as the
167    // visibility caller so the `handle_inbox` owner-bind double-enforces it
168    // (defense-in-depth; the upstream X-Agent-Id 403 remains the primary gate).
169    let result = crate::mcp::handle_inbox(&lock.0, &params, None, Some(owner.as_str()));
170    drop(lock);
171    match result {
172        Ok(v) => (StatusCode::OK, Json(v)).into_response(),
173        Err(e) => (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response(),
174    }
175}
176// --- /api/v1/namespaces/{ns}/standard (POST / GET / DELETE) ----------------
177//    +/api/v1/namespaces (POST with body.namespace, GET/DELETE with ?namespace=)
178//
179// S34/S35 drive the standard via the bare `/api/v1/namespaces` surface; the
180// `/namespaces/{ns}/standard` path is kept for API-shape parity with the MCP
181// tool namespace. Both share a single underlying implementation.
182
183#[derive(Deserialize)]
184pub struct NamespaceStandardBody {
185    /// The memory id representing the standard.
186    #[serde(default)]
187    pub id: Option<String>,
188    /// Optional parent namespace for chain lookups.
189    #[serde(default)]
190    pub parent: Option<String>,
191    /// Optional governance policy to merge into the standard's metadata.
192    #[serde(default)]
193    pub governance: Option<serde_json::Value>,
194    /// Accepted for the path-less `/namespaces` form — ignored when the
195    /// namespace is supplied via a URL segment.
196    #[serde(default)]
197    pub namespace: Option<String>,
198    /// Some scenarios nest the payload under `standard` (S34 does so).
199    #[serde(default)]
200    pub standard: Option<Box<NamespaceStandardBody>>,
201}
202
203fn flatten_standard_body(body: NamespaceStandardBody) -> NamespaceStandardBody {
204    // When the caller nests fields under `standard: { … }` (S34 shape), pull
205    // the inner payload up to the top level so the single code path below
206    // can read it uniformly.
207    if let Some(inner) = body.standard {
208        let mut merged = *inner;
209        if merged.namespace.is_none() {
210            merged.namespace = body.namespace;
211        }
212        if merged.id.is_none() {
213            merged.id = body.id;
214        }
215        if merged.parent.is_none() {
216            merged.parent = body.parent;
217        }
218        if merged.governance.is_none() {
219            merged.governance = body.governance;
220        }
221        merged
222    } else {
223        body
224    }
225}
226
227fn namespace_standard_params(ns: &str, body: &NamespaceStandardBody) -> serde_json::Value {
228    let mut params = json!({"namespace": ns});
229    if let Some(ref id) = body.id {
230        params["id"] = json!(id);
231    }
232    if let Some(ref p) = body.parent {
233        params["parent"] = json!(p);
234    }
235    if let Some(ref g) = body.governance {
236        params[field_names::GOVERNANCE] = g.clone();
237    }
238    params
239}
240
/// v0.7.0 G-PHASE-E-2 (#707) — key-by-key merge of an incoming governance
/// JSON blob over an existing one. Mirrors the helper in
/// `mcp::tools::namespace`.
///
/// * Keys in `incoming` replace same-named keys from `existing`.
/// * Keys found only in `existing` (e.g. an operator-set
///   `require_approval_above_depth`) are kept.
/// * A missing or non-object `existing` contributes nothing.
/// * A non-object `incoming` is returned as a plain clone, without merging.
///
/// Gated on the `sal` feature because its only caller is the SAL/postgres
/// write path; without the gate default-features builds would emit a
/// dead-code warning.
#[cfg(feature = "sal")]
fn merge_governance_fields_http(
    existing: Option<&serde_json::Value>,
    incoming: &serde_json::Value,
) -> serde_json::Value {
    let Some(overrides) = incoming.as_object() else {
        return incoming.clone();
    };
    // Start from the existing object (or an empty map) and lay the incoming
    // keys on top.
    let mut merged = existing
        .and_then(serde_json::Value::as_object)
        .cloned()
        .unwrap_or_default();
    merged.extend(overrides.iter().map(|(k, v)| (k.clone(), v.clone())));
    serde_json::Value::Object(merged)
}
270
271async fn set_namespace_standard_inner(
272    app: &AppState,
273    ns: &str,
274    body: NamespaceStandardBody,
275    headers: Option<&HeaderMap>,
276) -> axum::response::Response {
277    // #913 (security-medium / SOC2, 2026-05-19) — admin governance audit.
278    // `set_namespace_standard` mutates the governance policy that gates
279    // EVERY downstream write into the namespace; the chain entry must be
280    // emitted BEFORE the storage write so the audit trail survives a
281    // failed downstream write. Mirrors the #911 pattern in
282    // `register_agent` / `archive_purge`.
283    let header_agent_id =
284        headers.and_then(|h| h.get(crate::HEADER_AGENT_ID).and_then(|v| v.to_str().ok()));
285    let caller = crate::identity::resolve_http_agent_id(None, header_agent_id)
286        .unwrap_or_else(|_| sentinels::ANONYMOUS_INVALID.to_string());
287    crate::governance::audit::record_decision(
288        &caller,
289        "allow",
290        "namespace_set_standard",
291        "",
292        json!({
293            "namespace": ns,
294            (field_names::STANDARD_ID): body.id.clone(),
295            "parent": body.parent.clone(),
296            "has_governance": body.governance.is_some(),
297        }),
298    );
299
300    let body = flatten_standard_body(body);
301
302    // v0.7.0 Wave-3 Continuation 2 (Phase 11) — postgres-backed
303    // namespace standard write path. The trait method handles the
304    // structural namespace_meta upsert; governance metadata that the
305    // sqlite path layers into the standard memory's metadata is
306    // captured by storing the policy in the placeholder memory's
307    // metadata.governance JSONB field via the trait's standard
308    // store path.
309    #[cfg(feature = "sal")]
310    if matches!(app.storage_backend, StorageBackend::Postgres) {
311        // #955 SECURITY-medium (Track A QC sweep, 2026-05-20) — drop
312        // the "ai:http" literal fallback. The outer function already
313        // resolved `caller` from headers above (with
314        // `anonymous:invalid` as the explicit non-header fallback);
315        // reuse it so the SAL #910 visibility filter sees the actual
316        // request principal in both call paths instead of a synthetic
317        // daemon-side placeholder. Pre-fix the "ai:http" literal made
318        // the standard write 404 when looking up its own placeholder
319        // and let any MCP-via-headers=None caller claim the literal
320        // principal as the daemon identity.
321        let ctx = if let Some(h) = headers {
322            crate::handlers::parity::http_caller_ctx(h, None)
323        } else {
324            crate::store::CallerContext::for_agent(&caller)
325        };
326        // Resolve standard_id: caller-supplied or auto-seed a placeholder.
327        let standard_id = if let Some(id) = body.id.clone() {
328            id
329        } else {
330            // Try to find an existing placeholder via list().
331            let filter = crate::store::Filter {
332                namespace: Some(ns.to_string()),
333                limit: 50,
334                ..Default::default()
335            };
336            let existing = match app.store.list(&ctx, &filter).await {
337                Ok(rows) => rows
338                    .into_iter()
339                    .find(|m| m.tags.iter().any(|t| t == NAMESPACE_STANDARD_TAG))
340                    .map(|m| m.id),
341                Err(_) => None,
342            };
343            if let Some(id) = existing {
344                id
345            } else {
346                let now = Utc::now().to_rfc3339();
347                // #929 SECURITY-high (Track A P6, 2026-05-20) — anchor
348                // ownership to the caller on first-write. Pre-fix
349                // stamped "system" and any subsequent caller could
350                // overwrite. The uniform ownership-gate below catches
351                // mutation by non-owners.
352                // scope=shared preserves multi-reader visibility under
353                // the SAL #910 filter so consumers across the
354                // namespace can read the governance policy.
355                let placeholder_agent_id =
356                    if caller.is_empty() || caller == sentinels::ANONYMOUS_INVALID {
357                        sentinels::SYSTEM_PRINCIPAL.to_string()
358                    } else {
359                        caller.clone()
360                    };
361                let mut metadata = serde_json::json!({
362                    "agent_id": placeholder_agent_id,
363                    "scope": "shared",
364                });
365                if let Some(g) = body.governance.clone()
366                    && let Some(obj) = metadata.as_object_mut()
367                {
368                    obj.insert(crate::META_KEY_GOVERNANCE.to_string(), g);
369                }
370                let placeholder = Memory {
371                    id: Uuid::new_v4().to_string(),
372                    tier: Tier::Long,
373                    namespace: ns.to_string(),
374                    title: format!("_standard:{ns}"),
375                    content: format!("namespace standard for {ns}"),
376                    tags: vec![NAMESPACE_STANDARD_TAG.to_string()],
377                    priority: 5,
378                    confidence: 1.0,
379                    source: "api".into(),
380                    access_count: 0,
381                    created_at: now.clone(),
382                    updated_at: now,
383                    last_accessed_at: None,
384                    expires_at: None,
385                    metadata,
386                    reflection_depth: 0,
387                    memory_kind: crate::models::MemoryKind::Observation,
388                    entity_id: None,
389                    persona_version: None,
390                    citations: Vec::new(),
391                    source_uri: None,
392                    source_span: None,
393                    confidence_source: ConfidenceSource::CallerProvided,
394                    confidence_signals: None,
395                    confidence_decayed_at: None,
396                    version: 1,
397                };
398                match app.store.store(&ctx, &placeholder).await {
399                    Ok(id) => id,
400                    Err(e) => return store_err_to_response(e),
401                }
402            }
403        };
404
405        // #929 SECURITY-high (Track A P6, 2026-05-20) — uniform
406        // ownership gate on the postgres path. Catches both the
407        // body.id-supplied branch and the auto-seed reuse branch.
408        // First-writes land on a placeholder stamped with the
409        // caller's id (above), so an immediate re-fetch returns the
410        // caller as owner and this gate is a no-op for first writes.
411        // Subsequent writes by a different caller hit the !is_unowned
412        // branch and 403.
413        if let Ok(resolved_mem) = app.store.get(&ctx, &standard_id).await {
414            let recorded_owner = resolved_mem
415                .metadata
416                .get("agent_id")
417                .and_then(|v| v.as_str())
418                .unwrap_or("");
419            let is_unowned =
420                recorded_owner.is_empty() || recorded_owner == sentinels::SYSTEM_PRINCIPAL;
421            let caller_principal = ctx.effective_principal();
422            if !is_unowned
423                && recorded_owner != caller_principal
424                && caller_principal != sentinels::DAEMON_PRINCIPAL
425            {
426                tracing::warn!(
427                    target: super::AUTHZ_TRACE_TARGET,
428                    "POST /namespaces/{{ns}}/standard 403 (postgres path): caller {caller_principal} != owner {recorded_owner} (ns={ns}, id={standard_id})"
429                );
430                return (
431                    StatusCode::FORBIDDEN,
432                    Json(json!({
433                        "error": crate::errors::msg::CALLER_NOT_NAMESPACE_STANDARD_OWNER,
434                        "owner": recorded_owner,
435                        "caller": caller_principal
436                    })),
437                )
438                    .into_response();
439            }
440            // Unowned-legacy claim: rewrite metadata.agent_id to caller
441            // so subsequent calls are properly gated.
442            if is_unowned
443                && !caller_principal.is_empty()
444                && caller_principal != sentinels::ANONYMOUS_INVALID
445            {
446                let mut new_meta = if resolved_mem.metadata.is_object() {
447                    resolved_mem.metadata.clone()
448                } else {
449                    json!({})
450                };
451                if let Some(obj) = new_meta.as_object_mut() {
452                    obj.insert(
453                        "agent_id".to_string(),
454                        serde_json::Value::String(caller_principal.to_string()),
455                    );
456                    obj.entry("scope".to_string())
457                        .or_insert_with(|| serde_json::Value::String("shared".to_string()));
458                }
459                let patch = crate::store::UpdatePatch {
460                    metadata: Some(new_meta),
461                    ..Default::default()
462                };
463                if let Err(e) = app.store.update(&ctx, &standard_id, patch).await {
464                    tracing::warn!(
465                        "namespace_standard (postgres): ownership-claim metadata update failed: {e}"
466                    );
467                }
468            }
469        }
470
471        // v0.7.0 Wave-3 Continuation 5 (Bucket C / S35+S53+S60+S80) —
472        // when the caller supplied a `governance` policy AND a pre-
473        // existing standard_id, merge the policy into the standard
474        // memory's `metadata.governance` so `resolve_governance_policy`
475        // (which reads exactly this field via `from_metadata`) finds
476        // the policy on the next write. Without this merge step the
477        // postgres adapter's chain walk lands on a memory whose
478        // metadata has no `governance` key, returns `None`, and the
479        // intruder's write is allowed through.
480        if let Some(g) = body.governance.clone() {
481            // Load the standard memory FIRST so we can merge the
482            // incoming `g` onto the existing `metadata.governance`
483            // blob — this preserves extra fields like
484            // `require_approval_above_depth` that live outside the
485            // typed `GovernancePolicy` struct (v0.7.0 G-PHASE-E-2,
486            // #707). Mirrors the SQLite handler's merge in
487            // `mcp::tools::namespace::handle_namespace_set_standard`.
488            let standard_mem = match app.store.get(&ctx, &standard_id).await {
489                Ok(m) => m,
490                Err(e) => return store_err_to_response(e),
491            };
492            let merged = merge_governance_fields_http(
493                standard_mem.metadata.get(crate::META_KEY_GOVERNANCE),
494                &g,
495            );
496            // Validate the merged blob's typed shape. Deserialising
497            // drops unknown fields but the typed sub-set must still
498            // parse + pass policy validation. Mirrors the SQLite path
499            // at `mcp::tools::namespace`.
500            let policy: crate::models::GovernancePolicy = match serde_json::from_value(
501                merged.clone(),
502            ) {
503                Ok(p) => p,
504                Err(e) => {
505                    return (
506                            StatusCode::BAD_REQUEST,
507                            Json(json!({"error": crate::errors::msg::invalid(crate::META_KEY_GOVERNANCE, e)})),
508                        )
509                            .into_response();
510                }
511            };
512            if let Err(e) = validate::validate_governance_policy(&policy) {
513                return (
514                    StatusCode::BAD_REQUEST,
515                    Json(json!({"error": crate::errors::msg::invalid(crate::META_KEY_GOVERNANCE, e)})),
516                )
517                    .into_response();
518            }
519            let mut metadata = if standard_mem.metadata.is_object() {
520                standard_mem.metadata.clone()
521            } else {
522                json!({})
523            };
524            if let Some(obj) = metadata.as_object_mut() {
525                obj.insert(crate::META_KEY_GOVERNANCE.to_string(), merged);
526            }
527            let patch = crate::store::UpdatePatch {
528                metadata: Some(metadata),
529                ..Default::default()
530            };
531            if let Err(e) = app.store.update(&ctx, &standard_id, patch).await {
532                return store_err_to_response(e);
533            }
534        }
535        return match app
536            .store
537            .set_namespace_standard(&ctx, ns, &standard_id, body.parent.as_deref())
538            .await
539        {
540            Ok(()) => (
541                StatusCode::CREATED,
542                Json(json!({
543                    "namespace": ns,
544                    (field_names::STANDARD_ID): standard_id,
545                    "parent": body.parent,
546                    (field_names::STORAGE_BACKEND): "postgres",
547                })),
548            )
549                .into_response(),
550            Err(e) => store_err_to_response(e),
551        };
552    }
553
554    // Auto-seed a placeholder standard memory when the caller didn't supply
555    // an `id`. S34's body is `{governance: …}` with no id — we create a
556    // minimal standard memory so the governance policy has a home.
557    let lock = app.db.lock().await;
558    let resolved_id = if let Some(id) = body.id.clone() {
559        id
560    } else {
561        // Look for an existing placeholder first to keep repeat calls
562        // idempotent; otherwise insert a new row.
563        let existing = db::list(
564            &lock.0,
565            Some(ns),
566            None,
567            1,
568            0,
569            None,
570            None,
571            None,
572            Some(NAMESPACE_STANDARD_TAG),
573            None,
574        )
575        .ok()
576        .and_then(|v| v.into_iter().next());
577        if let Some(m) = existing {
578            // #929 SECURITY-high (Track A P6, 2026-05-20) — ownership
579            // gate on the namespace-standard surface. Pre-fix any
580            // authenticated caller could overwrite any namespace's
581            // governance policy because the placeholder was stamped
582            // metadata.agent_id="system" (an unowned sentinel) and no
583            // caller-vs-owner comparison was performed. Now: the
584            // recorded owner is the only principal who can mutate the
585            // standard. Legacy "system" / empty owners are treated as
586            // unowned (any caller may CLAIM via the
587            // metadata.agent_id rewrite below) for backward
588            // compatibility with rows written before this fix.
589            let recorded_owner = m
590                .metadata
591                .get("agent_id")
592                .and_then(|v| v.as_str())
593                .unwrap_or("");
594            let is_unowned =
595                recorded_owner.is_empty() || recorded_owner == sentinels::SYSTEM_PRINCIPAL;
596            if !is_unowned && recorded_owner != caller && caller != sentinels::DAEMON_PRINCIPAL {
597                tracing::warn!(
598                    target: super::AUTHZ_TRACE_TARGET,
599                    "POST /namespaces/{{ns}}/standard 403: caller {caller} != owner {recorded_owner} (ns={ns})"
600                );
601                return (
602                    StatusCode::FORBIDDEN,
603                    Json(json!({
604                        "error": crate::errors::msg::CALLER_NOT_NAMESPACE_STANDARD_OWNER,
605                        "owner": recorded_owner,
606                        "caller": caller
607                    })),
608                )
609                    .into_response();
610            }
611            // Unowned-legacy fast path: claim ownership by rewriting
612            // metadata.agent_id to the caller. Next request from a
613            // different caller will be 403'd.
614            if is_unowned && !caller.is_empty() && caller != sentinels::ANONYMOUS_INVALID {
615                let mut new_meta = if m.metadata.is_object() {
616                    m.metadata.clone()
617                } else {
618                    json!({})
619                };
620                if let Some(obj) = new_meta.as_object_mut() {
621                    obj.insert(
622                        "agent_id".to_string(),
623                        serde_json::Value::String(caller.clone()),
624                    );
625                    // Preserve scope=shared if not already set so the
626                    // SAL #910 filter still surfaces the standard to
627                    // every reader.
628                    obj.entry("scope".to_string())
629                        .or_insert_with(|| serde_json::Value::String("shared".to_string()));
630                }
631                if let Err(e) = db::update(
632                    &lock.0,
633                    &m.id,
634                    None,
635                    None,
636                    None,
637                    None,
638                    None,
639                    None,
640                    None,
641                    None,
642                    Some(&new_meta),
643                ) {
644                    tracing::warn!(
645                        "namespace_standard: ownership-claim metadata update failed: {e}"
646                    );
647                }
648            }
649            m.id
650        } else {
651            let now = Utc::now().to_rfc3339();
652            // #929 — first-write anchors ownership to the caller, not
653            // the legacy "system" sentinel. scope=shared preserves
654            // multi-reader visibility under the SAL #910 filter.
655            let placeholder_agent_id =
656                if caller.is_empty() || caller == sentinels::ANONYMOUS_INVALID {
657                    sentinels::SYSTEM_PRINCIPAL.to_string()
658                } else {
659                    caller.clone()
660                };
661            let placeholder = Memory {
662                id: Uuid::new_v4().to_string(),
663                tier: Tier::Long,
664                namespace: ns.to_string(),
665                title: format!("_standard:{ns}"),
666                content: format!("namespace standard for {ns}"),
667                tags: vec![NAMESPACE_STANDARD_TAG.to_string()],
668                priority: 5,
669                confidence: 1.0,
670                source: "api".into(),
671                access_count: 0,
672                created_at: now.clone(),
673                updated_at: now,
674                last_accessed_at: None,
675                expires_at: None,
676                metadata: serde_json::json!({
677                    "agent_id": placeholder_agent_id,
678                    "scope": "shared",
679                }),
680                reflection_depth: 0,
681                memory_kind: crate::models::MemoryKind::Observation,
682                entity_id: None,
683                persona_version: None,
684                citations: Vec::new(),
685                source_uri: None,
686                source_span: None,
687                confidence_source: ConfidenceSource::CallerProvided,
688                confidence_signals: None,
689                confidence_decayed_at: None,
690                version: 1,
691            };
692            match db::insert(&lock.0, &placeholder) {
693                Ok(id) => id,
694                Err(e) => {
695                    tracing::error!("namespace_standard: placeholder insert failed: {e}");
696                    return (
697                        StatusCode::INTERNAL_SERVER_ERROR,
698                        Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
699                    )
700                        .into_response();
701                }
702            }
703        }
704    };
705
706    // #929 SECURITY-high (Track A P6, 2026-05-20) — uniform ownership
707    // gate. Catches the path where the caller supplied `body.id`
708    // directly (bypassing the auto-seed lookup above). Load the
709    // resolved standard memory by id, check ownership. The auto-seed
710    // path already validated above and re-checking here is a no-op for
711    // it; the body.id-supplied path goes through this gate once.
712    if let Ok(Some(resolved_mem)) = db::get(&lock.0, &resolved_id) {
713        let recorded_owner = resolved_mem
714            .metadata
715            .get("agent_id")
716            .and_then(|v| v.as_str())
717            .unwrap_or("");
718        let is_unowned = recorded_owner.is_empty() || recorded_owner == sentinels::SYSTEM_PRINCIPAL;
719        if !is_unowned && recorded_owner != caller && caller != sentinels::DAEMON_PRINCIPAL {
720            tracing::warn!(
721                target: super::AUTHZ_TRACE_TARGET,
722                "POST /namespaces/{{ns}}/standard 403 (body.id path): caller {caller} != owner {recorded_owner} (ns={ns}, id={resolved_id})"
723            );
724            return (
725                StatusCode::FORBIDDEN,
726                Json(json!({
727                    "error": crate::errors::msg::CALLER_NOT_NAMESPACE_STANDARD_OWNER,
728                    "owner": recorded_owner,
729                    "caller": caller
730                })),
731            )
732                .into_response();
733        }
734    }
735
736    let mut effective = body;
737    effective.id = Some(resolved_id.clone());
738    let mut params = namespace_standard_params(ns, &effective);
739    // #929 SECURITY-high follow-up (2026-05-20) — thread the HTTP-
740    // resolved caller through to the MCP entry so its #929 ownership
741    // gate sees the same principal as the HTTP-handler gate. Without
742    // this the MCP entry's `resolve_agent_id(params["agent_id"], None)`
743    // falls back to the daemon process identity (`host:<host>:pid-…`),
744    // which never matches a row-owner anchored to the HTTP caller's
745    // X-Agent-Id — 400-rejects every legitimate first-write on the
746    // HTTP standard surface. Verified via Track A re-probe agent
747    // `aaab899d6a4bab36f` 2026-05-20 (re-verify #929 close pending).
748    if let Some(obj) = params.as_object_mut() {
749        obj.insert("agent_id".to_string(), json!(caller));
750    }
751    let result = crate::mcp::handle_namespace_set_standard(&lock.0, &params);
752    // Capture the standard memory so we can fan it out to peers — cluster
753    // visibility of governance rules matters for S34/S35.
754    let standard_mem = db::get(&lock.0, &resolved_id).ok().flatten();
755    // v0.6.2 (S35): also capture the freshly-written namespace_meta row
756    // so peers learn the explicit (namespace, standard_id, parent) tuple.
757    // Without this, peers auto-detect a parent via `-` prefix which may
758    // disagree with what the originator set.
759    let meta_entry = db::get_namespace_meta_entry(&lock.0, ns).ok().flatten();
760    drop(lock);
761
762    match result {
763        Ok(v) => {
764            if let Some(ref mem) = standard_mem
765                && let Some(resp) = fanout_or_503(app, mem).await
766            {
767                return resp;
768            }
769            if let (Some(entry), Some(fed)) = (meta_entry.as_ref(), app.federation.as_ref()) {
770                match crate::federation::broadcast_namespace_meta_quorum(fed, entry).await {
771                    Ok(tracker) => {
772                        if let Err(err) = crate::federation::finalise_quorum(&tracker) {
773                            // #869 — typed 503 envelope via the shared helper.
774                            let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
775                            return super::quorum_not_met_response(&payload);
776                        }
777                    }
778                    Err(err) => {
779                        let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
780                        return super::quorum_not_met_response(&payload);
781                    }
782                }
783            }
784            (StatusCode::CREATED, Json(v)).into_response()
785        }
786        Err(e) => (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response(),
787    }
788}
789
790pub async fn set_namespace_standard(
791    State(app): State<AppState>,
792    headers: HeaderMap,
793    Path(ns): Path<String>,
794    Json(body): Json<NamespaceStandardBody>,
795) -> impl IntoResponse {
796    set_namespace_standard_inner(&app, &ns, body, Some(&headers)).await
797}
798
/// Query-string parameters accepted by the namespace-standard handlers.
///
/// Both fields are optional and deserialize to `None` when absent.
#[derive(Deserialize)]
pub struct NamespaceStandardQuery {
    /// Target namespace for the query-string (`*_qs`) handlers. The
    /// path-addressed `get_namespace_standard` takes the namespace from the
    /// URL path and does not read this field.
    #[serde(default)]
    pub namespace: Option<String>,
    /// When present, forwarded as the `inherit` parameter of the standard
    /// lookup; when absent the parameter is omitted (the Postgres read path
    /// treats a missing value as `false`).
    #[serde(default)]
    pub inherit: Option<bool>,
}
806
807pub async fn get_namespace_standard(
808    State(state): State<Db>,
809    Path(ns): Path<String>,
810    Query(q): Query<NamespaceStandardQuery>,
811) -> impl IntoResponse {
812    let mut params = json!({"namespace": ns});
813    if let Some(inh) = q.inherit {
814        params["inherit"] = json!(inh);
815    }
816    let lock = state.lock().await;
817    let result = crate::mcp::handle_namespace_get_standard(&lock.0, &params);
818    drop(lock);
819    match result {
820        Ok(v) => (StatusCode::OK, Json(v)).into_response(),
821        Err(e) => (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response(),
822    }
823}
824
825pub async fn clear_namespace_standard(
826    State(app): State<AppState>,
827    headers: HeaderMap,
828    Path(ns): Path<String>,
829) -> impl IntoResponse {
830    clear_namespace_standard_inner(&app, &ns, Some(&headers)).await
831}
832
833// Query-string forms for the S34/S35 `/api/v1/namespaces?namespace=…` shape.
834pub async fn set_namespace_standard_qs(
835    State(app): State<AppState>,
836    headers: HeaderMap,
837    Json(body): Json<NamespaceStandardBody>,
838) -> impl IntoResponse {
839    let Some(ns) = body
840        .namespace
841        .clone()
842        .or_else(|| body.standard.as_ref().and_then(|s| s.namespace.clone()))
843    else {
844        return (
845            StatusCode::BAD_REQUEST,
846            Json(json!({"error": crate::errors::msg::NAMESPACE_REQUIRED})),
847        )
848            .into_response();
849    };
850    set_namespace_standard_inner(&app, &ns, body, Some(&headers)).await
851}
852
853pub async fn get_namespace_standard_qs(
854    State(app): State<AppState>,
855    headers: HeaderMap,
856    Query(q): Query<NamespaceStandardQuery>,
857) -> impl IntoResponse {
858    // If no namespace is supplied this shares a route with the existing
859    // `list_namespaces` GET; the router chains the two so a plain
860    // `GET /api/v1/namespaces` still returns the list.
861    let Some(ns) = q.namespace.clone() else {
862        // #945 SECURITY-medium (Track A QC sweep, 2026-05-20) —
863        // list_namespaces now requires admin via require_admin;
864        // thread headers through so the gate sees the X-Agent-Id.
865        return list_namespaces(State(app), headers).await.into_response();
866    };
867    // #945-sibling — when ns IS supplied, this becomes a per-namespace
868    // standard fetch. Currently no caller-vs-owner gate on this read
869    // path (filed as a follow-up under #959).
870    let _ = &headers;
871
872    // v0.7.0 Wave-3 Continuation 5 (Bucket C / S35) — postgres-backed
873    // daemons resolve the namespace standard via the SAL trait. When
874    // `inherit=true` we walk the parent chain (already cached in
875    // `namespace_meta.parent_namespace`) leaf→root to find the nearest
876    // ancestor that has a standard memory. Without inherit we look up
877    // the exact namespace.
878    #[cfg(feature = "sal")]
879    if matches!(app.storage_backend, StorageBackend::Postgres) {
880        // v0.7.0 ship-hardening (2026-05-19): namespace standards are
881        // governance POLICY — readable by any caller that can query
882        // the namespace itself. Use for_admin so the SAL #910
883        // scope=private visibility filter doesn't drop the standard
884        // memory when the requester is not the policy author.
885        let ctx = crate::store::CallerContext::for_admin(sentinels::AI_HTTP_INTERNAL);
886        let inherit = q.inherit.unwrap_or(false);
887        // Build chain leaf → root (most-specific first) by trimming
888        // `/segment` until empty. The chain matches the SQLite
889        // semantics in `db::resolve_namespace_standard` for the
890        // simple namespace-hierarchy case.
891        let mut chain: Vec<String> = vec![ns.clone()];
892        if inherit {
893            let mut cur = ns.clone();
894            while let Some(pos) = cur.rfind('/') {
895                cur.truncate(pos);
896                if cur.is_empty() {
897                    break;
898                }
899                chain.push(cur.clone());
900            }
901        }
902
903        if inherit {
904            // S35 contract — return the FULL chain of standards from
905            // leaf → root so the caller sees both child and parent
906            // rules layered into one view. Mirrors the sqlite
907            // `handle_namespace_get_standard` inherit branch which
908            // returns `chain` + `standards` arrays.
909            let mut standards: Vec<serde_json::Value> = Vec::new();
910            for candidate in &chain {
911                if let Ok(Some((standard_id, parent))) =
912                    app.store.get_namespace_standard(&ctx, candidate).await
913                {
914                    // Pull the standard memory body so the caller can
915                    // see governance + content layered through.
916                    let mem_doc = match app.store.get(&ctx, &standard_id).await {
917                        Ok(m) => json!({
918                            "namespace": candidate,
919                            (field_names::STANDARD_ID): standard_id,
920                            "id": standard_id,
921                            "title": m.title,
922                            "content": m.content,
923                            "priority": m.priority,
924                            (field_names::PARENT_NAMESPACE): parent,
925                            (field_names::GOVERNANCE): m.metadata.get(crate::META_KEY_GOVERNANCE).cloned()
926                                .unwrap_or(serde_json::Value::Null),
927                        }),
928                        Err(_) => json!({
929                            "namespace": candidate,
930                            (field_names::STANDARD_ID): standard_id,
931                            "id": standard_id,
932                            (field_names::PARENT_NAMESPACE): parent,
933                        }),
934                    };
935                    standards.push(mem_doc);
936                }
937            }
938            // Pick the closest (leaf-most) entry as the resolved
939            // standard for the response root level so existing
940            // single-standard consumers still see the expected
941            // `standard_id`.
942            let closest = standards.first().cloned().unwrap_or(json!({}));
943            return (
944                StatusCode::OK,
945                Json(json!({
946                    "namespace": ns,
947                    "chain": chain,
948                    "standards": standards,
949                    "resolved_namespace": closest.get("namespace").cloned()
950                        .unwrap_or(serde_json::Value::Null),
951                    (field_names::STANDARD_ID): closest.get(field_names::STANDARD_ID).cloned()
952                        .unwrap_or(serde_json::Value::Null),
953                    "id": closest.get("id").cloned()
954                        .unwrap_or(serde_json::Value::Null),
955                    (field_names::PARENT_NAMESPACE): closest.get(field_names::PARENT_NAMESPACE).cloned()
956                        .unwrap_or(serde_json::Value::Null),
957                    (field_names::STORAGE_BACKEND): "postgres",
958                })),
959            )
960                .into_response();
961        }
962        // Non-inherit form — single exact-match lookup.
963        match app.store.get_namespace_standard(&ctx, &ns).await {
964            Ok(Some((standard_id, parent))) => {
965                return (
966                    StatusCode::OK,
967                    Json(json!({
968                        "namespace": ns,
969                        "resolved_namespace": ns,
970                        (field_names::STANDARD_ID): standard_id,
971                        "id": standard_id,
972                        (field_names::PARENT_NAMESPACE): parent,
973                        (field_names::STORAGE_BACKEND): "postgres",
974                    })),
975                )
976                    .into_response();
977            }
978            Ok(None) => {}
979            Err(e) => return store_err_to_response(e),
980        }
981        return (
982            StatusCode::OK,
983            Json(json!({
984                "namespace": ns,
985                (field_names::STANDARD_ID): serde_json::Value::Null,
986                "id": serde_json::Value::Null,
987                (field_names::PARENT_NAMESPACE): serde_json::Value::Null,
988                (field_names::STORAGE_BACKEND): "postgres",
989            })),
990        )
991            .into_response();
992    }
993
994    let mut params = json!({"namespace": ns});
995    if let Some(inh) = q.inherit {
996        params["inherit"] = json!(inh);
997    }
998    let lock = app.db.lock().await;
999    let result = crate::mcp::handle_namespace_get_standard(&lock.0, &params);
1000    drop(lock);
1001    match result {
1002        Ok(v) => (StatusCode::OK, Json(v)).into_response(),
1003        Err(e) => (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response(),
1004    }
1005}
1006
1007pub async fn clear_namespace_standard_qs(
1008    State(app): State<AppState>,
1009    headers: HeaderMap,
1010    Query(q): Query<NamespaceStandardQuery>,
1011) -> impl IntoResponse {
1012    let Some(ns) = q.namespace else {
1013        return (
1014            StatusCode::BAD_REQUEST,
1015            Json(json!({"error": crate::errors::msg::NAMESPACE_REQUIRED})),
1016        )
1017            .into_response();
1018    };
1019    clear_namespace_standard_inner(&app, &ns, Some(&headers)).await
1020}
1021
1022/// v0.6.2 (S35 follow-up): shared implementation for path and query-string
1023/// clear handlers. Runs the local clear then, on success, fans the cleared
1024/// namespace out to peers via `broadcast_namespace_meta_clear_quorum`.
1025/// Returns 503 `quorum_not_met` when federation is configured and the quorum
1026/// contract fails — matching the pattern established by
1027/// `set_namespace_standard_inner`.
1028async fn clear_namespace_standard_inner(
1029    app: &AppState,
1030    ns: &str,
1031    headers: Option<&HeaderMap>,
1032) -> axum::response::Response {
1033    // #913 (security-medium / SOC2, 2026-05-19) — admin governance audit.
1034    // Clearing a namespace standard removes the governance policy that
1035    // gates downstream writes; the chain entry MUST land before the
1036    // storage write so the audit trail captures intent.
1037    let header_agent_id =
1038        headers.and_then(|h| h.get(crate::HEADER_AGENT_ID).and_then(|v| v.to_str().ok()));
1039    let caller = crate::identity::resolve_http_agent_id(None, header_agent_id)
1040        .unwrap_or_else(|_| sentinels::ANONYMOUS_INVALID.to_string());
1041    crate::governance::audit::record_decision(
1042        &caller,
1043        "allow",
1044        "namespace_clear_standard",
1045        "",
1046        json!({
1047            "namespace": ns,
1048        }),
1049    );
1050
1051    // v0.7.0 Wave-3 Continuation 2 (Phase 11) — postgres-backed clear.
1052    #[cfg(feature = "sal")]
1053    if matches!(app.storage_backend, StorageBackend::Postgres) {
1054        // v0.7.0 ship-hardening (2026-05-19): use the resolved caller
1055        // from the X-Agent-Id header. Pre-fix this hardcoded "ai:http"
1056        // which made the standard-clear lookup miss its target memory
1057        // when caller != "ai:http". `caller` here is the
1058        // header-resolved id used for the audit-record above.
1059        let ctx = crate::store::CallerContext::for_agent(caller.clone());
1060        return match app.store.clear_namespace_standard(&ctx, ns).await {
1061            Ok(true) => (
1062                StatusCode::OK,
1063                Json(json!({
1064                    "cleared": true,
1065                    "namespace": ns,
1066                    (field_names::STORAGE_BACKEND): "postgres",
1067                })),
1068            )
1069                .into_response(),
1070            Ok(false) => (
1071                StatusCode::NOT_FOUND,
1072                Json(json!({"error": "no namespace_meta row matched"})),
1073            )
1074                .into_response(),
1075            Err(e) => store_err_to_response(e),
1076        };
1077    }
1078    let params = json!({"namespace": ns});
1079    let lock = app.db.lock().await;
1080    let result = crate::mcp::handle_namespace_clear_standard(&lock.0, &params);
1081    drop(lock);
1082    match result {
1083        Ok(v) => {
1084            if let Some(fed) = app.federation.as_ref() {
1085                let namespaces = vec![ns.to_string()];
1086                match crate::federation::broadcast_namespace_meta_clear_quorum(fed, &namespaces)
1087                    .await
1088                {
1089                    Ok(tracker) => {
1090                        if let Err(err) = crate::federation::finalise_quorum(&tracker) {
1091                            // #869 — typed 503 envelope via the shared helper.
1092                            let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
1093                            return super::quorum_not_met_response(&payload);
1094                        }
1095                    }
1096                    Err(err) => {
1097                        let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
1098                        return super::quorum_not_met_response(&payload);
1099                    }
1100                }
1101            }
1102            (StatusCode::OK, Json(v)).into_response()
1103        }
1104        Err(e) => (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response(),
1105    }
1106}
1107
1108// --- /api/v1/session/start (POST) ------------------------------------------
1109
/// JSON body accepted by `session_start`. Every field is optional and
/// deserializes to `None` when absent.
#[derive(Deserialize)]
pub struct SessionStartBody {
    /// When present, forwarded as the `namespace` parameter of the
    /// session-start call.
    #[serde(default)]
    pub namespace: Option<String>,
    /// When present, forwarded as the `limit` parameter of the
    /// session-start call.
    #[serde(default)]
    pub limit: Option<u64>,
    /// Caller identity supplied in the body. It is validated with
    /// `validate::validate_agent_id`, must equal the `X-Agent-Id` header when
    /// both are sent, and is echoed back in the response when supplied.
    #[serde(default)]
    pub agent_id: Option<String>,
}
1119
1120pub async fn session_start(
1121    State(state): State<Db>,
1122    headers: HeaderMap,
1123    Json(body): Json<SessionStartBody>,
1124) -> impl IntoResponse {
1125    // agent_id is optional for session_start; but if supplied it must validate.
1126    if let Some(ref id) = body.agent_id
1127        && let Err(e) = validate::validate_agent_id(id)
1128    {
1129        return (
1130            StatusCode::BAD_REQUEST,
1131            Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
1132        )
1133            .into_response();
1134    }
1135    let header_agent_id = headers
1136        .get(crate::HEADER_AGENT_ID)
1137        .and_then(|v| v.to_str().ok());
1138    // v0.7.0 #1420 — resolve the caller for the post-list visibility
1139    // filter. Pre-fix, `header_agent_id` was dropped ("identity
1140    // currently informational") and `handle_session_start(..., None)`
1141    // skipped the filter, leaking cross-agent `scope=private` rows.
1142    //
1143    // session_start historically accepted `agent_id` from EITHER body
1144    // OR header (both optional), so we preserve that contract instead
1145    // of using the stricter `resolve_http_agent_id` (which demands a
1146    // header for write surfaces). Precedence: header → body →
1147    // synthesized `anonymous:req-<uuid>`. When both header + body are
1148    // supplied and disagree, return 400 — same mismatch posture as
1149    // every other write surface (#910 norm).
1150    if let (Some(h), Some(b)) = (header_agent_id, body.agent_id.as_deref())
1151        && h != b
1152    {
1153        return (
1154            StatusCode::BAD_REQUEST,
1155            Json(json!({"error": "agent_id body parameter does not match X-Agent-Id header"})),
1156        )
1157            .into_response();
1158    }
1159    let caller = header_agent_id
1160        .map(str::to_string)
1161        .or_else(|| body.agent_id.clone())
1162        .unwrap_or_else(crate::identity::anonymous_request_id);
1163    let mut params = json!({});
1164    if let Some(ref n) = body.namespace {
1165        params["namespace"] = json!(n);
1166    }
1167    if let Some(l) = body.limit {
1168        params["limit"] = json!(l);
1169    }
1170    let lock = state.lock().await;
1171    let result = crate::mcp::handle_session_start(&lock.0, &params, None, Some(&caller));
1172    drop(lock);
1173    match result {
1174        Ok(mut v) => {
1175            // Stamp a stable session id so callers (S36) can correlate
1176            // subsequent writes. We don't persist sessions today; the id is
1177            // advisory and round-tripped via metadata by the caller.
1178            if let Some(obj) = v.as_object_mut() {
1179                obj.entry("session_id")
1180                    .or_insert_with(|| json!(Uuid::new_v4().to_string()));
1181                if let Some(ref a) = body.agent_id {
1182                    obj.insert("agent_id".into(), json!(a));
1183                }
1184            }
1185            (StatusCode::OK, Json(v)).into_response()
1186        }
1187        Err(e) => (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response(),
1188    }
1189}