1use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21use uuid::Uuid;
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25#[serde(tag = "type", content = "args")]
26pub enum ControlCommand {
27 Ping {
29 reply_to: String,
31 },
32
33 Inspect(InspectCommand),
35
36 Shutdown {
38 timeout: Option<u64>,
40 },
41
42 Revoke {
44 task_id: Uuid,
46 terminate: bool,
48 signal: Option<String>,
50 },
51
52 RateLimit {
54 task_name: String,
56 rate: Option<f64>,
58 },
59
60 TimeLimit {
62 task_name: String,
64 soft: Option<u64>,
66 hard: Option<u64>,
68 },
69
70 AddConsumer {
72 queue: String,
74 },
75
76 CancelConsumer {
78 queue: String,
80 },
81
82 Queue(QueueCommand),
84
85 BulkRevoke {
87 task_ids: Vec<Uuid>,
89 terminate: bool,
91 },
92
93 RevokeByPattern {
95 pattern: String,
97 terminate: bool,
99 },
100}
101
102#[derive(Debug, Clone, Serialize, Deserialize)]
104#[serde(tag = "action")]
105pub enum QueueCommand {
106 Purge {
108 queue: String,
110 },
111
112 Length {
114 queue: String,
116 },
117
118 Delete {
120 queue: String,
122 if_empty: bool,
124 if_unused: bool,
126 },
127
128 Bind {
130 queue: String,
132 exchange: String,
134 routing_key: String,
136 },
137
138 Unbind {
140 queue: String,
142 exchange: String,
144 routing_key: String,
146 },
147
148 Declare {
150 queue: String,
152 durable: bool,
154 exclusive: bool,
156 auto_delete: bool,
158 },
159}
160
161#[derive(Debug, Clone, Serialize, Deserialize)]
163#[serde(tag = "method")]
164pub enum InspectCommand {
165 Active,
167
168 Scheduled,
170
171 Reserved,
173
174 Revoked,
176
177 Registered,
179
180 Stats,
182
183 QueueInfo,
185
186 Report,
188
189 Conf,
191}
192
193#[derive(Debug, Clone, Serialize, Deserialize)]
195#[serde(tag = "type", content = "payload")]
196pub enum ControlResponse {
197 Pong {
199 hostname: String,
201 timestamp: f64,
203 },
204
205 Inspect(Box<InspectResponse>),
207
208 Ack {
210 ok: bool,
212 message: Option<String>,
214 },
215
216 Error {
218 error: String,
220 },
221
222 Queue(QueueResponse),
224}
225
226#[derive(Debug, Clone, Serialize, Deserialize)]
228#[serde(tag = "queue_type", content = "result")]
229pub enum QueueResponse {
230 Purged {
232 queue: String,
234 message_count: u64,
236 },
237
238 Length {
240 queue: String,
242 message_count: u64,
244 },
245
246 Deleted {
248 queue: String,
250 },
251
252 Bound {
254 queue: String,
256 exchange: String,
258 routing_key: String,
260 },
261
262 Unbound {
264 queue: String,
266 exchange: String,
268 routing_key: String,
270 },
271
272 Declared {
274 queue: String,
276 message_count: u64,
278 consumer_count: u32,
280 },
281}
282
283#[derive(Debug, Clone, Serialize, Deserialize)]
285#[serde(tag = "inspect_type", content = "data")]
286pub enum InspectResponse {
287 Active(Vec<ActiveTaskInfo>),
289
290 Scheduled(Vec<ScheduledTaskInfo>),
292
293 Reserved(Vec<ReservedTaskInfo>),
295
296 Revoked(Vec<Uuid>),
298
299 Registered(Vec<String>),
301
302 Stats(WorkerStats),
304
305 QueueInfo(HashMap<String, QueueStats>),
307
308 Report(WorkerReport),
310
311 Conf(WorkerConf),
313}
314
315#[derive(Debug, Clone, Serialize, Deserialize)]
317pub struct ActiveTaskInfo {
318 pub id: Uuid,
320 pub name: String,
322 pub args: String,
324 pub kwargs: String,
326 pub started: f64,
328 pub hostname: String,
330 pub worker_pid: Option<u32>,
332 pub delivery_info: Option<DeliveryInfo>,
334}
335
336#[derive(Debug, Clone, Serialize, Deserialize)]
338pub struct ScheduledTaskInfo {
339 pub id: Uuid,
341 pub name: String,
343 pub args: String,
345 pub kwargs: String,
347 pub eta: f64,
349 pub priority: Option<u8>,
351 pub request: Option<RequestInfo>,
353}
354
355#[derive(Debug, Clone, Serialize, Deserialize)]
357pub struct ReservedTaskInfo {
358 pub id: Uuid,
360 pub name: String,
362 pub args: String,
364 pub kwargs: String,
366 pub delivery_info: Option<DeliveryInfo>,
368}
369
370#[derive(Debug, Clone, Serialize, Deserialize)]
372pub struct DeliveryInfo {
373 pub queue: String,
375 pub routing_key: Option<String>,
377 pub exchange: Option<String>,
379 pub delivery_tag: Option<String>,
381 pub redelivered: bool,
383}
384
385#[derive(Debug, Clone, Serialize, Deserialize)]
387pub struct RequestInfo {
388 pub correlation_id: Option<String>,
390 pub reply_to: Option<String>,
392 pub priority: Option<u8>,
394}
395
396#[derive(Debug, Clone, Serialize, Deserialize)]
398pub struct WorkerStats {
399 pub total_tasks: u64,
401 pub active_tasks: u32,
403 pub succeeded: u64,
405 pub failed: u64,
407 pub retried: u64,
409 pub uptime: f64,
411 pub loadavg: Option<[f64; 3]>,
413 pub memory_usage: Option<u64>,
415 pub pool: Option<PoolStats>,
417 pub broker: Option<BrokerStats>,
419 pub clock: Option<f64>,
421}
422
423#[derive(Debug, Clone, Serialize, Deserialize)]
425pub struct PoolStats {
426 pub pool_type: String,
428 pub max_concurrency: u32,
430 pub pool_size: u32,
432 pub available: u32,
434 pub processes: Vec<u32>,
436}
437
438#[derive(Debug, Clone, Serialize, Deserialize)]
440pub struct BrokerStats {
441 pub url: String,
443 pub connected: bool,
445 pub heartbeat: Option<u64>,
447 pub transport: String,
449}
450
451#[derive(Debug, Clone, Serialize, Deserialize)]
453pub struct QueueStats {
454 pub name: String,
456 pub messages: u64,
458 pub consumers: u32,
460}
461
462#[derive(Debug, Clone, Serialize, Deserialize)]
464pub struct WorkerReport {
465 pub hostname: String,
467 pub sw_ver: String,
469 pub sw_sys: String,
471 pub stats: WorkerStats,
473 pub active: Vec<ActiveTaskInfo>,
475 pub scheduled: Vec<ScheduledTaskInfo>,
477 pub reserved: Vec<ReservedTaskInfo>,
479 pub registered: Vec<String>,
481}
482
483#[derive(Debug, Clone, Serialize, Deserialize)]
485pub struct WorkerConf {
486 pub broker_url: String,
488 pub result_backend: Option<String>,
490 pub default_queue: String,
492 pub prefetch_multiplier: u32,
494 pub concurrency: u32,
496 pub task_soft_time_limit: Option<u64>,
498 pub task_time_limit: Option<u64>,
500 pub task_acks_late: bool,
502 pub task_reject_on_worker_lost: bool,
504 pub hostname: String,
506}
507
508impl ControlCommand {
509 #[inline]
511 pub fn ping(reply_to: impl Into<String>) -> Self {
512 Self::Ping {
513 reply_to: reply_to.into(),
514 }
515 }
516
517 #[inline]
519 #[must_use]
520 pub fn inspect_active() -> Self {
521 Self::Inspect(InspectCommand::Active)
522 }
523
524 #[inline]
526 #[must_use]
527 pub fn inspect_scheduled() -> Self {
528 Self::Inspect(InspectCommand::Scheduled)
529 }
530
531 #[inline]
533 #[must_use]
534 pub fn inspect_reserved() -> Self {
535 Self::Inspect(InspectCommand::Reserved)
536 }
537
538 #[inline]
540 #[must_use]
541 pub fn inspect_revoked() -> Self {
542 Self::Inspect(InspectCommand::Revoked)
543 }
544
545 #[inline]
547 #[must_use]
548 pub fn inspect_registered() -> Self {
549 Self::Inspect(InspectCommand::Registered)
550 }
551
552 #[inline]
554 #[must_use]
555 pub fn inspect_stats() -> Self {
556 Self::Inspect(InspectCommand::Stats)
557 }
558
559 #[inline]
561 #[must_use]
562 pub fn inspect_queue_info() -> Self {
563 Self::Inspect(InspectCommand::QueueInfo)
564 }
565
566 #[inline]
568 #[must_use]
569 pub fn shutdown(timeout: Option<u64>) -> Self {
570 Self::Shutdown { timeout }
571 }
572
573 #[inline]
575 #[must_use]
576 pub fn revoke(task_id: Uuid, terminate: bool) -> Self {
577 Self::Revoke {
578 task_id,
579 terminate,
580 signal: None,
581 }
582 }
583
584 #[inline]
586 #[must_use]
587 pub fn bulk_revoke(task_ids: Vec<Uuid>, terminate: bool) -> Self {
588 Self::BulkRevoke {
589 task_ids,
590 terminate,
591 }
592 }
593
594 #[inline]
596 pub fn revoke_by_pattern(pattern: impl Into<String>, terminate: bool) -> Self {
597 Self::RevokeByPattern {
598 pattern: pattern.into(),
599 terminate,
600 }
601 }
602
603 #[inline]
605 pub fn queue_purge(queue: impl Into<String>) -> Self {
606 Self::Queue(QueueCommand::Purge {
607 queue: queue.into(),
608 })
609 }
610
611 #[inline]
613 pub fn queue_length(queue: impl Into<String>) -> Self {
614 Self::Queue(QueueCommand::Length {
615 queue: queue.into(),
616 })
617 }
618
619 #[inline]
621 pub fn queue_delete(queue: impl Into<String>, if_empty: bool, if_unused: bool) -> Self {
622 Self::Queue(QueueCommand::Delete {
623 queue: queue.into(),
624 if_empty,
625 if_unused,
626 })
627 }
628
629 #[inline]
631 pub fn queue_bind(
632 queue: impl Into<String>,
633 exchange: impl Into<String>,
634 routing_key: impl Into<String>,
635 ) -> Self {
636 Self::Queue(QueueCommand::Bind {
637 queue: queue.into(),
638 exchange: exchange.into(),
639 routing_key: routing_key.into(),
640 })
641 }
642
643 #[inline]
645 pub fn queue_unbind(
646 queue: impl Into<String>,
647 exchange: impl Into<String>,
648 routing_key: impl Into<String>,
649 ) -> Self {
650 Self::Queue(QueueCommand::Unbind {
651 queue: queue.into(),
652 exchange: exchange.into(),
653 routing_key: routing_key.into(),
654 })
655 }
656
657 #[inline]
659 pub fn queue_declare(
660 queue: impl Into<String>,
661 durable: bool,
662 exclusive: bool,
663 auto_delete: bool,
664 ) -> Self {
665 Self::Queue(QueueCommand::Declare {
666 queue: queue.into(),
667 durable,
668 exclusive,
669 auto_delete,
670 })
671 }
672}
673
674impl QueueCommand {
675 #[inline]
677 #[must_use]
678 pub fn queue_name(&self) -> &str {
679 match self {
680 Self::Purge { queue }
681 | Self::Length { queue }
682 | Self::Delete { queue, .. }
683 | Self::Bind { queue, .. }
684 | Self::Unbind { queue, .. }
685 | Self::Declare { queue, .. } => queue,
686 }
687 }
688}
689
690impl ControlResponse {
691 #[inline]
693 pub fn pong(hostname: impl Into<String>) -> Self {
694 Self::Pong {
695 hostname: hostname.into(),
696 timestamp: std::time::SystemTime::now()
697 .duration_since(std::time::UNIX_EPOCH)
698 .unwrap_or_default()
699 .as_secs_f64(),
700 }
701 }
702
703 #[inline]
705 #[must_use]
706 pub fn ack(ok: bool, message: Option<String>) -> Self {
707 Self::Ack { ok, message }
708 }
709
710 #[inline]
712 pub fn error(error: impl Into<String>) -> Self {
713 Self::Error {
714 error: error.into(),
715 }
716 }
717}
718
719#[cfg(test)]
720mod tests {
721 use super::*;
722
723 #[test]
724 fn test_control_command_serialization() {
725 let cmd = ControlCommand::ping("test-reply");
726 let json = serde_json::to_string(&cmd).unwrap();
727 assert!(json.contains("Ping"));
728 assert!(json.contains("test-reply"));
729
730 let cmd: ControlCommand = serde_json::from_str(&json).unwrap();
731 matches!(cmd, ControlCommand::Ping { reply_to } if reply_to == "test-reply");
732 }
733
734 #[test]
735 fn test_inspect_command_serialization() {
736 let cmd = ControlCommand::inspect_active();
737 let json = serde_json::to_string(&cmd).unwrap();
738 assert!(json.contains("Inspect"));
739 assert!(json.contains("Active"));
740 }
741
742 #[test]
743 fn test_control_response_serialization() {
744 let resp = ControlResponse::pong("worker-1");
745 let json = serde_json::to_string(&resp).unwrap();
746 assert!(json.contains("Pong"));
747 assert!(json.contains("worker-1"));
748 }
749
750 #[test]
751 fn test_inspect_response_serialization() {
752 let stats = WorkerStats {
753 total_tasks: 100,
754 active_tasks: 2,
755 succeeded: 95,
756 failed: 3,
757 retried: 2,
758 uptime: 3600.0,
759 loadavg: Some([0.5, 0.6, 0.7]),
760 memory_usage: Some(1024 * 1024 * 100),
761 pool: None,
762 broker: None,
763 clock: None,
764 };
765 let resp = InspectResponse::Stats(stats);
766 let json = serde_json::to_string(&resp).unwrap();
767 assert!(json.contains("Stats"));
768 assert!(json.contains("100"));
769 }
770
771 #[test]
772 fn test_active_task_info() {
773 let info = ActiveTaskInfo {
774 id: Uuid::new_v4(),
775 name: "tasks.add".to_string(),
776 args: "[1, 2]".to_string(),
777 kwargs: "{}".to_string(),
778 started: 1_234_567_890.0,
779 hostname: "worker-1".to_string(),
780 worker_pid: Some(12345),
781 delivery_info: Some(DeliveryInfo {
782 queue: "celery".to_string(),
783 routing_key: Some("tasks.add".to_string()),
784 exchange: Some(String::new()),
785 delivery_tag: Some("1".to_string()),
786 redelivered: false,
787 }),
788 };
789 let json = serde_json::to_string(&info).unwrap();
790 assert!(json.contains("tasks.add"));
791 }
792}