ai_memory/handlers/admin.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! Admin HTTP handlers — agent registration, quota, stats, gc, export,
5//! import, and the parity `tools/list` mirror.
6//!
7//! Extracted from [`super::http`] under issue #650 follow-up 2. The
8//! handler bodies are unchanged; only the module-routing import surface
9//! moved. Wire compatibility preserved via `pub use admin::*` in
10//! [`super`].
11
12#![allow(clippy::too_many_lines)]
13
14use crate::models::field_names;
15use axum::{
16 Json,
17 extract::State,
18 http::{HeaderMap, StatusCode},
19 response::IntoResponse,
20};
21use chrono::Utc;
22use serde::Deserialize;
23use serde_json::json;
24#[cfg(feature = "sal")]
25use uuid::Uuid;
26
27use crate::db;
28#[cfg(feature = "sal")]
29use crate::models::{ConfidenceSource, Tier};
30use crate::models::{Memory, MemoryLink, RegisterAgentBody};
31use crate::validate;
32
33use super::AppState;
34use super::MAX_BULK_SIZE;
35#[cfg(feature = "sal")]
36use super::StorageBackend;
37use super::admin_role::require_admin;
38#[cfg(feature = "sal")]
39use super::store_err_to_response;
40
41pub async fn register_agent(
42 State(app): State<AppState>,
43 headers: HeaderMap,
44 Json(body): Json<RegisterAgentBody>,
45) -> impl IntoResponse {
46 if let Err(e) = validate::validate_agent_id(&body.agent_id) {
47 return (
48 StatusCode::BAD_REQUEST,
49 Json(json!({"error": e.to_string()})),
50 )
51 .into_response();
52 }
53 if let Err(e) = validate::validate_agent_type(&body.agent_type) {
54 return (
55 StatusCode::BAD_REQUEST,
56 Json(json!({"error": e.to_string()})),
57 )
58 .into_response();
59 }
60 // #869 audit (Category B — safe default): `capabilities` is
61 // `Option<Vec<String>>`; an absent field is semantically equivalent
62 // to "agent advertises no capabilities yet" which is exactly the
63 // empty-vec default. No serialisation involved.
64 let capabilities = body.capabilities.unwrap_or_default();
65 if let Err(e) = validate::validate_capabilities(&capabilities) {
66 return (
67 StatusCode::BAD_REQUEST,
68 Json(json!({"error": e.to_string()})),
69 )
70 .into_response();
71 }
72
73 // #911 (security-medium / SOC2, 2026-05-19) — admin action audit.
74 // `register_agent` and `archive_purge` are admin-class state-changing
75 // surfaces whose forensic-chain entry was previously silent. The
76 // caller agent_id is resolved via the X-Agent-Id header (the same
77 // primitive `resolve_http_agent_id` other handlers use); when no
78 // header is provided we record the synthesized `anonymous:req-…`
79 // actor so the chain entry pins the unattested call. Emitted
80 // BEFORE any storage write to preserve the audit trail even if
81 // the storage layer fails downstream.
82 let header_agent_id = headers
83 .get(crate::HEADER_AGENT_ID)
84 .and_then(|v| v.to_str().ok());
85 let caller = crate::identity::resolve_http_agent_id(None, header_agent_id)
86 .unwrap_or_else(|_| crate::identity::sentinels::ANONYMOUS_INVALID.to_string());
87 crate::governance::audit::record_decision(
88 &caller,
89 "allow",
90 "register_agent",
91 "",
92 json!({
93 "new_agent_id": body.agent_id,
94 (field_names::AGENT_TYPE): body.agent_type,
95 (field_names::CAPABILITIES): capabilities,
96 }),
97 );
98
99 // v0.7.0 Wave-3 Continuation 3 — postgres-backed daemons route the
100 // agent-registration write through `app.store` so the row lands in
101 // the same postgres `_agents` namespace that `list_agents` projects
102 // from. Pre-fix this handler wrote through `db::register_agent`
103 // against the sqlite scratch `app.db`, leaving postgres-backed
104 // daemons with POST→sqlite and GET→postgres asymmetry — registered
105 // agents never appeared in the list. Mirrors the import_memories +
106 // bulk_create dual-backend dispatch pattern. Federation fanout
107 // remains sqlite-only (broadcast_store_quorum uses sqlite-coupled
108 // fed-tracker state).
109 #[cfg(feature = "sal")]
110 if matches!(app.storage_backend, StorageBackend::Postgres) {
111 // #910 — admin surface (registration / list_agents / stats);
112 // bypass the SAL visibility filter so admin endpoints see the
113 // full row set regardless of metadata.scope.
114 let ctx =
115 crate::store::CallerContext::for_admin(crate::identity::sentinels::DAEMON_PRINCIPAL);
116 let now = Utc::now().to_rfc3339();
117 let mut metadata = json!({
118 "agent_id": &body.agent_id,
119 (field_names::AGENT_TYPE): &body.agent_type,
120 });
121 if let Some(obj) = metadata.as_object_mut() {
122 obj.insert(
123 field_names::CAPABILITIES.to_string(),
124 serde_json::to_value(&capabilities).unwrap_or_else(|_| json!([])),
125 );
126 }
127 let agent_mem = Memory {
128 id: Uuid::new_v4().to_string(),
129 tier: Tier::Long,
130 namespace: crate::models::AGENTS_NAMESPACE.to_string(),
131 title: crate::models::agent_registration_title(&body.agent_id),
132 content: format!("agent registration for {}", &body.agent_id),
133 tags: vec!["_agent_registration".to_string()],
134 priority: 5,
135 confidence: 1.0,
136 source: "api".to_string(),
137 access_count: 0,
138 created_at: now.clone(),
139 updated_at: now,
140 last_accessed_at: None,
141 expires_at: None,
142 metadata,
143 reflection_depth: 0,
144 memory_kind: crate::models::MemoryKind::Observation,
145 entity_id: None,
146 persona_version: None,
147 citations: Vec::new(),
148 source_uri: None,
149 source_span: None,
150 confidence_source: ConfidenceSource::CallerProvided,
151 confidence_signals: None,
152 confidence_decayed_at: None,
153 version: 1,
154 };
155 return match app.store.store(&ctx, &agent_mem).await {
156 Ok(id) => (
157 StatusCode::CREATED,
158 Json(json!({
159 "id": id,
160 "agent_id": body.agent_id,
161 (field_names::AGENT_TYPE): body.agent_type,
162 (field_names::CAPABILITIES): capabilities,
163 (field_names::STORAGE_BACKEND): "postgres",
164 })),
165 )
166 .into_response(),
167 Err(e) => store_err_to_response(e),
168 };
169 }
170
171 let lock = app.db.lock().await;
172 let register_result =
173 db::register_agent(&lock.0, &body.agent_id, &body.agent_type, &capabilities);
174 // Read the persisted `_agents` row back so we can fan it out to peers.
175 // The cluster-wide S12 invariant is that an agent registered on node-1
176 // is visible on node-4 — which only holds when the `_agents` namespace
177 // replicates via `broadcast_store_quorum`.
178 let registered_mem = match ®ister_result {
179 Ok(id) => db::get(&lock.0, id).ok().flatten(),
180 Err(_) => None,
181 };
182 drop(lock);
183
184 match register_result {
185 Ok(id) => {
186 if let (Some(fed), Some(mem)) = (app.federation.as_ref(), registered_mem.as_ref()) {
187 match crate::federation::broadcast_store_quorum(fed, mem).await {
188 Ok(tracker) => {
189 if let Err(err) = crate::federation::finalise_quorum(&tracker) {
190 // #869 — typed 503 envelope via the shared helper.
191 let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
192 return super::quorum_not_met_response(&payload);
193 }
194 }
195 Err(e) => {
196 tracing::warn!("register_agent fanout error (local committed): {e:?}");
197 }
198 }
199 }
200 (
201 StatusCode::CREATED,
202 Json(json!({
203 (field_names::REGISTERED): true,
204 "id": id,
205 "agent_id": body.agent_id,
206 (field_names::AGENT_TYPE): body.agent_type,
207 (field_names::CAPABILITIES): capabilities,
208 })),
209 )
210 .into_response()
211 }
212 Err(e) => crate::handlers::errors::handler_error_500(&e),
213 }
214}
215
216/// #1539 — `PUT /api/v1/agents/{id}/pubkey`. Binds an Ed25519
217/// attestation public key to a registered agent so attesting HTTP
218/// clients no longer need an out-of-band DB write (the do-1461
219/// provisioning previously bound via ssh+psql on the region pg node
220/// under `REQUIRE_AGENT_ATTESTATION=1`). Admin-gated via
221/// [`require_admin`] (the #1582 authn-trusted predicate); the bind
222/// routes through the SAL `MemoryStore::bind_agent_pubkey`, which both
223/// adapters implement, so sqlite- and postgres-backed daemons get the
224/// same callable surface. The pubkey is validated as a real 32-byte
225/// Ed25519 curve point BEFORE any store call
226/// ([`validate::validate_agent_pubkey_b64`]).
227/// #1539 — canonical action/endpoint label for the pubkey-bind
228/// surface: the `require_admin` endpoint tag, the #911 audit action,
229/// and the postgres adapter's error label all share this spelling.
230pub const BIND_AGENT_PUBKEY_ACTION: &str = "bind_agent_pubkey";
231
232pub async fn bind_agent_pubkey(
233 State(app): State<AppState>,
234 headers: HeaderMap,
235 axum::extract::Path(agent_id): axum::extract::Path<String>,
236 Json(body): Json<crate::models::BindAgentPubkeyBody>,
237) -> impl IntoResponse {
238 let caller = match require_admin(&app, &headers, BIND_AGENT_PUBKEY_ACTION) {
239 Ok(c) => c,
240 Err(resp) => return resp,
241 };
242 if let Err(e) = validate::validate_agent_id(&agent_id) {
243 return (
244 StatusCode::BAD_REQUEST,
245 Json(json!({"error": e.to_string()})),
246 )
247 .into_response();
248 }
249 if let Err(e) = validate::validate_agent_pubkey_b64(&body.pubkey_b64) {
250 return (
251 StatusCode::BAD_REQUEST,
252 Json(json!({"error": e.to_string()})),
253 )
254 .into_response();
255 }
256 // Admin action audit (the #911 discipline): emitted BEFORE the
257 // store call so the forensic chain pins the attempt even when the
258 // bind later fails. The pubkey itself is public material; logging
259 // it aids enrollment forensics rather than leaking a secret.
260 crate::governance::audit::record_decision(
261 &caller,
262 "allow",
263 BIND_AGENT_PUBKEY_ACTION,
264 "",
265 json!({
266 "agent_id": agent_id,
267 "pubkey_b64": body.pubkey_b64,
268 }),
269 );
270 #[cfg(feature = "sal")]
271 {
272 let ctx =
273 crate::store::CallerContext::for_admin(crate::identity::sentinels::DAEMON_PRINCIPAL);
274 match app
275 .store
276 .bind_agent_pubkey(&ctx, &agent_id, body.pubkey_b64.trim())
277 .await
278 {
279 Ok(()) => (
280 StatusCode::OK,
281 Json(json!({
282 "bound": true,
283 "agent_id": agent_id,
284 })),
285 )
286 .into_response(),
287 Err(e) => store_err_to_response(e),
288 }
289 }
290 #[cfg(not(feature = "sal"))]
291 {
292 let lock = app.db.lock().await;
293 match db::bind_agent_pubkey(&lock.0, &agent_id, body.pubkey_b64.trim()) {
294 Ok(()) => (
295 StatusCode::OK,
296 Json(json!({
297 "bound": true,
298 "agent_id": agent_id,
299 })),
300 )
301 .into_response(),
302 Err(e) => crate::handlers::errors::handler_error_500(&e),
303 }
304 }
305}
306
307pub async fn list_agents(
308 State(app): State<AppState>,
309 headers: axum::http::HeaderMap,
310) -> impl IntoResponse {
311 // #946 SECURITY-medium (Track A QC sweep, 2026-05-20) — admin-
312 // only gate. Pre-fix any caller could enumerate the full NHI
313 // population + agent capabilities + registration timestamps.
314 // The handler uses `CallerContext::for_admin` below to bypass
315 // the SAL visibility filter; that's correct for operators but
316 // was unauthenticated. Mirror the #957 admin pattern.
317 if let Err(resp) = crate::handlers::admin_role::require_admin(&app, &headers, "list_agents") {
318 return resp;
319 }
320 // v0.7.0 ARCH-2 followup (FX-C2-batch3) — postgres-backed daemons
321 // route through `MemoryStore::list_agents`, which parses the
322 // `_agents`-namespace metadata blob into the canonical
323 // `AgentRegistration` shape exactly like SQLite's `db::list_agents`.
324 // Replaces the previous `list()` + client-side metadata-walk
325 // fold, which was a Drift cleanup target (audit doc, FX-C2 sub-
326 // batch dispatch plan §FX-C2-b).
327 #[cfg(feature = "sal-postgres")]
328 if matches!(app.storage_backend, StorageBackend::Postgres) {
329 return match app.store.list_agents().await {
330 Ok(agents) => (
331 StatusCode::OK,
332 Json(json!({"count": agents.len(), "agents": agents})),
333 )
334 .into_response(),
335 Err(e) => store_err_to_response(e),
336 };
337 }
338
339 let lock = app.db.lock().await;
340 match db::list_agents(&lock.0) {
341 Ok(agents) => (
342 StatusCode::OK,
343 Json(json!({"count": agents.len(), "agents": agents})),
344 )
345 .into_response(),
346 Err(e) => crate::handlers::errors::handler_error_500(&e),
347 }
348}
349
350/// JSON body for `POST /api/v1/quota/status`.
351///
352/// `agent_id` is required when the caller wants a single-agent
353/// snapshot; omitting it returns the full table (operator surface).
354///
355/// `namespace` (v0.7.0 #1156 — per-namespace K8 dimension):
356/// - Supplied with `agent_id`: returns the single
357/// `(agent_id, namespace)` row.
358/// - Supplied without `agent_id`: returns every agent's row in that
359/// namespace (operator-scoped, admin-gated).
360/// - Omitted with `agent_id` supplied: returns the **aggregate**
361/// view, summing counters across every namespace the agent has
362/// written into (preserves pre-#1156 single-row response shape).
363/// - Omitted with `agent_id` also omitted: full-substrate listing.
364#[derive(Debug, Deserialize)]
365pub struct QuotaStatusBody {
366 #[serde(default)]
367 pub agent_id: Option<String>,
368 #[serde(default)]
369 pub namespace: Option<String>,
370}
371
372/// `POST /api/v1/quota/status` — read the agent's quota row, or the
373/// full table when `agent_id` is omitted. Returns the canonical
374/// `QuotaStatus` JSON projection.
375///
376/// Dispatches via `app.store.quota_status(agent_id)` so postgres-backed
377/// daemons read from the postgres `agent_quotas` table rather than the
378/// scratch sqlite connection.
379pub async fn quota_status_handler(
380 State(app): State<AppState>,
381 headers: HeaderMap,
382 Json(body): Json<QuotaStatusBody>,
383) -> impl IntoResponse {
384 // #909 (security-medium, 2026-05-19) — sibling of #874/#901/#905/#907.
385 // The pre-#909 path accepted `body.agent_id` with no authn binding —
386 // any caller could probe `POST /api/v1/quota/status {agent_id:"alice"}`
387 // and read alice's quota row (cross-tenant disclosure: count of
388 // memories stored, last-reset timestamp, namespace usage stats).
389 // Authenticate via `X-Agent-Id` header; when `body.agent_id` is
390 // supplied it must MATCH the authenticated caller else 403. The
391 // operator-facing list path (body.agent_id absent) is preserved.
392 let header_agent_id = headers
393 .get(crate::HEADER_AGENT_ID)
394 .and_then(|v| v.to_str().ok());
395 let caller = match crate::identity::resolve_http_agent_id(None, header_agent_id) {
396 Ok(id) => id,
397 Err(e) => {
398 return (
399 StatusCode::BAD_REQUEST,
400 Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
401 )
402 .into_response();
403 }
404 };
405 if let Some(agent_id) = body.agent_id.as_deref() {
406 if let Err(e) = validate::validate_agent_id(agent_id) {
407 return (
408 StatusCode::BAD_REQUEST,
409 Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
410 )
411 .into_response();
412 }
413 if agent_id != caller {
414 return (
415 StatusCode::FORBIDDEN,
416 Json(json!({"error": crate::errors::msg::AGENT_ID_BODY_MISMATCH})),
417 )
418 .into_response();
419 }
420
421 // Postgres-backed daemons MUST take the SAL trait dispatch — the
422 // scratch sqlite connection at `app.db` has no `agent_quotas`
423 // rows.
424 #[cfg(feature = "sal")]
425 if matches!(app.storage_backend, StorageBackend::Postgres) {
426 return match app.store.quota_status(agent_id).await {
427 Ok(status) => Json(json!(status)).into_response(),
428 Err(e) => store_err_to_response(e),
429 };
430 }
431
432 // v0.7.0 #1156 — per-namespace K8 dimension. When the caller
433 // supplies an explicit `namespace`, return the single
434 // `(agent_id, namespace)` row; otherwise roll up the aggregate
435 // view across every namespace the agent has written into so
436 // the pre-#1156 single-row response shape is preserved.
437 let lock = app.db.lock().await;
438 let result = match body.namespace.as_deref() {
439 Some(ns) => crate::quotas::get_status(&lock.0, agent_id, ns),
440 None => crate::quotas::get_aggregate_status(&lock.0, agent_id),
441 };
442 return match result {
443 Ok(status) => Json(json!(status)).into_response(),
444 Err(e) => {
445 tracing::error!("quota_status handler error: {e}");
446 (
447 StatusCode::INTERNAL_SERVER_ERROR,
448 Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
449 )
450 .into_response()
451 }
452 };
453 }
454
455 // No agent_id supplied — operator-facing list path.
456 //
457 // #960 SECURITY-medium (Track A QC sweep, 2026-05-20) — admin-
458 // only gate on the list path. Pre-fix any HTTP caller posting
459 // `{}` could enumerate the full per-agent quota table. Sibling
460 // of #909 (per-agent path) — same disclosure shape.
461 if let Err(resp) =
462 crate::handlers::admin_role::require_admin(&app, &headers, "quota_status_list")
463 {
464 return resp;
465 }
466 #[cfg(feature = "sal")]
467 if matches!(app.storage_backend, StorageBackend::Postgres) {
468 return match app.store.quota_status_list().await {
469 Ok(rows) => Json(json!({"quotas": rows, "count": rows.len()})).into_response(),
470 Err(e) => store_err_to_response(e),
471 };
472 }
473
474 // v0.7.0 #1156 — optional `?namespace=` filter on the operator
475 // listing path. When supplied, restricts the response to rows
476 // in that namespace; otherwise returns the full table.
477 let lock = app.db.lock().await;
478 match crate::quotas::list_status(&lock.0, body.namespace.as_deref()) {
479 Ok(rows) => {
480 let count = rows.len();
481 Json(json!({"quotas": rows, "count": count})).into_response()
482 }
483 Err(e) => {
484 tracing::error!("quota_status list handler error: {e}");
485 (
486 StatusCode::INTERNAL_SERVER_ERROR,
487 Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
488 )
489 .into_response()
490 }
491 }
492}
493
494pub async fn get_stats(
495 State(app): State<AppState>,
496 headers: axum::http::HeaderMap,
497) -> impl IntoResponse {
498 // #946 SECURITY-medium (Track A QC sweep, 2026-05-20) — admin-only
499 // gate. Pre-fix any caller could enumerate full per-tier counts +
500 // per-namespace stats + WAL counters; admin-class endpoint.
501 if let Err(resp) = crate::handlers::admin_role::require_admin(&app, &headers, "get_stats") {
502 return resp;
503 }
504 // v0.7.0 ARCH-2 followup (FX-C2-batch3) — postgres-backed daemons
505 // now route stats through `MemoryStore::stats`, which projects the
506 // full `Stats` shape via SQL aggregates (total + per-tier + per-
507 // namespace + expiring_soon + links_count + table-size). This
508 // replaces the previous client-side fold over a 1M-limit
509 // `list()` scan which was a Drift cleanup target and would not
510 // scale to large deployments.
511 #[cfg(feature = "sal-postgres")]
512 if matches!(app.storage_backend, StorageBackend::Postgres) {
513 return match app.store.stats().await {
514 Ok(s) => {
515 // Project the SAL Stats shape into the postgres wire
516 // shape: by_tier as a wire-string-keyed map (mirrors
517 // the previous postgres envelope), per-namespace as
518 // a `{namespace: count}` map.
519 let mut by_tier_map = serde_json::Map::new();
520 for tc in &s.by_tier {
521 by_tier_map.insert(tc.tier.clone(), json!(tc.count));
522 }
523 let mut by_namespace_map = serde_json::Map::new();
524 for nc in &s.by_namespace {
525 by_namespace_map.insert(nc.namespace.clone(), json!(nc.count));
526 }
527 Json(json!({
528 (field_names::TOTAL_MEMORIES): s.total,
529 "by_tier": by_tier_map,
530 (field_names::BY_NAMESPACE): by_namespace_map,
531 "expiring_soon": s.expiring_soon,
532 "links_count": s.links_count,
533 "db_size_bytes": s.db_size_bytes,
534 (field_names::STORAGE_BACKEND): "postgres",
535 }))
536 .into_response()
537 }
538 Err(e) => store_err_to_response(e),
539 };
540 }
541
542 let lock = app.db.lock().await;
543 match db::stats(&lock.0, &lock.1) {
544 Ok(s) => Json(json!(s)).into_response(),
545 Err(e) => crate::handlers::errors::handler_error_500(&e),
546 }
547}
548
549pub async fn run_gc(State(app): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
550 // #1027 (security-critical, 2026-05-21) — admin-role gate. GC
551 // permanently sweeps expired rows; pre-#1027 the handler logged
552 // the caller to the forensic chain but accepted ANY API-key
553 // holder (no admin allowlist membership required). An attacker
554 // with the shared API key could force-purge mid-tier-expired
555 // rows across tenants in advance of any restore window. The
556 // require_admin gate now matches the shape of export_memories
557 // (#957) / forget_memories (#956): non-admin callers get a 403
558 // FORBIDDEN before any state change.
559 let caller = match require_admin(&app, &headers, "run_gc") {
560 Ok(c) => c,
561 Err(resp) => return resp,
562 };
563
564 // #913 (security-medium / SOC2, 2026-05-19) — admin/destructive
565 // state-change audit. GC permanently sweeps expired rows; the
566 // forensic-chain entry MUST land before the storage write so the
567 // audit trail captures the operator who triggered the sweep even
568 // when the downstream collector errors.
569 crate::governance::audit::record_decision(&caller, "allow", "run_gc", "", json!({}));
570
571 // v0.7.0 Wave-3 Continuation 3 (Phase 17) — postgres-backed daemons
572 // route through the SAL trait. Returns the same `{expired_deleted}`
573 // envelope so wire shape is backend-blind.
574 #[cfg(feature = "sal")]
575 if matches!(app.storage_backend, StorageBackend::Postgres) {
576 let archive_flag = {
577 let lock = app.db.lock().await;
578 lock.3
579 };
580 return match app.store.run_gc(archive_flag).await {
581 Ok(n) => {
582 Json(json!({(field_names::EXPIRED_DELETED): n, (field_names::STORAGE_BACKEND): "postgres"}))
583 .into_response()
584 }
585 Err(e) => store_err_to_response(e),
586 };
587 }
588
589 let lock = app.db.lock().await;
590 match db::gc(&lock.0, lock.3) {
591 Ok(n) => Json(json!({(field_names::EXPIRED_DELETED): n})).into_response(),
592 Err(e) => crate::handlers::errors::handler_error_500(&e),
593 }
594}
595
596pub async fn export_memories(State(app): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
597 // #957 (security-critical, 2026-05-20) — admin-role gate.
598 // Pre-#957 the handler took NO headers, accepted no caller, and
599 // dispatched directly to the export path which intentionally
600 // bypasses every visibility filter (postgres SAL branch uses
601 // `for_agent("export")` — see `src/store/postgres.rs:8577` — and
602 // the sqlite branch reads the whole `memories` table via
603 // `db::export_all`). The legacy `api_key_auth` middleware passes
604 // through when `api_key` is unset (the default — see #946 RCA),
605 // so the endpoint was open by default and any authenticated
606 // caller could dump every memory across every owner, every
607 // namespace, every scope (including `scope=private`) plus every
608 // link in the graph.
609 //
610 // Fix: require the caller's resolved `agent_id` (from
611 // `X-Agent-Id`, the same primitive every other handler uses)
612 // to appear in the operator-configured `[admin].agent_ids`
613 // allowlist before the corpus dump fires. Non-admin callers
614 // get `403 Forbidden` with the sanitised
615 // `{"error":"admin role required"}` body — intentionally
616 // generic so the rejection does not leak the allowlist
617 // configuration. The role decision is forensic-chain audited
618 // via `governance::audit::record_decision` whether admitted
619 // or rejected (`handlers::admin_role::require_admin`).
620 let caller = match crate::handlers::admin_role::require_admin(&app, &headers, "export_memories")
621 {
622 Ok(c) => c,
623 Err(resp) => return resp,
624 };
625
626 // v0.7.0 Wave-3 Continuation 3 (Phase 18) — postgres-backed daemons
627 // route through the SAL trait. Wire shape preserved:
628 // `{memories, links, count, exported_at}`. The admin gate above
629 // is the load-bearing authorisation check; the SAL-level
630 // `for_admin(caller)` context just preserves the full-fidelity
631 // backup semantic (admin export round-trips every row regardless
632 // of `metadata.scope`).
633 #[cfg(feature = "sal")]
634 if matches!(app.storage_backend, StorageBackend::Postgres) {
635 let _ = &caller; // resolved + audited above; SAL methods are
636 // owner-blind under the operator export contract.
637 let mems = match app.store.export_memories().await {
638 Ok(v) => v,
639 Err(e) => return store_err_to_response(e),
640 };
641 let links = match app.store.export_links().await {
642 Ok(v) => v,
643 Err(e) => return store_err_to_response(e),
644 };
645 let count = mems.len();
646 return Json(json!({
647 "memories": mems,
648 "links": links,
649 "count": count,
650 (field_names::EXPORTED_AT): Utc::now().to_rfc3339(),
651 (field_names::STORAGE_BACKEND): "postgres",
652 }))
653 .into_response();
654 }
655
656 let _ = &caller;
657 let lock = app.db.lock().await;
658 match (db::export_all(&lock.0), db::export_links(&lock.0)) {
659 (Ok(memories), Ok(links)) => {
660 let count = memories.len();
661 Json(json!({"memories": memories, "links": links, "count": count, (field_names::EXPORTED_AT): Utc::now().to_rfc3339()})).into_response()
662 }
663 (Err(e), _) | (_, Err(e)) => {
664 tracing::error!("export error: {e}");
665 (
666 StatusCode::INTERNAL_SERVER_ERROR,
667 Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
668 )
669 .into_response()
670 }
671 }
672}
673
674pub async fn import_memories(
675 State(app): State<AppState>,
676 headers: HeaderMap,
677 Json(body): Json<ImportBody>,
678) -> impl IntoResponse {
679 if body.memories.len() > MAX_BULK_SIZE {
680 return (
681 StatusCode::BAD_REQUEST,
682 Json(json!({"error": format!("import limited to {} memories", MAX_BULK_SIZE)})),
683 )
684 .into_response();
685 }
686
687 // #956 (security-medium, 2026-05-20) — admin-role gate + provenance
688 // restamp on `/api/v1/import`. Pre-#956 the handler resolved the
689 // caller from `X-Agent-Id` but then took `mem.metadata.agent_id`
690 // verbatim from the request body. Any authenticated caller could
691 // submit `{"memories":[{"metadata":{"agent_id":"alice", ...}}]}`
692 // and stamp alice's name on the imported row — same forge primitive
693 // #874/#901/#905/#907/#909 closed across other surfaces. Mirrors
694 // #957 (export) and the CLI `--trust-source`-off branch at
695 // `src/cli/io.rs:97-118`.
696 //
697 // 1. Gate via `handlers::admin_role::require_admin` — sanitised
698 // `403 {"error":"admin role required"}` on non-admin callers,
699 // audited via `governance::audit::record_decision` whether
700 // admitted or rejected. Empty allowlist (v0.7.0 default) closes
701 // the endpoint to every caller (safe-by-default).
702 //
703 // 2. For each admitted row, restamp `metadata.agent_id` to the
704 // admin caller and preserve the body's original claim under
705 // `metadata.imported_from_agent_id` (only when the original
706 // differs from the caller — no provenance noise on identical
707 // writes). Mirrors the CLI restamp contract exactly.
708 let caller = match crate::handlers::admin_role::require_admin(&app, &headers, "import_memories")
709 {
710 Ok(c) => c,
711 Err(resp) => return resp,
712 };
713
714 // #913 (security-medium / SOC2, 2026-05-19) — admin/bulk-write audit.
715 // Import landings can move thousands of memories in one call; emit a
716 // single forensic-chain entry BEFORE the storage writes so the audit
717 // trail captures the batch size + caller identity even on partial
718 // success.
719 crate::governance::audit::record_decision(
720 &caller,
721 "allow",
722 "import_memories",
723 "",
724 json!({
725 "memory_count": body.memories.len(),
726 "link_count": body.links.as_ref().map(Vec::len).unwrap_or(0),
727 }),
728 );
729
730 // #956 provenance restamp closure. Applied per-row on both
731 // backends BEFORE validate / governance / store so all downstream
732 // consumers (governance enforce, store.store / db::insert) see
733 // the admin caller as the row's principal.
734 let restamp_agent_id = |mem: &mut Memory| {
735 if !mem.metadata.is_object() {
736 mem.metadata = json!({});
737 }
738 if let Some(obj) = mem.metadata.as_object_mut() {
739 let original = obj
740 .get("agent_id")
741 .and_then(serde_json::Value::as_str)
742 .map(ToString::to_string);
743 obj.insert(
744 "agent_id".to_string(),
745 serde_json::Value::String(caller.clone()),
746 );
747 if let Some(orig) = original
748 && orig != caller
749 {
750 obj.insert(
751 field_names::IMPORTED_FROM_AGENT_ID.to_string(),
752 serde_json::Value::String(orig),
753 );
754 }
755 }
756 };
757 // v0.7.0 Wave-3 Continuation 3 (Phase 18) — postgres-backed daemons
758 // route through the SAL trait. We re-use `app.store.store(...)` per
759 // memory (the upsert path that preserves agent_id immutability) and
760 // `app.store.link(...)` for each link; partial-success surfaces the
761 // same `{imported, errors}` envelope as the sqlite path.
762 #[cfg(feature = "sal")]
763 if matches!(app.storage_backend, StorageBackend::Postgres) {
764 // QC P1 fix (2026-05-20): import_memories now stamps the
765 // imported rows under the authenticated `caller` (resolved
766 // from X-Agent-Id, line 564) instead of a synthetic
767 // "http-import" principal. The SAL store path still applies
768 // its `metadata.agent_id` preservation contract — body-
769 // supplied agent_id wins when valid (e.g., legitimate
770 // re-import of memories already authored by another agent),
771 // but the ctx is the auth principal so visibility filters
772 // applied INSIDE store_inner (e.g., upsert dedup lookup)
773 // see the actual caller.
774 let ctx = crate::store::CallerContext::for_agent(caller.clone());
775 let mut imported = 0usize;
776 let mut errors: Vec<String> = Vec::new();
777 let mut pending: Vec<serde_json::Value> = Vec::new();
778 for mut mem in body.memories {
779 // #956 — restamp before validate / governance / store.
780 restamp_agent_id(&mut mem);
781 if let Err(e) = validate::RequestValidator::validate_memory(&mem) {
782 // Issue #851: never echo the raw `e` to the wire paired
783 // with the user-supplied id (the combo reflects the
784 // caller's request). Sanitize + log instead.
785 tracing::warn!(
786 "import_memories(postgres): validate_memory failed for {}: {e}",
787 mem.id
788 );
789 errors.push(super::sanitize_bulk_row_error(&e.to_string()).to_string());
790 continue;
791 }
792
793 // F-A2A1.5 (#705) — governance enforcement on the postgres
794 // import path. Mirrors the F-A2A1.2 delete/promote gates and
795 // the Wave-3 Continuation 3 create_memory gate: each imported
796 // row is a Store action and must be gated by the destination
797 // namespace's standard. Deny rows accumulate into `errors`
798 // alongside other per-row failures; Pending rows accumulate
799 // into `pending` with their pending_id so the caller can
800 // drive consensus. Without this gate, postgres-backed
801 // daemons silently bypassed namespace governance on the
802 // bulk-import surface (same A2A bypass cluster fold-A2A1.2
803 // closed on delete/promote/create paths).
804 use crate::models::GovernanceDecision;
805 // Post-#956 restamp, agent_id is always the admin caller.
806 let agent_id = mem
807 .metadata
808 .get("agent_id")
809 .and_then(|v| v.as_str())
810 .unwrap_or(caller.as_str());
811 let payload_for_pending = serde_json::to_value(&mem).unwrap_or_else(|_| json!({}));
812 match app
813 .store
814 .enforce_governance_action(
815 crate::store::GovernedAction::Store,
816 &mem.namespace,
817 agent_id,
818 None,
819 None,
820 &payload_for_pending,
821 )
822 .await
823 {
824 Ok(GovernanceDecision::Allow) => {}
825 Ok(GovernanceDecision::Deny(refusal)) => {
826 let mut msg =
827 String::with_capacity(mem.id.len() + 2 + 50 + refusal.reason.len());
828 msg.push_str(&mem.id);
829 msg.push_str(": ");
830 msg.push_str(&crate::governance::deny_message(
831 "import",
832 crate::governance::DenyGate::Governance,
833 &refusal.reason,
834 ));
835 errors.push(msg);
836 continue;
837 }
838 Ok(GovernanceDecision::Pending(pending_id)) => {
839 pending.push(json!({
840 "id": mem.id,
841 "namespace": mem.namespace,
842 (field_names::PENDING_ID): pending_id,
843 }));
844 continue;
845 }
846 Err(e) => {
847 errors.push(format!("{}: governance error: {e}", mem.id));
848 continue;
849 }
850 }
851
852 match app.store.store(&ctx, &mem).await {
853 Ok(_) => imported += 1,
854 Err(e) => {
855 // Issue #851: SAL `store.store` errors can carry raw
856 // sqlx/sqlite text — sanitize before echoing.
857 tracing::warn!(
858 "import_memories(postgres): store.store failed for {}: {e}",
859 mem.id
860 );
861 errors.push(super::sanitize_bulk_row_error(&e.to_string()).to_string());
862 }
863 }
864 }
865 // #869 audit (Category B — safe default): `body.links` is
866 // `Option<Vec<MemoryLink>>`; an absent field means the bulk
867 // import payload carried no links. Empty-vec default produces
868 // a zero-iteration loop, which is the documented behaviour.
869 for link in body.links.unwrap_or_default() {
870 if validate::RequestValidator::validate_link_triple(
871 &link.source_id,
872 &link.target_id,
873 link.relation.as_str(),
874 )
875 .is_err()
876 {
877 continue;
878 }
879 let _ = app.store.link(&ctx, &link).await;
880 }
881 return Json(json!({
882 "imported": imported,
883 "errors": errors,
884 "pending": pending,
885 (field_names::STORAGE_BACKEND): "postgres",
886 }))
887 .into_response();
888 }
889
890 let lock = app.db.lock().await;
891 let mut imported = 0usize;
892 let mut errors = Vec::new();
893 for mut mem in body.memories {
894 // #956 — restamp before validate / insert.
895 restamp_agent_id(&mut mem);
896 if let Err(e) = validate::RequestValidator::validate_memory(&mem) {
897 // Issue #851: never echo `<id>: <validate error>` paired —
898 // the combo reflects the caller's request and the inner
899 // string can carry validate template detail. Sanitize + log.
900 tracing::warn!(
901 "import_memories: validate_memory failed for {}: {e}",
902 mem.id
903 );
904 errors.push(super::sanitize_bulk_row_error(&e.to_string()).to_string());
905 continue;
906 }
907 match db::insert(&lock.0, &mem) {
908 Ok(_) => imported += 1,
909 Err(e) => {
910 // Issue #851: db::insert errors include raw rusqlite
911 // text (SQL fragments, constraint names). Sanitize.
912 tracing::warn!("import_memories: db::insert failed for {}: {e}", mem.id);
913 errors.push(super::sanitize_bulk_row_error(&e.to_string()).to_string());
914 }
915 }
916 }
917 // #869 audit (Category B — safe default): sqlite branch mirror of
918 // the postgres-branch links loop above; same empty-vec semantics.
919 for link in body.links.unwrap_or_default() {
920 if validate::RequestValidator::validate_link_triple(
921 &link.source_id,
922 &link.target_id,
923 link.relation.as_str(),
924 )
925 .is_err()
926 {
927 continue;
928 }
929 let _ = db::create_link(
930 &lock.0,
931 &link.source_id,
932 &link.target_id,
933 link.relation.as_str(),
934 );
935 }
936 Json(json!({"imported": imported, "errors": errors})).into_response()
937}
938
939#[derive(serde::Deserialize)]
940pub struct ImportBody {
941 pub memories: Vec<Memory>,
942 #[serde(default)]
943 pub links: Option<Vec<MemoryLink>>,
944}
945
946/// `GET /api/v1/tools/list` — enumerate the MCP tools currently
947/// advertised under the daemon's resolved [`Profile`]. The response
948/// shape mirrors MCP `tools/list`: `{tools: [{name, description, ...}],
949/// schema_version: <tag>}`. Backend-agnostic — works on both sqlite
950/// and postgres daemons because the data is configuration, not user
951/// content.
952pub async fn tools_list(State(app): State<AppState>) -> impl IntoResponse {
953 // `tool_definitions_for_profile` already applies the C2 / C4
954 // trims that match the MCP `tools/list` shape. No further shaping
955 // is needed for the HTTP wire — the field names line up with the
956 // MCP JSON-RPC payload exactly.
957 let defs = crate::mcp::tool_definitions_for_profile(app.profile.as_ref());
958 (StatusCode::OK, Json(defs)).into_response()
959}