ai_memory/handlers/kg.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! HTTP handlers for the v0.7.0 knowledge-graph + entity surface (#650
5//! follow-up per-domain split). Each handler is a thin Axum-layer
6//! wrapper around the SAL `MemoryStore` trait (postgres path) or the
7//! legacy `db::*` API (sqlite path), shaping the result into the
8//! canonical wire envelope.
9//!
10//! All handlers were extracted verbatim from `src/handlers/http.rs`
11//! (commit `12e1253`, lines 4169-5013 + 5192-5419); wire compatibility
12//! is preserved via the `pub use kg::*` re-export from
13//! `src/handlers/mod.rs`. The split keeps the kg/entity domain in
14//! a single ~1 100-line module while shrinking the legacy
15//! `handlers/http.rs` toward the long-term ≤600-LOC target.
16//!
17//! Functions in this module:
18//! - `entity_register` (POST /api/v1/entities)
19//! - `entity_get_by_alias` (GET /api/v1/entities/by_alias)
20//! - `kg_timeline` (GET /api/v1/kg/timeline)
21//! - `kg_invalidate` (POST /api/v1/kg/invalidate)
22//! - `kg_find_paths` (POST /api/v1/kg/find_paths)
23//! - `kg_query` (POST /api/v1/kg/query)
24
25#![allow(clippy::too_many_lines)]
26
27use crate::models::Memory;
28use crate::models::field_names;
29use axum::{
30 Json,
31 extract::{Query, State},
32 http::{HeaderMap, StatusCode},
33 response::IntoResponse,
34};
35use serde::Deserialize;
36use serde_json::json;
37
38use crate::db;
39use crate::identity::sentinels;
40use crate::validate;
41
42use super::AppState;
43#[cfg(feature = "sal")]
44use super::StorageBackend;
45#[cfg(feature = "sal")]
46use super::store_err_to_response;
47
/// Request body for `POST /api/v1/entities` (Pillar 2 / Stream B).
///
/// Deserialized from the JSON payload by the `entity_register` handler.
/// `canonical_name` and `namespace` are required; every other field is
/// optional.
#[derive(Debug, Deserialize)]
pub struct EntityRegisterBody {
    /// Canonical entity name. The handler checks it with
    /// `validate::validate_title` and answers 400 on failure.
    pub canonical_name: String,
    /// Namespace the entity is registered under. The handler checks it
    /// with `validate::validate_namespace` and answers 400 on failure.
    pub namespace: String,
    /// Aliases that should resolve to this entity. Blanks are skipped;
    /// duplicates collapse via `entity_aliases`'s primary key.
    /// Defaults to an empty list when the key is absent.
    #[serde(default)]
    pub aliases: Vec<String>,
    /// Arbitrary metadata to merge onto the entity memory. `kind` is
    /// always overwritten with `"entity"`. The handler substitutes an
    /// empty object for any value that is not a JSON object (including
    /// the `null` produced by the serde default).
    #[serde(default)]
    pub metadata: serde_json::Value,
    /// Override the resolved NHI for this request's
    /// `metadata.agent_id`. Falls back to the `X-Agent-Id` header
    /// when omitted. The handler trims the value and treats a blank
    /// result as "no agent id".
    pub agent_id: Option<String>,
}
66
/// Query parameters for `GET /api/v1/entities/by_alias` (Pillar 2 /
/// Stream B).
#[derive(Debug, Deserialize)]
pub struct EntityByAliasQuery {
    /// Alias to resolve. The handler trims it and rejects an empty
    /// result with 400 `alias is required`.
    pub alias: String,
    /// Optional namespace filter. The handler trims it, treats a blank
    /// value as absent, and validates any remaining value with
    /// `validate::validate_namespace`.
    pub namespace: Option<String>,
}
74
75/// `POST /api/v1/entities` — REST mirror of the MCP
76/// `memory_entity_register` tool. Idempotent on
77/// `(canonical_name, namespace)`; merges aliases on re-registration.
78pub async fn entity_register(
79 State(app): State<AppState>,
80 headers: HeaderMap,
81 Json(body): Json<EntityRegisterBody>,
82) -> impl IntoResponse {
83 if let Err(e) = validate::validate_title(&body.canonical_name) {
84 return (
85 StatusCode::BAD_REQUEST,
86 Json(json!({"error": format!("invalid canonical_name: {e}")})),
87 )
88 .into_response();
89 }
90 if let Err(e) = validate::validate_namespace(&body.namespace) {
91 return (
92 StatusCode::BAD_REQUEST,
93 Json(json!({"error": crate::errors::msg::invalid("namespace", e)})),
94 )
95 .into_response();
96 }
97
98 let agent_id = body
99 .agent_id
100 .as_deref()
101 .or_else(|| {
102 headers
103 .get(crate::HEADER_AGENT_ID)
104 .and_then(|v| v.to_str().ok())
105 })
106 .map(str::trim)
107 .filter(|s| !s.is_empty())
108 .map(str::to_string);
109 if let Some(aid) = agent_id.as_deref()
110 && let Err(e) = validate::validate_agent_id(aid)
111 {
112 return (
113 StatusCode::BAD_REQUEST,
114 Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
115 )
116 .into_response();
117 }
118
119 let extra_metadata = if body.metadata.is_object() {
120 body.metadata.clone()
121 } else {
122 json!({})
123 };
124
125 // v0.7.0 Wave-3 Continuation — postgres-backed daemons register
126 // the entity as a regular memory (title = canonical_name,
127 // namespace = body.namespace, kind=entity in metadata) via the
128 // SAL `store` method. The wire shape mirrors the SQLite path.
129 //
130 // v0.7.0 Wave-3 Continuation 4 (Bucket E / S47) — alias-union
131 // persistence on re-register. The SAL `store` method upserts on
132 // `(title, namespace)`, but a naive overwrite of `metadata.aliases`
133 // erases any aliases registered previously. To preserve the
134 // canonical SQLite contract (`db::entity_register` unions aliases
135 // across registrations), we first list any matching entity row and
136 // union its prior aliases into the incoming set before the upsert.
137 // v0.7.0 ARCH-2 FX-C2-batch5 (2026-05-27): the postgres branch now
138 // rides the SAL trait `entity_register` (the alias-union walk +
139 // upsert is encapsulated inside the adapter, byte-for-byte aligned
140 // with the sqlite `db::entity_register` contract). Pre-batch5 the
141 // handler open-coded the alias union + `app.store.store` upsert in
142 // ~150 LOC; the trait method collapses that to a single call. The
143 // governance enforcement gate (F-A2A1.5 / #705) is preserved
144 // verbatim — entity rows remain governance-relevant writes and
145 // must consult the namespace policy before the underlying upsert.
146 #[cfg(feature = "sal")]
147 if matches!(app.storage_backend, StorageBackend::Postgres) {
148 let aid = agent_id
149 .clone()
150 .unwrap_or_else(|| "anonymous:entity-register".to_string());
151 let ctx = crate::store::CallerContext::for_agent(aid.clone());
152
153 // F-A2A1.5 (#705) — governance enforcement runs BEFORE the
154 // entity_register trait call so deny / pending / 403 / 202
155 // semantics match the sqlite path. The payload shape is the
156 // canonical entity-create body so a downstream approver replay
157 // can reconstruct the registration on `execute_pending_action`.
158 {
159 use crate::models::GovernanceDecision;
160 let payload_for_pending = serde_json::json!({
161 "title": body.canonical_name,
162 "namespace": body.namespace,
163 "tier": "long",
164 "tags": ["entity"],
165 "metadata": &extra_metadata,
166 "aliases": &body.aliases,
167 });
168 match app
169 .store
170 .enforce_governance_action(
171 crate::store::GovernedAction::Store,
172 &body.namespace,
173 &aid,
174 None,
175 None,
176 &payload_for_pending,
177 )
178 .await
179 {
180 Ok(GovernanceDecision::Allow) => {}
181 Ok(GovernanceDecision::Deny(refusal)) => {
182 return (
183 StatusCode::FORBIDDEN,
184 Json(json!({
185 "error": crate::governance::deny_message(
186 "entity_register",
187 crate::governance::DenyGate::Governance,
188 &refusal.reason,
189 ),
190 })),
191 )
192 .into_response();
193 }
194 Ok(GovernanceDecision::Pending(pending_id)) => {
195 return (
196 StatusCode::ACCEPTED,
197 Json(json!({
198 "status": "pending",
199 (field_names::PENDING_ID): pending_id,
200 "reason": crate::errors::msg::GOVERNANCE_REQUIRES_APPROVAL,
201 "action": "store",
202 "namespace": body.namespace,
203 (field_names::STORAGE_BACKEND): "postgres",
204 })),
205 )
206 .into_response();
207 }
208 Err(e) => return store_err_to_response(e),
209 }
210 }
211
212 return match app
213 .store
214 .entity_register(
215 &ctx,
216 &body.canonical_name,
217 &body.namespace,
218 &body.aliases,
219 &extra_metadata,
220 Some(&aid),
221 )
222 .await
223 {
224 Ok(reg) => (
225 if reg.created {
226 StatusCode::CREATED
227 } else {
228 StatusCode::OK
229 },
230 Json(json!({
231 "entity_id": reg.entity_id,
232 (field_names::CANONICAL_NAME): reg.canonical_name,
233 "namespace": reg.namespace,
234 "aliases": reg.aliases,
235 "created": reg.created,
236 })),
237 )
238 .into_response(),
239 Err(e) => store_err_to_response(e),
240 };
241 }
242
243 let lock = app.db.lock().await;
244 match db::entity_register(
245 &lock.0,
246 &body.canonical_name,
247 &body.namespace,
248 &body.aliases,
249 &extra_metadata,
250 agent_id.as_deref(),
251 ) {
252 Ok(reg) => {
253 let status = if reg.created {
254 StatusCode::CREATED
255 } else {
256 StatusCode::OK
257 };
258 (
259 status,
260 Json(json!({
261 "entity_id": reg.entity_id,
262 (field_names::CANONICAL_NAME): reg.canonical_name,
263 "namespace": reg.namespace,
264 "aliases": reg.aliases,
265 "created": reg.created,
266 })),
267 )
268 .into_response()
269 }
270 Err(e) => {
271 // Title-collision errors carry a stable, recognisable
272 // substring; surface them as 409 Conflict so callers can
273 // distinguish a genuine name clash from internal failure.
274 let msg = e.to_string();
275 if msg.contains("non-entity memory") {
276 return (StatusCode::CONFLICT, Json(json!({"error": msg}))).into_response();
277 }
278 crate::handlers::errors::handler_error_500(&e)
279 }
280 }
281}
282
283/// `GET /api/v1/entities/by_alias?alias=<>&namespace=<>` — REST mirror
284/// of the MCP `memory_entity_get_by_alias` tool. Returns
285/// `{ found: false, ... }` with HTTP 200 when no entity claims the
286/// alias under the filter, so callers don't have to disambiguate
287/// "no match" from a server error.
288pub async fn entity_get_by_alias(
289 State(app): State<AppState>,
290 headers: axum::http::HeaderMap,
291 Query(p): Query<EntityByAliasQuery>,
292) -> impl IntoResponse {
293 #[cfg(not(feature = "sal"))]
294 let _ = &headers;
295 let alias = p.alias.trim();
296 if alias.is_empty() {
297 return (
298 StatusCode::BAD_REQUEST,
299 Json(json!({"error": "alias is required"})),
300 )
301 .into_response();
302 }
303 let namespace = p
304 .namespace
305 .as_deref()
306 .map(str::trim)
307 .filter(|s| !s.is_empty());
308 if let Some(ns) = namespace
309 && let Err(e) = validate::validate_namespace(ns)
310 {
311 return (
312 StatusCode::BAD_REQUEST,
313 Json(json!({"error": crate::errors::msg::invalid("namespace", e)})),
314 )
315 .into_response();
316 }
317
318 // v0.7.0 ARCH-2 followup (FX-C2-batch3) — postgres-backed daemons
319 // route through `MemoryStore::entity_get_by_alias` first for an
320 // exact-alias match (the canonical resolution path). When the
321 // dedicated trait method returns `Ok(None)` we fall back to the
322 // legacy SAL `list` walk to preserve the `m.title.eq_ignore_ascii_case`
323 // fallback (alias-or-title match) and `metadata.aliases` array
324 // walk. Visibility filtering applies to the fallback path
325 // identically to pre-fix behaviour.
326 #[cfg(feature = "sal-postgres")]
327 if matches!(app.storage_backend, StorageBackend::Postgres) {
328 // 1. Trait-method exact-alias match — sqlx-native, single
329 // indexed lookup against `entity_aliases`.
330 match app.store.entity_get_by_alias(alias, namespace).await {
331 Ok(Some(rec)) => {
332 // Apply the post-fix visibility mask: hide the entity
333 // if the caller cannot see the underlying memory row.
334 let caller = {
335 let header_agent_id = headers
336 .get(crate::HEADER_AGENT_ID)
337 .and_then(|v| v.to_str().ok());
338 crate::identity::resolve_http_agent_id(None, header_agent_id)
339 .unwrap_or_else(|_| crate::identity::anonymous_request_id())
340 };
341 let caller_is_admin =
342 crate::handlers::admin_role::is_admin_caller_trusted(&app, &caller);
343 let ctx_admin =
344 crate::store::CallerContext::for_admin_checked(caller.clone(), caller_is_admin);
345 let visible = caller_is_admin
346 || app
347 .store
348 .get(&ctx_admin, &rec.entity_id)
349 .await
350 .ok()
351 .as_ref()
352 .is_none_or(|m| crate::visibility::is_visible_to_caller(m, &caller));
353 if visible {
354 return Json(json!({
355 "found": true,
356 "entity_id": rec.entity_id,
357 (field_names::CANONICAL_NAME): rec.canonical_name,
358 "namespace": rec.namespace,
359 "aliases": rec.aliases,
360 }))
361 .into_response();
362 }
363 // Fall through to fallback-walk shape on visibility mask
364 // — emits the `found:false` envelope below.
365 }
366 Ok(None) => { /* fall through to title-fallback walk */ }
367 Err(e) => return store_err_to_response(e),
368 }
369 // 2. Fallback walk: legacy SAL `list` so the title-eq-alias
370 // branch and `metadata.aliases` array case-insensitive
371 // match stay functional.
372 let ctx = crate::handlers::parity::http_caller_ctx(&headers, None);
373 let filter = crate::store::Filter {
374 namespace: namespace.map(str::to_string),
375 limit: 1000,
376 ..Default::default()
377 };
378 return match app.store.list(&ctx, &filter).await {
379 Ok(memories) => {
380 for m in &memories {
381 let Some(meta) = m.metadata.as_object() else {
382 continue;
383 };
384 let Some(kind) = meta.get("kind").and_then(|v| v.as_str()) else {
385 continue;
386 };
387 if kind != "entity" {
388 continue;
389 }
390 // #869 audit (Category B — safe default): an entity
391 // with no `aliases` array collapses to empty
392 // `Vec<String>`; the lookup falls through to the
393 // `m.title.eq_ignore_ascii_case(alias)` branch.
394 let aliases: Vec<String> = meta
395 .get("aliases")
396 .and_then(|v| v.as_array())
397 .map(|a| {
398 a.iter()
399 .filter_map(|x| x.as_str().map(str::to_string))
400 .collect()
401 })
402 .unwrap_or_default();
403 if aliases.iter().any(|a| a.eq_ignore_ascii_case(alias))
404 || m.title.eq_ignore_ascii_case(alias)
405 {
406 return Json(json!({
407 "found": true,
408 "entity_id": m.id,
409 (field_names::CANONICAL_NAME): m.title,
410 "namespace": m.namespace,
411 "aliases": aliases,
412 }))
413 .into_response();
414 }
415 }
416 Json(json!({
417 "found": false,
418 "entity_id": null,
419 (field_names::CANONICAL_NAME): null,
420 "namespace": null,
421 "aliases": [],
422 }))
423 .into_response()
424 }
425 Err(e) => store_err_to_response(e),
426 };
427 }
428
429 // #947 SECURITY-medium (Track A QC sweep, 2026-05-20) — resolve
430 // caller for the visibility post-filter on entity aliases. Pre-fix
431 // any caller could resolve a private entity by alias in the sqlite
432 // branch. Admin bypasses the filter.
433 let caller = {
434 let header_agent_id = headers
435 .get(crate::HEADER_AGENT_ID)
436 .and_then(|v| v.to_str().ok());
437 crate::identity::resolve_http_agent_id(None, header_agent_id)
438 .unwrap_or_else(|_| crate::identity::anonymous_request_id())
439 };
440 let caller_is_admin = crate::handlers::admin_role::is_admin_caller_trusted(&app, &caller);
441
442 let lock = app.db.lock().await;
443 match db::entity_get_by_alias(&lock.0, alias, namespace) {
444 Ok(Some(rec)) => {
445 // Mask the entity if the caller cannot see the underlying
446 // memory row. The `entity_id` IS the memory id by the
447 // entity-as-memory contract; if the row exists and is not
448 // visible to the caller, return the found:false shape
449 // (existence-leak mask). If the row is missing entirely
450 // (e.g. legacy entity-alias row without a backing memory),
451 // fall through to the visible path — the alias is a
452 // namespace-scoped pointer, not a private secret.
453 let visible = caller_is_admin
454 || db::get(&lock.0, &rec.entity_id)
455 .ok()
456 .flatten()
457 .as_ref()
458 .is_none_or(|m| crate::visibility::is_visible_to_caller(m, &caller));
459 if !visible {
460 return Json(json!({
461 "found": false,
462 "entity_id": null,
463 (field_names::CANONICAL_NAME): null,
464 "namespace": null,
465 "aliases": [],
466 }))
467 .into_response();
468 }
469 Json(json!({
470 "found": true,
471 "entity_id": rec.entity_id,
472 (field_names::CANONICAL_NAME): rec.canonical_name,
473 "namespace": rec.namespace,
474 "aliases": rec.aliases,
475 }))
476 .into_response()
477 }
478 Ok(None) => Json(json!({
479 "found": false,
480 "entity_id": null,
481 (field_names::CANONICAL_NAME): null,
482 "namespace": null,
483 "aliases": [],
484 }))
485 .into_response(),
486 Err(e) => crate::handlers::errors::handler_error_500(&e),
487 }
488}
489
/// Query parameters for `GET /api/v1/kg/timeline` (Pillar 2 / Stream C).
#[derive(Debug, Deserialize)]
pub struct KgTimelineQuery {
    /// Id of the memory whose outbound link timeline is requested.
    /// Checked with `validate::validate_id` by the handler.
    pub source_id: String,
    /// Optional lower time bound. Trimmed by the handler; a blank value
    /// is treated as absent, and any remaining value must pass
    /// `validate::validate_expires_at_format`.
    pub since: Option<String>,
    /// Optional upper time bound; handled the same way as `since`.
    pub until: Option<String>,
    /// Optional row limit, forwarded unchanged to the storage backend.
    pub limit: Option<usize>,
}
498
499/// `GET /api/v1/kg/timeline?source_id=<>&since=<>&until=<>&limit=<>` —
500/// REST mirror of the MCP `memory_kg_timeline` tool. Returns outbound
501/// link assertions from `source_id` ordered by `valid_from ASC`.
502pub async fn kg_timeline(
503 State(app): State<AppState>,
504 headers: axum::http::HeaderMap,
505 Query(p): Query<KgTimelineQuery>,
506) -> impl IntoResponse {
507 if let Err(e) = validate::validate_id(&p.source_id) {
508 return (
509 StatusCode::BAD_REQUEST,
510 Json(json!({"error": crate::errors::msg::invalid("source_id", e)})),
511 )
512 .into_response();
513 }
514 let since = p.since.as_deref().map(str::trim).filter(|s| !s.is_empty());
515 let until = p.until.as_deref().map(str::trim).filter(|s| !s.is_empty());
516 if let Some(s) = since
517 && let Err(e) = validate::validate_expires_at_format(s)
518 {
519 return (
520 StatusCode::BAD_REQUEST,
521 Json(json!({"error": format!("invalid since: {e}")})),
522 )
523 .into_response();
524 }
525 if let Some(u) = until
526 && let Err(e) = validate::validate_expires_at_format(u)
527 {
528 return (
529 StatusCode::BAD_REQUEST,
530 Json(json!({"error": format!("invalid until: {e}")})),
531 )
532 .into_response();
533 }
534
535 // #944 SECURITY-high (Track A QC sweep, 2026-05-20) —
536 // caller-vs-source-memory-owner gate. Pre-fix the GET handler
537 // took NO `headers: HeaderMap` parameter, so any authenticated
538 // caller could read the full outbound link-event timeline
539 // (`target_id`, `relation`, `valid_from`, `valid_until`,
540 // `observed_by`, `title`, `target_namespace`) for ANY source_id
541 // — including memories owned by other tenants. Cross-tenant
542 // info-leak on the temporal-graph surface. Mirrors the #938
543 // `kg_invalidate` gate shape (commit 54706eeed, same file) and
544 // the #937 `delete_memory` shape (commit a582bdc5b).
545 let caller = match crate::handlers::parity::resolve_caller_agent_id(None, &headers, None) {
546 Ok(c) => c,
547 Err(err) => {
548 return (StatusCode::BAD_REQUEST, Json(json!({"error": err}))).into_response();
549 }
550 };
551
552 // Fetch the source memory + verify caller owns it (or is the
553 // inbox target, or the row is unowned-legacy, or caller is
554 // "daemon" sentinel). Mirrors the gate shape in #938
555 // kg_invalidate and #937 delete_memory.
556 //
557 // #1134: branch on storage_backend so postgres-backed daemons
558 // read the source memory from postgres instead of the empty
559 // SQLite scratch connection (which made every postgres-backed
560 // kg_timeline call return 404 regardless of the actual memory's
561 // existence in the live store).
562 let extract_owner_target = |mem: &Memory| -> (String, String) {
563 let owner = mem
564 .metadata
565 .get("agent_id")
566 .and_then(|v| v.as_str())
567 .unwrap_or("")
568 .to_string();
569 let target = mem
570 .metadata
571 .get(field_names::TARGET_AGENT_ID)
572 .and_then(|v| v.as_str())
573 .unwrap_or("")
574 .to_string();
575 (owner, target)
576 };
577 let source_owner: Option<(String, String)> = {
578 #[cfg(feature = "sal")]
579 if matches!(app.storage_backend, StorageBackend::Postgres) {
580 let ctx = crate::store::CallerContext::for_agent(caller.clone());
581 match app.store.get(&ctx, &p.source_id).await {
582 Ok(mem) => Some(extract_owner_target(&mem)),
583 Err(e) => {
584 let msg = format!("{e:?}");
585 if msg.contains("NotFound") || msg.contains("not found") {
586 None
587 } else {
588 tracing::error!("kg_timeline: source lookup failed (postgres): {e:?}");
589 return (
590 StatusCode::INTERNAL_SERVER_ERROR,
591 Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
592 )
593 .into_response();
594 }
595 }
596 }
597 } else {
598 let lock = app.db.lock().await;
599 match db::get(&lock.0, &p.source_id) {
600 Ok(Some(mem)) => Some(extract_owner_target(&mem)),
601 Ok(None) => None,
602 Err(e) => {
603 tracing::error!("kg_timeline: source lookup failed: {e}");
604 return (
605 StatusCode::INTERNAL_SERVER_ERROR,
606 Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
607 )
608 .into_response();
609 }
610 }
611 }
612 #[cfg(not(feature = "sal"))]
613 {
614 let lock = app.db.lock().await;
615 match db::get(&lock.0, &p.source_id) {
616 Ok(Some(mem)) => Some(extract_owner_target(&mem)),
617 Ok(None) => None,
618 Err(e) => {
619 tracing::error!("kg_timeline: source lookup failed: {e}");
620 return (
621 StatusCode::INTERNAL_SERVER_ERROR,
622 Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
623 )
624 .into_response();
625 }
626 }
627 }
628 };
629 let Some((owner, target)) = source_owner else {
630 return (
631 StatusCode::NOT_FOUND,
632 Json(json!({
633 "found": false,
634 "source_id": p.source_id,
635 "error": crate::errors::msg::SOURCE_MEMORY_NOT_FOUND,
636 })),
637 )
638 .into_response();
639 };
640 let is_unowned_legacy = owner.is_empty();
641 if !is_unowned_legacy
642 && owner != caller
643 && target != caller
644 && caller != sentinels::DAEMON_PRINCIPAL
645 {
646 tracing::warn!(
647 target: super::AUTHZ_TRACE_TARGET,
648 "GET /api/v1/kg/timeline 403: caller {caller} != owner {owner} (source_id={})",
649 p.source_id
650 );
651 return (
652 StatusCode::FORBIDDEN,
653 Json(json!({
654 "error": crate::errors::msg::CALLER_NOT_SOURCE_MEMORY_OWNER,
655 "owner": owner,
656 "caller": caller,
657 "source_id": p.source_id,
658 })),
659 )
660 .into_response();
661 }
662
663 // v0.7.0 ARCH-2 FX-C2-batch5 (2026-05-27): postgres dispatches via
664 // the new `MemoryStore::kg_timeline` trait method (the SAL is the
665 // canonical kg_timeline surface). The legacy
666 // `kg_timeline_via_store` helper stays in place for out-of-tree
667 // back-compat but new routes ride the trait. The adapter still
668 // resolves AGE vs CTE backend at connect time and projects rows in
669 // the shared `KgTimelineRow` shape so the wire envelope stays
670 // parity-equal to the SQLite path.
671 #[cfg(feature = "sal-postgres")]
672 if matches!(app.storage_backend, StorageBackend::Postgres) {
673 let limit = p.limit;
674 return match app
675 .store
676 .kg_timeline(&p.source_id, since, until, limit)
677 .await
678 {
679 Ok(events) => {
680 let events_json: Vec<serde_json::Value> = events
681 .iter()
682 .map(|e| {
683 json!({
684 "target_id": e.target_id,
685 "relation": e.relation,
686 (field_names::VALID_FROM): e.valid_from,
687 (field_names::VALID_UNTIL): e.valid_until,
688 (field_names::OBSERVED_BY): e.observed_by,
689 "title": e.title,
690 (field_names::TARGET_NAMESPACE): e.target_namespace,
691 })
692 })
693 .collect();
694 Json(json!({
695 "source_id": p.source_id,
696 "events": events_json,
697 "count": events.len(),
698 }))
699 .into_response()
700 }
701 Err(e) => store_err_to_response(e),
702 };
703 }
704
705 let lock = app.db.lock().await;
706 match db::kg_timeline(&lock.0, &p.source_id, since, until, p.limit) {
707 Ok(events) => {
708 let events_json: Vec<serde_json::Value> = events
709 .iter()
710 .map(|e| {
711 json!({
712 "target_id": e.target_id,
713 "relation": e.relation,
714 (field_names::VALID_FROM): e.valid_from,
715 (field_names::VALID_UNTIL): e.valid_until,
716 (field_names::OBSERVED_BY): e.observed_by,
717 "title": e.title,
718 (field_names::TARGET_NAMESPACE): e.target_namespace,
719 })
720 })
721 .collect();
722 Json(json!({
723 "source_id": p.source_id,
724 "events": events_json,
725 "count": events.len(),
726 }))
727 .into_response()
728 }
729 Err(e) => crate::handlers::errors::handler_error_500(&e),
730 }
731}
732
/// JSON body for `POST /api/v1/kg/invalidate` (Pillar 2 / Stream C —
/// `memory_kg_invalidate`). The link is identified by its composite
/// key; `valid_until` defaults to wall-clock now when omitted.
#[derive(Debug, Deserialize)]
pub struct KgInvalidateBody {
    /// Source memory id of the link. Together with `target_id` and
    /// `relation` it is checked by
    /// `validate::RequestValidator::validate_link_triple`.
    pub source_id: String,
    /// Target memory id of the link.
    pub target_id: String,
    /// Relation label of the link.
    pub relation: String,
    /// Optional explicit invalidation timestamp. The handler trims it,
    /// treats a blank value as absent, and validates any remaining
    /// value with `validate::validate_expires_at_format`.
    pub valid_until: Option<String>,
}
743
744/// `POST /api/v1/kg/invalidate` — REST mirror of `memory_kg_invalidate`.
745/// 200 with `{found: true, …, previous_valid_until}` when the link
746/// existed; 404 with `{found: false}` when no link matches the triple.
747pub async fn kg_invalidate(
748 State(app): State<AppState>,
749 headers: axum::http::HeaderMap,
750 Json(body): Json<KgInvalidateBody>,
751) -> impl IntoResponse {
752 if let Err(e) = validate::RequestValidator::validate_link_triple(
753 &body.source_id,
754 &body.target_id,
755 &body.relation,
756 ) {
757 return (
758 StatusCode::BAD_REQUEST,
759 Json(json!({"error": e.to_string()})),
760 )
761 .into_response();
762 }
763 let valid_until = body
764 .valid_until
765 .as_deref()
766 .map(str::trim)
767 .filter(|s| !s.is_empty());
768 if let Some(ts) = valid_until
769 && let Err(e) = validate::validate_expires_at_format(ts)
770 {
771 return (
772 StatusCode::BAD_REQUEST,
773 Json(json!({"error": format!("invalid valid_until: {e}")})),
774 )
775 .into_response();
776 }
777
778 // #938 SECURITY-high (Track A QC sweep, 2026-05-20) —
779 // caller-vs-source-memory-owner gate. Pre-fix any HTTP caller
780 // could forge temporal-graph state by invalidating another
781 // tenant's `:supersedes` / `:contradicts` / governance edges via
782 // `valid_until = now()`, hiding contradiction history. Mirrors the
783 // #930 caller-vs-owner gate shape on update/promote.
784 let caller = match crate::handlers::parity::resolve_caller_agent_id(None, &headers, None) {
785 Ok(c) => c,
786 Err(err) => {
787 return (StatusCode::BAD_REQUEST, Json(json!({"error": err}))).into_response();
788 }
789 };
790
791 // Fetch the source memory + verify caller owns it (or is the
792 // inbox target, or the row is unowned-legacy, or caller is
793 // "daemon" sentinel). Mirrors the gate shape in #930 update_memory
794 // and #936 archive_purge.
795 let source_owner: Option<(String, String)> = {
796 let lock = app.db.lock().await;
797 match db::get(&lock.0, &body.source_id) {
798 Ok(Some(mem)) => {
799 let owner = mem
800 .metadata
801 .get("agent_id")
802 .and_then(|v| v.as_str())
803 .unwrap_or("")
804 .to_string();
805 let target = mem
806 .metadata
807 .get(field_names::TARGET_AGENT_ID)
808 .and_then(|v| v.as_str())
809 .unwrap_or("")
810 .to_string();
811 Some((owner, target))
812 }
813 Ok(None) => None,
814 Err(e) => {
815 tracing::error!("kg_invalidate: source lookup failed: {e}");
816 return (
817 StatusCode::INTERNAL_SERVER_ERROR,
818 Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
819 )
820 .into_response();
821 }
822 }
823 };
824 let Some((owner, target)) = source_owner else {
825 return (
826 StatusCode::NOT_FOUND,
827 Json(json!({
828 "found": false,
829 "source_id": body.source_id,
830 "target_id": body.target_id,
831 "relation": body.relation,
832 "error": crate::errors::msg::SOURCE_MEMORY_NOT_FOUND,
833 })),
834 )
835 .into_response();
836 };
837 let is_unowned_legacy = owner.is_empty();
838 if !is_unowned_legacy
839 && owner != caller
840 && target != caller
841 && caller != sentinels::DAEMON_PRINCIPAL
842 {
843 tracing::warn!(
844 target: super::AUTHZ_TRACE_TARGET,
845 "POST /api/v1/kg/invalidate 403: caller {caller} != owner {owner} (source_id={})",
846 body.source_id
847 );
848 return (
849 StatusCode::FORBIDDEN,
850 Json(json!({
851 "error": crate::errors::msg::CALLER_NOT_SOURCE_MEMORY_OWNER,
852 "owner": owner,
853 "caller": caller,
854 "source_id": body.source_id,
855 })),
856 )
857 .into_response();
858 }
859
860 // v0.7.0 SAL-routing batch-4 (FX-C2) — postgres dispatches via the
861 // canonical `MemoryStore::invalidate_link` trait method. The
862 // pre-fix `kg_invalidate_via_store` helper (an `as_any_for_postgres`
863 // downcast hatch) stays in place for back-compat callers but new
864 // routes ride the trait surface — no SAL-boundary bypass.
865 #[cfg(feature = "sal-postgres")]
866 if matches!(app.storage_backend, StorageBackend::Postgres) {
867 return match app
868 .store
869 .invalidate_link(
870 &body.source_id,
871 &body.target_id,
872 &body.relation,
873 valid_until,
874 )
875 .await
876 {
877 Ok(res) if res.found => (
878 StatusCode::OK,
879 Json(json!({
880 "found": true,
881 "source_id": body.source_id,
882 "target_id": body.target_id,
883 "relation": body.relation,
884 (field_names::VALID_UNTIL): res.valid_until,
885 (field_names::PREVIOUS_VALID_UNTIL): res.previous_valid_until,
886 })),
887 )
888 .into_response(),
889 Ok(_) => (
890 StatusCode::NOT_FOUND,
891 Json(json!({
892 "found": false,
893 "source_id": body.source_id,
894 "target_id": body.target_id,
895 "relation": body.relation,
896 })),
897 )
898 .into_response(),
899 Err(e) => store_err_to_response(e),
900 };
901 }
902
903 let lock = app.db.lock().await;
904 match db::invalidate_link(
905 &lock.0,
906 &body.source_id,
907 &body.target_id,
908 &body.relation,
909 valid_until,
910 ) {
911 Ok(Some(res)) => (
912 StatusCode::OK,
913 Json(json!({
914 "found": true,
915 "source_id": body.source_id,
916 "target_id": body.target_id,
917 "relation": body.relation,
918 (field_names::VALID_UNTIL): res.valid_until,
919 (field_names::PREVIOUS_VALID_UNTIL): res.previous_valid_until,
920 })),
921 )
922 .into_response(),
923 Ok(None) => (
924 StatusCode::NOT_FOUND,
925 Json(json!({
926 "found": false,
927 "source_id": body.source_id,
928 "target_id": body.target_id,
929 "relation": body.relation,
930 })),
931 )
932 .into_response(),
933 Err(e) => crate::handlers::errors::handler_error_500(&e),
934 }
935}
936
/// JSON body for `POST /api/v1/kg/find_paths`.
///
/// `source_id` + `target_id` are required. `max_depth` defaults to the
/// adapter's `FIND_PATHS_DEFAULT_DEPTH`; `max_results` clamps the
/// returned path count. Both ids are checked with
/// `validate::validate_id` by the handler.
#[derive(Debug, Deserialize)]
pub struct FindPathsBody {
    /// Source memory id. Accepts the legacy `from_id` alias for
    /// compatibility with the MCP `memory_find_paths` tool, the CLI
    /// `find-paths --from`, and pre-v0.7.0 docs (#934 field-name drift
    /// fix, 2026-05-20).
    #[serde(alias = "from_id")]
    pub source_id: String,
    /// Target memory id. Accepts the legacy `to_id` alias for the same
    /// MCP / CLI / docs compatibility surface as `source_id`.
    #[serde(alias = "to_id")]
    pub target_id: String,
    /// Optional traversal depth bound, forwarded unchanged to
    /// `find_paths`; `None` when the key is absent.
    #[serde(default)]
    pub max_depth: Option<usize>,
    /// Optional cap on the number of returned paths, forwarded
    /// unchanged to `find_paths`; `None` when the key is absent.
    #[serde(default)]
    pub max_results: Option<usize>,
}
959
960/// `POST /api/v1/kg/find_paths` — enumerate up to N paths between two
961/// memories. Wraps the SAL [`MemoryStore::find_paths`] surface so both
962/// SQLite (recursive CTE) and Postgres (AGE Cypher / CTE fallback)
963/// dispatch through the same handler.
964///
965/// Wire shape: `{paths: [[id, id, ...], ...], count}`. Each inner
966/// array is the chain of memory ids from `source_id` to `target_id`,
967/// inclusive.
968pub async fn kg_find_paths(
969 State(app): State<AppState>,
970 headers: HeaderMap,
971 Json(body): Json<FindPathsBody>,
972) -> impl IntoResponse {
973 if let Err(e) = validate::validate_id(&body.source_id) {
974 return (
975 StatusCode::BAD_REQUEST,
976 Json(json!({"error": crate::errors::msg::invalid("source_id", e)})),
977 )
978 .into_response();
979 }
980 if let Err(e) = validate::validate_id(&body.target_id) {
981 return (
982 StatusCode::BAD_REQUEST,
983 Json(json!({"error": format!("invalid target_id: {e}")})),
984 )
985 .into_response();
986 }
987
988 // #910 SAL-level — resolve the caller so the trait method's
989 // visibility filter (path-traversal flavour) sees the right
990 // principal. Header-only authentication on this POST surface;
991 // anonymous callers get a per-request `anonymous:req-…` id.
992 let header_agent_id = headers
993 .get(crate::HEADER_AGENT_ID)
994 .and_then(|v| v.to_str().ok());
995 let caller = match crate::identity::resolve_http_agent_id(None, header_agent_id) {
996 Ok(id) => id,
997 Err(e) => {
998 return (
999 StatusCode::BAD_REQUEST,
1000 Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
1001 )
1002 .into_response();
1003 }
1004 };
1005
1006 #[cfg(feature = "sal")]
1007 {
1008 let ctx = crate::store::CallerContext::for_agent(&caller);
1009 return match app
1010 .store
1011 .find_paths(
1012 &ctx,
1013 &body.source_id,
1014 &body.target_id,
1015 body.max_depth,
1016 body.max_results,
1017 )
1018 .await
1019 {
1020 Ok(paths) => {
1021 if crate::audit::is_enabled() {
1022 crate::audit::emit(crate::audit::EventBuilder::new(
1023 crate::audit::AuditAction::Recall,
1024 crate::audit::actor(sentinels::AI_HTTP, "http_body", None),
1025 crate::audit::target_memory(
1026 body.source_id.clone(),
1027 String::new(),
1028 Some(format!("find_paths -> {}", body.target_id)),
1029 None,
1030 None,
1031 ),
1032 ));
1033 }
1034 let count = paths.len();
1035 Json(json!({
1036 "paths": paths,
1037 "count": count,
1038 "source_id": body.source_id,
1039 "target_id": body.target_id,
1040 }))
1041 .into_response()
1042 }
1043 Err(e) => {
1044 let msg = e.to_string();
1045 if msg.contains("max_depth") || msg.contains("depth") {
1046 return (
1047 StatusCode::UNPROCESSABLE_ENTITY,
1048 Json(json!({"error": msg})),
1049 )
1050 .into_response();
1051 }
1052 store_err_to_response(e)
1053 }
1054 };
1055 }
1056
1057 #[cfg(not(feature = "sal"))]
1058 {
1059 let _ = app;
1060 let _ = body;
1061 let _ = caller;
1062 (
1063 StatusCode::NOT_IMPLEMENTED,
1064 Json(json!({"error": "find_paths requires --features sal"})),
1065 )
1066 .into_response()
1067 }
1068}
1069
1070/// JSON body for `POST /api/v1/kg/query` (Pillar 2 / Stream C —
1071/// `memory_kg_query`). POST is used because `allowed_agents` is a list;
1072/// keeping it in a body avoids over-long query strings and keeps the
1073/// surface symmetric with `POST /api/v1/kg/invalidate`. `max_depth`
1074/// defaults to 1 and is bounded by `KG_QUERY_MAX_SUPPORTED_DEPTH`.
1075#[derive(Debug, Deserialize)]
1076pub struct KgQueryBody {
1077 /// Canonical name. Aliased by `from` (S82's wire shape).
1078 #[serde(default)]
1079 pub source_id: Option<String>,
1080 /// `from` alias for `source_id` — the cert harness S82 uses
1081 /// `{from, to, max_depth, rel_types}`.
1082 #[serde(default)]
1083 pub from: Option<String>,
1084 /// Optional target id — when present the query is interpreted as
1085 /// a find-path between (`source_id`, `to`); kg_query's existing
1086 /// surface ignores it but accepting it keeps the wire shape
1087 /// flexible for the cert harness.
1088 #[serde(default)]
1089 pub to: Option<String>,
1090 pub max_depth: Option<usize>,
1091 pub valid_at: Option<String>,
1092 pub allowed_agents: Option<Vec<String>>,
1093 pub limit: Option<usize>,
1094 /// NHI-P3-T7 (v0.7.0 NHI testing): when omitted or false, the
1095 /// "current view" filter excludes edges whose `valid_until` lies
1096 /// in the past (invalidated via `memory_kg_invalidate`). Pass
1097 /// `true` to traverse the full historical link graph.
1098 #[serde(default)]
1099 pub include_invalidated: bool,
1100 /// Optional relation-type filter — accepted for forward-compat
1101 /// with the find_paths shape; unused on the current trait
1102 /// surface (CTE walks `:related_to` only).
1103 #[serde(default)]
1104 pub rel_types: Option<Vec<String>>,
1105}
1106
/// #910 (security-medium, 2026-05-19) — visibility post-filter for
/// `POST /api/v1/kg/query` traversal results.
///
/// Fetches every distinct id in `target_ids` through `app.store.get`
/// (using a [`crate::store::CallerContext`] built for `caller`) and
/// returns the subset for which
/// `crate::visibility::is_visible_to_caller` holds. Ids whose fetch
/// fails for any reason are left out, so the filter fails closed.
///
/// Background: before #910 the handler returned every reachable
/// target, which exposed `scope == "private"` rows to callers other
/// than their owner. v0.7.0 F-E3 (issue #1436) then replaced an inline
/// copy of the predicate — it lacked the `target_agent_id` inbox
/// carve-out — with the canonical helper used here.
///
/// * `app` — shared handler state; only `app.store` is used.
/// * `caller` — resolved agent id of the requester.
/// * `target_ids` — traversal target ids; duplicates are tolerated.
///
/// Returns the set of ids the caller is allowed to see.
#[cfg(feature = "sal-postgres")]
async fn kg_query_filter_visible(
    app: &AppState,
    caller: &str,
    mut target_ids: Vec<String>,
) -> std::collections::HashSet<String> {
    use std::collections::HashSet;

    // The traversal may list the same target more than once (e.g. one
    // entry per path). Each fetch below is a sequential store call, so
    // collapse duplicates first instead of re-fetching the same row.
    target_ids.sort_unstable();
    target_ids.dedup();

    let ctx = crate::store::CallerContext::for_agent(caller);
    let mut visible: HashSet<String> = HashSet::with_capacity(target_ids.len());
    for id in target_ids {
        // Fail closed: a row we cannot fetch is never reported visible.
        let Ok(mem) = app.store.get(&ctx, &id).await else {
            continue;
        };
        if crate::visibility::is_visible_to_caller(&mem, caller) {
            visible.insert(id);
        }
    }
    visible
}
1144
1145/// `POST /api/v1/kg/query` — REST mirror of the MCP `memory_kg_query`
1146/// tool. Returns outbound multi-hop traversal from `source_id` (1..=5
1147/// hops) filtered by the temporal/agent windows. 400 for invalid
1148/// IDs/timestamps; 422 when `max_depth` exceeds the supported ceiling
1149/// (clearer than 500 for what is a documented limitation, not an
1150/// internal error).
1151pub async fn kg_query(
1152 State(app): State<AppState>,
1153 headers: HeaderMap,
1154 Json(body): Json<KgQueryBody>,
1155) -> impl IntoResponse {
1156 // #910 (security-medium, 2026-05-19) — resolve the caller via the
1157 // `X-Agent-Id` header so the scope=private visibility filter
1158 // below has a known principal to compare `metadata.agent_id`
1159 // against. Pre-#910 `kg_query` returned every reachable target
1160 // node regardless of the target memory's `metadata.scope` — a
1161 // caller could enumerate scope=private targets owned by other
1162 // agents by walking from a public source row. Anonymous callers
1163 // get a per-request `anonymous:req-…` id and see only
1164 // non-private targets.
1165 let header_agent_id = headers
1166 .get(crate::HEADER_AGENT_ID)
1167 .and_then(|v| v.to_str().ok());
1168 let caller = match crate::identity::resolve_http_agent_id(None, header_agent_id) {
1169 Ok(id) => id,
1170 Err(e) => {
1171 return (
1172 StatusCode::BAD_REQUEST,
1173 Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
1174 )
1175 .into_response();
1176 }
1177 };
1178
1179 // S82's wire shape sends `from` instead of `source_id`; resolve
1180 // the canonical id from either field with `source_id` taking
1181 // precedence when both are supplied.
1182 //
1183 // #869 audit (Category B — safe default): empty `String` flows
1184 // into `validate_id` below which returns a typed 400 with the
1185 // "invalid source_id" envelope.
1186 let source_id = body
1187 .source_id
1188 .clone()
1189 .or_else(|| body.from.clone())
1190 .unwrap_or_default();
1191 if let Err(e) = validate::validate_id(&source_id) {
1192 return (
1193 StatusCode::BAD_REQUEST,
1194 Json(json!({"error": crate::errors::msg::invalid("source_id", e)})),
1195 )
1196 .into_response();
1197 }
1198 let max_depth = body.max_depth.unwrap_or(1);
1199 let valid_at = body
1200 .valid_at
1201 .as_deref()
1202 .map(str::trim)
1203 .filter(|s| !s.is_empty());
1204 if let Some(t) = valid_at
1205 && let Err(e) = validate::validate_expires_at_format(t)
1206 {
1207 return (
1208 StatusCode::BAD_REQUEST,
1209 Json(json!({"error": format!("invalid valid_at: {e}")})),
1210 )
1211 .into_response();
1212 }
1213 let allowed_agents: Option<Vec<String>> = body.allowed_agents.as_ref().map(|v| {
1214 v.iter()
1215 .map(|s| s.trim().to_string())
1216 .filter(|s| !s.is_empty())
1217 .collect()
1218 });
1219 if let Some(agents) = allowed_agents.as_ref() {
1220 for a in agents {
1221 if let Err(e) = validate::validate_agent_id(a) {
1222 return (
1223 StatusCode::BAD_REQUEST,
1224 Json(json!({"error": format!("invalid allowed_agents entry: {e}")})),
1225 )
1226 .into_response();
1227 }
1228 }
1229 }
1230
1231 // v0.7.0 ARCH-2 FX-C2-batch5 (2026-05-27): postgres dispatches via
1232 // the new `MemoryStore::kg_query` trait method (the SAL is the
1233 // canonical kg_query surface). The legacy `kg_query_via_store`
1234 // helper stays in place for out-of-tree back-compat but new routes
1235 // ride the trait. Backend (AGE vs CTE) is still resolved at
1236 // adapter connect time inside `kg_query_with_history`.
1237 // Temporal/agent filters are applied client-side post-traversal
1238 // because the AGE Cypher path returns the unfiltered topology —
1239 // match the SQLite recursive-CTE wire shape.
1240 #[cfg(feature = "sal-postgres")]
1241 if matches!(app.storage_backend, StorageBackend::Postgres) {
1242 return match app
1243 .store
1244 .kg_query(&source_id, max_depth, body.include_invalidated)
1245 .await
1246 {
1247 Ok(nodes) => {
1248 // #910 — fetch each target's metadata, filter by the
1249 // scope=private visibility rule (see
1250 // `kg_query_filter_visible`). Pre-#910 every reachable
1251 // target was returned verbatim regardless of the
1252 // target's owner / scope.
1253 let target_ids: Vec<String> = nodes.iter().map(|n| n.target_id.clone()).collect();
1254 let visible = kg_query_filter_visible(&app, &caller, target_ids).await;
1255 let nodes: Vec<_> = nodes
1256 .into_iter()
1257 .filter(|n| visible.contains(&n.target_id))
1258 .collect();
1259
1260 // S82's wire shape — when `to` is supplied, project a
1261 // single-path `paths` array of node-id chains so the
1262 // find-paths style consumer can read the result back
1263 // without a separate `find_paths` route.
1264 let memories_json: Vec<serde_json::Value> = nodes
1265 .iter()
1266 .map(|n| {
1267 json!({
1268 "target_id": n.target_id,
1269 "relation": n.relation,
1270 "depth": n.depth,
1271 "path": n.path,
1272 })
1273 })
1274 .collect();
1275 let mut paths_json: Vec<serde_json::Value> = Vec::new();
1276 if let Some(target) = body.to.as_deref() {
1277 // Find the first traversal path that ends at `target`
1278 // and project the chain as a list of node ids.
1279 for n in &nodes {
1280 if n.target_id == target {
1281 let chain: Vec<String> =
1282 n.path.split("->").map(str::to_string).collect();
1283 paths_json.push(serde_json::Value::Array(
1284 chain.into_iter().map(serde_json::Value::String).collect(),
1285 ));
1286 break;
1287 }
1288 }
1289 } else {
1290 for n in &nodes {
1291 paths_json.push(serde_json::Value::String(n.path.clone()));
1292 }
1293 }
1294 Json(json!({
1295 "source_id": source_id,
1296 "max_depth": max_depth,
1297 "memories": memories_json,
1298 "paths": paths_json,
1299 "count": nodes.len(),
1300 }))
1301 .into_response()
1302 }
1303 Err(e) => {
1304 let msg = e.to_string();
1305 if msg.contains("max_depth") || msg.contains("depth") {
1306 (
1307 StatusCode::UNPROCESSABLE_ENTITY,
1308 Json(json!({"error": msg})),
1309 )
1310 .into_response()
1311 } else {
1312 store_err_to_response(e)
1313 }
1314 }
1315 };
1316 }
1317
1318 // #910 — apply scope=private visibility filter on the SQLite path
1319 // too. The kg_query DB function returns the full reachable
1320 // topology with target metadata absent from the row shape; we
1321 // post-fetch each target's `metadata.scope` / `metadata.agent_id`
1322 // inside the same lock window so the filter sees an atomic view
1323 // of the traversal.
1324 let lock = app.db.lock().await;
1325 let kg_res = db::kg_query(
1326 &lock.0,
1327 &source_id,
1328 max_depth,
1329 valid_at,
1330 allowed_agents.as_deref(),
1331 body.limit,
1332 body.include_invalidated,
1333 );
1334 let nodes_opt = match &kg_res {
1335 Ok(nodes) => {
1336 // v0.7.0 F-E3 fix (#1436): route through the canonical
1337 // `is_visible_to_caller` helper. Pre-fix the predicate was
1338 // inlined here missing the inbox carve-out
1339 // (`target_agent_id` short-circuit) — the same defect class
1340 // #951 closed at the SAL layer.
1341 let mut visible: std::collections::HashSet<String> =
1342 std::collections::HashSet::with_capacity(nodes.len());
1343 for n in nodes {
1344 if let Ok(Some(mem)) = db::get(&lock.0, &n.target_id) {
1345 if crate::visibility::is_visible_to_caller(&mem, &caller) {
1346 visible.insert(n.target_id.clone());
1347 }
1348 }
1349 }
1350 Some(visible)
1351 }
1352 Err(_) => None,
1353 };
1354 drop(lock);
1355 match kg_res {
1356 Ok(nodes) => {
1357 let visible = nodes_opt.unwrap_or_default();
1358 let nodes: Vec<_> = nodes
1359 .into_iter()
1360 .filter(|n| visible.contains(&n.target_id))
1361 .collect();
1362 let memories_json: Vec<serde_json::Value> = nodes
1363 .iter()
1364 .map(|n| {
1365 json!({
1366 "target_id": n.target_id,
1367 "relation": n.relation,
1368 (field_names::VALID_FROM): n.valid_from,
1369 (field_names::VALID_UNTIL): n.valid_until,
1370 (field_names::OBSERVED_BY): n.observed_by,
1371 "title": n.title,
1372 (field_names::TARGET_NAMESPACE): n.target_namespace,
1373 "depth": n.depth,
1374 "path": n.path,
1375 })
1376 })
1377 .collect();
1378 let paths_json: Vec<&str> = nodes.iter().map(|n| n.path.as_str()).collect();
1379 Json(json!({
1380 "source_id": source_id,
1381 "max_depth": max_depth,
1382 "memories": memories_json,
1383 "paths": paths_json,
1384 "count": nodes.len(),
1385 }))
1386 .into_response()
1387 }
1388 Err(e) => {
1389 // The `kg_query` DB layer raises explicit errors for
1390 // depth=0 and for max_depth past the supported ceiling;
1391 // those are caller-fixable, not server faults.
1392 let msg = e.to_string();
1393 if msg.contains("max_depth") {
1394 return (
1395 StatusCode::UNPROCESSABLE_ENTITY,
1396 Json(json!({"error": msg})),
1397 )
1398 .into_response();
1399 }
1400 crate::handlers::errors::handler_error_500(&e)
1401 }
1402 }
1403}