1use actr_runtime_mailbox::{
11 DeadLetterQueue,
12 dlq::{DlqQuery, DlqRecord},
13 mailbox::{Mailbox, MessagePriority},
14 sqlite::SqliteMailbox,
15 sqlite_dlq::SqliteDeadLetterQueue,
16};
17use anyhow::{Context, Result, bail};
18use async_trait::async_trait;
19use chrono::DateTime;
20use clap::{Args, Subcommand};
21use std::path::{Path, PathBuf};
22use uuid::Uuid;
23
24use crate::core::{Command, CommandContext, CommandResult, ComponentType};
25
/// Default path of the DLQ SQLite database; used as the default of every `--db` option.
const DEFAULT_DB_PATH: &str = "actr-data/dlq.db";
/// Default path of the mailbox SQLite database; used as the default of `replay --mailbox`.
const DEFAULT_MAILBOX_PATH: &str = "actr-data/mailbox.db";
/// Default value of `list --limit`.
const DEFAULT_LIST_LIMIT: u32 = 20;
29
// Top-level arguments of the `dlq` command: a thin wrapper that only selects
// which DLQ subcommand to run.
// (Plain `//` comments are used here so they are not picked up by the clap derive.)
#[derive(Args, Debug)]
pub struct DlqArgs {
    // The selected subcommand together with its own arguments.
    #[command(subcommand)]
    pub command: DlqCommand,
}
35
36#[derive(Subcommand, Debug)]
37pub enum DlqCommand {
38 List(DlqListArgs),
40 Show(DlqShowArgs),
42 Stats(DlqStatsArgs),
44 Replay(DlqReplayArgs),
46 Purge(DlqPurgeArgs),
48}
49
50#[derive(Args, Debug)]
51pub struct DlqListArgs {
52 #[arg(long, default_value = DEFAULT_DB_PATH, value_name = "FILE")]
54 pub db: PathBuf,
55 #[arg(long, default_value_t = DEFAULT_LIST_LIMIT)]
57 pub limit: u32,
58 #[arg(long, value_name = "CATEGORY")]
60 pub category: Option<String>,
61 #[arg(long, value_name = "RFC3339")]
63 pub after: Option<String>,
64}
65
66#[derive(Args, Debug)]
67pub struct DlqShowArgs {
68 #[arg(value_name = "ID")]
70 pub id: String,
71 #[arg(long, default_value = DEFAULT_DB_PATH, value_name = "FILE")]
73 pub db: PathBuf,
74}
75
76#[derive(Args, Debug)]
77pub struct DlqStatsArgs {
78 #[arg(long, default_value = DEFAULT_DB_PATH, value_name = "FILE")]
80 pub db: PathBuf,
81}
82
83#[derive(Args, Debug)]
84pub struct DlqReplayArgs {
85 #[arg(value_name = "ID")]
87 pub id: String,
88 #[arg(long, default_value = DEFAULT_DB_PATH, value_name = "FILE")]
90 pub db: PathBuf,
91 #[arg(long, default_value = DEFAULT_MAILBOX_PATH, value_name = "FILE")]
93 pub mailbox: PathBuf,
94 #[arg(long)]
96 pub keep: bool,
97}
98
99#[derive(Args, Debug)]
100pub struct DlqPurgeArgs {
101 #[arg(value_name = "ID")]
103 pub id: Option<String>,
104 #[arg(long, default_value = DEFAULT_DB_PATH, value_name = "FILE")]
106 pub db: PathBuf,
107 #[arg(long, conflicts_with = "id")]
109 pub all: bool,
110 #[arg(long, value_name = "CATEGORY", requires = "all")]
112 pub category: Option<String>,
113 #[arg(long, value_name = "RFC3339", requires = "all")]
115 pub before: Option<String>,
116}
117
118#[async_trait]
119impl Command for DlqArgs {
120 async fn execute(&self, _ctx: &CommandContext) -> Result<CommandResult> {
121 match &self.command {
122 DlqCommand::List(a) => cmd_list(a).await?,
123 DlqCommand::Show(a) => cmd_show(a).await?,
124 DlqCommand::Stats(a) => cmd_stats(a).await?,
125 DlqCommand::Replay(a) => cmd_replay(a).await?,
126 DlqCommand::Purge(a) => cmd_purge(a).await?,
127 }
128 Ok(CommandResult::Success(String::new()))
129 }
130
131 fn required_components(&self) -> Vec<ComponentType> {
132 vec![]
133 }
134
135 fn name(&self) -> &str {
136 "dlq"
137 }
138
139 fn description(&self) -> &str {
140 "Dead Letter Queue inspection and remediation"
141 }
142}
143
144async fn open_dlq(db: &Path) -> Result<SqliteDeadLetterQueue> {
147 SqliteDeadLetterQueue::new_standalone(db)
148 .await
149 .with_context(|| format!("Failed to open DLQ database at {}", db.display()))
150}
151
152fn parse_id(raw: &str) -> Result<Uuid> {
153 Uuid::parse_str(raw).with_context(|| format!("Invalid UUID: '{raw}'"))
154}
155
/// Shortens `s` to at most `max` bytes, appending `…` when anything was cut.
///
/// `max` is a byte budget, not a character count. If `max` falls inside a
/// multi-byte UTF-8 character, the cut moves back to the previous character
/// boundary, so the function never panics on non-ASCII text.
/// Strings that already fit are returned unchanged, without an ellipsis.
fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }

    // Slicing a `str` off a char boundary panics, so walk back to the nearest
    // boundary. Index 0 is always a boundary, so this loop terminates.
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}…", &s[..end])
}
163
164fn print_record_summary(r: &DlqRecord) {
165 println!(
166 "{id} {ts} {cat:<24} {msg}",
167 id = r.id,
168 ts = r.created_at.format("%Y-%m-%d %H:%M:%SZ"),
169 cat = r.error_category,
170 msg = truncate(&r.error_message, 60),
171 );
172}
173
174fn print_record_detail(r: &DlqRecord) {
175 println!("ID: {}", r.id);
176 println!("Created at: {}", r.created_at.to_rfc3339());
177 println!("Error category: {}", r.error_category);
178 println!("Error message: {}", r.error_message);
179 println!("Trace ID: {}", r.trace_id);
180 if let Some(ref rid) = r.request_id {
181 println!("Request ID: {rid}");
182 }
183 if let Some(ref mid) = r.original_message_id {
184 println!("Message ID: {mid}");
185 }
186 println!("Raw bytes: {} bytes", r.raw_bytes.len());
187 if !r.raw_bytes.is_empty() {
188 let preview: String = r
189 .raw_bytes
190 .iter()
191 .take(32)
192 .map(|b| format!("{b:02x}"))
193 .collect::<Vec<_>>()
194 .join(" ");
195 let suffix = if r.raw_bytes.len() > 32 { " …" } else { "" };
196 println!(" {preview}{suffix}");
197 }
198 println!("Redrive attempts:{}", r.redrive_attempts);
199 if let Some(ref ts) = r.last_redrive_at {
200 println!("Last redrive: {}", ts.to_rfc3339());
201 }
202 if let Some(ref ctx) = r.context {
203 println!("Context: {ctx}");
204 }
205}
206
207fn parse_rfc3339(s: &str, flag: &str) -> Result<chrono::DateTime<chrono::Utc>> {
208 DateTime::parse_from_rfc3339(s)
209 .map(|dt| dt.to_utc())
210 .with_context(|| {
211 format!("{flag} must be a valid RFC 3339 timestamp (e.g. 2026-01-01T00:00:00Z)")
212 })
213}
214
215async fn cmd_list(args: &DlqListArgs) -> Result<()> {
218 let dlq = open_dlq(&args.db).await?;
219
220 let after = args
221 .after
222 .as_deref()
223 .map(|s| parse_rfc3339(s, "--after"))
224 .transpose()?;
225
226 let query = DlqQuery {
227 error_category: args.category.clone(),
228 limit: Some(args.limit),
229 created_after: after,
230 ..Default::default()
231 };
232
233 let records = dlq.query(query).await.context("DLQ query failed")?;
234
235 if records.is_empty() {
236 println!("DLQ is empty (no matching records).");
237 return Ok(());
238 }
239
240 println!(
241 "{:<36} {:<20} {:<24} Error",
242 "ID", "Created at", "Category"
243 );
244 println!("{}", "-".repeat(110));
245 for r in &records {
246 print_record_summary(r);
247 }
248 println!("\n{} record(s) shown (limit={})", records.len(), args.limit);
249 Ok(())
250}
251
252async fn cmd_show(args: &DlqShowArgs) -> Result<()> {
253 let id = parse_id(&args.id)?;
254 let dlq = open_dlq(&args.db).await?;
255 match dlq.get(id).await.context("DLQ get failed")? {
256 Some(r) => {
257 print_record_detail(&r);
258 Ok(())
259 }
260 None => bail!("No DLQ record found with ID: {id}"),
261 }
262}
263
264async fn cmd_stats(args: &DlqStatsArgs) -> Result<()> {
265 let dlq = open_dlq(&args.db).await?;
266 let stats = dlq.stats().await.context("DLQ stats failed")?;
267
268 println!("DLQ Statistics");
269 println!(" Total messages: {}", stats.total_messages);
270 println!(
271 " With redrive attempts: {}",
272 stats.messages_with_redrive_attempts
273 );
274 if let Some(ts) = stats.oldest_message_at {
275 println!(" Oldest message: {}", ts.to_rfc3339());
276 }
277 if !stats.messages_by_category.is_empty() {
278 println!(" By category:");
279 let mut cats: Vec<_> = stats.messages_by_category.iter().collect();
280 cats.sort_by(|a, b| b.1.cmp(a.1));
281 for (cat, count) in cats {
282 println!(" {cat:<30} {count}");
283 }
284 }
285 Ok(())
286}
287
288async fn cmd_replay(args: &DlqReplayArgs) -> Result<()> {
289 let id = parse_id(&args.id)?;
290 let dlq = open_dlq(&args.db).await?;
291 let record = dlq
292 .get(id)
293 .await
294 .context("DLQ get failed")?
295 .ok_or_else(|| anyhow::anyhow!("No DLQ record found with ID: {id}"))?;
296
297 if !args.mailbox.exists() {
298 bail!(
299 "Target mailbox file does not exist: {}\n\
300 Specify a different path with --mailbox.",
301 args.mailbox.display()
302 );
303 }
304
305 let from = record.from.clone().ok_or_else(|| {
306 anyhow::anyhow!("DLQ record {id} has no 'from' ActrId; cannot re-enqueue without a sender.")
307 })?;
308
309 let mailbox = SqliteMailbox::new(&args.mailbox)
310 .await
311 .with_context(|| format!("Failed to open mailbox: {}", args.mailbox.display()))?;
312
313 let msg_id = mailbox
314 .enqueue(from, record.raw_bytes.clone(), MessagePriority::Normal)
315 .await
316 .context("Failed to re-enqueue into mailbox")?;
317
318 dlq.record_redrive_attempt(id)
319 .await
320 .context("Failed to record redrive attempt")?;
321
322 if args.keep {
323 println!(
324 "Replayed DLQ record {id} into {} as message {msg_id} (kept in DLQ).",
325 args.mailbox.display()
326 );
327 } else {
328 dlq.delete(id)
329 .await
330 .context("Failed to delete DLQ record")?;
331 println!(
332 "Replayed DLQ record {id} into {} as message {msg_id} and removed from DLQ.",
333 args.mailbox.display()
334 );
335 }
336 Ok(())
337}
338
339async fn cmd_purge(args: &DlqPurgeArgs) -> Result<()> {
340 let dlq = open_dlq(&args.db).await?;
341
342 if let Some(id) = &args.id {
343 let uuid = parse_id(id)?;
344 if dlq.get(uuid).await.context("DLQ get failed")?.is_none() {
345 bail!("No DLQ record found with ID: {uuid}");
346 }
347 dlq.delete(uuid).await.context("DLQ delete failed")?;
348 println!("Purged DLQ record: {uuid}");
349 return Ok(());
350 }
351
352 if !args.all {
353 bail!("Specify a record ID, or pass --all (optionally with --category / --before).");
354 }
355
356 let before = args
357 .before
358 .as_deref()
359 .map(|s| parse_rfc3339(s, "--before"))
360 .transpose()?;
361
362 let query = DlqQuery {
365 error_category: args.category.clone(),
366 limit: None,
367 created_after: None,
368 ..Default::default()
369 };
370 let records = dlq.query(query).await.context("DLQ query failed")?;
371
372 let mut purged = 0usize;
373 for r in records {
374 if let Some(cutoff) = before
375 && r.created_at >= cutoff
376 {
377 continue;
378 }
379 dlq.delete(r.id).await.context("DLQ delete failed")?;
380 purged += 1;
381 }
382
383 println!("Purged {purged} DLQ record(s).");
384 Ok(())
385}
386
#[cfg(test)]
mod tests {
    use super::*;
    use actr_runtime_mailbox::{dlq::DlqRecord, sqlite_dlq::SqliteDeadLetterQueue};
    use chrono::Utc;
    use tempfile::tempdir;

