1use std::sync::Arc;
36
37use tokio::sync::Mutex as AsyncMutex;
38use tokio_util::sync::CancellationToken;
39use tracing::{debug, warn};
40
41use crate::concurrency::{
42 BackgroundSink, BackgroundSinkConfig, BackgroundSinkHandle, DrainError, Overflow, SinkDrain,
43 SinkError,
44};
45
46use super::backend::DlqBackend;
47use super::config::{DlqConfig, DlqMode};
48use super::entry::DlqEntry;
49use super::error::DlqError;
50use super::file::FileDlqInner;
51
/// Cloneable handle to the dead-letter queue.
///
/// Clones share the same background sink and the same drain-task handle
/// (the handle lives behind an `Arc`), so entries sent through any clone end
/// up in the same drain.
#[derive(Clone)]
pub struct Dlq {
    /// Producer side of the background sink; `None` when the DLQ is disabled,
    /// in which case every send/flush call is a successful no-op.
    sink: Option<BackgroundSink<DlqEntry>>,
    /// Handle of the background drain task. `shutdown` takes it out of the
    /// slot, so only the first shutdown call actually joins the task.
    join: Arc<AsyncMutex<Option<BackgroundSinkHandle>>>,
    /// `true` only for handles built by `spawn` with at least one backend.
    enabled: bool,
    /// Delivery mode copied from the configuration (default for a disabled handle).
    mode: DlqMode,
    /// Token cancelled by `shutdown`; for spawned handles it is a child of
    /// the caller-supplied shutdown token.
    cancel: CancellationToken,
}
72
73impl std::fmt::Debug for Dlq {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("Dlq")
76 .field("enabled", &self.enabled)
77 .field("mode", &self.mode)
78 .field(
79 "pending",
80 &self.sink.as_ref().map_or(0, BackgroundSink::pending),
81 )
82 .field(
83 "dropped",
84 &self.sink.as_ref().map_or(0, BackgroundSink::dropped),
85 )
86 .finish_non_exhaustive()
87 }
88}
89
90impl Dlq {
91 #[must_use]
94 pub fn disabled() -> Self {
95 Self {
96 sink: None,
97 join: Arc::new(AsyncMutex::new(None)),
98 enabled: false,
99 mode: DlqMode::default(),
100 cancel: CancellationToken::new(),
101 }
102 }
103
104 pub fn spawn(
116 config: &DlqConfig,
117 service_name: &str,
118 #[cfg(feature = "dlq-kafka")] kafka_config: Option<&crate::transport::KafkaConfig>,
119 #[cfg(not(feature = "dlq-kafka"))] _kafka_config: Option<&()>,
120 shutdown: CancellationToken,
121 ) -> Result<Self, DlqError> {
122 if !config.enabled {
123 return Ok(Self::disabled());
124 }
125
126 let backends = build_backends(
127 config,
128 service_name,
129 #[cfg(feature = "dlq-kafka")]
130 kafka_config,
131 )?;
132
133 if backends.is_empty() {
134 warn!("DLQ enabled but no backends configured -- entries will be dropped");
135 return Ok(Self::disabled());
136 }
137
138 let names: Vec<&'static str> = backends.iter().map(DlqBackend::name).collect();
139 debug!(mode = ?config.mode, backends = ?names, "DLQ initialised");
140
141 let drain = DlqDrain {
142 mode: config.mode,
143 backends,
144 };
145
146 let sink_config = BackgroundSinkConfig {
147 queue_capacity: config.queue_capacity,
148 batch_size: config.batch_size,
149 flush_interval: std::time::Duration::from_millis(config.flush_interval_ms),
150 overflow: Overflow::Drop,
151 metric_prefix: Some("dfe_dlq"),
152 };
153
154 let cancel = shutdown.child_token();
159 let (sink, handle) = BackgroundSink::spawn(drain, sink_config, cancel.clone());
160
161 Ok(Self {
162 sink: Some(sink),
163 join: Arc::new(AsyncMutex::new(Some(handle))),
164 enabled: true,
165 mode: config.mode,
166 cancel,
167 })
168 }
169
170 #[must_use]
172 pub fn is_enabled(&self) -> bool {
173 self.enabled
174 }
175
176 #[must_use]
178 pub fn mode(&self) -> DlqMode {
179 self.mode
180 }
181
182 #[must_use]
184 pub fn pending(&self) -> usize {
185 self.sink.as_ref().map_or(0, BackgroundSink::pending)
186 }
187
188 #[must_use]
190 pub fn dropped(&self) -> u64 {
191 self.sink.as_ref().map_or(0, BackgroundSink::dropped)
192 }
193
194 pub fn try_send(&self, entry: DlqEntry) -> Result<(), DlqError> {
204 let Some(sink) = self.sink.as_ref() else {
205 return Ok(());
206 };
207 #[cfg_attr(not(feature = "metrics"), allow(clippy::let_and_return))]
210 let res = sink.try_push(entry).map_err(map_sink_err);
211 #[cfg(feature = "metrics")]
216 if res.is_ok() {
217 metrics::counter!("dfe_dlq_admitted_total").increment(1);
218 metrics::gauge!("dfe_dlq_queue_depth").set(sink.pending() as f64);
219 }
220 res
221 }
222
223 pub async fn send(&self, entry: DlqEntry) -> Result<(), DlqError> {
232 let Some(sink) = self.sink.as_ref() else {
233 return Ok(());
234 };
235 #[cfg_attr(not(feature = "metrics"), allow(clippy::let_and_return))]
236 let res = sink.push_blocking(entry).await.map_err(map_sink_err);
237 #[cfg(feature = "metrics")]
239 if res.is_ok() {
240 metrics::counter!("dfe_dlq_admitted_total").increment(1);
241 metrics::gauge!("dfe_dlq_queue_depth").set(sink.pending() as f64);
242 }
243 res
244 }
245
246 pub async fn send_batch(&self, entries: Vec<DlqEntry>) -> Result<(), DlqError> {
253 let Some(sink) = self.sink.as_ref() else {
254 return Ok(());
255 };
256 for entry in entries {
257 sink.push_blocking(entry).await.map_err(map_sink_err)?;
258 #[cfg(feature = "metrics")]
261 metrics::counter!("dfe_dlq_admitted_total").increment(1);
262 }
263 #[cfg(feature = "metrics")]
264 metrics::gauge!("dfe_dlq_queue_depth").set(sink.pending() as f64);
265 Ok(())
266 }
267
268 pub async fn flush(&self) -> Result<(), DlqError> {
276 let Some(sink) = self.sink.as_ref() else {
277 return Ok(());
278 };
279 sink.flush().await.map_err(map_sink_err)
280 }
281
282 pub async fn shutdown(&self) -> Result<(), DlqError> {
294 self.cancel.cancel();
295 let mut guard = self.join.lock().await;
296 let Some(handle) = guard.take() else {
297 return Ok(());
298 };
299 handle
300 .join()
301 .await
302 .map_err(|e| DlqError::File(format!("DLQ drain join failed: {e}")))?;
303 Ok(())
304 }
305}
306
307fn map_sink_err(e: SinkError) -> DlqError {
308 match e {
309 SinkError::Overflow => DlqError::QueueFull,
310 SinkError::Closed => DlqError::Closed,
311 SinkError::Drain(d) => DlqError::File(d.to_string()),
312 }
313}
314
315fn build_backends(
316 config: &DlqConfig,
317 service_name: &str,
318 #[cfg(feature = "dlq-kafka")] kafka_config: Option<&crate::transport::KafkaConfig>,
319) -> Result<Vec<DlqBackend>, DlqError> {
320 let mut backends: Vec<DlqBackend> = Vec::new();
321 let mode = config.mode;
322
323 #[cfg(feature = "dlq-kafka")]
325 {
326 let want_kafka = matches!(
327 mode,
328 DlqMode::Cascade | DlqMode::FanOut | DlqMode::KafkaOnly
329 );
330 if want_kafka && config.kafka.enabled {
331 let kc = kafka_config.ok_or_else(|| {
332 DlqError::Kafka(
333 "DLQ Kafka backend enabled but no KafkaConfig provided to Dlq::spawn".into(),
334 )
335 })?;
336 backends.push(DlqBackend::Kafka(super::kafka::KafkaDlqInner::new(
337 kc,
338 &config.kafka,
339 )?));
340 }
341 }
342
343 let want_file = matches!(mode, DlqMode::Cascade | DlqMode::FanOut | DlqMode::FileOnly);
345 if want_file && config.file.enabled {
346 backends.push(DlqBackend::File(FileDlqInner::new(
347 &config.file,
348 service_name,
349 )?));
350 }
351
352 #[cfg(feature = "dlq-http")]
354 {
355 if config.http.enabled {
356 backends.push(DlqBackend::Http(super::http::HttpDlqInner::new(
357 &config.http,
358 )?));
359 }
360 }
361
362 #[cfg(feature = "dlq-redis")]
366 {
367 if config.redis.enabled {
368 let cfg = config.redis.clone();
369 let inner = tokio::task::block_in_place(|| {
370 tokio::runtime::Handle::current()
371 .block_on(super::redis_dlq::RedisDlqInner::new(&cfg))
372 })?;
373 backends.push(DlqBackend::Redis(inner));
374 }
375 }
376
377 Ok(backends)
378}
379
/// Drain-side state handed to `BackgroundSink::spawn`: it receives batches of
/// entries and delivers them to the configured backends.
struct DlqDrain {
    /// Delivery strategy used by `write_batch` (first-success vs. fan-out).
    mode: DlqMode,
    /// Backends in the order they are attempted.
    backends: Vec<DlqBackend>,
}
386
387impl SinkDrain<DlqEntry> for DlqDrain {
388 async fn write_batch(&mut self, batch: Vec<DlqEntry>) -> Result<(), DrainError> {
389 if batch.is_empty() {
390 return Ok(());
391 }
392
393 match self.mode {
394 DlqMode::Cascade | DlqMode::FileOnly | DlqMode::KafkaOnly => {
395 let mut last_err: Option<DlqError> = None;
396 for backend in &mut self.backends {
397 match backend.send_batch(&batch).await {
398 Ok(()) => return Ok(()),
399 Err(e) => {
400 warn!(
401 backend = backend.name(),
402 error = %e,
403 count = batch.len(),
404 "DLQ backend failed in cascade, trying next"
405 );
406 #[cfg(feature = "metrics")]
409 metrics::counter!(
410 "dfe_dlq_retried_total",
411 "backend" => backend.name()
412 )
413 .increment(1);
414 last_err = Some(e);
415 }
416 }
417 }
418 let msg = last_err
419 .map_or_else(|| "no backends configured".to_string(), |e| e.to_string());
420 Err(DrainError::Backend(Box::new(DlqError::AllBackendsFailed(
421 msg,
422 ))))
423 }
424 DlqMode::FanOut => {
425 let mut any_ok = false;
426 let mut errs: Vec<String> = Vec::new();
427 for backend in &mut self.backends {
428 match backend.send_batch(&batch).await {
429 Ok(()) => any_ok = true,
430 Err(e) => {
431 warn!(
432 backend = backend.name(),
433 error = %e,
434 count = batch.len(),
435 "DLQ backend failed in fan-out"
436 );
437 errs.push(format!("{}:{}", backend.name(), e));
438 }
439 }
440 }
441 if any_ok {
442 Ok(())
443 } else {
444 Err(DrainError::Backend(Box::new(DlqError::AllBackendsFailed(
445 errs.join("; "),
446 ))))
447 }
448 }
449 }
450 }
451}
452
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dlq::config::{FileDlqConfig, RotationPeriod};
    use crate::dlq::entry::DlqSource;

    /// File-only configuration writing under `dir`, with a short flush interval.
    fn tmp_config(dir: &std::path::Path) -> DlqConfig {
        let file = FileDlqConfig {
            enabled: true,
            path: dir.to_path_buf(),
            rotation: RotationPeriod::Daily,
            max_age_days: 1,
            compress_rotated: false,
        };
        DlqConfig {
            file,
            mode: DlqMode::FileOnly,
            queue_capacity: 1024,
            batch_size: 16,
            flush_interval_ms: 20,
            ..DlqConfig::default()
        }
    }

    /// Sample entry carrying `reason`, a destination and a Kafka source.
    fn test_entry(reason: &str) -> DlqEntry {
        let source = DlqSource::kafka("events", 1, 42);
        DlqEntry::new("test", reason, b"payload".to_vec())
            .with_destination("acme.auth")
            .with_source(source)
    }

    /// Spawns a DLQ for service `svc` without a Kafka config, panicking on failure.
    fn spawn_dlq(cfg: &DlqConfig, shutdown: &CancellationToken) -> Dlq {
        Dlq::spawn(
            cfg,
            "svc",
            #[cfg(feature = "dlq-kafka")]
            None,
            #[cfg(not(feature = "dlq-kafka"))]
            None,
            shutdown.clone(),
        )
        .expect("spawn")
    }

    /// Number of lines in the `svc` DLQ file under `dir`, ignoring surrounding whitespace.
    fn written_lines(dir: &std::path::Path) -> usize {
        let path = dir.join("svc/dlq.ndjson");
        let body = std::fs::read_to_string(&path).expect("read");
        body.trim().lines().count()
    }

    #[tokio::test]
    async fn disabled_dlq_accepts_silently() {
        let dlq = Dlq::disabled();
        dlq.send(test_entry("err")).await.expect("noop");
        dlq.send_batch(vec![test_entry("err")]).await.expect("noop");
        dlq.flush().await.expect("noop flush");
        dlq.shutdown().await.expect("noop shutdown");
    }

    #[tokio::test]
    async fn file_only_writes_and_flushes() {
        let dir = tempfile::tempdir().expect("tempdir");
        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&tmp_config(dir.path()), &shutdown);

        for i in 0..5 {
            let entry = test_entry(&format!("err_{i}"));
            dlq.send(entry).await.expect("send");
        }
        dlq.flush().await.expect("flush");

        assert_eq!(written_lines(dir.path()), 5);

        shutdown.cancel();
        dlq.shutdown().await.expect("clean shutdown");
    }

    #[tokio::test]
    async fn try_send_returns_queue_full_when_saturated() {
        let dir = tempfile::tempdir().expect("tempdir");
        // Tiny queue, oversized batch and a long flush interval, so far more
        // entries are offered than the queue can hold.
        let mut cfg = tmp_config(dir.path());
        cfg.queue_capacity = 2;
        cfg.batch_size = 1024;
        cfg.flush_interval_ms = 60_000;

        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&cfg, &shutdown);

        let full_count = (0..50)
            .filter(|i| {
                matches!(
                    dlq.try_send(test_entry(&format!("err_{i}"))),
                    Err(DlqError::QueueFull)
                )
            })
            .count();
        assert!(full_count > 0, "expected at least one QueueFull");
        shutdown.cancel();
    }

    #[tokio::test]
    async fn dlq_clone_shares_state() {
        let dir = tempfile::tempdir().expect("tempdir");
        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&tmp_config(dir.path()), &shutdown);
        let dlq2 = dlq.clone();

        dlq.send(test_entry("a")).await.expect("send a");
        dlq2.send(test_entry("b")).await.expect("send b");
        dlq.flush().await.expect("flush");

        // Both handles must have fed the same file.
        assert_eq!(written_lines(dir.path()), 2);

        shutdown.cancel();
        dlq.shutdown().await.expect("shutdown");
    }
}