use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex as StdMutex, OnceLock};
use std::time::SystemTime;
use dashmap::DashMap;
use sha2::{Digest, Sha256};
use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
use uuid::Uuid;
use aa_core::identity::{AgentId, SessionId};
use aa_core::time::Timestamp;
use aa_core::{AuditEntry, AuditEventType};
const APPROVAL_EVENT_CHANNEL_CAPACITY: usize = 64;
pub const DEFAULT_RESOLVED_HISTORY_CAP: usize = 1000;
pub type ApprovalRequestId = Uuid;
pub type ApprovalFuture = tokio::sync::oneshot::Receiver<ApprovalDecision>;
#[derive(Debug, Clone)]
pub struct ApprovalRequest {
pub request_id: ApprovalRequestId,
pub agent_id: String,
pub action: String,
pub condition_triggered: String,
pub submitted_at: u64,
pub timeout_secs: u64,
pub fallback: aa_core::PolicyResult,
pub team_id: Option<String>,
pub timeout_override_secs: Option<u64>,
pub escalation_role_override: Option<String>,
}
#[derive(Debug, Clone)]
pub struct RoutingHistoryEntry {
pub at: u64,
pub action: String,
pub from_role: Option<String>,
pub to_role: String,
}
#[derive(Debug, Clone, Default)]
struct RoutingMeta {
status: String,
target_role: Option<String>,
routed_at: Option<u64>,
escalate_at: Option<u64>,
history: Vec<RoutingHistoryEntry>,
}
#[derive(Debug, Clone)]
pub struct PendingApprovalRequest {
pub request_id: ApprovalRequestId,
pub agent_id: String,
pub action: String,
pub condition_triggered: String,
pub submitted_at: u64,
pub timeout_secs: u64,
pub team_id: Option<String>,
pub routing_status: Option<String>,
pub target_role: Option<String>,
pub routed_at: Option<u64>,
pub escalate_at: Option<u64>,
pub routing_history: Vec<RoutingHistoryEntry>,
}
#[derive(Debug, Clone)]
pub enum ApprovalLookup {
Pending(PendingApprovalRequest),
Resolved(ResolvedRecord),
}
#[derive(Debug, Clone)]
pub struct ResolvedRecord {
pub request_id: ApprovalRequestId,
pub agent_id: String,
pub action: String,
pub condition_triggered: String,
pub submitted_at: u64,
pub decided_at: u64,
pub status: String,
pub decided_by: String,
pub decision_reason: Option<String>,
pub team_id: Option<String>,
}
#[derive(Debug, Clone)]
pub enum ApprovalDecision {
Approved {
by: String,
reason: Option<String>,
},
Rejected {
by: String,
reason: String,
},
TimedOut {
fallback: aa_core::PolicyResult,
},
}
#[derive(Debug, PartialEq, Eq)]
pub enum ApprovalError {
NotFound,
AlreadyDecided,
}
impl std::fmt::Display for ApprovalError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::NotFound => write!(f, "approval request not found"),
Self::AlreadyDecided => write!(f, "approval request has already been decided"),
}
}
}
impl std::error::Error for ApprovalError {}
pub trait ApprovalResolvedNotifier: Send + Sync {
fn notify_resolved(&self, request_id: &str, decision: &ApprovalDecision);
}
pub struct ApprovalQueue {
pending: DashMap<ApprovalRequestId, (ApprovalRequest, oneshot::Sender<ApprovalDecision>)>,
routing_meta: DashMap<ApprovalRequestId, RoutingMeta>,
resolved_history: StdMutex<VecDeque<ResolvedRecord>>,
resolved_history_cap: usize,
audit_tx: Option<mpsc::Sender<AuditEntry>>,
audit_seq: AtomicU64,
audit_last_hash: Mutex<[u8; 32]>,
event_tx: broadcast::Sender<ApprovalRequest>,
expiry_event_tx: broadcast::Sender<ApprovalRequest>,
resolved_notifier: OnceLock<Arc<dyn ApprovalResolvedNotifier>>,
}
fn hash_to_16(s: &str) -> [u8; 16] {
let digest = Sha256::digest(s.as_bytes());
let mut out = [0u8; 16];
out.copy_from_slice(&digest[..16]);
out
}
impl ApprovalQueue {
pub fn new() -> Arc<Self> {
let (event_tx, _) = broadcast::channel(APPROVAL_EVENT_CHANNEL_CAPACITY);
let (expiry_event_tx, _) = broadcast::channel(APPROVAL_EVENT_CHANNEL_CAPACITY);
Arc::new(Self {
pending: DashMap::new(),
routing_meta: DashMap::new(),
resolved_history: StdMutex::new(VecDeque::with_capacity(DEFAULT_RESOLVED_HISTORY_CAP)),
resolved_history_cap: DEFAULT_RESOLVED_HISTORY_CAP,
audit_tx: None,
audit_seq: AtomicU64::new(0),
audit_last_hash: Mutex::new([0u8; 32]),
event_tx,
expiry_event_tx,
resolved_notifier: OnceLock::new(),
})
}
#[cfg(test)]
pub fn with_resolved_history_cap_for_tests(cap: usize) -> Arc<Self> {
let (event_tx, _) = broadcast::channel(APPROVAL_EVENT_CHANNEL_CAPACITY);
let (expiry_event_tx, _) = broadcast::channel(APPROVAL_EVENT_CHANNEL_CAPACITY);
Arc::new(Self {
pending: DashMap::new(),
routing_meta: DashMap::new(),
resolved_history: StdMutex::new(VecDeque::with_capacity(cap)),
resolved_history_cap: cap,
audit_tx: None,
audit_seq: AtomicU64::new(0),
audit_last_hash: Mutex::new([0u8; 32]),
event_tx,
expiry_event_tx,
resolved_notifier: OnceLock::new(),
})
}
pub fn with_audit(audit_tx: mpsc::Sender<AuditEntry>, initial_hash: [u8; 32]) -> Arc<Self> {
let (event_tx, _) = broadcast::channel(APPROVAL_EVENT_CHANNEL_CAPACITY);
let (expiry_event_tx, _) = broadcast::channel(APPROVAL_EVENT_CHANNEL_CAPACITY);
Arc::new(Self {
pending: DashMap::new(),
routing_meta: DashMap::new(),
resolved_history: StdMutex::new(VecDeque::with_capacity(DEFAULT_RESOLVED_HISTORY_CAP)),
resolved_history_cap: DEFAULT_RESOLVED_HISTORY_CAP,
audit_tx: Some(audit_tx),
audit_seq: AtomicU64::new(0),
audit_last_hash: Mutex::new(initial_hash),
event_tx,
expiry_event_tx,
resolved_notifier: OnceLock::new(),
})
}
pub fn set_resolved_notifier(&self, notifier: Arc<dyn ApprovalResolvedNotifier>) -> bool {
self.resolved_notifier.set(notifier).is_ok()
}
pub fn subscribe_events(&self) -> broadcast::Receiver<ApprovalRequest> {
self.event_tx.subscribe()
}
pub fn subscribe_expirations(&self) -> broadcast::Receiver<ApprovalRequest> {
self.expiry_event_tx.subscribe()
}
pub fn list(&self) -> Vec<PendingApprovalRequest> {
self.pending
.iter()
.map(|entry| {
let req = &entry.value().0;
let meta = self.routing_meta.get(&req.request_id);
PendingApprovalRequest {
request_id: req.request_id,
agent_id: req.agent_id.clone(),
action: req.action.clone(),
condition_triggered: req.condition_triggered.clone(),
submitted_at: req.submitted_at,
timeout_secs: req.timeout_secs,
team_id: req.team_id.clone(),
routing_status: meta.as_ref().map(|m| m.status.clone()),
target_role: meta.as_ref().and_then(|m| m.target_role.clone()),
routed_at: meta.as_ref().and_then(|m| m.routed_at),
escalate_at: meta.as_ref().and_then(|m| m.escalate_at),
routing_history: meta.as_ref().map(|m| m.history.clone()).unwrap_or_default(),
}
})
.collect()
}
pub fn get_by_id(&self, id: ApprovalRequestId) -> Option<ApprovalLookup> {
if self.pending.contains_key(&id) {
return self
.list()
.into_iter()
.find(|p| p.request_id == id)
.map(ApprovalLookup::Pending);
}
let guard = self.resolved_history.lock().ok()?;
guard
.iter()
.rev() .find(|r| r.request_id == id)
.cloned()
.map(ApprovalLookup::Resolved)
}
pub fn list_resolved(&self, status_filter: Option<&str>, agent_filter: Option<&str>) -> Vec<ResolvedRecord> {
let guard = match self.resolved_history.lock() {
Ok(g) => g,
Err(_) => return Vec::new(),
};
guard
.iter()
.filter(|r| match status_filter {
Some(s) => r.status == s,
None => true,
})
.filter(|r| match agent_filter {
Some(a) => r.agent_id == a,
None => true,
})
.cloned()
.collect()
}
pub fn record_routing(
&self,
id: ApprovalRequestId,
status: String,
target_role: Option<String>,
routed_at: Option<u64>,
escalate_at: Option<u64>,
history_entry: Option<RoutingHistoryEntry>,
) -> bool {
if !self.pending.contains_key(&id) {
return false;
}
self.routing_meta
.entry(id)
.and_modify(|m| {
m.status = status.clone();
if target_role.is_some() {
m.target_role = target_role.clone();
}
if routed_at.is_some() {
m.routed_at = routed_at;
}
if escalate_at.is_some() {
m.escalate_at = escalate_at;
}
if let Some(ref e) = history_entry {
m.history.push(e.clone());
}
})
.or_insert_with(|| RoutingMeta {
status,
target_role,
routed_at,
escalate_at,
history: history_entry.into_iter().collect(),
});
true
}
pub fn update_routing_status(&self, id: ApprovalRequestId, status: String) -> bool {
let now = SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let action = if status.starts_with("escalated") {
"escalated"
} else {
"routed"
};
let entry = RoutingHistoryEntry {
at: now,
action: action.to_string(),
from_role: self.routing_meta.get(&id).and_then(|m| m.target_role.clone()),
to_role: status.clone(),
};
self.record_routing(id, status, None, None, None, Some(entry))
}
pub fn decide(&self, id: ApprovalRequestId, decision: ApprovalDecision) -> Result<(), ApprovalError> {
if self.resolve(id, decision) {
return Ok(());
}
let in_history = self
.resolved_history
.lock()
.map(|g| g.iter().any(|r| r.request_id == id))
.unwrap_or(false);
if in_history {
Err(ApprovalError::AlreadyDecided)
} else {
Err(ApprovalError::NotFound)
}
}
fn resolve(&self, id: ApprovalRequestId, decision: ApprovalDecision) -> bool {
self.routing_meta.remove(&id);
if let Some((_key, (req, tx))) = self.pending.remove(&id) {
let (event_type_str, decided_by) = match &decision {
ApprovalDecision::Approved { by, .. } => ("ApprovalGranted", by.clone()),
ApprovalDecision::Rejected { by, .. } => ("ApprovalDenied", by.clone()),
ApprovalDecision::TimedOut { .. } => ("ApprovalTimedOut", "timeout".to_string()),
};
let (status_str, decision_reason) = match &decision {
ApprovalDecision::Approved { reason, .. } => ("approved", reason.clone()),
ApprovalDecision::Rejected { reason, .. } => ("rejected", Some(reason.clone())),
ApprovalDecision::TimedOut { .. } => ("timed_out", None),
};
let decided_at = SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let record = ResolvedRecord {
request_id: req.request_id,
agent_id: req.agent_id.clone(),
action: req.action.clone(),
condition_triggered: req.condition_triggered.clone(),
submitted_at: req.submitted_at,
decided_at,
status: status_str.to_string(),
decided_by: decided_by.clone(),
decision_reason,
team_id: req.team_id.clone(),
};
if let Ok(mut guard) = self.resolved_history.lock() {
if guard.len() >= self.resolved_history_cap {
guard.pop_front();
}
guard.push_back(record);
}
tracing::info!(
event_type = event_type_str,
request_id = %req.request_id,
agent_id = %req.agent_id,
action = %req.action,
decided_by = %decided_by,
"approval decision recorded"
);
if let Some(audit_tx) = &self.audit_tx {
let audit_event_type = match &decision {
ApprovalDecision::Approved { .. } => AuditEventType::ApprovalGranted,
ApprovalDecision::Rejected { .. } => AuditEventType::ApprovalDenied,
ApprovalDecision::TimedOut { .. } => AuditEventType::ApprovalTimedOut,
};
let seq = self.audit_seq.fetch_add(1, Ordering::Relaxed);
let agent_id = AgentId::from_bytes(hash_to_16(&req.agent_id));
let session_id = SessionId::from_bytes(hash_to_16(&req.request_id.to_string()));
let timestamp_ns = Timestamp::from(SystemTime::now()).as_nanos();
let payload = serde_json::json!({
"request_id": req.request_id.to_string(),
"agent_id": &req.agent_id,
"action": &req.action,
"condition_triggered": &req.condition_triggered,
"decided_by": &decided_by,
})
.to_string();
let (entry, hash_updated) = match self.audit_last_hash.try_lock() {
Ok(mut guard) => {
let entry = AuditEntry::new(
seq,
timestamp_ns,
audit_event_type,
agent_id,
session_id,
payload,
*guard,
);
*guard = *entry.entry_hash();
(entry, true)
}
Err(_) => {
let entry = AuditEntry::new(
seq,
timestamp_ns,
audit_event_type,
agent_id,
session_id,
payload,
[0u8; 32],
);
(entry, false)
}
};
if !hash_updated {
tracing::debug!(seq, "audit hash chain lock contended — entry uses zero previous_hash");
}
if let Err(e) = audit_tx.try_send(entry) {
match e {
mpsc::error::TrySendError::Full(_) => {
tracing::warn!(seq, "audit channel full — approval event dropped");
}
mpsc::error::TrySendError::Closed(_) => {
tracing::error!("audit channel closed — AuditWriter task has exited");
}
}
}
}
if matches!(decision, ApprovalDecision::TimedOut { .. }) {
let _ = self.expiry_event_tx.send(req.clone());
}
if let Some(notifier) = self.resolved_notifier.get() {
notifier.notify_resolved(&req.request_id.to_string(), &decision);
}
let _ = tx.send(decision);
true
} else {
false
}
}
pub fn submit(self: &Arc<Self>, request: ApprovalRequest) -> (ApprovalRequestId, ApprovalFuture) {
let id = request.request_id;
let timeout_secs = request.timeout_secs;
let fallback = request.fallback.clone();
tracing::info!(
event_type = "ApprovalRequested",
request_id = %id,
agent_id = %request.agent_id,
action = %request.action,
condition_triggered = %request.condition_triggered,
timeout_secs,
"approval requested"
);
if let Some(audit_tx) = &self.audit_tx {
let seq = self.audit_seq.fetch_add(1, Ordering::Relaxed);
let agent_id = AgentId::from_bytes(hash_to_16(&request.agent_id));
let session_id = SessionId::from_bytes(hash_to_16(&id.to_string()));
let timestamp_ns = Timestamp::from(SystemTime::now()).as_nanos();
let payload = serde_json::json!({
"request_id": id.to_string(),
"agent_id": &request.agent_id,
"action": &request.action,
"condition_triggered": &request.condition_triggered,
"timeout_secs": request.timeout_secs,
})
.to_string();
if let Ok(mut guard) = self.audit_last_hash.try_lock() {
let entry = AuditEntry::new(
seq,
timestamp_ns,
AuditEventType::ApprovalRequested,
agent_id,
session_id,
payload,
*guard,
);
*guard = *entry.entry_hash();
let _ = audit_tx.try_send(entry);
}
}
let (tx, rx) = oneshot::channel();
let _ = self.event_tx.send(request.clone());
self.pending.insert(id, (request, tx));
let queue = Arc::clone(self);
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await;
queue.resolve(id, ApprovalDecision::TimedOut { fallback });
});
(id, rx)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn approval_request_id_is_uuid() {
let id: ApprovalRequestId = Uuid::new_v4();
assert!(!id.is_nil());
}
#[test]
fn approval_request_fields_are_accessible() {
let req = ApprovalRequest {
request_id: Uuid::new_v4(),
agent_id: "agent-1".to_string(),
action: "read_file /etc/passwd".to_string(),
condition_triggered: "sensitive-file-access".to_string(),
submitted_at: 1_700_000_000,
timeout_secs: 30,
fallback: aa_core::PolicyResult::Deny {
reason: "timed out".to_string(),
},
team_id: None,
timeout_override_secs: None,
escalation_role_override: None,
};
assert_eq!(req.agent_id, "agent-1");
assert_eq!(req.timeout_secs, 30);
assert!(!req.request_id.is_nil());
}
#[test]
fn approval_decision_approved_fields() {
let d = ApprovalDecision::Approved {
by: "alice".to_string(),
reason: Some("looks safe".to_string()),
};
if let ApprovalDecision::Approved { by, reason } = d {
assert_eq!(by, "alice");
assert_eq!(reason, Some("looks safe".to_string()));
} else {
panic!("wrong variant");
}
}
#[test]
fn approval_decision_rejected_fields() {
let d = ApprovalDecision::Rejected {
by: "bob".to_string(),
reason: "policy violation".to_string(),
};
if let ApprovalDecision::Rejected { by, reason } = d {
assert_eq!(by, "bob");
assert_eq!(reason, "policy violation");
} else {
panic!("wrong variant");
}
}
#[test]
fn approval_decision_timed_out_carries_fallback() {
let fallback = aa_core::PolicyResult::Deny {
reason: "expired".to_string(),
};
let d = ApprovalDecision::TimedOut {
fallback: fallback.clone(),
};
if let ApprovalDecision::TimedOut { fallback: f } = d {
assert_eq!(f, fallback);
} else {
panic!("wrong variant");
}
}
#[test]
fn approval_error_not_found_display() {
let e = ApprovalError::NotFound;
assert_eq!(e.to_string(), "approval request not found");
}
#[test]
fn approval_error_not_found_eq() {
assert_eq!(ApprovalError::NotFound, ApprovalError::NotFound);
}
#[test]
fn pending_approval_request_fields_match_source() {
let id = Uuid::new_v4();
let pending = PendingApprovalRequest {
request_id: id,
agent_id: "agent-1".to_string(),
action: "read_file /etc/passwd".to_string(),
condition_triggered: "sensitive-file-access".to_string(),
submitted_at: 1_700_000_000,
timeout_secs: 60,
team_id: None,
routing_status: None,
target_role: None,
routed_at: None,
escalate_at: None,
routing_history: vec![],
};
assert_eq!(pending.request_id, id);
assert_eq!(pending.agent_id, "agent-1");
assert_eq!(pending.timeout_secs, 60);
}
#[test]
fn new_queue_list_is_empty() {
let q = ApprovalQueue::new();
assert!(q.list().is_empty());
}
#[test]
fn decide_unknown_id_returns_not_found() {
let q = ApprovalQueue::new();
let result = q.decide(
Uuid::new_v4(),
ApprovalDecision::Approved {
by: "alice".to_string(),
reason: None,
},
);
assert_eq!(result, Err(ApprovalError::NotFound));
}
fn make_request(timeout_secs: u64) -> ApprovalRequest {
ApprovalRequest {
request_id: Uuid::new_v4(),
agent_id: "agent-1".to_string(),
action: "read_file /etc/passwd".to_string(),
condition_triggered: "sensitive-file-access".to_string(),
submitted_at: 1_700_000_000,
timeout_secs,
fallback: aa_core::PolicyResult::Deny {
reason: "timed out".to_string(),
},
team_id: None,
timeout_override_secs: None,
escalation_role_override: None,
}
}
#[tokio::test]
async fn submit_then_approve_resolves_future() {
let q = ApprovalQueue::new();
let req = make_request(60);
let id = req.request_id;
let (_rid, fut) = q.submit(req);
q.decide(
id,
ApprovalDecision::Approved {
by: "alice".to_string(),
reason: None,
},
)
.expect("decide should succeed");
let decision = fut.await.expect("future should resolve");
assert!(matches!(decision, ApprovalDecision::Approved { .. }));
}
#[tokio::test]
async fn submit_then_reject_resolves_future() {
let q = ApprovalQueue::new();
let req = make_request(60);
let id = req.request_id;
let (_rid, fut) = q.submit(req);
q.decide(
id,
ApprovalDecision::Rejected {
by: "bob".to_string(),
reason: "not allowed".to_string(),
},
)
.expect("decide should succeed");
let decision = fut.await.expect("future should resolve");
assert!(matches!(decision, ApprovalDecision::Rejected { .. }));
}
#[tokio::test]
async fn decide_after_resolve_returns_already_decided() {
let q = ApprovalQueue::new();
let req = make_request(60);
let id = req.request_id;
let (_rid, _fut) = q.submit(req);
q.decide(
id,
ApprovalDecision::Approved {
by: "alice".to_string(),
reason: None,
},
)
.expect("first decide should succeed");
let result = q.decide(
id,
ApprovalDecision::Rejected {
by: "eve".to_string(),
reason: "too late".to_string(),
},
);
assert_eq!(result, Err(ApprovalError::AlreadyDecided));
}
#[tokio::test(start_paused = true)]
async fn submit_times_out_after_timeout_secs() {
let q = ApprovalQueue::new();
let req = make_request(5);
let (_rid, fut) = q.submit(req);
tokio::time::advance(std::time::Duration::from_secs(6)).await;
let decision = fut.await.expect("future should resolve after timeout");
assert!(matches!(decision, ApprovalDecision::TimedOut { .. }));
}
#[tokio::test(start_paused = true)]
async fn expiry_broadcast_fires_on_timeout() {
let q = ApprovalQueue::new();
let mut expiry_rx = q.subscribe_expirations();
let req = make_request(5);
let id = req.request_id;
let (_rid, fut) = q.submit(req);
tokio::time::advance(std::time::Duration::from_secs(6)).await;
let _ = fut.await;
let broadcasted = expiry_rx.try_recv().expect("expiry broadcast should fire on TimedOut");
assert_eq!(broadcasted.request_id, id);
}
#[tokio::test]
async fn expiry_broadcast_does_not_fire_on_manual_decision() {
let q = ApprovalQueue::new();
let mut expiry_rx = q.subscribe_expirations();
let req = make_request(60);
let id = req.request_id;
let (_rid, _fut) = q.submit(req);
q.decide(
id,
ApprovalDecision::Approved {
by: "alice".to_string(),
reason: None,
},
)
.expect("decide should succeed");
assert!(
matches!(
expiry_rx.try_recv(),
Err(tokio::sync::broadcast::error::TryRecvError::Empty)
),
"manual approval must not emit on the expiry channel"
);
}
#[tokio::test]
async fn list_reflects_pending_and_clears_after_decide() {
let q = ApprovalQueue::new();
let req = make_request(60);
let id = req.request_id;
let (_rid, _fut) = q.submit(req);
let pending = q.list();
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].request_id, id);
q.decide(
id,
ApprovalDecision::Approved {
by: "alice".to_string(),
reason: None,
},
)
.expect("decide should succeed");
assert!(q.list().is_empty());
}
#[tokio::test]
async fn subscribe_events_receives_submitted_request() {
let q = ApprovalQueue::new();
let mut rx = q.subscribe_events();
let req = make_request(60);
let expected_id = req.request_id;
let (_rid, _fut) = q.submit(req);
let received = rx.recv().await.expect("should receive approval event");
assert_eq!(received.request_id, expected_id);
assert_eq!(received.agent_id, "agent-1");
}
#[tokio::test]
async fn submit_100_concurrent_requests_all_resolve() {
use std::collections::HashMap;
let q = ApprovalQueue::new();
let n = 100_usize;
let mut futures_map = HashMap::new();
for _ in 0..n {
let req = make_request(60);
let id = req.request_id;
let (_rid, fut) = q.submit(req);
futures_map.insert(id, fut);
}
assert_eq!(q.list().len(), n);
let ids: Vec<_> = futures_map.keys().copied().collect();
for id in &ids {
q.decide(
*id,
ApprovalDecision::Approved {
by: "operator".to_string(),
reason: None,
},
)
.expect("decide should succeed for each request");
}
for (_id, fut) in futures_map {
let decision = fut.await.expect("future should resolve");
assert!(matches!(decision, ApprovalDecision::Approved { .. }));
}
assert!(q.list().is_empty());
}
#[tokio::test]
async fn submit_with_audit_emits_approval_requested_entry() {
let (tx, mut rx) = mpsc::channel::<AuditEntry>(64);
let q = ApprovalQueue::with_audit(tx, [0u8; 32]);
let req = make_request(60);
let _id = req.request_id;
let (_rid, _fut) = q.submit(req);
let entry = rx.try_recv().expect("should receive ApprovalRequested entry");
assert_eq!(entry.event_type(), AuditEventType::ApprovalRequested);
assert_eq!(entry.seq(), 0);
}
#[tokio::test]
async fn decide_approved_emits_approval_granted_entry() {
let (tx, mut rx) = mpsc::channel::<AuditEntry>(64);
let q = ApprovalQueue::with_audit(tx, [0u8; 32]);
let req = make_request(60);
let id = req.request_id;
let (_rid, _fut) = q.submit(req);
let _ = rx.try_recv().expect("submit entry");
q.decide(
id,
ApprovalDecision::Approved {
by: "alice".to_string(),
reason: None,
},
)
.expect("decide should succeed");
let entry = rx.try_recv().expect("should receive ApprovalGranted entry");
assert_eq!(entry.event_type(), AuditEventType::ApprovalGranted);
assert_eq!(entry.seq(), 1);
}
#[tokio::test]
async fn decide_rejected_emits_approval_denied_entry() {
let (tx, mut rx) = mpsc::channel::<AuditEntry>(64);
let q = ApprovalQueue::with_audit(tx, [0u8; 32]);
let req = make_request(60);
let id = req.request_id;
let (_rid, _fut) = q.submit(req);
let _ = rx.try_recv().expect("submit entry");
q.decide(
id,
ApprovalDecision::Rejected {
by: "bob".to_string(),
reason: "not allowed".to_string(),
},
)
.expect("decide should succeed");
let entry = rx.try_recv().expect("should receive ApprovalDenied entry");
assert_eq!(entry.event_type(), AuditEventType::ApprovalDenied);
}
#[tokio::test(start_paused = true)]
async fn timeout_emits_approval_timed_out_entry() {
let (tx, mut rx) = mpsc::channel::<AuditEntry>(64);
let q = ApprovalQueue::with_audit(tx, [0u8; 32]);
let req = make_request(5);
let (_rid, _fut) = q.submit(req);
let _ = rx.try_recv().expect("submit entry");
tokio::time::advance(std::time::Duration::from_secs(6)).await;
tokio::task::yield_now().await;
let entry = rx.recv().await.expect("should receive ApprovalTimedOut entry");
assert_eq!(entry.event_type(), AuditEventType::ApprovalTimedOut);
}
#[tokio::test]
async fn audit_entries_form_hash_chain() {
let (tx, mut rx) = mpsc::channel::<AuditEntry>(64);
let q = ApprovalQueue::with_audit(tx, [0u8; 32]);
let req = make_request(60);
let id = req.request_id;
let (_rid, _fut) = q.submit(req);
q.decide(
id,
ApprovalDecision::Approved {
by: "alice".to_string(),
reason: None,
},
)
.expect("decide should succeed");
let entry0 = rx.try_recv().expect("first entry");
let entry1 = rx.try_recv().expect("second entry");
assert_eq!(*entry0.previous_hash(), [0u8; 32]);
assert_eq!(entry1.previous_hash(), entry0.entry_hash());
assert_ne!(entry0.entry_hash(), entry1.entry_hash());
}
#[tokio::test]
async fn no_audit_without_audit_channel() {
let q = ApprovalQueue::new();
let req = make_request(60);
let id = req.request_id;
let (_rid, fut) = q.submit(req);
q.decide(
id,
ApprovalDecision::Approved {
by: "alice".to_string(),
reason: None,
},
)
.expect("decide should succeed");
let decision = fut.await.expect("future should resolve");
assert!(matches!(decision, ApprovalDecision::Approved { .. }));
}
fn snapshot_resolved(q: &ApprovalQueue) -> Vec<ResolvedRecord> {
q.resolved_history.lock().unwrap().iter().cloned().collect()
}
#[tokio::test]
async fn decide_approved_pushes_resolved_record_into_history() {
let q = ApprovalQueue::new();
let req = make_request(60);
let id = req.request_id;
let (_rid, _fut) = q.submit(req);
q.decide(
id,
ApprovalDecision::Approved {
by: "alice".to_string(),
reason: Some("looks good".to_string()),
},
)
.expect("decide should succeed");
let history = snapshot_resolved(&q);
assert_eq!(history.len(), 1);
assert_eq!(history[0].request_id, id);
assert_eq!(history[0].status, "approved");
assert_eq!(history[0].decided_by, "alice");
assert_eq!(history[0].decision_reason.as_deref(), Some("looks good"));
}
#[tokio::test]
async fn decide_rejected_pushes_resolved_record_into_history() {
let q = ApprovalQueue::new();
let req = make_request(60);
let id = req.request_id;
let (_rid, _fut) = q.submit(req);
q.decide(
id,
ApprovalDecision::Rejected {
by: "bob".to_string(),
reason: "policy violation".to_string(),
},
)
.expect("decide should succeed");
let history = snapshot_resolved(&q);
assert_eq!(history.len(), 1);
assert_eq!(history[0].status, "rejected");
assert_eq!(history[0].decided_by, "bob");
assert_eq!(history[0].decision_reason.as_deref(), Some("policy violation"));
}
#[tokio::test(start_paused = true)]
async fn timed_out_pushes_resolved_record_with_status_timed_out() {
let q = ApprovalQueue::new();
let mut expiry_rx = q.subscribe_expirations();
let req = make_request(5);
let _ = q.submit(req);
tokio::time::advance(std::time::Duration::from_secs(6)).await;
let _ = expiry_rx.recv().await.expect("expiry should fire");
let history = snapshot_resolved(&q);
assert_eq!(history.len(), 1);
assert_eq!(history[0].status, "timed_out");
assert_eq!(history[0].decided_by, "timeout");
assert!(history[0].decision_reason.is_none());
}
#[tokio::test]
async fn get_by_id_returns_pending_for_unresolved_request() {
let q = ApprovalQueue::new();
let req = make_request(60);
let id = req.request_id;
let (_rid, _fut) = q.submit(req);
let lookup = q.get_by_id(id).expect("pending request should be found");
match lookup {
ApprovalLookup::Pending(p) => assert_eq!(p.request_id, id),
ApprovalLookup::Resolved(_) => panic!("expected Pending variant"),
}
}
#[tokio::test]
async fn get_by_id_returns_resolved_after_decide() {
let q = ApprovalQueue::new();
let req = make_request(60);
let id = req.request_id;
let (_rid, _fut) = q.submit(req);
q.decide(
id,
ApprovalDecision::Approved {
by: "alice".to_string(),
reason: None,
},
)
.expect("decide should succeed");
let lookup = q.get_by_id(id).expect("resolved request should be found");
match lookup {
ApprovalLookup::Resolved(r) => {
assert_eq!(r.request_id, id);
assert_eq!(r.status, "approved");
}
ApprovalLookup::Pending(_) => panic!("expected Resolved variant"),
}
}
#[tokio::test]
async fn get_by_id_returns_none_for_unknown_id() {
let q = ApprovalQueue::new();
assert!(q.get_by_id(Uuid::new_v4()).is_none());
}
#[tokio::test]
async fn list_resolved_filters_by_status() {
let q = ApprovalQueue::new();
let approved = make_request(60);
let approved_id = approved.request_id;
let rejected = make_request(60);
let rejected_id = rejected.request_id;
let (_, _) = q.submit(approved);
let (_, _) = q.submit(rejected);
q.decide(
approved_id,
ApprovalDecision::Approved {
by: "alice".to_string(),
reason: None,
},
)
.unwrap();
q.decide(
rejected_id,
ApprovalDecision::Rejected {
by: "bob".to_string(),
reason: "no".to_string(),
},
)
.unwrap();
let approved_only = q.list_resolved(Some("approved"), None);
assert_eq!(approved_only.len(), 1);
assert_eq!(approved_only[0].request_id, approved_id);
let rejected_only = q.list_resolved(Some("rejected"), None);
assert_eq!(rejected_only.len(), 1);
assert_eq!(rejected_only[0].request_id, rejected_id);
let all = q.list_resolved(None, None);
assert_eq!(all.len(), 2);
}
#[tokio::test]
async fn list_resolved_filters_by_agent() {
let q = ApprovalQueue::new();
let mut alice_req = make_request(60);
alice_req.agent_id = "alice-agent".to_string();
let alice_id = alice_req.request_id;
let mut bob_req = make_request(60);
bob_req.agent_id = "bob-agent".to_string();
let bob_id = bob_req.request_id;
let (_, _) = q.submit(alice_req);
let (_, _) = q.submit(bob_req);
for id in [alice_id, bob_id] {
q.decide(
id,
ApprovalDecision::Approved {
by: "tester".to_string(),
reason: None,
},
)
.unwrap();
}
let alice_only = q.list_resolved(None, Some("alice-agent"));
assert_eq!(alice_only.len(), 1);
assert_eq!(alice_only[0].agent_id, "alice-agent");
}
#[tokio::test]
async fn resolved_history_caps_oldest_first() {
let cap = 3;
let q = ApprovalQueue::with_resolved_history_cap_for_tests(cap);
let mut ids = Vec::new();
for _ in 0..(cap + 2) {
let req = make_request(60);
ids.push(req.request_id);
let (_rid, _fut) = q.submit(req);
}
for id in &ids {
q.decide(
*id,
ApprovalDecision::Approved {
by: "tester".to_string(),
reason: None,
},
)
.expect("decide should succeed");
}
let history = snapshot_resolved(&q);
assert_eq!(history.len(), cap, "history should not exceed cap");
let kept_ids: Vec<_> = history.iter().map(|r| r.request_id).collect();
assert_eq!(kept_ids, ids[ids.len() - cap..].to_vec());
}
}