celers_core/
control.rs

1//! Worker Control Commands
2//!
3//! This module provides the protocol for remote worker control and inspection.
4//! It enables clients to query worker state, inspect running tasks, and send
5//! control commands to workers.
6//!
7//! # Example
8//!
9//! ```rust
10//! use celers_core::control::{ControlCommand, InspectCommand, InspectResponse};
11//!
12//! // Create an inspect active tasks command
13//! let cmd = ControlCommand::Inspect(InspectCommand::Active);
14//!
15//! // Serialize for sending to worker
16//! let json = serde_json::to_string(&cmd).unwrap();
17//! ```
18
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21use uuid::Uuid;
22
/// Control commands that can be sent to workers.
///
/// Serde encodes this enum in adjacently tagged form: the variant name is
/// written under the `"type"` key and the variant's contents under `"args"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "args")]
pub enum ControlCommand {
    /// Ping the worker to check if it's alive
    Ping {
        /// Identifier for this ping request, carried as a free-form string.
        ///
        /// NOTE(review): the field name suggests a reply destination while the
        /// original comment called it a unique request ID — confirm which.
        reply_to: String,
    },

    /// Inspect worker state; the nested [`InspectCommand`] selects what to report
    Inspect(InspectCommand),

    /// Shutdown the worker gracefully
    Shutdown {
        /// Optional timeout in seconds
        timeout: Option<u64>,
    },

    /// Revoke a task
    Revoke {
        /// Task ID to revoke
        task_id: Uuid,
        /// Whether to terminate the task if already running
        terminate: bool,
        /// Optional signal to use when terminating, given as a string.
        ///
        /// NOTE(review): presumably a signal name such as `"SIGTERM"` or
        /// `"SIGKILL"`; the accepted values are not defined in this module.
        signal: Option<String>,
    },

    /// Set rate limit for a task type
    RateLimit {
        /// Task name to rate limit
        task_name: String,
        /// Maximum tasks per second (None to remove limit)
        rate: Option<f64>,
    },

    /// Set time limit for a task type
    TimeLimit {
        /// Task name to limit
        task_name: String,
        /// Soft time limit in seconds (warning)
        soft: Option<u64>,
        /// Hard time limit in seconds (kill)
        hard: Option<u64>,
    },

    /// Add consumer for a queue
    AddConsumer {
        /// Queue name to consume from
        queue: String,
    },

    /// Cancel consumer for a queue
    CancelConsumer {
        /// Queue name to stop consuming
        queue: String,
    },

    /// Queue control commands; the nested [`QueueCommand`] selects the operation
    Queue(QueueCommand),

    /// Bulk revoke tasks
    BulkRevoke {
        /// Task IDs to revoke
        task_ids: Vec<Uuid>,
        /// Whether to terminate running tasks
        terminate: bool,
    },

    /// Revoke tasks matching a pattern
    RevokeByPattern {
        /// Pattern to match task names (glob format)
        pattern: String,
        /// Whether to terminate running tasks
        terminate: bool,
    },
}
101
/// Queue control sub-commands, carried inside [`ControlCommand::Queue`].
///
/// Serde encodes this enum in internally tagged form: the variant name is
/// written under an `"action"` key next to the variant's own fields. Every
/// variant has a `queue` field, which [`QueueCommand::queue_name`] returns.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "action")]
pub enum QueueCommand {
    /// Purge all messages from a queue
    Purge {
        /// Queue name to purge
        queue: String,
    },

    /// Get the number of messages in a queue
    Length {
        /// Queue name to check
        queue: String,
    },

    /// Delete a queue
    Delete {
        /// Queue name to delete
        queue: String,
        /// Only delete if queue is empty
        if_empty: bool,
        /// Only delete if queue has no consumers
        if_unused: bool,
    },

    /// Bind a queue to an exchange (AMQP)
    Bind {
        /// Queue name
        queue: String,
        /// Exchange name
        exchange: String,
        /// Routing key
        routing_key: String,
    },