    /// Creates a DLQ backed by `dlq.db` inside a fresh temporary directory.
    /// The directory handle is returned so the caller keeps it alive for the
    /// duration of the test.
    async fn make_dlq() -> (SqliteDeadLetterQueue, tempfile::TempDir) {
        let tmp = tempdir().unwrap();
        let db_file = tmp.path().join("dlq.db");
        let queue = SqliteDeadLetterQueue::new_standalone(&db_file)
            .await
            .unwrap();
        (queue, tmp)
    }

    /// Builds a record with a fresh ID and trace ID, a sender, a small
    /// payload, and the given error category and message.
    fn sample_record(category: &str, msg: &str) -> DlqRecord {
        DlqRecord {
            id: uuid::Uuid::new_v4(),
            original_message_id: None,
            from: Some(b"sender-actr-id".to_vec()),
            to: None,
            raw_bytes: b"bad bytes".to_vec(),
            error_message: msg.to_string(),
            error_category: category.to_string(),
            trace_id: uuid::Uuid::new_v4().to_string(),
            request_id: None,
            created_at: Utc::now(),
            redrive_attempts: 0,
            last_redrive_at: None,
            context: None,
        }
    }

    /// `list` succeeds on a database that contains a record.
    #[tokio::test]
    async fn list_returns_records() {
        let (dlq, dir) = make_dlq().await;
        dlq.enqueue(sample_record("decode", "bad proto"))
            .await
            .unwrap();

        let list_args = DlqListArgs {
            db: dir.path().join("dlq.db"),
            limit: 10,
            category: None,
            after: None,
        };
        cmd_list(&list_args).await.unwrap();
    }

