use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, SystemTime};
use parking_lot::Mutex;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use uuid::Uuid;
use rsigma_eval::ProcessResult;
use crate::error::RuntimeError;
use crate::io::{AckToken, IncidentEnvelope, Sink};
use crate::metrics::MetricsHook;
/// Boxed future returned by every [`DeliverySink`] method.
///
/// It must be `Send` because the worker that awaits it runs inside
/// `tokio::spawn`, and it may borrow the sink, the payload and the
/// [`DeliveryContext`] for `'a`.
type DeliveryFuture<'a> = Pin<Box<dyn Future<Output = Result<(), RuntimeError>> + Send + 'a>>;

/// A delivery destination driven by the dispatcher's per-sink worker.
///
/// Each sink is moved into its own spawned worker task (hence `Send + 'static`)
/// and is called from that task one delivery at a time.
///
/// How the worker interprets the returned result:
/// * `Ok(())` — delivered; this sink is done with the payload.
/// * `Err(RuntimeError::Permanent(_))` — rejected outright; never retried.
/// * any other `Err` — treated as transient and retried with backoff until the
///   sink's retry budget is exhausted.
pub trait DeliverySink: Send + 'static {
    /// Delivers one processing result.
    ///
    /// The same `ctx` is passed on every retry of this result, so values derived
    /// from it (such as `ctx.id_base()`) are stable across attempts.
    fn deliver<'a>(
        &'a mut self,
        result: &'a ProcessResult,
        ctx: &'a DeliveryContext,
    ) -> DeliveryFuture<'a>;

    /// Delivers one incident.
    ///
    /// The default accepts and discards the incident, so sinks with no use for
    /// incidents need not override it.
    fn deliver_incident<'a>(
        &'a mut self,
        _incident: &'a IncidentEnvelope,
        _ctx: &'a DeliveryContext,
    ) -> DeliveryFuture<'a> {
        Box::pin(async { Ok(()) })
    }

    /// Static name used as the `sink` label in metrics and log events.
    fn label(&self) -> &'static str;
}
/// Adapts the crate's built-in [`Sink`] to the dispatcher.
///
/// Results and incidents are forwarded to `Sink::send` / `Sink::send_incident`
/// unchanged; the per-delivery context is not used by this adapter.
impl DeliverySink for Sink {
    fn deliver<'a>(
        &'a mut self,
        result: &'a ProcessResult,
        _ctx: &'a DeliveryContext,
    ) -> DeliveryFuture<'a> {
        // The future returned by `send` already has the required shape.
        self.send(result)
    }

    fn deliver_incident<'a>(
        &'a mut self,
        incident: &'a IncidentEnvelope,
        _ctx: &'a DeliveryContext,
    ) -> DeliveryFuture<'a> {
        self.send_incident(incident)
    }

    fn label(&self) -> &'static str {
        // Reuse the sink's own kind label so metrics line up with its config.
        self.kind_label()
    }
}
/// Back-pressure policy applied when a sink's bounded queue is full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OnFull {
    /// Wait for queue space; the dispatcher proceeds at this sink's pace.
    Block,
    /// Discard the payload for this sink (counted as dropped). The event is still
    /// acked once the remaining sinks are done with it.
    Drop,
}
/// Per-sink queueing, batching and retry settings.
#[derive(Debug, Clone, Copy)]
pub struct DeliveryConfig {
    /// Capacity of the sink's bounded queue (0 is treated as 1).
    pub queue_depth: usize,
    /// Maximum number of already-queued items the worker takes per batch.
    pub batch_max: usize,
    /// Batch flush interval.
    /// NOTE(review): not read by the worker loop in this module — confirm
    /// whether anything else consumes it.
    pub batch_flush: Duration,
    /// Retries allowed after the first failed attempt, so up to
    /// `retry_max + 1` attempts in total.
    pub retry_max: u32,
    /// Delay before the first retry; doubled for each later retry.
    pub backoff_base: Duration,
    /// Upper bound on any single retry delay.
    pub backoff_max: Duration,
}

