Skip to main content

hyperi_rustlib/dlq/
backend.rs

1// Project:   hyperi-rustlib
2// File:      src/dlq/backend.rs
3// Purpose:   DlqBackend enum -- variant per supported backend
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Static enum dispatch for DLQ backends.
10//!
11//! Replaces the previous `#[async_trait] trait DlqBackend` + `Box<dyn>`
12//! shape. Each backend is a concrete variant; the drain matches over
13//! variants. No vtable, no `async_trait` macro, no heap-boxed future.
14//!
15//! See [`super::orchestrator::Dlq`] for usage.
16
17use super::entry::DlqEntry;
18use super::error::DlqError;
19
20/// A DLQ backend. One variant per supported destination.
21///
22/// Variants are feature-gated:
23///
24/// - [`Self::File`] -- always available
25/// - `Kafka` -- `dlq-kafka` feature
26/// - `Http` -- `dlq-http` feature
27/// - `Redis` -- `dlq-redis` feature
28///
29/// Each variant's inner struct lives in its sibling module
30/// (`file::FileDlqInner`, `kafka::KafkaDlqInner`, etc.). They are
31/// crate-private -- consumers configure DLQ via [`super::DlqConfig`] and
32/// drive it via [`super::orchestrator::Dlq`].
33#[non_exhaustive]
34pub enum DlqBackend {
35    /// NDJSON file backend with rotation.
36    File(super::file::FileDlqInner),
37
38    /// Kafka backend.
39    #[cfg(feature = "dlq-kafka")]
40    Kafka(super::kafka::KafkaDlqInner),
41
42    /// HTTP POST backend.
43    #[cfg(feature = "dlq-http")]
44    Http(super::http::HttpDlqInner),
45
46    /// Redis Streams backend.
47    #[cfg(feature = "dlq-redis")]
48    Redis(super::redis_dlq::RedisDlqInner),
49}
50
51impl std::fmt::Debug for DlqBackend {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        f.write_str("DlqBackend::")?;
54        f.write_str(self.name())
55    }
56}
57
58impl DlqBackend {
59    /// Write a batch of entries to this backend. Called only by the
60    /// orchestrator's drain task -- never from a consumer hot path.
61    ///
62    /// # Errors
63    ///
64    /// Backend-specific. The orchestrator decides whether to cascade,
65    /// fall back, or fan out based on the configured [`super::DlqMode`].
66    pub async fn send_batch(&mut self, batch: &[DlqEntry]) -> Result<(), DlqError> {
67        match self {
68            Self::File(b) => b.send_batch(batch).await,
69            #[cfg(feature = "dlq-kafka")]
70            Self::Kafka(b) => b.send_batch(batch).await,
71            #[cfg(feature = "dlq-http")]
72            Self::Http(b) => b.send_batch(batch).await,
73            #[cfg(feature = "dlq-redis")]
74            Self::Redis(b) => b.send_batch(batch).await,
75        }
76    }
77
78    /// Make every entry written so far DURABLE.
79    ///
80    /// Called from the BackgroundSink barrier handler when a consumer
81    /// invokes [`super::orchestrator::Dlq::flush`]. Each backend honours
82    /// the strongest durability it can express:
83    ///
84    /// - **File**: `flush()` on the rotating writer. `file-rotate`
85    ///   doesn't expose the inner `File`, so we can't `fsync()` -- this
86    ///   only flushes to the kernel page cache, so power loss before
87    ///   write-back can still lose data. Limitation, tracked until
88    ///   `file-rotate` exposes a sync hook.
89    /// - **Kafka**: `producer.flush()` -- blocks until every queued
90    ///   message is acked by the broker (per the producer's acks
91    ///   config). The real durability semantic.
92    /// - **HTTP**: no-op. `send_batch` already awaits the response.
93    /// - **Redis**: no-op. `send_batch` already awaits the XADD pipeline.
94    ///
95    /// # Errors
96    ///
97    /// Backend-specific. Surfaced to the BackgroundSink barrier
98    /// handler which logs and continues.
99    pub async fn flush_durable(&mut self) -> Result<(), DlqError> {
100        match self {
101            Self::File(b) => b.flush_durable().await,
102            #[cfg(feature = "dlq-kafka")]
103            Self::Kafka(b) => b.flush_durable().await,
104            #[cfg(feature = "dlq-http")]
105            Self::Http(_) => Ok(()),
106            #[cfg(feature = "dlq-redis")]
107            Self::Redis(_) => Ok(()),
108        }
109    }
110
111    /// Backend name for log / metric labels.
112    #[must_use]
113    pub fn name(&self) -> &'static str {
114        match self {
115            Self::File(_) => "file",
116            #[cfg(feature = "dlq-kafka")]
117            Self::Kafka(_) => "kafka",
118            #[cfg(feature = "dlq-http")]
119            Self::Http(_) => "http",
120            #[cfg(feature = "dlq-redis")]
121            Self::Redis(_) => "redis",
122        }
123    }
124}