    /// Unbind a queue from an exchange (AMQP)
    Unbind {
        /// Queue name
        queue: String,
        /// Exchange name
        exchange: String,
        /// Routing key
        routing_key: String,
    },

    /// Declare a new queue
    Declare {
        /// Queue name
        queue: String,
        /// Whether the queue should survive broker restart
        durable: bool,
        /// Whether the queue is exclusive to this connection
        exclusive: bool,
        /// Whether the queue should be auto-deleted when unused
        auto_delete: bool,
    },
}
160
/// Inspection sub-commands, carried inside [`ControlCommand::Inspect`].
///
/// Serde encodes this enum in internally tagged form: the variant name is
/// written under a `"method"` key. All variants are unit variants, and each
/// one has a same-named variant in [`InspectResponse`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "method")]
pub enum InspectCommand {
    /// List currently executing tasks
    Active,

    /// List scheduled (ETA/countdown) tasks
    Scheduled,

    /// List reserved (prefetched) tasks
    Reserved,

    /// List revoked task IDs
    Revoked,

    /// Get registered task types
    Registered,

    /// Get worker statistics
    Stats,

    /// Get task queue lengths
    QueueInfo,

    /// Report current state
    Report,

    /// Get configuration
    Conf,
}
192
/// Response to a control command.
///
/// Serde encodes this enum in adjacently tagged form: the variant name is
/// written under the `"type"` key and the variant's contents under
/// `"payload"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "payload")]
pub enum ControlResponse {
    /// Pong response
    Pong {
        /// Worker hostname
        hostname: String,
        /// UTC timestamp, in seconds since the Unix epoch when built via
        /// [`ControlResponse::pong`]
        timestamp: f64,
    },

    /// Inspection results.
    ///
    /// The payload is boxed — presumably to keep this enum small, since
    /// [`InspectResponse`] can hold a full worker report.
    Inspect(Box<InspectResponse>),

    /// Acknowledgement (for shutdown, revoke, etc.)
    Ack {
        /// Success status
        ok: bool,
        /// Optional message
        message: Option<String>,
    },

    /// Error response
    Error {
        /// Error message
        error: String,
    },

    /// Queue operation response
    Queue(QueueResponse),
}
225
/// Response to queue control commands, carried inside
/// [`ControlResponse::Queue`].
///
/// Serde encodes this enum in adjacently tagged form: the variant name is
/// written under the `"queue_type"` key and the variant's fields under
/// `"result"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "queue_type", content = "result")]
pub enum QueueResponse {
    /// Purge result - number of messages deleted
    Purged {
        /// Queue name
        queue: String,
        /// Number of messages purged
        message_count: u64,
    },

    /// Queue length result
    Length {
        /// Queue name
        queue: String,
        /// Number of messages in queue
        message_count: u64,
    },

    /// Queue deleted
    Deleted {
        /// Queue name
        queue: String,
    },

    /// Queue bound to exchange
    Bound {
        /// Queue name
        queue: String,
        /// Exchange name
        exchange: String,
        /// Routing key
        routing_key: String,
    },

    /// Queue unbound from exchange
    Unbound {
        /// Queue name
        queue: String,
        /// Exchange name
        exchange: String,
        /// Routing key
        routing_key: String,
    },

    /// Queue declared
    Declared {
        /// Queue name
        queue: String,
        /// Number of messages already in queue
        message_count: u64,
        /// Number of consumers
        consumer_count: u32,
    },
}
282
/// Response data from inspect commands, carried (boxed) inside
/// [`ControlResponse::Inspect`].
///
/// Serde encodes this enum in adjacently tagged form: the variant name is
/// written under the `"inspect_type"` key and the payload under `"data"`.
/// Each variant has a same-named variant in [`InspectCommand`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "inspect_type", content = "data")]
pub enum InspectResponse {
    /// Active tasks
    Active(Vec<ActiveTaskInfo>),

    /// Scheduled tasks
    Scheduled(Vec<ScheduledTaskInfo>),

