1use std::sync::Arc;
36
37use tokio::sync::Mutex as AsyncMutex;
38use tokio_util::sync::CancellationToken;
39use tracing::{debug, warn};
40
41use crate::concurrency::{
42 BackgroundSink, BackgroundSinkConfig, BackgroundSinkHandle, DrainError, Overflow, SinkDrain,
43 SinkError,
44};
45
46use super::backend::DlqBackend;
47use super::config::{DlqConfig, DlqMode};
48use super::entry::DlqEntry;
49use super::error::DlqError;
50use super::file::FileDlqInner;
51
52#[derive(Clone)]
59pub struct Dlq {
60 sink: Option<BackgroundSink<DlqEntry>>,
61 join: Arc<AsyncMutex<Option<BackgroundSinkHandle>>>,
62 enabled: bool,
63 mode: DlqMode,
64 cancel: CancellationToken,
71}
72
73impl std::fmt::Debug for Dlq {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("Dlq")
76 .field("enabled", &self.enabled)
77 .field("mode", &self.mode)
78 .field(
79 "pending",
80 &self.sink.as_ref().map_or(0, BackgroundSink::pending),
81 )
82 .field(
83 "dropped",
84 &self.sink.as_ref().map_or(0, BackgroundSink::dropped),
85 )
86 .finish_non_exhaustive()
87 }
88}
89
90impl Dlq {
91 #[must_use]
94 pub fn disabled() -> Self {
95 Self {
96 sink: None,
97 join: Arc::new(AsyncMutex::new(None)),
98 enabled: false,
99 mode: DlqMode::default(),
100 cancel: CancellationToken::new(),
101 }
102 }
103
104 pub fn spawn(
116 config: &DlqConfig,
117 service_name: &str,
118 #[cfg(feature = "dlq-kafka")] kafka_config: Option<&crate::transport::KafkaConfig>,
119 #[cfg(not(feature = "dlq-kafka"))] _kafka_config: Option<&()>,
120 shutdown: CancellationToken,
121 ) -> Result<Self, DlqError> {
122 if !config.enabled {
123 return Ok(Self::disabled());
124 }
125
126 let backends = build_backends(
127 config,
128 service_name,
129 #[cfg(feature = "dlq-kafka")]
130 kafka_config,
131 )?;
132
133 if backends.is_empty() {
134 warn!("DLQ enabled but no backends configured -- entries will be dropped");
135 return Ok(Self::disabled());
136 }
137
138 let names: Vec<&'static str> = backends.iter().map(DlqBackend::name).collect();
139 debug!(mode = ?config.mode, backends = ?names, "DLQ initialised");
140
141 let drain = DlqDrain {
142 mode: config.mode,
143 backends,
144 };
145
146 let sink_config = BackgroundSinkConfig {
147 queue_capacity: config.queue_capacity,
148 batch_size: config.batch_size,
149 flush_interval: std::time::Duration::from_millis(config.flush_interval_ms),
150 overflow: Overflow::Drop,
151 metric_prefix: Some("dfe_dlq"),
152 };
153
154 let cancel = shutdown.child_token();
159 let (sink, handle) = BackgroundSink::spawn(drain, sink_config, cancel.clone());
160
161 Ok(Self {
162 sink: Some(sink),
163 join: Arc::new(AsyncMutex::new(Some(handle))),
164 enabled: true,
165 mode: config.mode,
166 cancel,
167 })
168 }
169
170 #[must_use]
172 pub fn is_enabled(&self) -> bool {
173 self.enabled
174 }
175
176 #[must_use]
178 pub fn mode(&self) -> DlqMode {
179 self.mode
180 }
181
182 #[must_use]
184 pub fn pending(&self) -> usize {
185 self.sink.as_ref().map_or(0, BackgroundSink::pending)
186 }
187
188 #[must_use]
190 pub fn dropped(&self) -> u64 {
191 self.sink.as_ref().map_or(0, BackgroundSink::dropped)
192 }
193
194 pub fn try_send(&self, entry: DlqEntry) -> Result<(), DlqError> {
204 let Some(sink) = self.sink.as_ref() else {
205 return Ok(());
206 };
207 sink.try_push(entry).map_err(map_sink_err)
208 }
209
210 pub async fn send(&self, entry: DlqEntry) -> Result<(), DlqError> {
219 let Some(sink) = self.sink.as_ref() else {
220 return Ok(());
221 };
222 sink.push_blocking(entry).await.map_err(map_sink_err)
223 }
224
225 pub async fn send_batch(&self, entries: Vec<DlqEntry>) -> Result<(), DlqError> {
232 let Some(sink) = self.sink.as_ref() else {
233 return Ok(());
234 };
235 for entry in entries {
236 sink.push_blocking(entry).await.map_err(map_sink_err)?;
237 }
238 Ok(())
239 }
240
241 pub async fn flush(&self) -> Result<(), DlqError> {
249 let Some(sink) = self.sink.as_ref() else {
250 return Ok(());
251 };
252 sink.flush().await.map_err(map_sink_err)
253 }
254
255 pub async fn shutdown(&self) -> Result<(), DlqError> {
267 self.cancel.cancel();
268 let mut guard = self.join.lock().await;
269 let Some(handle) = guard.take() else {
270 return Ok(());
271 };
272 handle
273 .join()
274 .await
275 .map_err(|e| DlqError::File(format!("DLQ drain join failed: {e}")))?;
276 Ok(())
277 }
278}
279
280fn map_sink_err(e: SinkError) -> DlqError {
281 match e {
282 SinkError::Overflow => DlqError::QueueFull,
283 SinkError::Closed => DlqError::Closed,
284 SinkError::Drain(d) => DlqError::File(d.to_string()),
285 }
286}
287
288fn build_backends(
289 config: &DlqConfig,
290 service_name: &str,
291 #[cfg(feature = "dlq-kafka")] kafka_config: Option<&crate::transport::KafkaConfig>,
292) -> Result<Vec<DlqBackend>, DlqError> {
293 let mut backends: Vec<DlqBackend> = Vec::new();
294 let mode = config.mode;
295
296 #[cfg(feature = "dlq-kafka")]
298 {
299 let want_kafka = matches!(
300 mode,
301 DlqMode::Cascade | DlqMode::FanOut | DlqMode::KafkaOnly
302 );
303 if want_kafka && config.kafka.enabled {
304 let kc = kafka_config.ok_or_else(|| {
305 DlqError::Kafka(
306 "DLQ Kafka backend enabled but no KafkaConfig provided to Dlq::spawn".into(),
307 )
308 })?;
309 backends.push(DlqBackend::Kafka(super::kafka::KafkaDlqInner::new(
310 kc,
311 &config.kafka,
312 )?));
313 }
314 }
315
316 let want_file = matches!(mode, DlqMode::Cascade | DlqMode::FanOut | DlqMode::FileOnly);
318 if want_file && config.file.enabled {
319 backends.push(DlqBackend::File(FileDlqInner::new(
320 &config.file,
321 service_name,
322 )?));
323 }
324
325 #[cfg(feature = "dlq-http")]
327 {
328 if config.http.enabled {
329 backends.push(DlqBackend::Http(super::http::HttpDlqInner::new(
330 &config.http,
331 )?));
332 }
333 }
334
335 #[cfg(feature = "dlq-redis")]
339 {
340 if config.redis.enabled {
341 let cfg = config.redis.clone();
342 let inner = tokio::task::block_in_place(|| {
343 tokio::runtime::Handle::current()
344 .block_on(super::redis_dlq::RedisDlqInner::new(&cfg))
345 })?;
346 backends.push(DlqBackend::Redis(inner));
347 }
348 }
349
350 Ok(backends)
351}
352
353struct DlqDrain {
356 mode: DlqMode,
357 backends: Vec<DlqBackend>,
358}
359
360impl SinkDrain<DlqEntry> for DlqDrain {
361 async fn write_batch(&mut self, batch: Vec<DlqEntry>) -> Result<(), DrainError> {
362 if batch.is_empty() {
363 return Ok(());
364 }
365
366 match self.mode {
367 DlqMode::Cascade | DlqMode::FileOnly | DlqMode::KafkaOnly => {
368 let mut last_err: Option<DlqError> = None;
369 for backend in &mut self.backends {
370 match backend.send_batch(&batch).await {
371 Ok(()) => return Ok(()),
372 Err(e) => {
373 warn!(
374 backend = backend.name(),
375 error = %e,
376 count = batch.len(),
377 "DLQ backend failed in cascade, trying next"
378 );
379 last_err = Some(e);
380 }
381 }
382 }
383 let msg = last_err
384 .map_or_else(|| "no backends configured".to_string(), |e| e.to_string());
385 Err(DrainError::Backend(Box::new(DlqError::AllBackendsFailed(
386 msg,
387 ))))
388 }
389 DlqMode::FanOut => {
390 let mut any_ok = false;
391 let mut errs: Vec<String> = Vec::new();
392 for backend in &mut self.backends {
393 match backend.send_batch(&batch).await {
394 Ok(()) => any_ok = true,
395 Err(e) => {
396 warn!(
397 backend = backend.name(),
398 error = %e,
399 count = batch.len(),
400 "DLQ backend failed in fan-out"
401 );
402 errs.push(format!("{}:{}", backend.name(), e));
403 }
404 }
405 }
406 if any_ok {
407 Ok(())
408 } else {
409 Err(DrainError::Backend(Box::new(DlqError::AllBackendsFailed(
410 errs.join("; "),
411 ))))
412 }
413 }
414 }
415 }
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dlq::config::{FileDlqConfig, RotationPeriod};
    use crate::dlq::entry::DlqSource;

    /// File-only DLQ config rooted at `dir`, with a small batch size and a
    /// 20 ms flush interval.
    fn tmp_config(dir: &std::path::Path) -> DlqConfig {
        DlqConfig {
            file: FileDlqConfig {
                enabled: true,
                path: dir.to_path_buf(),
                rotation: RotationPeriod::Daily,
                max_age_days: 1,
                compress_rotated: false,
            },
            mode: DlqMode::FileOnly,
            queue_capacity: 1024,
            batch_size: 16,
            flush_interval_ms: 20,
            ..DlqConfig::default()
        }
    }

    /// A representative entry whose reason is `reason`.
    fn test_entry(reason: &str) -> DlqEntry {
        DlqEntry::new("test", reason, b"payload".to_vec())
            .with_destination("acme.auth")
            .with_source(DlqSource::kafka("events", 1, 42))
    }

    /// Spawns a DLQ for `cfg` under service name `"svc"` with no Kafka config.
    /// The `None` argument is typed correctly whether or not `dlq-kafka` is
    /// compiled in.
    fn spawn_dlq(cfg: &DlqConfig, shutdown: &CancellationToken) -> Dlq {
        Dlq::spawn(
            cfg,
            "svc",
            #[cfg(feature = "dlq-kafka")]
            None,
            #[cfg(not(feature = "dlq-kafka"))]
            None,
            shutdown.clone(),
        )
        .expect("spawn")
    }

    /// Reads the NDJSON lines the file backend wrote for service `"svc"` under `dir`.
    fn read_dlq_lines(dir: &std::path::Path) -> Vec<String> {
        let body = std::fs::read_to_string(dir.join("svc/dlq.ndjson")).expect("read");
        body.trim().lines().map(str::to_owned).collect()
    }

    #[tokio::test]
    async fn disabled_dlq_accepts_silently() {
        let dlq = Dlq::disabled();
        assert!(!dlq.is_enabled());
        dlq.try_send(test_entry("err")).expect("noop try_send");
        dlq.send(test_entry("err")).await.expect("noop");
        dlq.send_batch(vec![test_entry("err")]).await.expect("noop");
        dlq.flush().await.expect("noop flush");
        assert_eq!(dlq.pending(), 0);
        assert_eq!(dlq.dropped(), 0);
        dlq.shutdown().await.expect("noop shutdown");
    }

    #[tokio::test]
    async fn file_only_writes_and_flushes() {
        let dir = tempfile::tempdir().expect("tempdir");
        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&tmp_config(dir.path()), &shutdown);
        assert!(dlq.is_enabled());
        assert!(matches!(dlq.mode(), DlqMode::FileOnly));

        for i in 0..5 {
            dlq.send(test_entry(&format!("err_{i}")))
                .await
                .expect("send");
        }
        dlq.flush().await.expect("flush");

        assert_eq!(read_dlq_lines(dir.path()).len(), 5);

        shutdown.cancel();
        dlq.shutdown().await.expect("clean shutdown");
    }

    #[tokio::test]
    async fn try_send_returns_queue_full_when_saturated() {
        let dir = tempfile::tempdir().expect("tempdir");
        let mut cfg = tmp_config(dir.path());
        cfg.queue_capacity = 2;
        // Large batch and a 60 s flush interval, intended to keep the drain from
        // emptying the tiny queue while the loop below fills it.
        cfg.batch_size = 1024;
        cfg.flush_interval_ms = 60_000;
        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&cfg, &shutdown);

        let mut full_count = 0;
        for i in 0..50 {
            if let Err(DlqError::QueueFull) = dlq.try_send(test_entry(&format!("err_{i}"))) {
                full_count += 1;
            }
        }
        assert!(full_count > 0, "expected at least one QueueFull");

        // Join the drain before `dir` is dropped, so it cannot still be
        // writing into a removed directory.
        shutdown.cancel();
        dlq.shutdown().await.expect("shutdown");
    }

    #[tokio::test]
    async fn dlq_clone_shares_state() {
        let dir = tempfile::tempdir().expect("tempdir");
        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&tmp_config(dir.path()), &shutdown);

        let dlq2 = dlq.clone();
        dlq.send(test_entry("a")).await.expect("send a");
        dlq2.send(test_entry("b")).await.expect("send b");
        dlq.flush().await.expect("flush");

        assert_eq!(read_dlq_lines(dir.path()).len(), 2);

        shutdown.cancel();
        dlq.shutdown().await.expect("shutdown");
    }

    #[tokio::test]
    async fn shutdown_is_idempotent_across_clones() {
        let dir = tempfile::tempdir().expect("tempdir");
        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&tmp_config(dir.path()), &shutdown);
        let dlq2 = dlq.clone();

        dlq.shutdown().await.expect("first shutdown joins the drain");
        dlq2.shutdown().await.expect("second shutdown is a no-op");
    }
}