celers_cli/
command_utils.rs

1//! Utility functions shared across CLI commands.
2//!
3//! This module provides common helper functions used by various CLI commands,
4//! including formatting, validation, and retry logic.
5
6use colored::Colorize;
7use tabled::{settings::Style, Table, Tabled};
8
/// Mask the password in a connection URL for safe display.
///
/// Only the password inside the URL's userinfo component
/// (`scheme://user:password@host/...`) is replaced with `****`:
///
/// * The authority is taken to end at the first `/`, `?` or `#` after the
///   scheme, so an `@` or `:` in the port, path, query or fragment never
///   triggers masking.
/// * The *last* `@` inside the authority separates credentials from the host,
///   so a password containing a literal `@` is hidden completely.
/// * The password starts after the *first* `:` of the userinfo; a username
///   without a password is left untouched.
///
/// Input without a leading `scheme://` is treated as starting directly with
/// the authority (e.g. `user:pass@host`). URLs that carry no password are
/// returned unchanged.
///
/// Note: a password containing an unencoded `/`, `?` or `#` is not a valid
/// URL and is not recognised as a password; such characters must be
/// percent-encoded.
///
/// # Examples
///
/// ```
/// # use celers_cli::command_utils::mask_password;
/// let url = "postgres://user:secret@localhost/db";
/// let masked = mask_password(url);
/// assert_eq!(masked, "postgres://user:****@localhost/db");
/// assert!(!masked.contains("secret"));
///
/// // Nothing to hide: returned as-is.
/// assert_eq!(mask_password("redis://localhost:6379"), "redis://localhost:6379");
/// assert_eq!(mask_password("redis://user@localhost"), "redis://user@localhost");
/// ```
pub fn mask_password(url: &str) -> String {
    // Accept the text before the first "://" as a scheme only if it looks like
    // one; otherwise a "://" buried in a query string would hide credentials
    // that sit at the very start of the input.
    let authority_start = match url.find("://") {
        Some(idx)
            if url[..idx]
                .chars()
                .all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.')) =>
        {
            idx + 3
        }
        _ => 0,
    };

    let rest = &url[authority_start..];
    let authority_end = rest
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(rest.len());
    let authority = &rest[..authority_end];

    if let Some(at_pos) = authority.rfind('@') {
        if let Some(colon_pos) = authority[..at_pos].find(':') {
            // Keep everything up to and including the ':' and from the '@' on.
            let before = &url[..=authority_start + colon_pos];
            let after = &url[authority_start + at_pos..];
            return format!("{before}****{after}");
        }
    }

    url.to_string()
}
33
/// Render a byte count as a short human-readable size.
///
/// The value is repeatedly divided by 1024 until it drops below 1024 or the
/// largest supported unit (TB) is reached. Counts below 1024 are printed as a
/// whole number of bytes; everything else is printed with two decimals.
///
/// # Examples
///
/// ```
/// # use celers_cli::command_utils::format_bytes;
/// assert_eq!(format_bytes(512), "512 B");
/// assert_eq!(format_bytes(1024), "1.00 KB");
/// assert_eq!(format_bytes(1048576), "1.00 MB");
/// ```
pub fn format_bytes(bytes: usize) -> String {
    const LARGER_UNITS: [&str; 4] = ["KB", "MB", "GB", "TB"];
    const STEP: f64 = 1024.0;

    // Plain byte counts are shown without a fractional part.
    if bytes < 1024 {
        return format!("{} {}", bytes, "B");
    }

    let mut scaled = bytes as f64;
    let mut unit = "B";
    for next in LARGER_UNITS.iter() {
        if scaled < STEP {
            break;
        }
        scaled /= STEP;
        unit = *next;
    }

    format!("{:.2} {}", scaled, unit)
}
62
/// Render a number of seconds as a compact duration string.
///
/// The output shows the largest applicable unit plus, when non-zero, the next
/// smaller one: seconds only below a minute, `Xm Ys` below an hour, `Xh Ym`
/// below a day, and `Xd Yh` from one day upwards (e.g. "1h 30m", "2d 5h").
///
/// # Examples
///
/// ```
/// # use celers_cli::command_utils::format_duration;
/// assert_eq!(format_duration(90), "1m 30s");
/// assert_eq!(format_duration(3660), "1h 1m");
/// assert_eq!(format_duration(86400), "1d");
/// ```
pub fn format_duration(seconds: u64) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 60 * MINUTE;
    const DAY: u64 = 24 * HOUR;

    // Joins the leading unit with the next smaller one, omitting a zero remainder.
    fn compose(major: u64, major_suffix: char, minor: u64, minor_suffix: char) -> String {
        if minor == 0 {
            format!("{}{}", major, major_suffix)
        } else {
            format!("{}{} {}{}", major, major_suffix, minor, minor_suffix)
        }
    }

    match seconds {
        s if s < MINUTE => format!("{}s", s),
        s if s < HOUR => compose(s / MINUTE, 'm', s % MINUTE, 's'),
        s if s < DAY => compose(s / HOUR, 'h', (s % HOUR) / MINUTE, 'm'),
        s => compose(s / DAY, 'd', (s % DAY) / HOUR, 'h'),
    }
}
103
104/// Validate task ID format (UUID).
105///
106/// Ensures the provided string is a valid UUID format.
107///
108/// # Errors
109///
110/// Returns an error if the task ID is not a valid UUID.
111///
112/// # Examples
113///
114/// ```
115/// # use celers_cli::command_utils::validate_task_id;
116/// let valid_id = "550e8400-e29b-41d4-a716-446655440000";
117/// assert!(validate_task_id(valid_id).is_ok());
118///
119/// let invalid_id = "not-a-uuid";
120/// assert!(validate_task_id(invalid_id).is_err());
121/// ```
122pub fn validate_task_id(task_id: &str) -> anyhow::Result<uuid::Uuid> {
123    uuid::Uuid::parse_str(task_id)
124        .map_err(|e| anyhow::anyhow!("Invalid task ID format: {e}. Expected UUID format."))
125}
126
127/// Validate queue name.
128///
129/// Ensures the queue name is not empty, doesn't contain whitespace,
130/// and is not too long.
131///
132/// # Errors
133///
134/// Returns an error if the queue name is invalid.
135///
136/// # Examples
137///
138/// ```
139/// # use celers_cli::command_utils::validate_queue_name;
140/// assert!(validate_queue_name("my_queue").is_ok());
141/// assert!(validate_queue_name("").is_err());
142/// assert!(validate_queue_name("has spaces").is_err());
143/// ```
144pub fn validate_queue_name(queue: &str) -> anyhow::Result<()> {
145    if queue.is_empty() {
146        anyhow::bail!("Queue name cannot be empty");
147    }
148    if queue.contains(char::is_whitespace) {
149        anyhow::bail!("Queue name cannot contain whitespace");
150    }
151    if queue.len() > 255 {
152        anyhow::bail!("Queue name too long (max 255 characters)");
153    }
154    Ok(())
155}
156
/// Compute `part` as a percentage of `total`.
///
/// A `total` of zero yields `0.0` instead of dividing by zero. The result is
/// not clamped, so `part > total` produces a value above 100.
///
/// # Examples
///
/// ```
/// # use celers_cli::command_utils::calculate_percentage;
/// assert_eq!(calculate_percentage(50, 100), 50.0);
/// assert_eq!(calculate_percentage(10, 0), 0.0); // Division by zero handled
/// ```
pub fn calculate_percentage(part: usize, total: usize) -> f64 {
    // Guard first so the division below never sees a zero denominator.
    if total == 0 {
        return 0.0;
    }

    let ratio = part as f64 / total as f64;
    ratio * 100.0
}
173
174/// Print task details in a formatted table.
175///
176/// Displays task metadata including ID, name, queue, priority, and arguments.
177pub fn print_task_details(task: &celers_core::SerializedTask, location: &str) {
178    println!("{}", format!("Location: {location}").green().bold());
179    println!();
180
181    #[derive(Tabled)]
182    struct TaskDetail {
183        #[tabled(rename = "Field")]
184        field: String,
185        #[tabled(rename = "Value")]
186        value: String,
187    }
188
189    let details = vec![
190        TaskDetail {
191            field: "ID".to_string(),
192            value: task.metadata.id.to_string(),
193        },
194        TaskDetail {
195            field: "Name".to_string(),
196            value: task.metadata.name.clone(),
197        },
198        TaskDetail {
199            field: "State".to_string(),
200            value: format!("{:?}", task.metadata.state),
201        },
202        TaskDetail {
203            field: "Priority".to_string(),
204            value: task.metadata.priority.to_string(),
205        },
206        TaskDetail {
207            field: "Max Retries".to_string(),
208            value: task.metadata.max_retries.to_string(),
209        },
210        TaskDetail {
211            field: "Timeout".to_string(),
212            value: task
213                .metadata
214                .timeout_secs
215                .map_or_else(|| "default".to_string(), |s| format!("{s}s")),
216        },
217        TaskDetail {
218            field: "Created At".to_string(),
219            value: task.metadata.created_at.to_string(),
220        },
221        TaskDetail {
222            field: "Payload Size".to_string(),
223            value: format!("{} bytes", task.payload.len()),
224        },
225    ];
226
227    let table = Table::new(details).with(Style::rounded()).to_string();
228    println!("{table}");
229}
230
231/// Display a log line with optional level filtering.
232///
233/// Parses JSON log format and displays with color coding based on level.
234/// Filters out logs that don't match the specified level (if provided).
235pub fn display_log_line(log: &str, level_filter: Option<&str>) {
236    if let Ok(log_json) = serde_json::from_str::<serde_json::Value>(log) {
237        let level = log_json
238            .get("level")
239            .and_then(|v| v.as_str())
240            .unwrap_or("INFO");
241        let message = log_json
242            .get("message")
243            .and_then(|v| v.as_str())
244            .unwrap_or(log);
245        let timestamp = log_json
246            .get("timestamp")
247            .and_then(|v| v.as_str())
248            .unwrap_or("");
249
250        // Filter by level if specified
251        if let Some(filter) = level_filter {
252            if !level.eq_ignore_ascii_case(filter) {
253                return;
254            }
255        }
256
257        // Color code by level
258        let colored_level = match level {
259            "ERROR" | "error" => level.red().bold(),
260            "WARN" | "warn" => level.yellow().bold(),
261            "DEBUG" | "debug" => level.cyan(),
262            _ => level.green(),
263        };
264
265        let colored_msg = match level {
266            "ERROR" | "error" => message.red(),
267            "WARN" | "warn" => message.yellow(),
268            _ => message.normal(),
269        };
270
271        println!("[{timestamp}] {colored_level}: {colored_msg}");
272    } else {
273        // Plain text log
274        println!("{log}");
275    }
276}
277
278/// Send alert via webhook.
279///
280/// Posts a JSON payload to the specified webhook URL with the alert message.
281///
282/// # Errors
283///
284/// Returns an error if the HTTP request fails or returns a non-success status.
285pub async fn send_webhook_alert(webhook_url: &str, message: &str) -> anyhow::Result<()> {
286    let client = reqwest::Client::new();
287    let payload = serde_json::json!({
288        "text": message,
289        "timestamp": chrono::Utc::now().to_rfc3339(),
290    });
291
292    let response = client.post(webhook_url).json(&payload).send().await?;
293
294    if !response.status().is_success() {
295        anyhow::bail!("Webhook request failed with status: {}", response.status());
296    }
297
298    Ok(())
299}
300
301/// Retry an async operation with exponential backoff.
302///
303/// Attempts the operation up to `max_retries` times with exponential backoff
304/// between attempts.
305///
306/// # Type Parameters
307///
308/// * `F` - Future-producing function
309/// * `T` - Success result type
310/// * `E` - Error type (must implement Display)
311///
312/// # Errors
313///
314/// Returns the last error if all retry attempts fail.
315pub async fn retry_with_backoff<F, T, E>(
316    operation: F,
317    max_retries: u32,
318    operation_name: &str,
319) -> Result<T, E>
320where
321    F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, E>> + Send>>,
322    E: std::fmt::Display,
323{
324    let mut retries = 0;
325    loop {
326        match operation().await {
327            Ok(result) => return Ok(result),
328            Err(err) => {
329                retries += 1;
330                if retries >= max_retries {
331                    eprintln!(
332                        "{}",
333                        format!("✗ {operation_name} failed after {max_retries} attempts: {err}")
334                            .red()
335                    );
336                    return Err(err);
337                }
338
339                let backoff_secs = 2_u64.pow(retries);
340                eprintln!(
341                    "{}",
342                    format!(
343                        "⚠ {operation_name} attempt {retries} failed: {err}. Retrying in {backoff_secs}s..."
344                    )
345                    .yellow()
346                );
347                tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await;
348            }
349        }
350    }
351}
352
/// Truncate a string to a maximum length with ellipsis.
///
/// Lengths are measured in bytes. If `s` is longer than `max_len`, it is cut
/// to at most `max_len - 3` bytes and `"..."` is appended. The cut position
/// is moved back to the nearest UTF-8 character boundary, so multi-byte
/// characters are never split and the function never panics on non-ASCII
/// input.
///
/// When `max_len` is smaller than 3 and the string does not fit, the result
/// is just `"..."`.
///
/// # Examples
///
/// ```
/// # use celers_cli::command_utils::truncate_string;
/// assert_eq!(truncate_string("Hello, World!", 10), "Hello, ...");
/// assert_eq!(truncate_string("Short", 10), "Short");
/// assert_eq!(truncate_string("héllo wörld", 5), "h...");
/// ```
pub fn truncate_string(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }

    // `cut` is below `s.len()` here; step back until it no longer lands in the
    // middle of a multi-byte character (offset 0 is always a boundary).
    let mut cut = max_len.saturating_sub(3);
    while !s.is_char_boundary(cut) {
        cut -= 1;
    }

    format!("{}...", &s[..cut])
}
371
372/// Format a timestamp into a human-readable relative time.
373///
374/// Converts a timestamp into a relative time string like "2 hours ago" or "just now".
375///
376/// # Examples
377///
378/// ```
379/// # use celers_cli::command_utils::format_relative_time;
380/// # use chrono::{Utc, Duration};
381/// let now = Utc::now();
382/// let past = now - Duration::minutes(5);
383/// assert!(format_relative_time(&past).contains("ago"));
384/// ```
385pub fn format_relative_time(timestamp: &chrono::DateTime<chrono::Utc>) -> String {
386    let now = chrono::Utc::now();
387    let duration = now.signed_duration_since(*timestamp);
388
389    if duration.num_seconds() < 60 {
390        "just now".to_string()
391    } else if duration.num_minutes() < 60 {
392        let mins = duration.num_minutes();
393        if mins == 1 {
394            "1 minute ago".to_string()
395        } else {
396            format!("{mins} minutes ago")
397        }
398    } else if duration.num_hours() < 24 {
399        let hours = duration.num_hours();
400        if hours == 1 {
401            "1 hour ago".to_string()
402        } else {
403            format!("{hours} hours ago")
404        }
405    } else {
406        let days = duration.num_days();
407        if days == 1 {
408            "1 day ago".to_string()
409        } else {
410            format!("{days} days ago")
411        }
412    }
413}
414
415/// Validate and normalize broker URL.
416///
417/// Ensures the broker URL has a valid scheme and normalizes the format.
418///
419/// # Errors
420///
421/// Returns an error if the URL is invalid or has an unsupported scheme.
422///
423/// # Examples
424///
425/// ```
426/// # use celers_cli::command_utils::validate_broker_url;
427/// assert!(validate_broker_url("redis://localhost:6379").is_ok());
428/// assert!(validate_broker_url("postgresql://localhost/db").is_ok());
429/// assert!(validate_broker_url("invalid").is_err());
430/// ```
431pub fn validate_broker_url(url: &str) -> anyhow::Result<String> {
432    if url.is_empty() {
433        anyhow::bail!("Broker URL cannot be empty");
434    }
435
436    // Check for valid scheme
437    let valid_schemes = [
438        "redis",
439        "rediss",
440        "postgres",
441        "postgresql",
442        "mysql",
443        "amqp",
444        "amqps",
445    ];
446    let has_valid_scheme = valid_schemes.iter().any(|&scheme| {
447        url.starts_with(&format!("{scheme}://")) || url.starts_with(&format!("{scheme}:"))
448    });
449
450    if !has_valid_scheme {
451        anyhow::bail!(
452            "Invalid broker URL scheme. Supported: {}",
453            valid_schemes.join(", ")
454        );
455    }
456
457    Ok(url.to_string())
458}
459
/// Format a count with comma thousands separators.
///
/// Digits are grouped in threes from the right, e.g. `1234567` becomes
/// `"1,234,567"`. Values below 1000 are returned without separators.
///
/// # Examples
///
/// ```
/// # use celers_cli::command_utils::format_count;
/// assert_eq!(format_count(1000), "1,000");
/// assert_eq!(format_count(1234567), "1,234,567");
/// ```
pub fn format_count(count: usize) -> String {
    let digits = count.to_string();
    let total = digits.len();
    let mut grouped = String::with_capacity(total + total / 3);

    for (idx, digit) in digits.chars().enumerate() {
        // A separator goes before every digit that starts a full group of three.
        let remaining = total - idx;
        if idx != 0 && remaining % 3 == 0 {
            grouped.push(',');
        }
        grouped.push(digit);
    }

    grouped
}
484
/// Display a simple progress indicator.
///
/// Writes a 40-character progress bar to stderr in the form
/// `[=====     ] current/total NN% message`, preceded by a carriage return so
/// that repeated calls redraw the same line. A newline is emitted once
/// `current >= total`.
///
/// Nothing is printed when `total` is 0. If `current` exceeds `total`, the
/// bar is drawn completely filled; the counter and percentage still show the
/// real values.
pub fn print_progress(current: usize, total: usize, message: &str) {
    const BAR_WIDTH: usize = 40;

    if total == 0 {
        return;
    }

    let percentage = (current as f64 / total as f64 * 100.0) as usize;

    // Clamp so an overshooting `current` cannot make `filled` exceed the bar
    // width (which would underflow `empty`); widen to u128 so the
    // multiplication cannot overflow for very large counts.
    let done = current.min(total);
    let filled = (BAR_WIDTH as u128 * done as u128 / total as u128) as usize;
    let empty = BAR_WIDTH - filled;

    let bar = format!(
        "[{}{}] {}/{}",
        "=".repeat(filled),
        " ".repeat(empty),
        current,
        total
    );

    eprint!("\r{} {}% {}", bar, percentage, message);
    if current >= total {
        eprintln!(); // New line when complete
    }
}
511
512/// Write data to CSV file
513///
514/// # Arguments
515///
516/// * `file_path` - Output file path
517/// * `headers` - CSV column headers
518/// * `rows` - Data rows
519///
520/// # Examples
521///
522/// ```no_run
523/// use celers_cli::command_utils::write_csv;
524///
525/// # fn example() -> anyhow::Result<()> {
526/// let headers = vec!["Name", "Count"];
527/// let rows = vec![
528///     vec!["Tasks".to_string(), "100".to_string()],
529///     vec!["Workers".to_string(), "5".to_string()],
530/// ];
531/// write_csv("report.csv", &headers, &rows)?;
532/// # Ok(())
533/// # }
534/// ```
535pub fn write_csv(file_path: &str, headers: &[&str], rows: &[Vec<String>]) -> anyhow::Result<()> {
536    use std::fs::File;
537    let file = File::create(file_path)?;
538    let mut writer = csv::Writer::from_writer(file);
539
540    // Write headers
541    writer.write_record(headers)?;
542
543    // Write rows
544    for row in rows {
545        writer.write_record(row)?;
546    }
547
548    writer.flush()?;
549    Ok(())
550}
551
/// Format task statistics as CSV rows
///
/// Each entry becomes a two-column row `[task_type, count]`. Rows are ordered
/// by count, highest first; entries with equal counts are ordered by task
/// name so the output is the same on every run regardless of `HashMap`
/// iteration order.
///
/// # Arguments
///
/// * `stats` - Map of task statistics (task_type -> count)
///
/// # Examples
///
/// ```
/// use std::collections::HashMap;
/// use celers_cli::command_utils::format_task_stats_csv;
///
/// let mut stats = HashMap::new();
/// stats.insert("process_data".to_string(), 100);
/// stats.insert("send_email".to_string(), 50);
///
/// let rows = format_task_stats_csv(&stats);
/// assert_eq!(rows.len(), 2);
/// assert_eq!(rows[0], vec!["process_data".to_string(), "100".to_string()]);
/// ```
pub fn format_task_stats_csv(stats: &std::collections::HashMap<String, usize>) -> Vec<Vec<String>> {
    // Sort on the numeric counts before stringifying, instead of parsing the
    // formatted strings back into numbers inside the comparator.
    let mut entries: Vec<(&String, &usize)> = stats.iter().collect();

    // Count descending, then name ascending. Map keys are unique, so this is
    // a total order and an unstable sort still gives a deterministic result.
    entries.sort_unstable_by(|a, b| b.1.cmp(a.1).then_with(|| a.0.cmp(b.0)));

    entries
        .into_iter()
        .map(|(task, count)| vec![task.clone(), count.to_string()])
        .collect()
}
586
#[cfg(test)]
mod tests {
    use super::*;

