1use crate::{
4 dlq::{DeadLetterQueue, DlqQuery, DlqRecord, DlqStats},
5 error::StorageResult,
6};
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use rusqlite::{Connection, params};
10use std::{
11 collections::HashMap,
12 path::Path,
13 sync::{Arc, Mutex},
14};
15use uuid::Uuid;
16
17fn parse_uuid_col(row: &rusqlite::Row<'_>, col: usize) -> rusqlite::Result<Uuid> {
18 let s: String = row.get(col)?;
19 Uuid::parse_str(&s).map_err(|e| {
20 rusqlite::Error::FromSqlConversionFailure(col, rusqlite::types::Type::Text, Box::new(e))
21 })
22}
23
24fn parse_datetime_col(row: &rusqlite::Row<'_>, col: usize) -> rusqlite::Result<DateTime<Utc>> {
25 let s: String = row.get(col)?;
26 parse_rfc3339(col, &s)
27}
28
29fn parse_datetime_opt_col(
30 row: &rusqlite::Row<'_>,
31 col: usize,
32) -> rusqlite::Result<Option<DateTime<Utc>>> {
33 match row.get::<_, Option<String>>(col)? {
34 Some(s) => Ok(Some(parse_rfc3339(col, &s)?)),
35 None => Ok(None),
36 }
37}
38
39fn parse_rfc3339(col: usize, s: &str) -> rusqlite::Result<DateTime<Utc>> {
40 DateTime::parse_from_rfc3339(s)
41 .map(|dt| dt.with_timezone(&Utc))
42 .map_err(|e| {
43 rusqlite::Error::FromSqlConversionFailure(col, rusqlite::types::Type::Text, Box::new(e))
44 })
45}
46
47fn row_to_dlq_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<DlqRecord> {
48 Ok(DlqRecord {
49 id: parse_uuid_col(row, 0)?,
50 original_message_id: row.get(1)?,
51 from: row.get(2)?,
52 to: row.get(3)?,
53 raw_bytes: row.get(4)?,
54 error_message: row.get(5)?,
55 error_category: row.get(6)?,
56 trace_id: row.get(7)?,
57 request_id: row.get(8)?,
58 created_at: parse_datetime_col(row, 9)?,
59 redrive_attempts: row.get(10)?,
60 last_redrive_at: parse_datetime_opt_col(row, 11)?,
61 context: row.get(12)?,
62 })
63}
64
65struct SqliteDlqConnection {
67 conn: Mutex<Connection>,
68}
69
70impl SqliteDlqConnection {
71 fn new(conn: Connection) -> StorageResult<Self> {
72 Self::create_tables(&conn)?;
73 Ok(Self {
74 conn: Mutex::new(conn),
75 })
76 }
77
78 fn create_tables(conn: &Connection) -> StorageResult<()> {
79 conn.execute_batch(
80 r#"
81 CREATE TABLE IF NOT EXISTS dead_letter_queue (
82 id TEXT PRIMARY KEY,
83 original_message_id TEXT,
84 from_actr_id BLOB, -- Sender ActrId (Protobuf bytes, nullable)
85 to_actr_id BLOB, -- Target ActrId (Protobuf bytes, nullable)
86 raw_bytes BLOB NOT NULL, -- Complete original message for forensics
87 error_message TEXT NOT NULL, -- Human-readable error description
88 error_category TEXT NOT NULL,-- Error classification (e.g., "protobuf_decode")
89 trace_id TEXT NOT NULL, -- Distributed trace ID
90 request_id TEXT, -- Request ID (if available)
91 created_at TEXT NOT NULL, -- Timestamp of DLQ entry
92 redrive_attempts INTEGER NOT NULL DEFAULT 0,
93 last_redrive_at TEXT, -- Last redrive attempt timestamp
94 context TEXT -- JSON-encoded additional metadata
95 );
96
97 -- Index for common query patterns
98 CREATE INDEX IF NOT EXISTS idx_dlq_created_at ON dead_letter_queue(created_at DESC);
99 CREATE INDEX IF NOT EXISTS idx_dlq_error_category ON dead_letter_queue(error_category, created_at DESC);
100 CREATE INDEX IF NOT EXISTS idx_dlq_trace_id ON dead_letter_queue(trace_id);
101 "#,
102 )?;
103 Ok(())
104 }
105}
106
107pub struct SqliteDeadLetterQueue {
109 connection: Arc<SqliteDlqConnection>,
110}
111
112impl SqliteDeadLetterQueue {
113 pub fn new(conn: Connection) -> StorageResult<Self> {
117 let connection = Arc::new(SqliteDlqConnection::new(conn)?);
118 Ok(Self { connection })
119 }
120
121 pub async fn new_standalone<P: AsRef<Path>>(database_path: P) -> StorageResult<Self> {
123 let conn = Connection::open(database_path.as_ref())?;
124 conn.execute_batch("PRAGMA journal_mode = WAL;")?;
125 Self::new(conn)
126 }
127}
128
129#[async_trait]
130impl DeadLetterQueue for SqliteDeadLetterQueue {
131 async fn enqueue(&self, record: DlqRecord) -> StorageResult<Uuid> {
132 let id = record.id;
133 let conn = self.connection.conn.lock().unwrap();
134
135 conn.execute(
136 r#"
137 INSERT INTO dead_letter_queue (
138 id, original_message_id, from_actr_id, to_actr_id, raw_bytes,
139 error_message, error_category, trace_id, request_id,
140 created_at, redrive_attempts, last_redrive_at, context
141 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
142 "#,
143 params![
144 id.to_string(),
145 record.original_message_id,
146 record.from,
147 record.to,
148 record.raw_bytes,
149 record.error_message,
150 record.error_category,
151 record.trace_id,
152 record.request_id,
153 record.created_at.to_rfc3339(),
154 record.redrive_attempts,
155 record.last_redrive_at.map(|dt| dt.to_rfc3339()),
156 record.context,
157 ],
158 )?;
159
160 Ok(id)
161 }
162
163 async fn query(&self, query: DlqQuery) -> StorageResult<Vec<DlqRecord>> {
164 let conn = self.connection.conn.lock().unwrap();
165
166 let mut sql = String::from(
168 r#"
169 SELECT id, original_message_id, from_actr_id, to_actr_id, raw_bytes,
170 error_message, error_category, trace_id, request_id,
171 created_at, redrive_attempts, last_redrive_at, context
172 FROM dead_letter_queue
173 WHERE 1=1
174 "#,
175 );
176
177 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
178
179 if let Some(ref category) = query.error_category {
180 sql.push_str(" AND error_category = ?");
181 params_vec.push(Box::new(category.clone()));
182 }
183
184 if let Some(ref trace_id) = query.trace_id {
185 sql.push_str(" AND trace_id = ?");
186 params_vec.push(Box::new(trace_id.clone()));
187 }
188
189 if let Some(ref from_bytes) = query.from {
190 sql.push_str(" AND from_actr_id = ?");
191 params_vec.push(Box::new(from_bytes.clone()));
192 }
193
194 if let Some(ref created_after) = query.created_after {
195 sql.push_str(" AND created_at > ?");
196 params_vec.push(Box::new(created_after.to_rfc3339()));
197 }
198
199 sql.push_str(" ORDER BY created_at DESC");
200
201 if let Some(limit) = query.limit {
202 sql.push_str(&format!(" LIMIT {limit}"));
203 }
204
205 let mut stmt = conn.prepare(&sql)?;
206
207 let params_refs: Vec<&dyn rusqlite::ToSql> =
209 params_vec.iter().map(|b| b.as_ref()).collect();
210
211 let records = stmt
212 .query_map(params_refs.as_slice(), row_to_dlq_record)?
213 .collect::<Result<Vec<_>, _>>()?;
214
215 Ok(records)
216 }
217
218 async fn get(&self, id: Uuid) -> StorageResult<Option<DlqRecord>> {
219 let conn = self.connection.conn.lock().unwrap();
220
221 let result = conn.query_row(
222 r#"
223 SELECT id, original_message_id, from_actr_id, to_actr_id, raw_bytes,
224 error_message, error_category, trace_id, request_id,
225 created_at, redrive_attempts, last_redrive_at, context
226 FROM dead_letter_queue
227 WHERE id = ?1
228 "#,
229 params![id.to_string()],
230 row_to_dlq_record,
231 );
232
233 match result {
234 Ok(record) => Ok(Some(record)),
235 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
236 Err(e) => Err(e.into()),
237 }
238 }
239
240 async fn delete(&self, id: Uuid) -> StorageResult<()> {
241 let conn = self.connection.conn.lock().unwrap();
242 conn.execute(
243 "DELETE FROM dead_letter_queue WHERE id = ?1",
244 params![id.to_string()],
245 )?;
246 Ok(())
247 }
248
249 async fn record_redrive_attempt(&self, id: Uuid) -> StorageResult<()> {
250 let conn = self.connection.conn.lock().unwrap();
251 conn.execute(
252 r#"
253 UPDATE dead_letter_queue
254 SET redrive_attempts = redrive_attempts + 1,
255 last_redrive_at = ?1
256 WHERE id = ?2
257 "#,
258 params![Utc::now().to_rfc3339(), id.to_string()],
259 )?;
260 Ok(())
261 }
262
263 async fn stats(&self) -> StorageResult<DlqStats> {
264 let conn = self.connection.conn.lock().unwrap();
265
266 let total_messages: u64 =
267 conn.query_row("SELECT COUNT(*) FROM dead_letter_queue", [], |row| {
268 row.get(0)
269 })?;
270
271 let messages_with_redrive_attempts: u64 = conn.query_row(
272 "SELECT COUNT(*) FROM dead_letter_queue WHERE redrive_attempts > 0",
273 [],
274 |row| row.get(0),
275 )?;
276
277 let oldest_message_at: Option<DateTime<Utc>> = conn
278 .query_row("SELECT MIN(created_at) FROM dead_letter_queue", [], |row| {
279 row.get::<_, Option<String>>(0)
280 })?
281 .map(|s| {
282 DateTime::parse_from_rfc3339(&s)
283 .unwrap()
284 .with_timezone(&Utc)
285 });
286
287 let mut messages_by_category = HashMap::new();
289 let mut stmt = conn.prepare(
290 "SELECT error_category, COUNT(*) FROM dead_letter_queue GROUP BY error_category",
291 )?;
292 let rows = stmt.query_map([], |row| {
293 let category: String = row.get(0)?;
294 let count: u64 = row.get(1)?;
295 Ok((category, count))
296 })?;
297
298 for row in rows {
299 let (category, count) = row?;
300 messages_by_category.insert(category, count);
301 }
302
303 Ok(DlqStats {
304 total_messages,
305 messages_by_category,
306 messages_with_redrive_attempts,
307 oldest_message_at,
308 })
309 }
310}
311
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::{TempDir, tempdir};

    /// Opens a fresh file-backed DLQ in a new temporary directory.
    ///
    /// The `TempDir` guard is returned with the queue because dropping it
    /// deletes the directory and the database inside it. Bind it to a named
    /// variable such as `_dir` (not `_`) so it lives until the test ends.
    async fn setup_dlq() -> (SqliteDeadLetterQueue, TempDir) {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test_dlq.db");
        let dlq = SqliteDeadLetterQueue::new_standalone(&path).await.unwrap();
        (dlq, dir)
    }

    /// Parses a fixed RFC 3339 timestamp so tests control record ordering.
    fn ts(s: &str) -> DateTime<Utc> {
        DateTime::parse_from_rfc3339(s).unwrap().with_timezone(&Utc)
    }

    /// Builds a fully-populated record with a fresh random id.
    fn dummy_dlq_record() -> DlqRecord {
        DlqRecord {
            id: Uuid::new_v4(),
            original_message_id: Some("msg-123".to_string()),
            from: Some(vec![1, 2, 3]),
            to: Some(vec![4, 5, 6]),
            raw_bytes: vec![0xDE, 0xAD, 0xBE, 0xEF],
            error_message: "Protobuf decode failed: unexpected EOF".to_string(),
            error_category: "protobuf_decode".to_string(),
            trace_id: "trace-abc-123".to_string(),
            request_id: Some("req-xyz".to_string()),
            created_at: Utc::now(),
            redrive_attempts: 0,
            last_redrive_at: None,
            context: Some(r#"{"transport": "webrtc"}"#.to_string()),
        }
    }

    #[tokio::test]
    async fn test_enqueue_and_get() {
        let (dlq, _dir) = setup_dlq().await;
        let record = dummy_dlq_record();
        let id = record.id;

        assert_eq!(dlq.enqueue(record.clone()).await.unwrap(), id);

        let retrieved = dlq
            .get(id)
            .await
            .unwrap()
            .expect("enqueued record should be found");
        assert_eq!(retrieved.id, id);
        assert_eq!(retrieved.original_message_id, record.original_message_id);
        assert_eq!(retrieved.from, record.from);
        assert_eq!(retrieved.to, record.to);
        assert_eq!(retrieved.raw_bytes, record.raw_bytes);
        assert_eq!(retrieved.error_message, record.error_message);
        assert_eq!(retrieved.error_category, record.error_category);
        assert_eq!(retrieved.trace_id, record.trace_id);
        assert_eq!(retrieved.request_id, record.request_id);
        assert_eq!(retrieved.created_at, record.created_at);
        assert_eq!(retrieved.redrive_attempts, record.redrive_attempts);
        assert_eq!(retrieved.last_redrive_at, record.last_redrive_at);
        assert_eq!(retrieved.context, record.context);
    }

    #[tokio::test]
    async fn test_get_and_delete_unknown_id() {
        let (dlq, _dir) = setup_dlq().await;
        let unknown = Uuid::new_v4();

        assert!(dlq.get(unknown).await.unwrap().is_none());
        // Deleting a missing record is a silent no-op.
        dlq.delete(unknown).await.unwrap();
    }

    #[tokio::test]
    async fn test_query_by_category() {
        let (dlq, _dir) = setup_dlq().await;

        let mut record1 = dummy_dlq_record();
        record1.error_category = "protobuf_decode".to_string();
        dlq.enqueue(record1).await.unwrap();

        let mut record2 = dummy_dlq_record();
        record2.error_category = "corrupted_envelope".to_string();
        dlq.enqueue(record2).await.unwrap();

        let mut record3 = dummy_dlq_record();
        record3.error_category = "protobuf_decode".to_string();
        dlq.enqueue(record3).await.unwrap();

        let query = DlqQuery {
            error_category: Some("protobuf_decode".to_string()),
            ..Default::default()
        };
        let results = dlq.query(query).await.unwrap();
        assert_eq!(results.len(), 2);
        assert!(
            results
                .iter()
                .all(|r| r.error_category == "protobuf_decode")
        );
    }

    #[tokio::test]
    async fn test_query_orders_newest_first_and_applies_limit() {
        let (dlq, _dir) = setup_dlq().await;

        let mut ids = Vec::new();
        for created in [
            "2024-01-01T00:00:00Z",
            "2024-01-02T00:00:00Z",
            "2024-01-03T00:00:00Z",
        ] {
            let mut record = dummy_dlq_record();
            record.created_at = ts(created);
            ids.push(record.id);
            dlq.enqueue(record).await.unwrap();
        }

        let query = DlqQuery {
            limit: Some(2),
            ..Default::default()
        };
        let returned: Vec<Uuid> = dlq
            .query(query)
            .await
            .unwrap()
            .iter()
            .map(|r| r.id)
            .collect();
        assert_eq!(returned, vec![ids[2], ids[1]]);
    }

    #[tokio::test]
    async fn test_redrive_attempt_tracking() {
        let (dlq, _dir) = setup_dlq().await;
        let record = dummy_dlq_record();
        let id = record.id;

        dlq.enqueue(record).await.unwrap();

        let retrieved = dlq.get(id).await.unwrap().unwrap();
        assert_eq!(retrieved.redrive_attempts, 0);
        assert!(retrieved.last_redrive_at.is_none());

        dlq.record_redrive_attempt(id).await.unwrap();

        let updated = dlq.get(id).await.unwrap().unwrap();
        assert_eq!(updated.redrive_attempts, 1);
        assert!(updated.last_redrive_at.is_some());

        dlq.record_redrive_attempt(id).await.unwrap();

        let updated_again = dlq.get(id).await.unwrap().unwrap();
        assert_eq!(updated_again.redrive_attempts, 2);
    }

    #[tokio::test]
    async fn test_delete() {
        let (dlq, _dir) = setup_dlq().await;
        let record = dummy_dlq_record();
        let id = record.id;

        dlq.enqueue(record).await.unwrap();
        assert!(dlq.get(id).await.unwrap().is_some());

        dlq.delete(id).await.unwrap();
        assert!(dlq.get(id).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_stats() {
        let (dlq, _dir) = setup_dlq().await;

        let mut record1 = dummy_dlq_record();
        record1.error_category = "protobuf_decode".to_string();
        record1.created_at = ts("2024-01-01T00:00:00Z");
        dlq.enqueue(record1.clone()).await.unwrap();

        let mut record2 = dummy_dlq_record();
        record2.error_category = "corrupted_envelope".to_string();
        record2.created_at = ts("2024-01-02T00:00:00Z");
        dlq.enqueue(record2).await.unwrap();

        dlq.record_redrive_attempt(record1.id).await.unwrap();

        let stats = dlq.stats().await.unwrap();
        assert_eq!(stats.total_messages, 2);
        assert_eq!(stats.messages_with_redrive_attempts, 1);
        assert_eq!(
            stats
                .messages_by_category
                .get("protobuf_decode")
                .copied()
                .unwrap_or(0),
            1
        );
        assert_eq!(
            stats
                .messages_by_category
                .get("corrupted_envelope")
                .copied()
                .unwrap_or(0),
            1
        );
        assert_eq!(stats.oldest_message_at, Some(ts("2024-01-01T00:00:00Z")));
    }

    #[tokio::test]
    async fn test_stats_on_empty_queue() {
        let (dlq, _dir) = setup_dlq().await;

        let stats = dlq.stats().await.unwrap();
        assert_eq!(stats.total_messages, 0);
        assert_eq!(stats.messages_with_redrive_attempts, 0);
        assert!(stats.messages_by_category.is_empty());
        assert!(stats.oldest_message_at.is_none());
    }
}