hyperi_rustlib/dlq/backend.rs
1// Project: hyperi-rustlib
2// File: src/dlq/backend.rs
3// Purpose: DlqBackend enum -- variant per supported backend
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Static enum dispatch for DLQ backends.
10//!
11//! Replaces the previous `#[async_trait] trait DlqBackend` + `Box<dyn>`
12//! shape. Each backend is a concrete variant; the drain matches over
13//! variants. No vtable, no `async_trait` macro, no heap-boxed future.
14//!
15//! See [`super::orchestrator::Dlq`] for usage.
16
17use super::entry::DlqEntry;
18use super::error::DlqError;
19
20/// A DLQ backend. One variant per supported destination.
21///
22/// Variants are feature-gated:
23///
24/// - [`Self::File`] -- always available
25/// - [`Self::Kafka`] -- `dlq-kafka` feature
26/// - [`Self::Http`] -- `dlq-http` feature
27/// - [`Self::Redis`] -- `dlq-redis` feature
28///
29/// Each variant's inner struct lives in its sibling module
30/// (`file::FileDlqInner`, `kafka::KafkaDlqInner`, etc.). They are
31/// crate-private -- consumers configure DLQ via [`super::DlqConfig`] and
32/// drive it via [`super::orchestrator::Dlq`].
33#[non_exhaustive]
34pub enum DlqBackend {
35 /// NDJSON file backend with rotation.
36 File(super::file::FileDlqInner),
37
38 /// Kafka backend.
39 #[cfg(feature = "dlq-kafka")]
40 Kafka(super::kafka::KafkaDlqInner),
41
42 /// HTTP POST backend.
43 #[cfg(feature = "dlq-http")]
44 Http(super::http::HttpDlqInner),
45
46 /// Redis Streams backend.
47 #[cfg(feature = "dlq-redis")]
48 Redis(super::redis_dlq::RedisDlqInner),
49}
50
51impl std::fmt::Debug for DlqBackend {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 f.write_str("DlqBackend::")?;
54 f.write_str(self.name())
55 }
56}
57
58impl DlqBackend {
59 /// Write a batch of entries to this backend. Called only by the
60 /// orchestrator's drain task -- never from a consumer hot path.
61 ///
62 /// # Errors
63 ///
64 /// Backend-specific. The orchestrator decides whether to cascade,
65 /// fall back, or fan out based on the configured [`super::DlqMode`].
66 pub async fn send_batch(&mut self, batch: &[DlqEntry]) -> Result<(), DlqError> {
67 match self {
68 Self::File(b) => b.send_batch(batch).await,
69 #[cfg(feature = "dlq-kafka")]
70 Self::Kafka(b) => b.send_batch(batch).await,
71 #[cfg(feature = "dlq-http")]
72 Self::Http(b) => b.send_batch(batch).await,
73 #[cfg(feature = "dlq-redis")]
74 Self::Redis(b) => b.send_batch(batch).await,
75 }
76 }
77
78 /// Make every entry written so far DURABLE.
79 ///
80 /// Called from the BackgroundSink barrier handler when a consumer
81 /// invokes [`super::orchestrator::Dlq::flush`]. Each backend honours
82 /// the strongest durability it can express:
83 ///
84 /// - **File**: `flush()` on the rotating writer. `file-rotate`
85 /// doesn't expose the inner `File`, so we can't `fsync()` -- this
86 /// only flushes to the kernel page cache, so power loss before
87 /// write-back can still lose data. Limitation, tracked until
88 /// `file-rotate` exposes a sync hook.
89 /// - **Kafka**: `producer.flush()` -- blocks until every queued
90 /// message is acked by the broker (per the producer's acks
91 /// config). The real durability semantic.
92 /// - **HTTP**: no-op. `send_batch` already awaits the response.
93 /// - **Redis**: no-op. `send_batch` already awaits the XADD pipeline.
94 ///
95 /// # Errors
96 ///
97 /// Backend-specific. Surfaced to the BackgroundSink barrier
98 /// handler which logs and continues.
99 pub async fn flush_durable(&mut self) -> Result<(), DlqError> {
100 match self {
101 Self::File(b) => b.flush_durable().await,
102 #[cfg(feature = "dlq-kafka")]
103 Self::Kafka(b) => b.flush_durable().await,
104 #[cfg(feature = "dlq-http")]
105 Self::Http(_) => Ok(()),
106 #[cfg(feature = "dlq-redis")]
107 Self::Redis(_) => Ok(()),
108 }
109 }
110
111 /// Backend name for log / metric labels.
112 #[must_use]
113 pub fn name(&self) -> &'static str {
114 match self {
115 Self::File(_) => "file",
116 #[cfg(feature = "dlq-kafka")]
117 Self::Kafka(_) => "kafka",
118 #[cfg(feature = "dlq-http")]
119 Self::Http(_) => "http",
120 #[cfg(feature = "dlq-redis")]
121 Self::Redis(_) => "redis",
122 }
123 }
124}