1use chrono::{DateTime, Utc};
45use serde::{Deserialize, Serialize};
46use uuid::Uuid;
47
48#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
50#[serde(tag = "type", rename_all = "kebab-case")]
51pub enum TaskEvent {
52 #[serde(rename = "task-sent")]
54 Sent {
55 task_id: Uuid,
57 task_name: String,
59 queue: String,
61 timestamp: DateTime<Utc>,
63 #[serde(skip_serializing_if = "Option::is_none")]
65 args: Option<String>,
66 #[serde(skip_serializing_if = "Option::is_none")]
68 kwargs: Option<String>,
69 #[serde(skip_serializing_if = "Option::is_none")]
71 eta: Option<DateTime<Utc>>,
72 #[serde(skip_serializing_if = "Option::is_none")]
74 expires: Option<DateTime<Utc>>,
75 #[serde(skip_serializing_if = "Option::is_none")]
77 retries: Option<u32>,
78 },
79
80 #[serde(rename = "task-received")]
82 Received {
83 task_id: Uuid,
85 task_name: String,
87 hostname: String,
89 timestamp: DateTime<Utc>,
91 pid: u32,
93 },
94
95 #[serde(rename = "task-started")]
97 Started {
98 task_id: Uuid,
100 task_name: String,
102 hostname: String,
104 timestamp: DateTime<Utc>,
106 pid: u32,
108 },
109
110 #[serde(rename = "task-succeeded")]
112 Succeeded {
113 task_id: Uuid,
115 task_name: String,
117 hostname: String,
119 timestamp: DateTime<Utc>,
121 runtime: f64,
123 #[serde(skip_serializing_if = "Option::is_none")]
125 result: Option<String>,
126 },
127
128 #[serde(rename = "task-failed")]
130 Failed {
131 task_id: Uuid,
133 task_name: String,
135 hostname: String,
137 timestamp: DateTime<Utc>,
139 exception: String,
141 #[serde(skip_serializing_if = "Option::is_none")]
143 traceback: Option<String>,
144 },
145
146 #[serde(rename = "task-retried")]
148 Retried {
149 task_id: Uuid,
151 task_name: String,
153 hostname: String,
155 timestamp: DateTime<Utc>,
157 exception: String,
159 retries: u32,
161 },
162
163 #[serde(rename = "task-revoked")]
165 Revoked {
166 task_id: Uuid,
168 #[serde(skip_serializing_if = "Option::is_none")]
170 task_name: Option<String>,
171 timestamp: DateTime<Utc>,
173 terminated: bool,
175 #[serde(skip_serializing_if = "Option::is_none")]
177 signum: Option<i32>,
178 expired: bool,
180 },
181
182 #[serde(rename = "task-rejected")]
184 Rejected {
185 task_id: Uuid,
187 #[serde(skip_serializing_if = "Option::is_none")]
189 task_name: Option<String>,
190 hostname: String,
192 timestamp: DateTime<Utc>,
194 reason: String,
196 },
197}
198
199#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
201#[serde(tag = "type", rename_all = "kebab-case")]
202pub enum WorkerEvent {
203 #[serde(rename = "worker-online")]
205 Online {
206 hostname: String,
208 timestamp: DateTime<Utc>,
210 sw_ident: String,
212 sw_ver: String,
214 sw_sys: String,
216 },
217
218 #[serde(rename = "worker-offline")]
220 Offline {
221 hostname: String,
223 timestamp: DateTime<Utc>,
225 },
226
227 #[serde(rename = "worker-heartbeat")]
229 Heartbeat {
230 hostname: String,
232 timestamp: DateTime<Utc>,
234 active: u32,
236 processed: u64,
238 #[serde(skip_serializing_if = "Option::is_none")]
240 loadavg: Option<[f64; 3]>,
241 freq: f64,
243 },
244}
245
246#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
248#[serde(untagged)]
249pub enum Event {
250 Task(TaskEvent),
252 Worker(WorkerEvent),
254}
255
256impl Event {
257 #[inline]
259 #[must_use]
260 pub fn event_type(&self) -> &'static str {
261 match self {
262 Event::Task(TaskEvent::Sent { .. }) => "task-sent",
263 Event::Task(TaskEvent::Received { .. }) => "task-received",
264 Event::Task(TaskEvent::Started { .. }) => "task-started",
265 Event::Task(TaskEvent::Succeeded { .. }) => "task-succeeded",
266 Event::Task(TaskEvent::Failed { .. }) => "task-failed",
267 Event::Task(TaskEvent::Retried { .. }) => "task-retried",
268 Event::Task(TaskEvent::Revoked { .. }) => "task-revoked",
269 Event::Task(TaskEvent::Rejected { .. }) => "task-rejected",
270 Event::Worker(WorkerEvent::Online { .. }) => "worker-online",
271 Event::Worker(WorkerEvent::Offline { .. }) => "worker-offline",
272 Event::Worker(WorkerEvent::Heartbeat { .. }) => "worker-heartbeat",
273 }
274 }
275
276 #[inline]
278 #[must_use]
279 pub fn timestamp(&self) -> DateTime<Utc> {
280 match self {
281 Event::Task(e) => match e {
282 TaskEvent::Sent { timestamp, .. }
283 | TaskEvent::Received { timestamp, .. }
284 | TaskEvent::Started { timestamp, .. }
285 | TaskEvent::Succeeded { timestamp, .. }
286 | TaskEvent::Failed { timestamp, .. }
287 | TaskEvent::Retried { timestamp, .. }
288 | TaskEvent::Revoked { timestamp, .. }
289 | TaskEvent::Rejected { timestamp, .. } => *timestamp,
290 },
291 Event::Worker(e) => match e {
292 WorkerEvent::Online { timestamp, .. }
293 | WorkerEvent::Offline { timestamp, .. }
294 | WorkerEvent::Heartbeat { timestamp, .. } => *timestamp,
295 },
296 }
297 }
298
299 #[inline]
301 #[must_use]
302 pub fn task_id(&self) -> Option<Uuid> {
303 match self {
304 Event::Task(e) => Some(match e {
305 TaskEvent::Sent { task_id, .. }
306 | TaskEvent::Received { task_id, .. }
307 | TaskEvent::Started { task_id, .. }
308 | TaskEvent::Succeeded { task_id, .. }
309 | TaskEvent::Failed { task_id, .. }
310 | TaskEvent::Retried { task_id, .. }
311 | TaskEvent::Revoked { task_id, .. }
312 | TaskEvent::Rejected { task_id, .. } => *task_id,
313 }),
314 Event::Worker(_) => None,
315 }
316 }
317
318 #[inline]
320 #[must_use]
321 pub fn hostname(&self) -> Option<&str> {
322 match self {
323 Event::Task(e) => match e {
324 TaskEvent::Received { hostname, .. }
325 | TaskEvent::Started { hostname, .. }
326 | TaskEvent::Succeeded { hostname, .. }
327 | TaskEvent::Failed { hostname, .. }
328 | TaskEvent::Retried { hostname, .. }
329 | TaskEvent::Rejected { hostname, .. } => Some(hostname),
330 TaskEvent::Sent { .. } | TaskEvent::Revoked { .. } => None,
331 },
332 Event::Worker(e) => match e {
333 WorkerEvent::Online { hostname, .. }
334 | WorkerEvent::Offline { hostname, .. }
335 | WorkerEvent::Heartbeat { hostname, .. } => Some(hostname),
336 },
337 }
338 }
339
340 #[inline]
342 #[must_use]
343 pub const fn is_task_event(&self) -> bool {
344 matches!(self, Event::Task(_))
345 }
346
347 #[inline]
349 #[must_use]
350 pub const fn is_worker_event(&self) -> bool {
351 matches!(self, Event::Worker(_))
352 }
353}
354
355#[derive(Debug, Clone)]
357pub struct TaskEventBuilder {
358 task_id: Uuid,
359 task_name: String,
360 hostname: Option<String>,
361 pid: Option<u32>,
362}
363
364impl TaskEventBuilder {
365 pub fn new(task_id: Uuid, task_name: impl Into<String>) -> Self {
367 Self {
368 task_id,
369 task_name: task_name.into(),
370 hostname: None,
371 pid: None,
372 }
373 }
374
375 #[must_use]
377 pub fn hostname(mut self, hostname: impl Into<String>) -> Self {
378 self.hostname = Some(hostname.into());
379 self
380 }
381
382 #[must_use]
384 pub fn pid(mut self, pid: u32) -> Self {
385 self.pid = Some(pid);
386 self
387 }
388
389 pub fn sent(self, queue: impl Into<String>) -> Event {
391 Event::Task(TaskEvent::Sent {
392 task_id: self.task_id,
393 task_name: self.task_name,
394 queue: queue.into(),
395 timestamp: Utc::now(),
396 args: None,
397 kwargs: None,
398 eta: None,
399 expires: None,
400 retries: None,
401 })
402 }
403
404 #[must_use]
406 pub fn received(self) -> Event {
407 Event::Task(TaskEvent::Received {
408 task_id: self.task_id,
409 task_name: self.task_name,
410 hostname: self.hostname.unwrap_or_else(|| "unknown".to_string()),
411 timestamp: Utc::now(),
412 pid: self.pid.unwrap_or(0),
413 })
414 }
415
416 #[must_use]
418 pub fn started(self) -> Event {
419 Event::Task(TaskEvent::Started {
420 task_id: self.task_id,
421 task_name: self.task_name,
422 hostname: self.hostname.unwrap_or_else(|| "unknown".to_string()),
423 timestamp: Utc::now(),
424 pid: self.pid.unwrap_or(0),
425 })
426 }
427
428 #[must_use]
430 pub fn succeeded(self, runtime: f64) -> Event {
431 Event::Task(TaskEvent::Succeeded {
432 task_id: self.task_id,
433 task_name: self.task_name,
434 hostname: self.hostname.unwrap_or_else(|| "unknown".to_string()),
435 timestamp: Utc::now(),
436 runtime,
437 result: None,
438 })
439 }
440
441 pub fn failed(self, exception: impl Into<String>) -> Event {
443 Event::Task(TaskEvent::Failed {
444 task_id: self.task_id,
445 task_name: self.task_name,
446 hostname: self.hostname.unwrap_or_else(|| "unknown".to_string()),
447 timestamp: Utc::now(),
448 exception: exception.into(),
449 traceback: None,
450 })
451 }
452
453 pub fn retried(self, exception: impl Into<String>, retries: u32) -> Event {
455 Event::Task(TaskEvent::Retried {
456 task_id: self.task_id,
457 task_name: self.task_name,
458 hostname: self.hostname.unwrap_or_else(|| "unknown".to_string()),
459 timestamp: Utc::now(),
460 exception: exception.into(),
461 retries,
462 })
463 }
464}
465
466#[derive(Debug, Clone)]
468pub struct WorkerEventBuilder {
469 hostname: String,
470}
471
472impl WorkerEventBuilder {
473 pub fn new(hostname: impl Into<String>) -> Self {
475 Self {
476 hostname: hostname.into(),
477 }
478 }
479
480 #[must_use]
482 pub fn online(self) -> Event {
483 Event::Worker(WorkerEvent::Online {
484 hostname: self.hostname,
485 timestamp: Utc::now(),
486 sw_ident: "celers".to_string(),
487 sw_ver: env!("CARGO_PKG_VERSION").to_string(),
488 sw_sys: std::env::consts::OS.to_string(),
489 })
490 }
491
492 #[must_use]
494 pub fn offline(self) -> Event {
495 Event::Worker(WorkerEvent::Offline {
496 hostname: self.hostname,
497 timestamp: Utc::now(),
498 })
499 }
500
501 #[must_use]
503 pub fn heartbeat(self, active: u32, processed: u64, loadavg: [f64; 3], freq: f64) -> Event {
504 let loadavg_opt = if loadavg == [0.0, 0.0, 0.0] {
506 None
507 } else {
508 Some(loadavg)
509 };
510
511 Event::Worker(WorkerEvent::Heartbeat {
512 hostname: self.hostname,
513 timestamp: Utc::now(),
514 active,
515 processed,
516 loadavg: loadavg_opt,
517 freq,
518 })
519 }
520}
521
522use async_trait::async_trait;
527use std::sync::Arc;
528use tokio::sync::broadcast;
529
530#[async_trait]
556pub trait EventEmitter: Send + Sync {
557 async fn emit(&self, event: Event) -> crate::Result<()>;
559
560 async fn emit_batch(&self, events: Vec<Event>) -> crate::Result<()> {
562 for event in events {
563 self.emit(event).await?;
564 }
565 Ok(())
566 }
567
568 fn is_enabled(&self) -> bool {
570 true
571 }
572}
573
/// Event emitter that carries no state.
#[derive(Debug, Clone, Default)]
pub struct NoOpEventEmitter;

