Skip to main content

rusmes_cli/commands/
queue.rs

1//! Queue management commands
2
3use anyhow::Result;
4use colored::*;
5use serde::{Deserialize, Serialize};
6use tabled::{Table, Tabled};
7
8use crate::client::Client;
9
10#[derive(Debug, Serialize, Deserialize, Tabled)]
11pub struct QueuedMessage {
12    pub id: String,
13    pub from: String,
14    pub to: String,
15    pub size_kb: u64,
16    pub attempts: u32,
17    pub next_retry: String,
18    pub status: String,
19}
20
21/// List messages in the queue
22pub async fn list(client: &Client, json: bool, filter: Option<&str>) -> Result<()> {
23    let mut url = "/api/queue".to_string();
24    if let Some(f) = filter {
25        url.push_str(&format!("?status={}", f));
26    }
27
28    let messages: Vec<QueuedMessage> = client.get(&url).await?;
29
30    if json {
31        println!("{}", serde_json::to_string_pretty(&messages)?);
32    } else {
33        if messages.is_empty() {
34            println!("{}", "Queue is empty".green());
35            return Ok(());
36        }
37
38        let table = Table::new(&messages).to_string();
39        println!("{}", table);
40
41        let total_size: u64 = messages.iter().map(|m| m.size_kb).sum();
42        let failed = messages.iter().filter(|m| m.status == "failed").count();
43        let pending = messages.iter().filter(|m| m.status == "pending").count();
44        let retrying = messages.iter().filter(|m| m.status == "retrying").count();
45
46        println!(
47            "\n{} messages in queue ({} pending, {} retrying, {} failed)",
48            messages.len().to_string().bold(),
49            pending,
50            retrying,
51            failed
52        );
53        println!("Total size: {} KB", total_size);
54    }
55
56    Ok(())
57}
58
59/// Flush the queue (attempt immediate delivery)
60pub async fn flush(client: &Client, json: bool) -> Result<()> {
61    #[derive(Deserialize, Serialize)]
62    struct FlushResponse {
63        messages_processed: u32,
64        messages_sent: u32,
65        messages_failed: u32,
66    }
67
68    let response: FlushResponse = client
69        .post("/api/queue/flush", &serde_json::json!({}))
70        .await?;
71
72    if json {
73        println!("{}", serde_json::to_string_pretty(&response)?);
74    } else {
75        println!("{}", "✓ Queue flushed".green().bold());
76        println!("  Processed: {}", response.messages_processed);
77        println!("  Sent: {}", response.messages_sent.to_string().green());
78        if response.messages_failed > 0 {
79            println!("  Failed: {}", response.messages_failed.to_string().red());
80        }
81    }
82
83    Ok(())
84}
85
86/// Inspect a specific message in the queue
87pub async fn inspect(client: &Client, message_id: &str, json: bool) -> Result<()> {
88    #[derive(Deserialize, Serialize)]
89    struct MessageDetails {
90        id: String,
91        from: String,
92        to: Vec<String>,
93        subject: String,
94        size_bytes: u64,
95        attempts: u32,
96        max_attempts: u32,
97        status: String,
98        created_at: String,
99        last_attempt: Option<String>,
100        next_retry: Option<String>,
101        error: Option<String>,
102        headers: Vec<(String, String)>,
103    }
104
105    let details: MessageDetails = client.get(&format!("/api/queue/{}", message_id)).await?;
106
107    if json {
108        println!("{}", serde_json::to_string_pretty(&details)?);
109    } else {
110        println!("{}", format!("Message: {}", message_id).bold());
111        println!("  From: {}", details.from);
112        println!("  To: {}", details.to.join(", "));
113        println!("  Subject: {}", details.subject);
114        println!("  Size: {} KB", details.size_bytes / 1024);
115        println!(
116            "  Status: {}",
117            match details.status.as_str() {
118                "pending" => details.status.yellow(),
119                "retrying" => details.status.blue(),
120                "failed" => details.status.red(),
121                "sent" => details.status.green(),
122                _ => details.status.normal(),
123            }
124        );
125        println!(
126            "  Attempts: {} / {}",
127            details.attempts, details.max_attempts
128        );
129        println!("  Created: {}", details.created_at);
130
131        if let Some(last) = &details.last_attempt {
132            println!("  Last attempt: {}", last);
133        }
134
135        if let Some(next) = &details.next_retry {
136            println!("  Next retry: {}", next);
137        }
138
139        if let Some(error) = &details.error {
140            println!("  Error: {}", error.red());
141        }
142
143        if !details.headers.is_empty() {
144            println!("\n  Headers:");
145            for (key, value) in &details.headers {
146                println!("    {}: {}", key.bold(), value);
147            }
148        }
149    }
150
151    Ok(())
152}
153
154/// Delete a message from the queue
155pub async fn delete(client: &Client, message_id: &str, json: bool) -> Result<()> {
156    #[derive(Deserialize, Serialize)]
157    struct DeleteResponse {
158        success: bool,
159    }
160
161    let response: DeleteResponse = client.delete(&format!("/api/queue/{}", message_id)).await?;
162
163    if json {
164        println!("{}", serde_json::to_string_pretty(&response)?);
165    } else {
166        println!(
167            "{}",
168            format!("✓ Message {} deleted from queue", message_id)
169                .green()
170                .bold()
171        );
172    }
173
174    Ok(())
175}
176
177/// Retry a failed message immediately
178pub async fn retry(client: &Client, message_id: &str, json: bool) -> Result<()> {
179    #[derive(Deserialize, Serialize)]
180    struct RetryResponse {
181        success: bool,
182        error: Option<String>,
183    }
184
185    let response: RetryResponse = client
186        .post(
187            &format!("/api/queue/{}/retry", message_id),
188            &serde_json::json!({}),
189        )
190        .await?;
191
192    if json {
193        println!("{}", serde_json::to_string_pretty(&response)?);
194    } else if response.success {
195        println!(
196            "{}",
197            format!("✓ Message {} sent successfully", message_id)
198                .green()
199                .bold()
200        );
201    } else {
202        println!(
203            "{}",
204            format!("✗ Failed to send message {}", message_id)
205                .red()
206                .bold()
207        );
208        if let Some(error) = response.error {
209            println!("  Error: {}", error.red());
210        }
211    }
212
213    Ok(())
214}
215
216/// Purge all failed messages from the queue
217pub async fn purge(client: &Client, json: bool) -> Result<()> {
218    #[derive(Deserialize, Serialize)]
219    struct PurgeResponse {
220        messages_deleted: u32,
221    }
222
223    let response: PurgeResponse = client
224        .post("/api/queue/purge", &serde_json::json!({}))
225        .await?;
226
227    if json {
228        println!("{}", serde_json::to_string_pretty(&response)?);
229    } else {
230        println!(
231            "{}",
232            format!("✓ Purged {} failed messages", response.messages_deleted)
233                .green()
234                .bold()
235        );
236    }
237
238    Ok(())
239}
240
241/// Show queue statistics
242pub async fn stats(client: &Client, json: bool) -> Result<()> {
243    #[derive(Deserialize, Serialize)]
244    struct QueueStats {
245        total: u32,
246        pending: u32,
247        retrying: u32,
248        failed: u32,
249        total_size_mb: u64,
250        oldest_message: Option<String>,
251        average_attempts: f64,
252    }
253
254    let stats: QueueStats = client.get("/api/queue/stats").await?;
255
256    if json {
257        println!("{}", serde_json::to_string_pretty(&stats)?);
258    } else {
259        println!("{}", "Queue Statistics".bold());
260        println!("  Total messages: {}", stats.total);
261        println!("  Pending: {}", stats.pending.to_string().yellow());
262        println!("  Retrying: {}", stats.retrying.to_string().blue());
263        println!("  Failed: {}", stats.failed.to_string().red());
264        println!("  Total size: {} MB", stats.total_size_mb);
265        if let Some(oldest) = stats.oldest_message {
266            println!("  Oldest message: {}", oldest);
267        }
268        println!("  Average attempts: {:.1}", stats.average_attempts);
269    }
270
271    Ok(())
272}
273
274#[cfg(test)]
275mod tests {
276    use super::*;
277
278    #[test]
279    fn test_queued_message_serialization() {
280        let msg = QueuedMessage {
281            id: "msg123".to_string(),
282            from: "sender@example.com".to_string(),
283            to: "recipient@example.com".to_string(),
284            size_kb: 100,
285            attempts: 2,
286            next_retry: "2024-01-01T00:00:00Z".to_string(),
287            status: "retrying".to_string(),
288        };
289
290        let json = serde_json::to_string(&msg).unwrap();
291        assert!(json.contains("msg123"));
292    }
293
294    #[test]
295    fn test_queue_stats_calculation() {
296        let messages = [
297            QueuedMessage {
298                id: "1".to_string(),
299                from: "a@test.com".to_string(),
300                to: "b@test.com".to_string(),
301                size_kb: 10,
302                attempts: 1,
303                next_retry: "".to_string(),
304                status: "pending".to_string(),
305            },
306            QueuedMessage {
307                id: "2".to_string(),
308                from: "a@test.com".to_string(),
309                to: "c@test.com".to_string(),
310                size_kb: 20,
311                attempts: 3,
312                next_retry: "".to_string(),
313                status: "failed".to_string(),
314            },
315        ];
316
317        let total_size: u64 = messages.iter().map(|m| m.size_kb).sum();
318        let failed = messages.iter().filter(|m| m.status == "failed").count();
319
320        assert_eq!(total_size, 30);
321        assert_eq!(failed, 1);
322    }
323
324    #[test]
325    fn test_queued_message_pending() {
326        let msg = QueuedMessage {
327            id: "pending123".to_string(),
328            from: "sender@example.com".to_string(),
329            to: "recipient@example.com".to_string(),
330            size_kb: 50,
331            attempts: 0,
332            next_retry: "".to_string(),
333            status: "pending".to_string(),
334        };
335
336        assert_eq!(msg.status, "pending");
337        assert_eq!(msg.attempts, 0);
338    }
339
340    #[test]
341    fn test_queued_message_deserialization() {
342        let json = r#"{
343            "id": "msg456",
344            "from": "sender@test.com",
345            "to": "recipient@test.com",
346            "size_kb": 150,
347            "attempts": 1,
348            "next_retry": "2024-01-01T12:00:00Z",
349            "status": "retrying"
350        }"#;
351
352        let msg: QueuedMessage = serde_json::from_str(json).unwrap();
353        assert_eq!(msg.id, "msg456");
354        assert_eq!(msg.attempts, 1);
355        assert_eq!(msg.status, "retrying");
356    }
357
358    #[test]
359    fn test_queue_status_filtering() {
360        let messages = [
361            QueuedMessage {
362                id: "1".to_string(),
363                from: "a@test.com".to_string(),
364                to: "b@test.com".to_string(),
365                size_kb: 10,
366                attempts: 0,
367                next_retry: "".to_string(),
368                status: "pending".to_string(),
369            },
370            QueuedMessage {
371                id: "2".to_string(),
372                from: "a@test.com".to_string(),
373                to: "c@test.com".to_string(),
374                size_kb: 20,
375                attempts: 2,
376                next_retry: "".to_string(),
377                status: "retrying".to_string(),
378            },
379            QueuedMessage {
380                id: "3".to_string(),
381                from: "a@test.com".to_string(),
382                to: "d@test.com".to_string(),
383                size_kb: 30,
384                attempts: 5,
385                next_retry: "".to_string(),
386                status: "failed".to_string(),
387            },
388        ];
389
390        let pending = messages.iter().filter(|m| m.status == "pending").count();
391        let retrying = messages.iter().filter(|m| m.status == "retrying").count();
392        let failed = messages.iter().filter(|m| m.status == "failed").count();
393
394        assert_eq!(pending, 1);
395        assert_eq!(retrying, 1);
396        assert_eq!(failed, 1);
397    }
398
399    #[test]
400    fn test_queue_empty() {
401        let messages: Vec<QueuedMessage> = vec![];
402        assert!(messages.is_empty());
403    }
404
405    #[test]
406    fn test_queue_message_size_calculation() {
407        let messages = [
408            QueuedMessage {
409                id: "1".to_string(),
410                from: "a@test.com".to_string(),
411                to: "b@test.com".to_string(),
412                size_kb: 100,
413                attempts: 1,
414                next_retry: "".to_string(),
415                status: "pending".to_string(),
416            },
417            QueuedMessage {
418                id: "2".to_string(),
419                from: "a@test.com".to_string(),
420                to: "c@test.com".to_string(),
421                size_kb: 200,
422                attempts: 1,
423                next_retry: "".to_string(),
424                status: "pending".to_string(),
425            },
426        ];
427
428        let total_kb: u64 = messages.iter().map(|m| m.size_kb).sum();
429        let total_mb = total_kb / 1024;
430
431        assert_eq!(total_kb, 300);
432        assert_eq!(total_mb, 0); // Less than 1 MB
433    }
434}