Skip to main content

rsigma_runtime/io/
delivery.rs

//! Async delivery layer: one bounded queue and worker task per sink.
//!
//! The daemon fans every result into a [`Dispatcher`], which owns one
//! `SinkWorker` per leaf sink. Each worker drains its queue sequentially,
//! batches opportunistically, retries with bounded exponential backoff, and
//! routes terminal failures to the DLQ. Acknowledgments are tied to the
//! lifetime of an `AckGuard`: an event's ack tokens fire only once every
//! worker has committed the event (delivered it, parked it on the DLQ, or
//! discarded it under `OnFull::Drop`), so the at-least-once contract survives
//! fan-out.
10
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use parking_lot::Mutex;
17use tokio::sync::mpsc;
18use tokio::task::JoinHandle;
19
20use rsigma_eval::ProcessResult;
21
22use crate::error::RuntimeError;
23use crate::io::{AckToken, Sink};
24use crate::metrics::MetricsHook;
25
26type DeliveryFuture<'a> = Pin<Box<dyn Future<Output = Result<(), RuntimeError>> + Send + 'a>>;
27
28/// A sink the delivery layer can drive: deliver one result, identify itself.
29///
30/// Implemented for the concrete [`crate::io::Sink`] enum; generic so the worker
31/// can be unit-tested against a mock without a test-only enum variant.
32pub trait DeliverySink: Send + 'static {
33    /// Deliver a single result, returning an error the worker may retry.
34    fn deliver<'a>(&'a mut self, result: &'a ProcessResult) -> DeliveryFuture<'a>;
35    /// Short, stable label used for structured logs and per-sink metrics.
36    fn label(&self) -> &'static str;
37}
38
39impl DeliverySink for Sink {
40    fn deliver<'a>(&'a mut self, result: &'a ProcessResult) -> DeliveryFuture<'a> {
41        self.send(result)
42    }
43    fn label(&self) -> &'static str {
44        self.kind_label()
45    }
46}
47
/// What enqueueing does when a worker's bounded queue is already full.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OnFull {
    /// Wait for room in the queue. This keeps the at-least-once guarantee,
    /// at the cost that one stalled sink eventually backpressures the whole
    /// pipeline.
    Block,
    /// Discard the result and count the drop. Best-effort (at-most-once),
    /// but the pipeline is never held up.
    Drop,
}
58
/// Delivery tuning for a single sink. Public so the CLI can build it from
/// configuration.
#[derive(Debug, Clone, Copy)]
pub struct DeliveryConfig {
    /// Capacity of the bounded queue between the dispatcher and this sink's
    /// worker.
    pub queue_depth: usize,
    /// Upper bound on how many queued results a worker drains into one batch.
    pub batch_max: usize,
    /// How long a partial batch may wait before it is flushed. Reserved for
    /// batch-oriented sinks; the Phase 1 sinks deliver one result at a time.
    pub batch_flush: Duration,
    /// Retries allowed after the initial attempt before the result is routed
    /// to the DLQ.
    pub retry_max: u32,
    /// Backoff before the first retry; each later retry doubles it.
    pub backoff_base: Duration,
    /// Upper bound on any single backoff delay.
    pub backoff_max: Duration,
}

