1use std::collections::HashSet;
41use std::sync::Arc;
42use std::time::Duration;
43
44use async_trait::async_trait;
45use tandem_types::{ApprovalListFilter, ApprovalRequest};
46
47pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(5);
50
51pub const DEDUP_CAP: usize = 8192;
55
56#[async_trait]
64pub trait ApprovalNotifier: Send + Sync {
65 fn name(&self) -> &str;
67
68 async fn notify(&self, request: &ApprovalRequest) -> Result<(), NotifierError>;
72}
73
74#[derive(Debug, Clone, PartialEq, Eq)]
75pub enum NotifierError {
76 Transient(String),
80 Permanent(String),
84}
85
86impl core::fmt::Display for NotifierError {
87 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
88 match self {
89 Self::Transient(reason) => write!(f, "transient: {reason}"),
90 Self::Permanent(reason) => write!(f, "permanent: {reason}"),
91 }
92 }
93}
94
95impl std::error::Error for NotifierError {}
96
97#[async_trait]
101pub trait PendingApprovalsSource: Send + Sync {
102 async fn list_pending(&self, filter: &ApprovalListFilter) -> Vec<ApprovalRequest>;
103}
104
105pub struct DedupRing {
108 seen: HashSet<String>,
109 order: std::collections::VecDeque<String>,
110 cap: usize,
111}
112
113impl DedupRing {
114 pub fn with_cap(cap: usize) -> Self {
115 Self {
116 seen: HashSet::with_capacity(cap.min(1024)),
117 order: std::collections::VecDeque::with_capacity(cap.min(1024)),
118 cap,
119 }
120 }
121
122 pub fn record_new(&mut self, key: &str) -> bool {
124 if self.seen.contains(key) {
125 return false;
126 }
127 if self.order.len() >= self.cap {
128 if let Some(oldest) = self.order.pop_front() {
129 self.seen.remove(&oldest);
130 }
131 }
132 self.seen.insert(key.to_string());
133 self.order.push_back(key.to_string());
134 true
135 }
136
137 pub fn prune_to(&mut self, current_request_ids: &HashSet<&str>) {
141 let to_remove: Vec<String> = self
142 .order
143 .iter()
144 .filter(|id| !current_request_ids.contains(id.as_str()))
145 .cloned()
146 .collect();
147 for id in to_remove {
148 self.seen.remove(&id);
149 self.order.retain(|existing| existing != &id);
151 }
152 }
153
154 pub fn len(&self) -> usize {
155 self.seen.len()
156 }
157
158 pub fn is_empty(&self) -> bool {
159 self.seen.is_empty()
160 }
161}
162
163pub async fn run_one_sweep(
169 source: &dyn PendingApprovalsSource,
170 notifiers: &[Arc<dyn ApprovalNotifier>],
171 filter: &ApprovalListFilter,
172 dedup: &mut DedupRing,
173) -> SweepResult {
174 let pending = source.list_pending(filter).await;
175 let current_ids: HashSet<&str> = pending.iter().map(|r| r.request_id.as_str()).collect();
176
177 let mut new_count = 0usize;
178 let mut notify_attempts = 0usize;
179 let mut notify_failures = 0usize;
180
181 for request in &pending {
182 if !dedup.record_new(&request.request_id) {
183 continue;
184 }
185 new_count += 1;
186 for notifier in notifiers {
187 notify_attempts += 1;
188 match notifier.notify(request).await {
189 Ok(()) => {}
190 Err(error) => {
191 notify_failures += 1;
192 tracing::warn!(
193 target: "tandem_server::approval_outbound",
194 notifier = notifier.name(),
195 request_id = %request.request_id,
196 ?error,
197 "approval notifier returned an error"
198 );
199 }
200 }
201 }
202 }
203
204 dedup.prune_to(¤t_ids);
205
206 SweepResult {
207 pending_count: pending.len(),
208 new_count,
209 notify_attempts,
210 notify_failures,
211 dedup_size: dedup.len(),
212 }
213}
214
215#[derive(Debug, Clone, PartialEq, Eq)]
216pub struct SweepResult {
217 pub pending_count: usize,
218 pub new_count: usize,
219 pub notify_attempts: usize,
220 pub notify_failures: usize,
221 pub dedup_size: usize,
222}
223
224pub async fn run_polling_loop(
230 source: Arc<dyn PendingApprovalsSource>,
231 notifiers: Arc<Vec<Arc<dyn ApprovalNotifier>>>,
232 filter: ApprovalListFilter,
233 interval: Duration,
234 cancel: Arc<std::sync::atomic::AtomicBool>,
235) {
236 let mut dedup = DedupRing::with_cap(DEDUP_CAP);
237 let mut tick = tokio::time::interval(interval);
238 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
239 loop {
240 tick.tick().await;
241 if cancel.load(std::sync::atomic::Ordering::Relaxed) {
242 tracing::info!(
243 target: "tandem_server::approval_outbound",
244 "polling loop received cancel signal, exiting"
245 );
246 break;
247 }
248 let result = run_one_sweep(source.as_ref(), notifiers.as_ref(), &filter, &mut dedup).await;
249 if result.new_count > 0 || result.notify_failures > 0 {
250 tracing::info!(
251 target: "tandem_server::approval_outbound",
252 pending = result.pending_count,
253 new = result.new_count,
254 attempts = result.notify_attempts,
255 failures = result.notify_failures,
256 dedup = result.dedup_size,
257 "approval fan-out sweep complete"
258 );
259 }
260 }
261}
262
263#[cfg(test)]
264mod tests {
265 use super::*;
266 use std::sync::Mutex;
267 use tandem_types::{ApprovalDecision, ApprovalSourceKind, ApprovalTenantRef};
268
269 fn fake_request(request_id: &str) -> ApprovalRequest {
270 ApprovalRequest {
271 request_id: request_id.to_string(),
272 source: ApprovalSourceKind::AutomationV2,
273 tenant: ApprovalTenantRef {
274 org_id: "local-default-org".to_string(),
275 workspace_id: "local-default-workspace".to_string(),
276 user_id: None,
277 },
278 run_id: format!("run-{request_id}"),
279 node_id: Some("send_email".to_string()),
280 workflow_name: Some("sales-research-outreach".to_string()),
281 action_kind: Some("send_email".to_string()),
282 action_preview_markdown: Some("Will email alice@example.com".to_string()),
283 surface_payload: None,
284 requested_at_ms: 1_700_000_000_000,
285 expires_at_ms: None,
286 decisions: vec![
287 ApprovalDecision::Approve,
288 ApprovalDecision::Rework,
289 ApprovalDecision::Cancel,
290 ],
291 rework_targets: vec![],
292 instructions: None,
293 decided_by: None,
294 decided_at_ms: None,
295 decision: None,
296 rework_feedback: None,
297 }
298 }
299
300 struct CountingNotifier {
301 name: &'static str,
302 seen: Mutex<Vec<String>>,
303 fail_with: Option<NotifierError>,
304 }
305
306 impl CountingNotifier {
307 fn ok(name: &'static str) -> Arc<Self> {
308 Arc::new(Self {
309 name,
310 seen: Mutex::new(Vec::new()),
311 fail_with: None,
312 })
313 }
314 fn failing(name: &'static str, error: NotifierError) -> Arc<Self> {
315 Arc::new(Self {
316 name,
317 seen: Mutex::new(Vec::new()),
318 fail_with: Some(error),
319 })
320 }
321 fn seen_ids(&self) -> Vec<String> {
322 self.seen.lock().unwrap().clone()
323 }
324 }
325
326 #[async_trait]
327 impl ApprovalNotifier for CountingNotifier {
328 fn name(&self) -> &str {
329 self.name
330 }
331 async fn notify(&self, request: &ApprovalRequest) -> Result<(), NotifierError> {
332 self.seen.lock().unwrap().push(request.request_id.clone());
333 if let Some(err) = &self.fail_with {
334 return Err(err.clone());
335 }
336 Ok(())
337 }
338 }
339
340 struct VecSource {
341 requests: Mutex<Vec<ApprovalRequest>>,
342 }
343
344 impl VecSource {
345 fn new(initial: Vec<ApprovalRequest>) -> Arc<Self> {
346 Arc::new(Self {
347 requests: Mutex::new(initial),
348 })
349 }
350 fn set(&self, requests: Vec<ApprovalRequest>) {
351 *self.requests.lock().unwrap() = requests;
352 }
353 }
354
355 #[async_trait]
356 impl PendingApprovalsSource for VecSource {
357 async fn list_pending(&self, _filter: &ApprovalListFilter) -> Vec<ApprovalRequest> {
358 self.requests.lock().unwrap().clone()
359 }
360 }
361
362 #[tokio::test]
363 async fn first_sweep_dispatches_all_pending_to_every_notifier() {
364 let source = VecSource::new(vec![fake_request("a"), fake_request("b")]);
365 let n1 = CountingNotifier::ok("slack");
366 let n2 = CountingNotifier::ok("discord");
367 let notifiers: Vec<Arc<dyn ApprovalNotifier>> =
368 vec![n1.clone() as Arc<dyn ApprovalNotifier>, n2.clone()];
369 let mut dedup = DedupRing::with_cap(16);
370
371 let result = run_one_sweep(
372 source.as_ref(),
373 ¬ifiers,
374 &ApprovalListFilter::default(),
375 &mut dedup,
376 )
377 .await;
378
379 assert_eq!(result.pending_count, 2);
380 assert_eq!(result.new_count, 2);
381 assert_eq!(result.notify_attempts, 4);
382 assert_eq!(result.notify_failures, 0);
383 assert_eq!(n1.seen_ids(), vec!["a", "b"]);
384 assert_eq!(n2.seen_ids(), vec!["a", "b"]);
385 }
386
387 #[tokio::test]
388 async fn second_sweep_with_same_pending_does_not_redispatch() {
389 let source = VecSource::new(vec![fake_request("a")]);
390 let n1 = CountingNotifier::ok("slack");
391 let notifiers: Vec<Arc<dyn ApprovalNotifier>> = vec![n1.clone()];
392 let mut dedup = DedupRing::with_cap(16);
393
394 let _ = run_one_sweep(
395 source.as_ref(),
396 ¬ifiers,
397 &ApprovalListFilter::default(),
398 &mut dedup,
399 )
400 .await;
401 let second = run_one_sweep(
402 source.as_ref(),
403 ¬ifiers,
404 &ApprovalListFilter::default(),
405 &mut dedup,
406 )
407 .await;
408
409 assert_eq!(second.new_count, 0);
410 assert_eq!(second.notify_attempts, 0);
411 assert_eq!(n1.seen_ids(), vec!["a"]);
412 }
413
414 #[tokio::test]
415 async fn newly_added_pending_in_later_sweep_is_dispatched() {
416 let source = VecSource::new(vec![fake_request("a")]);
417 let n1 = CountingNotifier::ok("slack");
418 let notifiers: Vec<Arc<dyn ApprovalNotifier>> = vec![n1.clone()];
419 let mut dedup = DedupRing::with_cap(16);
420
421 run_one_sweep(
422 source.as_ref(),
423 ¬ifiers,
424 &ApprovalListFilter::default(),
425 &mut dedup,
426 )
427 .await;
428 source.set(vec![fake_request("a"), fake_request("b")]);
429 let second = run_one_sweep(
430 source.as_ref(),
431 ¬ifiers,
432 &ApprovalListFilter::default(),
433 &mut dedup,
434 )
435 .await;
436
437 assert_eq!(second.new_count, 1);
438 assert_eq!(n1.seen_ids(), vec!["a", "b"]);
439 }
440
441 #[tokio::test]
442 async fn decided_request_is_pruned_so_resurfacing_fires_again() {
443 let source = VecSource::new(vec![fake_request("a")]);
444 let n1 = CountingNotifier::ok("slack");
445 let notifiers: Vec<Arc<dyn ApprovalNotifier>> = vec![n1.clone()];
446 let mut dedup = DedupRing::with_cap(16);
447
448 run_one_sweep(
449 source.as_ref(),
450 ¬ifiers,
451 &ApprovalListFilter::default(),
452 &mut dedup,
453 )
454 .await;
455 source.set(vec![]);
457 let cleared = run_one_sweep(
458 source.as_ref(),
459 ¬ifiers,
460 &ApprovalListFilter::default(),
461 &mut dedup,
462 )
463 .await;
464 assert_eq!(cleared.dedup_size, 0);
465 assert!(dedup.is_empty());
466
467 source.set(vec![fake_request("a")]);
470 let resurfaced = run_one_sweep(
471 source.as_ref(),
472 ¬ifiers,
473 &ApprovalListFilter::default(),
474 &mut dedup,
475 )
476 .await;
477 assert_eq!(resurfaced.new_count, 1);
478 assert_eq!(n1.seen_ids(), vec!["a", "a"]);
479 }
480
481 #[tokio::test]
482 async fn failing_notifier_does_not_block_other_notifiers() {
483 let source = VecSource::new(vec![fake_request("a")]);
484 let bad = CountingNotifier::failing(
485 "discord",
486 NotifierError::Transient("rate limit".to_string()),
487 );
488 let good = CountingNotifier::ok("slack");
489 let notifiers: Vec<Arc<dyn ApprovalNotifier>> = vec![bad.clone(), good.clone()];
490 let mut dedup = DedupRing::with_cap(16);
491
492 let result = run_one_sweep(
493 source.as_ref(),
494 ¬ifiers,
495 &ApprovalListFilter::default(),
496 &mut dedup,
497 )
498 .await;
499
500 assert_eq!(result.notify_attempts, 2);
501 assert_eq!(result.notify_failures, 1);
502 assert_eq!(bad.seen_ids(), vec!["a"]);
503 assert_eq!(good.seen_ids(), vec!["a"]);
504 }
505
506 #[tokio::test]
507 async fn empty_pending_is_a_noop() {
508 let source = VecSource::new(vec![]);
509 let n1 = CountingNotifier::ok("slack");
510 let notifiers: Vec<Arc<dyn ApprovalNotifier>> = vec![n1.clone()];
511 let mut dedup = DedupRing::with_cap(16);
512
513 let result = run_one_sweep(
514 source.as_ref(),
515 ¬ifiers,
516 &ApprovalListFilter::default(),
517 &mut dedup,
518 )
519 .await;
520
521 assert_eq!(result.pending_count, 0);
522 assert_eq!(result.new_count, 0);
523 assert_eq!(result.notify_attempts, 0);
524 assert!(n1.seen_ids().is_empty());
525 }
526
527 #[tokio::test]
528 async fn dedup_evicts_at_cap() {
529 let mut dedup = DedupRing::with_cap(3);
530 assert!(dedup.record_new("a"));
531 assert!(dedup.record_new("b"));
532 assert!(dedup.record_new("c"));
533 assert!(!dedup.record_new("a"));
534 assert!(dedup.record_new("d"));
536 assert!(dedup.record_new("a"));
538 }
539
540 #[test]
541 fn dedup_prune_to_removes_absent_entries() {
542 let mut dedup = DedupRing::with_cap(8);
543 dedup.record_new("a");
544 dedup.record_new("b");
545 dedup.record_new("c");
546 let mut current = HashSet::new();
547 current.insert("b");
548 dedup.prune_to(¤t);
549 assert!(!dedup.record_new("b"), "b should still be deduped");
550 assert!(dedup.record_new("a"), "a should be re-droppable");
551 assert!(dedup.record_new("c"), "c should be re-droppable");
552 }
553
554 #[test]
555 fn notifier_error_display_is_informative() {
556 assert_eq!(
557 format!("{}", NotifierError::Transient("rate".to_string())),
558 "transient: rate"
559 );
560 assert_eq!(
561 format!("{}", NotifierError::Permanent("misconfigured".to_string())),
562 "permanent: misconfigured"
563 );
564 }
565}