celers_cli/
commands.rs

1//! CLI command implementations for `CeleRS` distributed task queue management.
2//!
3//! This module provides all the command implementations for the `CeleRS` CLI tool.
4//! Commands are organized into the following categories:
5//!
6//! - **Worker Management**: Starting, stopping, pausing, scaling workers
7//! - **Queue Operations**: Listing, purging, moving, importing/exporting queues
8//! - **Task Management**: Inspecting, canceling, retrying, and monitoring tasks
9//! - **DLQ Operations**: Managing failed tasks in the Dead Letter Queue
10//! - **Scheduling**: Managing scheduled/periodic tasks with cron expressions
11//! - **Monitoring**: Metrics, dashboard, health checks, and diagnostics
12//! - **Database**: Connection testing, health checks, migrations
13//! - **Configuration**: Validation, initialization, and profile management
14//!
15//! # Error Handling
16//!
17//! All functions return `anyhow::Result<()>` for consistent error handling.
18//! Errors are user-friendly and provide actionable feedback.
19//!
20//! # Examples
21//!
22//! ```no_run
23//! use celers_cli::commands;
24//!
25//! #[tokio::main]
26//! async fn main() -> anyhow::Result<()> {
27//!     // Start a worker
28//!     commands::start_worker(
29//!         "redis://localhost:6379",
30//!         "my_queue",
31//!         "fifo",
32//!         4,
33//!         3,
34//!         300
35//!     ).await?;
36//!     Ok(())
37//! }
38//! ```
39
40use celers_broker_redis::{QueueMode, RedisBroker};
41use celers_core::Broker;
42use celers_worker::{wait_for_signal, Worker, WorkerConfig};
43use chrono::Utc;
44use colored::Colorize;
45use tabled::{settings::Style, Table, Tabled};
46
47/// Start a worker with the given configuration.
48///
49/// Creates and runs a worker that processes tasks from the specified queue.
50/// The worker will run until it receives a shutdown signal (Ctrl+C).
51///
52/// # Arguments
53///
54/// * `broker_url` - Redis connection URL (e.g., `redis://localhost:6379`)
55/// * `queue` - Queue name to process tasks from
56/// * `mode` - Queue mode: "fifo" for FIFO, "priority" for priority-based
57/// * `concurrency` - Maximum number of concurrent tasks to process
58/// * `max_retries` - Maximum retry attempts for failed tasks
59/// * `timeout` - Task execution timeout in seconds
60///
61/// # Returns
62///
63/// Returns `Ok(())` on successful shutdown, or an error if worker fails to start.
64///
65/// # Examples
66///
67/// ```no_run
68/// # use celers_cli::commands::start_worker;
69/// # #[tokio::main]
70/// # async fn main() -> anyhow::Result<()> {
71/// // Start a FIFO worker with 4 concurrent tasks
72/// start_worker(
73///     "redis://localhost:6379",
74///     "my_queue",
75///     "fifo",
76///     4,
77///     3,
78///     300
79/// ).await?;
80/// # Ok(())
81/// # }
82/// ```
83pub async fn start_worker(
84    broker_url: &str,
85    queue: &str,
86    mode: &str,
87    concurrency: usize,
88    max_retries: u32,
89    timeout: u64,
90) -> anyhow::Result<()> {
91    println!("{}", "=== CeleRS Worker ===".bold().green());
92    println!();
93
94    // Parse queue mode
95    let queue_mode = match mode.to_lowercase().as_str() {
96        "priority" => QueueMode::Priority,
97        _ => QueueMode::Fifo,
98    };
99
100    // Create broker
101    let broker = RedisBroker::with_mode(broker_url, queue, queue_mode)?;
102    println!("✓ Connected to Redis: {}", broker_url.cyan());
103    println!("✓ Queue: {} (mode: {})", queue.cyan(), mode.cyan());
104
105    // Create empty task registry (users would register their tasks)
106    let registry = celers_core::TaskRegistry::new();
107    println!("⚠️  No tasks registered. Register tasks in your application code.");
108
109    // Configure worker
110    let config = WorkerConfig {
111        concurrency,
112        poll_interval_ms: 1000,
113        max_retries,
114        default_timeout_secs: timeout,
115        ..Default::default()
116    };
117
118    println!();
119    println!("Worker configuration:");
120    println!("  Concurrency: {}", concurrency.to_string().yellow());
121    println!("  Max retries: {}", max_retries.to_string().yellow());
122    println!("  Timeout: {}s", timeout.to_string().yellow());
123    println!();
124
125    // Create worker
126    let worker = Worker::new(broker, registry, config);
127    println!("{}", "✓ Worker started successfully".green().bold());
128    println!("{}", "  Press Ctrl+C to stop gracefully".dimmed());
129    println!();
130
131    // Set up signal handler
132    let worker_task = tokio::spawn(async move {
133        if let Err(e) = worker.run().await {
134            eprintln!("Worker error: {e}");
135        }
136    });
137
138    // Wait for shutdown signal
139    wait_for_signal().await;
140
141    println!();
142    println!("{}", "Shutting down gracefully...".yellow());
143    worker_task.abort();
144
145    println!("{}", "✓ Worker stopped".green());
146
147    Ok(())
148}
149
150/// Display queue status and statistics.
151///
152/// Shows current queue metrics including pending tasks, DLQ size, and health warnings.
153/// Uses a formatted table for clear visualization of queue state.
154///
155/// # Arguments
156///
157/// * `broker_url` - Redis connection URL
158/// * `queue` - Queue name to check status for
159///
160/// # Returns
161///
162/// Returns `Ok(())` on success, or an error if connection fails.
163///
164/// # Examples
165///
166/// ```no_run
167/// # use celers_cli::commands::show_status;
168/// # #[tokio::main]
169/// # async fn main() -> anyhow::Result<()> {
170/// show_status("redis://localhost:6379", "my_queue").await?;
171/// # Ok(())
172/// # }
173/// ```
174pub async fn show_status(broker_url: &str, queue: &str) -> anyhow::Result<()> {
175    let broker = RedisBroker::new(broker_url, queue)?;
176
177    println!("{}", "=== Queue Status ===".bold().cyan());
178    println!();
179
180    let queue_size = broker.queue_size().await?;
181    let dlq_size = broker.dlq_size().await?;
182
183    #[derive(Tabled)]
184    struct QueueStats {
185        #[tabled(rename = "Metric")]
186        metric: String,
187        #[tabled(rename = "Value")]
188        value: String,
189    }
190
191    let stats = vec![
192        QueueStats {
193            metric: "Queue".to_string(),
194            value: queue.to_string(),
195        },
196        QueueStats {
197            metric: "Pending Tasks".to_string(),
198            value: queue_size.to_string(),
199        },
200        QueueStats {
201            metric: "Failed Tasks (DLQ)".to_string(),
202            value: dlq_size.to_string(),
203        },
204        QueueStats {
205            metric: "Total".to_string(),
206            value: (queue_size + dlq_size).to_string(),
207        },
208    ];
209
210    let table = Table::new(stats).with(Style::rounded()).to_string();
211    println!("{table}");
212
213    if dlq_size > 0 {
214        println!();
215        println!(
216            "{}",
217            format!("⚠️  {dlq_size} tasks in Dead Letter Queue")
218                .yellow()
219                .bold()
220        );
221        println!("   Run: celers dlq inspect --broker {broker_url} --queue {queue}");
222    }
223
224    Ok(())
225}
226
227/// Inspect failed tasks in the Dead Letter Queue (DLQ).
228///
229/// Displays a list of failed tasks with their IDs, task names, and failure metadata.
230/// Useful for debugging and understanding why tasks are failing.
231///
232/// # Arguments
233///
234/// * `broker_url` - Redis connection URL
235/// * `queue` - Queue name
236/// * `limit` - Maximum number of tasks to display (default: 10)
237///
238/// # Returns
239///
240/// Returns `Ok(())` on success, or an error if connection fails.
241///
242/// # Examples
243///
244/// ```no_run
245/// # use celers_cli::commands::inspect_dlq;
246/// # #[tokio::main]
247/// # async fn main() -> anyhow::Result<()> {
248/// // Show last 20 failed tasks
249/// inspect_dlq("redis://localhost:6379", "my_queue", 20).await?;
250/// # Ok(())
251/// # }
252/// ```
253pub async fn inspect_dlq(broker_url: &str, queue: &str, limit: usize) -> anyhow::Result<()> {
254    let broker = RedisBroker::new(broker_url, queue)?;
255
256    println!("{}", "=== Dead Letter Queue ===".bold().red());
257    println!();
258
259    let dlq_size = broker.dlq_size().await?;
260    println!("Total failed tasks: {}", dlq_size.to_string().yellow());
261
262    if dlq_size == 0 {
263        println!("{}", "✓ DLQ is empty".green());
264        return Ok(());
265    }
266
267    println!();
268    let tasks = broker.inspect_dlq(limit as isize).await?;
269
270    for (idx, task) in tasks.iter().enumerate() {
271        println!("{}", format!("Task #{}", idx + 1).bold());
272        println!("  ID: {}", task.metadata.id.to_string().cyan());
273        println!("  Name: {}", task.metadata.name.yellow());
274        println!("  State: {:?}", task.metadata.state);
275        println!("  Max Retries: {}", task.metadata.max_retries);
276        println!("  Created: {}", task.metadata.created_at);
277        println!();
278    }
279
280    println!(
281        "Showing {} of {} tasks",
282        tasks.len().to_string().yellow(),
283        dlq_size
284    );
285
286    Ok(())
287}
288
289/// Clear Dead Letter Queue
290pub async fn clear_dlq(broker_url: &str, queue: &str, confirm: bool) -> anyhow::Result<()> {
291    let broker = RedisBroker::new(broker_url, queue)?;
292
293    let dlq_size = broker.dlq_size().await?;
294
295    if dlq_size == 0 {
296        println!("{}", "✓ DLQ is already empty".green());
297        return Ok(());
298    }
299
300    if !confirm {
301        println!(
302            "{}",
303            format!("⚠️  This will delete {dlq_size} tasks from DLQ")
304                .yellow()
305                .bold()
306        );
307        println!("   Add --confirm to proceed");
308        return Ok(());
309    }
310
311    let count = broker.clear_dlq().await?;
312    println!("{}", format!("✓ Cleared {count} tasks from DLQ").green());
313
314    Ok(())
315}
316
317/// Replay a task from DLQ
318pub async fn replay_task(broker_url: &str, queue: &str, task_id_str: &str) -> anyhow::Result<()> {
319    let broker = RedisBroker::new(broker_url, queue)?;
320
321    let task_id = task_id_str
322        .parse::<uuid::Uuid>()
323        .map_err(|_| anyhow::anyhow!("Invalid task ID format"))?;
324
325    println!("{}", "=== Replaying Task from DLQ ===".bold().cyan());
326    println!("Task ID: {}", task_id.to_string().yellow());
327
328    let replayed = broker.replay_from_dlq(&task_id).await?;
329
330    if replayed {
331        println!("{}", "✓ Task replayed successfully".green().bold());
332        println!("  The task will be processed again by workers");
333    } else {
334        println!("{}", "✗ Task not found in DLQ".red());
335    }
336
337    Ok(())
338}
339
340/// Generate a default configuration file with template settings.
341///
342/// Creates a TOML configuration file with commented examples for all settings.
343/// The generated file can be customized for different environments.
344///
345/// # Arguments
346///
347/// * `path` - Output file path (default: "celers.toml")
348///
349/// # Returns
350///
351/// Returns `Ok(())` on success, or an error if file creation fails.
352///
353/// # Examples
354///
355/// ```no_run
356/// # use celers_cli::commands::init_config;
357/// # #[tokio::main]
358/// # async fn main() -> anyhow::Result<()> {
359/// // Create default config
360/// init_config("celers.toml").await?;
361///
362/// // Create production config
363/// init_config("celers-prod.toml").await?;
364/// # Ok(())
365/// # }
366/// ```
367#[allow(clippy::unused_async)]
368pub async fn init_config(path: &str) -> anyhow::Result<()> {
369    let config = crate::config::Config::default_config();
370
371    config.to_file(path)?;
372
373    println!("{}", "✓ Configuration file created".green().bold());
374    println!("  Location: {}", path.cyan());
375    println!();
376    println!("Edit the file and run:");
377    println!("  celers worker --config {path}");
378
379    Ok(())
380}
381
382/// List all queues (Redis only)
383pub async fn list_queues(broker_url: &str) -> anyhow::Result<()> {
384    // Connect to Redis
385    let client = redis::Client::open(broker_url)?;
386    let mut conn = client.get_multiplexed_async_connection().await?;
387
388    println!("{}", "=== Redis Queues ===".bold().cyan());
389    println!();
390
391    // Scan for keys matching queue patterns
392    let mut cursor = 0;
393    let mut queue_keys: Vec<String> = Vec::new();
394
395    loop {
396        let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
397            .arg(cursor)
398            .arg("MATCH")
399            .arg("celers:*")
400            .arg("COUNT")
401            .arg(100)
402            .query_async(&mut conn)
403            .await?;
404
405        queue_keys.extend(keys);
406        cursor = new_cursor;
407
408        if cursor == 0 {
409            break;
410        }
411    }
412
413    if queue_keys.is_empty() {
414        println!("{}", "No queues found".yellow());
415        return Ok(());
416    }
417
418    #[derive(Tabled)]
419    struct QueueInfo {
420        #[tabled(rename = "Queue")]
421        name: String,
422        #[tabled(rename = "Type")]
423        queue_type: String,
424        #[tabled(rename = "Size")]
425        size: String,
426    }
427
428    let mut queue_infos = Vec::new();
429
430    for key in queue_keys {
431        let key_type: String = redis::cmd("TYPE").arg(&key).query_async(&mut conn).await?;
432
433        let size: isize = match key_type.as_str() {
434            "list" => redis::cmd("LLEN").arg(&key).query_async(&mut conn).await?,
435            "zset" => redis::cmd("ZCARD").arg(&key).query_async(&mut conn).await?,
436            _ => 0,
437        };
438
439        let queue_type = if key.contains(":dlq") {
440            "DLQ".to_string()
441        } else if key.contains(":delayed") {
442            "Delayed".to_string()
443        } else if key_type == "zset" {
444            "Priority".to_string()
445        } else {
446            "FIFO".to_string()
447        };
448
449        queue_infos.push(QueueInfo {
450            name: key,
451            queue_type,
452            size: size.to_string(),
453        });
454    }
455
456    let table = Table::new(queue_infos).with(Style::rounded()).to_string();
457    println!("{table}");
458
459    Ok(())
460}
461
462/// Purge all tasks from a queue
463pub async fn purge_queue(broker_url: &str, queue: &str, confirm: bool) -> anyhow::Result<()> {
464    let broker = RedisBroker::new(broker_url, queue)?;
465
466    let queue_size = broker.queue_size().await?;
467
468    if queue_size == 0 {
469        println!("{}", "✓ Queue is already empty".green());
470        return Ok(());
471    }
472
473    if !confirm {
474        println!(
475            "{}",
476            format!("⚠️  This will delete {queue_size} tasks from queue '{queue}'")
477                .yellow()
478                .bold()
479        );
480        println!("   Add --confirm to proceed");
481        return Ok(());
482    }
483
484    // Connect to Redis directly to delete the queue
485    let client = redis::Client::open(broker_url)?;
486    let mut conn = client.get_multiplexed_async_connection().await?;
487
488    let queue_key = format!("celers:{queue}");
489    redis::cmd("DEL")
490        .arg(&queue_key)
491        .query_async::<()>(&mut conn)
492        .await?;
493
494    println!(
495        "{}",
496        format!("✓ Purged {queue_size} tasks from queue '{queue}'").green()
497    );
498
499    Ok(())
500}
501
502/// Inspect a specific task by ID
503pub async fn inspect_task(broker_url: &str, queue: &str, task_id_str: &str) -> anyhow::Result<()> {
504    let task_id = task_id_str
505        .parse::<uuid::Uuid>()
506        .map_err(|_| anyhow::anyhow!("Invalid task ID format"))?;
507
508    println!("{}", "=== Task Details ===".bold().cyan());
509    println!("Task ID: {}", task_id.to_string().yellow());
510    println!();
511
512    // Connect to Redis
513    let client = redis::Client::open(broker_url)?;
514    let mut conn = client.get_multiplexed_async_connection().await?;
515
516    // Search in main queue
517    let queue_key = format!("celers:{queue}");
518    let queue_type: String = redis::cmd("TYPE")
519        .arg(&queue_key)
520        .query_async(&mut conn)
521        .await?;
522
523    let mut found = false;
524
525    // Try to find task in main queue
526    if queue_type == "list" {
527        let tasks: Vec<String> = redis::cmd("LRANGE")
528            .arg(&queue_key)
529            .arg(0)
530            .arg(-1)
531            .query_async(&mut conn)
532            .await?;
533
534        for task_str in tasks {
535            if let Ok(task) = serde_json::from_str::<celers_core::SerializedTask>(&task_str) {
536                if task.metadata.id == task_id {
537                    print_task_details(&task, "Main Queue");
538                    found = true;
539                    break;
540                }
541            }
542        }
543    } else if queue_type == "zset" {
544        let tasks: Vec<String> = redis::cmd("ZRANGE")
545            .arg(&queue_key)
546            .arg(0)
547            .arg(-1)
548            .query_async(&mut conn)
549            .await?;
550
551        for task_str in tasks {
552            if let Ok(task) = serde_json::from_str::<celers_core::SerializedTask>(&task_str) {
553                if task.metadata.id == task_id {
554                    print_task_details(&task, "Main Queue (Priority)");
555                    found = true;
556                    break;
557                }
558            }
559        }
560    }
561
562    // Try DLQ if not found
563    if !found {
564        let dlq_key = format!("celers:{queue}:dlq");
565        let dlq_tasks: Vec<String> = redis::cmd("LRANGE")
566            .arg(&dlq_key)
567            .arg(0)
568            .arg(-1)
569            .query_async(&mut conn)
570            .await?;
571
572        for task_str in dlq_tasks {
573            if let Ok(task) = serde_json::from_str::<celers_core::SerializedTask>(&task_str) {
574                if task.metadata.id == task_id {
575                    print_task_details(&task, "Dead Letter Queue");
576                    found = true;
577                    break;
578                }
579            }
580        }
581    }
582
583    // Try delayed queue if not found
584    if !found {
585        let delayed_key = format!("celers:{queue}:delayed");
586        let delayed_tasks: Vec<String> = redis::cmd("ZRANGE")
587            .arg(&delayed_key)
588            .arg(0)
589            .arg(-1)
590            .query_async(&mut conn)
591            .await?;
592
593        for task_str in delayed_tasks {
594            if let Ok(task) = serde_json::from_str::<celers_core::SerializedTask>(&task_str) {
595                if task.metadata.id == task_id {
596                    print_task_details(&task, "Delayed Queue");
597                    found = true;
598                    break;
599                }
600            }
601        }
602    }
603
604    if !found {
605        println!("{}", "✗ Task not found in any queue".red());
606        println!();
607        println!("The task may have been:");
608        println!("  • Already processed");
609        println!("  • Deleted");
610        println!("  • In a different queue");
611    }
612
613    Ok(())
614}
615
616/// Helper to print task details
617fn print_task_details(task: &celers_core::SerializedTask, location: &str) {
618    println!("{}", format!("Location: {location}").green().bold());
619    println!();
620
621    #[derive(Tabled)]
622    struct TaskDetail {
623        #[tabled(rename = "Field")]
624        field: String,
625        #[tabled(rename = "Value")]
626        value: String,
627    }
628
629    let details = vec![
630        TaskDetail {
631            field: "ID".to_string(),
632            value: task.metadata.id.to_string(),
633        },
634        TaskDetail {
635            field: "Name".to_string(),
636            value: task.metadata.name.clone(),
637        },
638        TaskDetail {
639            field: "State".to_string(),
640            value: format!("{:?}", task.metadata.state),
641        },
642        TaskDetail {
643            field: "Priority".to_string(),
644            value: task.metadata.priority.to_string(),
645        },
646        TaskDetail {
647            field: "Max Retries".to_string(),
648            value: task.metadata.max_retries.to_string(),
649        },
650        TaskDetail {
651            field: "Timeout".to_string(),
652            value: task
653                .metadata
654                .timeout_secs
655                .map_or_else(|| "default".to_string(), |s| format!("{s}s")),
656        },
657        TaskDetail {
658            field: "Created At".to_string(),
659            value: task.metadata.created_at.to_string(),
660        },
661        TaskDetail {
662            field: "Payload Size".to_string(),
663            value: format!("{} bytes", task.payload.len()),
664        },
665    ];
666
667    let table = Table::new(details).with(Style::rounded()).to_string();
668    println!("{table}");
669}
670
671/// Cancel a running or pending task
672pub async fn cancel_task(broker_url: &str, queue: &str, task_id_str: &str) -> anyhow::Result<()> {
673    let task_id = task_id_str
674        .parse::<uuid::Uuid>()
675        .map_err(|_| anyhow::anyhow!("Invalid task ID format"))?;
676
677    println!("{}", "=== Cancel Task ===".bold().yellow());
678    println!("Task ID: {}", task_id.to_string().yellow());
679    println!();
680
681    // Create broker
682    let broker = RedisBroker::new(broker_url, queue)?;
683    println!("✓ Connected to Redis: {}", broker_url.cyan());
684    println!();
685
686    // Send cancellation signal
687    println!("Sending cancellation signal...");
688    let cancelled = broker.cancel(&task_id).await?;
689
690    if cancelled {
691        println!(
692            "{}",
693            "✓ Cancellation signal sent successfully".green().bold()
694        );
695        println!();
696        println!("The task will be cancelled if:");
697        println!("  • It's currently running and has cancellation checkpoints");
698        println!("  • It's pending in the queue (will be removed)");
699        println!("  • Workers are subscribed to the cancellation channel");
700        println!();
701        println!(
702            "{}",
703            "Note: Task cancellation depends on worker implementation".yellow()
704        );
705    } else {
706        println!(
707            "{}",
708            "⚠️  No workers subscribed to cancellation channel"
709                .yellow()
710                .bold()
711        );
712        println!();
713        println!("Possible reasons:");
714        println!("  • No workers are currently running");
715        println!("  • Workers don't support cancellation");
716        println!("  • The task has already completed");
717        println!();
718        println!("Make sure workers are running and support task cancellation.");
719    }
720
721    Ok(())
722}
723
724/// Retry a failed task (from any queue)
725pub async fn retry_task(broker_url: &str, queue: &str, task_id_str: &str) -> anyhow::Result<()> {
726    let task_id = task_id_str
727        .parse::<uuid::Uuid>()
728        .map_err(|_| anyhow::anyhow!("Invalid task ID format"))?;
729
730    println!("{}", "=== Retry Task ===".bold().cyan());
731    println!("Task ID: {}", task_id.to_string().yellow());
732    println!();
733
734    // Connect to Redis
735    let client = redis::Client::open(broker_url)?;
736    let mut conn = client.get_multiplexed_async_connection().await?;
737
738    let mut task: Option<celers_core::SerializedTask> = None;
739    let mut source_queue: Option<String> = None;
740
741    // Search for task in main queue
742    let queue_key = format!("celers:{queue}");
743    let queue_type: String = redis::cmd("TYPE")
744        .arg(&queue_key)
745        .query_async(&mut conn)
746        .await?;
747
748    // Try to find task in main queue (FIFO/Priority)
749    if queue_type == "list" {
750        let tasks: Vec<String> = redis::cmd("LRANGE")
751            .arg(&queue_key)
752            .arg(0)
753            .arg(-1)
754            .query_async(&mut conn)
755            .await?;
756
757        for task_str in &tasks {
758            if let Ok(t) = serde_json::from_str::<celers_core::SerializedTask>(task_str) {
759                if t.metadata.id == task_id {
760                    task = Some(t);
761                    source_queue = Some("main_list".to_string());
762                    // Remove from list
763                    let _: usize = redis::cmd("LREM")
764                        .arg(&queue_key)
765                        .arg(1)
766                        .arg(task_str)
767                        .query_async(&mut conn)
768                        .await?;
769                    break;
770                }
771            }
772        }
773    } else if queue_type == "zset" {
774        let tasks: Vec<String> = redis::cmd("ZRANGE")
775            .arg(&queue_key)
776            .arg(0)
777            .arg(-1)
778            .query_async(&mut conn)
779            .await?;
780
781        for task_str in tasks {
782            if let Ok(t) = serde_json::from_str::<celers_core::SerializedTask>(&task_str) {
783                if t.metadata.id == task_id {
784                    task = Some(t);
785                    source_queue = Some("main_zset".to_string());
786                    // Remove from sorted set
787                    let _: usize = redis::cmd("ZREM")
788                        .arg(&queue_key)
789                        .arg(&task_str)
790                        .query_async(&mut conn)
791                        .await?;
792                    break;
793                }
794            }
795        }
796    }
797
798    // Try DLQ if not found
799    if task.is_none() {
800        let dlq_key = format!("celers:{queue}:dlq");
801        let dlq_tasks: Vec<String> = redis::cmd("LRANGE")
802            .arg(&dlq_key)
803            .arg(0)
804            .arg(-1)
805            .query_async(&mut conn)
806            .await?;
807
808        for task_str in dlq_tasks {
809            if let Ok(t) = serde_json::from_str::<celers_core::SerializedTask>(&task_str) {
810                if t.metadata.id == task_id {
811                    task = Some(t);
812                    source_queue = Some("dlq".to_string());
813                    // Remove from DLQ
814                    let _: usize = redis::cmd("LREM")
815                        .arg(&dlq_key)
816                        .arg(1)
817                        .arg(&task_str)
818                        .query_async(&mut conn)
819                        .await?;
820                    break;
821                }
822            }
823        }
824    }
825
826    // Try delayed queue if not found
827    if task.is_none() {
828        let delayed_key = format!("celers:{queue}:delayed");
829        let delayed_tasks: Vec<String> = redis::cmd("ZRANGE")
830            .arg(&delayed_key)
831            .arg(0)
832            .arg(-1)
833            .query_async(&mut conn)
834            .await?;
835
836        for task_str in delayed_tasks {
837            if let Ok(t) = serde_json::from_str::<celers_core::SerializedTask>(&task_str) {
838                if t.metadata.id == task_id {
839                    task = Some(t);
840                    source_queue = Some("delayed".to_string());
841                    // Remove from delayed queue
842                    let _: usize = redis::cmd("ZREM")
843                        .arg(&delayed_key)
844                        .arg(&task_str)
845                        .query_async(&mut conn)
846                        .await?;
847                    break;
848                }
849            }
850        }
851    }
852
853    // If task found, reset and re-enqueue
854    if let Some(mut t) = task {
855        println!(
856            "✓ Task found in: {}",
857            source_queue
858                .expect("source_queue set when task is found")
859                .cyan()
860        );
861        println!();
862
863        // Reset task state
864        t.metadata.state = celers_core::TaskState::Pending;
865        t.metadata.updated_at = Utc::now();
866
867        // Re-enqueue to main queue
868        let task_json = serde_json::to_string(&t)?;
869        let _: usize = redis::cmd("LPUSH")
870            .arg(&queue_key)
871            .arg(&task_json)
872            .query_async(&mut conn)
873            .await?;
874
875        println!("{}", "✓ Task retried successfully".green().bold());
876        println!();
877        println!("The task has been:");
878        println!("  • Removed from its current queue");
879        println!("  • Reset to Pending state");
880        println!("  • Re-enqueued to main queue");
881        println!();
882        println!("Workers will process it again.");
883    } else {
884        println!("{}", "✗ Task not found in any queue".red());
885        println!();
886        println!("The task may have been:");
887        println!("  • Already processed and completed");
888        println!("  • Deleted manually");
889        println!("  • In a different queue");
890    }
891
892    Ok(())
893}
894
895/// Show task result from backend
896pub async fn show_task_result(backend_url: &str, task_id_str: &str) -> anyhow::Result<()> {
897    let task_id = task_id_str
898        .parse::<uuid::Uuid>()
899        .map_err(|_| anyhow::anyhow!("Invalid task ID format"))?;
900
901    println!("{}", "=== Task Result ===".bold().cyan());
902    println!("Task ID: {}", task_id.to_string().yellow());
903    println!();
904
905    // Connect to Redis
906    let client = redis::Client::open(backend_url)?;
907    let mut conn = client.get_multiplexed_async_connection().await?;
908
909    // Check for task result in Redis backend
910    let result_key = format!("celery-task-meta-{task_id}");
911    let result_data: Option<String> = redis::cmd("GET")
912        .arg(&result_key)
913        .query_async(&mut conn)
914        .await?;
915
916    if let Some(data) = result_data {
917        // Parse the result JSON
918        let result: serde_json::Value = serde_json::from_str(&data)?;
919
920        println!("{}", "✓ Task result found".green().bold());
921        println!();
922
923        #[derive(Tabled)]
924        struct ResultField {
925            #[tabled(rename = "Field")]
926            field: String,
927            #[tabled(rename = "Value")]
928            value: String,
929        }
930
931        let mut fields = vec![
932            ResultField {
933                field: "Task ID".to_string(),
934                value: result
935                    .get("task_id")
936                    .and_then(|v| v.as_str())
937                    .unwrap_or("N/A")
938                    .to_string(),
939            },
940            ResultField {
941                field: "Status".to_string(),
942                value: result
943                    .get("status")
944                    .and_then(|v| v.as_str())
945                    .unwrap_or("UNKNOWN")
946                    .to_string(),
947            },
948        ];
949
950        // Add result if present
951        if let Some(task_result) = result.get("result") {
952            fields.push(ResultField {
953                field: "Result".to_string(),
954                value: serde_json::to_string_pretty(task_result)?,
955            });
956        }
957
958        // Add error info if present
959        if let Some(traceback) = result.get("traceback") {
960            if !traceback.is_null() {
961                fields.push(ResultField {
962                    field: "Error".to_string(),
963                    value: traceback
964                        .as_str()
965                        .unwrap_or("Error information unavailable")
966                        .to_string(),
967                });
968            }
969        }
970
971        // Add metadata
972        if let Some(date_done) = result.get("date_done") {
973            if !date_done.is_null() {
974                fields.push(ResultField {
975                    field: "Completed At".to_string(),
976                    value: date_done.as_str().unwrap_or("N/A").to_string(),
977                });
978            }
979        }
980
981        let table = Table::new(fields).with(Style::rounded()).to_string();
982        println!("{table}");
983    } else {
984        println!("{}", "✗ Task result not found".red());
985        println!();
986        println!("Possible reasons:");
987        println!("  • Task hasn't completed yet");
988        println!("  • Task result has expired (TTL)");
989        println!("  • Wrong backend URL");
990        println!("  • Task was never executed");
991    }
992
993    Ok(())
994}
995
996/// Move task from one queue to another
997pub async fn requeue_task(
998    broker_url: &str,
999    from_queue: &str,
1000    to_queue: &str,
1001    task_id_str: &str,
1002) -> anyhow::Result<()> {
1003    let task_id = task_id_str
1004        .parse::<uuid::Uuid>()
1005        .map_err(|_| anyhow::anyhow!("Invalid task ID format"))?;
1006
1007    let client = redis::Client::open(broker_url)?;
1008    let mut conn = client.get_multiplexed_async_connection().await?;
1009
1010    // Construct queue keys
1011    let from_key = format!("celers:{from_queue}");
1012    let to_key = format!("celers:{to_queue}");
1013
1014    // Determine the source queue type
1015    let from_type: String = redis::cmd("TYPE")
1016        .arg(&from_key)
1017        .query_async(&mut conn)
1018        .await?;
1019
1020    let mut task: Option<celers_core::SerializedTask> = None;
1021    let mut source_type = String::new();
1022
1023    // Search in source queue
1024    if from_type == "list" {
1025        // FIFO queue
1026        let tasks: Vec<String> = redis::cmd("LRANGE")
1027            .arg(&from_key)
1028            .arg(0)
1029            .arg(-1)
1030            .query_async(&mut conn)
1031            .await?;
1032
1033        for task_str in &tasks {
1034            if let Ok(t) = serde_json::from_str::<celers_core::SerializedTask>(task_str) {
1035                if t.metadata.id == task_id {
1036                    task = Some(t);
1037                    source_type = "list".to_string();
1038                    // Remove from source
1039                    let _: usize = redis::cmd("LREM")
1040                        .arg(&from_key)
1041                        .arg(1)
1042                        .arg(task_str)
1043                        .query_async(&mut conn)
1044                        .await?;
1045                    break;
1046                }
1047            }
1048        }
1049    } else if from_type == "zset" {
1050        // Priority queue
1051        let tasks: Vec<(String, f64)> = redis::cmd("ZRANGE")
1052            .arg(&from_key)
1053            .arg(0)
1054            .arg(-1)
1055            .arg("WITHSCORES")
1056            .query_async(&mut conn)
1057            .await?;
1058
1059        for (task_str, _score) in &tasks {
1060            if let Ok(t) = serde_json::from_str::<celers_core::SerializedTask>(task_str) {
1061                if t.metadata.id == task_id {
1062                    task = Some(t);
1063                    source_type = "zset".to_string();
1064                    // Remove from source
1065                    let _: usize = redis::cmd("ZREM")
1066                        .arg(&from_key)
1067                        .arg(task_str)
1068                        .query_async(&mut conn)
1069                        .await?;
1070                    break;
1071                }
1072            }
1073        }
1074    } else {
1075        return Err(anyhow::anyhow!(
1076            "Source queue '{from_queue}' not found or invalid queue type"
1077        ));
1078    }
1079
1080    if let Some(t) = task {
1081        // Determine destination queue type
1082        let to_type: String = redis::cmd("TYPE")
1083            .arg(&to_key)
1084            .query_async(&mut conn)
1085            .await?;
1086
1087        let task_json = serde_json::to_string(&t)?;
1088
1089        if to_type == "list" || to_type == "none" {
1090            // FIFO queue (or create new)
1091            let _: usize = redis::cmd("LPUSH")
1092                .arg(&to_key)
1093                .arg(&task_json)
1094                .query_async(&mut conn)
1095                .await?;
1096            println!(
1097                "{}",
1098                format!("✓ Task moved from '{from_queue}' ({source_type}) to '{to_queue}' (FIFO)")
1099                    .green()
1100                    .bold()
1101            );
1102        } else if to_type == "zset" {
1103            // Priority queue
1104            let priority = f64::from(t.metadata.priority);
1105            let _: usize = redis::cmd("ZADD")
1106                .arg(&to_key)
1107                .arg(priority)
1108                .arg(&task_json)
1109                .query_async(&mut conn)
1110                .await?;
1111            println!(
1112                "{}",
1113                format!(
1114                    "✓ Task moved from '{from_queue}' ({source_type}) to '{to_queue}' (Priority)"
1115                )
1116                .green()
1117                .bold()
1118            );
1119        } else {
1120            return Err(anyhow::anyhow!(
1121                "Destination queue '{to_queue}' has invalid type: {to_type}"
1122            ));
1123        }
1124
1125        // Show task details
1126        println!();
1127        println!("  {} {}", "Task ID:".cyan(), t.metadata.id);
1128        println!("  {} {}", "Task Name:".cyan(), t.metadata.name);
1129        println!("  {} {}", "Priority:".cyan(), t.metadata.priority);
1130    } else {
1131        println!(
1132            "{}",
1133            format!("✗ Task not found in queue '{from_queue}'").red()
1134        );
1135        println!();
1136        println!("Possible reasons:");
1137        println!("  • Task ID is incorrect");
1138        println!("  • Task is in a different queue");
1139        println!("  • Task has already been processed");
1140        return Err(anyhow::anyhow!("Task not found"));
1141    }
1142
1143    Ok(())
1144}
1145
1146/// Show detailed queue statistics
1147pub async fn queue_stats(broker_url: &str, queue: &str) -> anyhow::Result<()> {
1148    let client = redis::Client::open(broker_url)?;
1149    let mut conn = client.get_multiplexed_async_connection().await?;
1150
1151    // Construct queue keys
1152    let queue_key = format!("celers:{queue}");
1153    let processing_key = format!("{queue_key}:processing");
1154    let dlq_key = format!("{queue_key}:dlq");
1155    let delayed_key = format!("{queue_key}:delayed");
1156
1157    // Get queue type
1158    let queue_type: String = redis::cmd("TYPE")
1159        .arg(&queue_key)
1160        .query_async(&mut conn)
1161        .await?;
1162
1163    // Get queue sizes
1164    let queue_size: usize = if queue_type == "list" {
1165        redis::cmd("LLEN")
1166            .arg(&queue_key)
1167            .query_async(&mut conn)
1168            .await?
1169    } else if queue_type == "zset" {
1170        redis::cmd("ZCARD")
1171            .arg(&queue_key)
1172            .query_async(&mut conn)
1173            .await?
1174    } else {
1175        0
1176    };
1177
1178    let processing_size: usize = redis::cmd("LLEN")
1179        .arg(&processing_key)
1180        .query_async(&mut conn)
1181        .await
1182        .unwrap_or(0);
1183
1184    let dlq_size: usize = redis::cmd("LLEN")
1185        .arg(&dlq_key)
1186        .query_async(&mut conn)
1187        .await
1188        .unwrap_or(0);
1189
1190    let delayed_size: usize = redis::cmd("ZCARD")
1191        .arg(&delayed_key)
1192        .query_async(&mut conn)
1193        .await
1194        .unwrap_or(0);
1195
1196    // Sample tasks to get task type distribution
1197    let mut task_names = std::collections::HashMap::new();
1198
1199    if queue_size > 0 {
1200        let sample_size = std::cmp::min(queue_size, 100);
1201        let tasks: Vec<String> = if queue_type == "list" {
1202            redis::cmd("LRANGE")
1203                .arg(&queue_key)
1204                .arg(0)
1205                .arg(sample_size as isize - 1)
1206                .query_async(&mut conn)
1207                .await?
1208        } else if queue_type == "zset" {
1209            redis::cmd("ZRANGE")
1210                .arg(&queue_key)
1211                .arg(0)
1212                .arg(sample_size as isize - 1)
1213                .query_async(&mut conn)
1214                .await?
1215        } else {
1216            vec![]
1217        };
1218
1219        for task_str in tasks {
1220            if let Ok(task) = serde_json::from_str::<celers_core::SerializedTask>(&task_str) {
1221                *task_names.entry(task.metadata.name.clone()).or_insert(0) += 1;
1222            }
1223        }
1224    }
1225
1226    // Display statistics
1227    println!("{}", format!("Queue Statistics: {queue}").cyan().bold());
1228    println!();
1229
1230    #[derive(Tabled)]
1231    struct StatRow {
1232        #[tabled(rename = "Metric")]
1233        metric: String,
1234        #[tabled(rename = "Value")]
1235        value: String,
1236    }
1237
1238    let stats = vec![
1239        StatRow {
1240            metric: "Queue Type".to_string(),
1241            value: if queue_type == "list" {
1242                "FIFO (List)".to_string()
1243            } else if queue_type == "zset" {
1244                "Priority (Sorted Set)".to_string()
1245            } else {
1246                format!("Unknown ({queue_type})")
1247            },
1248        },
1249        StatRow {
1250            metric: "Pending Tasks".to_string(),
1251            value: queue_size.to_string(),
1252        },
1253        StatRow {
1254            metric: "Processing Tasks".to_string(),
1255            value: processing_size.to_string(),
1256        },
1257        StatRow {
1258            metric: "Dead Letter Queue".to_string(),
1259            value: dlq_size.to_string(),
1260        },
1261        StatRow {
1262            metric: "Delayed Tasks".to_string(),
1263            value: delayed_size.to_string(),
1264        },
1265        StatRow {
1266            metric: "Total Tasks".to_string(),
1267            value: (queue_size + processing_size + dlq_size + delayed_size).to_string(),
1268        },
1269    ];
1270
1271    let table = Table::new(stats).with(Style::rounded()).to_string();
1272    println!("{table}");
1273
1274    // Show task type distribution if we have data
1275    if !task_names.is_empty() {
1276        println!();
1277        println!("{}", "Task Type Distribution (sample):".cyan().bold());
1278        println!();
1279
1280        #[derive(Tabled)]
1281        struct TaskTypeRow {
1282            #[tabled(rename = "Task Name")]
1283            task_name: String,
1284            #[tabled(rename = "Count")]
1285            count: usize,
1286        }
1287
1288        let mut task_types: Vec<TaskTypeRow> = task_names
1289            .into_iter()
1290            .map(|(name, count)| TaskTypeRow {
1291                task_name: name,
1292                count,
1293            })
1294            .collect();
1295
1296        task_types.sort_by(|a, b| b.count.cmp(&a.count));
1297
1298        let table = Table::new(task_types.into_iter().take(10))
1299            .with(Style::rounded())
1300            .to_string();
1301        println!("{table}");
1302    }
1303
1304    // Health indicators
1305    println!();
1306    if dlq_size > 0 {
1307        println!("{}", format!("⚠ Warning: {dlq_size} tasks in DLQ").yellow());
1308    }
1309    if processing_size > queue_size * 2 {
1310        println!(
1311            "{}",
1312            "⚠ Warning: High number of processing tasks (possible stuck workers)".yellow()
1313        );
1314    }
1315    if queue_size == 0 && processing_size == 0 && dlq_size == 0 {
1316        println!("{}", "✓ Queue is empty and healthy".green());
1317    }
1318
1319    Ok(())
1320}
1321
1322/// Move all tasks from one queue to another
1323pub async fn move_queue(
1324    broker_url: &str,
1325    from_queue: &str,
1326    to_queue: &str,
1327    confirm: bool,
1328) -> anyhow::Result<()> {
1329    let client = redis::Client::open(broker_url)?;
1330    let mut conn = client.get_multiplexed_async_connection().await?;
1331
1332    // Construct queue keys
1333    let from_key = format!("celers:{from_queue}");
1334    let to_key = format!("celers:{to_queue}");
1335
1336    // Determine the source queue type
1337    let from_type: String = redis::cmd("TYPE")
1338        .arg(&from_key)
1339        .query_async(&mut conn)
1340        .await?;
1341
1342    if from_type == "none" {
1343        println!(
1344            "{}",
1345            format!("✗ Source queue '{from_queue}' does not exist").red()
1346        );
1347        return Ok(());
1348    }
1349
1350    // Get source queue size
1351    let queue_size: usize = if from_type == "list" {
1352        redis::cmd("LLEN")
1353            .arg(&from_key)
1354            .query_async(&mut conn)
1355            .await?
1356    } else if from_type == "zset" {
1357        redis::cmd("ZCARD")
1358            .arg(&from_key)
1359            .query_async(&mut conn)
1360            .await?
1361    } else {
1362        println!("{}", format!("✗ Unknown queue type: {from_type}").red());
1363        return Ok(());
1364    };
1365
1366    if queue_size == 0 {
1367        println!(
1368            "{}",
1369            format!("✗ Source queue '{from_queue}' is empty").yellow()
1370        );
1371        return Ok(());
1372    }
1373
1374    // Confirm operation
1375    if !confirm {
1376        println!(
1377            "{}",
1378            format!(
1379                "⚠ Warning: This will move {queue_size} tasks from '{from_queue}' to '{to_queue}'"
1380            )
1381            .yellow()
1382        );
1383        println!("{}", "Use --confirm to proceed".yellow());
1384        return Ok(());
1385    }
1386
1387    println!(
1388        "{}",
1389        format!("Moving {queue_size} tasks from '{from_queue}' to '{to_queue}'...").cyan()
1390    );
1391
1392    // Determine destination queue type (or create as list if doesn't exist)
1393    let to_type: String = redis::cmd("TYPE")
1394        .arg(&to_key)
1395        .query_async(&mut conn)
1396        .await?;
1397
1398    let mut moved_count = 0;
1399
1400    // Move tasks
1401    if from_type == "list" {
1402        // Source is FIFO queue
1403        loop {
1404            let task: Option<String> = redis::cmd("RPOP")
1405                .arg(&from_key)
1406                .query_async(&mut conn)
1407                .await?;
1408
1409            match task {
1410                Some(task_str) => {
1411                    if to_type == "list" || to_type == "none" {
1412                        // Destination is FIFO queue (or create new)
1413                        let _: usize = redis::cmd("LPUSH")
1414                            .arg(&to_key)
1415                            .arg(&task_str)
1416                            .query_async(&mut conn)
1417                            .await?;
1418                    } else if to_type == "zset" {
1419                        // Destination is priority queue
1420                        if let Ok(task) =
1421                            serde_json::from_str::<celers_core::SerializedTask>(&task_str)
1422                        {
1423                            let priority = f64::from(task.metadata.priority);
1424                            let _: usize = redis::cmd("ZADD")
1425                                .arg(&to_key)
1426                                .arg(priority)
1427                                .arg(&task_str)
1428                                .query_async(&mut conn)
1429                                .await?;
1430                        }
1431                    }
1432                    moved_count += 1;
1433
1434                    if moved_count % 100 == 0 {
1435                        print!(
1436                            "\r{}",
1437                            format!("Moved {moved_count} / {queue_size} tasks...").cyan()
1438                        );
1439                        use std::io::Write;
1440                        std::io::stdout().flush()?;
1441                    }
1442                }
1443                None => break,
1444            }
1445        }
1446    } else if from_type == "zset" {
1447        // Source is priority queue
1448        loop {
1449            let result: Vec<(String, f64)> = redis::cmd("ZPOPMIN")
1450                .arg(&from_key)
1451                .arg(1)
1452                .query_async(&mut conn)
1453                .await?;
1454
1455            if result.is_empty() {
1456                break;
1457            }
1458
1459            let (task_str, _score) = &result[0];
1460
1461            if to_type == "list" || to_type == "none" {
1462                // Destination is FIFO queue
1463                let _: usize = redis::cmd("LPUSH")
1464                    .arg(&to_key)
1465                    .arg(task_str)
1466                    .query_async(&mut conn)
1467                    .await?;
1468            } else if to_type == "zset" {
1469                // Destination is priority queue
1470                if let Ok(task) = serde_json::from_str::<celers_core::SerializedTask>(task_str) {
1471                    let priority = f64::from(task.metadata.priority);
1472                    let _: usize = redis::cmd("ZADD")
1473                        .arg(&to_key)
1474                        .arg(priority)
1475                        .arg(task_str)
1476                        .query_async(&mut conn)
1477                        .await?;
1478                }
1479            }
1480            moved_count += 1;
1481
1482            if moved_count % 100 == 0 {
1483                print!(
1484                    "\r{}",
1485                    format!("Moved {moved_count} / {queue_size} tasks...").cyan()
1486                );
1487                use std::io::Write;
1488                std::io::stdout().flush()?;
1489            }
1490        }
1491    }
1492
1493    println!();
1494    println!(
1495        "{}",
1496        format!("✓ Successfully moved {moved_count} tasks from '{from_queue}' to '{to_queue}'")
1497            .green()
1498            .bold()
1499    );
1500
1501    // Show queue type info
1502    let dest_queue_type = if to_type == "list" || to_type == "none" {
1503        "FIFO"
1504    } else if to_type == "zset" {
1505        "Priority"
1506    } else {
1507        "Unknown"
1508    };
1509
1510    println!(
1511        "  {} {} → {}",
1512        "Queue Type:".cyan(),
1513        from_type,
1514        dest_queue_type
1515    );
1516
1517    Ok(())
1518}
1519
1520/// Export queue tasks to a JSON file
1521pub async fn export_queue(broker_url: &str, queue: &str, output_file: &str) -> anyhow::Result<()> {
1522    let client = redis::Client::open(broker_url)?;
1523    let mut conn = client.get_multiplexed_async_connection().await?;
1524
1525    let queue_key = format!("celers:{queue}");
1526
1527    // Get queue type
1528    let queue_type: String = redis::cmd("TYPE")
1529        .arg(&queue_key)
1530        .query_async(&mut conn)
1531        .await?;
1532
1533    if queue_type == "none" {
1534        println!("{}", format!("✗ Queue '{queue}' does not exist").red());
1535        return Ok(());
1536    }
1537
1538    println!("{}", format!("Exporting queue '{queue}'...").cyan());
1539
1540    // Fetch all tasks
1541    let tasks: Vec<String> = if queue_type == "list" {
1542        redis::cmd("LRANGE")
1543            .arg(&queue_key)
1544            .arg(0)
1545            .arg(-1)
1546            .query_async(&mut conn)
1547            .await?
1548    } else if queue_type == "zset" {
1549        redis::cmd("ZRANGE")
1550            .arg(&queue_key)
1551            .arg(0)
1552            .arg(-1)
1553            .query_async(&mut conn)
1554            .await?
1555    } else {
1556        println!("{}", format!("✗ Unknown queue type: {queue_type}").red());
1557        return Ok(());
1558    };
1559
1560    // Parse tasks and create export data
1561    let mut export_tasks = Vec::new();
1562    for task_str in tasks {
1563        if let Ok(task) = serde_json::from_str::<celers_core::SerializedTask>(&task_str) {
1564            export_tasks.push(task);
1565        }
1566    }
1567
1568    #[derive(serde::Serialize)]
1569    struct QueueExport {
1570        queue_name: String,
1571        queue_type: String,
1572        exported_at: String,
1573        task_count: usize,
1574        tasks: Vec<celers_core::SerializedTask>,
1575    }
1576
1577    let export_data = QueueExport {
1578        queue_name: queue.to_string(),
1579        queue_type: queue_type.clone(),
1580        exported_at: chrono::Utc::now().to_rfc3339(),
1581        task_count: export_tasks.len(),
1582        tasks: export_tasks,
1583    };
1584
1585    // Write to file
1586    let json = serde_json::to_string_pretty(&export_data)?;
1587    std::fs::write(output_file, json)?;
1588
1589    println!(
1590        "{}",
1591        format!(
1592            "✓ Exported {} tasks from queue '{}' to '{}'",
1593            export_data.task_count, queue, output_file
1594        )
1595        .green()
1596        .bold()
1597    );
1598    println!("  {} {}", "Queue Type:".cyan(), queue_type);
1599    let file_size = std::fs::metadata(output_file)?.len();
1600    println!("  {} {} bytes", "File Size:".cyan(), file_size);
1601
1602    Ok(())
1603}
1604
1605/// Import queue tasks from a JSON file
1606pub async fn import_queue(
1607    broker_url: &str,
1608    queue: &str,
1609    input_file: &str,
1610    confirm: bool,
1611) -> anyhow::Result<()> {
1612    // Read and parse file
1613    let json = std::fs::read_to_string(input_file)?;
1614
1615    #[derive(serde::Deserialize)]
1616    struct QueueExport {
1617        queue_name: String,
1618        queue_type: String,
1619        exported_at: String,
1620        task_count: usize,
1621        tasks: Vec<celers_core::SerializedTask>,
1622    }
1623
1624    let export_data: QueueExport = serde_json::from_str(&json)?;
1625
1626    // Show import info
1627    println!("{}", "Import Information:".cyan().bold());
1628    println!("  {} {}", "Source Queue:".cyan(), export_data.queue_name);
1629    println!("  {} {}", "Source Type:".cyan(), export_data.queue_type);
1630    println!("  {} {}", "Exported At:".cyan(), export_data.exported_at);
1631    println!("  {} {}", "Task Count:".cyan(), export_data.task_count);
1632    println!("  {} {}", "Destination Queue:".cyan(), queue);
1633    println!();
1634
1635    if !confirm {
1636        println!(
1637            "{}",
1638            format!(
1639                "⚠ Warning: This will import {} tasks into queue '{}'",
1640                export_data.task_count, queue
1641            )
1642            .yellow()
1643        );
1644        println!("{}", "Use --confirm to proceed".yellow());
1645        return Ok(());
1646    }
1647
1648    let client = redis::Client::open(broker_url)?;
1649    let mut conn = client.get_multiplexed_async_connection().await?;
1650
1651    let queue_key = format!("celers:{queue}");
1652
1653    // Determine destination queue type
1654    let to_type: String = redis::cmd("TYPE")
1655        .arg(&queue_key)
1656        .query_async(&mut conn)
1657        .await?;
1658
1659    println!(
1660        "{}",
1661        format!("Importing {} tasks...", export_data.task_count).cyan()
1662    );
1663
1664    let mut imported = 0;
1665    for task in export_data.tasks {
1666        let task_json = serde_json::to_string(&task)?;
1667
1668        if to_type == "list" || to_type == "none" {
1669            // Destination is FIFO queue
1670            let _: usize = redis::cmd("LPUSH")
1671                .arg(&queue_key)
1672                .arg(&task_json)
1673                .query_async(&mut conn)
1674                .await?;
1675        } else if to_type == "zset" {
1676            // Destination is priority queue
1677            let priority = f64::from(task.metadata.priority);
1678            let _: usize = redis::cmd("ZADD")
1679                .arg(&queue_key)
1680                .arg(priority)
1681                .arg(&task_json)
1682                .query_async(&mut conn)
1683                .await?;
1684        }
1685
1686        imported += 1;
1687        if imported % 100 == 0 {
1688            print!(
1689                "\r{}",
1690                format!(
1691                    "Imported {} / {} tasks...",
1692                    imported, export_data.task_count
1693                )
1694                .cyan()
1695            );
1696            use std::io::Write;
1697            std::io::stdout().flush()?;
1698        }
1699    }
1700
1701    println!();
1702    println!(
1703        "{}",
1704        format!("✓ Successfully imported {imported} tasks into queue '{queue}'")
1705            .green()
1706            .bold()
1707    );
1708
1709    Ok(())
1710}
1711
1712/// Display Prometheus metrics
1713pub async fn show_metrics(
1714    format: &str,
1715    output_file: Option<&str>,
1716    pattern: Option<&str>,
1717    watch_interval: Option<u64>,
1718) -> anyhow::Result<()> {
1719    // If watch mode is enabled and output_file is set, it doesn't make sense
1720    if watch_interval.is_some() && output_file.is_some() {
1721        println!(
1722            "{}",
1723            "⚠ Watch mode cannot be used with file output".yellow()
1724        );
1725        return Ok(());
1726    }
1727
1728    if let Some(interval) = watch_interval {
1729        // Watch mode - refresh metrics periodically
1730        println!("{}", "=== Metrics Watch Mode ===".bold().green());
1731        println!(
1732            "{}",
1733            format!("Refreshing every {interval} seconds (Ctrl+C to stop)").dimmed()
1734        );
1735        println!();
1736
1737        loop {
1738            // Clear screen for better readability
1739            print!("\x1B[2J\x1B[1;1H"); // ANSI escape codes to clear screen
1740
1741            // Display current time
1742            println!(
1743                "{}",
1744                format!("Last updated: {}", Utc::now().format("%Y-%m-%d %H:%M:%S")).dimmed()
1745            );
1746            println!();
1747
1748            // Gather and format metrics
1749            format_and_display_metrics(format, pattern)?;
1750
1751            // Sleep for the specified interval
1752            tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await;
1753        }
1754    } else {
1755        // One-time display
1756        format_and_output_metrics(format, output_file, pattern)?;
1757        Ok(())
1758    }
1759}
1760
1761/// Format and output metrics (for one-time display)
1762fn format_and_output_metrics(
1763    format: &str,
1764    output_file: Option<&str>,
1765    pattern: Option<&str>,
1766) -> anyhow::Result<()> {
1767    // Gather metrics from Prometheus registry
1768    let metrics_text = celers_metrics::gather_metrics();
1769
1770    // Filter metrics if pattern is provided
1771    let filtered_metrics = if let Some(pat) = pattern {
1772        metrics_text
1773            .lines()
1774            .filter(|line| {
1775                // Keep HELP and TYPE lines for matching metrics
1776                if line.starts_with("# HELP") || line.starts_with("# TYPE") {
1777                    line.contains(pat)
1778                } else if line.starts_with('#') {
1779                    // Skip other comment lines
1780                    false
1781                } else {
1782                    // Keep metric data lines if they match the pattern
1783                    line.contains(pat)
1784                }
1785            })
1786            .collect::<Vec<_>>()
1787            .join("\n")
1788    } else {
1789        metrics_text.clone()
1790    };
1791
1792    // Format metrics based on format parameter
1793    let output = match format.to_lowercase().as_str() {
1794        "json" => {
1795            // Parse Prometheus text format and convert to JSON
1796            let mut metrics_map = serde_json::Map::new();
1797
1798            for line in filtered_metrics.lines() {
1799                if line.starts_with('#') || line.trim().is_empty() {
1800                    continue;
1801                }
1802
1803                // Parse metric line: metric_name{labels} value
1804                if let Some(space_idx) = line.rfind(' ') {
1805                    let (metric_part, value_str) = line.split_at(space_idx);
1806                    let value_str = value_str.trim();
1807
1808                    if let Ok(value) = value_str.parse::<f64>() {
1809                        let metric_name = if let Some(brace_idx) = metric_part.find('{') {
1810                            &metric_part[..brace_idx]
1811                        } else {
1812                            metric_part
1813                        };
1814
1815                        metrics_map.insert(metric_name.to_string(), serde_json::json!(value));
1816                    }
1817                }
1818            }
1819
1820            serde_json::to_string_pretty(&metrics_map)?
1821        }
1822
1823        "prometheus" | "prom" => {
1824            // Raw Prometheus format
1825            filtered_metrics
1826        }
1827
1828        _ => {
1829            // Human-readable text format (default)
1830            let mut output = String::new();
1831            output.push_str(&format!("{}\n\n", "=== CeleRS Metrics ===".bold().green()));
1832
1833            let mut current_metric = String::new();
1834            let mut help_text = String::new();
1835
1836            for line in filtered_metrics.lines() {
1837                if line.starts_with("# HELP") {
1838                    // Extract help text
1839                    if let Some(help) = line.strip_prefix("# HELP ") {
1840                        let parts: Vec<&str> = help.splitn(2, ' ').collect();
1841                        if parts.len() == 2 {
1842                            current_metric = parts[0].to_string();
1843                            help_text = parts[1].to_string();
1844                        }
1845                    }
1846                } else if line.starts_with("# TYPE") {
1847                    // Skip TYPE lines
1848                } else if line.starts_with('#') || line.trim().is_empty() {
1849                    // Skip comments and empty lines
1850                } else {
1851                    // Parse metric value
1852                    if let Some(space_idx) = line.rfind(' ') {
1853                        let (metric_part, value_str) = line.split_at(space_idx);
1854                        let value_str = value_str.trim();
1855
1856                        let metric_name = if let Some(brace_idx) = metric_part.find('{') {
1857                            &metric_part[..brace_idx]
1858                        } else {
1859                            metric_part
1860                        };
1861
1862                        // Only show if this is a new metric
1863                        if metric_name == current_metric && !help_text.is_empty() {
1864                            output.push_str(&format!("{}\n", metric_name.cyan().bold()));
1865                            output.push_str(&format!("  {}\n", help_text.dimmed()));
1866                            output.push_str(&format!(
1867                                "  {} {}\n\n",
1868                                "Value:".yellow(),
1869                                value_str.green()
1870                            ));
1871                            help_text.clear();
1872                        }
1873                    }
1874                }
1875            }
1876
1877            if output.trim().is_empty() {
1878                output = format!("{}\n", "No metrics found".yellow());
1879                if pattern.is_some() {
1880                    output.push_str(&format!(
1881                        "{}\n",
1882                        "Try adjusting your filter pattern".dimmed()
1883                    ));
1884                }
1885            }
1886
1887            output
1888        }
1889    };
1890
1891    // Write to file or stdout
1892    if let Some(file_path) = output_file {
1893        std::fs::write(file_path, &output)?;
1894        println!(
1895            "{}",
1896            format!("✓ Metrics exported to '{file_path}'")
1897                .green()
1898                .bold()
1899        );
1900        println!("  {} {}", "Format:".cyan(), format);
1901        if let Some(pat) = pattern {
1902            println!("  {} {}", "Filter:".cyan(), pat);
1903        }
1904    } else {
1905        println!("{output}");
1906    }
1907
1908    Ok(())
1909}
1910
1911/// Format and display metrics (for watch mode)
1912fn format_and_display_metrics(format: &str, pattern: Option<&str>) -> anyhow::Result<()> {
1913    // Gather metrics from Prometheus registry
1914    let metrics_text = celers_metrics::gather_metrics();
1915
1916    // Filter metrics if pattern is provided
1917    let filtered_metrics = if let Some(pat) = pattern {
1918        metrics_text
1919            .lines()
1920            .filter(|line| {
1921                // Keep HELP and TYPE lines for matching metrics
1922                if line.starts_with("# HELP") || line.starts_with("# TYPE") {
1923                    line.contains(pat)
1924                } else if line.starts_with('#') {
1925                    false
1926                } else {
1927                    line.contains(pat)
1928                }
1929            })
1930            .collect::<Vec<_>>()
1931            .join("\n")
1932    } else {
1933        metrics_text.clone()
1934    };
1935
1936    // Format metrics based on format parameter
1937    let output = match format.to_lowercase().as_str() {
1938        "json" => {
1939            // Parse Prometheus text format and convert to JSON
1940            let mut metrics_map = serde_json::Map::new();
1941
1942            for line in filtered_metrics.lines() {
1943                if line.starts_with('#') || line.trim().is_empty() {
1944                    continue;
1945                }
1946
1947                if let Some(space_idx) = line.rfind(' ') {
1948                    let (metric_part, value_str) = line.split_at(space_idx);
1949                    let value_str = value_str.trim();
1950
1951                    if let Ok(value) = value_str.parse::<f64>() {
1952                        let metric_name = if let Some(brace_idx) = metric_part.find('{') {
1953                            &metric_part[..brace_idx]
1954                        } else {
1955                            metric_part
1956                        };
1957
1958                        metrics_map.insert(metric_name.to_string(), serde_json::json!(value));
1959                    }
1960                }
1961            }
1962
1963            serde_json::to_string_pretty(&metrics_map)?
1964        }
1965
1966        "prometheus" | "prom" => filtered_metrics,
1967
1968        _ => {
1969            // Human-readable text format (default)
1970            let mut output = String::new();
1971            output.push_str(&format!("{}\n\n", "=== CeleRS Metrics ===".bold().green()));
1972
1973            let mut current_metric = String::new();
1974            let mut help_text = String::new();
1975
1976            for line in filtered_metrics.lines() {
1977                if line.starts_with("# HELP") {
1978                    if let Some(help) = line.strip_prefix("# HELP ") {
1979                        let parts: Vec<&str> = help.splitn(2, ' ').collect();
1980                        if parts.len() == 2 {
1981                            current_metric = parts[0].to_string();
1982                            help_text = parts[1].to_string();
1983                        }
1984                    }
1985                } else if line.starts_with('#') || line.trim().is_empty() {
1986                    // Skip comments and empty lines
1987                } else if let Some(space_idx) = line.rfind(' ') {
1988                    let (metric_part, value_str) = line.split_at(space_idx);
1989                    let value_str = value_str.trim();
1990
1991                    let metric_name = if let Some(brace_idx) = metric_part.find('{') {
1992                        &metric_part[..brace_idx]
1993                    } else {
1994                        metric_part
1995                    };
1996
1997                    if metric_name == current_metric && !help_text.is_empty() {
1998                        output.push_str(&format!("{}\n", metric_name.cyan().bold()));
1999                        output.push_str(&format!("  {}\n", help_text.dimmed()));
2000                        output.push_str(&format!(
2001                            "  {} {}\n\n",
2002                            "Value:".yellow(),
2003                            value_str.green()
2004                        ));
2005                        help_text.clear();
2006                    }
2007                }
2008            }
2009
2010            if output.trim().is_empty() {
2011                output = format!("{}\n", "No metrics found".yellow());
2012                if pattern.is_some() {
2013                    output.push_str(&format!(
2014                        "{}\n",
2015                        "Try adjusting your filter pattern".dimmed()
2016                    ));
2017                }
2018            }
2019
2020            output
2021        }
2022    };
2023
2024    println!("{output}");
2025    Ok(())
2026}
2027
2028/// Validate configuration file
2029pub async fn validate_config(config_path: &str, test_connection: bool) -> anyhow::Result<()> {
2030    use std::path::Path;
2031
2032    println!("{}", "=== Configuration Validation ===".bold().green());
2033    println!();
2034
2035    // Check if file exists
2036    if !Path::new(config_path).exists() {
2037        println!(
2038            "{}",
2039            format!("✗ Configuration file not found: {config_path}")
2040                .red()
2041                .bold()
2042        );
2043        return Ok(());
2044    }
2045
2046    println!(
2047        "{}",
2048        format!("📄 Loading configuration from '{config_path}'...").cyan()
2049    );
2050    println!();
2051
2052    // Try to load and parse the configuration
2053    let config = match crate::config::Config::from_file(config_path) {
2054        Ok(cfg) => {
2055            println!("{}", "✓ Configuration file is valid TOML".green());
2056            cfg
2057        }
2058        Err(e) => {
2059            println!("{}", "✗ Failed to parse configuration file:".red().bold());
2060            println!("  {}", format!("{e}").red());
2061            return Ok(());
2062        }
2063    };
2064
2065    println!();
2066    println!("{}", "Configuration Details:".bold());
2067    println!();
2068
2069    // Validate broker configuration
2070    println!("{}", "Broker Configuration:".cyan().bold());
2071    println!("  {} {}", "Type:".yellow(), config.broker.broker_type);
2072    println!("  {} {}", "URL:".yellow(), config.broker.url);
2073    println!("  {} {}", "Queue:".yellow(), config.broker.queue);
2074    println!("  {} {}", "Mode:".yellow(), config.broker.mode);
2075
2076    println!();
2077
2078    // Validate worker configuration
2079    println!("{}", "Worker Configuration:".cyan().bold());
2080    println!(
2081        "  {} {}",
2082        "Concurrency:".yellow(),
2083        config.worker.concurrency
2084    );
2085    println!(
2086        "  {} {} ms",
2087        "Poll Interval:".yellow(),
2088        config.worker.poll_interval_ms
2089    );
2090    println!(
2091        "  {} {}",
2092        "Max Retries:".yellow(),
2093        config.worker.max_retries
2094    );
2095    println!(
2096        "  {} {} seconds",
2097        "Default Timeout:".yellow(),
2098        config.worker.default_timeout_secs
2099    );
2100
2101    println!();
2102
2103    // Run validation and show warnings
2104    let warnings = config.validate()?;
2105    if warnings.is_empty() {
2106        println!(
2107            "{}",
2108            "✓ Configuration is valid with no warnings".green().bold()
2109        );
2110    } else {
2111        println!("{}", "Configuration Warnings:".yellow().bold());
2112        for warning in &warnings {
2113            println!("  {} {}", "⚠".yellow(), warning.yellow());
2114        }
2115    }
2116
2117    println!();
2118
2119    // Show configured queues
2120    if !config.queues.is_empty() {
2121        println!("{}", "Configured Queues:".cyan().bold());
2122        for queue in &config.queues {
2123            println!("  • {}", queue.dimmed());
2124        }
2125        println!();
2126    }
2127
2128    // Test broker connection if requested
2129    if test_connection {
2130        println!("{}", "Testing broker connection...".cyan().bold());
2131        println!();
2132
2133        match config.broker.broker_type.to_lowercase().as_str() {
2134            "redis" => {
2135                match redis::Client::open(config.broker.url.as_str()) {
2136                    Ok(client) => {
2137                        match client.get_multiplexed_async_connection().await {
2138                            Ok(mut conn) => {
2139                                // Test a simple PING command
2140                                match redis::cmd("PING").query_async::<String>(&mut conn).await {
2141                                    Ok(_) => {
2142                                        println!(
2143                                            "{}",
2144                                            "✓ Successfully connected to Redis broker"
2145                                                .green()
2146                                                .bold()
2147                                        );
2148                                    }
2149                                    Err(e) => {
2150                                        println!(
2151                                            "{}",
2152                                            "✗ Failed to PING Redis broker:".red().bold()
2153                                        );
2154                                        println!("  {}", format!("{e}").red());
2155                                    }
2156                                }
2157                            }
2158                            Err(e) => {
2159                                println!("{}", "✗ Failed to connect to Redis broker:".red().bold());
2160                                println!("  {}", format!("{e}").red());
2161                            }
2162                        }
2163                    }
2164                    Err(e) => {
2165                        println!("{}", "✗ Invalid Redis URL:".red().bold());
2166                        println!("  {}", format!("{e}").red());
2167                    }
2168                }
2169            }
2170            "postgres" | "postgresql" => {
2171                // Test PostgreSQL connection
2172                match sqlx::postgres::PgPool::connect(&config.broker.url).await {
2173                    Ok(pool) => {
2174                        // Test with a simple query
2175                        match sqlx::query("SELECT 1").fetch_one(&pool).await {
2176                            Ok(_) => {
2177                                println!(
2178                                    "{}",
2179                                    "✓ Successfully connected to PostgreSQL broker"
2180                                        .green()
2181                                        .bold()
2182                                );
2183                            }
2184                            Err(e) => {
2185                                println!("{}", "✗ Failed to query PostgreSQL broker:".red().bold());
2186                                println!("  {}", format!("{e}").red());
2187                            }
2188                        }
2189                        pool.close().await;
2190                    }
2191                    Err(e) => {
2192                        println!(
2193                            "{}",
2194                            "✗ Failed to connect to PostgreSQL broker:".red().bold()
2195                        );
2196                        println!("  {}", format!("{e}").red());
2197                    }
2198                }
2199            }
2200            "mysql" => {
2201                println!(
2202                    "{}",
2203                    "ℹ MySQL connection testing via CLI not yet available".cyan()
2204                );
2205                println!("  {}", "To test MySQL connection:".dimmed());
2206                println!(
2207                    "  {}",
2208                    "  1. Use the 'db test-connection' command with your MySQL URL".dimmed()
2209                );
2210                println!(
2211                    "  {}",
2212                    "  2. Or use mysql client: mysql -h <host> -u <user> -p".dimmed()
2213                );
2214            }
2215            "amqp" | "rabbitmq" => {
2216                println!(
2217                    "{}",
2218                    "ℹ AMQP connection testing via CLI not yet available".cyan()
2219                );
2220                println!("  {}", "To test RabbitMQ connection:".dimmed());
2221                println!(
2222                    "  {}",
2223                    "  1. Check RabbitMQ Management UI at http://<host>:15672".dimmed()
2224                );
2225                println!("  {}", "  2. Or use rabbitmq-diagnostics ping".dimmed());
2226                println!(
2227                    "  {}",
2228                    "  3. Verify credentials and virtual host configuration".dimmed()
2229                );
2230            }
2231            "sqs" => {
2232                println!(
2233                    "{}",
2234                    "ℹ SQS connection testing via CLI not yet available".cyan()
2235                );
2236                println!("  {}", "To test AWS SQS connection:".dimmed());
2237                println!(
2238                    "  {}",
2239                    "  1. Ensure AWS credentials are configured (aws configure)".dimmed()
2240                );
2241                println!("  {}", "  2. Test with: aws sqs list-queues".dimmed());
2242                println!(
2243                    "  {}",
2244                    "  3. Verify IAM permissions for SQS operations".dimmed()
2245                );
2246            }
2247            _ => {
2248                println!(
2249                    "{}",
2250                    format!(
2251                        "⚠ Cannot test connection for broker type '{}'",
2252                        config.broker.broker_type
2253                    )
2254                    .yellow()
2255                );
2256            }
2257        }
2258
2259        println!();
2260    }
2261
2262    println!("{}", "✓ Configuration validation complete".green().bold());
2263
2264    Ok(())
2265}
2266
2267/// Pause queue processing
2268pub async fn pause_queue(broker_url: &str, queue: &str) -> anyhow::Result<()> {
2269    let client = redis::Client::open(broker_url)?;
2270    let mut conn = client.get_multiplexed_async_connection().await?;
2271
2272    let pause_key = format!("celers:{queue}:paused");
2273
2274    // Set pause flag with a timestamp
2275    let timestamp = chrono::Utc::now().to_rfc3339();
2276    let _: () = redis::cmd("SET")
2277        .arg(&pause_key)
2278        .arg(&timestamp)
2279        .query_async(&mut conn)
2280        .await?;
2281
2282    println!(
2283        "{}",
2284        format!("✓ Queue '{queue}' has been paused").green().bold()
2285    );
2286    println!();
2287    println!("{}", "Note:".yellow().bold());
2288    println!("  • Workers will stop processing tasks from this queue");
2289    println!("  • Existing tasks will remain in the queue");
2290    println!("  • Use 'celers queue resume' to resume processing");
2291    println!();
2292    println!("  Paused at: {}", timestamp.cyan());
2293
2294    Ok(())
2295}
2296
2297/// Resume queue processing
2298pub async fn resume_queue(broker_url: &str, queue: &str) -> anyhow::Result<()> {
2299    let client = redis::Client::open(broker_url)?;
2300    let mut conn = client.get_multiplexed_async_connection().await?;
2301
2302    let pause_key = format!("celers:{queue}:paused");
2303
2304    // Check if queue is paused
2305    let paused: Option<String> = redis::cmd("GET")
2306        .arg(&pause_key)
2307        .query_async(&mut conn)
2308        .await?;
2309
2310    if paused.is_none() {
2311        println!("{}", format!("✓ Queue '{queue}' is not paused").yellow());
2312        return Ok(());
2313    }
2314
2315    // Remove pause flag
2316    let _: () = redis::cmd("DEL")
2317        .arg(&pause_key)
2318        .query_async(&mut conn)
2319        .await?;
2320
2321    println!(
2322        "{}",
2323        format!("✓ Queue '{queue}' has been resumed").green().bold()
2324    );
2325    println!();
2326    println!("{}", "Note:".yellow().bold());
2327    println!("  • Workers will now process tasks from this queue");
2328    if let Some(paused_at) = paused {
2329        println!("  • Was paused at: {}", paused_at.dimmed());
2330    }
2331
2332    Ok(())
2333}
2334
2335/// Run system health diagnostics
2336pub async fn health_check(broker_url: &str, queue: &str) -> anyhow::Result<()> {
2337    println!("{}", "=== System Health Check ===".bold().cyan());
2338    println!();
2339
2340    let mut health_issues = Vec::new();
2341    let mut health_warnings = Vec::new();
2342
2343    // Test 1: Broker Connection
2344    println!("{}", "1. Broker Connection".bold());
2345    let client = match redis::Client::open(broker_url) {
2346        Ok(c) => {
2347            println!("  {} Redis client created", "✓".green());
2348            c
2349        }
2350        Err(e) => {
2351            println!("  {} Failed to create Redis client: {}", "✗".red(), e);
2352            health_issues.push("Cannot create Redis client".to_string());
2353            println!();
2354            println!("{}", "Health Check Failed".red().bold());
2355            return Ok(());
2356        }
2357    };
2358
2359    let mut conn = match client.get_multiplexed_async_connection().await {
2360        Ok(c) => {
2361            println!("  {} Successfully connected to broker", "✓".green());
2362            c
2363        }
2364        Err(e) => {
2365            println!("  {} Failed to connect: {}", "✗".red(), e);
2366            health_issues.push("Cannot connect to broker".to_string());
2367            println!();
2368            println!("{}", "Health Check Failed".red().bold());
2369            return Ok(());
2370        }
2371    };
2372
2373    // Test PING
2374    match redis::cmd("PING").query_async::<String>(&mut conn).await {
2375        Ok(_) => {
2376            println!("  {} PING successful", "✓".green());
2377        }
2378        Err(e) => {
2379            println!("  {} PING failed: {}", "⚠".yellow(), e);
2380            health_warnings.push("PING to broker failed".to_string());
2381        }
2382    }
2383
2384    println!();
2385
2386    // Test 2: Queue Status
2387    println!("{}", "2. Queue Status".bold());
2388    let broker = RedisBroker::new(broker_url, queue)?;
2389
2390    let _queue_size = match broker.queue_size().await {
2391        Ok(size) => {
2392            println!("  {} Queue size: {}", "✓".green(), size);
2393            size
2394        }
2395        Err(e) => {
2396            println!("  {} Failed to get queue size: {}", "✗".red(), e);
2397            health_issues.push("Cannot get queue size".to_string());
2398            0
2399        }
2400    };
2401
2402    let dlq_size = match broker.dlq_size().await {
2403        Ok(size) => {
2404            if size > 0 {
2405                println!("  {} DLQ size: {} (has failed tasks)", "⚠".yellow(), size);
2406                health_warnings.push(format!("{size} tasks in Dead Letter Queue"));
2407            } else {
2408                println!("  {} DLQ size: {} (empty)", "✓".green(), size);
2409            }
2410            size
2411        }
2412        Err(e) => {
2413            println!("  {} Failed to get DLQ size: {}", "✗".red(), e);
2414            health_issues.push("Cannot get DLQ size".to_string());
2415            0
2416        }
2417    };
2418
2419    println!();
2420
2421    // Test 3: Queue Accessibility
2422    println!("{}", "3. Queue Accessibility".bold());
2423    let queue_key = format!("celers:{queue}");
2424    let _queue_type: String = match redis::cmd("TYPE")
2425        .arg(&queue_key)
2426        .query_async(&mut conn)
2427        .await
2428    {
2429        Ok(t) => {
2430            if t == "none" {
2431                println!(
2432                    "  {} Queue does not exist (will be created on first task)",
2433                    "⚠".yellow()
2434                );
2435                health_warnings.push("Queue not yet created".to_string());
2436            } else if t == "list" {
2437                println!("  {} Queue type: FIFO (list)", "✓".green());
2438            } else if t == "zset" {
2439                println!("  {} Queue type: Priority (sorted set)", "✓".green());
2440            } else {
2441                println!("  {} Unknown queue type: {}", "⚠".yellow(), t);
2442                health_warnings.push(format!("Unknown queue type: {t}"));
2443            }
2444            t
2445        }
2446        Err(e) => {
2447            println!("  {} Failed to check queue type: {}", "✗".red(), e);
2448            health_issues.push("Cannot check queue type".to_string());
2449            "none".to_string()
2450        }
2451    };
2452
2453    // Check if queue is paused
2454    let pause_key = format!("celers:{queue}:paused");
2455    match redis::cmd("GET")
2456        .arg(&pause_key)
2457        .query_async::<Option<String>>(&mut conn)
2458        .await
2459    {
2460        Ok(Some(paused_at)) => {
2461            println!("  {} Queue is PAUSED (since: {})", "⚠".yellow(), paused_at);
2462            health_warnings.push(format!("Queue is paused since {paused_at}"));
2463        }
2464        Ok(None) => {
2465            println!("  {} Queue is not paused", "✓".green());
2466        }
2467        Err(e) => {
2468            println!("  {} Failed to check pause status: {}", "⚠".yellow(), e);
2469        }
2470    }
2471
2472    println!();
2473
2474    // Test 4: Memory Usage (if accessible)
2475    println!("{}", "4. Broker Memory".bold());
2476    match redis::cmd("INFO")
2477        .arg("memory")
2478        .query_async::<String>(&mut conn)
2479        .await
2480    {
2481        Ok(info) => {
2482            // Parse used_memory from INFO output
2483            for line in info.lines() {
2484                if line.starts_with("used_memory_human:") {
2485                    let memory = line.split(':').nth(1).unwrap_or("N/A");
2486                    println!("  {} Used memory: {}", "✓".green(), memory);
2487                    break;
2488                }
2489            }
2490        }
2491        Err(_) => {
2492            println!("  {} Memory info not available", "⚠".yellow());
2493        }
2494    }
2495
2496    println!();
2497
2498    // Test 5: Health Summary
2499    println!("{}", "Health Summary".bold().cyan());
2500    println!();
2501
2502    if health_issues.is_empty() && health_warnings.is_empty() {
2503        println!(
2504            "{}",
2505            "  ✓ All checks passed! System is healthy.".green().bold()
2506        );
2507    } else {
2508        if !health_issues.is_empty() {
2509            println!("{}", "  Critical Issues:".red().bold());
2510            for issue in &health_issues {
2511                println!("    {} {}", "✗".red(), issue);
2512            }
2513            println!();
2514        }
2515
2516        if !health_warnings.is_empty() {
2517            println!("{}", "  Warnings:".yellow().bold());
2518            for warning in &health_warnings {
2519                println!("    {} {}", "⚠".yellow(), warning);
2520            }
2521            println!();
2522        }
2523
2524        if health_issues.is_empty() {
2525            println!(
2526                "{}",
2527                "  Overall: System is operational with warnings"
2528                    .yellow()
2529                    .bold()
2530            );
2531        } else {
2532            println!("{}", "  Overall: System has critical issues".red().bold());
2533        }
2534    }
2535
2536    println!();
2537
2538    // Recommendations
2539    if dlq_size > 0 {
2540        println!("{}", "Recommendations:".cyan().bold());
2541        println!("  • Inspect DLQ: celers dlq inspect");
2542        println!("  • Clear DLQ: celers dlq clear --confirm");
2543        println!();
2544    }
2545
2546    Ok(())
2547}
2548
2549/// List all running workers
2550pub async fn list_workers(broker_url: &str) -> anyhow::Result<()> {
2551    let client = redis::Client::open(broker_url)?;
2552    let mut conn = client.get_multiplexed_async_connection().await?;
2553
2554    println!("{}", "=== Active Workers ===".bold().cyan());
2555    println!();
2556
2557    // Workers register themselves with a heartbeat key
2558    let worker_pattern = "celers:worker:*:heartbeat";
2559    let mut cursor = 0;
2560    let mut worker_keys: Vec<String> = Vec::new();
2561
2562    loop {
2563        let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
2564            .arg(cursor)
2565            .arg("MATCH")
2566            .arg(worker_pattern)
2567            .arg("COUNT")
2568            .arg(100)
2569            .query_async(&mut conn)
2570            .await?;
2571
2572        worker_keys.extend(keys);
2573        cursor = new_cursor;
2574
2575        if cursor == 0 {
2576            break;
2577        }
2578    }
2579
2580    if worker_keys.is_empty() {
2581        println!("{}", "No active workers found".yellow());
2582        println!();
2583        println!("Workers register themselves when they start processing tasks.");
2584        return Ok(());
2585    }
2586
2587    #[derive(Tabled)]
2588    struct WorkerInfo {
2589        #[tabled(rename = "Worker ID")]
2590        id: String,
2591        #[tabled(rename = "Status")]
2592        status: String,
2593        #[tabled(rename = "Last Heartbeat")]
2594        last_heartbeat: String,
2595    }
2596
2597    let mut workers = Vec::new();
2598    let worker_count = worker_keys.len();
2599
2600    for key in &worker_keys {
2601        // Extract worker ID from key: celers:worker:<id>:heartbeat
2602        let parts: Vec<&str> = key.split(':').collect();
2603        if parts.len() >= 3 {
2604            let worker_id = parts[2].to_string();
2605
2606            // Get heartbeat timestamp
2607            let heartbeat: Option<String> =
2608                redis::cmd("GET").arg(key).query_async(&mut conn).await?;
2609
2610            let status = if heartbeat.is_some() {
2611                "Active".to_string()
2612            } else {
2613                "Unknown".to_string()
2614            };
2615
2616            let last_heartbeat = heartbeat.unwrap_or_else(|| "N/A".to_string());
2617
2618            workers.push(WorkerInfo {
2619                id: worker_id,
2620                status,
2621                last_heartbeat,
2622            });
2623        }
2624    }
2625
2626    let table = Table::new(workers).with(Style::rounded()).to_string();
2627    println!("{table}");
2628    println!();
2629    println!(
2630        "{}",
2631        format!("Total active workers: {worker_count}")
2632            .cyan()
2633            .bold()
2634    );
2635
2636    Ok(())
2637}
2638
2639/// Show detailed statistics for a worker
2640pub async fn worker_stats(broker_url: &str, worker_id: &str) -> anyhow::Result<()> {
2641    let client = redis::Client::open(broker_url)?;
2642    let mut conn = client.get_multiplexed_async_connection().await?;
2643
2644    println!(
2645        "{}",
2646        format!("=== Worker Statistics: {worker_id} ===")
2647            .bold()
2648            .cyan()
2649    );
2650    println!();
2651
2652    // Check if worker exists
2653    let heartbeat_key = format!("celers:worker:{worker_id}:heartbeat");
2654    let heartbeat: Option<String> = redis::cmd("GET")
2655        .arg(&heartbeat_key)
2656        .query_async(&mut conn)
2657        .await?;
2658
2659    if heartbeat.is_none() {
2660        println!("{}", format!("✗ Worker '{worker_id}' not found").red());
2661        println!();
2662        println!("Possible reasons:");
2663        println!("  • Worker is not running");
2664        println!("  • Worker ID is incorrect");
2665        println!("  • Worker hasn't sent a heartbeat yet");
2666        return Ok(());
2667    }
2668
2669    #[derive(Tabled)]
2670    struct StatRow {
2671        #[tabled(rename = "Metric")]
2672        metric: String,
2673        #[tabled(rename = "Value")]
2674        value: String,
2675    }
2676
2677    // Gather worker statistics
2678    let stats_key = format!("celers:worker:{worker_id}:stats");
2679    let stats: Option<String> = redis::cmd("GET")
2680        .arg(&stats_key)
2681        .query_async(&mut conn)
2682        .await?;
2683
2684    let mut stat_rows = vec![
2685        StatRow {
2686            metric: "Worker ID".to_string(),
2687            value: worker_id.to_string(),
2688        },
2689        StatRow {
2690            metric: "Status".to_string(),
2691            value: "Active".to_string(),
2692        },
2693        StatRow {
2694            metric: "Last Heartbeat".to_string(),
2695            value: heartbeat.unwrap_or_else(|| "N/A".to_string()),
2696        },
2697    ];
2698
2699    if let Some(stats_json) = stats {
2700        if let Ok(stats_data) = serde_json::from_str::<serde_json::Value>(&stats_json) {
2701            if let Some(tasks_processed) = stats_data.get("tasks_processed") {
2702                stat_rows.push(StatRow {
2703                    metric: "Tasks Processed".to_string(),
2704                    value: tasks_processed.to_string(),
2705                });
2706            }
2707            if let Some(tasks_failed) = stats_data.get("tasks_failed") {
2708                stat_rows.push(StatRow {
2709                    metric: "Tasks Failed".to_string(),
2710                    value: tasks_failed.to_string(),
2711                });
2712            }
2713            if let Some(uptime) = stats_data.get("uptime_seconds") {
2714                stat_rows.push(StatRow {
2715                    metric: "Uptime".to_string(),
2716                    value: format!("{uptime} seconds"),
2717                });
2718            }
2719        }
2720    }
2721
2722    let table = Table::new(stat_rows).with(Style::rounded()).to_string();
2723    println!("{table}");
2724
2725    Ok(())
2726}
2727
2728/// Stop a specific worker
2729pub async fn stop_worker(broker_url: &str, worker_id: &str, graceful: bool) -> anyhow::Result<()> {
2730    let client = redis::Client::open(broker_url)?;
2731    let mut conn = client.get_multiplexed_async_connection().await?;
2732
2733    println!(
2734        "{}",
2735        format!("=== Stop Worker: {worker_id} ===").bold().yellow()
2736    );
2737    println!();
2738
2739    // Check if worker exists
2740    let heartbeat_key = format!("celers:worker:{worker_id}:heartbeat");
2741    let heartbeat: Option<String> = redis::cmd("GET")
2742        .arg(&heartbeat_key)
2743        .query_async(&mut conn)
2744        .await?;
2745
2746    if heartbeat.is_none() {
2747        println!("{}", format!("✗ Worker '{worker_id}' not found").red());
2748        return Ok(());
2749    }
2750
2751    // Publish stop command via Redis Pub/Sub
2752    let channel = if graceful {
2753        format!("celers:worker:{worker_id}:shutdown_graceful")
2754    } else {
2755        format!("celers:worker:{worker_id}:shutdown")
2756    };
2757
2758    let subscribers: usize = redis::cmd("PUBLISH")
2759        .arg(&channel)
2760        .arg("STOP")
2761        .query_async(&mut conn)
2762        .await?;
2763
2764    if subscribers > 0 {
2765        println!(
2766            "{}",
2767            format!(
2768                "✓ Stop signal sent to worker '{}' (mode: {})",
2769                worker_id,
2770                if graceful { "graceful" } else { "immediate" }
2771            )
2772            .green()
2773            .bold()
2774        );
2775        println!();
2776        if graceful {
2777            println!("The worker will:");
2778            println!("  • Finish processing current tasks");
2779            println!("  • Stop accepting new tasks");
2780            println!("  • Shut down gracefully");
2781        } else {
2782            println!("The worker will:");
2783            println!("  • Stop immediately");
2784            println!("  • Cancel running tasks");
2785        }
2786    } else {
2787        println!(
2788            "{}",
2789            format!("⚠ No subscribers for worker '{worker_id}'")
2790                .yellow()
2791                .bold()
2792        );
2793        println!();
2794        println!("The worker may not be listening for stop commands.");
2795    }
2796
2797    Ok(())
2798}
2799
2800/// Pause task processing for a worker
2801pub async fn pause_worker(broker_url: &str, worker_id: &str) -> anyhow::Result<()> {
2802    let client = redis::Client::open(broker_url)?;
2803    let mut conn = client.get_multiplexed_async_connection().await?;
2804
2805    let pause_key = format!("celers:worker:{worker_id}:paused");
2806    let timestamp = chrono::Utc::now().to_rfc3339();
2807
2808    let _: () = redis::cmd("SET")
2809        .arg(&pause_key)
2810        .arg(&timestamp)
2811        .query_async(&mut conn)
2812        .await?;
2813
2814    println!(
2815        "{}",
2816        format!("✓ Worker '{worker_id}' has been paused")
2817            .green()
2818            .bold()
2819    );
2820    println!();
2821    println!("{}", "Note:".yellow().bold());
2822    println!("  • Worker will stop accepting new tasks");
2823    println!("  • Current tasks will continue to completion");
2824    println!("  • Use 'celers worker-mgmt resume' to resume");
2825    println!();
2826    println!("  Paused at: {}", timestamp.cyan());
2827
2828    Ok(())
2829}
2830
2831/// Resume task processing for a worker
2832pub async fn resume_worker(broker_url: &str, worker_id: &str) -> anyhow::Result<()> {
2833    let client = redis::Client::open(broker_url)?;
2834    let mut conn = client.get_multiplexed_async_connection().await?;
2835
2836    let pause_key = format!("celers:worker:{worker_id}:paused");
2837
2838    // Check if worker is paused
2839    let paused: Option<String> = redis::cmd("GET")
2840        .arg(&pause_key)
2841        .query_async(&mut conn)
2842        .await?;
2843
2844    if paused.is_none() {
2845        println!(
2846            "{}",
2847            format!("✓ Worker '{worker_id}' is not paused").yellow()
2848        );
2849        return Ok(());
2850    }
2851
2852    // Remove pause flag
2853    let _: () = redis::cmd("DEL")
2854        .arg(&pause_key)
2855        .query_async(&mut conn)
2856        .await?;
2857
2858    println!(
2859        "{}",
2860        format!("✓ Worker '{worker_id}' has been resumed")
2861            .green()
2862            .bold()
2863    );
2864    println!();
2865    println!("{}", "Note:".yellow().bold());
2866    println!("  • Worker will now accept new tasks");
2867    if let Some(paused_at) = paused {
2868        println!("  • Was paused at: {}", paused_at.dimmed());
2869    }
2870
2871    Ok(())
2872}
2873
2874/// Automatic problem detection and diagnostics
2875pub async fn doctor(broker_url: &str, queue: &str) -> anyhow::Result<()> {
2876    println!("{}", "=== CeleRS Doctor ===".bold().cyan());
2877    println!("{}", "Running automatic diagnostics...".dimmed());
2878    println!();
2879
2880    let client = redis::Client::open(broker_url)?;
2881    let mut conn = client.get_multiplexed_async_connection().await?;
2882
2883    let mut issues = Vec::new();
2884    let mut warnings = Vec::new();
2885    let mut recommendations = Vec::new();
2886
2887    // Test 1: Broker connectivity
2888    println!("{}", "1. Checking broker connectivity...".bold());
2889    match redis::cmd("PING").query_async::<String>(&mut conn).await {
2890        Ok(_) => {
2891            println!("  {} Broker is reachable", "✓".green());
2892        }
2893        Err(e) => {
2894            println!("  {} Broker connection failed: {}", "✗".red(), e);
2895            issues.push("Cannot connect to broker".to_string());
2896            recommendations.push("Check broker URL and ensure Redis is running".to_string());
2897        }
2898    }
2899    println!();
2900
2901    // Test 2: Queue health
2902    println!("{}", "2. Analyzing queue health...".bold());
2903    let broker = RedisBroker::new(broker_url, queue)?;
2904
2905    let queue_size = broker.queue_size().await.unwrap_or(0);
2906    let dlq_size = broker.dlq_size().await.unwrap_or(0);
2907
2908    println!("  {} Pending tasks: {}", "•".cyan(), queue_size);
2909    println!("  {} DLQ tasks: {}", "•".cyan(), dlq_size);
2910
2911    if dlq_size > 10 {
2912        warnings.push(format!("High number of failed tasks in DLQ: {dlq_size}"));
2913        recommendations.push("Inspect DLQ with: celers dlq inspect".to_string());
2914    }
2915
2916    if queue_size > 1000 {
2917        warnings.push(format!("Large queue backlog: {queue_size} tasks"));
2918        recommendations.push("Consider scaling up workers".to_string());
2919    }
2920    println!();
2921
2922    // Test 3: Worker availability
2923    println!("{}", "3. Checking worker availability...".bold());
2924    let worker_pattern = "celers:worker:*:heartbeat";
2925    let mut cursor = 0;
2926    let mut worker_count = 0;
2927
2928    loop {
2929        let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
2930            .arg(cursor)
2931            .arg("MATCH")
2932            .arg(worker_pattern)
2933            .arg("COUNT")
2934            .arg(100)
2935            .query_async(&mut conn)
2936            .await?;
2937
2938        worker_count += keys.len();
2939        cursor = new_cursor;
2940
2941        if cursor == 0 {
2942            break;
2943        }
2944    }
2945
2946    println!("  {} Active workers: {}", "•".cyan(), worker_count);
2947
2948    if worker_count == 0 && queue_size > 0 {
2949        issues.push("No workers available to process pending tasks".to_string());
2950        recommendations.push("Start workers with: celers worker".to_string());
2951    } else if worker_count > 0 {
2952        println!("  {} Workers are available", "✓".green());
2953    }
2954    println!();
2955
2956    // Test 4: Queue pause status
2957    println!("{}", "4. Checking queue status...".bold());
2958    let pause_key = format!("celers:{queue}:paused");
2959    let paused: Option<String> = redis::cmd("GET")
2960        .arg(&pause_key)
2961        .query_async(&mut conn)
2962        .await?;
2963
2964    if let Some(paused_at) = paused {
2965        warnings.push(format!("Queue '{queue}' is paused since {paused_at}"));
2966        recommendations.push("Resume queue with: celers queue resume".to_string());
2967        println!("  {} Queue is PAUSED", "⚠".yellow());
2968    } else {
2969        println!("  {} Queue is active", "✓".green());
2970    }
2971    println!();
2972
2973    // Test 5: Memory usage
2974    println!("{}", "5. Checking broker memory...".bold());
2975    match redis::cmd("INFO")
2976        .arg("memory")
2977        .query_async::<String>(&mut conn)
2978        .await
2979    {
2980        Ok(info) => {
2981            for line in info.lines() {
2982                if line.starts_with("used_memory_human:") {
2983                    let memory = line.split(':').nth(1).unwrap_or("N/A");
2984                    println!("  {} Used memory: {}", "•".cyan(), memory);
2985                }
2986                if line.starts_with("maxmemory_human:") {
2987                    let max_memory = line.split(':').nth(1).unwrap_or("N/A");
2988                    if max_memory != "0B" {
2989                        println!("  {} Max memory: {}", "•".cyan(), max_memory);
2990                    }
2991                }
2992            }
2993            println!("  {} Memory usage is acceptable", "✓".green());
2994        }
2995        Err(_) => {
2996            println!("  {} Memory info unavailable", "⚠".yellow());
2997        }
2998    }
2999    println!();
3000
3001    // Summary
3002    println!("{}", "=== Diagnosis Summary ===".bold().cyan());
3003    println!();
3004
3005    if issues.is_empty() && warnings.is_empty() {
3006        println!(
3007            "{}",
3008            "  ✓ No issues detected! System is healthy.".green().bold()
3009        );
3010    } else {
3011        if !issues.is_empty() {
3012            println!("{}", "  Critical Issues:".red().bold());
3013            for issue in &issues {
3014                println!("    {} {}", "✗".red(), issue);
3015            }
3016            println!();
3017        }
3018
3019        if !warnings.is_empty() {
3020            println!("{}", "  Warnings:".yellow().bold());
3021            for warning in &warnings {
3022                println!("    {} {}", "⚠".yellow(), warning);
3023            }
3024            println!();
3025        }
3026
3027        if !recommendations.is_empty() {
3028            println!("{}", "  Recommendations:".cyan().bold());
3029            for (i, rec) in recommendations.iter().enumerate() {
3030                println!("    {}. {}", i + 1, rec);
3031            }
3032            println!();
3033        }
3034
3035        if issues.is_empty() {
3036            println!(
3037                "{}",
3038                "  Overall: System is operational with warnings"
3039                    .yellow()
3040                    .bold()
3041            );
3042        } else {
3043            println!(
3044                "{}",
3045                "  Overall: System has critical issues that need attention"
3046                    .red()
3047                    .bold()
3048            );
3049        }
3050    }
3051
3052    Ok(())
3053}
3054
3055/// Show task execution logs
3056pub async fn show_task_logs(
3057    broker_url: &str,
3058    task_id_str: &str,
3059    limit: usize,
3060) -> anyhow::Result<()> {
3061    let task_id = task_id_str
3062        .parse::<uuid::Uuid>()
3063        .map_err(|_| anyhow::anyhow!("Invalid task ID format"))?;
3064
3065    println!("{}", "=== Task Execution Logs ===".bold().cyan());
3066    println!("Task ID: {}", task_id.to_string().yellow());
3067    println!();
3068
3069    // Connect to Redis
3070    let client = redis::Client::open(broker_url)?;
3071    let mut conn = client.get_multiplexed_async_connection().await?;
3072
3073    // Logs are stored in a Redis list: celers:task:<id>:logs
3074    let logs_key = format!("celers:task:{task_id}:logs");
3075
3076    // Check if logs exist
3077    let exists: bool = redis::cmd("EXISTS")
3078        .arg(&logs_key)
3079        .query_async(&mut conn)
3080        .await?;
3081
3082    if !exists {
3083        println!("{}", "✗ No logs found for this task".red());
3084        println!();
3085        println!("Possible reasons:");
3086        println!("  • Task hasn't been executed yet");
3087        println!("  • Logs have expired (TTL)");
3088        println!("  • Task was executed before logging was enabled");
3089        println!("  • Wrong task ID");
3090        return Ok(());
3091    }
3092
3093    // Get log count
3094    let log_count: isize = redis::cmd("LLEN")
3095        .arg(&logs_key)
3096        .query_async(&mut conn)
3097        .await?;
3098
3099    println!(
3100        "{}",
3101        format!("Total log entries: {log_count}").cyan().bold()
3102    );
3103    println!();
3104
3105    // Fetch logs (most recent first)
3106    let logs: Vec<String> = redis::cmd("LRANGE")
3107        .arg(&logs_key)
3108        .arg(-(limit as isize))
3109        .arg(-1)
3110        .query_async(&mut conn)
3111        .await?;
3112
3113    if logs.is_empty() {
3114        println!("{}", "No log entries available".yellow());
3115        return Ok(());
3116    }
3117
3118    // Display logs with colors
3119    for (idx, log_entry) in logs.iter().enumerate() {
3120        // Try to parse as JSON for structured logs
3121        if let Ok(log_json) = serde_json::from_str::<serde_json::Value>(log_entry) {
3122            let timestamp = log_json
3123                .get("timestamp")
3124                .and_then(|v| v.as_str())
3125                .unwrap_or("N/A");
3126            let level = log_json
3127                .get("level")
3128                .and_then(|v| v.as_str())
3129                .unwrap_or("INFO");
3130            let message = log_json
3131                .get("message")
3132                .and_then(|v| v.as_str())
3133                .unwrap_or(log_entry);
3134
3135            let level_colored = match level {
3136                "ERROR" | "error" => level.red().bold(),
3137                "WARN" | "warn" => level.yellow().bold(),
3138                "DEBUG" | "debug" => level.dimmed(),
3139                _ => level.cyan().bold(),
3140            };
3141
3142            println!(
3143                "{} {} {} {}",
3144                format!("[{}]", idx + 1).dimmed(),
3145                timestamp.dimmed(),
3146                level_colored,
3147                message
3148            );
3149        } else {
3150            // Plain text log
3151            println!("{} {}", format!("[{}]", idx + 1).dimmed(), log_entry);
3152        }
3153    }
3154
3155    println!();
3156    if log_count as usize > limit {
3157        println!(
3158            "{}",
3159            format!("Showing last {} of {} log entries", logs.len(), log_count).yellow()
3160        );
3161        println!(
3162            "{}",
3163            format!("Use --limit to show more entries (max: {log_count})").dimmed()
3164        );
3165    } else {
3166        println!(
3167            "{}",
3168            format!("Showing all {} log entries", logs.len())
3169                .green()
3170                .bold()
3171        );
3172    }
3173
3174    Ok(())
3175}
3176
3177/// List all scheduled tasks
3178pub async fn list_schedules(broker_url: &str) -> anyhow::Result<()> {
3179    let client = redis::Client::open(broker_url)?;
3180    let mut conn = client.get_multiplexed_async_connection().await?;
3181
3182    println!("{}", "=== Scheduled Tasks ===".bold().cyan());
3183    println!();
3184
3185    // Schedules are stored with keys like: celers:schedule:<name>
3186    let schedule_pattern = "celers:schedule:*";
3187    let mut cursor = 0;
3188    let mut schedule_keys: Vec<String> = Vec::new();
3189
3190    loop {
3191        let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
3192            .arg(cursor)
3193            .arg("MATCH")
3194            .arg(schedule_pattern)
3195            .arg("COUNT")
3196            .arg(100)
3197            .query_async(&mut conn)
3198            .await?;
3199
3200        schedule_keys.extend(keys);
3201        cursor = new_cursor;
3202
3203        if cursor == 0 {
3204            break;
3205        }
3206    }
3207
3208    if schedule_keys.is_empty() {
3209        println!("{}", "No scheduled tasks found".yellow());
3210        println!();
3211        println!("Add a schedule with: celers schedule add <name> --task <task> --cron <expr>");
3212        return Ok(());
3213    }
3214
3215    #[derive(Tabled)]
3216    struct ScheduleInfo {
3217        #[tabled(rename = "Name")]
3218        name: String,
3219        #[tabled(rename = "Task")]
3220        task: String,
3221        #[tabled(rename = "Cron")]
3222        cron: String,
3223        #[tabled(rename = "Status")]
3224        status: String,
3225        #[tabled(rename = "Last Run")]
3226        last_run: String,
3227    }
3228
3229    let mut schedules = Vec::new();
3230
3231    for key in &schedule_keys {
3232        // Extract schedule name from key
3233        let name = key.strip_prefix("celers:schedule:").unwrap_or(key);
3234
3235        // Get schedule data
3236        let schedule_data: Option<String> =
3237            redis::cmd("GET").arg(key).query_async(&mut conn).await?;
3238
3239        if let Some(data) = schedule_data {
3240            if let Ok(schedule) = serde_json::from_str::<serde_json::Value>(&data) {
3241                let task = schedule
3242                    .get("task")
3243                    .and_then(|v| v.as_str())
3244                    .unwrap_or("N/A")
3245                    .to_string();
3246                let cron = schedule
3247                    .get("cron")
3248                    .and_then(|v| v.as_str())
3249                    .unwrap_or("N/A")
3250                    .to_string();
3251
3252                // Check if paused
3253                let pause_key = format!("celers:schedule:{name}:paused");
3254                let paused: Option<String> = redis::cmd("GET")
3255                    .arg(&pause_key)
3256                    .query_async(&mut conn)
3257                    .await?;
3258
3259                let status = if paused.is_some() {
3260                    "Paused".to_string()
3261                } else {
3262                    "Active".to_string()
3263                };
3264
3265                let last_run = schedule
3266                    .get("last_run")
3267                    .and_then(|v| v.as_str())
3268                    .unwrap_or("Never")
3269                    .to_string();
3270
3271                schedules.push(ScheduleInfo {
3272                    name: name.to_string(),
3273                    task,
3274                    cron,
3275                    status,
3276                    last_run,
3277                });
3278            }
3279        }
3280    }
3281
3282    let table = Table::new(schedules).with(Style::rounded()).to_string();
3283    println!("{table}");
3284    println!();
3285    println!(
3286        "{}",
3287        format!("Total scheduled tasks: {}", schedule_keys.len())
3288            .cyan()
3289            .bold()
3290    );
3291
3292    Ok(())
3293}
3294
3295/// Add a new scheduled task
3296#[allow(clippy::too_many_arguments)]
3297pub async fn add_schedule(
3298    broker_url: &str,
3299    name: &str,
3300    task: &str,
3301    cron: &str,
3302    queue: &str,
3303    args: Option<&str>,
3304) -> anyhow::Result<()> {
3305    let client = redis::Client::open(broker_url)?;
3306    let mut conn = client.get_multiplexed_async_connection().await?;
3307
3308    println!("{}", "=== Add Scheduled Task ===".bold().cyan());
3309    println!();
3310
3311    // Validate cron expression (basic validation)
3312    let cron_parts: Vec<&str> = cron.split_whitespace().collect();
3313    if cron_parts.len() != 5 {
3314        println!(
3315            "{}",
3316            "✗ Invalid cron expression. Expected format: 'min hour day month weekday'"
3317                .red()
3318                .bold()
3319        );
3320        println!();
3321        println!("Examples:");
3322        println!("  0 0 * * *       - Daily at midnight");
3323        println!("  0 */2 * * *     - Every 2 hours");
3324        println!("  */15 * * * *    - Every 15 minutes");
3325        println!("  0 9 * * 1-5     - Weekdays at 9 AM");
3326        return Ok(());
3327    }
3328
3329    // Check if schedule already exists
3330    let schedule_key = format!("celers:schedule:{name}");
3331    let exists: bool = redis::cmd("EXISTS")
3332        .arg(&schedule_key)
3333        .query_async(&mut conn)
3334        .await?;
3335
3336    if exists {
3337        println!(
3338            "{}",
3339            format!("✗ Schedule '{name}' already exists").red().bold()
3340        );
3341        println!();
3342        println!("Use a different name or remove the existing schedule first:");
3343        println!("  celers schedule remove {name} --confirm");
3344        return Ok(());
3345    }
3346
3347    // Parse args if provided
3348    let args_value = if let Some(args_str) = args {
3349        match serde_json::from_str::<serde_json::Value>(args_str) {
3350            Ok(v) => v,
3351            Err(e) => {
3352                println!("{}", "✗ Invalid JSON arguments".red().bold());
3353                println!("  Error: {e}");
3354                return Ok(());
3355            }
3356        }
3357    } else {
3358        serde_json::json!({})
3359    };
3360
3361    // Create schedule data
3362    let schedule_data = serde_json::json!({
3363        "name": name,
3364        "task": task,
3365        "cron": cron,
3366        "queue": queue,
3367        "args": args_value,
3368        "created_at": chrono::Utc::now().to_rfc3339(),
3369        "last_run": null,
3370    });
3371
3372    // Save schedule
3373    let schedule_json = serde_json::to_string(&schedule_data)?;
3374    let _: () = redis::cmd("SET")
3375        .arg(&schedule_key)
3376        .arg(&schedule_json)
3377        .query_async(&mut conn)
3378        .await?;
3379
3380    println!("{}", "✓ Schedule added successfully".green().bold());
3381    println!();
3382    println!("  {} {}", "Name:".cyan(), name);
3383    println!("  {} {}", "Task:".cyan(), task);
3384    println!("  {} {}", "Cron:".cyan(), cron);
3385    println!("  {} {}", "Queue:".cyan(), queue);
3386    if let Some(args) = args {
3387        println!("  {} {}", "Args:".cyan(), args);
3388    }
3389    println!();
3390    println!("{}", "Note:".yellow().bold());
3391    println!("  • Ensure a beat scheduler is running to execute this schedule");
3392    println!("  • The schedule is active and will run at the specified times");
3393
3394    Ok(())
3395}
3396
3397/// Remove a scheduled task
3398pub async fn remove_schedule(broker_url: &str, name: &str, confirm: bool) -> anyhow::Result<()> {
3399    let client = redis::Client::open(broker_url)?;
3400    let mut conn = client.get_multiplexed_async_connection().await?;
3401
3402    let schedule_key = format!("celers:schedule:{name}");
3403
3404    // Check if schedule exists
3405    let exists: bool = redis::cmd("EXISTS")
3406        .arg(&schedule_key)
3407        .query_async(&mut conn)
3408        .await?;
3409
3410    if !exists {
3411        println!("{}", format!("✗ Schedule '{name}' not found").red());
3412        return Ok(());
3413    }
3414
3415    if !confirm {
3416        println!(
3417            "{}",
3418            format!("⚠ Warning: This will delete schedule '{name}'")
3419                .yellow()
3420                .bold()
3421        );
3422        println!("   Add --confirm to proceed");
3423        return Ok(());
3424    }
3425
3426    // Remove schedule and its pause flag
3427    let pause_key = format!("celers:schedule:{name}:paused");
3428    let _: () = redis::cmd("DEL")
3429        .arg(&schedule_key)
3430        .arg(&pause_key)
3431        .query_async(&mut conn)
3432        .await?;
3433
3434    println!(
3435        "{}",
3436        format!("✓ Schedule '{name}' removed successfully")
3437            .green()
3438            .bold()
3439    );
3440
3441    Ok(())
3442}
3443
3444/// Pause a schedule
3445pub async fn pause_schedule(broker_url: &str, name: &str) -> anyhow::Result<()> {
3446    let client = redis::Client::open(broker_url)?;
3447    let mut conn = client.get_multiplexed_async_connection().await?;
3448
3449    let schedule_key = format!("celers:schedule:{name}");
3450
3451    // Check if schedule exists
3452    let exists: bool = redis::cmd("EXISTS")
3453        .arg(&schedule_key)
3454        .query_async(&mut conn)
3455        .await?;
3456
3457    if !exists {
3458        println!("{}", format!("✗ Schedule '{name}' not found").red());
3459        return Ok(());
3460    }
3461
3462    // Set pause flag
3463    let pause_key = format!("celers:schedule:{name}:paused");
3464    let timestamp = chrono::Utc::now().to_rfc3339();
3465    let _: () = redis::cmd("SET")
3466        .arg(&pause_key)
3467        .arg(&timestamp)
3468        .query_async(&mut conn)
3469        .await?;
3470
3471    println!(
3472        "{}",
3473        format!("✓ Schedule '{name}' has been paused")
3474            .green()
3475            .bold()
3476    );
3477    println!();
3478    println!("{}", "Note:".yellow().bold());
3479    println!("  • Schedule will not execute until resumed");
3480    println!("  • Use 'celers schedule resume {name}' to resume");
3481    println!();
3482    println!("  Paused at: {}", timestamp.cyan());
3483
3484    Ok(())
3485}
3486
3487/// Resume a paused schedule
3488pub async fn resume_schedule(broker_url: &str, name: &str) -> anyhow::Result<()> {
3489    let client = redis::Client::open(broker_url)?;
3490    let mut conn = client.get_multiplexed_async_connection().await?;
3491
3492    let schedule_key = format!("celers:schedule:{name}");
3493
3494    // Check if schedule exists
3495    let exists: bool = redis::cmd("EXISTS")
3496        .arg(&schedule_key)
3497        .query_async(&mut conn)
3498        .await?;
3499
3500    if !exists {
3501        println!("{}", format!("✗ Schedule '{name}' not found").red());
3502        return Ok(());
3503    }
3504
3505    // Check if paused
3506    let pause_key = format!("celers:schedule:{name}:paused");
3507    let paused: Option<String> = redis::cmd("GET")
3508        .arg(&pause_key)
3509        .query_async(&mut conn)
3510        .await?;
3511
3512    if paused.is_none() {
3513        println!("{}", format!("✓ Schedule '{name}' is not paused").yellow());
3514        return Ok(());
3515    }
3516
3517    // Remove pause flag
3518    let _: () = redis::cmd("DEL")
3519        .arg(&pause_key)
3520        .query_async(&mut conn)
3521        .await?;
3522
3523    println!(
3524        "{}",
3525        format!("✓ Schedule '{name}' has been resumed")
3526            .green()
3527            .bold()
3528    );
3529    println!();
3530    println!("{}", "Note:".yellow().bold());
3531    println!("  • Schedule will now execute at the specified times");
3532    if let Some(paused_at) = paused {
3533        println!("  • Was paused at: {}", paused_at.dimmed());
3534    }
3535
3536    Ok(())
3537}
3538
3539/// Manually trigger a scheduled task
3540pub async fn trigger_schedule(broker_url: &str, name: &str) -> anyhow::Result<()> {
3541    let client = redis::Client::open(broker_url)?;
3542    let mut conn = client.get_multiplexed_async_connection().await?;
3543
3544    let schedule_key = format!("celers:schedule:{name}");
3545
3546    println!(
3547        "{}",
3548        format!("=== Trigger Schedule: {name} ===").bold().cyan()
3549    );
3550    println!();
3551
3552    // Get schedule data
3553    let schedule_data: Option<String> = redis::cmd("GET")
3554        .arg(&schedule_key)
3555        .query_async(&mut conn)
3556        .await?;
3557
3558    if let Some(data) = schedule_data {
3559        if let Ok(schedule) = serde_json::from_str::<serde_json::Value>(&data) {
3560            let task = schedule
3561                .get("task")
3562                .and_then(|v| v.as_str())
3563                .unwrap_or("N/A");
3564            let queue = schedule
3565                .get("queue")
3566                .and_then(|v| v.as_str())
3567                .unwrap_or("celers");
3568
3569            // Publish trigger command via Redis Pub/Sub
3570            let trigger_channel = format!("celers:schedule:{name}:trigger");
3571            let subscribers: usize = redis::cmd("PUBLISH")
3572                .arg(&trigger_channel)
3573                .arg("TRIGGER")
3574                .query_async(&mut conn)
3575                .await?;
3576
3577            if subscribers > 0 {
3578                println!(
3579                    "{}",
3580                    format!("✓ Trigger signal sent for schedule '{name}'")
3581                        .green()
3582                        .bold()
3583                );
3584                println!();
3585                println!("  {} {}", "Task:".cyan(), task);
3586                println!("  {} {}", "Queue:".cyan(), queue);
3587                println!();
3588                println!("The task will be executed immediately by the beat scheduler.");
3589            } else {
3590                println!(
3591                    "{}",
3592                    "⚠ No beat scheduler subscribed to trigger channel"
3593                        .yellow()
3594                        .bold()
3595                );
3596                println!();
3597                println!("The trigger command was sent but no beat scheduler is listening.");
3598                println!("Ensure a beat scheduler is running to execute scheduled tasks.");
3599            }
3600        } else {
3601            println!("{}", "✗ Invalid schedule data".red());
3602        }
3603    } else {
3604        println!("{}", format!("✗ Schedule '{name}' not found").red());
3605    }
3606
3607    Ok(())
3608}
3609
3610/// Scale workers to N instances
3611pub async fn scale_workers(broker_url: &str, target_count: usize) -> anyhow::Result<()> {
3612    let client = redis::Client::open(broker_url)?;
3613    let mut conn = client.get_multiplexed_async_connection().await?;
3614
3615    println!(
3616        "{}",
3617        format!("=== Scale Workers to {target_count} ===")
3618            .bold()
3619            .cyan()
3620    );
3621    println!();
3622
3623    // Get current worker count
3624    let pattern = "celers:worker:*:heartbeat";
3625    let keys: Vec<String> = redis::cmd("KEYS")
3626        .arg(pattern)
3627        .query_async(&mut conn)
3628        .await?;
3629
3630    let current_count = keys.len();
3631
3632    println!("Current workers: {}", current_count.to_string().yellow());
3633    println!("Target workers: {}", target_count.to_string().green());
3634    println!();
3635
3636    if current_count == target_count {
3637        println!("{}", "✓ Already at target worker count".green().bold());
3638        return Ok(());
3639    }
3640
3641    if current_count < target_count {
3642        let needed = target_count - current_count;
3643        println!(
3644            "{}",
3645            format!("⚠ Need to start {needed} more workers")
3646                .yellow()
3647                .bold()
3648        );
3649        println!();
3650        println!("To scale up, start additional worker instances:");
3651        println!("  celers worker --broker {broker_url}");
3652        println!();
3653        println!("Or run them in parallel:");
3654        for i in 1..=needed {
3655            println!("  celers worker --broker {broker_url} & # Worker {i}");
3656        }
3657    } else {
3658        let excess = current_count - target_count;
3659        println!(
3660            "{}",
3661            format!("⚠ Need to stop {excess} workers").yellow().bold()
3662        );
3663        println!();
3664        println!("To scale down, stop workers gracefully:");
3665        println!("  celers worker-mgmt list");
3666        println!("  celers worker-mgmt stop <worker-id> --graceful");
3667    }
3668
3669    Ok(())
3670}
3671
3672/// Drain worker (stop accepting new tasks)
3673pub async fn drain_worker(broker_url: &str, worker_id: &str) -> anyhow::Result<()> {
3674    let client = redis::Client::open(broker_url)?;
3675    let mut conn = client.get_multiplexed_async_connection().await?;
3676
3677    println!(
3678        "{}",
3679        format!("=== Drain Worker: {worker_id} ===").bold().cyan()
3680    );
3681    println!();
3682
3683    // Check if worker exists
3684    let heartbeat_key = format!("celers:worker:{worker_id}:heartbeat");
3685    let exists: bool = redis::cmd("EXISTS")
3686        .arg(&heartbeat_key)
3687        .query_async(&mut conn)
3688        .await?;
3689
3690    if !exists {
3691        println!("{}", format!("✗ Worker '{worker_id}' not found").red());
3692        return Ok(());
3693    }
3694
3695    // Set drain flag
3696    let drain_key = format!("celers:worker:{worker_id}:draining");
3697    let timestamp = Utc::now().to_rfc3339();
3698    redis::cmd("SET")
3699        .arg(&drain_key)
3700        .arg(&timestamp)
3701        .query_async::<()>(&mut conn)
3702        .await?;
3703
3704    // Set TTL to 24 hours
3705    redis::cmd("EXPIRE")
3706        .arg(&drain_key)
3707        .arg(86400)
3708        .query_async::<()>(&mut conn)
3709        .await?;
3710
3711    println!(
3712        "{}",
3713        format!("✓ Worker '{worker_id}' is now draining")
3714            .green()
3715            .bold()
3716    );
3717    println!();
3718    println!("The worker will:");
3719    println!("  • Stop accepting new tasks");
3720    println!("  • Complete currently running tasks");
3721    println!("  • Shut down automatically when all tasks complete");
3722    println!();
3723    println!("To resume normal operation:");
3724    println!("  celers worker-mgmt resume {worker_id}");
3725
3726    Ok(())
3727}
3728
3729/// Show execution history for a schedule
3730pub async fn schedule_history(broker_url: &str, name: &str, limit: usize) -> anyhow::Result<()> {
3731    let client = redis::Client::open(broker_url)?;
3732    let mut conn = client.get_multiplexed_async_connection().await?;
3733
3734    println!(
3735        "{}",
3736        format!("=== Schedule History: {name} ===").bold().cyan()
3737    );
3738    println!();
3739
3740    // Get history from Redis sorted set
3741    let history_key = format!("celers:schedule:{name}:history");
3742    let entries: Vec<String> = redis::cmd("ZREVRANGE")
3743        .arg(&history_key)
3744        .arg(0)
3745        .arg(limit as isize - 1)
3746        .query_async(&mut conn)
3747        .await?;
3748
3749    if entries.is_empty() {
3750        println!("{}", "No execution history found".yellow());
3751        println!();
3752        println!("History is recorded when tasks are triggered by the beat scheduler.");
3753        return Ok(());
3754    }
3755
3756    #[derive(Tabled)]
3757    struct HistoryEntry {
3758        #[tabled(rename = "#")]
3759        index: String,
3760        #[tabled(rename = "Timestamp")]
3761        timestamp: String,
3762        #[tabled(rename = "Status")]
3763        status: String,
3764        #[tabled(rename = "Task ID")]
3765        task_id: String,
3766    }
3767
3768    let mut history_entries = Vec::new();
3769    for (idx, entry) in entries.iter().enumerate() {
3770        if let Ok(data) = serde_json::from_str::<serde_json::Value>(entry) {
3771            let timestamp = data
3772                .get("timestamp")
3773                .and_then(|v| v.as_str())
3774                .unwrap_or("N/A")
3775                .to_string();
3776            let status = data
3777                .get("status")
3778                .and_then(|v| v.as_str())
3779                .unwrap_or("unknown")
3780                .to_string();
3781            let task_id = data
3782                .get("task_id")
3783                .and_then(|v| v.as_str())
3784                .unwrap_or("N/A")
3785                .to_string();
3786
3787            history_entries.push(HistoryEntry {
3788                index: (idx + 1).to_string(),
3789                timestamp,
3790                status,
3791                task_id,
3792            });
3793        }
3794    }
3795
3796    let entry_count = history_entries.len();
3797    let table = Table::new(history_entries)
3798        .with(Style::rounded())
3799        .to_string();
3800    println!("{table}");
3801    println!();
3802    println!(
3803        "Showing {} of {} entries",
3804        entry_count.to_string().yellow(),
3805        entries.len()
3806    );
3807
3808    Ok(())
3809}
3810
3811/// Debug task execution details
3812pub async fn debug_task(broker_url: &str, queue: &str, task_id_str: &str) -> anyhow::Result<()> {
3813    let task_id = task_id_str
3814        .parse::<uuid::Uuid>()
3815        .map_err(|_| anyhow::anyhow!("Invalid task ID format"))?;
3816
3817    println!("{}", format!("=== Debug Task: {task_id} ===").bold().cyan());
3818    println!();
3819
3820    let client = redis::Client::open(broker_url)?;
3821    let mut conn = client.get_multiplexed_async_connection().await?;
3822
3823    // Get task logs
3824    let logs_key = format!("celers:task:{task_id}:logs");
3825    let logs: Vec<String> = redis::cmd("LRANGE")
3826        .arg(&logs_key)
3827        .arg(0)
3828        .arg(-1)
3829        .query_async(&mut conn)
3830        .await?;
3831
3832    if logs.is_empty() {
3833        println!("{}", "No debug logs found for this task".yellow());
3834    } else {
3835        println!("{}", "Task Logs:".green().bold());
3836        println!();
3837        for log in &logs {
3838            if let Ok(log_json) = serde_json::from_str::<serde_json::Value>(log) {
3839                let level = log_json
3840                    .get("level")
3841                    .and_then(|v| v.as_str())
3842                    .unwrap_or("INFO");
3843                let message = log_json
3844                    .get("message")
3845                    .and_then(|v| v.as_str())
3846                    .unwrap_or(log);
3847                let timestamp = log_json
3848                    .get("timestamp")
3849                    .and_then(|v| v.as_str())
3850                    .unwrap_or("");
3851
3852                let level_colored = match level {
3853                    "ERROR" | "error" => level.red(),
3854                    "WARN" | "warn" => level.yellow(),
3855                    "DEBUG" | "debug" => level.cyan(),
3856                    _ => level.normal(),
3857                };
3858
3859                println!("[{}] {} {}", timestamp.dimmed(), level_colored, message);
3860            } else {
3861                println!("{log}");
3862            }
3863        }
3864        println!();
3865    }
3866
3867    // Get task metadata
3868    let metadata_key = format!("celers:task:{task_id}:metadata");
3869    let metadata: Option<String> = redis::cmd("GET")
3870        .arg(&metadata_key)
3871        .query_async(&mut conn)
3872        .await?;
3873
3874    if let Some(meta_str) = metadata {
3875        println!("{}", "Task Metadata:".green().bold());
3876        println!();
3877        if let Ok(meta_json) = serde_json::from_str::<serde_json::Value>(&meta_str) {
3878            println!("{}", serde_json::to_string_pretty(&meta_json)?);
3879        } else {
3880            println!("{meta_str}");
3881        }
3882        println!();
3883    }
3884
3885    // Get task state from queue
3886    inspect_task(broker_url, queue, task_id_str).await?;
3887
3888    Ok(())
3889}
3890
3891/// Debug worker issues
3892pub async fn debug_worker(broker_url: &str, worker_id: &str) -> anyhow::Result<()> {
3893    let client = redis::Client::open(broker_url)?;
3894    let mut conn = client.get_multiplexed_async_connection().await?;
3895
3896    println!(
3897        "{}",
3898        format!("=== Debug Worker: {worker_id} ===").bold().cyan()
3899    );
3900    println!();
3901
3902    // Get worker heartbeat
3903    let heartbeat_key = format!("celers:worker:{worker_id}:heartbeat");
3904    let heartbeat: Option<String> = redis::cmd("GET")
3905        .arg(&heartbeat_key)
3906        .query_async(&mut conn)
3907        .await?;
3908
3909    if heartbeat.is_none() {
3910        println!("{}", format!("✗ Worker '{worker_id}' not found").red());
3911        println!();
3912        println!("Possible causes:");
3913        println!("  • Worker is not running");
3914        println!("  • Worker ID is incorrect");
3915        println!("  • Heartbeat expired (worker crashed)");
3916        return Ok(());
3917    }
3918
3919    println!("{}", "Worker Status: Active".green().bold());
3920    if let Some(hb) = heartbeat {
3921        println!("Last heartbeat: {}", hb.yellow());
3922    }
3923    println!();
3924
3925    // Check worker stats
3926    let stats_key = format!("celers:worker:{worker_id}:stats");
3927    let stats: Option<String> = redis::cmd("GET")
3928        .arg(&stats_key)
3929        .query_async(&mut conn)
3930        .await?;
3931
3932    if let Some(stats_str) = stats {
3933        println!("{}", "Worker Statistics:".green().bold());
3934        if let Ok(stats_json) = serde_json::from_str::<serde_json::Value>(&stats_str) {
3935            println!("{}", serde_json::to_string_pretty(&stats_json)?);
3936        } else {
3937            println!("{stats_str}");
3938        }
3939        println!();
3940    }
3941
3942    // Check for pause status
3943    let pause_key = format!("celers:worker:{worker_id}:paused");
3944    let paused: bool = redis::cmd("EXISTS")
3945        .arg(&pause_key)
3946        .query_async(&mut conn)
3947        .await?;
3948
3949    if paused {
3950        println!("{}", "⚠ Worker is PAUSED".yellow().bold());
3951        println!("  Tasks are not being processed");
3952        println!();
3953    }
3954
3955    // Check for drain status
3956    let drain_key = format!("celers:worker:{worker_id}:draining");
3957    let draining: bool = redis::cmd("EXISTS")
3958        .arg(&drain_key)
3959        .query_async(&mut conn)
3960        .await?;
3961
3962    if draining {
3963        println!("{}", "⚠ Worker is DRAINING".yellow().bold());
3964        println!("  Not accepting new tasks");
3965        println!();
3966    }
3967
3968    // Get worker logs
3969    let logs_key = format!("celers:worker:{worker_id}:logs");
3970    let logs: Vec<String> = redis::cmd("LRANGE")
3971        .arg(&logs_key)
3972        .arg(-20)
3973        .arg(-1)
3974        .query_async(&mut conn)
3975        .await?;
3976
3977    if !logs.is_empty() {
3978        println!("{}", "Recent Worker Logs (last 20):".green().bold());
3979        println!();
3980        for log in logs {
3981            println!("{log}");
3982        }
3983    }
3984
3985    Ok(())
3986}
3987
3988/// Generate daily execution report
3989pub async fn report_daily(broker_url: &str, queue: &str) -> anyhow::Result<()> {
3990    let client = redis::Client::open(broker_url)?;
3991    let mut conn = client.get_multiplexed_async_connection().await?;
3992
3993    println!("{}", "=== Daily Execution Report ===".bold().cyan());
3994    println!();
3995
3996    let now = Utc::now();
3997    let today_key = format!("celers:metrics:{}:daily:{}", queue, now.format("%Y-%m-%d"));
3998
3999    // Get daily metrics
4000    let metrics: Option<String> = redis::cmd("GET")
4001        .arg(&today_key)
4002        .query_async(&mut conn)
4003        .await?;
4004
4005    if let Some(metrics_str) = metrics {
4006        if let Ok(metrics_json) = serde_json::from_str::<serde_json::Value>(&metrics_str) {
4007            println!("{}", format!("Date: {}", now.format("%Y-%m-%d")).yellow());
4008            println!();
4009
4010            #[derive(Tabled)]
4011            struct DailyMetric {
4012                #[tabled(rename = "Metric")]
4013                metric: String,
4014                #[tabled(rename = "Count")]
4015                count: String,
4016            }
4017
4018            let mut daily_metrics = vec![];
4019
4020            if let Some(total) = metrics_json
4021                .get("total_tasks")
4022                .and_then(serde_json::Value::as_u64)
4023            {
4024                daily_metrics.push(DailyMetric {
4025                    metric: "Total Tasks".to_string(),
4026                    count: total.to_string(),
4027                });
4028            }
4029
4030            if let Some(succeeded) = metrics_json
4031                .get("succeeded")
4032                .and_then(serde_json::Value::as_u64)
4033            {
4034                daily_metrics.push(DailyMetric {
4035                    metric: "Succeeded".to_string(),
4036                    count: succeeded.to_string(),
4037                });
4038            }
4039
4040            if let Some(failed) = metrics_json
4041                .get("failed")
4042                .and_then(serde_json::Value::as_u64)
4043            {
4044                daily_metrics.push(DailyMetric {
4045                    metric: "Failed".to_string(),
4046                    count: failed.to_string(),
4047                });
4048            }
4049
4050            if let Some(retried) = metrics_json
4051                .get("retried")
4052                .and_then(serde_json::Value::as_u64)
4053            {
4054                daily_metrics.push(DailyMetric {
4055                    metric: "Retried".to_string(),
4056                    count: retried.to_string(),
4057                });
4058            }
4059
4060            if let Some(avg_time) = metrics_json
4061                .get("avg_execution_time")
4062                .and_then(serde_json::Value::as_f64)
4063            {
4064                daily_metrics.push(DailyMetric {
4065                    metric: "Avg Execution Time".to_string(),
4066                    count: format!("{avg_time:.2}s"),
4067                });
4068            }
4069
4070            let table = Table::new(daily_metrics).with(Style::rounded()).to_string();
4071            println!("{table}");
4072        }
4073    } else {
4074        println!("{}", "No metrics available for today".yellow());
4075        println!();
4076        println!("Metrics are collected automatically when tasks are executed.");
4077        println!("Ensure workers are running and processing tasks.");
4078    }
4079
4080    Ok(())
4081}
4082
4083/// Generate weekly statistics report
4084pub async fn report_weekly(broker_url: &str, queue: &str) -> anyhow::Result<()> {
4085    let client = redis::Client::open(broker_url)?;
4086    let mut conn = client.get_multiplexed_async_connection().await?;
4087
4088    println!("{}", "=== Weekly Statistics Report ===".bold().cyan());
4089    println!();
4090
4091    let now = Utc::now();
4092    let week_start = now - chrono::Duration::days(7);
4093
4094    println!(
4095        "{}",
4096        format!(
4097            "Period: {} to {}",
4098            week_start.format("%Y-%m-%d"),
4099            now.format("%Y-%m-%d")
4100        )
4101        .yellow()
4102    );
4103    println!();
4104
4105    let mut total_tasks = 0u64;
4106    let mut total_succeeded = 0u64;
4107    let mut total_failed = 0u64;
4108    let mut total_retried = 0u64;
4109
4110    // Aggregate daily metrics for the week
4111    for day_offset in 0..7 {
4112        let day = now - chrono::Duration::days(day_offset);
4113        let day_key = format!("celers:metrics:{}:daily:{}", queue, day.format("%Y-%m-%d"));
4114
4115        let metrics: Option<String> = redis::cmd("GET")
4116            .arg(&day_key)
4117            .query_async(&mut conn)
4118            .await?;
4119
4120        if let Some(metrics_str) = metrics {
4121            if let Ok(metrics_json) = serde_json::from_str::<serde_json::Value>(&metrics_str) {
4122                if let Some(tasks) = metrics_json
4123                    .get("total_tasks")
4124                    .and_then(serde_json::Value::as_u64)
4125                {
4126                    total_tasks += tasks;
4127                }
4128                if let Some(succeeded) = metrics_json
4129                    .get("succeeded")
4130                    .and_then(serde_json::Value::as_u64)
4131                {
4132                    total_succeeded += succeeded;
4133                }
4134                if let Some(failed) = metrics_json
4135                    .get("failed")
4136                    .and_then(serde_json::Value::as_u64)
4137                {
4138                    total_failed += failed;
4139                }
4140                if let Some(retried) = metrics_json
4141                    .get("retried")
4142                    .and_then(serde_json::Value::as_u64)
4143                {
4144                    total_retried += retried;
4145                }
4146            }
4147        }
4148    }
4149
4150    #[derive(Tabled)]
4151    struct WeeklyMetric {
4152        #[tabled(rename = "Metric")]
4153        metric: String,
4154        #[tabled(rename = "Count")]
4155        count: String,
4156        #[tabled(rename = "Percentage")]
4157        percentage: String,
4158    }
4159
4160    let success_rate = if total_tasks > 0 {
4161        (total_succeeded as f64 / total_tasks as f64) * 100.0
4162    } else {
4163        0.0
4164    };
4165
4166    let failure_rate = if total_tasks > 0 {
4167        (total_failed as f64 / total_tasks as f64) * 100.0
4168    } else {
4169        0.0
4170    };
4171
4172    let weekly_metrics = vec![
4173        WeeklyMetric {
4174            metric: "Total Tasks".to_string(),
4175            count: total_tasks.to_string(),
4176            percentage: "100%".to_string(),
4177        },
4178        WeeklyMetric {
4179            metric: "Succeeded".to_string(),
4180            count: total_succeeded.to_string(),
4181            percentage: format!("{success_rate:.1}%"),
4182        },
4183        WeeklyMetric {
4184            metric: "Failed".to_string(),
4185            count: total_failed.to_string(),
4186            percentage: format!("{failure_rate:.1}%"),
4187        },
4188        WeeklyMetric {
4189            metric: "Retried".to_string(),
4190            count: total_retried.to_string(),
4191            percentage: "-".to_string(),
4192        },
4193    ];
4194
4195    let table = Table::new(weekly_metrics)
4196        .with(Style::rounded())
4197        .to_string();
4198    println!("{table}");
4199
4200    if total_tasks == 0 {
4201        println!();
4202        println!("{}", "No tasks processed this week".yellow());
4203    }
4204
4205    Ok(())
4206}
4207
4208/// Analyze performance bottlenecks
4209pub async fn analyze_bottlenecks(broker_url: &str, queue: &str) -> anyhow::Result<()> {
4210    let client = redis::Client::open(broker_url)?;
4211    let mut conn = client.get_multiplexed_async_connection().await?;
4212
4213    println!(
4214        "{}",
4215        "=== Performance Bottleneck Analysis ===".bold().cyan()
4216    );
4217    println!();
4218
4219    // Check queue depth
4220    let queue_key = format!("celers:{queue}");
4221    let queue_type: String = redis::cmd("TYPE")
4222        .arg(&queue_key)
4223        .query_async(&mut conn)
4224        .await?;
4225
4226    let queue_size: isize = match queue_type.as_str() {
4227        "list" => {
4228            redis::cmd("LLEN")
4229                .arg(&queue_key)
4230                .query_async(&mut conn)
4231                .await?
4232        }
4233        "zset" => {
4234            redis::cmd("ZCARD")
4235                .arg(&queue_key)
4236                .query_async(&mut conn)
4237                .await?
4238        }
4239        _ => 0,
4240    };
4241
4242    // Check worker count
4243    let worker_keys: Vec<String> = redis::cmd("KEYS")
4244        .arg("celers:worker:*:heartbeat")
4245        .query_async(&mut conn)
4246        .await?;
4247    let worker_count = worker_keys.len();
4248
4249    // Check DLQ size
4250    let dlq_key = format!("celers:{queue}:dlq");
4251    let dlq_size: isize = redis::cmd("LLEN")
4252        .arg(&dlq_key)
4253        .query_async(&mut conn)
4254        .await?;
4255
4256    println!("{}", "System Overview:".green().bold());
4257    println!("  Queue Depth: {}", queue_size.to_string().yellow());
4258    println!("  Active Workers: {}", worker_count.to_string().yellow());
4259    println!("  DLQ Size: {}", dlq_size.to_string().yellow());
4260    println!();
4261
4262    let mut bottlenecks = Vec::new();
4263
4264    // Analyze bottlenecks
4265    if queue_size > 1000 {
4266        bottlenecks.push("High queue depth - consider scaling up workers");
4267    }
4268
4269    if worker_count == 0 && queue_size > 0 {
4270        bottlenecks.push("No active workers - tasks are not being processed");
4271    }
4272
4273    if worker_count > 0 && queue_size > (worker_count * 100) as isize {
4274        bottlenecks.push("Queue depth is very high relative to worker count");
4275    }
4276
4277    if dlq_size > 100 {
4278        bottlenecks.push("High DLQ size - many tasks are failing");
4279    }
4280
4281    if bottlenecks.is_empty() {
4282        println!("{}", "✓ No significant bottlenecks detected".green());
4283    } else {
4284        println!("{}", "⚠ Bottlenecks Detected:".yellow().bold());
4285        println!();
4286        for (idx, bottleneck) in bottlenecks.iter().enumerate() {
4287            println!("  {}. {}", idx + 1, bottleneck);
4288        }
4289        println!();
4290
4291        println!("{}", "Recommendations:".cyan().bold());
4292        println!();
4293        if queue_size > 1000 {
4294            println!(
4295                "  • Scale up workers: celers worker-mgmt scale {}",
4296                worker_count * 2
4297            );
4298        }
4299        if worker_count == 0 {
4300            println!("  • Start workers: celers worker --broker {broker_url}");
4301        }
4302        if dlq_size > 100 {
4303            println!("  • Investigate failed tasks: celers dlq inspect");
4304            println!("  • Check task implementations for errors");
4305        }
4306    }
4307
4308    Ok(())
4309}
4310
4311/// Analyze failure patterns
4312pub async fn analyze_failures(broker_url: &str, queue: &str) -> anyhow::Result<()> {
4313    let broker = RedisBroker::new(broker_url, queue)?;
4314
4315    println!("{}", "=== Failure Pattern Analysis ===".bold().cyan());
4316    println!();
4317
4318    let dlq_size = broker.dlq_size().await?;
4319    println!("Total failed tasks: {}", dlq_size.to_string().yellow());
4320
4321    if dlq_size == 0 {
4322        println!("{}", "✓ No failed tasks to analyze".green());
4323        return Ok(());
4324    }
4325
4326    println!();
4327
4328    // Get failed tasks
4329    let tasks = broker.inspect_dlq(100).await?;
4330
4331    let mut task_name_failures: std::collections::HashMap<String, usize> =
4332        std::collections::HashMap::new();
4333
4334    for task in &tasks {
4335        // Count by task name
4336        *task_name_failures
4337            .entry(task.metadata.name.clone())
4338            .or_insert(0) += 1;
4339    }
4340
4341    println!("{}", "Failures by Task Type:".green().bold());
4342    println!();
4343
4344    #[derive(Tabled)]
4345    struct FailureCount {
4346        #[tabled(rename = "Task Name")]
4347        task_name: String,
4348        #[tabled(rename = "Failures")]
4349        count: String,
4350    }
4351
4352    let mut task_failures: Vec<FailureCount> = task_name_failures
4353        .into_iter()
4354        .map(|(name, count)| FailureCount {
4355            task_name: name,
4356            count: count.to_string(),
4357        })
4358        .collect();
4359    task_failures.sort_by(|a, b| {
4360        b.count
4361            .parse::<usize>()
4362            .unwrap_or(0)
4363            .cmp(&a.count.parse::<usize>().unwrap_or(0))
4364    });
4365
4366    let table = Table::new(task_failures.iter().take(10))
4367        .with(Style::rounded())
4368        .to_string();
4369    println!("{table}");
4370    println!();
4371    println!("{}", "Recommendations:".cyan().bold());
4372    println!();
4373    println!("  • Review task implementations for the most failing tasks");
4374    println!("  • Check error logs: celers task logs <task-id>");
4375    println!("  • Consider increasing retry limits for transient failures");
4376    println!("  • Replay fixed tasks: celers dlq replay <task-id>");
4377
4378    Ok(())
4379}
4380
4381/// Stream worker logs with optional filtering and follow mode
4382pub async fn worker_logs(
4383    broker_url: &str,
4384    worker_id: &str,
4385    level_filter: Option<&str>,
4386    follow: bool,
4387    initial_lines: usize,
4388) -> anyhow::Result<()> {
4389    let client = redis::Client::open(broker_url)?;
4390    let mut conn = client.get_multiplexed_async_connection().await?;
4391
4392    println!(
4393        "{}",
4394        format!("=== Worker Logs: {worker_id} ===").bold().cyan()
4395    );
4396    println!();
4397
4398    // Check if worker exists
4399    let heartbeat_key = format!("celers:worker:{worker_id}:heartbeat");
4400    let exists: bool = redis::cmd("EXISTS")
4401        .arg(&heartbeat_key)
4402        .query_async(&mut conn)
4403        .await?;
4404
4405    if !exists {
4406        println!("{}", format!("✗ Worker '{worker_id}' not found").red());
4407        return Ok(());
4408    }
4409
4410    let logs_key = format!("celers:worker:{worker_id}:logs");
4411
4412    // Get initial logs
4413    let logs: Vec<String> = redis::cmd("LRANGE")
4414        .arg(&logs_key)
4415        .arg(-(initial_lines as isize))
4416        .arg(-1)
4417        .query_async(&mut conn)
4418        .await?;
4419
4420    // Display initial logs with filtering
4421    for log in &logs {
4422        display_log_line(log, level_filter);
4423    }
4424
4425    if !follow {
4426        return Ok(());
4427    }
4428
4429    println!();
4430    println!("{}", "=== Following logs (Ctrl+C to stop) ===".dimmed());
4431    println!();
4432
4433    // Follow mode - use Redis polling
4434    let mut last_length: isize = redis::cmd("LLEN")
4435        .arg(&logs_key)
4436        .query_async(&mut conn)
4437        .await?;
4438
4439    loop {
4440        // Sleep briefly
4441        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
4442
4443        // Check current length
4444        let current_length: isize = redis::cmd("LLEN")
4445            .arg(&logs_key)
4446            .query_async(&mut conn)
4447            .await?;
4448
4449        if current_length > last_length {
4450            // New logs available
4451            let new_logs: Vec<String> = redis::cmd("LRANGE")
4452                .arg(&logs_key)
4453                .arg(last_length)
4454                .arg(-1)
4455                .query_async(&mut conn)
4456                .await?;
4457
4458            for log in &new_logs {
4459                display_log_line(log, level_filter);
4460            }
4461
4462            last_length = current_length;
4463        }
4464
4465        // Check if worker is still alive
4466        let still_exists: bool = redis::cmd("EXISTS")
4467            .arg(&heartbeat_key)
4468            .query_async(&mut conn)
4469            .await?;
4470
4471        if !still_exists {
4472            println!();
4473            println!("{}", "Worker has stopped".yellow());
4474            break;
4475        }
4476    }
4477
4478    Ok(())
4479}
4480
4481/// Helper function to display a log line with optional filtering
4482#[allow(dead_code)]
4483fn display_log_line(log: &str, level_filter: Option<&str>) {
4484    if let Ok(log_json) = serde_json::from_str::<serde_json::Value>(log) {
4485        let level = log_json
4486            .get("level")
4487            .and_then(|v| v.as_str())
4488            .unwrap_or("INFO");
4489        let message = log_json
4490            .get("message")
4491            .and_then(|v| v.as_str())
4492            .unwrap_or(log);
4493        let timestamp = log_json
4494            .get("timestamp")
4495            .and_then(|v| v.as_str())
4496            .unwrap_or("");
4497
4498        // Filter by level if specified
4499        if let Some(filter) = level_filter {
4500            if !level.eq_ignore_ascii_case(filter) {
4501                return;
4502            }
4503        }
4504
4505        let level_colored = match level.to_uppercase().as_str() {
4506            "ERROR" => level.red(),
4507            "WARN" => level.yellow(),
4508            "DEBUG" => level.cyan(),
4509            _ => level.normal(),
4510        };
4511
4512        println!("[{}] {} {}", timestamp.dimmed(), level_colored, message);
4513    } else {
4514        // Non-JSON log, just print it
4515        if level_filter.is_none() {
4516            println!("{log}");
4517        }
4518    }
4519}
4520
4521/// Start auto-scaling service
4522pub async fn autoscale_start(
4523    broker_url: &str,
4524    queue: &str,
4525    autoscale_config: Option<crate::config::AutoScaleConfig>,
4526) -> anyhow::Result<()> {
4527    println!("{}", "=== Auto-Scaling Service ===".bold().green());
4528    println!();
4529
4530    let config = match autoscale_config {
4531        Some(cfg) if cfg.enabled => cfg,
4532        Some(_) => {
4533            println!(
4534                "{}",
4535                "⚠️  Auto-scaling is disabled in configuration".yellow()
4536            );
4537            return Ok(());
4538        }
4539        None => {
4540            println!("{}", "⚠️  No auto-scaling configuration found".yellow());
4541            println!("Add [autoscale] section to your celers.toml");
4542            return Ok(());
4543        }
4544    };
4545
4546    println!("Configuration:");
4547    println!("  Min workers: {}", config.min_workers.to_string().cyan());
4548    println!("  Max workers: {}", config.max_workers.to_string().cyan());
4549    println!(
4550        "  Scale up threshold: {}",
4551        config.scale_up_threshold.to_string().cyan()
4552    );
4553    println!(
4554        "  Scale down threshold: {}",
4555        config.scale_down_threshold.to_string().cyan()
4556    );
4557    println!(
4558        "  Check interval: {}s",
4559        config.check_interval_secs.to_string().cyan()
4560    );
4561    println!();
4562
4563    let broker = RedisBroker::new(broker_url, queue)?;
4564    println!("{}", "✓ Connected to broker".green());
4565    println!();
4566    println!("{}", "Starting auto-scaling monitor...".green().bold());
4567    println!("{}", "  Press Ctrl+C to stop".dimmed());
4568    println!();
4569
4570    loop {
4571        tokio::time::sleep(tokio::time::Duration::from_secs(config.check_interval_secs)).await;
4572
4573        // Get current queue size
4574        let queue_size = broker.queue_size().await?;
4575
4576        // Get current worker count
4577        let client = redis::Client::open(broker_url)?;
4578        let mut conn = client.get_multiplexed_async_connection().await?;
4579        let worker_keys: Vec<String> = redis::cmd("KEYS")
4580            .arg("celers:worker:*:heartbeat")
4581            .query_async(&mut conn)
4582            .await?;
4583        let current_workers = worker_keys.len();
4584
4585        println!(
4586            "[{}] Queue: {}, Workers: {}",
4587            Utc::now().format("%H:%M:%S").to_string().dimmed(),
4588            queue_size.to_string().yellow(),
4589            current_workers.to_string().cyan()
4590        );
4591
4592        // Determine scaling action
4593        if queue_size > config.scale_up_threshold && current_workers < config.max_workers {
4594            let needed = config.max_workers.min(current_workers + 1);
4595            println!(
4596                "  {} Scale up recommended: {} -> {}",
4597                "↑".green().bold(),
4598                current_workers,
4599                needed
4600            );
4601        } else if queue_size < config.scale_down_threshold && current_workers > config.min_workers {
4602            let target = config.min_workers.max(current_workers.saturating_sub(1));
4603            println!(
4604                "  {} Scale down possible: {} -> {}",
4605                "↓".yellow().bold(),
4606                current_workers,
4607                target
4608            );
4609        }
4610    }
4611}
4612
4613/// Show auto-scaling status
4614pub async fn autoscale_status(
4615    broker_url: &str,
4616    autoscale_config: Option<crate::config::AutoScaleConfig>,
4617) -> anyhow::Result<()> {
4618    println!("{}", "=== Auto-Scaling Status ===".bold().cyan());
4619    println!();
4620
4621    if let Some(cfg) = autoscale_config {
4622        println!(
4623            "Status: {}",
4624            if cfg.enabled {
4625                "Enabled".green()
4626            } else {
4627                "Disabled".red()
4628            }
4629        );
4630        println!();
4631        println!("Configuration:");
4632        println!("  Min workers: {}", cfg.min_workers);
4633        println!("  Max workers: {}", cfg.max_workers);
4634        println!("  Scale up threshold: {}", cfg.scale_up_threshold);
4635        println!("  Scale down threshold: {}", cfg.scale_down_threshold);
4636        println!("  Check interval: {}s", cfg.check_interval_secs);
4637        println!();
4638
4639        // Get current metrics
4640        let client = redis::Client::open(broker_url)?;
4641        let mut conn = client.get_multiplexed_async_connection().await?;
4642        let worker_keys: Vec<String> = redis::cmd("KEYS")
4643            .arg("celers:worker:*:heartbeat")
4644            .query_async(&mut conn)
4645            .await?;
4646
4647        println!("Current State:");
4648        println!("  Active workers: {}", worker_keys.len().to_string().cyan());
4649    } else {
4650        println!("{}", "Auto-scaling is not configured".yellow());
4651        println!("Add [autoscale] section to your celers.toml");
4652    }
4653
4654    Ok(())
4655}
4656
4657/// Start alert monitoring service
4658pub async fn alert_start(
4659    broker_url: &str,
4660    queue: &str,
4661    alert_config: Option<crate::config::AlertConfig>,
4662) -> anyhow::Result<()> {
4663    println!("{}", "=== Alert Monitoring Service ===".bold().green());
4664    println!();
4665
4666    let config = match alert_config {
4667        Some(cfg) if cfg.enabled => cfg,
4668        Some(_) => {
4669            println!(
4670                "{}",
4671                "⚠️  Alert monitoring is disabled in configuration".yellow()
4672            );
4673            return Ok(());
4674        }
4675        None => {
4676            println!("{}", "⚠️  No alert configuration found".yellow());
4677            println!("Add [alerts] section to your celers.toml");
4678            return Ok(());
4679        }
4680    };
4681
4682    if config.webhook_url.is_none() {
4683        println!("{}", "⚠️  No webhook URL configured".yellow());
4684        return Ok(());
4685    }
4686
4687    println!("Configuration:");
4688    println!(
4689        "  Webhook URL: {}",
4690        config
4691            .webhook_url
4692            .as_ref()
4693            .expect("webhook_url validated to be Some")
4694            .cyan()
4695    );
4696    println!(
4697        "  DLQ threshold: {}",
4698        config.dlq_threshold.to_string().cyan()
4699    );
4700    println!(
4701        "  Failed threshold: {}",
4702        config.failed_threshold.to_string().cyan()
4703    );
4704    println!(
4705        "  Check interval: {}s",
4706        config.check_interval_secs.to_string().cyan()
4707    );
4708    println!();
4709
4710    let broker = RedisBroker::new(broker_url, queue)?;
4711    println!("{}", "✓ Connected to broker".green());
4712    println!();
4713    println!("{}", "Starting alert monitor...".green().bold());
4714    println!("{}", "  Press Ctrl+C to stop".dimmed());
4715    println!();
4716
4717    let webhook_url = config
4718        .webhook_url
4719        .expect("webhook_url validated to be Some");
4720
4721    loop {
4722        tokio::time::sleep(tokio::time::Duration::from_secs(config.check_interval_secs)).await;
4723
4724        // Check DLQ size
4725        let dlq_size = broker.dlq_size().await?;
4726
4727        println!(
4728            "[{}] DLQ size: {}",
4729            Utc::now().format("%H:%M:%S").to_string().dimmed(),
4730            dlq_size.to_string().yellow()
4731        );
4732
4733        // Send alert if threshold exceeded
4734        if dlq_size > config.dlq_threshold {
4735            let message = format!(
4736                "⚠️ DLQ size ({}) exceeded threshold ({})",
4737                dlq_size, config.dlq_threshold
4738            );
4739            println!("  {} Sending alert...", "!".red().bold());
4740
4741            if let Err(e) = send_webhook_alert(&webhook_url, &message).await {
4742                println!("  {} Failed to send alert: {}", "✗".red(), e);
4743            } else {
4744                println!("  {} Alert sent", "✓".green());
4745            }
4746        }
4747    }
4748}
4749
4750/// Test webhook notification
4751pub async fn alert_test(webhook_url: &str, message: &str) -> anyhow::Result<()> {
4752    println!("{}", "=== Testing Webhook ===".bold().cyan());
4753    println!();
4754    println!("Webhook URL: {}", webhook_url.cyan());
4755    println!("Message: {}", message.yellow());
4756    println!();
4757
4758    println!("Sending test notification...");
4759    send_webhook_alert(webhook_url, message).await?;
4760
4761    println!("{}", "✓ Test notification sent successfully".green());
4762
4763    Ok(())
4764}
4765
4766/// Helper function to send webhook alert
4767async fn send_webhook_alert(webhook_url: &str, message: &str) -> anyhow::Result<()> {
4768    let client = reqwest::Client::new();
4769    let payload = serde_json::json!({
4770        "text": message,
4771        "timestamp": Utc::now().to_rfc3339(),
4772    });
4773
4774    let response = client.post(webhook_url).json(&payload).send().await?;
4775
4776    if !response.status().is_success() {
4777        anyhow::bail!("Webhook request failed with status: {}", response.status());
4778    }
4779
4780    Ok(())
4781}
4782
4783/// Test database connection
4784pub async fn db_test_connection(url: &str, benchmark: bool) -> anyhow::Result<()> {
4785    println!("{}", "=== Database Connection Test ===".bold().cyan());
4786    println!();
4787    println!("Database URL: {}", mask_password(url).cyan());
4788    println!();
4789
4790    // Determine database type from URL
4791    let db_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
4792        "PostgreSQL"
4793    } else if url.starts_with("mysql://") {
4794        "MySQL"
4795    } else {
4796        "Unknown"
4797    };
4798
4799    println!("Database type: {}", db_type.yellow());
4800    println!();
4801
4802    // Test connection
4803    println!("Testing connection...");
4804    let start = std::time::Instant::now();
4805
4806    // For PostgreSQL
4807    if db_type == "PostgreSQL" {
4808        match test_postgres_connection(url).await {
4809            Ok(version) => {
4810                let duration = start.elapsed();
4811                println!("{}", "✓ Connection successful".green());
4812                println!("  Version: {}", version.cyan());
4813                println!("  Latency: {}ms", duration.as_millis().to_string().yellow());
4814            }
4815            Err(e) => {
4816                println!("{}", "✗ Connection failed".red());
4817                println!("  Error: {e}");
4818                return Err(e);
4819            }
4820        }
4821    } else if db_type == "MySQL" {
4822        println!("{}", "⚠️  MySQL support not yet implemented".yellow());
4823        return Ok(());
4824    } else {
4825        println!("{}", "⚠️  Unknown database type".yellow());
4826        return Ok(());
4827    }
4828
4829    // Run benchmark if requested
4830    if benchmark {
4831        println!();
4832        println!("{}", "Running latency benchmark...".bold());
4833        println!();
4834
4835        let mut latencies = Vec::new();
4836        for i in 1..=10 {
4837            let start = std::time::Instant::now();
4838            if let Err(e) = test_postgres_connection(url).await {
4839                println!("  {} Query {} failed: {}", "✗".red(), i, e);
4840                continue;
4841            }
4842            let duration = start.elapsed();
4843            latencies.push(duration.as_millis());
4844            println!("  {} Query {}: {}ms", "✓".green(), i, duration.as_millis());
4845        }
4846
4847        if !latencies.is_empty() {
4848            let avg = latencies.iter().sum::<u128>() / latencies.len() as u128;
4849            let min = latencies
4850                .iter()
4851                .min()
4852                .expect("collection validated to be non-empty");
4853            let max = latencies
4854                .iter()
4855                .max()
4856                .expect("collection validated to be non-empty");
4857
4858            println!();
4859            println!("Benchmark results:");
4860            println!("  Average: {}ms", avg.to_string().cyan());
4861            println!("  Min: {}ms", min.to_string().green());
4862            println!("  Max: {}ms", max.to_string().yellow());
4863        }
4864    }
4865
4866    Ok(())
4867}
4868
4869/// Test `PostgreSQL` connection
4870async fn test_postgres_connection(url: &str) -> anyhow::Result<String> {
4871    use celers_broker_postgres::PostgresBroker;
4872
4873    // Create a temporary broker to test connection
4874    let broker = PostgresBroker::new(url).await?;
4875
4876    // Test connection and return a version string
4877    let connected = broker.test_connection().await?;
4878    if connected {
4879        Ok("Connected".to_string())
4880    } else {
4881        anyhow::bail!("Connection test failed")
4882    }
4883}
4884
4885/// Check database health
4886pub async fn db_health(url: &str) -> anyhow::Result<()> {
4887    println!("{}", "=== Database Health Check ===".bold().cyan());
4888    println!();
4889
4890    let db_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
4891        "PostgreSQL"
4892    } else if url.starts_with("mysql://") {
4893        "MySQL"
4894    } else {
4895        "Unknown"
4896    };
4897
4898    println!("Database: {}", db_type.yellow());
4899    println!();
4900
4901    if db_type == "PostgreSQL" {
4902        check_postgres_health(url).await?;
4903    } else {
4904        println!(
4905            "{}",
4906            "⚠️  Health check not supported for this database type".yellow()
4907        );
4908    }
4909
4910    Ok(())
4911}
4912
4913/// Check `PostgreSQL` health
4914async fn check_postgres_health(url: &str) -> anyhow::Result<()> {
4915    use celers_broker_postgres::PostgresBroker;
4916
4917    println!("Checking connection...");
4918    let broker = PostgresBroker::new(url).await?;
4919    println!("{}", "  ✓ Connection OK".green());
4920
4921    println!();
4922    println!("Testing connection with latency measurement...");
4923
4924    // Measure query performance
4925    let mut latencies = Vec::new();
4926    for i in 1..=5 {
4927        let start = std::time::Instant::now();
4928        let connected = broker.test_connection().await?;
4929        let duration = start.elapsed();
4930
4931        if connected {
4932            latencies.push(duration.as_millis());
4933            println!("  {} Query {}: {}ms", "✓".green(), i, duration.as_millis());
4934        } else {
4935            println!("  {} Query {} failed", "✗".red(), i);
4936        }
4937    }
4938
4939    if !latencies.is_empty() {
4940        let avg = latencies.iter().sum::<u128>() / latencies.len() as u128;
4941        let min = latencies
4942            .iter()
4943            .min()
4944            .expect("collection validated to be non-empty");
4945        let max = latencies
4946            .iter()
4947            .max()
4948            .expect("collection validated to be non-empty");
4949
4950        println!();
4951        println!("Query Performance:");
4952        println!("  Average: {}ms", avg.to_string().cyan());
4953        println!("  Min: {}ms", min.to_string().green());
4954        println!("  Max: {}ms", max.to_string().yellow());
4955
4956        if avg > 100 {
4957            println!("  {}", "⚠️  High query latency detected".yellow());
4958        } else {
4959            println!("  {}", "✓ Query latency is healthy".green());
4960        }
4961    }
4962
4963    // Check pool metrics
4964    println!();
4965    println!("Connection Pool Status:");
4966    let pool_metrics = broker.get_pool_metrics();
4967    println!(
4968        "  Max Connections: {}",
4969        pool_metrics.max_size.to_string().cyan()
4970    );
4971    println!("  Active: {}", pool_metrics.size.to_string().cyan());
4972    println!("  Idle: {}", pool_metrics.idle.to_string().cyan());
4973    println!("  In-Use: {}", pool_metrics.in_use.to_string().cyan());
4974
4975    let utilization = if pool_metrics.max_size > 0 {
4976        (f64::from(pool_metrics.in_use) / f64::from(pool_metrics.max_size)) * 100.0
4977    } else {
4978        0.0
4979    };
4980
4981    if utilization > 80.0 {
4982        println!(
4983            "  {}",
4984            "⚠️  High pool utilization - consider scaling".yellow()
4985        );
4986    }
4987
4988    println!();
4989    println!("{}", "✓ Database health check completed".green().bold());
4990
4991    Ok(())
4992}
4993
4994/// Show connection pool statistics
4995pub async fn db_pool_stats(url: &str) -> anyhow::Result<()> {
4996    println!("{}", "=== Connection Pool Statistics ===".bold().cyan());
4997    println!();
4998    println!("Database URL: {}", mask_password(url).cyan());
4999    println!();
5000
5001    let db_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
5002        "PostgreSQL"
5003    } else if url.starts_with("mysql://") {
5004        "MySQL"
5005    } else {
5006        "Unknown"
5007    };
5008
5009    if db_type == "PostgreSQL" {
5010        use celers_broker_postgres::PostgresBroker;
5011
5012        println!("Connecting to database...");
5013        let broker = PostgresBroker::new(url).await?;
5014        println!("{}", "  ✓ Connected".green());
5015        println!();
5016
5017        let metrics = broker.get_pool_metrics();
5018
5019        #[derive(Tabled)]
5020        struct PoolStat {
5021            #[tabled(rename = "Metric")]
5022            metric: String,
5023            #[tabled(rename = "Value")]
5024            value: String,
5025        }
5026
5027        let stats = vec![
5028            PoolStat {
5029                metric: "Max Connections".to_string(),
5030                value: metrics.max_size.to_string(),
5031            },
5032            PoolStat {
5033                metric: "Active Connections".to_string(),
5034                value: metrics.size.to_string(),
5035            },
5036            PoolStat {
5037                metric: "Idle Connections".to_string(),
5038                value: metrics.idle.to_string(),
5039            },
5040            PoolStat {
5041                metric: "In-Use Connections".to_string(),
5042                value: metrics.in_use.to_string(),
5043            },
5044            PoolStat {
5045                metric: "Waiting Tasks".to_string(),
5046                value: if metrics.waiting > 0 {
5047                    metrics.waiting.to_string()
5048                } else {
5049                    "0 (estimated)".to_string()
5050                },
5051            },
5052        ];
5053
5054        let table = Table::new(stats).with(Style::rounded()).to_string();
5055        println!("{table}");
5056        println!();
5057
5058        // Pool utilization
5059        let utilization = if metrics.max_size > 0 {
5060            (f64::from(metrics.in_use) / f64::from(metrics.max_size)) * 100.0
5061        } else {
5062            0.0
5063        };
5064
5065        println!("Pool Utilization: {utilization:.1}%");
5066        if utilization > 80.0 {
5067            println!(
5068                "{}",
5069                "⚠️  High pool utilization - consider increasing max_connections".yellow()
5070            );
5071        } else if utilization < 20.0 && metrics.max_size > 10 {
5072            println!(
5073                "{}",
5074                "ℹ️  Low pool utilization - consider reducing max_connections".cyan()
5075            );
5076        } else {
5077            println!("{}", "✓ Pool utilization is healthy".green());
5078        }
5079    } else {
5080        println!(
5081            "{}",
5082            "⚠️  Pool statistics only supported for PostgreSQL".yellow()
5083        );
5084    }
5085
5086    Ok(())
5087}
5088
5089/// Run database migrations
5090pub async fn db_migrate(url: &str, action: &str, steps: usize) -> anyhow::Result<()> {
5091    println!("{}", "=== Database Migrations ===".bold().cyan());
5092    println!();
5093    println!("Database URL: {}", mask_password(url).cyan());
5094    println!("Action: {}", action.yellow());
5095    println!();
5096
5097    let db_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
5098        "PostgreSQL"
5099    } else if url.starts_with("mysql://") {
5100        "MySQL"
5101    } else {
5102        "Unknown"
5103    };
5104
5105    if db_type == "PostgreSQL" {
5106        use celers_broker_postgres::PostgresBroker;
5107
5108        match action.to_lowercase().as_str() {
5109            "apply" => {
5110                println!("Applying migrations...");
5111                let broker = PostgresBroker::new(url).await?;
5112                let _ = broker; // Use broker to ensure it connects
5113
5114                println!("{}", "  ✓ Schema initialized".green());
5115                println!();
5116                println!("{}", "✓ Migrations applied successfully".green().bold());
5117            }
5118            "rollback" => {
5119                println!("Rolling back {steps} migration(s)...");
5120                println!();
5121                println!(
5122                    "{}",
5123                    "⚠️  Manual rollback required - use SQL scripts".yellow()
5124                );
5125                println!("  CeleRS uses auto-migration with SQLx");
5126                println!("  To rollback, restore from database backup");
5127            }
5128            "status" => {
5129                println!("Checking migration status...");
5130                let broker = PostgresBroker::new(url).await?;
5131                let connected = broker.test_connection().await?;
5132
5133                println!();
5134                if connected {
5135                    println!("{}", "  ✓ Database schema is up-to-date".green());
5136                    println!("  Tables: celers_tasks, celers_results");
5137                } else {
5138                    println!("{}", "  ✗ Cannot connect to database".red());
5139                }
5140            }
5141            _ => {
5142                anyhow::bail!("Unknown action '{action}'. Valid actions: apply, rollback, status");
5143            }
5144        }
5145    } else {
5146        println!(
5147            "{}",
5148            "⚠️  Migrations only supported for PostgreSQL".yellow()
5149        );
5150    }
5151
5152    Ok(())
5153}
5154
/// Mask the password in a database URL so it can be displayed safely.
///
/// The credentials are looked for only after the `://` scheme separator (or
/// from the start of the string when there is none). Everything between the
/// first `:` of that part and the last `@` is replaced by `****`, so
/// passwords containing an unescaped `:` or `@` are hidden completely.
///
/// URLs without an `@`, or without a `:` in front of it (user name but no
/// password), are returned unchanged.
///
/// # Examples
///
/// * `postgres://user:secret@host:5432/db` becomes `postgres://user:****@host:5432/db`
/// * `postgres://user@host/db` is returned as is
fn mask_password(url: &str) -> String {
    // Skip the scheme so its own ':' is never mistaken for the user/password separator.
    let userinfo_start = url.find("://").map_or(0, |pos| pos + 3);

    let masked = url[userinfo_start..].rfind('@').and_then(|at_offset| {
        let at_pos = userinfo_start + at_offset;
        // The user name cannot contain ':', so the first one starts the password.
        url[userinfo_start..at_pos].find(':').map(|colon_offset| {
            let colon_pos = userinfo_start + colon_offset;
            format!("{}****{}", &url[..=colon_pos], &url[at_pos..])
        })
    });

    masked.unwrap_or_else(|| url.to_string())
}
5166
5167/// Run interactive TUI dashboard for real-time monitoring
5168pub async fn run_dashboard(broker_url: &str, queue: &str, refresh_secs: u64) -> anyhow::Result<()> {
5169    use crossterm::{
5170        event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode},
5171        execute,
5172        terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
5173    };
5174    use ratatui::{
5175        backend::CrosstermBackend,
5176        layout::{Constraint, Direction, Layout},
5177        style::{Color, Modifier, Style},
5178        widgets::{Block, Borders, Gauge, List, ListItem, Paragraph},
5179        Terminal,
5180    };
5181    use std::io;
5182    use std::time::Duration;
5183
5184    let broker = RedisBroker::new(broker_url, queue)?;
5185
5186    // Setup terminal
5187    enable_raw_mode()?;
5188    let mut stdout = io::stdout();
5189    execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?;
5190    let backend = CrosstermBackend::new(stdout);
5191    let mut terminal = Terminal::new(backend)?;
5192
5193    let refresh_duration = Duration::from_secs(refresh_secs);
5194    let mut last_update = std::time::Instant::now();
5195
5196    let result = loop {
5197        // Fetch stats
5198        let queue_size = broker.queue_size().await.unwrap_or(0);
5199        let dlq_size = broker.dlq_size().await.unwrap_or(0);
5200        let now = Utc::now();
5201
5202        // Get worker list
5203        let mut con = redis::Client::open(broker_url)?.get_connection()?;
5204        let keys: Vec<String> = redis::cmd("KEYS")
5205            .arg("celers:worker:*:heartbeat")
5206            .query(&mut con)?;
5207        let worker_count = keys.len();
5208
5209        // Render UI
5210        terminal.draw(|f| {
5211            let size = f.area();
5212
5213            // Create layout
5214            let chunks = Layout::default()
5215                .direction(Direction::Vertical)
5216                .margin(1)
5217                .constraints([
5218                    Constraint::Length(3),
5219                    Constraint::Length(7),
5220                    Constraint::Length(7),
5221                    Constraint::Min(0),
5222                ])
5223                .split(size);
5224
5225            // Title
5226            let title = Paragraph::new(format!(
5227                "CeleRS Dashboard - Queue: {} | Last Update: {}",
5228                queue,
5229                now.format("%Y-%m-%d %H:%M:%S")
5230            ))
5231            .style(
5232                Style::default()
5233                    .fg(Color::Cyan)
5234                    .add_modifier(Modifier::BOLD),
5235            )
5236            .block(Block::default().borders(Borders::ALL));
5237            f.render_widget(title, chunks[0]);
5238
5239            // Queue stats
5240            let queue_stats = [
5241                format!("Pending Tasks:  {queue_size}"),
5242                format!("DLQ Size:       {dlq_size}"),
5243                format!("Active Workers: {worker_count}"),
5244            ];
5245            let queue_block = Paragraph::new(queue_stats.join("\n"))
5246                .style(Style::default().fg(Color::Green))
5247                .block(
5248                    Block::default()
5249                        .borders(Borders::ALL)
5250                        .title("Queue Statistics"),
5251                );
5252            f.render_widget(queue_block, chunks[1]);
5253
5254            // Queue depth gauge
5255            let max_display = 1000;
5256            let ratio = (queue_size.min(max_display) as f64 / max_display as f64).min(1.0);
5257            let gauge_color = if queue_size > 500 {
5258                Color::Red
5259            } else if queue_size > 100 {
5260                Color::Yellow
5261            } else {
5262                Color::Green
5263            };
5264
5265            let gauge = Gauge::default()
5266                .block(Block::default().borders(Borders::ALL).title("Queue Depth"))
5267                .gauge_style(Style::default().fg(gauge_color))
5268                .ratio(ratio)
5269                .label(format!("{queue_size} / {max_display}"));
5270            f.render_widget(gauge, chunks[2]);
5271
5272            // Status messages
5273            let mut messages = vec![];
5274            if worker_count == 0 && queue_size > 0 {
5275                messages.push(
5276                    ListItem::new("⚠️  No active workers - tasks are not being processed")
5277                        .style(Style::default().fg(Color::Red)),
5278                );
5279            }
5280            if dlq_size > 10 {
5281                messages.push(
5282                    ListItem::new(format!("⚠️  High DLQ size: {dlq_size} failed tasks"))
5283                        .style(Style::default().fg(Color::Yellow)),
5284                );
5285            }
5286            if queue_size > 1000 {
5287                messages.push(
5288                    ListItem::new("⚠️  Queue backlog is high - consider scaling workers")
5289                        .style(Style::default().fg(Color::Yellow)),
5290                );
5291            }
5292            if messages.is_empty() {
5293                messages.push(
5294                    ListItem::new("✓ All systems normal").style(Style::default().fg(Color::Green)),
5295                );
5296            }
5297
5298            messages.push(ListItem::new(""));
5299            messages.push(
5300                ListItem::new("Press 'q' to quit").style(Style::default().fg(Color::DarkGray)),
5301            );
5302
5303            let status_list = List::new(messages).block(
5304                Block::default()
5305                    .borders(Borders::ALL)
5306                    .title("Status & Alerts"),
5307            );
5308            f.render_widget(status_list, chunks[3]);
5309        })?;
5310
5311        // Handle input
5312        if event::poll(Duration::from_millis(100))? {
5313            if let Event::Key(key) = event::read()? {
5314                if key.code == KeyCode::Char('q') {
5315                    break Ok(());
5316                }
5317            }
5318        }
5319
5320        // Auto-refresh
5321        if last_update.elapsed() >= refresh_duration {
5322            last_update = std::time::Instant::now();
5323        }
5324    };
5325
5326    // Cleanup terminal
5327    disable_raw_mode()?;
5328    execute!(
5329        terminal.backend_mut(),
5330        LeaveAlternateScreen,
5331        DisableMouseCapture
5332    )?;
5333    terminal.show_cursor()?;
5334
5335    result
5336}
5337
5338// ============================================================================
5339// Utility Functions
5340// ============================================================================
5341
5342/// Retry a connection operation with exponential backoff
5343#[allow(dead_code)]
5344async fn retry_with_backoff<F, T, E>(
5345    operation: F,
5346    max_retries: u32,
5347    operation_name: &str,
5348) -> Result<T, E>
5349where
5350    F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, E>> + Send>>,
5351    E: std::fmt::Display,
5352{
5353    let mut retries = 0;
5354    loop {
5355        match operation().await {
5356            Ok(result) => return Ok(result),
5357            Err(err) => {
5358                retries += 1;
5359                if retries >= max_retries {
5360                    eprintln!(
5361                        "{}",
5362                        format!("✗ {operation_name} failed after {max_retries} attempts: {err}")
5363                            .red()
5364                    );
5365                    return Err(err);
5366                }
5367
5368                let backoff_ms = 100 * (2_u64.pow(retries - 1));
5369                eprintln!(
5370                    "{}",
5371                    format!(
5372                        "⚠ {operation_name} failed (attempt {retries}/{max_retries}), retrying in {backoff_ms}ms..."
5373                    )
5374                    .yellow()
5375                );
5376                tokio::time::sleep(tokio::time::Duration::from_millis(backoff_ms)).await;
5377            }
5378        }
5379    }
5380}
5381
/// Render a byte count as a human-readable size using 1024-based units.
///
/// Values below 1024 are printed as whole bytes (`"512 B"`). Larger values
/// are scaled to KB, MB, GB or TB and printed with two decimals
/// (`"1.50 KB"`). TB is the largest unit, so very large values stay in TB
/// (`"1024.00 TB"`).
#[allow(dead_code)]
fn format_bytes(bytes: usize) -> String {
    const LARGER_UNITS: [&str; 4] = ["KB", "MB", "GB", "TB"];

    // Plain byte counts are shown without decimals.
    if bytes < 1024 {
        return format!("{bytes} B");
    }

    let mut scaled = bytes as f64;
    let mut suffix = "B";
    for &unit in &LARGER_UNITS {
        // Stop as soon as the value fits the current unit.
        if scaled < 1024.0 {
            break;
        }
        scaled /= 1024.0;
        suffix = unit;
    }

    format!("{scaled:.2} {suffix}")
}
5400
/// Render a number of seconds as a short human-readable duration.
///
/// At most the two largest applicable units are shown, and the smaller one
/// is omitted when it is zero:
///
/// * under a minute: `"45s"`
/// * under an hour: `"5m"` or `"5m 30s"`
/// * under a day: `"2h"` or `"2h 15m"` (seconds are dropped)
/// * otherwise: `"3d"` or `"3d 4h"` (minutes and seconds are dropped)
#[allow(dead_code)]
fn format_duration(seconds: u64) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 60 * MINUTE;
    const DAY: u64 = 24 * HOUR;

    // (major value, major suffix, minor value, minor suffix)
    let (major, major_unit, minor, minor_unit) = match seconds {
        s if s < MINUTE => return format!("{s}s"),
        s if s < HOUR => (s / MINUTE, "m", s % MINUTE, "s"),
        s if s < DAY => (s / HOUR, "h", (s % HOUR) / MINUTE, "m"),
        s => (s / DAY, "d", (s % DAY) / HOUR, "h"),
    };

    if minor == 0 {
        format!("{major}{major_unit}")
    } else {
        format!("{major}{major_unit} {minor}{minor_unit}")
    }
}
5432
5433/// Validate task ID format
5434#[allow(dead_code)]
5435fn validate_task_id(task_id: &str) -> anyhow::Result<uuid::Uuid> {
5436    uuid::Uuid::parse_str(task_id)
5437        .map_err(|e| anyhow::anyhow!("Invalid task ID format: {e}. Expected UUID format."))
5438}
5439
5440/// Validate queue name
5441#[allow(dead_code)]
5442fn validate_queue_name(queue: &str) -> anyhow::Result<()> {
5443    if queue.is_empty() {
5444        anyhow::bail!("Queue name cannot be empty");
5445    }
5446    if queue.contains(char::is_whitespace) {
5447        anyhow::bail!("Queue name cannot contain whitespace");
5448    }
5449    if queue.len() > 255 {
5450        anyhow::bail!("Queue name too long (max 255 characters)");
5451    }
5452    Ok(())
5453}
5454
/// Compute `part` as a percentage of `total`.
///
/// Returns `0.0` when `total` is zero instead of dividing by zero. The result
/// is not clamped, so a `part` larger than `total` yields more than `100.0`.
#[allow(dead_code)]
fn calculate_percentage(part: usize, total: usize) -> f64 {
    match total {
        0 => 0.0,
        _ => (part as f64 / total as f64) * 100.0,
    }
}
5464
/// Unit tests for this module.
///
/// Tests that call `super::…` exercise helpers defined in this file. The
/// remaining tests only pin string formats and literal values locally; they
/// do not call production code.
#[cfg(test)]
mod tests {
    /// A canonical hyphenated UUID parses; arbitrary text does not.
    #[test]
    fn test_task_id_parsing() {
        // Valid UUID
        let valid_id = "550e8400-e29b-41d4-a716-446655440000";
        assert!(valid_id.parse::<uuid::Uuid>().is_ok());

        // Invalid UUID
        let invalid_id = "not-a-valid-uuid";
        assert!(invalid_id.parse::<uuid::Uuid>().is_err());
    }

