// actr_mailbox/sqlite_dlq.rs

1//! SQLite implementation of Dead Letter Queue
2
3use crate::{
4    dlq::{DeadLetterQueue, DlqQuery, DlqRecord, DlqStats},
5    error::StorageResult,
6};
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use rusqlite::{Connection, params};
10use std::{
11    collections::HashMap,
12    path::Path,
13    sync::{Arc, Mutex},
14};
15use uuid::Uuid;
16
/// SQLite connection wrapper for DLQ
///
/// Serializes all access to the underlying connection through a `Mutex`,
/// since `rusqlite::Connection` is not `Sync`. Lock scopes in the trait
/// impl are short and never held across an `.await`.
struct SqliteDlqConnection {
    // Guarded connection; every query locks, runs, and releases immediately.
    conn: Mutex<Connection>,
}
21
22impl SqliteDlqConnection {
23    fn new(conn: Connection) -> StorageResult<Self> {
24        Self::create_tables(&conn)?;
25        Ok(Self {
26            conn: Mutex::new(conn),
27        })
28    }
29
30    fn create_tables(conn: &Connection) -> StorageResult<()> {
31        conn.execute_batch(
32            r#"
33            CREATE TABLE IF NOT EXISTS dead_letter_queue (
34                id TEXT PRIMARY KEY,
35                original_message_id TEXT,
36                from_actr_id BLOB,           -- Sender ActrId (Protobuf bytes, nullable)
37                to_actr_id BLOB,             -- Target ActrId (Protobuf bytes, nullable)
38                raw_bytes BLOB NOT NULL,     -- Complete original message for forensics
39                error_message TEXT NOT NULL, -- Human-readable error description
40                error_category TEXT NOT NULL,-- Error classification (e.g., "protobuf_decode")
41                trace_id TEXT NOT NULL,      -- Distributed trace ID
42                request_id TEXT,             -- Request ID (if available)
43                created_at TEXT NOT NULL,    -- Timestamp of DLQ entry
44                redrive_attempts INTEGER NOT NULL DEFAULT 0,
45                last_redrive_at TEXT,        -- Last redrive attempt timestamp
46                context TEXT                 -- JSON-encoded additional metadata
47            );
48
49            -- Index for common query patterns
50            CREATE INDEX IF NOT EXISTS idx_dlq_created_at ON dead_letter_queue(created_at DESC);
51            CREATE INDEX IF NOT EXISTS idx_dlq_error_category ON dead_letter_queue(error_category, created_at DESC);
52            CREATE INDEX IF NOT EXISTS idx_dlq_trace_id ON dead_letter_queue(trace_id);
53            "#,
54        )?;
55        Ok(())
56    }
57}
58
/// SQLite-backed Dead Letter Queue
///
/// Cheap to clone across tasks via the inner `Arc`; all methods take `&self`
/// and synchronize through the wrapped connection's mutex.
pub struct SqliteDeadLetterQueue {
    // Shared, mutex-guarded SQLite connection (see `SqliteDlqConnection`).
    connection: Arc<SqliteDlqConnection>,
}
63
64impl SqliteDeadLetterQueue {
65    /// Create a new DLQ using an existing database connection
66    ///
67    /// DLQ shares the same database file as mailbox but uses separate tables.
68    pub fn new(conn: Connection) -> StorageResult<Self> {
69        let connection = Arc::new(SqliteDlqConnection::new(conn)?);
70        Ok(Self { connection })
71    }
72
73    /// Create a new DLQ with a separate database file
74    pub async fn new_standalone<P: AsRef<Path>>(database_path: P) -> StorageResult<Self> {
75        let conn = Connection::open(database_path.as_ref())?;
76        conn.execute_batch("PRAGMA journal_mode = WAL;")?;
77        Self::new(conn)
78    }
79}
80
81#[async_trait]
82impl DeadLetterQueue for SqliteDeadLetterQueue {
83    async fn enqueue(&self, record: DlqRecord) -> StorageResult<Uuid> {
84        let id = record.id;
85        let conn = self.connection.conn.lock().unwrap();
86
87        conn.execute(
88            r#"
89            INSERT INTO dead_letter_queue (
90                id, original_message_id, from_actr_id, to_actr_id, raw_bytes,
91                error_message, error_category, trace_id, request_id,
92                created_at, redrive_attempts, last_redrive_at, context
93            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
94            "#,
95            params![
96                id.to_string(),
97                record.original_message_id,
98                record.from,
99                record.to,
100                record.raw_bytes,
101                record.error_message,
102                record.error_category,
103                record.trace_id,
104                record.request_id,
105                record.created_at.to_rfc3339(),
106                record.redrive_attempts,
107                record.last_redrive_at.map(|dt| dt.to_rfc3339()),
108                record.context,
109            ],
110        )?;
111
112        Ok(id)
113    }
114
115    async fn query(&self, query: DlqQuery) -> StorageResult<Vec<DlqRecord>> {
116        let conn = self.connection.conn.lock().unwrap();
117
118        // Build dynamic query based on filters
119        let mut sql = String::from(
120            r#"
121            SELECT id, original_message_id, from_actr_id, to_actr_id, raw_bytes,
122                   error_message, error_category, trace_id, request_id,
123                   created_at, redrive_attempts, last_redrive_at, context
124            FROM dead_letter_queue
125            WHERE 1=1
126            "#,
127        );
128
129        let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
130
131        if let Some(ref category) = query.error_category {
132            sql.push_str(" AND error_category = ?");
133            params_vec.push(Box::new(category.clone()));
134        }
135
136        if let Some(ref trace_id) = query.trace_id {
137            sql.push_str(" AND trace_id = ?");
138            params_vec.push(Box::new(trace_id.clone()));
139        }
140
141        if let Some(ref from_bytes) = query.from {
142            sql.push_str(" AND from_actr_id = ?");
143            params_vec.push(Box::new(from_bytes.clone()));
144        }
145
146        if let Some(ref created_after) = query.created_after {
147            sql.push_str(" AND created_at > ?");
148            params_vec.push(Box::new(created_after.to_rfc3339()));
149        }
150
151        sql.push_str(" ORDER BY created_at DESC");
152
153        if let Some(limit) = query.limit {
154            sql.push_str(&format!(" LIMIT {limit}"));
155        }
156
157        let mut stmt = conn.prepare(&sql)?;
158
159        // Convert Vec<Box<dyn ToSql>> to params
160        let params_refs: Vec<&dyn rusqlite::ToSql> =
161            params_vec.iter().map(|b| b.as_ref()).collect();
162
163        let records = stmt
164            .query_map(params_refs.as_slice(), |row| {
165                Ok(DlqRecord {
166                    id: Uuid::parse_str(&row.get::<_, String>(0)?).unwrap(),
167                    original_message_id: row.get(1)?,
168                    from: row.get(2)?,
169                    to: row.get(3)?,
170                    raw_bytes: row.get(4)?,
171                    error_message: row.get(5)?,
172                    error_category: row.get(6)?,
173                    trace_id: row.get(7)?,
174                    request_id: row.get(8)?,
175                    created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(9)?)
176                        .unwrap()
177                        .with_timezone(&Utc),
178                    redrive_attempts: row.get(10)?,
179                    last_redrive_at: row.get::<_, Option<String>>(11)?.map(|s| {
180                        DateTime::parse_from_rfc3339(&s)
181                            .unwrap()
182                            .with_timezone(&Utc)
183                    }),
184                    context: row.get(12)?,
185                })
186            })?
187            .collect::<Result<Vec<_>, _>>()?;
188
189        Ok(records)
190    }
191
192    async fn get(&self, id: Uuid) -> StorageResult<Option<DlqRecord>> {
193        let conn = self.connection.conn.lock().unwrap();
194
195        let result = conn.query_row(
196            r#"
197            SELECT id, original_message_id, from_actr_id, to_actr_id, raw_bytes,
198                   error_message, error_category, trace_id, request_id,
199                   created_at, redrive_attempts, last_redrive_at, context
200            FROM dead_letter_queue
201            WHERE id = ?1
202            "#,
203            params![id.to_string()],
204            |row| {
205                Ok(DlqRecord {
206                    id: Uuid::parse_str(&row.get::<_, String>(0)?).unwrap(),
207                    original_message_id: row.get(1)?,
208                    from: row.get(2)?,
209                    to: row.get(3)?,
210                    raw_bytes: row.get(4)?,
211                    error_message: row.get(5)?,
212                    error_category: row.get(6)?,
213                    trace_id: row.get(7)?,
214                    request_id: row.get(8)?,
215                    created_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(9)?)
216                        .unwrap()
217                        .with_timezone(&Utc),
218                    redrive_attempts: row.get(10)?,
219                    last_redrive_at: row.get::<_, Option<String>>(11)?.map(|s| {
220                        DateTime::parse_from_rfc3339(&s)
221                            .unwrap()
222                            .with_timezone(&Utc)
223                    }),
224                    context: row.get(12)?,
225                })
226            },
227        );
228
229        match result {
230            Ok(record) => Ok(Some(record)),
231            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
232            Err(e) => Err(e.into()),
233        }
234    }
235
236    async fn delete(&self, id: Uuid) -> StorageResult<()> {
237        let conn = self.connection.conn.lock().unwrap();
238        conn.execute(
239            "DELETE FROM dead_letter_queue WHERE id = ?1",
240            params![id.to_string()],
241        )?;
242        Ok(())
243    }
244
245    async fn record_redrive_attempt(&self, id: Uuid) -> StorageResult<()> {
246        let conn = self.connection.conn.lock().unwrap();
247        conn.execute(
248            r#"
249            UPDATE dead_letter_queue
250            SET redrive_attempts = redrive_attempts + 1,
251                last_redrive_at = ?1
252            WHERE id = ?2
253            "#,
254            params![Utc::now().to_rfc3339(), id.to_string()],
255        )?;
256        Ok(())
257    }
258
259    async fn stats(&self) -> StorageResult<DlqStats> {
260        let conn = self.connection.conn.lock().unwrap();
261
262        let total_messages: u64 =
263            conn.query_row("SELECT COUNT(*) FROM dead_letter_queue", [], |row| {
264                row.get(0)
265            })?;
266
267        let messages_with_redrive_attempts: u64 = conn.query_row(
268            "SELECT COUNT(*) FROM dead_letter_queue WHERE redrive_attempts > 0",
269            [],
270            |row| row.get(0),
271        )?;
272
273        let oldest_message_at: Option<DateTime<Utc>> = conn
274            .query_row("SELECT MIN(created_at) FROM dead_letter_queue", [], |row| {
275                row.get::<_, Option<String>>(0)
276            })?
277            .map(|s| {
278                DateTime::parse_from_rfc3339(&s)
279                    .unwrap()
280                    .with_timezone(&Utc)
281            });
282
283        // Group by error category
284        let mut messages_by_category = HashMap::new();
285        let mut stmt = conn.prepare(
286            "SELECT error_category, COUNT(*) FROM dead_letter_queue GROUP BY error_category",
287        )?;
288        let rows = stmt.query_map([], |row| {
289            let category: String = row.get(0)?;
290            let count: u64 = row.get(1)?;
291            Ok((category, count))
292        })?;
293
294        for row in rows {
295            let (category, count) = row?;
296            messages_by_category.insert(category, count);
297        }
298
299        Ok(DlqStats {
300            total_messages,
301            messages_by_category,
302            messages_with_redrive_attempts,
303            oldest_message_at,
304        })
305    }
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Create a DLQ backed by a fresh database file.
    ///
    /// Returns the `TempDir` guard alongside the DLQ: dropping the guard
    /// deletes the directory, so callers must keep it alive for the test's
    /// duration. (Previously the guard was dropped inside this helper,
    /// deleting the directory while the database was still in use — WAL
    /// sidecar files created on first write could then fail to open.)
    async fn setup_dlq() -> (SqliteDeadLetterQueue, tempfile::TempDir) {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test_dlq.db");
        let dlq = SqliteDeadLetterQueue::new_standalone(&path).await.unwrap();
        (dlq, dir)
    }

    /// A fully-populated record with a fresh random id.
    fn dummy_dlq_record() -> DlqRecord {
        DlqRecord {
            id: Uuid::new_v4(),
            original_message_id: Some("msg-123".to_string()),
            from: Some(vec![1, 2, 3]),
            to: Some(vec![4, 5, 6]),
            raw_bytes: vec![0xDE, 0xAD, 0xBE, 0xEF],
            error_message: "Protobuf decode failed: unexpected EOF".to_string(),
            error_category: "protobuf_decode".to_string(),
            trace_id: "trace-abc-123".to_string(),
            request_id: Some("req-xyz".to_string()),
            created_at: Utc::now(),
            redrive_attempts: 0,
            last_redrive_at: None,
            context: Some(r#"{"transport": "webrtc"}"#.to_string()),
        }
    }

    #[tokio::test]
    async fn test_enqueue_and_get() {
        let (dlq, _guard) = setup_dlq().await;
        let record = dummy_dlq_record();
        let id = record.id;

        // Enqueue
        dlq.enqueue(record.clone()).await.unwrap();

        // Get: the round-tripped record must match what was stored.
        let retrieved = dlq.get(id).await.unwrap();
        assert!(retrieved.is_some());
        let retrieved = retrieved.unwrap();
        assert_eq!(retrieved.id, id);
        assert_eq!(retrieved.error_message, record.error_message);
        assert_eq!(retrieved.error_category, record.error_category);
        assert_eq!(retrieved.trace_id, record.trace_id);
    }

    #[tokio::test]
    async fn test_query_by_category() {
        let (dlq, _guard) = setup_dlq().await;

        // Two records in one category, one in another.
        let mut record1 = dummy_dlq_record();
        record1.error_category = "protobuf_decode".to_string();
        dlq.enqueue(record1).await.unwrap();

        let mut record2 = dummy_dlq_record();
        record2.error_category = "corrupted_envelope".to_string();
        dlq.enqueue(record2).await.unwrap();

        let mut record3 = dummy_dlq_record();
        record3.error_category = "protobuf_decode".to_string();
        dlq.enqueue(record3).await.unwrap();

        // Query by category should return exactly the matching pair.
        let query = DlqQuery {
            error_category: Some("protobuf_decode".to_string()),
            ..Default::default()
        };
        let results = dlq.query(query).await.unwrap();
        assert_eq!(results.len(), 2);
        assert!(
            results
                .iter()
                .all(|r| r.error_category == "protobuf_decode")
        );
    }

    #[tokio::test]
    async fn test_redrive_attempt_tracking() {
        let (dlq, _guard) = setup_dlq().await;
        let record = dummy_dlq_record();
        let id = record.id;

        dlq.enqueue(record).await.unwrap();

        // Initial state: no attempts recorded.
        let retrieved = dlq.get(id).await.unwrap().unwrap();
        assert_eq!(retrieved.redrive_attempts, 0);
        assert!(retrieved.last_redrive_at.is_none());

        // Record redrive attempt
        dlq.record_redrive_attempt(id).await.unwrap();

        // Counter incremented and timestamp set.
        let updated = dlq.get(id).await.unwrap().unwrap();
        assert_eq!(updated.redrive_attempts, 1);
        assert!(updated.last_redrive_at.is_some());
    }

    #[tokio::test]
    async fn test_delete() {
        let (dlq, _guard) = setup_dlq().await;
        let record = dummy_dlq_record();
        let id = record.id;

        dlq.enqueue(record).await.unwrap();
        assert!(dlq.get(id).await.unwrap().is_some());

        dlq.delete(id).await.unwrap();
        assert!(dlq.get(id).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_stats() {
        let (dlq, _guard) = setup_dlq().await;

        let mut record1 = dummy_dlq_record();
        record1.error_category = "protobuf_decode".to_string();
        dlq.enqueue(record1.clone()).await.unwrap();

        let mut record2 = dummy_dlq_record();
        record2.error_category = "corrupted_envelope".to_string();
        dlq.enqueue(record2).await.unwrap();

        // Record a redrive attempt
        dlq.record_redrive_attempt(record1.id).await.unwrap();

        let stats = dlq.stats().await.unwrap();
        assert_eq!(stats.total_messages, 2);
        assert_eq!(stats.messages_with_redrive_attempts, 1);
        assert_eq!(
            stats
                .messages_by_category
                .get("protobuf_decode")
                .copied()
                .unwrap_or(0),
            1
        );
        assert_eq!(
            stats
                .messages_by_category
                .get("corrupted_envelope")
                .copied()
                .unwrap_or(0),
            1
        );
        assert!(stats.oldest_message_at.is_some());
    }
}