Skip to main content

hyperi_rustlib/dlq/
orchestrator.rs

1// Project:   hyperi-rustlib
2// File:      src/dlq/orchestrator.rs
3// Purpose:   Dlq orchestrator over BackgroundSink<DlqEntry>
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Dlq orchestrator.
10//!
11//! Wraps a [`BackgroundSink<DlqEntry>`] whose drain (`DlqDrain`)
12//! dispatches batches across one or more [`super::DlqBackend`]
13//! variants using the configured [`DlqMode`].
14//!
15//! ## Hot path
16//!
17//! `try_send` / `send` queue an entry onto the in-memory mpsc and
18//! return. The drain task -- the only place that touches backends --
19//! coalesces queued entries into batches and writes to backends. The
20//! caller never blocks on disk, Kafka, HTTP, or Redis I/O.
21//!
22//! ## Modes
23//!
24//! - `Cascade` / `FileOnly` / `KafkaOnly` -- try backends in order,
25//!   stop on first success.
26//! - `FanOut` -- send to all backends, succeed if any succeed.
27//!
28//! ## Shutdown
29//!
30//! On `CancellationToken::cancel()` the drain finishes its in-flight
31//! batch, drains the queue, then exits. Use [`Dlq::shutdown`] for
32//! graceful join. Dropping all `Dlq` handles also triggers a clean
33//! exit (channel closes, drain drains, then exits).
34
35use std::sync::Arc;
36
37use tokio::sync::Mutex as AsyncMutex;
38use tokio_util::sync::CancellationToken;
39use tracing::{debug, warn};
40
41use crate::concurrency::{
42    BackgroundSink, BackgroundSinkConfig, BackgroundSinkHandle, DrainError, Overflow, SinkDrain,
43    SinkError,
44};
45
46use super::backend::DlqBackend;
47use super::config::{DlqConfig, DlqMode};
48use super::entry::DlqEntry;
49use super::error::DlqError;
50use super::file::FileDlqInner;
51
52/// Unified DLQ. Caller queues entries from any task; the orchestrator
53/// drains them off-runtime via the configured backends.
54///
55/// Clone is cheap (`mpsc::Sender` clone). The single-owner shutdown
56/// handle stays inside `Arc<AsyncMutex<Option<...>>>` so `Dlq` itself
57/// is `Clone`.
58#[derive(Clone)]
59pub struct Dlq {
60    sink: Option<BackgroundSink<DlqEntry>>,
61    join: Arc<AsyncMutex<Option<BackgroundSinkHandle>>>,
62    enabled: bool,
63    mode: DlqMode,
64    /// Child of the user-supplied shutdown token. The drain task runs
65    /// on this child, so [`Dlq::shutdown`] can cancel only the DLQ
66    /// without affecting the caller's broader shutdown plan. When the
67    /// caller cancels their own token, the child fires too (normal
68    /// child-token semantics), so the drain still exits on global
69    /// shutdown.
70    cancel: CancellationToken,
71}
72
73impl std::fmt::Debug for Dlq {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("Dlq")
76            .field("enabled", &self.enabled)
77            .field("mode", &self.mode)
78            .field(
79                "pending",
80                &self.sink.as_ref().map_or(0, BackgroundSink::pending),
81            )
82            .field(
83                "dropped",
84                &self.sink.as_ref().map_or(0, BackgroundSink::dropped),
85            )
86            .finish_non_exhaustive()
87    }
88}
89
90impl Dlq {
91    /// Build a disabled DLQ. All `send` / `try_send` calls succeed as
92    /// no-ops.
93    #[must_use]
94    pub fn disabled() -> Self {
95        Self {
96            sink: None,
97            join: Arc::new(AsyncMutex::new(None)),
98            enabled: false,
99            mode: DlqMode::default(),
100            cancel: CancellationToken::new(),
101        }
102    }
103
104    /// Spawn the DLQ with whatever backends the config enables.
105    ///
106    /// `kafka_config` is required if the config has `kafka.enabled =
107    /// true` (or the mode demands Kafka). Pass `None` if the service
108    /// has no Kafka transport -- Kafka mode/enabled flags are honoured
109    /// where possible and a clear `Err(DlqError::NotConfigured)` is
110    /// returned if Kafka is required but unavailable.
111    ///
112    /// # Errors
113    ///
114    /// Returns `Err` if any enabled backend fails to initialise.
115    pub fn spawn(
116        config: &DlqConfig,
117        service_name: &str,
118        #[cfg(feature = "dlq-kafka")] kafka_config: Option<&crate::transport::KafkaConfig>,
119        #[cfg(not(feature = "dlq-kafka"))] _kafka_config: Option<&()>,
120        shutdown: CancellationToken,
121    ) -> Result<Self, DlqError> {
122        if !config.enabled {
123            return Ok(Self::disabled());
124        }
125
126        let backends = build_backends(
127            config,
128            service_name,
129            #[cfg(feature = "dlq-kafka")]
130            kafka_config,
131        )?;
132
133        if backends.is_empty() {
134            warn!("DLQ enabled but no backends configured -- entries will be dropped");
135            return Ok(Self::disabled());
136        }
137
138        let names: Vec<&'static str> = backends.iter().map(DlqBackend::name).collect();
139        debug!(mode = ?config.mode, backends = ?names, "DLQ initialised");
140
141        let drain = DlqDrain {
142            mode: config.mode,
143            backends,
144        };
145
146        let sink_config = BackgroundSinkConfig {
147            queue_capacity: config.queue_capacity,
148            batch_size: config.batch_size,
149            flush_interval: std::time::Duration::from_millis(config.flush_interval_ms),
150            overflow: Overflow::Drop,
151            metric_prefix: Some("dfe_dlq"),
152        };
153
154        // Derive a child token so `Dlq::shutdown` can stop the drain
155        // without forcing the caller to cancel their broader shutdown
156        // plan. The child fires automatically when the parent fires,
157        // so global shutdown still drains the DLQ.
158        let cancel = shutdown.child_token();
159        let (sink, handle) = BackgroundSink::spawn(drain, sink_config, cancel.clone());
160
161        Ok(Self {
162            sink: Some(sink),
163            join: Arc::new(AsyncMutex::new(Some(handle))),
164            enabled: true,
165            mode: config.mode,
166            cancel,
167        })
168    }
169
170    /// Whether the DLQ is accepting entries.
171    #[must_use]
172    pub fn is_enabled(&self) -> bool {
173        self.enabled
174    }
175
176    /// Configured routing mode (informational).
177    #[must_use]
178    pub fn mode(&self) -> DlqMode {
179        self.mode
180    }
181
182    /// Approximate queue depth (drain may be mid-recv).
183    #[must_use]
184    pub fn pending(&self) -> usize {
185        self.sink.as_ref().map_or(0, BackgroundSink::pending)
186    }
187
188    /// Total entries dropped due to overflow since spawn.
189    #[must_use]
190    pub fn dropped(&self) -> u64 {
191        self.sink.as_ref().map_or(0, BackgroundSink::dropped)
192    }
193
194    /// Sync-shaped queue submission. Returns immediately. On a full
195    /// queue, returns `Err(DlqError::QueueFull)` and increments the
196    /// drop counter -- caller decides whether to log, escalate, or
197    /// proceed.
198    ///
199    /// # Errors
200    ///
201    /// `QueueFull` if the in-memory queue is full. `Closed` if the
202    /// drain has exited.
203    pub fn try_send(&self, entry: DlqEntry) -> Result<(), DlqError> {
204        let Some(sink) = self.sink.as_ref() else {
205            return Ok(());
206        };
207        sink.try_push(entry).map_err(map_sink_err)
208    }
209
210    /// Async submission that awaits queue space.
211    ///
212    /// Successful return means the entry is queued, NOT that it is
213    /// durably written. Use [`Self::flush`] for that.
214    ///
215    /// # Errors
216    ///
217    /// `Closed` if the drain has exited.
218    pub async fn send(&self, entry: DlqEntry) -> Result<(), DlqError> {
219        let Some(sink) = self.sink.as_ref() else {
220            return Ok(());
221        };
222        sink.push_blocking(entry).await.map_err(map_sink_err)
223    }
224
225    /// Async batch submission. Each entry is queued individually; the
226    /// drain decides how to coalesce.
227    ///
228    /// # Errors
229    ///
230    /// `Closed` if the drain has exited mid-batch.
231    pub async fn send_batch(&self, entries: Vec<DlqEntry>) -> Result<(), DlqError> {
232        let Some(sink) = self.sink.as_ref() else {
233            return Ok(());
234        };
235        for entry in entries {
236            sink.push_blocking(entry).await.map_err(map_sink_err)?;
237        }
238        Ok(())
239    }
240
241    /// Block until every entry queued before this call is durably
242    /// written by the drain.
243    ///
244    /// # Errors
245    ///
246    /// `Closed` if the drain has exited before this barrier was
247    /// processed.
248    pub async fn flush(&self) -> Result<(), DlqError> {
249        let Some(sink) = self.sink.as_ref() else {
250            return Ok(());
251        };
252        sink.flush().await.map_err(map_sink_err)
253    }
254
255    /// Initiate shutdown and await graceful drain exit.
256    ///
257    /// Cancels the internal child token (drain observes the cancellation
258    /// in its next `select!`, flushes its remaining batch, and exits),
259    /// then awaits the drain task. This is the canonical "stop the DLQ
260    /// and wait for it" call -- the previous version only awaited the
261    /// join and would hang forever unless the caller had separately
262    /// cancelled the token passed to `spawn`.
263    ///
264    /// Idempotent: safe to call from many clones; the join happens
265    /// once. Subsequent calls observe an empty join slot and return Ok.
266    ///
267    /// # Errors
268    ///
269    /// Returns `Err(DlqError::Closed)` if the drain task panicked.
270    pub async fn shutdown(&self) -> Result<(), DlqError> {
271        // Trip the child token so the drain notices on its next
272        // select!. Idempotent -- CancellationToken::cancel handles
273        // re-cancellation.
274        self.cancel.cancel();
275        let mut guard = self.join.lock().await;
276        let Some(handle) = guard.take() else {
277            return Ok(());
278        };
279        handle
280            .join()
281            .await
282            .map_err(|e| DlqError::File(format!("DLQ drain join failed: {e}")))?;
283        Ok(())
284    }
285}
286
287fn map_sink_err(e: SinkError) -> DlqError {
288    match e {
289        SinkError::Overflow => DlqError::QueueFull,
290        SinkError::Closed => DlqError::Closed,
291        SinkError::Drain(d) => DlqError::File(d.to_string()),
292    }
293}
294
295fn build_backends(
296    config: &DlqConfig,
297    service_name: &str,
298    #[cfg(feature = "dlq-kafka")] kafka_config: Option<&crate::transport::KafkaConfig>,
299) -> Result<Vec<DlqBackend>, DlqError> {
300    let mut backends: Vec<DlqBackend> = Vec::new();
301    let mode = config.mode;
302
303    // Kafka first (primary in cascade) -- feature-gated.
304    #[cfg(feature = "dlq-kafka")]
305    {
306        let want_kafka = matches!(
307            mode,
308            DlqMode::Cascade | DlqMode::FanOut | DlqMode::KafkaOnly
309        );
310        if want_kafka && config.kafka.enabled {
311            let kc = kafka_config.ok_or_else(|| {
312                DlqError::Kafka(
313                    "DLQ Kafka backend enabled but no KafkaConfig provided to Dlq::spawn".into(),
314                )
315            })?;
316            backends.push(DlqBackend::Kafka(super::kafka::KafkaDlqInner::new(
317                kc,
318                &config.kafka,
319            )?));
320        }
321    }
322
323    // File second (fallback in cascade) -- always available.
324    let want_file = matches!(mode, DlqMode::Cascade | DlqMode::FanOut | DlqMode::FileOnly);
325    if want_file && config.file.enabled {
326        backends.push(DlqBackend::File(FileDlqInner::new(
327            &config.file,
328            service_name,
329        )?));
330    }
331
332    // HTTP -- feature-gated, added when explicitly enabled.
333    #[cfg(feature = "dlq-http")]
334    {
335        if config.http.enabled {
336            backends.push(DlqBackend::Http(super::http::HttpDlqInner::new(
337                &config.http,
338            )?));
339        }
340    }
341
342    // Redis -- feature-gated. Requires async constructor; we build a
343    // tokio runtime handle inline. Spawn() must run inside a tokio
344    // runtime (true for every HyperI service).
345    #[cfg(feature = "dlq-redis")]
346    {
347        if config.redis.enabled {
348            let cfg = config.redis.clone();
349            let inner = tokio::task::block_in_place(|| {
350                tokio::runtime::Handle::current()
351                    .block_on(super::redis_dlq::RedisDlqInner::new(&cfg))
352            })?;
353            backends.push(DlqBackend::Redis(inner));
354        }
355    }
356
357    Ok(backends)
358}
359
360/// Drain task -- owns the backends and implements cascade / fan-out
361/// dispatch. Lives inside the actor task spawned by `BackgroundSink`.
362struct DlqDrain {
363    mode: DlqMode,
364    backends: Vec<DlqBackend>,
365}
366
367impl SinkDrain<DlqEntry> for DlqDrain {
368    async fn write_batch(&mut self, batch: Vec<DlqEntry>) -> Result<(), DrainError> {
369        if batch.is_empty() {
370            return Ok(());
371        }
372
373        match self.mode {
374            DlqMode::Cascade | DlqMode::FileOnly | DlqMode::KafkaOnly => {
375                let mut last_err: Option<DlqError> = None;
376                for backend in &mut self.backends {
377                    match backend.send_batch(&batch).await {
378                        Ok(()) => return Ok(()),
379                        Err(e) => {
380                            warn!(
381                                backend = backend.name(),
382                                error = %e,
383                                count = batch.len(),
384                                "DLQ backend failed in cascade, trying next"
385                            );
386                            last_err = Some(e);
387                        }
388                    }
389                }
390                let msg = last_err
391                    .map_or_else(|| "no backends configured".to_string(), |e| e.to_string());
392                Err(DrainError::Backend(Box::new(DlqError::AllBackendsFailed(
393                    msg,
394                ))))
395            }
396            DlqMode::FanOut => {
397                let mut any_ok = false;
398                let mut errs: Vec<String> = Vec::new();
399                for backend in &mut self.backends {
400                    match backend.send_batch(&batch).await {
401                        Ok(()) => any_ok = true,
402                        Err(e) => {
403                            warn!(
404                                backend = backend.name(),
405                                error = %e,
406                                count = batch.len(),
407                                "DLQ backend failed in fan-out"
408                            );
409                            errs.push(format!("{}:{}", backend.name(), e));
410                        }
411                    }
412                }
413                if any_ok {
414                    Ok(())
415                } else {
416                    Err(DrainError::Backend(Box::new(DlqError::AllBackendsFailed(
417                        errs.join("; "),
418                    ))))
419                }
420            }
421        }
422    }
423}
424
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dlq::config::{FileDlqConfig, RotationPeriod};
    use crate::dlq::entry::DlqSource;

    /// File-only configuration rooted at `dir`, with a small batch
    /// size and a short flush interval.
    fn tmp_config(dir: &std::path::Path) -> DlqConfig {
        let file = FileDlqConfig {
            enabled: true,
            path: dir.to_path_buf(),
            rotation: RotationPeriod::Daily,
            max_age_days: 1,
            compress_rotated: false,
        };
        DlqConfig {
            file,
            mode: DlqMode::FileOnly,
            queue_capacity: 1024,
            batch_size: 16,
            flush_interval_ms: 20,
            ..DlqConfig::default()
        }
    }

    /// Entry with a fixed payload, destination and Kafka source; only
    /// the failure reason varies between callers.
    fn test_entry(reason: &str) -> DlqEntry {
        let source = DlqSource::kafka("events", 1, 42);
        DlqEntry::new("test", reason, b"payload".to_vec())
            .with_destination("acme.auth")
            .with_source(source)
    }

    /// Spawn a DLQ for service `svc` without a Kafka config, driven
    /// by a clone of `shutdown`.
    fn spawn_dlq(cfg: &DlqConfig, shutdown: &CancellationToken) -> Dlq {
        Dlq::spawn(cfg, "svc", None, shutdown.clone()).expect("spawn")
    }

    /// Number of lines in the NDJSON file the tests expect under `dir`.
    fn written_lines(dir: &std::path::Path) -> usize {
        let path = dir.join("svc/dlq.ndjson");
        let body = std::fs::read_to_string(&path).expect("read");
        body.trim().lines().count()
    }

    #[tokio::test]
    async fn disabled_dlq_accepts_silently() {
        let dlq = Dlq::disabled();
        dlq.send(test_entry("err")).await.expect("noop");
        dlq.send_batch(vec![test_entry("err")]).await.expect("noop");
        dlq.flush().await.expect("noop flush");
        dlq.shutdown().await.expect("noop shutdown");
    }

    #[tokio::test]
    async fn file_only_writes_and_flushes() {
        let dir = tempfile::tempdir().expect("tempdir");
        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&tmp_config(dir.path()), &shutdown);

        for i in 0..5 {
            let entry = test_entry(&format!("err_{i}"));
            dlq.send(entry).await.expect("send");
        }
        dlq.flush().await.expect("flush");

        assert_eq!(written_lines(dir.path()), 5);

        shutdown.cancel();
        dlq.shutdown().await.expect("clean shutdown");
    }

    #[tokio::test]
    async fn try_send_returns_queue_full_when_saturated() {
        let dir = tempfile::tempdir().expect("tempdir");
        // Tiny queue, huge batch and a long flush interval: the drain
        // rarely fires, so the queue saturates.
        let cfg = DlqConfig {
            queue_capacity: 2,
            batch_size: 1024,
            flush_interval_ms: 60_000,
            ..tmp_config(dir.path())
        };
        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&cfg, &shutdown);

        let full_count = (0..50)
            .filter(|i| {
                matches!(
                    dlq.try_send(test_entry(&format!("err_{i}"))),
                    Err(DlqError::QueueFull)
                )
            })
            .count();
        assert!(full_count > 0, "expected at least one QueueFull");
        shutdown.cancel();
    }

    #[tokio::test]
    async fn dlq_clone_shares_state() {
        let dir = tempfile::tempdir().expect("tempdir");
        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&tmp_config(dir.path()), &shutdown);
        let dlq2 = dlq.clone();

        dlq.send(test_entry("a")).await.expect("send a");
        dlq2.send(test_entry("b")).await.expect("send b");
        dlq.flush().await.expect("flush");

        assert_eq!(written_lines(dir.path()), 2);

        shutdown.cancel();
        dlq.shutdown().await.expect("shutdown");
    }
}
550}