impl NoOpEventEmitter {
    /// Creates the emitter; equivalent to `NoOpEventEmitter::default()`.
    #[must_use]
    pub fn new() -> Self {
        NoOpEventEmitter
    }
}
587
588#[async_trait]
589impl EventEmitter for NoOpEventEmitter {
590 async fn emit(&self, _event: Event) -> crate::Result<()> {
591 Ok(())
592 }
593
594 fn is_enabled(&self) -> bool {
595 false
596 }
597}
598
599#[derive(Debug, Clone)]
603pub struct InMemoryEventEmitter {
604 sender: broadcast::Sender<Event>,
605}
606
607impl InMemoryEventEmitter {
608 #[must_use]
610 pub fn new(capacity: usize) -> Self {
611 let (sender, _) = broadcast::channel(capacity);
612 Self { sender }
613 }
614
615 #[must_use]
617 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
618 self.sender.subscribe()
619 }
620
621 #[inline]
623 #[must_use]
624 pub fn subscriber_count(&self) -> usize {
625 self.sender.receiver_count()
626 }
627}
628
629#[async_trait]
630impl EventEmitter for InMemoryEventEmitter {
631 async fn emit(&self, event: Event) -> crate::Result<()> {
632 let _ = self.sender.send(event);
634 Ok(())
635 }
636}
637
/// Emitter that writes each event to the log at a configurable level.
#[derive(Debug, Clone, Default)]
pub struct LoggingEventEmitter {
    // Level at which events are logged.
    level: LogLevel,
}

