1use std::sync::Arc;
36
37use tokio::sync::Mutex as AsyncMutex;
38use tokio_util::sync::CancellationToken;
39use tracing::{debug, warn};
40
41use crate::concurrency::{
42 BackgroundSink, BackgroundSinkConfig, BackgroundSinkHandle, DrainError, Overflow, SinkDrain,
43 SinkError,
44};
45
46use super::backend::DlqBackend;
47use super::config::{DlqConfig, DlqMode};
48use super::entry::DlqEntry;
49use super::error::DlqError;
50use super::file::FileDlqInner;
51
/// Handle to the dead-letter queue.
///
/// The handle is `Clone`; clones share the same join-handle slot (it lives
/// behind an `Arc`) and carry clones of the sink and cancellation token.
#[derive(Clone)]
pub struct Dlq {
    /// Producer side of the background sink. `None` for a disabled DLQ, in
    /// which case every send/flush call is a no-op.
    sink: Option<BackgroundSink<DlqEntry>>,
    /// Join handle of the background drain, shared between clones.
    /// `shutdown` takes it out of the slot, so it can be joined only once.
    join: Arc<AsyncMutex<Option<BackgroundSinkHandle>>>,
    /// Whether the DLQ was spawned with at least one backend.
    enabled: bool,
    /// Routing mode copied from the configuration at spawn time.
    mode: DlqMode,
    /// Cancellation token handed to the background sink. For a spawned DLQ it
    /// is a child of the caller-supplied shutdown token.
    cancel: CancellationToken,
}
72
73impl std::fmt::Debug for Dlq {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("Dlq")
76 .field("enabled", &self.enabled)
77 .field("mode", &self.mode)
78 .field(
79 "pending",
80 &self.sink.as_ref().map_or(0, BackgroundSink::pending),
81 )
82 .field(
83 "dropped",
84 &self.sink.as_ref().map_or(0, BackgroundSink::dropped),
85 )
86 .finish_non_exhaustive()
87 }
88}
89
90impl Dlq {
91 #[must_use]
94 pub fn disabled() -> Self {
95 Self {
96 sink: None,
97 join: Arc::new(AsyncMutex::new(None)),
98 enabled: false,
99 mode: DlqMode::default(),
100 cancel: CancellationToken::new(),
101 }
102 }
103
104 pub fn spawn(
116 config: &DlqConfig,
117 service_name: &str,
118 #[cfg(feature = "dlq-kafka")] kafka_config: Option<&crate::transport::KafkaConfig>,
119 #[cfg(not(feature = "dlq-kafka"))] _kafka_config: Option<&()>,
120 shutdown: CancellationToken,
121 ) -> Result<Self, DlqError> {
122 if !config.enabled {
123 return Ok(Self::disabled());
124 }
125
126 let backends = build_backends(
127 config,
128 service_name,
129 #[cfg(feature = "dlq-kafka")]
130 kafka_config,
131 )?;
132
133 if backends.is_empty() {
134 warn!("DLQ enabled but no backends configured -- entries will be dropped");
135 return Ok(Self::disabled());
136 }
137
138 let names: Vec<&'static str> = backends.iter().map(DlqBackend::name).collect();
139 debug!(mode = ?config.mode, backends = ?names, "DLQ initialised");
140
141 let drain = DlqDrain {
142 mode: config.mode,
143 backends,
144 };
145
146 let sink_config = BackgroundSinkConfig {
147 queue_capacity: config.queue_capacity,
148 batch_size: config.batch_size,
149 flush_interval: std::time::Duration::from_millis(config.flush_interval_ms),
150 overflow: Overflow::Drop,
151 metric_prefix: Some("dfe_dlq"),
152 };
153
154 let cancel = shutdown.child_token();
159 let (sink, handle) = BackgroundSink::spawn(drain, sink_config, cancel.clone());
160
161 Ok(Self {
162 sink: Some(sink),
163 join: Arc::new(AsyncMutex::new(Some(handle))),
164 enabled: true,
165 mode: config.mode,
166 cancel,
167 })
168 }
169
170 #[must_use]
172 pub fn is_enabled(&self) -> bool {
173 self.enabled
174 }
175
176 #[must_use]
178 pub fn mode(&self) -> DlqMode {
179 self.mode
180 }
181
182 #[must_use]
184 pub fn pending(&self) -> usize {
185 self.sink.as_ref().map_or(0, BackgroundSink::pending)
186 }
187
188 #[must_use]
190 pub fn dropped(&self) -> u64 {
191 self.sink.as_ref().map_or(0, BackgroundSink::dropped)
192 }
193
194 pub fn try_send(&self, entry: DlqEntry) -> Result<(), DlqError> {
204 let Some(sink) = self.sink.as_ref() else {
205 return Ok(());
206 };
207 sink.try_push(entry).map_err(map_sink_err)
208 }
209
210 pub async fn send(&self, entry: DlqEntry) -> Result<(), DlqError> {
219 let Some(sink) = self.sink.as_ref() else {
220 return Ok(());
221 };
222 sink.push_blocking(entry).await.map_err(map_sink_err)
223 }
224
225 pub async fn send_batch(&self, entries: Vec<DlqEntry>) -> Result<(), DlqError> {
232 let Some(sink) = self.sink.as_ref() else {
233 return Ok(());
234 };
235 for entry in entries {
236 sink.push_blocking(entry).await.map_err(map_sink_err)?;
237 }
238 Ok(())
239 }
240
241 pub async fn flush(&self) -> Result<(), DlqError> {
249 let Some(sink) = self.sink.as_ref() else {
250 return Ok(());
251 };
252 sink.flush().await.map_err(map_sink_err)
253 }
254
255 pub async fn shutdown(&self) -> Result<(), DlqError> {
271 self.cancel.cancel();
275 let mut guard = self.join.lock().await;
276 let Some(handle) = guard.take() else {
277 return Ok(());
278 };
279 handle
280 .join()
281 .await
282 .map_err(|e| DlqError::File(format!("DLQ drain join failed: {e}")))?;
283 Ok(())
284 }
285}
286
287fn map_sink_err(e: SinkError) -> DlqError {
288 match e {
289 SinkError::Overflow => DlqError::QueueFull,
290 SinkError::Closed => DlqError::Closed,
291 SinkError::Drain(d) => DlqError::File(d.to_string()),
292 }
293}
294
295fn build_backends(
296 config: &DlqConfig,
297 service_name: &str,
298 #[cfg(feature = "dlq-kafka")] kafka_config: Option<&crate::transport::KafkaConfig>,
299) -> Result<Vec<DlqBackend>, DlqError> {
300 let mut backends: Vec<DlqBackend> = Vec::new();
301 let mode = config.mode;
302
303 #[cfg(feature = "dlq-kafka")]
305 {
306 let want_kafka = matches!(
307 mode,
308 DlqMode::Cascade | DlqMode::FanOut | DlqMode::KafkaOnly
309 );
310 if want_kafka && config.kafka.enabled {
311 let kc = kafka_config.ok_or_else(|| {
312 DlqError::Kafka(
313 "DLQ Kafka backend enabled but no KafkaConfig provided to Dlq::spawn".into(),
314 )
315 })?;
316 backends.push(DlqBackend::Kafka(super::kafka::KafkaDlqInner::new(
317 kc,
318 &config.kafka,
319 )?));
320 }
321 }
322
323 let want_file = matches!(mode, DlqMode::Cascade | DlqMode::FanOut | DlqMode::FileOnly);
325 if want_file && config.file.enabled {
326 backends.push(DlqBackend::File(FileDlqInner::new(
327 &config.file,
328 service_name,
329 )?));
330 }
331
332 #[cfg(feature = "dlq-http")]
334 {
335 if config.http.enabled {
336 backends.push(DlqBackend::Http(super::http::HttpDlqInner::new(
337 &config.http,
338 )?));
339 }
340 }
341
342 #[cfg(feature = "dlq-redis")]
346 {
347 if config.redis.enabled {
348 let cfg = config.redis.clone();
349 let inner = tokio::task::block_in_place(|| {
350 tokio::runtime::Handle::current()
351 .block_on(super::redis_dlq::RedisDlqInner::new(&cfg))
352 })?;
353 backends.push(DlqBackend::Redis(inner));
354 }
355 }
356
357 Ok(backends)
358}
359
/// Drain side of the background sink: owns the backends and writes each
/// batch to them according to `mode`.
struct DlqDrain {
    /// Routing mode that decides how a batch is spread over `backends`.
    mode: DlqMode,
    /// Backends to write to, tried in the order they appear in the vector.
    backends: Vec<DlqBackend>,
}
366
367impl SinkDrain<DlqEntry> for DlqDrain {
368 async fn write_batch(&mut self, batch: Vec<DlqEntry>) -> Result<(), DrainError> {
369 if batch.is_empty() {
370 return Ok(());
371 }
372
373 match self.mode {
374 DlqMode::Cascade | DlqMode::FileOnly | DlqMode::KafkaOnly => {
375 let mut last_err: Option<DlqError> = None;
376 for backend in &mut self.backends {
377 match backend.send_batch(&batch).await {
378 Ok(()) => return Ok(()),
379 Err(e) => {
380 warn!(
381 backend = backend.name(),
382 error = %e,
383 count = batch.len(),
384 "DLQ backend failed in cascade, trying next"
385 );
386 last_err = Some(e);
387 }
388 }
389 }
390 let msg = last_err
391 .map_or_else(|| "no backends configured".to_string(), |e| e.to_string());
392 Err(DrainError::Backend(Box::new(DlqError::AllBackendsFailed(
393 msg,
394 ))))
395 }
396 DlqMode::FanOut => {
397 let mut any_ok = false;
398 let mut errs: Vec<String> = Vec::new();
399 for backend in &mut self.backends {
400 match backend.send_batch(&batch).await {
401 Ok(()) => any_ok = true,
402 Err(e) => {
403 warn!(
404 backend = backend.name(),
405 error = %e,
406 count = batch.len(),
407 "DLQ backend failed in fan-out"
408 );
409 errs.push(format!("{}:{}", backend.name(), e));
410 }
411 }
412 }
413 if any_ok {
414 Ok(())
415 } else {
416 Err(DrainError::Backend(Box::new(DlqError::AllBackendsFailed(
417 errs.join("; "),
418 ))))
419 }
420 }
421 }
422 }
423}
424
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dlq::config::{FileDlqConfig, RotationPeriod};
    use crate::dlq::entry::DlqSource;

    /// File-only DLQ configuration rooted at `dir`, with a 1024-entry queue,
    /// batches of 16 and a 20 ms flush interval.
    fn tmp_config(dir: &std::path::Path) -> DlqConfig {
        let file = FileDlqConfig {
            enabled: true,
            path: dir.to_path_buf(),
            rotation: RotationPeriod::Daily,
            max_age_days: 1,
            compress_rotated: false,
        };
        DlqConfig {
            file,
            mode: DlqMode::FileOnly,
            queue_capacity: 1024,
            batch_size: 16,
            flush_interval_ms: 20,
            ..DlqConfig::default()
        }
    }

    /// Sample entry with a fixed payload, destination and Kafka source.
    fn test_entry(reason: &str) -> DlqEntry {
        let source = DlqSource::kafka("events", 1, 42);
        DlqEntry::new("test", reason, b"payload".to_vec())
            .with_destination("acme.auth")
            .with_source(source)
    }

    /// Spawns a DLQ for service `svc` without a Kafka configuration.
    ///
    /// The bare `None` is inferred as whichever `Option<&_>` type the active
    /// feature set gives the Kafka parameter.
    fn spawn_dlq(config: &DlqConfig, shutdown: &CancellationToken) -> Dlq {
        Dlq::spawn(config, "svc", None, shutdown.clone()).expect("spawn")
    }

    /// Counts the lines the file backend wrote for service `svc` under `dir`.
    fn written_line_count(dir: &std::path::Path) -> usize {
        let body = std::fs::read_to_string(dir.join("svc/dlq.ndjson")).expect("read");
        body.trim().lines().count()
    }

    #[tokio::test]
    async fn disabled_dlq_accepts_silently() {
        let dlq = Dlq::disabled();
        dlq.send(test_entry("err")).await.expect("noop");
        dlq.send_batch(vec![test_entry("err")]).await.expect("noop");
        dlq.flush().await.expect("noop flush");
        dlq.shutdown().await.expect("noop shutdown");
    }

    #[tokio::test]
    async fn file_only_writes_and_flushes() {
        let dir = tempfile::tempdir().expect("tempdir");
        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&tmp_config(dir.path()), &shutdown);

        for i in 0..5 {
            let entry = test_entry(&format!("err_{i}"));
            dlq.send(entry).await.expect("send");
        }
        dlq.flush().await.expect("flush");

        assert_eq!(written_line_count(dir.path()), 5);

        shutdown.cancel();
        dlq.shutdown().await.expect("clean shutdown");
    }

    #[tokio::test]
    async fn try_send_returns_queue_full_when_saturated() {
        let dir = tempfile::tempdir().expect("tempdir");
        // Tiny queue, oversized batch and a one-minute flush interval.
        let mut cfg = tmp_config(dir.path());
        cfg.queue_capacity = 2;
        cfg.batch_size = 1024;
        cfg.flush_interval_ms = 60_000;

        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&cfg, &shutdown);

        let full_count = (0..50)
            .filter(|i| {
                matches!(
                    dlq.try_send(test_entry(&format!("err_{i}"))),
                    Err(DlqError::QueueFull)
                )
            })
            .count();
        assert!(full_count > 0, "expected at least one QueueFull");
        shutdown.cancel();
    }

    #[tokio::test]
    async fn dlq_clone_shares_state() {
        let dir = tempfile::tempdir().expect("tempdir");
        let shutdown = CancellationToken::new();
        let dlq = spawn_dlq(&tmp_config(dir.path()), &shutdown);

        let dlq2 = dlq.clone();
        dlq.send(test_entry("a")).await.expect("send a");
        dlq2.send(test_entry("b")).await.expect("send b");
        dlq.flush().await.expect("flush");

        assert_eq!(written_line_count(dir.path()), 2);

        shutdown.cancel();
        dlq.shutdown().await.expect("shutdown");
    }
}