1use std::path::PathBuf;
32
33use serde::{Deserialize, Serialize};
34
35use crate::io::FileWriterConfig;
38pub use crate::io::RotationPeriod;
39
40#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
42#[serde(rename_all = "snake_case")]
43pub enum DlqMode {
44 #[default]
47 Cascade,
48
49 FanOut,
51
52 FileOnly,
54
55 KafkaOnly,
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
61#[serde(default)]
62pub struct DlqConfig {
63 pub enabled: bool,
65
66 pub mode: DlqMode,
68
69 pub queue_capacity: usize,
73
74 pub batch_size: usize,
77
78 pub flush_interval_ms: u64,
81
82 pub file: FileDlqConfig,
84
85 #[cfg(feature = "dlq-kafka")]
87 pub kafka: KafkaDlqConfig,
88
89 #[cfg(feature = "dlq-http")]
91 pub http: super::http::HttpDlqConfig,
92
93 #[cfg(feature = "dlq-redis")]
95 pub redis: super::redis_dlq::RedisDlqConfig,
96}
97
98impl Default for DlqConfig {
99 fn default() -> Self {
100 Self {
101 enabled: true,
102 mode: DlqMode::default(),
103 queue_capacity: 10_000,
104 batch_size: 256,
105 flush_interval_ms: 100,
106 file: FileDlqConfig::default(),
107 #[cfg(feature = "dlq-kafka")]
108 kafka: KafkaDlqConfig::default(),
109 #[cfg(feature = "dlq-http")]
110 http: super::http::HttpDlqConfig::default(),
111 #[cfg(feature = "dlq-redis")]
112 redis: super::redis_dlq::RedisDlqConfig::default(),
113 }
114 }
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
121#[serde(default)]
122pub struct FileDlqConfig {
123 pub enabled: bool,
125
126 pub path: PathBuf,
129
130 pub rotation: RotationPeriod,
132
133 pub max_age_days: u32,
135
136 pub compress_rotated: bool,
138}
139
140impl Default for FileDlqConfig {
141 fn default() -> Self {
142 Self {
143 enabled: true,
144 path: PathBuf::from("/var/spool/dfe/dlq"),
145 rotation: RotationPeriod::default(),
146 max_age_days: 30,
147 compress_rotated: true,
148 }
149 }
150}
151
152impl FileDlqConfig {
153 #[must_use]
155 pub fn to_writer_config(&self) -> FileWriterConfig {
156 FileWriterConfig {
157 path: self.path.clone(),
158 rotation: self.rotation,
159 max_age_days: self.max_age_days,
160 compress_rotated: self.compress_rotated,
161 }
162 }
163}
164
165#[cfg(feature = "dlq-kafka")]
167#[derive(Debug, Clone, Serialize, Deserialize)]
168#[serde(default)]
169pub struct KafkaDlqConfig {
170 pub enabled: bool,
172
173 pub routing: DlqRouting,
175
176 pub topic_suffix: String,
178
179 pub common_topic: String,
181
182 pub send_timeout_ms: u64,
184}
185
186#[cfg(feature = "dlq-kafka")]
187impl Default for KafkaDlqConfig {
188 fn default() -> Self {
189 Self {
190 enabled: true,
191 routing: DlqRouting::default(),
192 topic_suffix: ".dlq".to_string(),
193 common_topic: "dfe.dlq".to_string(),
194 send_timeout_ms: 5000,
195 }
196 }
197}
198
199#[cfg(feature = "dlq-kafka")]
201#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
202#[serde(rename_all = "snake_case")]
203pub enum DlqRouting {
204 #[default]
207 PerTable,
208
209 Common,
211}
212
213#[cfg(test)]
214mod tests {
215 use super::*;
216
217 #[test]
218 fn test_config_defaults() {
219 let config = DlqConfig::default();
220 assert!(config.enabled);
221 assert_eq!(config.mode, DlqMode::Cascade);
222 assert!(config.file.enabled);
223 assert_eq!(config.file.max_age_days, 30);
224 assert!(config.file.compress_rotated);
225 assert_eq!(config.file.rotation, RotationPeriod::Hourly);
226 }
227
228 #[test]
229 fn test_config_serde_roundtrip() {
230 let config = DlqConfig {
231 mode: DlqMode::FanOut,
232 file: FileDlqConfig {
233 enabled: true,
234 path: "/tmp/test-dlq".into(),
235 rotation: RotationPeriod::Daily,
236 max_age_days: 7,
237 compress_rotated: false,
238 },
239 queue_capacity: 50_000,
240 batch_size: 128,
241 flush_interval_ms: 250,
242 ..DlqConfig::default()
243 };
244 let json = serde_json::to_string(&config).expect("serialise");
245 let parsed: DlqConfig = serde_json::from_str(&json).expect("deserialise");
246 assert_eq!(parsed.mode, DlqMode::FanOut);
247 assert_eq!(parsed.file.rotation, RotationPeriod::Daily);
248 assert_eq!(parsed.file.max_age_days, 7);
249 assert_eq!(parsed.queue_capacity, 50_000);
250 assert_eq!(parsed.batch_size, 128);
251 assert_eq!(parsed.flush_interval_ms, 250);
252 }
253
254 #[test]
255 fn test_dlq_mode_serde() {
256 let json = r#""cascade""#;
257 let mode: DlqMode = serde_json::from_str(json).expect("deserialise");
258 assert_eq!(mode, DlqMode::Cascade);
259
260 let json = r#""fan_out""#;
261 let mode: DlqMode = serde_json::from_str(json).expect("deserialise");
262 assert_eq!(mode, DlqMode::FanOut);
263 }
264}