/// Log level used by [`LoggingEventEmitter`]. Defaults to [`LogLevel::Debug`].
#[derive(Debug, Clone, Copy, Default)]
pub enum LogLevel {
    /// Log at trace level.
    Trace,
    /// Log at debug level (the default).
    #[default]
    Debug,
    /// Log at info level.
    Info,
}

impl LoggingEventEmitter {
    /// Creates an emitter that logs at the default level ([`LogLevel::Debug`]).
    #[must_use]
    pub fn new() -> Self {
        Self::with_level(LogLevel::default())
    }

    /// Creates an emitter that logs at `level`.
    #[must_use]
    pub fn with_level(level: LogLevel) -> Self {
        Self { level }
    }
}
672
673#[async_trait]
674impl EventEmitter for LoggingEventEmitter {
675 async fn emit(&self, event: Event) -> crate::Result<()> {
676 let event_type = event.event_type();
677 let task_id = event.task_id().map(|id| id.to_string());
678 let hostname = event.hostname().map(String::from);
679
680 match self.level {
681 LogLevel::Trace => {
682 tracing::trace!(
683 event_type = event_type,
684 task_id = ?task_id,
685 hostname = ?hostname,
686 "Event emitted"
687 );
688 }
689 LogLevel::Debug => {
690 tracing::debug!(
691 event_type = event_type,
692 task_id = ?task_id,
693 hostname = ?hostname,
694 "Event emitted"
695 );
696 }
697 LogLevel::Info => {
698 tracing::info!(
699 event_type = event_type,
700 task_id = ?task_id,
701 hostname = ?hostname,
702 "Event emitted"
703 );
704 }
705 }
706 Ok(())
707 }
708}
709
710#[derive(Clone)]
714pub struct CompositeEventEmitter {
715 emitters: Vec<Arc<dyn EventEmitter>>,
716}
717
718impl CompositeEventEmitter {
719 #[must_use]
721 pub fn new() -> Self {
722 Self {
723 emitters: Vec::new(),
724 }
725 }
726
727 #[must_use]
729 pub fn with_emitter<E: EventEmitter + 'static>(mut self, emitter: E) -> Self {
730 self.emitters.push(Arc::new(emitter));
731 self
732 }
733
734 #[must_use]
736 pub fn add_arc(mut self, emitter: Arc<dyn EventEmitter>) -> Self {
737 self.emitters.push(emitter);
738 self
739 }
740}
741
742impl Default for CompositeEventEmitter {
743 fn default() -> Self {
744 Self::new()
745 }
746}
747
748#[async_trait]
749impl EventEmitter for CompositeEventEmitter {
750 async fn emit(&self, event: Event) -> crate::Result<()> {
751 for emitter in &self.emitters {
752 if emitter.is_enabled() {
753 emitter.emit(event.clone()).await?;
754 }
755 }
756 Ok(())
757 }
758
759 fn is_enabled(&self) -> bool {
760 self.emitters.iter().any(|e| e.is_enabled())
761 }
762}
763
764impl std::fmt::Debug for CompositeEventEmitter {
765 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
766 f.debug_struct("CompositeEventEmitter")
767 .field("emitter_count", &self.emitters.len())
768 .finish()
769 }
770}
771
772use std::collections::HashMap;
777use std::path::PathBuf;
778use tokio::sync::RwLock;
779
780#[derive(Clone)]
782pub enum EventFilter {
783 All,
785 TaskOnly,
787 WorkerOnly,
789 EventTypes(Vec<String>),
791 TaskName(String),
793 Hostname(String),
795 Custom(Arc<dyn Fn(&Event) -> bool + Send + Sync>),
797 And(Vec<EventFilter>),
799 Or(Vec<EventFilter>),
801}
802
803impl EventFilter {
804 #[must_use]
806 pub fn matches(&self, event: &Event) -> bool {
807 match self {
808 EventFilter::All => true,
809 EventFilter::TaskOnly => matches!(event, Event::Task(_)),
810 EventFilter::WorkerOnly => matches!(event, Event::Worker(_)),
811 EventFilter::EventTypes(types) => types.contains(&event.event_type().to_string()),
812 EventFilter::TaskName(name) => {
813 if let Event::Task(task_event) = event {
814 match task_event {
815 TaskEvent::Sent { task_name, .. }
816 | TaskEvent::Received { task_name, .. }
817 | TaskEvent::Started { task_name, .. }
818 | TaskEvent::Succeeded { task_name, .. }
819 | TaskEvent::Failed { task_name, .. }
820 | TaskEvent::Retried { task_name, .. } => task_name == name,
821 TaskEvent::Revoked { task_name, .. }
822 | TaskEvent::Rejected { task_name, .. } => {
823 matches!(task_name.as_ref(), Some(tn) if tn == name)
824 }
825 }
826 } else {
827 false
828 }
829 }
830 EventFilter::Hostname(hostname) => {
831 let event_hostname = match event {
832 Event::Task(task_event) => match task_event {
833 TaskEvent::Received { hostname, .. }
834 | TaskEvent::Started { hostname, .. }
835 | TaskEvent::Succeeded { hostname, .. }
836 | TaskEvent::Failed { hostname, .. }
837 | TaskEvent::Retried { hostname, .. }
838 | TaskEvent::Rejected { hostname, .. } => Some(hostname),
839 _ => None,
840 },
841 Event::Worker(worker_event) => match worker_event {
842 WorkerEvent::Online { hostname, .. }
843 | WorkerEvent::Offline { hostname, .. }
844 | WorkerEvent::Heartbeat { hostname, .. } => Some(hostname),
845 },
846 };
847 matches!(event_hostname, Some(h) if h == hostname)
848 }
849 EventFilter::Custom(predicate) => predicate(event),
850 EventFilter::And(filters) => filters.iter().all(|f| f.matches(event)),
851 EventFilter::Or(filters) => filters.iter().any(|f| f.matches(event)),
852 }
853 }
854
855 pub fn task_names(names: Vec<String>) -> Self {
857 EventFilter::Or(names.into_iter().map(EventFilter::TaskName).collect())
858 }
859
860 pub fn hostnames(names: Vec<String>) -> Self {
862 EventFilter::Or(names.into_iter().map(EventFilter::Hostname).collect())
863 }
864}
865
866impl std::fmt::Debug for EventFilter {
867 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
868 match self {
869 EventFilter::All => write!(f, "EventFilter::All"),
870 EventFilter::TaskOnly => write!(f, "EventFilter::TaskOnly"),
871 EventFilter::WorkerOnly => write!(f, "EventFilter::WorkerOnly"),
872 EventFilter::EventTypes(types) => f
873 .debug_tuple("EventFilter::EventTypes")
874 .field(types)
875 .finish(),
876 EventFilter::TaskName(name) => {
877 f.debug_tuple("EventFilter::TaskName").field(name).finish()
878 }
879 EventFilter::Hostname(hostname) => f
880 .debug_tuple("EventFilter::Hostname")
881 .field(hostname)
882 .finish(),
883 EventFilter::Custom(_) => write!(f, "EventFilter::Custom(<closure>)"),
884 EventFilter::And(filters) => f.debug_tuple("EventFilter::And").field(filters).finish(),
885 EventFilter::Or(filters) => f.debug_tuple("EventFilter::Or").field(filters).finish(),
886 }
887 }
888}
889
890pub type EventHandler = Arc<
892 dyn Fn(Event) -> std::pin::Pin<Box<dyn std::future::Future<Output = crate::Result<()>> + Send>>
893 + Send
894 + Sync,
895>;
896
897#[async_trait]
899pub trait EventReceiver: Send + Sync {
900 async fn receive(&mut self) -> crate::Result<Option<Event>>;
902
903 async fn receive_timeout(
905 &mut self,
906 timeout: std::time::Duration,
907 ) -> crate::Result<Option<Event>> {
908 tokio::time::timeout(timeout, self.receive())
909 .await
910 .map_err(|_| crate::CelersError::Broker("Receive timeout".to_string()))?
911 }
912
913 fn is_active(&self) -> bool {
915 true
916 }
917}
918
919#[derive(Clone)]
921pub struct EventDispatcher {
922 handlers: Arc<RwLock<Vec<(EventFilter, EventHandler)>>>,
923}
924
925impl EventDispatcher {
926 #[must_use]
928 pub fn new() -> Self {
929 Self {
930 handlers: Arc::new(RwLock::new(Vec::new())),
931 }
932 }
933
934 pub async fn register<F, Fut>(&self, filter: EventFilter, handler: F)
936 where
937 F: Fn(Event) -> Fut + Send + Sync + 'static,
938 Fut: std::future::Future<Output = crate::Result<()>> + Send + 'static,
939 {
940 let handler_arc = Arc::new(move |event: Event| {
941 Box::pin(handler(event))
942 as std::pin::Pin<Box<dyn std::future::Future<Output = crate::Result<()>> + Send>>
943 });
944
945 let mut handlers = self.handlers.write().await;
946 handlers.push((filter, handler_arc));
947 }
948
949 pub async fn dispatch(&self, event: Event) -> crate::Result<()> {
955 let handlers = self.handlers.read().await;
956
957 for (filter, handler) in handlers.iter() {
958 if filter.matches(&event) {
959 handler(event.clone()).await?;
960 }
961 }
962
963 Ok(())
964 }
965
966 pub async fn dispatch_batch(&self, events: Vec<Event>) -> crate::Result<()> {
972 for event in events {
973 self.dispatch(event).await?;
974 }
975 Ok(())
976 }
977
978 pub async fn handler_count(&self) -> usize {
980 self.handlers.read().await.len()
981 }
982
983 pub async fn clear(&self) {
985 self.handlers.write().await.clear();
986 }
987}
988
989impl Default for EventDispatcher {
990 fn default() -> Self {
991 Self::new()
992 }
993}
994
995impl std::fmt::Debug for EventDispatcher {
996 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
997 f.debug_struct("EventDispatcher")
998 .field("handlers", &"Arc<RwLock<Vec<...>>>")
999 .finish()
1000 }
1001}
1002
1003#[async_trait]
1005pub trait EventStorage: Send + Sync {
1006 async fn store(&self, event: &Event) -> crate::Result<()>;
1008
1009 async fn store_batch(&self, events: &[Event]) -> crate::Result<()> {
1011 for event in events {
1012 self.store(event).await?;
1013 }
1014 Ok(())
1015 }
1016
1017 async fn query(&self, filter: &EventFilter, limit: Option<usize>) -> crate::Result<Vec<Event>>;
1019
1020 async fn query_range(
1022 &self,
1023 start: DateTime<Utc>,
1024 end: DateTime<Utc>,
1025 limit: Option<usize>,
1026 ) -> crate::Result<Vec<Event>>;
1027
1028 async fn cleanup(&self, before: DateTime<Utc>) -> crate::Result<usize>;
1030}
1031
1032pub struct FileEventStorage {
1034 path: PathBuf,
1035 file_handle: Arc<RwLock<Option<tokio::fs::File>>>,
1036}
1037
1038impl FileEventStorage {
1039 pub fn new(path: impl Into<PathBuf>) -> Self {
1041 Self {
1042 path: path.into(),
1043 file_handle: Arc::new(RwLock::new(None)),
1044 }
1045 }
1046
1047 pub async fn init(&self) -> crate::Result<()> {
1053 let mut handle = self.file_handle.write().await;
1054 let file = tokio::fs::OpenOptions::new()
1055 .create(true)
1056 .append(true)
1057 .open(&self.path)
1058 .await
1059 .map_err(|e| crate::CelersError::Broker(format!("Failed to open event file: {e}")))?;
1060
1061 *handle = Some(file);
1062 Ok(())
1063 }
1064
1065 async fn read_all(&self) -> crate::Result<Vec<Event>> {
1067 use tokio::io::{AsyncBufReadExt, BufReader};
1068
1069 let file = tokio::fs::File::open(&self.path)
1070 .await
1071 .map_err(|e| crate::CelersError::Broker(format!("Failed to open event file: {e}")))?;
1072
1073 let reader = BufReader::new(file);
1074 let mut lines = reader.lines();
1075 let mut events = Vec::new();
1076
1077 while let Some(line) = lines
1078 .next_line()
1079 .await
1080 .map_err(|e| crate::CelersError::Broker(format!("Failed to read line: {e}")))?
1081 {
1082 if let Ok(event) = serde_json::from_str::<Event>(&line) {
1083 events.push(event);
1084 }
1085 }
1086
1087 Ok(events)
1088 }
1089}
1090
1091#[async_trait]
1092impl EventStorage for FileEventStorage {
1093 async fn store(&self, event: &Event) -> crate::Result<()> {
1094 use tokio::io::AsyncWriteExt;
1095
1096 let mut handle = self.file_handle.write().await;
1097 if handle.is_none() {
1098 drop(handle);
1099 self.init().await?;
1100 handle = self.file_handle.write().await;
1101 }
1102
1103 if let Some(file) = handle.as_mut() {
1104 let json = serde_json::to_string(event)
1105 .map_err(|e| crate::CelersError::Serialization(e.to_string()))?;
1106
1107 file.write_all(json.as_bytes())
1108 .await
1109 .map_err(|e| crate::CelersError::Broker(format!("Failed to write event: {e}")))?;
1110 file.write_all(b"\n")
1111 .await
1112 .map_err(|e| crate::CelersError::Broker(format!("Failed to write newline: {e}")))?;
1113 file.flush()
1114 .await
1115 .map_err(|e| crate::CelersError::Broker(format!("Failed to flush: {e}")))?;
1116 }
1117
1118 Ok(())
1119 }
1120
1121 async fn query(&self, filter: &EventFilter, limit: Option<usize>) -> crate::Result<Vec<Event>> {
1122 let all_events = self.read_all().await?;
1123 let mut filtered: Vec<Event> = all_events
1124 .into_iter()
1125 .filter(|e| filter.matches(e))
1126 .collect();
1127
1128 if let Some(limit) = limit {
1129 filtered.truncate(limit);
1130 }
1131
1132 Ok(filtered)
1133 }
1134
1135 async fn query_range(
1136 &self,
1137 start: DateTime<Utc>,
1138 end: DateTime<Utc>,
1139 limit: Option<usize>,
1140 ) -> crate::Result<Vec<Event>> {
1141 let all_events = self.read_all().await?;
1142 let mut filtered: Vec<Event> = all_events
1143 .into_iter()
1144 .filter(|e| {
1145 let timestamp = e.timestamp();
1146 timestamp >= start && timestamp <= end
1147 })
1148 .collect();
1149
1150 if let Some(limit) = limit {
1151 filtered.truncate(limit);
1152 }
1153
1154 Ok(filtered)
1155 }
1156
1157 async fn cleanup(&self, before: DateTime<Utc>) -> crate::Result<usize> {
1158 use tokio::io::AsyncWriteExt;
1159
1160 let all_events = self.read_all().await?;
1161 let (keep, remove): (Vec<_>, Vec<_>) = all_events
1162 .into_iter()
1163 .partition(|e| e.timestamp() >= before);
1164
1165 let removed_count = remove.len();
1166
1167 let temp_path = self.path.with_extension("tmp");
1169 let mut temp_file = tokio::fs::File::create(&temp_path)
1170 .await
1171 .map_err(|e| crate::CelersError::Broker(format!("Failed to create temp file: {e}")))?;
1172
1173 for event in keep {
1174 let json = serde_json::to_string(&event)
1175 .map_err(|e| crate::CelersError::Serialization(e.to_string()))?;
1176 temp_file
1177 .write_all(json.as_bytes())
1178 .await
1179 .map_err(|e| crate::CelersError::Broker(format!("Failed to write: {e}")))?;
1180 temp_file
1181 .write_all(b"\n")
1182 .await
1183 .map_err(|e| crate::CelersError::Broker(format!("Failed to write newline: {e}")))?;
1184 }
1185
1186 temp_file
1187 .flush()
1188 .await
1189 .map_err(|e| crate::CelersError::Broker(format!("Failed to flush: {e}")))?;
1190 drop(temp_file);
1191
1192 tokio::fs::rename(&temp_path, &self.path)
1194 .await
1195 .map_err(|e| crate::CelersError::Broker(format!("Failed to rename file: {e}")))?;
1196
1197 let mut handle = self.file_handle.write().await;
1199 *handle = None;
1200 drop(handle);
1201 self.init().await?;
1202
1203 Ok(removed_count)
1204 }
1205}
1206
1207#[derive(Clone)]
1209pub struct InMemoryEventStorage {
1210 events: Arc<RwLock<Vec<Event>>>,
1211 max_size: usize,
1212}
1213
1214impl InMemoryEventStorage {
1215 #[must_use]
1217 pub fn new(max_size: usize) -> Self {
1218 Self {
1219 events: Arc::new(RwLock::new(Vec::new())),
1220 max_size,
1221 }
1222 }
1223
1224 pub async fn len(&self) -> usize {
1226 self.events.read().await.len()
1227 }
1228
1229 pub async fn is_empty(&self) -> bool {
1231 self.events.read().await.is_empty()
1232 }
1233
1234 pub async fn clear(&self) {
1236 self.events.write().await.clear();
1237 }
1238}
1239
1240#[async_trait]
1241impl EventStorage for InMemoryEventStorage {
1242 async fn store(&self, event: &Event) -> crate::Result<()> {
1243 let mut events = self.events.write().await;
1244 events.push(event.clone());
1245
1246 if events.len() > self.max_size {
1248 let excess = events.len() - self.max_size;
1249 events.drain(0..excess);
1250 }
1251
1252 Ok(())
1253 }
1254
1255 async fn query(&self, filter: &EventFilter, limit: Option<usize>) -> crate::Result<Vec<Event>> {
1256 let events = self.events.read().await;
1257 let mut filtered: Vec<Event> = events
1258 .iter()
1259 .filter(|e| filter.matches(e))
1260 .cloned()
1261 .collect();
1262
1263 if let Some(limit) = limit {
1264 filtered.truncate(limit);
1265 }
1266
1267 Ok(filtered)
1268 }
1269
1270 async fn query_range(
1271 &self,
1272 start: DateTime<Utc>,
1273 end: DateTime<Utc>,
1274 limit: Option<usize>,
1275 ) -> crate::Result<Vec<Event>> {
1276 let events = self.events.read().await;
1277 let mut filtered: Vec<Event> = events
1278 .iter()
1279 .filter(|e| {
1280 let timestamp = e.timestamp();
1281 timestamp >= start && timestamp <= end
1282 })
1283 .cloned()
1284 .collect();
1285
1286 if let Some(limit) = limit {
1287 filtered.truncate(limit);
1288 }
1289
1290 Ok(filtered)
1291 }
1292
1293 async fn cleanup(&self, before: DateTime<Utc>) -> crate::Result<usize> {
1294 let mut events = self.events.write().await;
1295 let original_len = events.len();
1296 events.retain(|e| e.timestamp() >= before);
1297 let removed = original_len - events.len();
1298 Ok(removed)
1299 }
1300}
1301
1302pub struct EventStream {
1304 receiver: broadcast::Receiver<Event>,
1305 filter: EventFilter,
1306}
1307
1308impl EventStream {
1309 #[must_use]
1311 pub fn new(receiver: broadcast::Receiver<Event>, filter: EventFilter) -> Self {
1312 Self { receiver, filter }
1313 }
1314
1315 pub async fn recv(&mut self) -> Result<Event, broadcast::error::RecvError> {
1321 loop {
1322 let event = self.receiver.recv().await?;
1323 if self.filter.matches(&event) {
1324 return Ok(event);
1325 }
1326 }
1327 }
1328
1329 pub fn try_recv(&mut self) -> Result<Event, broadcast::error::TryRecvError> {
1335 loop {
1336 let event = self.receiver.try_recv()?;
1337 if self.filter.matches(&event) {
1338 return Ok(event);
1339 }
1340 }
1341 }
1342}
1343
1344#[derive(Clone)]
1349pub enum AlertCondition {
1350 EventType(String),
1352 TaskFailed,
1354 TaskRetryExceeded(u32),
1356 WorkerOffline,
1358 RateExceeds { threshold: f64, window_secs: u64 },
1360 Custom(Arc<dyn Fn(&Event) -> bool + Send + Sync>),
1362}
1363
1364impl std::fmt::Debug for AlertCondition {
1365 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1366 match self {
1367 AlertCondition::EventType(event_type) => {
1368 f.debug_tuple("EventType").field(event_type).finish()
1369 }
1370 AlertCondition::TaskFailed => write!(f, "TaskFailed"),
1371 AlertCondition::TaskRetryExceeded(max) => {
1372 f.debug_tuple("TaskRetryExceeded").field(max).finish()
1373 }
1374 AlertCondition::WorkerOffline => write!(f, "WorkerOffline"),
1375 AlertCondition::RateExceeds {
1376 threshold,
1377 window_secs,
1378 } => f
1379 .debug_struct("RateExceeds")
1380 .field("threshold", threshold)
1381 .field("window_secs", window_secs)
1382 .finish(),
1383 AlertCondition::Custom(_) => write!(f, "Custom(<closure>)"),
1384 }
1385 }
1386}
1387
1388impl AlertCondition {
1389 #[must_use]
1391 pub fn check(&self, event: &Event, context: &AlertContext) -> bool {
1392 match self {
1393 AlertCondition::EventType(event_type) => event.event_type() == event_type,
1394 AlertCondition::TaskFailed => matches!(event, Event::Task(TaskEvent::Failed { .. })),
1395 AlertCondition::TaskRetryExceeded(max) => {
1396 if let Event::Task(TaskEvent::Retried { retries, .. }) = event {
1397 retries >= max
1398 } else {
1399 false
1400 }
1401 }
1402 AlertCondition::WorkerOffline => {
1403 matches!(event, Event::Worker(WorkerEvent::Offline { .. }))
1404 }
1405 AlertCondition::RateExceeds {
1406 threshold,
1407 window_secs,
1408 } => {
1409 let rate = context.get_event_rate(*window_secs);
1410 rate > *threshold
1411 }
1412 AlertCondition::Custom(predicate) => predicate(event),
1413 }
1414 }
1415}
1416
1417#[derive(Debug, Clone, Default)]
1419pub struct AlertContext {
1420 recent_events: Arc<RwLock<Vec<DateTime<Utc>>>>,
1422}
1423
1424impl AlertContext {
1425 #[must_use]
1427 pub fn new() -> Self {
1428 Self::default()
1429 }
1430
1431 pub async fn record_event(&self, timestamp: DateTime<Utc>) {
1433 let mut events = self.recent_events.write().await;
1434 events.push(timestamp);
1435
1436 if events.len() > 1000 {
1438 let excess = events.len() - 1000;
1439 events.drain(0..excess);
1440 }
1441 }
1442
1443 #[allow(clippy::unused_self)]
1445 fn get_event_rate(&self, _window_secs: u64) -> f64 {
1446 0.0 }
1450
1451 #[allow(clippy::cast_possible_wrap, clippy::cast_precision_loss)]
1452 pub async fn get_event_rate_async(&self, window_secs: u64) -> f64 {
1454 let events = self.recent_events.read().await;
1455 let now = Utc::now();
1456 let cutoff = now - chrono::Duration::seconds(window_secs as i64);
1457
1458 let count = events.iter().filter(|&&ts| ts >= cutoff).count();
1459 count as f64 / window_secs as f64
1460 }
1461}
1462
/// Severity of an alert.
///
/// Variants are declared from least to most severe, so the derived ordering
/// gives `Info < Warning < Error < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertSeverity {
    /// Informational.
    Info,
    /// Warning.
    Warning,
    /// Error.
    Error,
    /// Critical; the highest severity.
    Critical,
}
1475
1476#[derive(Debug, Clone)]
1478pub struct Alert {
1479 pub severity: AlertSeverity,
1481 pub title: String,
1483 pub message: String,
1485 pub event: Event,
1487 pub timestamp: DateTime<Utc>,
1489 pub metadata: HashMap<String, String>,
1491}
1492
1493impl Alert {
1494 pub fn new(
1496 severity: AlertSeverity,
1497 title: impl Into<String>,
1498 message: impl Into<String>,
1499 event: Event,
1500 ) -> Self {
1501 Self {
1502 severity,
1503 title: title.into(),
1504 message: message.into(),
1505 event,
1506 timestamp: Utc::now(),
1507 metadata: HashMap::new(),
1508 }
1509 }
1510
1511 #[must_use]
1513 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
1514 self.metadata.insert(key.into(), value.into());
1515 self
1516 }
1517}
1518
1519#[async_trait]
1521pub trait AlertHandler: Send + Sync {
1522 async fn handle(&self, alert: &Alert) -> crate::Result<()>;
1524}
1525
/// Stateless alert handler.
#[derive(Debug, Clone, Default)]
pub struct LoggingAlertHandler;