    /// Reserved tasks
    Reserved(Vec<ReservedTaskInfo>),

    /// Revoked task IDs
    Revoked(Vec<Uuid>),

    /// Registered task names
    Registered(Vec<String>),

    /// Worker statistics
    Stats(WorkerStats),

    /// Queue information (map keys are presumably queue names — confirm
    /// against the producer)
    QueueInfo(HashMap<String, QueueStats>),

    /// Worker report
    Report(WorkerReport),

    /// Worker configuration
    Conf(WorkerConf),
}
314
/// Information about an actively executing task.
///
/// Appears in [`InspectResponse::Active`] and in [`WorkerReport::active`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActiveTaskInfo {
    /// Task ID
    pub id: Uuid,
    /// Task name
    pub name: String,
    /// Task arguments (JSON), carried as a string
    pub args: String,
    /// Task keyword arguments (JSON), carried as a string
    pub kwargs: String,
    /// When the task started (Unix timestamp)
    pub started: f64,
    /// Worker hostname executing the task
    pub hostname: String,
    /// Worker process ID, if known
    pub worker_pid: Option<u32>,
    /// Delivery info (queue, routing key, etc.), if available
    pub delivery_info: Option<DeliveryInfo>,
}
335
/// Information about a scheduled task (with ETA or countdown).
///
/// Appears in [`InspectResponse::Scheduled`] and in
/// [`WorkerReport::scheduled`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduledTaskInfo {
    /// Task ID
    pub id: Uuid,
    /// Task name
    pub name: String,
    /// Task arguments (JSON), carried as a string
    pub args: String,
    /// Task keyword arguments (JSON), carried as a string
    pub kwargs: String,
    /// Scheduled execution time (Unix timestamp)
    pub eta: f64,
    /// Priority, if one was set
    pub priority: Option<u8>,
    /// Request info, if available
    pub request: Option<RequestInfo>,
}
354
/// Information about a reserved (prefetched) task.
///
/// Appears in [`InspectResponse::Reserved`] and in
/// [`WorkerReport::reserved`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReservedTaskInfo {
    /// Task ID
    pub id: Uuid,
    /// Task name
    pub name: String,
    /// Task arguments (JSON), carried as a string
    pub args: String,
    /// Task keyword arguments (JSON), carried as a string
    pub kwargs: String,
    /// Delivery info, if available
    pub delivery_info: Option<DeliveryInfo>,
}
369
/// Delivery information for a task.
///
/// Attached (optionally) to [`ActiveTaskInfo`] and [`ReservedTaskInfo`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeliveryInfo {
    /// Source queue
    pub queue: String,
    /// Routing key, if any
    pub routing_key: Option<String>,
    /// Exchange name, if any
    pub exchange: Option<String>,
    /// Delivery tag, if any (carried as a string)
    pub delivery_tag: Option<String>,
    /// Redelivered flag
    pub redelivered: bool,
}
384
/// Request information for a task.
///
/// Attached (optionally) to [`ScheduledTaskInfo`]; every field is optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestInfo {
    /// Correlation ID
    pub correlation_id: Option<String>,
    /// Reply-to queue
    pub reply_to: Option<String>,
    /// Message priority
    pub priority: Option<u8>,
}
395
/// Worker statistics.
///
/// Returned by [`InspectResponse::Stats`] and embedded in [`WorkerReport`].
/// The counters are mandatory; the system-level readings are optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerStats {
    /// Total tasks processed
    pub total_tasks: u64,
    /// Tasks currently active
    pub active_tasks: u32,
    /// Total successful tasks
    pub succeeded: u64,
    /// Total failed tasks
    pub failed: u64,
    /// Total retried tasks
    pub retried: u64,
    /// Worker uptime in seconds
    pub uptime: f64,
    /// System load average (1, 5, 15 min), if available
    pub loadavg: Option<[f64; 3]>,
    /// Process memory usage in bytes, if available
    pub memory_usage: Option<u64>,
    /// Pool information, if available
    pub pool: Option<PoolStats>,
    /// Broker connection stats, if available
    pub broker: Option<BrokerStats>,
    /// Clock offset from server (unit not specified here — TODO confirm)
    pub clock: Option<f64>,
}
422
/// Worker pool statistics.
///
/// Optionally attached to [`WorkerStats::pool`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolStats {
    /// Pool type (prefork, solo, threads, etc.), carried as a free-form string
    pub pool_type: String,
    /// Maximum concurrency
    pub max_concurrency: u32,
    /// Current pool size
    pub pool_size: u32,
    /// Available workers in pool
    pub available: u32,
    /// Worker process IDs
    pub processes: Vec<u32>,
}
437
/// Broker connection statistics.
///
/// Optionally attached to [`WorkerStats::broker`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BrokerStats {
    /// Broker URL (sanitized, no credentials).
    ///
    /// This type performs no sanitization itself; the producer must strip
    /// credentials before filling the field.
    pub url: String,
    /// Connection status
    pub connected: bool,
    /// Heartbeat interval (unit not specified here — TODO confirm)
    pub heartbeat: Option<u64>,
    /// Transport type
    pub transport: String,
}
450
/// Queue statistics.
///
/// Used as the map value type of [`InspectResponse::QueueInfo`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueStats {
    /// Queue name
    pub name: String,
    /// Number of messages in queue
    pub messages: u64,
    /// Number of consumers
    pub consumers: u32,
}
461
/// Worker report (comprehensive status).
///
/// Returned by [`InspectResponse::Report`]; bundles identity, statistics and
/// the task lists that the individual inspect commands return separately.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerReport {
    /// Worker hostname
    pub hostname: String,
    /// Software version
    pub sw_ver: String,
    /// Software system (e.g., "celers")
    pub sw_sys: String,
    /// Worker statistics
    pub stats: WorkerStats,
    /// Active tasks
    pub active: Vec<ActiveTaskInfo>,
    /// Scheduled tasks
    pub scheduled: Vec<ScheduledTaskInfo>,
    /// Reserved tasks
    pub reserved: Vec<ReservedTaskInfo>,
    /// Registered tasks (task names)
    pub registered: Vec<String>,
}
482
/// Worker configuration.
///
/// Returned by [`InspectResponse::Conf`]. URL fields are expected to arrive
/// already sanitized; this type performs no sanitization itself.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConf {
    /// Broker URL (sanitized)
    pub broker_url: String,
    /// Result backend URL (sanitized), if a backend is configured
    pub result_backend: Option<String>,
    /// Default queue
    pub default_queue: String,
    /// Prefetch multiplier
    pub prefetch_multiplier: u32,
    /// Concurrency
    pub concurrency: u32,
    /// Task soft time limit (presumably seconds, matching
    /// [`ControlCommand::TimeLimit`] — TODO confirm)
    pub task_soft_time_limit: Option<u64>,
    /// Task time limit (presumably seconds — TODO confirm)
    pub task_time_limit: Option<u64>,
    /// Task acks late
    pub task_acks_late: bool,
    /// Task reject on worker lost
    pub task_reject_on_worker_lost: bool,
    /// Worker hostname
    pub hostname: String,
}
507
508impl ControlCommand {
509    /// Create a ping command
510    #[inline]
511    pub fn ping(reply_to: impl Into<String>) -> Self {
512        Self::Ping {
513            reply_to: reply_to.into(),
514        }
515    }
516
517    /// Create an inspect active command
518    #[inline]
519    #[must_use]
520    pub fn inspect_active() -> Self {
521        Self::Inspect(InspectCommand::Active)
522    }
523
524    /// Create an inspect scheduled command
525    #[inline]
526    #[must_use]
527    pub fn inspect_scheduled() -> Self {
528        Self::Inspect(InspectCommand::Scheduled)
529    }
530
531    /// Create an inspect reserved command
532    #[inline]
533    #[must_use]
534    pub fn inspect_reserved() -> Self {
535        Self::Inspect(InspectCommand::Reserved)
536    }
537
538    /// Create an inspect revoked command
539    #[inline]
540    #[must_use]
541    pub fn inspect_revoked() -> Self {
542        Self::Inspect(InspectCommand::Revoked)
543    }
544
545    /// Create an inspect registered command
546    #[inline]
547    #[must_use]
548    pub fn inspect_registered() -> Self {
549        Self::Inspect(InspectCommand::Registered)
550    }
551
552    /// Create an inspect stats command
553    #[inline]
554    #[must_use]
555    pub fn inspect_stats() -> Self {
556        Self::Inspect(InspectCommand::Stats)
557    }
558
559    /// Create an inspect queue info command
560    #[inline]
561    #[must_use]
562    pub fn inspect_queue_info() -> Self {
563        Self::Inspect(InspectCommand::QueueInfo)
564    }
565
566    /// Create a shutdown command
567    #[inline]
568    #[must_use]
569    pub fn shutdown(timeout: Option<u64>) -> Self {
570        Self::Shutdown { timeout }
571    }
572
573    /// Create a revoke command
574    #[inline]
575    #[must_use]
576    pub fn revoke(task_id: Uuid, terminate: bool) -> Self {
577        Self::Revoke {
578            task_id,
579            terminate,
580            signal: None,
581        }
582    }
583
584    /// Create a bulk revoke command
585    #[inline]
586    #[must_use]
587    pub fn bulk_revoke(task_ids: Vec<Uuid>, terminate: bool) -> Self {
588        Self::BulkRevoke {
589            task_ids,
590            terminate,
591        }
592    }
593
594    /// Create a revoke by pattern command
595    #[inline]
596    pub fn revoke_by_pattern(pattern: impl Into<String>, terminate: bool) -> Self {
597        Self::RevokeByPattern {
598            pattern: pattern.into(),
599            terminate,
600        }
601    }
602
603    /// Create a queue purge command
604    #[inline]
605    pub fn queue_purge(queue: impl Into<String>) -> Self {
606        Self::Queue(QueueCommand::Purge {
607            queue: queue.into(),
608        })
609    }
610
611    /// Create a queue length command
612    #[inline]
613    pub fn queue_length(queue: impl Into<String>) -> Self {
614        Self::Queue(QueueCommand::Length {
615            queue: queue.into(),
616        })
617    }
618
619    /// Create a queue delete command
620    #[inline]
621    pub fn queue_delete(queue: impl Into<String>, if_empty: bool, if_unused: bool) -> Self {
622        Self::Queue(QueueCommand::Delete {
623            queue: queue.into(),
624            if_empty,
625            if_unused,
626        })
627    }
628
629    /// Create a queue bind command
630    #[inline]
631    pub fn queue_bind(
632        queue: impl Into<String>,
633        exchange: impl Into<String>,
634        routing_key: impl Into<String>,
635    ) -> Self {
636        Self::Queue(QueueCommand::Bind {
637            queue: queue.into(),
638            exchange: exchange.into(),
639            routing_key: routing_key.into(),
640        })
641    }
642
643    /// Create a queue unbind command
644    #[inline]
645    pub fn queue_unbind(
646        queue: impl Into<String>,
647        exchange: impl Into<String>,
648        routing_key: impl Into<String>,
649    ) -> Self {
650        Self::Queue(QueueCommand::Unbind {
651            queue: queue.into(),
652            exchange: exchange.into(),
653            routing_key: routing_key.into(),
654        })
655    }
656
657    /// Create a queue declare command
658    #[inline]
659    pub fn queue_declare(
660        queue: impl Into<String>,
661        durable: bool,
662        exclusive: bool,
663        auto_delete: bool,
664    ) -> Self {
665        Self::Queue(QueueCommand::Declare {
666            queue: queue.into(),
667            durable,
668            exclusive,
669            auto_delete,
670        })
671    }
672}
673
674impl QueueCommand {
675    /// Get the queue name this command operates on
676    #[inline]
677    #[must_use]
678    pub fn queue_name(&self) -> &str {
679        match self {
680            Self::Purge { queue }
681            | Self::Length { queue }
682            | Self::Delete { queue, .. }
683            | Self::Bind { queue, .. }
684            | Self::Unbind { queue, .. }
685            | Self::Declare { queue, .. } => queue,
686        }
687    }
688}
689
690impl ControlResponse {
691    /// Create a pong response
692    #[inline]
693    pub fn pong(hostname: impl Into<String>) -> Self {
694        Self::Pong {
695            hostname: hostname.into(),
696            timestamp: std::time::SystemTime::now()
697                .duration_since(std::time::UNIX_EPOCH)
698                .unwrap_or_default()
699                .as_secs_f64(),
700        }
701    }
702
703    /// Create an ack response
704    #[inline]
705    #[must_use]
706    pub fn ack(ok: bool, message: Option<String>) -> Self {
707        Self::Ack { ok, message }
708    }
709
710    /// Create an error response
711    #[inline]
712    pub fn error(error: impl Into<String>) -> Self {
713        Self::Error {
714            error: error.into(),
715        }
716    }
717}
718
#[cfg(test)]
mod tests {
    use super::*;

