Skip to main content

actr_runtime_mailbox/
sqlite_dlq.rs

1//! SQLite implementation of Dead Letter Queue
2
3use crate::{
4    dlq::{DeadLetterQueue, DlqQuery, DlqRecord, DlqStats},
5    error::StorageResult,
6};
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use rusqlite::{Connection, params};
10use std::{
11    collections::HashMap,
12    path::Path,
13    sync::{Arc, Mutex},
14};
15use uuid::Uuid;
16
17fn parse_uuid_col(row: &rusqlite::Row<'_>, col: usize) -> rusqlite::Result<Uuid> {
18    let s: String = row.get(col)?;
19    Uuid::parse_str(&s).map_err(|e| {
20        rusqlite::Error::FromSqlConversionFailure(col, rusqlite::types::Type::Text, Box::new(e))
21    })
22}
23
24fn parse_datetime_col(row: &rusqlite::Row<'_>, col: usize) -> rusqlite::Result<DateTime<Utc>> {
25    let s: String = row.get(col)?;
26    parse_rfc3339(col, &s)
27}
28
29fn parse_datetime_opt_col(
30    row: &rusqlite::Row<'_>,
31    col: usize,
32) -> rusqlite::Result<Option<DateTime<Utc>>> {
33    match row.get::<_, Option<String>>(col)? {
34        Some(s) => Ok(Some(parse_rfc3339(col, &s)?)),
35        None => Ok(None),
36    }
37}
38
39fn parse_rfc3339(col: usize, s: &str) -> rusqlite::Result<DateTime<Utc>> {
40    DateTime::parse_from_rfc3339(s)
41        .map(|dt| dt.with_timezone(&Utc))
42        .map_err(|e| {
43            rusqlite::Error::FromSqlConversionFailure(col, rusqlite::types::Type::Text, Box::new(e))
44        })
45}
46
47fn row_to_dlq_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<DlqRecord> {
48    Ok(DlqRecord {
49        id: parse_uuid_col(row, 0)?,
50        original_message_id: row.get(1)?,
51        from: row.get(2)?,
52        to: row.get(3)?,
53        raw_bytes: row.get(4)?,
54        error_message: row.get(5)?,
55        error_category: row.get(6)?,
56        trace_id: row.get(7)?,
57        request_id: row.get(8)?,
58        created_at: parse_datetime_col(row, 9)?,
59        redrive_attempts: row.get(10)?,
60        last_redrive_at: parse_datetime_opt_col(row, 11)?,
61        context: row.get(12)?,
62    })
63}
64
65/// SQLite connection wrapper for DLQ
66struct SqliteDlqConnection {
67    conn: Mutex<Connection>,
68}
69
70impl SqliteDlqConnection {
71    fn new(conn: Connection) -> StorageResult<Self> {
72        Self::create_tables(&conn)?;
73        Ok(Self {
74            conn: Mutex::new(conn),
75        })
76    }
77
78    fn create_tables(conn: &Connection) -> StorageResult<()> {
79        conn.execute_batch(
80            r#"
81            CREATE TABLE IF NOT EXISTS dead_letter_queue (
82                id TEXT PRIMARY KEY,
83                original_message_id TEXT,
84                from_actr_id BLOB,           -- Sender ActrId (Protobuf bytes, nullable)
85                to_actr_id BLOB,             -- Target ActrId (Protobuf bytes, nullable)
86                raw_bytes BLOB NOT NULL,     -- Complete original message for forensics
87                error_message TEXT NOT NULL, -- Human-readable error description
88                error_category TEXT NOT NULL,-- Error classification (e.g., "protobuf_decode")
89                trace_id TEXT NOT NULL,      -- Distributed trace ID
90                request_id TEXT,             -- Request ID (if available)
91                created_at TEXT NOT NULL,    -- Timestamp of DLQ entry
92                redrive_attempts INTEGER NOT NULL DEFAULT 0,
93                last_redrive_at TEXT,        -- Last redrive attempt timestamp
94                context TEXT                 -- JSON-encoded additional metadata
95            );
96
97            -- Index for common query patterns
98            CREATE INDEX IF NOT EXISTS idx_dlq_created_at ON dead_letter_queue(created_at DESC);
99            CREATE INDEX IF NOT EXISTS idx_dlq_error_category ON dead_letter_queue(error_category, created_at DESC);
100            CREATE INDEX IF NOT EXISTS idx_dlq_trace_id ON dead_letter_queue(trace_id);
101            "#,
102        )?;
103        Ok(())
104    }
105}
106
107/// SQLite-backed Dead Letter Queue
108pub struct SqliteDeadLetterQueue {
109    connection: Arc<SqliteDlqConnection>,
110}
111
112impl SqliteDeadLetterQueue {
113    /// Create a new DLQ using an existing database connection
114    ///
115    /// DLQ shares the same database file as mailbox but uses separate tables.
116    pub fn new(conn: Connection) -> StorageResult<Self> {
117        let connection = Arc::new(SqliteDlqConnection::new(conn)?);
118        Ok(Self { connection })
119    }
120
121    /// Create a new DLQ with a separate database file
122    pub async fn new_standalone<P: AsRef<Path>>(database_path: P) -> StorageResult<Self> {
123        let conn = Connection::open(database_path.as_ref())?;
124        conn.execute_batch("PRAGMA journal_mode = WAL;")?;
125        Self::new(conn)
126    }
127}
128
129#[async_trait]
130impl DeadLetterQueue for SqliteDeadLetterQueue {
131    async fn enqueue(&self, record: DlqRecord) -> StorageResult<Uuid> {
132        let id = record.id;
133        let conn = self.connection.conn.lock().unwrap();
134
135        conn.execute(
136            r#"
137            INSERT INTO dead_letter_queue (
138                id, original_message_id, from_actr_id, to_actr_id, raw_bytes,
139                error_message, error_category, trace_id, request_id,
140                created_at, redrive_attempts, last_redrive_at, context
141            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
142            "#,
143            params![
144                id.to_string(),
145                record.original_message_id,
146                record.from,
147                record.to,
148                record.raw_bytes,
149                record.error_message,
150                record.error_category,
151                record.trace_id,
152                record.request_id,
153                record.created_at.to_rfc3339(),
154                record.redrive_attempts,
155                record.last_redrive_at.map(|dt| dt.to_rfc3339()),
156                record.context,
157            ],
158        )?;
159
160        Ok(id)
161    }
162
163    async fn query(&self, query: DlqQuery) -> StorageResult<Vec<DlqRecord>> {
164        let conn = self.connection.conn.lock().unwrap();
165
166        // Build dynamic query based on filters
167        let mut sql = String::from(
168            r#"
169            SELECT id, original_message_id, from_actr_id, to_actr_id, raw_bytes,
170                   error_message, error_category, trace_id, request_id,
171                   created_at, redrive_attempts, last_redrive_at, context
172            FROM dead_letter_queue
173            WHERE 1=1
174            "#,
175        );
176
177        let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
178
179        if let Some(ref category) = query.error_category {
180            sql.push_str(" AND error_category = ?");
181            params_vec.push(Box::new(category.clone()));
182        }
183
184        if let Some(ref trace_id) = query.trace_id {
185            sql.push_str(" AND trace_id = ?");
186            params_vec.push(Box::new(trace_id.clone()));
187        }
188
189        if let Some(ref from_bytes) = query.from {
190            sql.push_str(" AND from_actr_id = ?");
191            params_vec.push(Box::new(from_bytes.clone()));
192        }
193
194        if let Some(ref created_after) = query.created_after {
195            sql.push_str(" AND created_at > ?");
196            params_vec.push(Box::new(created_after.to_rfc3339()));
197        }
198
199        sql.push_str(" ORDER BY created_at DESC");
200
201        if let Some(limit) = query.limit {
202            sql.push_str(&format!(" LIMIT {limit}"));
203        }
204
205        let mut stmt = conn.prepare(&sql)?;
206
207        // Convert Vec<Box<dyn ToSql>> to params
208        let params_refs: Vec<&dyn rusqlite::ToSql> =
209            params_vec.iter().map(|b| b.as_ref()).collect();
210
211        let records = stmt
212            .query_map(params_refs.as_slice(), row_to_dlq_record)?
213            .collect::<Result<Vec<_>, _>>()?;
214
215        Ok(records)
216    }
217
218    async fn get(&self, id: Uuid) -> StorageResult<Option<DlqRecord>> {
219        let conn = self.connection.conn.lock().unwrap();
220
221        let result = conn.query_row(
222            r#"
223            SELECT id, original_message_id, from_actr_id, to_actr_id, raw_bytes,
224                   error_message, error_category, trace_id, request_id,
225                   created_at, redrive_attempts, last_redrive_at, context
226            FROM dead_letter_queue
227            WHERE id = ?1
228            "#,
229            params![id.to_string()],
230            row_to_dlq_record,
231        );
232
233        match result {
234            Ok(record) => Ok(Some(record)),
235            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
236            Err(e) => Err(e.into()),
237        }
238    }
239
240    async fn delete(&self, id: Uuid) -> StorageResult<()> {
241        let conn = self.connection.conn.lock().unwrap();
242        conn.execute(
243            "DELETE FROM dead_letter_queue WHERE id = ?1",
244            params![id.to_string()],
245        )?;
246        Ok(())
247    }
248
249    async fn record_redrive_attempt(&self, id: Uuid) -> StorageResult<()> {
250        let conn = self.connection.conn.lock().unwrap();
251        conn.execute(
252            r#"
253            UPDATE dead_letter_queue
254            SET redrive_attempts = redrive_attempts + 1,
255                last_redrive_at = ?1
256            WHERE id = ?2
257            "#,
258            params![Utc::now().to_rfc3339(), id.to_string()],
259        )?;
260        Ok(())
261    }
262
263    async fn stats(&self) -> StorageResult<DlqStats> {
264        let conn = self.connection.conn.lock().unwrap();
265
266        let total_messages: u64 =
267            conn.query_row("SELECT COUNT(*) FROM dead_letter_queue", [], |row| {
268                row.get(0)
269            })?;
270
271        let messages_with_redrive_attempts: u64 = conn.query_row(
272            "SELECT COUNT(*) FROM dead_letter_queue WHERE redrive_attempts > 0",
273            [],
274            |row| row.get(0),
275        )?;
276
277        let oldest_message_at: Option<DateTime<Utc>> = conn
278            .query_row("SELECT MIN(created_at) FROM dead_letter_queue", [], |row| {
279                row.get::<_, Option<String>>(0)
280            })?
281            .map(|s| {
282                DateTime::parse_from_rfc3339(&s)
283                    .unwrap()
284                    .with_timezone(&Utc)
285            });
286
287        // Group by error category
288        let mut messages_by_category = HashMap::new();
289        let mut stmt = conn.prepare(
290            "SELECT error_category, COUNT(*) FROM dead_letter_queue GROUP BY error_category",
291        )?;
292        let rows = stmt.query_map([], |row| {
293            let category: String = row.get(0)?;
294            let count: u64 = row.get(1)?;
295            Ok((category, count))
296        })?;
297
298        for row in rows {
299            let (category, count) = row?;
300            messages_by_category.insert(category, count);
301        }
302
303        Ok(DlqStats {
304            total_messages,
305            messages_by_category,
306            messages_with_redrive_attempts,
307            oldest_message_at,
308        })
309    }
310}
311
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::{TempDir, tempdir};

    /// Open a standalone DLQ inside a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the queue because dropping it
    /// deletes the directory; it has to stay alive for as long as the database
    /// is in use. Callers should bind it as `let (_dir, dlq) = ...` so the
    /// queue (declared last) is dropped before the directory is removed.
    async fn setup_dlq() -> (TempDir, SqliteDeadLetterQueue) {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test_dlq.db");
        let dlq = SqliteDeadLetterQueue::new_standalone(&path).await.unwrap();
        (dir, dlq)
    }

    /// Build a fully-populated record with a fresh random id.
    fn dummy_dlq_record() -> DlqRecord {
        DlqRecord {
            id: Uuid::new_v4(),
            original_message_id: Some("msg-123".to_string()),
            from: Some(vec![1, 2, 3]),
            to: Some(vec![4, 5, 6]),
            raw_bytes: vec![0xDE, 0xAD, 0xBE, 0xEF],
            error_message: "Protobuf decode failed: unexpected EOF".to_string(),
            error_category: "protobuf_decode".to_string(),
            trace_id: "trace-abc-123".to_string(),
            request_id: Some("req-xyz".to_string()),
            created_at: Utc::now(),
            redrive_attempts: 0,
            last_redrive_at: None,
            context: Some(r#"{"transport": "webrtc"}"#.to_string()),
        }
    }

    #[tokio::test]
    async fn test_enqueue_and_get() {
        let (_dir, dlq) = setup_dlq().await;
        let record = dummy_dlq_record();
        let id = record.id;

        // Enqueue
        dlq.enqueue(record.clone()).await.unwrap();

        // Get
        let retrieved = dlq.get(id).await.unwrap();
        assert!(retrieved.is_some());
        let retrieved = retrieved.unwrap();
        assert_eq!(retrieved.id, id);
        assert_eq!(retrieved.error_message, record.error_message);
        assert_eq!(retrieved.error_category, record.error_category);
        assert_eq!(retrieved.trace_id, record.trace_id);
    }

    #[tokio::test]
    async fn test_query_by_category() {
        let (_dir, dlq) = setup_dlq().await;

        let mut record1 = dummy_dlq_record();
        record1.error_category = "protobuf_decode".to_string();
        dlq.enqueue(record1).await.unwrap();

        let mut record2 = dummy_dlq_record();
        record2.error_category = "corrupted_envelope".to_string();
        dlq.enqueue(record2).await.unwrap();

        let mut record3 = dummy_dlq_record();
        record3.error_category = "protobuf_decode".to_string();
        dlq.enqueue(record3).await.unwrap();

        // Query by category
        let query = DlqQuery {
            error_category: Some("protobuf_decode".to_string()),
            ..Default::default()
        };
        let results = dlq.query(query).await.unwrap();
        assert_eq!(results.len(), 2);
        assert!(
            results
                .iter()
                .all(|r| r.error_category == "protobuf_decode")
        );
    }

    #[tokio::test]
    async fn test_redrive_attempt_tracking() {
        let (_dir, dlq) = setup_dlq().await;
        let record = dummy_dlq_record();
        let id = record.id;

        dlq.enqueue(record).await.unwrap();

        // Initial state
        let retrieved = dlq.get(id).await.unwrap().unwrap();
        assert_eq!(retrieved.redrive_attempts, 0);
        assert!(retrieved.last_redrive_at.is_none());

        // Record redrive attempt
        dlq.record_redrive_attempt(id).await.unwrap();

        // Check updated state
        let updated = dlq.get(id).await.unwrap().unwrap();
        assert_eq!(updated.redrive_attempts, 1);
        assert!(updated.last_redrive_at.is_some());
    }

    #[tokio::test]
    async fn test_delete() {
        let (_dir, dlq) = setup_dlq().await;
        let record = dummy_dlq_record();
        let id = record.id;

        dlq.enqueue(record).await.unwrap();
        assert!(dlq.get(id).await.unwrap().is_some());

        dlq.delete(id).await.unwrap();
        assert!(dlq.get(id).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_stats() {
        let (_dir, dlq) = setup_dlq().await;

        let mut record1 = dummy_dlq_record();
        record1.error_category = "protobuf_decode".to_string();
        dlq.enqueue(record1.clone()).await.unwrap();

        let mut record2 = dummy_dlq_record();
        record2.error_category = "corrupted_envelope".to_string();
        dlq.enqueue(record2).await.unwrap();

        // Record a redrive attempt
        dlq.record_redrive_attempt(record1.id).await.unwrap();

        let stats = dlq.stats().await.unwrap();
        assert_eq!(stats.total_messages, 2);
        assert_eq!(stats.messages_with_redrive_attempts, 1);
        assert_eq!(
            stats
                .messages_by_category
                .get("protobuf_decode")
                .copied()
                .unwrap_or(0),
            1
        );
        assert_eq!(
            stats
                .messages_by_category
                .get("corrupted_envelope")
                .copied()
                .unwrap_or(0),
            1
        );
        assert!(stats.oldest_message_at.is_some());
    }
}
461}