impl LoggingAlertHandler {
    /// Creates the handler; equivalent to `LoggingAlertHandler::default()`.
    #[must_use]
    pub fn new() -> Self {
        LoggingAlertHandler
    }
}
1537
1538#[async_trait]
1539impl AlertHandler for LoggingAlertHandler {
1540 async fn handle(&self, alert: &Alert) -> crate::Result<()> {
1541 let task_id = alert.event.task_id();
1542 let hostname = alert.event.hostname();
1543
1544 match alert.severity {
1545 AlertSeverity::Info => {
1546 tracing::info!(
1547 severity = "info",
1548 title = %alert.title,
1549 message = %alert.message,
1550 event_type = alert.event.event_type(),
1551 task_id = ?task_id,
1552 hostname = ?hostname,
1553 "Alert triggered"
1554 );
1555 }
1556 AlertSeverity::Warning => {
1557 tracing::warn!(
1558 severity = "warning",
1559 title = %alert.title,
1560 message = %alert.message,
1561 event_type = alert.event.event_type(),
1562 task_id = ?task_id,
1563 hostname = ?hostname,
1564 "Alert triggered"
1565 );
1566 }
1567 AlertSeverity::Error => {
1568 tracing::error!(
1569 severity = "error",
1570 title = %alert.title,
1571 message = %alert.message,
1572 event_type = alert.event.event_type(),
1573 task_id = ?task_id,
1574 hostname = ?hostname,
1575 "Alert triggered"
1576 );
1577 }
1578 AlertSeverity::Critical => {
1579 tracing::error!(
1580 severity = "critical",
1581 title = %alert.title,
1582 message = %alert.message,
1583 event_type = alert.event.event_type(),
1584 task_id = ?task_id,
1585 hostname = ?hostname,
1586 "CRITICAL ALERT"
1587 );
1588 }
1589 }
1590 Ok(())
1591 }
1592}
1593
/// One registration held by [`AlertManager`]: the condition to check, the
/// severity and title given to resulting alerts, and the handler to invoke.
type AlertHandlerEntry = (AlertCondition, AlertSeverity, String, Arc<dyn AlertHandler>);
1596
/// Checks incoming events against registered conditions and dispatches alerts
/// to the associated handlers.
pub struct AlertManager {
    /// Registered `(condition, severity, title, handler)` entries, in
    /// registration order.
    handlers: Arc<RwLock<Vec<AlertHandlerEntry>>>,
    /// Context passed to every condition check; it also records event timestamps.
    context: AlertContext,
}
1602
1603impl AlertManager {
1604 #[must_use]
1606 pub fn new() -> Self {
1607 Self {
1608 handlers: Arc::new(RwLock::new(Vec::new())),
1609 context: AlertContext::new(),
1610 }
1611 }
1612
1613 pub async fn register<H: AlertHandler + 'static>(
1615 &self,
1616 condition: AlertCondition,
1617 severity: AlertSeverity,
1618 title: impl Into<String>,
1619 handler: H,
1620 ) {
1621 let mut handlers = self.handlers.write().await;
1622 handlers.push((condition, severity, title.into(), Arc::new(handler)));
1623 }
1624
1625 pub async fn process_event(&self, event: Event) -> crate::Result<()> {
1631 self.context.record_event(event.timestamp()).await;
1633
1634 let handlers = self.handlers.read().await;
1635
1636 for (condition, severity, title, handler) in handlers.iter() {
1637 if condition.check(&event, &self.context) {
1638 let message = format!("Event {} triggered alert condition", event.event_type());
1639 let alert = Alert::new(*severity, title.clone(), message, event.clone());
1640 handler.handle(&alert).await?;
1641 }
1642 }
1643
1644 Ok(())
1645 }
1646
1647 pub async fn handler_count(&self) -> usize {
1649 self.handlers.read().await.len()
1650 }
1651
1652 pub async fn clear(&self) {
1654 self.handlers.write().await.clear();
1655 }
1656}
1657
impl Default for AlertManager {
    /// Equivalent to [`AlertManager::new`].
    fn default() -> Self {
        Self::new()
    }
}
1663
/// Collects running [`EventStats`] from recorded events.
///
/// The statistics live behind an `Arc`, so clones of a monitor share and
/// update the same counters.
#[derive(Debug, Clone, Default)]
pub struct EventMonitor {
    /// Shared, lock-protected statistics.
    stats: Arc<RwLock<EventStats>>,
}
1669
/// Counters maintained by [`EventMonitor`].
#[derive(Debug, Clone, Default)]
pub struct EventStats {
    /// Number of events recorded in total.
    pub total_events: u64,
    /// Number of recorded `Event::Task` events.
    pub task_events: u64,
    /// Number of recorded `Event::Worker` events.
    pub worker_events: u64,
    /// Event count keyed by the event's type string.
    pub events_by_type: HashMap<String, u64>,
    /// Event count keyed by hostname, for events whose hostname is recorded.
    pub events_by_hostname: HashMap<String, u64>,
    /// Timestamp of the most recently recorded event; `None` until one is recorded.
    pub last_event_time: Option<DateTime<Utc>>,
}
1679
1680impl EventMonitor {
1681 #[must_use]
1683 pub fn new() -> Self {
1684 Self::default()
1685 }
1686
1687 pub async fn record(&self, event: &Event) {
1689 let mut stats = self.stats.write().await;
1690
1691 stats.total_events += 1;
1692 stats.last_event_time = Some(event.timestamp());
1693
1694 match event {
1695 Event::Task(_) => stats.task_events += 1,
1696 Event::Worker(_) => stats.worker_events += 1,
1697 }
1698
1699 let event_type = event.event_type().to_string();
1700 *stats.events_by_type.entry(event_type).or_insert(0) += 1;
1701
1702 if let Some(hostname) = match event {
1703 Event::Task(task_event) => match task_event {
1704 TaskEvent::Received { hostname, .. }
1705 | TaskEvent::Started { hostname, .. }
1706 | TaskEvent::Succeeded { hostname, .. }
1707 | TaskEvent::Failed { hostname, .. }
1708 | TaskEvent::Retried { hostname, .. }
1709 | TaskEvent::Rejected { hostname, .. } => Some(hostname.clone()),
1710 _ => None,
1711 },
1712 Event::Worker(worker_event) => match worker_event {
1713 WorkerEvent::Online { hostname, .. }
1714 | WorkerEvent::Offline { hostname, .. }
1715 | WorkerEvent::Heartbeat { hostname, .. } => Some(hostname.clone()),
1716 },
1717 } {
1718 *stats.events_by_hostname.entry(hostname).or_insert(0) += 1;
1719 }
1720 }
1721
1722 pub async fn get_stats(&self) -> EventStats {
1724 self.stats.read().await.clone()
1725 }
1726
1727 pub async fn reset(&self) {
1729 let mut stats = self.stats.write().await;
1730 *stats = EventStats::default();
1731 }
1732}
1733
#[cfg(test)]
mod tests {
    use super::*;

