use std::sync::atomic::{AtomicU64, Ordering};
use tracing::{debug, error, info};
use crate::transport::KafkaConfig;
use crate::transport::kafka::{KafkaProducer, ProducerProfile};
use super::config::{DlqRouting, KafkaDlqConfig};
use super::entry::DlqEntry;
use super::error::DlqError;
/// Kafka-backed dead-letter-queue writer.
///
/// Wraps a [`KafkaProducer`] and routes failed entries either to one shared
/// topic or to a per-table topic derived from the entry's destination,
/// depending on the configured [`DlqRouting`].
pub struct KafkaDlqInner {
    // Producer used to publish serialised DLQ payloads.
    producer: KafkaProducer,
    // Strategy for choosing the destination topic (common vs per-table).
    routing: DlqRouting,
    // Suffix appended to the entry's destination when routing per table.
    topic_suffix: String,
    // Topic used for `Common` routing, and as the fallback for per-table
    // entries that carry no destination.
    common_topic: String,
    // Count of entries successfully handed to the producer (Relaxed: simple
    // metric counter, no ordering requirements).
    entries_written: AtomicU64,
    // Count of entries the producer rejected.
    write_errors: AtomicU64,
}
// Manual `Debug` impl: the `producer` field is deliberately omitted
// (presumably `KafkaProducer` does not implement `Debug` — confirm against
// its definition), which `finish_non_exhaustive` signals with a trailing
// `..`. The atomic counters are rendered as point-in-time snapshots.
impl std::fmt::Debug for KafkaDlqInner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("KafkaDlqInner")
            .field("routing", &self.routing)
            .field("topic_suffix", &self.topic_suffix)
            .field("common_topic", &self.common_topic)
            .field(
                "entries_written",
                &self.entries_written.load(Ordering::Relaxed),
            )
            .field("write_errors", &self.write_errors.load(Ordering::Relaxed))
            .finish_non_exhaustive()
    }
}
impl KafkaDlqInner {
    /// Builds a Kafka-backed DLQ writer from the transport and DLQ configs.
    ///
    /// # Errors
    ///
    /// Returns [`DlqError::Kafka`] if the underlying producer cannot be
    /// constructed.
    pub fn new(kafka_config: &KafkaConfig, dlq_config: &KafkaDlqConfig) -> Result<Self, DlqError> {
        // Low-latency profile: DLQ entries should reach the broker promptly
        // rather than be batched for throughput.
        let producer = KafkaProducer::new(kafka_config, ProducerProfile::LowLatency)
            .map_err(|e| DlqError::Kafka(format!("failed to create DLQ producer: {e}")))?;
        info!(
            routing = ?dlq_config.routing,
            suffix = %dlq_config.topic_suffix,
            common_topic = %dlq_config.common_topic,
            "Kafka DLQ backend initialised"
        );
        Ok(Self {
            producer,
            routing: dlq_config.routing,
            topic_suffix: dlq_config.topic_suffix.clone(),
            common_topic: dlq_config.common_topic.clone(),
            entries_written: AtomicU64::new(0),
            write_errors: AtomicU64::new(0),
        })
    }

    /// Picks the Kafka topic for `entry` according to the routing mode.
    ///
    /// Per-table routing uses `<destination><suffix>`. Entries whose
    /// destination is missing *or empty* fall back to the common topic, so
    /// we never produce to a bare-suffix topic such as ".dlq".
    fn resolve_topic(&self, entry: &DlqEntry) -> String {
        match self.routing {
            DlqRouting::Common => self.common_topic.clone(),
            DlqRouting::PerTable => entry
                .destination
                .as_ref()
                // An empty destination would yield just the suffix — treat it
                // the same as no destination at all.
                .filter(|dest| !dest.is_empty())
                .map_or_else(
                    || self.common_topic.clone(),
                    |dest| format!("{dest}{}", self.topic_suffix),
                ),
        }
    }

    /// Serialises each entry to JSON and queues it to its resolved topic.
    ///
    /// Stops at the first failure; entries queued before the failure stay
    /// queued and remain counted in [`Self::entries_written`].
    ///
    /// # Errors
    ///
    /// Returns [`DlqError::Serialization`] if an entry cannot be encoded, or
    /// [`DlqError::Kafka`] if the producer rejects a payload. Both failure
    /// modes lose the entry, so both are logged and counted in
    /// [`Self::write_errors`].
    pub async fn send_batch(&mut self, batch: &[DlqEntry]) -> Result<(), DlqError> {
        for entry in batch {
            let topic = self.resolve_topic(entry);
            let payload = serde_json::to_vec(entry).map_err(|e| {
                // Previously this path dropped the entry silently; keep the
                // accounting consistent with the producer-failure path below.
                self.write_errors.fetch_add(1, Ordering::Relaxed);
                error!(
                    error = %e,
                    topic = %topic,
                    reason = %entry.reason,
                    "Failed to serialise DLQ entry"
                );
                DlqError::Serialization(format!("DLQ serialise: {e}"))
            })?;
            match self.producer.send(&topic, None, &payload) {
                Ok(()) => {
                    self.entries_written.fetch_add(1, Ordering::Relaxed);
                    debug!(topic = %topic, reason = %entry.reason, "DLQ entry queued to Kafka");
                }
                Err(e) => {
                    self.write_errors.fetch_add(1, Ordering::Relaxed);
                    error!(
                        error = %e,
                        topic = %topic,
                        reason = %entry.reason,
                        "Failed to queue DLQ entry to Kafka"
                    );
                    return Err(DlqError::Kafka(format!("DLQ send failed: {e}")));
                }
            }
        }
        Ok(())
    }

    /// Number of entries successfully handed to the producer so far.
    #[must_use]
    pub fn entries_written(&self) -> u64 {
        self.entries_written.load(Ordering::Relaxed)
    }

    /// Number of entries lost to serialisation or producer failures.
    #[must_use]
    pub fn write_errors(&self) -> u64 {
        self.write_errors.load(Ordering::Relaxed)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Mirrors `KafkaDlqInner::resolve_topic` without needing a live
    /// producer: `Common` always yields the shared topic, `PerTable` yields
    /// `<destination><suffix>` and falls back to the shared topic when the
    /// entry carries no destination.
    fn pick_topic(routing: DlqRouting, entry: &DlqEntry, suffix: &str, common: &str) -> String {
        match routing {
            DlqRouting::Common => common.to_string(),
            DlqRouting::PerTable => entry
                .destination
                .as_ref()
                .map_or_else(|| common.to_string(), |dest| format!("{dest}{suffix}")),
        }
    }

    #[test]
    fn resolve_topic_per_table() {
        let with_dest = DlqEntry::new("loader", "error", vec![]).with_destination("acme.auth");
        assert_eq!(
            pick_topic(DlqRouting::PerTable, &with_dest, ".dlq", "dfe.dlq"),
            "acme.auth.dlq"
        );

        // Without a destination, per-table routing falls back to the common topic.
        let without_dest = DlqEntry::new("loader", "error", vec![]);
        assert_eq!(
            pick_topic(DlqRouting::PerTable, &without_dest, ".dlq", "dfe.dlq"),
            "dfe.dlq"
        );
    }

    #[test]
    fn resolve_topic_common_ignores_destination() {
        // Even with a destination set, common routing always uses the shared topic.
        let entry = DlqEntry::new("loader", "error", vec![]).with_destination("acme.auth");
        assert_eq!(
            pick_topic(DlqRouting::Common, &entry, ".dlq", "all-errors.dlq"),
            "all-errors.dlq"
        );
    }
}