1use crate::{
4 dlq::{DeadLetterQueue, DlqQuery, DlqRecord, DlqStats},
5 error::StorageResult,
6};
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use rusqlite::{Connection, params};
10use std::{
11 collections::HashMap,
12 path::Path,
13 sync::{Arc, Mutex},
14};
15use uuid::Uuid;
16
/// Interior holder that serializes all access to the underlying SQLite
/// handle behind a `Mutex`, so the queue can be shared across threads/tasks.
struct SqliteDlqConnection {
    // Guarded handle; callers lock for the duration of a single statement
    // or short statement batch only.
    conn: Mutex<Connection>,
}
21
22impl SqliteDlqConnection {
23 fn new(conn: Connection) -> StorageResult<Self> {
24 Self::create_tables(&conn)?;
25 Ok(Self {
26 conn: Mutex::new(conn),
27 })
28 }
29
30 fn create_tables(conn: &Connection) -> StorageResult<()> {
31 conn.execute_batch(
32 r#"
33 CREATE TABLE IF NOT EXISTS dead_letter_queue (
34 id TEXT PRIMARY KEY,
35 original_message_id TEXT,
36 from_actr_id BLOB, -- Sender ActrId (Protobuf bytes, nullable)
37 to_actr_id BLOB, -- Target ActrId (Protobuf bytes, nullable)
38 raw_bytes BLOB NOT NULL, -- Complete original message for forensics
39 error_message TEXT NOT NULL, -- Human-readable error description
40 error_category TEXT NOT NULL,-- Error classification (e.g., "protobuf_decode")
41 trace_id TEXT NOT NULL, -- Distributed trace ID
42 request_id TEXT, -- Request ID (if available)
43 created_at TEXT NOT NULL, -- Timestamp of DLQ entry
44 redrive_attempts INTEGER NOT NULL DEFAULT 0,
45 last_redrive_at TEXT, -- Last redrive attempt timestamp
46 context TEXT -- JSON-encoded additional metadata
47 );
48
49 -- Index for common query patterns
50 CREATE INDEX IF NOT EXISTS idx_dlq_created_at ON dead_letter_queue(created_at DESC);
51 CREATE INDEX IF NOT EXISTS idx_dlq_error_category ON dead_letter_queue(error_category, created_at DESC);
52 CREATE INDEX IF NOT EXISTS idx_dlq_trace_id ON dead_letter_queue(trace_id);
53 "#,
54 )?;
55 Ok(())
56 }
57}
58
/// SQLite-backed implementation of the [`DeadLetterQueue`] trait.
pub struct SqliteDeadLetterQueue {
    // Shared, mutex-guarded connection. NOTE(review): the `Arc` is presumably
    // for cheap handle sharing by callers elsewhere — confirm against users.
    connection: Arc<SqliteDlqConnection>,
}
63
64impl SqliteDeadLetterQueue {
65 pub fn new(conn: Connection) -> StorageResult<Self> {
69 let connection = Arc::new(SqliteDlqConnection::new(conn)?);
70 Ok(Self { connection })
71 }
72
73 pub async fn new_standalone<P: AsRef<Path>>(database_path: P) -> StorageResult<Self> {
75 let conn = Connection::open(database_path.as_ref())?;
76 conn.execute_batch("PRAGMA journal_mode = WAL;")?;
77 Self::new(conn)
78 }
79}
80
81#[async_trait]
82impl DeadLetterQueue for SqliteDeadLetterQueue {
83 async fn enqueue(&self, record: DlqRecord) -> StorageResult<Uuid> {
84 let id = record.id;
85 let conn = self.connection.conn.lock().unwrap();
86
87 conn.execute(
88 r#"
89 INSERT INTO dead_letter_queue (
90 id, original_message_id, from_actr_id, to_actr_id, raw_bytes,
91 error_message, error_category, trace_id, request_id,
92 created_at, redrive_attempts, last_redrive_at, context
93 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
94 "#,
95 params![
96 id.to_string(),
97 record.original_message_id,
98 record.from,
99 record.to,
100 record.raw_bytes,
101 record.error_message,
102 record.error_category,
103 record.trace_id,
104 record.request_id,
105 record.created_at.to_rfc3339(),
106 record.redrive_attempts,
107 record.last_redrive_at.map(|dt| dt.to_rfc3339()),
108 record.context,
109 ],
110 )?;
111
112 Ok(id)
113 }
114
115 async fn query(&self, query: DlqQuery) -> StorageResult<Vec<DlqRecord>> {
116 let conn = self.connection.conn.lock().unwrap();
117
118 let mut sql = String::from(
120 r#"
121 SELECT id, original_message_id, from_actr_id, to_actr_id, raw_bytes,
122 error_message, error_category, trace_id, request_id,
123 created_at, redrive_attempts, last_redrive_at, context
124 FROM dead_letter_queue
125 WHERE 1=1
126 "#,
127 );
128
129 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
130
131 if let Some(ref category) = query.error_category {
132 sql.push_str(" AND error_category = ?");
133 params_vec.push(Box::new(category.clone()));
134 }
135
136 if let Some(ref trace_id) = query.trace_id {
137 sql.push_str(" AND trace_id = ?");
138 params_vec.push(Box::new(trace_id.clone()));
139 }
140
141 if let Some(ref from_bytes) = query.from {
142 sql.push_str(" AND from_actr_id = ?");
143 params_vec.push(Box::new(from_bytes.clone()));
144 }
145
146 if let Some(ref created_after) = query.created_after {
147 sql.push_str(" AND created_at > ?");
148 params_vec.push(Box::new(created_after.to_rfc3339()));
149 }
150
151 sql.push_str(" ORDER BY created_at DESC");
152
153 if let Some(limit) = query.limit {
154 sql.push_str(&format!(" LIMIT {limit}"));
155 }
156
157 let mut stmt = conn.prepare(&sql)?;
158
159 let params_refs: Vec<&dyn rusqlite::ToSql> =
161 params_vec.iter().map(|b| b.as_ref()).collect();
162
163 let records = stmt
164 .query_map(params_refs.as_slice(), |row| {
165 Ok(DlqRecord {
166 id: Uuid::parse_str(&row.get::<_, String>(0)?).unwrap(),
167 original_message_id: row.get(1)?,
168 from: row.get(2)?,
169 to: row.get(3)?,
170 raw_bytes: row.get(4)?,
171 error_message: row.get(5)?,
172 error_category: row.get(6)?,
173 trace_id: row.get(7)?,
174 request_id: row.get(8)?,
175 created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(9)?)
176 .unwrap()
177 .with_timezone(&Utc),
178 redrive_attempts: row.get(10)?,
179 last_redrive_at: row.get::<_, Option<String>>(11)?.map(|s| {
180 DateTime::parse_from_rfc3339(&s)
181 .unwrap()
182 .with_timezone(&Utc)
183 }),
184 context: row.get(12)?,
185 })
186 })?
187 .collect::<Result<Vec<_>, _>>()?;
188
189 Ok(records)
190 }
191
192 async fn get(&self, id: Uuid) -> StorageResult<Option<DlqRecord>> {
193 let conn = self.connection.conn.lock().unwrap();
194
195 let result = conn.query_row(
196 r#"
197 SELECT id, original_message_id, from_actr_id, to_actr_id, raw_bytes,
198 error_message, error_category, trace_id, request_id,
199 created_at, redrive_attempts, last_redrive_at, context
200 FROM dead_letter_queue
201 WHERE id = ?1
202 "#,
203 params![id.to_string()],
204 |row| {
205 Ok(DlqRecord {
206 id: Uuid::parse_str(&row.get::<_, String>(0)?).unwrap(),
207 original_message_id: row.get(1)?,
208 from: row.get(2)?,
209 to: row.get(3)?,
210 raw_bytes: row.get(4)?,
211 error_message: row.get(5)?,
212 error_category: row.get(6)?,
213 trace_id: row.get(7)?,
214 request_id: row.get(8)?,
215 created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(9)?)
216 .unwrap()
217 .with_timezone(&Utc),
218 redrive_attempts: row.get(10)?,
219 last_redrive_at: row.get::<_, Option<String>>(11)?.map(|s| {
220 DateTime::parse_from_rfc3339(&s)
221 .unwrap()
222 .with_timezone(&Utc)
223 }),
224 context: row.get(12)?,
225 })
226 },
227 );
228
229 match result {
230 Ok(record) => Ok(Some(record)),
231 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
232 Err(e) => Err(e.into()),
233 }
234 }
235
236 async fn delete(&self, id: Uuid) -> StorageResult<()> {
237 let conn = self.connection.conn.lock().unwrap();
238 conn.execute(
239 "DELETE FROM dead_letter_queue WHERE id = ?1",
240 params![id.to_string()],
241 )?;
242 Ok(())
243 }
244
245 async fn record_redrive_attempt(&self, id: Uuid) -> StorageResult<()> {
246 let conn = self.connection.conn.lock().unwrap();
247 conn.execute(
248 r#"
249 UPDATE dead_letter_queue
250 SET redrive_attempts = redrive_attempts + 1,
251 last_redrive_at = ?1
252 WHERE id = ?2
253 "#,
254 params![Utc::now().to_rfc3339(), id.to_string()],
255 )?;
256 Ok(())
257 }
258
259 async fn stats(&self) -> StorageResult<DlqStats> {
260 let conn = self.connection.conn.lock().unwrap();
261
262 let total_messages: u64 =
263 conn.query_row("SELECT COUNT(*) FROM dead_letter_queue", [], |row| {
264 row.get(0)
265 })?;
266
267 let messages_with_redrive_attempts: u64 = conn.query_row(
268 "SELECT COUNT(*) FROM dead_letter_queue WHERE redrive_attempts > 0",
269 [],
270 |row| row.get(0),
271 )?;
272
273 let oldest_message_at: Option<DateTime<Utc>> = conn
274 .query_row("SELECT MIN(created_at) FROM dead_letter_queue", [], |row| {
275 row.get::<_, Option<String>>(0)
276 })?
277 .map(|s| {
278 DateTime::parse_from_rfc3339(&s)
279 .unwrap()
280 .with_timezone(&Utc)
281 });
282
283 let mut messages_by_category = HashMap::new();
285 let mut stmt = conn.prepare(
286 "SELECT error_category, COUNT(*) FROM dead_letter_queue GROUP BY error_category",
287 )?;
288 let rows = stmt.query_map([], |row| {
289 let category: String = row.get(0)?;
290 let count: u64 = row.get(1)?;
291 Ok((category, count))
292 })?;
293
294 for row in rows {
295 let (category, count) = row?;
296 messages_by_category.insert(category, count);
297 }
298
299 Ok(DlqStats {
300 total_messages,
301 messages_by_category,
302 messages_with_redrive_attempts,
303 oldest_message_at,
304 })
305 }
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds a DLQ backed by a fresh on-disk SQLite database.
    ///
    /// Returns the [`tempfile::TempDir`] guard together with the queue. The
    /// previous version dropped the guard inside this function, which deleted
    /// the directory (and the database file) while the connection was still
    /// open — SQLite could then fail to create its WAL sidecar files at the
    /// unlinked path on the first write. Each test must keep the guard bound
    /// (`_dir`) for its whole body.
    async fn setup_dlq() -> (tempfile::TempDir, SqliteDeadLetterQueue) {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test_dlq.db");
        let dlq = SqliteDeadLetterQueue::new_standalone(&path).await.unwrap();
        (dir, dlq)
    }

    /// A fully-populated record (every optional field set) with a fresh
    /// random id.
    fn dummy_dlq_record() -> DlqRecord {
        DlqRecord {
            id: Uuid::new_v4(),
            original_message_id: Some("msg-123".to_string()),
            from: Some(vec![1, 2, 3]),
            to: Some(vec![4, 5, 6]),
            raw_bytes: vec![0xDE, 0xAD, 0xBE, 0xEF],
            error_message: "Protobuf decode failed: unexpected EOF".to_string(),
            error_category: "protobuf_decode".to_string(),
            trace_id: "trace-abc-123".to_string(),
            request_id: Some("req-xyz".to_string()),
            created_at: Utc::now(),
            redrive_attempts: 0,
            last_redrive_at: None,
            context: Some(r#"{"transport": "webrtc"}"#.to_string()),
        }
    }

    /// Round-trip: an enqueued record can be fetched back by id with its
    /// key fields intact.
    #[tokio::test]
    async fn test_enqueue_and_get() {
        let (_dir, dlq) = setup_dlq().await;
        let record = dummy_dlq_record();
        let id = record.id;

        dlq.enqueue(record.clone()).await.unwrap();

        let retrieved = dlq.get(id).await.unwrap();
        assert!(retrieved.is_some());
        let retrieved = retrieved.unwrap();
        assert_eq!(retrieved.id, id);
        assert_eq!(retrieved.error_message, record.error_message);
        assert_eq!(retrieved.error_category, record.error_category);
        assert_eq!(retrieved.trace_id, record.trace_id);
    }

    /// Filtering by `error_category` returns only matching records.
    #[tokio::test]
    async fn test_query_by_category() {
        let (_dir, dlq) = setup_dlq().await;

        let mut record1 = dummy_dlq_record();
        record1.error_category = "protobuf_decode".to_string();
        dlq.enqueue(record1).await.unwrap();

        let mut record2 = dummy_dlq_record();
        record2.error_category = "corrupted_envelope".to_string();
        dlq.enqueue(record2).await.unwrap();

        let mut record3 = dummy_dlq_record();
        record3.error_category = "protobuf_decode".to_string();
        dlq.enqueue(record3).await.unwrap();

        let query = DlqQuery {
            error_category: Some("protobuf_decode".to_string()),
            ..Default::default()
        };
        let results = dlq.query(query).await.unwrap();
        assert_eq!(results.len(), 2);
        assert!(
            results
                .iter()
                .all(|r| r.error_category == "protobuf_decode")
        );
    }

    /// `record_redrive_attempt` bumps the counter and stamps the timestamp.
    #[tokio::test]
    async fn test_redrive_attempt_tracking() {
        let (_dir, dlq) = setup_dlq().await;
        let record = dummy_dlq_record();
        let id = record.id;

        dlq.enqueue(record).await.unwrap();

        // Fresh record starts with no redrive history.
        let retrieved = dlq.get(id).await.unwrap().unwrap();
        assert_eq!(retrieved.redrive_attempts, 0);
        assert!(retrieved.last_redrive_at.is_none());

        dlq.record_redrive_attempt(id).await.unwrap();

        let updated = dlq.get(id).await.unwrap().unwrap();
        assert_eq!(updated.redrive_attempts, 1);
        assert!(updated.last_redrive_at.is_some());
    }

    /// Deleted records are no longer retrievable.
    #[tokio::test]
    async fn test_delete() {
        let (_dir, dlq) = setup_dlq().await;
        let record = dummy_dlq_record();
        let id = record.id;

        dlq.enqueue(record).await.unwrap();
        assert!(dlq.get(id).await.unwrap().is_some());

        dlq.delete(id).await.unwrap();
        assert!(dlq.get(id).await.unwrap().is_none());
    }

    /// `stats` reports totals, per-category counts, redrive counts, and the
    /// oldest-entry timestamp.
    #[tokio::test]
    async fn test_stats() {
        let (_dir, dlq) = setup_dlq().await;

        let mut record1 = dummy_dlq_record();
        record1.error_category = "protobuf_decode".to_string();
        dlq.enqueue(record1.clone()).await.unwrap();

        let mut record2 = dummy_dlq_record();
        record2.error_category = "corrupted_envelope".to_string();
        dlq.enqueue(record2).await.unwrap();

        dlq.record_redrive_attempt(record1.id).await.unwrap();

        let stats = dlq.stats().await.unwrap();
        assert_eq!(stats.total_messages, 2);
        assert_eq!(stats.messages_with_redrive_attempts, 1);
        assert_eq!(
            stats
                .messages_by_category
                .get("protobuf_decode")
                .copied()
                .unwrap_or(0),
            1
        );
        assert_eq!(
            stats
                .messages_by_category
                .get("corrupted_envelope")
                .copied()
                .unwrap_or(0),
            1
        );
        assert!(stats.oldest_message_at.is_some());
    }
}