hyperi_rustlib/dlq/
entry.rs1use base64::Engine;
12use serde::{Deserialize, Serialize};
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct DlqEntry {
20 pub timestamp: i64,
22
23 pub service: String,
25
26 pub reason: String,
28
29 pub attempts: u32,
31
32 #[serde(
34 serialize_with = "serialize_payload",
35 deserialize_with = "deserialize_payload"
36 )]
37 pub payload: Vec<u8>,
38
39 #[serde(skip_serializing_if = "Option::is_none")]
41 pub destination: Option<String>,
42
43 #[serde(skip_serializing_if = "Option::is_none")]
45 pub source: Option<DlqSource>,
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct DlqSource {
51 #[serde(skip_serializing_if = "Option::is_none")]
53 pub topic: Option<String>,
54
55 #[serde(skip_serializing_if = "Option::is_none")]
57 pub partition: Option<i32>,
58
59 #[serde(skip_serializing_if = "Option::is_none")]
61 pub offset: Option<i64>,
62}
63
64impl DlqEntry {
65 pub fn new(service: impl Into<String>, reason: impl Into<String>, payload: Vec<u8>) -> Self {
67 Self {
68 timestamp: chrono::Utc::now().timestamp_millis(),
69 service: service.into(),
70 reason: reason.into(),
71 attempts: 0,
72 payload,
73 destination: None,
74 source: None,
75 }
76 }
77
78 #[must_use]
80 pub fn with_destination(mut self, dest: impl Into<String>) -> Self {
81 self.destination = Some(dest.into());
82 self
83 }
84
85 #[must_use]
87 pub fn with_source(mut self, source: DlqSource) -> Self {
88 self.source = Some(source);
89 self
90 }
91
92 #[must_use]
94 pub fn with_attempts(mut self, attempts: u32) -> Self {
95 self.attempts = attempts;
96 self
97 }
98}
99
100impl DlqSource {
101 pub fn kafka(topic: impl Into<String>, partition: i32, offset: i64) -> Self {
103 Self {
104 topic: Some(topic.into()),
105 partition: Some(partition),
106 offset: Some(offset),
107 }
108 }
109}
110
111fn serialize_payload<S>(payload: &[u8], serializer: S) -> Result<S::Ok, S::Error>
112where
113 S: serde::Serializer,
114{
115 let encoded = base64::engine::general_purpose::STANDARD.encode(payload);
116 serializer.serialize_str(&encoded)
117}
118
119fn deserialize_payload<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
120where
121 D: serde::Deserializer<'de>,
122{
123 let s = String::deserialize(deserializer)?;
124 base64::engine::general_purpose::STANDARD
125 .decode(&s)
126 .map_err(serde::de::Error::custom)
127}
128
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` copies its arguments and leaves every optional part unset.
    #[test]
    fn test_entry_new() {
        let entry = DlqEntry::new("loader", "schema_mismatch", b"test payload".to_vec());
        assert_eq!(entry.service, "loader");
        assert_eq!(entry.reason, "schema_mismatch");
        assert_eq!(entry.payload, b"test payload");
        assert_eq!(entry.attempts, 0);
        assert!(entry.destination.is_none());
        assert!(entry.source.is_none());
        assert!(entry.timestamp > 0);
    }

    /// Each `with_*` builder sets exactly the field it is named after.
    #[test]
    fn test_entry_builders() {
        let entry = DlqEntry::new("receiver", "parse_error", b"bad json".to_vec())
            .with_destination("acme.auth")
            .with_source(DlqSource::kafka("events", 3, 12345))
            .with_attempts(2);

        assert_eq!(entry.destination.as_deref(), Some("acme.auth"));
        assert_eq!(entry.attempts, 2);
        let src = entry.source.as_ref().expect("source should be set");
        assert_eq!(src.topic.as_deref(), Some("events"));
        assert_eq!(src.partition, Some(3));
        assert_eq!(src.offset, Some(12345));
    }

    /// Every field, including the timestamp and the nested source, must
    /// survive a JSON round trip unchanged.
    #[test]
    fn test_serde_roundtrip() {
        let entry = DlqEntry::new("loader", "type_error", b"\x00\x01\x02\xff".to_vec())
            .with_destination("db.table")
            .with_source(DlqSource::kafka("topic", 0, 999));

        let json = serde_json::to_string(&entry).expect("serialise");
        let parsed: DlqEntry = serde_json::from_str(&json).expect("deserialise");

        assert_eq!(parsed.timestamp, entry.timestamp);
        assert_eq!(parsed.service, entry.service);
        assert_eq!(parsed.reason, entry.reason);
        assert_eq!(parsed.payload, entry.payload);
        assert_eq!(parsed.destination, entry.destination);
        assert_eq!(parsed.attempts, entry.attempts);

        // Compared field by field so the test does not require `PartialEq`
        // on `DlqSource`.
        let expected_src = entry.source.as_ref().expect("source should be set");
        let parsed_src = parsed.source.as_ref().expect("source should round-trip");
        assert_eq!(parsed_src.topic, expected_src.topic);
        assert_eq!(parsed_src.partition, expected_src.partition);
        assert_eq!(parsed_src.offset, expected_src.offset);
    }

    /// The payload appears in the JSON as padded standard base64.
    #[test]
    fn test_payload_base64_encoding() {
        let entry = DlqEntry::new("test", "reason", b"hello world".to_vec());
        let json = serde_json::to_string(&entry).expect("serialise");
        // "hello world" in standard base64.
        assert!(json.contains("aGVsbG8gd29ybGQ="));
    }

    /// Unset optional fields are skipped on output and come back as `None`.
    #[test]
    fn test_optional_fields_omitted_when_unset() {
        let entry = DlqEntry::new("test", "reason", Vec::new());
        let json = serde_json::to_string(&entry).expect("serialise");
        assert!(!json.contains("\"destination\""));
        assert!(!json.contains("\"source\""));

        let parsed: DlqEntry = serde_json::from_str(&json).expect("deserialise");
        assert!(parsed.destination.is_none());
        assert!(parsed.source.is_none());
        assert!(parsed.payload.is_empty());
    }

    /// A payload string that is not valid base64 must fail deserialisation
    /// rather than produce garbage bytes.
    #[test]
    fn test_invalid_base64_payload_rejected() {
        let json =
            r#"{"timestamp":1,"service":"s","reason":"r","attempts":0,"payload":"not base64!"}"#;
        assert!(serde_json::from_str::<DlqEntry>(json).is_err());
    }
}