/// Defaults: 1024-deep queue, batches of up to 64, 50 ms flush interval,
/// 3 retries with backoff from 100 ms capped at 5 s.
impl Default for DeliveryConfig {
    fn default() -> Self {
        Self {
            retry_max: 3,
            backoff_base: Duration::from_millis(100),
            backoff_max: Duration::from_secs(5),
            queue_depth: 1024,
            batch_max: 64,
            batch_flush: Duration::from_millis(50),
        }
    }
}
/// Per-delivery metadata shared by every attempt (the first try and all
/// retries) of one payload to one sink.
///
/// `first_attempt` is stamped when the context is created; `deliver_one` creates
/// the context immediately before the first attempt. Previously it was stamped
/// lazily on the first accessor call, so a sink that only read it while retrying
/// observed the retry time instead. The id is still generated lazily on first
/// use, so sinks that never ask for it do not pay for a UUID.
pub struct DeliveryContext {
    /// Lazily generated `msg_<uuid>` identifier, fixed once created.
    id_base: OnceLock<String>,
    /// Wall-clock time at which this context was created.
    first_attempt: SystemTime,
}

impl DeliveryContext {
    /// Creates a fresh context, recording the current wall-clock time as the
    /// first-attempt timestamp.
    pub fn new() -> Self {
        DeliveryContext {
            id_base: OnceLock::new(),
            first_attempt: SystemTime::now(),
        }
    }

    /// Identifier for this delivery: `msg_` followed by a random v4 UUID in
    /// simple (hyphen-less) form. It is generated on first call and returned
    /// unchanged afterwards, so it is stable across retries.
    pub fn id_base(&self) -> &str {
        self.id_base
            .get_or_init(|| format!("msg_{}", Uuid::new_v4().simple()))
    }

    /// Wall-clock time at which this context (and so the first attempt) started.
    pub fn first_attempt(&self) -> SystemTime {
        self.first_attempt
    }
}

impl Default for DeliveryContext {
    fn default() -> Self {
        Self::new()
    }
}
/// A payload that could not be delivered, as handed to the dead-letter queue.
pub struct DeliveryFailure {
    /// The payload serialized as JSON (empty if serializing a result failed).
    pub serialized: String,
    /// Human-readable reason the delivery was given up on.
    pub error: String,
}
/// Releases a dispatched event's ack tokens once every sink is done with it.
///
/// One guard is created per dispatch and shared through an `Arc` by the item
/// queued to each sink. When the last clone is dropped, every token is forwarded
/// to the ack channel; send errors (receiver gone) are ignored.
struct AckGuard {
    tokens: Mutex<Vec<AckToken>>,
    ack_tx: mpsc::UnboundedSender<AckToken>,
}

impl Drop for AckGuard {
    fn drop(&mut self) {
        // `&mut self` already guarantees exclusive access, so skip locking.
        let pending = self.tokens.get_mut();
        for token in pending.drain(..) {
            let _ = self.ack_tx.send(token);
        }
    }
}
/// The content a queued item carries. The `Arc` lets every sink's queue share a
/// single copy of the dispatched payload.
enum DeliveryPayload {
    /// A processing result from `Dispatcher::dispatch`.
    Result(Arc<ProcessResult>),
    /// An incident from `Dispatcher::dispatch_incident`.
    Incident(Arc<IncidentEnvelope>),
}

