Skip to main content

hyperi_rustlib/dlq/
config.rs

1// Project:   hyperi-rustlib
2// File:      src/dlq/config.rs
3// Purpose:   DLQ configuration types
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Configuration for the DLQ module.
10//!
11//! Supports file-based and Kafka-based backends with cascade or fan-out modes.
12//!
13//! ## Config Cascade Example
14//!
15//! ```yaml
16//! dlq:
17//!   mode: cascade
18//!   file:
19//!     enabled: true
20//!     path: /var/spool/dfe/dlq
21//!     rotation: hourly
22//!     max_age_days: 30
23//!     compress_rotated: true
24//!   kafka:
25//!     enabled: true
26//!     routing: per_table
27//!     topic_suffix: .dlq
28//!     common_topic: dfe.dlq
29//! ```
30
31use std::path::PathBuf;
32
33use serde::{Deserialize, Serialize};
34
35// Re-export RotationPeriod from the shared io module so existing consumers
36// of `dlq::RotationPeriod` continue to work without changes.
37use crate::io::FileWriterConfig;
38pub use crate::io::RotationPeriod;
39
40/// How backends are used when multiple are enabled.
41#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
42#[serde(rename_all = "snake_case")]
43pub enum DlqMode {
44    /// Try backends in order; stop on first success.
45    /// Default order: Kafka first, file fallback.
46    #[default]
47    Cascade,
48
49    /// Write to all enabled backends; report any failures.
50    FanOut,
51
52    /// File backend only (no Kafka dependency).
53    FileOnly,
54
55    /// Kafka backend only (current dfe-loader behaviour).
56    KafkaOnly,
57}
58
59/// Top-level DLQ configuration.
60#[derive(Debug, Clone, Serialize, Deserialize)]
61#[serde(default)]
62pub struct DlqConfig {
63    /// Whether DLQ is enabled.
64    pub enabled: bool,
65
66    /// Backend routing mode.
67    pub mode: DlqMode,
68
69    /// Bounded mpsc capacity. When the queue is full, `try_send` returns
70    /// `QueueFull` (overflow=Drop). Sized for failure-burst tolerance.
71    /// Default 10_000.
72    pub queue_capacity: usize,
73
74    /// Drain coalesces up to this many entries into one backend write.
75    /// Default 256.
76    pub batch_size: usize,
77
78    /// Flush a partial batch after this duration, even if not full.
79    /// Default 100 ms.
80    pub flush_interval_ms: u64,
81
82    /// File backend configuration.
83    pub file: FileDlqConfig,
84
85    /// Kafka backend configuration.
86    #[cfg(feature = "dlq-kafka")]
87    pub kafka: KafkaDlqConfig,
88
89    /// HTTP backend configuration.
90    #[cfg(feature = "dlq-http")]
91    pub http: super::http::HttpDlqConfig,
92
93    /// Redis backend configuration.
94    #[cfg(feature = "dlq-redis")]
95    pub redis: super::redis_dlq::RedisDlqConfig,
96}
97
98impl Default for DlqConfig {
99    fn default() -> Self {
100        Self {
101            enabled: true,
102            mode: DlqMode::default(),
103            queue_capacity: 10_000,
104            batch_size: 256,
105            flush_interval_ms: 100,
106            file: FileDlqConfig::default(),
107            #[cfg(feature = "dlq-kafka")]
108            kafka: KafkaDlqConfig::default(),
109            #[cfg(feature = "dlq-http")]
110            http: super::http::HttpDlqConfig::default(),
111            #[cfg(feature = "dlq-redis")]
112            redis: super::redis_dlq::RedisDlqConfig::default(),
113        }
114    }
115}
116
117/// File-based DLQ configuration.
118///
119/// Writes NDJSON files with automatic rotation and cleanup.
120#[derive(Debug, Clone, Serialize, Deserialize)]
121#[serde(default)]
122pub struct FileDlqConfig {
123    /// Enable the file backend.
124    pub enabled: bool,
125
126    /// Base directory for DLQ files.
127    /// Service name is appended as a subdirectory.
128    pub path: PathBuf,
129
130    /// File rotation period.
131    pub rotation: RotationPeriod,
132
133    /// Auto-cleanup files older than this many days.
134    pub max_age_days: u32,
135
136    /// Compress rotated files with flate2/gzip.
137    pub compress_rotated: bool,
138}
139
140impl Default for FileDlqConfig {
141    fn default() -> Self {
142        Self {
143            enabled: true,
144            path: PathBuf::from("/var/spool/dfe/dlq"),
145            rotation: RotationPeriod::default(),
146            max_age_days: 30,
147            compress_rotated: true,
148        }
149    }
150}
151
152impl FileDlqConfig {
153    /// Convert to the shared `FileWriterConfig` for use with `NdjsonWriter`.
154    #[must_use]
155    pub fn to_writer_config(&self) -> FileWriterConfig {
156        FileWriterConfig {
157            path: self.path.clone(),
158            rotation: self.rotation,
159            max_age_days: self.max_age_days,
160            compress_rotated: self.compress_rotated,
161        }
162    }
163}
164
165/// Kafka-based DLQ configuration.
166#[cfg(feature = "dlq-kafka")]
167#[derive(Debug, Clone, Serialize, Deserialize)]
168#[serde(default)]
169pub struct KafkaDlqConfig {
170    /// Enable the Kafka backend.
171    pub enabled: bool,
172
173    /// Topic routing strategy.
174    pub routing: DlqRouting,
175
176    /// Suffix appended to destination for per-table routing.
177    pub topic_suffix: String,
178
179    /// Common topic when routing is `Common` or destination is unknown.
180    pub common_topic: String,
181
182    /// Send timeout in milliseconds.
183    pub send_timeout_ms: u64,
184}
185
186#[cfg(feature = "dlq-kafka")]
187impl Default for KafkaDlqConfig {
188    fn default() -> Self {
189        Self {
190            enabled: true,
191            routing: DlqRouting::default(),
192            topic_suffix: ".dlq".to_string(),
193            common_topic: "dfe.dlq".to_string(),
194            send_timeout_ms: 5000,
195        }
196    }
197}
198
199/// Kafka DLQ topic routing strategy.
200#[cfg(feature = "dlq-kafka")]
201#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
202#[serde(rename_all = "snake_case")]
203pub enum DlqRouting {
204    /// Route to topic matching destination with suffix.
205    /// e.g. "acme.auth" → "acme.auth.dlq"
206    #[default]
207    PerTable,
208
209    /// Route all failures to a single common topic.
210    Common,
211}
212
213#[cfg(test)]
214mod tests {
215    use super::*;
216
217    #[test]
218    fn test_config_defaults() {
219        let config = DlqConfig::default();
220        assert!(config.enabled);
221        assert_eq!(config.mode, DlqMode::Cascade);
222        assert!(config.file.enabled);
223        assert_eq!(config.file.max_age_days, 30);
224        assert!(config.file.compress_rotated);
225        assert_eq!(config.file.rotation, RotationPeriod::Hourly);
226    }
227
228    #[test]
229    fn test_config_serde_roundtrip() {
230        let config = DlqConfig {
231            mode: DlqMode::FanOut,
232            file: FileDlqConfig {
233                enabled: true,
234                path: "/tmp/test-dlq".into(),
235                rotation: RotationPeriod::Daily,
236                max_age_days: 7,
237                compress_rotated: false,
238            },
239            queue_capacity: 50_000,
240            batch_size: 128,
241            flush_interval_ms: 250,
242            ..DlqConfig::default()
243        };
244        let json = serde_json::to_string(&config).expect("serialise");
245        let parsed: DlqConfig = serde_json::from_str(&json).expect("deserialise");
246        assert_eq!(parsed.mode, DlqMode::FanOut);
247        assert_eq!(parsed.file.rotation, RotationPeriod::Daily);
248        assert_eq!(parsed.file.max_age_days, 7);
249        assert_eq!(parsed.queue_capacity, 50_000);
250        assert_eq!(parsed.batch_size, 128);
251        assert_eq!(parsed.flush_interval_ms, 250);
252    }
253
254    #[test]
255    fn test_dlq_mode_serde() {
256        let json = r#""cascade""#;
257        let mode: DlqMode = serde_json::from_str(json).expect("deserialise");
258        assert_eq!(mode, DlqMode::Cascade);
259
260        let json = r#""fan_out""#;
261        let mode: DlqMode = serde_json::from_str(json).expect("deserialise");
262        assert_eq!(mode, DlqMode::FanOut);
263    }
264}