    /// `show` reports an error for an ID that is not in the database.
    #[tokio::test]
    async fn show_missing_record_returns_error() {
        let (_dlq, dir) = make_dlq().await;

        let show_args = DlqShowArgs {
            id: Uuid::new_v4().to_string(),
            db: dir.path().join("dlq.db"),
        };
        let outcome = cmd_show(&show_args).await;
        assert!(outcome.is_err());
    }

    /// `purge <ID>` removes exactly that record.
    #[tokio::test]
    async fn purge_by_id_deletes() {
        let (dlq, dir) = make_dlq().await;
        let id = dlq.enqueue(sample_record("decode", "x")).await.unwrap();
        let db = dir.path().join("dlq.db");

        let purge_args = DlqPurgeArgs {
            id: Some(id.to_string()),
            db: db.clone(),
            all: false,
            category: None,
            before: None,
        };
        cmd_purge(&purge_args).await.unwrap();

        // Re-open the database to check the deletion through a fresh handle.
        let reopened = SqliteDeadLetterQueue::new_standalone(&db).await.unwrap();
        assert!(reopened.get(id).await.unwrap().is_none());
    }

    /// `purge --all --category decode` leaves records of other categories alone.
    #[tokio::test]
    async fn purge_all_with_category_filter() {
        let (dlq, dir) = make_dlq().await;
        dlq.enqueue(sample_record("decode", "a")).await.unwrap();
        dlq.enqueue(sample_record("envelope", "b")).await.unwrap();
        let db = dir.path().join("dlq.db");

        let purge_args = DlqPurgeArgs {
            id: None,
            db: db.clone(),
            all: true,
            category: Some("decode".into()),
            before: None,
        };
        cmd_purge(&purge_args).await.unwrap();

        let reopened = SqliteDeadLetterQueue::new_standalone(&db).await.unwrap();
        let remaining = reopened.query(DlqQuery::default()).await.unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].error_category, "envelope");
    }

    /// `replay` without `--keep` removes the record from the DLQ.
    #[tokio::test]
    async fn replay_moves_record_to_mailbox() {
        let (dlq, dir) = make_dlq().await;
        let id = dlq.enqueue(sample_record("decode", "x")).await.unwrap();
        let db = dir.path().join("dlq.db");
        let mailbox = dir.path().join("mailbox.db");
        // Create the mailbox file up front: `replay` refuses a missing target.
        drop(SqliteMailbox::new(&mailbox).await.unwrap());

        let replay_args = DlqReplayArgs {
            id: id.to_string(),
            db: db.clone(),
            mailbox: mailbox.clone(),
            keep: false,
        };
        cmd_replay(&replay_args).await.unwrap();

        let reopened = SqliteDeadLetterQueue::new_standalone(&db).await.unwrap();
        assert!(reopened.get(id).await.unwrap().is_none());
    }
}