    /// Credentials are hidden; URLs without credentials pass through untouched.
    #[test]
    fn test_mask_password() {
        // (input URL, fragment expected in the output, secret that must vanish)
        let with_credentials = [
            (
                "postgres://user:password123@localhost:5432/dbname",
                "postgres://user:****@localhost",
                "password123",
            ),
            (
                "mysql://admin:secret@127.0.0.1:3306/db",
                "mysql://admin:****@127.0.0.1",
                "secret",
            ),
        ];
        for (url, expected_fragment, secret) in with_credentials {
            let masked = mask_password(url);
            assert!(masked.contains(expected_fragment));
            assert!(!masked.contains(secret));
        }

        // No password present: output equals input.
        let plain = "redis://localhost:6379";
        assert_eq!(mask_password(plain), plain);
    }

    /// Byte counts use whole bytes below 1 KB and two decimals above.
    #[test]
    fn test_format_bytes() {
        let cases = [
            (0, "0 B"),
            (500, "500 B"),
            (1024, "1.00 KB"),
            (1536, "1.50 KB"),
            (1048576, "1.00 MB"),
            (1073741824, "1.00 GB"),
        ];
        for (bytes, expected) in cases {
            assert_eq!(format_bytes(bytes), expected);
        }
    }

    /// Durations show the leading unit and, if non-zero, the next smaller one.
    #[test]
    fn test_format_duration() {
        let cases = [
            (0, "0s"),
            (30, "30s"),
            (60, "1m"),
            (90, "1m 30s"),
            (3600, "1h"),
            (3660, "1h 1m"),
            (86400, "1d"),
            (90000, "1d 1h"),
        ];
        for (seconds, expected) in cases {
            assert_eq!(format_duration(seconds), expected);
        }
    }

