1use celers_broker_redis::{QueueMode, RedisBroker};
41use celers_core::Broker;
42use celers_worker::{wait_for_signal, Worker, WorkerConfig};
43use chrono::Utc;
44use colored::Colorize;
45use tabled::{settings::Style, Table, Tabled};
46
47pub async fn start_worker(
84 broker_url: &str,
85 queue: &str,
86 mode: &str,
87 concurrency: usize,
88 max_retries: u32,
89 timeout: u64,
90) -> anyhow::Result<()> {
91 println!("{}", "=== CeleRS Worker ===".bold().green());
92 println!();
93
94 let queue_mode = match mode.to_lowercase().as_str() {
96 "priority" => QueueMode::Priority,
97 _ => QueueMode::Fifo,
98 };
99
100 let broker = RedisBroker::with_mode(broker_url, queue, queue_mode)?;
102 println!("✓ Connected to Redis: {}", broker_url.cyan());
103 println!("✓ Queue: {} (mode: {})", queue.cyan(), mode.cyan());
104
105 let registry = celers_core::TaskRegistry::new();
107 println!("⚠️ No tasks registered. Register tasks in your application code.");
108
109 let config = WorkerConfig {
111 concurrency,
112 poll_interval_ms: 1000,
113 max_retries,
114 default_timeout_secs: timeout,
115 ..Default::default()
116 };
117
118 println!();
119 println!("Worker configuration:");
120 println!(" Concurrency: {}", concurrency.to_string().yellow());
121 println!(" Max retries: {}", max_retries.to_string().yellow());
122 println!(" Timeout: {}s", timeout.to_string().yellow());
123 println!();
124
125 let worker = Worker::new(broker, registry, config);
127 println!("{}", "✓ Worker started successfully".green().bold());
128 println!("{}", " Press Ctrl+C to stop gracefully".dimmed());
129 println!();
130
131 let worker_task = tokio::spawn(async move {
133 if let Err(e) = worker.run().await {
134 eprintln!("Worker error: {e}");
135 }
136 });
137
138 wait_for_signal().await;
140
141 println!();
142 println!("{}", "Shutting down gracefully...".yellow());
143 worker_task.abort();
144
145 println!("{}", "✓ Worker stopped".green());
146
147 Ok(())
148}
149
150pub async fn show_status(broker_url: &str, queue: &str) -> anyhow::Result<()> {
175 let broker = RedisBroker::new(broker_url, queue)?;
176
177 println!("{}", "=== Queue Status ===".bold().cyan());
178 println!();
179
180 let queue_size = broker.queue_size().await?;
181 let dlq_size = broker.dlq_size().await?;
182
183 #[derive(Tabled)]
184 struct QueueStats {
185 #[tabled(rename = "Metric")]
186 metric: String,
187 #[tabled(rename = "Value")]
188 value: String,
189 }
190
191 let stats = vec![
192 QueueStats {
193 metric: "Queue".to_string(),
194 value: queue.to_string(),
195 },
196 QueueStats {
197 metric: "Pending Tasks".to_string(),
198 value: queue_size.to_string(),
199 },
200 QueueStats {
201 metric: "Failed Tasks (DLQ)".to_string(),
202 value: dlq_size.to_string(),
203 },
204 QueueStats {
205 metric: "Total".to_string(),
206 value: (queue_size + dlq_size).to_string(),
207 },
208 ];
209
210 let table = Table::new(stats).with(Style::rounded()).to_string();
211 println!("{table}");
212
213 if dlq_size > 0 {
214 println!();
215 println!(
216 "{}",
217 format!("⚠️ {dlq_size} tasks in Dead Letter Queue")
218 .yellow()
219 .bold()
220 );
221 println!(" Run: celers dlq inspect --broker {broker_url} --queue {queue}");
222 }
223
224 Ok(())
225}
226
227pub async fn inspect_dlq(broker_url: &str, queue: &str, limit: usize) -> anyhow::Result<()> {
254 let broker = RedisBroker::new(broker_url, queue)?;
255
256 println!("{}", "=== Dead Letter Queue ===".bold().red());
257 println!();
258
259 let dlq_size = broker.dlq_size().await?;
260 println!("Total failed tasks: {}", dlq_size.to_string().yellow());
261
262 if dlq_size == 0 {
263 println!("{}", "✓ DLQ is empty".green());
264 return Ok(());
265 }
266
267 println!();
268 let tasks = broker.inspect_dlq(limit as isize).await?;
269
270 for (idx, task) in tasks.iter().enumerate() {
271 println!("{}", format!("Task #{}", idx + 1).bold());
272 println!(" ID: {}", task.metadata.id.to_string().cyan());
273 println!(" Name: {}", task.metadata.name.yellow());
274 println!(" State: {:?}", task.metadata.state);
275 println!(" Max Retries: {}", task.metadata.max_retries);
276 println!(" Created: {}", task.metadata.created_at);
277 println!();
278 }
279
280 println!(
281 "Showing {} of {} tasks",
282 tasks.len().to_string().yellow(),
283 dlq_size
284 );
285
286 Ok(())
287}
288
289pub async fn clear_dlq(broker_url: &str, queue: &str, confirm: bool) -> anyhow::Result<()> {
291 let broker = RedisBroker::new(broker_url, queue)?;
292
293 let dlq_size = broker.dlq_size().await?;
294
295 if dlq_size == 0 {
296 println!("{}", "✓ DLQ is already empty".green());
297 return Ok(());
298 }
299
300 if !confirm {
301 println!(
302 "{}",
303 format!("⚠️ This will delete {dlq_size} tasks from DLQ")
304 .yellow()
305 .bold()
306 );
307 println!(" Add --confirm to proceed");
308 return Ok(());
309 }
310
311 let count = broker.clear_dlq().await?;
312 println!("{}", format!("✓ Cleared {count} tasks from DLQ").green());
313
314 Ok(())
315}
316
317pub async fn replay_task(broker_url: &str, queue: &str, task_id_str: &str) -> anyhow::Result<()> {
319 let broker = RedisBroker::new(broker_url, queue)?;
320
321 let task_id = task_id_str
322 .parse::<uuid::Uuid>()
323 .map_err(|_| anyhow::anyhow!("Invalid task ID format"))?;
324
325 println!("{}", "=== Replaying Task from DLQ ===".bold().cyan());
326 println!("Task ID: {}", task_id.to_string().yellow());
327
328 let replayed = broker.replay_from_dlq(&task_id).await?;
329
330 if replayed {
331 println!("{}", "✓ Task replayed successfully".green().bold());
332 println!(" The task will be processed again by workers");
333 } else {
334 println!("{}", "✗ Task not found in DLQ".red());
335 }
336
337 Ok(())
338}
339
340#[allow(clippy::unused_async)]
368pub async fn init_config(path: &str) -> anyhow::Result<()> {
369 let config = crate::config::Config::default_config();
370
371 config.to_file(path)?;
372
373 println!("{}", "✓ Configuration file created".green().bold());
374 println!(" Location: {}", path.cyan());
375 println!();
376 println!("Edit the file and run:");
377 println!(" celers worker --config {path}");
378
379 Ok(())
380}
381
382pub async fn list_queues(broker_url: &str) -> anyhow::Result<()> {
384 let client = redis::Client::open(broker_url)?;
386 let mut conn = client.get_multiplexed_async_connection().await?;
387
388 println!("{}", "=== Redis Queues ===".bold().cyan());
389 println!();
390
391 let mut cursor = 0;
393 let mut queue_keys: Vec<String> = Vec::new();
394
395 loop {
396 let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
397 .arg(cursor)
398 .arg("MATCH")
399 .arg("celers:*")
400 .arg("COUNT")
401 .arg(100)
402 .query_async(&mut conn)
403 .await?;
404
405 queue_keys.extend(keys);
406 cursor = new_cursor;
407
408 if cursor == 0 {
409 break;
410 }
411 }
412
413 if queue_keys.is_empty() {
414 println!("{}", "No queues found".yellow());
415 return Ok(());
416 }
417
418 #[derive(Tabled)]
419 struct QueueInfo {
420 #[tabled(rename = "Queue")]
421 name: String,
422 #[tabled(rename = "Type")]
423 queue_type: String,
424 #[tabled(rename = "Size")]
425 size: String,
426 }
427
428 let mut queue_infos = Vec::new();
429
430 for key in queue_keys {
431 let key_type: String = redis::cmd("TYPE").arg(&key).query_async(&mut conn).await?;
432
433 let size: isize = match key_type.as_str() {
434 "list" => redis::cmd("LLEN").arg(&key).query_async(&mut conn).await?,
435 "zset" => redis::cmd("ZCARD").arg(&key).query_async(&mut conn).await?,
436 _ => 0,
437 };
438
439 let queue_type = if key.contains(":dlq") {
440 "DLQ".to_string()
441 } else if key.contains(":delayed") {
442 "Delayed".to_string()
443 } else if key_type == "zset" {
444 "Priority".to_string()
445 } else {
446 "FIFO".to_string()
447 };
448
449 queue_infos.push(QueueInfo {
450 name: key,
451 queue_type,
452 size: size.to_string(),
453 });
454 }
455
456 let table = Table::new(queue_infos).with(Style::rounded()).to_string();
457 println!("{table}");
458
459 Ok(())
460}
461
462pub async fn purge_queue(broker_url: &str, queue: &str, confirm: bool) -> anyhow::Result<()> {
464 let broker = RedisBroker::new(broker_url, queue)?;
465
466 let queue_size = broker.queue_size().await?;
467
468 if queue_size == 0 {
469 println!("{}", "✓ Queue is already empty".green());
470 return Ok(());
471 }
472
473 if !confirm {
474 println!(
475 "{}",
476 format!("⚠️ This will delete {queue_size} tasks from queue '{queue}'")
477 .yellow()
478 .bold()
479 );
480 println!(" Add --confirm to proceed");
481 return Ok(());
482 }
483
484 let client = redis::Client::open(broker_url)?;
486 let mut conn = client.get_multiplexed_async_connection().await?;
487
488 let queue_key = format!("celers:{queue}");
489 redis::cmd("DEL")
490 .arg(&queue_key)
491 .query_async::<()>(&mut conn)
492 .await?;
493
494 println!(
495 "{}",
496 format!("✓ Purged {queue_size} tasks from queue '{queue}'").green()
497 );
498
499 Ok(())
500}
501
502pub async fn inspect_task(broker_url: &str, queue: &str, task_id_str: &str) -> anyhow::Result<()> {
504 let task_id = task_id_str
505 .parse::<uuid::Uuid>()
506 .map_err(|_| anyhow::anyhow!("Invalid task ID format"))?;
507
508 println!("{}", "=== Task Details ===".bold().cyan());
509 println!("Task ID: {}", task_id.to_string().yellow());
510 println!();
511
512 let client = redis::Client::open(broker_url)?;
514 let mut conn = client.get_multiplexed_async_connection().await?;
515
516 let queue_key = format!("celers:{queue}");
518 let queue_type: String = redis::cmd("TYPE")
519 .arg(&queue_key)
520 .query_async(&mut conn)
521 .await?;
522
523 let mut found = false;
524
525 if queue_type == "list" {
527 let tasks: Vec<String> = redis::cmd("LRANGE")
528 .arg(&queue_key)
529 .arg(0)
530 .arg(-1)
531 .query_async(&mut conn)
532 .await?;
533
534 for task_str in tasks {
535 if let Ok(task) = serde_json::from_str::<celers_core::SerializedTask>(&task_str) {
536 if task.metadata.id == task_id {
537 print_task_details(&task, "Main Queue");
538 found = true;
539 break;
540 }
541 }
542 }
543 } else if queue_type == "zset" {
544 let tasks: Vec<String> = redis::cmd("ZRANGE")
545 .arg(&queue_key)
546 .arg(0)
547 .arg(-1)
548 .query_async(&mut conn)
549 .await?;
550
551 for task_str in tasks {
552 if let Ok(task) = serde_json::from_str::<celers_core::SerializedTask>(&task_str) {
553 if task.metadata.id == task_id {
554 print_task_details(&task, "Main Queue (Priority)");
555 found = true;
556 break;
557 }
558 }
559 }
560 }
561
562 if !found {
564 let dlq_key = format!("celers:{queue}:dlq");
565 let dlq_tasks: Vec<String> = redis::cmd("LRANGE")
566 .arg(&dlq_key)
567 .arg(0)
568 .arg(-1)
569 .query_async(&mut conn)
570 .await?;
571
572 for task_str in dlq_tasks {
573 if let Ok(task) = serde_json::from_str::<celers_core::SerializedTask>(&task_str) {
574 if task.metadata.id == task_id {
575 print_task_details(&task, "Dead Letter Queue");
576 found = true;
577 break;
578 }
579 }
580 }
581 }
582
583 if !found {
585 let delayed_key = format!("celers:{queue}:delayed");
586 let delayed_tasks: Vec<String> = redis::cmd("ZRANGE")
587 .arg(&delayed_key)
588 .arg(0)
589 .arg(-1)
590 .query_async(&mut conn)
591 .await?;
592
593 for task_str in delayed_tasks {
594 if let Ok(task) = serde_json::from_str::<celers_core::SerializedTask>(&task_str) {
595 if task.metadata.id == task_id {
596 print_task_details(&task, "Delayed Queue");
597 found = true;
598 break;
599 }
600 }
601 }
602 }
603
604 if !found {
605 println!("{}", "✗ Task not found in any queue".red());
606 println!();
607 println!("The task may have been:");
608 println!(" • Already processed");
609 println!(" • Deleted");
610 println!(" • In a different queue");
611 }
612
613 Ok(())
614}
615
616fn print_task_details(task: &celers_core::SerializedTask, location: &str) {
618 println!("{}", format!("Location: {location}").green().bold());
619 println!();
620
621 #[derive(Tabled)]
622 struct TaskDetail {
623 #[tabled(rename = "Field")]
624 field: String,
625 #[tabled(rename = "Value")]
626 value: String,
627 }
628
629 let details = vec![
630 TaskDetail {
631 field: "ID".to_string(),
632 value: task.metadata.id.to_string(),
633 },
634 TaskDetail {
635 field: "Name".to_string(),
636 value: task.metadata.name.clone(),
637 },
638 TaskDetail {
639 field: "State".to_string(),
640 value: format!("{:?}", task.metadata.state),
641 },
642 TaskDetail {
643 field: "Priority".to_string(),
644 value: task.metadata.priority.to_string(),
645 },
646 TaskDetail {
647 field: "Max Retries".to_string(),
648 value: task.metadata.max_retries.to_string(),
649 },
650 TaskDetail {
651 field: "Timeout".to_string(),
652 value: task
653 .metadata
654 .timeout_secs
655 .map_or_else(|| "default".to_string(), |s| format!("{s}s")),
656 },
657 TaskDetail {
658 field: "Created At".to_string(),
659 value: task.metadata.created_at.to_string(),
660 },
661 TaskDetail {
662 field: "Payload Size".to_string(),
663 value: format!("{} bytes", task.payload.len()),
664 },
665 ];
666
667 let table = Table::new(details).with(Style::rounded()).to_string();
668 println!("{table}");
669}
670
671pub async fn cancel_task(broker_url: &str, queue: &str, task_id_str: &str) -> anyhow::Result<()> {
673 let task_id = task_id_str
674 .parse::<uuid::Uuid>()
675 .map_err(|_| anyhow::anyhow!("Invalid task ID format"))?;
676
677 println!("{}", "=== Cancel Task ===".bold().yellow());
678 println!("Task ID: {}", task_id.to_string().yellow());
679 println!();
680
681 let broker = RedisBroker::new(broker_url, queue)?;
683 println!("✓ Connected to Redis: {}", broker_url.cyan());
684 println!();
685
686 println!("Sending cancellation signal...");
688 let cancelled = broker.cancel(&task_id).await?;
689
690 if cancelled {
691 println!(
692 "{}",
693 "✓ Cancellation signal sent successfully".green().bold()
694 );
695 println!();
696 println!("The task will be cancelled if:");
697 println!(" • It's currently running and has cancellation checkpoints");
698 println!(" • It's pending in the queue (will be removed)");
699 println!(" • Workers are subscribed to the cancellation channel");
700 println!();
701 println!(
702 "{}",
703 "Note: Task cancellation depends on worker implementation".yellow()
704 );
705 } else {
706 println!(
707 "{}",
708 "⚠️ No workers subscribed to cancellation channel"
709 .yellow()
710 .bold()
711 );
712 println!();
713 println!("Possible reasons:");
714 println!(" • No workers are currently running");
715 println!(" • Workers don't support cancellation");
716 println!(" • The task has already completed");
717 println!();
718 println!("Make sure workers are running and support task cancellation.");
719 }
720
721 Ok(())
722}
723
724pub async fn retry_task(broker_url: &str, queue: &str, task_id_str: &str) -> anyhow::Result<()> {
726 let task_id = task_id_str
727 .parse::<uuid::Uuid>()
728 .map_err(|_| anyhow::anyhow!("Invalid task ID format"))?;
729
730 println!("{}", "=== Retry Task ===".bold().cyan());
731 println!("Task ID: {}", task_id.to_string().yellow());
732 println!();
733
734 let client = redis::Client::open(broker_url)?;
736 let mut conn = client.get_multiplexed_async_connection().await?;
737
738 let mut task: Option<celers_core::SerializedTask> = None;
739 let mut source_queue: Option<String> = None;
740
741 let queue_key = format!("celers:{queue}");
743 let queue_type: String = redis::cmd("TYPE")
744 .arg(&queue_key)
745 .query_async(&mut conn)
746 .await?;
747
748 if queue_type == "list" {
750 let tasks: Vec<String> = redis::cmd("LRANGE")
751 .arg(&queue_key)
752 .arg(0)
753 .arg(-1)
754 .query_async(&mut conn)
755 .await?;
756
757 for task_str in &tasks {
758 if let Ok(t) = serde_json::from_str::<celers_core::SerializedTask>(task_str) {
759 if t.metadata.id == task_id {
760 task = Some(t);
761 source_queue = Some("main_list".to_string());
762 let _: usize = redis::cmd("LREM")
764 .arg(&queue_key)
765 .arg(1)
766 .arg(task_str)
767 .query_async(&mut conn)
768 .await?;
769 break;
770 }
771 }
772 }
773 } else if queue_type == "zset" {
774 let tasks: Vec<String> = redis::cmd("ZRANGE")
775 .arg(&queue_key)
776 .arg(0)
777 .arg(-1)
778 .query_async(&mut conn)
779 .await?;
780
781 for task_str in tasks {
782 if let Ok(t) = serde_json::from_str::<celers_core::SerializedTask>(&task_str) {
783 if t.metadata.id == task_id {
784 task = Some(t);
785 source_queue = Some("main_zset".to_string());
786 let _: usize = redis::cmd("ZREM")
788 .arg(&queue_key)
789 .arg(&task_str)
790 .query_async(&mut conn)
791 .await?;
792 break;
793 }
794 }
795 }
796 }
797
798 if task.is_none() {
800 let dlq_key = format!("celers:{queue}:dlq");
801 let dlq_tasks: Vec<String> = redis::cmd("LRANGE")
802 .arg(&dlq_key)
803 .arg(0)
804 .arg(-1)
805 .query_async(&mut conn)
806 .await?;
807
808 for task_str in dlq_tasks {
809 if let Ok(t) = serde_json::from_str::<celers_core::SerializedTask>(&task_str) {
810 if t.metadata.id == task_id {
811 task = Some(t);
812 source_queue = Some("dlq".to_string());
813 let _: usize = redis::cmd("LREM")
815 .arg(&dlq_key)
816 .arg(1)
817 .arg(&task_str)
818 .query_async(&mut conn)
819 .await?;
820 break;
821 }
822 }
823 }
824 }
825
826 if task.is_none() {
828 let delayed_key = format!("celers:{queue}:delayed");
829 let delayed_tasks: Vec<String> = redis::cmd("ZRANGE")
830 .arg(&delayed_key)
831 .arg(0)
832 .arg(-1)
833 .query_async(&mut conn)
834 .await?;
835
836 for task_str in delayed_tasks {
837 if let Ok(t) = serde_json::from_str::<celers_core::SerializedTask>(&task_str) {
838 if t.metadata.id == task_id {
839 task = Some(t);
840 source_queue = Some("delayed".to_string());
841 let _: usize = redis::cmd("ZREM")
843 .arg(&delayed_key)
844 .arg(&task_str)
845 .query_async(&mut conn)
846 .await?;
847 break;
848 }
849 }
850 }
851 }
852
853 if let Some(mut t) = task {
855 println!(
856 "✓ Task found in: {}",
857 source_queue
858 .expect("source_queue set when task is found")
859 .cyan()
860 );
861 println!();
862
863 t.metadata.state = celers_core::TaskState::Pending;
865 t.metadata.updated_at = Utc::now();
866
867 let task_json = serde_json::to_string(&t)?;
869 let _: usize = redis::cmd("LPUSH")
870 .arg(&queue_key)
871 .arg(&task_json)
872 .query_async(&mut conn)
873 .await?;
874
875 println!("{}", "✓ Task retried successfully".green().bold());
876 println!();
877 println!("The task has been:");
878 println!(" • Removed from its current queue");
879 println!(" • Reset to Pending state");
880 println!(" • Re-enqueued to main queue");
881 println!();
882 println!("Workers will process it again.");
883 } else {
884 println!("{}", "✗ Task not found in any queue".red());
885 println!();
886 println!("The task may have been:");
887 println!(" • Already processed and completed");
888 println!(" • Deleted manually");
889 println!(" • In a different queue");
890 }
891
892 Ok(())
893}
894
895pub async fn show_task_result(backend_url: &str, task_id_str: &str) -> anyhow::Result<()> {
897 let task_id = task_id_str
898 .parse::<uuid::Uuid>()
899 .map_err(|_| anyhow::anyhow!("Invalid task ID format"))?;
900
901 println!("{}", "=== Task Result ===".bold().cyan());
902 println!("Task ID: {}", task_id.to_string().yellow());
903 println!();
904
905 let client = redis::Client::open(backend_url)?;
907 let mut conn = client.get_multiplexed_async_connection().await?;
908
909 let result_key = format!("celery-task-meta-{task_id}");
911 let result_data: Option<String> = redis::cmd("GET")
912 .arg(&result_key)
913 .query_async(&mut conn)
914 .await?;
915
916 if let Some(data) = result_data {
917 let result: serde_json::Value = serde_json::from_str(&data)?;
919
920 println!("{}", "✓ Task result found".green().bold());
921 println!();
922
923 #[derive(Tabled)]
924 struct ResultField {
925 #[tabled(rename = "Field")]
926 field: String,
927 #[tabled(rename = "Value")]
928 value: String,
929 }
930
931 let mut fields = vec![
932 ResultField {
933 field: "Task ID".to_string(),
934 value: result
935 .get("task_id")
936 .and_then(|v| v.as_str())
937 .unwrap_or("N/A")
938 .to_string(),
939 },
940 ResultField {
941 field: "Status".to_string(),
942 value: result
943 .get("status")
944 .and_then(|v| v.as_str())
945 .unwrap_or("UNKNOWN")
946 .to_string(),
947 },
948 ];
949
950 if let Some(task_result) = result.get("result") {
952 fields.push(ResultField {
953 field: "Result".to_string(),
954 value: serde_json::to_string_pretty(task_result)?,
955 });
956 }
957
958 if let Some(traceback) = result.get("traceback") {
960 if !traceback.is_null() {
961 fields.push(ResultField {
962 field: "Error".to_string(),
963 value: traceback
964 .as_str()
965 .unwrap_or("Error information unavailable")
966 .to_string(),
967 });
968 }
969 }
970
971 if let Some(date_done) = result.get("date_done") {
973 if !date_done.is_null() {
974 fields.push(ResultField {
975 field: "Completed At".to_string(),
976 value: date_done.as_str().unwrap_or("N/A").to_string(),
977 });
978 }
979 }
980
981 let table = Table::new(fields).with(Style::rounded()).to_string();
982 println!("{table}");
983 } else {
984 println!("{}", "✗ Task result not found".red());
985 println!();
986 println!("Possible reasons:");
987 println!(" • Task hasn't completed yet");
988 println!(" • Task result has expired (TTL)");
989 println!(" • Wrong backend URL");
990 println!(" • Task was never executed");
991 }
992
993 Ok(())
994}
995
/// Move a single task, by id, from `from_queue` to `to_queue`.
///
/// Searches only the main queue key (`celers:<from_queue>`), handling
/// both FIFO (list) and priority (zset) layouts, removes the task there,
/// then pushes it onto the destination in the destination's native layout.
///
/// # Errors
/// Fails if the source queue is missing or has an unexpected Redis type,
/// if the destination has an unexpected type, or if the task is not found.
pub async fn requeue_task(
    broker_url: &str,
    from_queue: &str,
    to_queue: &str,
    task_id_str: &str,
) -> anyhow::Result<()> {
    let task_id = task_id_str
        .parse::<uuid::Uuid>()
        .map_err(|_| anyhow::anyhow!("Invalid task ID format"))?;

    let client = redis::Client::open(broker_url)?;
    let mut conn = client.get_multiplexed_async_connection().await?;

    let from_key = format!("celers:{from_queue}");
    let to_key = format!("celers:{to_queue}");

    // The Redis value type tells us whether the source is FIFO or priority.
    let from_type: String = redis::cmd("TYPE")
        .arg(&from_key)
        .query_async(&mut conn)
        .await?;

    let mut task: Option<celers_core::SerializedTask> = None;
    let mut source_type = String::new();

    if from_type == "list" {
        // FIFO source: scan every entry, then LREM the matching payload.
        let tasks: Vec<String> = redis::cmd("LRANGE")
            .arg(&from_key)
            .arg(0)
            .arg(-1)
            .query_async(&mut conn)
            .await?;

        for task_str in &tasks {
            if let Ok(t) = serde_json::from_str::<celers_core::SerializedTask>(task_str) {
                if t.metadata.id == task_id {
                    task = Some(t);
                    source_type = "list".to_string();
                    // Remove exactly one occurrence of this serialized payload.
                    let _: usize = redis::cmd("LREM")
                        .arg(&from_key)
                        .arg(1)
                        .arg(task_str)
                        .query_async(&mut conn)
                        .await?;
                    break;
                }
            }
        }
    } else if from_type == "zset" {
        // Priority source: entries come back as (payload, score) pairs.
        let tasks: Vec<(String, f64)> = redis::cmd("ZRANGE")
            .arg(&from_key)
            .arg(0)
            .arg(-1)
            .arg("WITHSCORES")
            .query_async(&mut conn)
            .await?;

        for (task_str, _score) in &tasks {
            if let Ok(t) = serde_json::from_str::<celers_core::SerializedTask>(task_str) {
                if t.metadata.id == task_id {
                    task = Some(t);
                    source_type = "zset".to_string();
                    let _: usize = redis::cmd("ZREM")
                        .arg(&from_key)
                        .arg(task_str)
                        .query_async(&mut conn)
                        .await?;
                    break;
                }
            }
        }
    } else {
        return Err(anyhow::anyhow!(
            "Source queue '{from_queue}' not found or invalid queue type"
        ));
    }

    if let Some(t) = task {
        // The destination layout decides how the task is re-inserted.
        let to_type: String = redis::cmd("TYPE")
            .arg(&to_key)
            .query_async(&mut conn)
            .await?;

        let task_json = serde_json::to_string(&t)?;

        if to_type == "list" || to_type == "none" {
            // "none" means the key doesn't exist yet; LPUSH creates a list.
            let _: usize = redis::cmd("LPUSH")
                .arg(&to_key)
                .arg(&task_json)
                .query_async(&mut conn)
                .await?;
            println!(
                "{}",
                format!("✓ Task moved from '{from_queue}' ({source_type}) to '{to_queue}' (FIFO)")
                    .green()
                    .bold()
            );
        } else if to_type == "zset" {
            // Preserve the task's own priority as the sorted-set score.
            let priority = f64::from(t.metadata.priority);
            let _: usize = redis::cmd("ZADD")
                .arg(&to_key)
                .arg(priority)
                .arg(&task_json)
                .query_async(&mut conn)
                .await?;
            println!(
                "{}",
                format!(
                    "✓ Task moved from '{from_queue}' ({source_type}) to '{to_queue}' (Priority)"
                )
                .green()
                .bold()
            );
        } else {
            // NOTE(review): at this point the task has already been removed
            // from the source queue, so erroring here loses it. Consider
            // re-inserting into the source before returning the error.
            return Err(anyhow::anyhow!(
                "Destination queue '{to_queue}' has invalid type: {to_type}"
            ));
        }

        println!();
        println!(" {} {}", "Task ID:".cyan(), t.metadata.id);
        println!(" {} {}", "Task Name:".cyan(), t.metadata.name);
        println!(" {} {}", "Priority:".cyan(), t.metadata.priority);
    } else {
        println!(
            "{}",
            format!("✗ Task not found in queue '{from_queue}'").red()
        );
        println!();
        println!("Possible reasons:");
        println!(" • Task ID is incorrect");
        println!(" • Task is in a different queue");
        println!(" • Task has already been processed");
        return Err(anyhow::anyhow!("Task not found"));
    }

    Ok(())
}
1145
/// Print detailed statistics for one queue: pending / processing / DLQ /
/// delayed counts, a sampled task-name distribution, and a couple of
/// simple health heuristics.
pub async fn queue_stats(broker_url: &str, queue: &str) -> anyhow::Result<()> {
    let client = redis::Client::open(broker_url)?;
    let mut conn = client.get_multiplexed_async_connection().await?;

    // Key layout: main queue plus per-queue auxiliary keys.
    let queue_key = format!("celers:{queue}");
    let processing_key = format!("{queue_key}:processing");
    let dlq_key = format!("{queue_key}:dlq");
    let delayed_key = format!("{queue_key}:delayed");

    let queue_type: String = redis::cmd("TYPE")
        .arg(&queue_key)
        .query_async(&mut conn)
        .await?;

    // Main queue size depends on layout: list => LLEN, zset => ZCARD,
    // anything else (including a missing key) counts as 0.
    let queue_size: usize = if queue_type == "list" {
        redis::cmd("LLEN")
            .arg(&queue_key)
            .query_async(&mut conn)
            .await?
    } else if queue_type == "zset" {
        redis::cmd("ZCARD")
            .arg(&queue_key)
            .query_async(&mut conn)
            .await?
    } else {
        0
    };

    // Auxiliary sizes are best-effort: a missing or wrongly-typed key just
    // reads as 0 instead of failing the whole command.
    let processing_size: usize = redis::cmd("LLEN")
        .arg(&processing_key)
        .query_async(&mut conn)
        .await
        .unwrap_or(0);

    let dlq_size: usize = redis::cmd("LLEN")
        .arg(&dlq_key)
        .query_async(&mut conn)
        .await
        .unwrap_or(0);

    let delayed_size: usize = redis::cmd("ZCARD")
        .arg(&delayed_key)
        .query_async(&mut conn)
        .await
        .unwrap_or(0);

    // Sample up to 100 pending tasks to estimate the task-name mix.
    let mut task_names = std::collections::HashMap::new();

    if queue_size > 0 {
        let sample_size = std::cmp::min(queue_size, 100);
        let tasks: Vec<String> = if queue_type == "list" {
            redis::cmd("LRANGE")
                .arg(&queue_key)
                .arg(0)
                .arg(sample_size as isize - 1)
                .query_async(&mut conn)
                .await?
        } else if queue_type == "zset" {
            redis::cmd("ZRANGE")
                .arg(&queue_key)
                .arg(0)
                .arg(sample_size as isize - 1)
                .query_async(&mut conn)
                .await?
        } else {
            vec![]
        };

        for task_str in tasks {
            // Entries that fail to deserialize are skipped, not counted.
            if let Ok(task) = serde_json::from_str::<celers_core::SerializedTask>(&task_str) {
                *task_names.entry(task.metadata.name.clone()).or_insert(0) += 1;
            }
        }
    }

    println!("{}", format!("Queue Statistics: {queue}").cyan().bold());
    println!();

    #[derive(Tabled)]
    struct StatRow {
        #[tabled(rename = "Metric")]
        metric: String,
        #[tabled(rename = "Value")]
        value: String,
    }

    let stats = vec![
        StatRow {
            metric: "Queue Type".to_string(),
            value: if queue_type == "list" {
                "FIFO (List)".to_string()
            } else if queue_type == "zset" {
                "Priority (Sorted Set)".to_string()
            } else {
                format!("Unknown ({queue_type})")
            },
        },
        StatRow {
            metric: "Pending Tasks".to_string(),
            value: queue_size.to_string(),
        },
        StatRow {
            metric: "Processing Tasks".to_string(),
            value: processing_size.to_string(),
        },
        StatRow {
            metric: "Dead Letter Queue".to_string(),
            value: dlq_size.to_string(),
        },
        StatRow {
            metric: "Delayed Tasks".to_string(),
            value: delayed_size.to_string(),
        },
        StatRow {
            metric: "Total Tasks".to_string(),
            value: (queue_size + processing_size + dlq_size + delayed_size).to_string(),
        },
    ];

    let table = Table::new(stats).with(Style::rounded()).to_string();
    println!("{table}");

    if !task_names.is_empty() {
        println!();
        println!("{}", "Task Type Distribution (sample):".cyan().bold());
        println!();

        #[derive(Tabled)]
        struct TaskTypeRow {
            #[tabled(rename = "Task Name")]
            task_name: String,
            #[tabled(rename = "Count")]
            count: usize,
        }

        let mut task_types: Vec<TaskTypeRow> = task_names
            .into_iter()
            .map(|(name, count)| TaskTypeRow {
                task_name: name,
                count,
            })
            .collect();

        // Most common first; only the top 10 are shown.
        task_types.sort_by(|a, b| b.count.cmp(&a.count));

        let table = Table::new(task_types.into_iter().take(10))
            .with(Style::rounded())
            .to_string();
        println!("{table}");
    }

    println!();
    // Simple health heuristics.
    if dlq_size > 0 {
        println!("{}", format!("⚠ Warning: {dlq_size} tasks in DLQ").yellow());
    }
    // NOTE(review): when queue_size == 0 this flags ANY processing backlog;
    // confirm that is the intended threshold.
    if processing_size > queue_size * 2 {
        println!(
            "{}",
            "⚠ Warning: High number of processing tasks (possible stuck workers)".yellow()
        );
    }
    if queue_size == 0 && processing_size == 0 && dlq_size == 0 {
        println!("{}", "✓ Queue is empty and healthy".green());
    }

    Ok(())
}
1321
1322pub async fn move_queue(
1324 broker_url: &str,
1325 from_queue: &str,
1326 to_queue: &str,
1327 confirm: bool,
1328) -> anyhow::Result<()> {
1329 let client = redis::Client::open(broker_url)?;
1330 let mut conn = client.get_multiplexed_async_connection().await?;
1331
1332 let from_key = format!("celers:{from_queue}");
1334 let to_key = format!("celers:{to_queue}");
1335
1336 let from_type: String = redis::cmd("TYPE")
1338 .arg(&from_key)
1339 .query_async(&mut conn)
1340 .await?;
1341
1342 if from_type == "none" {
1343 println!(
1344 "{}",
1345 format!("✗ Source queue '{from_queue}' does not exist").red()
1346 );
1347 return Ok(());
1348 }
1349
1350 let queue_size: usize = if from_type == "list" {
1352 redis::cmd("LLEN")
1353 .arg(&from_key)
1354 .query_async(&mut conn)
1355 .await?
1356 } else if from_type == "zset" {
1357 redis::cmd("ZCARD")
1358 .arg(&from_key)
1359 .query_async(&mut conn)
1360 .await?
1361 } else {
1362 println!("{}", format!("✗ Unknown queue type: {from_type}").red());
1363 return Ok(());
1364 };
1365
1366 if queue_size == 0 {
1367 println!(
1368 "{}",
1369 format!("✗ Source queue '{from_queue}' is empty").yellow()
1370 );
1371 return Ok(());
1372 }
1373
1374 if !confirm {
1376 println!(
1377 "{}",
1378 format!(
1379 "⚠ Warning: This will move {queue_size} tasks from '{from_queue}' to '{to_queue}'"
1380 )
1381 .yellow()
1382 );
1383 println!("{}", "Use --confirm to proceed".yellow());
1384 return Ok(());
1385 }
1386
1387 println!(
1388 "{}",
1389 format!("Moving {queue_size} tasks from '{from_queue}' to '{to_queue}'...").cyan()
1390 );
1391
1392 let to_type: String = redis::cmd("TYPE")
1394 .arg(&to_key)
1395 .query_async(&mut conn)
1396 .await?;
1397
1398 let mut moved_count = 0;
1399
1400 if from_type == "list" {
1402 loop {
1404 let task: Option<String> = redis::cmd("RPOP")
1405 .arg(&from_key)
1406 .query_async(&mut conn)
1407 .await?;
1408
1409 match task {
1410 Some(task_str) => {
1411 if to_type == "list" || to_type == "none" {
1412 let _: usize = redis::cmd("LPUSH")
1414 .arg(&to_key)
1415 .arg(&task_str)
1416 .query_async(&mut conn)
1417 .await?;
1418 } else if to_type == "zset" {
1419 if let Ok(task) =
1421 serde_json::from_str::<celers_core::SerializedTask>(&task_str)
1422 {
1423 let priority = f64::from(task.metadata.priority);
1424 let _: usize = redis::cmd("ZADD")
1425 .arg(&to_key)
1426 .arg(priority)
1427 .arg(&task_str)
1428 .query_async(&mut conn)
1429 .await?;
1430 }
1431 }
1432 moved_count += 1;
1433
1434 if moved_count % 100 == 0 {
1435 print!(
1436 "\r{}",
1437 format!("Moved {moved_count} / {queue_size} tasks...").cyan()
1438 );
1439 use std::io::Write;
1440 std::io::stdout().flush()?;
1441 }
1442 }
1443 None => break,
1444 }
1445 }
1446 } else if from_type == "zset" {
1447 loop {
1449 let result: Vec<(String, f64)> = redis::cmd("ZPOPMIN")
1450 .arg(&from_key)
1451 .arg(1)
1452 .query_async(&mut conn)
1453 .await?;
1454
1455 if result.is_empty() {
1456 break;
1457 }
1458
1459 let (task_str, _score) = &result[0];
1460
1461 if to_type == "list" || to_type == "none" {
1462 let _: usize = redis::cmd("LPUSH")
1464 .arg(&to_key)
1465 .arg(task_str)
1466 .query_async(&mut conn)
1467 .await?;
1468 } else if to_type == "zset" {
1469 if let Ok(task) = serde_json::from_str::<celers_core::SerializedTask>(task_str) {
1471 let priority = f64::from(task.metadata.priority);
1472 let _: usize = redis::cmd("ZADD")
1473 .arg(&to_key)
1474 .arg(priority)
1475 .arg(task_str)
1476 .query_async(&mut conn)
1477 .await?;
1478 }
1479 }
1480 moved_count += 1;
1481
1482 if moved_count % 100 == 0 {
1483 print!(
1484 "\r{}",
1485 format!("Moved {moved_count} / {queue_size} tasks...").cyan()
1486 );
1487 use std::io::Write;
1488 std::io::stdout().flush()?;
1489 }
1490 }
1491 }
1492
1493 println!();
1494 println!(
1495 "{}",
1496 format!("✓ Successfully moved {moved_count} tasks from '{from_queue}' to '{to_queue}'")
1497 .green()
1498 .bold()
1499 );
1500
1501 let dest_queue_type = if to_type == "list" || to_type == "none" {
1503 "FIFO"
1504 } else if to_type == "zset" {
1505 "Priority"
1506 } else {
1507 "Unknown"
1508 };
1509
1510 println!(
1511 " {} {} → {}",
1512 "Queue Type:".cyan(),
1513 from_type,
1514 dest_queue_type
1515 );
1516
1517 Ok(())
1518}
1519
1520pub async fn export_queue(broker_url: &str, queue: &str, output_file: &str) -> anyhow::Result<()> {
1522 let client = redis::Client::open(broker_url)?;
1523 let mut conn = client.get_multiplexed_async_connection().await?;
1524
1525 let queue_key = format!("celers:{queue}");
1526
1527 let queue_type: String = redis::cmd("TYPE")
1529 .arg(&queue_key)
1530 .query_async(&mut conn)
1531 .await?;
1532
1533 if queue_type == "none" {
1534 println!("{}", format!("✗ Queue '{queue}' does not exist").red());
1535 return Ok(());
1536 }
1537
1538 println!("{}", format!("Exporting queue '{queue}'...").cyan());
1539
1540 let tasks: Vec<String> = if queue_type == "list" {
1542 redis::cmd("LRANGE")
1543 .arg(&queue_key)
1544 .arg(0)
1545 .arg(-1)
1546 .query_async(&mut conn)
1547 .await?
1548 } else if queue_type == "zset" {
1549 redis::cmd("ZRANGE")
1550 .arg(&queue_key)
1551 .arg(0)
1552 .arg(-1)
1553 .query_async(&mut conn)
1554 .await?
1555 } else {
1556 println!("{}", format!("✗ Unknown queue type: {queue_type}").red());
1557 return Ok(());
1558 };
1559
1560 let mut export_tasks = Vec::new();
1562 for task_str in tasks {
1563 if let Ok(task) = serde_json::from_str::<celers_core::SerializedTask>(&task_str) {
1564 export_tasks.push(task);
1565 }
1566 }
1567
1568 #[derive(serde::Serialize)]
1569 struct QueueExport {
1570 queue_name: String,
1571 queue_type: String,
1572 exported_at: String,
1573 task_count: usize,
1574 tasks: Vec<celers_core::SerializedTask>,
1575 }
1576
1577 let export_data = QueueExport {
1578 queue_name: queue.to_string(),
1579 queue_type: queue_type.clone(),
1580 exported_at: chrono::Utc::now().to_rfc3339(),
1581 task_count: export_tasks.len(),
1582 tasks: export_tasks,
1583 };
1584
1585 let json = serde_json::to_string_pretty(&export_data)?;
1587 std::fs::write(output_file, json)?;
1588
1589 println!(
1590 "{}",
1591 format!(
1592 "✓ Exported {} tasks from queue '{}' to '{}'",
1593 export_data.task_count, queue, output_file
1594 )
1595 .green()
1596 .bold()
1597 );
1598 println!(" {} {}", "Queue Type:".cyan(), queue_type);
1599 let file_size = std::fs::metadata(output_file)?.len();
1600 println!(" {} {} bytes", "File Size:".cyan(), file_size);
1601
1602 Ok(())
1603}
1604
1605pub async fn import_queue(
1607 broker_url: &str,
1608 queue: &str,
1609 input_file: &str,
1610 confirm: bool,
1611) -> anyhow::Result<()> {
1612 let json = std::fs::read_to_string(input_file)?;
1614
1615 #[derive(serde::Deserialize)]
1616 struct QueueExport {
1617 queue_name: String,
1618 queue_type: String,
1619 exported_at: String,
1620 task_count: usize,
1621 tasks: Vec<celers_core::SerializedTask>,
1622 }
1623
1624 let export_data: QueueExport = serde_json::from_str(&json)?;
1625
1626 println!("{}", "Import Information:".cyan().bold());
1628 println!(" {} {}", "Source Queue:".cyan(), export_data.queue_name);
1629 println!(" {} {}", "Source Type:".cyan(), export_data.queue_type);
1630 println!(" {} {}", "Exported At:".cyan(), export_data.exported_at);
1631 println!(" {} {}", "Task Count:".cyan(), export_data.task_count);
1632 println!(" {} {}", "Destination Queue:".cyan(), queue);
1633 println!();
1634
1635 if !confirm {
1636 println!(
1637 "{}",
1638 format!(
1639 "⚠ Warning: This will import {} tasks into queue '{}'",
1640 export_data.task_count, queue
1641 )
1642 .yellow()
1643 );
1644 println!("{}", "Use --confirm to proceed".yellow());
1645 return Ok(());
1646 }
1647
1648 let client = redis::Client::open(broker_url)?;
1649 let mut conn = client.get_multiplexed_async_connection().await?;
1650
1651 let queue_key = format!("celers:{queue}");
1652
1653 let to_type: String = redis::cmd("TYPE")
1655 .arg(&queue_key)
1656 .query_async(&mut conn)
1657 .await?;
1658
1659 println!(
1660 "{}",
1661 format!("Importing {} tasks...", export_data.task_count).cyan()
1662 );
1663
1664 let mut imported = 0;
1665 for task in export_data.tasks {
1666 let task_json = serde_json::to_string(&task)?;
1667
1668 if to_type == "list" || to_type == "none" {
1669 let _: usize = redis::cmd("LPUSH")
1671 .arg(&queue_key)
1672 .arg(&task_json)
1673 .query_async(&mut conn)
1674 .await?;
1675 } else if to_type == "zset" {
1676 let priority = f64::from(task.metadata.priority);
1678 let _: usize = redis::cmd("ZADD")
1679 .arg(&queue_key)
1680 .arg(priority)
1681 .arg(&task_json)
1682 .query_async(&mut conn)
1683 .await?;
1684 }
1685
1686 imported += 1;
1687 if imported % 100 == 0 {
1688 print!(
1689 "\r{}",
1690 format!(
1691 "Imported {} / {} tasks...",
1692 imported, export_data.task_count
1693 )
1694 .cyan()
1695 );
1696 use std::io::Write;
1697 std::io::stdout().flush()?;
1698 }
1699 }
1700
1701 println!();
1702 println!(
1703 "{}",
1704 format!("✓ Successfully imported {imported} tasks into queue '{queue}'")
1705 .green()
1706 .bold()
1707 );
1708
1709 Ok(())
1710}
1711
1712pub async fn show_metrics(
1714 format: &str,
1715 output_file: Option<&str>,
1716 pattern: Option<&str>,
1717 watch_interval: Option<u64>,
1718) -> anyhow::Result<()> {
1719 if watch_interval.is_some() && output_file.is_some() {
1721 println!(
1722 "{}",
1723 "⚠ Watch mode cannot be used with file output".yellow()
1724 );
1725 return Ok(());
1726 }
1727
1728 if let Some(interval) = watch_interval {
1729 println!("{}", "=== Metrics Watch Mode ===".bold().green());
1731 println!(
1732 "{}",
1733 format!("Refreshing every {interval} seconds (Ctrl+C to stop)").dimmed()
1734 );
1735 println!();
1736
1737 loop {
1738 print!("\x1B[2J\x1B[1;1H"); println!(
1743 "{}",
1744 format!("Last updated: {}", Utc::now().format("%Y-%m-%d %H:%M:%S")).dimmed()
1745 );
1746 println!();
1747
1748 format_and_display_metrics(format, pattern)?;
1750
1751 tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await;
1753 }
1754 } else {
1755 format_and_output_metrics(format, output_file, pattern)?;
1757 Ok(())
1758 }
1759}
1760
/// Render gathered Prometheus-format metrics as `"json"`,
/// `"prometheus"`/`"prom"`, or (default) colorized human-readable text,
/// optionally filtered by `pattern`, then write to `output_file` or print
/// to stdout.
fn format_and_output_metrics(
    format: &str,
    output_file: Option<&str>,
    pattern: Option<&str>,
) -> anyhow::Result<()> {
    let metrics_text = celers_metrics::gather_metrics();

    // Line filter: keep samples and HELP/TYPE headers that mention the
    // pattern; drop every other comment line outright.
    let filtered_metrics = if let Some(pat) = pattern {
        metrics_text
            .lines()
            .filter(|line| {
                if line.starts_with("# HELP") || line.starts_with("# TYPE") {
                    line.contains(pat)
                } else if line.starts_with('#') {
                    false
                } else {
                    line.contains(pat)
                }
            })
            .collect::<Vec<_>>()
            .join("\n")
    } else {
        metrics_text.clone()
    };

    let output = match format.to_lowercase().as_str() {
        "json" => {
            // Flatten samples into {metric_name: value}. NOTE(review): label
            // sets are stripped, so series sharing a name overwrite one
            // another — last sample wins.
            let mut metrics_map = serde_json::Map::new();

            for line in filtered_metrics.lines() {
                if line.starts_with('#') || line.trim().is_empty() {
                    continue;
                }

                // Exposition format: "<name>{labels} <value>" — the value is
                // whatever follows the last space.
                if let Some(space_idx) = line.rfind(' ') {
                    let (metric_part, value_str) = line.split_at(space_idx);
                    let value_str = value_str.trim();

                    if let Ok(value) = value_str.parse::<f64>() {
                        // Strip the "{...}" label block, if any, to get the bare name.
                        let metric_name = if let Some(brace_idx) = metric_part.find('{') {
                            &metric_part[..brace_idx]
                        } else {
                            metric_part
                        };

                        metrics_map.insert(metric_name.to_string(), serde_json::json!(value));
                    }
                }
            }

            serde_json::to_string_pretty(&metrics_map)?
        }

        // Raw exposition format (already filtered above) — passthrough.
        "prometheus" | "prom" => {
            filtered_metrics
        }

        // Default: pretty text grouping each metric with its HELP description.
        _ => {
            let mut output = String::new();
            output.push_str(&format!("{}\n\n", "=== CeleRS Metrics ===".bold().green()));

            // Most recently seen HELP header; consumed by the first matching
            // sample below it.
            let mut current_metric = String::new();
            let mut help_text = String::new();

            for line in filtered_metrics.lines() {
                if line.starts_with("# HELP") {
                    // "# HELP <name> <description>"
                    if let Some(help) = line.strip_prefix("# HELP ") {
                        let parts: Vec<&str> = help.splitn(2, ' ').collect();
                        if parts.len() == 2 {
                            current_metric = parts[0].to_string();
                            help_text = parts[1].to_string();
                        }
                    }
                } else if line.starts_with("# TYPE") {
                    // TYPE lines carry no display value here.
                } else if line.starts_with('#') || line.trim().is_empty() {
                    // Other comments / blank lines: skip.
                } else {
                    if let Some(space_idx) = line.rfind(' ') {
                        let (metric_part, value_str) = line.split_at(space_idx);
                        let value_str = value_str.trim();

                        let metric_name = if let Some(brace_idx) = metric_part.find('{') {
                            &metric_part[..brace_idx]
                        } else {
                            metric_part
                        };

                        // help_text is cleared once consumed, so only the first
                        // sample after each HELP header is displayed; further
                        // label variants of the same metric are skipped.
                        if metric_name == current_metric && !help_text.is_empty() {
                            output.push_str(&format!("{}\n", metric_name.cyan().bold()));
                            output.push_str(&format!(" {}\n", help_text.dimmed()));
                            output.push_str(&format!(
                                " {} {}\n\n",
                                "Value:".yellow(),
                                value_str.green()
                            ));
                            help_text.clear();
                        }
                    }
                }
            }

            if output.trim().is_empty() {
                output = format!("{}\n", "No metrics found".yellow());
                if pattern.is_some() {
                    output.push_str(&format!(
                        "{}\n",
                        "Try adjusting your filter pattern".dimmed()
                    ));
                }
            }

            output
        }
    };

    // Sink selection: file when requested, stdout otherwise.
    if let Some(file_path) = output_file {
        std::fs::write(file_path, &output)?;
        println!(
            "{}",
            format!("✓ Metrics exported to '{file_path}'")
                .green()
                .bold()
        );
        println!(" {} {}", "Format:".cyan(), format);
        if let Some(pat) = pattern {
            println!(" {} {}", "Filter:".cyan(), pat);
        }
    } else {
        println!("{output}");
    }

    Ok(())
}
1910
1911fn format_and_display_metrics(format: &str, pattern: Option<&str>) -> anyhow::Result<()> {
1913 let metrics_text = celers_metrics::gather_metrics();
1915
1916 let filtered_metrics = if let Some(pat) = pattern {
1918 metrics_text
1919 .lines()
1920 .filter(|line| {
1921 if line.starts_with("# HELP") || line.starts_with("# TYPE") {
1923 line.contains(pat)
1924 } else if line.starts_with('#') {
1925 false
1926 } else {
1927 line.contains(pat)
1928 }
1929 })
1930 .collect::<Vec<_>>()
1931 .join("\n")
1932 } else {
1933 metrics_text.clone()
1934 };
1935
1936 let output = match format.to_lowercase().as_str() {
1938 "json" => {
1939 let mut metrics_map = serde_json::Map::new();
1941
1942 for line in filtered_metrics.lines() {
1943 if line.starts_with('#') || line.trim().is_empty() {
1944 continue;
1945 }
1946
1947 if let Some(space_idx) = line.rfind(' ') {
1948 let (metric_part, value_str) = line.split_at(space_idx);
1949 let value_str = value_str.trim();
1950
1951 if let Ok(value) = value_str.parse::<f64>() {
1952 let metric_name = if let Some(brace_idx) = metric_part.find('{') {
1953 &metric_part[..brace_idx]
1954 } else {
1955 metric_part
1956 };
1957
1958 metrics_map.insert(metric_name.to_string(), serde_json::json!(value));
1959 }
1960 }
1961 }
1962
1963 serde_json::to_string_pretty(&metrics_map)?
1964 }
1965
1966 "prometheus" | "prom" => filtered_metrics,
1967
1968 _ => {
1969 let mut output = String::new();
1971 output.push_str(&format!("{}\n\n", "=== CeleRS Metrics ===".bold().green()));
1972
1973 let mut current_metric = String::new();
1974 let mut help_text = String::new();
1975
1976 for line in filtered_metrics.lines() {
1977 if line.starts_with("# HELP") {
1978 if let Some(help) = line.strip_prefix("# HELP ") {
1979 let parts: Vec<&str> = help.splitn(2, ' ').collect();
1980 if parts.len() == 2 {
1981 current_metric = parts[0].to_string();
1982 help_text = parts[1].to_string();
1983 }
1984 }
1985 } else if line.starts_with('#') || line.trim().is_empty() {
1986 } else if let Some(space_idx) = line.rfind(' ') {
1988 let (metric_part, value_str) = line.split_at(space_idx);
1989 let value_str = value_str.trim();
1990
1991 let metric_name = if let Some(brace_idx) = metric_part.find('{') {
1992 &metric_part[..brace_idx]
1993 } else {
1994 metric_part
1995 };
1996
1997 if metric_name == current_metric && !help_text.is_empty() {
1998 output.push_str(&format!("{}\n", metric_name.cyan().bold()));
1999 output.push_str(&format!(" {}\n", help_text.dimmed()));
2000 output.push_str(&format!(
2001 " {} {}\n\n",
2002 "Value:".yellow(),
2003 value_str.green()
2004 ));
2005 help_text.clear();
2006 }
2007 }
2008 }
2009
2010 if output.trim().is_empty() {
2011 output = format!("{}\n", "No metrics found".yellow());
2012 if pattern.is_some() {
2013 output.push_str(&format!(
2014 "{}\n",
2015 "Try adjusting your filter pattern".dimmed()
2016 ));
2017 }
2018 }
2019
2020 output
2021 }
2022 };
2023
2024 println!("{output}");
2025 Ok(())
2026}
2027
/// Validate a TOML configuration file: parse it, echo the parsed settings,
/// report validation warnings, and (optionally) live-test the configured
/// broker connection.
///
/// User-facing failures (missing file, parse error, unreachable broker) are
/// printed and return `Ok(())` so the CLI exits cleanly; only unexpected
/// errors from `config.validate()` propagate via `?`.
pub async fn validate_config(config_path: &str, test_connection: bool) -> anyhow::Result<()> {
    use std::path::Path;

    println!("{}", "=== Configuration Validation ===".bold().green());
    println!();

    if !Path::new(config_path).exists() {
        println!(
            "{}",
            format!("✗ Configuration file not found: {config_path}")
                .red()
                .bold()
        );
        return Ok(());
    }

    println!(
        "{}",
        format!("📄 Loading configuration from '{config_path}'...").cyan()
    );
    println!();

    // Parse errors are reported and treated as a completed (non-fatal) run.
    let config = match crate::config::Config::from_file(config_path) {
        Ok(cfg) => {
            println!("{}", "✓ Configuration file is valid TOML".green());
            cfg
        }
        Err(e) => {
            println!("{}", "✗ Failed to parse configuration file:".red().bold());
            println!(" {}", format!("{e}").red());
            return Ok(());
        }
    };

    println!();
    println!("{}", "Configuration Details:".bold());
    println!();

    // Echo broker settings so the user can eyeball what was actually parsed.
    println!("{}", "Broker Configuration:".cyan().bold());
    println!(" {} {}", "Type:".yellow(), config.broker.broker_type);
    println!(" {} {}", "URL:".yellow(), config.broker.url);
    println!(" {} {}", "Queue:".yellow(), config.broker.queue);
    println!(" {} {}", "Mode:".yellow(), config.broker.mode);

    println!();

    // Echo worker settings.
    println!("{}", "Worker Configuration:".cyan().bold());
    println!(
        " {} {}",
        "Concurrency:".yellow(),
        config.worker.concurrency
    );
    println!(
        " {} {} ms",
        "Poll Interval:".yellow(),
        config.worker.poll_interval_ms
    );
    println!(
        " {} {}",
        "Max Retries:".yellow(),
        config.worker.max_retries
    );
    println!(
        " {} {} seconds",
        "Default Timeout:".yellow(),
        config.worker.default_timeout_secs
    );

    println!();

    // Semantic validation: hard errors propagate via `?`; soft issues come
    // back as a list of warning strings to display.
    let warnings = config.validate()?;
    if warnings.is_empty() {
        println!(
            "{}",
            "✓ Configuration is valid with no warnings".green().bold()
        );
    } else {
        println!("{}", "Configuration Warnings:".yellow().bold());
        for warning in &warnings {
            println!(" {} {}", "⚠".yellow(), warning.yellow());
        }
    }

    println!();

    if !config.queues.is_empty() {
        println!("{}", "Configured Queues:".cyan().bold());
        for queue in &config.queues {
            println!(" • {}", queue.dimmed());
        }
        println!();
    }

    // Optional live connectivity test, dispatched on the broker type string.
    // Only Redis and PostgreSQL are testable from here; other brokers get
    // manual-testing guidance instead.
    if test_connection {
        println!("{}", "Testing broker connection...".cyan().bold());
        println!();

        match config.broker.broker_type.to_lowercase().as_str() {
            "redis" => {
                match redis::Client::open(config.broker.url.as_str()) {
                    Ok(client) => {
                        match client.get_multiplexed_async_connection().await {
                            Ok(mut conn) => {
                                // Connection established; PING confirms the server responds.
                                match redis::cmd("PING").query_async::<String>(&mut conn).await {
                                    Ok(_) => {
                                        println!(
                                            "{}",
                                            "✓ Successfully connected to Redis broker"
                                                .green()
                                                .bold()
                                        );
                                    }
                                    Err(e) => {
                                        println!(
                                            "{}",
                                            "✗ Failed to PING Redis broker:".red().bold()
                                        );
                                        println!(" {}", format!("{e}").red());
                                    }
                                }
                            }
                            Err(e) => {
                                println!("{}", "✗ Failed to connect to Redis broker:".red().bold());
                                println!(" {}", format!("{e}").red());
                            }
                        }
                    }
                    Err(e) => {
                        println!("{}", "✗ Invalid Redis URL:".red().bold());
                        println!(" {}", format!("{e}").red());
                    }
                }
            }
            "postgres" | "postgresql" => {
                match sqlx::postgres::PgPool::connect(&config.broker.url).await {
                    Ok(pool) => {
                        // Trivial query proves the connection is usable, not just open.
                        match sqlx::query("SELECT 1").fetch_one(&pool).await {
                            Ok(_) => {
                                println!(
                                    "{}",
                                    "✓ Successfully connected to PostgreSQL broker"
                                        .green()
                                        .bold()
                                );
                            }
                            Err(e) => {
                                println!("{}", "✗ Failed to query PostgreSQL broker:".red().bold());
                                println!(" {}", format!("{e}").red());
                            }
                        }
                        pool.close().await;
                    }
                    Err(e) => {
                        println!(
                            "{}",
                            "✗ Failed to connect to PostgreSQL broker:".red().bold()
                        );
                        println!(" {}", format!("{e}").red());
                    }
                }
            }
            "mysql" => {
                println!(
                    "{}",
                    "ℹ MySQL connection testing via CLI not yet available".cyan()
                );
                println!(" {}", "To test MySQL connection:".dimmed());
                println!(
                    " {}",
                    " 1. Use the 'db test-connection' command with your MySQL URL".dimmed()
                );
                println!(
                    " {}",
                    " 2. Or use mysql client: mysql -h <host> -u <user> -p".dimmed()
                );
            }
            "amqp" | "rabbitmq" => {
                println!(
                    "{}",
                    "ℹ AMQP connection testing via CLI not yet available".cyan()
                );
                println!(" {}", "To test RabbitMQ connection:".dimmed());
                println!(
                    " {}",
                    " 1. Check RabbitMQ Management UI at http://<host>:15672".dimmed()
                );
                println!(" {}", " 2. Or use rabbitmq-diagnostics ping".dimmed());
                println!(
                    " {}",
                    " 3. Verify credentials and virtual host configuration".dimmed()
                );
            }
            "sqs" => {
                println!(
                    "{}",
                    "ℹ SQS connection testing via CLI not yet available".cyan()
                );
                println!(" {}", "To test AWS SQS connection:".dimmed());
                println!(
                    " {}",
                    " 1. Ensure AWS credentials are configured (aws configure)".dimmed()
                );
                println!(" {}", " 2. Test with: aws sqs list-queues".dimmed());
                println!(
                    " {}",
                    " 3. Verify IAM permissions for SQS operations".dimmed()
                );
            }
            _ => {
                println!(
                    "{}",
                    format!(
                        "⚠ Cannot test connection for broker type '{}'",
                        config.broker.broker_type
                    )
                    .yellow()
                );
            }
        }

        println!();
    }

    println!("{}", "✓ Configuration validation complete".green().bold());

    Ok(())
}
2266
2267pub async fn pause_queue(broker_url: &str, queue: &str) -> anyhow::Result<()> {
2269 let client = redis::Client::open(broker_url)?;
2270 let mut conn = client.get_multiplexed_async_connection().await?;
2271
2272 let pause_key = format!("celers:{queue}:paused");
2273
2274 let timestamp = chrono::Utc::now().to_rfc3339();
2276 let _: () = redis::cmd("SET")
2277 .arg(&pause_key)
2278 .arg(×tamp)
2279 .query_async(&mut conn)
2280 .await?;
2281
2282 println!(
2283 "{}",
2284 format!("✓ Queue '{queue}' has been paused").green().bold()
2285 );
2286 println!();
2287 println!("{}", "Note:".yellow().bold());
2288 println!(" • Workers will stop processing tasks from this queue");
2289 println!(" • Existing tasks will remain in the queue");
2290 println!(" • Use 'celers queue resume' to resume processing");
2291 println!();
2292 println!(" Paused at: {}", timestamp.cyan());
2293
2294 Ok(())
2295}
2296
2297pub async fn resume_queue(broker_url: &str, queue: &str) -> anyhow::Result<()> {
2299 let client = redis::Client::open(broker_url)?;
2300 let mut conn = client.get_multiplexed_async_connection().await?;
2301
2302 let pause_key = format!("celers:{queue}:paused");
2303
2304 let paused: Option<String> = redis::cmd("GET")
2306 .arg(&pause_key)
2307 .query_async(&mut conn)
2308 .await?;
2309
2310 if paused.is_none() {
2311 println!("{}", format!("✓ Queue '{queue}' is not paused").yellow());
2312 return Ok(());
2313 }
2314
2315 let _: () = redis::cmd("DEL")
2317 .arg(&pause_key)
2318 .query_async(&mut conn)
2319 .await?;
2320
2321 println!(
2322 "{}",
2323 format!("✓ Queue '{queue}' has been resumed").green().bold()
2324 );
2325 println!();
2326 println!("{}", "Note:".yellow().bold());
2327 println!(" • Workers will now process tasks from this queue");
2328 if let Some(paused_at) = paused {
2329 println!(" • Was paused at: {}", paused_at.dimmed());
2330 }
2331
2332 Ok(())
2333}
2334
/// Run a multi-step health report against the broker and a queue:
/// connectivity, queue/DLQ sizes, queue type and pause state, and broker
/// memory, then print a summary of critical issues vs. warnings.
///
/// All failures are reported in the output (not propagated), so the command
/// returns `Ok(())` even when the system is unhealthy.
pub async fn health_check(broker_url: &str, queue: &str) -> anyhow::Result<()> {
    println!("{}", "=== System Health Check ===".bold().cyan());
    println!();

    // Issues are fatal findings; warnings are degraded-but-operational ones.
    let mut health_issues = Vec::new();
    let mut health_warnings = Vec::new();

    // --- Check 1: raw broker connectivity ---------------------------------
    println!("{}", "1. Broker Connection".bold());
    let client = match redis::Client::open(broker_url) {
        Ok(c) => {
            println!(" {} Redis client created", "✓".green());
            c
        }
        Err(e) => {
            // No client means nothing else can run; bail out of the check.
            println!(" {} Failed to create Redis client: {}", "✗".red(), e);
            health_issues.push("Cannot create Redis client".to_string());
            println!();
            println!("{}", "Health Check Failed".red().bold());
            return Ok(());
        }
    };

    let mut conn = match client.get_multiplexed_async_connection().await {
        Ok(c) => {
            println!(" {} Successfully connected to broker", "✓".green());
            c
        }
        Err(e) => {
            println!(" {} Failed to connect: {}", "✗".red(), e);
            health_issues.push("Cannot connect to broker".to_string());
            println!();
            println!("{}", "Health Check Failed".red().bold());
            return Ok(());
        }
    };

    // PING failure is only a warning here since the connection itself opened.
    match redis::cmd("PING").query_async::<String>(&mut conn).await {
        Ok(_) => {
            println!(" {} PING successful", "✓".green());
        }
        Err(e) => {
            println!(" {} PING failed: {}", "⚠".yellow(), e);
            health_warnings.push("PING to broker failed".to_string());
        }
    }

    println!();

    // --- Check 2: queue and dead-letter-queue sizes via the broker API ----
    println!("{}", "2. Queue Status".bold());
    let broker = RedisBroker::new(broker_url, queue)?;

    // Size is displayed but otherwise unused (hence the underscore binding).
    let _queue_size = match broker.queue_size().await {
        Ok(size) => {
            println!(" {} Queue size: {}", "✓".green(), size);
            size
        }
        Err(e) => {
            println!(" {} Failed to get queue size: {}", "✗".red(), e);
            health_issues.push("Cannot get queue size".to_string());
            0
        }
    };

    // Non-empty DLQ is worth a warning and drives the recommendations below.
    let dlq_size = match broker.dlq_size().await {
        Ok(size) => {
            if size > 0 {
                println!(" {} DLQ size: {} (has failed tasks)", "⚠".yellow(), size);
                health_warnings.push(format!("{size} tasks in Dead Letter Queue"));
            } else {
                println!(" {} DLQ size: {} (empty)", "✓".green(), size);
            }
            size
        }
        Err(e) => {
            println!(" {} Failed to get DLQ size: {}", "✗".red(), e);
            health_issues.push("Cannot get DLQ size".to_string());
            0
        }
    };

    println!();

    // --- Check 3: raw key type (list = FIFO, zset = priority) and pause ---
    println!("{}", "3. Queue Accessibility".bold());
    let queue_key = format!("celers:{queue}");
    let _queue_type: String = match redis::cmd("TYPE")
        .arg(&queue_key)
        .query_async(&mut conn)
        .await
    {
        Ok(t) => {
            if t == "none" {
                println!(
                    " {} Queue does not exist (will be created on first task)",
                    "⚠".yellow()
                );
                health_warnings.push("Queue not yet created".to_string());
            } else if t == "list" {
                println!(" {} Queue type: FIFO (list)", "✓".green());
            } else if t == "zset" {
                println!(" {} Queue type: Priority (sorted set)", "✓".green());
            } else {
                println!(" {} Unknown queue type: {}", "⚠".yellow(), t);
                health_warnings.push(format!("Unknown queue type: {t}"));
            }
            t
        }
        Err(e) => {
            println!(" {} Failed to check queue type: {}", "✗".red(), e);
            health_issues.push("Cannot check queue type".to_string());
            "none".to_string()
        }
    };

    // Pause marker set by `pause_queue`; value is the pause timestamp.
    let pause_key = format!("celers:{queue}:paused");
    match redis::cmd("GET")
        .arg(&pause_key)
        .query_async::<Option<String>>(&mut conn)
        .await
    {
        Ok(Some(paused_at)) => {
            println!(" {} Queue is PAUSED (since: {})", "⚠".yellow(), paused_at);
            health_warnings.push(format!("Queue is paused since {paused_at}"));
        }
        Ok(None) => {
            println!(" {} Queue is not paused", "✓".green());
        }
        Err(e) => {
            println!(" {} Failed to check pause status: {}", "⚠".yellow(), e);
        }
    }

    println!();

    // --- Check 4: broker memory usage (best effort, from INFO memory) -----
    println!("{}", "4. Broker Memory".bold());
    match redis::cmd("INFO")
        .arg("memory")
        .query_async::<String>(&mut conn)
        .await
    {
        Ok(info) => {
            // INFO returns "key:value" lines; only used_memory_human is shown.
            for line in info.lines() {
                if line.starts_with("used_memory_human:") {
                    let memory = line.split(':').nth(1).unwrap_or("N/A");
                    println!(" {} Used memory: {}", "✓".green(), memory);
                    break;
                }
            }
        }
        Err(_) => {
            println!(" {} Memory info not available", "⚠".yellow());
        }
    }

    println!();

    // --- Summary -----------------------------------------------------------
    println!("{}", "Health Summary".bold().cyan());
    println!();

    if health_issues.is_empty() && health_warnings.is_empty() {
        println!(
            "{}",
            " ✓ All checks passed! System is healthy.".green().bold()
        );
    } else {
        if !health_issues.is_empty() {
            println!("{}", " Critical Issues:".red().bold());
            for issue in &health_issues {
                println!(" {} {}", "✗".red(), issue);
            }
            println!();
        }

        if !health_warnings.is_empty() {
            println!("{}", " Warnings:".yellow().bold());
            for warning in &health_warnings {
                println!(" {} {}", "⚠".yellow(), warning);
            }
            println!();
        }

        if health_issues.is_empty() {
            println!(
                "{}",
                " Overall: System is operational with warnings"
                    .yellow()
                    .bold()
            );
        } else {
            println!("{}", " Overall: System has critical issues".red().bold());
        }
    }

    println!();

    // Actionable follow-ups when failed tasks are sitting in the DLQ.
    if dlq_size > 0 {
        println!("{}", "Recommendations:".cyan().bold());
        println!(" • Inspect DLQ: celers dlq inspect");
        println!(" • Clear DLQ: celers dlq clear --confirm");
        println!();
    }

    Ok(())
}
2548
2549pub async fn list_workers(broker_url: &str) -> anyhow::Result<()> {
2551 let client = redis::Client::open(broker_url)?;
2552 let mut conn = client.get_multiplexed_async_connection().await?;
2553
2554 println!("{}", "=== Active Workers ===".bold().cyan());
2555 println!();
2556
2557 let worker_pattern = "celers:worker:*:heartbeat";
2559 let mut cursor = 0;
2560 let mut worker_keys: Vec<String> = Vec::new();
2561
2562 loop {
2563 let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
2564 .arg(cursor)
2565 .arg("MATCH")
2566 .arg(worker_pattern)
2567 .arg("COUNT")
2568 .arg(100)
2569 .query_async(&mut conn)
2570 .await?;
2571
2572 worker_keys.extend(keys);
2573 cursor = new_cursor;
2574
2575 if cursor == 0 {
2576 break;
2577 }
2578 }
2579
2580 if worker_keys.is_empty() {
2581 println!("{}", "No active workers found".yellow());
2582 println!();
2583 println!("Workers register themselves when they start processing tasks.");
2584 return Ok(());
2585 }
2586
2587 #[derive(Tabled)]
2588 struct WorkerInfo {
2589 #[tabled(rename = "Worker ID")]
2590 id: String,
2591 #[tabled(rename = "Status")]
2592 status: String,
2593 #[tabled(rename = "Last Heartbeat")]
2594 last_heartbeat: String,
2595 }
2596
2597 let mut workers = Vec::new();
2598 let worker_count = worker_keys.len();
2599
2600 for key in &worker_keys {
2601 let parts: Vec<&str> = key.split(':').collect();
2603 if parts.len() >= 3 {
2604 let worker_id = parts[2].to_string();
2605
2606 let heartbeat: Option<String> =
2608 redis::cmd("GET").arg(key).query_async(&mut conn).await?;
2609
2610 let status = if heartbeat.is_some() {
2611 "Active".to_string()
2612 } else {
2613 "Unknown".to_string()
2614 };
2615
2616 let last_heartbeat = heartbeat.unwrap_or_else(|| "N/A".to_string());
2617
2618 workers.push(WorkerInfo {
2619 id: worker_id,
2620 status,
2621 last_heartbeat,
2622 });
2623 }
2624 }
2625
2626 let table = Table::new(workers).with(Style::rounded()).to_string();
2627 println!("{table}");
2628 println!();
2629 println!(
2630 "{}",
2631 format!("Total active workers: {worker_count}")
2632 .cyan()
2633 .bold()
2634 );
2635
2636 Ok(())
2637}
2638
2639pub async fn worker_stats(broker_url: &str, worker_id: &str) -> anyhow::Result<()> {
2641 let client = redis::Client::open(broker_url)?;
2642 let mut conn = client.get_multiplexed_async_connection().await?;
2643
2644 println!(
2645 "{}",
2646 format!("=== Worker Statistics: {worker_id} ===")
2647 .bold()
2648 .cyan()
2649 );
2650 println!();
2651
2652 let heartbeat_key = format!("celers:worker:{worker_id}:heartbeat");
2654 let heartbeat: Option<String> = redis::cmd("GET")
2655 .arg(&heartbeat_key)
2656 .query_async(&mut conn)
2657 .await?;
2658
2659 if heartbeat.is_none() {
2660 println!("{}", format!("✗ Worker '{worker_id}' not found").red());
2661 println!();
2662 println!("Possible reasons:");
2663 println!(" • Worker is not running");
2664 println!(" • Worker ID is incorrect");
2665 println!(" • Worker hasn't sent a heartbeat yet");
2666 return Ok(());
2667 }
2668
2669 #[derive(Tabled)]
2670 struct StatRow {
2671 #[tabled(rename = "Metric")]
2672 metric: String,
2673 #[tabled(rename = "Value")]
2674 value: String,
2675 }
2676
2677 let stats_key = format!("celers:worker:{worker_id}:stats");
2679 let stats: Option<String> = redis::cmd("GET")
2680 .arg(&stats_key)
2681 .query_async(&mut conn)
2682 .await?;
2683
2684 let mut stat_rows = vec![
2685 StatRow {
2686 metric: "Worker ID".to_string(),
2687 value: worker_id.to_string(),
2688 },
2689 StatRow {
2690 metric: "Status".to_string(),
2691 value: "Active".to_string(),
2692 },
2693 StatRow {
2694 metric: "Last Heartbeat".to_string(),
2695 value: heartbeat.unwrap_or_else(|| "N/A".to_string()),
2696 },
2697 ];
2698
2699 if let Some(stats_json) = stats {
2700 if let Ok(stats_data) = serde_json::from_str::<serde_json::Value>(&stats_json) {
2701 if let Some(tasks_processed) = stats_data.get("tasks_processed") {
2702 stat_rows.push(StatRow {
2703 metric: "Tasks Processed".to_string(),
2704 value: tasks_processed.to_string(),
2705 });
2706 }
2707 if let Some(tasks_failed) = stats_data.get("tasks_failed") {
2708 stat_rows.push(StatRow {
2709 metric: "Tasks Failed".to_string(),
2710 value: tasks_failed.to_string(),
2711 });
2712 }
2713 if let Some(uptime) = stats_data.get("uptime_seconds") {
2714 stat_rows.push(StatRow {
2715 metric: "Uptime".to_string(),
2716 value: format!("{uptime} seconds"),
2717 });
2718 }
2719 }
2720 }
2721
2722 let table = Table::new(stat_rows).with(Style::rounded()).to_string();
2723 println!("{table}");
2724
2725 Ok(())
2726}
2727
2728pub async fn stop_worker(broker_url: &str, worker_id: &str, graceful: bool) -> anyhow::Result<()> {
2730 let client = redis::Client::open(broker_url)?;
2731 let mut conn = client.get_multiplexed_async_connection().await?;
2732
2733 println!(
2734 "{}",
2735 format!("=== Stop Worker: {worker_id} ===").bold().yellow()
2736 );
2737 println!();
2738
2739 let heartbeat_key = format!("celers:worker:{worker_id}:heartbeat");
2741 let heartbeat: Option<String> = redis::cmd("GET")
2742 .arg(&heartbeat_key)
2743 .query_async(&mut conn)
2744 .await?;
2745
2746 if heartbeat.is_none() {
2747 println!("{}", format!("✗ Worker '{worker_id}' not found").red());
2748 return Ok(());
2749 }
2750
2751 let channel = if graceful {
2753 format!("celers:worker:{worker_id}:shutdown_graceful")
2754 } else {
2755 format!("celers:worker:{worker_id}:shutdown")
2756 };
2757
2758 let subscribers: usize = redis::cmd("PUBLISH")
2759 .arg(&channel)
2760 .arg("STOP")
2761 .query_async(&mut conn)
2762 .await?;
2763
2764 if subscribers > 0 {
2765 println!(
2766 "{}",
2767 format!(
2768 "✓ Stop signal sent to worker '{}' (mode: {})",
2769 worker_id,
2770 if graceful { "graceful" } else { "immediate" }
2771 )
2772 .green()
2773 .bold()
2774 );
2775 println!();
2776 if graceful {
2777 println!("The worker will:");
2778 println!(" • Finish processing current tasks");
2779 println!(" • Stop accepting new tasks");
2780 println!(" • Shut down gracefully");
2781 } else {
2782 println!("The worker will:");
2783 println!(" • Stop immediately");
2784 println!(" • Cancel running tasks");
2785 }
2786 } else {
2787 println!(
2788 "{}",
2789 format!("⚠ No subscribers for worker '{worker_id}'")
2790 .yellow()
2791 .bold()
2792 );
2793 println!();
2794 println!("The worker may not be listening for stop commands.");
2795 }
2796
2797 Ok(())
2798}
2799
2800pub async fn pause_worker(broker_url: &str, worker_id: &str) -> anyhow::Result<()> {
2802 let client = redis::Client::open(broker_url)?;
2803 let mut conn = client.get_multiplexed_async_connection().await?;
2804
2805 let pause_key = format!("celers:worker:{worker_id}:paused");
2806 let timestamp = chrono::Utc::now().to_rfc3339();
2807
2808 let _: () = redis::cmd("SET")
2809 .arg(&pause_key)
2810 .arg(×tamp)
2811 .query_async(&mut conn)
2812 .await?;
2813
2814 println!(
2815 "{}",
2816 format!("✓ Worker '{worker_id}' has been paused")
2817 .green()
2818 .bold()
2819 );
2820 println!();
2821 println!("{}", "Note:".yellow().bold());
2822 println!(" • Worker will stop accepting new tasks");
2823 println!(" • Current tasks will continue to completion");
2824 println!(" • Use 'celers worker-mgmt resume' to resume");
2825 println!();
2826 println!(" Paused at: {}", timestamp.cyan());
2827
2828 Ok(())
2829}
2830
2831pub async fn resume_worker(broker_url: &str, worker_id: &str) -> anyhow::Result<()> {
2833 let client = redis::Client::open(broker_url)?;
2834 let mut conn = client.get_multiplexed_async_connection().await?;
2835
2836 let pause_key = format!("celers:worker:{worker_id}:paused");
2837
2838 let paused: Option<String> = redis::cmd("GET")
2840 .arg(&pause_key)
2841 .query_async(&mut conn)
2842 .await?;
2843
2844 if paused.is_none() {
2845 println!(
2846 "{}",
2847 format!("✓ Worker '{worker_id}' is not paused").yellow()
2848 );
2849 return Ok(());
2850 }
2851
2852 let _: () = redis::cmd("DEL")
2854 .arg(&pause_key)
2855 .query_async(&mut conn)
2856 .await?;
2857
2858 println!(
2859 "{}",
2860 format!("✓ Worker '{worker_id}' has been resumed")
2861 .green()
2862 .bold()
2863 );
2864 println!();
2865 println!("{}", "Note:".yellow().bold());
2866 println!(" • Worker will now accept new tasks");
2867 if let Some(paused_at) = paused {
2868 println!(" • Was paused at: {}", paused_at.dimmed());
2869 }
2870
2871 Ok(())
2872}
2873
/// Runs a sequence of automatic health diagnostics against the broker and
/// queue, then prints a summary of issues, warnings, and recommendations.
///
/// Five checks run in order: broker connectivity (PING), queue/DLQ depth,
/// active worker count (heartbeat keys), queue pause flag, and broker
/// memory usage. Findings are reported on stdout; the function itself only
/// returns `Err` on connection-level failures.
pub async fn doctor(broker_url: &str, queue: &str) -> anyhow::Result<()> {
    println!("{}", "=== CeleRS Doctor ===".bold().cyan());
    println!("{}", "Running automatic diagnostics...".dimmed());
    println!();

    let client = redis::Client::open(broker_url)?;
    let mut conn = client.get_multiplexed_async_connection().await?;

    // Findings collected by the checks below and rendered in the summary.
    let mut issues = Vec::new();
    let mut warnings = Vec::new();
    let mut recommendations = Vec::new();

    println!("{}", "1. Checking broker connectivity...".bold());
    match redis::cmd("PING").query_async::<String>(&mut conn).await {
        Ok(_) => {
            println!("  {} Broker is reachable", "✓".green());
        }
        Err(e) => {
            println!("  {} Broker connection failed: {}", "✗".red(), e);
            issues.push("Cannot connect to broker".to_string());
            recommendations.push("Check broker URL and ensure Redis is running".to_string());
        }
    }
    println!();

    println!("{}", "2. Analyzing queue health...".bold());
    let broker = RedisBroker::new(broker_url, queue)?;

    // Best-effort reads: a broker error here is treated as an empty queue.
    let queue_size = broker.queue_size().await.unwrap_or(0);
    let dlq_size = broker.dlq_size().await.unwrap_or(0);

    println!("  {} Pending tasks: {}", "•".cyan(), queue_size);
    println!("  {} DLQ tasks: {}", "•".cyan(), dlq_size);

    // Heuristic thresholds for "something looks wrong".
    if dlq_size > 10 {
        warnings.push(format!("High number of failed tasks in DLQ: {dlq_size}"));
        recommendations.push("Inspect DLQ with: celers dlq inspect".to_string());
    }

    if queue_size > 1000 {
        warnings.push(format!("Large queue backlog: {queue_size} tasks"));
        recommendations.push("Consider scaling up workers".to_string());
    }
    println!();

    println!("{}", "3. Checking worker availability...".bold());
    // Workers announce themselves via heartbeat keys; count them with a
    // cursor-based SCAN so large keyspaces are never blocked.
    let worker_pattern = "celers:worker:*:heartbeat";
    let mut cursor = 0;
    let mut worker_count = 0;

    loop {
        let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
            .arg(cursor)
            .arg("MATCH")
            .arg(worker_pattern)
            .arg("COUNT")
            .arg(100)
            .query_async(&mut conn)
            .await?;

        worker_count += keys.len();
        cursor = new_cursor;

        // SCAN signals completion by returning cursor 0.
        if cursor == 0 {
            break;
        }
    }

    println!("  {} Active workers: {}", "•".cyan(), worker_count);

    if worker_count == 0 && queue_size > 0 {
        issues.push("No workers available to process pending tasks".to_string());
        recommendations.push("Start workers with: celers worker".to_string());
    } else if worker_count > 0 {
        println!("  {} Workers are available", "✓".green());
    }
    println!();

    println!("{}", "4. Checking queue status...".bold());
    // A queue-level pause flag stops workers from consuming this queue.
    let pause_key = format!("celers:{queue}:paused");
    let paused: Option<String> = redis::cmd("GET")
        .arg(&pause_key)
        .query_async(&mut conn)
        .await?;

    if let Some(paused_at) = paused {
        warnings.push(format!("Queue '{queue}' is paused since {paused_at}"));
        recommendations.push("Resume queue with: celers queue resume".to_string());
        println!("  {} Queue is PAUSED", "⚠".yellow());
    } else {
        println!("  {} Queue is active", "✓".green());
    }
    println!();

    println!("{}", "5. Checking broker memory...".bold());
    match redis::cmd("INFO")
        .arg("memory")
        .query_async::<String>(&mut conn)
        .await
    {
        Ok(info) => {
            // INFO returns "key:value" lines; pick the human-readable fields.
            for line in info.lines() {
                if line.starts_with("used_memory_human:") {
                    let memory = line.split(':').nth(1).unwrap_or("N/A");
                    println!("  {} Used memory: {}", "•".cyan(), memory);
                }
                if line.starts_with("maxmemory_human:") {
                    let max_memory = line.split(':').nth(1).unwrap_or("N/A");
                    // "0B" means no configured limit, so don't print it.
                    if max_memory != "0B" {
                        println!("  {} Max memory: {}", "•".cyan(), max_memory);
                    }
                }
            }
            println!("  {} Memory usage is acceptable", "✓".green());
        }
        Err(_) => {
            println!("  {} Memory info unavailable", "⚠".yellow());
        }
    }
    println!();

    println!("{}", "=== Diagnosis Summary ===".bold().cyan());
    println!();

    if issues.is_empty() && warnings.is_empty() {
        println!(
            "{}",
            "  ✓ No issues detected! System is healthy.".green().bold()
        );
    } else {
        if !issues.is_empty() {
            println!("{}", "  Critical Issues:".red().bold());
            for issue in &issues {
                println!("    {} {}", "✗".red(), issue);
            }
            println!();
        }

        if !warnings.is_empty() {
            println!("{}", "  Warnings:".yellow().bold());
            for warning in &warnings {
                println!("    {} {}", "⚠".yellow(), warning);
            }
            println!();
        }

        if !recommendations.is_empty() {
            println!("{}", "  Recommendations:".cyan().bold());
            for (i, rec) in recommendations.iter().enumerate() {
                println!("    {}. {}", i + 1, rec);
            }
            println!();
        }

        // Warnings alone downgrade the verdict; issues make it critical.
        if issues.is_empty() {
            println!(
                "{}",
                "  Overall: System is operational with warnings"
                    .yellow()
                    .bold()
            );
        } else {
            println!(
                "{}",
                "  Overall: System has critical issues that need attention"
                    .red()
                    .bold()
            );
        }
    }

    Ok(())
}
3054
3055pub async fn show_task_logs(
3057 broker_url: &str,
3058 task_id_str: &str,
3059 limit: usize,
3060) -> anyhow::Result<()> {
3061 let task_id = task_id_str
3062 .parse::<uuid::Uuid>()
3063 .map_err(|_| anyhow::anyhow!("Invalid task ID format"))?;
3064
3065 println!("{}", "=== Task Execution Logs ===".bold().cyan());
3066 println!("Task ID: {}", task_id.to_string().yellow());
3067 println!();
3068
3069 let client = redis::Client::open(broker_url)?;
3071 let mut conn = client.get_multiplexed_async_connection().await?;
3072
3073 let logs_key = format!("celers:task:{task_id}:logs");
3075
3076 let exists: bool = redis::cmd("EXISTS")
3078 .arg(&logs_key)
3079 .query_async(&mut conn)
3080 .await?;
3081
3082 if !exists {
3083 println!("{}", "✗ No logs found for this task".red());
3084 println!();
3085 println!("Possible reasons:");
3086 println!(" • Task hasn't been executed yet");
3087 println!(" • Logs have expired (TTL)");
3088 println!(" • Task was executed before logging was enabled");
3089 println!(" • Wrong task ID");
3090 return Ok(());
3091 }
3092
3093 let log_count: isize = redis::cmd("LLEN")
3095 .arg(&logs_key)
3096 .query_async(&mut conn)
3097 .await?;
3098
3099 println!(
3100 "{}",
3101 format!("Total log entries: {log_count}").cyan().bold()
3102 );
3103 println!();
3104
3105 let logs: Vec<String> = redis::cmd("LRANGE")
3107 .arg(&logs_key)
3108 .arg(-(limit as isize))
3109 .arg(-1)
3110 .query_async(&mut conn)
3111 .await?;
3112
3113 if logs.is_empty() {
3114 println!("{}", "No log entries available".yellow());
3115 return Ok(());
3116 }
3117
3118 for (idx, log_entry) in logs.iter().enumerate() {
3120 if let Ok(log_json) = serde_json::from_str::<serde_json::Value>(log_entry) {
3122 let timestamp = log_json
3123 .get("timestamp")
3124 .and_then(|v| v.as_str())
3125 .unwrap_or("N/A");
3126 let level = log_json
3127 .get("level")
3128 .and_then(|v| v.as_str())
3129 .unwrap_or("INFO");
3130 let message = log_json
3131 .get("message")
3132 .and_then(|v| v.as_str())
3133 .unwrap_or(log_entry);
3134
3135 let level_colored = match level {
3136 "ERROR" | "error" => level.red().bold(),
3137 "WARN" | "warn" => level.yellow().bold(),
3138 "DEBUG" | "debug" => level.dimmed(),
3139 _ => level.cyan().bold(),
3140 };
3141
3142 println!(
3143 "{} {} {} {}",
3144 format!("[{}]", idx + 1).dimmed(),
3145 timestamp.dimmed(),
3146 level_colored,
3147 message
3148 );
3149 } else {
3150 println!("{} {}", format!("[{}]", idx + 1).dimmed(), log_entry);
3152 }
3153 }
3154
3155 println!();
3156 if log_count as usize > limit {
3157 println!(
3158 "{}",
3159 format!("Showing last {} of {} log entries", logs.len(), log_count).yellow()
3160 );
3161 println!(
3162 "{}",
3163 format!("Use --limit to show more entries (max: {log_count})").dimmed()
3164 );
3165 } else {
3166 println!(
3167 "{}",
3168 format!("Showing all {} log entries", logs.len())
3169 .green()
3170 .bold()
3171 );
3172 }
3173
3174 Ok(())
3175}
3176
3177pub async fn list_schedules(broker_url: &str) -> anyhow::Result<()> {
3179 let client = redis::Client::open(broker_url)?;
3180 let mut conn = client.get_multiplexed_async_connection().await?;
3181
3182 println!("{}", "=== Scheduled Tasks ===".bold().cyan());
3183 println!();
3184
3185 let schedule_pattern = "celers:schedule:*";
3187 let mut cursor = 0;
3188 let mut schedule_keys: Vec<String> = Vec::new();
3189
3190 loop {
3191 let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
3192 .arg(cursor)
3193 .arg("MATCH")
3194 .arg(schedule_pattern)
3195 .arg("COUNT")
3196 .arg(100)
3197 .query_async(&mut conn)
3198 .await?;
3199
3200 schedule_keys.extend(keys);
3201 cursor = new_cursor;
3202
3203 if cursor == 0 {
3204 break;
3205 }
3206 }
3207
3208 if schedule_keys.is_empty() {
3209 println!("{}", "No scheduled tasks found".yellow());
3210 println!();
3211 println!("Add a schedule with: celers schedule add <name> --task <task> --cron <expr>");
3212 return Ok(());
3213 }
3214
3215 #[derive(Tabled)]
3216 struct ScheduleInfo {
3217 #[tabled(rename = "Name")]
3218 name: String,
3219 #[tabled(rename = "Task")]
3220 task: String,
3221 #[tabled(rename = "Cron")]
3222 cron: String,
3223 #[tabled(rename = "Status")]
3224 status: String,
3225 #[tabled(rename = "Last Run")]
3226 last_run: String,
3227 }
3228
3229 let mut schedules = Vec::new();
3230
3231 for key in &schedule_keys {
3232 let name = key.strip_prefix("celers:schedule:").unwrap_or(key);
3234
3235 let schedule_data: Option<String> =
3237 redis::cmd("GET").arg(key).query_async(&mut conn).await?;
3238
3239 if let Some(data) = schedule_data {
3240 if let Ok(schedule) = serde_json::from_str::<serde_json::Value>(&data) {
3241 let task = schedule
3242 .get("task")
3243 .and_then(|v| v.as_str())
3244 .unwrap_or("N/A")
3245 .to_string();
3246 let cron = schedule
3247 .get("cron")
3248 .and_then(|v| v.as_str())
3249 .unwrap_or("N/A")
3250 .to_string();
3251
3252 let pause_key = format!("celers:schedule:{name}:paused");
3254 let paused: Option<String> = redis::cmd("GET")
3255 .arg(&pause_key)
3256 .query_async(&mut conn)
3257 .await?;
3258
3259 let status = if paused.is_some() {
3260 "Paused".to_string()
3261 } else {
3262 "Active".to_string()
3263 };
3264
3265 let last_run = schedule
3266 .get("last_run")
3267 .and_then(|v| v.as_str())
3268 .unwrap_or("Never")
3269 .to_string();
3270
3271 schedules.push(ScheduleInfo {
3272 name: name.to_string(),
3273 task,
3274 cron,
3275 status,
3276 last_run,
3277 });
3278 }
3279 }
3280 }
3281
3282 let table = Table::new(schedules).with(Style::rounded()).to_string();
3283 println!("{table}");
3284 println!();
3285 println!(
3286 "{}",
3287 format!("Total scheduled tasks: {}", schedule_keys.len())
3288 .cyan()
3289 .bold()
3290 );
3291
3292 Ok(())
3293}
3294
3295#[allow(clippy::too_many_arguments)]
3297pub async fn add_schedule(
3298 broker_url: &str,
3299 name: &str,
3300 task: &str,
3301 cron: &str,
3302 queue: &str,
3303 args: Option<&str>,
3304) -> anyhow::Result<()> {
3305 let client = redis::Client::open(broker_url)?;
3306 let mut conn = client.get_multiplexed_async_connection().await?;
3307
3308 println!("{}", "=== Add Scheduled Task ===".bold().cyan());
3309 println!();
3310
3311 let cron_parts: Vec<&str> = cron.split_whitespace().collect();
3313 if cron_parts.len() != 5 {
3314 println!(
3315 "{}",
3316 "✗ Invalid cron expression. Expected format: 'min hour day month weekday'"
3317 .red()
3318 .bold()
3319 );
3320 println!();
3321 println!("Examples:");
3322 println!(" 0 0 * * * - Daily at midnight");
3323 println!(" 0 */2 * * * - Every 2 hours");
3324 println!(" */15 * * * * - Every 15 minutes");
3325 println!(" 0 9 * * 1-5 - Weekdays at 9 AM");
3326 return Ok(());
3327 }
3328
3329 let schedule_key = format!("celers:schedule:{name}");
3331 let exists: bool = redis::cmd("EXISTS")
3332 .arg(&schedule_key)
3333 .query_async(&mut conn)
3334 .await?;
3335
3336 if exists {
3337 println!(
3338 "{}",
3339 format!("✗ Schedule '{name}' already exists").red().bold()
3340 );
3341 println!();
3342 println!("Use a different name or remove the existing schedule first:");
3343 println!(" celers schedule remove {name} --confirm");
3344 return Ok(());
3345 }
3346
3347 let args_value = if let Some(args_str) = args {
3349 match serde_json::from_str::<serde_json::Value>(args_str) {
3350 Ok(v) => v,
3351 Err(e) => {
3352 println!("{}", "✗ Invalid JSON arguments".red().bold());
3353 println!(" Error: {e}");
3354 return Ok(());
3355 }
3356 }
3357 } else {
3358 serde_json::json!({})
3359 };
3360
3361 let schedule_data = serde_json::json!({
3363 "name": name,
3364 "task": task,
3365 "cron": cron,
3366 "queue": queue,
3367 "args": args_value,
3368 "created_at": chrono::Utc::now().to_rfc3339(),
3369 "last_run": null,
3370 });
3371
3372 let schedule_json = serde_json::to_string(&schedule_data)?;
3374 let _: () = redis::cmd("SET")
3375 .arg(&schedule_key)
3376 .arg(&schedule_json)
3377 .query_async(&mut conn)
3378 .await?;
3379
3380 println!("{}", "✓ Schedule added successfully".green().bold());
3381 println!();
3382 println!(" {} {}", "Name:".cyan(), name);
3383 println!(" {} {}", "Task:".cyan(), task);
3384 println!(" {} {}", "Cron:".cyan(), cron);
3385 println!(" {} {}", "Queue:".cyan(), queue);
3386 if let Some(args) = args {
3387 println!(" {} {}", "Args:".cyan(), args);
3388 }
3389 println!();
3390 println!("{}", "Note:".yellow().bold());
3391 println!(" • Ensure a beat scheduler is running to execute this schedule");
3392 println!(" • The schedule is active and will run at the specified times");
3393
3394 Ok(())
3395}
3396
3397pub async fn remove_schedule(broker_url: &str, name: &str, confirm: bool) -> anyhow::Result<()> {
3399 let client = redis::Client::open(broker_url)?;
3400 let mut conn = client.get_multiplexed_async_connection().await?;
3401
3402 let schedule_key = format!("celers:schedule:{name}");
3403
3404 let exists: bool = redis::cmd("EXISTS")
3406 .arg(&schedule_key)
3407 .query_async(&mut conn)
3408 .await?;
3409
3410 if !exists {
3411 println!("{}", format!("✗ Schedule '{name}' not found").red());
3412 return Ok(());
3413 }
3414
3415 if !confirm {
3416 println!(
3417 "{}",
3418 format!("⚠ Warning: This will delete schedule '{name}'")
3419 .yellow()
3420 .bold()
3421 );
3422 println!(" Add --confirm to proceed");
3423 return Ok(());
3424 }
3425
3426 let pause_key = format!("celers:schedule:{name}:paused");
3428 let _: () = redis::cmd("DEL")
3429 .arg(&schedule_key)
3430 .arg(&pause_key)
3431 .query_async(&mut conn)
3432 .await?;
3433
3434 println!(
3435 "{}",
3436 format!("✓ Schedule '{name}' removed successfully")
3437 .green()
3438 .bold()
3439 );
3440
3441 Ok(())
3442}
3443
3444pub async fn pause_schedule(broker_url: &str, name: &str) -> anyhow::Result<()> {
3446 let client = redis::Client::open(broker_url)?;
3447 let mut conn = client.get_multiplexed_async_connection().await?;
3448
3449 let schedule_key = format!("celers:schedule:{name}");
3450
3451 let exists: bool = redis::cmd("EXISTS")
3453 .arg(&schedule_key)
3454 .query_async(&mut conn)
3455 .await?;
3456
3457 if !exists {
3458 println!("{}", format!("✗ Schedule '{name}' not found").red());
3459 return Ok(());
3460 }
3461
3462 let pause_key = format!("celers:schedule:{name}:paused");
3464 let timestamp = chrono::Utc::now().to_rfc3339();
3465 let _: () = redis::cmd("SET")
3466 .arg(&pause_key)
3467 .arg(×tamp)
3468 .query_async(&mut conn)
3469 .await?;
3470
3471 println!(
3472 "{}",
3473 format!("✓ Schedule '{name}' has been paused")
3474 .green()
3475 .bold()
3476 );
3477 println!();
3478 println!("{}", "Note:".yellow().bold());
3479 println!(" • Schedule will not execute until resumed");
3480 println!(" • Use 'celers schedule resume {name}' to resume");
3481 println!();
3482 println!(" Paused at: {}", timestamp.cyan());
3483
3484 Ok(())
3485}
3486
3487pub async fn resume_schedule(broker_url: &str, name: &str) -> anyhow::Result<()> {
3489 let client = redis::Client::open(broker_url)?;
3490 let mut conn = client.get_multiplexed_async_connection().await?;
3491
3492 let schedule_key = format!("celers:schedule:{name}");
3493
3494 let exists: bool = redis::cmd("EXISTS")
3496 .arg(&schedule_key)
3497 .query_async(&mut conn)
3498 .await?;
3499
3500 if !exists {
3501 println!("{}", format!("✗ Schedule '{name}' not found").red());
3502 return Ok(());
3503 }
3504
3505 let pause_key = format!("celers:schedule:{name}:paused");
3507 let paused: Option<String> = redis::cmd("GET")
3508 .arg(&pause_key)
3509 .query_async(&mut conn)
3510 .await?;
3511
3512 if paused.is_none() {
3513 println!("{}", format!("✓ Schedule '{name}' is not paused").yellow());
3514 return Ok(());
3515 }
3516
3517 let _: () = redis::cmd("DEL")
3519 .arg(&pause_key)
3520 .query_async(&mut conn)
3521 .await?;
3522
3523 println!(
3524 "{}",
3525 format!("✓ Schedule '{name}' has been resumed")
3526 .green()
3527 .bold()
3528 );
3529 println!();
3530 println!("{}", "Note:".yellow().bold());
3531 println!(" • Schedule will now execute at the specified times");
3532 if let Some(paused_at) = paused {
3533 println!(" • Was paused at: {}", paused_at.dimmed());
3534 }
3535
3536 Ok(())
3537}
3538
3539pub async fn trigger_schedule(broker_url: &str, name: &str) -> anyhow::Result<()> {
3541 let client = redis::Client::open(broker_url)?;
3542 let mut conn = client.get_multiplexed_async_connection().await?;
3543
3544 let schedule_key = format!("celers:schedule:{name}");
3545
3546 println!(
3547 "{}",
3548 format!("=== Trigger Schedule: {name} ===").bold().cyan()
3549 );
3550 println!();
3551
3552 let schedule_data: Option<String> = redis::cmd("GET")
3554 .arg(&schedule_key)
3555 .query_async(&mut conn)
3556 .await?;
3557
3558 if let Some(data) = schedule_data {
3559 if let Ok(schedule) = serde_json::from_str::<serde_json::Value>(&data) {
3560 let task = schedule
3561 .get("task")
3562 .and_then(|v| v.as_str())
3563 .unwrap_or("N/A");
3564 let queue = schedule
3565 .get("queue")
3566 .and_then(|v| v.as_str())
3567 .unwrap_or("celers");
3568
3569 let trigger_channel = format!("celers:schedule:{name}:trigger");
3571 let subscribers: usize = redis::cmd("PUBLISH")
3572 .arg(&trigger_channel)
3573 .arg("TRIGGER")
3574 .query_async(&mut conn)
3575 .await?;
3576
3577 if subscribers > 0 {
3578 println!(
3579 "{}",
3580 format!("✓ Trigger signal sent for schedule '{name}'")
3581 .green()
3582 .bold()
3583 );
3584 println!();
3585 println!(" {} {}", "Task:".cyan(), task);
3586 println!(" {} {}", "Queue:".cyan(), queue);
3587 println!();
3588 println!("The task will be executed immediately by the beat scheduler.");
3589 } else {
3590 println!(
3591 "{}",
3592 "⚠ No beat scheduler subscribed to trigger channel"
3593 .yellow()
3594 .bold()
3595 );
3596 println!();
3597 println!("The trigger command was sent but no beat scheduler is listening.");
3598 println!("Ensure a beat scheduler is running to execute scheduled tasks.");
3599 }
3600 } else {
3601 println!("{}", "✗ Invalid schedule data".red());
3602 }
3603 } else {
3604 println!("{}", format!("✗ Schedule '{name}' not found").red());
3605 }
3606
3607 Ok(())
3608}
3609
3610pub async fn scale_workers(broker_url: &str, target_count: usize) -> anyhow::Result<()> {
3612 let client = redis::Client::open(broker_url)?;
3613 let mut conn = client.get_multiplexed_async_connection().await?;
3614
3615 println!(
3616 "{}",
3617 format!("=== Scale Workers to {target_count} ===")
3618 .bold()
3619 .cyan()
3620 );
3621 println!();
3622
3623 let pattern = "celers:worker:*:heartbeat";
3625 let keys: Vec<String> = redis::cmd("KEYS")
3626 .arg(pattern)
3627 .query_async(&mut conn)
3628 .await?;
3629
3630 let current_count = keys.len();
3631
3632 println!("Current workers: {}", current_count.to_string().yellow());
3633 println!("Target workers: {}", target_count.to_string().green());
3634 println!();
3635
3636 if current_count == target_count {
3637 println!("{}", "✓ Already at target worker count".green().bold());
3638 return Ok(());
3639 }
3640
3641 if current_count < target_count {
3642 let needed = target_count - current_count;
3643 println!(
3644 "{}",
3645 format!("⚠ Need to start {needed} more workers")
3646 .yellow()
3647 .bold()
3648 );
3649 println!();
3650 println!("To scale up, start additional worker instances:");
3651 println!(" celers worker --broker {broker_url}");
3652 println!();
3653 println!("Or run them in parallel:");
3654 for i in 1..=needed {
3655 println!(" celers worker --broker {broker_url} & # Worker {i}");
3656 }
3657 } else {
3658 let excess = current_count - target_count;
3659 println!(
3660 "{}",
3661 format!("⚠ Need to stop {excess} workers").yellow().bold()
3662 );
3663 println!();
3664 println!("To scale down, stop workers gracefully:");
3665 println!(" celers worker-mgmt list");
3666 println!(" celers worker-mgmt stop <worker-id> --graceful");
3667 }
3668
3669 Ok(())
3670}
3671
3672pub async fn drain_worker(broker_url: &str, worker_id: &str) -> anyhow::Result<()> {
3674 let client = redis::Client::open(broker_url)?;
3675 let mut conn = client.get_multiplexed_async_connection().await?;
3676
3677 println!(
3678 "{}",
3679 format!("=== Drain Worker: {worker_id} ===").bold().cyan()
3680 );
3681 println!();
3682
3683 let heartbeat_key = format!("celers:worker:{worker_id}:heartbeat");
3685 let exists: bool = redis::cmd("EXISTS")
3686 .arg(&heartbeat_key)
3687 .query_async(&mut conn)
3688 .await?;
3689
3690 if !exists {
3691 println!("{}", format!("✗ Worker '{worker_id}' not found").red());
3692 return Ok(());
3693 }
3694
3695 let drain_key = format!("celers:worker:{worker_id}:draining");
3697 let timestamp = Utc::now().to_rfc3339();
3698 redis::cmd("SET")
3699 .arg(&drain_key)
3700 .arg(×tamp)
3701 .query_async::<()>(&mut conn)
3702 .await?;
3703
3704 redis::cmd("EXPIRE")
3706 .arg(&drain_key)
3707 .arg(86400)
3708 .query_async::<()>(&mut conn)
3709 .await?;
3710
3711 println!(
3712 "{}",
3713 format!("✓ Worker '{worker_id}' is now draining")
3714 .green()
3715 .bold()
3716 );
3717 println!();
3718 println!("The worker will:");
3719 println!(" • Stop accepting new tasks");
3720 println!(" • Complete currently running tasks");
3721 println!(" • Shut down automatically when all tasks complete");
3722 println!();
3723 println!("To resume normal operation:");
3724 println!(" celers worker-mgmt resume {worker_id}");
3725
3726 Ok(())
3727}
3728
3729pub async fn schedule_history(broker_url: &str, name: &str, limit: usize) -> anyhow::Result<()> {
3731 let client = redis::Client::open(broker_url)?;
3732 let mut conn = client.get_multiplexed_async_connection().await?;
3733
3734 println!(
3735 "{}",
3736 format!("=== Schedule History: {name} ===").bold().cyan()
3737 );
3738 println!();
3739
3740 let history_key = format!("celers:schedule:{name}:history");
3742 let entries: Vec<String> = redis::cmd("ZREVRANGE")
3743 .arg(&history_key)
3744 .arg(0)
3745 .arg(limit as isize - 1)
3746 .query_async(&mut conn)
3747 .await?;
3748
3749 if entries.is_empty() {
3750 println!("{}", "No execution history found".yellow());
3751 println!();
3752 println!("History is recorded when tasks are triggered by the beat scheduler.");
3753 return Ok(());
3754 }
3755
3756 #[derive(Tabled)]
3757 struct HistoryEntry {
3758 #[tabled(rename = "#")]
3759 index: String,
3760 #[tabled(rename = "Timestamp")]
3761 timestamp: String,
3762 #[tabled(rename = "Status")]
3763 status: String,
3764 #[tabled(rename = "Task ID")]
3765 task_id: String,
3766 }
3767
3768 let mut history_entries = Vec::new();
3769 for (idx, entry) in entries.iter().enumerate() {
3770 if let Ok(data) = serde_json::from_str::<serde_json::Value>(entry) {
3771 let timestamp = data
3772 .get("timestamp")
3773 .and_then(|v| v.as_str())
3774 .unwrap_or("N/A")
3775 .to_string();
3776 let status = data
3777 .get("status")
3778 .and_then(|v| v.as_str())
3779 .unwrap_or("unknown")
3780 .to_string();
3781 let task_id = data
3782 .get("task_id")
3783 .and_then(|v| v.as_str())
3784 .unwrap_or("N/A")
3785 .to_string();
3786
3787 history_entries.push(HistoryEntry {
3788 index: (idx + 1).to_string(),
3789 timestamp,
3790 status,
3791 task_id,
3792 });
3793 }
3794 }
3795
3796 let entry_count = history_entries.len();
3797 let table = Table::new(history_entries)
3798 .with(Style::rounded())
3799 .to_string();
3800 println!("{table}");
3801 println!();
3802 println!(
3803 "Showing {} of {} entries",
3804 entry_count.to_string().yellow(),
3805 entries.len()
3806 );
3807
3808 Ok(())
3809}
3810
3811pub async fn debug_task(broker_url: &str, queue: &str, task_id_str: &str) -> anyhow::Result<()> {
3813 let task_id = task_id_str
3814 .parse::<uuid::Uuid>()
3815 .map_err(|_| anyhow::anyhow!("Invalid task ID format"))?;
3816
3817 println!("{}", format!("=== Debug Task: {task_id} ===").bold().cyan());
3818 println!();
3819
3820 let client = redis::Client::open(broker_url)?;
3821 let mut conn = client.get_multiplexed_async_connection().await?;
3822
3823 let logs_key = format!("celers:task:{task_id}:logs");
3825 let logs: Vec<String> = redis::cmd("LRANGE")
3826 .arg(&logs_key)
3827 .arg(0)
3828 .arg(-1)
3829 .query_async(&mut conn)
3830 .await?;
3831
3832 if logs.is_empty() {
3833 println!("{}", "No debug logs found for this task".yellow());
3834 } else {
3835 println!("{}", "Task Logs:".green().bold());
3836 println!();
3837 for log in &logs {
3838 if let Ok(log_json) = serde_json::from_str::<serde_json::Value>(log) {
3839 let level = log_json
3840 .get("level")
3841 .and_then(|v| v.as_str())
3842 .unwrap_or("INFO");
3843 let message = log_json
3844 .get("message")
3845 .and_then(|v| v.as_str())
3846 .unwrap_or(log);
3847 let timestamp = log_json
3848 .get("timestamp")
3849 .and_then(|v| v.as_str())
3850 .unwrap_or("");
3851
3852 let level_colored = match level {
3853 "ERROR" | "error" => level.red(),
3854 "WARN" | "warn" => level.yellow(),
3855 "DEBUG" | "debug" => level.cyan(),
3856 _ => level.normal(),
3857 };
3858
3859 println!("[{}] {} {}", timestamp.dimmed(), level_colored, message);
3860 } else {
3861 println!("{log}");
3862 }
3863 }
3864 println!();
3865 }
3866
3867 let metadata_key = format!("celers:task:{task_id}:metadata");
3869 let metadata: Option<String> = redis::cmd("GET")
3870 .arg(&metadata_key)
3871 .query_async(&mut conn)
3872 .await?;
3873
3874 if let Some(meta_str) = metadata {
3875 println!("{}", "Task Metadata:".green().bold());
3876 println!();
3877 if let Ok(meta_json) = serde_json::from_str::<serde_json::Value>(&meta_str) {
3878 println!("{}", serde_json::to_string_pretty(&meta_json)?);
3879 } else {
3880 println!("{meta_str}");
3881 }
3882 println!();
3883 }
3884
3885 inspect_task(broker_url, queue, task_id_str).await?;
3887
3888 Ok(())
3889}
3890
/// Print a diagnostic snapshot for a single worker: heartbeat, stats blob,
/// pause/drain flags, and the tail of its log list.
///
/// All data is read from well-known `celers:worker:<id>:*` Redis keys.
/// A missing heartbeat is reported as "worker not found" and returns `Ok`
/// rather than an error.
pub async fn debug_worker(broker_url: &str, worker_id: &str) -> anyhow::Result<()> {
    let client = redis::Client::open(broker_url)?;
    let mut conn = client.get_multiplexed_async_connection().await?;

    println!(
        "{}",
        format!("=== Debug Worker: {worker_id} ===").bold().cyan()
    );
    println!();

    // Heartbeat is the liveness signal; absence means the worker is gone
    // (never started, wrong id, or the key expired after a crash).
    let heartbeat_key = format!("celers:worker:{worker_id}:heartbeat");
    let heartbeat: Option<String> = redis::cmd("GET")
        .arg(&heartbeat_key)
        .query_async(&mut conn)
        .await?;

    if heartbeat.is_none() {
        println!("{}", format!("✗ Worker '{worker_id}' not found").red());
        println!();
        println!("Possible causes:");
        println!(" • Worker is not running");
        println!(" • Worker ID is incorrect");
        println!(" • Heartbeat expired (worker crashed)");
        return Ok(());
    }

    println!("{}", "Worker Status: Active".green().bold());
    if let Some(hb) = heartbeat {
        println!("Last heartbeat: {}", hb.yellow());
    }
    println!();

    // Optional JSON stats blob; pretty-printed when parseable, raw otherwise.
    let stats_key = format!("celers:worker:{worker_id}:stats");
    let stats: Option<String> = redis::cmd("GET")
        .arg(&stats_key)
        .query_async(&mut conn)
        .await?;

    if let Some(stats_str) = stats {
        println!("{}", "Worker Statistics:".green().bold());
        if let Ok(stats_json) = serde_json::from_str::<serde_json::Value>(&stats_str) {
            println!("{}", serde_json::to_string_pretty(&stats_json)?);
        } else {
            println!("{stats_str}");
        }
        println!();
    }

    // Pause/drain are flag keys: their mere existence signals the state.
    let pause_key = format!("celers:worker:{worker_id}:paused");
    let paused: bool = redis::cmd("EXISTS")
        .arg(&pause_key)
        .query_async(&mut conn)
        .await?;

    if paused {
        println!("{}", "⚠ Worker is PAUSED".yellow().bold());
        println!(" Tasks are not being processed");
        println!();
    }

    let drain_key = format!("celers:worker:{worker_id}:draining");
    let draining: bool = redis::cmd("EXISTS")
        .arg(&drain_key)
        .query_async(&mut conn)
        .await?;

    if draining {
        println!("{}", "⚠ Worker is DRAINING".yellow().bold());
        println!(" Not accepting new tasks");
        println!();
    }

    // Tail of the worker's log list; LRANGE -20 -1 selects the last 20 entries.
    let logs_key = format!("celers:worker:{worker_id}:logs");
    let logs: Vec<String> = redis::cmd("LRANGE")
        .arg(&logs_key)
        .arg(-20)
        .arg(-1)
        .query_async(&mut conn)
        .await?;

    if !logs.is_empty() {
        println!("{}", "Recent Worker Logs (last 20):".green().bold());
        println!();
        for log in logs {
            println!("{log}");
        }
    }

    Ok(())
}
3987
3988pub async fn report_daily(broker_url: &str, queue: &str) -> anyhow::Result<()> {
3990 let client = redis::Client::open(broker_url)?;
3991 let mut conn = client.get_multiplexed_async_connection().await?;
3992
3993 println!("{}", "=== Daily Execution Report ===".bold().cyan());
3994 println!();
3995
3996 let now = Utc::now();
3997 let today_key = format!("celers:metrics:{}:daily:{}", queue, now.format("%Y-%m-%d"));
3998
3999 let metrics: Option<String> = redis::cmd("GET")
4001 .arg(&today_key)
4002 .query_async(&mut conn)
4003 .await?;
4004
4005 if let Some(metrics_str) = metrics {
4006 if let Ok(metrics_json) = serde_json::from_str::<serde_json::Value>(&metrics_str) {
4007 println!("{}", format!("Date: {}", now.format("%Y-%m-%d")).yellow());
4008 println!();
4009
4010 #[derive(Tabled)]
4011 struct DailyMetric {
4012 #[tabled(rename = "Metric")]
4013 metric: String,
4014 #[tabled(rename = "Count")]
4015 count: String,
4016 }
4017
4018 let mut daily_metrics = vec![];
4019
4020 if let Some(total) = metrics_json
4021 .get("total_tasks")
4022 .and_then(serde_json::Value::as_u64)
4023 {
4024 daily_metrics.push(DailyMetric {
4025 metric: "Total Tasks".to_string(),
4026 count: total.to_string(),
4027 });
4028 }
4029
4030 if let Some(succeeded) = metrics_json
4031 .get("succeeded")
4032 .and_then(serde_json::Value::as_u64)
4033 {
4034 daily_metrics.push(DailyMetric {
4035 metric: "Succeeded".to_string(),
4036 count: succeeded.to_string(),
4037 });
4038 }
4039
4040 if let Some(failed) = metrics_json
4041 .get("failed")
4042 .and_then(serde_json::Value::as_u64)
4043 {
4044 daily_metrics.push(DailyMetric {
4045 metric: "Failed".to_string(),
4046 count: failed.to_string(),
4047 });
4048 }
4049
4050 if let Some(retried) = metrics_json
4051 .get("retried")
4052 .and_then(serde_json::Value::as_u64)
4053 {
4054 daily_metrics.push(DailyMetric {
4055 metric: "Retried".to_string(),
4056 count: retried.to_string(),
4057 });
4058 }
4059
4060 if let Some(avg_time) = metrics_json
4061 .get("avg_execution_time")
4062 .and_then(serde_json::Value::as_f64)
4063 {
4064 daily_metrics.push(DailyMetric {
4065 metric: "Avg Execution Time".to_string(),
4066 count: format!("{avg_time:.2}s"),
4067 });
4068 }
4069
4070 let table = Table::new(daily_metrics).with(Style::rounded()).to_string();
4071 println!("{table}");
4072 }
4073 } else {
4074 println!("{}", "No metrics available for today".yellow());
4075 println!();
4076 println!("Metrics are collected automatically when tasks are executed.");
4077 println!("Ensure workers are running and processing tasks.");
4078 }
4079
4080 Ok(())
4081}
4082
4083pub async fn report_weekly(broker_url: &str, queue: &str) -> anyhow::Result<()> {
4085 let client = redis::Client::open(broker_url)?;
4086 let mut conn = client.get_multiplexed_async_connection().await?;
4087
4088 println!("{}", "=== Weekly Statistics Report ===".bold().cyan());
4089 println!();
4090
4091 let now = Utc::now();
4092 let week_start = now - chrono::Duration::days(7);
4093
4094 println!(
4095 "{}",
4096 format!(
4097 "Period: {} to {}",
4098 week_start.format("%Y-%m-%d"),
4099 now.format("%Y-%m-%d")
4100 )
4101 .yellow()
4102 );
4103 println!();
4104
4105 let mut total_tasks = 0u64;
4106 let mut total_succeeded = 0u64;
4107 let mut total_failed = 0u64;
4108 let mut total_retried = 0u64;
4109
4110 for day_offset in 0..7 {
4112 let day = now - chrono::Duration::days(day_offset);
4113 let day_key = format!("celers:metrics:{}:daily:{}", queue, day.format("%Y-%m-%d"));
4114
4115 let metrics: Option<String> = redis::cmd("GET")
4116 .arg(&day_key)
4117 .query_async(&mut conn)
4118 .await?;
4119
4120 if let Some(metrics_str) = metrics {
4121 if let Ok(metrics_json) = serde_json::from_str::<serde_json::Value>(&metrics_str) {
4122 if let Some(tasks) = metrics_json
4123 .get("total_tasks")
4124 .and_then(serde_json::Value::as_u64)
4125 {
4126 total_tasks += tasks;
4127 }
4128 if let Some(succeeded) = metrics_json
4129 .get("succeeded")
4130 .and_then(serde_json::Value::as_u64)
4131 {
4132 total_succeeded += succeeded;
4133 }
4134 if let Some(failed) = metrics_json
4135 .get("failed")
4136 .and_then(serde_json::Value::as_u64)
4137 {
4138 total_failed += failed;
4139 }
4140 if let Some(retried) = metrics_json
4141 .get("retried")
4142 .and_then(serde_json::Value::as_u64)
4143 {
4144 total_retried += retried;
4145 }
4146 }
4147 }
4148 }
4149
4150 #[derive(Tabled)]
4151 struct WeeklyMetric {
4152 #[tabled(rename = "Metric")]
4153 metric: String,
4154 #[tabled(rename = "Count")]
4155 count: String,
4156 #[tabled(rename = "Percentage")]
4157 percentage: String,
4158 }
4159
4160 let success_rate = if total_tasks > 0 {
4161 (total_succeeded as f64 / total_tasks as f64) * 100.0
4162 } else {
4163 0.0
4164 };
4165
4166 let failure_rate = if total_tasks > 0 {
4167 (total_failed as f64 / total_tasks as f64) * 100.0
4168 } else {
4169 0.0
4170 };
4171
4172 let weekly_metrics = vec![
4173 WeeklyMetric {
4174 metric: "Total Tasks".to_string(),
4175 count: total_tasks.to_string(),
4176 percentage: "100%".to_string(),
4177 },
4178 WeeklyMetric {
4179 metric: "Succeeded".to_string(),
4180 count: total_succeeded.to_string(),
4181 percentage: format!("{success_rate:.1}%"),
4182 },
4183 WeeklyMetric {
4184 metric: "Failed".to_string(),
4185 count: total_failed.to_string(),
4186 percentage: format!("{failure_rate:.1}%"),
4187 },
4188 WeeklyMetric {
4189 metric: "Retried".to_string(),
4190 count: total_retried.to_string(),
4191 percentage: "-".to_string(),
4192 },
4193 ];
4194
4195 let table = Table::new(weekly_metrics)
4196 .with(Style::rounded())
4197 .to_string();
4198 println!("{table}");
4199
4200 if total_tasks == 0 {
4201 println!();
4202 println!("{}", "No tasks processed this week".yellow());
4203 }
4204
4205 Ok(())
4206}
4207
4208pub async fn analyze_bottlenecks(broker_url: &str, queue: &str) -> anyhow::Result<()> {
4210 let client = redis::Client::open(broker_url)?;
4211 let mut conn = client.get_multiplexed_async_connection().await?;
4212
4213 println!(
4214 "{}",
4215 "=== Performance Bottleneck Analysis ===".bold().cyan()
4216 );
4217 println!();
4218
4219 let queue_key = format!("celers:{queue}");
4221 let queue_type: String = redis::cmd("TYPE")
4222 .arg(&queue_key)
4223 .query_async(&mut conn)
4224 .await?;
4225
4226 let queue_size: isize = match queue_type.as_str() {
4227 "list" => {
4228 redis::cmd("LLEN")
4229 .arg(&queue_key)
4230 .query_async(&mut conn)
4231 .await?
4232 }
4233 "zset" => {
4234 redis::cmd("ZCARD")
4235 .arg(&queue_key)
4236 .query_async(&mut conn)
4237 .await?
4238 }
4239 _ => 0,
4240 };
4241
4242 let worker_keys: Vec<String> = redis::cmd("KEYS")
4244 .arg("celers:worker:*:heartbeat")
4245 .query_async(&mut conn)
4246 .await?;
4247 let worker_count = worker_keys.len();
4248
4249 let dlq_key = format!("celers:{queue}:dlq");
4251 let dlq_size: isize = redis::cmd("LLEN")
4252 .arg(&dlq_key)
4253 .query_async(&mut conn)
4254 .await?;
4255
4256 println!("{}", "System Overview:".green().bold());
4257 println!(" Queue Depth: {}", queue_size.to_string().yellow());
4258 println!(" Active Workers: {}", worker_count.to_string().yellow());
4259 println!(" DLQ Size: {}", dlq_size.to_string().yellow());
4260 println!();
4261
4262 let mut bottlenecks = Vec::new();
4263
4264 if queue_size > 1000 {
4266 bottlenecks.push("High queue depth - consider scaling up workers");
4267 }
4268
4269 if worker_count == 0 && queue_size > 0 {
4270 bottlenecks.push("No active workers - tasks are not being processed");
4271 }
4272
4273 if worker_count > 0 && queue_size > (worker_count * 100) as isize {
4274 bottlenecks.push("Queue depth is very high relative to worker count");
4275 }
4276
4277 if dlq_size > 100 {
4278 bottlenecks.push("High DLQ size - many tasks are failing");
4279 }
4280
4281 if bottlenecks.is_empty() {
4282 println!("{}", "✓ No significant bottlenecks detected".green());
4283 } else {
4284 println!("{}", "⚠ Bottlenecks Detected:".yellow().bold());
4285 println!();
4286 for (idx, bottleneck) in bottlenecks.iter().enumerate() {
4287 println!(" {}. {}", idx + 1, bottleneck);
4288 }
4289 println!();
4290
4291 println!("{}", "Recommendations:".cyan().bold());
4292 println!();
4293 if queue_size > 1000 {
4294 println!(
4295 " • Scale up workers: celers worker-mgmt scale {}",
4296 worker_count * 2
4297 );
4298 }
4299 if worker_count == 0 {
4300 println!(" • Start workers: celers worker --broker {broker_url}");
4301 }
4302 if dlq_size > 100 {
4303 println!(" • Investigate failed tasks: celers dlq inspect");
4304 println!(" • Check task implementations for errors");
4305 }
4306 }
4307
4308 Ok(())
4309}
4310
4311pub async fn analyze_failures(broker_url: &str, queue: &str) -> anyhow::Result<()> {
4313 let broker = RedisBroker::new(broker_url, queue)?;
4314
4315 println!("{}", "=== Failure Pattern Analysis ===".bold().cyan());
4316 println!();
4317
4318 let dlq_size = broker.dlq_size().await?;
4319 println!("Total failed tasks: {}", dlq_size.to_string().yellow());
4320
4321 if dlq_size == 0 {
4322 println!("{}", "✓ No failed tasks to analyze".green());
4323 return Ok(());
4324 }
4325
4326 println!();
4327
4328 let tasks = broker.inspect_dlq(100).await?;
4330
4331 let mut task_name_failures: std::collections::HashMap<String, usize> =
4332 std::collections::HashMap::new();
4333
4334 for task in &tasks {
4335 *task_name_failures
4337 .entry(task.metadata.name.clone())
4338 .or_insert(0) += 1;
4339 }
4340
4341 println!("{}", "Failures by Task Type:".green().bold());
4342 println!();
4343
4344 #[derive(Tabled)]
4345 struct FailureCount {
4346 #[tabled(rename = "Task Name")]
4347 task_name: String,
4348 #[tabled(rename = "Failures")]
4349 count: String,
4350 }
4351
4352 let mut task_failures: Vec<FailureCount> = task_name_failures
4353 .into_iter()
4354 .map(|(name, count)| FailureCount {
4355 task_name: name,
4356 count: count.to_string(),
4357 })
4358 .collect();
4359 task_failures.sort_by(|a, b| {
4360 b.count
4361 .parse::<usize>()
4362 .unwrap_or(0)
4363 .cmp(&a.count.parse::<usize>().unwrap_or(0))
4364 });
4365
4366 let table = Table::new(task_failures.iter().take(10))
4367 .with(Style::rounded())
4368 .to_string();
4369 println!("{table}");
4370 println!();
4371 println!("{}", "Recommendations:".cyan().bold());
4372 println!();
4373 println!(" • Review task implementations for the most failing tasks");
4374 println!(" • Check error logs: celers task logs <task-id>");
4375 println!(" • Consider increasing retry limits for transient failures");
4376 println!(" • Replay fixed tasks: celers dlq replay <task-id>");
4377
4378 Ok(())
4379}
4380
4381pub async fn worker_logs(
4383 broker_url: &str,
4384 worker_id: &str,
4385 level_filter: Option<&str>,
4386 follow: bool,
4387 initial_lines: usize,
4388) -> anyhow::Result<()> {
4389 let client = redis::Client::open(broker_url)?;
4390 let mut conn = client.get_multiplexed_async_connection().await?;
4391
4392 println!(
4393 "{}",
4394 format!("=== Worker Logs: {worker_id} ===").bold().cyan()
4395 );
4396 println!();
4397
4398 let heartbeat_key = format!("celers:worker:{worker_id}:heartbeat");
4400 let exists: bool = redis::cmd("EXISTS")
4401 .arg(&heartbeat_key)
4402 .query_async(&mut conn)
4403 .await?;
4404
4405 if !exists {
4406 println!("{}", format!("✗ Worker '{worker_id}' not found").red());
4407 return Ok(());
4408 }
4409
4410 let logs_key = format!("celers:worker:{worker_id}:logs");
4411
4412 let logs: Vec<String> = redis::cmd("LRANGE")
4414 .arg(&logs_key)
4415 .arg(-(initial_lines as isize))
4416 .arg(-1)
4417 .query_async(&mut conn)
4418 .await?;
4419
4420 for log in &logs {
4422 display_log_line(log, level_filter);
4423 }
4424
4425 if !follow {
4426 return Ok(());
4427 }
4428
4429 println!();
4430 println!("{}", "=== Following logs (Ctrl+C to stop) ===".dimmed());
4431 println!();
4432
4433 let mut last_length: isize = redis::cmd("LLEN")
4435 .arg(&logs_key)
4436 .query_async(&mut conn)
4437 .await?;
4438
4439 loop {
4440 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
4442
4443 let current_length: isize = redis::cmd("LLEN")
4445 .arg(&logs_key)
4446 .query_async(&mut conn)
4447 .await?;
4448
4449 if current_length > last_length {
4450 let new_logs: Vec<String> = redis::cmd("LRANGE")
4452 .arg(&logs_key)
4453 .arg(last_length)
4454 .arg(-1)
4455 .query_async(&mut conn)
4456 .await?;
4457
4458 for log in &new_logs {
4459 display_log_line(log, level_filter);
4460 }
4461
4462 last_length = current_length;
4463 }
4464
4465 let still_exists: bool = redis::cmd("EXISTS")
4467 .arg(&heartbeat_key)
4468 .query_async(&mut conn)
4469 .await?;
4470
4471 if !still_exists {
4472 println!();
4473 println!("{}", "Worker has stopped".yellow());
4474 break;
4475 }
4476 }
4477
4478 Ok(())
4479}
4480
4481#[allow(dead_code)]
4483fn display_log_line(log: &str, level_filter: Option<&str>) {
4484 if let Ok(log_json) = serde_json::from_str::<serde_json::Value>(log) {
4485 let level = log_json
4486 .get("level")
4487 .and_then(|v| v.as_str())
4488 .unwrap_or("INFO");
4489 let message = log_json
4490 .get("message")
4491 .and_then(|v| v.as_str())
4492 .unwrap_or(log);
4493 let timestamp = log_json
4494 .get("timestamp")
4495 .and_then(|v| v.as_str())
4496 .unwrap_or("");
4497
4498 if let Some(filter) = level_filter {
4500 if !level.eq_ignore_ascii_case(filter) {
4501 return;
4502 }
4503 }
4504
4505 let level_colored = match level.to_uppercase().as_str() {
4506 "ERROR" => level.red(),
4507 "WARN" => level.yellow(),
4508 "DEBUG" => level.cyan(),
4509 _ => level.normal(),
4510 };
4511
4512 println!("[{}] {} {}", timestamp.dimmed(), level_colored, message);
4513 } else {
4514 if level_filter.is_none() {
4516 println!("{log}");
4517 }
4518 }
4519}
4520
4521pub async fn autoscale_start(
4523 broker_url: &str,
4524 queue: &str,
4525 autoscale_config: Option<crate::config::AutoScaleConfig>,
4526) -> anyhow::Result<()> {
4527 println!("{}", "=== Auto-Scaling Service ===".bold().green());
4528 println!();
4529
4530 let config = match autoscale_config {
4531 Some(cfg) if cfg.enabled => cfg,
4532 Some(_) => {
4533 println!(
4534 "{}",
4535 "⚠️ Auto-scaling is disabled in configuration".yellow()
4536 );
4537 return Ok(());
4538 }
4539 None => {
4540 println!("{}", "⚠️ No auto-scaling configuration found".yellow());
4541 println!("Add [autoscale] section to your celers.toml");
4542 return Ok(());
4543 }
4544 };
4545
4546 println!("Configuration:");
4547 println!(" Min workers: {}", config.min_workers.to_string().cyan());
4548 println!(" Max workers: {}", config.max_workers.to_string().cyan());
4549 println!(
4550 " Scale up threshold: {}",
4551 config.scale_up_threshold.to_string().cyan()
4552 );
4553 println!(
4554 " Scale down threshold: {}",
4555 config.scale_down_threshold.to_string().cyan()
4556 );
4557 println!(
4558 " Check interval: {}s",
4559 config.check_interval_secs.to_string().cyan()
4560 );
4561 println!();
4562
4563 let broker = RedisBroker::new(broker_url, queue)?;
4564 println!("{}", "✓ Connected to broker".green());
4565 println!();
4566 println!("{}", "Starting auto-scaling monitor...".green().bold());
4567 println!("{}", " Press Ctrl+C to stop".dimmed());
4568 println!();
4569
4570 loop {
4571 tokio::time::sleep(tokio::time::Duration::from_secs(config.check_interval_secs)).await;
4572
4573 let queue_size = broker.queue_size().await?;
4575
4576 let client = redis::Client::open(broker_url)?;
4578 let mut conn = client.get_multiplexed_async_connection().await?;
4579 let worker_keys: Vec<String> = redis::cmd("KEYS")
4580 .arg("celers:worker:*:heartbeat")
4581 .query_async(&mut conn)
4582 .await?;
4583 let current_workers = worker_keys.len();
4584
4585 println!(
4586 "[{}] Queue: {}, Workers: {}",
4587 Utc::now().format("%H:%M:%S").to_string().dimmed(),
4588 queue_size.to_string().yellow(),
4589 current_workers.to_string().cyan()
4590 );
4591
4592 if queue_size > config.scale_up_threshold && current_workers < config.max_workers {
4594 let needed = config.max_workers.min(current_workers + 1);
4595 println!(
4596 " {} Scale up recommended: {} -> {}",
4597 "↑".green().bold(),
4598 current_workers,
4599 needed
4600 );
4601 } else if queue_size < config.scale_down_threshold && current_workers > config.min_workers {
4602 let target = config.min_workers.max(current_workers.saturating_sub(1));
4603 println!(
4604 " {} Scale down possible: {} -> {}",
4605 "↓".yellow().bold(),
4606 current_workers,
4607 target
4608 );
4609 }
4610 }
4611}
4612
/// Print the configured auto-scaling parameters and the current number of
/// active workers (counted via heartbeat keys). If auto-scaling is not
/// configured, print a hint and return without touching Redis.
pub async fn autoscale_status(
    broker_url: &str,
    autoscale_config: Option<crate::config::AutoScaleConfig>,
) -> anyhow::Result<()> {
    println!("{}", "=== Auto-Scaling Status ===".bold().cyan());
    println!();

    if let Some(cfg) = autoscale_config {
        println!(
            "Status: {}",
            if cfg.enabled {
                "Enabled".green()
            } else {
                "Disabled".red()
            }
        );
        println!();
        println!("Configuration:");
        println!(" Min workers: {}", cfg.min_workers);
        println!(" Max workers: {}", cfg.max_workers);
        println!(" Scale up threshold: {}", cfg.scale_up_threshold);
        println!(" Scale down threshold: {}", cfg.scale_down_threshold);
        println!(" Check interval: {}s", cfg.check_interval_secs);
        println!();

        // Each live worker maintains a heartbeat key; counting matches gives
        // the active worker count. NOTE(review): KEYS blocks Redis on large
        // keyspaces — consider SCAN for production deployments.
        let client = redis::Client::open(broker_url)?;
        let mut conn = client.get_multiplexed_async_connection().await?;
        let worker_keys: Vec<String> = redis::cmd("KEYS")
            .arg("celers:worker:*:heartbeat")
            .query_async(&mut conn)
            .await?;

        println!("Current State:");
        println!(" Active workers: {}", worker_keys.len().to_string().cyan());
    } else {
        println!("{}", "Auto-scaling is not configured".yellow());
        println!("Add [autoscale] section to your celers.toml");
    }

    Ok(())
}
4656
/// Poll the dead-letter queue size every `check_interval_secs` and send a
/// webhook alert whenever it exceeds the configured threshold. Runs until
/// the process is interrupted.
///
/// Missing/disabled configuration or an absent webhook URL exits early
/// with a notice instead of erroring.
pub async fn alert_start(
    broker_url: &str,
    queue: &str,
    alert_config: Option<crate::config::AlertConfig>,
) -> anyhow::Result<()> {
    println!("{}", "=== Alert Monitoring Service ===".bold().green());
    println!();

    let config = match alert_config {
        Some(cfg) if cfg.enabled => cfg,
        Some(_) => {
            println!(
                "{}",
                "⚠️ Alert monitoring is disabled in configuration".yellow()
            );
            return Ok(());
        }
        None => {
            println!("{}", "⚠️ No alert configuration found".yellow());
            println!("Add [alerts] section to your celers.toml");
            return Ok(());
        }
    };

    // A webhook target is mandatory — alerts have nowhere else to go.
    if config.webhook_url.is_none() {
        println!("{}", "⚠️ No webhook URL configured".yellow());
        return Ok(());
    }

    println!("Configuration:");
    println!(
        " Webhook URL: {}",
        config
            .webhook_url
            .as_ref()
            .expect("webhook_url validated to be Some")
            .cyan()
    );
    println!(
        " DLQ threshold: {}",
        config.dlq_threshold.to_string().cyan()
    );
    // NOTE(review): failed_threshold is displayed here but never checked in
    // the monitoring loop below — confirm whether a failed-task alert is
    // missing or the field is intentionally informational.
    println!(
        " Failed threshold: {}",
        config.failed_threshold.to_string().cyan()
    );
    println!(
        " Check interval: {}s",
        config.check_interval_secs.to_string().cyan()
    );
    println!();

    let broker = RedisBroker::new(broker_url, queue)?;
    println!("{}", "✓ Connected to broker".green());
    println!();
    println!("{}", "Starting alert monitor...".green().bold());
    println!("{}", " Press Ctrl+C to stop".dimmed());
    println!();

    // Safe: the is_none() guard above already returned.
    let webhook_url = config
        .webhook_url
        .expect("webhook_url validated to be Some");

    loop {
        tokio::time::sleep(tokio::time::Duration::from_secs(config.check_interval_secs)).await;

        let dlq_size = broker.dlq_size().await?;

        println!(
            "[{}] DLQ size: {}",
            Utc::now().format("%H:%M:%S").to_string().dimmed(),
            dlq_size.to_string().yellow()
        );

        // Fires on every tick the threshold is exceeded — no deduplication
        // or cool-down between repeated alerts.
        if dlq_size > config.dlq_threshold {
            let message = format!(
                "⚠️ DLQ size ({}) exceeded threshold ({})",
                dlq_size, config.dlq_threshold
            );
            println!(" {} Sending alert...", "!".red().bold());

            // A failed webhook delivery is reported but does not stop the
            // monitor.
            if let Err(e) = send_webhook_alert(&webhook_url, &message).await {
                println!(" {} Failed to send alert: {}", "✗".red(), e);
            } else {
                println!(" {} Alert sent", "✓".green());
            }
        }
    }
}
4749
4750pub async fn alert_test(webhook_url: &str, message: &str) -> anyhow::Result<()> {
4752 println!("{}", "=== Testing Webhook ===".bold().cyan());
4753 println!();
4754 println!("Webhook URL: {}", webhook_url.cyan());
4755 println!("Message: {}", message.yellow());
4756 println!();
4757
4758 println!("Sending test notification...");
4759 send_webhook_alert(webhook_url, message).await?;
4760
4761 println!("{}", "✓ Test notification sent successfully".green());
4762
4763 Ok(())
4764}
4765
4766async fn send_webhook_alert(webhook_url: &str, message: &str) -> anyhow::Result<()> {
4768 let client = reqwest::Client::new();
4769 let payload = serde_json::json!({
4770 "text": message,
4771 "timestamp": Utc::now().to_rfc3339(),
4772 });
4773
4774 let response = client.post(webhook_url).json(&payload).send().await?;
4775
4776 if !response.status().is_success() {
4777 anyhow::bail!("Webhook request failed with status: {}", response.status());
4778 }
4779
4780 Ok(())
4781}
4782
/// Test connectivity to the database at `url`, printing the detected type
/// and connect latency; with `benchmark` set, run 10 additional rounds and
/// print average/min/max latency.
///
/// Only PostgreSQL is actually probed; MySQL and unknown schemes print a
/// notice and return `Ok`.
pub async fn db_test_connection(url: &str, benchmark: bool) -> anyhow::Result<()> {
    println!("{}", "=== Database Connection Test ===".bold().cyan());
    println!();
    println!("Database URL: {}", mask_password(url).cyan());
    println!();

    // Crude scheme sniffing; anything unrecognized falls through to "Unknown".
    let db_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
        "PostgreSQL"
    } else if url.starts_with("mysql://") {
        "MySQL"
    } else {
        "Unknown"
    };

    println!("Database type: {}", db_type.yellow());
    println!();

    println!("Testing connection...");
    let start = std::time::Instant::now();

    if db_type == "PostgreSQL" {
        match test_postgres_connection(url).await {
            Ok(version) => {
                let duration = start.elapsed();
                println!("{}", "✓ Connection successful".green());
                println!(" Version: {}", version.cyan());
                println!(" Latency: {}ms", duration.as_millis().to_string().yellow());
            }
            Err(e) => {
                println!("{}", "✗ Connection failed".red());
                println!(" Error: {e}");
                return Err(e);
            }
        }
    } else if db_type == "MySQL" {
        println!("{}", "⚠️ MySQL support not yet implemented".yellow());
        return Ok(());
    } else {
        println!("{}", "⚠️ Unknown database type".yellow());
        return Ok(());
    }

    if benchmark {
        println!();
        println!("{}", "Running latency benchmark...".bold());
        println!();

        // Each round opens a fresh broker, so this measures connection setup
        // plus the probe query — not pure query latency. Failed rounds are
        // reported and skipped rather than aborting the benchmark.
        let mut latencies = Vec::new();
        for i in 1..=10 {
            let start = std::time::Instant::now();
            if let Err(e) = test_postgres_connection(url).await {
                println!(" {} Query {} failed: {}", "✗".red(), i, e);
                continue;
            }
            let duration = start.elapsed();
            latencies.push(duration.as_millis());
            println!(" {} Query {}: {}ms", "✓".green(), i, duration.as_millis());
        }

        if !latencies.is_empty() {
            let avg = latencies.iter().sum::<u128>() / latencies.len() as u128;
            let min = latencies
                .iter()
                .min()
                .expect("collection validated to be non-empty");
            let max = latencies
                .iter()
                .max()
                .expect("collection validated to be non-empty");

            println!();
            println!("Benchmark results:");
            println!(" Average: {}ms", avg.to_string().cyan());
            println!(" Min: {}ms", min.to_string().green());
            println!(" Max: {}ms", max.to_string().yellow());
        }
    }

    Ok(())
}
4868
4869async fn test_postgres_connection(url: &str) -> anyhow::Result<String> {
4871 use celers_broker_postgres::PostgresBroker;
4872
4873 let broker = PostgresBroker::new(url).await?;
4875
4876 let connected = broker.test_connection().await?;
4878 if connected {
4879 Ok("Connected".to_string())
4880 } else {
4881 anyhow::bail!("Connection test failed")
4882 }
4883}
4884
4885pub async fn db_health(url: &str) -> anyhow::Result<()> {
4887 println!("{}", "=== Database Health Check ===".bold().cyan());
4888 println!();
4889
4890 let db_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
4891 "PostgreSQL"
4892 } else if url.starts_with("mysql://") {
4893 "MySQL"
4894 } else {
4895 "Unknown"
4896 };
4897
4898 println!("Database: {}", db_type.yellow());
4899 println!();
4900
4901 if db_type == "PostgreSQL" {
4902 check_postgres_health(url).await?;
4903 } else {
4904 println!(
4905 "{}",
4906 "⚠️ Health check not supported for this database type".yellow()
4907 );
4908 }
4909
4910 Ok(())
4911}
4912
/// PostgreSQL health check: connectivity, a 5-round query-latency probe,
/// and connection-pool status, with warnings for slow queries (>100ms avg)
/// and high pool utilization (>80% in use).
async fn check_postgres_health(url: &str) -> anyhow::Result<()> {
    use celers_broker_postgres::PostgresBroker;

    println!("Checking connection...");
    let broker = PostgresBroker::new(url).await?;
    println!("{}", " ✓ Connection OK".green());

    println!();
    println!("Testing connection with latency measurement...");

    // Reuses the one broker, so this measures probe-query latency without
    // per-round connection setup. Failed probes are reported and skipped.
    let mut latencies = Vec::new();
    for i in 1..=5 {
        let start = std::time::Instant::now();
        let connected = broker.test_connection().await?;
        let duration = start.elapsed();

        if connected {
            latencies.push(duration.as_millis());
            println!(" {} Query {}: {}ms", "✓".green(), i, duration.as_millis());
        } else {
            println!(" {} Query {} failed", "✗".red(), i);
        }
    }

    if !latencies.is_empty() {
        let avg = latencies.iter().sum::<u128>() / latencies.len() as u128;
        let min = latencies
            .iter()
            .min()
            .expect("collection validated to be non-empty");
        let max = latencies
            .iter()
            .max()
            .expect("collection validated to be non-empty");

        println!();
        println!("Query Performance:");
        println!(" Average: {}ms", avg.to_string().cyan());
        println!(" Min: {}ms", min.to_string().green());
        println!(" Max: {}ms", max.to_string().yellow());

        // 100ms average is the (heuristic) slow-query cutoff used here.
        if avg > 100 {
            println!(" {}", "⚠️ High query latency detected".yellow());
        } else {
            println!(" {}", "✓ Query latency is healthy".green());
        }
    }

    println!();
    println!("Connection Pool Status:");
    let pool_metrics = broker.get_pool_metrics();
    println!(
        " Max Connections: {}",
        pool_metrics.max_size.to_string().cyan()
    );
    println!(" Active: {}", pool_metrics.size.to_string().cyan());
    println!(" Idle: {}", pool_metrics.idle.to_string().cyan());
    println!(" In-Use: {}", pool_metrics.in_use.to_string().cyan());

    // Guard against a zero-sized pool before computing percentage in-use.
    let utilization = if pool_metrics.max_size > 0 {
        (f64::from(pool_metrics.in_use) / f64::from(pool_metrics.max_size)) * 100.0
    } else {
        0.0
    };

    // Only the warning case prints; healthy utilization is implied by the
    // final success line below.
    if utilization > 80.0 {
        println!(
            " {}",
            "⚠️ High pool utilization - consider scaling".yellow()
        );
    }

    println!();
    println!("{}", "✓ Database health check completed".green().bold());

    Ok(())
}
4993
/// Print connection-pool statistics for a PostgreSQL database as a table,
/// followed by a utilization assessment (high >80%, low <20% on pools
/// larger than 10). Non-PostgreSQL URLs print a notice and return `Ok`.
pub async fn db_pool_stats(url: &str) -> anyhow::Result<()> {
    println!("{}", "=== Connection Pool Statistics ===".bold().cyan());
    println!();
    println!("Database URL: {}", mask_password(url).cyan());
    println!();

    // Crude scheme sniffing; anything unrecognized falls through to "Unknown".
    let db_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
        "PostgreSQL"
    } else if url.starts_with("mysql://") {
        "MySQL"
    } else {
        "Unknown"
    };

    if db_type == "PostgreSQL" {
        use celers_broker_postgres::PostgresBroker;

        println!("Connecting to database...");
        let broker = PostgresBroker::new(url).await?;
        println!("{}", " ✓ Connected".green());
        println!();

        let metrics = broker.get_pool_metrics();

        // One row of the pool statistics table.
        #[derive(Tabled)]
        struct PoolStat {
            #[tabled(rename = "Metric")]
            metric: String,
            #[tabled(rename = "Value")]
            value: String,
        }

        let stats = vec![
            PoolStat {
                metric: "Max Connections".to_string(),
                value: metrics.max_size.to_string(),
            },
            PoolStat {
                metric: "Active Connections".to_string(),
                value: metrics.size.to_string(),
            },
            PoolStat {
                metric: "Idle Connections".to_string(),
                value: metrics.idle.to_string(),
            },
            PoolStat {
                metric: "In-Use Connections".to_string(),
                value: metrics.in_use.to_string(),
            },
            PoolStat {
                metric: "Waiting Tasks".to_string(),
                // Zero is labelled "estimated" — presumably the pool does not
                // track waiters precisely; NOTE(review): confirm against the
                // broker's metrics implementation.
                value: if metrics.waiting > 0 {
                    metrics.waiting.to_string()
                } else {
                    "0 (estimated)".to_string()
                },
            },
        ];

        let table = Table::new(stats).with(Style::rounded()).to_string();
        println!("{table}");
        println!();

        // Guard against a zero-sized pool before computing percentage in-use.
        let utilization = if metrics.max_size > 0 {
            (f64::from(metrics.in_use) / f64::from(metrics.max_size)) * 100.0
        } else {
            0.0
        };

        println!("Pool Utilization: {utilization:.1}%");
        if utilization > 80.0 {
            println!(
                "{}",
                "⚠️ High pool utilization - consider increasing max_connections".yellow()
            );
        } else if utilization < 20.0 && metrics.max_size > 10 {
            println!(
                "{}",
                "ℹ️ Low pool utilization - consider reducing max_connections".cyan()
            );
        } else {
            println!("{}", "✓ Pool utilization is healthy".green());
        }
    } else {
        println!(
            "{}",
            "⚠️ Pool statistics only supported for PostgreSQL".yellow()
        );
    }

    Ok(())
}
5088
5089pub async fn db_migrate(url: &str, action: &str, steps: usize) -> anyhow::Result<()> {
5091 println!("{}", "=== Database Migrations ===".bold().cyan());
5092 println!();
5093 println!("Database URL: {}", mask_password(url).cyan());
5094 println!("Action: {}", action.yellow());
5095 println!();
5096
5097 let db_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
5098 "PostgreSQL"
5099 } else if url.starts_with("mysql://") {
5100 "MySQL"
5101 } else {
5102 "Unknown"
5103 };
5104
5105 if db_type == "PostgreSQL" {
5106 use celers_broker_postgres::PostgresBroker;
5107
5108 match action.to_lowercase().as_str() {
5109 "apply" => {
5110 println!("Applying migrations...");
5111 let broker = PostgresBroker::new(url).await?;
5112 let _ = broker; println!("{}", " ✓ Schema initialized".green());
5115 println!();
5116 println!("{}", "✓ Migrations applied successfully".green().bold());
5117 }
5118 "rollback" => {
5119 println!("Rolling back {steps} migration(s)...");
5120 println!();
5121 println!(
5122 "{}",
5123 "⚠️ Manual rollback required - use SQL scripts".yellow()
5124 );
5125 println!(" CeleRS uses auto-migration with SQLx");
5126 println!(" To rollback, restore from database backup");
5127 }
5128 "status" => {
5129 println!("Checking migration status...");
5130 let broker = PostgresBroker::new(url).await?;
5131 let connected = broker.test_connection().await?;
5132
5133 println!();
5134 if connected {
5135 println!("{}", " ✓ Database schema is up-to-date".green());
5136 println!(" Tables: celers_tasks, celers_results");
5137 } else {
5138 println!("{}", " ✗ Cannot connect to database".red());
5139 }
5140 }
5141 _ => {
5142 anyhow::bail!("Unknown action '{action}'. Valid actions: apply, rollback, status");
5143 }
5144 }
5145 } else {
5146 println!(
5147 "{}",
5148 "⚠️ Migrations only supported for PostgreSQL".yellow()
5149 );
5150 }
5151
5152 Ok(())
5153}
5154
/// Mask the password component of a connection URL for safe display.
///
/// Turns `scheme://user:secret@host/db` into `scheme://user:****@host/db`.
/// URLs without credentials (no '@', or no ':' between the scheme separator
/// and the '@') are returned unchanged.
fn mask_password(url: &str) -> String {
    if let Some(at_pos) = url.find('@') {
        // Only search for the credential ':' AFTER the scheme's "://".
        // The old code ran rfind(':') over the whole prefix, so a
        // password-less URL like "postgres://user@host:5432/db" matched the
        // colon inside "://" and came back as "postgres:****@host:5432/db".
        let auth_start = url.find("://").map_or(0, |p| p + 3);
        if auth_start < at_pos {
            if let Some(rel_colon) = url[auth_start..at_pos].rfind(':') {
                let colon_pos = auth_start + rel_colon;
                let before = &url[..=colon_pos]; // keep the ':' itself
                let after = &url[at_pos..]; // keep the '@' and everything after
                return format!("{before}****{after}");
            }
        }
    }
    url.to_string()
}
5166
5167pub async fn run_dashboard(broker_url: &str, queue: &str, refresh_secs: u64) -> anyhow::Result<()> {
5169 use crossterm::{
5170 event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode},
5171 execute,
5172 terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
5173 };
5174 use ratatui::{
5175 backend::CrosstermBackend,
5176 layout::{Constraint, Direction, Layout},
5177 style::{Color, Modifier, Style},
5178 widgets::{Block, Borders, Gauge, List, ListItem, Paragraph},
5179 Terminal,
5180 };
5181 use std::io;
5182 use std::time::Duration;
5183
5184 let broker = RedisBroker::new(broker_url, queue)?;
5185
5186 enable_raw_mode()?;
5188 let mut stdout = io::stdout();
5189 execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?;
5190 let backend = CrosstermBackend::new(stdout);
5191 let mut terminal = Terminal::new(backend)?;
5192
5193 let refresh_duration = Duration::from_secs(refresh_secs);
5194 let mut last_update = std::time::Instant::now();
5195
5196 let result = loop {
5197 let queue_size = broker.queue_size().await.unwrap_or(0);
5199 let dlq_size = broker.dlq_size().await.unwrap_or(0);
5200 let now = Utc::now();
5201
5202 let mut con = redis::Client::open(broker_url)?.get_connection()?;
5204 let keys: Vec<String> = redis::cmd("KEYS")
5205 .arg("celers:worker:*:heartbeat")
5206 .query(&mut con)?;
5207 let worker_count = keys.len();
5208
5209 terminal.draw(|f| {
5211 let size = f.area();
5212
5213 let chunks = Layout::default()
5215 .direction(Direction::Vertical)
5216 .margin(1)
5217 .constraints([
5218 Constraint::Length(3),
5219 Constraint::Length(7),
5220 Constraint::Length(7),
5221 Constraint::Min(0),
5222 ])
5223 .split(size);
5224
5225 let title = Paragraph::new(format!(
5227 "CeleRS Dashboard - Queue: {} | Last Update: {}",
5228 queue,
5229 now.format("%Y-%m-%d %H:%M:%S")
5230 ))
5231 .style(
5232 Style::default()
5233 .fg(Color::Cyan)
5234 .add_modifier(Modifier::BOLD),
5235 )
5236 .block(Block::default().borders(Borders::ALL));
5237 f.render_widget(title, chunks[0]);
5238
5239 let queue_stats = [
5241 format!("Pending Tasks: {queue_size}"),
5242 format!("DLQ Size: {dlq_size}"),
5243 format!("Active Workers: {worker_count}"),
5244 ];
5245 let queue_block = Paragraph::new(queue_stats.join("\n"))
5246 .style(Style::default().fg(Color::Green))
5247 .block(
5248 Block::default()
5249 .borders(Borders::ALL)
5250 .title("Queue Statistics"),
5251 );
5252 f.render_widget(queue_block, chunks[1]);
5253
5254 let max_display = 1000;
5256 let ratio = (queue_size.min(max_display) as f64 / max_display as f64).min(1.0);
5257 let gauge_color = if queue_size > 500 {
5258 Color::Red
5259 } else if queue_size > 100 {
5260 Color::Yellow
5261 } else {
5262 Color::Green
5263 };
5264
5265 let gauge = Gauge::default()
5266 .block(Block::default().borders(Borders::ALL).title("Queue Depth"))
5267 .gauge_style(Style::default().fg(gauge_color))
5268 .ratio(ratio)
5269 .label(format!("{queue_size} / {max_display}"));
5270 f.render_widget(gauge, chunks[2]);
5271
5272 let mut messages = vec![];
5274 if worker_count == 0 && queue_size > 0 {
5275 messages.push(
5276 ListItem::new("⚠️ No active workers - tasks are not being processed")
5277 .style(Style::default().fg(Color::Red)),
5278 );
5279 }
5280 if dlq_size > 10 {
5281 messages.push(
5282 ListItem::new(format!("⚠️ High DLQ size: {dlq_size} failed tasks"))
5283 .style(Style::default().fg(Color::Yellow)),
5284 );
5285 }
5286 if queue_size > 1000 {
5287 messages.push(
5288 ListItem::new("⚠️ Queue backlog is high - consider scaling workers")
5289 .style(Style::default().fg(Color::Yellow)),
5290 );
5291 }
5292 if messages.is_empty() {
5293 messages.push(
5294 ListItem::new("✓ All systems normal").style(Style::default().fg(Color::Green)),
5295 );
5296 }
5297
5298 messages.push(ListItem::new(""));
5299 messages.push(
5300 ListItem::new("Press 'q' to quit").style(Style::default().fg(Color::DarkGray)),
5301 );
5302
5303 let status_list = List::new(messages).block(
5304 Block::default()
5305 .borders(Borders::ALL)
5306 .title("Status & Alerts"),
5307 );
5308 f.render_widget(status_list, chunks[3]);
5309 })?;
5310
5311 if event::poll(Duration::from_millis(100))? {
5313 if let Event::Key(key) = event::read()? {
5314 if key.code == KeyCode::Char('q') {
5315 break Ok(());
5316 }
5317 }
5318 }
5319
5320 if last_update.elapsed() >= refresh_duration {
5322 last_update = std::time::Instant::now();
5323 }
5324 };
5325
5326 disable_raw_mode()?;
5328 execute!(
5329 terminal.backend_mut(),
5330 LeaveAlternateScreen,
5331 DisableMouseCapture
5332 )?;
5333 terminal.show_cursor()?;
5334
5335 result
5336}
5337
5338#[allow(dead_code)]
5344async fn retry_with_backoff<F, T, E>(
5345 operation: F,
5346 max_retries: u32,
5347 operation_name: &str,
5348) -> Result<T, E>
5349where
5350 F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, E>> + Send>>,
5351 E: std::fmt::Display,
5352{
5353 let mut retries = 0;
5354 loop {
5355 match operation().await {
5356 Ok(result) => return Ok(result),
5357 Err(err) => {
5358 retries += 1;
5359 if retries >= max_retries {
5360 eprintln!(
5361 "{}",
5362 format!("✗ {operation_name} failed after {max_retries} attempts: {err}")
5363 .red()
5364 );
5365 return Err(err);
5366 }
5367
5368 let backoff_ms = 100 * (2_u64.pow(retries - 1));
5369 eprintln!(
5370 "{}",
5371 format!(
5372 "⚠ {operation_name} failed (attempt {retries}/{max_retries}), retrying in {backoff_ms}ms..."
5373 )
5374 .yellow()
5375 );
5376 tokio::time::sleep(tokio::time::Duration::from_millis(backoff_ms)).await;
5377 }
5378 }
5379 }
5380}
5381
/// Human-readable byte count using binary (1024) steps from B up to TB.
///
/// Values under 1 KB are printed as whole bytes ("500 B"); scaled values get
/// two decimal places ("1.50 KB").
#[allow(dead_code)]
fn format_bytes(bytes: usize) -> String {
    const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
    let mut scaled = bytes as f64;
    let mut unit_idx = 0;

    // Step up one unit at a time; stop at TB even for absurd inputs.
    while scaled >= 1024.0 && unit_idx + 1 < UNITS.len() {
        scaled /= 1024.0;
        unit_idx += 1;
    }

    if unit_idx == 0 {
        // Unscaled: print the exact integer byte count.
        format!("{bytes} {}", UNITS[0])
    } else {
        format!("{scaled:.2} {}", UNITS[unit_idx])
    }
}
5400
/// Compact human-readable duration: "Ns", "Nm Ns", "Nh Nm", or "Nd Nh".
///
/// The smaller of the two units is omitted when it is zero (3600 → "1h").
#[allow(dead_code)]
fn format_duration(seconds: u64) -> String {
    if seconds < 60 {
        return format!("{seconds}s");
    }
    // Select the two most significant units for this magnitude.
    let (major, major_unit, minor, minor_unit) = if seconds < 3600 {
        (seconds / 60, "m", seconds % 60, "s")
    } else if seconds < 86400 {
        (seconds / 3600, "h", (seconds % 3600) / 60, "m")
    } else {
        (seconds / 86400, "d", (seconds % 86400) / 3600, "h")
    };
    if minor == 0 {
        format!("{major}{major_unit}")
    } else {
        format!("{major}{major_unit} {minor}{minor_unit}")
    }
}
5432
5433#[allow(dead_code)]
5435fn validate_task_id(task_id: &str) -> anyhow::Result<uuid::Uuid> {
5436 uuid::Uuid::parse_str(task_id)
5437 .map_err(|e| anyhow::anyhow!("Invalid task ID format: {e}. Expected UUID format."))
5438}
5439
5440#[allow(dead_code)]
5442fn validate_queue_name(queue: &str) -> anyhow::Result<()> {
5443 if queue.is_empty() {
5444 anyhow::bail!("Queue name cannot be empty");
5445 }
5446 if queue.contains(char::is_whitespace) {
5447 anyhow::bail!("Queue name cannot contain whitespace");
5448 }
5449 if queue.len() > 255 {
5450 anyhow::bail!("Queue name too long (max 255 characters)");
5451 }
5452 Ok(())
5453}
5454
/// Percentage of `part` out of `total`; defined as 0.0 when `total` is zero
/// so callers never see NaN or a division-by-zero infinity.
#[allow(dead_code)]
fn calculate_percentage(part: usize, total: usize) -> f64 {
    match total {
        0 => 0.0,
        t => (part as f64 / t as f64) * 100.0,
    }
}
5464
#[cfg(test)]
mod tests {
    // Unit tests for the CLI helper functions and the Redis key / threshold
    // conventions the commands in this file rely on.

