use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "args")]
pub enum ControlCommand {
Ping {
reply_to: String,
},
Inspect(InspectCommand),
Shutdown {
timeout: Option<u64>,
},
Revoke {
task_id: Uuid,
terminate: bool,
signal: Option<String>,
},
RateLimit {
task_name: String,
rate: Option<f64>,
},
TimeLimit {
task_name: String,
soft: Option<u64>,
hard: Option<u64>,
},
AddConsumer {
queue: String,
},
CancelConsumer {
queue: String,
},
Queue(QueueCommand),
BulkRevoke {
task_ids: Vec<Uuid>,
terminate: bool,
},
RevokeByPattern {
pattern: String,
terminate: bool,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "action")]
pub enum QueueCommand {
Purge {
queue: String,
},
Length {
queue: String,
},
Delete {
queue: String,
if_empty: bool,
if_unused: bool,
},
Bind {
queue: String,
exchange: String,
routing_key: String,
},
Unbind {
queue: String,
exchange: String,
routing_key: String,
},
Declare {
queue: String,
durable: bool,
exclusive: bool,
auto_delete: bool,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "method")]
pub enum InspectCommand {
Active,
Scheduled,
Reserved,
Revoked,
Registered,
Stats,
QueueInfo,
Report,
Conf,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "payload")]
pub enum ControlResponse {
Pong {
hostname: String,
timestamp: f64,
},
Inspect(Box<InspectResponse>),
Ack {
ok: bool,
message: Option<String>,
},
Error {
error: String,
},
Queue(QueueResponse),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "queue_type", content = "result")]
pub enum QueueResponse {
Purged {
queue: String,
message_count: u64,
},
Length {
queue: String,
message_count: u64,
},
Deleted {
queue: String,
},
Bound {
queue: String,
exchange: String,
routing_key: String,
},
Unbound {
queue: String,
exchange: String,
routing_key: String,
},
Declared {
queue: String,
message_count: u64,
consumer_count: u32,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "inspect_type", content = "data")]
pub enum InspectResponse {
Active(Vec<ActiveTaskInfo>),
Scheduled(Vec<ScheduledTaskInfo>),
Reserved(Vec<ReservedTaskInfo>),
Revoked(Vec<Uuid>),
Registered(Vec<String>),
Stats(WorkerStats),
QueueInfo(HashMap<String, QueueStats>),
Report(WorkerReport),
Conf(WorkerConf),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActiveTaskInfo {
pub id: Uuid,
pub name: String,
pub args: String,
pub kwargs: String,
pub started: f64,
pub hostname: String,
pub worker_pid: Option<u32>,
pub delivery_info: Option<DeliveryInfo>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduledTaskInfo {
pub id: Uuid,
pub name: String,
pub args: String,
pub kwargs: String,
pub eta: f64,
pub priority: Option<u8>,
pub request: Option<RequestInfo>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReservedTaskInfo {
pub id: Uuid,
pub name: String,
pub args: String,
pub kwargs: String,
pub delivery_info: Option<DeliveryInfo>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeliveryInfo {
pub queue: String,
pub routing_key: Option<String>,
pub exchange: Option<String>,
pub delivery_tag: Option<String>,
pub redelivered: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestInfo {
pub correlation_id: Option<String>,
pub reply_to: Option<String>,
pub priority: Option<u8>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerStats {
pub total_tasks: u64,
pub active_tasks: u32,
pub succeeded: u64,
pub failed: u64,
pub retried: u64,
pub uptime: f64,
pub loadavg: Option<[f64; 3]>,
pub memory_usage: Option<u64>,
pub pool: Option<PoolStats>,
pub broker: Option<BrokerStats>,
pub clock: Option<f64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolStats {
pub pool_type: String,
pub max_concurrency: u32,
pub pool_size: u32,
pub available: u32,
pub processes: Vec<u32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BrokerStats {
pub url: String,
pub connected: bool,
pub heartbeat: Option<u64>,
pub transport: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueStats {
pub name: String,
pub messages: u64,
pub consumers: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerReport {
pub hostname: String,
pub sw_ver: String,
pub sw_sys: String,
pub stats: WorkerStats,
pub active: Vec<ActiveTaskInfo>,
pub scheduled: Vec<ScheduledTaskInfo>,
pub reserved: Vec<ReservedTaskInfo>,
pub registered: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConf {
pub broker_url: String,
pub result_backend: Option<String>,
pub default_queue: String,
pub prefetch_multiplier: u32,
pub concurrency: u32,
pub task_soft_time_limit: Option<u64>,
pub task_time_limit: Option<u64>,
pub task_acks_late: bool,
pub task_reject_on_worker_lost: bool,
pub hostname: String,
}
impl ControlCommand {
#[inline]
pub fn ping(reply_to: impl Into<String>) -> Self {
Self::Ping {
reply_to: reply_to.into(),
}
}
#[inline]
#[must_use]
pub fn inspect_active() -> Self {
Self::Inspect(InspectCommand::Active)
}
#[inline]
#[must_use]
pub fn inspect_scheduled() -> Self {
Self::Inspect(InspectCommand::Scheduled)
}
#[inline]
#[must_use]
pub fn inspect_reserved() -> Self {
Self::Inspect(InspectCommand::Reserved)
}
#[inline]
#[must_use]
pub fn inspect_revoked() -> Self {
Self::Inspect(InspectCommand::Revoked)
}
#[inline]
#[must_use]
pub fn inspect_registered() -> Self {
Self::Inspect(InspectCommand::Registered)
}
#[inline]
#[must_use]
pub fn inspect_stats() -> Self {
Self::Inspect(InspectCommand::Stats)
}
#[inline]
#[must_use]
pub fn inspect_queue_info() -> Self {
Self::Inspect(InspectCommand::QueueInfo)
}
#[inline]
#[must_use]
pub fn shutdown(timeout: Option<u64>) -> Self {
Self::Shutdown { timeout }
}
#[inline]
#[must_use]
pub fn revoke(task_id: Uuid, terminate: bool) -> Self {
Self::Revoke {
task_id,
terminate,
signal: None,
}
}
#[inline]
#[must_use]
pub fn bulk_revoke(task_ids: Vec<Uuid>, terminate: bool) -> Self {
Self::BulkRevoke {
task_ids,
terminate,
}
}
#[inline]
pub fn revoke_by_pattern(pattern: impl Into<String>, terminate: bool) -> Self {
Self::RevokeByPattern {
pattern: pattern.into(),
terminate,
}
}
#[inline]
pub fn queue_purge(queue: impl Into<String>) -> Self {
Self::Queue(QueueCommand::Purge {
queue: queue.into(),
})
}
#[inline]
pub fn queue_length(queue: impl Into<String>) -> Self {
Self::Queue(QueueCommand::Length {
queue: queue.into(),
})
}
#[inline]
pub fn queue_delete(queue: impl Into<String>, if_empty: bool, if_unused: bool) -> Self {
Self::Queue(QueueCommand::Delete {
queue: queue.into(),
if_empty,
if_unused,
})
}
#[inline]
pub fn queue_bind(
queue: impl Into<String>,
exchange: impl Into<String>,
routing_key: impl Into<String>,
) -> Self {
Self::Queue(QueueCommand::Bind {
queue: queue.into(),
exchange: exchange.into(),
routing_key: routing_key.into(),
})
}
#[inline]
pub fn queue_unbind(
queue: impl Into<String>,
exchange: impl Into<String>,
routing_key: impl Into<String>,
) -> Self {
Self::Queue(QueueCommand::Unbind {
queue: queue.into(),
exchange: exchange.into(),
routing_key: routing_key.into(),
})
}
#[inline]
pub fn queue_declare(
queue: impl Into<String>,
durable: bool,
exclusive: bool,
auto_delete: bool,
) -> Self {
Self::Queue(QueueCommand::Declare {
queue: queue.into(),
durable,
exclusive,
auto_delete,
})
}
}
impl QueueCommand {
#[inline]
#[must_use]
pub fn queue_name(&self) -> &str {
match self {
Self::Purge { queue }
| Self::Length { queue }
| Self::Delete { queue, .. }
| Self::Bind { queue, .. }
| Self::Unbind { queue, .. }
| Self::Declare { queue, .. } => queue,
}
}
}
impl ControlResponse {
#[inline]
pub fn pong(hostname: impl Into<String>) -> Self {
Self::Pong {
hostname: hostname.into(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64(),
}
}
#[inline]
#[must_use]
pub fn ack(ok: bool, message: Option<String>) -> Self {
Self::Ack { ok, message }
}
#[inline]
pub fn error(error: impl Into<String>) -> Self {
Self::Error {
error: error.into(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_control_command_serialization() {
let cmd = ControlCommand::ping("test-reply");
let json = serde_json::to_string(&cmd).unwrap();
assert!(json.contains("Ping"));
assert!(json.contains("test-reply"));
let cmd: ControlCommand = serde_json::from_str(&json).unwrap();
matches!(cmd, ControlCommand::Ping { reply_to } if reply_to == "test-reply");
}
#[test]
fn test_inspect_command_serialization() {
let cmd = ControlCommand::inspect_active();
let json = serde_json::to_string(&cmd).unwrap();
assert!(json.contains("Inspect"));
assert!(json.contains("Active"));
}
#[test]
fn test_control_response_serialization() {
let resp = ControlResponse::pong("worker-1");
let json = serde_json::to_string(&resp).unwrap();
assert!(json.contains("Pong"));
assert!(json.contains("worker-1"));
}
#[test]
fn test_inspect_response_serialization() {
let stats = WorkerStats {
total_tasks: 100,
active_tasks: 2,
succeeded: 95,
failed: 3,
retried: 2,
uptime: 3600.0,
loadavg: Some([0.5, 0.6, 0.7]),
memory_usage: Some(1024 * 1024 * 100),
pool: None,
broker: None,
clock: None,
};
let resp = InspectResponse::Stats(stats);
let json = serde_json::to_string(&resp).unwrap();
assert!(json.contains("Stats"));
assert!(json.contains("100"));
}
#[test]
fn test_active_task_info() {
let info = ActiveTaskInfo {
id: Uuid::new_v4(),
name: "tasks.add".to_string(),
args: "[1, 2]".to_string(),
kwargs: "{}".to_string(),
started: 1_234_567_890.0,
hostname: "worker-1".to_string(),
worker_pid: Some(12345),
delivery_info: Some(DeliveryInfo {
queue: "celery".to_string(),
routing_key: Some("tasks.add".to_string()),
exchange: Some(String::new()),
delivery_tag: Some("1".to_string()),
redelivered: false,
}),
};
let json = serde_json::to_string(&info).unwrap();
assert!(json.contains("tasks.add"));
}
}