impl Default for DeliveryConfig {
    /// Defaults: a 1024-deep queue, batches of up to 64, a 50 ms flush,
    /// 3 retries, and backoff starting at 100 ms, capped at 5 s.
    fn default() -> Self {
        Self {
            queue_depth: 1024,
            batch_max: 64,
            batch_flush: Duration::from_millis(50),
            retry_max: 3,
            backoff_base: Duration::from_millis(100),
            backoff_max: Duration::from_secs(5),
        }
    }
}
89
/// A result that could not be delivered after exhausting retries (or that
/// failed with a permanent error).
///
/// Emitted on the DLQ channel. The CLI wraps it into its own DLQ record, so
/// the runtime stays free of CLI types. It derives `Debug`/`Clone` so
/// consumers can log, inspect, and copy failures like any other public type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeliveryFailure {
    /// The serialized `ProcessResult` that failed to deliver.
    pub serialized: String,
    /// Human-readable failure reason (the last transport error).
    pub error: String,
}
100
101/// Holds an event's ack tokens until every worker has committed it.
102///
103/// The dispatcher clones one `Arc<AckGuard>` into each worker's queue item.
104/// When the last clone drops (all workers committed, or dropped under
105/// `OnFull::Drop`), `Drop` forwards the tokens to the ack channel. An
106/// un-flushed worker at shutdown still holds its clone, so the tokens never
107/// fire and the source redelivers, preserving at-least-once.
108struct AckGuard {
109    tokens: Mutex<Vec<AckToken>>,
110    ack_tx: mpsc::UnboundedSender<AckToken>,
111}
112
113impl Drop for AckGuard {
114    fn drop(&mut self) {
115        let tokens = std::mem::take(&mut *self.tokens.lock());
116        for token in tokens {
117            // Unbounded and non-blocking: acks are cheap and the number of
118            // in-flight guards is bounded by the worker queues. A closed
119            // receiver (daemon shutting down) drops the token, which simply
120            // means no ack, i.e. the source redelivers.
121            let _ = self.ack_tx.send(token);
122        }
123    }
124}
125
126/// One unit of work handed to a worker: a shared result plus the shared ack
127/// guard whose lifetime gates the ack.
128struct DeliveryItem {
129    result: Arc<ProcessResult>,
130    _guard: Arc<AckGuard>,
131}
132
133/// Handle to a spawned per-sink worker task.
134struct SinkWorker {
135    tx: mpsc::Sender<DeliveryItem>,
136    handle: JoinHandle<()>,
137    on_full: OnFull,
138    label: &'static str,
139    metrics: Arc<dyn MetricsHook>,
140}
141
142impl SinkWorker {
143    fn spawn<S: DeliverySink>(
144        sink: S,
145        on_full: OnFull,
146        cfg: DeliveryConfig,
147        dlq_tx: Option<mpsc::Sender<DeliveryFailure>>,
148        metrics: Arc<dyn MetricsHook>,
149    ) -> Self {
150        let label = sink.label();
151        metrics.register_sink(label);
152        let (tx, rx) = mpsc::channel(cfg.queue_depth.max(1));
153        let worker_metrics = metrics.clone();
154        let handle = tokio::spawn(worker_loop(sink, rx, cfg, dlq_tx, worker_metrics, label));
155        SinkWorker {
156            tx,
157            handle,
158            on_full,
159            label,
160            metrics,
161        }
162    }
163
164    /// Enqueue an item, honoring the full-queue policy. Returns whether the
165    /// item was accepted (always true under `Block` unless the worker is gone).
166    async fn enqueue(&self, item: DeliveryItem) {
167        match self.on_full {
168            OnFull::Block => {
169                self.metrics.on_sink_queue_depth_change(self.label, 1);
170                if self.tx.send(item).await.is_err() {
171                    // Worker gone: undo the depth bump. The guard clone in
172                    // `item` drops here, contributing to the ack-join.
173                    self.metrics.on_sink_queue_depth_change(self.label, -1);
174                }
175            }
176            OnFull::Drop => match self.tx.try_send(item) {
177                Ok(()) => self.metrics.on_sink_queue_depth_change(self.label, 1),
178                Err(mpsc::error::TrySendError::Full(_)) => {
179                    // Dropped: the item's guard clone drops here, so the ack
180                    // still fires (best-effort for this lossy sink).
181                    self.metrics.on_sink_dropped(self.label);
182                }
183                Err(mpsc::error::TrySendError::Closed(_)) => {}
184            },
185        }
186    }
187}
188
189/// Drives every sink for the daemon output path.
190pub struct Dispatcher {
191    workers: Vec<SinkWorker>,
192    ack_tx: mpsc::UnboundedSender<AckToken>,
193}
194
195impl Dispatcher {
196    /// Spawn one worker per leaf sink, each with its own full-queue policy and
197    /// delivery tuning. `sinks` should already be flattened to leaves (see
198    /// [`crate::io::Sink::into_leaves`]); each carries its own
199    /// [`DeliveryConfig`] so a sink (e.g. a webhook) can override the global
200    /// retry/backoff/queue defaults.
201    pub fn spawn<S: DeliverySink>(
202        sinks: Vec<(S, OnFull, DeliveryConfig)>,
203        dlq_tx: Option<mpsc::Sender<DeliveryFailure>>,
204        ack_tx: mpsc::UnboundedSender<AckToken>,
205        metrics: Arc<dyn MetricsHook>,
206    ) -> Self {
207        let workers = sinks
208            .into_iter()
209            .map(|(sink, on_full, cfg)| {
210                SinkWorker::spawn(sink, on_full, cfg, dlq_tx.clone(), metrics.clone())
211            })
212            .collect();
213        Dispatcher { workers, ack_tx }
214    }
215
216    /// Fan one result and its ack tokens into every worker. Awaits queue space
217    /// for `Block` sinks; never blocks for `Drop` sinks.
218    pub async fn dispatch(&self, result: ProcessResult, tokens: Vec<AckToken>) {
219        if self.workers.is_empty() {
220            for token in tokens {
221                let _ = self.ack_tx.send(token);
222            }
223            return;
224        }
225        let guard = Arc::new(AckGuard {
226            tokens: Mutex::new(tokens),
227            ack_tx: self.ack_tx.clone(),
228        });
229        let result = Arc::new(result);
230        for worker in &self.workers {
231            worker
232                .enqueue(DeliveryItem {
233                    result: result.clone(),
234                    _guard: guard.clone(),
235                })
236                .await;
237        }
238    }
239
240    /// Close every worker queue and await drain. The caller bounds this with
241    /// the drain timeout; un-drained items stay unacked so the source
242    /// redelivers them on restart.
243    pub async fn shutdown(self) {
244        let mut handles = Vec::with_capacity(self.workers.len());
245        for worker in self.workers {
246            handles.push(worker.handle);
247            // Dropping `worker.tx` (via `worker` going out of scope) closes the
248            // queue; the worker drains remaining items, then exits.
249        }
250        drop(self.ack_tx);
251        for handle in handles {
252            let _ = handle.await;
253        }
254    }
255}
256
257async fn worker_loop<S: DeliverySink>(
258    mut sink: S,
259    mut rx: mpsc::Receiver<DeliveryItem>,
260    cfg: DeliveryConfig,
261    dlq_tx: Option<mpsc::Sender<DeliveryFailure>>,
262    metrics: Arc<dyn MetricsHook>,
263    label: &'static str,
264) {
265    while let Some(first) = rx.recv().await {
266        let mut batch = Vec::with_capacity(cfg.batch_max.clamp(1, 64));
267        batch.push(first);
268        while batch.len() < cfg.batch_max {
269            match rx.try_recv() {
270                Ok(item) => batch.push(item),
271                Err(_) => break,
272            }
273        }
274        metrics.on_sink_queue_depth_change(label, -(batch.len() as i64));
275        for item in &batch {
276            deliver_one(
277                &mut sink,
278                &item.result,
279                &cfg,
280                dlq_tx.as_ref(),
281                &metrics,
282                label,
283            )
284            .await;
285        }
286        // Dropping `batch` drops each item's `Arc<AckGuard>` clone, advancing
287        // the ack-join.
288        drop(batch);
289    }
290}
291
292async fn deliver_one<S: DeliverySink>(
293    sink: &mut S,
294    result: &ProcessResult,
295    cfg: &DeliveryConfig,
296    dlq_tx: Option<&mpsc::Sender<DeliveryFailure>>,
297    metrics: &Arc<dyn MetricsHook>,
298    label: &'static str,
299) {
300    let mut attempt: u32 = 0;
301    loop {
302        match sink.deliver(result).await {
303            Ok(()) => return,
304            Err(e) => {
305                // A `Permanent` error will not heal on retry (e.g. a 4xx from a
306                // misrendered webhook body), so route it to the DLQ immediately
307                // rather than burning the retry budget.
308                let permanent = matches!(e, RuntimeError::Permanent(_));
309                if permanent || attempt >= cfg.retry_max {
310                    metrics.on_sink_delivery_failed(label);
311                    match dlq_tx {
312                        Some(dlq) => {
313                            let serialized = serde_json::to_string(result).unwrap_or_default();
314                            let _ = dlq
315                                .send(DeliveryFailure {
316                                    serialized,
317                                    error: format!("sink delivery failure: {e}"),
318                                })
319                                .await;
320                        }
321                        None => {
322                            tracing::warn!(
323                                sink = label,
324                                error = %e,
325                                "Sink delivery failed and no DLQ is configured; dropping result",
326                            );
327                        }
328                    }
329                    return;
330                }
331                attempt += 1;
332                metrics.on_sink_retry(label);
333                let delay = backoff_delay(cfg.backoff_base, cfg.backoff_max, attempt);
334                tracing::debug!(sink = label, attempt, error = %e, "Sink delivery retry");
335                tokio::time::sleep(delay).await;
336            }
337        }
338    }
339}
340
/// Delay before retry number `attempt` (1 for the first retry): `base`
/// doubled once per earlier retry, never more than `max`.
///
/// The exponent is capped at 20, and a multiplication overflow falls back to
/// `max`, so arbitrarily large `attempt` values are safe.
fn backoff_delay(base: Duration, max: Duration, attempt: u32) -> Duration {
    // `attempt` 0 and 1 both map to exponent 0, i.e. the bare base delay.
    let exponent = attempt.saturating_sub(1).min(20);
    match base.checked_mul(2u32.pow(exponent)) {
        Some(delay) if delay < max => delay,
        _ => max,
    }
}
347
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use crate::metrics::NoopMetrics;

    fn noop_metrics() -> Arc<dyn MetricsHook> {
        Arc::new(NoopMetrics)
    }

    /// One empty `ProcessResult`. The delivery layer treats results opaquely,
    /// so content is irrelevant to the queue/retry/ack mechanics under test.
    fn result() -> ProcessResult {
        Vec::new()
    }

    fn fast_cfg() -> DeliveryConfig {
        DeliveryConfig {
            queue_depth: 64,
            batch_max: 16,
            batch_flush: Duration::from_millis(1),
            retry_max: 5,
            backoff_base: Duration::from_millis(1),
            backoff_max: Duration::from_millis(5),
        }
    }

    /// Configurable mock sink: fail the first `fail_first` deliveries, then
    /// succeed; optionally gate on a watch latch; record successful deliveries.
    struct MockSink {
        label: &'static str,
        fail_first: Arc<AtomicUsize>,
        always_fail: bool,
        permanent: bool,
        delivered: Arc<AtomicUsize>,
        attempts: Arc<AtomicUsize>,
        // A latching gate: deliveries block until the watch value is `true`,
        // after which every delivery (including later ones) proceeds. A plain
        // `Notify` would only wake waiters parked at the instant it fires.
        gate: Option<tokio::sync::watch::Receiver<bool>>,
    }

    impl MockSink {
        fn new(label: &'static str) -> Self {
            MockSink {
                label,
                fail_first: Arc::new(AtomicUsize::new(0)),
                always_fail: false,
                permanent: false,
                delivered: Arc::new(AtomicUsize::new(0)),
                attempts: Arc::new(AtomicUsize::new(0)),
                gate: None,
            }
        }
    }

    impl DeliverySink for MockSink {
        fn deliver<'a>(&'a mut self, _result: &'a ProcessResult) -> DeliveryFuture<'a> {
            let fail_first = self.fail_first.clone();
            let delivered = self.delivered.clone();
            let attempts = self.attempts.clone();
            let always_fail = self.always_fail;
            let permanent = self.permanent;
            let gate = self.gate.clone();
            Box::pin(async move {
                if let Some(mut rx) = gate {
                    loop {
                        if *rx.borrow() {
                            break;
                        }
                        if rx.changed().await.is_err() {
                            break;
                        }
                    }
                }
                attempts.fetch_add(1, Ordering::SeqCst);
                if always_fail {
                    return Err(if permanent {
                        RuntimeError::Permanent("mock permanent".to_string())
                    } else {
                        RuntimeError::Io(std::io::Error::other("mock always fails"))
                    });
                }
                if fail_first.load(Ordering::SeqCst) > 0 {
                    fail_first.fetch_sub(1, Ordering::SeqCst);
                    return Err(RuntimeError::Io(std::io::Error::other("mock transient")));
                }
                delivered.fetch_add(1, Ordering::SeqCst);
                Ok(())
            })
        }
        fn label(&self) -> &'static str {
            self.label
        }
    }

    #[tokio::test]
    async fn delivers_and_acks_single_sink() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let sink = MockSink::new("mock");
        let delivered = sink.delivered.clone();
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Block, fast_cfg())],
            None,
            ack_tx,
            noop_metrics(),
        );

        for _ in 0..10 {
            dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        }
        dispatcher.shutdown().await;

        assert_eq!(delivered.load(Ordering::SeqCst), 10);
        let mut acks = 0;
        while ack_rx.try_recv().is_ok() {
            acks += 1;
        }
        assert_eq!(acks, 10, "every dispatched event must be acked");
    }

    #[tokio::test]
    async fn retries_then_succeeds() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (dlq_tx, mut dlq_rx) = mpsc::channel(8);
        let sink = MockSink::new("mock");
        sink.fail_first.store(3, Ordering::SeqCst); // < retry_max (5)
        let delivered = sink.delivered.clone();
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Block, fast_cfg())],
            Some(dlq_tx),
            ack_tx,
            noop_metrics(),
        );

        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        dispatcher.shutdown().await;

        assert_eq!(delivered.load(Ordering::SeqCst), 1, "eventually delivered");
        assert!(ack_rx.try_recv().is_ok(), "acked after success");
        assert!(
            dlq_rx.try_recv().is_err(),
            "no DLQ entry on eventual success"
        );
    }

    #[tokio::test]
    async fn terminal_failure_routes_to_dlq_and_acks() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (dlq_tx, mut dlq_rx) = mpsc::channel(8);
        let mut sink = MockSink::new("mock");
        sink.always_fail = true;
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Block, fast_cfg())],
            Some(dlq_tx),
            ack_tx,
            noop_metrics(),
        );

        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        dispatcher.shutdown().await;

        let failure = dlq_rx.try_recv().expect("terminal failure routed to DLQ");
        assert!(failure.error.contains("sink delivery failure"));
        assert!(
            ack_rx.try_recv().is_ok(),
            "token acked after DLQ parking (matches prior behavior)",
        );
    }

    #[tokio::test]
    async fn permanent_failure_skips_retries_and_dlqs() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (dlq_tx, mut dlq_rx) = mpsc::channel(8);
        let mut sink = MockSink::new("mock");
        sink.always_fail = true;
        sink.permanent = true;
        let attempts = sink.attempts.clone();
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Block, fast_cfg())],
            Some(dlq_tx),
            ack_tx,
            noop_metrics(),
        );

        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        dispatcher.shutdown().await;

        assert_eq!(
            attempts.load(Ordering::SeqCst),
            1,
            "a permanent failure must not be retried",
        );
        let failure = dlq_rx.try_recv().expect("permanent failure routed to DLQ");
        assert!(failure.error.contains("permanent delivery failure"));
        assert!(ack_rx.try_recv().is_ok());
    }

    #[tokio::test]
    async fn ack_join_waits_for_all_sinks() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (gate_tx, gate_rx) = tokio::sync::watch::channel(false);
        let fast = MockSink::new("fast");
        let mut slow = MockSink::new("slow");
        slow.gate = Some(gate_rx);

        let dispatcher = Dispatcher::spawn(
            vec![
                (fast, OnFull::Block, fast_cfg()),
                (slow, OnFull::Block, fast_cfg()),
            ],
            None,
            ack_tx,
            noop_metrics(),
        );
        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;

        // The fast sink delivers, but the slow sink is gated, so the guard
        // still has a live clone and the ack must not fire. Give a premature
        // ack a short window to show up.
        let early = tokio::time::timeout(Duration::from_millis(20), ack_rx.recv()).await;
        assert!(early.is_err(), "ack must wait for the slow sink");

        gate_tx.send(true).unwrap();
        // Wait (bounded) for the slow worker to finish and drop its guard
        // clone, rather than sleeping a fixed amount and hoping it sufficed.
        let acked = tokio::time::timeout(Duration::from_secs(5), ack_rx.recv()).await;
        assert!(
            matches!(acked, Ok(Some(_))),
            "ack fires once every sink confirms",
        );
        dispatcher.shutdown().await;
    }

    #[tokio::test]
    async fn drop_on_full_never_blocks_and_still_acks() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (gate_tx, gate_rx) = tokio::sync::watch::channel(false);
        let mut sink = MockSink::new("lossy");
        sink.gate = Some(gate_rx);
        let cfg = DeliveryConfig {
            queue_depth: 1,
            ..fast_cfg()
        };
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Drop, cfg)],
            None,
            ack_tx,
            noop_metrics(),
        );

        // Dispatch far more than the queue can hold while the sink is gated.
        // Dispatch must not block; the timeout turns a regression into a
        // failure instead of a hung test.
        tokio::time::timeout(Duration::from_secs(5), async {
            for _ in 0..50 {
                dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
            }
        })
        .await
        .expect("dispatch must not block on a full Drop-policy queue");
        gate_tx.send(true).unwrap();
        dispatcher.shutdown().await;

        // Every event is acked exactly once whether delivered or dropped.
        let mut acks = 0;
        while ack_rx.try_recv().is_ok() {
            acks += 1;
        }
        assert_eq!(acks, 50, "lossy sink still acks every event (best-effort)");
    }

    #[test]
    fn backoff_is_capped_and_exponential() {
        let base = Duration::from_millis(100);
        let max = Duration::from_secs(5);
        assert_eq!(backoff_delay(base, max, 1), Duration::from_millis(100));
        assert_eq!(backoff_delay(base, max, 2), Duration::from_millis(200));
        assert_eq!(backoff_delay(base, max, 3), Duration::from_millis(400));
        assert_eq!(backoff_delay(base, max, 100), max, "capped at max");
    }
}