    /// Well-formed UUIDs parse; arbitrary strings are rejected.
    #[test]
    fn test_task_id_parsing() {
        assert!("550e8400-e29b-41d4-a716-446655440000"
            .parse::<uuid::Uuid>()
            .is_ok());
        assert!("not-a-valid-uuid".parse::<uuid::Uuid>().is_err());
    }

    /// Worker ids live in the third segment of heartbeat keys.
    #[test]
    fn test_worker_id_extraction() {
        let segments: Vec<&str> = "celers:worker:worker-123:heartbeat".split(':').collect();
        assert_eq!(segments.len(), 4);
        assert_eq!(segments[2], "worker-123");
    }

    /// Level matching accepts any casing and never yields an empty label.
    #[test]
    fn test_log_level_matching() {
        for level in [
            "ERROR", "error", "WARN", "warn", "INFO", "info", "DEBUG", "debug",
        ] {
            // Mirrors the CLI's colorization match: every arm returns the
            // level itself, so the match is total and lossless.
            let _colored = match level {
                "ERROR" | "error" => level,
                "WARN" | "warn" => level,
                "DEBUG" | "debug" => level,
                _ => level,
            };
            assert!(!level.is_empty());
        }
    }

    /// Per-task log keys and per-worker control keys follow fixed templates.
    #[test]
    fn test_redis_key_formatting() {
        let task_id = uuid::Uuid::new_v4();
        let logs_key = format!("celers:task:{task_id}:logs");
        assert!(logs_key.starts_with("celers:task:"));
        assert!(logs_key.ends_with(":logs"));

        let worker_id = "worker-123";
        assert_eq!(
            format!("celers:worker:{worker_id}:heartbeat"),
            "celers:worker:worker-123:heartbeat"
        );
        assert_eq!(
            format!("celers:worker:{worker_id}:paused"),
            "celers:worker:worker-123:paused"
        );
    }