    /// Only well-formed UUIDs are accepted as task IDs.
    #[test]
    fn test_validate_task_id() {
        assert!(validate_task_id("550e8400-e29b-41d4-a716-446655440000").is_ok());

        for rejected in ["not-a-uuid", "", "12345"] {
            assert!(validate_task_id(rejected).is_err());
        }
    }

    /// Queue names must be non-empty, whitespace-free and at most 255 long.
    #[test]
    fn test_validate_queue_name() {
        for accepted in ["default", "high-priority", "queue_1"] {
            assert!(validate_queue_name(accepted).is_ok());
        }

        let too_long = "x".repeat(256);
        // Empty, containing whitespace, and over-long respectively.
        for rejected in ["", "queue name", too_long.as_str()] {
            assert!(validate_queue_name(rejected).is_err());
        }
    }

    /// Percentages are exact for simple ratios and 0.0 for a zero total.
    #[test]
    fn test_calculate_percentage() {
        let cases = [
            (0, 100, 0.0),
            (50, 100, 50.0),
            (100, 100, 100.0),
            (25, 100, 25.0),
            // Edge case: division by zero
            (10, 0, 0.0),
        ];
        for (part, total, expected) in cases {
            assert_eq!(calculate_percentage(part, total), expected);
        }
    }

    /// Strings within the limit are kept; longer ones end in an ellipsis.
    #[test]
    fn test_truncate_string() {
        let cases = [
            // Short strings should not be truncated
            ("Hello", 10, "Hello"),
            ("", 10, ""),
            // Long strings should be truncated with ellipsis
            ("Hello, World!", 10, "Hello, ..."),
            ("This is a very long string", 15, "This is a ve..."),
            // Edge case: max_len <= 3
            ("Hello", 3, "..."),
            ("Hi", 2, "Hi"),
        ];
        for (input, max_len, expected) in cases {
            assert_eq!(truncate_string(input, max_len), expected);
        }
    }