    /// A `task-started` event survives a JSON round trip unchanged.
    #[test]
    fn test_task_event_serialization() {
        let timestamp = DateTime::parse_from_rfc3339("2026-01-01T12:00:00Z")
            .unwrap()
            .with_timezone(&Utc);
        let original = Event::Task(TaskEvent::Started {
            task_id: Uuid::nil(),
            task_name: "test_task".to_string(),
            hostname: "worker-1".to_string(),
            timestamp,
            pid: 1234,
        });

        let encoded = serde_json::to_string(&original).unwrap();
        for needle in ["task-started", "test_task", "worker-1"] {
            assert!(encoded.contains(needle));
        }

        let decoded: Event = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    /// A heartbeat serializes with its tag and hostname and parses back to the
    /// same event type.
    #[test]
    fn test_worker_event_serialization() {
        let original = Event::Worker(WorkerEvent::Heartbeat {
            hostname: "worker-1".to_string(),
            timestamp: Utc::now(),
            active: 5,
            processed: 100,
            loadavg: Some([1.0, 0.8, 0.5]),
            freq: 2.0,
        });

        let encoded = serde_json::to_string(&original).unwrap();
        for needle in ["worker-heartbeat", "worker-1"] {
            assert!(encoded.contains(needle));
        }

        let decoded: Event = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original.event_type(), decoded.event_type());
    }