    /// Queue keys derive the DLQ and delayed variants by suffixing.
    #[test]
    fn test_queue_key_formatting() {
        let base = format!("celers:{}", "test-queue");
        assert_eq!(base, "celers:test-queue");
        assert_eq!(format!("{base}:dlq"), "celers:test-queue:dlq");
        assert_eq!(format!("{base}:delayed"), "celers:test-queue:delayed");
    }

    /// Structured log lines parse via serde_json; garbage does not.
    #[test]
    fn test_json_log_parsing() {
        let raw =
            r#"{"timestamp":"2026-01-04T10:00:00Z","level":"INFO","message":"Test message"}"#;
        let value: serde_json::Value = serde_json::from_str(raw).expect("valid log must parse");
        assert_eq!(value.get("level").and_then(|v| v.as_str()), Some("INFO"));
        assert_eq!(
            value.get("message").and_then(|v| v.as_str()),
            Some("Test message")
        );

        assert!(serde_json::from_str::<serde_json::Value>("not json").is_err());
    }

    /// Tail-of-list fetches use Redis LRANGE-style negative indices.
    #[test]
    fn test_limit_range_calculation() {
        let limit: usize = 50;
        // -limit .. -1 addresses the newest `limit` entries.
        let (start_idx, end_idx) = (-(limit as isize), -1_isize);

        assert_eq!(start_idx, -50);
        assert_eq!(end_idx, -1);

        let log_count: usize = 100;
        assert!(log_count > limit);
    }

