use crate::models::field_names;
use axum::{
Json,
extract::{Path, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
use chrono::Utc;
use serde::Deserialize;
use serde_json::json;
use super::transport::{AppState, constant_time_eq};
#[cfg(feature = "sal")]
use super::{StorageBackend, store_err_to_response};
use crate::db;
use crate::validate;
/// Error text placed in the JSON `error` field (alongside HTTP 500) when a
/// pending action was approved but executing it afterwards failed.
pub(crate) const APPROVED_BUT_EXECUTION_FAILED: &str = "approved but execution failed";
/// JSON request body deserialized by the approval decision handler.
#[derive(Debug, Deserialize)]
pub struct ApprovalRequestBody {
/// Whether the caller approves or denies the pending action.
pub decision: crate::approvals::Decision,
/// Remember scope attached to the decision (`Once`, `Session` or `Forever`).
/// When the field is absent from the payload, serde calls `default_remember`.
#[serde(default = "default_remember")]
pub remember: crate::approvals::Remember,
}
fn default_remember() -> crate::approvals::Remember {
crate::approvals::Remember::Once
}
/// Maximum accepted age, in seconds, of a signed approval request:
/// `verify_approval_hmac` rejects a timestamp that lies more than this far
/// behind the server clock. `record_hmac_nonce` keeps replay-cache entries
/// for twice this duration.
pub(crate) const APPROVAL_HMAC_MAX_AGE_SECS: i64 = 300;
/// Maximum tolerated clock skew, in seconds: `verify_approval_hmac` rejects a
/// timestamp that lies more than this far ahead of the server clock.
pub(crate) const APPROVAL_HMAC_MAX_SKEW_SECS: i64 = 60;
/// Authenticates an approval request from its signature and timestamp headers.
///
/// The expected signature is computed with
/// `crate::subscriptions::hmac_sha256_hex` over the canonical string
/// `"{timestamp}.{method}.{pending_id}.{body}"`, keyed with the
/// `sha256_hex` digest of the configured hooks HMAC secret. The signature
/// header must carry the value as `sha256=<hex>`.
///
/// # Parameters
/// - `headers`: request headers; the signature and timestamp are read from
///   `crate::HEADER_AI_MEMORY_SIGNATURE` and `crate::HEADER_AI_MEMORY_TIMESTAMP`.
/// - `body`: raw request body; must be valid UTF-8.
/// - `method`: HTTP method string that is part of the signed canonical form.
/// - `pending_id`: id of the pending action, also part of the canonical form.
///
/// # Errors
/// Returns `StatusCode::UNAUTHORIZED` when no secret is configured, a header
/// is missing or malformed, the timestamp is older than
/// `APPROVAL_HMAC_MAX_AGE_SECS` or further in the future than
/// `APPROVAL_HMAC_MAX_SKEW_SECS`, the body is not UTF-8, the signature does
/// not match, or the signature has already been seen (replay).
pub(crate) fn verify_approval_hmac(
    headers: &HeaderMap,
    body: &[u8],
    method: &str,
    pending_id: &str,
) -> Result<(), StatusCode> {
    let Some(secret) = crate::config::active_hooks_hmac_secret() else {
        tracing::warn!("K10 approval rejected: no [hooks.subscription].hmac_secret configured");
        return Err(StatusCode::UNAUTHORIZED);
    };

    let sig_header = headers
        .get(crate::HEADER_AI_MEMORY_SIGNATURE)
        .and_then(|v| v.to_str().ok())
        .ok_or(StatusCode::UNAUTHORIZED)?;
    let sig_hex = sig_header
        .strip_prefix("sha256=")
        .ok_or(StatusCode::UNAUTHORIZED)?;

    let timestamp = headers
        .get(crate::HEADER_AI_MEMORY_TIMESTAMP)
        .and_then(|v| v.to_str().ok())
        .ok_or(StatusCode::UNAUTHORIZED)?;
    let ts_secs: i64 = timestamp.parse().map_err(|_| {
        tracing::warn!(
            "K10 approval rejected: X-AI-Memory-Timestamp not a Unix epoch integer: {timestamp:?}"
        );
        StatusCode::UNAUTHORIZED
    })?;

    // `ts_secs` is caller-controlled and has not been authenticated yet, so it
    // may be any i64. Saturating arithmetic keeps an extreme value (e.g.
    // `i64::MIN`) from overflowing: it is clamped and then rejected by the
    // window checks below instead of panicking or wrapping.
    let now_secs = Utc::now().timestamp();
    let delta = now_secs.saturating_sub(ts_secs);
    if delta > APPROVAL_HMAC_MAX_AGE_SECS {
        tracing::warn!(
            "K10 approval rejected: stale signature (age {delta}s > {APPROVAL_HMAC_MAX_AGE_SECS}s window)"
        );
        return Err(StatusCode::UNAUTHORIZED);
    }
    if delta < -APPROVAL_HMAC_MAX_SKEW_SECS {
        tracing::warn!(
            "K10 approval rejected: future-dated signature (skew {}s > {APPROVAL_HMAC_MAX_SKEW_SECS}s tolerance)",
            delta.saturating_neg()
        );
        return Err(StatusCode::UNAUTHORIZED);
    }

    let body_str = std::str::from_utf8(body).map_err(|_| StatusCode::UNAUTHORIZED)?;
    let canonical = format!("{timestamp}.{method}.{pending_id}.{body_str}");
    let key_hash = crate::subscriptions::sha256_hex(&secret);
    let expected = crate::subscriptions::hmac_sha256_hex(&key_hash, &canonical);
    if !constant_time_eq(expected.as_bytes(), sig_hex.as_bytes()) {
        return Err(StatusCode::UNAUTHORIZED);
    }

    // Only signatures that verified are recorded, so unauthenticated callers
    // cannot fill the replay cache.
    if !record_hmac_nonce(sig_hex, ts_secs) {
        tracing::warn!(
            "K10 approval rejected: signature replay (sig={}…)",
            &sig_hex[..16.min(sig_hex.len())]
        );
        return Err(StatusCode::UNAUTHORIZED);
    }
    Ok(())
}
/// Records a verified signature in a process-wide replay cache.
///
/// Returns `true` when `sig_hex` was not in the cache (it is stored together
/// with `ts_secs`) and `false` when it was already present, i.e. a replay.
/// Before the lookup, entries whose stored timestamp is at least
/// `2 * APPROVAL_HMAC_MAX_AGE_SECS` seconds behind the current time are
/// evicted. A poisoned mutex is recovered rather than propagated.
fn record_hmac_nonce(sig_hex: &str, ts_secs: i64) -> bool {
    use std::collections::HashMap;
    use std::collections::hash_map::Entry;
    use std::sync::{Mutex, OnceLock};

    static SEEN_SIGNATURES: OnceLock<Mutex<HashMap<String, i64>>> = OnceLock::new();

    let mut seen = SEEN_SIGNATURES
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    // Evict expired entries first so the cache stays bounded by the traffic
    // seen within the retention window.
    let now = Utc::now().timestamp();
    let retention_secs = APPROVAL_HMAC_MAX_AGE_SECS.saturating_mul(2);
    seen.retain(|_, signed_at| now.saturating_sub(*signed_at) < retention_secs);

    match seen.entry(sig_hex.to_owned()) {
        Entry::Occupied(_) => false,
        Entry::Vacant(slot) => {
            slot.insert(ts_secs);
            true
        }
    }
}
#[allow(clippy::too_many_lines)]
pub async fn approval_decide(
State(app): State<AppState>,
headers: HeaderMap,
Path(id): Path<String>,
body_bytes: axum::body::Bytes,
) -> impl IntoResponse {
if let Err(status) = verify_approval_hmac(&headers, &body_bytes, "POST", &id) {
return (
status,
Json(json!({"error": crate::errors::msg::INVALID_OR_MISSING_SIGNATURE})),
)
.into_response();
}
let body: ApprovalRequestBody = match serde_json::from_slice(&body_bytes) {
Ok(b) => b,
Err(e) => {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": format!("invalid body: {e}")})),
)
.into_response();
}
};
if let Err(e) = validate::validate_id(&id) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": e.to_string()})),
)
.into_response();
}
let header_agent_id = headers
.get(crate::HEADER_AGENT_ID)
.and_then(|v| v.to_str().ok());
let agent_id = match crate::identity::resolve_http_agent_id(None, header_agent_id) {
Ok(a) => a,
Err(e) => {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
)
.into_response();
}
};
let decision_kind = match body.decision {
crate::approvals::Decision::Approve => "approval_decide_approve",
crate::approvals::Decision::Deny => "approval_decide_deny",
};
let decision_outcome = match body.decision {
crate::approvals::Decision::Approve => "allow",
crate::approvals::Decision::Deny => "refuse",
};
crate::governance::audit::record_decision(
&agent_id,
decision_outcome,
decision_kind,
"",
json!({ (field_names::PENDING_ID): &id }),
);
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
return approval_decide_postgres(&app, &id, &agent_id, &body).await;
}
let lock = app.db.lock().await;
let pending_snapshot = db::get_pending_action(&lock.0, &id).ok().flatten();
let outcome = match body.decision {
crate::approvals::Decision::Approve => {
match db::approve_with_approver_type(&lock.0, &id, &agent_id) {
Ok(crate::db::ApproveOutcome::Approved) => {
let executed = db::execute_pending_action(&lock.0, &id);
match executed {
Ok(memory_id) => json!({
"approved": true,
"id": id,
(field_names::DECIDED_BY): agent_id,
"executed": true,
"memory_id": memory_id,
"remember": format!("{:?}", body.remember).to_lowercase(),
}),
Err(e) => {
tracing::error!("execute pending error: {e}");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": APPROVED_BUT_EXECUTION_FAILED})),
)
.into_response();
}
}
}
Ok(crate::db::ApproveOutcome::Pending { votes, quorum }) => json!({
"approved": false,
"status": "pending",
"id": id,
"votes": votes,
"quorum": quorum,
"remember": format!("{:?}", body.remember).to_lowercase(),
}),
Ok(crate::db::ApproveOutcome::NotFound) => {
return (
axum::http::StatusCode::NOT_FOUND,
Json(json!({
"error": crate::errors::msg::pending_action_not_found(&id),
})),
)
.into_response();
}
Ok(crate::db::ApproveOutcome::Rejected(reason)) => {
return (
StatusCode::FORBIDDEN,
Json(json!({"error": crate::errors::msg::approve_rejected(reason)})),
)
.into_response();
}
Err(e) => {
return crate::handlers::errors::handler_error_500(&e);
}
}
}
crate::approvals::Decision::Deny => {
match db::decide_pending_action(&lock.0, &id, false, &agent_id) {
Ok(true) => json!({
"rejected": true,
"id": id,
(field_names::DECIDED_BY): agent_id,
"remember": format!("{:?}", body.remember).to_lowercase(),
}),
Ok(false) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": crate::errors::msg::PENDING_ACTION_NOT_FOUND_OR_DECIDED})),
)
.into_response();
}
Err(e) => {
return crate::handlers::errors::handler_error_500(&e);
}
}
}
};
drop(lock);
publish_decision_event(
&id,
&agent_id,
body.decision,
body.remember,
pending_snapshot,
);
Json(outcome).into_response()
}
/// Store-backed variant of the approval decision, used by `approval_decide`
/// when the storage backend is Postgres (only compiled with the `sal`
/// feature).
///
/// Applies the decision through `app.store` under a caller context built for
/// `agent_id`, publishes the decision event on success and returns the JSON
/// outcome. Failures map to 403 (approval rejected), 404 (nothing to deny),
/// 500 (approved but execution failed) or whatever `store_err_to_response`
/// produces for a store error.
#[cfg(feature = "sal")]
async fn approval_decide_postgres(
    app: &AppState,
    id: &str,
    agent_id: &str,
    body: &ApprovalRequestBody,
) -> axum::response::Response {
    use crate::approvals::Decision;
    use crate::store::ApproveOutcome;

    // Builds the `{"error": …}` response shared by the rejection paths.
    fn refusal(status: StatusCode, message: impl serde::Serialize) -> axum::response::Response {
        (status, Json(json!({ "error": message }))).into_response()
    }

    let ctx = crate::store::CallerContext::for_agent(agent_id.to_string());
    // Best-effort snapshot taken before the decision is applied; a lookup
    // error leaves the event without namespace/requester.
    let pending_snapshot = app.store.get_pending(&ctx, id).await.ok().flatten();
    let remember_label = format!("{:?}", body.remember).to_lowercase();

    let outcome = match body.decision {
        Decision::Approve => {
            let verdict = app
                .store
                .governance_approve_with_consensus(&ctx, id, agent_id)
                .await;
            match verdict {
                Ok(ApproveOutcome::Approved) => {
                    match app.store.execute_pending_action(&ctx, id).await {
                        Ok(memory_id) => json!({
                            "approved": true,
                            "id": id,
                            (field_names::DECIDED_BY): agent_id,
                            "executed": true,
                            "memory_id": memory_id,
                            "remember": remember_label,
                        }),
                        Err(e) => {
                            tracing::error!("approval_decide(postgres): execute pending: {e}");
                            return refusal(
                                StatusCode::INTERNAL_SERVER_ERROR,
                                APPROVED_BUT_EXECUTION_FAILED,
                            );
                        }
                    }
                }
                Ok(ApproveOutcome::Pending { votes, quorum }) => json!({
                    "approved": false,
                    "status": "pending",
                    "id": id,
                    "votes": votes,
                    "quorum": quorum,
                    "remember": remember_label,
                }),
                Ok(ApproveOutcome::Rejected(reason)) => {
                    return refusal(
                        StatusCode::FORBIDDEN,
                        crate::errors::msg::approve_rejected(reason),
                    );
                }
                Err(e) => return store_err_to_response(e),
            }
        }
        Decision::Deny => match app.store.pending_decide(&ctx, id, false, agent_id).await {
            Ok(true) => json!({
                "rejected": true,
                "id": id,
                (field_names::DECIDED_BY): agent_id,
                "remember": remember_label,
            }),
            Ok(false) => {
                return refusal(
                    StatusCode::NOT_FOUND,
                    crate::errors::msg::PENDING_ACTION_NOT_FOUND_OR_DECIDED,
                );
            }
            Err(e) => return store_err_to_response(e),
        },
    };

    publish_decision_event(id, agent_id, body.decision, body.remember, pending_snapshot);
    Json(outcome).into_response()
}
fn publish_decision_event(
id: &str,
agent_id: &str,
decision: crate::approvals::Decision,
remember: crate::approvals::Remember,
pending_snapshot: Option<crate::models::PendingAction>,
) {
let decision_label = match decision {
crate::approvals::Decision::Approve => "approve",
crate::approvals::Decision::Deny => "deny",
};
let remember_label = match remember {
crate::approvals::Remember::Once => "once",
crate::approvals::Remember::Session => "session",
crate::approvals::Remember::Forever => "forever",
};
let evt_namespace = pending_snapshot
.as_ref()
.map(|p| p.namespace.clone())
.unwrap_or_default();
let evt_requested_by = pending_snapshot
.as_ref()
.map(|p| p.requested_by.clone())
.unwrap_or_default();
crate::approvals::publish(crate::approvals::ApprovalEvent::ApprovalDecided {
pending_id: id.to_string(),
decision: decision_label.to_string(),
decided_by: agent_id.to_string(),
remember: remember_label.to_string(),
namespace: evt_namespace,
requested_by: evt_requested_by,
});
if matches!(
remember,
crate::approvals::Remember::Forever | crate::approvals::Remember::Session
) && let Some(snap) = pending_snapshot
{
crate::approvals::record_synthetic_rule(crate::approvals::SyntheticPermissionRule {
action_type: snap.action_type,
namespace: snap.namespace,
agent_id: Some(snap.requested_by),
decision: decision_label.to_string(),
recorded_at: Utc::now().to_rfc3339(),
});
}
}
#[must_use]
pub fn sse_event_visible_to(
subscriber_agent: &str,
event: &crate::approvals::ApprovalEvent,
) -> bool {
if subscriber_agent.is_empty() {
return false;
}
if subscriber_agent.starts_with("host:") {
return false;
}
let event_agent = event.tenant_agent_id();
if !event_agent.is_empty() && event_agent == subscriber_agent {
return true;
}
let event_namespace = event.tenant_namespace();
if event_namespace.is_empty() {
return false;
}
let rules = crate::permissions::active_permission_rules();
rules.iter().any(|r| {
matches!(r.decision, crate::permissions::RuleDecision::Allow)
&& crate::permissions::glob_matches(&r.agent_pattern, subscriber_agent)
&& crate::permissions::glob_matches(&r.namespace_pattern, event_namespace)
})
}
pub async fn approvals_sse(
State(_app): State<AppState>,
headers: HeaderMap,
) -> axum::response::Sse<
impl futures_core::Stream<Item = Result<axum::response::sse::Event, std::convert::Infallible>>,
> {
use axum::response::sse::{Event, KeepAlive, Sse};
use futures_core::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration as StdDuration;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
let subscriber_agent = headers
.get(crate::HEADER_AGENT_ID)
.and_then(|v| v.to_str().ok())
.map(str::trim)
.filter(|s| !s.starts_with("host:"))
.unwrap_or("")
.to_string();
struct ApprovalSseStream {
inner: BroadcastStream<crate::approvals::ApprovalEvent>,
subscriber_agent: String,
}
impl Stream for ApprovalSseStream {
type Item = Result<Event, std::convert::Infallible>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match Pin::new(&mut self.inner).poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(Ok(evt))) => {
if !sse_event_visible_to(&self.subscriber_agent, &evt) {
continue;
}
let (event_name, json_value) = match &evt {
crate::approvals::ApprovalEvent::ApprovalRequested { .. } => (
crate::subscriptions::webhook_events::APPROVAL_REQUESTED,
serde_json::to_value(&evt),
),
crate::approvals::ApprovalEvent::ApprovalDecided { .. } => {
("approval_decided", serde_json::to_value(&evt))
}
};
let data = match json_value {
Ok(v) => serde_json::to_string(&v).unwrap_or_else(|_| "{}".into()),
Err(e) => {
tracing::error!(
"approvals_sse: serialise ApprovalEvent failed: {e}"
);
return Poll::Ready(Some(Ok(Event::default()
.event("error")
.data(r#"{"error":"event_serialise_failed"}"#))));
}
};
return Poll::Ready(Some(Ok(Event::default()
.event(event_name)
.data(data))));
}
Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(_n)))) => {
let body = serde_json::json!({"lagged": true}).to_string();
return Poll::Ready(Some(Ok(Event::default().event("lagged").data(body))));
}
}
}
}
}
let rx = crate::approvals::subscribe();
let stream = ApprovalSseStream {
inner: BroadcastStream::new(rx),
subscriber_agent,
};
Sse::new(stream).keep_alive(KeepAlive::new().interval(StdDuration::from_secs(15)))
}