Skip to main content

ai_memory/handlers/
kg.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! HTTP handlers for the v0.7.0 knowledge-graph + entity surface (#650
5//! follow-up per-domain split). Each handler is a thin Axum-layer
6//! wrapper around the SAL `MemoryStore` trait (postgres path) or the
7//! legacy `db::*` API (sqlite path), shaping the result into the
8//! canonical wire envelope.
9//!
10//! All handlers were extracted verbatim from `src/handlers/http.rs`
11//! (commit `12e1253`, lines 4169-5013 + 5192-5419); wire compatibility
12//! is preserved via the `pub use kg::*` re-export from
13//! `src/handlers/mod.rs`. The split keeps the kg/entity domain in
14//! a single ~1 100-line module while shrinking the legacy
15//! `handlers/http.rs` toward the long-term ≤600-LOC target.
16//!
17//! Functions in this module:
18//!   - `entity_register`        (POST /api/v1/entities)
19//!   - `entity_get_by_alias`    (GET  /api/v1/entities/by_alias)
20//!   - `kg_timeline`            (GET  /api/v1/kg/timeline)
21//!   - `kg_invalidate`          (POST /api/v1/kg/invalidate)
22//!   - `kg_find_paths`          (POST /api/v1/kg/find_paths)
23//!   - `kg_query`               (POST /api/v1/kg/query)
24
25#![allow(clippy::too_many_lines)]
26
27use crate::models::Memory;
28use crate::models::field_names;
29use axum::{
30    Json,
31    extract::{Query, State},
32    http::{HeaderMap, StatusCode},
33    response::IntoResponse,
34};
35use serde::Deserialize;
36use serde_json::json;
37
38use crate::db;
39use crate::identity::sentinels;
40use crate::validate;
41
42use super::AppState;
43#[cfg(feature = "sal")]
44use super::StorageBackend;
45#[cfg(feature = "sal")]
46use super::store_err_to_response;
47
/// Request body for `POST /api/v1/entities` (Pillar 2 / Stream B).
///
/// Deserialized by the `entity_register` handler, which validates
/// `canonical_name` and `namespace` before touching storage.
#[derive(Debug, Deserialize)]
pub struct EntityRegisterBody {
    /// Canonical entity name; checked with `validate::validate_title`
    /// by the handler (400 on failure).
    pub canonical_name: String,
    /// Namespace the entity lives in; checked with
    /// `validate::validate_namespace` by the handler (400 on failure).
    pub namespace: String,
    /// Aliases that should resolve to this entity. Blanks are skipped;
    /// duplicates collapse via `entity_aliases`'s primary key.
    /// Optional on the wire (`#[serde(default)]` yields an empty list).
    #[serde(default)]
    pub aliases: Vec<String>,
    /// Arbitrary metadata to merge onto the entity memory. `kind` is
    /// always overwritten with `"entity"`. Optional on the wire; the
    /// handler substitutes an empty JSON object for any non-object
    /// value (including the `null` produced when the field is absent).
    #[serde(default)]
    pub metadata: serde_json::Value,
    /// Override the resolved NHI for this request's
    /// `metadata.agent_id`. Falls back to the `X-Agent-Id` header
    /// when omitted.
    pub agent_id: Option<String>,
}
66
/// Query parameters for `GET /api/v1/entities/by_alias` (Pillar 2 /
/// Stream B).
#[derive(Debug, Deserialize)]
pub struct EntityByAliasQuery {
    /// Alias to resolve. The handler trims it and answers 400 when the
    /// trimmed value is empty.
    pub alias: String,
    /// Optional namespace filter. The handler trims it, treats a blank
    /// value as absent, and validates it with
    /// `validate::validate_namespace` when present.
    pub namespace: Option<String>,
}
74
75/// `POST /api/v1/entities` — REST mirror of the MCP
76/// `memory_entity_register` tool. Idempotent on
77/// `(canonical_name, namespace)`; merges aliases on re-registration.
78pub async fn entity_register(
79    State(app): State<AppState>,
80    headers: HeaderMap,
81    Json(body): Json<EntityRegisterBody>,
82) -> impl IntoResponse {
83    if let Err(e) = validate::validate_title(&body.canonical_name) {
84        return (
85            StatusCode::BAD_REQUEST,
86            Json(json!({"error": format!("invalid canonical_name: {e}")})),
87        )
88            .into_response();
89    }
90    if let Err(e) = validate::validate_namespace(&body.namespace) {
91        return (
92            StatusCode::BAD_REQUEST,
93            Json(json!({"error": crate::errors::msg::invalid("namespace", e)})),
94        )
95            .into_response();
96    }
97
98    let agent_id = body
99        .agent_id
100        .as_deref()
101        .or_else(|| {
102            headers
103                .get(crate::HEADER_AGENT_ID)
104                .and_then(|v| v.to_str().ok())
105        })
106        .map(str::trim)
107        .filter(|s| !s.is_empty())
108        .map(str::to_string);
109    if let Some(aid) = agent_id.as_deref()
110        && let Err(e) = validate::validate_agent_id(aid)
111    {
112        return (
113            StatusCode::BAD_REQUEST,
114            Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
115        )
116            .into_response();
117    }
118
119    let extra_metadata = if body.metadata.is_object() {
120        body.metadata.clone()
121    } else {
122        json!({})
123    };
124
125    // v0.7.0 Wave-3 Continuation — postgres-backed daemons register
126    // the entity as a regular memory (title = canonical_name,
127    // namespace = body.namespace, kind=entity in metadata) via the
128    // SAL `store` method. The wire shape mirrors the SQLite path.
129    //
130    // v0.7.0 Wave-3 Continuation 4 (Bucket E / S47) — alias-union
131    // persistence on re-register. The SAL `store` method upserts on
132    // `(title, namespace)`, but a naive overwrite of `metadata.aliases`
133    // erases any aliases registered previously. To preserve the
134    // canonical SQLite contract (`db::entity_register` unions aliases
135    // across registrations), we first list any matching entity row and
136    // union its prior aliases into the incoming set before the upsert.
137    // v0.7.0 ARCH-2 FX-C2-batch5 (2026-05-27): the postgres branch now
138    // rides the SAL trait `entity_register` (the alias-union walk +
139    // upsert is encapsulated inside the adapter, byte-for-byte aligned
140    // with the sqlite `db::entity_register` contract). Pre-batch5 the
141    // handler open-coded the alias union + `app.store.store` upsert in
142    // ~150 LOC; the trait method collapses that to a single call. The
143    // governance enforcement gate (F-A2A1.5 / #705) is preserved
144    // verbatim — entity rows remain governance-relevant writes and
145    // must consult the namespace policy before the underlying upsert.
146    #[cfg(feature = "sal")]
147    if matches!(app.storage_backend, StorageBackend::Postgres) {
148        let aid = agent_id
149            .clone()
150            .unwrap_or_else(|| "anonymous:entity-register".to_string());
151        let ctx = crate::store::CallerContext::for_agent(aid.clone());
152
153        // F-A2A1.5 (#705) — governance enforcement runs BEFORE the
154        // entity_register trait call so deny / pending / 403 / 202
155        // semantics match the sqlite path. The payload shape is the
156        // canonical entity-create body so a downstream approver replay
157        // can reconstruct the registration on `execute_pending_action`.
158        {
159            use crate::models::GovernanceDecision;
160            let payload_for_pending = serde_json::json!({
161                "title": body.canonical_name,
162                "namespace": body.namespace,
163                "tier": "long",
164                "tags": ["entity"],
165                "metadata": &extra_metadata,
166                "aliases": &body.aliases,
167            });
168            match app
169                .store
170                .enforce_governance_action(
171                    crate::store::GovernedAction::Store,
172                    &body.namespace,
173                    &aid,
174                    None,
175                    None,
176                    &payload_for_pending,
177                )
178                .await
179            {
180                Ok(GovernanceDecision::Allow) => {}
181                Ok(GovernanceDecision::Deny(refusal)) => {
182                    return (
183                        StatusCode::FORBIDDEN,
184                        Json(json!({
185                            "error": crate::governance::deny_message(
186                                "entity_register",
187                                crate::governance::DenyGate::Governance,
188                                &refusal.reason,
189                            ),
190                        })),
191                    )
192                        .into_response();
193                }
194                Ok(GovernanceDecision::Pending(pending_id)) => {
195                    return (
196                        StatusCode::ACCEPTED,
197                        Json(json!({
198                            "status": "pending",
199                            (field_names::PENDING_ID): pending_id,
200                            "reason": crate::errors::msg::GOVERNANCE_REQUIRES_APPROVAL,
201                            "action": "store",
202                            "namespace": body.namespace,
203                            (field_names::STORAGE_BACKEND): "postgres",
204                        })),
205                    )
206                        .into_response();
207                }
208                Err(e) => return store_err_to_response(e),
209            }
210        }
211
212        return match app
213            .store
214            .entity_register(
215                &ctx,
216                &body.canonical_name,
217                &body.namespace,
218                &body.aliases,
219                &extra_metadata,
220                Some(&aid),
221            )
222            .await
223        {
224            Ok(reg) => (
225                if reg.created {
226                    StatusCode::CREATED
227                } else {
228                    StatusCode::OK
229                },
230                Json(json!({
231                    "entity_id": reg.entity_id,
232                    (field_names::CANONICAL_NAME): reg.canonical_name,
233                    "namespace": reg.namespace,
234                    "aliases": reg.aliases,
235                    "created": reg.created,
236                })),
237            )
238                .into_response(),
239            Err(e) => store_err_to_response(e),
240        };
241    }
242
243    let lock = app.db.lock().await;
244    match db::entity_register(
245        &lock.0,
246        &body.canonical_name,
247        &body.namespace,
248        &body.aliases,
249        &extra_metadata,
250        agent_id.as_deref(),
251    ) {
252        Ok(reg) => {
253            let status = if reg.created {
254                StatusCode::CREATED
255            } else {
256                StatusCode::OK
257            };
258            (
259                status,
260                Json(json!({
261                    "entity_id": reg.entity_id,
262                    (field_names::CANONICAL_NAME): reg.canonical_name,
263                    "namespace": reg.namespace,
264                    "aliases": reg.aliases,
265                    "created": reg.created,
266                })),
267            )
268                .into_response()
269        }
270        Err(e) => {
271            // Title-collision errors carry a stable, recognisable
272            // substring; surface them as 409 Conflict so callers can
273            // distinguish a genuine name clash from internal failure.
274            let msg = e.to_string();
275            if msg.contains("non-entity memory") {
276                return (StatusCode::CONFLICT, Json(json!({"error": msg}))).into_response();
277            }
278            crate::handlers::errors::handler_error_500(&e)
279        }
280    }
281}
282
283/// `GET /api/v1/entities/by_alias?alias=<>&namespace=<>` — REST mirror
284/// of the MCP `memory_entity_get_by_alias` tool. Returns
285/// `{ found: false, ... }` with HTTP 200 when no entity claims the
286/// alias under the filter, so callers don't have to disambiguate
287/// "no match" from a server error.
288pub async fn entity_get_by_alias(
289    State(app): State<AppState>,
290    headers: axum::http::HeaderMap,
291    Query(p): Query<EntityByAliasQuery>,
292) -> impl IntoResponse {
293    #[cfg(not(feature = "sal"))]
294    let _ = &headers;
295    let alias = p.alias.trim();
296    if alias.is_empty() {
297        return (
298            StatusCode::BAD_REQUEST,
299            Json(json!({"error": "alias is required"})),
300        )
301            .into_response();
302    }
303    let namespace = p
304        .namespace
305        .as_deref()
306        .map(str::trim)
307        .filter(|s| !s.is_empty());
308    if let Some(ns) = namespace
309        && let Err(e) = validate::validate_namespace(ns)
310    {
311        return (
312            StatusCode::BAD_REQUEST,
313            Json(json!({"error": crate::errors::msg::invalid("namespace", e)})),
314        )
315            .into_response();
316    }
317
318    // v0.7.0 ARCH-2 followup (FX-C2-batch3) — postgres-backed daemons
319    // route through `MemoryStore::entity_get_by_alias` first for an
320    // exact-alias match (the canonical resolution path). When the
321    // dedicated trait method returns `Ok(None)` we fall back to the
322    // legacy SAL `list` walk to preserve the `m.title.eq_ignore_ascii_case`
323    // fallback (alias-or-title match) and `metadata.aliases` array
324    // walk. Visibility filtering applies to the fallback path
325    // identically to pre-fix behaviour.
326    #[cfg(feature = "sal-postgres")]
327    if matches!(app.storage_backend, StorageBackend::Postgres) {
328        // 1. Trait-method exact-alias match — sqlx-native, single
329        //    indexed lookup against `entity_aliases`.
330        match app.store.entity_get_by_alias(alias, namespace).await {
331            Ok(Some(rec)) => {
332                // Apply the post-fix visibility mask: hide the entity
333                // if the caller cannot see the underlying memory row.
334                let caller = {
335                    let header_agent_id = headers
336                        .get(crate::HEADER_AGENT_ID)
337                        .and_then(|v| v.to_str().ok());
338                    crate::identity::resolve_http_agent_id(None, header_agent_id)
339                        .unwrap_or_else(|_| crate::identity::anonymous_request_id())
340                };
341                let caller_is_admin =
342                    crate::handlers::admin_role::is_admin_caller_trusted(&app, &caller);
343                let ctx_admin =
344                    crate::store::CallerContext::for_admin_checked(caller.clone(), caller_is_admin);
345                let visible = caller_is_admin
346                    || app
347                        .store
348                        .get(&ctx_admin, &rec.entity_id)
349                        .await
350                        .ok()
351                        .as_ref()
352                        .is_none_or(|m| crate::visibility::is_visible_to_caller(m, &caller));
353                if visible {
354                    return Json(json!({
355                        "found": true,
356                        "entity_id": rec.entity_id,
357                        (field_names::CANONICAL_NAME): rec.canonical_name,
358                        "namespace": rec.namespace,
359                        "aliases": rec.aliases,
360                    }))
361                    .into_response();
362                }
363                // Fall through to fallback-walk shape on visibility mask
364                // — emits the `found:false` envelope below.
365            }
366            Ok(None) => { /* fall through to title-fallback walk */ }
367            Err(e) => return store_err_to_response(e),
368        }
369        // 2. Fallback walk: legacy SAL `list` so the title-eq-alias
370        //    branch and `metadata.aliases` array case-insensitive
371        //    match stay functional.
372        let ctx = crate::handlers::parity::http_caller_ctx(&headers, None);
373        let filter = crate::store::Filter {
374            namespace: namespace.map(str::to_string),
375            limit: 1000,
376            ..Default::default()
377        };
378        return match app.store.list(&ctx, &filter).await {
379            Ok(memories) => {
380                for m in &memories {
381                    let Some(meta) = m.metadata.as_object() else {
382                        continue;
383                    };
384                    let Some(kind) = meta.get("kind").and_then(|v| v.as_str()) else {
385                        continue;
386                    };
387                    if kind != "entity" {
388                        continue;
389                    }
390                    // #869 audit (Category B — safe default): an entity
391                    // with no `aliases` array collapses to empty
392                    // `Vec<String>`; the lookup falls through to the
393                    // `m.title.eq_ignore_ascii_case(alias)` branch.
394                    let aliases: Vec<String> = meta
395                        .get("aliases")
396                        .and_then(|v| v.as_array())
397                        .map(|a| {
398                            a.iter()
399                                .filter_map(|x| x.as_str().map(str::to_string))
400                                .collect()
401                        })
402                        .unwrap_or_default();
403                    if aliases.iter().any(|a| a.eq_ignore_ascii_case(alias))
404                        || m.title.eq_ignore_ascii_case(alias)
405                    {
406                        return Json(json!({
407                            "found": true,
408                            "entity_id": m.id,
409                            (field_names::CANONICAL_NAME): m.title,
410                            "namespace": m.namespace,
411                            "aliases": aliases,
412                        }))
413                        .into_response();
414                    }
415                }
416                Json(json!({
417                    "found": false,
418                    "entity_id": null,
419                    (field_names::CANONICAL_NAME): null,
420                    "namespace": null,
421                    "aliases": [],
422                }))
423                .into_response()
424            }
425            Err(e) => store_err_to_response(e),
426        };
427    }
428
429    // #947 SECURITY-medium (Track A QC sweep, 2026-05-20) — resolve
430    // caller for the visibility post-filter on entity aliases. Pre-fix
431    // any caller could resolve a private entity by alias in the sqlite
432    // branch. Admin bypasses the filter.
433    let caller = {
434        let header_agent_id = headers
435            .get(crate::HEADER_AGENT_ID)
436            .and_then(|v| v.to_str().ok());
437        crate::identity::resolve_http_agent_id(None, header_agent_id)
438            .unwrap_or_else(|_| crate::identity::anonymous_request_id())
439    };
440    let caller_is_admin = crate::handlers::admin_role::is_admin_caller_trusted(&app, &caller);
441
442    let lock = app.db.lock().await;
443    match db::entity_get_by_alias(&lock.0, alias, namespace) {
444        Ok(Some(rec)) => {
445            // Mask the entity if the caller cannot see the underlying
446            // memory row. The `entity_id` IS the memory id by the
447            // entity-as-memory contract; if the row exists and is not
448            // visible to the caller, return the found:false shape
449            // (existence-leak mask). If the row is missing entirely
450            // (e.g. legacy entity-alias row without a backing memory),
451            // fall through to the visible path — the alias is a
452            // namespace-scoped pointer, not a private secret.
453            let visible = caller_is_admin
454                || db::get(&lock.0, &rec.entity_id)
455                    .ok()
456                    .flatten()
457                    .as_ref()
458                    .is_none_or(|m| crate::visibility::is_visible_to_caller(m, &caller));
459            if !visible {
460                return Json(json!({
461                    "found": false,
462                    "entity_id": null,
463                    (field_names::CANONICAL_NAME): null,
464                    "namespace": null,
465                    "aliases": [],
466                }))
467                .into_response();
468            }
469            Json(json!({
470                "found": true,
471                "entity_id": rec.entity_id,
472                (field_names::CANONICAL_NAME): rec.canonical_name,
473                "namespace": rec.namespace,
474                "aliases": rec.aliases,
475            }))
476            .into_response()
477        }
478        Ok(None) => Json(json!({
479            "found": false,
480            "entity_id": null,
481            (field_names::CANONICAL_NAME): null,
482            "namespace": null,
483            "aliases": [],
484        }))
485        .into_response(),
486        Err(e) => crate::handlers::errors::handler_error_500(&e),
487    }
488}
489
/// Query parameters for `GET /api/v1/kg/timeline` (Pillar 2 / Stream C).
#[derive(Debug, Deserialize)]
pub struct KgTimelineQuery {
    /// Id of the memory whose outbound link timeline is requested;
    /// checked with `validate::validate_id` by the handler.
    pub source_id: String,
    /// Optional lower time bound. The handler trims it, ignores a blank
    /// value, and checks the format with
    /// `validate::validate_expires_at_format`.
    pub since: Option<String>,
    /// Optional upper time bound; handled the same way as `since`.
    pub until: Option<String>,
    /// Optional cap on returned events; forwarded unchanged to the
    /// backend timeline query.
    pub limit: Option<usize>,
}
498
499/// `GET /api/v1/kg/timeline?source_id=<>&since=<>&until=<>&limit=<>` —
500/// REST mirror of the MCP `memory_kg_timeline` tool. Returns outbound
501/// link assertions from `source_id` ordered by `valid_from ASC`.
502pub async fn kg_timeline(
503    State(app): State<AppState>,
504    headers: axum::http::HeaderMap,
505    Query(p): Query<KgTimelineQuery>,
506) -> impl IntoResponse {
507    if let Err(e) = validate::validate_id(&p.source_id) {
508        return (
509            StatusCode::BAD_REQUEST,
510            Json(json!({"error": crate::errors::msg::invalid("source_id", e)})),
511        )
512            .into_response();
513    }
514    let since = p.since.as_deref().map(str::trim).filter(|s| !s.is_empty());
515    let until = p.until.as_deref().map(str::trim).filter(|s| !s.is_empty());
516    if let Some(s) = since
517        && let Err(e) = validate::validate_expires_at_format(s)
518    {
519        return (
520            StatusCode::BAD_REQUEST,
521            Json(json!({"error": format!("invalid since: {e}")})),
522        )
523            .into_response();
524    }
525    if let Some(u) = until
526        && let Err(e) = validate::validate_expires_at_format(u)
527    {
528        return (
529            StatusCode::BAD_REQUEST,
530            Json(json!({"error": format!("invalid until: {e}")})),
531        )
532            .into_response();
533    }
534
535    // #944 SECURITY-high (Track A QC sweep, 2026-05-20) —
536    // caller-vs-source-memory-owner gate. Pre-fix the GET handler
537    // took NO `headers: HeaderMap` parameter, so any authenticated
538    // caller could read the full outbound link-event timeline
539    // (`target_id`, `relation`, `valid_from`, `valid_until`,
540    // `observed_by`, `title`, `target_namespace`) for ANY source_id
541    // — including memories owned by other tenants. Cross-tenant
542    // info-leak on the temporal-graph surface. Mirrors the #938
543    // `kg_invalidate` gate shape (commit 54706eeed, same file) and
544    // the #937 `delete_memory` shape (commit a582bdc5b).
545    let caller = match crate::handlers::parity::resolve_caller_agent_id(None, &headers, None) {
546        Ok(c) => c,
547        Err(err) => {
548            return (StatusCode::BAD_REQUEST, Json(json!({"error": err}))).into_response();
549        }
550    };
551
552    // Fetch the source memory + verify caller owns it (or is the
553    // inbox target, or the row is unowned-legacy, or caller is
554    // "daemon" sentinel). Mirrors the gate shape in #938
555    // kg_invalidate and #937 delete_memory.
556    //
557    // #1134: branch on storage_backend so postgres-backed daemons
558    // read the source memory from postgres instead of the empty
559    // SQLite scratch connection (which made every postgres-backed
560    // kg_timeline call return 404 regardless of the actual memory's
561    // existence in the live store).
562    let extract_owner_target = |mem: &Memory| -> (String, String) {
563        let owner = mem
564            .metadata
565            .get("agent_id")
566            .and_then(|v| v.as_str())
567            .unwrap_or("")
568            .to_string();
569        let target = mem
570            .metadata
571            .get(field_names::TARGET_AGENT_ID)
572            .and_then(|v| v.as_str())
573            .unwrap_or("")
574            .to_string();
575        (owner, target)
576    };
577    let source_owner: Option<(String, String)> = {
578        #[cfg(feature = "sal")]
579        if matches!(app.storage_backend, StorageBackend::Postgres) {
580            let ctx = crate::store::CallerContext::for_agent(caller.clone());
581            match app.store.get(&ctx, &p.source_id).await {
582                Ok(mem) => Some(extract_owner_target(&mem)),
583                Err(e) => {
584                    let msg = format!("{e:?}");
585                    if msg.contains("NotFound") || msg.contains("not found") {
586                        None
587                    } else {
588                        tracing::error!("kg_timeline: source lookup failed (postgres): {e:?}");
589                        return (
590                            StatusCode::INTERNAL_SERVER_ERROR,
591                            Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
592                        )
593                            .into_response();
594                    }
595                }
596            }
597        } else {
598            let lock = app.db.lock().await;
599            match db::get(&lock.0, &p.source_id) {
600                Ok(Some(mem)) => Some(extract_owner_target(&mem)),
601                Ok(None) => None,
602                Err(e) => {
603                    tracing::error!("kg_timeline: source lookup failed: {e}");
604                    return (
605                        StatusCode::INTERNAL_SERVER_ERROR,
606                        Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
607                    )
608                        .into_response();
609                }
610            }
611        }
612        #[cfg(not(feature = "sal"))]
613        {
614            let lock = app.db.lock().await;
615            match db::get(&lock.0, &p.source_id) {
616                Ok(Some(mem)) => Some(extract_owner_target(&mem)),
617                Ok(None) => None,
618                Err(e) => {
619                    tracing::error!("kg_timeline: source lookup failed: {e}");
620                    return (
621                        StatusCode::INTERNAL_SERVER_ERROR,
622                        Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
623                    )
624                        .into_response();
625                }
626            }
627        }
628    };
629    let Some((owner, target)) = source_owner else {
630        return (
631            StatusCode::NOT_FOUND,
632            Json(json!({
633                "found": false,
634                "source_id": p.source_id,
635                "error": crate::errors::msg::SOURCE_MEMORY_NOT_FOUND,
636            })),
637        )
638            .into_response();
639    };
640    let is_unowned_legacy = owner.is_empty();
641    if !is_unowned_legacy
642        && owner != caller
643        && target != caller
644        && caller != sentinels::DAEMON_PRINCIPAL
645    {
646        tracing::warn!(
647            target: super::AUTHZ_TRACE_TARGET,
648            "GET /api/v1/kg/timeline 403: caller {caller} != owner {owner} (source_id={})",
649            p.source_id
650        );
651        return (
652            StatusCode::FORBIDDEN,
653            Json(json!({
654                "error": crate::errors::msg::CALLER_NOT_SOURCE_MEMORY_OWNER,
655                "owner": owner,
656                "caller": caller,
657                "source_id": p.source_id,
658            })),
659        )
660            .into_response();
661    }
662
663    // v0.7.0 ARCH-2 FX-C2-batch5 (2026-05-27): postgres dispatches via
664    // the new `MemoryStore::kg_timeline` trait method (the SAL is the
665    // canonical kg_timeline surface). The legacy
666    // `kg_timeline_via_store` helper stays in place for out-of-tree
667    // back-compat but new routes ride the trait. The adapter still
668    // resolves AGE vs CTE backend at connect time and projects rows in
669    // the shared `KgTimelineRow` shape so the wire envelope stays
670    // parity-equal to the SQLite path.
671    #[cfg(feature = "sal-postgres")]
672    if matches!(app.storage_backend, StorageBackend::Postgres) {
673        let limit = p.limit;
674        return match app
675            .store
676            .kg_timeline(&p.source_id, since, until, limit)
677            .await
678        {
679            Ok(events) => {
680                let events_json: Vec<serde_json::Value> = events
681                    .iter()
682                    .map(|e| {
683                        json!({
684                            "target_id": e.target_id,
685                            "relation": e.relation,
686                            (field_names::VALID_FROM): e.valid_from,
687                            (field_names::VALID_UNTIL): e.valid_until,
688                            (field_names::OBSERVED_BY): e.observed_by,
689                            "title": e.title,
690                            (field_names::TARGET_NAMESPACE): e.target_namespace,
691                        })
692                    })
693                    .collect();
694                Json(json!({
695                    "source_id": p.source_id,
696                    "events": events_json,
697                    "count": events.len(),
698                }))
699                .into_response()
700            }
701            Err(e) => store_err_to_response(e),
702        };
703    }
704
705    let lock = app.db.lock().await;
706    match db::kg_timeline(&lock.0, &p.source_id, since, until, p.limit) {
707        Ok(events) => {
708            let events_json: Vec<serde_json::Value> = events
709                .iter()
710                .map(|e| {
711                    json!({
712                        "target_id": e.target_id,
713                        "relation": e.relation,
714                        (field_names::VALID_FROM): e.valid_from,
715                        (field_names::VALID_UNTIL): e.valid_until,
716                        (field_names::OBSERVED_BY): e.observed_by,
717                        "title": e.title,
718                        (field_names::TARGET_NAMESPACE): e.target_namespace,
719                    })
720                })
721                .collect();
722            Json(json!({
723                "source_id": p.source_id,
724                "events": events_json,
725                "count": events.len(),
726            }))
727            .into_response()
728        }
729        Err(e) => crate::handlers::errors::handler_error_500(&e),
730    }
731}
732
/// JSON body for `POST /api/v1/kg/invalidate` (Pillar 2 / Stream C —
/// `memory_kg_invalidate`). The link is identified by its composite
/// key; `valid_until` defaults to wall-clock now when omitted.
#[derive(Debug, Deserialize)]
pub struct KgInvalidateBody {
    /// Source memory id of the link. The handler validates
    /// `source_id`, `target_id`, and `relation` together via
    /// `validate::RequestValidator::validate_link_triple`.
    pub source_id: String,
    /// Target memory id of the link.
    pub target_id: String,
    /// Relation label of the link.
    pub relation: String,
    /// Optional invalidation timestamp. The handler trims it, treats a
    /// blank value as absent, and checks the format with
    /// `validate::validate_expires_at_format`.
    pub valid_until: Option<String>,
}
743
744/// `POST /api/v1/kg/invalidate` — REST mirror of `memory_kg_invalidate`.
745/// 200 with `{found: true, …, previous_valid_until}` when the link
746/// existed; 404 with `{found: false}` when no link matches the triple.
747pub async fn kg_invalidate(
748    State(app): State<AppState>,
749    headers: axum::http::HeaderMap,
750    Json(body): Json<KgInvalidateBody>,
751) -> impl IntoResponse {
752    if let Err(e) = validate::RequestValidator::validate_link_triple(
753        &body.source_id,
754        &body.target_id,
755        &body.relation,
756    ) {
757        return (
758            StatusCode::BAD_REQUEST,
759            Json(json!({"error": e.to_string()})),
760        )
761            .into_response();
762    }
763    let valid_until = body
764        .valid_until
765        .as_deref()
766        .map(str::trim)
767        .filter(|s| !s.is_empty());
768    if let Some(ts) = valid_until
769        && let Err(e) = validate::validate_expires_at_format(ts)
770    {
771        return (
772            StatusCode::BAD_REQUEST,
773            Json(json!({"error": format!("invalid valid_until: {e}")})),
774        )
775            .into_response();
776    }
777
778    // #938 SECURITY-high (Track A QC sweep, 2026-05-20) —
779    // caller-vs-source-memory-owner gate. Pre-fix any HTTP caller
780    // could forge temporal-graph state by invalidating another
781    // tenant's `:supersedes` / `:contradicts` / governance edges via
782    // `valid_until = now()`, hiding contradiction history. Mirrors the
783    // #930 caller-vs-owner gate shape on update/promote.
784    let caller = match crate::handlers::parity::resolve_caller_agent_id(None, &headers, None) {
785        Ok(c) => c,
786        Err(err) => {
787            return (StatusCode::BAD_REQUEST, Json(json!({"error": err}))).into_response();
788        }
789    };
790
791    // Fetch the source memory + verify caller owns it (or is the
792    // inbox target, or the row is unowned-legacy, or caller is
793    // "daemon" sentinel). Mirrors the gate shape in #930 update_memory
794    // and #936 archive_purge.
795    let source_owner: Option<(String, String)> = {
796        let lock = app.db.lock().await;
797        match db::get(&lock.0, &body.source_id) {
798            Ok(Some(mem)) => {
799                let owner = mem
800                    .metadata
801                    .get("agent_id")
802                    .and_then(|v| v.as_str())
803                    .unwrap_or("")
804                    .to_string();
805                let target = mem
806                    .metadata
807                    .get(field_names::TARGET_AGENT_ID)
808                    .and_then(|v| v.as_str())
809                    .unwrap_or("")
810                    .to_string();
811                Some((owner, target))
812            }
813            Ok(None) => None,
814            Err(e) => {
815                tracing::error!("kg_invalidate: source lookup failed: {e}");
816                return (
817                    StatusCode::INTERNAL_SERVER_ERROR,
818                    Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
819                )
820                    .into_response();
821            }
822        }
823    };
824    let Some((owner, target)) = source_owner else {
825        return (
826            StatusCode::NOT_FOUND,
827            Json(json!({
828                "found": false,
829                "source_id": body.source_id,
830                "target_id": body.target_id,
831                "relation": body.relation,
832                "error": crate::errors::msg::SOURCE_MEMORY_NOT_FOUND,
833            })),
834        )
835            .into_response();
836    };
837    let is_unowned_legacy = owner.is_empty();
838    if !is_unowned_legacy
839        && owner != caller
840        && target != caller
841        && caller != sentinels::DAEMON_PRINCIPAL
842    {
843        tracing::warn!(
844            target: super::AUTHZ_TRACE_TARGET,
845            "POST /api/v1/kg/invalidate 403: caller {caller} != owner {owner} (source_id={})",
846            body.source_id
847        );
848        return (
849            StatusCode::FORBIDDEN,
850            Json(json!({
851                "error": crate::errors::msg::CALLER_NOT_SOURCE_MEMORY_OWNER,
852                "owner": owner,
853                "caller": caller,
854                "source_id": body.source_id,
855            })),
856        )
857            .into_response();
858    }
859
860    // v0.7.0 SAL-routing batch-4 (FX-C2) — postgres dispatches via the
861    // canonical `MemoryStore::invalidate_link` trait method. The
862    // pre-fix `kg_invalidate_via_store` helper (an `as_any_for_postgres`
863    // downcast hatch) stays in place for back-compat callers but new
864    // routes ride the trait surface — no SAL-boundary bypass.
865    #[cfg(feature = "sal-postgres")]
866    if matches!(app.storage_backend, StorageBackend::Postgres) {
867        return match app
868            .store
869            .invalidate_link(
870                &body.source_id,
871                &body.target_id,
872                &body.relation,
873                valid_until,
874            )
875            .await
876        {
877            Ok(res) if res.found => (
878                StatusCode::OK,
879                Json(json!({
880                    "found": true,
881                    "source_id": body.source_id,
882                    "target_id": body.target_id,
883                    "relation": body.relation,
884                    (field_names::VALID_UNTIL): res.valid_until,
885                    (field_names::PREVIOUS_VALID_UNTIL): res.previous_valid_until,
886                })),
887            )
888                .into_response(),
889            Ok(_) => (
890                StatusCode::NOT_FOUND,
891                Json(json!({
892                    "found": false,
893                    "source_id": body.source_id,
894                    "target_id": body.target_id,
895                    "relation": body.relation,
896                })),
897            )
898                .into_response(),
899            Err(e) => store_err_to_response(e),
900        };
901    }
902
903    let lock = app.db.lock().await;
904    match db::invalidate_link(
905        &lock.0,
906        &body.source_id,
907        &body.target_id,
908        &body.relation,
909        valid_until,
910    ) {
911        Ok(Some(res)) => (
912            StatusCode::OK,
913            Json(json!({
914                "found": true,
915                "source_id": body.source_id,
916                "target_id": body.target_id,
917                "relation": body.relation,
918                (field_names::VALID_UNTIL): res.valid_until,
919                (field_names::PREVIOUS_VALID_UNTIL): res.previous_valid_until,
920            })),
921        )
922            .into_response(),
923        Ok(None) => (
924            StatusCode::NOT_FOUND,
925            Json(json!({
926                "found": false,
927                "source_id": body.source_id,
928                "target_id": body.target_id,
929                "relation": body.relation,
930            })),
931        )
932            .into_response(),
933        Err(e) => crate::handlers::errors::handler_error_500(&e),
934    }
935}
936
937/// JSON body for `POST /api/v1/kg/find_paths`.
938///
939/// `source_id` + `target_id` are required. `max_depth` defaults to the
940/// adapter's `FIND_PATHS_DEFAULT_DEPTH`; `max_results` clamps the
941/// returned path count.
942#[derive(Debug, Deserialize)]
943pub struct FindPathsBody {
944    /// Source memory id. Accepts the legacy `from_id` alias for
945    /// compatibility with the MCP `memory_find_paths` tool, the CLI
946    /// `find-paths --from`, and pre-v0.7.0 docs (#934 field-name drift
947    /// fix, 2026-05-20).
948    #[serde(alias = "from_id")]
949    pub source_id: String,
950    /// Target memory id. Accepts the legacy `to_id` alias for the same
951    /// MCP / CLI / docs compatibility surface as `source_id`.
952    #[serde(alias = "to_id")]
953    pub target_id: String,
954    #[serde(default)]
955    pub max_depth: Option<usize>,
956    #[serde(default)]
957    pub max_results: Option<usize>,
958}
959
960/// `POST /api/v1/kg/find_paths` — enumerate up to N paths between two
961/// memories. Wraps the SAL [`MemoryStore::find_paths`] surface so both
962/// SQLite (recursive CTE) and Postgres (AGE Cypher / CTE fallback)
963/// dispatch through the same handler.
964///
965/// Wire shape: `{paths: [[id, id, ...], ...], count}`. Each inner
966/// array is the chain of memory ids from `source_id` to `target_id`,
967/// inclusive.
968pub async fn kg_find_paths(
969    State(app): State<AppState>,
970    headers: HeaderMap,
971    Json(body): Json<FindPathsBody>,
972) -> impl IntoResponse {
973    if let Err(e) = validate::validate_id(&body.source_id) {
974        return (
975            StatusCode::BAD_REQUEST,
976            Json(json!({"error": crate::errors::msg::invalid("source_id", e)})),
977        )
978            .into_response();
979    }
980    if let Err(e) = validate::validate_id(&body.target_id) {
981        return (
982            StatusCode::BAD_REQUEST,
983            Json(json!({"error": format!("invalid target_id: {e}")})),
984        )
985            .into_response();
986    }
987
988    // #910 SAL-level — resolve the caller so the trait method's
989    // visibility filter (path-traversal flavour) sees the right
990    // principal. Header-only authentication on this POST surface;
991    // anonymous callers get a per-request `anonymous:req-…` id.
992    let header_agent_id = headers
993        .get(crate::HEADER_AGENT_ID)
994        .and_then(|v| v.to_str().ok());
995    let caller = match crate::identity::resolve_http_agent_id(None, header_agent_id) {
996        Ok(id) => id,
997        Err(e) => {
998            return (
999                StatusCode::BAD_REQUEST,
1000                Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
1001            )
1002                .into_response();
1003        }
1004    };
1005
1006    #[cfg(feature = "sal")]
1007    {
1008        let ctx = crate::store::CallerContext::for_agent(&caller);
1009        return match app
1010            .store
1011            .find_paths(
1012                &ctx,
1013                &body.source_id,
1014                &body.target_id,
1015                body.max_depth,
1016                body.max_results,
1017            )
1018            .await
1019        {
1020            Ok(paths) => {
1021                if crate::audit::is_enabled() {
1022                    crate::audit::emit(crate::audit::EventBuilder::new(
1023                        crate::audit::AuditAction::Recall,
1024                        crate::audit::actor(sentinels::AI_HTTP, "http_body", None),
1025                        crate::audit::target_memory(
1026                            body.source_id.clone(),
1027                            String::new(),
1028                            Some(format!("find_paths -> {}", body.target_id)),
1029                            None,
1030                            None,
1031                        ),
1032                    ));
1033                }
1034                let count = paths.len();
1035                Json(json!({
1036                    "paths": paths,
1037                    "count": count,
1038                    "source_id": body.source_id,
1039                    "target_id": body.target_id,
1040                }))
1041                .into_response()
1042            }
1043            Err(e) => {
1044                let msg = e.to_string();
1045                if msg.contains("max_depth") || msg.contains("depth") {
1046                    return (
1047                        StatusCode::UNPROCESSABLE_ENTITY,
1048                        Json(json!({"error": msg})),
1049                    )
1050                        .into_response();
1051                }
1052                store_err_to_response(e)
1053            }
1054        };
1055    }
1056
1057    #[cfg(not(feature = "sal"))]
1058    {
1059        let _ = app;
1060        let _ = body;
1061        let _ = caller;
1062        (
1063            StatusCode::NOT_IMPLEMENTED,
1064            Json(json!({"error": "find_paths requires --features sal"})),
1065        )
1066            .into_response()
1067    }
1068}
1069
1070/// JSON body for `POST /api/v1/kg/query` (Pillar 2 / Stream C —
1071/// `memory_kg_query`). POST is used because `allowed_agents` is a list;
1072/// keeping it in a body avoids over-long query strings and keeps the
1073/// surface symmetric with `POST /api/v1/kg/invalidate`. `max_depth`
1074/// defaults to 1 and is bounded by `KG_QUERY_MAX_SUPPORTED_DEPTH`.
1075#[derive(Debug, Deserialize)]
1076pub struct KgQueryBody {
1077    /// Canonical name. Aliased by `from` (S82's wire shape).
1078    #[serde(default)]
1079    pub source_id: Option<String>,
1080    /// `from` alias for `source_id` — the cert harness S82 uses
1081    /// `{from, to, max_depth, rel_types}`.
1082    #[serde(default)]
1083    pub from: Option<String>,
1084    /// Optional target id — when present the query is interpreted as
1085    /// a find-path between (`source_id`, `to`); kg_query's existing
1086    /// surface ignores it but accepting it keeps the wire shape
1087    /// flexible for the cert harness.
1088    #[serde(default)]
1089    pub to: Option<String>,
1090    pub max_depth: Option<usize>,
1091    pub valid_at: Option<String>,
1092    pub allowed_agents: Option<Vec<String>>,
1093    pub limit: Option<usize>,
1094    /// NHI-P3-T7 (v0.7.0 NHI testing): when omitted or false, the
1095    /// "current view" filter excludes edges whose `valid_until` lies
1096    /// in the past (invalidated via `memory_kg_invalidate`). Pass
1097    /// `true` to traverse the full historical link graph.
1098    #[serde(default)]
1099    pub include_invalidated: bool,
1100    /// Optional relation-type filter — accepted for forward-compat
1101    /// with the find_paths shape; unused on the current trait
1102    /// surface (CTE walks `:related_to` only).
1103    #[serde(default)]
1104    pub rel_types: Option<Vec<String>>,
1105}
1106
/// #910 (security-medium, 2026-05-19) — visibility post-filter for
/// `POST /api/v1/kg/query` traversal results on the Postgres path.
///
/// Before #910 the handler returned every node reachable through the
/// recursive-CTE / AGE Cypher walk, so a `metadata.scope == "private"`
/// target leaked to callers other than its owner. This helper fetches
/// each candidate through the store and keeps only the ids that
/// `crate::visibility::is_visible_to_caller` accepts for `caller`.
///
/// v0.7.0 F-E3 (issue #1436): the predicate is delegated to the canonical
/// helper rather than re-implemented inline; the earlier inline copy was
/// missing the `target_agent_id` inbox carve-out (the defect class #951
/// closed at the SAL layer) and silently dropped inbox rows the recipient
/// was entitled to see.
///
/// * `app` — application state providing the store.
/// * `caller` — resolved agent id of the requester.
/// * `target_ids` — candidate ids from the traversal; duplicates are
///   expected when a node is reachable over several paths.
///
/// Returns the set of ids the caller may see. Ids whose fetch fails for
/// any reason are excluded (fail-closed).
#[cfg(feature = "sal-postgres")]
async fn kg_query_filter_visible(
    app: &AppState,
    caller: &str,
    target_ids: Vec<String>,
) -> std::collections::HashSet<String> {
    use std::collections::HashSet;

    // A multi-hop traversal lists the same target once per path that
    // reaches it. Collapse duplicates up front so each distinct id costs
    // exactly one store round-trip; the resulting set is unaffected.
    let mut unique_ids = target_ids;
    unique_ids.sort_unstable();
    unique_ids.dedup();

    let ctx = crate::store::CallerContext::for_agent(caller);
    let mut visible: HashSet<String> = HashSet::with_capacity(unique_ids.len());
    for id in unique_ids {
        let fetched = app.store.get(&ctx, &id).await;
        if fetched.is_ok_and(|mem| crate::visibility::is_visible_to_caller(&mem, caller)) {
            visible.insert(id);
        }
    }
    visible
}
1144
1145/// `POST /api/v1/kg/query` — REST mirror of the MCP `memory_kg_query`
1146/// tool. Returns outbound multi-hop traversal from `source_id` (1..=5
1147/// hops) filtered by the temporal/agent windows. 400 for invalid
1148/// IDs/timestamps; 422 when `max_depth` exceeds the supported ceiling
1149/// (clearer than 500 for what is a documented limitation, not an
1150/// internal error).
1151pub async fn kg_query(
1152    State(app): State<AppState>,
1153    headers: HeaderMap,
1154    Json(body): Json<KgQueryBody>,
1155) -> impl IntoResponse {
1156    // #910 (security-medium, 2026-05-19) — resolve the caller via the
1157    // `X-Agent-Id` header so the scope=private visibility filter
1158    // below has a known principal to compare `metadata.agent_id`
1159    // against. Pre-#910 `kg_query` returned every reachable target
1160    // node regardless of the target memory's `metadata.scope` — a
1161    // caller could enumerate scope=private targets owned by other
1162    // agents by walking from a public source row. Anonymous callers
1163    // get a per-request `anonymous:req-…` id and see only
1164    // non-private targets.
1165    let header_agent_id = headers
1166        .get(crate::HEADER_AGENT_ID)
1167        .and_then(|v| v.to_str().ok());
1168    let caller = match crate::identity::resolve_http_agent_id(None, header_agent_id) {
1169        Ok(id) => id,
1170        Err(e) => {
1171            return (
1172                StatusCode::BAD_REQUEST,
1173                Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
1174            )
1175                .into_response();
1176        }
1177    };
1178
1179    // S82's wire shape sends `from` instead of `source_id`; resolve
1180    // the canonical id from either field with `source_id` taking
1181    // precedence when both are supplied.
1182    //
1183    // #869 audit (Category B — safe default): empty `String` flows
1184    // into `validate_id` below which returns a typed 400 with the
1185    // "invalid source_id" envelope.
1186    let source_id = body
1187        .source_id
1188        .clone()
1189        .or_else(|| body.from.clone())
1190        .unwrap_or_default();
1191    if let Err(e) = validate::validate_id(&source_id) {
1192        return (
1193            StatusCode::BAD_REQUEST,
1194            Json(json!({"error": crate::errors::msg::invalid("source_id", e)})),
1195        )
1196            .into_response();
1197    }
1198    let max_depth = body.max_depth.unwrap_or(1);
1199    let valid_at = body
1200        .valid_at
1201        .as_deref()
1202        .map(str::trim)
1203        .filter(|s| !s.is_empty());
1204    if let Some(t) = valid_at
1205        && let Err(e) = validate::validate_expires_at_format(t)
1206    {
1207        return (
1208            StatusCode::BAD_REQUEST,
1209            Json(json!({"error": format!("invalid valid_at: {e}")})),
1210        )
1211            .into_response();
1212    }
1213    let allowed_agents: Option<Vec<String>> = body.allowed_agents.as_ref().map(|v| {
1214        v.iter()
1215            .map(|s| s.trim().to_string())
1216            .filter(|s| !s.is_empty())
1217            .collect()
1218    });
1219    if let Some(agents) = allowed_agents.as_ref() {
1220        for a in agents {
1221            if let Err(e) = validate::validate_agent_id(a) {
1222                return (
1223                    StatusCode::BAD_REQUEST,
1224                    Json(json!({"error": format!("invalid allowed_agents entry: {e}")})),
1225                )
1226                    .into_response();
1227            }
1228        }
1229    }
1230
1231    // v0.7.0 ARCH-2 FX-C2-batch5 (2026-05-27): postgres dispatches via
1232    // the new `MemoryStore::kg_query` trait method (the SAL is the
1233    // canonical kg_query surface). The legacy `kg_query_via_store`
1234    // helper stays in place for out-of-tree back-compat but new routes
1235    // ride the trait. Backend (AGE vs CTE) is still resolved at
1236    // adapter connect time inside `kg_query_with_history`.
1237    // Temporal/agent filters are applied client-side post-traversal
1238    // because the AGE Cypher path returns the unfiltered topology —
1239    // match the SQLite recursive-CTE wire shape.
1240    #[cfg(feature = "sal-postgres")]
1241    if matches!(app.storage_backend, StorageBackend::Postgres) {
1242        return match app
1243            .store
1244            .kg_query(&source_id, max_depth, body.include_invalidated)
1245            .await
1246        {
1247            Ok(nodes) => {
1248                // #910 — fetch each target's metadata, filter by the
1249                // scope=private visibility rule (see
1250                // `kg_query_filter_visible`). Pre-#910 every reachable
1251                // target was returned verbatim regardless of the
1252                // target's owner / scope.
1253                let target_ids: Vec<String> = nodes.iter().map(|n| n.target_id.clone()).collect();
1254                let visible = kg_query_filter_visible(&app, &caller, target_ids).await;
1255                let nodes: Vec<_> = nodes
1256                    .into_iter()
1257                    .filter(|n| visible.contains(&n.target_id))
1258                    .collect();
1259
1260                // S82's wire shape — when `to` is supplied, project a
1261                // single-path `paths` array of node-id chains so the
1262                // find-paths style consumer can read the result back
1263                // without a separate `find_paths` route.
1264                let memories_json: Vec<serde_json::Value> = nodes
1265                    .iter()
1266                    .map(|n| {
1267                        json!({
1268                            "target_id": n.target_id,
1269                            "relation": n.relation,
1270                            "depth": n.depth,
1271                            "path": n.path,
1272                        })
1273                    })
1274                    .collect();
1275                let mut paths_json: Vec<serde_json::Value> = Vec::new();
1276                if let Some(target) = body.to.as_deref() {
1277                    // Find the first traversal path that ends at `target`
1278                    // and project the chain as a list of node ids.
1279                    for n in &nodes {
1280                        if n.target_id == target {
1281                            let chain: Vec<String> =
1282                                n.path.split("->").map(str::to_string).collect();
1283                            paths_json.push(serde_json::Value::Array(
1284                                chain.into_iter().map(serde_json::Value::String).collect(),
1285                            ));
1286                            break;
1287                        }
1288                    }
1289                } else {
1290                    for n in &nodes {
1291                        paths_json.push(serde_json::Value::String(n.path.clone()));
1292                    }
1293                }
1294                Json(json!({
1295                    "source_id": source_id,
1296                    "max_depth": max_depth,
1297                    "memories": memories_json,
1298                    "paths": paths_json,
1299                    "count": nodes.len(),
1300                }))
1301                .into_response()
1302            }
1303            Err(e) => {
1304                let msg = e.to_string();
1305                if msg.contains("max_depth") || msg.contains("depth") {
1306                    (
1307                        StatusCode::UNPROCESSABLE_ENTITY,
1308                        Json(json!({"error": msg})),
1309                    )
1310                        .into_response()
1311                } else {
1312                    store_err_to_response(e)
1313                }
1314            }
1315        };
1316    }
1317
1318    // #910 — apply scope=private visibility filter on the SQLite path
1319    // too. The kg_query DB function returns the full reachable
1320    // topology with target metadata absent from the row shape; we
1321    // post-fetch each target's `metadata.scope` / `metadata.agent_id`
1322    // inside the same lock window so the filter sees an atomic view
1323    // of the traversal.
1324    let lock = app.db.lock().await;
1325    let kg_res = db::kg_query(
1326        &lock.0,
1327        &source_id,
1328        max_depth,
1329        valid_at,
1330        allowed_agents.as_deref(),
1331        body.limit,
1332        body.include_invalidated,
1333    );
1334    let nodes_opt = match &kg_res {
1335        Ok(nodes) => {
1336            // v0.7.0 F-E3 fix (#1436): route through the canonical
1337            // `is_visible_to_caller` helper. Pre-fix the predicate was
1338            // inlined here missing the inbox carve-out
1339            // (`target_agent_id` short-circuit) — the same defect class
1340            // #951 closed at the SAL layer.
1341            let mut visible: std::collections::HashSet<String> =
1342                std::collections::HashSet::with_capacity(nodes.len());
1343            for n in nodes {
1344                if let Ok(Some(mem)) = db::get(&lock.0, &n.target_id) {
1345                    if crate::visibility::is_visible_to_caller(&mem, &caller) {
1346                        visible.insert(n.target_id.clone());
1347                    }
1348                }
1349            }
1350            Some(visible)
1351        }
1352        Err(_) => None,
1353    };
1354    drop(lock);
1355    match kg_res {
1356        Ok(nodes) => {
1357            let visible = nodes_opt.unwrap_or_default();
1358            let nodes: Vec<_> = nodes
1359                .into_iter()
1360                .filter(|n| visible.contains(&n.target_id))
1361                .collect();
1362            let memories_json: Vec<serde_json::Value> = nodes
1363                .iter()
1364                .map(|n| {
1365                    json!({
1366                        "target_id": n.target_id,
1367                        "relation": n.relation,
1368                        (field_names::VALID_FROM): n.valid_from,
1369                        (field_names::VALID_UNTIL): n.valid_until,
1370                        (field_names::OBSERVED_BY): n.observed_by,
1371                        "title": n.title,
1372                        (field_names::TARGET_NAMESPACE): n.target_namespace,
1373                        "depth": n.depth,
1374                        "path": n.path,
1375                    })
1376                })
1377                .collect();
1378            let paths_json: Vec<&str> = nodes.iter().map(|n| n.path.as_str()).collect();
1379            Json(json!({
1380                "source_id": source_id,
1381                "max_depth": max_depth,
1382                "memories": memories_json,
1383                "paths": paths_json,
1384                "count": nodes.len(),
1385            }))
1386            .into_response()
1387        }
1388        Err(e) => {
1389            // The `kg_query` DB layer raises explicit errors for
1390            // depth=0 and for max_depth past the supported ceiling;
1391            // those are caller-fixable, not server faults.
1392            let msg = e.to_string();
1393            if msg.contains("max_depth") {
1394                return (
1395                    StatusCode::UNPROCESSABLE_ENTITY,
1396                    Json(json!({"error": msg})),
1397                )
1398                    .into_response();
1399            }
1400            crate::handlers::errors::handler_error_500(&e)
1401        }
1402    }
1403}