Skip to main content

rsigma_runtime/io/
delivery.rs

1//! Async delivery layer: one bounded queue and worker task per sink.
2//!
3//! The daemon fans every result into a [`Dispatcher`], which owns one
4//! [`SinkWorker`] per leaf sink. Each worker drains its queue sequentially,
5//! batches opportunistically, retries with bounded exponential backoff, and
6//! routes terminal failures to the DLQ. Acknowledgments use the lifetime of an
7//! [`AckGuard`]: each event's ack tokens fire only once every worker has
8//! durably committed (delivered or DLQ-parked) the event, so the at-least-once
9//! contract survives fan-out.
10
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, OnceLock};
14use std::time::{Duration, SystemTime};
15
16use parking_lot::Mutex;
17use tokio::sync::mpsc;
18use tokio::task::JoinHandle;
19use uuid::Uuid;
20
21use rsigma_eval::ProcessResult;
22
23use crate::error::RuntimeError;
24use crate::io::{AckToken, IncidentEnvelope, Sink};
25use crate::metrics::MetricsHook;
26
27type DeliveryFuture<'a> = Pin<Box<dyn Future<Output = Result<(), RuntimeError>> + Send + 'a>>;
28
29/// A sink the delivery layer can drive: deliver one result, identify itself.
30///
31/// Implemented for the concrete [`crate::io::Sink`] enum; generic so the worker
32/// can be unit-tested against a mock without a test-only enum variant.
33pub trait DeliverySink: Send + 'static {
34    /// Deliver a single result, returning an error the worker may retry.
35    ///
36    /// `ctx` is minted once per queued item and handed back unchanged on every
37    /// retry, so a sink that derives request identity from it (e.g. a signed
38    /// webhook) reproduces the same id, timestamp, and signature on a re-send.
39    fn deliver<'a>(
40        &'a mut self,
41        result: &'a ProcessResult,
42        ctx: &'a DeliveryContext,
43    ) -> DeliveryFuture<'a>;
44    /// Deliver a single incident line. Defaults to a no-op so mock sinks and
45    /// any sink that does not carry incidents need no implementation.
46    fn deliver_incident<'a>(
47        &'a mut self,
48        _incident: &'a IncidentEnvelope,
49        _ctx: &'a DeliveryContext,
50    ) -> DeliveryFuture<'a> {
51        Box::pin(async { Ok(()) })
52    }
53    /// Short, stable label used for structured logs and per-sink metrics.
54    fn label(&self) -> &'static str;
55}
56
57impl DeliverySink for Sink {
58    fn deliver<'a>(
59        &'a mut self,
60        result: &'a ProcessResult,
61        _ctx: &'a DeliveryContext,
62    ) -> DeliveryFuture<'a> {
63        self.send(result)
64    }
65    fn deliver_incident<'a>(
66        &'a mut self,
67        incident: &'a IncidentEnvelope,
68        _ctx: &'a DeliveryContext,
69    ) -> DeliveryFuture<'a> {
70        self.send_incident(incident)
71    }
72    fn label(&self) -> &'static str {
73        self.kind_label()
74    }
75}
76
/// What enqueueing does when a worker's bounded queue has no free slot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OnFull {
    /// Wait until the worker frees a slot. This keeps the at-least-once
    /// guarantee, at the cost that one stalled sink eventually backpressures
    /// the entire pipeline.
    Block,
    /// Discard the item and count the drop. Delivery to this sink becomes
    /// best-effort (at-most-once), but dispatch never stalls on it.
    Drop,
}
87
/// Delivery tuning for a single sink. Public so the CLI can assemble it from
/// its own configuration.
#[derive(Debug, Clone, Copy)]
pub struct DeliveryConfig {
    /// Capacity of the bounded queue between the dispatcher and the worker.
    /// A value of 0 is raised to 1 when the queue is created.
    pub queue_depth: usize,
    /// Upper bound on how many queued items a worker pulls in one wake-up.
    pub batch_max: usize,
    /// Longest a partial batch may wait before flushing. Reserved for
    /// batch-oriented sinks; nothing in this module reads it yet, since the
    /// Phase 1 sinks deliver per result.
    pub batch_flush: Duration,
    /// Retries allowed after the first attempt before the payload is routed
    /// to the DLQ. A permanent error skips retries altogether.
    pub retry_max: u32,
    /// Delay before the first retry; each later retry doubles it.
    pub backoff_base: Duration,
    /// Upper bound on any single retry delay.
    pub backoff_max: Duration,
}
105
106impl Default for DeliveryConfig {
107    fn default() -> Self {
108        DeliveryConfig {
109            queue_depth: 1024,
110            batch_max: 64,
111            batch_flush: Duration::from_millis(50),
112            retry_max: 3,
113            backoff_base: Duration::from_millis(100),
114            backoff_max: Duration::from_secs(5),
115        }
116    }
117}
118
119/// Per-delivery identity, created once per queued item and reused on every
120/// retry attempt.
121///
122/// The worker creates one of these before the retry loop and hands the same
123/// reference to the sink on each attempt. A sink that derives request identity
124/// from it (today, a signed webhook) therefore reproduces a byte-identical id,
125/// timestamp, and signature when a delivery is retried, which is what lets a
126/// receiver dedupe redeliveries and enforce a replay window.
127///
128/// The id and timestamp are computed lazily on first access: most deliveries go
129/// to sinks that never read them (stdout, file, NATS, OTLP), so the worker pays
130/// no per-item RNG call or allocation unless a sink actually signs. The first
131/// access (the first signing attempt) fixes the value, and every retry reuses
132/// the same one.
133pub struct DeliveryContext {
134    inner: OnceLock<ContextData>,
135}
136
/// The lazily minted contents of a [`DeliveryContext`].
struct ContextData {
    /// `msg_` followed by a v4 UUID in its `simple` form.
    id_base: String,
    /// Wall-clock time at which this data was minted.
    first_attempt: SystemTime,
}
141
142impl DeliveryContext {
143    /// Create a context with nothing computed yet. Cheap; no RNG, no syscall.
144    pub fn new() -> Self {
145        DeliveryContext {
146            inner: OnceLock::new(),
147        }
148    }
149
150    fn data(&self) -> &ContextData {
151        self.inner.get_or_init(|| ContextData {
152            id_base: format!("msg_{}", Uuid::new_v4().simple()),
153            first_attempt: SystemTime::now(),
154        })
155    }
156
157    /// Stable base id for this delivery (`msg_<uuid>`). A sink may append a
158    /// per-result suffix to address individual results within one batch.
159    pub fn id_base(&self) -> &str {
160        &self.data().id_base
161    }
162
163    /// Wall-clock time of the first access (the first delivery attempt that
164    /// reads the context); used as the signed timestamp.
165    pub fn first_attempt(&self) -> SystemTime {
166        self.data().first_attempt
167    }
168}
169
170impl Default for DeliveryContext {
171    fn default() -> Self {
172        Self::new()
173    }
174}
175
/// A payload that could not be delivered. Either it exhausted its retry
/// budget, or the sink reported a permanent error.
///
/// Emitted on the DLQ channel. The CLI wraps it into its own DLQ record so the
/// runtime stays free of CLI types. It derives `Debug`, so a record can be
/// logged or asserted on directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeliveryFailure {
    /// The payload that failed to deliver, as JSON: either a serialized
    /// `ProcessResult` or an incident's JSON line.
    pub serialized: String,
    /// Human-readable failure reason (the last transport error).
    pub error: String,
}
186
187/// Holds an event's ack tokens until every worker has committed it.
188///
189/// The dispatcher clones one `Arc<AckGuard>` into each worker's queue item.
190/// When the last clone drops (all workers committed, or dropped under
191/// `OnFull::Drop`), `Drop` forwards the tokens to the ack channel. An
192/// un-flushed worker at shutdown still holds its clone, so the tokens never
193/// fire and the source redelivers, preserving at-least-once.
194struct AckGuard {
195    tokens: Mutex<Vec<AckToken>>,
196    ack_tx: mpsc::UnboundedSender<AckToken>,
197}
198
199impl Drop for AckGuard {
200    fn drop(&mut self) {
201        let tokens = std::mem::take(&mut *self.tokens.lock());
202        for token in tokens {
203            // Unbounded and non-blocking: acks are cheap and the number of
204            // in-flight guards is bounded by the worker queues. A closed
205            // receiver (daemon shutting down) drops the token, which simply
206            // means no ack, i.e. the source redelivers.
207            let _ = self.ack_tx.send(token);
208        }
209    }
210}
211
212/// The payload of one unit of work: either a result batch or an incident line.
213enum DeliveryPayload {
214    Result(Arc<ProcessResult>),
215    Incident(Arc<IncidentEnvelope>),
216}
217
218/// One unit of work handed to a worker: a shared payload plus the shared ack
219/// guard whose lifetime gates the ack.
220struct DeliveryItem {
221    payload: DeliveryPayload,
222    _guard: Arc<AckGuard>,
223}
224
225/// Handle to a spawned per-sink worker task.
226struct SinkWorker {
227    tx: mpsc::Sender<DeliveryItem>,
228    handle: JoinHandle<()>,
229    on_full: OnFull,
230    label: &'static str,
231    metrics: Arc<dyn MetricsHook>,
232}
233
234impl SinkWorker {
235    fn spawn<S: DeliverySink>(
236        sink: S,
237        on_full: OnFull,
238        cfg: DeliveryConfig,
239        dlq_tx: Option<mpsc::Sender<DeliveryFailure>>,
240        metrics: Arc<dyn MetricsHook>,
241    ) -> Self {
242        let label = sink.label();
243        metrics.register_sink(label);
244        let (tx, rx) = mpsc::channel(cfg.queue_depth.max(1));
245        let worker_metrics = metrics.clone();
246        let handle = tokio::spawn(worker_loop(sink, rx, cfg, dlq_tx, worker_metrics, label));
247        SinkWorker {
248            tx,
249            handle,
250            on_full,
251            label,
252            metrics,
253        }
254    }
255
256    /// Enqueue an item, honoring the full-queue policy. Returns whether the
257    /// item was accepted (always true under `Block` unless the worker is gone).
258    async fn enqueue(&self, item: DeliveryItem) {
259        match self.on_full {
260            OnFull::Block => {
261                self.metrics.on_sink_queue_depth_change(self.label, 1);
262                if self.tx.send(item).await.is_err() {
263                    // Worker gone: undo the depth bump. The guard clone in
264                    // `item` drops here, contributing to the ack-join.
265                    self.metrics.on_sink_queue_depth_change(self.label, -1);
266                }
267            }
268            OnFull::Drop => match self.tx.try_send(item) {
269                Ok(()) => self.metrics.on_sink_queue_depth_change(self.label, 1),
270                Err(mpsc::error::TrySendError::Full(_)) => {
271                    // Dropped: the item's guard clone drops here, so the ack
272                    // still fires (best-effort for this lossy sink).
273                    self.metrics.on_sink_dropped(self.label);
274                }
275                Err(mpsc::error::TrySendError::Closed(_)) => {}
276            },
277        }
278    }
279}
280
281/// Drives every sink for the daemon output path.
282pub struct Dispatcher {
283    workers: Vec<SinkWorker>,
284    ack_tx: mpsc::UnboundedSender<AckToken>,
285}
286
287impl Dispatcher {
288    /// Spawn one worker per leaf sink, each with its own full-queue policy and
289    /// delivery tuning. `sinks` should already be flattened to leaves (see
290    /// [`crate::io::Sink::into_leaves`]); each carries its own
291    /// [`DeliveryConfig`] so a sink (e.g. a webhook) can override the global
292    /// retry/backoff/queue defaults.
293    pub fn spawn<S: DeliverySink>(
294        sinks: Vec<(S, OnFull, DeliveryConfig)>,
295        dlq_tx: Option<mpsc::Sender<DeliveryFailure>>,
296        ack_tx: mpsc::UnboundedSender<AckToken>,
297        metrics: Arc<dyn MetricsHook>,
298    ) -> Self {
299        let workers = sinks
300            .into_iter()
301            .map(|(sink, on_full, cfg)| {
302                SinkWorker::spawn(sink, on_full, cfg, dlq_tx.clone(), metrics.clone())
303            })
304            .collect();
305        Dispatcher { workers, ack_tx }
306    }
307
308    /// Fan one result and its ack tokens into every worker. Awaits queue space
309    /// for `Block` sinks; never blocks for `Drop` sinks.
310    pub async fn dispatch(&self, result: ProcessResult, tokens: Vec<AckToken>) {
311        if self.workers.is_empty() {
312            for token in tokens {
313                let _ = self.ack_tx.send(token);
314            }
315            return;
316        }
317        let guard = Arc::new(AckGuard {
318            tokens: Mutex::new(tokens),
319            ack_tx: self.ack_tx.clone(),
320        });
321        let result = Arc::new(result);
322        for worker in &self.workers {
323            worker
324                .enqueue(DeliveryItem {
325                    payload: DeliveryPayload::Result(result.clone()),
326                    _guard: guard.clone(),
327                })
328                .await;
329        }
330    }
331
332    /// Fan one incident line into every worker. Incidents are synthetic (no
333    /// input event), so they carry no ack tokens.
334    pub async fn dispatch_incident(&self, incident: IncidentEnvelope) {
335        if self.workers.is_empty() {
336            return;
337        }
338        let guard = Arc::new(AckGuard {
339            tokens: Mutex::new(Vec::new()),
340            ack_tx: self.ack_tx.clone(),
341        });
342        let incident = Arc::new(incident);
343        for worker in &self.workers {
344            worker
345                .enqueue(DeliveryItem {
346                    payload: DeliveryPayload::Incident(incident.clone()),
347                    _guard: guard.clone(),
348                })
349                .await;
350        }
351    }
352
353    /// Close every worker queue and await drain. The caller bounds this with
354    /// the drain timeout; un-drained items stay unacked so the source
355    /// redelivers them on restart.
356    pub async fn shutdown(self) {
357        let mut handles = Vec::with_capacity(self.workers.len());
358        for worker in self.workers {
359            handles.push(worker.handle);
360            // Dropping `worker.tx` (via `worker` going out of scope) closes the
361            // queue; the worker drains remaining items, then exits.
362        }
363        drop(self.ack_tx);
364        for handle in handles {
365            let _ = handle.await;
366        }
367    }
368}
369
370async fn worker_loop<S: DeliverySink>(
371    mut sink: S,
372    mut rx: mpsc::Receiver<DeliveryItem>,
373    cfg: DeliveryConfig,
374    dlq_tx: Option<mpsc::Sender<DeliveryFailure>>,
375    metrics: Arc<dyn MetricsHook>,
376    label: &'static str,
377) {
378    while let Some(first) = rx.recv().await {
379        let mut batch = Vec::with_capacity(cfg.batch_max.clamp(1, 64));
380        batch.push(first);
381        while batch.len() < cfg.batch_max {
382            match rx.try_recv() {
383                Ok(item) => batch.push(item),
384                Err(_) => break,
385            }
386        }
387        metrics.on_sink_queue_depth_change(label, -(batch.len() as i64));
388        for item in &batch {
389            let target = match &item.payload {
390                DeliveryPayload::Result(r) => DeliverTarget::Result(r),
391                DeliveryPayload::Incident(e) => DeliverTarget::Incident(e),
392            };
393            deliver_one(&mut sink, target, &cfg, dlq_tx.as_ref(), &metrics, label).await;
394        }
395        // Dropping `batch` drops each item's `Arc<AckGuard>` clone, advancing
396        // the ack-join.
397        drop(batch);
398    }
399}
400
401/// What a worker is delivering: a result batch or an incident line.
402enum DeliverTarget<'a> {
403    Result(&'a ProcessResult),
404    Incident(&'a IncidentEnvelope),
405}
406
407async fn deliver_one<S: DeliverySink>(
408    sink: &mut S,
409    target: DeliverTarget<'_>,
410    cfg: &DeliveryConfig,
411    dlq_tx: Option<&mpsc::Sender<DeliveryFailure>>,
412    metrics: &Arc<dyn MetricsHook>,
413    label: &'static str,
414) {
415    // Minted once and reused on every retry so a signed sink reproduces the
416    // same id/timestamp/signature on a re-send.
417    let ctx = DeliveryContext::new();
418    let mut attempt: u32 = 0;
419    loop {
420        let outcome = match target {
421            DeliverTarget::Result(r) => sink.deliver(r, &ctx).await,
422            DeliverTarget::Incident(e) => sink.deliver_incident(e, &ctx).await,
423        };
424        match outcome {
425            Ok(()) => return,
426            Err(e) => {
427                // A `Permanent` error will not heal on retry (e.g. a 4xx from a
428                // misrendered webhook body), so route it to the DLQ immediately
429                // rather than burning the retry budget.
430                let permanent = matches!(e, RuntimeError::Permanent(_));
431                if permanent || attempt >= cfg.retry_max {
432                    metrics.on_sink_delivery_failed(label);
433                    match dlq_tx {
434                        Some(dlq) => {
435                            let serialized = match target {
436                                DeliverTarget::Result(r) => {
437                                    serde_json::to_string(r).unwrap_or_default()
438                                }
439                                DeliverTarget::Incident(e) => e.json.clone(),
440                            };
441                            let _ = dlq
442                                .send(DeliveryFailure {
443                                    serialized,
444                                    error: format!("sink delivery failure: {e}"),
445                                })
446                                .await;
447                        }
448                        None => {
449                            tracing::warn!(
450                                sink = label,
451                                error = %e,
452                                "Sink delivery failed and no DLQ is configured; dropping result",
453                            );
454                        }
455                    }
456                    return;
457                }
458                attempt += 1;
459                metrics.on_sink_retry(label);
460                let delay = backoff_delay(cfg.backoff_base, cfg.backoff_max, attempt);
461                tracing::debug!(sink = label, attempt, error = %e, "Sink delivery retry");
462                tokio::time::sleep(delay).await;
463            }
464        }
465    }
466}
467
/// Capped exponential backoff: `base * 2^(attempt - 1)`, never above `max`.
///
/// `attempt` counts retries starting at 1; `0` is treated like `1`. The
/// exponent is clamped at 20 so the multiplier always fits in a `u32`. If the
/// multiplication overflows `Duration`, the result is `max`.
fn backoff_delay(base: Duration, max: Duration, attempt: u32) -> Duration {
    let exponent = attempt.saturating_sub(1).min(20);
    let multiplier = 1u32 << exponent;
    match base.checked_mul(multiplier) {
        Some(scaled) if scaled < max => scaled,
        _ => max,
    }
}
474
/// Unit tests for the queue, retry, DLQ, and ack-join mechanics, driven
/// through `Dispatcher` with an in-module `MockSink`.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use crate::metrics::NoopMetrics;

    /// Metrics hook backed by `NoopMetrics`, for tests that do not inspect
    /// metrics.
    fn noop_metrics() -> Arc<dyn MetricsHook> {
        Arc::new(NoopMetrics)
    }

    /// One empty `ProcessResult`. The delivery layer treats results opaquely,
    /// so content is irrelevant to the queue/retry/ack mechanics under test.
    fn result() -> ProcessResult {
        Vec::new()
    }

    /// Tuning that keeps retry-heavy tests fast: five retries with a 1 ms base
    /// backoff capped at 5 ms.
    fn fast_cfg() -> DeliveryConfig {
        DeliveryConfig {
            queue_depth: 64,
            batch_max: 16,
            batch_flush: Duration::from_millis(1),
            retry_max: 5,
            backoff_base: Duration::from_millis(1),
            backoff_max: Duration::from_millis(5),
        }
    }

    /// Configurable mock sink:
    /// - fails the first `fail_first` deliveries transiently, then succeeds;
    /// - with `always_fail`, errors on every attempt (permanently if
    ///   `permanent` is also set);
    /// - can optionally wait on a latching watch-channel gate;
    /// - records successful deliveries.
    struct MockSink {
        label: &'static str,
        // Transient failures still to hand out before succeeding.
        fail_first: Arc<AtomicUsize>,
        always_fail: bool,
        permanent: bool,
        // Count of successful deliveries.
        delivered: Arc<AtomicUsize>,
        // Count of attempts that got past the gate, successful or not.
        attempts: Arc<AtomicUsize>,
        // Records the delivery-context id seen on each attempt, to prove the
        // worker reuses one context across retries.
        ctx_ids: Arc<std::sync::Mutex<Vec<String>>>,
        // A latching gate: deliveries block until the watch value is `true`,
        // after which every delivery (including later ones) proceeds. A plain
        // `Notify` would only wake waiters parked at the instant it fires.
        gate: Option<tokio::sync::watch::Receiver<bool>>,
    }

    impl MockSink {
        /// A sink that always succeeds, has no gate, and has zeroed counters.
        fn new(label: &'static str) -> Self {
            MockSink {
                label,
                fail_first: Arc::new(AtomicUsize::new(0)),
                always_fail: false,
                permanent: false,
                delivered: Arc::new(AtomicUsize::new(0)),
                attempts: Arc::new(AtomicUsize::new(0)),
                ctx_ids: Arc::new(std::sync::Mutex::new(Vec::new())),
                gate: None,
            }
        }
    }

    impl DeliverySink for MockSink {
        fn deliver<'a>(
            &'a mut self,
            _result: &'a ProcessResult,
            ctx: &'a DeliveryContext,
        ) -> DeliveryFuture<'a> {
            let fail_first = self.fail_first.clone();
            let delivered = self.delivered.clone();
            let attempts = self.attempts.clone();
            let always_fail = self.always_fail;
            let permanent = self.permanent;
            let gate = self.gate.clone();
            let ctx_ids = self.ctx_ids.clone();
            // Read eagerly, outside the future, so the context id is minted
            // on the first attempt even while the gate is still closed.
            let ctx_id = ctx.id_base().to_string();
            Box::pin(async move {
                ctx_ids.lock().unwrap().push(ctx_id);
                if let Some(mut rx) = gate {
                    // Wait until the gate reads `true`, or its sender is gone.
                    loop {
                        if *rx.borrow() {
                            break;
                        }
                        if rx.changed().await.is_err() {
                            break;
                        }
                    }
                }
                attempts.fetch_add(1, Ordering::SeqCst);
                if always_fail {
                    return Err(if permanent {
                        RuntimeError::Permanent("mock permanent".to_string())
                    } else {
                        RuntimeError::Io(std::io::Error::other("mock always fails"))
                    });
                }
                if fail_first.load(Ordering::SeqCst) > 0 {
                    fail_first.fetch_sub(1, Ordering::SeqCst);
                    return Err(RuntimeError::Io(std::io::Error::other("mock transient")));
                }
                delivered.fetch_add(1, Ordering::SeqCst);
                Ok(())
            })
        }
        fn label(&self) -> &'static str {
            self.label
        }
    }

    /// Every dispatched event is delivered once and acked once.
    #[tokio::test]
    async fn delivers_and_acks_single_sink() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let sink = MockSink::new("mock");
        let delivered = sink.delivered.clone();
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Block, fast_cfg())],
            None,
            ack_tx,
            noop_metrics(),
        );

        for _ in 0..10 {
            dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        }
        dispatcher.shutdown().await;

        assert_eq!(delivered.load(Ordering::SeqCst), 10);
        let mut acks = 0;
        while ack_rx.try_recv().is_ok() {
            acks += 1;
        }
        assert_eq!(acks, 10, "every dispatched event must be acked");
    }

    /// All attempts for one item see the same `DeliveryContext` id.
    #[tokio::test]
    async fn retry_reuses_the_same_delivery_context() {
        let (ack_tx, _ack_rx) = mpsc::unbounded_channel();
        let sink = MockSink::new("mock");
        sink.fail_first.store(2, Ordering::SeqCst); // two failures, then success
        let ctx_ids = sink.ctx_ids.clone();
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Block, fast_cfg())],
            None,
            ack_tx,
            noop_metrics(),
        );

        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        dispatcher.shutdown().await;

        let ids = ctx_ids.lock().unwrap().clone();
        assert_eq!(
            ids.len(),
            3,
            "two failures then success means three attempts"
        );
        assert!(
            ids.iter().all(|id| *id == ids[0]),
            "the delivery context id must be stable across retries: {ids:?}",
        );
    }

    /// Transient failures within the retry budget end in delivery, with no
    /// DLQ entry.
    #[tokio::test]
    async fn retries_then_succeeds() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (dlq_tx, mut dlq_rx) = mpsc::channel(8);
        let sink = MockSink::new("mock");
        sink.fail_first.store(3, Ordering::SeqCst); // < retry_max (5)
        let delivered = sink.delivered.clone();
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Block, fast_cfg())],
            Some(dlq_tx),
            ack_tx,
            noop_metrics(),
        );

        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        dispatcher.shutdown().await;

        assert_eq!(delivered.load(Ordering::SeqCst), 1, "eventually delivered");
        assert!(ack_rx.try_recv().is_ok(), "acked after success");
        assert!(
            dlq_rx.try_recv().is_err(),
            "no DLQ entry on eventual success"
        );
    }

    /// Exhausting the retry budget parks the payload in the DLQ and still acks.
    #[tokio::test]
    async fn terminal_failure_routes_to_dlq_and_acks() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (dlq_tx, mut dlq_rx) = mpsc::channel(8);
        let mut sink = MockSink::new("mock");
        sink.always_fail = true;
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Block, fast_cfg())],
            Some(dlq_tx),
            ack_tx,
            noop_metrics(),
        );

        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        dispatcher.shutdown().await;

        let failure = dlq_rx.try_recv().expect("terminal failure routed to DLQ");
        assert!(failure.error.contains("sink delivery failure"));
        assert!(
            ack_rx.try_recv().is_ok(),
            "token acked after DLQ parking (matches prior behavior)",
        );
    }

    /// A `Permanent` error is attempted exactly once, then parked in the DLQ.
    #[tokio::test]
    async fn permanent_failure_skips_retries_and_dlqs() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (dlq_tx, mut dlq_rx) = mpsc::channel(8);
        let mut sink = MockSink::new("mock");
        sink.always_fail = true;
        sink.permanent = true;
        let attempts = sink.attempts.clone();
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Block, fast_cfg())],
            Some(dlq_tx),
            ack_tx,
            noop_metrics(),
        );

        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        dispatcher.shutdown().await;

        assert_eq!(
            attempts.load(Ordering::SeqCst),
            1,
            "a permanent failure must not be retried",
        );
        let failure = dlq_rx.try_recv().expect("permanent failure routed to DLQ");
        // NOTE(review): `deliver_one` prefixes "sink delivery failure: ". This
        // assertion therefore relies on `RuntimeError::Permanent`'s Display
        // text containing the phrase; confirm against `crate::error`.
        assert!(failure.error.contains("permanent delivery failure"));
        assert!(ack_rx.try_recv().is_ok());
    }

    /// The ack fires only after the slowest sink has committed the event.
    #[tokio::test]
    async fn ack_join_waits_for_all_sinks() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (gate_tx, gate_rx) = tokio::sync::watch::channel(false);
        let fast = MockSink::new("fast");
        let mut slow = MockSink::new("slow");
        slow.gate = Some(gate_rx);

        let dispatcher = Dispatcher::spawn(
            vec![
                (fast, OnFull::Block, fast_cfg()),
                (slow, OnFull::Block, fast_cfg()),
            ],
            None,
            ack_tx,
            noop_metrics(),
        );
        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;

        // The fast sink delivered, but the slow sink is gated, so the guard
        // still has a live clone and the ack must not have fired yet.
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(
            ack_rx.try_recv().is_err(),
            "ack must wait for the slow sink"
        );

        gate_tx.send(true).unwrap();
        // Allow the slow worker to complete and drop its guard clone.
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(
            ack_rx.try_recv().is_ok(),
            "ack fires once every sink confirms",
        );
        dispatcher.shutdown().await;
    }

    /// Under `OnFull::Drop`, a full queue never blocks dispatch, and dropped
    /// events are still acked.
    #[tokio::test]
    async fn drop_on_full_never_blocks_and_still_acks() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (gate_tx, gate_rx) = tokio::sync::watch::channel(false);
        let mut sink = MockSink::new("lossy");
        sink.gate = Some(gate_rx);
        let cfg = DeliveryConfig {
            queue_depth: 1,
            ..fast_cfg()
        };
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Drop, cfg)],
            None,
            ack_tx,
            noop_metrics(),
        );

        // Dispatch far more than the queue can hold while the sink is gated;
        // dispatch must not block.
        for _ in 0..50 {
            dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        }
        gate_tx.send(true).unwrap();
        dispatcher.shutdown().await;

        // Every event is acked exactly once whether delivered or dropped.
        let mut acks = 0;
        while ack_rx.try_recv().is_ok() {
            acks += 1;
        }
        assert_eq!(acks, 50, "lossy sink still acks every event (best-effort)");
    }

    /// Backoff doubles per retry, starting at `base`, and saturates at `max`.
    #[test]
    fn backoff_is_capped_and_exponential() {
        let base = Duration::from_millis(100);
        let max = Duration::from_secs(5);
        assert_eq!(backoff_delay(base, max, 1), Duration::from_millis(100));
        assert_eq!(backoff_delay(base, max, 2), Duration::from_millis(200));
        assert_eq!(backoff_delay(base, max, 3), Duration::from_millis(400));
        assert_eq!(backoff_delay(base, max, 100), max, "capped at max");
    }
}
788        assert_eq!(backoff_delay(base, max, 2), Duration::from_millis(200));
789        assert_eq!(backoff_delay(base, max, 3), Duration::from_millis(400));
790        assert_eq!(backoff_delay(base, max, 100), max, "capped at max");
791    }
792}