Skip to main content

actr_cli/commands/
dlq.rs

1//! `actr dlq` — Dead Letter Queue inspection and remediation.
2//!
3//! Subcommands:
4//!   - `list`   — list DLQ records (default: newest 20)
5//!   - `show`   — show full detail for one record
6//!   - `stats`  — print DLQ statistics
7//!   - `replay` — re-enqueue a record's raw bytes into a mailbox
8//!   - `purge`  — permanently delete records (by ID, or by filter with `--all`)
9
10use actr_runtime_mailbox::{
11    DeadLetterQueue,
12    dlq::{DlqQuery, DlqRecord},
13    mailbox::{Mailbox, MessagePriority},
14    sqlite::SqliteMailbox,
15    sqlite_dlq::SqliteDeadLetterQueue,
16};
17use anyhow::{Context, Result, bail};
18use async_trait::async_trait;
19use chrono::DateTime;
20use clap::{Args, Subcommand};
21use std::path::{Path, PathBuf};
22use uuid::Uuid;
23
24use crate::core::{Command, CommandContext, CommandResult, ComponentType};
25
/// Default location of the DLQ SQLite database (a relative path, so it is
/// resolved against the current working directory).
const DEFAULT_DB_PATH: &str = "actr-data/dlq.db";
/// Default location of the target mailbox SQLite database used by `replay`.
const DEFAULT_MAILBOX_PATH: &str = "actr-data/mailbox.db";
/// Default value of `--limit` for `list`.
const DEFAULT_LIST_LIMIT: u32 = 20;
29
// Top-level arguments for `actr dlq`; all work is done by the selected
// subcommand (see `DlqArgs::execute`). Notes on clap-derived items use plain
// `//` comments because `///` doc comments become `--help` text.
#[derive(Args, Debug)]
pub struct DlqArgs {
    #[command(subcommand)]
    pub command: DlqCommand,
}
35
// Subcommands of `actr dlq`. `DlqArgs::execute` dispatches each variant to the
// matching `cmd_*` function. The `///` line on each variant is its clap help
// text, so it is deliberately left untouched here.
#[derive(Subcommand, Debug)]
pub enum DlqCommand {
    /// List DLQ records (newest first).
    List(DlqListArgs),
    /// Show full detail for a single record.
    Show(DlqShowArgs),
    /// Print aggregate statistics.
    Stats(DlqStatsArgs),
    /// Re-enqueue a record's raw bytes into a live mailbox.
    Replay(DlqReplayArgs),
    /// Permanently remove records.
    Purge(DlqPurgeArgs),
}
49
// Arguments for `actr dlq list`. `cmd_list` forwards `limit`, `category` and the
// parsed `after` timestamp into `DlqQuery` as `limit`, `error_category` and
// `created_after` respectively.
#[derive(Args, Debug)]
pub struct DlqListArgs {
    /// Path to DLQ SQLite file
    #[arg(long, default_value = DEFAULT_DB_PATH, value_name = "FILE")]
    pub db: PathBuf,
    /// Max records to return
    #[arg(long, default_value_t = DEFAULT_LIST_LIMIT)]
    pub limit: u32,
    /// Filter by error_category
    #[arg(long, value_name = "CATEGORY")]
    pub category: Option<String>,
    // Kept as a raw string; `cmd_list` validates it with `parse_rfc3339`.
    /// Filter records created after timestamp (RFC 3339)
    #[arg(long, value_name = "RFC3339")]
    pub after: Option<String>,
}
65
// Arguments for `actr dlq show`. `id` is taken as a raw string and validated as
// a UUID by `parse_id` inside `cmd_show`, not by clap.
#[derive(Args, Debug)]
pub struct DlqShowArgs {
    /// DLQ record UUID
    #[arg(value_name = "ID")]
    pub id: String,
    /// Path to DLQ SQLite file
    #[arg(long, default_value = DEFAULT_DB_PATH, value_name = "FILE")]
    pub db: PathBuf,
}
75
// Arguments for `actr dlq stats`; only the database location is configurable.
#[derive(Args, Debug)]
pub struct DlqStatsArgs {
    /// Path to DLQ SQLite file
    #[arg(long, default_value = DEFAULT_DB_PATH, value_name = "FILE")]
    pub db: PathBuf,
}
82
// Arguments for `actr dlq replay`. `cmd_replay` requires the `--mailbox` file to
// exist already, and deletes the DLQ record after a successful replay unless
// `keep` is set.
#[derive(Args, Debug)]
pub struct DlqReplayArgs {
    /// DLQ record UUID
    #[arg(value_name = "ID")]
    pub id: String,
    /// Path to DLQ SQLite file
    #[arg(long, default_value = DEFAULT_DB_PATH, value_name = "FILE")]
    pub db: PathBuf,
    /// Path to target mailbox SQLite file
    #[arg(long, default_value = DEFAULT_MAILBOX_PATH, value_name = "FILE")]
    pub mailbox: PathBuf,
    /// Keep the DLQ record after a successful replay (default: delete)
    #[arg(long)]
    pub keep: bool,
}
98
// Arguments for `actr dlq purge`.
//
// Clap enforces that:
// - `id` and `--all` are mutually exclusive;
// - `--category` / `--before` both require `--all`.
//
// The rule "one of `id` or `--all` must be given" is checked at runtime in
// `cmd_purge`.
#[derive(Args, Debug)]
pub struct DlqPurgeArgs {
    /// Single record UUID to purge. If omitted, `--all` is required.
    #[arg(value_name = "ID")]
    pub id: Option<String>,
    /// Path to DLQ SQLite file
    #[arg(long, default_value = DEFAULT_DB_PATH, value_name = "FILE")]
    pub db: PathBuf,
    /// Purge every record. Combine with `--category` or `--before` to narrow scope.
    #[arg(long, conflicts_with = "id")]
    pub all: bool,
    /// When purging with `--all`, restrict to records in this error_category
    #[arg(long, value_name = "CATEGORY", requires = "all")]
    pub category: Option<String>,
    // Kept as a raw string; `cmd_purge` validates it with `parse_rfc3339`.
    /// When purging with `--all`, restrict to records created before the timestamp (RFC 3339)
    #[arg(long, value_name = "RFC3339", requires = "all")]
    pub before: Option<String>,
}
117
118#[async_trait]
119impl Command for DlqArgs {
120    async fn execute(&self, _ctx: &CommandContext) -> Result<CommandResult> {
121        match &self.command {
122            DlqCommand::List(a) => cmd_list(a).await?,
123            DlqCommand::Show(a) => cmd_show(a).await?,
124            DlqCommand::Stats(a) => cmd_stats(a).await?,
125            DlqCommand::Replay(a) => cmd_replay(a).await?,
126            DlqCommand::Purge(a) => cmd_purge(a).await?,
127        }
128        Ok(CommandResult::Success(String::new()))
129    }
130
131    fn required_components(&self) -> Vec<ComponentType> {
132        vec![]
133    }
134
135    fn name(&self) -> &str {
136        "dlq"
137    }
138
139    fn description(&self) -> &str {
140        "Dead Letter Queue inspection and remediation"
141    }
142}
143
144// ── helpers ──────────────────────────────────────────────────────────────────
145
146async fn open_dlq(db: &Path) -> Result<SqliteDeadLetterQueue> {
147    SqliteDeadLetterQueue::new_standalone(db)
148        .await
149        .with_context(|| format!("Failed to open DLQ database at {}", db.display()))
150}
151
152fn parse_id(raw: &str) -> Result<Uuid> {
153    Uuid::parse_str(raw).with_context(|| format!("Invalid UUID: '{raw}'"))
154}
155
/// Shorten `s` to at most `max` characters, appending `…` when anything was cut.
///
/// Counting is done in `char`s (Unicode scalar values), so the cut always lands
/// on a UTF-8 boundary. Slicing at a byte index, as before, panicked whenever
/// the index fell inside a multi-byte character (e.g. a non-ASCII error
/// message). Grapheme clusters such as emoji with modifiers may still be split.
fn truncate(s: &str, max: usize) -> String {
    // `nth(max)` is the first character that does NOT fit; its byte offset is
    // exactly where the kept prefix ends.
    match s.char_indices().nth(max) {
        Some((cut, _)) => format!("{}…", &s[..cut]),
        None => s.to_string(),
    }
}
163
164fn print_record_summary(r: &DlqRecord) {
165    println!(
166        "{id}  {ts}  {cat:<24}  {msg}",
167        id = r.id,
168        ts = r.created_at.format("%Y-%m-%d %H:%M:%SZ"),
169        cat = r.error_category,
170        msg = truncate(&r.error_message, 60),
171    );
172}
173
174fn print_record_detail(r: &DlqRecord) {
175    println!("ID:              {}", r.id);
176    println!("Created at:      {}", r.created_at.to_rfc3339());
177    println!("Error category:  {}", r.error_category);
178    println!("Error message:   {}", r.error_message);
179    println!("Trace ID:        {}", r.trace_id);
180    if let Some(ref rid) = r.request_id {
181        println!("Request ID:      {rid}");
182    }
183    if let Some(ref mid) = r.original_message_id {
184        println!("Message ID:      {mid}");
185    }
186    println!("Raw bytes:       {} bytes", r.raw_bytes.len());
187    if !r.raw_bytes.is_empty() {
188        let preview: String = r
189            .raw_bytes
190            .iter()
191            .take(32)
192            .map(|b| format!("{b:02x}"))
193            .collect::<Vec<_>>()
194            .join(" ");
195        let suffix = if r.raw_bytes.len() > 32 { " …" } else { "" };
196        println!("                 {preview}{suffix}");
197    }
198    println!("Redrive attempts:{}", r.redrive_attempts);
199    if let Some(ref ts) = r.last_redrive_at {
200        println!("Last redrive:    {}", ts.to_rfc3339());
201    }
202    if let Some(ref ctx) = r.context {
203        println!("Context:         {ctx}");
204    }
205}
206
207fn parse_rfc3339(s: &str, flag: &str) -> Result<chrono::DateTime<chrono::Utc>> {
208    DateTime::parse_from_rfc3339(s)
209        .map(|dt| dt.to_utc())
210        .with_context(|| {
211            format!("{flag} must be a valid RFC 3339 timestamp (e.g. 2026-01-01T00:00:00Z)")
212        })
213}
214
215// ── subcommands ──────────────────────────────────────────────────────────────
216
217async fn cmd_list(args: &DlqListArgs) -> Result<()> {
218    let dlq = open_dlq(&args.db).await?;
219
220    let after = args
221        .after
222        .as_deref()
223        .map(|s| parse_rfc3339(s, "--after"))
224        .transpose()?;
225
226    let query = DlqQuery {
227        error_category: args.category.clone(),
228        limit: Some(args.limit),
229        created_after: after,
230        ..Default::default()
231    };
232
233    let records = dlq.query(query).await.context("DLQ query failed")?;
234
235    if records.is_empty() {
236        println!("DLQ is empty (no matching records).");
237        return Ok(());
238    }
239
240    println!(
241        "{:<36}  {:<20}  {:<24}  Error",
242        "ID", "Created at", "Category"
243    );
244    println!("{}", "-".repeat(110));
245    for r in &records {
246        print_record_summary(r);
247    }
248    println!("\n{} record(s) shown (limit={})", records.len(), args.limit);
249    Ok(())
250}
251
252async fn cmd_show(args: &DlqShowArgs) -> Result<()> {
253    let id = parse_id(&args.id)?;
254    let dlq = open_dlq(&args.db).await?;
255    match dlq.get(id).await.context("DLQ get failed")? {
256        Some(r) => {
257            print_record_detail(&r);
258            Ok(())
259        }
260        None => bail!("No DLQ record found with ID: {id}"),
261    }
262}
263
264async fn cmd_stats(args: &DlqStatsArgs) -> Result<()> {
265    let dlq = open_dlq(&args.db).await?;
266    let stats = dlq.stats().await.context("DLQ stats failed")?;
267
268    println!("DLQ Statistics");
269    println!("  Total messages:           {}", stats.total_messages);
270    println!(
271        "  With redrive attempts:    {}",
272        stats.messages_with_redrive_attempts
273    );
274    if let Some(ts) = stats.oldest_message_at {
275        println!("  Oldest message:           {}", ts.to_rfc3339());
276    }
277    if !stats.messages_by_category.is_empty() {
278        println!("  By category:");
279        let mut cats: Vec<_> = stats.messages_by_category.iter().collect();
280        cats.sort_by(|a, b| b.1.cmp(a.1));
281        for (cat, count) in cats {
282            println!("    {cat:<30} {count}");
283        }
284    }
285    Ok(())
286}
287
288async fn cmd_replay(args: &DlqReplayArgs) -> Result<()> {
289    let id = parse_id(&args.id)?;
290    let dlq = open_dlq(&args.db).await?;
291    let record = dlq
292        .get(id)
293        .await
294        .context("DLQ get failed")?
295        .ok_or_else(|| anyhow::anyhow!("No DLQ record found with ID: {id}"))?;
296
297    if !args.mailbox.exists() {
298        bail!(
299            "Target mailbox file does not exist: {}\n\
300             Specify a different path with --mailbox.",
301            args.mailbox.display()
302        );
303    }
304
305    let from = record.from.clone().ok_or_else(|| {
306        anyhow::anyhow!("DLQ record {id} has no 'from' ActrId; cannot re-enqueue without a sender.")
307    })?;
308
309    let mailbox = SqliteMailbox::new(&args.mailbox)
310        .await
311        .with_context(|| format!("Failed to open mailbox: {}", args.mailbox.display()))?;
312
313    let msg_id = mailbox
314        .enqueue(from, record.raw_bytes.clone(), MessagePriority::Normal)
315        .await
316        .context("Failed to re-enqueue into mailbox")?;
317
318    dlq.record_redrive_attempt(id)
319        .await
320        .context("Failed to record redrive attempt")?;
321
322    if args.keep {
323        println!(
324            "Replayed DLQ record {id} into {} as message {msg_id} (kept in DLQ).",
325            args.mailbox.display()
326        );
327    } else {
328        dlq.delete(id)
329            .await
330            .context("Failed to delete DLQ record")?;
331        println!(
332            "Replayed DLQ record {id} into {} as message {msg_id} and removed from DLQ.",
333            args.mailbox.display()
334        );
335    }
336    Ok(())
337}
338
339async fn cmd_purge(args: &DlqPurgeArgs) -> Result<()> {
340    let dlq = open_dlq(&args.db).await?;
341
342    if let Some(id) = &args.id {
343        let uuid = parse_id(id)?;
344        if dlq.get(uuid).await.context("DLQ get failed")?.is_none() {
345            bail!("No DLQ record found with ID: {uuid}");
346        }
347        dlq.delete(uuid).await.context("DLQ delete failed")?;
348        println!("Purged DLQ record: {uuid}");
349        return Ok(());
350    }
351
352    if !args.all {
353        bail!("Specify a record ID, or pass --all (optionally with --category / --before).");
354    }
355
356    let before = args
357        .before
358        .as_deref()
359        .map(|s| parse_rfc3339(s, "--before"))
360        .transpose()?;
361
362    // Query matching records, then delete each. We restrict to DlqQuery's filters
363    // (category, created_after); `created_before` is applied after the query.
364    let query = DlqQuery {
365        error_category: args.category.clone(),
366        limit: None,
367        created_after: None,
368        ..Default::default()
369    };
370    let records = dlq.query(query).await.context("DLQ query failed")?;
371
372    let mut purged = 0usize;
373    for r in records {
374        if let Some(cutoff) = before
375            && r.created_at >= cutoff
376        {
377            continue;
378        }
379        dlq.delete(r.id).await.context("DLQ delete failed")?;
380        purged += 1;
381    }
382
383    println!("Purged {purged} DLQ record(s).");
384    Ok(())
385}
386
// Tests for the DLQ subcommands. Each test:
// 1. builds a fresh SQLite DLQ in a temporary directory;
// 2. seeds it through the library API;
// 3. runs a `cmd_*` function against the same file;
// 4. where relevant, reopens the database to check the effect.
#[cfg(test)]
mod tests {
    use super::*;
    use actr_runtime_mailbox::{dlq::DlqRecord, sqlite_dlq::SqliteDeadLetterQueue};
    use chrono::Utc;
    use tempfile::tempdir;