    /// A sent task reports the `task-sent` type and classifies as a task event.
    #[test]
    fn test_event_type() {
        let sent = Event::Task(TaskEvent::Sent {
            task_id: Uuid::new_v4(),
            task_name: "test".to_string(),
            queue: "celery".to_string(),
            timestamp: Utc::now(),
            args: None,
            kwargs: None,
            eta: None,
            expires: None,
            retries: None,
        });

        assert_eq!(sent.event_type(), "task-sent");
        assert!(sent.is_task_event());
        assert!(!sent.is_worker_event());
    }

    /// The task builder carries the id and hostname into a started event.
    #[test]
    fn test_task_event_builder() {
        let id = Uuid::new_v4();
        let started = TaskEventBuilder::new(id, "my_task")
            .hostname("worker-1")
            .pid(1234)
            .started();

        assert_eq!(started.event_type(), "task-started");
        assert_eq!(started.task_id(), Some(id));
        assert_eq!(started.hostname(), Some("worker-1"));
    }

    /// The worker builder produces an online event for the given hostname.
    #[test]
    fn test_worker_event_builder() {
        let online = WorkerEventBuilder::new("worker-1").online();

        assert_eq!(online.event_type(), "worker-online");
        assert!(online.is_worker_event());
        assert_eq!(online.hostname(), Some("worker-1"));
    }

