celers_protocol/
event.rs

1//! Celery event message format
2//!
3//! This module provides Celery-compatible event messages for task lifecycle
4//! and worker state events.
5//!
6//! # Event Types
7//!
8//! ## Task Events
9//! - `task-sent` - Task was sent to a queue
10//! - `task-received` - Task was received by a worker
11//! - `task-started` - Task execution started
12//! - `task-succeeded` - Task completed successfully
13//! - `task-failed` - Task execution failed
14//! - `task-rejected` - Task was rejected by a worker
15//! - `task-revoked` - Task was revoked
16//! - `task-retried` - Task is being retried
17//!
18//! ## Worker Events
19//! - `worker-online` - Worker came online
20//! - `worker-offline` - Worker went offline
21//! - `worker-heartbeat` - Worker heartbeat
22
23use chrono::{DateTime, Utc};
24use serde::{Deserialize, Serialize};
25use std::collections::HashMap;
26use uuid::Uuid;
27
28/// Event type enumeration
29#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
30#[serde(rename_all = "kebab-case")]
31pub enum EventType {
32    // Task events
33    TaskSent,
34    TaskReceived,
35    TaskStarted,
36    TaskSucceeded,
37    TaskFailed,
38    TaskRejected,
39    TaskRevoked,
40    TaskRetried,
41
42    // Worker events
43    WorkerOnline,
44    WorkerOffline,
45    WorkerHeartbeat,
46
47    // Custom event type
48    #[serde(untagged)]
49    Custom(String),
50}
51
52impl EventType {
53    /// Get the event type string
54    #[inline]
55    pub fn as_str(&self) -> &str {
56        match self {
57            EventType::TaskSent => "task-sent",
58            EventType::TaskReceived => "task-received",
59            EventType::TaskStarted => "task-started",
60            EventType::TaskSucceeded => "task-succeeded",
61            EventType::TaskFailed => "task-failed",
62            EventType::TaskRejected => "task-rejected",
63            EventType::TaskRevoked => "task-revoked",
64            EventType::TaskRetried => "task-retried",
65            EventType::WorkerOnline => "worker-online",
66            EventType::WorkerOffline => "worker-offline",
67            EventType::WorkerHeartbeat => "worker-heartbeat",
68            EventType::Custom(s) => s,
69        }
70    }
71
72    /// Check if this is a task event
73    #[inline]
74    pub fn is_task_event(&self) -> bool {
75        matches!(
76            self,
77            EventType::TaskSent
78                | EventType::TaskReceived
79                | EventType::TaskStarted
80                | EventType::TaskSucceeded
81                | EventType::TaskFailed
82                | EventType::TaskRejected
83                | EventType::TaskRevoked
84                | EventType::TaskRetried
85        )
86    }
87
88    /// Check if this is a worker event
89    #[inline]
90    pub fn is_worker_event(&self) -> bool {
91        matches!(
92            self,
93            EventType::WorkerOnline | EventType::WorkerOffline | EventType::WorkerHeartbeat
94        )
95    }
96}
97
98impl std::fmt::Display for EventType {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        write!(f, "{}", self.as_str())
101    }
102}
103
104impl std::str::FromStr for EventType {
105    type Err = String;
106
107    fn from_str(s: &str) -> Result<Self, Self::Err> {
108        match s {
109            "task-sent" => Ok(EventType::TaskSent),
110            "task-received" => Ok(EventType::TaskReceived),
111            "task-started" => Ok(EventType::TaskStarted),
112            "task-succeeded" => Ok(EventType::TaskSucceeded),
113            "task-failed" => Ok(EventType::TaskFailed),
114            "task-rejected" => Ok(EventType::TaskRejected),
115            "task-revoked" => Ok(EventType::TaskRevoked),
116            "task-retried" => Ok(EventType::TaskRetried),
117            "worker-online" => Ok(EventType::WorkerOnline),
118            "worker-offline" => Ok(EventType::WorkerOffline),
119            "worker-heartbeat" => Ok(EventType::WorkerHeartbeat),
120            other => Ok(EventType::Custom(other.to_string())),
121        }
122    }
123}
124
125/// Base event message structure
126#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
127pub struct EventMessage {
128    /// Event type
129    #[serde(rename = "type")]
130    pub event_type: String,
131
132    /// Timestamp when event occurred
133    pub timestamp: f64,
134
135    /// Hostname of the sender
136    #[serde(skip_serializing_if = "Option::is_none")]
137    pub hostname: Option<String>,
138
139    /// UTC offset
140    #[serde(skip_serializing_if = "Option::is_none")]
141    pub utcoffset: Option<i32>,
142
143    /// Process ID
144    #[serde(skip_serializing_if = "Option::is_none")]
145    pub pid: Option<u32>,
146
147    /// Clock value (for ordering)
148    #[serde(skip_serializing_if = "Option::is_none")]
149    pub clock: Option<u64>,
150
151    /// Additional event-specific fields
152    #[serde(flatten)]
153    pub fields: HashMap<String, serde_json::Value>,
154}
155
156impl EventMessage {
157    /// Create a new event message
158    pub fn new(event_type: EventType) -> Self {
159        Self {
160            event_type: event_type.as_str().to_string(),
161            timestamp: Utc::now().timestamp() as f64
162                + (Utc::now().timestamp_subsec_nanos() as f64 / 1_000_000_000.0),
163            hostname: None,
164            utcoffset: Some(0),
165            pid: None,
166            clock: None,
167            fields: HashMap::new(),
168        }
169    }
170
171    /// Create an event with a specific timestamp
172    pub fn with_timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
173        self.timestamp = timestamp.timestamp() as f64
174            + (timestamp.timestamp_subsec_nanos() as f64 / 1_000_000_000.0);
175        self
176    }
177
178    /// Set the hostname
179    pub fn with_hostname(mut self, hostname: impl Into<String>) -> Self {
180        self.hostname = Some(hostname.into());
181        self
182    }
183
184    /// Set the process ID
185    pub fn with_pid(mut self, pid: u32) -> Self {
186        self.pid = Some(pid);
187        self
188    }
189
190    /// Set the clock value
191    pub fn with_clock(mut self, clock: u64) -> Self {
192        self.clock = Some(clock);
193        self
194    }
195
196    /// Add a custom field
197    pub fn with_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
198        self.fields.insert(key.into(), value);
199        self
200    }
201
202    /// Get the event type
203    pub fn get_type(&self) -> &str {
204        &self.event_type
205    }
206
207    /// Get the timestamp as DateTime
208    pub fn get_datetime(&self) -> DateTime<Utc> {
209        DateTime::from_timestamp(
210            self.timestamp as i64,
211            ((self.timestamp.fract()) * 1_000_000_000.0) as u32,
212        )
213        .unwrap_or_else(Utc::now)
214    }
215
216    /// Serialize to JSON bytes
217    pub fn to_json(&self) -> Result<Vec<u8>, serde_json::Error> {
218        serde_json::to_vec(self)
219    }
220
221    /// Deserialize from JSON bytes
222    pub fn from_json(bytes: &[u8]) -> Result<Self, serde_json::Error> {
223        serde_json::from_slice(bytes)
224    }
225}
226
227impl From<EventType> for EventMessage {
228    fn from(event_type: EventType) -> Self {
229        Self::new(event_type)
230    }
231}
232
233/// Task-specific event data
234#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
235pub struct TaskEvent {
236    /// Base event
237    #[serde(flatten)]
238    pub base: EventMessage,
239
240    /// Task ID (UUID)
241    pub uuid: Uuid,
242
243    /// Task name
244    #[serde(skip_serializing_if = "Option::is_none")]
245    pub name: Option<String>,
246
247    /// Task arguments (as JSON string)
248    #[serde(skip_serializing_if = "Option::is_none")]
249    pub args: Option<String>,
250
251    /// Task keyword arguments (as JSON string)
252    #[serde(skip_serializing_if = "Option::is_none")]
253    pub kwargs: Option<String>,
254
255    /// Retry count
256    #[serde(skip_serializing_if = "Option::is_none")]
257    pub retries: Option<u32>,
258
259    /// ETA for delayed tasks
260    #[serde(skip_serializing_if = "Option::is_none")]
261    pub eta: Option<String>,
262
263    /// Task expiration
264    #[serde(skip_serializing_if = "Option::is_none")]
265    pub expires: Option<String>,
266
267    /// Parent task ID
268    #[serde(skip_serializing_if = "Option::is_none")]
269    pub parent_id: Option<Uuid>,
270
271    /// Root task ID
272    #[serde(skip_serializing_if = "Option::is_none")]
273    pub root_id: Option<Uuid>,
274
275    /// Result (for task-succeeded)
276    #[serde(skip_serializing_if = "Option::is_none")]
277    pub result: Option<serde_json::Value>,
278
279    /// Exception type (for task-failed)
280    #[serde(skip_serializing_if = "Option::is_none")]
281    pub exception: Option<String>,
282
283    /// Traceback (for task-failed)
284    #[serde(skip_serializing_if = "Option::is_none")]
285    pub traceback: Option<String>,
286
287    /// Runtime in seconds (for task-succeeded)
288    #[serde(skip_serializing_if = "Option::is_none")]
289    pub runtime: Option<f64>,
290
291    /// Queue name
292    #[serde(skip_serializing_if = "Option::is_none")]
293    pub queue: Option<String>,
294
295    /// Exchange name
296    #[serde(skip_serializing_if = "Option::is_none")]
297    pub exchange: Option<String>,
298
299    /// Routing key
300    #[serde(skip_serializing_if = "Option::is_none")]
301    pub routing_key: Option<String>,
302}
303
304impl TaskEvent {
305    /// Create a new task event
306    pub fn new(event_type: EventType, task_id: Uuid) -> Self {
307        Self {
308            base: EventMessage::new(event_type),
309            uuid: task_id,
310            name: None,
311            args: None,
312            kwargs: None,
313            retries: None,
314            eta: None,
315            expires: None,
316            parent_id: None,
317            root_id: None,
318            result: None,
319            exception: None,
320            traceback: None,
321            runtime: None,
322            queue: None,
323            exchange: None,
324            routing_key: None,
325        }
326    }
327
328    /// Create a task-sent event
329    pub fn sent(task_id: Uuid, task_name: &str) -> Self {
330        Self {
331            name: Some(task_name.to_string()),
332            ..Self::new(EventType::TaskSent, task_id)
333        }
334    }
335
336    /// Create a task-received event
337    pub fn received(task_id: Uuid, task_name: &str) -> Self {
338        Self {
339            name: Some(task_name.to_string()),
340            ..Self::new(EventType::TaskReceived, task_id)
341        }
342    }
343
344    /// Create a task-started event
345    pub fn started(task_id: Uuid, task_name: &str) -> Self {
346        Self {
347            name: Some(task_name.to_string()),
348            ..Self::new(EventType::TaskStarted, task_id)
349        }
350    }
351
352    /// Create a task-succeeded event
353    pub fn succeeded(task_id: Uuid, result: serde_json::Value, runtime: f64) -> Self {
354        Self {
355            result: Some(result),
356            runtime: Some(runtime),
357            ..Self::new(EventType::TaskSucceeded, task_id)
358        }
359    }
360
361    /// Create a task-failed event
362    pub fn failed(task_id: Uuid, exception: &str, traceback: Option<&str>) -> Self {
363        Self {
364            exception: Some(exception.to_string()),
365            traceback: traceback.map(|s| s.to_string()),
366            ..Self::new(EventType::TaskFailed, task_id)
367        }
368    }
369
370    /// Create a task-retried event
371    pub fn retried(task_id: Uuid, exception: &str, retries: u32) -> Self {
372        Self {
373            exception: Some(exception.to_string()),
374            retries: Some(retries),
375            ..Self::new(EventType::TaskRetried, task_id)
376        }
377    }
378
379    /// Create a task-revoked event
380    pub fn revoked(task_id: Uuid, terminated: bool, signum: Option<i32>) -> Self {
381        let mut event = Self::new(EventType::TaskRevoked, task_id);
382        event
383            .base
384            .fields
385            .insert("terminated".to_string(), serde_json::json!(terminated));
386        if let Some(sig) = signum {
387            event
388                .base
389                .fields
390                .insert("signum".to_string(), serde_json::json!(sig));
391        }
392        event
393    }
394
395    /// Create a task-rejected event
396    pub fn rejected(task_id: Uuid, requeue: bool) -> Self {
397        let mut event = Self::new(EventType::TaskRejected, task_id);
398        event
399            .base
400            .fields
401            .insert("requeue".to_string(), serde_json::json!(requeue));
402        event
403    }
404
405    /// Set task name
406    pub fn with_name(mut self, name: impl Into<String>) -> Self {
407        self.name = Some(name.into());
408        self
409    }
410
411    /// Set task args
412    pub fn with_args(mut self, args: impl Into<String>) -> Self {
413        self.args = Some(args.into());
414        self
415    }
416
417    /// Set task kwargs
418    pub fn with_kwargs(mut self, kwargs: impl Into<String>) -> Self {
419        self.kwargs = Some(kwargs.into());
420        self
421    }
422
423    /// Set hostname
424    pub fn with_hostname(mut self, hostname: impl Into<String>) -> Self {
425        self.base.hostname = Some(hostname.into());
426        self
427    }
428
429    /// Set queue
430    pub fn with_queue(mut self, queue: impl Into<String>) -> Self {
431        self.queue = Some(queue.into());
432        self
433    }
434
435    /// Set parent task ID
436    pub fn with_parent(mut self, parent_id: Uuid) -> Self {
437        self.parent_id = Some(parent_id);
438        self
439    }
440
441    /// Set root task ID
442    pub fn with_root(mut self, root_id: Uuid) -> Self {
443        self.root_id = Some(root_id);
444        self
445    }
446
447    /// Serialize to JSON bytes
448    pub fn to_json(&self) -> Result<Vec<u8>, serde_json::Error> {
449        serde_json::to_vec(self)
450    }
451
452    /// Deserialize from JSON bytes
453    pub fn from_json(bytes: &[u8]) -> Result<Self, serde_json::Error> {
454        serde_json::from_slice(bytes)
455    }
456}
457
458/// Worker-specific event data
459#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
460pub struct WorkerEvent {
461    /// Base event
462    #[serde(flatten)]
463    pub base: EventMessage,
464
465    /// Software name/version (e.g., "celers-worker v0.1.0")
466    #[serde(rename = "sw_ident", skip_serializing_if = "Option::is_none")]
467    pub software_identity: Option<String>,
468
469    /// Software version
470    #[serde(rename = "sw_ver", skip_serializing_if = "Option::is_none")]
471    pub software_version: Option<String>,
472
473    /// Software system (e.g., "Linux")
474    #[serde(rename = "sw_sys", skip_serializing_if = "Option::is_none")]
475    pub software_system: Option<String>,
476
477    /// Number of active tasks
478    #[serde(skip_serializing_if = "Option::is_none")]
479    pub active: Option<u32>,
480
481    /// Number of processed tasks
482    #[serde(skip_serializing_if = "Option::is_none")]
483    pub processed: Option<u64>,
484
485    /// Load average (1, 5, 15 minutes)
486    #[serde(skip_serializing_if = "Option::is_none")]
487    pub loadavg: Option<[f64; 3]>,
488
489    /// Frequency of heartbeats (seconds)
490    #[serde(skip_serializing_if = "Option::is_none")]
491    pub freq: Option<f64>,
492}
493
494impl WorkerEvent {
495    /// Create a new worker event
496    pub fn new(event_type: EventType) -> Self {
497        Self {
498            base: EventMessage::new(event_type),
499            software_identity: None,
500            software_version: None,
501            software_system: None,
502            active: None,
503            processed: None,
504            loadavg: None,
505            freq: None,
506        }
507    }
508
509    /// Create a worker-online event
510    pub fn online(hostname: &str) -> Self {
511        Self {
512            base: EventMessage::new(EventType::WorkerOnline).with_hostname(hostname),
513            ..Self::new(EventType::WorkerOnline)
514        }
515    }
516
517    /// Create a worker-offline event
518    pub fn offline(hostname: &str) -> Self {
519        Self {
520            base: EventMessage::new(EventType::WorkerOffline).with_hostname(hostname),
521            ..Self::new(EventType::WorkerOffline)
522        }
523    }
524
525    /// Create a worker-heartbeat event
526    pub fn heartbeat(hostname: &str, active: u32, processed: u64) -> Self {
527        Self {
528            base: EventMessage::new(EventType::WorkerHeartbeat).with_hostname(hostname),
529            active: Some(active),
530            processed: Some(processed),
531            ..Self::new(EventType::WorkerHeartbeat)
532        }
533    }
534
535    /// Set software identity
536    pub fn with_software(mut self, identity: &str, version: &str, system: &str) -> Self {
537        self.software_identity = Some(identity.to_string());
538        self.software_version = Some(version.to_string());
539        self.software_system = Some(system.to_string());
540        self
541    }
542
543    /// Set load average
544    pub fn with_loadavg(mut self, loadavg: [f64; 3]) -> Self {
545        self.loadavg = Some(loadavg);
546        self
547    }
548
549    /// Set heartbeat frequency
550    pub fn with_freq(mut self, freq: f64) -> Self {
551        self.freq = Some(freq);
552        self
553    }
554
555    /// Serialize to JSON bytes
556    pub fn to_json(&self) -> Result<Vec<u8>, serde_json::Error> {
557        serde_json::to_vec(self)
558    }
559
560    /// Deserialize from JSON bytes
561    pub fn from_json(bytes: &[u8]) -> Result<Self, serde_json::Error> {
562        serde_json::from_slice(bytes)
563    }
564}
565
566#[cfg(test)]
567mod tests {
568    use super::*;
569    use serde_json::json;
570
571    #[test]
572    fn test_event_type_as_str() {
573        assert_eq!(EventType::TaskSent.as_str(), "task-sent");
574        assert_eq!(EventType::TaskSucceeded.as_str(), "task-succeeded");
575        assert_eq!(EventType::WorkerOnline.as_str(), "worker-online");
576    }
577
578    #[test]
579    fn test_event_type_is_task_event() {
580        assert!(EventType::TaskSent.is_task_event());
581        assert!(EventType::TaskFailed.is_task_event());
582        assert!(!EventType::WorkerOnline.is_task_event());
583    }
584
585    #[test]
586    fn test_event_type_is_worker_event() {
587        assert!(EventType::WorkerOnline.is_worker_event());
588        assert!(EventType::WorkerHeartbeat.is_worker_event());
589        assert!(!EventType::TaskSent.is_worker_event());
590    }
591
592    #[test]
593    fn test_event_type_display() {
594        assert_eq!(EventType::TaskStarted.to_string(), "task-started");
595    }
596
597    #[test]
598    fn test_event_type_from_str() {
599        use std::str::FromStr;
600
601        assert_eq!(
602            EventType::from_str("task-sent").unwrap(),
603            EventType::TaskSent
604        );
605        assert_eq!(
606            EventType::from_str("task-received").unwrap(),
607            EventType::TaskReceived
608        );
609        assert_eq!(
610            EventType::from_str("task-started").unwrap(),
611            EventType::TaskStarted
612        );
613        assert_eq!(
614            EventType::from_str("task-succeeded").unwrap(),
615            EventType::TaskSucceeded
616        );
617        assert_eq!(
618            EventType::from_str("task-failed").unwrap(),
619            EventType::TaskFailed
620        );
621        assert_eq!(
622            EventType::from_str("task-rejected").unwrap(),
623            EventType::TaskRejected
624        );
625        assert_eq!(
626            EventType::from_str("task-revoked").unwrap(),
627            EventType::TaskRevoked
628        );
629        assert_eq!(
630            EventType::from_str("task-retried").unwrap(),
631            EventType::TaskRetried
632        );
633        assert_eq!(
634            EventType::from_str("worker-online").unwrap(),
635            EventType::WorkerOnline
636        );
637        assert_eq!(
638            EventType::from_str("worker-offline").unwrap(),
639            EventType::WorkerOffline
640        );
641        assert_eq!(
642            EventType::from_str("worker-heartbeat").unwrap(),
643            EventType::WorkerHeartbeat
644        );
645
646        // Test custom event type
647        match EventType::from_str("custom-event").unwrap() {
648            EventType::Custom(s) => assert_eq!(s, "custom-event"),
649            _ => panic!("Expected Custom variant"),
650        }
651    }
652
653    #[test]
654    fn test_event_message_creation() {
655        let event = EventMessage::new(EventType::TaskSent).with_hostname("worker-1");
656
657        assert_eq!(event.event_type, "task-sent");
658        assert_eq!(event.hostname, Some("worker-1".to_string()));
659        assert!(event.timestamp > 0.0);
660    }
661
662    #[test]
663    fn test_event_message_json() {
664        let event = EventMessage::new(EventType::TaskStarted)
665            .with_hostname("host-1")
666            .with_pid(12345)
667            .with_clock(100);
668
669        let json_bytes = event.to_json().unwrap();
670        let decoded = EventMessage::from_json(&json_bytes).unwrap();
671
672        assert_eq!(decoded.event_type, "task-started");
673        assert_eq!(decoded.hostname, Some("host-1".to_string()));
674        assert_eq!(decoded.pid, Some(12345));
675        assert_eq!(decoded.clock, Some(100));
676    }
677
678    #[test]
679    fn test_task_event_sent() {
680        let task_id = Uuid::new_v4();
681        let event = TaskEvent::sent(task_id, "tasks.add")
682            .with_args("[1, 2]")
683            .with_kwargs("{}")
684            .with_hostname("worker-1")
685            .with_queue("celery");
686
687        assert_eq!(event.base.event_type, "task-sent");
688        assert_eq!(event.uuid, task_id);
689        assert_eq!(event.name, Some("tasks.add".to_string()));
690        assert_eq!(event.args, Some("[1, 2]".to_string()));
691        assert_eq!(event.queue, Some("celery".to_string()));
692    }
693
694    #[test]
695    fn test_task_event_succeeded() {
696        let task_id = Uuid::new_v4();
697        let event = TaskEvent::succeeded(task_id, json!(42), 0.123);
698
699        assert_eq!(event.base.event_type, "task-succeeded");
700        assert_eq!(event.result, Some(json!(42)));
701        assert_eq!(event.runtime, Some(0.123));
702    }
703
704    #[test]
705    fn test_task_event_failed() {
706        let task_id = Uuid::new_v4();
707        let event = TaskEvent::failed(task_id, "ValueError: bad input", Some("traceback..."));
708
709        assert_eq!(event.base.event_type, "task-failed");
710        assert_eq!(event.exception, Some("ValueError: bad input".to_string()));
711        assert_eq!(event.traceback, Some("traceback...".to_string()));
712    }
713
714    #[test]
715    fn test_task_event_retried() {
716        let task_id = Uuid::new_v4();
717        let event = TaskEvent::retried(task_id, "Timeout", 3);
718
719        assert_eq!(event.base.event_type, "task-retried");
720        assert_eq!(event.exception, Some("Timeout".to_string()));
721        assert_eq!(event.retries, Some(3));
722    }
723
724    #[test]
725    fn test_task_event_revoked() {
726        let task_id = Uuid::new_v4();
727        let event = TaskEvent::revoked(task_id, true, Some(9));
728
729        assert_eq!(event.base.event_type, "task-revoked");
730        assert_eq!(event.base.fields.get("terminated"), Some(&json!(true)));
731        assert_eq!(event.base.fields.get("signum"), Some(&json!(9)));
732    }
733
734    #[test]
735    fn test_task_event_json_round_trip() {
736        let task_id = Uuid::new_v4();
737        let parent_id = Uuid::new_v4();
738
739        let event = TaskEvent::started(task_id, "tasks.process")
740            .with_hostname("worker-2")
741            .with_parent(parent_id)
742            .with_queue("high-priority");
743
744        let json_bytes = event.to_json().unwrap();
745        let decoded = TaskEvent::from_json(&json_bytes).unwrap();
746
747        assert_eq!(decoded.uuid, task_id);
748        assert_eq!(decoded.name, Some("tasks.process".to_string()));
749        assert_eq!(decoded.parent_id, Some(parent_id));
750    }
751
752    #[test]
753    fn test_worker_event_online() {
754        let event =
755            WorkerEvent::online("worker@host").with_software("celers-worker", "0.1.0", "Linux");
756
757        assert_eq!(event.base.event_type, "worker-online");
758        assert_eq!(event.base.hostname, Some("worker@host".to_string()));
759        assert_eq!(event.software_identity, Some("celers-worker".to_string()));
760        assert_eq!(event.software_version, Some("0.1.0".to_string()));
761        assert_eq!(event.software_system, Some("Linux".to_string()));
762    }
763
764    #[test]
765    fn test_worker_event_heartbeat() {
766        let event = WorkerEvent::heartbeat("worker@host", 5, 1000)
767            .with_loadavg([0.5, 0.7, 0.9])
768            .with_freq(2.0);
769
770        assert_eq!(event.base.event_type, "worker-heartbeat");
771        assert_eq!(event.active, Some(5));
772        assert_eq!(event.processed, Some(1000));
773        assert_eq!(event.loadavg, Some([0.5, 0.7, 0.9]));
774        assert_eq!(event.freq, Some(2.0));
775    }
776
777    #[test]
778    fn test_worker_event_json_round_trip() {
779        let event = WorkerEvent::heartbeat("worker-1", 3, 500);
780
781        let json_bytes = event.to_json().unwrap();
782        let decoded = WorkerEvent::from_json(&json_bytes).unwrap();
783
784        assert_eq!(decoded.base.event_type, "worker-heartbeat");
785        assert_eq!(decoded.active, Some(3));
786        assert_eq!(decoded.processed, Some(500));
787    }
788
789    #[test]
790    fn test_event_message_from_event_type() {
791        let event: EventMessage = EventType::TaskSent.into();
792        assert_eq!(event.event_type, "task-sent");
793        assert!(event.timestamp > 0.0);
794
795        let event2: EventMessage = EventType::WorkerOnline.into();
796        assert_eq!(event2.event_type, "worker-online");
797    }
798
799    #[test]
800    fn test_event_message_equality() {
801        let event1 = EventMessage::new(EventType::TaskSent)
802            .with_hostname("host-1")
803            .with_pid(123);
804        let event3 = EventMessage::new(EventType::TaskSent)
805            .with_hostname("host-2")
806            .with_pid(123);
807
808        // Test that the same event equals itself (cloning preserves all fields including timestamp)
809        assert_eq!(event1, event1.clone());
810
811        // Verify different hostnames result in different events
812        assert_ne!(event1.hostname, event3.hostname);
813    }
814
815    #[test]
816    fn test_task_event_equality() {
817        let task_id = Uuid::new_v4();
818        let event1 = TaskEvent::sent(task_id, "tasks.add")
819            .with_hostname("worker-1")
820            .with_queue("celery");
821        let event2 = event1.clone();
822
823        assert_eq!(event1, event2);
824        assert_eq!(event1.uuid, event2.uuid);
825    }
826
827    #[test]
828    fn test_worker_event_equality() {
829        let event1 =
830            WorkerEvent::online("worker@host").with_software("celers-worker", "0.1.0", "Linux");
831        let event2 = event1.clone();
832
833        assert_eq!(event1, event2);
834        assert_eq!(event1.software_identity, event2.software_identity);
835    }
836}