use std::fmt;
use std::future::Future;
use std::pin::Pin;
use bytes::Bytes;
use crate::consumer::ConsumerRecord;
use crate::producer::ProducerRecord;
/// Sink that accepts records destined for a dead-letter queue.
///
/// Implementors must be `Send + Sync` and implement [`fmt::Debug`], as
/// required by the supertrait bounds.
pub trait DeadLetterQueue: Send + Sync + fmt::Debug {
/// Hands `record` to the dead-letter queue together with an `error` string.
///
/// NOTE(review): `error` is presumably a human-readable description of why
/// the record failed processing — confirm against callers.
///
/// Returns a boxed, pinned, `Send` future that may borrow from `self`.
/// The future resolves to `()`, so the signature gives implementations no
/// way to report a delivery failure back to the caller.
fn send(
&self,
record: ProducerRecord,
error: String,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;
}
/// Builds the [`ProducerRecord`] to publish on `dlq_topic` for `original`.
///
/// The returned record carries a clone of the original key and value (a
/// missing value becomes empty bytes) and leaves `partition`, `timestamp`
/// and `record_name` unset.
///
/// # Headers
///
/// Every header of `original` is copied first, in its original order:
///
/// * a key that is valid UTF-8 is used verbatim; any other key is rendered
///   as `hex:` followed by two lowercase hex digits per byte;
/// * a header without a value is stored with an empty value.
///
/// Four provenance headers are then appended, in this order:
///
/// * `__krafka.dlq.original.topic` — the original topic,
/// * `__krafka.dlq.original.partition` — the original partition, as text,
/// * `__krafka.dlq.original.offset` — the original offset, as text,
/// * `__krafka.dlq.exception.message` — `error` rendered via `Display`.
pub fn build_dlq_record(
    dlq_topic: &str,
    original: &ConsumerRecord,
    error: &dyn fmt::Display,
) -> ProducerRecord {
    // Producer header keys are `String`s while consumed keys are raw bytes,
    // so non-UTF-8 keys need a lossless textual form.
    fn header_key_text(raw: &[u8]) -> String {
        use std::fmt::Write;

        if let Ok(text) = std::str::from_utf8(raw) {
            return text.to_owned();
        }

        let mut encoded = String::with_capacity(4 + raw.len() * 2);
        encoded.push_str("hex:");
        for octet in raw {
            // Formatting into a `String` cannot fail; the result is ignored.
            let _ = write!(encoded, "{octet:02x}");
        }
        encoded
    }

    // Room for every copied header plus the four provenance headers.
    let mut headers: Vec<(String, Bytes)> = Vec::with_capacity(original.headers.len() + 4);

    for (raw_key, maybe_value) in original.headers.iter() {
        let value = match maybe_value {
            Some(bytes) => bytes.clone(),
            None => Bytes::default(),
        };
        headers.push((header_key_text(raw_key), value));
    }

    headers.extend([
        (
            "__krafka.dlq.original.topic".to_owned(),
            Bytes::from(original.topic.clone()),
        ),
        (
            "__krafka.dlq.original.partition".to_owned(),
            Bytes::from(original.partition.to_string()),
        ),
        (
            "__krafka.dlq.original.offset".to_owned(),
            Bytes::from(original.offset.to_string()),
        ),
        (
            "__krafka.dlq.exception.message".to_owned(),
            Bytes::from(error.to_string()),
        ),
    ]);

    let key = original.key.clone();
    let value = original.value.clone().unwrap_or_default();

    ProducerRecord {
        topic: dlq_topic.to_owned(),
        partition: None,
        key,
        value,
        timestamp: None,
        headers,
        record_name: None,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::consumer::ConsumerRecord;

    /// Returns the value of the first header on `record` whose key is `name`.
    fn header_value(record: &ProducerRecord, name: &str) -> Option<Bytes> {
        record
            .headers
            .iter()
            .find(|(key, _)| key == name)
            .map(|(_, value)| value.clone())
    }

    /// Topic, key and value are carried over and all four provenance
    /// headers hold the original coordinates and the error text.
    #[test]
    fn test_build_dlq_record_provenance_headers() {
        let source = ConsumerRecord::new(
            "source-topic",
            2,
            42,
            Some(Bytes::from("key")),
            Some(Bytes::from("value")),
        );

        let dlq = build_dlq_record("source-topic.DLQ", &source, &"decode error");

        assert_eq!(dlq.topic, "source-topic.DLQ");
        assert_eq!(dlq.key, Some(Bytes::from("key")));
        assert_eq!(dlq.value, Bytes::from("value"));

        let expected = [
            ("__krafka.dlq.original.topic", "source-topic"),
            ("__krafka.dlq.original.partition", "2"),
            ("__krafka.dlq.original.offset", "42"),
            ("__krafka.dlq.exception.message", "decode error"),
        ];
        for (name, want) in expected {
            assert_eq!(header_value(&dlq, name), Some(Bytes::from(want)));
        }
    }

    /// Headers present on the consumed record come first and keep their
    /// value; the provenance headers are still added.
    #[test]
    fn test_build_dlq_record_original_headers_preserved() {
        let mut source = ConsumerRecord::new("t", 0, 0, None, Some(Bytes::from("v")));
        source
            .headers
            .push((Bytes::from("x-trace-id"), Some(Bytes::from("abc123"))));

        let dlq = build_dlq_record("t.DLQ", &source, &"error");

        let (first_key, first_value) = &dlq.headers[0];
        assert_eq!(first_key, "x-trace-id");
        assert_eq!(first_value, &Bytes::from("abc123"));
        assert!(header_value(&dlq, "__krafka.dlq.original.topic").is_some());
    }

    /// A record without a value yields an empty value on the DLQ record.
    #[test]
    fn test_build_dlq_record_no_value() {
        let tombstone = ConsumerRecord::new("t", 0, 0, None, None);

        let dlq = build_dlq_record("t.DLQ", &tombstone, &"tombstone");

        assert_eq!(dlq.value, Bytes::new());
    }
}