1use colored::Colorize;
7use tabled::{settings::Style, Table, Tabled};
8
9pub fn mask_password(url: &str) -> String {
24 if let Some(at_pos) = url.find('@') {
25 if let Some(colon_pos) = url[..at_pos].rfind(':') {
26 let before = &url[..=colon_pos];
27 let after = &url[at_pos..];
28 return format!("{before}****{after}");
29 }
30 }
31 url.to_string()
32}
33
34pub fn format_bytes(bytes: usize) -> String {
47 const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
48 let mut size = bytes as f64;
49 let mut unit_idx = 0;
50
51 while size >= 1024.0 && unit_idx < UNITS.len() - 1 {
52 size /= 1024.0;
53 unit_idx += 1;
54 }
55
56 if unit_idx == 0 {
57 format!("{} {}", size as usize, UNITS[unit_idx])
58 } else {
59 format!("{:.2} {}", size, UNITS[unit_idx])
60 }
61}
62
63pub fn format_duration(seconds: u64) -> String {
75 if seconds < 60 {
76 format!("{seconds}s")
77 } else if seconds < 3600 {
78 let minutes = seconds / 60;
79 let secs = seconds % 60;
80 if secs == 0 {
81 format!("{minutes}m")
82 } else {
83 format!("{minutes}m {secs}s")
84 }
85 } else if seconds < 86400 {
86 let hours = seconds / 3600;
87 let minutes = (seconds % 3600) / 60;
88 if minutes == 0 {
89 format!("{hours}h")
90 } else {
91 format!("{hours}h {minutes}m")
92 }
93 } else {
94 let days = seconds / 86400;
95 let hours = (seconds % 86400) / 3600;
96 if hours == 0 {
97 format!("{days}d")
98 } else {
99 format!("{days}d {hours}h")
100 }
101 }
102}
103
104pub fn validate_task_id(task_id: &str) -> anyhow::Result<uuid::Uuid> {
123 uuid::Uuid::parse_str(task_id)
124 .map_err(|e| anyhow::anyhow!("Invalid task ID format: {e}. Expected UUID format."))
125}
126
127pub fn validate_queue_name(queue: &str) -> anyhow::Result<()> {
145 if queue.is_empty() {
146 anyhow::bail!("Queue name cannot be empty");
147 }
148 if queue.contains(char::is_whitespace) {
149 anyhow::bail!("Queue name cannot contain whitespace");
150 }
151 if queue.len() > 255 {
152 anyhow::bail!("Queue name too long (max 255 characters)");
153 }
154 Ok(())
155}
156
157pub fn calculate_percentage(part: usize, total: usize) -> f64 {
167 if total == 0 {
168 0.0
169 } else {
170 (part as f64 / total as f64) * 100.0
171 }
172}
173
174pub fn print_task_details(task: &celers_core::SerializedTask, location: &str) {
178 println!("{}", format!("Location: {location}").green().bold());
179 println!();
180
181 #[derive(Tabled)]
182 struct TaskDetail {
183 #[tabled(rename = "Field")]
184 field: String,
185 #[tabled(rename = "Value")]
186 value: String,
187 }
188
189 let details = vec![
190 TaskDetail {
191 field: "ID".to_string(),
192 value: task.metadata.id.to_string(),
193 },
194 TaskDetail {
195 field: "Name".to_string(),
196 value: task.metadata.name.clone(),
197 },
198 TaskDetail {
199 field: "State".to_string(),
200 value: format!("{:?}", task.metadata.state),
201 },
202 TaskDetail {
203 field: "Priority".to_string(),
204 value: task.metadata.priority.to_string(),
205 },
206 TaskDetail {
207 field: "Max Retries".to_string(),
208 value: task.metadata.max_retries.to_string(),
209 },
210 TaskDetail {
211 field: "Timeout".to_string(),
212 value: task
213 .metadata
214 .timeout_secs
215 .map_or_else(|| "default".to_string(), |s| format!("{s}s")),
216 },
217 TaskDetail {
218 field: "Created At".to_string(),
219 value: task.metadata.created_at.to_string(),
220 },
221 TaskDetail {
222 field: "Payload Size".to_string(),
223 value: format!("{} bytes", task.payload.len()),
224 },
225 ];
226
227 let table = Table::new(details).with(Style::rounded()).to_string();
228 println!("{table}");
229}
230
231pub fn display_log_line(log: &str, level_filter: Option<&str>) {
236 if let Ok(log_json) = serde_json::from_str::<serde_json::Value>(log) {
237 let level = log_json
238 .get("level")
239 .and_then(|v| v.as_str())
240 .unwrap_or("INFO");
241 let message = log_json
242 .get("message")
243 .and_then(|v| v.as_str())
244 .unwrap_or(log);
245 let timestamp = log_json
246 .get("timestamp")
247 .and_then(|v| v.as_str())
248 .unwrap_or("");
249
250 if let Some(filter) = level_filter {
252 if !level.eq_ignore_ascii_case(filter) {
253 return;
254 }
255 }
256
257 let colored_level = match level {
259 "ERROR" | "error" => level.red().bold(),
260 "WARN" | "warn" => level.yellow().bold(),
261 "DEBUG" | "debug" => level.cyan(),
262 _ => level.green(),
263 };
264
265 let colored_msg = match level {
266 "ERROR" | "error" => message.red(),
267 "WARN" | "warn" => message.yellow(),
268 _ => message.normal(),
269 };
270
271 println!("[{timestamp}] {colored_level}: {colored_msg}");
272 } else {
273 println!("{log}");
275 }
276}
277
278pub async fn send_webhook_alert(webhook_url: &str, message: &str) -> anyhow::Result<()> {
286 let client = reqwest::Client::new();
287 let payload = serde_json::json!({
288 "text": message,
289 "timestamp": chrono::Utc::now().to_rfc3339(),
290 });
291
292 let response = client.post(webhook_url).json(&payload).send().await?;
293
294 if !response.status().is_success() {
295 anyhow::bail!("Webhook request failed with status: {}", response.status());
296 }
297
298 Ok(())
299}
300
301pub async fn retry_with_backoff<F, T, E>(
316 operation: F,
317 max_retries: u32,
318 operation_name: &str,
319) -> Result<T, E>
320where
321 F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, E>> + Send>>,
322 E: std::fmt::Display,
323{
324 let mut retries = 0;
325 loop {
326 match operation().await {
327 Ok(result) => return Ok(result),
328 Err(err) => {
329 retries += 1;
330 if retries >= max_retries {
331 eprintln!(
332 "{}",
333 format!("✗ {operation_name} failed after {max_retries} attempts: {err}")
334 .red()
335 );
336 return Err(err);
337 }
338
339 let backoff_secs = 2_u64.pow(retries);
340 eprintln!(
341 "{}",
342 format!(
343 "⚠ {operation_name} attempt {retries} failed: {err}. Retrying in {backoff_secs}s..."
344 )
345 .yellow()
346 );
347 tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await;
348 }
349 }
350 }
351}
352
353pub fn truncate_string(s: &str, max_len: usize) -> String {
365 if s.len() <= max_len {
366 s.to_string()
367 } else {
368 format!("{}...", &s[..max_len.saturating_sub(3)])
369 }
370}
371
372pub fn format_relative_time(timestamp: &chrono::DateTime<chrono::Utc>) -> String {
386 let now = chrono::Utc::now();
387 let duration = now.signed_duration_since(*timestamp);
388
389 if duration.num_seconds() < 60 {
390 "just now".to_string()
391 } else if duration.num_minutes() < 60 {
392 let mins = duration.num_minutes();
393 if mins == 1 {
394 "1 minute ago".to_string()
395 } else {
396 format!("{mins} minutes ago")
397 }
398 } else if duration.num_hours() < 24 {
399 let hours = duration.num_hours();
400 if hours == 1 {
401 "1 hour ago".to_string()
402 } else {
403 format!("{hours} hours ago")
404 }
405 } else {
406 let days = duration.num_days();
407 if days == 1 {
408 "1 day ago".to_string()
409 } else {
410 format!("{days} days ago")
411 }
412 }
413}
414
415pub fn validate_broker_url(url: &str) -> anyhow::Result<String> {
432 if url.is_empty() {
433 anyhow::bail!("Broker URL cannot be empty");
434 }
435
436 let valid_schemes = [
438 "redis",
439 "rediss",
440 "postgres",
441 "postgresql",
442 "mysql",
443 "amqp",
444 "amqps",
445 ];
446 let has_valid_scheme = valid_schemes.iter().any(|&scheme| {
447 url.starts_with(&format!("{scheme}://")) || url.starts_with(&format!("{scheme}:"))
448 });
449
450 if !has_valid_scheme {
451 anyhow::bail!(
452 "Invalid broker URL scheme. Supported: {}",
453 valid_schemes.join(", ")
454 );
455 }
456
457 Ok(url.to_string())
458}
459
460pub fn format_count(count: usize) -> String {
472 let count_str = count.to_string();
473 let mut result = String::new();
474
475 for (i, ch) in count_str.chars().rev().enumerate() {
476 if i > 0 && i % 3 == 0 {
477 result.insert(0, ',');
478 }
479 result.insert(0, ch);
480 }
481
482 result
483}
484
485pub fn print_progress(current: usize, total: usize, message: &str) {
489 if total == 0 {
490 return;
491 }
492
493 let percentage = (current as f64 / total as f64 * 100.0) as usize;
494 let bar_width = 40;
495 let filled = (bar_width * current) / total;
496 let empty = bar_width - filled;
497
498 let bar = format!(
499 "[{}{}] {}/{}",
500 "=".repeat(filled),
501 " ".repeat(empty),
502 current,
503 total
504 );
505
506 eprint!("\r{} {}% {}", bar, percentage, message);
507 if current >= total {
508 eprintln!(); }
510}
511
512pub fn write_csv(file_path: &str, headers: &[&str], rows: &[Vec<String>]) -> anyhow::Result<()> {
536 use std::fs::File;
537 let file = File::create(file_path)?;
538 let mut writer = csv::Writer::from_writer(file);
539
540 writer.write_record(headers)?;
542
543 for row in rows {
545 writer.write_record(row)?;
546 }
547
548 writer.flush()?;
549 Ok(())
550}
551
552pub fn format_task_stats_csv(stats: &std::collections::HashMap<String, usize>) -> Vec<Vec<String>> {
572 let mut rows: Vec<Vec<String>> = stats
573 .iter()
574 .map(|(task, count)| vec![task.clone(), count.to_string()])
575 .collect();
576
577 rows.sort_by(|a, b| {
579 let count_a: usize = a[1].parse().unwrap_or(0);
580 let count_b: usize = b[1].parse().unwrap_or(0);
581 count_b.cmp(&count_a)
582 });
583
584 rows
585}
586
587#[cfg(test)]
588mod tests {
589 use super::*;
590
591 #[test]
592 fn test_mask_password() {
593 let pg_url = "postgres://user:password123@localhost:5432/dbname";
595 let masked = mask_password(pg_url);
596 assert!(masked.contains("postgres://user:****@localhost"));
597 assert!(!masked.contains("password123"));
598
599 let mysql_url = "mysql://admin:secret@127.0.0.1:3306/db";
601 let masked = mask_password(mysql_url);
602 assert!(masked.contains("mysql://admin:****@127.0.0.1"));
603 assert!(!masked.contains("secret"));
604
605 let no_pass_url = "redis://localhost:6379";
607 let masked = mask_password(no_pass_url);
608 assert_eq!(masked, no_pass_url);
609 }
610
611 #[test]
612 fn test_format_bytes() {
613 assert_eq!(format_bytes(0), "0 B");
614 assert_eq!(format_bytes(500), "500 B");
615 assert_eq!(format_bytes(1024), "1.00 KB");
616 assert_eq!(format_bytes(1536), "1.50 KB");
617 assert_eq!(format_bytes(1048576), "1.00 MB");
618 assert_eq!(format_bytes(1073741824), "1.00 GB");
619 }
620
621 #[test]
622 fn test_format_duration() {
623 assert_eq!(format_duration(0), "0s");
624 assert_eq!(format_duration(30), "30s");
625 assert_eq!(format_duration(60), "1m");
626 assert_eq!(format_duration(90), "1m 30s");
627 assert_eq!(format_duration(3600), "1h");
628 assert_eq!(format_duration(3660), "1h 1m");
629 assert_eq!(format_duration(86400), "1d");
630 assert_eq!(format_duration(90000), "1d 1h");
631 }
632
633 #[test]
634 fn test_validate_task_id() {
635 let valid = "550e8400-e29b-41d4-a716-446655440000";
637 assert!(validate_task_id(valid).is_ok());
638
639 assert!(validate_task_id("not-a-uuid").is_err());
641 assert!(validate_task_id("").is_err());
642 assert!(validate_task_id("12345").is_err());
643 }
644
645 #[test]
646 fn test_validate_queue_name() {
647 assert!(validate_queue_name("default").is_ok());
649 assert!(validate_queue_name("high-priority").is_ok());
650 assert!(validate_queue_name("queue_1").is_ok());
651
652 assert!(validate_queue_name("").is_err()); assert!(validate_queue_name("queue name").is_err()); assert!(validate_queue_name(&"x".repeat(256)).is_err()); }
657
658 #[test]
659 fn test_calculate_percentage() {
660 assert_eq!(calculate_percentage(0, 100), 0.0);
661 assert_eq!(calculate_percentage(50, 100), 50.0);
662 assert_eq!(calculate_percentage(100, 100), 100.0);
663 assert_eq!(calculate_percentage(25, 100), 25.0);
664
665 assert_eq!(calculate_percentage(10, 0), 0.0);
667 }
668
669 #[test]
670 fn test_truncate_string() {
671 assert_eq!(truncate_string("Hello", 10), "Hello");
673 assert_eq!(truncate_string("", 10), "");
674
675 assert_eq!(truncate_string("Hello, World!", 10), "Hello, ...");
677 assert_eq!(
678 truncate_string("This is a very long string", 15),
679 "This is a ve..."
680 );
681
682 assert_eq!(truncate_string("Hello", 3), "...");
684 assert_eq!(truncate_string("Hi", 2), "Hi");
685 }
686
687 #[test]
688 fn test_format_relative_time() {
689 use chrono::{Duration, Utc};
690
691 let now = Utc::now();
692
693 let recent = now - Duration::seconds(30);
695 assert_eq!(format_relative_time(&recent), "just now");
696
697 let mins_ago = now - Duration::minutes(5);
699 assert_eq!(format_relative_time(&mins_ago), "5 minutes ago");
700
701 let one_min_ago = now - Duration::minutes(1);
702 assert_eq!(format_relative_time(&one_min_ago), "1 minute ago");
703
704 let hours_ago = now - Duration::hours(3);
706 assert_eq!(format_relative_time(&hours_ago), "3 hours ago");
707
708 let one_hour_ago = now - Duration::hours(1);
709 assert_eq!(format_relative_time(&one_hour_ago), "1 hour ago");
710
711 let days_ago = now - Duration::days(2);
713 assert_eq!(format_relative_time(&days_ago), "2 days ago");
714
715 let one_day_ago = now - Duration::days(1);
716 assert_eq!(format_relative_time(&one_day_ago), "1 day ago");
717 }
718
719 #[test]
720 fn test_validate_broker_url() {
721 assert!(validate_broker_url("redis://localhost:6379").is_ok());
723 assert!(validate_broker_url("rediss://localhost:6379").is_ok());
724 assert!(validate_broker_url("postgres://localhost/db").is_ok());
725 assert!(validate_broker_url("postgresql://user:pass@localhost/db").is_ok());
726 assert!(validate_broker_url("mysql://localhost:3306/db").is_ok());
727 assert!(validate_broker_url("amqp://localhost:5672").is_ok());
728 assert!(validate_broker_url("amqps://localhost:5671").is_ok());
729
730 assert!(validate_broker_url("").is_err()); assert!(validate_broker_url("http://localhost").is_err()); assert!(validate_broker_url("ftp://localhost").is_err()); assert!(validate_broker_url("invalid").is_err()); assert!(validate_broker_url("localhost:6379").is_err()); }
737
738 #[test]
739 fn test_format_count() {
740 assert_eq!(format_count(0), "0");
741 assert_eq!(format_count(100), "100");
742 assert_eq!(format_count(999), "999");
743 assert_eq!(format_count(1000), "1,000");
744 assert_eq!(format_count(1234), "1,234");
745 assert_eq!(format_count(12345), "12,345");
746 assert_eq!(format_count(123456), "123,456");
747 assert_eq!(format_count(1234567), "1,234,567");
748 assert_eq!(format_count(12345678), "12,345,678");
749 }
750}