1use crate::error::Result;
9use crate::types::Event;
10use async_trait::async_trait;
11use std::sync::Arc;
12use tokio::sync::{mpsc, Notify};
13
14#[async_trait]
20pub trait EventSource: Send + Sync {
21 async fn start(&self, sender: mpsc::Sender<Event>) -> Result<()>;
26
27 async fn stop(&self) -> Result<()>;
29
30 fn name(&self) -> &str;
32}
33
34type EventFactory = dyn Fn() -> Event + Send + Sync;
36
37pub struct CronSource {
42 name: String,
43 interval: std::time::Duration,
44 factory: Arc<EventFactory>,
45 stop_signal: Arc<Notify>,
46}
47
48impl CronSource {
49 pub fn new<F>(
55 name: impl Into<String>,
56 interval: std::time::Duration,
57 factory: F,
58 ) -> Self
59 where
60 F: Fn() -> Event + Send + Sync + 'static,
61 {
62 Self {
63 name: name.into(),
64 interval,
65 factory: Arc::new(factory),
66 stop_signal: Arc::new(Notify::new()),
67 }
68 }
69}
70
71#[async_trait]
72impl EventSource for CronSource {
73 async fn start(&self, sender: mpsc::Sender<Event>) -> Result<()> {
74 let mut interval = tokio::time::interval(self.interval);
75 let factory = self.factory.clone();
76 let stop = self.stop_signal.clone();
77 let name = self.name.clone();
78
79 interval.tick().await;
81
82 loop {
83 tokio::select! {
84 _ = interval.tick() => {
85 let event = (factory)();
86 if sender.send(event).await.is_err() {
87 tracing::debug!(source = %name, "CronSource sender closed, stopping");
88 break;
89 }
90 }
91 _ = stop.notified() => {
92 tracing::debug!(source = %name, "CronSource received stop signal");
93 break;
94 }
95 }
96 }
97
98 Ok(())
99 }
100
101 async fn stop(&self) -> Result<()> {
102 self.stop_signal.notify_one();
103 Ok(())
104 }
105
106 fn name(&self) -> &str {
107 &self.name
108 }
109}
110
111#[async_trait]
117pub trait WebhookSource: EventSource {
118 fn path(&self) -> &str;
120
121 fn content_types(&self) -> Vec<String> {
123 vec!["application/json".to_string()]
124 }
125}
126
127#[async_trait]
133pub trait MetricsSource: EventSource {
134 fn metric_name(&self) -> &str;
136
137 fn threshold(&self) -> f64;
139}
140
141#[cfg(test)]
142mod tests {
143 use super::*;
144 use std::sync::atomic::{AtomicU32, Ordering};
145
146 #[tokio::test]
147 async fn test_cron_source_emits_events() {
148 let counter = Arc::new(AtomicU32::new(0));
149 let counter_clone = counter.clone();
150
151 let source = CronSource::new(
152 "test-cron",
153 std::time::Duration::from_millis(50),
154 move || {
155 let n = counter_clone.fetch_add(1, Ordering::SeqCst);
156 Event::new(
157 format!("events.cron.tick.{}", n),
158 "cron",
159 format!("Tick {}", n),
160 "cron-source",
161 serde_json::json!({"tick": n}),
162 )
163 },
164 );
165
166 assert_eq!(source.name(), "test-cron");
167
168 let (tx, mut rx) = mpsc::channel(100);
169
170 let source_handle = {
172 let source_ref = &source;
173 let tx = tx.clone();
174 tokio::spawn({
175 let _name = source_ref.name().to_string();
176 let interval = source_ref.interval;
177 let factory = source_ref.factory.clone();
178 let stop = source_ref.stop_signal.clone();
179
180 async move {
181 let mut interval = tokio::time::interval(interval);
182 interval.tick().await; loop {
185 tokio::select! {
186 _ = interval.tick() => {
187 let event = (factory)();
188 if tx.send(event).await.is_err() {
189 break;
190 }
191 }
192 _ = stop.notified() => {
193 break;
194 }
195 }
196 }
197 }
198 })
199 };
200
201 tokio::time::sleep(std::time::Duration::from_millis(180)).await;
203 source.stop().await.unwrap();
204 source_handle.await.unwrap();
205
206 let mut events = Vec::new();
208 while let Ok(event) = rx.try_recv() {
209 events.push(event);
210 }
211
212 assert!(events.len() >= 2, "Expected >= 2 events, got {}", events.len());
214 assert!(events[0].subject.starts_with("events.cron.tick."));
215 }
216
217 #[tokio::test]
218 async fn test_cron_source_stop() {
219 let source = CronSource::new(
220 "stoppable",
221 std::time::Duration::from_millis(10),
222 || Event::new("events.cron.a", "cron", "A", "src", serde_json::json!({})),
223 );
224
225 let (tx, _rx) = mpsc::channel(100);
226
227 let stop = source.stop_signal.clone();
228 let factory = source.factory.clone();
229 let interval = source.interval;
230
231 let handle = tokio::spawn(async move {
232 let mut interval_timer = tokio::time::interval(interval);
233 interval_timer.tick().await;
234
235 loop {
236 tokio::select! {
237 _ = interval_timer.tick() => {
238 let event = (factory)();
239 if tx.send(event).await.is_err() {
240 break;
241 }
242 }
243 _ = stop.notified() => {
244 break;
245 }
246 }
247 }
248 });
249
250 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
251 source.stop().await.unwrap();
252
253 tokio::time::timeout(std::time::Duration::from_secs(1), handle)
255 .await
256 .unwrap()
257 .unwrap();
258 }
259
260 #[tokio::test]
261 async fn test_cron_source_sender_closed() {
262 let source = CronSource::new(
263 "closed-sender",
264 std::time::Duration::from_millis(10),
265 || Event::new("events.cron.a", "cron", "A", "src", serde_json::json!({})),
266 );
267
268 let (tx, rx) = mpsc::channel(1);
269 drop(rx); let result = source.start(tx).await;
273 assert!(result.is_ok());
274 }
275
276 #[tokio::test]
277 async fn test_cron_source_name() {
278 let source = CronSource::new(
279 "health-sweep",
280 std::time::Duration::from_secs(30),
281 || Event::new("events.health.sweep", "health", "Sweep", "src", serde_json::json!({})),
282 );
283 assert_eq!(source.name(), "health-sweep");
284 }
285
286 #[test]
287 fn test_event_source_is_send_sync() {
288 fn assert_send_sync<T: Send + Sync>() {}
289 assert_send_sync::<CronSource>();
290 }
291}