1use chrono::{DateTime, Utc};
24use serde::{Deserialize, Serialize};
25use std::collections::HashMap;
26use uuid::Uuid;
27
28#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
30#[serde(rename_all = "kebab-case")]
31pub enum EventType {
32 TaskSent,
34 TaskReceived,
35 TaskStarted,
36 TaskSucceeded,
37 TaskFailed,
38 TaskRejected,
39 TaskRevoked,
40 TaskRetried,
41
42 WorkerOnline,
44 WorkerOffline,
45 WorkerHeartbeat,
46
47 #[serde(untagged)]
49 Custom(String),
50}
51
52impl EventType {
53 #[inline]
55 pub fn as_str(&self) -> &str {
56 match self {
57 EventType::TaskSent => "task-sent",
58 EventType::TaskReceived => "task-received",
59 EventType::TaskStarted => "task-started",
60 EventType::TaskSucceeded => "task-succeeded",
61 EventType::TaskFailed => "task-failed",
62 EventType::TaskRejected => "task-rejected",
63 EventType::TaskRevoked => "task-revoked",
64 EventType::TaskRetried => "task-retried",
65 EventType::WorkerOnline => "worker-online",
66 EventType::WorkerOffline => "worker-offline",
67 EventType::WorkerHeartbeat => "worker-heartbeat",
68 EventType::Custom(s) => s,
69 }
70 }
71
72 #[inline]
74 pub fn is_task_event(&self) -> bool {
75 matches!(
76 self,
77 EventType::TaskSent
78 | EventType::TaskReceived
79 | EventType::TaskStarted
80 | EventType::TaskSucceeded
81 | EventType::TaskFailed
82 | EventType::TaskRejected
83 | EventType::TaskRevoked
84 | EventType::TaskRetried
85 )
86 }
87
88 #[inline]
90 pub fn is_worker_event(&self) -> bool {
91 matches!(
92 self,
93 EventType::WorkerOnline | EventType::WorkerOffline | EventType::WorkerHeartbeat
94 )
95 }
96}
97
98impl std::fmt::Display for EventType {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 write!(f, "{}", self.as_str())
101 }
102}
103
104impl std::str::FromStr for EventType {
105 type Err = String;
106
107 fn from_str(s: &str) -> Result<Self, Self::Err> {
108 match s {
109 "task-sent" => Ok(EventType::TaskSent),
110 "task-received" => Ok(EventType::TaskReceived),
111 "task-started" => Ok(EventType::TaskStarted),
112 "task-succeeded" => Ok(EventType::TaskSucceeded),
113 "task-failed" => Ok(EventType::TaskFailed),
114 "task-rejected" => Ok(EventType::TaskRejected),
115 "task-revoked" => Ok(EventType::TaskRevoked),
116 "task-retried" => Ok(EventType::TaskRetried),
117 "worker-online" => Ok(EventType::WorkerOnline),
118 "worker-offline" => Ok(EventType::WorkerOffline),
119 "worker-heartbeat" => Ok(EventType::WorkerHeartbeat),
120 other => Ok(EventType::Custom(other.to_string())),
121 }
122 }
123}
124
125#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
127pub struct EventMessage {
128 #[serde(rename = "type")]
130 pub event_type: String,
131
132 pub timestamp: f64,
134
135 #[serde(skip_serializing_if = "Option::is_none")]
137 pub hostname: Option<String>,
138
139 #[serde(skip_serializing_if = "Option::is_none")]
141 pub utcoffset: Option<i32>,
142
143 #[serde(skip_serializing_if = "Option::is_none")]
145 pub pid: Option<u32>,
146
147 #[serde(skip_serializing_if = "Option::is_none")]
149 pub clock: Option<u64>,
150
151 #[serde(flatten)]
153 pub fields: HashMap<String, serde_json::Value>,
154}
155
156impl EventMessage {
157 pub fn new(event_type: EventType) -> Self {
159 Self {
160 event_type: event_type.as_str().to_string(),
161 timestamp: Utc::now().timestamp() as f64
162 + (Utc::now().timestamp_subsec_nanos() as f64 / 1_000_000_000.0),
163 hostname: None,
164 utcoffset: Some(0),
165 pid: None,
166 clock: None,
167 fields: HashMap::new(),
168 }
169 }
170
171 pub fn with_timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
173 self.timestamp = timestamp.timestamp() as f64
174 + (timestamp.timestamp_subsec_nanos() as f64 / 1_000_000_000.0);
175 self
176 }
177
178 pub fn with_hostname(mut self, hostname: impl Into<String>) -> Self {
180 self.hostname = Some(hostname.into());
181 self
182 }
183
184 pub fn with_pid(mut self, pid: u32) -> Self {
186 self.pid = Some(pid);
187 self
188 }
189
190 pub fn with_clock(mut self, clock: u64) -> Self {
192 self.clock = Some(clock);
193 self
194 }
195
196 pub fn with_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
198 self.fields.insert(key.into(), value);
199 self
200 }
201
202 pub fn get_type(&self) -> &str {
204 &self.event_type
205 }
206
207 pub fn get_datetime(&self) -> DateTime<Utc> {
209 DateTime::from_timestamp(
210 self.timestamp as i64,
211 ((self.timestamp.fract()) * 1_000_000_000.0) as u32,
212 )
213 .unwrap_or_else(Utc::now)
214 }
215
216 pub fn to_json(&self) -> Result<Vec<u8>, serde_json::Error> {
218 serde_json::to_vec(self)
219 }
220
221 pub fn from_json(bytes: &[u8]) -> Result<Self, serde_json::Error> {
223 serde_json::from_slice(bytes)
224 }
225}
226
227impl From<EventType> for EventMessage {
228 fn from(event_type: EventType) -> Self {
229 Self::new(event_type)
230 }
231}
232
233#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
235pub struct TaskEvent {
236 #[serde(flatten)]
238 pub base: EventMessage,
239
240 pub uuid: Uuid,
242
243 #[serde(skip_serializing_if = "Option::is_none")]
245 pub name: Option<String>,
246
247 #[serde(skip_serializing_if = "Option::is_none")]
249 pub args: Option<String>,
250
251 #[serde(skip_serializing_if = "Option::is_none")]
253 pub kwargs: Option<String>,
254
255 #[serde(skip_serializing_if = "Option::is_none")]
257 pub retries: Option<u32>,
258
259 #[serde(skip_serializing_if = "Option::is_none")]
261 pub eta: Option<String>,
262
263 #[serde(skip_serializing_if = "Option::is_none")]
265 pub expires: Option<String>,
266
267 #[serde(skip_serializing_if = "Option::is_none")]
269 pub parent_id: Option<Uuid>,
270
271 #[serde(skip_serializing_if = "Option::is_none")]
273 pub root_id: Option<Uuid>,
274
275 #[serde(skip_serializing_if = "Option::is_none")]
277 pub result: Option<serde_json::Value>,
278
279 #[serde(skip_serializing_if = "Option::is_none")]
281 pub exception: Option<String>,
282
283 #[serde(skip_serializing_if = "Option::is_none")]
285 pub traceback: Option<String>,
286
287 #[serde(skip_serializing_if = "Option::is_none")]
289 pub runtime: Option<f64>,
290
291 #[serde(skip_serializing_if = "Option::is_none")]
293 pub queue: Option<String>,
294
295 #[serde(skip_serializing_if = "Option::is_none")]
297 pub exchange: Option<String>,
298
299 #[serde(skip_serializing_if = "Option::is_none")]
301 pub routing_key: Option<String>,
302}
303
304impl TaskEvent {
305 pub fn new(event_type: EventType, task_id: Uuid) -> Self {
307 Self {
308 base: EventMessage::new(event_type),
309 uuid: task_id,
310 name: None,
311 args: None,
312 kwargs: None,
313 retries: None,
314 eta: None,
315 expires: None,
316 parent_id: None,
317 root_id: None,
318 result: None,
319 exception: None,
320 traceback: None,
321 runtime: None,
322 queue: None,
323 exchange: None,
324 routing_key: None,
325 }
326 }
327
328 pub fn sent(task_id: Uuid, task_name: &str) -> Self {
330 Self {
331 name: Some(task_name.to_string()),
332 ..Self::new(EventType::TaskSent, task_id)
333 }
334 }
335
336 pub fn received(task_id: Uuid, task_name: &str) -> Self {
338 Self {
339 name: Some(task_name.to_string()),
340 ..Self::new(EventType::TaskReceived, task_id)
341 }
342 }
343
344 pub fn started(task_id: Uuid, task_name: &str) -> Self {
346 Self {
347 name: Some(task_name.to_string()),
348 ..Self::new(EventType::TaskStarted, task_id)
349 }
350 }
351
352 pub fn succeeded(task_id: Uuid, result: serde_json::Value, runtime: f64) -> Self {
354 Self {
355 result: Some(result),
356 runtime: Some(runtime),
357 ..Self::new(EventType::TaskSucceeded, task_id)
358 }
359 }
360
361 pub fn failed(task_id: Uuid, exception: &str, traceback: Option<&str>) -> Self {
363 Self {
364 exception: Some(exception.to_string()),
365 traceback: traceback.map(|s| s.to_string()),
366 ..Self::new(EventType::TaskFailed, task_id)
367 }
368 }
369
370 pub fn retried(task_id: Uuid, exception: &str, retries: u32) -> Self {
372 Self {
373 exception: Some(exception.to_string()),
374 retries: Some(retries),
375 ..Self::new(EventType::TaskRetried, task_id)
376 }
377 }
378
379 pub fn revoked(task_id: Uuid, terminated: bool, signum: Option<i32>) -> Self {
381 let mut event = Self::new(EventType::TaskRevoked, task_id);
382 event
383 .base
384 .fields
385 .insert("terminated".to_string(), serde_json::json!(terminated));
386 if let Some(sig) = signum {
387 event
388 .base
389 .fields
390 .insert("signum".to_string(), serde_json::json!(sig));
391 }
392 event
393 }
394
395 pub fn rejected(task_id: Uuid, requeue: bool) -> Self {
397 let mut event = Self::new(EventType::TaskRejected, task_id);
398 event
399 .base
400 .fields
401 .insert("requeue".to_string(), serde_json::json!(requeue));
402 event
403 }
404
405 pub fn with_name(mut self, name: impl Into<String>) -> Self {
407 self.name = Some(name.into());
408 self
409 }
410
411 pub fn with_args(mut self, args: impl Into<String>) -> Self {
413 self.args = Some(args.into());
414 self
415 }
416
417 pub fn with_kwargs(mut self, kwargs: impl Into<String>) -> Self {
419 self.kwargs = Some(kwargs.into());
420 self
421 }
422
423 pub fn with_hostname(mut self, hostname: impl Into<String>) -> Self {
425 self.base.hostname = Some(hostname.into());
426 self
427 }
428
429 pub fn with_queue(mut self, queue: impl Into<String>) -> Self {
431 self.queue = Some(queue.into());
432 self
433 }
434
435 pub fn with_parent(mut self, parent_id: Uuid) -> Self {
437 self.parent_id = Some(parent_id);
438 self
439 }
440
441 pub fn with_root(mut self, root_id: Uuid) -> Self {
443 self.root_id = Some(root_id);
444 self
445 }
446
447 pub fn to_json(&self) -> Result<Vec<u8>, serde_json::Error> {
449 serde_json::to_vec(self)
450 }
451
452 pub fn from_json(bytes: &[u8]) -> Result<Self, serde_json::Error> {
454 serde_json::from_slice(bytes)
455 }
456}
457
458#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
460pub struct WorkerEvent {
461 #[serde(flatten)]
463 pub base: EventMessage,
464
465 #[serde(rename = "sw_ident", skip_serializing_if = "Option::is_none")]
467 pub software_identity: Option<String>,
468
469 #[serde(rename = "sw_ver", skip_serializing_if = "Option::is_none")]
471 pub software_version: Option<String>,
472
473 #[serde(rename = "sw_sys", skip_serializing_if = "Option::is_none")]
475 pub software_system: Option<String>,
476
477 #[serde(skip_serializing_if = "Option::is_none")]
479 pub active: Option<u32>,
480
481 #[serde(skip_serializing_if = "Option::is_none")]
483 pub processed: Option<u64>,
484
485 #[serde(skip_serializing_if = "Option::is_none")]
487 pub loadavg: Option<[f64; 3]>,
488
489 #[serde(skip_serializing_if = "Option::is_none")]
491 pub freq: Option<f64>,
492}
493
494impl WorkerEvent {
495 pub fn new(event_type: EventType) -> Self {
497 Self {
498 base: EventMessage::new(event_type),
499 software_identity: None,
500 software_version: None,
501 software_system: None,
502 active: None,
503 processed: None,
504 loadavg: None,
505 freq: None,
506 }
507 }
508
509 pub fn online(hostname: &str) -> Self {
511 Self {
512 base: EventMessage::new(EventType::WorkerOnline).with_hostname(hostname),
513 ..Self::new(EventType::WorkerOnline)
514 }
515 }
516
517 pub fn offline(hostname: &str) -> Self {
519 Self {
520 base: EventMessage::new(EventType::WorkerOffline).with_hostname(hostname),
521 ..Self::new(EventType::WorkerOffline)
522 }
523 }
524
525 pub fn heartbeat(hostname: &str, active: u32, processed: u64) -> Self {
527 Self {
528 base: EventMessage::new(EventType::WorkerHeartbeat).with_hostname(hostname),
529 active: Some(active),
530 processed: Some(processed),
531 ..Self::new(EventType::WorkerHeartbeat)
532 }
533 }
534
535 pub fn with_software(mut self, identity: &str, version: &str, system: &str) -> Self {
537 self.software_identity = Some(identity.to_string());
538 self.software_version = Some(version.to_string());
539 self.software_system = Some(system.to_string());
540 self
541 }
542
543 pub fn with_loadavg(mut self, loadavg: [f64; 3]) -> Self {
545 self.loadavg = Some(loadavg);
546 self
547 }
548
549 pub fn with_freq(mut self, freq: f64) -> Self {
551 self.freq = Some(freq);
552 self
553 }
554
555 pub fn to_json(&self) -> Result<Vec<u8>, serde_json::Error> {
557 serde_json::to_vec(self)
558 }
559
560 pub fn from_json(bytes: &[u8]) -> Result<Self, serde_json::Error> {
562 serde_json::from_slice(bytes)
563 }
564}
565
566#[cfg(test)]
567mod tests {
568 use super::*;
569 use serde_json::json;
570
571 #[test]
572 fn test_event_type_as_str() {
573 assert_eq!(EventType::TaskSent.as_str(), "task-sent");
574 assert_eq!(EventType::TaskSucceeded.as_str(), "task-succeeded");
575 assert_eq!(EventType::WorkerOnline.as_str(), "worker-online");
576 }
577
578 #[test]
579 fn test_event_type_is_task_event() {
580 assert!(EventType::TaskSent.is_task_event());
581 assert!(EventType::TaskFailed.is_task_event());
582 assert!(!EventType::WorkerOnline.is_task_event());
583 }
584
585 #[test]
586 fn test_event_type_is_worker_event() {
587 assert!(EventType::WorkerOnline.is_worker_event());
588 assert!(EventType::WorkerHeartbeat.is_worker_event());
589 assert!(!EventType::TaskSent.is_worker_event());
590 }
591
592 #[test]
593 fn test_event_type_display() {
594 assert_eq!(EventType::TaskStarted.to_string(), "task-started");
595 }
596
597 #[test]
598 fn test_event_type_from_str() {
599 use std::str::FromStr;
600
601 assert_eq!(
602 EventType::from_str("task-sent").unwrap(),
603 EventType::TaskSent
604 );
605 assert_eq!(
606 EventType::from_str("task-received").unwrap(),
607 EventType::TaskReceived
608 );
609 assert_eq!(
610 EventType::from_str("task-started").unwrap(),
611 EventType::TaskStarted
612 );
613 assert_eq!(
614 EventType::from_str("task-succeeded").unwrap(),
615 EventType::TaskSucceeded
616 );
617 assert_eq!(
618 EventType::from_str("task-failed").unwrap(),
619 EventType::TaskFailed
620 );
621 assert_eq!(
622 EventType::from_str("task-rejected").unwrap(),
623 EventType::TaskRejected
624 );
625 assert_eq!(
626 EventType::from_str("task-revoked").unwrap(),
627 EventType::TaskRevoked
628 );
629 assert_eq!(
630 EventType::from_str("task-retried").unwrap(),
631 EventType::TaskRetried
632 );
633 assert_eq!(
634 EventType::from_str("worker-online").unwrap(),
635 EventType::WorkerOnline
636 );
637 assert_eq!(
638 EventType::from_str("worker-offline").unwrap(),
639 EventType::WorkerOffline
640 );
641 assert_eq!(
642 EventType::from_str("worker-heartbeat").unwrap(),
643 EventType::WorkerHeartbeat
644 );
645
646 match EventType::from_str("custom-event").unwrap() {
648 EventType::Custom(s) => assert_eq!(s, "custom-event"),
649 _ => panic!("Expected Custom variant"),
650 }
651 }
652
653 #[test]
654 fn test_event_message_creation() {
655 let event = EventMessage::new(EventType::TaskSent).with_hostname("worker-1");
656
657 assert_eq!(event.event_type, "task-sent");
658 assert_eq!(event.hostname, Some("worker-1".to_string()));
659 assert!(event.timestamp > 0.0);
660 }
661
662 #[test]
663 fn test_event_message_json() {
664 let event = EventMessage::new(EventType::TaskStarted)
665 .with_hostname("host-1")
666 .with_pid(12345)
667 .with_clock(100);
668
669 let json_bytes = event.to_json().unwrap();
670 let decoded = EventMessage::from_json(&json_bytes).unwrap();
671
672 assert_eq!(decoded.event_type, "task-started");
673 assert_eq!(decoded.hostname, Some("host-1".to_string()));
674 assert_eq!(decoded.pid, Some(12345));
675 assert_eq!(decoded.clock, Some(100));
676 }
677
678 #[test]
679 fn test_task_event_sent() {
680 let task_id = Uuid::new_v4();
681 let event = TaskEvent::sent(task_id, "tasks.add")
682 .with_args("[1, 2]")
683 .with_kwargs("{}")
684 .with_hostname("worker-1")
685 .with_queue("celery");
686
687 assert_eq!(event.base.event_type, "task-sent");
688 assert_eq!(event.uuid, task_id);
689 assert_eq!(event.name, Some("tasks.add".to_string()));
690 assert_eq!(event.args, Some("[1, 2]".to_string()));
691 assert_eq!(event.queue, Some("celery".to_string()));
692 }
693
694 #[test]
695 fn test_task_event_succeeded() {
696 let task_id = Uuid::new_v4();
697 let event = TaskEvent::succeeded(task_id, json!(42), 0.123);
698
699 assert_eq!(event.base.event_type, "task-succeeded");
700 assert_eq!(event.result, Some(json!(42)));
701 assert_eq!(event.runtime, Some(0.123));
702 }
703
704 #[test]
705 fn test_task_event_failed() {
706 let task_id = Uuid::new_v4();
707 let event = TaskEvent::failed(task_id, "ValueError: bad input", Some("traceback..."));
708
709 assert_eq!(event.base.event_type, "task-failed");
710 assert_eq!(event.exception, Some("ValueError: bad input".to_string()));
711 assert_eq!(event.traceback, Some("traceback...".to_string()));
712 }
713
714 #[test]
715 fn test_task_event_retried() {
716 let task_id = Uuid::new_v4();
717 let event = TaskEvent::retried(task_id, "Timeout", 3);
718
719 assert_eq!(event.base.event_type, "task-retried");
720 assert_eq!(event.exception, Some("Timeout".to_string()));
721 assert_eq!(event.retries, Some(3));
722 }
723
724 #[test]
725 fn test_task_event_revoked() {
726 let task_id = Uuid::new_v4();
727 let event = TaskEvent::revoked(task_id, true, Some(9));
728
729 assert_eq!(event.base.event_type, "task-revoked");
730 assert_eq!(event.base.fields.get("terminated"), Some(&json!(true)));
731 assert_eq!(event.base.fields.get("signum"), Some(&json!(9)));
732 }
733
734 #[test]
735 fn test_task_event_json_round_trip() {
736 let task_id = Uuid::new_v4();
737 let parent_id = Uuid::new_v4();
738
739 let event = TaskEvent::started(task_id, "tasks.process")
740 .with_hostname("worker-2")
741 .with_parent(parent_id)
742 .with_queue("high-priority");
743
744 let json_bytes = event.to_json().unwrap();
745 let decoded = TaskEvent::from_json(&json_bytes).unwrap();
746
747 assert_eq!(decoded.uuid, task_id);
748 assert_eq!(decoded.name, Some("tasks.process".to_string()));
749 assert_eq!(decoded.parent_id, Some(parent_id));
750 }
751
752 #[test]
753 fn test_worker_event_online() {
754 let event =
755 WorkerEvent::online("worker@host").with_software("celers-worker", "0.1.0", "Linux");
756
757 assert_eq!(event.base.event_type, "worker-online");
758 assert_eq!(event.base.hostname, Some("worker@host".to_string()));
759 assert_eq!(event.software_identity, Some("celers-worker".to_string()));
760 assert_eq!(event.software_version, Some("0.1.0".to_string()));
761 assert_eq!(event.software_system, Some("Linux".to_string()));
762 }
763
764 #[test]
765 fn test_worker_event_heartbeat() {
766 let event = WorkerEvent::heartbeat("worker@host", 5, 1000)
767 .with_loadavg([0.5, 0.7, 0.9])
768 .with_freq(2.0);
769
770 assert_eq!(event.base.event_type, "worker-heartbeat");
771 assert_eq!(event.active, Some(5));
772 assert_eq!(event.processed, Some(1000));
773 assert_eq!(event.loadavg, Some([0.5, 0.7, 0.9]));
774 assert_eq!(event.freq, Some(2.0));
775 }
776
777 #[test]
778 fn test_worker_event_json_round_trip() {
779 let event = WorkerEvent::heartbeat("worker-1", 3, 500);
780
781 let json_bytes = event.to_json().unwrap();
782 let decoded = WorkerEvent::from_json(&json_bytes).unwrap();
783
784 assert_eq!(decoded.base.event_type, "worker-heartbeat");
785 assert_eq!(decoded.active, Some(3));
786 assert_eq!(decoded.processed, Some(500));
787 }
788
789 #[test]
790 fn test_event_message_from_event_type() {
791 let event: EventMessage = EventType::TaskSent.into();
792 assert_eq!(event.event_type, "task-sent");
793 assert!(event.timestamp > 0.0);
794
795 let event2: EventMessage = EventType::WorkerOnline.into();
796 assert_eq!(event2.event_type, "worker-online");
797 }
798
799 #[test]
800 fn test_event_message_equality() {
801 let event1 = EventMessage::new(EventType::TaskSent)
802 .with_hostname("host-1")
803 .with_pid(123);
804 let event3 = EventMessage::new(EventType::TaskSent)
805 .with_hostname("host-2")
806 .with_pid(123);
807
808 assert_eq!(event1, event1.clone());
810
811 assert_ne!(event1.hostname, event3.hostname);
813 }
814
815 #[test]
816 fn test_task_event_equality() {
817 let task_id = Uuid::new_v4();
818 let event1 = TaskEvent::sent(task_id, "tasks.add")
819 .with_hostname("worker-1")
820 .with_queue("celery");
821 let event2 = event1.clone();
822
823 assert_eq!(event1, event2);
824 assert_eq!(event1.uuid, event2.uuid);
825 }
826
827 #[test]
828 fn test_worker_event_equality() {
829 let event1 =
830 WorkerEvent::online("worker@host").with_software("celers-worker", "0.1.0", "Linux");
831 let event2 = event1.clone();
832
833 assert_eq!(event1, event2);
834 assert_eq!(event1.software_identity, event2.software_identity);
835 }
836}