use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use things3_core::Result;
use tokio::sync::{broadcast, RwLock};
use uuid::Uuid;
use crate::progress::ProgressUpdate;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "event_type")]
pub enum EventType {
TaskCreated {
task_id: Uuid,
},
TaskUpdated {
task_id: Uuid,
},
TaskDeleted {
task_id: Uuid,
},
TaskCompleted {
task_id: Uuid,
},
TaskCancelled {
task_id: Uuid,
},
ProjectCreated {
project_id: Uuid,
},
ProjectUpdated {
project_id: Uuid,
},
ProjectDeleted {
project_id: Uuid,
},
ProjectCompleted {
project_id: Uuid,
},
AreaCreated {
area_id: Uuid,
},
AreaUpdated {
area_id: Uuid,
},
AreaDeleted {
area_id: Uuid,
},
ProgressStarted {
operation_id: Uuid,
},
ProgressUpdated {
operation_id: Uuid,
},
ProgressCompleted {
operation_id: Uuid,
},
ProgressFailed {
operation_id: Uuid,
},
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Event {
pub id: Uuid,
pub event_type: EventType,
pub timestamp: DateTime<Utc>,
pub data: Option<serde_json::Value>,
pub source: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct EventFilter {
pub event_types: Option<Vec<EventType>>,
pub entity_ids: Option<Vec<Uuid>>,
pub sources: Option<Vec<String>>,
pub since: Option<DateTime<Utc>>,
}
impl EventFilter {
#[must_use]
pub fn matches(&self, event: &Event) -> bool {
if let Some(ref types) = self.event_types {
if !types
.iter()
.any(|t| std::mem::discriminant(t) == std::mem::discriminant(&event.event_type))
{
return false;
}
}
if let Some(ref ids) = self.entity_ids {
let event_entity_id = match &event.event_type {
EventType::TaskCreated { task_id }
| EventType::TaskUpdated { task_id }
| EventType::TaskDeleted { task_id }
| EventType::TaskCompleted { task_id }
| EventType::TaskCancelled { task_id } => Some(*task_id),
EventType::ProjectCreated { project_id }
| EventType::ProjectUpdated { project_id }
| EventType::ProjectDeleted { project_id }
| EventType::ProjectCompleted { project_id } => Some(*project_id),
EventType::AreaCreated { area_id }
| EventType::AreaUpdated { area_id }
| EventType::AreaDeleted { area_id } => Some(*area_id),
EventType::ProgressStarted { operation_id }
| EventType::ProgressUpdated { operation_id }
| EventType::ProgressCompleted { operation_id }
| EventType::ProgressFailed { operation_id } => Some(*operation_id),
};
if let Some(entity_id) = event_entity_id {
if !ids.contains(&entity_id) {
return false;
}
}
}
if let Some(ref sources) = self.sources {
if !sources.contains(&event.source) {
return false;
}
}
if let Some(since) = self.since {
if event.timestamp < since {
return false;
}
}
true
}
}
#[derive(Debug, Clone)]
pub struct EventSubscription {
pub id: Uuid,
pub filter: EventFilter,
pub sender: broadcast::Sender<Event>,
}
pub struct EventBroadcaster {
sender: broadcast::Sender<Event>,
subscriptions: Arc<RwLock<HashMap<Uuid, EventSubscription>>>,
}
impl EventBroadcaster {
#[must_use]
pub fn new() -> Self {
let (sender, _) = broadcast::channel(1000);
Self {
sender,
subscriptions: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn subscribe(&self, filter: EventFilter) -> broadcast::Receiver<Event> {
let subscription_id = Uuid::new_v4();
let (sub_sender, receiver) = broadcast::channel(100);
let subscription = EventSubscription {
id: subscription_id,
filter,
sender: sub_sender,
};
{
let mut subscriptions = self.subscriptions.write().await;
subscriptions.insert(subscription_id, subscription);
}
receiver
}
pub async fn unsubscribe(&self, subscription_id: Uuid) {
let mut subscriptions = self.subscriptions.write().await;
subscriptions.remove(&subscription_id);
}
pub async fn broadcast(&self, event: Event) -> Result<()> {
let _ = self.sender.send(event.clone());
let subscriptions = self.subscriptions.read().await;
for subscription in subscriptions.values() {
if subscription.filter.matches(&event) {
let _ = subscription.sender.send(event.clone());
}
}
Ok(())
}
pub async fn broadcast_task_event(
&self,
event_type: EventType,
_task_id: Uuid,
data: Option<serde_json::Value>,
source: &str,
) -> Result<()> {
let event = Event {
id: Uuid::new_v4(),
event_type,
timestamp: Utc::now(),
data,
source: source.to_string(),
};
self.broadcast(event).await
}
pub async fn broadcast_project_event(
&self,
event_type: EventType,
_project_id: Uuid,
data: Option<serde_json::Value>,
source: &str,
) -> Result<()> {
let event = Event {
id: Uuid::new_v4(),
event_type,
timestamp: Utc::now(),
data,
source: source.to_string(),
};
self.broadcast(event).await
}
pub async fn broadcast_area_event(
&self,
event_type: EventType,
_area_id: Uuid,
data: Option<serde_json::Value>,
source: &str,
) -> Result<()> {
let event = Event {
id: Uuid::new_v4(),
event_type,
timestamp: Utc::now(),
data,
source: source.to_string(),
};
self.broadcast(event).await
}
pub async fn broadcast_progress_event(
&self,
event_type: EventType,
_operation_id: Uuid,
data: Option<serde_json::Value>,
source: &str,
) -> Result<()> {
let event = Event {
id: Uuid::new_v4(),
event_type,
timestamp: Utc::now(),
data,
source: source.to_string(),
};
self.broadcast(event).await
}
pub async fn broadcast_progress_update(
&self,
update: ProgressUpdate,
source: &str,
) -> Result<()> {
let event_type = match update.status {
crate::progress::ProgressStatus::Started => EventType::ProgressStarted {
operation_id: update.operation_id,
},
crate::progress::ProgressStatus::InProgress => EventType::ProgressUpdated {
operation_id: update.operation_id,
},
crate::progress::ProgressStatus::Completed => EventType::ProgressCompleted {
operation_id: update.operation_id,
},
crate::progress::ProgressStatus::Failed
| crate::progress::ProgressStatus::Cancelled => EventType::ProgressFailed {
operation_id: update.operation_id,
},
};
let data = serde_json::to_value(&update)?;
self.broadcast_progress_event(event_type, update.operation_id, Some(data), source)
.await
}
pub async fn subscription_count(&self) -> usize {
self.subscriptions.read().await.len()
}
#[must_use]
pub fn subscribe_all(&self) -> broadcast::Receiver<Event> {
self.sender.subscribe()
}
}
impl Default for EventBroadcaster {
fn default() -> Self {
Self::new()
}
}
pub struct EventListener {
broadcaster: Arc<EventBroadcaster>,
#[allow(dead_code)]
subscriptions: Vec<Uuid>,
}
impl EventListener {
#[must_use]
pub fn new(broadcaster: Arc<EventBroadcaster>) -> Self {
Self {
broadcaster,
subscriptions: Vec::new(),
}
}
pub async fn subscribe_to_events(
&mut self,
event_types: Vec<EventType>,
) -> broadcast::Receiver<Event> {
let filter = EventFilter {
event_types: Some(event_types),
entity_ids: None,
sources: None,
since: None,
};
self.broadcaster.subscribe(filter).await
}
pub async fn subscribe_to_entity(&mut self, entity_id: Uuid) -> broadcast::Receiver<Event> {
let filter = EventFilter {
event_types: None,
entity_ids: Some(vec![entity_id]),
sources: None,
since: None,
};
self.broadcaster.subscribe(filter).await
}
#[must_use]
pub fn subscribe_to_all(&self) -> broadcast::Receiver<Event> {
self.broadcaster.subscribe_all()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_event_creation() {
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
};
assert!(!event.id.is_nil());
assert_eq!(event.source, "test");
}
#[test]
fn test_event_filter_matching() {
let task_id = Uuid::new_v4();
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated { task_id },
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
};
let filter = EventFilter {
event_types: Some(vec![EventType::TaskCreated {
task_id: Uuid::new_v4(),
}]),
entity_ids: None,
sources: None,
since: None,
};
assert!(filter.matches(&event));
let filter_no_match = EventFilter {
event_types: Some(vec![EventType::TaskUpdated {
task_id: Uuid::new_v4(),
}]),
entity_ids: None,
sources: None,
since: None,
};
assert!(!filter_no_match.matches(&event));
}
#[tokio::test]
async fn test_event_broadcaster() {
let broadcaster = EventBroadcaster::new();
let mut receiver = broadcaster.subscribe_all();
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
};
broadcaster.broadcast(event.clone()).await.unwrap();
let received_event = receiver.recv().await.unwrap();
assert_eq!(received_event.id, event.id);
}
#[tokio::test]
#[ignore = "This test is flaky due to async timing issues"]
async fn test_event_broadcaster_with_filter() {
let broadcaster = EventBroadcaster::new();
let filter = EventFilter {
event_types: Some(vec![EventType::TaskCreated {
task_id: Uuid::new_v4(),
}]),
entity_ids: None,
sources: None,
since: None,
};
let mut receiver = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
};
let broadcast_result = broadcaster.broadcast(event).await;
assert!(broadcast_result.is_ok());
let received_event =
tokio::time::timeout(std::time::Duration::from_millis(100), receiver.recv()).await;
if let Ok(Ok(event)) = received_event {
assert_eq!(event.source, "test");
}
}
#[tokio::test]
async fn test_progress_update_to_event() {
let broadcaster = EventBroadcaster::new();
let mut receiver = broadcaster.subscribe_all();
let update = ProgressUpdate {
operation_id: Uuid::new_v4(),
operation_name: "test_operation".to_string(),
current: 50,
total: Some(100),
message: Some("Half done".to_string()),
timestamp: Utc::now(),
status: crate::progress::ProgressStatus::InProgress,
};
broadcaster
.broadcast_progress_update(update, "test")
.await
.unwrap();
let received_event = receiver.recv().await.unwrap();
assert_eq!(received_event.source, "test");
}
#[test]
fn test_event_filter_entity_ids() {
let task_id = Uuid::new_v4();
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated { task_id },
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
};
let filter = EventFilter {
event_types: None,
entity_ids: Some(vec![task_id]),
sources: None,
since: None,
};
assert!(filter.matches(&event));
let filter_no_match = EventFilter {
event_types: None,
entity_ids: Some(vec![Uuid::new_v4()]),
sources: None,
since: None,
};
assert!(!filter_no_match.matches(&event));
}
#[test]
fn test_event_filter_sources() {
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: Utc::now(),
data: None,
source: "test_source".to_string(),
};
let filter = EventFilter {
event_types: None,
entity_ids: None,
sources: Some(vec!["test_source".to_string()]),
since: None,
};
assert!(filter.matches(&event));
let filter_no_match = EventFilter {
event_types: None,
entity_ids: None,
sources: Some(vec!["other_source".to_string()]),
since: None,
};
assert!(!filter_no_match.matches(&event));
}
#[test]
fn test_event_filter_timestamp() {
let now = Utc::now();
let past = now - chrono::Duration::hours(1);
let future = now + chrono::Duration::hours(1);
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: now,
data: None,
source: "test".to_string(),
};
let filter = EventFilter {
event_types: None,
entity_ids: None,
sources: None,
since: Some(past),
};
assert!(filter.matches(&event));
let filter_no_match = EventFilter {
event_types: None,
entity_ids: None,
sources: None,
since: Some(future),
};
assert!(!filter_no_match.matches(&event));
}
#[test]
fn test_event_filter_all_event_types() {
let task_id = Uuid::new_v4();
let project_id = Uuid::new_v4();
let area_id = Uuid::new_v4();
let operation_id = Uuid::new_v4();
let events = vec![
Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated { task_id },
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
},
Event {
id: Uuid::new_v4(),
event_type: EventType::ProjectCreated { project_id },
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
},
Event {
id: Uuid::new_v4(),
event_type: EventType::AreaCreated { area_id },
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
},
Event {
id: Uuid::new_v4(),
event_type: EventType::ProgressStarted { operation_id },
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
},
];
for event in events {
let filter = EventFilter {
event_types: None,
entity_ids: None,
sources: None,
since: None,
};
assert!(filter.matches(&event));
}
}
#[test]
fn test_event_filter_entity_id_extraction() {
let task_id = Uuid::new_v4();
let project_id = Uuid::new_v4();
let area_id = Uuid::new_v4();
let operation_id = Uuid::new_v4();
let events = vec![
(EventType::TaskCreated { task_id }, Some(task_id)),
(EventType::TaskUpdated { task_id }, Some(task_id)),
(EventType::TaskDeleted { task_id }, Some(task_id)),
(EventType::TaskCompleted { task_id }, Some(task_id)),
(EventType::TaskCancelled { task_id }, Some(task_id)),
(EventType::ProjectCreated { project_id }, Some(project_id)),
(EventType::ProjectUpdated { project_id }, Some(project_id)),
(EventType::ProjectDeleted { project_id }, Some(project_id)),
(EventType::ProjectCompleted { project_id }, Some(project_id)),
(EventType::AreaCreated { area_id }, Some(area_id)),
(EventType::AreaUpdated { area_id }, Some(area_id)),
(EventType::AreaDeleted { area_id }, Some(area_id)),
(
EventType::ProgressStarted { operation_id },
Some(operation_id),
),
(
EventType::ProgressUpdated { operation_id },
Some(operation_id),
),
(
EventType::ProgressCompleted { operation_id },
Some(operation_id),
),
(
EventType::ProgressFailed { operation_id },
Some(operation_id),
),
];
for (event_type, expected_id) in events {
let event = Event {
id: Uuid::new_v4(),
event_type,
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
};
let filter = EventFilter {
event_types: None,
entity_ids: expected_id.map(|id| vec![id]),
sources: None,
since: None,
};
assert!(filter.matches(&event));
}
}
#[tokio::test]
async fn test_event_broadcaster_subscribe_all() {
let broadcaster = EventBroadcaster::new();
let mut receiver = broadcaster.subscribe_all();
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
};
broadcaster.broadcast(event.clone()).await.unwrap();
let received_event = receiver.recv().await.unwrap();
assert_eq!(received_event.id, event.id);
}
#[tokio::test]
async fn test_event_listener_creation() {
let broadcaster = EventBroadcaster::new();
let listener = EventListener::new(Arc::new(broadcaster));
assert_eq!(listener.subscriptions.len(), 0);
}
#[tokio::test]
async fn test_event_listener_subscribe_to_events() {
let broadcaster = EventBroadcaster::new();
let mut listener = EventListener::new(Arc::new(broadcaster));
let event_types = vec![EventType::TaskCreated {
task_id: Uuid::new_v4(),
}];
let mut receiver = listener.subscribe_to_events(event_types).await;
assert!(receiver.try_recv().is_err());
}
#[tokio::test]
async fn test_event_listener_subscribe_to_entity() {
let broadcaster = EventBroadcaster::new();
let mut listener = EventListener::new(Arc::new(broadcaster));
let entity_id = Uuid::new_v4();
let mut receiver = listener.subscribe_to_entity(entity_id).await;
assert!(receiver.try_recv().is_err());
}
#[tokio::test]
async fn test_event_listener_subscribe_to_all() {
let broadcaster = EventBroadcaster::new();
let listener = EventListener::new(Arc::new(broadcaster));
let mut receiver = listener.subscribe_to_all();
assert!(receiver.try_recv().is_err());
}
#[test]
fn test_event_serialization() {
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: Utc::now(),
data: Some(serde_json::json!({"key": "value"})),
source: "test".to_string(),
};
let json = serde_json::to_string(&event).unwrap();
let deserialized: Event = serde_json::from_str(&json).unwrap();
assert_eq!(event.id, deserialized.id);
assert_eq!(event.source, deserialized.source);
}
#[test]
fn test_event_filter_serialization() {
let filter = EventFilter {
event_types: Some(vec![EventType::TaskCreated {
task_id: Uuid::new_v4(),
}]),
entity_ids: Some(vec![Uuid::new_v4()]),
sources: Some(vec!["test".to_string()]),
since: Some(Utc::now()),
};
let json = serde_json::to_string(&filter).unwrap();
let deserialized: EventFilter = serde_json::from_str(&json).unwrap();
assert_eq!(filter.event_types, deserialized.event_types);
assert_eq!(filter.entity_ids, deserialized.entity_ids);
assert_eq!(filter.sources, deserialized.sources);
}
#[tokio::test]
async fn test_event_broadcaster_unsubscribe() {
let broadcaster = EventBroadcaster::new();
let subscription_id = Uuid::new_v4();
let filter = EventFilter {
event_types: Some(vec![EventType::TaskCreated {
task_id: Uuid::new_v4(),
}]),
entity_ids: None,
sources: None,
since: None,
};
let _receiver = broadcaster.subscribe(filter).await;
broadcaster.unsubscribe(subscription_id).await;
}
#[tokio::test]
async fn test_event_broadcaster_broadcast_task_event() {
let broadcaster = EventBroadcaster::new();
let mut receiver = broadcaster.subscribe_all();
let task_id = Uuid::new_v4();
let event_type = EventType::TaskCreated { task_id };
let data = Some(serde_json::json!({"title": "Test Task"}));
broadcaster
.broadcast_task_event(event_type, task_id, data, "test")
.await
.unwrap();
let received_event = receiver.recv().await.unwrap();
assert_eq!(received_event.source, "test");
}
#[tokio::test]
async fn test_event_broadcaster_broadcast_project_event() {
let broadcaster = EventBroadcaster::new();
let mut receiver = broadcaster.subscribe_all();
let project_id = Uuid::new_v4();
let event_type = EventType::ProjectCreated { project_id };
let data = Some(serde_json::json!({"title": "Test Project"}));
broadcaster
.broadcast_project_event(event_type, project_id, data, "test")
.await
.unwrap();
let received_event = receiver.recv().await.unwrap();
assert_eq!(received_event.source, "test");
}
#[tokio::test]
async fn test_event_broadcaster_broadcast_area_event() {
let broadcaster = EventBroadcaster::new();
let mut receiver = broadcaster.subscribe_all();
let area_id = Uuid::new_v4();
let event_type = EventType::AreaCreated { area_id };
let data = Some(serde_json::json!({"title": "Test Area"}));
broadcaster
.broadcast_area_event(event_type, area_id, data, "test")
.await
.unwrap();
let received_event = receiver.recv().await.unwrap();
assert_eq!(received_event.source, "test");
}
#[tokio::test]
async fn test_event_broadcaster_broadcast_progress_event() {
let broadcaster = EventBroadcaster::new();
let mut receiver = broadcaster.subscribe_all();
let operation_id = Uuid::new_v4();
let event_type = EventType::ProgressStarted { operation_id };
let data = Some(serde_json::json!({"message": "Starting operation"}));
broadcaster
.broadcast_progress_event(event_type, operation_id, data, "test")
.await
.unwrap();
let received_event = receiver.recv().await.unwrap();
assert_eq!(received_event.source, "test");
}
#[tokio::test]
async fn test_event_broadcaster_broadcast_progress_update() {
let broadcaster = EventBroadcaster::new();
let mut receiver = broadcaster.subscribe_all();
let update = ProgressUpdate {
operation_id: Uuid::new_v4(),
operation_name: "test_operation".to_string(),
current: 50,
total: Some(100),
message: Some("Half done".to_string()),
timestamp: Utc::now(),
status: crate::progress::ProgressStatus::InProgress,
};
broadcaster
.broadcast_progress_update(update, "test")
.await
.unwrap();
let received_event = receiver.recv().await.unwrap();
assert_eq!(received_event.source, "test");
}
#[tokio::test]
#[ignore = "This test is flaky due to async timing issues"]
async fn test_event_broadcaster_with_filtered_subscription() {
let broadcaster = EventBroadcaster::new();
let task_id = Uuid::new_v4();
let filter = EventFilter {
event_types: Some(vec![EventType::TaskCreated {
task_id: Uuid::new_v4(), }]),
entity_ids: None,
sources: None,
since: None,
};
let mut receiver = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated { task_id },
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
};
broadcaster.broadcast(event).await.unwrap();
let result =
tokio::time::timeout(std::time::Duration::from_millis(100), receiver.recv()).await;
if let Ok(Ok(received_event)) = result {
assert_eq!(received_event.source, "test");
} else {
}
}
#[tokio::test]
#[ignore = "This test is flaky due to async timing issues"]
async fn test_event_broadcaster_with_entity_id_filter() {
let broadcaster = EventBroadcaster::new();
let task_id = Uuid::new_v4();
let filter = EventFilter {
event_types: None,
entity_ids: Some(vec![task_id]),
sources: None,
since: None,
};
let mut receiver = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated { task_id },
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
};
broadcaster.broadcast(event).await.unwrap();
let result =
tokio::time::timeout(std::time::Duration::from_millis(100), receiver.recv()).await;
if let Ok(Ok(received_event)) = result {
assert_eq!(received_event.source, "test");
} else {
}
}
#[tokio::test]
#[ignore = "This test is flaky due to async timing issues"]
async fn test_event_broadcaster_with_source_filter() {
let broadcaster = EventBroadcaster::new();
let filter = EventFilter {
event_types: None,
entity_ids: None,
sources: Some(vec!["test_source".to_string()]),
since: None,
};
let mut receiver = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: Utc::now(),
data: None,
source: "test_source".to_string(),
};
broadcaster.broadcast(event).await.unwrap();
let result =
tokio::time::timeout(std::time::Duration::from_millis(100), receiver.recv()).await;
if let Ok(Ok(received_event)) = result {
assert_eq!(received_event.source, "test_source");
} else {
}
}
#[tokio::test]
#[ignore = "This test is flaky due to async timing issues"]
async fn test_event_broadcaster_with_timestamp_filter() {
let broadcaster = EventBroadcaster::new();
let past_time = Utc::now() - chrono::Duration::hours(1);
let filter = EventFilter {
event_types: None,
entity_ids: None,
sources: None,
since: Some(past_time),
};
let mut receiver = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
};
broadcaster.broadcast(event).await.unwrap();
let result =
tokio::time::timeout(std::time::Duration::from_millis(100), receiver.recv()).await;
if let Ok(Ok(received_event)) = result {
assert_eq!(received_event.source, "test");
} else {
}
}
#[tokio::test]
#[ignore = "This test is flaky due to async timing issues"]
async fn test_event_broadcaster_filter_no_match() {
let broadcaster = EventBroadcaster::new();
let task_id = Uuid::new_v4();
let filter = EventFilter {
event_types: Some(vec![EventType::TaskUpdated {
task_id: Uuid::new_v4(),
}]),
entity_ids: None,
sources: None,
since: None,
};
let mut receiver = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated { task_id },
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
};
broadcaster.broadcast(event).await.unwrap();
let result =
tokio::time::timeout(std::time::Duration::from_millis(100), receiver.recv()).await;
assert!(result.is_err()); }
#[tokio::test]
#[ignore = "This test is flaky due to async timing issues"]
async fn test_event_broadcaster_broadcast_error_handling() {
let broadcaster = EventBroadcaster::new();
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: Utc::now(),
data: Some(serde_json::json!({"test": "data"})),
source: "test".to_string(),
};
let result = broadcaster.broadcast(event).await;
assert!(result.is_ok());
}
#[test]
fn test_event_subscription_creation() {
let subscription_id = Uuid::new_v4();
let filter = EventFilter {
event_types: None,
entity_ids: None,
sources: None,
since: None,
};
let (sender, _receiver) = broadcast::channel(100);
let subscription = EventSubscription {
id: subscription_id,
filter,
sender,
};
assert_eq!(subscription.id, subscription_id);
}
#[tokio::test]
async fn test_event_listener_with_actual_broadcaster() {
let broadcaster = Arc::new(EventBroadcaster::new());
let mut listener = EventListener::new(broadcaster);
let event_types = vec![EventType::TaskCreated {
task_id: Uuid::new_v4(),
}];
let mut receiver = listener.subscribe_to_events(event_types).await;
assert!(receiver.try_recv().is_err());
}
#[tokio::test]
async fn test_event_listener_subscribe_to_entity_with_actual_broadcaster() {
let broadcaster = Arc::new(EventBroadcaster::new());
let mut listener = EventListener::new(broadcaster);
let entity_id = Uuid::new_v4();
let mut receiver = listener.subscribe_to_entity(entity_id).await;
assert!(receiver.try_recv().is_err());
}
#[tokio::test]
async fn test_event_listener_subscribe_to_all_with_actual_broadcaster() {
let broadcaster = Arc::new(EventBroadcaster::new());
let listener = EventListener::new(broadcaster);
let mut receiver = listener.subscribe_to_all();
assert!(receiver.try_recv().is_err());
}
#[test]
fn test_all_event_types_creation() {
let task_id = Uuid::new_v4();
let project_id = Uuid::new_v4();
let area_id = Uuid::new_v4();
let operation_id = Uuid::new_v4();
let _ = EventType::TaskCreated { task_id };
let _ = EventType::TaskUpdated { task_id };
let _ = EventType::TaskDeleted { task_id };
let _ = EventType::TaskCompleted { task_id };
let _ = EventType::TaskCancelled { task_id };
let _ = EventType::ProjectCreated { project_id };
let _ = EventType::ProjectUpdated { project_id };
let _ = EventType::ProjectDeleted { project_id };
let _ = EventType::ProjectCompleted { project_id };
let _ = EventType::AreaCreated { area_id };
let _ = EventType::AreaUpdated { area_id };
let _ = EventType::AreaDeleted { area_id };
let _ = EventType::ProgressStarted { operation_id };
let _ = EventType::ProgressUpdated { operation_id };
let _ = EventType::ProgressCompleted { operation_id };
let _ = EventType::ProgressFailed { operation_id };
}
#[test]
fn test_event_creation_with_data() {
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: Utc::now(),
data: Some(serde_json::json!({"key": "value"})),
source: "test".to_string(),
};
assert!(!event.id.is_nil());
assert_eq!(event.source, "test");
assert!(event.data.is_some());
}
#[test]
fn test_event_filter_creation() {
let filter = EventFilter {
event_types: Some(vec![EventType::TaskCreated {
task_id: Uuid::new_v4(),
}]),
entity_ids: Some(vec![Uuid::new_v4()]),
sources: Some(vec!["test".to_string()]),
since: Some(Utc::now()),
};
assert!(filter.event_types.is_some());
assert!(filter.entity_ids.is_some());
assert!(filter.sources.is_some());
assert!(filter.since.is_some());
}
#[tokio::test]
async fn test_event_broadcaster_subscription_count() {
let broadcaster = EventBroadcaster::new();
assert_eq!(broadcaster.subscription_count().await, 0);
let filter = EventFilter {
event_types: Some(vec![EventType::TaskCreated {
task_id: Uuid::new_v4(),
}]),
entity_ids: None,
sources: None,
since: None,
};
let _receiver = broadcaster.subscribe(filter).await;
assert_eq!(broadcaster.subscription_count().await, 1);
let filter2 = EventFilter {
event_types: Some(vec![EventType::ProjectCreated {
project_id: Uuid::new_v4(),
}]),
entity_ids: None,
sources: None,
since: None,
};
let _receiver2 = broadcaster.subscribe(filter2).await;
assert_eq!(broadcaster.subscription_count().await, 2);
}
#[tokio::test]
async fn test_event_filter_matching_with_timestamp() {
let filter = EventFilter {
event_types: Some(vec![EventType::TaskCreated {
task_id: Uuid::new_v4(),
}]),
entity_ids: None,
sources: None,
since: Some(Utc::now() - chrono::Duration::hours(1)),
};
let event = Event {
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
id: Uuid::new_v4(),
source: "test".to_string(),
timestamp: Utc::now(),
data: None,
};
assert!(filter.matches(&event));
}
#[tokio::test]
async fn test_event_filter_matching_with_sources() {
let filter = EventFilter {
event_types: None,
entity_ids: None,
sources: Some(vec!["test_source".to_string()]),
since: None,
};
let event = Event {
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
id: Uuid::new_v4(),
source: "test_source".to_string(),
timestamp: Utc::now(),
data: None,
};
assert!(filter.matches(&event));
}
#[tokio::test]
async fn test_event_filter_matching_with_entity_ids() {
let entity_id = Uuid::new_v4();
let filter = EventFilter {
event_types: None,
entity_ids: Some(vec![entity_id]),
sources: None,
since: None,
};
let event = Event {
event_type: EventType::TaskCreated { task_id: entity_id },
id: entity_id,
source: "test".to_string(),
timestamp: Utc::now(),
data: None,
};
assert!(filter.matches(&event));
}
#[tokio::test]
async fn test_event_filter_matching_no_match() {
let filter = EventFilter {
event_types: Some(vec![EventType::TaskCreated {
task_id: Uuid::new_v4(),
}]),
entity_ids: None,
sources: None,
since: None,
};
let event = Event {
event_type: EventType::ProjectCreated {
project_id: Uuid::new_v4(),
},
id: Uuid::new_v4(),
source: "test".to_string(),
timestamp: Utc::now(),
data: None,
};
assert!(!filter.matches(&event));
}
#[tokio::test]
async fn test_event_filter_matching_empty_filter() {
let filter = EventFilter {
event_types: None,
entity_ids: None,
sources: None,
since: None,
};
let event = Event {
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
id: Uuid::new_v4(),
source: "test".to_string(),
timestamp: Utc::now(),
data: None,
};
assert!(filter.matches(&event));
}
#[tokio::test]
async fn test_event_creation_without_data() {
let event = Event {
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
id: Uuid::new_v4(),
source: "test".to_string(),
timestamp: Utc::now(),
data: None,
};
assert_eq!(event.source, "test");
assert!(event.data.is_none());
}
#[tokio::test]
async fn test_event_type_entity_id_extraction_comprehensive() {
let task_id = Uuid::new_v4();
let project_id = Uuid::new_v4();
let area_id = Uuid::new_v4();
let operation_id = Uuid::new_v4();
let events = vec![
EventType::TaskCreated { task_id },
EventType::TaskUpdated { task_id },
EventType::TaskDeleted { task_id },
EventType::TaskCompleted { task_id },
EventType::TaskCancelled { task_id },
EventType::ProjectCreated { project_id },
EventType::ProjectUpdated { project_id },
EventType::ProjectDeleted { project_id },
EventType::ProjectCompleted { project_id },
EventType::AreaCreated { area_id },
EventType::AreaUpdated { area_id },
EventType::AreaDeleted { area_id },
EventType::ProgressStarted { operation_id },
EventType::ProgressUpdated { operation_id },
EventType::ProgressCompleted { operation_id },
EventType::ProgressFailed { operation_id },
];
for event_type in events {
let extracted_id = match event_type {
EventType::TaskCreated { task_id }
| EventType::TaskUpdated { task_id }
| EventType::TaskDeleted { task_id }
| EventType::TaskCompleted { task_id }
| EventType::TaskCancelled { task_id } => Some(task_id),
EventType::ProjectCreated { project_id }
| EventType::ProjectUpdated { project_id }
| EventType::ProjectDeleted { project_id }
| EventType::ProjectCompleted { project_id } => Some(project_id),
EventType::AreaCreated { area_id }
| EventType::AreaUpdated { area_id }
| EventType::AreaDeleted { area_id } => Some(area_id),
EventType::ProgressStarted { operation_id }
| EventType::ProgressUpdated { operation_id }
| EventType::ProgressCompleted { operation_id }
| EventType::ProgressFailed { operation_id } => Some(operation_id),
};
assert!(extracted_id.is_some());
}
}
#[tokio::test]
async fn test_event_serialization_roundtrip() {
let original_event = Event {
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
id: Uuid::new_v4(),
source: "test".to_string(),
timestamp: Utc::now(),
data: Some(serde_json::json!({"title": "Test Task"})),
};
let json = serde_json::to_string(&original_event).unwrap();
let deserialized_event: Event = serde_json::from_str(&json).unwrap();
assert_eq!(original_event.event_type, deserialized_event.event_type);
assert_eq!(original_event.id, deserialized_event.id);
assert_eq!(original_event.source, deserialized_event.source);
assert_eq!(original_event.data, deserialized_event.data);
}
#[tokio::test]
async fn test_event_filter_serialization_roundtrip() {
let original_filter = EventFilter {
event_types: Some(vec![
EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
EventType::ProjectCreated {
project_id: Uuid::new_v4(),
},
]),
entity_ids: Some(vec![Uuid::new_v4(), Uuid::new_v4()]),
sources: Some(vec![
"test_source".to_string(),
"another_source".to_string(),
]),
since: Some(Utc::now()),
};
let json = serde_json::to_string(&original_filter).unwrap();
let deserialized_filter: EventFilter = serde_json::from_str(&json).unwrap();
assert_eq!(original_filter.event_types, deserialized_filter.event_types);
assert_eq!(original_filter.entity_ids, deserialized_filter.entity_ids);
assert_eq!(original_filter.sources, deserialized_filter.sources);
assert_eq!(original_filter.since, deserialized_filter.since);
}
#[tokio::test]
async fn test_event_broadcaster_multiple_subscribers() {
let broadcaster = EventBroadcaster::new();
let filter = EventFilter::default();
let mut subscriber1 = broadcaster.subscribe(filter.clone()).await;
let mut subscriber2 = broadcaster.subscribe(filter.clone()).await;
let mut subscriber3 = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: Utc::now(),
source: "test".to_string(),
data: None,
};
broadcaster.broadcast(event.clone()).await.unwrap();
let received1 = subscriber1.try_recv().unwrap();
let received2 = subscriber2.try_recv().unwrap();
let received3 = subscriber3.try_recv().unwrap();
assert_eq!(received1.id, event.id);
assert_eq!(received2.id, event.id);
assert_eq!(received3.id, event.id);
}
#[tokio::test]
async fn test_event_broadcaster_with_different_filters() {
let broadcaster = EventBroadcaster::new();
let task_filter = EventFilter {
event_types: Some(vec![EventType::TaskCreated {
task_id: Uuid::new_v4(),
}]),
..Default::default()
};
let project_filter = EventFilter {
event_types: Some(vec![EventType::ProjectCreated {
project_id: Uuid::new_v4(),
}]),
..Default::default()
};
let mut task_subscriber = broadcaster.subscribe(task_filter).await;
let mut project_subscriber = broadcaster.subscribe(project_filter).await;
let task_event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: Utc::now(),
source: "test".to_string(),
data: None,
};
broadcaster.broadcast(task_event.clone()).await.unwrap();
let received = task_subscriber.try_recv().unwrap();
assert_eq!(received, task_event);
assert!(project_subscriber.try_recv().is_err());
}
#[tokio::test]
async fn test_event_broadcaster_with_entity_id_filters() {
let broadcaster = EventBroadcaster::new();
let task_id = Uuid::new_v4();
let filter = EventFilter {
entity_ids: Some(vec![task_id]),
..Default::default()
};
let mut subscriber = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated { task_id },
timestamp: Utc::now(),
source: "test".to_string(),
data: None,
};
broadcaster.broadcast(event.clone()).await.unwrap();
let received = subscriber.try_recv().unwrap();
assert_eq!(received, event);
}
#[tokio::test]
async fn test_event_broadcaster_with_source_filters() {
let broadcaster = EventBroadcaster::new();
let filter = EventFilter {
sources: Some(vec!["test_source".to_string()]),
..Default::default()
};
let mut subscriber = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: Utc::now(),
source: "test_source".to_string(),
data: None,
};
broadcaster.broadcast(event.clone()).await.unwrap();
let received = subscriber.try_recv().unwrap();
assert_eq!(received, event);
}
#[tokio::test]
async fn test_event_broadcaster_with_timestamp_filters() {
let broadcaster = EventBroadcaster::new();
let now = Utc::now();
let start_time = now - chrono::Duration::minutes(5);
let _end_time = now + chrono::Duration::minutes(5);
let filter = EventFilter {
since: Some(start_time),
..Default::default()
};
let mut subscriber = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: now,
source: "test".to_string(),
data: None,
};
broadcaster.broadcast(event.clone()).await.unwrap();
let received = subscriber.try_recv().unwrap();
assert_eq!(received, event);
}
#[tokio::test]
async fn test_event_broadcaster_concurrent_subscriptions() {
let broadcaster = Arc::new(EventBroadcaster::new());
let mut handles = vec![];
for i in 0..10 {
let broadcaster_clone = broadcaster.clone();
let handle = tokio::spawn(async move {
let filter = EventFilter::default();
let mut subscriber = broadcaster_clone.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: Utc::now(),
source: format!("test_{i}"),
data: None,
};
broadcaster_clone.broadcast(event.clone()).await.unwrap();
let received = subscriber.try_recv().unwrap();
assert_eq!(received.source, format!("test_{i}"));
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
}
#[tokio::test]
async fn test_event_broadcaster_filter_combinations() {
let broadcaster = EventBroadcaster::new();
let task_id = Uuid::new_v4();
let filter = EventFilter {
event_types: Some(vec![EventType::TaskCreated {
task_id: Uuid::new_v4(),
}]),
entity_ids: Some(vec![task_id]),
sources: Some(vec!["test_source".to_string()]),
since: Some(Utc::now() - chrono::Duration::hours(1)),
};
let mut subscriber = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated { task_id },
timestamp: Utc::now(),
source: "test_source".to_string(),
data: None,
};
broadcaster.broadcast(event.clone()).await.unwrap();
let received = subscriber.try_recv().unwrap();
assert_eq!(received, event);
}
#[tokio::test]
async fn test_event_broadcaster_large_message_handling() {
let broadcaster = EventBroadcaster::new();
let mut subscriber = broadcaster.subscribe(EventFilter::default()).await;
let large_data = serde_json::Value::String("x".repeat(10000));
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: Utc::now(),
source: "test".to_string(),
data: Some(large_data),
};
broadcaster.broadcast(event.clone()).await.unwrap();
let received = subscriber.try_recv().unwrap();
assert_eq!(received, event);
}
#[tokio::test]
async fn test_event_broadcaster_rapid_events() {
let broadcaster = EventBroadcaster::new();
let mut subscriber = broadcaster.subscribe(EventFilter::default()).await;
for i in 0..100 {
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: Utc::now(),
source: format!("test_{i}"),
data: None,
};
broadcaster.broadcast(event).await.unwrap();
}
let mut received_count = 0;
while subscriber.try_recv().is_ok() {
received_count += 1;
}
assert_eq!(received_count, 100);
}
#[tokio::test]
async fn test_event_broadcaster_edge_cases() {
let broadcaster = EventBroadcaster::new();
let empty_filter = EventFilter::default();
let mut subscriber = broadcaster.subscribe(empty_filter).await;
let minimal_event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: Utc::now(),
source: String::new(),
data: None,
};
broadcaster.broadcast(minimal_event.clone()).await.unwrap();
let received = subscriber.try_recv().unwrap();
assert_eq!(received, minimal_event);
}
#[tokio::test]
async fn test_event_broadcaster_all_event_types() {
let broadcaster = EventBroadcaster::new();
let mut subscriber = broadcaster.subscribe(EventFilter::default()).await;
let event_types = vec![
EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
EventType::TaskUpdated {
task_id: Uuid::new_v4(),
},
EventType::TaskDeleted {
task_id: Uuid::new_v4(),
},
EventType::TaskCompleted {
task_id: Uuid::new_v4(),
},
EventType::TaskCancelled {
task_id: Uuid::new_v4(),
},
EventType::ProjectCreated {
project_id: Uuid::new_v4(),
},
EventType::ProjectUpdated {
project_id: Uuid::new_v4(),
},
EventType::ProjectDeleted {
project_id: Uuid::new_v4(),
},
EventType::ProjectCompleted {
project_id: Uuid::new_v4(),
},
EventType::AreaCreated {
area_id: Uuid::new_v4(),
},
EventType::AreaUpdated {
area_id: Uuid::new_v4(),
},
EventType::AreaDeleted {
area_id: Uuid::new_v4(),
},
EventType::ProgressStarted {
operation_id: Uuid::new_v4(),
},
EventType::ProgressUpdated {
operation_id: Uuid::new_v4(),
},
EventType::ProgressCompleted {
operation_id: Uuid::new_v4(),
},
EventType::ProgressFailed {
operation_id: Uuid::new_v4(),
},
];
for event_type in event_types {
let event = Event {
id: Uuid::new_v4(),
event_type,
timestamp: Utc::now(),
source: "test".to_string(),
data: None,
};
broadcaster.broadcast(event.clone()).await.unwrap();
let received = subscriber.try_recv().unwrap();
assert_eq!(received.event_type, event.event_type);
}
}
#[tokio::test]
async fn test_event_broadcaster_filter_edge_cases() {
let broadcaster = EventBroadcaster::new();
let comprehensive_filter = EventFilter {
event_types: Some(vec![
EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
EventType::ProjectCreated {
project_id: Uuid::new_v4(),
},
]),
entity_ids: Some(vec![Uuid::new_v4(), Uuid::new_v4()]),
sources: Some(vec!["source1".to_string(), "source2".to_string()]),
since: Some(Utc::now() - chrono::Duration::hours(1)),
};
let mut subscriber = broadcaster.subscribe(comprehensive_filter).await;
let matching_event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: Uuid::new_v4(),
},
timestamp: Utc::now(),
source: "source1".to_string(),
data: Some(serde_json::json!({"key": "value"})),
};
broadcaster.broadcast(matching_event.clone()).await.unwrap();
let received = subscriber.try_recv();
if let Ok(received_event) = received {
assert_eq!(received_event.id, matching_event.id);
}
}
}