    /// A ping command keeps its variant tag and `reply_to` value across a
    /// JSON round trip.
    #[test]
    fn test_control_command_serialization() {
        let cmd = ControlCommand::ping("test-reply");
        let json = serde_json::to_string(&cmd).unwrap();
        assert!(json.contains("Ping"));
        assert!(json.contains("test-reply"));

        let cmd: ControlCommand = serde_json::from_str(&json).unwrap();
        // `matches!` only evaluates to a bool; it must be wrapped in
        // `assert!` or the round-trip check silently passes on any value.
        assert!(matches!(cmd, ControlCommand::Ping { reply_to } if reply_to == "test-reply"));
    }

    /// An inspect command serializes both the outer and the inner variant name.
    #[test]
    fn test_inspect_command_serialization() {
        let cmd = ControlCommand::inspect_active();
        let json = serde_json::to_string(&cmd).unwrap();
        assert!(json.contains("Inspect"));
        assert!(json.contains("Active"));
    }

    /// A pong response serializes its variant name and hostname.
    #[test]
    fn test_control_response_serialization() {
        let resp = ControlResponse::pong("worker-1");
        let json = serde_json::to_string(&resp).unwrap();
        assert!(json.contains("Pong"));
        assert!(json.contains("worker-1"));
    }

    /// A stats inspect response serializes its variant name and counters.
    #[test]
    fn test_inspect_response_serialization() {
        let stats = WorkerStats {
            total_tasks: 100,
            active_tasks: 2,
            succeeded: 95,
            failed: 3,
            retried: 2,
            uptime: 3600.0,
            loadavg: Some([0.5, 0.6, 0.7]),
            memory_usage: Some(1024 * 1024 * 100),
            pool: None,
            broker: None,
            clock: None,
        };
        let resp = InspectResponse::Stats(stats);
        let json = serde_json::to_string(&resp).unwrap();
        assert!(json.contains("Stats"));
        assert!(json.contains("100"));
    }

    /// An active-task record with nested delivery info serializes cleanly.
    #[test]
    fn test_active_task_info() {
        let info = ActiveTaskInfo {
            id: Uuid::new_v4(),
            name: "tasks.add".to_string(),
            args: "[1, 2]".to_string(),
            kwargs: "{}".to_string(),
            started: 1_234_567_890.0,
            hostname: "worker-1".to_string(),
            worker_pid: Some(12345),
            delivery_info: Some(DeliveryInfo {
                queue: "celery".to_string(),
                routing_key: Some("tasks.add".to_string()),
                exchange: Some(String::new()),
                delivery_tag: Some("1".to_string()),
                redelivered: false,
            }),
        };
        let json = serde_json::to_string(&info).unwrap();
        assert!(json.contains("tasks.add"));
    }
}