hyperi_rustlib/dlq/backend.rs
1// Project: hyperi-rustlib
2// File: src/dlq/backend.rs
3// Purpose: DlqBackend enum -- variant per supported backend
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Static enum dispatch for DLQ backends.
10//!
11//! Replaces the previous `#[async_trait] trait DlqBackend` + `Box<dyn>`
12//! shape. Each backend is a concrete variant; the drain matches over
13//! variants. No vtable, no `async_trait` macro, no heap-boxed future.
14//!
15//! See [`super::orchestrator::Dlq`] for usage.
16
17use super::entry::DlqEntry;
18use super::error::DlqError;
19
20/// A DLQ backend. One variant per supported destination.
21///
22/// Variants are feature-gated:
23///
24/// - [`Self::File`] -- always available
25/// - [`Self::Kafka`] -- `dlq-kafka` feature
26/// - [`Self::Http`] -- `dlq-http` feature
27/// - [`Self::Redis`] -- `dlq-redis` feature
28///
29/// Each variant's inner struct lives in its sibling module
30/// (`file::FileDlqInner`, `kafka::KafkaDlqInner`, etc.). They are
31/// crate-private -- consumers configure DLQ via [`super::DlqConfig`] and
32/// drive it via [`super::orchestrator::Dlq`].
33#[non_exhaustive]
34pub enum DlqBackend {
35 /// NDJSON file backend with rotation.
36 File(super::file::FileDlqInner),
37
38 /// Kafka backend.
39 #[cfg(feature = "dlq-kafka")]
40 Kafka(super::kafka::KafkaDlqInner),
41
42 /// HTTP POST backend.
43 #[cfg(feature = "dlq-http")]
44 Http(super::http::HttpDlqInner),
45
46 /// Redis Streams backend.
47 #[cfg(feature = "dlq-redis")]
48 Redis(super::redis_dlq::RedisDlqInner),
49}
50
51impl std::fmt::Debug for DlqBackend {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 f.write_str("DlqBackend::")?;
54 f.write_str(self.name())
55 }
56}
57
58impl DlqBackend {
59 /// Write a batch of entries to this backend. Called only by the
60 /// orchestrator's drain task -- never from a consumer hot path.
61 ///
62 /// # Errors
63 ///
64 /// Backend-specific. The orchestrator decides whether to cascade,
65 /// fall back, or fan out based on the configured [`super::DlqMode`].
66 pub async fn send_batch(&mut self, batch: &[DlqEntry]) -> Result<(), DlqError> {
67 match self {
68 Self::File(b) => b.send_batch(batch).await,
69 #[cfg(feature = "dlq-kafka")]
70 Self::Kafka(b) => b.send_batch(batch).await,
71 #[cfg(feature = "dlq-http")]
72 Self::Http(b) => b.send_batch(batch).await,
73 #[cfg(feature = "dlq-redis")]
74 Self::Redis(b) => b.send_batch(batch).await,
75 }
76 }
77
78 /// Make every entry written so far DURABLE.
79 ///
80 /// Called from the BackgroundSink barrier handler when a consumer
81 /// invokes [`super::orchestrator::Dlq::flush`]. Each backend honours
82 /// the strongest durability it can express:
83 ///
84 /// - **File**: `flush()` on the rotating writer. The underlying
85 /// `file-rotate` crate doesn't expose the inner `File` handle, so
86 /// we can't `fsync()` from here -- `flush()` only guarantees that
87 /// buffered bytes have been handed to the kernel page cache. A
88 /// power-loss event between page-cache and disk can still lose
89 /// data. Documented limitation; tracked separately if/when
90 /// `file-rotate` exposes a sync hook.
91 /// - **Kafka**: `producer.flush()` -- blocks until every queued
92 /// message has been acked by the broker (per the producer's
93 /// acks config). This is the real durability semantic.
94 /// - **HTTP**: no-op. `send_batch` already awaits the response.
95 /// - **Redis**: no-op. `send_batch` already awaits the XADD pipeline.
96 ///
97 /// # Errors
98 ///
99 /// Backend-specific. Surfaced to the BackgroundSink barrier
100 /// handler which logs and continues.
101 pub async fn flush_durable(&mut self) -> Result<(), DlqError> {
102 match self {
103 Self::File(b) => b.flush_durable().await,
104 #[cfg(feature = "dlq-kafka")]
105 Self::Kafka(b) => b.flush_durable().await,
106 #[cfg(feature = "dlq-http")]
107 Self::Http(_) => Ok(()),
108 #[cfg(feature = "dlq-redis")]
109 Self::Redis(_) => Ok(()),
110 }
111 }
112
113 /// Backend name for log / metric labels.
114 #[must_use]
115 pub fn name(&self) -> &'static str {
116 match self {
117 Self::File(_) => "file",
118 #[cfg(feature = "dlq-kafka")]
119 Self::Kafka(_) => "kafka",
120 #[cfg(feature = "dlq-http")]
121 Self::Http(_) => "http",
122 #[cfg(feature = "dlq-redis")]
123 Self::Redis(_) => "redis",
124 }
125 }
126}