    /// The worker id is the third `:`-separated segment of a heartbeat key.
    #[test]
    fn test_worker_id_extraction() {
        let key = "celers:worker:worker-123:heartbeat";
        let parts: Vec<&str> = key.split(':').collect();
        assert_eq!(parts.len(), 4);
        assert_eq!(parts[2], "worker-123");
    }

    /// Upper- and lower-case spellings of a log level must classify the same.
    #[test]
    fn test_log_level_matching() {
        // Local stand-in for a case-tolerant level match; this does not call
        // production code. NOTE(review): confirm it mirrors the match used
        // when colouring log output.
        fn classify(level: &str) -> &'static str {
            match level {
                "ERROR" | "error" => "error",
                "WARN" | "warn" => "warn",
                "DEBUG" | "debug" => "debug",
                _ => "info",
            }
        }

        let spellings = [
            ("ERROR", "error"),
            ("WARN", "warn"),
            ("INFO", "info"),
            ("DEBUG", "debug"),
        ];

        for (upper, lower) in spellings {
            assert_eq!(classify(upper), classify(lower));
            assert_eq!(classify(upper), lower);
        }
    }

    /// Pins the literal layout of task-log and worker keys.
    #[test]
    fn test_redis_key_formatting() {
        let task_id = uuid::Uuid::new_v4();
        let logs_key = format!("celers:task:{task_id}:logs");
        assert!(logs_key.starts_with("celers:task:"));
        assert!(logs_key.ends_with(":logs"));

        let worker_id = "worker-123";
        let heartbeat_key = format!("celers:worker:{worker_id}:heartbeat");
        assert_eq!(heartbeat_key, "celers:worker:worker-123:heartbeat");

        let pause_key = format!("celers:worker:{worker_id}:paused");
        assert_eq!(pause_key, "celers:worker:worker-123:paused");
    }

    /// Pins the literal layout of the queue, DLQ and delayed-queue keys.
    #[test]
    fn test_queue_key_formatting() {
        let queue = "test-queue";
        let queue_key = format!("celers:{queue}");
        assert_eq!(queue_key, "celers:test-queue");

        let dlq_key = format!("{queue_key}:dlq");
        assert_eq!(dlq_key, "celers:test-queue:dlq");

        let delayed_key = format!("{queue_key}:delayed");
        assert_eq!(delayed_key, "celers:test-queue:delayed");
    }

    /// A JSON log line exposes `level` and `message`; non-JSON text is rejected.
    #[test]
    fn test_json_log_parsing() {
        let valid_log =
            r#"{"timestamp":"2026-01-04T10:00:00Z","level":"INFO","message":"Test message"}"#;
        let log_json: serde_json::Value =
            serde_json::from_str(valid_log).expect("well-formed log line should parse");

        assert_eq!(log_json.get("level").and_then(|v| v.as_str()), Some("INFO"));
        assert_eq!(
            log_json.get("message").and_then(|v| v.as_str()),
            Some("Test message")
        );

        let invalid_log = "not json";
        let parsed: Result<serde_json::Value, _> = serde_json::from_str(invalid_log);
        assert!(parsed.is_err());
    }

    /// Negative LRANGE indices select the last `limit` entries of a list.
    #[test]
    fn test_limit_range_calculation() {
        let limit: usize = 50;
        let log_count: usize = 100;

        // Redis LRANGE with negative indices
        let start_idx = -(limit as isize);
        let end_idx = -1;

        assert_eq!(start_idx, -50);
        assert_eq!(end_idx, -1);

        // Should get last 50 items
        assert!(log_count > limit);
    }

    /// Sample values that sit above the thresholds named in the messages.
    ///
    /// The values are local literals, so this documents the thresholds rather
    /// than exercising production code.
    #[test]
    fn test_diagnostic_thresholds() {
        // DLQ threshold
        let dlq_size = 15;
        assert!(dlq_size > 10, "Should trigger warning when DLQ > 10");

        // Queue backlog threshold
        let queue_size = 1500;
        assert!(
            queue_size > 1000,
            "Should trigger warning when queue > 1000"
        );

        // No workers scenario
        let worker_count = 0;
        let pending_tasks = 50;
        assert!(
            worker_count == 0 && pending_tasks > 0,
            "Should trigger error when no workers but tasks pending"
        );
    }

    /// Pins the literal names of the graceful and immediate shutdown channels.
    #[test]
    fn test_shutdown_channel_naming() {
        let worker_id = "worker-123";

        let graceful_channel = format!("celers:worker:{worker_id}:shutdown_graceful");
        assert_eq!(
            graceful_channel,
            "celers:worker:worker-123:shutdown_graceful"
        );

        let immediate_channel = format!("celers:worker:{worker_id}:shutdown");
        assert_eq!(immediate_channel, "celers:worker:worker-123:shutdown");
    }

    /// An RFC 3339 timestamp has a `T` separator and a UTC offset marker.
    #[test]
    fn test_timestamp_formatting() {
        let timestamp = chrono::Utc::now().to_rfc3339();
        assert!(timestamp.contains('T'));
        assert!(timestamp.contains('Z') || timestamp.contains('+'));
    }

    /// Passwords in connection URLs are masked; URLs without one are untouched.
    #[test]
    fn test_mask_password() {
        // Test PostgreSQL URL
        let pg_url = "postgres://user:password123@localhost:5432/dbname";
        let masked = super::mask_password(pg_url);
        assert!(masked.contains("postgres://user:****@localhost"));
        assert!(!masked.contains("password123"));

        // Test MySQL URL
        let mysql_url = "mysql://admin:secret@127.0.0.1:3306/db";
        let masked = super::mask_password(mysql_url);
        assert!(masked.contains("mysql://admin:****@127.0.0.1"));
        assert!(!masked.contains("secret"));

        // Test URL without password
        let no_pass_url = "redis://localhost:6379";
        let masked = super::mask_password(no_pass_url);
        assert_eq!(masked, no_pass_url);
    }

    /// Byte counts are rendered with binary (1024-based) unit steps.
    #[test]
    fn test_format_bytes() {
        assert_eq!(super::format_bytes(0), "0 B");
        assert_eq!(super::format_bytes(500), "500 B");
        assert_eq!(super::format_bytes(1024), "1.00 KB");
        assert_eq!(super::format_bytes(1536), "1.50 KB");
        assert_eq!(super::format_bytes(1_048_576), "1.00 MB");
        assert_eq!(super::format_bytes(1_073_741_824), "1.00 GB");
    }

    /// Durations in seconds are rendered using at most the two largest units.
    #[test]
    fn test_format_duration() {
        assert_eq!(super::format_duration(0), "0s");
        assert_eq!(super::format_duration(30), "30s");
        assert_eq!(super::format_duration(60), "1m");
        assert_eq!(super::format_duration(90), "1m 30s");
        assert_eq!(super::format_duration(3600), "1h");
        assert_eq!(super::format_duration(3660), "1h 1m");
        assert_eq!(super::format_duration(86400), "1d");
        assert_eq!(super::format_duration(90000), "1d 1h");
    }

    /// Only well-formed UUID strings are accepted as task ids.
    #[test]
    fn test_validate_task_id() {
        // Valid UUID
        let valid = "550e8400-e29b-41d4-a716-446655440000";
        assert!(super::validate_task_id(valid).is_ok());

        // Invalid UUIDs
        assert!(super::validate_task_id("not-a-uuid").is_err());
        assert!(super::validate_task_id("").is_err());
        assert!(super::validate_task_id("12345").is_err());
    }

    /// Queue names must be non-empty, whitespace-free and within the length limit.
    #[test]
    fn test_validate_queue_name() {
        // Valid queue names
        assert!(super::validate_queue_name("default").is_ok());
        assert!(super::validate_queue_name("high-priority").is_ok());
        assert!(super::validate_queue_name("queue_1").is_ok());
        assert!(super::validate_queue_name(&"x".repeat(255)).is_ok()); // Exactly at the limit

        // Invalid queue names
        assert!(super::validate_queue_name("").is_err()); // Empty
        assert!(super::validate_queue_name("queue name").is_err()); // Whitespace
        assert!(super::validate_queue_name("queue\tname").is_err()); // Tab counts as whitespace
        assert!(super::validate_queue_name("queue\n").is_err()); // Trailing newline
        assert!(super::validate_queue_name(&"x".repeat(256)).is_err()); // Too long
    }

    /// Percentages are exact for representable ratios and safe for a zero total.
    #[test]
    fn test_calculate_percentage() {
        assert_eq!(super::calculate_percentage(0, 100), 0.0);
        assert_eq!(super::calculate_percentage(50, 100), 50.0);
        assert_eq!(super::calculate_percentage(100, 100), 100.0);
        assert_eq!(super::calculate_percentage(25, 100), 25.0);

        // The result is not clamped when part exceeds total
        assert_eq!(super::calculate_percentage(150, 100), 150.0);

        // Edge case: division by zero
        assert_eq!(super::calculate_percentage(10, 0), 0.0);
        assert_eq!(super::calculate_percentage(0, 0), 0.0);
    }
}