use crate::models::EventSeverity;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::broadcast;
/// An event payload that an [`EventBroadcaster`] delivers to its subscribers.
///
/// The type is `Clone`, `Debug`, and serde-(de)serializable via its derives.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BroadcastEvent {
    /// Identifier of the workflow the event is associated with.
    pub workflow_id: i64,
    /// Time of the event as an integer.
    ///
    /// NOTE(review): the epoch and unit (seconds vs. milliseconds) are not
    /// fixed by this type; confirm against the producers.
    pub timestamp: i64,
    /// Free-form name of the event kind, e.g. `"job_started"`.
    pub event_type: String,
    /// Severity level attached to the event.
    pub severity: EventSeverity,
    /// Arbitrary JSON payload carried with the event.
    pub data: serde_json::Value,
}
/// Cloneable handle to a Tokio broadcast channel carrying [`BroadcastEvent`]s.
///
/// Every clone shares the same underlying sender through an `Arc`, so events
/// sent through any clone go to the same channel.
///
/// `Debug` is derived so the handle can be embedded in other `Debug` types
/// and printed in diagnostics.
#[derive(Debug, Clone)]
pub struct EventBroadcaster {
    /// Sending half of the broadcast channel, shared by all clones of this handle.
    sender: Arc<broadcast::Sender<BroadcastEvent>>,
}
impl EventBroadcaster {
    /// Creates a broadcaster backed by a Tokio broadcast channel with the
    /// given `capacity`.
    ///
    /// The receiver returned by `broadcast::channel` is discarded right away;
    /// receivers are obtained later through [`EventBroadcaster::subscribe`].
    ///
    /// # Panics
    ///
    /// `capacity` is passed to `tokio::sync::broadcast::channel` unchanged,
    /// and that function is documented to panic when the capacity is `0` or
    /// greater than `usize::MAX / 2`.
    pub fn new(capacity: usize) -> Self {
        // Keep only the sending half; the unused receiver half of the tuple
        // is dropped at the end of this statement.
        let tx = broadcast::channel(capacity).0;
        Self {
            sender: Arc::new(tx),
        }
    }

    /// Sends `event` into the channel on a best-effort basis.
    ///
    /// The result of `send` is deliberately discarded, so a failed send
    /// (for example when nobody is subscribed) is silent for the caller.
    pub fn broadcast(&self, event: BroadcastEvent) {
        drop(self.sender.send(event));
    }

    /// Returns a new receiver attached to this broadcaster's channel.
    ///
    /// Per Tokio's broadcast semantics, the receiver observes events sent
    /// after this call.
    pub fn subscribe(&self) -> broadcast::Receiver<BroadcastEvent> {
        broadcast::Sender::subscribe(&self.sender)
    }
}
impl Default for EventBroadcaster {
fn default() -> Self {
Self::new(512)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Workflow id used by every event built in these tests.
    const WORKFLOW_ID: i64 = 1;
    /// Timestamp used by every event built in these tests.
    const TIMESTAMP: i64 = 1234567890;

    /// Builds an event with the fixed workflow id and timestamp above, so each
    /// test spells out only the fields it varies.
    fn sample_event(
        event_type: &str,
        severity: EventSeverity,
        data: serde_json::Value,
    ) -> BroadcastEvent {
        BroadcastEvent {
            workflow_id: WORKFLOW_ID,
            timestamp: TIMESTAMP,
            event_type: event_type.to_string(),
            severity,
            data,
        }
    }

    /// A single subscriber receives the event with every field intact.
    #[tokio::test]
    async fn test_broadcast_event() {
        let broadcaster = EventBroadcaster::new(16);
        let mut receiver = broadcaster.subscribe();

        // The event is moved into the channel; it is not used afterwards, so
        // no clone is needed.
        broadcaster.broadcast(sample_event(
            "job_started",
            EventSeverity::Info,
            serde_json::json!({"job_id": 42}),
        ));

        let received = receiver
            .recv()
            .await
            .expect("subscriber should receive the broadcast event");
        assert_eq!(received.workflow_id, WORKFLOW_ID);
        assert_eq!(received.timestamp, TIMESTAMP);
        assert_eq!(received.event_type, "job_started");
        assert_eq!(received.severity, EventSeverity::Info);
        assert_eq!(received.data, serde_json::json!({"job_id": 42}));
    }

    /// Broadcasting with no subscribers must not panic; the test passes simply
    /// by returning.
    #[tokio::test]
    async fn test_broadcast_no_subscribers() {
        let broadcaster = EventBroadcaster::new(16);
        broadcaster.broadcast(sample_event(
            "test",
            EventSeverity::Debug,
            serde_json::json!({}),
        ));
    }

    /// Every subscriber receives the same event, and it matches what was sent
    /// (not merely what the other subscriber saw).
    #[tokio::test]
    async fn test_multiple_subscribers() {
        let broadcaster = EventBroadcaster::new(16);
        let mut receiver1 = broadcaster.subscribe();
        let mut receiver2 = broadcaster.subscribe();

        broadcaster.broadcast(sample_event(
            "test",
            EventSeverity::Warning,
            serde_json::json!({"value": 123}),
        ));

        let received1 = receiver1
            .recv()
            .await
            .expect("first subscriber should receive the event");
        let received2 = receiver2
            .recv()
            .await
            .expect("second subscriber should receive the event");

        // Check the first copy against the sent values...
        assert_eq!(received1.workflow_id, WORKFLOW_ID);
        assert_eq!(received1.timestamp, TIMESTAMP);
        assert_eq!(received1.event_type, "test");
        assert_eq!(received1.severity, EventSeverity::Warning);
        assert_eq!(received1.data, serde_json::json!({"value": 123}));

        // ...and the second copy against the first.
        assert_eq!(received1.workflow_id, received2.workflow_id);
        assert_eq!(received1.timestamp, received2.timestamp);
        assert_eq!(received1.event_type, received2.event_type);
        assert_eq!(received1.severity, received2.severity);
        assert_eq!(received1.data, received2.data);
    }
}