    /// Create an empty DLQ at `<tempdir>/dlq.db`.
    ///
    /// The `TempDir` guard is returned alongside the queue so the directory
    /// (and the database file in it) stays alive for the whole test; tempfile
    /// removes the directory when the guard is dropped.
    async fn make_dlq() -> (SqliteDeadLetterQueue, tempfile::TempDir) {
        let dir = tempdir().unwrap();
        let path = dir.path().join("dlq.db");
        let dlq = SqliteDeadLetterQueue::new_standalone(&path).await.unwrap();
        (dlq, dir)
    }

    /// Build a DLQ record with the given category and error message.
    ///
    /// The record gets a random ID and trace ID, `created_at` set to now, and
    /// a non-empty `from` sender (required by `cmd_replay`).
    fn sample_record(category: &str, msg: &str) -> DlqRecord {
        DlqRecord {
            id: uuid::Uuid::new_v4(),
            original_message_id: None,
            from: Some(b"sender-actr-id".to_vec()),
            to: None,
            raw_bytes: b"bad bytes".to_vec(),
            error_message: msg.to_string(),
            error_category: category.to_string(),
            trace_id: uuid::Uuid::new_v4().to_string(),
            request_id: None,
            created_at: Utc::now(),
            redrive_attempts: 0,
            last_redrive_at: None,
            context: None,
        }
    }

