use base64::Engine;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqEntry {
pub timestamp: i64,
pub service: String,
pub reason: String,
pub attempts: u32,
#[serde(
serialize_with = "serialize_payload",
deserialize_with = "deserialize_payload"
)]
pub payload: Vec<u8>,
#[serde(skip_serializing_if = "Option::is_none")]
pub destination: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub source: Option<DlqSource>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqSource {
#[serde(skip_serializing_if = "Option::is_none")]
pub topic: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub partition: Option<i32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub offset: Option<i64>,
}
impl DlqEntry {
pub fn new(service: impl Into<String>, reason: impl Into<String>, payload: Vec<u8>) -> Self {
Self {
timestamp: chrono::Utc::now().timestamp_millis(),
service: service.into(),
reason: reason.into(),
attempts: 0,
payload,
destination: None,
source: None,
}
}
#[must_use]
pub fn with_destination(mut self, dest: impl Into<String>) -> Self {
self.destination = Some(dest.into());
self
}
#[must_use]
pub fn with_source(mut self, source: DlqSource) -> Self {
self.source = Some(source);
self
}
#[must_use]
pub fn with_attempts(mut self, attempts: u32) -> Self {
self.attempts = attempts;
self
}
}
impl DlqSource {
pub fn kafka(topic: impl Into<String>, partition: i32, offset: i64) -> Self {
Self {
topic: Some(topic.into()),
partition: Some(partition),
offset: Some(offset),
}
}
}
fn serialize_payload<S>(payload: &[u8], serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let encoded = base64::engine::general_purpose::STANDARD.encode(payload);
serializer.serialize_str(&encoded)
}
fn deserialize_payload<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
base64::engine::general_purpose::STANDARD
.decode(&s)
.map_err(serde::de::Error::custom)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_entry_new() {
let entry = DlqEntry::new("loader", "schema_mismatch", b"test payload".to_vec());
assert_eq!(entry.service, "loader");
assert_eq!(entry.reason, "schema_mismatch");
assert_eq!(entry.payload, b"test payload");
assert_eq!(entry.attempts, 0);
assert!(entry.destination.is_none());
assert!(entry.source.is_none());
assert!(entry.timestamp > 0);
}
#[test]
fn test_entry_builders() {
let entry = DlqEntry::new("receiver", "parse_error", b"bad json".to_vec())
.with_destination("acme.auth")
.with_source(DlqSource::kafka("events", 3, 12345))
.with_attempts(2);
assert_eq!(entry.destination.as_deref(), Some("acme.auth"));
assert_eq!(entry.attempts, 2);
let src = entry.source.as_ref().expect("source should be set");
assert_eq!(src.topic.as_deref(), Some("events"));
assert_eq!(src.partition, Some(3));
assert_eq!(src.offset, Some(12345));
}
#[test]
fn test_serde_roundtrip() {
let entry = DlqEntry::new("loader", "type_error", b"\x00\x01\x02\xff".to_vec())
.with_destination("db.table")
.with_source(DlqSource::kafka("topic", 0, 999));
let json = serde_json::to_string(&entry).expect("serialise");
let parsed: DlqEntry = serde_json::from_str(&json).expect("deserialise");
assert_eq!(parsed.service, entry.service);
assert_eq!(parsed.reason, entry.reason);
assert_eq!(parsed.payload, entry.payload);
assert_eq!(parsed.destination, entry.destination);
assert_eq!(parsed.attempts, entry.attempts);
}
#[test]
fn test_payload_base64_encoding() {
let entry = DlqEntry::new("test", "reason", b"hello world".to_vec());
let json = serde_json::to_string(&entry).expect("serialise");
assert!(json.contains("aGVsbG8gd29ybGQ="));
}
}