celers_core/event.rs

//! Real-time event types for task and worker lifecycle
//!
//! This module provides Celery-compatible event types for monitoring task execution
//! and worker status. Events can be published to various transports (Redis pub/sub,
//! AMQP fanout, etc.) for real-time monitoring.
//!
//! # Event Types
//!
//! ## Task Events
//! - `TaskSent` - Task was sent to the queue
//! - `TaskReceived` - Task was received by a worker
//! - `TaskStarted` - Task execution started
//! - `TaskSucceeded` - Task completed successfully
//! - `TaskFailed` - Task execution failed
//! - `TaskRetried` - Task is being retried
//! - `TaskRevoked` - Task was revoked/cancelled
//! - `TaskRejected` - Task was rejected by worker
//!
//! ## Worker Events
//! - `WorkerOnline` - Worker came online
//! - `WorkerOffline` - Worker going offline
//! - `WorkerHeartbeat` - Periodic worker heartbeat
//!
//! # Example
//!
//! ```rust
//! use celers_core::event::{Event, TaskEvent, WorkerEvent};
//! use uuid::Uuid;
//! use chrono::Utc;
//!
//! // Create a task started event
//! let event = Event::Task(TaskEvent::Started {
//!     task_id: Uuid::new_v4(),
//!     task_name: "my_task".to_string(),
//!     hostname: "worker-1".to_string(),
//!     timestamp: Utc::now(),
//!     pid: std::process::id(),
//! });
//!
//! // Serialize for transport
//! let json = serde_json::to_string(&event).unwrap();
//! ```

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

/// Task lifecycle events (Celery-compatible)
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum TaskEvent {
    /// Task was sent to the queue
    #[serde(rename = "task-sent")]
    Sent {
        /// Unique task ID
        task_id: Uuid,
        /// Task name (function name)
        task_name: String,
        /// Queue name where task was sent
        queue: String,
        /// Event timestamp
        timestamp: DateTime<Utc>,
        /// Task arguments (serialized)
        #[serde(skip_serializing_if = "Option::is_none")]
        args: Option<String>,
        /// Task keyword arguments (serialized)
        #[serde(skip_serializing_if = "Option::is_none")]
        kwargs: Option<String>,
        /// Task ETA if scheduled
        #[serde(skip_serializing_if = "Option::is_none")]
        eta: Option<DateTime<Utc>>,
        /// Task expiration time
        #[serde(skip_serializing_if = "Option::is_none")]
        expires: Option<DateTime<Utc>>,
        /// Number of retries configured
        #[serde(skip_serializing_if = "Option::is_none")]
        retries: Option<u32>,
    },

    /// Task was received by a worker
    #[serde(rename = "task-received")]
    Received {
        /// Unique task ID
        task_id: Uuid,
        /// Task name
        task_name: String,
        /// Worker hostname
        hostname: String,
        /// Event timestamp
        timestamp: DateTime<Utc>,
        /// Worker process ID
        pid: u32,
    },

    /// Task execution started
    #[serde(rename = "task-started")]
    Started {
        /// Unique task ID
        task_id: Uuid,
        /// Task name
        task_name: String,
        /// Worker hostname
        hostname: String,
        /// Event timestamp
        timestamp: DateTime<Utc>,
        /// Worker process ID
        pid: u32,
    },

    /// Task completed successfully
    #[serde(rename = "task-succeeded")]
    Succeeded {
        /// Unique task ID
        task_id: Uuid,
        /// Task name
        task_name: String,
        /// Worker hostname
        hostname: String,
        /// Event timestamp
        timestamp: DateTime<Utc>,
        /// Execution runtime in seconds
        runtime: f64,
        /// Result value (serialized)
        #[serde(skip_serializing_if = "Option::is_none")]
        result: Option<String>,
    },

    /// Task execution failed
    #[serde(rename = "task-failed")]
    Failed {
        /// Unique task ID
        task_id: Uuid,
        /// Task name
        task_name: String,
        /// Worker hostname
        hostname: String,
        /// Event timestamp
        timestamp: DateTime<Utc>,
        /// Exception type name
        exception: String,
        /// Exception traceback
        #[serde(skip_serializing_if = "Option::is_none")]
        traceback: Option<String>,
    },

    /// Task is being retried
    #[serde(rename = "task-retried")]
    Retried {
        /// Unique task ID
        task_id: Uuid,
        /// Task name
        task_name: String,
        /// Worker hostname
        hostname: String,
        /// Event timestamp
        timestamp: DateTime<Utc>,
        /// Exception that caused retry
        exception: String,
        /// Current retry attempt number
        retries: u32,
    },

    /// Task was revoked/cancelled
    #[serde(rename = "task-revoked")]
    Revoked {
        /// Unique task ID
        task_id: Uuid,
        /// Task name
        #[serde(skip_serializing_if = "Option::is_none")]
        task_name: Option<String>,
        /// Event timestamp
        timestamp: DateTime<Utc>,
        /// Whether to terminate running task
        terminated: bool,
        /// Signal used for termination
        #[serde(skip_serializing_if = "Option::is_none")]
        signum: Option<i32>,
        /// Whether task should be expired
        expired: bool,
    },

    /// Task was rejected by worker
    #[serde(rename = "task-rejected")]
    Rejected {
        /// Unique task ID
        task_id: Uuid,
        /// Task name
        #[serde(skip_serializing_if = "Option::is_none")]
        task_name: Option<String>,
        /// Worker hostname
        hostname: String,
        /// Event timestamp
        timestamp: DateTime<Utc>,
        /// Rejection reason
        reason: String,
    },
}

/// Worker lifecycle events
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum WorkerEvent {
    /// Worker came online
    #[serde(rename = "worker-online")]
    Online {
        /// Worker hostname
        hostname: String,
        /// Event timestamp
        timestamp: DateTime<Utc>,
        /// Software information
        sw_ident: String,
        /// Software version
        sw_ver: String,
        /// Software system (OS)
        sw_sys: String,
    },

    /// Worker going offline
    #[serde(rename = "worker-offline")]
    Offline {
        /// Worker hostname
        hostname: String,
        /// Event timestamp
        timestamp: DateTime<Utc>,
    },

    /// Periodic worker heartbeat
    #[serde(rename = "worker-heartbeat")]
    Heartbeat {
        /// Worker hostname
        hostname: String,
        /// Event timestamp
        timestamp: DateTime<Utc>,
        /// Number of tasks currently being executed
        active: u32,
        /// Tasks processed since last heartbeat
        processed: u64,
        /// Current system load average
        #[serde(skip_serializing_if = "Option::is_none")]
        loadavg: Option<[f64; 3]>,
        /// Heartbeat frequency in seconds
        freq: f64,
    },
}

/// Combined event type for all `CeleRS` events
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum Event {
    /// Task lifecycle event
    Task(TaskEvent),
    /// Worker lifecycle event
    Worker(WorkerEvent),
}

impl Event {
    /// Get the event type as a string
    #[inline]
    #[must_use]
    pub fn event_type(&self) -> &'static str {
        match self {
            Event::Task(TaskEvent::Sent { .. }) => "task-sent",
            Event::Task(TaskEvent::Received { .. }) => "task-received",
            Event::Task(TaskEvent::Started { .. }) => "task-started",
            Event::Task(TaskEvent::Succeeded { .. }) => "task-succeeded",
            Event::Task(TaskEvent::Failed { .. }) => "task-failed",
            Event::Task(TaskEvent::Retried { .. }) => "task-retried",
            Event::Task(TaskEvent::Revoked { .. }) => "task-revoked",
            Event::Task(TaskEvent::Rejected { .. }) => "task-rejected",
            Event::Worker(WorkerEvent::Online { .. }) => "worker-online",
            Event::Worker(WorkerEvent::Offline { .. }) => "worker-offline",
            Event::Worker(WorkerEvent::Heartbeat { .. }) => "worker-heartbeat",
        }
    }

    /// Get the timestamp of the event
    #[inline]
    #[must_use]
    pub fn timestamp(&self) -> DateTime<Utc> {
        match self {
            Event::Task(e) => match e {
                TaskEvent::Sent { timestamp, .. }
                | TaskEvent::Received { timestamp, .. }
                | TaskEvent::Started { timestamp, .. }
                | TaskEvent::Succeeded { timestamp, .. }
                | TaskEvent::Failed { timestamp, .. }
                | TaskEvent::Retried { timestamp, .. }
                | TaskEvent::Revoked { timestamp, .. }
                | TaskEvent::Rejected { timestamp, .. } => *timestamp,
            },
            Event::Worker(e) => match e {
                WorkerEvent::Online { timestamp, .. }
                | WorkerEvent::Offline { timestamp, .. }
                | WorkerEvent::Heartbeat { timestamp, .. } => *timestamp,
            },
        }
    }

    /// Get the task ID if this is a task event
    #[inline]
    #[must_use]
    pub fn task_id(&self) -> Option<Uuid> {
        match self {
            Event::Task(e) => Some(match e {
                TaskEvent::Sent { task_id, .. }
                | TaskEvent::Received { task_id, .. }
                | TaskEvent::Started { task_id, .. }
                | TaskEvent::Succeeded { task_id, .. }
                | TaskEvent::Failed { task_id, .. }
                | TaskEvent::Retried { task_id, .. }
                | TaskEvent::Revoked { task_id, .. }
                | TaskEvent::Rejected { task_id, .. } => *task_id,
            }),
            Event::Worker(_) => None,
        }
    }

    /// Get the hostname if available
    #[inline]
    #[must_use]
    pub fn hostname(&self) -> Option<&str> {
        match self {
            Event::Task(e) => match e {
                TaskEvent::Received { hostname, .. }
                | TaskEvent::Started { hostname, .. }
                | TaskEvent::Succeeded { hostname, .. }
                | TaskEvent::Failed { hostname, .. }
                | TaskEvent::Retried { hostname, .. }
                | TaskEvent::Rejected { hostname, .. } => Some(hostname),
                TaskEvent::Sent { .. } | TaskEvent::Revoked { .. } => None,
            },
            Event::Worker(e) => match e {
                WorkerEvent::Online { hostname, .. }
                | WorkerEvent::Offline { hostname, .. }
                | WorkerEvent::Heartbeat { hostname, .. } => Some(hostname),
            },
        }
    }

    /// Check if this is a task event
    #[inline]
    #[must_use]
    pub const fn is_task_event(&self) -> bool {
        matches!(self, Event::Task(_))
    }

    /// Check if this is a worker event
    #[inline]
    #[must_use]
    pub const fn is_worker_event(&self) -> bool {
        matches!(self, Event::Worker(_))
    }
}

/// Builder for creating task events with common fields
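///
/// # Example
///
/// A minimal sketch of building lifecycle events for one task; the task and
/// host names are illustrative:
///
/// ```rust
/// use celers_core::event::TaskEventBuilder;
/// use uuid::Uuid;
///
/// let task_id = Uuid::new_v4();
/// let event = TaskEventBuilder::new(task_id, "send_email")
///     .hostname("worker-1")
///     .pid(std::process::id())
///     .started();
///
/// assert_eq!(event.event_type(), "task-started");
/// assert_eq!(event.task_id(), Some(task_id));
/// ```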
#[derive(Debug, Clone)]
pub struct TaskEventBuilder {
    task_id: Uuid,
    task_name: String,
    hostname: Option<String>,
    pid: Option<u32>,
}

impl TaskEventBuilder {
    /// Create a new task event builder
    pub fn new(task_id: Uuid, task_name: impl Into<String>) -> Self {
        Self {
            task_id,
            task_name: task_name.into(),
            hostname: None,
            pid: None,
        }
    }

    /// Set the worker hostname
    #[must_use]
    pub fn hostname(mut self, hostname: impl Into<String>) -> Self {
        self.hostname = Some(hostname.into());
        self
    }

    /// Set the worker process ID
    #[must_use]
    pub fn pid(mut self, pid: u32) -> Self {
        self.pid = Some(pid);
        self
    }

    /// Build a task-sent event
    pub fn sent(self, queue: impl Into<String>) -> Event {
        Event::Task(TaskEvent::Sent {
            task_id: self.task_id,
            task_name: self.task_name,
            queue: queue.into(),
            timestamp: Utc::now(),
            args: None,
            kwargs: None,
            eta: None,
            expires: None,
            retries: None,
        })
    }

    /// Build a task-received event
    #[must_use]
    pub fn received(self) -> Event {
        Event::Task(TaskEvent::Received {
            task_id: self.task_id,
            task_name: self.task_name,
            hostname: self.hostname.unwrap_or_else(|| "unknown".to_string()),
            timestamp: Utc::now(),
            pid: self.pid.unwrap_or(0),
        })
    }

    /// Build a task-started event
    #[must_use]
    pub fn started(self) -> Event {
        Event::Task(TaskEvent::Started {
            task_id: self.task_id,
            task_name: self.task_name,
            hostname: self.hostname.unwrap_or_else(|| "unknown".to_string()),
            timestamp: Utc::now(),
            pid: self.pid.unwrap_or(0),
        })
    }

    /// Build a task-succeeded event
    #[must_use]
    pub fn succeeded(self, runtime: f64) -> Event {
        Event::Task(TaskEvent::Succeeded {
            task_id: self.task_id,
            task_name: self.task_name,
            hostname: self.hostname.unwrap_or_else(|| "unknown".to_string()),
            timestamp: Utc::now(),
            runtime,
            result: None,
        })
    }

    /// Build a task-failed event
    pub fn failed(self, exception: impl Into<String>) -> Event {
        Event::Task(TaskEvent::Failed {
            task_id: self.task_id,
            task_name: self.task_name,
            hostname: self.hostname.unwrap_or_else(|| "unknown".to_string()),
            timestamp: Utc::now(),
            exception: exception.into(),
            traceback: None,
        })
    }

    /// Build a task-retried event
    pub fn retried(self, exception: impl Into<String>, retries: u32) -> Event {
        Event::Task(TaskEvent::Retried {
            task_id: self.task_id,
            task_name: self.task_name,
            hostname: self.hostname.unwrap_or_else(|| "unknown".to_string()),
            timestamp: Utc::now(),
            exception: exception.into(),
            retries,
        })
    }
}

/// Builder for creating worker events
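///
/// # Example
///
/// A short sketch of the online/heartbeat sequence for a worker; the hostname
/// and counters are illustrative:
///
/// ```rust
/// use celers_core::event::WorkerEventBuilder;
///
/// let online = WorkerEventBuilder::new("worker-1").online();
/// assert_eq!(online.event_type(), "worker-online");
///
/// let heartbeat = WorkerEventBuilder::new("worker-1").heartbeat(2, 150, [0.5, 0.4, 0.3], 2.0);
/// assert_eq!(heartbeat.event_type(), "worker-heartbeat");
/// ```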
#[derive(Debug, Clone)]
pub struct WorkerEventBuilder {
    hostname: String,
}

impl WorkerEventBuilder {
    /// Create a new worker event builder
    pub fn new(hostname: impl Into<String>) -> Self {
        Self {
            hostname: hostname.into(),
        }
    }

    /// Build a worker-online event
    #[must_use]
    pub fn online(self) -> Event {
        Event::Worker(WorkerEvent::Online {
            hostname: self.hostname,
            timestamp: Utc::now(),
            sw_ident: "celers".to_string(),
            sw_ver: env!("CARGO_PKG_VERSION").to_string(),
            sw_sys: std::env::consts::OS.to_string(),
        })
    }

    /// Build a worker-offline event
    #[must_use]
    pub fn offline(self) -> Event {
        Event::Worker(WorkerEvent::Offline {
            hostname: self.hostname,
            timestamp: Utc::now(),
        })
    }

    /// Build a worker-heartbeat event
    #[must_use]
    pub fn heartbeat(self, active: u32, processed: u64, loadavg: [f64; 3], freq: f64) -> Event {
        // Only include loadavg if non-zero (indicating it was actually measured)
        let loadavg_opt = if loadavg == [0.0, 0.0, 0.0] {
            None
        } else {
            Some(loadavg)
        };

        Event::Worker(WorkerEvent::Heartbeat {
            hostname: self.hostname,
            timestamp: Utc::now(),
            active,
            processed,
            loadavg: loadavg_opt,
            freq,
        })
    }
}

// ============================================================================
// Event Emitter Trait and Implementations
// ============================================================================

use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::broadcast;

/// Trait for emitting events to various transports
///
/// Implementations can send events to Redis pub/sub, AMQP fanout exchanges,
/// in-memory channels, or other event sinks.
///
/// # Example
///
/// ```rust
/// use celers_core::event::{Event, EventEmitter, InMemoryEventEmitter};
/// use celers_core::event::WorkerEventBuilder;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let emitter = InMemoryEventEmitter::new(100);
/// let mut receiver = emitter.subscribe();
///
/// // Emit an event
/// let event = WorkerEventBuilder::new("worker-1").online();
/// emitter.emit(event.clone()).await?;
///
/// // Receive it
/// let received = receiver.recv().await?;
/// assert_eq!(received.event_type(), "worker-online");
/// # Ok(())
/// # }
/// ```
#[async_trait]
pub trait EventEmitter: Send + Sync {
    /// Emit an event to the transport
    async fn emit(&self, event: Event) -> crate::Result<()>;

    /// Emit multiple events
    async fn emit_batch(&self, events: Vec<Event>) -> crate::Result<()> {
        for event in events {
            self.emit(event).await?;
        }
        Ok(())
    }

    /// Check if the emitter is enabled/active
    fn is_enabled(&self) -> bool {
        true
    }
}

/// No-op event emitter that discards all events
///
/// Useful for testing or when event emission is not needed.
#[derive(Debug, Clone, Default)]
pub struct NoOpEventEmitter;

impl NoOpEventEmitter {
    /// Create a new no-op event emitter
    #[must_use]
    pub fn new() -> Self {
        Self
    }
}

#[async_trait]
impl EventEmitter for NoOpEventEmitter {
    async fn emit(&self, _event: Event) -> crate::Result<()> {
        Ok(())
    }

    fn is_enabled(&self) -> bool {
        false
    }
}

/// In-memory event emitter using broadcast channels
///
/// Useful for testing and local event distribution.
#[derive(Debug, Clone)]
pub struct InMemoryEventEmitter {
    sender: broadcast::Sender<Event>,
}

impl InMemoryEventEmitter {
    /// Create a new in-memory event emitter with the specified buffer capacity
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        let (sender, _) = broadcast::channel(capacity);
        Self { sender }
    }

    /// Subscribe to events
    #[must_use]
    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
        self.sender.subscribe()
    }

    /// Get the number of active subscribers
    #[inline]
    #[must_use]
    pub fn subscriber_count(&self) -> usize {
        self.sender.receiver_count()
    }
}

#[async_trait]
impl EventEmitter for InMemoryEventEmitter {
    async fn emit(&self, event: Event) -> crate::Result<()> {
        // Ignore send errors (no subscribers)
        let _ = self.sender.send(event);
        Ok(())
    }
}

/// Logging event emitter that logs events using tracing
///
/// Useful for debugging and development.
#[derive(Debug, Clone, Default)]
pub struct LoggingEventEmitter {
    /// Log level for events
    level: LogLevel,
}

/// Log level for event logging
#[derive(Debug, Clone, Copy, Default)]
pub enum LogLevel {
    /// Trace level
    Trace,
    /// Debug level
    #[default]
    Debug,
    /// Info level
    Info,
}

impl LoggingEventEmitter {
    /// Create a new logging event emitter with default (debug) level
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Create a logging event emitter with specified level
    #[must_use]
    pub fn with_level(level: LogLevel) -> Self {
        Self { level }
    }
}

#[async_trait]
impl EventEmitter for LoggingEventEmitter {
    async fn emit(&self, event: Event) -> crate::Result<()> {
        let event_type = event.event_type();
        let task_id = event.task_id().map(|id| id.to_string());
        let hostname = event.hostname().map(String::from);

        match self.level {
            LogLevel::Trace => {
                tracing::trace!(
                    event_type = event_type,
                    task_id = ?task_id,
                    hostname = ?hostname,
                    "Event emitted"
                );
            }
            LogLevel::Debug => {
                tracing::debug!(
                    event_type = event_type,
                    task_id = ?task_id,
                    hostname = ?hostname,
                    "Event emitted"
                );
            }
            LogLevel::Info => {
                tracing::info!(
                    event_type = event_type,
                    task_id = ?task_id,
                    hostname = ?hostname,
                    "Event emitted"
                );
            }
        }
        Ok(())
    }
}

/// Composite event emitter that sends to multiple emitters
///
/// Useful for sending events to multiple destinations simultaneously.
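///
/// # Example
///
/// A minimal sketch that fans one event out to a logging sink and an in-memory
/// channel; it assumes a Tokio runtime and uses illustrative names:
///
/// ```rust
/// use celers_core::event::{
///     CompositeEventEmitter, EventEmitter, InMemoryEventEmitter, LoggingEventEmitter,
///     WorkerEventBuilder,
/// };
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let in_memory = InMemoryEventEmitter::new(100);
/// let mut receiver = in_memory.subscribe();
///
/// let emitter = CompositeEventEmitter::new()
///     .with_emitter(LoggingEventEmitter::new())
///     .with_emitter(in_memory);
///
/// emitter.emit(WorkerEventBuilder::new("worker-1").online()).await?;
/// assert_eq!(receiver.recv().await?.event_type(), "worker-online");
/// # Ok(())
/// # }
/// ```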
#[derive(Clone)]
pub struct CompositeEventEmitter {
    emitters: Vec<Arc<dyn EventEmitter>>,
}

impl CompositeEventEmitter {
    /// Create a new composite emitter
    #[must_use]
    pub fn new() -> Self {
        Self {
            emitters: Vec::new(),
        }
    }

    /// Add an emitter to the composite
    #[must_use]
    pub fn with_emitter<E: EventEmitter + 'static>(mut self, emitter: E) -> Self {
        self.emitters.push(Arc::new(emitter));
        self
    }

    /// Add an Arc-wrapped emitter
    #[must_use]
    pub fn add_arc(mut self, emitter: Arc<dyn EventEmitter>) -> Self {
        self.emitters.push(emitter);
        self
    }
}

impl Default for CompositeEventEmitter {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl EventEmitter for CompositeEventEmitter {
    async fn emit(&self, event: Event) -> crate::Result<()> {
        for emitter in &self.emitters {
            if emitter.is_enabled() {
                emitter.emit(event.clone()).await?;
            }
        }
        Ok(())
    }

    fn is_enabled(&self) -> bool {
        self.emitters.iter().any(|e| e.is_enabled())
    }
}

impl std::fmt::Debug for CompositeEventEmitter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CompositeEventEmitter")
            .field("emitter_count", &self.emitters.len())
            .finish()
    }
}

// ============================================================================
// Event Consumer Infrastructure
// ============================================================================

use std::collections::HashMap;
use std::path::PathBuf;
use tokio::sync::RwLock;

/// Event filter for selecting which events to process
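///
/// # Example
///
/// A small sketch of composing filters; `matches` is a pure predicate over
/// events, and the names below are illustrative:
///
/// ```rust
/// use celers_core::event::{EventFilter, TaskEventBuilder};
/// use uuid::Uuid;
///
/// // Failed or retried events coming from a specific worker.
/// let filter = EventFilter::And(vec![
///     EventFilter::EventTypes(vec!["task-failed".into(), "task-retried".into()]),
///     EventFilter::Hostname("worker-1".into()),
/// ]);
///
/// let event = TaskEventBuilder::new(Uuid::new_v4(), "send_email")
///     .hostname("worker-1")
///     .failed("TimeoutError");
/// assert!(filter.matches(&event));
/// ```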
#[derive(Clone)]
pub enum EventFilter {
    /// Accept all events
    All,
    /// Accept only task events
    TaskOnly,
    /// Accept only worker events
    WorkerOnly,
    /// Accept specific event types (e.g., "task-started", "worker-online")
    EventTypes(Vec<String>),
    /// Accept events with an exactly matching task name
    TaskName(String),
    /// Accept events from specific hostname
    Hostname(String),
    /// Accept events matching custom predicate
    Custom(Arc<dyn Fn(&Event) -> bool + Send + Sync>),
    /// Combine multiple filters with AND logic
    And(Vec<EventFilter>),
    /// Combine multiple filters with OR logic
    Or(Vec<EventFilter>),
}

impl EventFilter {
    /// Check if an event matches this filter
    #[must_use]
    pub fn matches(&self, event: &Event) -> bool {
        match self {
            EventFilter::All => true,
            EventFilter::TaskOnly => matches!(event, Event::Task(_)),
            EventFilter::WorkerOnly => matches!(event, Event::Worker(_)),
            EventFilter::EventTypes(types) => types.contains(&event.event_type().to_string()),
            EventFilter::TaskName(name) => {
                if let Event::Task(task_event) = event {
                    match task_event {
                        TaskEvent::Sent { task_name, .. }
                        | TaskEvent::Received { task_name, .. }
                        | TaskEvent::Started { task_name, .. }
                        | TaskEvent::Succeeded { task_name, .. }
                        | TaskEvent::Failed { task_name, .. }
                        | TaskEvent::Retried { task_name, .. } => task_name == name,
                        TaskEvent::Revoked { task_name, .. }
                        | TaskEvent::Rejected { task_name, .. } => {
                            matches!(task_name.as_ref(), Some(tn) if tn == name)
                        }
                    }
                } else {
                    false
                }
            }
            EventFilter::Hostname(hostname) => {
                let event_hostname = match event {
                    Event::Task(task_event) => match task_event {
                        TaskEvent::Received { hostname, .. }
                        | TaskEvent::Started { hostname, .. }
                        | TaskEvent::Succeeded { hostname, .. }
                        | TaskEvent::Failed { hostname, .. }
                        | TaskEvent::Retried { hostname, .. }
                        | TaskEvent::Rejected { hostname, .. } => Some(hostname),
                        _ => None,
                    },
                    Event::Worker(worker_event) => match worker_event {
                        WorkerEvent::Online { hostname, .. }
                        | WorkerEvent::Offline { hostname, .. }
                        | WorkerEvent::Heartbeat { hostname, .. } => Some(hostname),
                    },
                };
                matches!(event_hostname, Some(h) if h == hostname)
            }
            EventFilter::Custom(predicate) => predicate(event),
            EventFilter::And(filters) => filters.iter().all(|f| f.matches(event)),
            EventFilter::Or(filters) => filters.iter().any(|f| f.matches(event)),
        }
    }

    /// Create a filter that accepts events from a list of task names
    pub fn task_names(names: Vec<String>) -> Self {
        EventFilter::Or(names.into_iter().map(EventFilter::TaskName).collect())
    }

    /// Create a filter that accepts events from a list of hostnames
    pub fn hostnames(names: Vec<String>) -> Self {
        EventFilter::Or(names.into_iter().map(EventFilter::Hostname).collect())
    }
}

impl std::fmt::Debug for EventFilter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            EventFilter::All => write!(f, "EventFilter::All"),
            EventFilter::TaskOnly => write!(f, "EventFilter::TaskOnly"),
            EventFilter::WorkerOnly => write!(f, "EventFilter::WorkerOnly"),
            EventFilter::EventTypes(types) => f
                .debug_tuple("EventFilter::EventTypes")
                .field(types)
                .finish(),
            EventFilter::TaskName(name) => {
                f.debug_tuple("EventFilter::TaskName").field(name).finish()
            }
            EventFilter::Hostname(hostname) => f
                .debug_tuple("EventFilter::Hostname")
                .field(hostname)
                .finish(),
            EventFilter::Custom(_) => write!(f, "EventFilter::Custom(<closure>)"),
            EventFilter::And(filters) => f.debug_tuple("EventFilter::And").field(filters).finish(),
            EventFilter::Or(filters) => f.debug_tuple("EventFilter::Or").field(filters).finish(),
        }
    }
}

/// Event handler function type
pub type EventHandler = Arc<
    dyn Fn(Event) -> std::pin::Pin<Box<dyn std::future::Future<Output = crate::Result<()>> + Send>>
        + Send
        + Sync,
>;

/// Event receiver trait for consuming events
#[async_trait]
pub trait EventReceiver: Send + Sync {
    /// Receive the next event
    async fn receive(&mut self) -> crate::Result<Option<Event>>;

    /// Receive events with a timeout
    async fn receive_timeout(
        &mut self,
        timeout: std::time::Duration,
    ) -> crate::Result<Option<Event>> {
        tokio::time::timeout(timeout, self.receive())
            .await
            .map_err(|_| crate::CelersError::Broker("Receive timeout".to_string()))?
    }

    /// Check if the receiver is still active
    fn is_active(&self) -> bool {
        true
    }
}

/// Event dispatcher for routing events to handlers based on filters
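///
/// # Example
///
/// A minimal sketch of routing task events to an async handler; it assumes a
/// Tokio runtime and the crate-root `Result` alias used throughout this module:
///
/// ```rust
/// use celers_core::event::{Event, EventDispatcher, EventFilter, TaskEventBuilder};
/// use uuid::Uuid;
///
/// async fn log_task_event(event: Event) -> celers_core::Result<()> {
///     println!("task event: {}", event.event_type());
///     Ok(())
/// }
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let dispatcher = EventDispatcher::new();
/// dispatcher.register(EventFilter::TaskOnly, log_task_event).await;
///
/// let event = TaskEventBuilder::new(Uuid::new_v4(), "send_email").sent("default");
/// dispatcher.dispatch(event).await?;
/// # Ok(())
/// # }
/// ```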
#[derive(Clone)]
pub struct EventDispatcher {
    handlers: Arc<RwLock<Vec<(EventFilter, EventHandler)>>>,
}

impl EventDispatcher {
    /// Create a new event dispatcher
    #[must_use]
    pub fn new() -> Self {
        Self {
            handlers: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Register a handler with a filter
    pub async fn register<F, Fut>(&self, filter: EventFilter, handler: F)
    where
        F: Fn(Event) -> Fut + Send + Sync + 'static,
        Fut: std::future::Future<Output = crate::Result<()>> + Send + 'static,
    {
        let handler_arc = Arc::new(move |event: Event| {
            Box::pin(handler(event))
                as std::pin::Pin<Box<dyn std::future::Future<Output = crate::Result<()>> + Send>>
        });

        let mut handlers = self.handlers.write().await;
        handlers.push((filter, handler_arc));
    }

    /// Dispatch an event to all matching handlers
    ///
    /// # Errors
    ///
    /// Returns an error if any handler fails to process the event.
    pub async fn dispatch(&self, event: Event) -> crate::Result<()> {
        let handlers = self.handlers.read().await;

        for (filter, handler) in handlers.iter() {
            if filter.matches(&event) {
                handler(event.clone()).await?;
            }
        }

        Ok(())
    }

    /// Dispatch events in batch
    ///
    /// # Errors
    ///
    /// Returns an error if any handler fails to process any event.
    pub async fn dispatch_batch(&self, events: Vec<Event>) -> crate::Result<()> {
        for event in events {
            self.dispatch(event).await?;
        }
        Ok(())
    }

    /// Get the number of registered handlers
    pub async fn handler_count(&self) -> usize {
        self.handlers.read().await.len()
    }

    /// Clear all handlers
    pub async fn clear(&self) {
        self.handlers.write().await.clear();
    }
}

impl Default for EventDispatcher {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Debug for EventDispatcher {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EventDispatcher")
            .field("handlers", &"Arc<RwLock<Vec<...>>>")
            .finish()
    }
}

/// Event persistence storage backend
#[async_trait]
pub trait EventStorage: Send + Sync {
    /// Store an event
    async fn store(&self, event: &Event) -> crate::Result<()>;

    /// Store multiple events
    async fn store_batch(&self, events: &[Event]) -> crate::Result<()> {
        for event in events {
            self.store(event).await?;
        }
        Ok(())
    }

    /// Query events by filter
    async fn query(&self, filter: &EventFilter, limit: Option<usize>) -> crate::Result<Vec<Event>>;

    /// Query events in a time range
    async fn query_range(
        &self,
        start: DateTime<Utc>,
        end: DateTime<Utc>,
        limit: Option<usize>,
    ) -> crate::Result<Vec<Event>>;

    /// Delete old events
    async fn cleanup(&self, before: DateTime<Utc>) -> crate::Result<usize>;
}

/// File-based event storage (append-only JSON lines)
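///
/// # Example
///
/// A sketch of appending events to a JSON-lines file; the path is illustrative
/// and the example is `no_run` because it touches the filesystem:
///
/// ```rust,no_run
/// use celers_core::event::{EventStorage, FileEventStorage, TaskEventBuilder};
/// use uuid::Uuid;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let storage = FileEventStorage::new("/tmp/celers-events.jsonl");
/// storage.init().await?;
///
/// let event = TaskEventBuilder::new(Uuid::new_v4(), "send_email").sent("default");
/// storage.store(&event).await?;
/// # Ok(())
/// # }
/// ```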
pub struct FileEventStorage {
    path: PathBuf,
    file_handle: Arc<RwLock<Option<tokio::fs::File>>>,
}

impl FileEventStorage {
    /// Create a new file-based event storage
    pub fn new(path: impl Into<PathBuf>) -> Self {
        Self {
            path: path.into(),
            file_handle: Arc::new(RwLock::new(None)),
        }
    }

    /// Initialize the storage file
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or created.
    pub async fn init(&self) -> crate::Result<()> {
        let mut handle = self.file_handle.write().await;
        let file = tokio::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)
            .await
            .map_err(|e| crate::CelersError::Broker(format!("Failed to open event file: {e}")))?;

        *handle = Some(file);
        Ok(())
    }

    /// Read all events from the file
    async fn read_all(&self) -> crate::Result<Vec<Event>> {
        use tokio::io::{AsyncBufReadExt, BufReader};

        let file = tokio::fs::File::open(&self.path)
            .await
            .map_err(|e| crate::CelersError::Broker(format!("Failed to open event file: {e}")))?;

        let reader = BufReader::new(file);
        let mut lines = reader.lines();
        let mut events = Vec::new();

        while let Some(line) = lines
            .next_line()
            .await
            .map_err(|e| crate::CelersError::Broker(format!("Failed to read line: {e}")))?
        {
            if let Ok(event) = serde_json::from_str::<Event>(&line) {
                events.push(event);
            }
        }

        Ok(events)
    }
}

#[async_trait]
impl EventStorage for FileEventStorage {
    async fn store(&self, event: &Event) -> crate::Result<()> {
        use tokio::io::AsyncWriteExt;

        let mut handle = self.file_handle.write().await;
        if handle.is_none() {
            drop(handle);
            self.init().await?;
            handle = self.file_handle.write().await;
        }

        if let Some(file) = handle.as_mut() {
            let json = serde_json::to_string(event)
                .map_err(|e| crate::CelersError::Serialization(e.to_string()))?;

            file.write_all(json.as_bytes())
                .await
                .map_err(|e| crate::CelersError::Broker(format!("Failed to write event: {e}")))?;
            file.write_all(b"\n")
                .await
                .map_err(|e| crate::CelersError::Broker(format!("Failed to write newline: {e}")))?;
            file.flush()
                .await
                .map_err(|e| crate::CelersError::Broker(format!("Failed to flush: {e}")))?;
        }

        Ok(())
    }

    async fn query(&self, filter: &EventFilter, limit: Option<usize>) -> crate::Result<Vec<Event>> {
        let all_events = self.read_all().await?;
        let mut filtered: Vec<Event> = all_events
            .into_iter()
            .filter(|e| filter.matches(e))
            .collect();

        if let Some(limit) = limit {
            filtered.truncate(limit);
        }

        Ok(filtered)
    }

    async fn query_range(
        &self,
        start: DateTime<Utc>,
        end: DateTime<Utc>,
        limit: Option<usize>,
    ) -> crate::Result<Vec<Event>> {
        let all_events = self.read_all().await?;
        let mut filtered: Vec<Event> = all_events
            .into_iter()
            .filter(|e| {
                let timestamp = e.timestamp();
                timestamp >= start && timestamp <= end
            })
            .collect();

        if let Some(limit) = limit {
            filtered.truncate(limit);
        }

        Ok(filtered)
    }

    async fn cleanup(&self, before: DateTime<Utc>) -> crate::Result<usize> {
        use tokio::io::AsyncWriteExt;

        let all_events = self.read_all().await?;
        let (keep, remove): (Vec<_>, Vec<_>) = all_events
            .into_iter()
            .partition(|e| e.timestamp() >= before);

        let removed_count = remove.len();

        // Rewrite file with only kept events
        let temp_path = self.path.with_extension("tmp");
        let mut temp_file = tokio::fs::File::create(&temp_path)
            .await
            .map_err(|e| crate::CelersError::Broker(format!("Failed to create temp file: {e}")))?;

        for event in keep {
            let json = serde_json::to_string(&event)
                .map_err(|e| crate::CelersError::Serialization(e.to_string()))?;
            temp_file
                .write_all(json.as_bytes())
                .await
                .map_err(|e| crate::CelersError::Broker(format!("Failed to write: {e}")))?;
            temp_file
                .write_all(b"\n")
                .await
                .map_err(|e| crate::CelersError::Broker(format!("Failed to write newline: {e}")))?;
        }

        temp_file
            .flush()
            .await
            .map_err(|e| crate::CelersError::Broker(format!("Failed to flush: {e}")))?;
        drop(temp_file);

        // Replace original file with temp file
        tokio::fs::rename(&temp_path, &self.path)
            .await
            .map_err(|e| crate::CelersError::Broker(format!("Failed to rename file: {e}")))?;

        // Reinitialize file handle
        let mut handle = self.file_handle.write().await;
        *handle = None;
        drop(handle);
        self.init().await?;

        Ok(removed_count)
    }
}

/// In-memory event storage (for testing and development)
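///
/// # Example
///
/// A small sketch of storing and querying events in memory; the capacity and
/// names are illustrative, and a Tokio runtime is assumed:
///
/// ```rust
/// use celers_core::event::{EventFilter, EventStorage, InMemoryEventStorage, TaskEventBuilder};
/// use uuid::Uuid;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let storage = InMemoryEventStorage::new(1_000);
/// storage.store(&TaskEventBuilder::new(Uuid::new_v4(), "send_email").sent("default")).await?;
///
/// let sent = storage
///     .query(&EventFilter::EventTypes(vec!["task-sent".into()]), None)
///     .await?;
/// assert_eq!(sent.len(), 1);
/// # Ok(())
/// # }
/// ```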
#[derive(Clone)]
pub struct InMemoryEventStorage {
    events: Arc<RwLock<Vec<Event>>>,
    max_size: usize,
}

impl InMemoryEventStorage {
    /// Create a new in-memory event storage
    #[must_use]
    pub fn new(max_size: usize) -> Self {
        Self {
            events: Arc::new(RwLock::new(Vec::new())),
            max_size,
        }
    }

    /// Get the number of stored events
    pub async fn len(&self) -> usize {
        self.events.read().await.len()
    }

    /// Check if storage is empty
    pub async fn is_empty(&self) -> bool {
        self.events.read().await.is_empty()
    }

    /// Clear all events
    pub async fn clear(&self) {
        self.events.write().await.clear();
    }
}

#[async_trait]
impl EventStorage for InMemoryEventStorage {
    async fn store(&self, event: &Event) -> crate::Result<()> {
        let mut events = self.events.write().await;
        events.push(event.clone());

        // Trim to max size (FIFO)
        if events.len() > self.max_size {
            let excess = events.len() - self.max_size;
            events.drain(0..excess);
        }

        Ok(())
    }

    async fn query(&self, filter: &EventFilter, limit: Option<usize>) -> crate::Result<Vec<Event>> {
        let events = self.events.read().await;
        let mut filtered: Vec<Event> = events
            .iter()
            .filter(|e| filter.matches(e))
            .cloned()
            .collect();

        if let Some(limit) = limit {
            filtered.truncate(limit);
        }

        Ok(filtered)
    }

    async fn query_range(
        &self,
        start: DateTime<Utc>,
        end: DateTime<Utc>,
        limit: Option<usize>,
    ) -> crate::Result<Vec<Event>> {
        let events = self.events.read().await;
        let mut filtered: Vec<Event> = events
            .iter()
            .filter(|e| {
                let timestamp = e.timestamp();
                timestamp >= start && timestamp <= end
            })
            .cloned()
            .collect();

        if let Some(limit) = limit {
            filtered.truncate(limit);
        }

        Ok(filtered)
    }

    async fn cleanup(&self, before: DateTime<Utc>) -> crate::Result<usize> {
        let mut events = self.events.write().await;
        let original_len = events.len();
        events.retain(|e| e.timestamp() >= before);
        let removed = original_len - events.len();
        Ok(removed)
    }
}

/// Event stream for real-time event delivery
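///
/// # Example
///
/// A minimal sketch that pairs an [`InMemoryEventEmitter`] subscription with a
/// filter so that only worker events are yielded; assumes a Tokio runtime:
///
/// ```rust
/// use celers_core::event::{
///     EventEmitter, EventFilter, EventStream, InMemoryEventEmitter, WorkerEventBuilder,
/// };
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let emitter = InMemoryEventEmitter::new(100);
/// let mut stream = EventStream::new(emitter.subscribe(), EventFilter::WorkerOnly);
///
/// emitter.emit(WorkerEventBuilder::new("worker-1").online()).await?;
/// assert_eq!(stream.recv().await?.event_type(), "worker-online");
/// # Ok(())
/// # }
/// ```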
pub struct EventStream {
    receiver: broadcast::Receiver<Event>,
    filter: EventFilter,
}

impl EventStream {
    /// Create a new event stream with a filter
    #[must_use]
    pub fn new(receiver: broadcast::Receiver<Event>, filter: EventFilter) -> Self {
        Self { receiver, filter }
    }

    /// Receive the next matching event
    ///
    /// # Errors
    ///
    /// Returns an error if the receiver is closed or lagged behind.
    pub async fn recv(&mut self) -> Result<Event, broadcast::error::RecvError> {
        loop {
            let event = self.receiver.recv().await?;
            if self.filter.matches(&event) {
                return Ok(event);
            }
        }
    }

    /// Try to receive an event without blocking
    ///
    /// # Errors
    ///
    /// Returns an error if no event is available, the receiver is closed, or lagged behind.
    pub fn try_recv(&mut self) -> Result<Event, broadcast::error::TryRecvError> {
        loop {
            let event = self.receiver.try_recv()?;
            if self.filter.matches(&event) {
                return Ok(event);
            }
        }
    }
}

// Note: Database-backed event storage is available in celers-backend-db crate
// to avoid adding database dependencies to celers-core

/// Event alert condition for triggering notifications
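///
/// # Example
///
/// A short sketch of checking conditions against an event; the retry limit and
/// names are illustrative:
///
/// ```rust
/// use celers_core::event::{AlertCondition, AlertContext, TaskEventBuilder};
/// use uuid::Uuid;
///
/// let context = AlertContext::new();
/// let event = TaskEventBuilder::new(Uuid::new_v4(), "send_email").retried("TimeoutError", 5);
///
/// assert!(AlertCondition::TaskRetryExceeded(3).check(&event, &context));
/// assert!(!AlertCondition::TaskFailed.check(&event, &context));
/// ```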
#[derive(Clone)]
pub enum AlertCondition {
    /// Alert on specific event type
    EventType(String),
    /// Alert when task fails
    TaskFailed,
    /// Alert when task exceeds retry count
    TaskRetryExceeded(u32),
    /// Alert when worker goes offline
    WorkerOffline,
    /// Alert when event rate exceeds threshold (events per second)
    RateExceeds { threshold: f64, window_secs: u64 },
    /// Alert on custom condition
    Custom(Arc<dyn Fn(&Event) -> bool + Send + Sync>),
}

impl std::fmt::Debug for AlertCondition {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            AlertCondition::EventType(event_type) => {
                f.debug_tuple("EventType").field(event_type).finish()
            }
            AlertCondition::TaskFailed => write!(f, "TaskFailed"),
            AlertCondition::TaskRetryExceeded(max) => {
                f.debug_tuple("TaskRetryExceeded").field(max).finish()
            }
            AlertCondition::WorkerOffline => write!(f, "WorkerOffline"),
            AlertCondition::RateExceeds {
                threshold,
                window_secs,
            } => f
                .debug_struct("RateExceeds")
                .field("threshold", threshold)
                .field("window_secs", window_secs)
                .finish(),
            AlertCondition::Custom(_) => write!(f, "Custom(<closure>)"),
        }
    }
}

impl AlertCondition {
    /// Check if an event triggers this alert condition
    #[must_use]
    pub fn check(&self, event: &Event, context: &AlertContext) -> bool {
        match self {
            AlertCondition::EventType(event_type) => event.event_type() == event_type,
            AlertCondition::TaskFailed => matches!(event, Event::Task(TaskEvent::Failed { .. })),
            AlertCondition::TaskRetryExceeded(max) => {
                if let Event::Task(TaskEvent::Retried { retries, .. }) = event {
                    retries >= max
                } else {
                    false
                }
            }
            AlertCondition::WorkerOffline => {
                matches!(event, Event::Worker(WorkerEvent::Offline { .. }))
            }
            AlertCondition::RateExceeds {
                threshold,
                window_secs,
            } => {
                let rate = context.get_event_rate(*window_secs);
                rate > *threshold
            }
            AlertCondition::Custom(predicate) => predicate(event),
        }
    }
}

/// Context information for alert conditions
#[derive(Debug, Clone, Default)]
pub struct AlertContext {
    /// Recent event timestamps for rate calculation
    recent_events: Arc<RwLock<Vec<DateTime<Utc>>>>,
}

impl AlertContext {
    /// Create a new alert context
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Record an event timestamp
    pub async fn record_event(&self, timestamp: DateTime<Utc>) {
        let mut events = self.recent_events.write().await;
        events.push(timestamp);

        // Keep only last 1000 events to prevent unbounded growth
        if events.len() > 1000 {
            let excess = events.len() - 1000;
            events.drain(0..excess);
        }
    }

    /// Get event rate (events per second) over a time window
    ///
    /// Synchronous approximation: uses a non-blocking read of the recent-event
    /// buffer and conservatively returns 0.0 if the lock is currently held.
    #[allow(clippy::cast_possible_wrap, clippy::cast_precision_loss)]
    fn get_event_rate(&self, window_secs: u64) -> f64 {
        let events = match self.recent_events.try_read() {
            Ok(events) => events,
            Err(_) => return 0.0,
        };
        let cutoff = Utc::now() - chrono::Duration::seconds(window_secs as i64);
        let count = events.iter().filter(|&&ts| ts >= cutoff).count();
        count as f64 / window_secs as f64
    }

    #[allow(clippy::cast_possible_wrap, clippy::cast_precision_loss)]
    /// Get event rate (async version)
    pub async fn get_event_rate_async(&self, window_secs: u64) -> f64 {
        let events = self.recent_events.read().await;
        let now = Utc::now();
        let cutoff = now - chrono::Duration::seconds(window_secs as i64);

        let count = events.iter().filter(|&&ts| ts >= cutoff).count();
        count as f64 / window_secs as f64
    }
}

/// Alert severity level
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertSeverity {
    /// Informational alert
    Info,
    /// Warning alert
    Warning,
    /// Error alert
    Error,
    /// Critical alert requiring immediate attention
    Critical,
}

/// Alert triggered by an event
#[derive(Debug, Clone)]
pub struct Alert {
    /// Alert severity
    pub severity: AlertSeverity,
    /// Alert title/summary
    pub title: String,
    /// Alert description
    pub message: String,
    /// Event that triggered the alert
    pub event: Event,
    /// Timestamp when alert was triggered
    pub timestamp: DateTime<Utc>,
    /// Additional metadata
    pub metadata: HashMap<String, String>,
}

impl Alert {
    /// Create a new alert
    pub fn new(
        severity: AlertSeverity,
        title: impl Into<String>,
        message: impl Into<String>,
        event: Event,
    ) -> Self {
        Self {
            severity,
            title: title.into(),
            message: message.into(),
            event,
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        }
    }

    /// Add metadata to the alert
    #[must_use]
    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.metadata.insert(key.into(), value.into());
        self
    }
}

/// Alert handler trait
#[async_trait]
pub trait AlertHandler: Send + Sync {
    /// Handle an alert
    async fn handle(&self, alert: &Alert) -> crate::Result<()>;
}

/// Logging alert handler that logs alerts using tracing
#[derive(Debug, Clone, Default)]
pub struct LoggingAlertHandler;

impl LoggingAlertHandler {
    /// Create a new logging alert handler
    #[must_use]
    pub fn new() -> Self {
        Self
    }
}

#[async_trait]
impl AlertHandler for LoggingAlertHandler {
    async fn handle(&self, alert: &Alert) -> crate::Result<()> {
        let task_id = alert.event.task_id();
        let hostname = alert.event.hostname();

        match alert.severity {
            AlertSeverity::Info => {
                tracing::info!(
                    severity = "info",
                    title = %alert.title,
                    message = %alert.message,
                    event_type = alert.event.event_type(),
                    task_id = ?task_id,
                    hostname = ?hostname,
                    "Alert triggered"
                );
            }
            AlertSeverity::Warning => {
                tracing::warn!(
                    severity = "warning",
                    title = %alert.title,
                    message = %alert.message,
                    event_type = alert.event.event_type(),
                    task_id = ?task_id,
                    hostname = ?hostname,
                    "Alert triggered"
                );
            }
            AlertSeverity::Error => {
                tracing::error!(
                    severity = "error",
                    title = %alert.title,
                    message = %alert.message,
                    event_type = alert.event.event_type(),
                    task_id = ?task_id,
                    hostname = ?hostname,
                    "Alert triggered"
                );
            }
            AlertSeverity::Critical => {
                tracing::error!(
                    severity = "critical",
                    title = %alert.title,
                    message = %alert.message,
                    event_type = alert.event.event_type(),
                    task_id = ?task_id,
                    hostname = ?hostname,
                    "CRITICAL ALERT"
                );
            }
        }
        Ok(())
    }
}

/// Type alias for alert handler registry entries
type AlertHandlerEntry = (AlertCondition, AlertSeverity, String, Arc<dyn AlertHandler>);

/// Alert manager for monitoring events and triggering alerts
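///
/// # Example
///
/// A small sketch that logs an alert whenever a task fails; assumes a Tokio
/// runtime, with illustrative names:
///
/// ```rust
/// use celers_core::event::{
///     AlertCondition, AlertManager, AlertSeverity, LoggingAlertHandler, TaskEventBuilder,
/// };
/// use uuid::Uuid;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let manager = AlertManager::new();
/// manager
///     .register(
///         AlertCondition::TaskFailed,
///         AlertSeverity::Error,
///         "Task failed",
///         LoggingAlertHandler::new(),
///     )
///     .await;
///
/// let event = TaskEventBuilder::new(Uuid::new_v4(), "send_email").failed("TimeoutError");
/// manager.process_event(event).await?;
/// # Ok(())
/// # }
/// ```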
pub struct AlertManager {
    handlers: Arc<RwLock<Vec<AlertHandlerEntry>>>,
    context: AlertContext,
}

impl AlertManager {
    /// Create a new alert manager
    #[must_use]
    pub fn new() -> Self {
        Self {
            handlers: Arc::new(RwLock::new(Vec::new())),
            context: AlertContext::new(),
        }
    }

    /// Register an alert handler
    pub async fn register<H: AlertHandler + 'static>(
        &self,
        condition: AlertCondition,
        severity: AlertSeverity,
        title: impl Into<String>,
        handler: H,
    ) {
        let mut handlers = self.handlers.write().await;
        handlers.push((condition, severity, title.into(), Arc::new(handler)));
    }

    /// Process an event and trigger alerts if conditions match
    ///
    /// # Errors
    ///
    /// Returns an error if any alert handler fails to process the alert.
    pub async fn process_event(&self, event: Event) -> crate::Result<()> {
        // Record event for rate tracking
        self.context.record_event(event.timestamp()).await;

        let handlers = self.handlers.read().await;

        for (condition, severity, title, handler) in handlers.iter() {
            if condition.check(&event, &self.context) {
                let message = format!("Event {} triggered alert condition", event.event_type());
                let alert = Alert::new(*severity, title.clone(), message, event.clone());
                handler.handle(&alert).await?;
            }
        }

        Ok(())
    }

    /// Get the number of registered alert handlers
    pub async fn handler_count(&self) -> usize {
        self.handlers.read().await.len()
    }

    /// Clear all alert handlers
    pub async fn clear(&self) {
        self.handlers.write().await.clear();
    }
}

impl Default for AlertManager {
    fn default() -> Self {
        Self::new()
    }
}

/// Event monitor for collecting statistics
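///
/// # Example
///
/// A brief sketch of collecting per-type counters; assumes a Tokio runtime:
///
/// ```rust
/// use celers_core::event::{EventMonitor, TaskEventBuilder};
/// use uuid::Uuid;
///
/// # async fn example() {
/// let monitor = EventMonitor::new();
/// monitor.record(&TaskEventBuilder::new(Uuid::new_v4(), "send_email").sent("default")).await;
///
/// let stats = monitor.get_stats().await;
/// assert_eq!(stats.total_events, 1);
/// assert_eq!(stats.events_by_type.get("task-sent"), Some(&1));
/// # }
/// ```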
#[derive(Debug, Clone, Default)]
pub struct EventMonitor {
    stats: Arc<RwLock<EventStats>>,
}

/// Aggregated statistics collected by [`EventMonitor`]
#[derive(Debug, Clone, Default)]
pub struct EventStats {
    /// Total number of events recorded
    pub total_events: u64,
    /// Number of task events recorded
    pub task_events: u64,
    /// Number of worker events recorded
    pub worker_events: u64,
    /// Event counts keyed by event type (e.g. "task-started")
    pub events_by_type: HashMap<String, u64>,
    /// Event counts keyed by worker hostname
    pub events_by_hostname: HashMap<String, u64>,
    /// Timestamp of the most recently recorded event
    pub last_event_time: Option<DateTime<Utc>>,

impl EventMonitor {
    /// Create a new event monitor
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Record an event
    pub async fn record(&self, event: &Event) {
        let mut stats = self.stats.write().await;

        stats.total_events += 1;
        stats.last_event_time = Some(event.timestamp());

        match event {
            Event::Task(_) => stats.task_events += 1,
            Event::Worker(_) => stats.worker_events += 1,
        }

        let event_type = event.event_type().to_string();
        *stats.events_by_type.entry(event_type).or_insert(0) += 1;

        // Attribute the event to a worker hostname when the variant carries one
        if let Some(hostname) = match event {
            Event::Task(task_event) => match task_event {
                TaskEvent::Received { hostname, .. }
                | TaskEvent::Started { hostname, .. }
                | TaskEvent::Succeeded { hostname, .. }
                | TaskEvent::Failed { hostname, .. }
                | TaskEvent::Retried { hostname, .. }
                | TaskEvent::Rejected { hostname, .. } => Some(hostname.clone()),
                _ => None,
            },
            Event::Worker(worker_event) => match worker_event {
                WorkerEvent::Online { hostname, .. }
                | WorkerEvent::Offline { hostname, .. }
                | WorkerEvent::Heartbeat { hostname, .. } => Some(hostname.clone()),
            },
        } {
            *stats.events_by_hostname.entry(hostname).or_insert(0) += 1;
        }
    }

    /// Get current statistics
    pub async fn get_stats(&self) -> EventStats {
        self.stats.read().await.clone()
    }

    /// Reset statistics
    pub async fn reset(&self) {
        let mut stats = self.stats.write().await;
        *stats = EventStats::default();
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_task_event_serialization() {
        let event = Event::Task(TaskEvent::Started {
            task_id: Uuid::nil(),
            task_name: "test_task".to_string(),
            hostname: "worker-1".to_string(),
            timestamp: DateTime::parse_from_rfc3339("2026-01-01T12:00:00Z")
                .unwrap()
                .with_timezone(&Utc),
            pid: 1234,
        });

        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("task-started"));
        assert!(json.contains("test_task"));
        assert!(json.contains("worker-1"));

        // Deserialize back
        let parsed: Event = serde_json::from_str(&json).unwrap();
        assert_eq!(event, parsed);
    }

    #[test]
    fn test_worker_event_serialization() {
        let event = Event::Worker(WorkerEvent::Heartbeat {
            hostname: "worker-1".to_string(),
            timestamp: Utc::now(),
            active: 5,
            processed: 100,
            loadavg: Some([1.0, 0.8, 0.5]),
            freq: 2.0,
        });

        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("worker-heartbeat"));
        assert!(json.contains("worker-1"));

        let parsed: Event = serde_json::from_str(&json).unwrap();
        assert_eq!(event.event_type(), parsed.event_type());
    }

    #[test]
    fn test_event_type() {
        let task_event = Event::Task(TaskEvent::Sent {
            task_id: Uuid::new_v4(),
            task_name: "test".to_string(),
            queue: "celery".to_string(),
            timestamp: Utc::now(),
            args: None,
            kwargs: None,
            eta: None,
            expires: None,
            retries: None,
        });

        assert_eq!(task_event.event_type(), "task-sent");
        assert!(task_event.is_task_event());
        assert!(!task_event.is_worker_event());
    }

    #[test]
    fn test_task_event_builder() {
        let task_id = Uuid::new_v4();
        let event = TaskEventBuilder::new(task_id, "my_task")
            .hostname("worker-1")
            .pid(1234)
            .started();

        assert_eq!(event.event_type(), "task-started");
        assert_eq!(event.task_id(), Some(task_id));
        assert_eq!(event.hostname(), Some("worker-1"));
    }

    #[test]
    fn test_worker_event_builder() {
        let event = WorkerEventBuilder::new("worker-1").online();

        assert_eq!(event.event_type(), "worker-online");
        assert!(event.is_worker_event());
        assert_eq!(event.hostname(), Some("worker-1"));
    }

    #[test]
    fn test_task_id_extraction() {
        let task_id = Uuid::new_v4();
        let event = TaskEventBuilder::new(task_id, "test").sent("celery");
        assert_eq!(event.task_id(), Some(task_id));

        let worker_event = WorkerEventBuilder::new("worker-1").online();
        assert_eq!(worker_event.task_id(), None);
    }

    #[tokio::test]
    async fn test_noop_event_emitter() {
        let emitter = NoOpEventEmitter::new();
        let event = WorkerEventBuilder::new("worker-1").online();

        // Should not fail
        emitter.emit(event).await.unwrap();

        // Should report as disabled
        assert!(!emitter.is_enabled());
    }

    #[tokio::test]
    async fn test_in_memory_event_emitter() {
        let emitter = InMemoryEventEmitter::new(10);
        let mut receiver = emitter.subscribe();

        let task_id = Uuid::new_v4();
        let event = TaskEventBuilder::new(task_id, "test_task")
            .hostname("worker-1")
            .started();

        // Emit event
        emitter.emit(event.clone()).await.unwrap();

        // Receive event
        let received = receiver.recv().await.unwrap();
        assert_eq!(received.event_type(), "task-started");
        assert_eq!(received.task_id(), Some(task_id));
    }

    #[tokio::test]
    async fn test_in_memory_emitter_multiple_subscribers() {
        let emitter = InMemoryEventEmitter::new(10);
        let mut receiver1 = emitter.subscribe();
        let mut receiver2 = emitter.subscribe();

        assert_eq!(emitter.subscriber_count(), 2);

        let event = WorkerEventBuilder::new("worker-1").heartbeat(5, 100, [1.0, 0.8, 0.5], 2.0);
        emitter.emit(event).await.unwrap();

        // Both receivers should get the event
        let r1 = receiver1.recv().await.unwrap();
        let r2 = receiver2.recv().await.unwrap();

        assert_eq!(r1.event_type(), "worker-heartbeat");
        assert_eq!(r2.event_type(), "worker-heartbeat");
    }

    #[tokio::test]
    async fn test_logging_event_emitter() {
        let emitter = LoggingEventEmitter::new();
        let event = TaskEventBuilder::new(Uuid::new_v4(), "test")
            .hostname("worker-1")
            .succeeded(1.5);

        // Should not fail (just logs)
        emitter.emit(event).await.unwrap();

        // Test with different log levels
        let emitter_info = LoggingEventEmitter::with_level(LogLevel::Info);
        let event = WorkerEventBuilder::new("worker-1").offline();
        emitter_info.emit(event).await.unwrap();
    }

    #[tokio::test]
    async fn test_composite_event_emitter() {
        let in_memory = InMemoryEventEmitter::new(10);
        let mut receiver = in_memory.subscribe();

        let composite = CompositeEventEmitter::new()
            .with_emitter(in_memory.clone())
            .with_emitter(NoOpEventEmitter::new());

        let event = TaskEventBuilder::new(Uuid::new_v4(), "test")
            .hostname("worker-1")
            .started();

        // Emit through composite
        composite.emit(event.clone()).await.unwrap();

        // Should receive through in_memory
        let received = receiver.recv().await.unwrap();
        assert_eq!(received.event_type(), "task-started");
    }

    #[tokio::test]
    async fn test_emit_batch() {
        let emitter = InMemoryEventEmitter::new(10);
        let mut receiver = emitter.subscribe();

        let events = vec![
            WorkerEventBuilder::new("worker-1").online(),
            TaskEventBuilder::new(Uuid::new_v4(), "task1")
                .hostname("worker-1")
                .started(),
            TaskEventBuilder::new(Uuid::new_v4(), "task2")
                .hostname("worker-1")
                .started(),
        ];

        emitter.emit_batch(events).await.unwrap();

        // Receive all events
        let e1 = receiver.recv().await.unwrap();
        let e2 = receiver.recv().await.unwrap();
        let e3 = receiver.recv().await.unwrap();

        assert_eq!(e1.event_type(), "worker-online");
        assert_eq!(e2.event_type(), "task-started");
        assert_eq!(e3.event_type(), "task-started");
    }
}