Skip to main content

hyperi_rustlib/dlq/
orchestrator.rs

1// Project:   hyperi-rustlib
2// File:      src/dlq/orchestrator.rs
3// Purpose:   Dlq orchestrator over BackgroundSink<DlqEntry>
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
//! Dlq orchestrator.
//!
//! Wraps a [`BackgroundSink<DlqEntry>`] whose drain (`DlqDrain`)
//! dispatches batches across one or more [`super::DlqBackend`]
//! variants using the configured [`DlqMode`].
//!
//! ## Hot path
//!
//! `try_send` returns immediately and `send` waits only for queue space;
//! both hand the entry to the in-memory mpsc queue and return. The drain
//! task -- the only place that touches backends -- coalesces queued
//! entries into batches and writes them to the backends, so the caller
//! never blocks on disk, Kafka, HTTP, or Redis I/O.
//!
//! ## Modes
//!
//! - `Cascade` / `FileOnly` / `KafkaOnly` -- try backends in order,
//!   stop on first success.
//! - `FanOut` -- send to all backends, succeed if any succeed.
//!
//! ## Shutdown
//!
//! When the shutdown token passed to `Dlq::spawn` is cancelled, the drain
//! finishes its in-flight batch, drains the queue, then exits. Use
//! [`Dlq::shutdown`] to stop only the DLQ and wait for the drain to
//! finish. Dropping all `Dlq` handles also triggers a clean exit: the
//! channel closes, the drain empties the queue, then exits.
34
35use std::sync::Arc;
36
37use tokio::sync::Mutex as AsyncMutex;
38use tokio_util::sync::CancellationToken;
39use tracing::{debug, warn};
40
41use crate::concurrency::{
42    BackgroundSink, BackgroundSinkConfig, BackgroundSinkHandle, DrainError, Overflow, SinkDrain,
43    SinkError,
44};
45
46use super::backend::DlqBackend;
47use super::config::{DlqConfig, DlqMode};
48use super::entry::DlqEntry;
49use super::error::DlqError;
50use super::file::FileDlqInner;
51
52/// Unified DLQ. Caller queues entries from any task; the orchestrator
53/// drains them off-runtime via the configured backends.
54///
55/// Clone is cheap (`mpsc::Sender` clone). The single-owner shutdown
56/// handle stays inside `Arc<AsyncMutex<Option<...>>>` so `Dlq` itself
57/// is `Clone`.
58#[derive(Clone)]
59pub struct Dlq {
60    sink: Option<BackgroundSink<DlqEntry>>,
61    join: Arc<AsyncMutex<Option<BackgroundSinkHandle>>>,
62    enabled: bool,
63    mode: DlqMode,
64    /// Child of the user-supplied shutdown token. The drain task runs
65    /// on this child, so [`Dlq::shutdown`] can cancel only the DLQ
66    /// without affecting the caller's broader shutdown plan. When the
67    /// caller cancels their own token, the child fires too (normal
68    /// child-token semantics), so the drain still exits on global
69    /// shutdown.
70    cancel: CancellationToken,
71}
72
73impl std::fmt::Debug for Dlq {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("Dlq")
76            .field("enabled", &self.enabled)
77            .field("mode", &self.mode)
78            .field(
79                "pending",
80                &self.sink.as_ref().map_or(0, BackgroundSink::pending),
81            )
82            .field(
83                "dropped",
84                &self.sink.as_ref().map_or(0, BackgroundSink::dropped),
85            )
86            .finish_non_exhaustive()
87    }
88}
89
90impl Dlq {
91    /// Build a disabled DLQ. All `send` / `try_send` calls succeed as
92    /// no-ops.
93    #[must_use]
94    pub fn disabled() -> Self {
95        Self {
96            sink: None,
97            join: Arc::new(AsyncMutex::new(None)),
98            enabled: false,
99            mode: DlqMode::default(),
100            cancel: CancellationToken::new(),
101        }
102    }
103
104    /// Spawn the DLQ with whatever backends the config enables.
105    ///
106    /// `kafka_config` is required if the config has `kafka.enabled =
107    /// true` (or the mode demands Kafka). Pass `None` if the service
108    /// has no Kafka transport -- Kafka mode/enabled flags are honoured
109    /// where possible and a clear `Err(DlqError::NotConfigured)` is
110    /// returned if Kafka is required but unavailable.
111    ///
112    /// # Errors
113    ///
114    /// Returns `Err` if any enabled backend fails to initialise.
115    pub fn spawn(
116        config: &DlqConfig,
117        service_name: &str,
118        #[cfg(feature = "dlq-kafka")] kafka_config: Option<&crate::transport::KafkaConfig>,
119        #[cfg(not(feature = "dlq-kafka"))] _kafka_config: Option<&()>,
120        shutdown: CancellationToken,
121    ) -> Result<Self, DlqError> {
122        if !config.enabled {
123            return Ok(Self::disabled());
124        }
125
126        let backends = build_backends(
127            config,
128            service_name,
129            #[cfg(feature = "dlq-kafka")]
130            kafka_config,
131        )?;
132
133        if backends.is_empty() {
134            warn!("DLQ enabled but no backends configured -- entries will be dropped");
135            return Ok(Self::disabled());
136        }
137
138        let names: Vec<&'static str> = backends.iter().map(DlqBackend::name).collect();
139        debug!(mode = ?config.mode, backends = ?names, "DLQ initialised");
140
141        let drain = DlqDrain {
142            mode: config.mode,
143            backends,
144        };
145
146        let sink_config = BackgroundSinkConfig {
147            queue_capacity: config.queue_capacity,
148            batch_size: config.batch_size,
149            flush_interval: std::time::Duration::from_millis(config.flush_interval_ms),
150            overflow: Overflow::Drop,
151            metric_prefix: Some("dfe_dlq"),
152        };
153
154        // Derive a child token so `Dlq::shutdown` can stop the drain
155        // without forcing the caller to cancel their broader shutdown
156        // plan. The child fires automatically when the parent fires,
157        // so global shutdown still drains the DLQ.
158        let cancel = shutdown.child_token();
159        let (sink, handle) = BackgroundSink::spawn(drain, sink_config, cancel.clone());
160
161        Ok(Self {
162            sink: Some(sink),
163            join: Arc::new(AsyncMutex::new(Some(handle))),
164            enabled: true,
165            mode: config.mode,
166            cancel,
167        })
168    }
169
170    /// Whether the DLQ is accepting entries.
171    #[must_use]
172    pub fn is_enabled(&self) -> bool {
173        self.enabled
174    }
175
176    /// Configured routing mode (informational).
177    #[must_use]
178    pub fn mode(&self) -> DlqMode {
179        self.mode
180    }
181
182    /// Approximate queue depth (drain may be mid-recv).
183    #[must_use]
184    pub fn pending(&self) -> usize {
185        self.sink.as_ref().map_or(0, BackgroundSink::pending)
186    }
187
188    /// Total entries dropped due to overflow since spawn.
189    #[must_use]
190    pub fn dropped(&self) -> u64 {
191        self.sink.as_ref().map_or(0, BackgroundSink::dropped)
192    }
193
194    /// Sync-shaped queue submission. Returns immediately. On a full
195    /// queue, returns `Err(DlqError::QueueFull)` and increments the
196    /// drop counter -- caller decides whether to log, escalate, or
197    /// proceed.
198    ///
199    /// # Errors
200    ///
201    /// `QueueFull` if the in-memory queue is full. `Closed` if the
202    /// drain has exited.
203    pub fn try_send(&self, entry: DlqEntry) -> Result<(), DlqError> {
204        let Some(sink) = self.sink.as_ref() else {
205            return Ok(());
206        };
207        sink.try_push(entry).map_err(map_sink_err)
208    }
209
210    /// Async submission that awaits queue space.
211    ///
212    /// Successful return means the entry is queued, NOT that it is
213    /// durably written. Use [`Self::flush`] for that.
214    ///
215    /// # Errors
216    ///
217    /// `Closed` if the drain has exited.
218    pub async fn send(&self, entry: DlqEntry) -> Result<(), DlqError> {
219        let Some(sink) = self.sink.as_ref() else {
220            return Ok(());
221        };
222        sink.push_blocking(entry).await.map_err(map_sink_err)
223    }
224
225    /// Async batch submission. Each entry is queued individually; the
226    /// drain decides how to coalesce.
227    ///
228    /// # Errors
229    ///
230    /// `Closed` if the drain has exited mid-batch.
231    pub async fn send_batch(&self, entries: Vec<DlqEntry>) -> Result<(), DlqError> {
232        let Some(sink) = self.sink.as_ref() else {
233            return Ok(());
234        };
235        for entry in entries {
236            sink.push_blocking(entry).await.map_err(map_sink_err)?;
237        }
238        Ok(())
239    }
240
241    /// Block until every entry queued before this call is durably
242    /// written by the drain.
243    ///
244    /// # Errors
245    ///
246    /// `Closed` if the drain has exited before this barrier was
247    /// processed.
248    pub async fn flush(&self) -> Result<(), DlqError> {
249        let Some(sink) = self.sink.as_ref() else {
250            return Ok(());
251        };
252        sink.flush().await.map_err(map_sink_err)
253    }
254
255    /// Cancel the internal child token (drain flushes its batch and
256    /// exits), then await the drain. Cancelling here rather than only
257    /// awaiting the join is what stops `shutdown` hanging when the
258    /// caller has not separately cancelled the token passed to `spawn`.
259    ///
260    /// Idempotent across clones: the join happens once; later calls see
261    /// an empty join slot and return Ok.
262    ///
263    /// # Errors
264    ///
265    /// Returns `Err(DlqError::Closed)` if the drain task panicked.
266    pub async fn shutdown(&self) -> Result<(), DlqError> {
267        self.cancel.cancel();
268        let mut guard = self.join.lock().await;
269        let Some(handle) = guard.take() else {
270            return Ok(());
271        };
272        handle
273            .join()
274            .await
275            .map_err(|e| DlqError::File(format!("DLQ drain join failed: {e}")))?;
276        Ok(())
277    }
278}
279
280fn map_sink_err(e: SinkError) -> DlqError {
281    match e {
282        SinkError::Overflow => DlqError::QueueFull,
283        SinkError::Closed => DlqError::Closed,
284        SinkError::Drain(d) => DlqError::File(d.to_string()),
285    }
286}
287
288fn build_backends(
289    config: &DlqConfig,
290    service_name: &str,
291    #[cfg(feature = "dlq-kafka")] kafka_config: Option<&crate::transport::KafkaConfig>,
292) -> Result<Vec<DlqBackend>, DlqError> {
293    let mut backends: Vec<DlqBackend> = Vec::new();
294    let mode = config.mode;
295
296    // Kafka first (primary in cascade) -- feature-gated.
297    #[cfg(feature = "dlq-kafka")]
298    {
299        let want_kafka = matches!(
300            mode,
301            DlqMode::Cascade | DlqMode::FanOut | DlqMode::KafkaOnly
302        );
303        if want_kafka && config.kafka.enabled {
304            let kc = kafka_config.ok_or_else(|| {
305                DlqError::Kafka(
306                    "DLQ Kafka backend enabled but no KafkaConfig provided to Dlq::spawn".into(),
307                )
308            })?;
309            backends.push(DlqBackend::Kafka(super::kafka::KafkaDlqInner::new(
310                kc,
311                &config.kafka,
312            )?));
313        }
314    }
315
316    // File second (fallback in cascade) -- always available.
317    let want_file = matches!(mode, DlqMode::Cascade | DlqMode::FanOut | DlqMode::FileOnly);
318    if want_file && config.file.enabled {
319        backends.push(DlqBackend::File(FileDlqInner::new(
320            &config.file,
321            service_name,
322        )?));
323    }
324
325    // HTTP -- feature-gated, added when explicitly enabled.
326    #[cfg(feature = "dlq-http")]
327    {
328        if config.http.enabled {
329            backends.push(DlqBackend::Http(super::http::HttpDlqInner::new(
330                &config.http,
331            )?));
332        }
333    }
334
335    // Redis -- feature-gated. Requires async constructor; we build a
336    // tokio runtime handle inline. Spawn() must run inside a tokio
337    // runtime (true for every HyperI service).
338    #[cfg(feature = "dlq-redis")]
339    {
340        if config.redis.enabled {
341            let cfg = config.redis.clone();
342            let inner = tokio::task::block_in_place(|| {
343                tokio::runtime::Handle::current()
344                    .block_on(super::redis_dlq::RedisDlqInner::new(&cfg))
345            })?;
346            backends.push(DlqBackend::Redis(inner));
347        }
348    }
349
350    Ok(backends)
351}
352
353/// Drain task -- owns the backends and implements cascade / fan-out
354/// dispatch. Lives inside the actor task spawned by `BackgroundSink`.
355struct DlqDrain {
356    mode: DlqMode,
357    backends: Vec<DlqBackend>,
358}
359
360impl SinkDrain<DlqEntry> for DlqDrain {
361    async fn write_batch(&mut self, batch: Vec<DlqEntry>) -> Result<(), DrainError> {
362        if batch.is_empty() {
363            return Ok(());
364        }
365
366        match self.mode {
367            DlqMode::Cascade | DlqMode::FileOnly | DlqMode::KafkaOnly => {
368                let mut last_err: Option<DlqError> = None;
369                for backend in &mut self.backends {
370                    match backend.send_batch(&batch).await {
371                        Ok(()) => return Ok(()),
372                        Err(e) => {
373                            warn!(
374                                backend = backend.name(),
375                                error = %e,
376                                count = batch.len(),
377                                "DLQ backend failed in cascade, trying next"
378                            );
379                            last_err = Some(e);
380                        }
381                    }
382                }
383                let msg = last_err
384                    .map_or_else(|| "no backends configured".to_string(), |e| e.to_string());
385                Err(DrainError::Backend(Box::new(DlqError::AllBackendsFailed(
386                    msg,
387                ))))
388            }
389            DlqMode::FanOut => {
390                let mut any_ok = false;
391                let mut errs: Vec<String> = Vec::new();
392                for backend in &mut self.backends {
393                    match backend.send_batch(&batch).await {
394                        Ok(()) => any_ok = true,
395                        Err(e) => {
396                            warn!(
397                                backend = backend.name(),
398                                error = %e,
399                                count = batch.len(),
400                                "DLQ backend failed in fan-out"
401                            );
402                            errs.push(format!("{}:{}", backend.name(), e));
403                        }
404                    }
405                }
406                if any_ok {
407                    Ok(())
408                } else {
409                    Err(DrainError::Backend(Box::new(DlqError::AllBackendsFailed(
410                        errs.join("; "),
411                    ))))
412                }
413            }
414        }
415    }
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dlq::config::{FileDlqConfig, RotationPeriod};
    use crate::dlq::entry::DlqSource;

    /// File-only config writing under `dir`, with a short flush interval
    /// so tests observe writes quickly.
    fn tmp_config(dir: &std::path::Path) -> DlqConfig {
        DlqConfig {
            file: FileDlqConfig {
                enabled: true,
                path: dir.to_path_buf(),
                rotation: RotationPeriod::Daily,
                max_age_days: 1,
                compress_rotated: false,
            },
            mode: DlqMode::FileOnly,
            queue_capacity: 1024,
            batch_size: 16,
            flush_interval_ms: 20,
            ..DlqConfig::default()
        }
    }

    fn test_entry(reason: &str) -> DlqEntry {
        DlqEntry::new("test", reason, b"payload".to_vec())
            .with_destination("acme.auth")
            .with_source(DlqSource::kafka("events", 1, 42))
    }

    /// Spawn a DLQ for service `svc` with no Kafka transport. A bare `None`
    /// type-checks against both the `dlq-kafka` and the non-`dlq-kafka`
    /// signature of `Dlq::spawn`.
    fn spawn_dlq(cfg: &DlqConfig, shutdown: &CancellationToken) -> Dlq {
        Dlq::spawn(cfg, "svc", None, shutdown.clone()).expect("spawn")
    }

    /// Number of NDJSON lines written for service `svc` under `dir`.
    fn written_lines(dir: &std::path::Path) -> usize {
        let body = std::fs::read_to_string(dir.join("svc/dlq.ndjson")).expect("read");
        body.trim().lines().count()
    }

    #[tokio::test]
    async fn disabled_dlq_accepts_silently() {
        let dlq = Dlq::disabled();
        assert!(!dlq.is_enabled());
        dlq.try_send(test_entry("err")).expect("noop try_send");
        dlq.send(test_entry("err")).await.expect("noop");
        dlq.send_batch(vec![test_entry("err")]).await.expect("noop");
        dlq.flush().await.expect("noop flush");
        assert_eq!(dlq.pending(), 0);
        assert_eq!(dlq.dropped(), 0);
        dlq.shutdown().await.expect("noop shutdown");
    }

    #[tokio::test]
    async fn file_only_writes_and_flushes() {
        let dir = tempfile::tempdir().expect("tempdir");
        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&tmp_config(dir.path()), &shutdown);

        for i in 0..5 {
            dlq.send(test_entry(&format!("err_{i}")))
                .await
                .expect("send");
        }
        dlq.flush().await.expect("flush");
        assert_eq!(written_lines(dir.path()), 5);

        shutdown.cancel();
        dlq.shutdown().await.expect("clean shutdown");
    }

    #[tokio::test]
    async fn try_send_returns_queue_full_when_saturated() {
        let dir = tempfile::tempdir().expect("tempdir");
        let mut cfg = tmp_config(dir.path());
        cfg.queue_capacity = 2;
        cfg.batch_size = 1024;
        cfg.flush_interval_ms = 60_000; // drain rarely fires
        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&cfg, &shutdown);

        // `try_send` never yields, so on the current-thread test runtime the
        // drain task cannot empty the queue during this loop.
        let mut full_count = 0;
        for i in 0..50 {
            if let Err(DlqError::QueueFull) = dlq.try_send(test_entry(&format!("err_{i}"))) {
                full_count += 1;
            }
        }
        assert!(full_count > 0, "expected at least one QueueFull");

        // Join the drain before `dir` is dropped so it never writes into a
        // deleted temp directory.
        dlq.shutdown().await.expect("shutdown");
    }

    #[tokio::test]
    async fn dlq_clone_shares_state() {
        let dir = tempfile::tempdir().expect("tempdir");
        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&tmp_config(dir.path()), &shutdown);

        let dlq2 = dlq.clone();
        dlq.send(test_entry("a")).await.expect("send a");
        dlq2.send(test_entry("b")).await.expect("send b");
        dlq.flush().await.expect("flush");
        assert_eq!(written_lines(dir.path()), 2);

        shutdown.cancel();
        dlq.shutdown().await.expect("shutdown");
    }

    /// Regression: `Dlq::shutdown` must cancel its own child token, so it
    /// returns even though the caller never cancels the token given to
    /// `spawn`.
    #[tokio::test]
    async fn shutdown_without_parent_cancel_drains_and_returns() {
        let dir = tempfile::tempdir().expect("tempdir");
        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&tmp_config(dir.path()), &shutdown);
        dlq.send(test_entry("late")).await.expect("send");

        tokio::time::timeout(std::time::Duration::from_secs(5), dlq.shutdown())
            .await
            .expect("shutdown hung without parent cancel")
            .expect("clean shutdown");

        assert!(!shutdown.is_cancelled(), "parent token must be left alone");
        assert_eq!(written_lines(dir.path()), 1, "queued entry drained on shutdown");

        // A clone finds the join slot already emptied and returns Ok.
        dlq.clone().shutdown().await.expect("idempotent shutdown");
    }
}