use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use tandem_types::{ApprovalListFilter, ApprovalRequest};
/// Polling period of five seconds. Presumably the value callers pass as the
/// `interval` argument of `run_polling_loop` when nothing else is configured
/// (confirm against call sites).
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(5);
/// Capacity `run_polling_loop` hands to `DedupRing::with_cap`: the number of
/// request ids the loop remembers before the oldest one is evicted.
pub const DEDUP_CAP: usize = 8192;
/// A destination that can be told about an approval request.
///
/// `run_one_sweep` calls `notify` once per notifier for every request id it
/// has not recorded before, and uses `name` to label the warning it logs when
/// `notify` returns an error.
#[async_trait]
pub trait ApprovalNotifier: Send + Sync {
/// Short label identifying this notifier in log output.
fn name(&self) -> &str;
/// Delivers `request` to this notifier's destination.
///
/// Returns a `NotifierError` on failure. The sweep in this file logs the
/// error and moves on; it does not branch on the error variant.
async fn notify(&self, request: &ApprovalRequest) -> Result<(), NotifierError>;
}
/// Failure returned by an approval notifier's `notify` call.
///
/// Each variant carries a free-form reason string. Nothing in this file acts
/// on the distinction between the variants; presumably `Transient` marks a
/// failure worth retrying and `Permanent` one that is not (confirm with the
/// notifier implementations).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NotifierError {
    /// Rendered by `Display` as `transient: <reason>`.
    Transient(String),
    /// Rendered by `Display` as `permanent: <reason>`.
    Permanent(String),
}

impl core::fmt::Display for NotifierError {
    /// Writes `<kind>: <reason>`, where `<kind>` is the lowercase variant name.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let (label, reason) = match self {
            Self::Transient(reason) => ("transient", reason),
            Self::Permanent(reason) => ("permanent", reason),
        };
        write!(f, "{label}: {reason}")
    }
}

impl std::error::Error for NotifierError {}
/// Supplier of the approval requests that are currently pending.
///
/// `run_one_sweep` calls `list_pending` once per sweep and treats the result
/// as the complete current set: ids missing from it are pruned from the dedup
/// state at the end of the sweep.
#[async_trait]
pub trait PendingApprovalsSource: Send + Sync {
/// Returns the requests matching `filter`.
///
/// The signature has no error channel, so an implementation cannot report a
/// failure distinctly from "nothing is pending".
// NOTE(review): an implementation that returns an empty list on failure would
// cause the sweep to prune every remembered id — confirm how implementors
// handle errors.
async fn list_pending(&self, filter: &ApprovalListFilter) -> Vec<ApprovalRequest>;
}
/// Bounded, insertion-ordered set of string keys used to suppress repeats.
///
/// `seen` answers membership queries and `order` remembers insertion order so
/// the oldest key can be evicted once `cap` keys are held. The two containers
/// always hold exactly the same keys.
pub struct DedupRing {
    seen: HashSet<String>,
    order: std::collections::VecDeque<String>,
    cap: usize,
}

impl DedupRing {
    /// Creates an empty ring that evicts its oldest key once `cap` keys are held.
    ///
    /// At most 1024 slots are preallocated regardless of `cap`; larger rings
    /// grow on demand.
    pub fn with_cap(cap: usize) -> Self {
        let prealloc = cap.min(1024);
        Self {
            seen: HashSet::with_capacity(prealloc),
            order: std::collections::VecDeque::with_capacity(prealloc),
            cap,
        }
    }

    /// Records `key` and reports whether it was new.
    ///
    /// Returns `false` (and changes nothing) when `key` is already held.
    /// Otherwise the oldest key is evicted if the ring is full, `key` is
    /// stored, and `true` is returned.
    ///
    /// Eviction happens before insertion, so a ring built with `cap == 0`
    /// still remembers the single most recent key.
    pub fn record_new(&mut self, key: &str) -> bool {
        if self.seen.contains(key) {
            return false;
        }
        if self.order.len() >= self.cap {
            if let Some(oldest) = self.order.pop_front() {
                self.seen.remove(&oldest);
            }
        }
        let owned = key.to_string();
        self.seen.insert(owned.clone());
        self.order.push_back(owned);
        true
    }