    /// Task events expose their id; worker events have none.
    #[test]
    fn test_task_id_extraction() {
        let id = Uuid::new_v4();
        let sent = TaskEventBuilder::new(id, "test").sent("celery");
        assert_eq!(sent.task_id(), Some(id));

        let online = WorkerEventBuilder::new("worker-1").online();
        assert_eq!(online.task_id(), None);
    }

    /// The no-op emitter accepts events and reports itself as disabled.
    #[tokio::test]
    async fn test_noop_event_emitter() {
        let noop = NoOpEventEmitter::new();
        let online = WorkerEventBuilder::new("worker-1").online();

        noop.emit(online).await.unwrap();

        assert!(!noop.is_enabled());
    }

    /// An emitted event reaches a subscriber of the in-memory emitter.
    #[tokio::test]
    async fn test_in_memory_event_emitter() {
        let emitter = InMemoryEventEmitter::new(10);
        let mut subscriber = emitter.subscribe();

        let id = Uuid::new_v4();
        let started = TaskEventBuilder::new(id, "test_task")
            .hostname("worker-1")
            .started();

        emitter.emit(started.clone()).await.unwrap();

        let delivered = subscriber.recv().await.unwrap();
        assert_eq!(delivered.event_type(), "task-started");
        assert_eq!(delivered.task_id(), Some(id));
    }

