Skip to main content

a3s_event/
source.rs

1//! Event sources — adapters that produce events from external signals
2//!
3//! `EventSource` defines a standard interface for components that generate
4//! events on a schedule, from webhooks, or from metric thresholds.
5//! Sources emit events through a sender channel that the EventBus or
6//! Broker can consume.
7
8use crate::error::Result;
9use crate::types::Event;
10use async_trait::async_trait;
11use std::sync::Arc;
12use tokio::sync::{mpsc, Notify};
13
14/// Trait for event sources
15///
16/// An event source generates events from external signals and sends
17/// them through the provided channel. Sources run asynchronously and
18/// can be stopped gracefully.
19#[async_trait]
20pub trait EventSource: Send + Sync {
21    /// Start the source, emitting events through the sender
22    ///
23    /// This method runs until `stop()` is called or the sender is dropped.
24    /// Implementations should handle errors gracefully (log and continue).
25    async fn start(&self, sender: mpsc::Sender<Event>) -> Result<()>;
26
27    /// Signal the source to stop
28    async fn stop(&self) -> Result<()>;
29
30    /// Human-readable source name
31    fn name(&self) -> &str;
32}
33
34/// Type alias for the event factory function used by CronSource
35type EventFactory = dyn Fn() -> Event + Send + Sync;
36
37/// Event source that emits events on a fixed interval
38///
39/// Uses `tokio::time::interval` for scheduling and `Notify` for
40/// graceful shutdown.
41pub struct CronSource {
42    name: String,
43    interval: std::time::Duration,
44    factory: Arc<EventFactory>,
45    stop_signal: Arc<Notify>,
46}
47
48impl CronSource {
49    /// Create a new cron source
50    ///
51    /// - `name` — source identifier
52    /// - `interval` — time between event emissions
53    /// - `factory` — closure that creates each event
54    pub fn new<F>(
55        name: impl Into<String>,
56        interval: std::time::Duration,
57        factory: F,
58    ) -> Self
59    where
60        F: Fn() -> Event + Send + Sync + 'static,
61    {
62        Self {
63            name: name.into(),
64            interval,
65            factory: Arc::new(factory),
66            stop_signal: Arc::new(Notify::new()),
67        }
68    }
69}
70
71#[async_trait]
72impl EventSource for CronSource {
73    async fn start(&self, sender: mpsc::Sender<Event>) -> Result<()> {
74        let mut interval = tokio::time::interval(self.interval);
75        let factory = self.factory.clone();
76        let stop = self.stop_signal.clone();
77        let name = self.name.clone();
78
79        // Consume the first tick (fires immediately)
80        interval.tick().await;
81
82        loop {
83            tokio::select! {
84                _ = interval.tick() => {
85                    let event = (factory)();
86                    if sender.send(event).await.is_err() {
87                        tracing::debug!(source = %name, "CronSource sender closed, stopping");
88                        break;
89                    }
90                }
91                _ = stop.notified() => {
92                    tracing::debug!(source = %name, "CronSource received stop signal");
93                    break;
94                }
95            }
96        }
97
98        Ok(())
99    }
100
101    async fn stop(&self) -> Result<()> {
102        self.stop_signal.notify_one();
103        Ok(())
104    }
105
106    fn name(&self) -> &str {
107        &self.name
108    }
109}
110
111/// Marker trait for webhook-based event sources
112///
113/// Concrete implementations require an HTTP server, which is out of scope
114/// for a library crate. This trait defines the contract for webhook sources
115/// that can be implemented by applications.
116#[async_trait]
117pub trait WebhookSource: EventSource {
118    /// The path this webhook listens on (e.g., "/webhooks/github")
119    fn path(&self) -> &str;
120
121    /// Accepted content types (e.g., "application/json")
122    fn content_types(&self) -> Vec<String> {
123        vec!["application/json".to_string()]
124    }
125}
126
127/// Marker trait for metrics-based event sources
128///
129/// Concrete implementations require a metrics collector (Prometheus, etc.),
130/// which is out of scope for a library crate. This trait defines the contract
131/// for metric threshold sources.
132#[async_trait]
133pub trait MetricsSource: EventSource {
134    /// The metric name being monitored
135    fn metric_name(&self) -> &str;
136
137    /// The threshold value that triggers an event
138    fn threshold(&self) -> f64;
139}
140
141#[cfg(test)]
142mod tests {
143    use super::*;
144    use std::sync::atomic::{AtomicU32, Ordering};
145
146    #[tokio::test]
147    async fn test_cron_source_emits_events() {
148        let counter = Arc::new(AtomicU32::new(0));
149        let counter_clone = counter.clone();
150
151        let source = CronSource::new(
152            "test-cron",
153            std::time::Duration::from_millis(50),
154            move || {
155                let n = counter_clone.fetch_add(1, Ordering::SeqCst);
156                Event::new(
157                    format!("events.cron.tick.{}", n),
158                    "cron",
159                    format!("Tick {}", n),
160                    "cron-source",
161                    serde_json::json!({"tick": n}),
162                )
163            },
164        );
165
166        assert_eq!(source.name(), "test-cron");
167
168        let (tx, mut rx) = mpsc::channel(100);
169
170        // Start source in background
171        let source_handle = {
172            let source_ref = &source;
173            let tx = tx.clone();
174            tokio::spawn({
175                let _name = source_ref.name().to_string();
176                let interval = source_ref.interval;
177                let factory = source_ref.factory.clone();
178                let stop = source_ref.stop_signal.clone();
179
180                async move {
181                    let mut interval = tokio::time::interval(interval);
182                    interval.tick().await; // consume first immediate tick
183
184                    loop {
185                        tokio::select! {
186                            _ = interval.tick() => {
187                                let event = (factory)();
188                                if tx.send(event).await.is_err() {
189                                    break;
190                                }
191                            }
192                            _ = stop.notified() => {
193                                break;
194                            }
195                        }
196                    }
197                }
198            })
199        };
200
201        // Wait for a few events
202        tokio::time::sleep(std::time::Duration::from_millis(180)).await;
203        source.stop().await.unwrap();
204        source_handle.await.unwrap();
205
206        // Collect received events
207        let mut events = Vec::new();
208        while let Ok(event) = rx.try_recv() {
209            events.push(event);
210        }
211
212        // Should have received at least 2 events in 180ms with 50ms interval
213        assert!(events.len() >= 2, "Expected >= 2 events, got {}", events.len());
214        assert!(events[0].subject.starts_with("events.cron.tick."));
215    }
216
217    #[tokio::test]
218    async fn test_cron_source_stop() {
219        let source = CronSource::new(
220            "stoppable",
221            std::time::Duration::from_millis(10),
222            || Event::new("events.cron.a", "cron", "A", "src", serde_json::json!({})),
223        );
224
225        let (tx, _rx) = mpsc::channel(100);
226
227        let stop = source.stop_signal.clone();
228        let factory = source.factory.clone();
229        let interval = source.interval;
230
231        let handle = tokio::spawn(async move {
232            let mut interval_timer = tokio::time::interval(interval);
233            interval_timer.tick().await;
234
235            loop {
236                tokio::select! {
237                    _ = interval_timer.tick() => {
238                        let event = (factory)();
239                        if tx.send(event).await.is_err() {
240                            break;
241                        }
242                    }
243                    _ = stop.notified() => {
244                        break;
245                    }
246                }
247            }
248        });
249
250        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
251        source.stop().await.unwrap();
252
253        // Should complete without hanging
254        tokio::time::timeout(std::time::Duration::from_secs(1), handle)
255            .await
256            .unwrap()
257            .unwrap();
258    }
259
260    #[tokio::test]
261    async fn test_cron_source_sender_closed() {
262        let source = CronSource::new(
263            "closed-sender",
264            std::time::Duration::from_millis(10),
265            || Event::new("events.cron.a", "cron", "A", "src", serde_json::json!({})),
266        );
267
268        let (tx, rx) = mpsc::channel(1);
269        drop(rx); // Close receiver immediately
270
271        // start should complete without error when sender is closed
272        let result = source.start(tx).await;
273        assert!(result.is_ok());
274    }
275
276    #[tokio::test]
277    async fn test_cron_source_name() {
278        let source = CronSource::new(
279            "health-sweep",
280            std::time::Duration::from_secs(30),
281            || Event::new("events.health.sweep", "health", "Sweep", "src", serde_json::json!({})),
282        );
283        assert_eq!(source.name(), "health-sweep");
284    }
285
286    #[test]
287    fn test_event_source_is_send_sync() {
288        fn assert_send_sync<T: Send + Sync>() {}
289        assert_send_sync::<CronSource>();
290    }
291}