/// One entry in a sink's queue.
struct DeliveryItem {
    payload: DeliveryPayload,
    /// Never read: it only keeps the dispatch's ack guard alive. Dropping the
    /// item releases this sink's hold on the ack.
    _guard: Arc<AckGuard>,
}
struct SinkWorker {
tx: mpsc::Sender<DeliveryItem>,
handle: JoinHandle<()>,
on_full: OnFull,
label: &'static str,
metrics: Arc<dyn MetricsHook>,
}
impl SinkWorker {
fn spawn<S: DeliverySink>(
sink: S,
on_full: OnFull,
cfg: DeliveryConfig,
dlq_tx: Option<mpsc::Sender<DeliveryFailure>>,
metrics: Arc<dyn MetricsHook>,
) -> Self {
let label = sink.label();
metrics.register_sink(label);
let (tx, rx) = mpsc::channel(cfg.queue_depth.max(1));
let worker_metrics = metrics.clone();
let handle = tokio::spawn(worker_loop(sink, rx, cfg, dlq_tx, worker_metrics, label));
SinkWorker {
tx,
handle,
on_full,
label,
metrics,
}
}
async fn enqueue(&self, item: DeliveryItem) {
match self.on_full {
OnFull::Block => {
self.metrics.on_sink_queue_depth_change(self.label, 1);
if self.tx.send(item).await.is_err() {
self.metrics.on_sink_queue_depth_change(self.label, -1);
}
}
OnFull::Drop => match self.tx.try_send(item) {
Ok(()) => self.metrics.on_sink_queue_depth_change(self.label, 1),
Err(mpsc::error::TrySendError::Full(_)) => {
self.metrics.on_sink_dropped(self.label);
}
Err(mpsc::error::TrySendError::Closed(_)) => {}
},
}
}
}
pub struct Dispatcher {
workers: Vec<SinkWorker>,
ack_tx: mpsc::UnboundedSender<AckToken>,
}
impl Dispatcher {
pub fn spawn<S: DeliverySink>(
sinks: Vec<(S, OnFull, DeliveryConfig)>,
dlq_tx: Option<mpsc::Sender<DeliveryFailure>>,
ack_tx: mpsc::UnboundedSender<AckToken>,
metrics: Arc<dyn MetricsHook>,
) -> Self {
let workers = sinks
.into_iter()
.map(|(sink, on_full, cfg)| {
SinkWorker::spawn(sink, on_full, cfg, dlq_tx.clone(), metrics.clone())
})
.collect();
Dispatcher { workers, ack_tx }
}
pub async fn dispatch(&self, result: ProcessResult, tokens: Vec<AckToken>) {
if self.workers.is_empty() {
for token in tokens {
let _ = self.ack_tx.send(token);
}
return;
}
let guard = Arc::new(AckGuard {
tokens: Mutex::new(tokens),
ack_tx: self.ack_tx.clone(),
});
let result = Arc::new(result);
for worker in &self.workers {
worker
.enqueue(DeliveryItem {
payload: DeliveryPayload::Result(result.clone()),
_guard: guard.clone(),
})
.await;
}
}
pub async fn dispatch_incident(&self, incident: IncidentEnvelope) {
if self.workers.is_empty() {
return;
}
let guard = Arc::new(AckGuard {
tokens: Mutex::new(Vec::new()),
ack_tx: self.ack_tx.clone(),
});
let incident = Arc::new(incident);
for worker in &self.workers {
worker
.enqueue(DeliveryItem {
payload: DeliveryPayload::Incident(incident.clone()),
_guard: guard.clone(),
})
.await;
}
}
pub async fn shutdown(self) {
let mut handles = Vec::with_capacity(self.workers.len());
for worker in self.workers {
handles.push(worker.handle);
}
drop(self.ack_tx);
for handle in handles {
let _ = handle.await;
}
}
}
/// Worker task for a single sink: drains its queue until every sender is gone
/// and the queue is empty.
///
/// Items are taken in opportunistic batches: whatever is already queued, up to
/// `cfg.batch_max`, without waiting for more (`cfg.batch_flush` is not read
/// here). The queue-depth gauge is lowered once per batch. Items are delivered in
/// arrival order via [`deliver_one`]. Each item is dropped as soon as its own
/// delivery finishes, which releases its share of the ack guard. Previously the
/// whole batch was held until its last item finished, so acks for early items
/// waited on later items' retries and backoff sleeps.
async fn worker_loop<S: DeliverySink>(
    mut sink: S,
    mut rx: mpsc::Receiver<DeliveryItem>,
    cfg: DeliveryConfig,
    dlq_tx: Option<mpsc::Sender<DeliveryFailure>>,
    metrics: Arc<dyn MetricsHook>,
    label: &'static str,
) {
    // One buffer reused for every batch. Initial capacity is clamped so a huge
    // `batch_max` does not pre-allocate a huge buffer.
    let mut batch: Vec<DeliveryItem> = Vec::with_capacity(cfg.batch_max.clamp(1, 64));
    while let Some(first) = rx.recv().await {
        batch.push(first);
        while batch.len() < cfg.batch_max {
            let Ok(item) = rx.try_recv() else { break };
            batch.push(item);
        }
        metrics.on_sink_queue_depth_change(label, -(batch.len() as i64));
        for item in batch.drain(..) {
            let target = match &item.payload {
                DeliveryPayload::Result(r) => DeliverTarget::Result(r),
                DeliveryPayload::Incident(e) => DeliverTarget::Incident(e),
            };
            deliver_one(&mut sink, target, &cfg, dlq_tx.as_ref(), &metrics, label).await;
            // `item`, and with it this sink's hold on the ack guard, is dropped here.
        }
    }
}
/// Borrowed view of a queued payload, used by [`deliver_one`] for every attempt
/// and for DLQ serialization.
enum DeliverTarget<'a> {
    /// A processing result, delivered via `DeliverySink::deliver`.
    Result(&'a ProcessResult),
    /// An incident, delivered via `DeliverySink::deliver_incident`.
    Incident(&'a IncidentEnvelope),
}
async fn deliver_one<S: DeliverySink>(
sink: &mut S,
target: DeliverTarget<'_>,
cfg: &DeliveryConfig,
dlq_tx: Option<&mpsc::Sender<DeliveryFailure>>,
metrics: &Arc<dyn MetricsHook>,
label: &'static str,
) {
let ctx = DeliveryContext::new();
let mut attempt: u32 = 0;
loop {
let outcome = match target {
DeliverTarget::Result(r) => sink.deliver(r, &ctx).await,
DeliverTarget::Incident(e) => sink.deliver_incident(e, &ctx).await,
};
match outcome {
Ok(()) => return,
Err(e) => {
let permanent = matches!(e, RuntimeError::Permanent(_));
if permanent || attempt >= cfg.retry_max {
metrics.on_sink_delivery_failed(label);
match dlq_tx {
Some(dlq) => {
let serialized = match target {
DeliverTarget::Result(r) => {
serde_json::to_string(r).unwrap_or_default()
}
DeliverTarget::Incident(e) => e.json.clone(),
};
let _ = dlq
.send(DeliveryFailure {
serialized,
error: format!("sink delivery failure: {e}"),
})
.await;
}
None => {
tracing::warn!(
sink = label,
error = %e,
"Sink delivery failed and no DLQ is configured; dropping result",
);
}
}
return;
}
attempt += 1;
metrics.on_sink_retry(label);
let delay = backoff_delay(cfg.backoff_base, cfg.backoff_max, attempt);
tracing::debug!(sink = label, attempt, error = %e, "Sink delivery retry");
tokio::time::sleep(delay).await;
}
}
}
}
/// Delay before retry number `attempt` (1-based; 0 behaves like 1).
///
/// The result is `base * 2^(attempt - 1)`, with the exponent capped at 20 so the
/// multiplier stays within `u32`. It saturates on overflow and is never more
/// than `max`.
fn backoff_delay(base: Duration, max: Duration, attempt: u32) -> Duration {
    let exponent = attempt.saturating_sub(1).min(20);
    let multiplier = 2u32.pow(exponent);
    let scaled = base.saturating_mul(multiplier);
    if scaled > max {
        max
    } else {
        scaled
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use crate::metrics::NoopMetrics;

    /// Upper bound for waits that should finish almost immediately. It is
    /// generous so loaded CI machines do not cause spurious failures.
    const WAIT: Duration = Duration::from_secs(5);

    fn noop_metrics() -> Arc<dyn MetricsHook> {
        Arc::new(NoopMetrics)
    }

    /// An empty result; its contents do not matter to the dispatcher.
    fn result() -> ProcessResult {
        Vec::new()
    }

    /// Small queues and millisecond backoff so retry tests finish quickly.
    fn fast_cfg() -> DeliveryConfig {
        DeliveryConfig {
            queue_depth: 64,
            batch_max: 16,
            batch_flush: Duration::from_millis(1),
            retry_max: 5,
            backoff_base: Duration::from_millis(1),
            backoff_max: Duration::from_millis(5),
        }
    }

    /// Scriptable sink for dispatcher tests.
    ///
    /// * `fail_first`: number of leading attempts that fail transiently.
    /// * `always_fail` / `permanent`: every attempt fails, transiently or permanently.
    /// * `gate`: when set, each attempt waits until the watch value is `true`.
    /// * `delivered`, `attempts`, `ctx_ids`: observations shared with the test.
    struct MockSink {
        label: &'static str,
        fail_first: Arc<AtomicUsize>,
        always_fail: bool,
        permanent: bool,
        delivered: Arc<AtomicUsize>,
        attempts: Arc<AtomicUsize>,
        ctx_ids: Arc<std::sync::Mutex<Vec<String>>>,
        gate: Option<tokio::sync::watch::Receiver<bool>>,
    }

    impl MockSink {
        fn new(label: &'static str) -> Self {
            MockSink {
                label,
                fail_first: Arc::new(AtomicUsize::new(0)),
                always_fail: false,
                permanent: false,
                delivered: Arc::new(AtomicUsize::new(0)),
                attempts: Arc::new(AtomicUsize::new(0)),
                ctx_ids: Arc::new(std::sync::Mutex::new(Vec::new())),
                gate: None,
            }
        }
    }

    impl DeliverySink for MockSink {
        fn deliver<'a>(
            &'a mut self,
            _result: &'a ProcessResult,
            ctx: &'a DeliveryContext,
        ) -> DeliveryFuture<'a> {
            let fail_first = self.fail_first.clone();
            let delivered = self.delivered.clone();
            let attempts = self.attempts.clone();
            let always_fail = self.always_fail;
            let permanent = self.permanent;
            let gate = self.gate.clone();
            let ctx_ids = self.ctx_ids.clone();
            let ctx_id = ctx.id_base().to_string();
            Box::pin(async move {
                ctx_ids.lock().unwrap().push(ctx_id);
                if let Some(mut rx) = gate {
                    loop {
                        if *rx.borrow() {
                            break;
                        }
                        if rx.changed().await.is_err() {
                            break;
                        }
                    }
                }
                attempts.fetch_add(1, Ordering::SeqCst);
                if always_fail {
                    return Err(if permanent {
                        RuntimeError::Permanent("mock permanent".to_string())
                    } else {
                        RuntimeError::Io(std::io::Error::other("mock always fails"))
                    });
                }
                if fail_first.load(Ordering::SeqCst) > 0 {
                    fail_first.fetch_sub(1, Ordering::SeqCst);
                    return Err(RuntimeError::Io(std::io::Error::other("mock transient")));
                }
                delivered.fetch_add(1, Ordering::SeqCst);
                Ok(())
            })
        }

        fn label(&self) -> &'static str {
            self.label
        }
    }

    #[tokio::test]
    async fn delivers_and_acks_single_sink() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let sink = MockSink::new("mock");
        let delivered = sink.delivered.clone();
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Block, fast_cfg())],
            None,
            ack_tx,
            noop_metrics(),
        );
        for _ in 0..10 {
            dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        }
        dispatcher.shutdown().await;
        assert_eq!(delivered.load(Ordering::SeqCst), 10);
        let mut acks = 0;
        while ack_rx.try_recv().is_ok() {
            acks += 1;
        }
        assert_eq!(acks, 10, "every dispatched event must be acked");
    }

    #[tokio::test]
    async fn no_sinks_acks_tokens_immediately() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let dispatcher = Dispatcher::spawn(
            Vec::<(MockSink, OnFull, DeliveryConfig)>::new(),
            None,
            ack_tx,
            noop_metrics(),
        );
        dispatcher
            .dispatch(result(), vec![AckToken::Noop, AckToken::Noop])
            .await;
        let mut acks = 0;
        while ack_rx.try_recv().is_ok() {
            acks += 1;
        }
        assert_eq!(acks, 2, "with no sinks, tokens are acked directly");
        dispatcher.shutdown().await;
    }

    #[tokio::test]
    async fn retry_reuses_the_same_delivery_context() {
        let (ack_tx, _ack_rx) = mpsc::unbounded_channel();
        let sink = MockSink::new("mock");
        sink.fail_first.store(2, Ordering::SeqCst);
        let ctx_ids = sink.ctx_ids.clone();
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Block, fast_cfg())],
            None,
            ack_tx,
            noop_metrics(),
        );
        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        dispatcher.shutdown().await;
        let ids = ctx_ids.lock().unwrap().clone();
        assert_eq!(
            ids.len(),
            3,
            "two failures then success means three attempts"
        );
        assert!(
            ids.iter().all(|id| *id == ids[0]),
            "the delivery context id must be stable across retries: {ids:?}",
        );
    }

    #[tokio::test]
    async fn retries_then_succeeds() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (dlq_tx, mut dlq_rx) = mpsc::channel(8);
        let sink = MockSink::new("mock");
        sink.fail_first.store(3, Ordering::SeqCst);
        let delivered = sink.delivered.clone();
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Block, fast_cfg())],
            Some(dlq_tx),
            ack_tx,
            noop_metrics(),
        );
        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        dispatcher.shutdown().await;
        assert_eq!(delivered.load(Ordering::SeqCst), 1, "eventually delivered");
        assert!(ack_rx.try_recv().is_ok(), "acked after success");
        assert!(
            dlq_rx.try_recv().is_err(),
            "no DLQ entry on eventual success"
        );
    }

    #[tokio::test]
    async fn terminal_failure_routes_to_dlq_and_acks() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (dlq_tx, mut dlq_rx) = mpsc::channel(8);
        let mut sink = MockSink::new("mock");
        sink.always_fail = true;
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Block, fast_cfg())],
            Some(dlq_tx),
            ack_tx,
            noop_metrics(),
        );
        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        dispatcher.shutdown().await;
        let failure = dlq_rx.try_recv().expect("terminal failure routed to DLQ");
        assert!(failure.error.contains("sink delivery failure"));
        assert!(
            ack_rx.try_recv().is_ok(),
            "token acked after DLQ parking (matches prior behavior)",
        );
    }

    #[tokio::test]
    async fn permanent_failure_skips_retries_and_dlqs() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (dlq_tx, mut dlq_rx) = mpsc::channel(8);
        let mut sink = MockSink::new("mock");
        sink.always_fail = true;
        sink.permanent = true;
        let attempts = sink.attempts.clone();
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Block, fast_cfg())],
            Some(dlq_tx),
            ack_tx,
            noop_metrics(),
        );
        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        dispatcher.shutdown().await;
        assert_eq!(
            attempts.load(Ordering::SeqCst),
            1,
            "a permanent failure must not be retried",
        );
        let failure = dlq_rx.try_recv().expect("permanent failure routed to DLQ");
        assert!(failure.error.contains("permanent delivery failure"));
        assert!(ack_rx.try_recv().is_ok());
    }

    #[tokio::test]
    async fn ack_join_waits_for_all_sinks() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (gate_tx, gate_rx) = tokio::sync::watch::channel(false);
        let fast = MockSink::new("fast");
        let fast_delivered = fast.delivered.clone();
        let mut slow = MockSink::new("slow");
        slow.gate = Some(gate_rx);
        let dispatcher = Dispatcher::spawn(
            vec![
                (fast, OnFull::Block, fast_cfg()),
                (slow, OnFull::Block, fast_cfg()),
            ],
            None,
            ack_tx,
            noop_metrics(),
        );
        dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        // Wait for the fast sink instead of sleeping a fixed time, so the only
        // thing that can still hold the ack is the gated slow sink.
        tokio::time::timeout(WAIT, async {
            while fast_delivered.load(Ordering::SeqCst) == 0 {
                tokio::task::yield_now().await;
            }
        })
        .await
        .expect("fast sink delivers promptly");
        assert!(
            ack_rx.try_recv().is_err(),
            "ack must wait for the slow sink"
        );
        gate_tx.send(true).unwrap();
        let ack = tokio::time::timeout(WAIT, ack_rx.recv())
            .await
            .expect("ack fires once every sink confirms");
        assert!(ack.is_some(), "ack fires once every sink confirms");
        dispatcher.shutdown().await;
    }

    #[tokio::test]
    async fn drop_on_full_never_blocks_and_still_acks() {
        let (ack_tx, mut ack_rx) = mpsc::unbounded_channel();
        let (gate_tx, gate_rx) = tokio::sync::watch::channel(false);
        let mut sink = MockSink::new("lossy");
        sink.gate = Some(gate_rx);
        let cfg = DeliveryConfig {
            queue_depth: 1,
            ..fast_cfg()
        };
        let dispatcher = Dispatcher::spawn(
            vec![(sink, OnFull::Drop, cfg)],
            None,
            ack_tx,
            noop_metrics(),
        );
        for _ in 0..50 {
            dispatcher.dispatch(result(), vec![AckToken::Noop]).await;
        }
        gate_tx.send(true).unwrap();
        dispatcher.shutdown().await;
        let mut acks = 0;
        while ack_rx.try_recv().is_ok() {
            acks += 1;
        }
        assert_eq!(acks, 50, "lossy sink still acks every event (best-effort)");
    }

    #[test]
    fn backoff_is_capped_and_exponential() {
        let base = Duration::from_millis(100);
        let max = Duration::from_secs(5);
        assert_eq!(backoff_delay(base, max, 1), Duration::from_millis(100));
        assert_eq!(backoff_delay(base, max, 2), Duration::from_millis(200));
        assert_eq!(backoff_delay(base, max, 3), Duration::from_millis(400));
        assert_eq!(backoff_delay(base, max, 100), max, "capped at max");
    }
}