1use anyhow::Result;
4use colored::*;
5use serde::{Deserialize, Serialize};
6use tabled::{Table, Tabled};
7
8use crate::client::Client;
9
10#[derive(Debug, Serialize, Deserialize, Tabled)]
11pub struct QueuedMessage {
12 pub id: String,
13 pub from: String,
14 pub to: String,
15 pub size_kb: u64,
16 pub attempts: u32,
17 pub next_retry: String,
18 pub status: String,
19}
20
21pub async fn list(client: &Client, json: bool, filter: Option<&str>) -> Result<()> {
23 let mut url = "/api/queue".to_string();
24 if let Some(f) = filter {
25 url.push_str(&format!("?status={}", f));
26 }
27
28 let messages: Vec<QueuedMessage> = client.get(&url).await?;
29
30 if json {
31 println!("{}", serde_json::to_string_pretty(&messages)?);
32 } else {
33 if messages.is_empty() {
34 println!("{}", "Queue is empty".green());
35 return Ok(());
36 }
37
38 let table = Table::new(&messages).to_string();
39 println!("{}", table);
40
41 let total_size: u64 = messages.iter().map(|m| m.size_kb).sum();
42 let failed = messages.iter().filter(|m| m.status == "failed").count();
43 let pending = messages.iter().filter(|m| m.status == "pending").count();
44 let retrying = messages.iter().filter(|m| m.status == "retrying").count();
45
46 println!(
47 "\n{} messages in queue ({} pending, {} retrying, {} failed)",
48 messages.len().to_string().bold(),
49 pending,
50 retrying,
51 failed
52 );
53 println!("Total size: {} KB", total_size);
54 }
55
56 Ok(())
57}
58
59pub async fn flush(client: &Client, json: bool) -> Result<()> {
61 #[derive(Deserialize, Serialize)]
62 struct FlushResponse {
63 messages_processed: u32,
64 messages_sent: u32,
65 messages_failed: u32,
66 }
67
68 let response: FlushResponse = client
69 .post("/api/queue/flush", &serde_json::json!({}))
70 .await?;
71
72 if json {
73 println!("{}", serde_json::to_string_pretty(&response)?);
74 } else {
75 println!("{}", "✓ Queue flushed".green().bold());
76 println!(" Processed: {}", response.messages_processed);
77 println!(" Sent: {}", response.messages_sent.to_string().green());
78 if response.messages_failed > 0 {
79 println!(" Failed: {}", response.messages_failed.to_string().red());
80 }
81 }
82
83 Ok(())
84}
85
86pub async fn inspect(client: &Client, message_id: &str, json: bool) -> Result<()> {
88 #[derive(Deserialize, Serialize)]
89 struct MessageDetails {
90 id: String,
91 from: String,
92 to: Vec<String>,
93 subject: String,
94 size_bytes: u64,
95 attempts: u32,
96 max_attempts: u32,
97 status: String,
98 created_at: String,
99 last_attempt: Option<String>,
100 next_retry: Option<String>,
101 error: Option<String>,
102 headers: Vec<(String, String)>,
103 }
104
105 let details: MessageDetails = client.get(&format!("/api/queue/{}", message_id)).await?;
106
107 if json {
108 println!("{}", serde_json::to_string_pretty(&details)?);
109 } else {
110 println!("{}", format!("Message: {}", message_id).bold());
111 println!(" From: {}", details.from);
112 println!(" To: {}", details.to.join(", "));
113 println!(" Subject: {}", details.subject);
114 println!(" Size: {} KB", details.size_bytes / 1024);
115 println!(
116 " Status: {}",
117 match details.status.as_str() {
118 "pending" => details.status.yellow(),
119 "retrying" => details.status.blue(),
120 "failed" => details.status.red(),
121 "sent" => details.status.green(),
122 _ => details.status.normal(),
123 }
124 );
125 println!(
126 " Attempts: {} / {}",
127 details.attempts, details.max_attempts
128 );
129 println!(" Created: {}", details.created_at);
130
131 if let Some(last) = &details.last_attempt {
132 println!(" Last attempt: {}", last);
133 }
134
135 if let Some(next) = &details.next_retry {
136 println!(" Next retry: {}", next);
137 }
138
139 if let Some(error) = &details.error {
140 println!(" Error: {}", error.red());
141 }
142
143 if !details.headers.is_empty() {
144 println!("\n Headers:");
145 for (key, value) in &details.headers {
146 println!(" {}: {}", key.bold(), value);
147 }
148 }
149 }
150
151 Ok(())
152}
153
154pub async fn delete(client: &Client, message_id: &str, json: bool) -> Result<()> {
156 #[derive(Deserialize, Serialize)]
157 struct DeleteResponse {
158 success: bool,
159 }
160
161 let response: DeleteResponse = client.delete(&format!("/api/queue/{}", message_id)).await?;
162
163 if json {
164 println!("{}", serde_json::to_string_pretty(&response)?);
165 } else {
166 println!(
167 "{}",
168 format!("✓ Message {} deleted from queue", message_id)
169 .green()
170 .bold()
171 );
172 }
173
174 Ok(())
175}
176
177pub async fn retry(client: &Client, message_id: &str, json: bool) -> Result<()> {
179 #[derive(Deserialize, Serialize)]
180 struct RetryResponse {
181 success: bool,
182 error: Option<String>,
183 }
184
185 let response: RetryResponse = client
186 .post(
187 &format!("/api/queue/{}/retry", message_id),
188 &serde_json::json!({}),
189 )
190 .await?;
191
192 if json {
193 println!("{}", serde_json::to_string_pretty(&response)?);
194 } else if response.success {
195 println!(
196 "{}",
197 format!("✓ Message {} sent successfully", message_id)
198 .green()
199 .bold()
200 );
201 } else {
202 println!(
203 "{}",
204 format!("✗ Failed to send message {}", message_id)
205 .red()
206 .bold()
207 );
208 if let Some(error) = response.error {
209 println!(" Error: {}", error.red());
210 }
211 }
212
213 Ok(())
214}
215
216pub async fn purge(client: &Client, json: bool) -> Result<()> {
218 #[derive(Deserialize, Serialize)]
219 struct PurgeResponse {
220 messages_deleted: u32,
221 }
222
223 let response: PurgeResponse = client
224 .post("/api/queue/purge", &serde_json::json!({}))
225 .await?;
226
227 if json {
228 println!("{}", serde_json::to_string_pretty(&response)?);
229 } else {
230 println!(
231 "{}",
232 format!("✓ Purged {} failed messages", response.messages_deleted)
233 .green()
234 .bold()
235 );
236 }
237
238 Ok(())
239}
240
241pub async fn stats(client: &Client, json: bool) -> Result<()> {
243 #[derive(Deserialize, Serialize)]
244 struct QueueStats {
245 total: u32,
246 pending: u32,
247 retrying: u32,
248 failed: u32,
249 total_size_mb: u64,
250 oldest_message: Option<String>,
251 average_attempts: f64,
252 }
253
254 let stats: QueueStats = client.get("/api/queue/stats").await?;
255
256 if json {
257 println!("{}", serde_json::to_string_pretty(&stats)?);
258 } else {
259 println!("{}", "Queue Statistics".bold());
260 println!(" Total messages: {}", stats.total);
261 println!(" Pending: {}", stats.pending.to_string().yellow());
262 println!(" Retrying: {}", stats.retrying.to_string().blue());
263 println!(" Failed: {}", stats.failed.to_string().red());
264 println!(" Total size: {} MB", stats.total_size_mb);
265 if let Some(oldest) = stats.oldest_message {
266 println!(" Oldest message: {}", oldest);
267 }
268 println!(" Average attempts: {:.1}", stats.average_attempts);
269 }
270
271 Ok(())
272}
273
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fixture message sent from `a@test.com` with an empty
    /// `next_retry`; tests override individual fields where they need to.
    fn queued(id: &str, to: &str, size_kb: u64, attempts: u32, status: &str) -> QueuedMessage {
        QueuedMessage {
            id: id.to_string(),
            from: "a@test.com".to_string(),
            to: to.to_string(),
            size_kb,
            attempts,
            next_retry: String::new(),
            status: status.to_string(),
        }
    }

    /// Counts the messages whose status equals `status`.
    fn count_status(messages: &[QueuedMessage], status: &str) -> usize {
        messages.iter().filter(|m| m.status == status).count()
    }

    /// Sums `size_kb` across `messages`.
    fn total_kb(messages: &[QueuedMessage]) -> u64 {
        messages.iter().map(|m| m.size_kb).sum()
    }

    #[test]
    fn test_queued_message_serialization() {
        let msg = QueuedMessage {
            from: "sender@example.com".to_string(),
            next_retry: "2024-01-01T00:00:00Z".to_string(),
            ..queued("msg123", "recipient@example.com", 100, 2, "retrying")
        };

        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("msg123"));
    }

    #[test]
    fn test_queue_stats_calculation() {
        let messages = [
            queued("1", "b@test.com", 10, 1, "pending"),
            queued("2", "c@test.com", 20, 3, "failed"),
        ];

        assert_eq!(total_kb(&messages), 30);
        assert_eq!(count_status(&messages, "failed"), 1);
    }

    #[test]
    fn test_queued_message_pending() {
        let msg = QueuedMessage {
            from: "sender@example.com".to_string(),
            ..queued("pending123", "recipient@example.com", 50, 0, "pending")
        };

        assert_eq!(msg.status, "pending");
        assert_eq!(msg.attempts, 0);
    }

    #[test]
    fn test_queued_message_deserialization() {
        let json = r#"{
            "id": "msg456",
            "from": "sender@test.com",
            "to": "recipient@test.com",
            "size_kb": 150,
            "attempts": 1,
            "next_retry": "2024-01-01T12:00:00Z",
            "status": "retrying"
        }"#;

        let msg: QueuedMessage = serde_json::from_str(json).unwrap();
        assert_eq!(msg.id, "msg456");
        assert_eq!(msg.attempts, 1);
        assert_eq!(msg.status, "retrying");
    }

    #[test]
    fn test_queue_status_filtering() {
        let messages = [
            queued("1", "b@test.com", 10, 0, "pending"),
            queued("2", "c@test.com", 20, 2, "retrying"),
            queued("3", "d@test.com", 30, 5, "failed"),
        ];

        assert_eq!(count_status(&messages, "pending"), 1);
        assert_eq!(count_status(&messages, "retrying"), 1);
        assert_eq!(count_status(&messages, "failed"), 1);
    }

    #[test]
    fn test_queue_empty() {
        let messages: Vec<QueuedMessage> = Vec::new();
        assert!(messages.is_empty());
    }

    #[test]
    fn test_queue_message_size_calculation() {
        let messages = [
            queued("1", "b@test.com", 100, 1, "pending"),
            queued("2", "c@test.com", 200, 1, "pending"),
        ];

        let kb = total_kb(&messages);
        let mb = kb / 1024;

        assert_eq!(kb, 300);
        // Integer division truncates: 300 KB is less than one whole MB.
        assert_eq!(mb, 0);
    }
}