Skip to main content

tandem_server/app/
approval_outbound.rs

//! Notification fan-out for newly pending approvals.
//!
//! When a workflow run pauses on a `HumanApprovalGate`, surfaces (Slack,
//! Discord, Telegram, control-panel inbox) need to learn about it without
//! polling the engine themselves. This task closes the loop: it polls the
//! cross-subsystem aggregator (`/approvals/pending`, W1.5) on a short
//! interval, dedups against an in-memory set of `request_id`s already
//! announced, and dispatches each new request to the registered
//! [`ApprovalNotifier`] implementations.
//!
//! # Why polling, not the broadcast event bus
//!
//! The Plan agent flagged the existing event-bus pattern
//! (`workflows.rs:628`, `app/tasks.rs:392`) as wrong for approvals: those
//! subscribers drop events when they hit
//! `tokio::sync::broadcast::error::RecvError::Lagged(_)`, and a missed
//! approval means a stuck run. Polling the aggregator avoids that failure
//! mode entirely — the aggregator is an idempotent read of durable state, so
//! a slow notifier or a process restart cannot lose a pending approval. We
//! trade latency for correctness (up to one poll interval — 5 s by default —
//! of notification delay instead of milliseconds), which is the right
//! trade-off for human approval flows.
//!
//! # Surface-side delivery
//!
//! The task itself is surface-agnostic: it accepts any `Vec<Arc<dyn
//! ApprovalNotifier>>`. The Slack/Discord/Telegram channel adapters supply
//! concrete notifiers that translate `ApprovalRequest` into the rich
//! interactive cards built in W2/W4. A future hosted-control-plane sidecar
//! can register its own notifier without touching this module.
//!
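//! # Dedup
//!
//! The task maintains an in-memory `HashSet<String>` keyed by `request_id`,
//! bounded by `DEDUP_CAP` with FIFO eviction. Each request is dispatched to
//! every notifier once, when it is first seen; notifier errors do not cause
//! a re-dispatch on later sweeps. When a request is decided (and therefore
//! disappears from the aggregator's pending list), its dedup entry is pruned
//! on the next sweep, so the same `request_id` could in principle resurface
//! and be announced again. It never should — `request_id`s are stable per
//! gate — but the prune is a safety net for misconfigured aggregators.
//! Because the set lives in memory, a process restart re-announces every
//! request that is still pending.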
39
40use std::collections::HashSet;
41use std::sync::Arc;
42use std::time::Duration;
43
44use async_trait::async_trait;
45use tandem_types::{ApprovalListFilter, ApprovalRequest};
46
/// How often the fan-out polls the pending-approvals aggregator by default.
///
/// This bounds the delay between a request becoming pending and the first
/// notifier call for it, plus however long one sweep takes. Five seconds
/// keeps human-approval latency low while keeping the polling load modest.
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_millis(5_000);
50
/// Maximum number of `request_id`s the dedup ring remembers at once.
///
/// Once the ring is full, recording a new id evicts the oldest one (FIFO;
/// a `VecDeque` tracks insertion order next to the `HashSet`), so memory
/// stays bounded even if the aggregator misbehaves. Caveat: if more than
/// `DEDUP_CAP` requests are pending simultaneously, evicted ids are still
/// pending and will be announced again when a later sweep reaches them.
pub const DEDUP_CAP: usize = 8 * 1024;
55
56/// Implemented by anything that wants to be notified of a new pending
57/// approval. The notifier is responsible for surface-specific filtering
58/// (e.g. only deliver to channels whose tenant matches the request) and for
59/// retry/backoff against transient platform failures.
60///
61/// `Notifier::notify` MUST NOT block the fan-out task. Implementations
62/// should spawn their own work or use a bounded internal queue.
63#[async_trait]
64pub trait ApprovalNotifier: Send + Sync {
65    /// Stable name used for logging (`"slack"`, `"discord"`, etc.).
66    fn name(&self) -> &str;
67
68    /// Deliver a single approval request to this surface. Errors are logged
69    /// by the fan-out task but do not stop the polling loop — a flaky
70    /// channel must not delay other surfaces.
71    async fn notify(&self, request: &ApprovalRequest) -> Result<(), NotifierError>;
72}
73
/// Why a notifier failed to deliver an approval request.
///
/// The variant is informational. The fan-out task in this module treats both
/// variants the same way: it logs the error and does **not** re-dispatch the
/// request on a later sweep, because the request has already been recorded
/// as announced. Any retrying must happen inside the notifier itself.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NotifierError {
    /// Transient failure (rate limit, network blip). A later attempt may
    /// succeed, so the notifier should retry internally with backoff. The
    /// fan-out will not offer the same pending request to it again.
    Transient(String),
    /// Permanent failure (invalid card, channel not configured). Retrying
    /// will not help, so the notifier should give up on this request instead
    /// of retrying and spamming logs.
    Permanent(String),
}

impl core::fmt::Display for NotifierError {
    /// Renders as `"transient: <reason>"` or `"permanent: <reason>"`.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::Transient(reason) => write!(f, "transient: {reason}"),
            Self::Permanent(reason) => write!(f, "permanent: {reason}"),
        }
    }
}

impl std::error::Error for NotifierError {}
96
97/// Source of pending approvals. Used by tests to inject a mock aggregator
98/// without spinning up the full HTTP stack. The production wiring uses the
99/// `tandem-server` crate's `list_pending_approvals` directly.
100#[async_trait]
101pub trait PendingApprovalsSource: Send + Sync {
102    async fn list_pending(&self, filter: &ApprovalListFilter) -> Vec<ApprovalRequest>;
103}
104
/// Bounded "already announced" memory for approval `request_id`s.
///
/// A `HashSet` provides O(1) membership checks. A `VecDeque` records
/// insertion order so that, when the ring is full, the oldest id is evicted
/// first (FIFO). Both collections always hold exactly the same ids.
///
/// Public for direct use in tests; the fan-out task wraps it internally.
#[derive(Debug)]
pub struct DedupRing {
    /// Membership index over the remembered ids.
    seen: HashSet<String>,
    /// The same ids, oldest first; drives FIFO eviction.
    order: std::collections::VecDeque<String>,
    /// Maximum number of ids remembered at once (always at least 1).
    cap: usize,
}

impl DedupRing {
    /// Create an empty ring that remembers at most `cap` ids.
    ///
    /// A `cap` of 0 is treated as 1, so the most recently recorded id is
    /// always retained. Up-front allocation is limited to 1024 slots, so a
    /// large cap does not reserve memory that may never be used.
    pub fn with_cap(cap: usize) -> Self {
        let cap = cap.max(1);
        let initial = cap.min(1024);
        Self {
            seen: HashSet::with_capacity(initial),
            order: std::collections::VecDeque::with_capacity(initial),
            cap,
        }
    }

    /// Record `key` unless it is already remembered.
    ///
    /// Returns `true` if the key is new (and is now recorded), or `false` if
    /// it was already remembered. When the ring is full, the oldest
    /// remembered id is evicted to make room.
    pub fn record_new(&mut self, key: &str) -> bool {
        if self.seen.contains(key) {
            return false;
        }
        if self.order.len() >= self.cap {
            if let Some(oldest) = self.order.pop_front() {
                self.seen.remove(&oldest);
            }
        }
        let owned = key.to_owned();
        self.seen.insert(owned.clone());
        self.order.push_back(owned);
        true
    }

    /// Forget every id that is absent from `current_request_ids`.
    ///
    /// Called with the latest aggregator snapshot. Ids that dropped out have
    /// been decided, so their entries are released. Surviving ids keep their
    /// relative FIFO order.
    pub fn prune_to(&mut self, current_request_ids: &HashSet<&str>) {
        // One `retain` pass per collection keeps this O(n). Removing ids one
        // at a time with `VecDeque::retain` would be O(n²) when many
        // approvals are decided between sweeps.
        self.order
            .retain(|id| current_request_ids.contains(id.as_str()));
        self.seen
            .retain(|id| current_request_ids.contains(id.as_str()));
    }

    /// Number of ids currently remembered.
    pub fn len(&self) -> usize {
        self.seen.len()
    }

    /// `true` if no ids are remembered.
    pub fn is_empty(&self) -> bool {
        self.seen.is_empty()
    }
}
162
163/// One sweep of the fan-out: poll the aggregator, dispatch new requests to
164/// every notifier, prune decided requests from the dedup ring.
165///
166/// Returned counts are the number of (a) new requests dispatched and (b)
167/// total notifier calls attempted — useful for metrics + tests.
168pub async fn run_one_sweep(
169    source: &dyn PendingApprovalsSource,
170    notifiers: &[Arc<dyn ApprovalNotifier>],
171    filter: &ApprovalListFilter,
172    dedup: &mut DedupRing,
173) -> SweepResult {
174    let pending = source.list_pending(filter).await;
175    let current_ids: HashSet<&str> = pending.iter().map(|r| r.request_id.as_str()).collect();
176
177    let mut new_count = 0usize;
178    let mut notify_attempts = 0usize;
179    let mut notify_failures = 0usize;
180
181    for request in &pending {
182        if !dedup.record_new(&request.request_id) {
183            continue;
184        }
185        new_count += 1;
186        for notifier in notifiers {
187            notify_attempts += 1;
188            match notifier.notify(request).await {
189                Ok(()) => {}
190                Err(error) => {
191                    notify_failures += 1;
192                    tracing::warn!(
193                        target: "tandem_server::approval_outbound",
194                        notifier = notifier.name(),
195                        request_id = %request.request_id,
196                        ?error,
197                        "approval notifier returned an error"
198                    );
199                }
200            }
201        }
202    }
203
204    dedup.prune_to(&current_ids);
205
206    SweepResult {
207        pending_count: pending.len(),
208        new_count,
209        notify_attempts,
210        notify_failures,
211        dedup_size: dedup.len(),
212    }
213}
214
/// Counters describing one [`run_one_sweep`] pass, for metrics and tests.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SweepResult {
    /// Requests returned by the pending-approvals source in this sweep.
    pub pending_count: usize,
    /// Requests seen for the first time and dispatched to the notifiers.
    pub new_count: usize,
    /// Total `notify` calls made (new requests × notifiers).
    pub notify_attempts: usize,
    /// `notify` calls that returned an error.
    pub notify_failures: usize,
    /// Ids remembered by the dedup ring after pruning.
    pub dedup_size: usize,
}
223
224/// Long-running polling loop. Calls `run_one_sweep` every `interval`.
225///
226/// Dropping the returned `JoinHandle` does not stop the loop; cancellation
227/// is intentional via the cooperative `cancel` flag (Arc<AtomicBool>) so
228/// shutdown is deterministic without `tokio::task::abort`.
229pub async fn run_polling_loop(
230    source: Arc<dyn PendingApprovalsSource>,
231    notifiers: Arc<Vec<Arc<dyn ApprovalNotifier>>>,
232    filter: ApprovalListFilter,
233    interval: Duration,
234    cancel: Arc<std::sync::atomic::AtomicBool>,
235) {
236    let mut dedup = DedupRing::with_cap(DEDUP_CAP);
237    let mut tick = tokio::time::interval(interval);
238    tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
239    loop {
240        tick.tick().await;
241        if cancel.load(std::sync::atomic::Ordering::Relaxed) {
242            tracing::info!(
243                target: "tandem_server::approval_outbound",
244                "polling loop received cancel signal, exiting"
245            );
246            break;
247        }
248        let result = run_one_sweep(source.as_ref(), notifiers.as_ref(), &filter, &mut dedup).await;
249        if result.new_count > 0 || result.notify_failures > 0 {
250            tracing::info!(
251                target: "tandem_server::approval_outbound",
252                pending = result.pending_count,
253                new = result.new_count,
254                attempts = result.notify_attempts,
255                failures = result.notify_failures,
256                dedup = result.dedup_size,
257                "approval fan-out sweep complete"
258            );
259        }
260    }
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;
    use tandem_types::{ApprovalDecision, ApprovalSourceKind, ApprovalTenantRef};

    /// Build a pending `ApprovalRequest` with a fixed tenant and the given id.
    fn fake_request(request_id: &str) -> ApprovalRequest {
        ApprovalRequest {
            request_id: request_id.to_string(),
            source: ApprovalSourceKind::AutomationV2,
            tenant: ApprovalTenantRef {
                org_id: "local-default-org".to_string(),
                workspace_id: "local-default-workspace".to_string(),
                user_id: None,
            },
            run_id: format!("run-{request_id}"),
            node_id: Some("send_email".to_string()),
            workflow_name: Some("sales-research-outreach".to_string()),
            action_kind: Some("send_email".to_string()),
            action_preview_markdown: Some("Will email alice@example.com".to_string()),
            surface_payload: None,
            requested_at_ms: 1_700_000_000_000,
            expires_at_ms: None,
            decisions: vec![
                ApprovalDecision::Approve,
                ApprovalDecision::Rework,
                ApprovalDecision::Cancel,
            ],
            rework_targets: vec![],
            instructions: None,
            decided_by: None,
            decided_at_ms: None,
            decision: None,
            rework_feedback: None,
        }
    }

    /// Test notifier that records every request id it is called with and
    /// optionally fails each call with a fixed error.
    struct CountingNotifier {
        name: &'static str,
        seen: Mutex<Vec<String>>,
        fail_with: Option<NotifierError>,
    }

    impl CountingNotifier {
        fn ok(name: &'static str) -> Arc<Self> {
            Arc::new(Self {
                name,
                seen: Mutex::new(Vec::new()),
                fail_with: None,
            })
        }
        fn failing(name: &'static str, error: NotifierError) -> Arc<Self> {
            Arc::new(Self {
                name,
                seen: Mutex::new(Vec::new()),
                fail_with: Some(error),
            })
        }
        fn seen_ids(&self) -> Vec<String> {
            self.seen.lock().unwrap().clone()
        }
    }

    #[async_trait]
    impl ApprovalNotifier for CountingNotifier {
        fn name(&self) -> &str {
            self.name
        }
        async fn notify(&self, request: &ApprovalRequest) -> Result<(), NotifierError> {
            self.seen.lock().unwrap().push(request.request_id.clone());
            if let Some(err) = &self.fail_with {
                return Err(err.clone());
            }
            Ok(())
        }
    }

    /// Test source whose pending list can be swapped between sweeps.
    struct VecSource {
        requests: Mutex<Vec<ApprovalRequest>>,
    }

    impl VecSource {
        fn new(initial: Vec<ApprovalRequest>) -> Arc<Self> {
            Arc::new(Self {
                requests: Mutex::new(initial),
            })
        }
        fn set(&self, requests: Vec<ApprovalRequest>) {
            *self.requests.lock().unwrap() = requests;
        }
    }

    #[async_trait]
    impl PendingApprovalsSource for VecSource {
        async fn list_pending(&self, _filter: &ApprovalListFilter) -> Vec<ApprovalRequest> {
            self.requests.lock().unwrap().clone()
        }
    }

    /// Run one sweep against `source` with the default filter.
    async fn sweep(
        source: &VecSource,
        notifiers: &[Arc<dyn ApprovalNotifier>],
        dedup: &mut DedupRing,
    ) -> SweepResult {
        run_one_sweep(source, notifiers, &ApprovalListFilter::default(), dedup).await
    }

    #[tokio::test]
    async fn first_sweep_dispatches_all_pending_to_every_notifier() {
        let source = VecSource::new(vec![fake_request("a"), fake_request("b")]);
        let n1 = CountingNotifier::ok("slack");
        let n2 = CountingNotifier::ok("discord");
        let notifiers: Vec<Arc<dyn ApprovalNotifier>> =
            vec![n1.clone() as Arc<dyn ApprovalNotifier>, n2.clone()];
        let mut dedup = DedupRing::with_cap(16);

        let result = sweep(&source, &notifiers, &mut dedup).await;

        assert_eq!(result.pending_count, 2);
        assert_eq!(result.new_count, 2);
        assert_eq!(result.notify_attempts, 4);
        assert_eq!(result.notify_failures, 0);
        assert_eq!(n1.seen_ids(), vec!["a", "b"]);
        assert_eq!(n2.seen_ids(), vec!["a", "b"]);
    }

    #[tokio::test]
    async fn second_sweep_with_same_pending_does_not_redispatch() {
        let source = VecSource::new(vec![fake_request("a")]);
        let n1 = CountingNotifier::ok("slack");
        let notifiers: Vec<Arc<dyn ApprovalNotifier>> = vec![n1.clone()];
        let mut dedup = DedupRing::with_cap(16);

        let _ = sweep(&source, &notifiers, &mut dedup).await;
        let second = sweep(&source, &notifiers, &mut dedup).await;

        assert_eq!(second.new_count, 0);
        assert_eq!(second.notify_attempts, 0);
        assert_eq!(n1.seen_ids(), vec!["a"]);
    }

    #[tokio::test]
    async fn newly_added_pending_in_later_sweep_is_dispatched() {
        let source = VecSource::new(vec![fake_request("a")]);
        let n1 = CountingNotifier::ok("slack");
        let notifiers: Vec<Arc<dyn ApprovalNotifier>> = vec![n1.clone()];
        let mut dedup = DedupRing::with_cap(16);

        sweep(&source, &notifiers, &mut dedup).await;
        source.set(vec![fake_request("a"), fake_request("b")]);
        let second = sweep(&source, &notifiers, &mut dedup).await;

        assert_eq!(second.new_count, 1);
        assert_eq!(n1.seen_ids(), vec!["a", "b"]);
    }

    #[tokio::test]
    async fn decided_request_is_pruned_so_resurfacing_fires_again() {
        let source = VecSource::new(vec![fake_request("a")]);
        let n1 = CountingNotifier::ok("slack");
        let notifiers: Vec<Arc<dyn ApprovalNotifier>> = vec![n1.clone()];
        let mut dedup = DedupRing::with_cap(16);

        sweep(&source, &notifiers, &mut dedup).await;
        // "a" was decided and disappears from pending.
        source.set(vec![]);
        let cleared = sweep(&source, &notifiers, &mut dedup).await;
        assert_eq!(cleared.dedup_size, 0);
        assert!(dedup.is_empty());

        // If for some reason the same request_id appears again (it
        // shouldn't, but the system must not get stuck), it dispatches.
        source.set(vec![fake_request("a")]);
        let resurfaced = sweep(&source, &notifiers, &mut dedup).await;
        assert_eq!(resurfaced.new_count, 1);
        assert_eq!(n1.seen_ids(), vec!["a", "a"]);
    }

    #[tokio::test]
    async fn failing_notifier_does_not_block_other_notifiers() {
        let source = VecSource::new(vec![fake_request("a")]);
        let bad = CountingNotifier::failing(
            "discord",
            NotifierError::Transient("rate limit".to_string()),
        );
        let good = CountingNotifier::ok("slack");
        let notifiers: Vec<Arc<dyn ApprovalNotifier>> = vec![bad.clone(), good.clone()];
        let mut dedup = DedupRing::with_cap(16);

        let result = sweep(&source, &notifiers, &mut dedup).await;

        assert_eq!(result.notify_attempts, 2);
        assert_eq!(result.notify_failures, 1);
        assert_eq!(bad.seen_ids(), vec!["a"]);
        assert_eq!(good.seen_ids(), vec!["a"]);
    }

    #[tokio::test]
    async fn failed_notification_is_not_redispatched_while_pending() {
        let source = VecSource::new(vec![fake_request("a")]);
        let flaky = CountingNotifier::failing(
            "discord",
            NotifierError::Transient("rate limit".to_string()),
        );
        let notifiers: Vec<Arc<dyn ApprovalNotifier>> = vec![flaky.clone()];
        let mut dedup = DedupRing::with_cap(16);

        let first = sweep(&source, &notifiers, &mut dedup).await;
        assert_eq!(first.notify_failures, 1);

        // The request is still pending but already recorded as announced,
        // so the fan-out does not call the notifier again; retrying is the
        // notifier's job.
        let second = sweep(&source, &notifiers, &mut dedup).await;
        assert_eq!(second.new_count, 0);
        assert_eq!(second.notify_attempts, 0);
        assert_eq!(flaky.seen_ids(), vec!["a"]);
    }

    #[tokio::test]
    async fn empty_pending_is_a_noop() {
        let source = VecSource::new(vec![]);
        let n1 = CountingNotifier::ok("slack");
        let notifiers: Vec<Arc<dyn ApprovalNotifier>> = vec![n1.clone()];
        let mut dedup = DedupRing::with_cap(16);

        let result = sweep(&source, &notifiers, &mut dedup).await;

        assert_eq!(result.pending_count, 0);
        assert_eq!(result.new_count, 0);
        assert_eq!(result.notify_attempts, 0);
        assert!(n1.seen_ids().is_empty());
    }

    #[test]
    fn dedup_evicts_at_cap() {
        let mut dedup = DedupRing::with_cap(3);
        assert!(dedup.record_new("a"));
        assert!(dedup.record_new("b"));
        assert!(dedup.record_new("c"));
        assert!(!dedup.record_new("a"));
        // Insert one more — oldest ("a") should be evicted.
        assert!(dedup.record_new("d"));
        // "a" is no longer remembered, so re-inserting is a new record.
        assert!(dedup.record_new("a"));
    }

    #[test]
    fn dedup_prune_to_removes_absent_entries() {
        let mut dedup = DedupRing::with_cap(8);
        dedup.record_new("a");
        dedup.record_new("b");
        dedup.record_new("c");
        let mut current = HashSet::new();
        current.insert("b");
        dedup.prune_to(&current);
        assert!(!dedup.record_new("b"), "b should still be deduped");
        assert!(dedup.record_new("a"), "a should be re-droppable");
        assert!(dedup.record_new("c"), "c should be re-droppable");
    }

    #[test]
    fn dedup_prune_preserves_fifo_order_of_survivors() {
        let mut dedup = DedupRing::with_cap(3);
        dedup.record_new("a");
        dedup.record_new("b");
        dedup.record_new("c");
        let current: HashSet<&str> = ["a", "c"].iter().copied().collect();
        dedup.prune_to(&current);
        assert_eq!(dedup.len(), 2);
        // Refill to the cap; the next insert must evict "a", the oldest survivor.
        assert!(dedup.record_new("d"));
        assert!(dedup.record_new("e"));
        assert!(!dedup.record_new("c"), "c should still be remembered");
        assert!(dedup.record_new("a"), "a should have been evicted first");
    }

    #[test]
    fn notifier_error_display_is_informative() {
        assert_eq!(
            format!("{}", NotifierError::Transient("rate".to_string())),
            "transient: rate"
        );
        assert_eq!(
            format!("{}", NotifierError::Permanent("misconfigured".to_string())),
            "permanent: misconfigured"
        );
    }
}