Skip to main content

a3s_event/
source.rs

1//! Event sources — adapters that produce events from external signals
2//!
3//! `EventSource` defines a standard interface for components that generate
4//! events on a schedule, from webhooks, or from metric thresholds.
5//! Sources emit events through a sender channel that the EventBus or
6//! Broker can consume.
7
8use crate::error::Result;
9use crate::types::Event;
10use async_trait::async_trait;
11use std::sync::Arc;
12use tokio::sync::{mpsc, Notify};
13
14/// Trait for event sources
15///
16/// An event source generates events from external signals and sends
17/// them through the provided channel. Sources run asynchronously and
18/// can be stopped gracefully.
19#[async_trait]
20pub trait EventSource: Send + Sync {
21    /// Start the source, emitting events through the sender
22    ///
23    /// This method runs until `stop()` is called or the sender is dropped.
24    /// Implementations should handle errors gracefully (log and continue).
25    async fn start(&self, sender: mpsc::Sender<Event>) -> Result<()>;
26
27    /// Signal the source to stop
28    async fn stop(&self) -> Result<()>;
29
30    /// Human-readable source name
31    fn name(&self) -> &str;
32}
33
34/// Type alias for the event factory function used by CronSource
35type EventFactory = dyn Fn() -> Event + Send + Sync;
36
37/// Event source that emits events on a fixed interval
38///
39/// Uses `tokio::time::interval` for scheduling and `Notify` for
40/// graceful shutdown.
41pub struct CronSource {
42    name: String,
43    interval: std::time::Duration,
44    factory: Arc<EventFactory>,
45    stop_signal: Arc<Notify>,
46}
47
48impl CronSource {
49    /// Create a new cron source
50    ///
51    /// - `name` — source identifier
52    /// - `interval` — time between event emissions
53    /// - `factory` — closure that creates each event
54    pub fn new<F>(
55        name: impl Into<String>,
56        interval: std::time::Duration,
57        factory: F,
58    ) -> Self
59    where
60        F: Fn() -> Event + Send + Sync + 'static,
61    {
62        Self {
63            name: name.into(),
64            interval,
65            factory: Arc::new(factory),
66            stop_signal: Arc::new(Notify::new()),
67        }
68    }
69}
70
71#[async_trait]
72impl EventSource for CronSource {
73    async fn start(&self, sender: mpsc::Sender<Event>) -> Result<()> {
74        let mut interval = tokio::time::interval(self.interval);
75        let factory = self.factory.clone();
76        let stop = self.stop_signal.clone();
77        let name = self.name.clone();
78
79        // Consume the first tick (fires immediately)
80        interval.tick().await;
81
82        loop {
83            tokio::select! {
84                _ = interval.tick() => {
85                    let event = (factory)();
86                    if sender.send(event).await.is_err() {
87                        tracing::debug!(source = %name, "CronSource sender closed, stopping");
88                        break;
89                    }
90                }
91                _ = stop.notified() => {
92                    tracing::debug!(source = %name, "CronSource received stop signal");
93                    break;
94                }
95            }
96        }
97
98        Ok(())
99    }
100
101    async fn stop(&self) -> Result<()> {
102        self.stop_signal.notify_one();
103        Ok(())
104    }
105
106    fn name(&self) -> &str {
107        &self.name
108    }
109}
110
#[cfg(test)]
mod tests {
    //! Unit tests for `CronSource`.
    //!
    //! The timing-based tests drive the real `EventSource::start`
    //! implementation on a spawned task (the source is shared through an
    //! `Arc`) rather than a copy of its loop, so a regression in `start`
    //! is actually caught.

    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    /// Events are produced by the factory and delivered through the channel.
    #[tokio::test]
    async fn test_cron_source_emits_events() {
        let counter = Arc::new(AtomicU32::new(0));
        let counter_clone = counter.clone();

        let source = Arc::new(CronSource::new(
            "test-cron",
            Duration::from_millis(50),
            move || {
                let n = counter_clone.fetch_add(1, Ordering::SeqCst);
                Event::new(
                    format!("events.cron.tick.{}", n),
                    "cron",
                    format!("Tick {}", n),
                    "cron-source",
                    serde_json::json!({"tick": n}),
                )
            },
        ));

        assert_eq!(source.name(), "test-cron");

        let (tx, mut rx) = mpsc::channel(100);

        // Run the real `start` loop in the background
        let runner = Arc::clone(&source);
        let source_handle = tokio::spawn(async move { runner.start(tx).await });

        // Wait for a few events
        tokio::time::sleep(Duration::from_millis(180)).await;
        source.stop().await.unwrap();
        source_handle.await.unwrap().unwrap();

        // Collect received events; the loop ends once the channel is drained
        let mut events = Vec::new();
        while let Ok(event) = rx.try_recv() {
            events.push(event);
        }

        // Should have received at least 2 events in 180ms with 50ms interval
        assert!(events.len() >= 2, "Expected >= 2 events, got {}", events.len());
        assert!(events[0].subject.starts_with("events.cron.tick."));
    }

    /// `stop()` makes a running `start()` return promptly.
    #[tokio::test]
    async fn test_cron_source_stop() {
        let source = Arc::new(CronSource::new(
            "stoppable",
            Duration::from_millis(10),
            || Event::new("events.cron.a", "cron", "A", "src", serde_json::json!({})),
        ));

        // Keep the receiver alive so the loop can only end via the stop signal
        let (tx, _rx) = mpsc::channel(100);

        let runner = Arc::clone(&source);
        let handle = tokio::spawn(async move { runner.start(tx).await });

        tokio::time::sleep(Duration::from_millis(50)).await;
        source.stop().await.unwrap();

        // Should complete without hanging
        tokio::time::timeout(Duration::from_secs(1), handle)
            .await
            .unwrap()
            .unwrap()
            .unwrap();
    }

    /// A closed channel ends the run without reporting an error.
    #[tokio::test]
    async fn test_cron_source_sender_closed() {
        let source = CronSource::new(
            "closed-sender",
            Duration::from_millis(10),
            || Event::new("events.cron.a", "cron", "A", "src", serde_json::json!({})),
        );

        let (tx, rx) = mpsc::channel(1);
        drop(rx); // Close receiver immediately

        // start should complete without error when sender is closed
        let result = source.start(tx).await;
        assert!(result.is_ok());
    }

    /// `name()` returns the identifier passed to the constructor.
    #[tokio::test]
    async fn test_cron_source_name() {
        let source = CronSource::new(
            "health-sweep",
            Duration::from_secs(30),
            || Event::new("events.health.sweep", "health", "Sweep", "src", serde_json::json!({})),
        );
        assert_eq!(source.name(), "health-sweep");
    }

    /// Compile-time check that `CronSource` can be shared across threads.
    #[test]
    fn test_event_source_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<CronSource>();
    }
}