1use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15
16use parking_lot::Mutex;
17use tokio::sync::mpsc;
18use tokio::task::JoinHandle;
19
20use rsigma_eval::ProcessResult;
21
22use crate::error::RuntimeError;
23use crate::io::{AckToken, Sink};
24use crate::metrics::MetricsHook;
25
/// Boxed, `Send` future returned by [`DeliverySink::deliver`]. It may borrow
/// both the sink and the result being delivered for the lifetime `'a`.
type DeliveryFuture<'a> = Pin<Box<dyn Future<Output = Result<(), RuntimeError>> + Send + 'a>>;
27
/// A destination that sink workers deliver `ProcessResult`s to.
///
/// Each sink is moved into its own spawned worker task, hence the
/// `Send + 'static` bound.
pub trait DeliverySink: Send + 'static {
    /// Attempts to deliver one result.
    ///
    /// The worker treats `RuntimeError::Permanent` as non-retryable. It
    /// retries any other error with exponential backoff.
    fn deliver<'a>(&'a mut self, result: &'a ProcessResult) -> DeliveryFuture<'a>;

    /// Static label identifying this sink in metrics and log fields.
    fn label(&self) -> &'static str;
}
38
/// Adapts the runtime's `Sink` to [`DeliverySink`] by forwarding to its own
/// `send` and `kind_label` methods.
impl DeliverySink for Sink {
    fn deliver<'a>(&'a mut self, result: &'a ProcessResult) -> DeliveryFuture<'a> {
        self.send(result)
    }
    fn label(&self) -> &'static str {
        self.kind_label()
    }
}
47
/// What enqueueing does when a sink's bounded queue is full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OnFull {
    /// Wait for queue space. This applies backpressure to the dispatcher.
    Block,
    /// Discard the result for this sink and count it via `on_sink_dropped`.
    /// The discarded item's ack-guard clone is released, so the event is still
    /// acked once the other sinks are done with it.
    Drop,
}
58
/// Per-sink queueing, batching and retry settings.
#[derive(Debug, Clone, Copy)]
pub struct DeliveryConfig {
    /// Capacity of the sink's bounded queue. A value of 0 is treated as 1.
    pub queue_depth: usize,
    /// Maximum number of queued items the worker pulls per wake-up.
    pub batch_max: usize,
    /// Batch flush interval.
    /// NOTE(review): not read anywhere in this module; confirm whether it is still needed.
    pub batch_flush: Duration,
    /// Number of retries allowed after the first failed attempt. Applies only
    /// to non-permanent errors.
    pub retry_max: u32,
    /// Delay before the first retry. It doubles with each further retry.
    pub backoff_base: Duration,
    /// Upper bound on any single retry delay.
    pub backoff_max: Duration,
}