    /// Documents the alert thresholds used by the dashboard/diagnostics.
    #[test]
    fn test_diagnostic_thresholds() {
        let dlq_size = 15;
        assert!(dlq_size > 10, "Should trigger warning when DLQ > 10");

        let queue_size = 1500;
        assert!(
            queue_size > 1000,
            "Should trigger warning when queue > 1000"
        );

        let (worker_count, pending_tasks) = (0, 50);
        assert!(
            worker_count == 0 && pending_tasks > 0,
            "Should trigger error when no workers but tasks pending"
        );
    }

    /// Shutdown pub/sub channels are derived from the worker id.
    #[test]
    fn test_shutdown_channel_naming() {
        let worker_id = "worker-123";

        assert_eq!(
            format!("celers:worker:{worker_id}:shutdown_graceful"),
            "celers:worker:worker-123:shutdown_graceful"
        );
        assert_eq!(
            format!("celers:worker:{worker_id}:shutdown"),
            "celers:worker:worker-123:shutdown"
        );
    }

    /// RFC 3339 timestamps carry a 'T' separator and a zone suffix.
    #[test]
    fn test_timestamp_formatting() {
        let stamp = chrono::Utc::now().to_rfc3339();
        assert!(stamp.contains('T'));
        assert!(stamp.contains('Z') || stamp.contains('+'));
    }

