Skip to main content

hyperi_rustlib/dlq/
backend.rs

1// Project:   hyperi-rustlib
2// File:      src/dlq/backend.rs
3// Purpose:   DlqBackend enum -- variant per supported backend
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Static enum dispatch for DLQ backends.
10//!
11//! Replaces the previous `#[async_trait] trait DlqBackend` + `Box<dyn>`
12//! shape. Each backend is a concrete variant; the drain matches over
13//! variants. No vtable, no `async_trait` macro, no heap-boxed future.
14//!
15//! See [`super::orchestrator::Dlq`] for usage.
16
17use super::entry::DlqEntry;
18use super::error::DlqError;
19
20/// A DLQ backend. One variant per supported destination.
21///
22/// Variants are feature-gated:
23///
24/// - [`Self::File`] -- always available
25/// - [`Self::Kafka`] -- `dlq-kafka` feature
26/// - [`Self::Http`] -- `dlq-http` feature
27/// - [`Self::Redis`] -- `dlq-redis` feature
28///
29/// Each variant's inner struct lives in its sibling module
30/// (`file::FileDlqInner`, `kafka::KafkaDlqInner`, etc.). They are
31/// crate-private -- consumers configure DLQ via [`super::DlqConfig`] and
32/// drive it via [`super::orchestrator::Dlq`].
33#[non_exhaustive]
34pub enum DlqBackend {
35    /// NDJSON file backend with rotation.
36    File(super::file::FileDlqInner),
37
38    /// Kafka backend.
39    #[cfg(feature = "dlq-kafka")]
40    Kafka(super::kafka::KafkaDlqInner),
41
42    /// HTTP POST backend.
43    #[cfg(feature = "dlq-http")]
44    Http(super::http::HttpDlqInner),
45
46    /// Redis Streams backend.
47    #[cfg(feature = "dlq-redis")]
48    Redis(super::redis_dlq::RedisDlqInner),
49}
50
51impl std::fmt::Debug for DlqBackend {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        f.write_str("DlqBackend::")?;
54        f.write_str(self.name())
55    }
56}
57
58impl DlqBackend {
59    /// Write a batch of entries to this backend. Called only by the
60    /// orchestrator's drain task -- never from a consumer hot path.
61    ///
62    /// # Errors
63    ///
64    /// Backend-specific. The orchestrator decides whether to cascade,
65    /// fall back, or fan out based on the configured [`super::DlqMode`].
66    pub async fn send_batch(&mut self, batch: &[DlqEntry]) -> Result<(), DlqError> {
67        match self {
68            Self::File(b) => b.send_batch(batch).await,
69            #[cfg(feature = "dlq-kafka")]
70            Self::Kafka(b) => b.send_batch(batch).await,
71            #[cfg(feature = "dlq-http")]
72            Self::Http(b) => b.send_batch(batch).await,
73            #[cfg(feature = "dlq-redis")]
74            Self::Redis(b) => b.send_batch(batch).await,
75        }
76    }
77
78    /// Make every entry written so far DURABLE.
79    ///
80    /// Called from the BackgroundSink barrier handler when a consumer
81    /// invokes [`super::orchestrator::Dlq::flush`]. Each backend honours
82    /// the strongest durability it can express:
83    ///
84    /// - **File**: `flush()` on the rotating writer. The underlying
85    ///   `file-rotate` crate doesn't expose the inner `File` handle, so
86    ///   we can't `fsync()` from here -- `flush()` only guarantees that
87    ///   buffered bytes have been handed to the kernel page cache. A
88    ///   power-loss event between page-cache and disk can still lose
89    ///   data. Documented limitation; tracked separately if/when
90    ///   `file-rotate` exposes a sync hook.
91    /// - **Kafka**: `producer.flush()` -- blocks until every queued
92    ///   message has been acked by the broker (per the producer's
93    ///   acks config). This is the real durability semantic.
94    /// - **HTTP**: no-op. `send_batch` already awaits the response.
95    /// - **Redis**: no-op. `send_batch` already awaits the XADD pipeline.
96    ///
97    /// # Errors
98    ///
99    /// Backend-specific. Surfaced to the BackgroundSink barrier
100    /// handler which logs and continues.
101    pub async fn flush_durable(&mut self) -> Result<(), DlqError> {
102        match self {
103            Self::File(b) => b.flush_durable().await,
104            #[cfg(feature = "dlq-kafka")]
105            Self::Kafka(b) => b.flush_durable().await,
106            #[cfg(feature = "dlq-http")]
107            Self::Http(_) => Ok(()),
108            #[cfg(feature = "dlq-redis")]
109            Self::Redis(_) => Ok(()),
110        }
111    }
112
113    /// Backend name for log / metric labels.
114    #[must_use]
115    pub fn name(&self) -> &'static str {
116        match self {
117            Self::File(_) => "file",
118            #[cfg(feature = "dlq-kafka")]
119            Self::Kafka(_) => "kafka",
120            #[cfg(feature = "dlq-http")]
121            Self::Http(_) => "http",
122            #[cfg(feature = "dlq-redis")]
123            Self::Redis(_) => "redis",
124        }
125    }
126}