    /// Every subscriber receives its own copy of an emitted event.
    #[tokio::test]
    async fn test_in_memory_emitter_multiple_subscribers() {
        let emitter = InMemoryEventEmitter::new(10);
        let mut first = emitter.subscribe();
        let mut second = emitter.subscribe();

        assert_eq!(emitter.subscriber_count(), 2);

        let heartbeat = WorkerEventBuilder::new("worker-1").heartbeat(5, 100, [1.0, 0.8, 0.5], 2.0);
        emitter.emit(heartbeat).await.unwrap();

        let from_first = first.recv().await.unwrap();
        let from_second = second.recv().await.unwrap();

        assert_eq!(from_first.event_type(), "worker-heartbeat");
        assert_eq!(from_second.event_type(), "worker-heartbeat");
    }

    /// The logging emitter accepts events at its default level and at `Info`.
    #[tokio::test]
    async fn test_logging_event_emitter() {
        let default_level = LoggingEventEmitter::new();
        let succeeded = TaskEventBuilder::new(Uuid::new_v4(), "test")
            .hostname("worker-1")
            .succeeded(1.5);

        default_level.emit(succeeded).await.unwrap();

        let info_level = LoggingEventEmitter::with_level(LogLevel::Info);
        let offline = WorkerEventBuilder::new("worker-1").offline();
        info_level.emit(offline).await.unwrap();
    }

    /// The composite emitter forwards events to its in-memory member.
    #[tokio::test]
    async fn test_composite_event_emitter() {
        let in_memory = InMemoryEventEmitter::new(10);
        let mut subscriber = in_memory.subscribe();

        let composite = CompositeEventEmitter::new()
            .with_emitter(in_memory.clone())
            .with_emitter(NoOpEventEmitter::new());

        let started = TaskEventBuilder::new(Uuid::new_v4(), "test")
            .hostname("worker-1")
            .started();

        composite.emit(started.clone()).await.unwrap();

        let delivered = subscriber.recv().await.unwrap();
        assert_eq!(delivered.event_type(), "task-started");
    }

    /// A batch is delivered to subscribers in the order it was submitted.
    #[tokio::test]
    async fn test_emit_batch() {
        let emitter = InMemoryEventEmitter::new(10);
        let mut subscriber = emitter.subscribe();

        let batch = vec![
            WorkerEventBuilder::new("worker-1").online(),
            TaskEventBuilder::new(Uuid::new_v4(), "task1")
                .hostname("worker-1")
                .started(),
            TaskEventBuilder::new(Uuid::new_v4(), "task2")
                .hostname("worker-1")
                .started(),
        ];

        emitter.emit_batch(batch).await.unwrap();

        let first = subscriber.recv().await.unwrap();
        let second = subscriber.recv().await.unwrap();
        let third = subscriber.recv().await.unwrap();

        let delivered = [first.event_type(), second.event_type(), third.event_type()];
        let expected = ["worker-online", "task-started", "task-started"];
        for (got, want) in delivered.iter().zip(expected.iter()) {
            assert_eq!(got, want);
        }
    }
}