    // Smoke test: listing a non-empty DLQ succeeds. The printed table is not
    // captured or checked.
    #[tokio::test]
    async fn list_returns_records() {
        let (dlq, dir) = make_dlq().await;
        dlq.enqueue(sample_record("decode", "bad proto"))
            .await
            .unwrap();
        let db = dir.path().join("dlq.db");
        cmd_list(&DlqListArgs {
            db,
            limit: 10,
            category: None,
            after: None,
        })
        .await
        .unwrap();
    }

    // A freshly generated UUID is not in the (empty) DLQ, so `show` must fail.
    #[tokio::test]
    async fn show_missing_record_returns_error() {
        let (_dlq, dir) = make_dlq().await;
        let db = dir.path().join("dlq.db");
        assert!(
            cmd_show(&DlqShowArgs {
                id: Uuid::new_v4().to_string(),
                db,
            })
            .await
            .is_err()
        );
    }

    // Purging by ID removes exactly that record; verified through a second,
    // independently opened connection.
    #[tokio::test]
    async fn purge_by_id_deletes() {
        let (dlq, dir) = make_dlq().await;
        let id = dlq.enqueue(sample_record("decode", "x")).await.unwrap();
        let db = dir.path().join("dlq.db");
        cmd_purge(&DlqPurgeArgs {
            id: Some(id.to_string()),
            db: db.clone(),
            all: false,
            category: None,
            before: None,
        })
        .await
        .unwrap();
        let dlq2 = SqliteDeadLetterQueue::new_standalone(&db).await.unwrap();
        assert!(dlq2.get(id).await.unwrap().is_none());
    }