impl Default for DeliveryConfig {
    /// Defaults:
    /// - queue of 1024 items, batches of up to 64, 50 ms flush;
    /// - three retries, backing off from 100 ms up to 5 s.
    fn default() -> Self {
        Self {
            queue_depth: 1_024,
            batch_max: 64,
            batch_flush: Duration::from_millis(50),
            retry_max: 3,
            backoff_base: Duration::from_millis(100),
            backoff_max: Duration::from_millis(5_000),
        }
    }
}
89
/// A result that could not be delivered to a sink, as parked on the DLQ.
///
/// Derives `Debug`/`Clone` so DLQ consumers can log, inspect or re-queue it.
#[derive(Debug, Clone)]
pub struct DeliveryFailure {
    /// The undeliverable result serialized as JSON. Empty if serialization failed.
    pub serialized: String,
    /// Human-readable description of the final delivery error.
    pub error: String,
}
100
/// Holds the ack tokens for one dispatched result.
///
/// `Dispatcher::dispatch` shares one guard (via `Arc`) across every per-sink
/// `DeliveryItem` it creates. When the last clone is dropped, the `Drop` impl
/// forwards all tokens to `ack_tx`.
struct AckGuard {
    /// Tokens to release when the guard is dropped.
    tokens: Mutex<Vec<AckToken>>,
    /// Channel the tokens are sent on when the guard is dropped.
    ack_tx: mpsc::UnboundedSender<AckToken>,
}
112
113impl Drop for AckGuard {
114 fn drop(&mut self) {
115 let tokens = std::mem::take(&mut *self.tokens.lock());
116 for token in tokens {
117 let _ = self.ack_tx.send(token);
122 }
123 }
124}
125
/// One unit of work in a sink worker's queue.
struct DeliveryItem {
    /// The result to deliver. The same `Arc` is shared by every sink for a given dispatch.
    result: Arc<ProcessResult>,
    /// Held only for its `Drop`. Releasing this clone signals that this sink is
    /// done with the item; the tokens are acked when the last clone goes.
    _guard: Arc<AckGuard>,
}
132
/// Producer-side handle to one sink's worker task.
struct SinkWorker {
    /// Sending half of the sink's bounded queue. Once it is dropped and the
    /// queue drains, the worker loop exits.
    tx: mpsc::Sender<DeliveryItem>,
    /// Handle to the spawned `worker_loop` task.
    handle: JoinHandle<()>,
    /// Policy applied when `tx` is full.
    on_full: OnFull,
    /// Sink label used for metrics.
    label: &'static str,
    /// Metrics hook used for queue-depth and drop accounting.
    metrics: Arc<dyn MetricsHook>,
}
141
142impl SinkWorker {
143 fn spawn<S: DeliverySink>(
144 sink: S,
145 on_full: OnFull,
146 cfg: DeliveryConfig,
147 dlq_tx: Option<mpsc::Sender<DeliveryFailure>>,
148 metrics: Arc<dyn MetricsHook>,
149 ) -> Self {
150 let label = sink.label();
151 metrics.register_sink(label);
152 let (tx, rx) = mpsc::channel(cfg.queue_depth.max(1));
153 let worker_metrics = metrics.clone();
154 let handle = tokio::spawn(worker_loop(sink, rx, cfg, dlq_tx, worker_metrics, label));
155 SinkWorker {
156 tx,
157 handle,
158 on_full,
159 label,
160 metrics,
161 }
162 }
163
164 async fn enqueue(&self, item: DeliveryItem) {
167 match self.on_full {
168 OnFull::Block => {
169 self.metrics.on_sink_queue_depth_change(self.label, 1);
170 if self.tx.send(item).await.is_err() {
171 self.metrics.on_sink_queue_depth_change(self.label, -1);
174 }
175 }
176 OnFull::Drop => match self.tx.try_send(item) {
177 Ok(()) => self.metrics.on_sink_queue_depth_change(self.label, 1),
178 Err(mpsc::error::TrySendError::Full(_)) => {
179 self.metrics.on_sink_dropped(self.label);
182 }
183 Err(mpsc::error::TrySendError::Closed(_)) => {}
184 },
185 }
186 }
187}
188
/// Fans each processed result out to every configured sink. It releases the
/// result's ack tokens once all sinks are done with it.
pub struct Dispatcher {
    /// One worker per configured sink.
    workers: Vec<SinkWorker>,
    /// Channel used to release ack tokens.
    ack_tx: mpsc::UnboundedSender<AckToken>,
}
194
195impl Dispatcher {
196 pub fn spawn<S: DeliverySink>(
202 sinks: Vec<(S, OnFull, DeliveryConfig)>,
203 dlq_tx: Option<mpsc::Sender<DeliveryFailure>>,
204 ack_tx: mpsc::UnboundedSender<AckToken>,
205 metrics: Arc<dyn MetricsHook>,
206 ) -> Self {
207 let workers = sinks
208 .into_iter()
209 .map(|(sink, on_full, cfg)| {
210 SinkWorker::spawn(sink, on_full, cfg, dlq_tx.clone(), metrics.clone())
211 })
212 .collect();
213 Dispatcher { workers, ack_tx }
214 }
215
216 pub async fn dispatch(&self, result: ProcessResult, tokens: Vec<AckToken>) {
219 if self.workers.is_empty() {
220 for token in tokens {
221 let _ = self.ack_tx.send(token);
222 }
223 return;
224 }
225 let guard = Arc::new(AckGuard {
226 tokens: Mutex::new(tokens),
227 ack_tx: self.ack_tx.clone(),
228 });
229 let result = Arc::new(result);
230 for worker in &self.workers {
231 worker
232 .enqueue(DeliveryItem {
233 result: result.clone(),
234 _guard: guard.clone(),
235 })
236 .await;
237 }
238 }
239
240 pub async fn shutdown(self) {
244 let mut handles = Vec::with_capacity(self.workers.len());
245 for worker in self.workers {
246 handles.push(worker.handle);
247 }
250 drop(self.ack_tx);
251 for handle in handles {
252 let _ = handle.await;
253 }
254 }
255}
256
257async fn worker_loop<S: DeliverySink>(
258 mut sink: S,
259 mut rx: mpsc::Receiver<DeliveryItem>,
260 cfg: DeliveryConfig,
261 dlq_tx: Option<mpsc::Sender<DeliveryFailure>>,
262 metrics: Arc<dyn MetricsHook>,
263 label: &'static str,
264) {
265 while let Some(first) = rx.recv().await {
266 let mut batch = Vec::with_capacity(cfg.batch_max.clamp(1, 64));
267 batch.push(first);
268 while batch.len() < cfg.batch_max {
269 match rx.try_recv() {
270 Ok(item) => batch.push(item),
271 Err(_) => break,
272 }
273 }
274 metrics.on_sink_queue_depth_change(label, -(batch.len() as i64));
275 for item in &batch {
276 deliver_one(
277 &mut sink,
278 &item.result,
279 &cfg,
280 dlq_tx.as_ref(),
281 &metrics,
282 label,
283 )
284 .await;
285 }
286 drop(batch);
289 }
290}
291
292async fn deliver_one<S: DeliverySink>(
293 sink: &mut S,
294 result: &ProcessResult,
295 cfg: &DeliveryConfig,
296 dlq_tx: Option<&mpsc::Sender<DeliveryFailure>>,
297 metrics: &Arc<dyn MetricsHook>,
298 label: &'static str,
299) {
300 let mut attempt: u32 = 0;
301 loop {
302 match sink.deliver(result).await {
303 Ok(()) => return,
304 Err(e) => {
305 let permanent = matches!(e, RuntimeError::Permanent(_));
309 if permanent || attempt >= cfg.retry_max {
310 metrics.on_sink_delivery_failed(label);
311 match dlq_tx {
312 Some(dlq) => {
313 let serialized = serde_json::to_string(result).unwrap_or_default();
314 let _ = dlq
315 .send(DeliveryFailure {
316 serialized,
317 error: format!("sink delivery failure: {e}"),
318 })
319 .await;
320 }
321 None => {
322 tracing::warn!(
323 sink = label,
324 error = %e,
325 "Sink delivery failed and no DLQ is configured; dropping result",
326 );
327 }
328 }
329 return;
330 }
331 attempt += 1;
332 metrics.on_sink_retry(label);
333 let delay = backoff_delay(cfg.backoff_base, cfg.backoff_max, attempt);
334 tracing::debug!(sink = label, attempt, error = %e, "Sink delivery retry");
335 tokio::time::sleep(delay).await;
336 }
337 }
338 }
339}
340
/// Computes the sleep before retry number `attempt` (1-based).
///
/// The delay is `base * 2^(attempt - 1)`, capped at `max`; an `attempt` of 0
/// behaves like 1. The exponent is clamped to 20, so growth stops after the
/// 21st attempt. Multiplication overflow saturates before the cap is applied.
fn backoff_delay(base: Duration, max: Duration, attempt: u32) -> Duration {
    let exponent = attempt.saturating_sub(1).min(20);
    let multiplier = 2u32.pow(exponent);
    base.saturating_mul(multiplier).min(max)
}
347
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use crate::metrics::NoopMetrics;

    /// Upper bound for waiting on an ack that is expected to arrive. It is
    /// generous so scheduler delay on a loaded CI machine is not a failure.
    const ACK_TIMEOUT: Duration = Duration::from_secs(5);

    fn noop_metrics() -> Arc<dyn MetricsHook> {
        Arc::new(NoopMetrics)
    }

    /// An empty result; the mock sinks never inspect its contents.
    fn result() -> ProcessResult {
        Vec::new()
    }

    /// Tight timings so retry/backoff tests finish quickly.
    fn fast_cfg() -> DeliveryConfig {
        DeliveryConfig {
            queue_depth: 64,
            batch_max: 16,
            batch_flush: Duration::from_millis(1),
            retry_max: 5,
            backoff_base: Duration::from_millis(1),
            backoff_max: Duration::from_millis(5),
        }
    }

    /// Scriptable sink. It can:
    /// - fail the first N attempts;
    /// - fail every attempt, optionally with a permanent error;
    /// - hold each delivery until `gate` reads `true`.
    struct MockSink {
        label: &'static str,
        /// Remaining transient failures before deliveries start succeeding.
        fail_first: Arc<AtomicUsize>,
        /// Every attempt fails.
        always_fail: bool,
        /// With `always_fail`, fail with `RuntimeError::Permanent` instead of an I/O error.
        permanent: bool,
        /// Count of successful deliveries.
        delivered: Arc<AtomicUsize>,
        /// Count of delivery attempts that got past the gate.
        attempts: Arc<AtomicUsize>,
        /// When set, each delivery waits until the watched value is `true`,
        /// or until the sender is dropped.
        gate: Option<tokio::sync::watch::Receiver<bool>>,
    }

    impl MockSink {
        fn new(label: &'static str) -> Self {
            MockSink {
                label,
                fail_first: Arc::new(AtomicUsize::new(0)),
                always_fail: false,
                permanent: false,
                delivered: Arc::new(AtomicUsize::new(0)),
                attempts: Arc::new(AtomicUsize::new(0)),
                gate: None,
            }
        }
    }

    impl DeliverySink for MockSink {
        fn deliver<'a>(&'a mut self, _result: &'a ProcessResult) -> DeliveryFuture<'a> {
            let fail_first = self.fail_first.clone();
            let delivered = self.delivered.clone();
            let attempts = self.attempts.clone();
            let always_fail = self.always_fail;
            let permanent = self.permanent;
            let gate = self.gate.clone();
            Box::pin(async move {
                if let Some(mut rx) = gate {
                    loop {
                        if *rx.borrow() {
                            break;
                        }
                        if rx.changed().await.is_err() {
                            break;
                        }
                    }
                }
                attempts.fetch_add(1, Ordering::SeqCst);
                if always_fail {
                    return Err(if permanent {
                        RuntimeError::Permanent("mock permanent".to_string())
                    } else {
                        RuntimeError::Io(std::io::Error::other("mock always fails"))
                    });
                }
                if fail_first.load(Ordering::SeqCst) > 0 {
                    fail_first.fetch_sub(1, Ordering::SeqCst);
                    return Err(RuntimeError::Io(std::io::Error::other("mock transient")));
                }
                delivered.fetch_add(1, Ordering::SeqCst);
                Ok(())
            })
        }
        fn label(&self) -> &'static str {
            self.label
        }
    }

    #[tokio::test]
    async fn delivers_and_acks_single_sink() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let sink = MockSink::new("mock");
        let delivered = sink.delivered.clone();
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Block, fast_cfg())],
            None,
            ack_tx,
            noop_metrics(),
        );

        for _ in 0..10 {
            dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        }
        dispatcher.shutdown().await;

        assert_eq!(delivered.load(Ordering::SeqCst), 10);
        let mut acks = 0;
        while ack_rx.try_recv().is_ok() {
            acks += 1;
        }
        assert_eq!(acks, 10, "every dispatched event must be acked");
    }

    #[tokio::test]
    async fn retries_then_succeeds() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (dlq_tx, mut dlq_rx) = mpsc::channel(8);
        let sink = MockSink::new("mock");
        // Three transient failures, well within `fast_cfg().retry_max`.
        sink.fail_first.store(3, Ordering::SeqCst);
        let delivered = sink.delivered.clone();
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Block, fast_cfg())],
            Some(dlq_tx),
            ack_tx,
            noop_metrics(),
        );

        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        dispatcher.shutdown().await;

        assert_eq!(delivered.load(Ordering::SeqCst), 1, "eventually delivered");
        assert!(ack_rx.try_recv().is_ok(), "acked after success");
        assert!(
            dlq_rx.try_recv().is_err(),
            "no DLQ entry on eventual success"
        );
    }

    #[tokio::test]
    async fn terminal_failure_routes_to_dlq_and_acks() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (dlq_tx, mut dlq_rx) = mpsc::channel(8);
        let mut sink = MockSink::new("mock");
        sink.always_fail = true;
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Block, fast_cfg())],
            Some(dlq_tx),
            ack_tx,
            noop_metrics(),
        );

        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        dispatcher.shutdown().await;

        let failure = dlq_rx.try_recv().expect("terminal failure routed to DLQ");
        assert!(failure.error.contains("sink delivery failure"));
        assert!(
            ack_rx.try_recv().is_ok(),
            "token acked after DLQ parking (matches prior behavior)",
        );
    }

    #[tokio::test]
    async fn permanent_failure_skips_retries_and_dlqs() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (dlq_tx, mut dlq_rx) = mpsc::channel(8);
        let mut sink = MockSink::new("mock");
        sink.always_fail = true;
        sink.permanent = true;
        let attempts = sink.attempts.clone();
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Block, fast_cfg())],
            Some(dlq_tx),
            ack_tx,
            noop_metrics(),
        );

        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        dispatcher.shutdown().await;

        assert_eq!(
            attempts.load(Ordering::SeqCst),
            1,
            "a permanent failure must not be retried",
        );
        let failure = dlq_rx.try_recv().expect("permanent failure routed to DLQ");
        // NOTE(review): the DLQ message is `sink delivery failure: {e}`, so this
        // relies on the `Display` text of `RuntimeError::Permanent`.
        assert!(failure.error.contains("permanent delivery failure"));
        assert!(ack_rx.try_recv().is_ok());
    }

    #[tokio::test]
    async fn ack_join_waits_for_all_sinks() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (gate_tx, gate_rx) = tokio::sync::watch::channel(false);
        let fast = MockSink::new("fast");
        let mut slow = MockSink::new("slow");
        slow.gate = Some(gate_rx);

        let dispatcher = Dispatcher::spawn(
            vec![
                (fast, OnFull::Block, fast_cfg()),
                (slow, OnFull::Block, fast_cfg()),
            ],
            None,
            ack_tx,
            noop_metrics(),
        );
        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;

        // Give the fast sink time to finish. The gated sink cannot finish, so
        // the ack must still be pending.
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(
            ack_rx.try_recv().is_err(),
            "ack must wait for the slow sink"
        );

        gate_tx.send(true).unwrap();
        // Await the ack with a bounded timeout instead of sleeping a fixed
        // interval, so the test does not depend on scheduler timing.
        let ack = tokio::time::timeout(ACK_TIMEOUT, ack_rx.recv())
            .await
            .expect("ack fires once every sink confirms");
        assert!(ack.is_some(), "ack channel closed before the ack arrived");
        dispatcher.shutdown().await;
    }

    #[tokio::test]
    async fn drop_on_full_never_blocks_and_still_acks() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (gate_tx, gate_rx) = tokio::sync::watch::channel(false);
        let mut sink = MockSink::new("lossy");
        sink.gate = Some(gate_rx);
        let cfg = DeliveryConfig {
            queue_depth: 1,
            ..fast_cfg()
        };
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Drop, cfg)],
            None,
            ack_tx,
            noop_metrics(),
        );

        // The queue holds a single item and deliveries are gated. Most of these
        // dispatches must therefore be dropped rather than block.
        for _ in 0..50 {
            dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        }
        gate_tx.send(true).unwrap();
        dispatcher.shutdown().await;

        let mut acks = 0;
        while ack_rx.try_recv().is_ok() {
            acks += 1;
        }
        assert_eq!(acks, 50, "lossy sink still acks every event (best-effort)");
    }

    #[test]
    fn backoff_is_capped_and_exponential() {
        let base = Duration::from_millis(100);
        let max = Duration::from_secs(5);
        assert_eq!(backoff_delay(base, max, 1), Duration::from_millis(100));
        assert_eq!(backoff_delay(base, max, 2), Duration::from_millis(200));
        assert_eq!(backoff_delay(base, max, 3), Duration::from_millis(400));
        assert_eq!(backoff_delay(base, max, 100), max, "capped at max");
    }
}