    /// Relative times pick the right unit and singular/plural wording.
    #[test]
    fn test_format_relative_time() {
        use chrono::{Duration, Utc};

        let now = Utc::now();
        let cases = [
            (Duration::seconds(30), "just now"),
            (Duration::minutes(5), "5 minutes ago"),
            (Duration::minutes(1), "1 minute ago"),
            (Duration::hours(3), "3 hours ago"),
            (Duration::hours(1), "1 hour ago"),
            (Duration::days(2), "2 days ago"),
            (Duration::days(1), "1 day ago"),
        ];
        for (age, expected) in cases {
            let moment = now - age;
            assert_eq!(format_relative_time(&moment), expected);
        }
    }

    /// Supported broker schemes pass; empty, unknown or missing schemes fail.
    #[test]
    fn test_validate_broker_url() {
        let accepted = [
            "redis://localhost:6379",
            "rediss://localhost:6379",
            "postgres://localhost/db",
            "postgresql://user:pass@localhost/db",
            "mysql://localhost:3306/db",
            "amqp://localhost:5672",
            "amqps://localhost:5671",
        ];
        for url in accepted {
            assert!(validate_broker_url(url).is_ok());
        }

        let rejected = [
            "",                 // Empty
            "http://localhost", // Wrong scheme
            "ftp://localhost",  // Wrong scheme
            "invalid",          // No scheme
            "localhost:6379",   // Missing scheme
        ];
        for url in rejected {
            assert!(validate_broker_url(url).is_err());
        }
    }

    /// Thousands separators appear every three digits from the right.
    #[test]
    fn test_format_count() {
        let cases = [
            (0, "0"),
            (100, "100"),
            (999, "999"),
            (1000, "1,000"),
            (1234, "1,234"),
            (12345, "12,345"),
            (123456, "123,456"),
            (1234567, "1,234,567"),
            (12345678, "12,345,678"),
        ];
        for (count, expected) in cases {
            assert_eq!(format_count(count), expected);
        }
    }
}