use std::sync::Arc;
use tokio::spawn;
use super::{Event, EventSubscriber};
struct Subscription {
subscriber: Arc<dyn EventSubscriber>,
event_types: Vec<&'static str>,
}
impl Subscription {
fn accepts(&self, event: &Event) -> bool {
self.event_types.contains(&event.event_type())
}
}
pub struct EventPublisher {
subscriptions: Vec<Subscription>,
}
impl EventPublisher {
pub fn new() -> Self {
Self {
subscriptions: Vec::new(),
}
}
pub fn subscribe(
&mut self,
subscriber: impl EventSubscriber + 'static,
event_types: &[&'static str],
) {
self.subscriptions.push(Subscription {
subscriber: Arc::new(subscriber),
event_types: event_types.to_vec(),
});
}
pub fn subscriber_count(&self) -> usize {
self.subscriptions.len()
}
pub fn publish(&self, event: Event) {
for subscription in &self.subscriptions {
if !subscription.accepts(&event) {
continue;
}
let subscriber = subscription.subscriber.clone();
let event = event.clone();
spawn(async move {
subscriber.handle(&event).await;
});
}
}
}
impl Default for EventPublisher {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::notify::{SubscriberFuture, WebhookSubscriber};
use rust_decimal::Decimal;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use tokio::time::sleep;
use chrono::Utc;
use ironflow_store::models::RunStatus;
use uuid::Uuid;
fn sample_run_status_changed() -> Event {
Event::RunStatusChanged {
run_id: Uuid::now_v7(),
workflow_name: "deploy".to_string(),
from: RunStatus::Running,
to: RunStatus::Completed,
error: None,
cost_usd: Decimal::new(42, 2),
duration_ms: 5000,
at: Utc::now(),
}
}
fn sample_user_signed_in() -> Event {
Event::UserSignedIn {
user_id: Uuid::now_v7(),
username: "alice".to_string(),
at: Utc::now(),
}
}
#[test]
fn starts_empty() {
let publisher = EventPublisher::new();
assert_eq!(publisher.subscriber_count(), 0);
}
#[test]
fn subscribe_increments_count() {
let mut publisher = EventPublisher::new();
publisher.subscribe(
WebhookSubscriber::new("https://example.com"),
&[Event::RUN_STATUS_CHANGED],
);
assert_eq!(publisher.subscriber_count(), 1);
}
#[test]
fn publish_with_no_subscribers_is_noop() {
let publisher = EventPublisher::new();
publisher.publish(sample_run_status_changed());
}
#[test]
fn default_is_empty() {
let publisher = EventPublisher::default();
assert_eq!(publisher.subscriber_count(), 0);
}
struct CountingSubscriber {
count: AtomicU32,
}
impl CountingSubscriber {
fn new() -> Self {
Self {
count: AtomicU32::new(0),
}
}
fn count(&self) -> u32 {
self.count.load(Ordering::SeqCst)
}
}
impl EventSubscriber for CountingSubscriber {
fn name(&self) -> &str {
"counting"
}
fn handle<'a>(&'a self, _event: &'a Event) -> SubscriberFuture<'a> {
Box::pin(async move {
self.count.fetch_add(1, Ordering::SeqCst);
})
}
}
#[tokio::test]
async fn subscriber_receives_matching_events() {
let subscriber = Arc::new(CountingSubscriber::new());
let mut publisher = EventPublisher::new();
struct ArcSub(Arc<CountingSubscriber>);
impl EventSubscriber for ArcSub {
fn name(&self) -> &str {
self.0.name()
}
fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
self.0.handle(event)
}
}
publisher.subscribe(ArcSub(subscriber.clone()), &[Event::RUN_STATUS_CHANGED]);
publisher.publish(sample_run_status_changed()); publisher.publish(sample_user_signed_in());
sleep(Duration::from_millis(50)).await;
assert_eq!(subscriber.count(), 1);
}
#[tokio::test]
async fn all_filter_matches_everything() {
let subscriber = Arc::new(CountingSubscriber::new());
let mut publisher = EventPublisher::new();
struct ArcSub(Arc<CountingSubscriber>);
impl EventSubscriber for ArcSub {
fn name(&self) -> &str {
self.0.name()
}
fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
self.0.handle(event)
}
}
publisher.subscribe(ArcSub(subscriber.clone()), Event::ALL);
publisher.publish(sample_run_status_changed());
publisher.publish(sample_user_signed_in());
sleep(Duration::from_millis(50)).await;
assert_eq!(subscriber.count(), 2);
}
#[tokio::test]
async fn empty_filter_matches_nothing() {
let subscriber = Arc::new(CountingSubscriber::new());
let mut publisher = EventPublisher::new();
struct ArcSub(Arc<CountingSubscriber>);
impl EventSubscriber for ArcSub {
fn name(&self) -> &str {
self.0.name()
}
fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
self.0.handle(event)
}
}
publisher.subscribe(ArcSub(subscriber.clone()), &[]);
publisher.publish(sample_run_status_changed());
publisher.publish(sample_user_signed_in());
sleep(Duration::from_millis(50)).await;
assert_eq!(subscriber.count(), 0);
}
}