1use crate::models::field_names;
13use axum::{
14 Json,
15 extract::{Path, Query, State},
16 http::{HeaderMap, StatusCode},
17 response::IntoResponse,
18};
19use serde::Deserialize;
20use serde_json::json;
21
22use crate::db;
23use crate::validate;
24
25use super::AppState;
26#[cfg(feature = "sal")]
27use super::StorageBackend;
28use super::fanout_or_503;
29#[cfg(feature = "sal")]
30use super::store_err_to_response;
31
/// Query-string parameters accepted by `list_pending`.
#[derive(Deserialize)]
pub struct PendingListQuery {
    /// Optional status filter; forwarded unchanged to the backing store's
    /// pending-action listing call.
    #[serde(default)]
    pub status: Option<String>,
    /// Optional namespace filter. In `list_pending` it is forwarded only on the
    /// Postgres path; the local `db::list_pending_actions` call does not receive it.
    #[serde(default)]
    pub namespace: Option<String>,
    /// Requested maximum number of rows. Deserializes to `Some(100)` (via
    /// `default_pending_limit`) when the parameter is omitted; `list_pending`
    /// additionally caps the effective value at 1000.
    #[serde(default = "default_pending_limit")]
    pub limit: Option<usize>,
}
42
/// Serde default for `PendingListQuery::limit`: a page size of 100.
// The `Option` wrapper looks redundant to clippy, but a serde `default = "..."`
// function has to return the exact type of the field it fills in.
#[allow(clippy::unnecessary_wraps)]
fn default_pending_limit() -> Option<usize> {
    const DEFAULT_PAGE_SIZE: usize = 100;
    Some(DEFAULT_PAGE_SIZE)
}
47
48pub async fn list_pending(
49 State(app): State<AppState>,
50 headers: HeaderMap,
51 Query(p): Query<PendingListQuery>,
52) -> impl IntoResponse {
53 let limit = p.limit.unwrap_or(100).min(1000);
54
55 let header_agent_id = headers
79 .get(crate::HEADER_AGENT_ID)
80 .and_then(|v| v.to_str().ok());
81 let caller = crate::identity::resolve_http_agent_id(None, header_agent_id)
82 .unwrap_or_else(|_| crate::identity::sentinels::ANONYMOUS_INVALID.to_string());
83 let is_admin = crate::handlers::admin_role::is_admin_caller_trusted(&app, &caller);
84
85 #[cfg(feature = "sal-postgres")]
92 if matches!(app.storage_backend, StorageBackend::Postgres) {
93 return match crate::store::postgres::list_pending_actions_via_store(
94 &app.store,
95 p.status.as_deref(),
96 p.namespace.as_deref(),
97 limit,
98 )
99 .await
100 {
101 Ok(items) => {
102 let filtered: Vec<serde_json::Value> = if is_admin {
109 items
110 } else {
111 items
112 .into_iter()
113 .filter(|row| {
114 row.get(field_names::REQUESTED_BY)
115 .and_then(serde_json::Value::as_str)
116 .is_some_and(|rb| rb == caller)
117 })
118 .collect()
119 };
120 Json(json!({
121 "count": filtered.len(),
122 "pending": filtered,
123 (field_names::STORAGE_BACKEND): "postgres",
124 (field_names::OWNER_SCOPE): if is_admin { "admin" } else { "caller" },
125 }))
126 .into_response()
127 }
128 Err(e) => store_err_to_response(e),
129 };
130 }
131
132 let lock = app.db.lock().await;
133 match db::list_pending_actions(&lock.0, p.status.as_deref(), limit) {
134 Ok(items) => {
135 let filtered: Vec<crate::models::PendingAction> = if is_admin {
141 items
142 } else {
143 items
144 .into_iter()
145 .filter(|row| row.requested_by == caller)
146 .collect()
147 };
148 Json(json!({
149 "count": filtered.len(),
150 "pending": filtered,
151 (field_names::OWNER_SCOPE): if is_admin { "admin" } else { "caller" },
152 }))
153 .into_response()
154 }
155 Err(e) => crate::handlers::errors::handler_error_500(&e),
156 }
157}
158
/// Handles an approve request for the pending action named by the `id` path segment.
///
/// Steps, in order:
/// 1. The request must pass `super::verify_approval_hmac` (called with the headers,
///    the raw body, the literal method `"POST"` and `id`). On failure the status it
///    returns is sent back together with an error message and a signing hint.
/// 2. `id` must pass `validate::validate_id`, and the agent id read from the
///    `crate::HEADER_AGENT_ID` header must resolve; either failure yields
///    `400 Bad Request`.
/// 3. A governance audit decision (`"allow"` / `"pending_approve"`) is recorded.
///    This happens before the backend has decided the outcome.
/// 4. The approval is applied through the store (feature `sal`, when
///    `app.storage_backend` is `Postgres`) or through the local `db` module.
///
/// Outcome mapping: approved → JSON body with `approved: true`; consensus not yet
/// reached → `202 Accepted`; rejected → `403 Forbidden`; unknown id (local path
/// only) → `404 Not Found`; backend errors → `store_err_to_response` on the
/// Postgres path, `handler_error_500` on the local path.
#[allow(clippy::too_many_lines)]
pub async fn approve_pending(
    State(app): State<AppState>,
    headers: HeaderMap,
    Path(id): Path<String>,
    body_bytes: axum::body::Bytes,
) -> impl IntoResponse {
    use crate::db::ApproveOutcome;
    use crate::models::PendingDecision;
    // Signature gate comes first: nothing else is inspected for unsigned requests.
    if let Err(status) = super::verify_approval_hmac(&headers, &body_bytes, "POST", &id) {
        return (
            status,
            Json(json!({
                "error": crate::errors::msg::INVALID_OR_MISSING_SIGNATURE,
                "hint": "POST /api/v1/pending/{id}/approve requires HMAC signing per K7's pattern. \
                         Set [hooks.subscription] hmac_secret in config and send \
                         X-AI-Memory-Signature: sha256=<HMAC-SHA256(SHA256(secret), \"<ts>.<METHOD>.<pending_id>.<body>\")> \
                         with X-AI-Memory-Timestamp: <unix-epoch-secs>."
            })),
        )
            .into_response();
    }
    // Handle to the local DB; it is only locked on the non-Postgres path below.
    let state = app.db.clone();
    if let Err(e) = validate::validate_id(&id) {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"error": e.to_string()})),
        )
            .into_response();
    }
    let header_agent_id = headers
        .get(crate::HEADER_AGENT_ID)
        .and_then(|v| v.to_str().ok());
    // Unlike `list_pending`, an unresolvable agent id is a hard 400 here.
    let agent_id = match crate::identity::resolve_http_agent_id(None, header_agent_id) {
        Ok(a) => a,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
            )
                .into_response();
        }
    };

    // Recorded before the approval outcome is known (the backend may still answer
    // pending, rejected or not-found).
    crate::governance::audit::record_decision(
        &agent_id,
        "allow",
        "pending_approve",
        "",
        json!({ (field_names::PENDING_ID): &id }),
    );

    #[cfg(feature = "sal")]
    if matches!(app.storage_backend, StorageBackend::Postgres) {
        use crate::store::ApproveOutcome as SalOutcome;
        let ctx = crate::store::CallerContext::for_agent(agent_id.clone());
        return match app
            .store
            .governance_approve_with_consensus(&ctx, &id, &agent_id)
            .await
        {
            Ok(SalOutcome::Approved) => {
                if crate::audit::is_enabled() {
                    crate::audit::emit(crate::audit::EventBuilder::new(
                        crate::audit::AuditAction::Approve,
                        crate::audit::actor(
                            agent_id.clone(),
                            crate::audit::synthesis_sources::HTTP_HEADER,
                            None,
                        ),
                        crate::audit::target_memory(id.clone(), String::new(), None, None, None),
                    ));
                }
                // An execution failure is not fatal on this path: it is logged at
                // `warn` and surfaces as `executed: false` / `memory_id: null` while
                // the response still reports `approved: true`.
                let executed_id: Option<String> =
                    match app.store.execute_pending_action(&ctx, &id).await {
                        Ok(eid) => eid,
                        Err(e) => {
                            tracing::warn!(
                                "approve_pending: execute_pending_action failed for {id}: {e}"
                            );
                            None
                        }
                    };
                Json(json!({
                    "approved": true,
                    "id": id,
                    (field_names::DECIDED_BY): agent_id,
                    "executed": executed_id.is_some(),
                    "memory_id": executed_id,
                    (field_names::STORAGE_BACKEND): "postgres",
                }))
                .into_response()
            }
            // The vote was counted but quorum has not been reached yet.
            Ok(SalOutcome::Pending { votes, quorum }) => (
                StatusCode::ACCEPTED,
                Json(json!({
                    "approved": false,
                    "status": "pending",
                    "id": id,
                    "votes": votes,
                    "quorum": quorum,
                    "reason": crate::errors::msg::CONSENSUS_NOT_REACHED,
                    (field_names::STORAGE_BACKEND): "postgres",
                })),
            )
                .into_response(),
            Ok(SalOutcome::Rejected(reason)) => (
                StatusCode::FORBIDDEN,
                Json(json!({"error": crate::errors::msg::approve_rejected(reason)})),
            )
                .into_response(),
            Err(e) => store_err_to_response(e),
        };
    }

    // Local path: the approval and the execution both run under the same DB lock.
    let lock = state.lock().await;
    match db::approve_with_approver_type(&lock.0, &id, &agent_id) {
        Ok(ApproveOutcome::Approved) => match db::execute_pending_action(&lock.0, &id) {
            Ok(memory_id) => {
                // Load the memory the action produced (if any) while the lock is
                // still held; lookup errors are discarded by `.ok()`.
                let produced_mem = memory_id
                    .as_deref()
                    .and_then(|mid| db::get(&lock.0, mid).ok().flatten());
                // Release the DB lock before the awaits below.
                drop(lock);
                if let Some(fed) = app.federation.as_ref() {
                    let decision = PendingDecision {
                        id: id.clone(),
                        approved: true,
                        decider: agent_id.clone(),
                    };
                    // A failed broadcast or an unmet quorum ends the request with the
                    // quorum-not-met response, even though the local approval and
                    // execution above have already happened.
                    match crate::federation::broadcast_pending_decision_quorum(fed, &decision).await
                    {
                        Ok(tracker) => {
                            if let Err(err) = crate::federation::finalise_quorum(&tracker) {
                                let payload =
                                    crate::federation::QuorumNotMetPayload::from_err(&err);
                                return super::quorum_not_met_response(&payload);
                            }
                        }
                        Err(err) => {
                            let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
                            return super::quorum_not_met_response(&payload);
                        }
                    }
                    // `fanout_or_503` returning `Some` short-circuits with that response
                    // (presumably a 503 on fan-out failure — confirm in `fanout_or_503`).
                    if let Some(ref mem) = produced_mem
                        && let Some(resp) = fanout_or_503(&app, mem).await
                    {
                        return resp;
                    }
                }
                Json(json!({
                    "approved": true,
                    "id": id,
                    (field_names::DECIDED_BY): agent_id,
                    "executed": true,
                    "memory_id": memory_id,
                }))
                .into_response()
            }
            // Unlike the Postgres path, an execution failure after approval is a 500 here.
            Err(e) => {
                tracing::error!("execute pending error: {e}");
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(json!({"error": super::approvals::APPROVED_BUT_EXECUTION_FAILED})),
                )
                    .into_response()
            }
        },
        // The vote was counted but quorum has not been reached yet.
        Ok(ApproveOutcome::Pending { votes, quorum }) => (
            StatusCode::ACCEPTED,
            Json(json!({
                "approved": false,
                "status": "pending",
                "id": id,
                "votes": votes,
                "quorum": quorum,
                "reason": crate::errors::msg::CONSENSUS_NOT_REACHED,
            })),
        )
            .into_response(),
        Ok(ApproveOutcome::NotFound) => (
            StatusCode::NOT_FOUND,
            Json(json!({
                "error": crate::errors::msg::pending_action_not_found(&id),
            })),
        )
            .into_response(),
        Ok(ApproveOutcome::Rejected(reason)) => (
            StatusCode::FORBIDDEN,
            Json(json!({"error": crate::errors::msg::approve_rejected(reason)})),
        )
            .into_response(),
        Err(e) => crate::handlers::errors::handler_error_500(&e),
    }
}
397
398pub async fn reject_pending(
399 State(app): State<AppState>,
400 headers: HeaderMap,
401 Path(id): Path<String>,
402 body_bytes: axum::body::Bytes,
403) -> impl IntoResponse {
404 use crate::models::PendingDecision;
405 if let Err(status) = super::verify_approval_hmac(&headers, &body_bytes, "POST", &id) {
410 return (
411 status,
412 Json(json!({
413 "error": crate::errors::msg::INVALID_OR_MISSING_SIGNATURE,
414 "hint": "POST /api/v1/pending/{id}/reject requires HMAC signing per K7's pattern. \
415 Set [hooks.subscription] hmac_secret in config and send \
416 X-AI-Memory-Signature: sha256=<HMAC-SHA256(SHA256(secret), \"<ts>.<METHOD>.<pending_id>.<body>\")> \
417 with X-AI-Memory-Timestamp: <unix-epoch-secs>."
418 })),
419 )
420 .into_response();
421 }
422 let state = app.db.clone();
423 if let Err(e) = validate::validate_id(&id) {
424 return (
425 StatusCode::BAD_REQUEST,
426 Json(json!({"error": e.to_string()})),
427 )
428 .into_response();
429 }
430 let header_agent_id = headers
431 .get(crate::HEADER_AGENT_ID)
432 .and_then(|v| v.to_str().ok());
433 let agent_id = match crate::identity::resolve_http_agent_id(None, header_agent_id) {
434 Ok(a) => a,
435 Err(e) => {
436 return (
437 StatusCode::BAD_REQUEST,
438 Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
439 )
440 .into_response();
441 }
442 };
443
444 crate::governance::audit::record_decision(
448 &agent_id,
449 "refuse",
450 "pending_reject",
451 "",
452 json!({ (field_names::PENDING_ID): &id }),
453 );
454
455 #[cfg(feature = "sal")]
457 if matches!(app.storage_backend, StorageBackend::Postgres) {
458 let ctx = crate::store::CallerContext::for_agent(agent_id.clone());
459 return match app.store.pending_decide(&ctx, &id, false, &agent_id).await {
460 Ok(true) => {
461 if crate::audit::is_enabled() {
462 crate::audit::emit(crate::audit::EventBuilder::new(
463 crate::audit::AuditAction::Reject,
464 crate::audit::actor(
465 agent_id.clone(),
466 crate::audit::synthesis_sources::HTTP_HEADER,
467 None,
468 ),
469 crate::audit::target_memory(id.clone(), String::new(), None, None, None),
470 ));
471 }
472 Json(json!({
473 "rejected": true,
474 "id": id,
475 (field_names::DECIDED_BY): agent_id,
476 (field_names::STORAGE_BACKEND): "postgres",
477 }))
478 .into_response()
479 }
480 Ok(false) => (
481 StatusCode::NOT_FOUND,
482 Json(json!({"error": crate::errors::msg::PENDING_ACTION_NOT_FOUND_OR_DECIDED})),
483 )
484 .into_response(),
485 Err(e) => store_err_to_response(e),
486 };
487 }
488
489 let lock = state.lock().await;
490 match db::decide_pending_action(&lock.0, &id, false, &agent_id) {
491 Ok(true) => {
492 drop(lock);
493 if let Some(fed) = app.federation.as_ref() {
495 let decision = PendingDecision {
496 id: id.clone(),
497 approved: false,
498 decider: agent_id.clone(),
499 };
500 match crate::federation::broadcast_pending_decision_quorum(fed, &decision).await {
501 Ok(tracker) => {
502 if let Err(err) = crate::federation::finalise_quorum(&tracker) {
503 let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
505 return super::quorum_not_met_response(&payload);
506 }
507 }
508 Err(err) => {
509 let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
510 return super::quorum_not_met_response(&payload);
511 }
512 }
513 }
514 Json(json!({"rejected": true, "id": id, (field_names::DECIDED_BY): agent_id}))
515 .into_response()
516 }
517 Ok(false) => (
518 StatusCode::NOT_FOUND,
519 Json(json!({"error": crate::errors::msg::PENDING_ACTION_NOT_FOUND_OR_DECIDED})),
520 )
521 .into_response(),
522 Err(e) => crate::handlers::errors::handler_error_500(&e),
523 }
524}