    /// Forgets every key that is not in `current_request_ids`.
    ///
    /// Surviving keys keep their relative insertion order, so later evictions
    /// still remove the oldest survivor first. Each container is filtered in a
    /// single pass, with no intermediate allocation.
    pub fn prune_to(&mut self, current_request_ids: &HashSet<&str>) {
        self.order
            .retain(|id| current_request_ids.contains(id.as_str()));
        self.seen
            .retain(|id| current_request_ids.contains(id.as_str()));
    }

    /// Number of keys currently remembered.
    pub fn len(&self) -> usize {
        self.seen.len()
    }

    /// Returns `true` when no keys are remembered.
    pub fn is_empty(&self) -> bool {
        self.seen.is_empty()
    }
}
/// Runs one poll-and-notify pass.
///
/// Fetches the pending requests from `source`, and for every request whose id
/// `dedup` has not recorded yet calls each notifier in `notifiers` in order,
/// one at a time. A notifier error is logged as a warning and counted; it does
/// not stop the remaining notifiers or requests. Finally `dedup` is pruned to
/// the ids that are still pending, so a request that disappears and later
/// reappears is announced again.
///
/// Returns the counters for this pass.
// NOTE(review): an id is recorded in `dedup` before its notifiers run, so a
// request whose notification fails is not retried on the next sweep — confirm
// this is intended.
// NOTE(review): if the source returns more requests than the ring's capacity,
// recording evicts ids that are still pending, which would make them look new
// again on the following sweep — confirm the expected upper bound.
pub async fn run_one_sweep(
    source: &dyn PendingApprovalsSource,
    notifiers: &[Arc<dyn ApprovalNotifier>],
    filter: &ApprovalListFilter,
    dedup: &mut DedupRing,
) -> SweepResult {
    let pending = source.list_pending(filter).await;
    let current_ids: HashSet<&str> = pending.iter().map(|r| r.request_id.as_str()).collect();

    let mut new_count = 0usize;
    let mut notify_attempts = 0usize;
    let mut notify_failures = 0usize;

    for request in &pending {
        // Already announced on an earlier sweep: nothing to do.
        if !dedup.record_new(&request.request_id) {
            continue;
        }
        new_count += 1;

        for notifier in notifiers {
            notify_attempts += 1;
            if let Err(error) = notifier.notify(request).await {
                notify_failures += 1;
                tracing::warn!(
                    target: "tandem_server::approval_outbound",
                    notifier = notifier.name(),
                    request_id = %request.request_id,
                    ?error,
                    "approval notifier returned an error"
                );
            }
        }
    }

    // Drop ids that are no longer pending so a resurfacing request fires again.
    dedup.prune_to(&current_ids);

    SweepResult {
        pending_count: pending.len(),
        new_count,
        notify_attempts,
        notify_failures,
        dedup_size: dedup.len(),
    }
}
/// Counters describing one call to `run_one_sweep`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SweepResult {
/// Number of requests the source returned for this sweep.
pub pending_count: usize,
/// Number of those requests whose id was not already in the dedup ring.
pub new_count: usize,
/// Number of `notify` calls made (one per notifier per new request).
pub notify_attempts: usize,
/// Number of `notify` calls that returned an error.
pub notify_failures: usize,
/// Size of the dedup ring after it was pruned at the end of the sweep.
pub dedup_size: usize,
}
pub async fn run_polling_loop(
source: Arc<dyn PendingApprovalsSource>,
notifiers: Arc<Vec<Arc<dyn ApprovalNotifier>>>,
filter: ApprovalListFilter,
interval: Duration,
cancel: Arc<std::sync::atomic::AtomicBool>,
) {
let mut dedup = DedupRing::with_cap(DEDUP_CAP);
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tick.tick().await;
if cancel.load(std::sync::atomic::Ordering::Relaxed) {
tracing::info!(
target: "tandem_server::approval_outbound",
"polling loop received cancel signal, exiting"
);
break;
}
let result = run_one_sweep(source.as_ref(), notifiers.as_ref(), &filter, &mut dedup).await;
if result.new_count > 0 || result.notify_failures > 0 {
tracing::info!(
target: "tandem_server::approval_outbound",
pending = result.pending_count,
new = result.new_count,
attempts = result.notify_attempts,
failures = result.notify_failures,
dedup = result.dedup_size,
"approval fan-out sweep complete"
);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
use tandem_types::{ApprovalDecision, ApprovalSourceKind, ApprovalTenantRef};
/// Builds a fixed `ApprovalRequest` fixture for the tests.
///
/// Only `request_id` and `run_id` (`"run-<request_id>"`) vary with the
/// argument; every other field is a constant, and all decision-outcome fields
/// are left as `None`.
fn fake_request(request_id: &str) -> ApprovalRequest {
ApprovalRequest {
request_id: request_id.to_string(),
source: ApprovalSourceKind::AutomationV2,
tenant: ApprovalTenantRef {
org_id: "local-default-org".to_string(),
workspace_id: "local-default-workspace".to_string(),
user_id: None,
},
run_id: format!("run-{request_id}"),
node_id: Some("send_email".to_string()),
workflow_name: Some("sales-research-outreach".to_string()),
action_kind: Some("send_email".to_string()),
action_preview_markdown: Some("Will email alice@example.com".to_string()),
surface_payload: None,
requested_at_ms: 1_700_000_000_000,
expires_at_ms: None,
decisions: vec![
ApprovalDecision::Approve,
ApprovalDecision::Rework,
ApprovalDecision::Cancel,
],
rework_targets: vec![],
instructions: None,
decided_by: None,
decided_at_ms: None,
decision: None,
rework_feedback: None,
}
}
/// Test double that records the id of every request it is asked to notify
/// about and optionally fails every call with a fixed error.
struct CountingNotifier {
    name: &'static str,
    seen: Mutex<Vec<String>>,
    fail_with: Option<NotifierError>,
}

impl CountingNotifier {
    /// Shared constructor behind `ok` and `failing`.
    fn build(name: &'static str, fail_with: Option<NotifierError>) -> Arc<Self> {
        Arc::new(Self {
            name,
            seen: Mutex::new(Vec::new()),
            fail_with,
        })
    }

    /// A notifier whose `notify` always succeeds.
    fn ok(name: &'static str) -> Arc<Self> {
        Self::build(name, None)
    }

    /// A notifier whose `notify` always returns a clone of `error`.
    fn failing(name: &'static str, error: NotifierError) -> Arc<Self> {
        Self::build(name, Some(error))
    }

    /// Snapshot of the request ids received so far, in call order.
    fn seen_ids(&self) -> Vec<String> {
        let recorded = self.seen.lock().unwrap();
        recorded.clone()
    }
}
#[async_trait]
impl ApprovalNotifier for CountingNotifier {
    /// Returns the label given at construction.
    fn name(&self) -> &str {
        self.name
    }

    /// Records the request id, then fails with the configured error if one
    /// was set; the id is recorded even when the call fails.
    async fn notify(&self, request: &ApprovalRequest) -> Result<(), NotifierError> {
        self.seen.lock().unwrap().push(request.request_id.clone());
        match &self.fail_with {
            Some(err) => Err(err.clone()),
            None => Ok(()),
        }
    }
}
/// In-memory `PendingApprovalsSource` whose contents tests can swap between
/// sweeps.
struct VecSource {
    requests: Mutex<Vec<ApprovalRequest>>,
}

impl VecSource {
    /// Creates a source that initially reports `initial` as pending.
    fn new(initial: Vec<ApprovalRequest>) -> Arc<Self> {
        let requests = Mutex::new(initial);
        Arc::new(Self { requests })
    }

    /// Replaces the pending list wholesale.
    fn set(&self, requests: Vec<ApprovalRequest>) {
        let mut current = self.requests.lock().unwrap();
        *current = requests;
    }
}

#[async_trait]
impl PendingApprovalsSource for VecSource {
    /// Returns a copy of the stored list; the filter is ignored.
    async fn list_pending(&self, _filter: &ApprovalListFilter) -> Vec<ApprovalRequest> {
        self.requests.lock().unwrap().clone()
    }
}
/// The first sweep over a fresh ring sends every pending request to every
/// notifier, in source order.
#[tokio::test]
async fn first_sweep_dispatches_all_pending_to_every_notifier() {
    let source = VecSource::new(vec![fake_request("a"), fake_request("b")]);
    let n1 = CountingNotifier::ok("slack");
    let n2 = CountingNotifier::ok("discord");
    let notifiers: Vec<Arc<dyn ApprovalNotifier>> =
        vec![n1.clone() as Arc<dyn ApprovalNotifier>, n2.clone()];
    let mut dedup = DedupRing::with_cap(16);

    let result = run_one_sweep(
        source.as_ref(),
        &notifiers,
        &ApprovalListFilter::default(),
        &mut dedup,
    )
    .await;

    assert_eq!(result.pending_count, 2);
    assert_eq!(result.new_count, 2);
    // Two requests times two notifiers.
    assert_eq!(result.notify_attempts, 4);
    assert_eq!(result.notify_failures, 0);
    assert_eq!(result.dedup_size, 2);
    assert_eq!(n1.seen_ids(), vec!["a", "b"]);
    assert_eq!(n2.seen_ids(), vec!["a", "b"]);
}
/// A request that is still pending on the next sweep is not announced twice.
#[tokio::test]
async fn second_sweep_with_same_pending_does_not_redispatch() {
    let source = VecSource::new(vec![fake_request("a")]);
    let n1 = CountingNotifier::ok("slack");
    let notifiers: Vec<Arc<dyn ApprovalNotifier>> = vec![n1.clone()];
    let mut dedup = DedupRing::with_cap(16);

    let _ = run_one_sweep(
        source.as_ref(),
        &notifiers,
        &ApprovalListFilter::default(),
        &mut dedup,
    )
    .await;
    let second = run_one_sweep(
        source.as_ref(),
        &notifiers,
        &ApprovalListFilter::default(),
        &mut dedup,
    )
    .await;

    assert_eq!(second.pending_count, 1);
    assert_eq!(second.new_count, 0);
    assert_eq!(second.notify_attempts, 0);
    assert_eq!(n1.seen_ids(), vec!["a"]);
}
/// Only the request added between sweeps is announced on the second sweep.
#[tokio::test]
async fn newly_added_pending_in_later_sweep_is_dispatched() {
    let source = VecSource::new(vec![fake_request("a")]);
    let n1 = CountingNotifier::ok("slack");
    let notifiers: Vec<Arc<dyn ApprovalNotifier>> = vec![n1.clone()];
    let mut dedup = DedupRing::with_cap(16);

    run_one_sweep(
        source.as_ref(),
        &notifiers,
        &ApprovalListFilter::default(),
        &mut dedup,
    )
    .await;

    source.set(vec![fake_request("a"), fake_request("b")]);
    let second = run_one_sweep(
        source.as_ref(),
        &notifiers,
        &ApprovalListFilter::default(),
        &mut dedup,
    )
    .await;

    assert_eq!(second.pending_count, 2);
    assert_eq!(second.new_count, 1);
    assert_eq!(n1.seen_ids(), vec!["a", "b"]);
}
/// Once a request leaves the pending list its id is forgotten, so the same id
/// is announced again if it comes back.
#[tokio::test]
async fn decided_request_is_pruned_so_resurfacing_fires_again() {
    let source = VecSource::new(vec![fake_request("a")]);
    let n1 = CountingNotifier::ok("slack");
    let notifiers: Vec<Arc<dyn ApprovalNotifier>> = vec![n1.clone()];
    let mut dedup = DedupRing::with_cap(16);

    run_one_sweep(
        source.as_ref(),
        &notifiers,
        &ApprovalListFilter::default(),
        &mut dedup,
    )
    .await;

    // The request disappears from the pending list.
    source.set(vec![]);
    let cleared = run_one_sweep(
        source.as_ref(),
        &notifiers,
        &ApprovalListFilter::default(),
        &mut dedup,
    )
    .await;
    assert_eq!(cleared.dedup_size, 0);
    assert!(dedup.is_empty());

    // The same id shows up again and must be treated as new.
    source.set(vec![fake_request("a")]);
    let resurfaced = run_one_sweep(
        source.as_ref(),
        &notifiers,
        &ApprovalListFilter::default(),
        &mut dedup,
    )
    .await;
    assert_eq!(resurfaced.new_count, 1);
    assert_eq!(n1.seen_ids(), vec!["a", "a"]);
}
/// A notifier that returns an error is counted as a failure, and the notifiers
/// after it still receive the request.
#[tokio::test]
async fn failing_notifier_does_not_block_other_notifiers() {
    let source = VecSource::new(vec![fake_request("a")]);
    let bad = CountingNotifier::failing(
        "discord",
        NotifierError::Transient("rate limit".to_string()),
    );
    let good = CountingNotifier::ok("slack");
    // The failing notifier comes first so the test proves the loop continues.
    let notifiers: Vec<Arc<dyn ApprovalNotifier>> = vec![bad.clone(), good.clone()];
    let mut dedup = DedupRing::with_cap(16);

    let result = run_one_sweep(
        source.as_ref(),
        &notifiers,
        &ApprovalListFilter::default(),
        &mut dedup,
    )
    .await;

    assert_eq!(result.new_count, 1);
    assert_eq!(result.notify_attempts, 2);
    assert_eq!(result.notify_failures, 1);
    assert_eq!(bad.seen_ids(), vec!["a"]);
    assert_eq!(good.seen_ids(), vec!["a"]);
}
/// With nothing pending, the sweep makes no notifier calls and every counter
/// stays at zero.
#[tokio::test]
async fn empty_pending_is_a_noop() {
    let source = VecSource::new(vec![]);
    let n1 = CountingNotifier::ok("slack");
    let notifiers: Vec<Arc<dyn ApprovalNotifier>> = vec![n1.clone()];
    let mut dedup = DedupRing::with_cap(16);

    let result = run_one_sweep(
        source.as_ref(),
        &notifiers,
        &ApprovalListFilter::default(),
        &mut dedup,
    )
    .await;

    assert_eq!(result.pending_count, 0);
    assert_eq!(result.new_count, 0);
    assert_eq!(result.notify_attempts, 0);
    assert_eq!(result.notify_failures, 0);
    assert_eq!(result.dedup_size, 0);
    assert!(n1.seen_ids().is_empty());
}
/// Once the ring holds `cap` keys, recording a new key evicts the oldest one,
/// which can then be recorded as new again.
#[test]
fn dedup_evicts_at_cap() {
    let mut dedup = DedupRing::with_cap(3);
    assert!(dedup.record_new("a"));
    assert!(dedup.record_new("b"));
    assert!(dedup.record_new("c"));
    // Still remembered: the ring is full but nothing has been evicted yet.
    assert!(!dedup.record_new("a"));
    // "d" pushes out "a", the oldest key.
    assert!(dedup.record_new("d"));
    assert!(dedup.record_new("a"));
    // Re-adding "a" evicted "b"; the ring never exceeds its capacity.
    assert_eq!(dedup.len(), 3);
}
/// `prune_to` keeps only the keys present in the given set; pruned keys are
/// treated as new the next time they are recorded.
#[test]
fn dedup_prune_to_removes_absent_entries() {
    let mut dedup = DedupRing::with_cap(8);
    dedup.record_new("a");
    dedup.record_new("b");
    dedup.record_new("c");

    let mut current = HashSet::new();
    current.insert("b");
    dedup.prune_to(&current);

    assert_eq!(dedup.len(), 1);
    assert!(!dedup.record_new("b"), "b should still be deduped");
    assert!(dedup.record_new("a"), "a should be re-droppable");
    assert!(dedup.record_new("c"), "c should be re-droppable");
}
/// `Display` renders each variant as `<kind>: <reason>`.
#[test]
fn notifier_error_display_is_informative() {
    let transient = NotifierError::Transient("rate".to_string());
    assert_eq!(transient.to_string(), "transient: rate");

    let permanent = NotifierError::Permanent("misconfigured".to_string());
    assert_eq!(permanent.to_string(), "permanent: misconfigured");
}
}