use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use dashmap::DashMap;
use tokio::sync::oneshot;
use aa_proto::assembly::gateway::v1::Decision;
use crate::invalidation_client::InvalidationSink;
#[derive(Default)]
pub struct ApprovalSink {
waiters: Arc<DashMap<String, oneshot::Sender<Decision>>>,
}
impl ApprovalSink {
pub fn new() -> Self {
Self::default()
}
pub fn waiter_count(&self) -> usize {
self.waiters.len()
}
pub fn wait_for_approval(
&self,
request_id: impl Into<String>,
deadline: Duration,
) -> impl Future<Output = Decision> {
let request_id = request_id.into();
let (tx, rx) = oneshot::channel();
self.waiters.insert(request_id.clone(), tx);
let waiters = Arc::clone(&self.waiters);
async move {
match tokio::time::timeout(deadline, rx).await {
Ok(Ok(decision)) => decision,
Ok(Err(_)) => Decision::Pending,
Err(_) => {
waiters.remove(&request_id);
Decision::Pending
}
}
}
}
}
impl InvalidationSink for ApprovalSink {
fn on_policy_invalidated(&self, _agent_id: &str) {}
fn on_approval_resolved(&self, request_id: &str, decision: Decision) {
if let Some((_id, tx)) = self.waiters.remove(request_id) {
let _ = tx.send(decision);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn wait_resolves_when_event_arrives() {
let sink = ApprovalSink::new();
let fut = sink.wait_for_approval("r1", Duration::from_secs(5));
assert_eq!(sink.waiter_count(), 1);
sink.on_approval_resolved("r1", Decision::Approved);
assert_eq!(fut.await, Decision::Approved);
assert_eq!(sink.waiter_count(), 0, "delivered waiter is removed");
}
#[tokio::test]
async fn wait_resolves_with_denied_verdict() {
let sink = ApprovalSink::new();
let fut = sink.wait_for_approval("r1", Duration::from_secs(5));
sink.on_approval_resolved("r1", Decision::Denied);
assert_eq!(fut.await, Decision::Denied);
}
#[tokio::test(start_paused = true)]
async fn wait_times_out_to_pending_not_denied() {
let sink = ApprovalSink::new();
let decision = sink.wait_for_approval("r1", Duration::from_millis(50)).await;
assert_eq!(decision, Decision::Pending, "timeout must not auto-deny");
assert_eq!(sink.waiter_count(), 0, "timed-out waiter is dropped");
}
#[tokio::test]
async fn event_without_waiter_is_dropped() {
let sink = ApprovalSink::new();
sink.on_approval_resolved("ghost", Decision::Approved);
assert_eq!(sink.waiter_count(), 0);
}
}