1use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, OnceLock};
14use std::time::{Duration, SystemTime};
15
16use parking_lot::Mutex;
17use tokio::sync::mpsc;
18use tokio::task::JoinHandle;
19use uuid::Uuid;
20
21use rsigma_eval::ProcessResult;
22
23use crate::error::RuntimeError;
24use crate::io::{AckToken, IncidentEnvelope, Sink};
25use crate::metrics::MetricsHook;
26
27type DeliveryFuture<'a> = Pin<Box<dyn Future<Output = Result<(), RuntimeError>> + Send + 'a>>;
28
29pub trait DeliverySink: Send + 'static {
34 fn deliver<'a>(
40 &'a mut self,
41 result: &'a ProcessResult,
42 ctx: &'a DeliveryContext,
43 ) -> DeliveryFuture<'a>;
44 fn deliver_incident<'a>(
47 &'a mut self,
48 _incident: &'a IncidentEnvelope,
49 _ctx: &'a DeliveryContext,
50 ) -> DeliveryFuture<'a> {
51 Box::pin(async { Ok(()) })
52 }
53 fn label(&self) -> &'static str;
55}
56
57impl DeliverySink for Sink {
58 fn deliver<'a>(
59 &'a mut self,
60 result: &'a ProcessResult,
61 _ctx: &'a DeliveryContext,
62 ) -> DeliveryFuture<'a> {
63 self.send(result)
64 }
65 fn deliver_incident<'a>(
66 &'a mut self,
67 incident: &'a IncidentEnvelope,
68 _ctx: &'a DeliveryContext,
69 ) -> DeliveryFuture<'a> {
70 self.send_incident(incident)
71 }
72 fn label(&self) -> &'static str {
73 self.kind_label()
74 }
75}
76
/// Policy a sink's worker queue applies when it is at capacity at enqueue time.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OnFull {
    /// Asynchronously wait for queue space, applying backpressure to the dispatcher.
    Block,
    /// Discard the item at once and report it via `MetricsHook::on_sink_dropped`.
    /// The discarded item releases its ack share immediately, so the event is still acked.
    Drop,
}
87
/// Per-sink delivery tuning: queue size, batching and retry/backoff policy.
#[derive(Debug, Clone, Copy)]
pub struct DeliveryConfig {
    /// Capacity of the sink's bounded queue. Values below 1 are treated as 1.
    pub queue_depth: usize,
    /// Maximum number of already-queued items a worker takes per wake-up.
    pub batch_max: usize,
    /// Batch flush interval.
    /// NOTE(review): not read by the delivery code in this module; confirm whether it
    /// is consumed elsewhere.
    pub batch_flush: Duration,
    /// Retries allowed after the first attempt before a transient failure becomes terminal.
    pub retry_max: u32,
    /// Delay before the first retry; each later retry doubles it.
    pub backoff_base: Duration,
    /// Upper bound on any single retry delay.
    pub backoff_max: Duration,
}

