Skip to main content

roboticus_db/
delivery_queue.rs

1use chrono::{DateTime, NaiveDateTime, Utc};
2use rusqlite::params;
3
4use roboticus_core::Result;
5
6use crate::{Database, DbResultExt};
7
8#[derive(Debug, Clone, PartialEq, Eq)]
9pub struct DeliveryQueueRecord {
10    pub id: String,
11    pub channel: String,
12    pub recipient_id: String,
13    pub content: String,
14    pub status: String,
15    pub attempts: u32,
16    pub max_attempts: u32,
17    pub next_retry_at: DateTime<Utc>,
18    pub last_error: Option<String>,
19    pub idempotency_key: String,
20    pub created_at: DateTime<Utc>,
21}
22
23fn parse_db_ts(input: &str) -> Option<DateTime<Utc>> {
24    DateTime::parse_from_rfc3339(input)
25        .map(|dt| dt.with_timezone(&Utc))
26        .ok()
27        .or_else(|| {
28            NaiveDateTime::parse_from_str(input, "%Y-%m-%d %H:%M:%S")
29                .ok()
30                .map(|ndt| DateTime::<Utc>::from_naive_utc_and_offset(ndt, Utc))
31        })
32}
33
34pub fn upsert_delivery_item(db: &Database, item: &DeliveryQueueRecord) -> Result<()> {
35    let conn = db.conn();
36    conn.execute(
37        r#"
38        INSERT INTO delivery_queue (
39            id, channel, recipient_id, content, status, attempts, max_attempts,
40            next_retry_at, last_error, idempotency_key, created_at
41        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
42        ON CONFLICT(id) DO UPDATE SET
43            channel = excluded.channel,
44            recipient_id = excluded.recipient_id,
45            content = excluded.content,
46            status = excluded.status,
47            attempts = excluded.attempts,
48            max_attempts = excluded.max_attempts,
49            next_retry_at = excluded.next_retry_at,
50            last_error = excluded.last_error,
51            idempotency_key = excluded.idempotency_key
52        "#,
53        params![
54            item.id,
55            item.channel,
56            item.recipient_id,
57            item.content,
58            item.status,
59            item.attempts,
60            item.max_attempts,
61            item.next_retry_at.to_rfc3339(),
62            item.last_error,
63            item.idempotency_key,
64            item.created_at.to_rfc3339(),
65        ],
66    )
67    .db_err()?;
68    Ok(())
69}
70
71pub fn list_recoverable(db: &Database, max_items: usize) -> Result<Vec<DeliveryQueueRecord>> {
72    let conn = db.conn();
73    let mut stmt = conn
74        .prepare(
75            r#"
76            SELECT id, channel, recipient_id, content, status, attempts, max_attempts,
77                   next_retry_at, last_error, idempotency_key, created_at
78            FROM delivery_queue
79            WHERE status IN ('pending', 'in_flight')
80            ORDER BY next_retry_at ASC
81            LIMIT ?1
82            "#,
83        )
84        .db_err()?;
85
86    let rows = stmt
87        .query_map(params![max_items as i64], |row| {
88            let next_retry_raw: String = row.get(7)?;
89            let created_raw: String = row.get(10)?;
90            Ok(DeliveryQueueRecord {
91                id: row.get(0)?,
92                channel: row.get(1)?,
93                recipient_id: row.get(2)?,
94                content: row.get(3)?,
95                status: row.get(4)?,
96                attempts: row.get::<_, i64>(5)? as u32,
97                max_attempts: row.get::<_, i64>(6)? as u32,
98                next_retry_at: parse_db_ts(&next_retry_raw).unwrap_or_else(|| {
99                    tracing::warn!(raw = %next_retry_raw, "corrupt next_retry_at timestamp, using epoch");
100                    DateTime::<Utc>::UNIX_EPOCH
101                }),
102                last_error: row.get(8)?,
103                idempotency_key: row.get(9)?,
104                created_at: parse_db_ts(&created_raw).unwrap_or_else(Utc::now),
105            })
106        })
107        .db_err()?;
108
109    rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
110}
111
112pub fn mark_delivered(db: &Database, id: &str) -> Result<()> {
113    let conn = db.conn();
114    conn.execute(
115        "UPDATE delivery_queue SET status = 'delivered', last_error = NULL WHERE id = ?1",
116        params![id],
117    )
118    .db_err()?;
119    Ok(())
120}
121
122pub fn mark_in_flight(db: &Database, id: &str) -> Result<()> {
123    let conn = db.conn();
124    conn.execute(
125        "UPDATE delivery_queue SET status = 'in_flight' WHERE id = ?1",
126        params![id],
127    )
128    .db_err()?;
129    Ok(())
130}
131
132pub fn list_dead_letters(db: &Database, max_items: usize) -> Result<Vec<DeliveryQueueRecord>> {
133    let conn = db.conn();
134    let mut stmt = conn
135        .prepare(
136            r#"
137            SELECT id, channel, recipient_id, content, status, attempts, max_attempts,
138                   next_retry_at, last_error, idempotency_key, created_at
139            FROM delivery_queue
140            WHERE status = 'dead_letter'
141            ORDER BY created_at DESC
142            LIMIT ?1
143            "#,
144        )
145        .db_err()?;
146
147    let rows = stmt
148        .query_map(params![max_items as i64], |row| {
149            let next_retry_raw: String = row.get(7)?;
150            let created_raw: String = row.get(10)?;
151            Ok(DeliveryQueueRecord {
152                id: row.get(0)?,
153                channel: row.get(1)?,
154                recipient_id: row.get(2)?,
155                content: row.get(3)?,
156                status: row.get(4)?,
157                attempts: row.get::<_, i64>(5)? as u32,
158                max_attempts: row.get::<_, i64>(6)? as u32,
159                next_retry_at: parse_db_ts(&next_retry_raw).unwrap_or_else(|| {
160                    tracing::warn!(raw = %next_retry_raw, "corrupt next_retry_at timestamp, using epoch");
161                    DateTime::<Utc>::UNIX_EPOCH
162                }),
163                last_error: row.get(8)?,
164                idempotency_key: row.get(9)?,
165                created_at: parse_db_ts(&created_raw).unwrap_or_else(Utc::now),
166            })
167        })
168        .db_err()?;
169
170    rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
171}
172
173pub fn replay_dead_letter(db: &Database, id: &str) -> Result<bool> {
174    let conn = db.conn();
175    let rows = conn
176        .execute(
177            "UPDATE delivery_queue SET status = 'pending', next_retry_at = ?1 WHERE id = ?2 AND status = 'dead_letter'",
178            params![Utc::now().to_rfc3339(), id],
179        )
180        .db_err()?;
181    Ok(rows > 0)
182}
183
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Datelike;

    /// Baseline fixture: a `pending` record with zero attempts, a budget of
    /// five, no recorded error and both timestamps set to "now". Tests that
    /// need something else override fields with struct-update syntax.
    fn pending_record(
        id: &str,
        channel: &str,
        recipient_id: &str,
        content: &str,
        idempotency_key: &str,
    ) -> DeliveryQueueRecord {
        DeliveryQueueRecord {
            id: id.into(),
            channel: channel.into(),
            recipient_id: recipient_id.into(),
            content: content.into(),
            status: "pending".into(),
            attempts: 0,
            max_attempts: 5,
            next_retry_at: Utc::now(),
            last_error: None,
            idempotency_key: idempotency_key.into(),
            created_at: Utc::now(),
        }
    }

    /// Turns a baseline record into an exhausted one: `dead_letter` status,
    /// five attempts used, and the given failure text.
    fn into_dead_letter(base: DeliveryQueueRecord, last_error: &str) -> DeliveryQueueRecord {
        DeliveryQueueRecord {
            status: "dead_letter".into(),
            attempts: 5,
            last_error: Some(last_error.into()),
            ..base
        }
    }

    #[test]
    fn upsert_and_list_recoverable() {
        let db = Database::new(":memory:").expect("db");
        let item = pending_record("d1", "telegram", "u1", "hello", "idem-1");

        upsert_delivery_item(&db, &item).expect("upsert");

        let rows = list_recoverable(&db, 20).expect("load");
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "d1");
    }

    #[test]
    fn mark_delivered_updates_status() {
        let db = Database::new(":memory:").expect("db");
        let item = pending_record("d2", "discord", "u2", "msg", "idem-2");
        upsert_delivery_item(&db, &item).expect("upsert");

        mark_delivered(&db, "d2").expect("mark delivered");

        let rows = list_recoverable(&db, 20).expect("load");
        assert!(rows.is_empty(), "delivered rows should not be recoverable");
    }

    #[test]
    fn replay_dead_letter_moves_back_to_pending() {
        let db = Database::new(":memory:").expect("db");
        let item = into_dead_letter(
            pending_record("d3", "discord", "u2", "msg", "idem-3"),
            "failed",
        );
        upsert_delivery_item(&db, &item).expect("upsert");
        assert_eq!(list_dead_letters(&db, 10).expect("dead").len(), 1);

        assert!(replay_dead_letter(&db, "d3").expect("replay"));

        let recovered = list_recoverable(&db, 10).expect("recoverable");
        assert_eq!(recovered.len(), 1);
        assert_eq!(recovered[0].status, "pending");
    }

    #[test]
    fn mark_in_flight_updates_status() {
        let db = Database::new(":memory:").expect("db");
        let item = pending_record("d4", "telegram", "u1", "hi", "idem-4");
        upsert_delivery_item(&db, &item).unwrap();

        mark_in_flight(&db, "d4").unwrap();

        // An in-flight row must still be picked up by recovery.
        let rows = list_recoverable(&db, 10).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].status, "in_flight");
    }

    #[test]
    fn mark_in_flight_nonexistent_is_noop() {
        let db = Database::new(":memory:").expect("db");
        mark_in_flight(&db, "nonexistent").unwrap();
    }

    #[test]
    fn parse_db_ts_rfc3339() {
        let ts = parse_db_ts("2025-06-01T12:00:00+00:00").unwrap();
        assert_eq!(ts.year(), 2025);
        assert_eq!(ts.month(), 6);
    }

    #[test]
    fn parse_db_ts_sqlite_format() {
        // Layout produced by SQLite's datetime('now'): "YYYY-MM-DD HH:MM:SS".
        let ts = parse_db_ts("2025-06-01 12:00:00").unwrap();
        assert_eq!(ts.year(), 2025);
        assert_eq!(ts.month(), 6);
    }

    #[test]
    fn parse_db_ts_invalid_returns_none() {
        assert!(parse_db_ts("not-a-date").is_none());
        assert!(parse_db_ts("").is_none());
    }

    #[test]
    fn list_dead_letters_empty() {
        let db = Database::new(":memory:").expect("db");
        let dead = list_dead_letters(&db, 10).unwrap();
        assert!(dead.is_empty());
    }

    #[test]
    fn list_recoverable_empty() {
        let db = Database::new(":memory:").expect("db");
        let rows = list_recoverable(&db, 10).unwrap();
        assert!(rows.is_empty());
    }

    #[test]
    fn replay_dead_letter_nonexistent_returns_false() {
        let db = Database::new(":memory:").expect("db");
        assert!(!replay_dead_letter(&db, "missing").unwrap());
    }

    #[test]
    fn replay_non_dead_letter_returns_false() {
        let db = Database::new(":memory:").expect("db");
        let item = DeliveryQueueRecord {
            max_attempts: 3,
            ..pending_record("d5", "email", "u1", "hello", "idem-5")
        };
        upsert_delivery_item(&db, &item).unwrap();

        // Replay only applies to dead letters; a pending row is left alone.
        assert!(!replay_dead_letter(&db, "d5").unwrap());
    }

    #[test]
    fn upsert_updates_existing() {
        let db = Database::new(":memory:").expect("db");
        let mut item = pending_record("d6", "telegram", "u1", "first", "idem-6");
        upsert_delivery_item(&db, &item).unwrap();

        // Second write with the same id must overwrite, not duplicate.
        item.content = "updated".into();
        item.attempts = 1;
        item.last_error = Some("timeout".into());
        upsert_delivery_item(&db, &item).unwrap();

        let rows = list_recoverable(&db, 10).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].content, "updated");
        assert_eq!(rows[0].attempts, 1);
        assert_eq!(rows[0].last_error.as_deref(), Some("timeout"));
    }

    #[test]
    fn list_dead_letters_only_dead() {
        let db = Database::new(":memory:").expect("db");
        let pending = DeliveryQueueRecord {
            max_attempts: 3,
            ..pending_record("d7", "email", "u1", "hi", "idem-7")
        };
        let dead = into_dead_letter(
            pending_record("d8", "email", "u2", "failed msg", "idem-8"),
            "permanent failure",
        );
        upsert_delivery_item(&db, &pending).unwrap();
        upsert_delivery_item(&db, &dead).unwrap();

        let dead_items = list_dead_letters(&db, 10).unwrap();
        assert_eq!(dead_items.len(), 1);
        assert_eq!(dead_items[0].id, "d8");
    }
}
416}