    /// Passwords are replaced with "****" while the rest of the URL survives.
    #[test]
    fn test_mask_password() {
        let masked = super::mask_password("postgres://user:password123@localhost:5432/dbname");
        assert!(masked.contains("postgres://user:****@localhost"));
        assert!(!masked.contains("password123"));

        let masked = super::mask_password("mysql://admin:secret@127.0.0.1:3306/db");
        assert!(masked.contains("mysql://admin:****@127.0.0.1"));
        assert!(!masked.contains("secret"));

        // URLs without credentials pass through untouched.
        let no_pass_url = "redis://localhost:6379";
        assert_eq!(super::mask_password(no_pass_url), no_pass_url);
    }

    /// Byte formatting scales in 1024 steps and pins the decimal places.
    #[test]
    fn test_format_bytes() {
        for (input, expected) in [
            (0, "0 B"),
            (500, "500 B"),
            (1024, "1.00 KB"),
            (1536, "1.50 KB"),
            (1048576, "1.00 MB"),
            (1073741824, "1.00 GB"),
        ] {
            assert_eq!(super::format_bytes(input), expected);
        }
    }

    /// Duration formatting shows at most two units, dropping a zero minor.
    #[test]
    fn test_format_duration() {
        for (input, expected) in [
            (0, "0s"),
            (30, "30s"),
            (60, "1m"),
            (90, "1m 30s"),
            (3600, "1h"),
            (3660, "1h 1m"),
            (86400, "1d"),
            (90000, "1d 1h"),
        ] {
            assert_eq!(super::format_duration(input), expected);
        }
    }

    /// Task-id validation accepts UUIDs and rejects everything else.
    #[test]
    fn test_validate_task_id() {
        assert!(super::validate_task_id("550e8400-e29b-41d4-a716-446655440000").is_ok());

        for bad in ["not-a-uuid", "", "12345"] {
            assert!(super::validate_task_id(bad).is_err());
        }
    }

    /// Queue names: non-empty, no whitespace, at most 255 characters.
    #[test]
    fn test_validate_queue_name() {
        for good in ["default", "high-priority", "queue_1"] {
            assert!(super::validate_queue_name(good).is_ok());
        }

        assert!(super::validate_queue_name("").is_err());
        assert!(super::validate_queue_name("queue name").is_err());
        assert!(super::validate_queue_name(&"x".repeat(256)).is_err());
    }

    /// Percentage math, including the zero-denominator convention.
    #[test]
    fn test_calculate_percentage() {
        for (part, total, expected) in [
            (0, 100, 0.0),
            (50, 100, 50.0),
            (100, 100, 100.0),
            (25, 100, 25.0),
            // Zero total is defined as 0% rather than NaN.
            (10, 0, 0.0),
        ] {
            assert_eq!(super::calculate_percentage(part, total), expected);
        }
    }
}