ai_memory/approvals.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.7.0 K10 — Approval API (HTTP + SSE + MCP).
5//!
6//! When the governance gate returns `Pending`, an operator must
7//! eventually decide. v0.7.0 surfaces three transports for that
8//! decision:
9//!
10//! 1. **HTTP** — `POST /api/v1/approvals/{pending_id}` with the body
11//! `{"decision":"approve|deny","remember":"once|session|forever"}`.
12//! Gated behind the K7 `[hooks.subscription] hmac_secret` server-wide
13//! HMAC: requests without a valid `X-AI-Memory-Signature: sha256=…`
14//! header are rejected `401`.
15//! 2. **SSE** — `GET /api/v1/approvals/stream` server-sent events.
16//! Subscribers receive `approval_requested` (one per new
17//! `pending_actions` row) and `approval_decided` (one per
18//! approve/deny outcome) frames, fanned out through a process-wide
19//! `tokio::sync::broadcast` channel so multiple watchers can attach
20//! concurrently without contention on the DB lock.
21//! 3. **MCP** — the existing `memory_pending_approve` /
22//! `memory_pending_reject` tools gain an optional `remember`
23//! property. The K10 contract preserves the pre-K10 schema (no new
24//! tools, no removed properties) — so existing callers keep working
25//! unchanged and only opt into `remember` when they want
26//! forever-persisted permission rules.
27//!
28//! When `remember = "forever"`, K10 stamps a synthetic
29//! [`SyntheticPermissionRule`] into the process-wide registry so the
30//! same `(action, namespace, agent_id)` tuple auto-decides next time.
31//! K9 (the unified permission pipeline) will consult the registry from
32//! its rule-evaluation path; until K9 lands on this branch, the
33//! registry exists as an isolated K10-internal store that the K10 test
34//! suite can introspect to pin the contract.
35
36use std::sync::OnceLock;
37use std::sync::RwLock;
38
39use serde::{Deserialize, Serialize};
40use tokio::sync::broadcast;
41
/// Capacity of the process-wide approval broadcast channel.
///
/// This is the value `bus()` hands to `broadcast::channel` when the
/// channel is first created.
///
/// Sized to absorb a brief spike of `approval_requested` /
/// `approval_decided` events without forcing a slow SSE subscriber to
/// drop frames. SSE consumers see [`broadcast::error::RecvError::Lagged`]
/// when this is exceeded; the [`approvals_sse`](crate::handlers::approvals_sse)
/// handler turns that into a `lagged` SSE event so clients can re-sync
/// via `GET /api/v1/pending`.
pub const APPROVAL_BROADCAST_CAPACITY: usize = 1024;
51
/// Decision an operator submits via the K10 transports.
///
/// Because of `#[serde(rename_all = "lowercase")]` the variants travel
/// on the wire as the strings `"approve"` and `"deny"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Decision {
    /// The operator approved the pending action (`"approve"`).
    Approve,
    /// The operator denied the pending action (`"deny"`).
    Deny,
}
59
/// How long a `remember` choice persists.
///
/// - `Once` — just this decision; no rule recorded.
/// - `Session` — recorded in-memory; cleared on restart.
/// - `Forever` — recorded in-memory AND queued for persistence to the
///   live `config.toml` `[[permissions.rules]]` table on the next
///   config write. (The actual disk write is owned by the K9 rule
///   loader; K10's contract is to populate the registry that K9
///   consults.)
///
/// Because of `#[serde(rename_all = "lowercase")]` the variants travel
/// on the wire as `"once"`, `"session"` and `"forever"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Remember {
    /// Applies to this single decision only (`"once"`).
    Once,
    /// Remembered for the lifetime of the process (`"session"`).
    Session,
    /// Remembered as a lasting rule (`"forever"`).
    Forever,
}
76
/// One row in the K10 synthetic-permission-rule registry.
///
/// Mirrors the shape K9's `[[permissions.rules]]` table will use
/// once K9 lands on the same branch — that way K9's loader can
/// promote these in-memory rows into config-file rows without a
/// schema translation step.
///
/// Note the two notions of equality in this file: the derived
/// `PartialEq` compares every field, whereas `record_synthetic_rule`
/// de-duplicates on `(action_type, namespace, agent_id, decision)` only
/// and ignores `recorded_at`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SyntheticPermissionRule {
    /// `pending_actions.action_type` — `"store"`, `"delete"`, or `"promote"`.
    pub action_type: String,
    /// `pending_actions.namespace` — the namespace the original gated
    /// action targeted.
    pub namespace: String,
    /// `pending_actions.requested_by` — the agent the rule auto-decides
    /// for. `None` means "any agent in this namespace" (rare, but the
    /// K10 contract reserves the slot for fleet-wide rules).
    ///
    /// Omitted from serialised output when `None`, and read back as
    /// `None` when the key is absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<String>,
    /// `"approve"` or `"deny"` — the auto-decision the gate should
    /// return next time it sees a matching tuple.
    pub decision: String,
    /// RFC3339 timestamp the rule was recorded. Surfaced in audit
    /// trails and (eventually) in K9's rule-summary doctor surface.
    pub recorded_at: String,
}
102
/// Process-wide registry of `remember=forever` rules. Populated by
/// the K10 transports; read by K9's rule resolver (when K9 lands).
///
/// Initialised in place as an empty `Vec` behind an `RwLock`, so no
/// lazy-initialisation wrapper is involved. All access in this file goes
/// through `record_synthetic_rule`, `list_synthetic_rules` and
/// `clear_synthetic_rules_for_test`.
static SYNTHETIC_RULES: RwLock<Vec<SyntheticPermissionRule>> = RwLock::new(Vec::new());
106
107/// Append a synthetic rule to the registry.
108///
109/// Idempotent on the `(action_type, namespace, agent_id, decision)`
110/// tuple — calling twice with the same tuple is a no-op (the recorded
111/// timestamp from the first insert wins). Lock poisoning is treated as
112/// fatal-but-recoverable: we drop the poisoned guard and proceed
113/// against the inner data, mirroring the K3 `lock_permissions_mode_for_test`
114/// posture.
115pub fn record_synthetic_rule(rule: SyntheticPermissionRule) {
116 let mut guard = SYNTHETIC_RULES
117 .write()
118 .unwrap_or_else(std::sync::PoisonError::into_inner);
119 let already = guard.iter().any(|r| {
120 r.action_type == rule.action_type
121 && r.namespace == rule.namespace
122 && r.agent_id == rule.agent_id
123 && r.decision == rule.decision
124 });
125 if !already {
126 guard.push(rule);
127 }
128}
129
130/// Snapshot the registry. Returns a clone so callers can release the
131/// read lock immediately.
132#[must_use]
133pub fn list_synthetic_rules() -> Vec<SyntheticPermissionRule> {
134 SYNTHETIC_RULES
135 .read()
136 .map(|g| g.clone())
137 .unwrap_or_default()
138}
139
140/// Test-only: clear the registry. Production code never resets the
141/// registry mid-process; tests use this to assert against a clean slate.
142#[doc(hidden)]
143pub fn clear_synthetic_rules_for_test() {
144 if let Ok(mut g) = SYNTHETIC_RULES.write() {
145 g.clear();
146 }
147}
148
/// One frame on the SSE stream.
///
/// Two variants today:
/// - `ApprovalRequested` — fired when a `pending_actions` row is
///   inserted (governance gate returned `Pending`).
/// - `ApprovalDecided` — fired when an approve/reject decision is
///   finalised (any of the three K10 transports).
///
/// Both carry the pending-action id so subscribers can round-trip back
/// through `GET /api/v1/pending/{id}` for the full row payload.
///
/// Serialised as an internally tagged object: the variant name is
/// written in snake case (`approval_requested` / `approval_decided`)
/// under the `event` key, alongside the variant's fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum ApprovalEvent {
    /// A new pending action is waiting for an operator decision.
    ApprovalRequested {
        /// Id of the pending action.
        pending_id: String,
        /// Kind of action that was gated.
        /// NOTE(review): presumably `pending_actions.action_type` — confirm.
        action_type: String,
        /// Namespace the gated action targeted.
        namespace: String,
        /// Agent that requested the gated action.
        requested_by: String,
        /// When the request was made.
        /// NOTE(review): presumably an RFC3339 timestamp — confirm.
        requested_at: String,
    },
    /// A pending action has been decided.
    ApprovalDecided {
        /// Id of the pending action that was decided.
        pending_id: String,
        /// Outcome of the decision.
        /// NOTE(review): presumably `"approve"` or `"deny"` — confirm.
        decision: String,
        /// Who made the decision.
        decided_by: String,
        /// Persistence choice that accompanied the decision.
        /// NOTE(review): presumably `"once"`, `"session"` or `"forever"` — confirm.
        remember: String,
        /// Originating namespace of the pending row this decision
        /// targets. Required by the K10 SSE filter (review #628
        /// blocker C2): without it the receive-side filter cannot
        /// scope the event to the right tenant.
        ///
        /// Deserialises to the empty string when the key is absent.
        #[serde(default)]
        namespace: String,
        /// Original requester for the pending row this decision
        /// targets. Same rationale as `namespace` — the decision
        /// frame is delivered to the original requester even if a
        /// different operator pressed the approve button.
        ///
        /// Deserialises to the empty string when the key is absent.
        #[serde(default)]
        requested_by: String,
    },
}
188
189impl ApprovalEvent {
190 /// Tenant agent the event belongs to — `requested_by` for both
191 /// variants. Used by the SSE handler to scope broadcasts to the
192 /// originating agent (review #628 blocker C2).
193 #[must_use]
194 pub fn tenant_agent_id(&self) -> &str {
195 match self {
196 ApprovalEvent::ApprovalRequested { requested_by, .. }
197 | ApprovalEvent::ApprovalDecided { requested_by, .. } => requested_by.as_str(),
198 }
199 }
200
201 /// Namespace the event belongs to. Used by the SSE handler in
202 /// concert with K9's permission rules to decide whether a
203 /// subscriber may see a cross-agent event.
204 #[must_use]
205 pub fn tenant_namespace(&self) -> &str {
206 match self {
207 ApprovalEvent::ApprovalRequested { namespace, .. }
208 | ApprovalEvent::ApprovalDecided { namespace, .. } => namespace.as_str(),
209 }
210 }
211}
212
/// Process-wide broadcast channel for [`ApprovalEvent`]. Lazily
/// initialised on first subscribe / publish — the server's HTTP layer
/// touches it from `handlers::approvals_sse` and the publish side fires
/// from `handlers::approve_via_approval_api`,
/// `subscriptions::dispatch_approval_requested`, and the MCP
/// approve/reject handlers.
///
/// Only the sender half is stored here; within this file it is reached
/// exclusively through `bus()`, which performs the one-time
/// initialisation.
static APPROVAL_BUS: OnceLock<broadcast::Sender<ApprovalEvent>> = OnceLock::new();
220
221fn bus() -> &'static broadcast::Sender<ApprovalEvent> {
222 APPROVAL_BUS.get_or_init(|| {
223 let (tx, _rx) = broadcast::channel(APPROVAL_BROADCAST_CAPACITY);
224 tx
225 })
226}
227
228/// Publish an [`ApprovalEvent`] to all SSE subscribers.
229///
230/// No subscribers → swallowed silently (the `broadcast::Sender::send`
231/// `Err(SendError(_))` branch is the documented "no receivers" outcome
232/// and is never an error in this codebase: SSE is best-effort and we
233/// must not fail the underlying approve/reject path on a missing
234/// subscriber).
235pub fn publish(event: ApprovalEvent) {
236 let _ = bus().send(event);
237}
238
239/// Subscribe to the process-wide approval bus. Returns a fresh
240/// [`broadcast::Receiver`] that will see every event published AFTER
241/// this call (broadcast channels do not replay history — that's what
242/// `GET /api/v1/pending` is for).
243#[must_use]
244pub fn subscribe() -> broadcast::Receiver<ApprovalEvent> {
245 bus().subscribe()
246}
247
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialise the unit tests that mutate the global registry —
    /// `cargo test` runs tests in parallel by default and the
    /// `SYNTHETIC_RULES` static is shared across them.
    ///
    /// A poisoned mutex is recovered so one failing test does not
    /// cascade into every other registry test.
    fn registry_lock() -> std::sync::MutexGuard<'static, ()> {
        use std::sync::Mutex;
        static LOCK: Mutex<()> = Mutex::new(());
        LOCK.lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// A recorded rule comes back unchanged from the snapshot.
    #[test]
    fn record_and_list_round_trip() {
        let _g = registry_lock();
        clear_synthetic_rules_for_test();
        let rule = SyntheticPermissionRule {
            action_type: "store".into(),
            namespace: "scratch".into(),
            agent_id: Some("alice".into()),
            decision: "approve".into(),
            recorded_at: "2026-05-05T00:00:00Z".into(),
        };
        record_synthetic_rule(rule.clone());
        let snap = list_synthetic_rules();
        assert_eq!(snap.len(), 1);
        assert_eq!(snap[0], rule);
    }

    /// Recording the same key twice keeps a single row with the first
    /// timestamp.
    #[test]
    fn record_synthetic_rule_is_idempotent() {
        let _g = registry_lock();
        clear_synthetic_rules_for_test();
        let rule = SyntheticPermissionRule {
            action_type: "delete".into(),
            namespace: "ns".into(),
            agent_id: Some("bob".into()),
            decision: "deny".into(),
            recorded_at: "2026-05-05T00:00:00Z".into(),
        };
        record_synthetic_rule(rule.clone());
        // Second call with a later timestamp must not double the row.
        let later = SyntheticPermissionRule {
            recorded_at: "2099-01-01T00:00:00Z".into(),
            ..rule
        };
        record_synthetic_rule(later);
        let snap = list_synthetic_rules();
        assert_eq!(snap.len(), 1);
        // First-writer-wins on the timestamp.
        assert_eq!(snap[0].recorded_at, "2026-05-05T00:00:00Z");
    }

    /// An event published after subscribing is delivered to that
    /// subscriber.
    #[tokio::test]
    async fn publish_and_subscribe_round_trip() {
        use tokio::sync::broadcast::error::RecvError;

        // Distinctive id so this test can recognise its own frame.
        const PENDING_ID: &str = "pa-approvals-bus-round-trip";

        let mut rx = subscribe();
        publish(ApprovalEvent::ApprovalRequested {
            pending_id: PENDING_ID.into(),
            action_type: "store".into(),
            namespace: "scratch".into(),
            requested_by: "alice".into(),
            requested_at: "2026-05-05T00:00:00Z".into(),
        });

        // The bus is process-wide, so frames published by tests running
        // in parallel may be queued ahead of ours. Skip anything that is
        // not our own event instead of assuming the first frame is it.
        loop {
            match rx.recv().await {
                Ok(ApprovalEvent::ApprovalRequested {
                    pending_id,
                    namespace,
                    requested_by,
                    ..
                }) if pending_id == PENDING_ID => {
                    assert_eq!(namespace, "scratch");
                    assert_eq!(requested_by, "alice");
                    break;
                }
                Ok(_) | Err(RecvError::Lagged(_)) => {}
                Err(RecvError::Closed) => {
                    panic!("approval bus closed while its static sender is still alive")
                }
            }
        }
    }
}
318}