1use chrono::{DateTime, NaiveDateTime, Utc};
2use rusqlite::params;
3
4use roboticus_core::Result;
5
6use crate::{Database, DbResultExt};
7
/// One row of the `delivery_queue` table, as written by `upsert_delivery_item`
/// and read back by the `list_*` functions in this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeliveryQueueRecord {
    /// Primary identifier; the upsert resolves conflicts on this column.
    pub id: String,
    /// Name of the outbound channel (the tests use values such as `telegram`).
    pub channel: String,
    /// Identifier of the recipient on that channel.
    pub recipient_id: String,
    /// Message payload to deliver.
    pub content: String,
    /// Lifecycle state stored as free text. The queries in this module use
    /// `pending`, `in_flight`, `delivered` and `dead_letter`.
    pub status: String,
    /// Delivery attempt counter.
    /// NOTE(review): presumably incremented by the sender on each try — confirm with caller.
    pub attempts: u32,
    /// Attempt budget.
    /// NOTE(review): presumably the threshold for dead-lettering — not enforced in this module.
    pub max_attempts: u32,
    /// Timestamp used to order recoverable rows (earliest first).
    pub next_retry_at: DateTime<Utc>,
    /// Most recent failure message, if any; cleared when a row is marked delivered.
    pub last_error: Option<String>,
    /// Caller-supplied key stored alongside the row.
    /// NOTE(review): uniqueness is not enforced by any query visible here — check the schema.
    pub idempotency_key: String,
    /// Creation timestamp; written on insert and left untouched when an upsert hits an existing `id`.
    pub created_at: DateTime<Utc>,
}
22
23fn parse_db_ts(input: &str) -> Option<DateTime<Utc>> {
24 DateTime::parse_from_rfc3339(input)
25 .map(|dt| dt.with_timezone(&Utc))
26 .ok()
27 .or_else(|| {
28 NaiveDateTime::parse_from_str(input, "%Y-%m-%d %H:%M:%S")
29 .ok()
30 .map(|ndt| DateTime::<Utc>::from_naive_utc_and_offset(ndt, Utc))
31 })
32}
33
34pub fn upsert_delivery_item(db: &Database, item: &DeliveryQueueRecord) -> Result<()> {
35 let conn = db.conn();
36 conn.execute(
37 r#"
38 INSERT INTO delivery_queue (
39 id, channel, recipient_id, content, status, attempts, max_attempts,
40 next_retry_at, last_error, idempotency_key, created_at
41 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
42 ON CONFLICT(id) DO UPDATE SET
43 channel = excluded.channel,
44 recipient_id = excluded.recipient_id,
45 content = excluded.content,
46 status = excluded.status,
47 attempts = excluded.attempts,
48 max_attempts = excluded.max_attempts,
49 next_retry_at = excluded.next_retry_at,
50 last_error = excluded.last_error,
51 idempotency_key = excluded.idempotency_key
52 "#,
53 params![
54 item.id,
55 item.channel,
56 item.recipient_id,
57 item.content,
58 item.status,
59 item.attempts,
60 item.max_attempts,
61 item.next_retry_at.to_rfc3339(),
62 item.last_error,
63 item.idempotency_key,
64 item.created_at.to_rfc3339(),
65 ],
66 )
67 .db_err()?;
68 Ok(())
69}
70
71pub fn list_recoverable(db: &Database, max_items: usize) -> Result<Vec<DeliveryQueueRecord>> {
72 let conn = db.conn();
73 let mut stmt = conn
74 .prepare(
75 r#"
76 SELECT id, channel, recipient_id, content, status, attempts, max_attempts,
77 next_retry_at, last_error, idempotency_key, created_at
78 FROM delivery_queue
79 WHERE status IN ('pending', 'in_flight')
80 ORDER BY next_retry_at ASC
81 LIMIT ?1
82 "#,
83 )
84 .db_err()?;
85
86 let rows = stmt
87 .query_map(params![max_items as i64], |row| {
88 let next_retry_raw: String = row.get(7)?;
89 let created_raw: String = row.get(10)?;
90 Ok(DeliveryQueueRecord {
91 id: row.get(0)?,
92 channel: row.get(1)?,
93 recipient_id: row.get(2)?,
94 content: row.get(3)?,
95 status: row.get(4)?,
96 attempts: row.get::<_, i64>(5)? as u32,
97 max_attempts: row.get::<_, i64>(6)? as u32,
98 next_retry_at: parse_db_ts(&next_retry_raw).unwrap_or_else(|| {
99 tracing::warn!(raw = %next_retry_raw, "corrupt next_retry_at timestamp, using epoch");
100 DateTime::<Utc>::UNIX_EPOCH
101 }),
102 last_error: row.get(8)?,
103 idempotency_key: row.get(9)?,
104 created_at: parse_db_ts(&created_raw).unwrap_or_else(Utc::now),
105 })
106 })
107 .db_err()?;
108
109 rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
110}
111
112pub fn mark_delivered(db: &Database, id: &str) -> Result<()> {
113 let conn = db.conn();
114 conn.execute(
115 "UPDATE delivery_queue SET status = 'delivered', last_error = NULL WHERE id = ?1",
116 params![id],
117 )
118 .db_err()?;
119 Ok(())
120}
121
122pub fn mark_in_flight(db: &Database, id: &str) -> Result<()> {
123 let conn = db.conn();
124 conn.execute(
125 "UPDATE delivery_queue SET status = 'in_flight' WHERE id = ?1",
126 params![id],
127 )
128 .db_err()?;
129 Ok(())
130}
131
132pub fn list_dead_letters(db: &Database, max_items: usize) -> Result<Vec<DeliveryQueueRecord>> {
133 let conn = db.conn();
134 let mut stmt = conn
135 .prepare(
136 r#"
137 SELECT id, channel, recipient_id, content, status, attempts, max_attempts,
138 next_retry_at, last_error, idempotency_key, created_at
139 FROM delivery_queue
140 WHERE status = 'dead_letter'
141 ORDER BY created_at DESC
142 LIMIT ?1
143 "#,
144 )
145 .db_err()?;
146
147 let rows = stmt
148 .query_map(params![max_items as i64], |row| {
149 let next_retry_raw: String = row.get(7)?;
150 let created_raw: String = row.get(10)?;
151 Ok(DeliveryQueueRecord {
152 id: row.get(0)?,
153 channel: row.get(1)?,
154 recipient_id: row.get(2)?,
155 content: row.get(3)?,
156 status: row.get(4)?,
157 attempts: row.get::<_, i64>(5)? as u32,
158 max_attempts: row.get::<_, i64>(6)? as u32,
159 next_retry_at: parse_db_ts(&next_retry_raw).unwrap_or_else(|| {
160 tracing::warn!(raw = %next_retry_raw, "corrupt next_retry_at timestamp, using epoch");
161 DateTime::<Utc>::UNIX_EPOCH
162 }),
163 last_error: row.get(8)?,
164 idempotency_key: row.get(9)?,
165 created_at: parse_db_ts(&created_raw).unwrap_or_else(Utc::now),
166 })
167 })
168 .db_err()?;
169
170 rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
171}
172
173pub fn replay_dead_letter(db: &Database, id: &str) -> Result<bool> {
174 let conn = db.conn();
175 let rows = conn
176 .execute(
177 "UPDATE delivery_queue SET status = 'pending', next_retry_at = ?1 WHERE id = ?2 AND status = 'dead_letter'",
178 params![Utc::now().to_rfc3339(), id],
179 )
180 .db_err()?;
181 Ok(rows > 0)
182}
183
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Datelike;

    /// Builds a `pending` record with zero attempts, an attempt budget of five,
    /// no recorded error, and both timestamps set to the current time.
    fn pending_item(
        id: &str,
        channel: &str,
        recipient_id: &str,
        content: &str,
        idempotency_key: &str,
    ) -> DeliveryQueueRecord {
        DeliveryQueueRecord {
            id: id.into(),
            channel: channel.into(),
            recipient_id: recipient_id.into(),
            content: content.into(),
            status: "pending".into(),
            attempts: 0,
            max_attempts: 5,
            next_retry_at: Utc::now(),
            last_error: None,
            idempotency_key: idempotency_key.into(),
            created_at: Utc::now(),
        }
    }

    /// Builds a `dead_letter` record that has used all five attempts and
    /// carries `last_error` as its failure message.
    fn dead_item(
        id: &str,
        channel: &str,
        recipient_id: &str,
        content: &str,
        idempotency_key: &str,
        last_error: &str,
    ) -> DeliveryQueueRecord {
        DeliveryQueueRecord {
            status: "dead_letter".into(),
            attempts: 5,
            last_error: Some(last_error.into()),
            ..pending_item(id, channel, recipient_id, content, idempotency_key)
        }
    }

    #[test]
    fn upsert_and_list_recoverable() {
        let db = Database::new(":memory:").expect("db");
        let record = pending_item("d1", "telegram", "u1", "hello", "idem-1");
        upsert_delivery_item(&db, &record).expect("upsert");

        let listed = list_recoverable(&db, 20).expect("load");
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].id, "d1");
    }

    #[test]
    fn mark_delivered_updates_status() {
        let db = Database::new(":memory:").expect("db");
        let record = pending_item("d2", "discord", "u2", "msg", "idem-2");
        upsert_delivery_item(&db, &record).expect("upsert");
        mark_delivered(&db, "d2").expect("mark delivered");

        let listed = list_recoverable(&db, 20).expect("load");
        assert!(listed.is_empty(), "delivered rows should not be recoverable");
    }

    #[test]
    fn replay_dead_letter_moves_back_to_pending() {
        let db = Database::new(":memory:").expect("db");
        let record = dead_item("d3", "discord", "u2", "msg", "idem-3", "failed");
        upsert_delivery_item(&db, &record).expect("upsert");

        assert_eq!(list_dead_letters(&db, 10).expect("dead").len(), 1);
        assert!(replay_dead_letter(&db, "d3").expect("replay"));

        let listed = list_recoverable(&db, 10).expect("recoverable");
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].status, "pending");
    }

    #[test]
    fn mark_in_flight_updates_status() {
        let db = Database::new(":memory:").expect("db");
        let record = pending_item("d4", "telegram", "u1", "hi", "idem-4");
        upsert_delivery_item(&db, &record).unwrap();
        mark_in_flight(&db, "d4").unwrap();

        let listed = list_recoverable(&db, 10).unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].status, "in_flight");
    }

    #[test]
    fn mark_in_flight_nonexistent_is_noop() {
        let db = Database::new(":memory:").expect("db");
        mark_in_flight(&db, "nonexistent").unwrap();
    }

    #[test]
    fn parse_db_ts_rfc3339() {
        let parsed = parse_db_ts("2025-06-01T12:00:00+00:00").unwrap();
        assert_eq!(parsed.year(), 2025);
        assert_eq!(parsed.month(), 6);
    }

    #[test]
    fn parse_db_ts_sqlite_format() {
        let parsed = parse_db_ts("2025-06-01 12:00:00").unwrap();
        assert_eq!(parsed.year(), 2025);
        assert_eq!(parsed.month(), 6);
    }

    #[test]
    fn parse_db_ts_invalid_returns_none() {
        assert!(parse_db_ts("not-a-date").is_none());
        assert!(parse_db_ts("").is_none());
    }

    #[test]
    fn list_dead_letters_empty() {
        let db = Database::new(":memory:").expect("db");
        let listed = list_dead_letters(&db, 10).unwrap();
        assert!(listed.is_empty());
    }

    #[test]
    fn list_recoverable_empty() {
        let db = Database::new(":memory:").expect("db");
        let listed = list_recoverable(&db, 10).unwrap();
        assert!(listed.is_empty());
    }

    #[test]
    fn replay_dead_letter_nonexistent_returns_false() {
        let db = Database::new(":memory:").expect("db");
        assert!(!replay_dead_letter(&db, "missing").unwrap());
    }

    #[test]
    fn replay_non_dead_letter_returns_false() {
        let db = Database::new(":memory:").expect("db");
        let record = DeliveryQueueRecord {
            max_attempts: 3,
            ..pending_item("d5", "email", "u1", "hello", "idem-5")
        };
        upsert_delivery_item(&db, &record).unwrap();
        assert!(!replay_dead_letter(&db, "d5").unwrap());
    }

    #[test]
    fn upsert_updates_existing() {
        let db = Database::new(":memory:").expect("db");
        let original = pending_item("d6", "telegram", "u1", "first", "idem-6");
        upsert_delivery_item(&db, &original).unwrap();

        // Same id and timestamps, with three fields changed.
        let revised = DeliveryQueueRecord {
            content: "updated".into(),
            attempts: 1,
            last_error: Some("timeout".into()),
            ..original.clone()
        };
        upsert_delivery_item(&db, &revised).unwrap();

        let listed = list_recoverable(&db, 10).unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].content, "updated");
        assert_eq!(listed[0].attempts, 1);
        assert_eq!(listed[0].last_error.as_deref(), Some("timeout"));
    }

    #[test]
    fn list_dead_letters_only_dead() {
        let db = Database::new(":memory:").expect("db");
        let pending = DeliveryQueueRecord {
            max_attempts: 3,
            ..pending_item("d7", "email", "u1", "hi", "idem-7")
        };
        let dead = dead_item(
            "d8",
            "email",
            "u2",
            "failed msg",
            "idem-8",
            "permanent failure",
        );
        upsert_delivery_item(&db, &pending).unwrap();
        upsert_delivery_item(&db, &dead).unwrap();

        let listed = list_dead_letters(&db, 10).unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].id, "d8");
    }
}