Skip to main content

hyperi_rustlib/dlq/
entry.rs

1// Project:   hyperi-rustlib
2// File:      src/dlq/entry.rs
3// Purpose:   Shared DLQ entry envelope format
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Shared DLQ entry types used by all backends.
10
11use base64::Engine;
12use serde::{Deserialize, Serialize};
13
14/// A failed message routed to the dead letter queue.
15///
16/// This envelope is backend-agnostic -- it carries the original payload
17/// plus metadata about why and where the failure occurred.
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct DlqEntry {
20    /// When the failure occurred (epoch milliseconds).
21    pub timestamp: i64,
22
23    /// Service that generated this entry (e.g. "loader", "receiver").
24    pub service: String,
25
26    /// Why the message failed processing.
27    pub reason: String,
28
29    /// How many times the message was retried before being sent to DLQ.
30    pub attempts: u32,
31
32    /// Original message payload (base64-encoded in JSON).
33    #[serde(
34        serialize_with = "serialize_payload",
35        deserialize_with = "deserialize_payload"
36    )]
37    pub payload: Vec<u8>,
38
39    /// Intended destination (e.g. "db.table" or topic name).
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub destination: Option<String>,
42
43    /// Where the message originated.
44    #[serde(skip_serializing_if = "Option::is_none")]
45    pub source: Option<DlqSource>,
46}
47
48/// Source metadata for a DLQ entry.
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct DlqSource {
51    /// Original Kafka topic.
52    #[serde(skip_serializing_if = "Option::is_none")]
53    pub topic: Option<String>,
54
55    /// Original Kafka partition.
56    #[serde(skip_serializing_if = "Option::is_none")]
57    pub partition: Option<i32>,
58
59    /// Original Kafka offset.
60    #[serde(skip_serializing_if = "Option::is_none")]
61    pub offset: Option<i64>,
62}
63
64impl DlqEntry {
65    /// Create a new DLQ entry with the current timestamp.
66    pub fn new(service: impl Into<String>, reason: impl Into<String>, payload: Vec<u8>) -> Self {
67        Self {
68            timestamp: chrono::Utc::now().timestamp_millis(),
69            service: service.into(),
70            reason: reason.into(),
71            attempts: 0,
72            payload,
73            destination: None,
74            source: None,
75        }
76    }
77
78    /// Set the intended destination.
79    #[must_use]
80    pub fn with_destination(mut self, dest: impl Into<String>) -> Self {
81        self.destination = Some(dest.into());
82        self
83    }
84
85    /// Set the source metadata.
86    #[must_use]
87    pub fn with_source(mut self, source: DlqSource) -> Self {
88        self.source = Some(source);
89        self
90    }
91
92    /// Set the retry attempt count.
93    #[must_use]
94    pub fn with_attempts(mut self, attempts: u32) -> Self {
95        self.attempts = attempts;
96        self
97    }
98}
99
100impl DlqSource {
101    /// Create source metadata from Kafka coordinates.
102    pub fn kafka(topic: impl Into<String>, partition: i32, offset: i64) -> Self {
103        Self {
104            topic: Some(topic.into()),
105            partition: Some(partition),
106            offset: Some(offset),
107        }
108    }
109}
110
111fn serialize_payload<S>(payload: &[u8], serializer: S) -> Result<S::Ok, S::Error>
112where
113    S: serde::Serializer,
114{
115    let encoded = base64::engine::general_purpose::STANDARD.encode(payload);
116    serializer.serialize_str(&encoded)
117}
118
119fn deserialize_payload<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
120where
121    D: serde::Deserializer<'de>,
122{
123    let s = String::deserialize(deserializer)?;
124    base64::engine::general_purpose::STANDARD
125        .decode(&s)
126        .map_err(serde::de::Error::custom)
127}
128
129#[cfg(test)]
130mod tests {
131    use super::*;
132
133    #[test]
134    fn test_entry_new() {
135        let entry = DlqEntry::new("loader", "schema_mismatch", b"test payload".to_vec());
136        assert_eq!(entry.service, "loader");
137        assert_eq!(entry.reason, "schema_mismatch");
138        assert_eq!(entry.payload, b"test payload");
139        assert_eq!(entry.attempts, 0);
140        assert!(entry.destination.is_none());
141        assert!(entry.source.is_none());
142        assert!(entry.timestamp > 0);
143    }
144
145    #[test]
146    fn test_entry_builders() {
147        let entry = DlqEntry::new("receiver", "parse_error", b"bad json".to_vec())
148            .with_destination("acme.auth")
149            .with_source(DlqSource::kafka("events", 3, 12345))
150            .with_attempts(2);
151
152        assert_eq!(entry.destination.as_deref(), Some("acme.auth"));
153        assert_eq!(entry.attempts, 2);
154        let src = entry.source.as_ref().expect("source should be set");
155        assert_eq!(src.topic.as_deref(), Some("events"));
156        assert_eq!(src.partition, Some(3));
157        assert_eq!(src.offset, Some(12345));
158    }
159
160    #[test]
161    fn test_serde_roundtrip() {
162        let entry = DlqEntry::new("loader", "type_error", b"\x00\x01\x02\xff".to_vec())
163            .with_destination("db.table")
164            .with_source(DlqSource::kafka("topic", 0, 999));
165
166        let json = serde_json::to_string(&entry).expect("serialise");
167        let parsed: DlqEntry = serde_json::from_str(&json).expect("deserialise");
168
169        assert_eq!(parsed.service, entry.service);
170        assert_eq!(parsed.reason, entry.reason);
171        assert_eq!(parsed.payload, entry.payload);
172        assert_eq!(parsed.destination, entry.destination);
173        assert_eq!(parsed.attempts, entry.attempts);
174    }
175
176    #[test]
177    fn test_payload_base64_encoding() {
178        let entry = DlqEntry::new("test", "reason", b"hello world".to_vec());
179        let json = serde_json::to_string(&entry).expect("serialise");
180        // base64 of "hello world" is "aGVsbG8gd29ybGQ="
181        assert!(json.contains("aGVsbG8gd29ybGQ="));
182    }
183}