impl Default for DeliveryConfig {
    /// Defaults:
    /// - a 1024-deep queue;
    /// - batches of up to 64 with a 50 ms flush interval;
    /// - 3 retries with exponential backoff from 100 ms, capped at 5 s.
    fn default() -> Self {
        Self {
            queue_depth: 1024,
            batch_max: 64,
            batch_flush: Duration::from_millis(50),
            retry_max: 3,
            backoff_base: Duration::from_millis(100),
            backoff_max: Duration::from_secs(5),
        }
    }
}
118
119pub struct DeliveryContext {
134 inner: OnceLock<ContextData>,
135}
136
137struct ContextData {
138 id_base: String,
139 first_attempt: SystemTime,
140}
141
142impl DeliveryContext {
143 pub fn new() -> Self {
145 DeliveryContext {
146 inner: OnceLock::new(),
147 }
148 }
149
150 fn data(&self) -> &ContextData {
151 self.inner.get_or_init(|| ContextData {
152 id_base: format!("msg_{}", Uuid::new_v4().simple()),
153 first_attempt: SystemTime::now(),
154 })
155 }
156
157 pub fn id_base(&self) -> &str {
160 &self.data().id_base
161 }
162
163 pub fn first_attempt(&self) -> SystemTime {
166 self.data().first_attempt
167 }
168}
169
170impl Default for DeliveryContext {
171 fn default() -> Self {
172 Self::new()
173 }
174}
175
/// A payload the dispatcher gave up on, forwarded to the dead-letter queue.
pub struct DeliveryFailure {
    /// JSON form of the failed payload. Results are serialized with `serde_json` and
    /// incidents reuse their `json` field. Empty if result serialization failed.
    pub serialized: String,
    /// Human-readable reason delivery was abandoned.
    pub error: String,
}
186
187struct AckGuard {
195 tokens: Mutex<Vec<AckToken>>,
196 ack_tx: mpsc::UnboundedSender<AckToken>,
197}
198
199impl Drop for AckGuard {
200 fn drop(&mut self) {
201 let tokens = std::mem::take(&mut *self.tokens.lock());
202 for token in tokens {
203 let _ = self.ack_tx.send(token);
208 }
209 }
210}
211
212enum DeliveryPayload {
214 Result(Arc<ProcessResult>),
215 Incident(Arc<IncidentEnvelope>),
216}
217
218struct DeliveryItem {
221 payload: DeliveryPayload,
222 _guard: Arc<AckGuard>,
223}
224
225struct SinkWorker {
227 tx: mpsc::Sender<DeliveryItem>,
228 handle: JoinHandle<()>,
229 on_full: OnFull,
230 label: &'static str,
231 metrics: Arc<dyn MetricsHook>,
232}
233
234impl SinkWorker {
235 fn spawn<S: DeliverySink>(
236 sink: S,
237 on_full: OnFull,
238 cfg: DeliveryConfig,
239 dlq_tx: Option<mpsc::Sender<DeliveryFailure>>,
240 metrics: Arc<dyn MetricsHook>,
241 ) -> Self {
242 let label = sink.label();
243 metrics.register_sink(label);
244 let (tx, rx) = mpsc::channel(cfg.queue_depth.max(1));
245 let worker_metrics = metrics.clone();
246 let handle = tokio::spawn(worker_loop(sink, rx, cfg, dlq_tx, worker_metrics, label));
247 SinkWorker {
248 tx,
249 handle,
250 on_full,
251 label,
252 metrics,
253 }
254 }
255
256 async fn enqueue(&self, item: DeliveryItem) {
259 match self.on_full {
260 OnFull::Block => {
261 self.metrics.on_sink_queue_depth_change(self.label, 1);
262 if self.tx.send(item).await.is_err() {
263 self.metrics.on_sink_queue_depth_change(self.label, -1);
266 }
267 }
268 OnFull::Drop => match self.tx.try_send(item) {
269 Ok(()) => self.metrics.on_sink_queue_depth_change(self.label, 1),
270 Err(mpsc::error::TrySendError::Full(_)) => {
271 self.metrics.on_sink_dropped(self.label);
274 }
275 Err(mpsc::error::TrySendError::Closed(_)) => {}
276 },
277 }
278 }
279}
280
281pub struct Dispatcher {
283 workers: Vec<SinkWorker>,
284 ack_tx: mpsc::UnboundedSender<AckToken>,
285}
286
287impl Dispatcher {
288 pub fn spawn<S: DeliverySink>(
294 sinks: Vec<(S, OnFull, DeliveryConfig)>,
295 dlq_tx: Option<mpsc::Sender<DeliveryFailure>>,
296 ack_tx: mpsc::UnboundedSender<AckToken>,
297 metrics: Arc<dyn MetricsHook>,
298 ) -> Self {
299 let workers = sinks
300 .into_iter()
301 .map(|(sink, on_full, cfg)| {
302 SinkWorker::spawn(sink, on_full, cfg, dlq_tx.clone(), metrics.clone())
303 })
304 .collect();
305 Dispatcher { workers, ack_tx }
306 }
307
308 pub async fn dispatch(&self, result: ProcessResult, tokens: Vec<AckToken>) {
311 if self.workers.is_empty() {
312 for token in tokens {
313 let _ = self.ack_tx.send(token);
314 }
315 return;
316 }
317 let guard = Arc::new(AckGuard {
318 tokens: Mutex::new(tokens),
319 ack_tx: self.ack_tx.clone(),
320 });
321 let result = Arc::new(result);
322 for worker in &self.workers {
323 worker
324 .enqueue(DeliveryItem {
325 payload: DeliveryPayload::Result(result.clone()),
326 _guard: guard.clone(),
327 })
328 .await;
329 }
330 }
331
332 pub async fn dispatch_incident(&self, incident: IncidentEnvelope) {
335 if self.workers.is_empty() {
336 return;
337 }
338 let guard = Arc::new(AckGuard {
339 tokens: Mutex::new(Vec::new()),
340 ack_tx: self.ack_tx.clone(),
341 });
342 let incident = Arc::new(incident);
343 for worker in &self.workers {
344 worker
345 .enqueue(DeliveryItem {
346 payload: DeliveryPayload::Incident(incident.clone()),
347 _guard: guard.clone(),
348 })
349 .await;
350 }
351 }
352
353 pub async fn shutdown(self) {
357 let mut handles = Vec::with_capacity(self.workers.len());
358 for worker in self.workers {
359 handles.push(worker.handle);
360 }
363 drop(self.ack_tx);
364 for handle in handles {
365 let _ = handle.await;
366 }
367 }
368}
369
370async fn worker_loop<S: DeliverySink>(
371 mut sink: S,
372 mut rx: mpsc::Receiver<DeliveryItem>,
373 cfg: DeliveryConfig,
374 dlq_tx: Option<mpsc::Sender<DeliveryFailure>>,
375 metrics: Arc<dyn MetricsHook>,
376 label: &'static str,
377) {
378 while let Some(first) = rx.recv().await {
379 let mut batch = Vec::with_capacity(cfg.batch_max.clamp(1, 64));
380 batch.push(first);
381 while batch.len() < cfg.batch_max {
382 match rx.try_recv() {
383 Ok(item) => batch.push(item),
384 Err(_) => break,
385 }
386 }
387 metrics.on_sink_queue_depth_change(label, -(batch.len() as i64));
388 for item in &batch {
389 let target = match &item.payload {
390 DeliveryPayload::Result(r) => DeliverTarget::Result(r),
391 DeliveryPayload::Incident(e) => DeliverTarget::Incident(e),
392 };
393 deliver_one(&mut sink, target, &cfg, dlq_tx.as_ref(), &metrics, label).await;
394 }
395 drop(batch);
398 }
399}
400
401enum DeliverTarget<'a> {
403 Result(&'a ProcessResult),
404 Incident(&'a IncidentEnvelope),
405}
406
407async fn deliver_one<S: DeliverySink>(
408 sink: &mut S,
409 target: DeliverTarget<'_>,
410 cfg: &DeliveryConfig,
411 dlq_tx: Option<&mpsc::Sender<DeliveryFailure>>,
412 metrics: &Arc<dyn MetricsHook>,
413 label: &'static str,
414) {
415 let ctx = DeliveryContext::new();
418 let mut attempt: u32 = 0;
419 loop {
420 let outcome = match target {
421 DeliverTarget::Result(r) => sink.deliver(r, &ctx).await,
422 DeliverTarget::Incident(e) => sink.deliver_incident(e, &ctx).await,
423 };
424 match outcome {
425 Ok(()) => return,
426 Err(e) => {
427 let permanent = matches!(e, RuntimeError::Permanent(_));
431 if permanent || attempt >= cfg.retry_max {
432 metrics.on_sink_delivery_failed(label);
433 match dlq_tx {
434 Some(dlq) => {
435 let serialized = match target {
436 DeliverTarget::Result(r) => {
437 serde_json::to_string(r).unwrap_or_default()
438 }
439 DeliverTarget::Incident(e) => e.json.clone(),
440 };
441 let _ = dlq
442 .send(DeliveryFailure {
443 serialized,
444 error: format!("sink delivery failure: {e}"),
445 })
446 .await;
447 }
448 None => {
449 tracing::warn!(
450 sink = label,
451 error = %e,
452 "Sink delivery failed and no DLQ is configured; dropping result",
453 );
454 }
455 }
456 return;
457 }
458 attempt += 1;
459 metrics.on_sink_retry(label);
460 let delay = backoff_delay(cfg.backoff_base, cfg.backoff_max, attempt);
461 tracing::debug!(sink = label, attempt, error = %e, "Sink delivery retry");
462 tokio::time::sleep(delay).await;
463 }
464 }
465 }
466}
467
/// Exponential backoff delay before retry number `attempt` (1-based).
///
/// Retry 1 waits `base` and every later retry doubles the previous delay. The result
/// never exceeds `max`. The exponent is capped at 20 so the multiplier always fits in
/// a `u32`. `attempt == 0` behaves like `attempt == 1`.
fn backoff_delay(base: Duration, max: Duration, attempt: u32) -> Duration {
    let exponent = attempt.saturating_sub(1).min(20);
    let multiplier = 2u32.pow(exponent);
    match base.checked_mul(multiplier) {
        Some(scaled) if scaled < max => scaled,
        _ => max,
    }
}
474
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use crate::metrics::NoopMetrics;

    fn noop_metrics() -> Arc<dyn MetricsHook> {
        Arc::new(NoopMetrics)
    }

    fn result() -> ProcessResult {
        Vec::new()
    }

    /// Tight timings so retry paths finish in a few milliseconds.
    fn fast_cfg() -> DeliveryConfig {
        DeliveryConfig {
            queue_depth: 64,
            batch_max: 16,
            batch_flush: Duration::from_millis(1),
            retry_max: 5,
            backoff_base: Duration::from_millis(1),
            backoff_max: Duration::from_millis(5),
        }
    }

    /// Counts the ack tokens currently waiting on `rx`, consuming them.
    fn drain_count(rx: &mut mpsc::UnboundedReceiver<AckToken>) -> usize {
        let mut count = 0;
        while rx.try_recv().is_ok() {
            count += 1;
        }
        count
    }

    /// Builds a dispatcher over exactly one sink.
    fn single(
        sink: MockSink,
        on_full: OnFull,
        cfg: DeliveryConfig,
        dlq: Option<mpsc::Sender<DeliveryFailure>>,
        ack_tx: mpsc::UnboundedSender<AckToken>,
    ) -> Dispatcher {
        Dispatcher::spawn(vec![(sink, on_full, cfg)], dlq, ack_tx, noop_metrics())
    }

    /// Scriptable sink with the following knobs:
    /// - fail the next N deliveries (`fail_first`);
    /// - fail forever, transiently or permanently (`always_fail` / `permanent`);
    /// - park on a `watch` gate until it is opened or its sender is dropped (`gate`).
    ///
    /// It records every attempt's context id.
    struct MockSink {
        label: &'static str,
        fail_first: Arc<AtomicUsize>,
        always_fail: bool,
        permanent: bool,
        delivered: Arc<AtomicUsize>,
        attempts: Arc<AtomicUsize>,
        ctx_ids: Arc<std::sync::Mutex<Vec<String>>>,
        gate: Option<tokio::sync::watch::Receiver<bool>>,
    }

    impl MockSink {
        fn new(label: &'static str) -> Self {
            let counter = || Arc::new(AtomicUsize::new(0));
            Self {
                label,
                fail_first: counter(),
                always_fail: false,
                permanent: false,
                delivered: counter(),
                attempts: counter(),
                ctx_ids: Arc::default(),
                gate: None,
            }
        }
    }

    impl DeliverySink for MockSink {
        fn deliver<'a>(
            &'a mut self,
            _result: &'a ProcessResult,
            ctx: &'a DeliveryContext,
        ) -> DeliveryFuture<'a> {
            let id = ctx.id_base().to_string();
            let ids = Arc::clone(&self.ctx_ids);
            let gate = self.gate.clone();
            let attempts = Arc::clone(&self.attempts);
            let remaining_failures = Arc::clone(&self.fail_first);
            let delivered = Arc::clone(&self.delivered);
            let (always_fail, permanent) = (self.always_fail, self.permanent);
            Box::pin(async move {
                ids.lock().unwrap().push(id);
                if let Some(mut rx) = gate {
                    // Park until the gate opens or its sender goes away.
                    while !*rx.borrow() {
                        if rx.changed().await.is_err() {
                            break;
                        }
                    }
                }
                attempts.fetch_add(1, Ordering::SeqCst);
                if always_fail {
                    let err = if permanent {
                        RuntimeError::Permanent("mock permanent".to_string())
                    } else {
                        RuntimeError::Io(std::io::Error::other("mock always fails"))
                    };
                    return Err(err);
                }
                if remaining_failures.load(Ordering::SeqCst) > 0 {
                    remaining_failures.fetch_sub(1, Ordering::SeqCst);
                    return Err(RuntimeError::Io(std::io::Error::other("mock transient")));
                }
                delivered.fetch_add(1, Ordering::SeqCst);
                Ok(())
            })
        }

        fn label(&self) -> &'static str {
            self.label
        }
    }

    #[tokio::test]
    async fn delivers_and_acks_single_sink() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let sink = MockSink::new("mock");
        let delivered = Arc::clone(&sink.delivered);
        let dispatcher = single(sink, OnFull::Block, fast_cfg(), None, ack_tx);

        for _ in 0..10 {
            dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        }
        dispatcher.shutdown().await;

        assert_eq!(delivered.load(Ordering::SeqCst), 10);
        assert_eq!(
            drain_count(&mut ack_rx),
            10,
            "every dispatched event must be acked"
        );
    }

    #[tokio::test]
    async fn retry_reuses_the_same_delivery_context() {
        let (ack_tx, _ack_rx) = mpsc::unbounded_channel();
        let sink = MockSink::new("mock");
        sink.fail_first.store(2, Ordering::SeqCst);
        let ctx_ids = Arc::clone(&sink.ctx_ids);
        let dispatcher = single(sink, OnFull::Block, fast_cfg(), None, ack_tx);

        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        dispatcher.shutdown().await;

        let ids = ctx_ids.lock().unwrap().clone();
        assert_eq!(
            ids.len(),
            3,
            "two failures then success means three attempts"
        );
        assert!(
            ids.iter().all(|id| *id == ids[0]),
            "the delivery context id must be stable across retries: {ids:?}",
        );
    }

    #[tokio::test]
    async fn retries_then_succeeds() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (dlq_tx, mut dlq_rx) = mpsc::channel(8);
        let sink = MockSink::new("mock");
        sink.fail_first.store(3, Ordering::SeqCst);
        let delivered = Arc::clone(&sink.delivered);
        let dispatcher = single(sink, OnFull::Block, fast_cfg(), Some(dlq_tx), ack_tx);

        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        dispatcher.shutdown().await;

        assert_eq!(delivered.load(Ordering::SeqCst), 1, "eventually delivered");
        assert!(ack_rx.try_recv().is_ok(), "acked after success");
        assert!(
            dlq_rx.try_recv().is_err(),
            "no DLQ entry on eventual success"
        );
    }

    #[tokio::test]
    async fn terminal_failure_routes_to_dlq_and_acks() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (dlq_tx, mut dlq_rx) = mpsc::channel(8);
        let mut sink = MockSink::new("mock");
        sink.always_fail = true;
        let dispatcher = single(sink, OnFull::Block, fast_cfg(), Some(dlq_tx), ack_tx);

        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        dispatcher.shutdown().await;

        let failure = dlq_rx.try_recv().expect("terminal failure routed to DLQ");
        assert!(failure.error.contains("sink delivery failure"));
        assert!(
            ack_rx.try_recv().is_ok(),
            "token acked after DLQ parking (matches prior behavior)",
        );
    }

    #[tokio::test]
    async fn permanent_failure_skips_retries_and_dlqs() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (dlq_tx, mut dlq_rx) = mpsc::channel(8);
        let mut sink = MockSink::new("mock");
        sink.always_fail = true;
        sink.permanent = true;
        let attempts = Arc::clone(&sink.attempts);
        let dispatcher = single(sink, OnFull::Block, fast_cfg(), Some(dlq_tx), ack_tx);

        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        dispatcher.shutdown().await;

        assert_eq!(
            attempts.load(Ordering::SeqCst),
            1,
            "a permanent failure must not be retried",
        );
        let failure = dlq_rx.try_recv().expect("permanent failure routed to DLQ");
        assert!(failure.error.contains("permanent delivery failure"));
        assert!(ack_rx.try_recv().is_ok());
    }

    #[tokio::test]
    async fn ack_join_waits_for_all_sinks() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (gate_tx, gate_rx) = tokio::sync::watch::channel(false);
        let fast = MockSink::new("fast");
        let mut slow = MockSink::new("slow");
        slow.gate = Some(gate_rx);

        let sinks = vec![
            (fast, OnFull::Block, fast_cfg()),
            (slow, OnFull::Block, fast_cfg()),
        ];
        let dispatcher = Dispatcher::spawn(sinks, None, ack_tx, noop_metrics());
        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;

        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(
            ack_rx.try_recv().is_err(),
            "ack must wait for the slow sink"
        );

        gate_tx.send(true).unwrap();
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(
            ack_rx.try_recv().is_ok(),
            "ack fires once every sink confirms",
        );
        dispatcher.shutdown().await;
    }

    #[tokio::test]
    async fn drop_on_full_never_blocks_and_still_acks() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (gate_tx, gate_rx) = tokio::sync::watch::channel(false);
        let mut sink = MockSink::new("lossy");
        sink.gate = Some(gate_rx);
        let cfg = DeliveryConfig {
            queue_depth: 1,
            ..fast_cfg()
        };
        let dispatcher = single(sink, OnFull::Drop, cfg, None, ack_tx);

        // The gated sink stalls on its first item, so most of these overflow and drop.
        for _ in 0..50 {
            dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        }
        gate_tx.send(true).unwrap();
        dispatcher.shutdown().await;

        assert_eq!(
            drain_count(&mut ack_rx),
            50,
            "lossy sink still acks every event (best-effort)"
        );
    }

    #[test]
    fn backoff_is_capped_and_exponential() {
        let base = Duration::from_millis(100);
        let max = Duration::from_secs(5);
        let expectations = [
            (1, Duration::from_millis(100)),
            (2, Duration::from_millis(200)),
            (3, Duration::from_millis(400)),
        ];
        for (attempt, expected) in expectations {
            assert_eq!(backoff_delay(base, max, attempt), expected);
        }
        assert_eq!(backoff_delay(base, max, 100), max, "capped at max");
    }
}