use std::collections::HashMap;
use std::sync::Arc;
use things3_core::Result;
use tokio::sync::{broadcast, RwLock};
use uuid::Uuid;
use crate::progress::ProgressUpdate;
use super::filter::{EventFilter, EventSubscription};
use super::types::{Event, EventType};
pub struct EventBroadcaster {
sender: broadcast::Sender<Event>,
subscriptions: Arc<RwLock<HashMap<Uuid, EventSubscription>>>,
}
impl EventBroadcaster {
#[must_use]
pub fn new() -> Self {
let (sender, _) = broadcast::channel(1000);
Self {
sender,
subscriptions: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn subscribe(&self, filter: EventFilter) -> broadcast::Receiver<Event> {
let subscription_id = Uuid::new_v4();
let (sub_sender, receiver) = broadcast::channel(100);
let subscription = EventSubscription {
id: subscription_id,
filter,
sender: sub_sender,
};
{
let mut subscriptions = self.subscriptions.write().await;
subscriptions.insert(subscription_id, subscription);
}
receiver
}
pub async fn unsubscribe(&self, subscription_id: Uuid) {
let mut subscriptions = self.subscriptions.write().await;
subscriptions.remove(&subscription_id);
}
pub async fn broadcast(&self, event: Event) -> Result<()> {
let _ = self.sender.send(event.clone());
let subscriptions = self.subscriptions.read().await;
for subscription in subscriptions.values() {
if subscription.filter.matches(&event) {
let _ = subscription.sender.send(event.clone());
}
}
Ok(())
}
pub async fn broadcast_task_event(
&self,
event_type: EventType,
data: Option<serde_json::Value>,
source: &str,
) -> Result<()> {
let event = Event {
id: Uuid::new_v4(),
event_type,
timestamp: chrono::Utc::now(),
data,
source: source.to_string(),
};
self.broadcast(event).await
}
pub async fn broadcast_project_event(
&self,
event_type: EventType,
data: Option<serde_json::Value>,
source: &str,
) -> Result<()> {
let event = Event {
id: Uuid::new_v4(),
event_type,
timestamp: chrono::Utc::now(),
data,
source: source.to_string(),
};
self.broadcast(event).await
}
pub async fn broadcast_area_event(
&self,
event_type: EventType,
data: Option<serde_json::Value>,
source: &str,
) -> Result<()> {
let event = Event {
id: Uuid::new_v4(),
event_type,
timestamp: chrono::Utc::now(),
data,
source: source.to_string(),
};
self.broadcast(event).await
}
pub async fn broadcast_progress_event(
&self,
event_type: EventType,
data: Option<serde_json::Value>,
source: &str,
) -> Result<()> {
let event = Event {
id: Uuid::new_v4(),
event_type,
timestamp: chrono::Utc::now(),
data,
source: source.to_string(),
};
self.broadcast(event).await
}
pub async fn broadcast_progress_update(
&self,
update: ProgressUpdate,
source: &str,
) -> Result<()> {
let event_type = match update.status {
crate::progress::ProgressStatus::Started => EventType::ProgressStarted {
operation_id: update.operation_id,
},
crate::progress::ProgressStatus::InProgress => EventType::ProgressUpdated {
operation_id: update.operation_id,
},
crate::progress::ProgressStatus::Completed => EventType::ProgressCompleted {
operation_id: update.operation_id,
},
crate::progress::ProgressStatus::Failed
| crate::progress::ProgressStatus::Cancelled => EventType::ProgressFailed {
operation_id: update.operation_id,
},
};
let data = serde_json::to_value(&update)?;
self.broadcast_progress_event(event_type, Some(data), source)
.await
}
pub async fn subscription_count(&self) -> usize {
self.subscriptions.read().await.len()
}
#[must_use]
pub fn subscribe_all(&self) -> broadcast::Receiver<Event> {
self.sender.subscribe()
}
}
impl Default for EventBroadcaster {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::events::{Event, EventFilter, EventType};
use chrono::Utc;
use std::sync::Arc;
use things3_core::ThingsId;
use uuid::Uuid;
#[tokio::test]
async fn test_event_broadcaster() {
let broadcaster = EventBroadcaster::new();
let mut receiver = broadcaster.subscribe_all();
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: ThingsId::new_v4(),
},
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
};
broadcaster.broadcast(event.clone()).await.unwrap();
let received_event = receiver.recv().await.unwrap();
assert_eq!(received_event.id, event.id);
}
#[tokio::test]
#[ignore = "This test is flaky due to async timing issues"]
async fn test_event_broadcaster_with_filter() {
let broadcaster = EventBroadcaster::new();
let filter = EventFilter {
event_types: Some(vec![EventType::TaskCreated {
task_id: ThingsId::new_v4(),
}]),
entity_ids: None,
sources: None,
since: None,
};
let mut receiver = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: ThingsId::new_v4(),
},
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
};
let broadcast_result = broadcaster.broadcast(event).await;
assert!(broadcast_result.is_ok());
let received_event =
tokio::time::timeout(std::time::Duration::from_millis(100), receiver.recv()).await;
if let Ok(Ok(event)) = received_event {
assert_eq!(event.source, "test");
}
}
#[tokio::test]
async fn test_progress_update_to_event() {
use crate::progress::ProgressUpdate;
let broadcaster = EventBroadcaster::new();
let mut receiver = broadcaster.subscribe_all();
let update = ProgressUpdate {
operation_id: Uuid::new_v4(),
operation_name: "test_operation".to_string(),
current: 50,
total: Some(100),
message: Some("Half done".to_string()),
timestamp: Utc::now(),
status: crate::progress::ProgressStatus::InProgress,
};
broadcaster
.broadcast_progress_update(update, "test")
.await
.unwrap();
let received_event = receiver.recv().await.unwrap();
assert_eq!(received_event.source, "test");
}
#[tokio::test]
async fn test_event_broadcaster_subscribe_all() {
let broadcaster = EventBroadcaster::new();
let mut receiver = broadcaster.subscribe_all();
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: ThingsId::new_v4(),
},
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
};
broadcaster.broadcast(event.clone()).await.unwrap();
let received_event = receiver.recv().await.unwrap();
assert_eq!(received_event.id, event.id);
}
#[tokio::test]
async fn test_event_broadcaster_unsubscribe() {
let broadcaster = EventBroadcaster::new();
let subscription_id = Uuid::new_v4();
let filter = EventFilter {
event_types: Some(vec![EventType::TaskCreated {
task_id: ThingsId::new_v4(),
}]),
entity_ids: None,
sources: None,
since: None,
};
let _receiver = broadcaster.subscribe(filter).await;
broadcaster.unsubscribe(subscription_id).await;
}
#[tokio::test]
async fn test_event_broadcaster_broadcast_task_event() {
let broadcaster = EventBroadcaster::new();
let mut receiver = broadcaster.subscribe_all();
let task_id = ThingsId::new_v4();
let event_type = EventType::TaskCreated {
task_id: task_id.clone(),
};
let data = Some(serde_json::json!({"title": "Test Task"}));
broadcaster
.broadcast_task_event(event_type, data, "test")
.await
.unwrap();
let received_event = receiver.recv().await.unwrap();
assert_eq!(received_event.source, "test");
}
#[tokio::test]
async fn test_event_broadcaster_broadcast_project_event() {
let broadcaster = EventBroadcaster::new();
let mut receiver = broadcaster.subscribe_all();
let project_id = ThingsId::new_v4();
let event_type = EventType::ProjectCreated {
project_id: project_id.clone(),
};
let data = Some(serde_json::json!({"title": "Test Project"}));
broadcaster
.broadcast_project_event(event_type, data, "test")
.await
.unwrap();
let received_event = receiver.recv().await.unwrap();
assert_eq!(received_event.source, "test");
}
#[tokio::test]
async fn test_event_broadcaster_broadcast_area_event() {
let broadcaster = EventBroadcaster::new();
let mut receiver = broadcaster.subscribe_all();
let area_id = ThingsId::new_v4();
let event_type = EventType::AreaCreated {
area_id: area_id.clone(),
};
let data = Some(serde_json::json!({"title": "Test Area"}));
broadcaster
.broadcast_area_event(event_type, data, "test")
.await
.unwrap();
let received_event = receiver.recv().await.unwrap();
assert_eq!(received_event.source, "test");
}
#[tokio::test]
async fn test_event_broadcaster_broadcast_progress_event() {
let broadcaster = EventBroadcaster::new();
let mut receiver = broadcaster.subscribe_all();
let operation_id = Uuid::new_v4();
let event_type = EventType::ProgressStarted { operation_id };
let data = Some(serde_json::json!({"message": "Starting operation"}));
broadcaster
.broadcast_progress_event(event_type, data, "test")
.await
.unwrap();
let received_event = receiver.recv().await.unwrap();
assert_eq!(received_event.source, "test");
}
#[tokio::test]
async fn test_event_broadcaster_broadcast_progress_update() {
use crate::progress::ProgressUpdate;
let broadcaster = EventBroadcaster::new();
let mut receiver = broadcaster.subscribe_all();
let update = ProgressUpdate {
operation_id: Uuid::new_v4(),
operation_name: "test_operation".to_string(),
current: 50,
total: Some(100),
message: Some("Half done".to_string()),
timestamp: Utc::now(),
status: crate::progress::ProgressStatus::InProgress,
};
broadcaster
.broadcast_progress_update(update, "test")
.await
.unwrap();
let received_event = receiver.recv().await.unwrap();
assert_eq!(received_event.source, "test");
}
#[tokio::test]
#[ignore = "This test is flaky due to async timing issues"]
async fn test_event_broadcaster_with_filtered_subscription() {
let broadcaster = EventBroadcaster::new();
let task_id = ThingsId::new_v4();
let filter = EventFilter {
event_types: Some(vec![EventType::TaskCreated {
task_id: ThingsId::new_v4(), }]),
entity_ids: None,
sources: None,
since: None,
};
let mut receiver = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated { task_id },
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
};
broadcaster.broadcast(event).await.unwrap();
let result =
tokio::time::timeout(std::time::Duration::from_millis(100), receiver.recv()).await;
if let Ok(Ok(received_event)) = result {
assert_eq!(received_event.source, "test");
} else {
}
}
#[tokio::test]
#[ignore = "This test is flaky due to async timing issues"]
async fn test_event_broadcaster_with_entity_id_filter() {
let broadcaster = EventBroadcaster::new();
let task_id = ThingsId::new_v4();
let filter = EventFilter {
event_types: None,
entity_ids: Some(vec![task_id.clone()]),
sources: None,
since: None,
};
let mut receiver = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated { task_id },
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
};
broadcaster.broadcast(event).await.unwrap();
let result =
tokio::time::timeout(std::time::Duration::from_millis(100), receiver.recv()).await;
if let Ok(Ok(received_event)) = result {
assert_eq!(received_event.source, "test");
} else {
}
}
#[tokio::test]
#[ignore = "This test is flaky due to async timing issues"]
async fn test_event_broadcaster_with_source_filter() {
let broadcaster = EventBroadcaster::new();
let filter = EventFilter {
event_types: None,
entity_ids: None,
sources: Some(vec!["test_source".to_string()]),
since: None,
};
let mut receiver = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: ThingsId::new_v4(),
},
timestamp: Utc::now(),
data: None,
source: "test_source".to_string(),
};
broadcaster.broadcast(event).await.unwrap();
let result =
tokio::time::timeout(std::time::Duration::from_millis(100), receiver.recv()).await;
if let Ok(Ok(received_event)) = result {
assert_eq!(received_event.source, "test_source");
} else {
}
}
#[tokio::test]
#[ignore = "This test is flaky due to async timing issues"]
async fn test_event_broadcaster_with_timestamp_filter() {
let broadcaster = EventBroadcaster::new();
let past_time = Utc::now() - chrono::Duration::hours(1);
let filter = EventFilter {
event_types: None,
entity_ids: None,
sources: None,
since: Some(past_time),
};
let mut receiver = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: ThingsId::new_v4(),
},
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
};
broadcaster.broadcast(event).await.unwrap();
let result =
tokio::time::timeout(std::time::Duration::from_millis(100), receiver.recv()).await;
if let Ok(Ok(received_event)) = result {
assert_eq!(received_event.source, "test");
} else {
}
}
#[tokio::test]
#[ignore = "This test is flaky due to async timing issues"]
async fn test_event_broadcaster_filter_no_match() {
let broadcaster = EventBroadcaster::new();
let task_id = ThingsId::new_v4();
let filter = EventFilter {
event_types: Some(vec![EventType::TaskUpdated {
task_id: ThingsId::new_v4(),
}]),
entity_ids: None,
sources: None,
since: None,
};
let mut receiver = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated { task_id },
timestamp: Utc::now(),
data: None,
source: "test".to_string(),
};
broadcaster.broadcast(event).await.unwrap();
let result =
tokio::time::timeout(std::time::Duration::from_millis(100), receiver.recv()).await;
assert!(result.is_err()); }
#[tokio::test]
#[ignore = "This test is flaky due to async timing issues"]
async fn test_event_broadcaster_broadcast_error_handling() {
let broadcaster = EventBroadcaster::new();
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: ThingsId::new_v4(),
},
timestamp: Utc::now(),
data: Some(serde_json::json!({"test": "data"})),
source: "test".to_string(),
};
let result = broadcaster.broadcast(event).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_event_broadcaster_subscription_count() {
let broadcaster = EventBroadcaster::new();
assert_eq!(broadcaster.subscription_count().await, 0);
let filter = EventFilter {
event_types: Some(vec![EventType::TaskCreated {
task_id: ThingsId::new_v4(),
}]),
entity_ids: None,
sources: None,
since: None,
};
let _receiver = broadcaster.subscribe(filter).await;
assert_eq!(broadcaster.subscription_count().await, 1);
let filter2 = EventFilter {
event_types: Some(vec![EventType::ProjectCreated {
project_id: ThingsId::new_v4(),
}]),
entity_ids: None,
sources: None,
since: None,
};
let _receiver2 = broadcaster.subscribe(filter2).await;
assert_eq!(broadcaster.subscription_count().await, 2);
}
#[tokio::test]
async fn test_event_broadcaster_multiple_subscribers() {
let broadcaster = EventBroadcaster::new();
let filter = EventFilter::default();
let mut subscriber1 = broadcaster.subscribe(filter.clone()).await;
let mut subscriber2 = broadcaster.subscribe(filter.clone()).await;
let mut subscriber3 = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: ThingsId::new_v4(),
},
timestamp: Utc::now(),
source: "test".to_string(),
data: None,
};
broadcaster.broadcast(event.clone()).await.unwrap();
let received1 = subscriber1.try_recv().unwrap();
let received2 = subscriber2.try_recv().unwrap();
let received3 = subscriber3.try_recv().unwrap();
assert_eq!(received1.id, event.id);
assert_eq!(received2.id, event.id);
assert_eq!(received3.id, event.id);
}
#[tokio::test]
async fn test_event_broadcaster_with_different_filters() {
let broadcaster = EventBroadcaster::new();
let task_filter = EventFilter {
event_types: Some(vec![EventType::TaskCreated {
task_id: ThingsId::new_v4(),
}]),
..Default::default()
};
let project_filter = EventFilter {
event_types: Some(vec![EventType::ProjectCreated {
project_id: ThingsId::new_v4(),
}]),
..Default::default()
};
let mut task_subscriber = broadcaster.subscribe(task_filter).await;
let mut project_subscriber = broadcaster.subscribe(project_filter).await;
let task_event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: ThingsId::new_v4(),
},
timestamp: Utc::now(),
source: "test".to_string(),
data: None,
};
broadcaster.broadcast(task_event.clone()).await.unwrap();
let received = task_subscriber.try_recv().unwrap();
assert_eq!(received, task_event);
assert!(project_subscriber.try_recv().is_err());
}
#[tokio::test]
async fn test_event_broadcaster_with_entity_id_filters() {
let broadcaster = EventBroadcaster::new();
let task_id = ThingsId::new_v4();
let filter = EventFilter {
entity_ids: Some(vec![task_id.clone()]),
..Default::default()
};
let mut subscriber = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated { task_id },
timestamp: Utc::now(),
source: "test".to_string(),
data: None,
};
broadcaster.broadcast(event.clone()).await.unwrap();
let received = subscriber.try_recv().unwrap();
assert_eq!(received, event);
}
#[tokio::test]
async fn test_event_broadcaster_with_source_filters() {
let broadcaster = EventBroadcaster::new();
let filter = EventFilter {
sources: Some(vec!["test_source".to_string()]),
..Default::default()
};
let mut subscriber = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: ThingsId::new_v4(),
},
timestamp: Utc::now(),
source: "test_source".to_string(),
data: None,
};
broadcaster.broadcast(event.clone()).await.unwrap();
let received = subscriber.try_recv().unwrap();
assert_eq!(received, event);
}
#[tokio::test]
async fn test_event_broadcaster_with_timestamp_filters() {
let broadcaster = EventBroadcaster::new();
let now = Utc::now();
let start_time = now - chrono::Duration::minutes(5);
let _end_time = now + chrono::Duration::minutes(5);
let filter = EventFilter {
since: Some(start_time),
..Default::default()
};
let mut subscriber = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: ThingsId::new_v4(),
},
timestamp: now,
source: "test".to_string(),
data: None,
};
broadcaster.broadcast(event.clone()).await.unwrap();
let received = subscriber.try_recv().unwrap();
assert_eq!(received, event);
}
#[tokio::test]
async fn test_event_broadcaster_concurrent_subscriptions() {
let broadcaster = Arc::new(EventBroadcaster::new());
let mut handles = vec![];
for i in 0..10 {
let broadcaster_clone = broadcaster.clone();
let handle = tokio::spawn(async move {
let filter = EventFilter::default();
let mut subscriber = broadcaster_clone.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: ThingsId::new_v4(),
},
timestamp: Utc::now(),
source: format!("test_{i}"),
data: None,
};
broadcaster_clone.broadcast(event.clone()).await.unwrap();
let received = subscriber.try_recv().unwrap();
assert_eq!(received.source, format!("test_{i}"));
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
}
#[tokio::test]
async fn test_event_broadcaster_filter_combinations() {
let broadcaster = EventBroadcaster::new();
let task_id = ThingsId::new_v4();
let filter = EventFilter {
event_types: Some(vec![EventType::TaskCreated {
task_id: ThingsId::new_v4(),
}]),
entity_ids: Some(vec![task_id.clone()]),
sources: Some(vec!["test_source".to_string()]),
since: Some(Utc::now() - chrono::Duration::hours(1)),
};
let mut subscriber = broadcaster.subscribe(filter).await;
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated { task_id },
timestamp: Utc::now(),
source: "test_source".to_string(),
data: None,
};
broadcaster.broadcast(event.clone()).await.unwrap();
let received = subscriber.try_recv().unwrap();
assert_eq!(received, event);
}
#[tokio::test]
async fn test_event_broadcaster_large_message_handling() {
let broadcaster = EventBroadcaster::new();
let mut subscriber = broadcaster.subscribe(EventFilter::default()).await;
let large_data = serde_json::Value::String("x".repeat(10000));
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: ThingsId::new_v4(),
},
timestamp: Utc::now(),
source: "test".to_string(),
data: Some(large_data),
};
broadcaster.broadcast(event.clone()).await.unwrap();
let received = subscriber.try_recv().unwrap();
assert_eq!(received, event);
}
#[tokio::test]
async fn test_event_broadcaster_rapid_events() {
let broadcaster = EventBroadcaster::new();
let mut subscriber = broadcaster.subscribe(EventFilter::default()).await;
for i in 0..100 {
let event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: ThingsId::new_v4(),
},
timestamp: Utc::now(),
source: format!("test_{i}"),
data: None,
};
broadcaster.broadcast(event).await.unwrap();
}
let mut received_count = 0;
while subscriber.try_recv().is_ok() {
received_count += 1;
}
assert_eq!(received_count, 100);
}
#[tokio::test]
async fn test_event_broadcaster_edge_cases() {
let broadcaster = EventBroadcaster::new();
let empty_filter = EventFilter::default();
let mut subscriber = broadcaster.subscribe(empty_filter).await;
let minimal_event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: ThingsId::new_v4(),
},
timestamp: Utc::now(),
source: String::new(),
data: None,
};
broadcaster.broadcast(minimal_event.clone()).await.unwrap();
let received = subscriber.try_recv().unwrap();
assert_eq!(received, minimal_event);
}
#[tokio::test]
async fn test_event_broadcaster_all_event_types() {
let broadcaster = EventBroadcaster::new();
let mut subscriber = broadcaster.subscribe(EventFilter::default()).await;
let event_types = vec![
EventType::TaskCreated {
task_id: ThingsId::new_v4(),
},
EventType::TaskUpdated {
task_id: ThingsId::new_v4(),
},
EventType::TaskDeleted {
task_id: ThingsId::new_v4(),
},
EventType::TaskCompleted {
task_id: ThingsId::new_v4(),
},
EventType::TaskCancelled {
task_id: ThingsId::new_v4(),
},
EventType::ProjectCreated {
project_id: ThingsId::new_v4(),
},
EventType::ProjectUpdated {
project_id: ThingsId::new_v4(),
},
EventType::ProjectDeleted {
project_id: ThingsId::new_v4(),
},
EventType::ProjectCompleted {
project_id: ThingsId::new_v4(),
},
EventType::AreaCreated {
area_id: ThingsId::new_v4(),
},
EventType::AreaUpdated {
area_id: ThingsId::new_v4(),
},
EventType::AreaDeleted {
area_id: ThingsId::new_v4(),
},
EventType::ProgressStarted {
operation_id: Uuid::new_v4(),
},
EventType::ProgressUpdated {
operation_id: Uuid::new_v4(),
},
EventType::ProgressCompleted {
operation_id: Uuid::new_v4(),
},
EventType::ProgressFailed {
operation_id: Uuid::new_v4(),
},
];
for event_type in event_types {
let event = Event {
id: Uuid::new_v4(),
event_type,
timestamp: Utc::now(),
source: "test".to_string(),
data: None,
};
broadcaster.broadcast(event.clone()).await.unwrap();
let received = subscriber.try_recv().unwrap();
assert_eq!(received.event_type, event.event_type);
}
}
#[tokio::test]
async fn test_event_broadcaster_filter_edge_cases() {
let broadcaster = EventBroadcaster::new();
let comprehensive_filter = EventFilter {
event_types: Some(vec![
EventType::TaskCreated {
task_id: ThingsId::new_v4(),
},
EventType::ProjectCreated {
project_id: ThingsId::new_v4(),
},
]),
entity_ids: Some(vec![ThingsId::new_v4(), ThingsId::new_v4()]),
sources: Some(vec!["source1".to_string(), "source2".to_string()]),
since: Some(Utc::now() - chrono::Duration::hours(1)),
};
let mut subscriber = broadcaster.subscribe(comprehensive_filter).await;
let matching_event = Event {
id: Uuid::new_v4(),
event_type: EventType::TaskCreated {
task_id: ThingsId::new_v4(),
},
timestamp: Utc::now(),
source: "source1".to_string(),
data: Some(serde_json::json!({"key": "value"})),
};
broadcaster.broadcast(matching_event.clone()).await.unwrap();
let received = subscriber.try_recv();
if let Ok(received_event) = received {
assert_eq!(received_event.id, matching_event.id);
}
}
}