Skip to main content

hyperi_rustlib/dlq/
orchestrator.rs

1// Project:   hyperi-rustlib
2// File:      src/dlq/orchestrator.rs
3// Purpose:   Dlq orchestrator over BackgroundSink<DlqEntry>
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Dlq orchestrator.
10//!
11//! Wraps a [`BackgroundSink<DlqEntry>`] whose drain (`DlqDrain`)
12//! dispatches batches across one or more [`super::DlqBackend`]
13//! variants using the configured [`DlqMode`].
14//!
15//! ## Hot path
16//!
17//! `try_send` / `send` queue an entry onto the in-memory mpsc and
18//! return. The drain task -- the only place that touches backends --
19//! coalesces queued entries into batches and writes to backends. The
20//! caller never blocks on disk, Kafka, HTTP, or Redis I/O.
21//!
22//! ## Modes
23//!
//! - `Cascade` / `FileOnly` / `KafkaOnly` -- try the configured backends
//!   in order (Kafka, file, HTTP, Redis) and stop on the first success.
//! - `FanOut` -- send to every backend; succeed if at least one succeeds.
27//!
28//! ## Shutdown
29//!
30//! On `CancellationToken::cancel()` the drain finishes its in-flight
31//! batch, drains the queue, then exits. Use [`Dlq::shutdown`] for
32//! graceful join. Dropping all `Dlq` handles also triggers a clean
33//! exit (channel closes, drain drains, then exits).
34
35use std::sync::Arc;
36
37use tokio::sync::Mutex as AsyncMutex;
38use tokio_util::sync::CancellationToken;
39use tracing::{debug, warn};
40
41use crate::concurrency::{
42    BackgroundSink, BackgroundSinkConfig, BackgroundSinkHandle, DrainError, Overflow, SinkDrain,
43    SinkError,
44};
45
46use super::backend::DlqBackend;
47use super::config::{DlqConfig, DlqMode};
48use super::entry::DlqEntry;
49use super::error::DlqError;
50use super::file::FileDlqInner;
51
/// Unified DLQ. Caller queues entries from any task; the orchestrator
/// drains them off-runtime via the configured backends.
///
/// Clone is cheap (`mpsc::Sender` clone). The single-owner shutdown
/// handle stays inside `Arc<AsyncMutex<Option<...>>>` so `Dlq` itself
/// is `Clone`.
#[derive(Clone)]
pub struct Dlq {
    /// Queue handle feeding the drain task. `None` for a disabled DLQ,
    /// in which case every submission returns `Ok(())` without queuing.
    sink: Option<BackgroundSink<DlqEntry>>,
    /// Join handle for the drain task, shared by all clones. The first
    /// [`Dlq::shutdown`] call takes it out; later calls find `None`.
    join: Arc<AsyncMutex<Option<BackgroundSinkHandle>>>,
    /// `true` when a drain task was spawned, `false` for a handle built
    /// by [`Dlq::disabled`].
    enabled: bool,
    /// Routing mode handed to the drain at spawn time; `DlqMode::default()`
    /// for a disabled handle. Informational only on this struct.
    mode: DlqMode,
    /// Child of the user-supplied shutdown token. The drain task runs
    /// on this child, so [`Dlq::shutdown`] can cancel only the DLQ
    /// without affecting the caller's broader shutdown plan. When the
    /// caller cancels their own token, the child fires too (normal
    /// child-token semantics), so the drain still exits on global
    /// shutdown.
    cancel: CancellationToken,
}
72
73impl std::fmt::Debug for Dlq {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("Dlq")
76            .field("enabled", &self.enabled)
77            .field("mode", &self.mode)
78            .field(
79                "pending",
80                &self.sink.as_ref().map_or(0, BackgroundSink::pending),
81            )
82            .field(
83                "dropped",
84                &self.sink.as_ref().map_or(0, BackgroundSink::dropped),
85            )
86            .finish_non_exhaustive()
87    }
88}
89
90impl Dlq {
91    /// Build a disabled DLQ. All `send` / `try_send` calls succeed as
92    /// no-ops.
93    #[must_use]
94    pub fn disabled() -> Self {
95        Self {
96            sink: None,
97            join: Arc::new(AsyncMutex::new(None)),
98            enabled: false,
99            mode: DlqMode::default(),
100            cancel: CancellationToken::new(),
101        }
102    }
103
104    /// Spawn the DLQ with whatever backends the config enables.
105    ///
106    /// `kafka_config` is required if the config has `kafka.enabled =
107    /// true` (or the mode demands Kafka). Pass `None` if the service
108    /// has no Kafka transport -- Kafka mode/enabled flags are honoured
109    /// where possible and a clear `Err(DlqError::NotConfigured)` is
110    /// returned if Kafka is required but unavailable.
111    ///
112    /// # Errors
113    ///
114    /// Returns `Err` if any enabled backend fails to initialise.
115    pub fn spawn(
116        config: &DlqConfig,
117        service_name: &str,
118        #[cfg(feature = "dlq-kafka")] kafka_config: Option<&crate::transport::KafkaConfig>,
119        #[cfg(not(feature = "dlq-kafka"))] _kafka_config: Option<&()>,
120        shutdown: CancellationToken,
121    ) -> Result<Self, DlqError> {
122        if !config.enabled {
123            return Ok(Self::disabled());
124        }
125
126        let backends = build_backends(
127            config,
128            service_name,
129            #[cfg(feature = "dlq-kafka")]
130            kafka_config,
131        )?;
132
133        if backends.is_empty() {
134            warn!("DLQ enabled but no backends configured -- entries will be dropped");
135            return Ok(Self::disabled());
136        }
137
138        let names: Vec<&'static str> = backends.iter().map(DlqBackend::name).collect();
139        debug!(mode = ?config.mode, backends = ?names, "DLQ initialised");
140
141        let drain = DlqDrain {
142            mode: config.mode,
143            backends,
144        };
145
146        let sink_config = BackgroundSinkConfig {
147            queue_capacity: config.queue_capacity,
148            batch_size: config.batch_size,
149            flush_interval: std::time::Duration::from_millis(config.flush_interval_ms),
150            overflow: Overflow::Drop,
151            metric_prefix: Some("dfe_dlq"),
152        };
153
154        // Derive a child token so `Dlq::shutdown` can stop the drain
155        // without forcing the caller to cancel their broader shutdown
156        // plan. The child fires automatically when the parent fires,
157        // so global shutdown still drains the DLQ.
158        let cancel = shutdown.child_token();
159        let (sink, handle) = BackgroundSink::spawn(drain, sink_config, cancel.clone());
160
161        Ok(Self {
162            sink: Some(sink),
163            join: Arc::new(AsyncMutex::new(Some(handle))),
164            enabled: true,
165            mode: config.mode,
166            cancel,
167        })
168    }
169
170    /// Whether the DLQ is accepting entries.
171    #[must_use]
172    pub fn is_enabled(&self) -> bool {
173        self.enabled
174    }
175
176    /// Configured routing mode (informational).
177    #[must_use]
178    pub fn mode(&self) -> DlqMode {
179        self.mode
180    }
181
182    /// Approximate queue depth (drain may be mid-recv).
183    #[must_use]
184    pub fn pending(&self) -> usize {
185        self.sink.as_ref().map_or(0, BackgroundSink::pending)
186    }
187
188    /// Total entries dropped due to overflow since spawn.
189    #[must_use]
190    pub fn dropped(&self) -> u64 {
191        self.sink.as_ref().map_or(0, BackgroundSink::dropped)
192    }
193
194    /// Sync-shaped queue submission. Returns immediately. On a full
195    /// queue, returns `Err(DlqError::QueueFull)` and increments the
196    /// drop counter -- caller decides whether to log, escalate, or
197    /// proceed.
198    ///
199    /// # Errors
200    ///
201    /// `QueueFull` if the in-memory queue is full. `Closed` if the
202    /// drain has exited.
203    pub fn try_send(&self, entry: DlqEntry) -> Result<(), DlqError> {
204        let Some(sink) = self.sink.as_ref() else {
205            return Ok(());
206        };
207        // `let_and_return` is allowed because the binding IS consumed by the
208        // metrics block below when the `metrics` feature is on.
209        #[cfg_attr(not(feature = "metrics"), allow(clippy::let_and_return))]
210        let res = sink.try_push(entry).map_err(map_sink_err);
211        // New default (metrics audit): admission + queue depth. The drop path
212        // is already counted as `dfe_dlq_dropped_total` by the BackgroundSink.
213        // `dfe_dlq_queue_depth` is a clear-named alias of the sink's
214        // `dfe_dlq_pending` gauge -- rising depth = downstream failing.
215        #[cfg(feature = "metrics")]
216        if res.is_ok() {
217            metrics::counter!("dfe_dlq_admitted_total").increment(1);
218            metrics::gauge!("dfe_dlq_queue_depth").set(sink.pending() as f64);
219        }
220        res
221    }
222
223    /// Async submission that awaits queue space.
224    ///
225    /// Successful return means the entry is queued, NOT that it is
226    /// durably written. Use [`Self::flush`] for that.
227    ///
228    /// # Errors
229    ///
230    /// `Closed` if the drain has exited.
231    pub async fn send(&self, entry: DlqEntry) -> Result<(), DlqError> {
232        let Some(sink) = self.sink.as_ref() else {
233            return Ok(());
234        };
235        #[cfg_attr(not(feature = "metrics"), allow(clippy::let_and_return))]
236        let res = sink.push_blocking(entry).await.map_err(map_sink_err);
237        // New default (metrics audit): admission + queue depth (see `try_send`).
238        #[cfg(feature = "metrics")]
239        if res.is_ok() {
240            metrics::counter!("dfe_dlq_admitted_total").increment(1);
241            metrics::gauge!("dfe_dlq_queue_depth").set(sink.pending() as f64);
242        }
243        res
244    }
245
246    /// Async batch submission. Each entry is queued individually; the
247    /// drain decides how to coalesce.
248    ///
249    /// # Errors
250    ///
251    /// `Closed` if the drain has exited mid-batch.
252    pub async fn send_batch(&self, entries: Vec<DlqEntry>) -> Result<(), DlqError> {
253        let Some(sink) = self.sink.as_ref() else {
254            return Ok(());
255        };
256        for entry in entries {
257            sink.push_blocking(entry).await.map_err(map_sink_err)?;
258            // Count each admitted entry (matches try_send/send). A mid-batch
259            // error counts the prefix already pushed -- at-least-once.
260            #[cfg(feature = "metrics")]
261            metrics::counter!("dfe_dlq_admitted_total").increment(1);
262        }
263        #[cfg(feature = "metrics")]
264        metrics::gauge!("dfe_dlq_queue_depth").set(sink.pending() as f64);
265        Ok(())
266    }
267
268    /// Block until every entry queued before this call is durably
269    /// written by the drain.
270    ///
271    /// # Errors
272    ///
273    /// `Closed` if the drain has exited before this barrier was
274    /// processed.
275    pub async fn flush(&self) -> Result<(), DlqError> {
276        let Some(sink) = self.sink.as_ref() else {
277            return Ok(());
278        };
279        sink.flush().await.map_err(map_sink_err)
280    }
281
282    /// Cancel the internal child token (drain flushes its batch and
283    /// exits), then await the drain. Cancelling here rather than only
284    /// awaiting the join is what stops `shutdown` hanging when the
285    /// caller has not separately cancelled the token passed to `spawn`.
286    ///
287    /// Idempotent across clones: the join happens once; later calls see
288    /// an empty join slot and return Ok.
289    ///
290    /// # Errors
291    ///
292    /// Returns `Err(DlqError::Closed)` if the drain task panicked.
293    pub async fn shutdown(&self) -> Result<(), DlqError> {
294        self.cancel.cancel();
295        let mut guard = self.join.lock().await;
296        let Some(handle) = guard.take() else {
297            return Ok(());
298        };
299        handle
300            .join()
301            .await
302            .map_err(|e| DlqError::File(format!("DLQ drain join failed: {e}")))?;
303        Ok(())
304    }
305}
306
307fn map_sink_err(e: SinkError) -> DlqError {
308    match e {
309        SinkError::Overflow => DlqError::QueueFull,
310        SinkError::Closed => DlqError::Closed,
311        SinkError::Drain(d) => DlqError::File(d.to_string()),
312    }
313}
314
315fn build_backends(
316    config: &DlqConfig,
317    service_name: &str,
318    #[cfg(feature = "dlq-kafka")] kafka_config: Option<&crate::transport::KafkaConfig>,
319) -> Result<Vec<DlqBackend>, DlqError> {
320    let mut backends: Vec<DlqBackend> = Vec::new();
321    let mode = config.mode;
322
323    // Kafka first (primary in cascade) -- feature-gated.
324    #[cfg(feature = "dlq-kafka")]
325    {
326        let want_kafka = matches!(
327            mode,
328            DlqMode::Cascade | DlqMode::FanOut | DlqMode::KafkaOnly
329        );
330        if want_kafka && config.kafka.enabled {
331            let kc = kafka_config.ok_or_else(|| {
332                DlqError::Kafka(
333                    "DLQ Kafka backend enabled but no KafkaConfig provided to Dlq::spawn".into(),
334                )
335            })?;
336            backends.push(DlqBackend::Kafka(super::kafka::KafkaDlqInner::new(
337                kc,
338                &config.kafka,
339            )?));
340        }
341    }
342
343    // File second (fallback in cascade) -- always available.
344    let want_file = matches!(mode, DlqMode::Cascade | DlqMode::FanOut | DlqMode::FileOnly);
345    if want_file && config.file.enabled {
346        backends.push(DlqBackend::File(FileDlqInner::new(
347            &config.file,
348            service_name,
349        )?));
350    }
351
352    // HTTP -- feature-gated, added when explicitly enabled.
353    #[cfg(feature = "dlq-http")]
354    {
355        if config.http.enabled {
356            backends.push(DlqBackend::Http(super::http::HttpDlqInner::new(
357                &config.http,
358            )?));
359        }
360    }
361
362    // Redis -- feature-gated. Requires async constructor; we build a
363    // tokio runtime handle inline. Spawn() must run inside a tokio
364    // runtime (true for every HyperI service).
365    #[cfg(feature = "dlq-redis")]
366    {
367        if config.redis.enabled {
368            let cfg = config.redis.clone();
369            let inner = tokio::task::block_in_place(|| {
370                tokio::runtime::Handle::current()
371                    .block_on(super::redis_dlq::RedisDlqInner::new(&cfg))
372            })?;
373            backends.push(DlqBackend::Redis(inner));
374        }
375    }
376
377    Ok(backends)
378}
379
/// Drain task -- owns the backends and implements cascade / fan-out
/// dispatch. Lives inside the actor task spawned by `BackgroundSink`.
struct DlqDrain {
    /// Dispatch strategy applied to every batch.
    mode: DlqMode,
    /// Backends in dispatch order, as produced by `build_backends`.
    backends: Vec<DlqBackend>,
}
386
387impl SinkDrain<DlqEntry> for DlqDrain {
388    async fn write_batch(&mut self, batch: Vec<DlqEntry>) -> Result<(), DrainError> {
389        if batch.is_empty() {
390            return Ok(());
391        }
392
393        match self.mode {
394            DlqMode::Cascade | DlqMode::FileOnly | DlqMode::KafkaOnly => {
395                let mut last_err: Option<DlqError> = None;
396                for backend in &mut self.backends {
397                    match backend.send_batch(&batch).await {
398                        Ok(()) => return Ok(()),
399                        Err(e) => {
400                            warn!(
401                                backend = backend.name(),
402                                error = %e,
403                                count = batch.len(),
404                                "DLQ backend failed in cascade, trying next"
405                            );
406                            // New default (metrics audit): a cascade fall-through
407                            // to the next backend is a retry attempt.
408                            #[cfg(feature = "metrics")]
409                            metrics::counter!(
410                                "dfe_dlq_retried_total",
411                                "backend" => backend.name()
412                            )
413                            .increment(1);
414                            last_err = Some(e);
415                        }
416                    }
417                }
418                let msg = last_err
419                    .map_or_else(|| "no backends configured".to_string(), |e| e.to_string());
420                Err(DrainError::Backend(Box::new(DlqError::AllBackendsFailed(
421                    msg,
422                ))))
423            }
424            DlqMode::FanOut => {
425                let mut any_ok = false;
426                let mut errs: Vec<String> = Vec::new();
427                for backend in &mut self.backends {
428                    match backend.send_batch(&batch).await {
429                        Ok(()) => any_ok = true,
430                        Err(e) => {
431                            warn!(
432                                backend = backend.name(),
433                                error = %e,
434                                count = batch.len(),
435                                "DLQ backend failed in fan-out"
436                            );
437                            errs.push(format!("{}:{}", backend.name(), e));
438                        }
439                    }
440                }
441                if any_ok {
442                    Ok(())
443                } else {
444                    Err(DrainError::Backend(Box::new(DlqError::AllBackendsFailed(
445                        errs.join("; "),
446                    ))))
447                }
448            }
449        }
450    }
451}
452
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dlq::config::{FileDlqConfig, RotationPeriod};
    use crate::dlq::entry::DlqSource;

    /// File-only config writing under `dir`, with a small batch size and
    /// a short flush interval so tests finish quickly.
    fn tmp_config(dir: &std::path::Path) -> DlqConfig {
        let file = FileDlqConfig {
            enabled: true,
            path: dir.to_path_buf(),
            rotation: RotationPeriod::Daily,
            max_age_days: 1,
            compress_rotated: false,
        };
        DlqConfig {
            file,
            mode: DlqMode::FileOnly,
            queue_capacity: 1024,
            batch_size: 16,
            flush_interval_ms: 20,
            ..DlqConfig::default()
        }
    }

    /// Sample entry with a fixed payload, destination and Kafka source.
    fn test_entry(reason: &str) -> DlqEntry {
        let source = DlqSource::kafka("events", 1, 42);
        DlqEntry::new("test", reason, b"payload".to_vec())
            .with_destination("acme.auth")
            .with_source(source)
    }

    /// Spawn a DLQ for service `"svc"` with no Kafka config, running on
    /// a child of `shutdown`. The single `None` fits whichever
    /// `kafka_config` parameter the active feature set compiles in.
    fn spawn_dlq(config: &DlqConfig, shutdown: &CancellationToken) -> Dlq {
        Dlq::spawn(config, "svc", None, shutdown.clone()).expect("spawn")
    }

    /// Number of lines in the file DLQ output for service `"svc"`.
    fn written_lines(dir: &std::path::Path) -> usize {
        let path = dir.join("svc/dlq.ndjson");
        let body = std::fs::read_to_string(&path).expect("read");
        body.trim().lines().count()
    }

    #[tokio::test]
    async fn disabled_dlq_accepts_silently() {
        let dlq = Dlq::disabled();
        dlq.send(test_entry("err")).await.expect("noop");
        dlq.send_batch(vec![test_entry("err")]).await.expect("noop");
        dlq.flush().await.expect("noop flush");
        dlq.shutdown().await.expect("noop shutdown");
    }

    #[tokio::test]
    async fn file_only_writes_and_flushes() {
        let dir = tempfile::tempdir().expect("tempdir");
        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&tmp_config(dir.path()), &shutdown);

        for i in 0..5 {
            let entry = test_entry(&format!("err_{i}"));
            dlq.send(entry).await.expect("send");
        }
        dlq.flush().await.expect("flush");

        assert_eq!(written_lines(dir.path()), 5);

        shutdown.cancel();
        dlq.shutdown().await.expect("clean shutdown");
    }

    #[tokio::test]
    async fn try_send_returns_queue_full_when_saturated() {
        let dir = tempfile::tempdir().expect("tempdir");
        // Tiny queue, huge batch and a long flush interval: the drain
        // rarely fires, so the queue saturates.
        let cfg = DlqConfig {
            queue_capacity: 2,
            batch_size: 1024,
            flush_interval_ms: 60_000,
            ..tmp_config(dir.path())
        };
        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&cfg, &shutdown);

        let full_count = (0..50)
            .filter(|i| {
                matches!(
                    dlq.try_send(test_entry(&format!("err_{i}"))),
                    Err(DlqError::QueueFull)
                )
            })
            .count();
        assert!(full_count > 0, "expected at least one QueueFull");
        shutdown.cancel();
    }

    #[tokio::test]
    async fn dlq_clone_shares_state() {
        let dir = tempfile::tempdir().expect("tempdir");
        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&tmp_config(dir.path()), &shutdown);

        // Entries sent through the original and the clone must land in
        // the same output file.
        let dlq2 = dlq.clone();
        dlq.send(test_entry("a")).await.expect("send a");
        dlq2.send(test_entry("b")).await.expect("send b");
        dlq.flush().await.expect("flush");

        assert_eq!(written_lines(dir.path()), 2);

        shutdown.cancel();
        dlq.shutdown().await.expect("shutdown");
    }
}