Skip to main content

ai_memory/handlers/
governance.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! Governance HTTP handlers — pending-action list / approve / reject.
5//!
6//! Extracted from [`super::http`] under issue #650 follow-up 2. Wire
7//! shape is identical (re-exported from [`super`] so `handlers::list_pending`
8//! / `handlers::approve_pending` / `handlers::reject_pending` continue
9//! to resolve). The K10 SSE approval stream lives in [`super::approvals`]
10//! because it carries its own state (subscriber map).
11
12use crate::models::field_names;
13use axum::{
14    Json,
15    extract::{Path, Query, State},
16    http::{HeaderMap, StatusCode},
17    response::IntoResponse,
18};
19use serde::Deserialize;
20use serde_json::json;
21
22use crate::db;
23use crate::validate;
24
25use super::AppState;
26#[cfg(feature = "sal")]
27use super::StorageBackend;
28use super::fanout_or_503;
29#[cfg(feature = "sal")]
30use super::store_err_to_response;
31
32#[derive(Deserialize)]
33pub struct PendingListQuery {
34    #[serde(default)]
35    pub status: Option<String>,
36    /// Optional namespace filter — S34 uses `?namespace=...&limit=50`.
37    #[serde(default)]
38    pub namespace: Option<String>,
39    #[serde(default = "default_pending_limit")]
40    pub limit: Option<usize>,
41}
42
/// Serde default for `PendingListQuery::limit`.
///
/// The `Option` wrapper is deliberate: `#[serde(default = "...")]` needs a
/// function whose return type equals the field type (`Option<usize>`),
/// hence the `unnecessary_wraps` allowance.
#[allow(clippy::unnecessary_wraps)]
fn default_pending_limit() -> Option<usize> {
    /// Page size applied when the request carries no `limit` parameter.
    const FALLBACK_PAGE_SIZE: usize = 100;
    Some(FALLBACK_PAGE_SIZE)
}
47
48pub async fn list_pending(
49    State(app): State<AppState>,
50    headers: HeaderMap,
51    Query(p): Query<PendingListQuery>,
52) -> impl IntoResponse {
53    let limit = p.limit.unwrap_or(100).min(1000);
54
55    // #958 (security-medium, 2026-05-20) — caller-vs-requester gate.
56    // Pre-#958 the handler took NO `headers: HeaderMap`, resolved no
57    // caller, and dispatched directly to the underlying list (sqlite
58    // `db::list_pending_actions` / postgres
59    // `list_pending_actions_via_store`) which themselves take NO
60    // `caller` parameter. The K10 SSE handler (`approvals_sse`)
61    // already applies the per-#628 tenant filter via
62    // `sse_event_visible_to`, but the polling-style HTTP list path
63    // was the legacy gap that same issue closed for the SSE channel
64    // only. Any HTTP caller could enumerate every pending governance
65    // action across every owner + every namespace — leaking the
66    // proposed memory body, the requester agent_id, and the target
67    // namespace topology.
68    //
69    // Fix: resolve the caller from `X-Agent-Id` (the same primitive
70    // every other handler uses), check the admin role allowlist via
71    // the shared `handlers::admin_role::is_admin_caller` predicate
72    // (the #957 SHIP-cluster operator-bypass posture), and post-
73    // filter the pending list to rows whose `requested_by` matches
74    // the resolved caller. Admin callers bypass the filter — the
75    // legitimate operator queue-view surface. Non-admin callers see
76    // only their OWN pending rows; cross-tenant rows are silently
77    // dropped (no enumeration / count leak).
78    let header_agent_id = headers
79        .get(crate::HEADER_AGENT_ID)
80        .and_then(|v| v.to_str().ok());
81    let caller = crate::identity::resolve_http_agent_id(None, header_agent_id)
82        .unwrap_or_else(|_| crate::identity::sentinels::ANONYMOUS_INVALID.to_string());
83    let is_admin = crate::handlers::admin_role::is_admin_caller_trusted(&app, &caller);
84
85    // v0.7.0 Wave-3 Continuation 5 — postgres-backed daemons read
86    // from the `pending_actions` table directly. The full governance
87    // pipeline (Phase 20 / Cont 4 chain walk) writes pending rows on
88    // both backends; this list path lights them up on the read side
89    // so S34's "bob lists pending → approve/reject → charlie sees
90    // approved" round-trip works end-to-end on postgres.
91    #[cfg(feature = "sal-postgres")]
92    if matches!(app.storage_backend, StorageBackend::Postgres) {
93        return match crate::store::postgres::list_pending_actions_via_store(
94            &app.store,
95            p.status.as_deref(),
96            p.namespace.as_deref(),
97            limit,
98        )
99        .await
100        {
101            Ok(items) => {
102                // #958 post-filter: drop rows whose `requested_by`
103                // does not match the caller, unless the caller is an
104                // operator-allowlisted admin. The postgres JSON shape
105                // produced by `list_pending_actions_via_store`
106                // includes `requested_by` as a top-level string field
107                // (see `src/store/postgres.rs::list_pending_actions`).
108                let filtered: Vec<serde_json::Value> = if is_admin {
109                    items
110                } else {
111                    items
112                        .into_iter()
113                        .filter(|row| {
114                            row.get(field_names::REQUESTED_BY)
115                                .and_then(serde_json::Value::as_str)
116                                .is_some_and(|rb| rb == caller)
117                        })
118                        .collect()
119                };
120                Json(json!({
121                    "count": filtered.len(),
122                    "pending": filtered,
123                    (field_names::STORAGE_BACKEND): "postgres",
124                    (field_names::OWNER_SCOPE): if is_admin { "admin" } else { "caller" },
125                }))
126                .into_response()
127            }
128            Err(e) => store_err_to_response(e),
129        };
130    }
131
132    let lock = app.db.lock().await;
133    match db::list_pending_actions(&lock.0, p.status.as_deref(), limit) {
134        Ok(items) => {
135            // #958 post-filter: drop rows whose `requested_by`
136            // does not match the caller, unless the caller is an
137            // operator-allowlisted admin. `PendingAction.requested_by`
138            // is a plain `String` (see
139            // `src/models/namespace.rs::PendingAction`).
140            let filtered: Vec<crate::models::PendingAction> = if is_admin {
141                items
142            } else {
143                items
144                    .into_iter()
145                    .filter(|row| row.requested_by == caller)
146                    .collect()
147            };
148            Json(json!({
149                "count": filtered.len(),
150                "pending": filtered,
151                (field_names::OWNER_SCOPE): if is_admin { "admin" } else { "caller" },
152            }))
153            .into_response()
154        }
155        Err(e) => crate::handlers::errors::handler_error_500(&e),
156    }
157}
158
/// HTTP handler: approve the pending governance action identified by `id`.
///
/// Processing order (each step returns early on failure):
/// 1. `super::verify_approval_hmac` over headers, raw body, `"POST"` and
///    `id`; on failure the returned status is sent with an error + hint body.
/// 2. `validate::validate_id` on the path id — `400` on failure.
/// 3. Approver identity resolved from the `X-Agent-Id` header — `400` on
///    failure.
/// 4. A governance audit row is recorded before any storage write.
/// 5. With the `sal` feature and a postgres backend, the decision goes
///    through `governance_approve_with_consensus`; otherwise through
///    `db::approve_with_approver_type` under the `app.db` lock.
///
/// Outcomes: approved → JSON body with `approved: true` (the non-postgres
/// path also fans the decision and any produced memory out to federation
/// peers, after dropping the DB lock); consensus still pending → `202`;
/// rejected → `403`; unknown pending id on the non-postgres path → `404`;
/// storage errors are mapped by `store_err_to_response` /
/// `handler_error_500`.
#[allow(clippy::too_many_lines)]
pub async fn approve_pending(
    State(app): State<AppState>,
    headers: HeaderMap,
    Path(id): Path<String>,
    body_bytes: axum::body::Bytes,
) -> impl IntoResponse {
    use crate::db::ApproveOutcome;
    use crate::models::PendingDecision;
    // S5-C1 (v0.7.0 fix campaign 2026-05-13): privileged governance
    // endpoints MUST verify HMAC. The legacy `api_key_auth` middleware
    // pass-throughs when `api_key` is unset (default!), which means an
    // attacker could approve any pending action by spoofing `X-Agent-Id`.
    // We mirror the K10 SSE handler's posture and require
    // `X-AI-Memory-Signature` on every inbound approve request,
    // regardless of `api_key` configuration. Without a server-wide
    // `[hooks.subscription].hmac_secret`, `verify_approval_hmac`
    // refuses every request — the safe default.
    if let Err(status) = super::verify_approval_hmac(&headers, &body_bytes, "POST", &id) {
        return (
            status,
            Json(json!({
                "error": crate::errors::msg::INVALID_OR_MISSING_SIGNATURE,
                "hint": "POST /api/v1/pending/{id}/approve requires HMAC signing per K7's pattern. \
                        Set [hooks.subscription] hmac_secret in config and send \
                        X-AI-Memory-Signature: sha256=<HMAC-SHA256(SHA256(secret), \"<ts>.<METHOD>.<pending_id>.<body>\")> \
                        with X-AI-Memory-Timestamp: <unix-epoch-secs>."
            })),
        )
            .into_response();
    }
    // Cloned handle to `app.db`; it is only locked on the non-postgres
    // path further down.
    let state = app.db.clone();
    // The id is validated only after the signature check has passed.
    if let Err(e) = validate::validate_id(&id) {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"error": e.to_string()})),
        )
            .into_response();
    }
    // Approver identity comes from the `X-Agent-Id` header. Unlike
    // `list_pending`, a resolution failure is a hard 400 here rather than
    // a fallback to an anonymous sentinel.
    let header_agent_id = headers
        .get(crate::HEADER_AGENT_ID)
        .and_then(|v| v.to_str().ok());
    let agent_id = match crate::identity::resolve_http_agent_id(None, header_agent_id) {
        Ok(a) => a,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
            )
                .into_response();
        }
    };

    // #913 (security-medium / SOC2, 2026-05-19) — admin governance audit.
    // Approve is the canonical privileged gate operation; the forensic-
    // chain row MUST land before the storage write so the audit trail
    // captures the approver's identity + pending_id even when the
    // downstream consensus / execution path errors.
    crate::governance::audit::record_decision(
        &agent_id,
        "allow",
        "pending_approve",
        "",
        json!({ (field_names::PENDING_ID): &id }),
    );

    // v0.7.0 Wave-3 Continuation 3 (Phase 20) — postgres-backed approve
    // routes through the FULL governance pipeline:
    // - inheritance-chain walk over `namespace_meta` (with explicit
    //   parent + `/`-derived ancestors, bounded + cycle-safe)
    // - approver_type variations: Human / Agent(required) / Consensus(N)
    // - multi-vote consensus state machine: registered-agent gating,
    //   case-insensitive duplicate-vote dedup, threshold transition
    // - audit emit + structured response envelope (Approved / Pending
    //   with vote count + quorum / Rejected with reason)
    //
    // Federation fanout for the decision + executed memory remains
    // sqlite-only (the broadcast_pending_decision_quorum path uses
    // sqlite-coupled fed-tracker state); postgres operators relying on
    // multi-node consistency should poll peers.
    #[cfg(feature = "sal")]
    if matches!(app.storage_backend, StorageBackend::Postgres) {
        use crate::store::ApproveOutcome as SalOutcome;
        let ctx = crate::store::CallerContext::for_agent(agent_id.clone());
        // Every arm of this match produces the final response; the
        // sqlite code below is never reached on a postgres backend.
        return match app
            .store
            .governance_approve_with_consensus(&ctx, &id, &agent_id)
            .await
        {
            Ok(SalOutcome::Approved) => {
                if crate::audit::is_enabled() {
                    crate::audit::emit(crate::audit::EventBuilder::new(
                        crate::audit::AuditAction::Approve,
                        crate::audit::actor(
                            agent_id.clone(),
                            crate::audit::synthesis_sources::HTTP_HEADER,
                            None,
                        ),
                        crate::audit::target_memory(id.clone(), String::new(), None, None, None),
                    ));
                }
                // v0.7.0 Wave-3 Continuation 5 (S34) — execute the
                // approved action so the memory materialises in the
                // namespace where the cert oracle expects it. Mirrors
                // sqlite's `db::execute_pending_action` for the
                // `store` / `delete` / `promote` action types.
                //
                // NOTE(review): an execution failure here is only logged
                // and the response still reports `approved: true` with
                // `executed: false`, whereas the sqlite path below answers
                // 500 — confirm the asymmetry is intended.
                let executed_id: Option<String> =
                    match app.store.execute_pending_action(&ctx, &id).await {
                        Ok(eid) => eid,
                        Err(e) => {
                            tracing::warn!(
                                "approve_pending: execute_pending_action failed for {id}: {e}"
                            );
                            None
                        }
                    };
                Json(json!({
                    "approved": true,
                    "id": id,
                    (field_names::DECIDED_BY): agent_id,
                    "executed": executed_id.is_some(),
                    "memory_id": executed_id,
                    (field_names::STORAGE_BACKEND): "postgres",
                }))
                .into_response()
            }
            // Vote recorded but quorum not yet reached: 202 with the
            // current tally.
            Ok(SalOutcome::Pending { votes, quorum }) => (
                StatusCode::ACCEPTED,
                Json(json!({
                    "approved": false,
                    "status": "pending",
                    "id": id,
                    "votes": votes,
                    "quorum": quorum,
                    "reason": crate::errors::msg::CONSENSUS_NOT_REACHED,
                    (field_names::STORAGE_BACKEND): "postgres",
                })),
            )
                .into_response(),
            Ok(SalOutcome::Rejected(reason)) => (
                StatusCode::FORBIDDEN,
                Json(json!({"error": crate::errors::msg::approve_rejected(reason)})),
            )
                .into_response(),
            Err(e) => store_err_to_response(e),
        };
    }

    // Non-postgres path: the lock acquired here is held across the approve
    // and execute calls and is released explicitly before any federation
    // traffic.
    let lock = state.lock().await;
    match db::approve_with_approver_type(&lock.0, &id, &agent_id) {
        Ok(ApproveOutcome::Approved) => match db::execute_pending_action(&lock.0, &id) {
            Ok(memory_id) => {
                // v0.6.2 (S34): fan out the decision AND the resulting
                // memory so approve on one node makes the governed write
                // visible on every peer. Drop the DB lock before any
                // outbound HTTP.
                //
                // The produced memory is read back while the lock is still
                // held; a lookup error is collapsed to `None` by `.ok()`.
                let produced_mem = memory_id
                    .as_deref()
                    .and_then(|mid| db::get(&lock.0, mid).ok().flatten());
                drop(lock);
                if let Some(fed) = app.federation.as_ref() {
                    let decision = PendingDecision {
                        id: id.clone(),
                        approved: true,
                        decider: agent_id.clone(),
                    };
                    // Both a failed broadcast and a failed quorum finalise
                    // end the request with the quorum-not-met response; the
                    // local approve + execute above have already happened
                    // by this point.
                    match crate::federation::broadcast_pending_decision_quorum(fed, &decision).await
                    {
                        Ok(tracker) => {
                            if let Err(err) = crate::federation::finalise_quorum(&tracker) {
                                // #869 — typed 503 envelope via the shared helper.
                                let payload =
                                    crate::federation::QuorumNotMetPayload::from_err(&err);
                                return super::quorum_not_met_response(&payload);
                            }
                        }
                        Err(err) => {
                            let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
                            return super::quorum_not_met_response(&payload);
                        }
                    }
                    // If approval produced a brand-new memory (store
                    // path), also broadcast it so peers have the row.
                    // delete / promote paths produce no new memory
                    // (the pending payload carries memory_id).
                    if let Some(ref mem) = produced_mem
                        && let Some(resp) = fanout_or_503(&app, mem).await
                    {
                        return resp;
                    }
                }
                Json(json!({
                    "approved": true,
                    "id": id,
                    (field_names::DECIDED_BY): agent_id,
                    "executed": true,
                    "memory_id": memory_id,
                }))
                .into_response()
            }
            // The approval itself succeeded but executing the action
            // failed: log the cause and answer 500 with a fixed message.
            Err(e) => {
                tracing::error!("execute pending error: {e}");
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(json!({"error": super::approvals::APPROVED_BUT_EXECUTION_FAILED})),
                )
                    .into_response()
            }
        },
        // Vote recorded but quorum not yet reached: 202 with the current
        // tally.
        Ok(ApproveOutcome::Pending { votes, quorum }) => (
            StatusCode::ACCEPTED,
            Json(json!({
                "approved": false,
                "status": "pending",
                "id": id,
                "votes": votes,
                "quorum": quorum,
                "reason": crate::errors::msg::CONSENSUS_NOT_REACHED,
            })),
        )
            .into_response(),
        // #1620 — missing pending id is 404, matching the postgres
        // branch's StoreError::NotFound mapping (was 403 Rejected).
        Ok(ApproveOutcome::NotFound) => (
            StatusCode::NOT_FOUND,
            Json(json!({
                "error": crate::errors::msg::pending_action_not_found(&id),
            })),
        )
            .into_response(),
        Ok(ApproveOutcome::Rejected(reason)) => (
            StatusCode::FORBIDDEN,
            Json(json!({"error": crate::errors::msg::approve_rejected(reason)})),
        )
            .into_response(),
        Err(e) => crate::handlers::errors::handler_error_500(&e),
    }
}
397
398pub async fn reject_pending(
399    State(app): State<AppState>,
400    headers: HeaderMap,
401    Path(id): Path<String>,
402    body_bytes: axum::body::Bytes,
403) -> impl IntoResponse {
404    use crate::models::PendingDecision;
405    // S5-C1 (v0.7.0 fix campaign 2026-05-13): parity with approve_pending.
406    // Legacy reject endpoint MUST verify HMAC for the same reason — an
407    // unsigned reject is just as dangerous (denial-of-service against
408    // governance state, write-amplifies pending row churn).
409    if let Err(status) = super::verify_approval_hmac(&headers, &body_bytes, "POST", &id) {
410        return (
411            status,
412            Json(json!({
413                "error": crate::errors::msg::INVALID_OR_MISSING_SIGNATURE,
414                "hint": "POST /api/v1/pending/{id}/reject requires HMAC signing per K7's pattern. \
415                        Set [hooks.subscription] hmac_secret in config and send \
416                        X-AI-Memory-Signature: sha256=<HMAC-SHA256(SHA256(secret), \"<ts>.<METHOD>.<pending_id>.<body>\")> \
417                        with X-AI-Memory-Timestamp: <unix-epoch-secs>."
418            })),
419        )
420            .into_response();
421    }
422    let state = app.db.clone();
423    if let Err(e) = validate::validate_id(&id) {
424        return (
425            StatusCode::BAD_REQUEST,
426            Json(json!({"error": e.to_string()})),
427        )
428            .into_response();
429    }
430    let header_agent_id = headers
431        .get(crate::HEADER_AGENT_ID)
432        .and_then(|v| v.to_str().ok());
433    let agent_id = match crate::identity::resolve_http_agent_id(None, header_agent_id) {
434        Ok(a) => a,
435        Err(e) => {
436            return (
437                StatusCode::BAD_REQUEST,
438                Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
439            )
440                .into_response();
441        }
442    };
443
444    // #913 (security-medium / SOC2, 2026-05-19) — admin governance audit.
445    // Reject is the privileged-gate denial path; mirror approve so both
446    // outcomes appear in the forensic chain BEFORE the storage write.
447    crate::governance::audit::record_decision(
448        &agent_id,
449        "refuse",
450        "pending_reject",
451        "",
452        json!({ (field_names::PENDING_ID): &id }),
453    );
454
455    // v0.7.0 Wave-3 Continuation 2 (Phase 11) — postgres-backed reject.
456    #[cfg(feature = "sal")]
457    if matches!(app.storage_backend, StorageBackend::Postgres) {
458        let ctx = crate::store::CallerContext::for_agent(agent_id.clone());
459        return match app.store.pending_decide(&ctx, &id, false, &agent_id).await {
460            Ok(true) => {
461                if crate::audit::is_enabled() {
462                    crate::audit::emit(crate::audit::EventBuilder::new(
463                        crate::audit::AuditAction::Reject,
464                        crate::audit::actor(
465                            agent_id.clone(),
466                            crate::audit::synthesis_sources::HTTP_HEADER,
467                            None,
468                        ),
469                        crate::audit::target_memory(id.clone(), String::new(), None, None, None),
470                    ));
471                }
472                Json(json!({
473                    "rejected": true,
474                    "id": id,
475                    (field_names::DECIDED_BY): agent_id,
476                    (field_names::STORAGE_BACKEND): "postgres",
477                }))
478                .into_response()
479            }
480            Ok(false) => (
481                StatusCode::NOT_FOUND,
482                Json(json!({"error": crate::errors::msg::PENDING_ACTION_NOT_FOUND_OR_DECIDED})),
483            )
484                .into_response(),
485            Err(e) => store_err_to_response(e),
486        };
487    }
488
489    let lock = state.lock().await;
490    match db::decide_pending_action(&lock.0, &id, false, &agent_id) {
491        Ok(true) => {
492            drop(lock);
493            // v0.6.2 (S34): fan out the reject so peers converge.
494            if let Some(fed) = app.federation.as_ref() {
495                let decision = PendingDecision {
496                    id: id.clone(),
497                    approved: false,
498                    decider: agent_id.clone(),
499                };
500                match crate::federation::broadcast_pending_decision_quorum(fed, &decision).await {
501                    Ok(tracker) => {
502                        if let Err(err) = crate::federation::finalise_quorum(&tracker) {
503                            // #869 — typed 503 envelope via the shared helper.
504                            let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
505                            return super::quorum_not_met_response(&payload);
506                        }
507                    }
508                    Err(err) => {
509                        let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
510                        return super::quorum_not_met_response(&payload);
511                    }
512                }
513            }
514            Json(json!({"rejected": true, "id": id, (field_names::DECIDED_BY): agent_id}))
515                .into_response()
516        }
517        Ok(false) => (
518            StatusCode::NOT_FOUND,
519            Json(json!({"error": crate::errors::msg::PENDING_ACTION_NOT_FOUND_OR_DECIDED})),
520        )
521            .into_response(),
522        Err(e) => crate::handlers::errors::handler_error_500(&e),
523    }
524}