    // `--all --category decode` removes the "decode" record and leaves the
    // "envelope" record untouched.
    #[tokio::test]
    async fn purge_all_with_category_filter() {
        let (dlq, dir) = make_dlq().await;
        dlq.enqueue(sample_record("decode", "a")).await.unwrap();
        dlq.enqueue(sample_record("envelope", "b")).await.unwrap();
        let db = dir.path().join("dlq.db");
        cmd_purge(&DlqPurgeArgs {
            id: None,
            db: db.clone(),
            all: true,
            category: Some("decode".into()),
            before: None,
        })
        .await
        .unwrap();
        let dlq2 = SqliteDeadLetterQueue::new_standalone(&db).await.unwrap();
        let remaining = dlq2.query(DlqQuery::default()).await.unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].error_category, "envelope");
    }

    // Replaying with `keep: false` succeeds and removes the record from the DLQ.
    // The mailbox contents themselves are not inspected here.
    #[tokio::test]
    async fn replay_moves_record_to_mailbox() {
        let (dlq, dir) = make_dlq().await;
        let id = dlq.enqueue(sample_record("decode", "x")).await.unwrap();
        let db = dir.path().join("dlq.db");
        let mailbox = dir.path().join("mailbox.db");
        // Create the mailbox file up front: `cmd_replay` refuses to target a
        // mailbox path that does not exist yet.
        let _ = SqliteMailbox::new(&mailbox).await.unwrap();

        cmd_replay(&DlqReplayArgs {
            id: id.to_string(),
            db: db.clone(),
            mailbox: mailbox.clone(),
            keep: false,
        })
        .await
        .unwrap();

        let dlq2 = SqliteDeadLetterQueue::new_standalone(&db).await.unwrap();
        assert!(dlq2.get(id).await.unwrap().is_none());
    }
}