1use crate::error::Result;
9use crate::types::Event;
10use async_trait::async_trait;
11use std::sync::Arc;
12use tokio::sync::{mpsc, Notify};
13
14#[async_trait]
20pub trait EventSource: Send + Sync {
21 async fn start(&self, sender: mpsc::Sender<Event>) -> Result<()>;
26
27 async fn stop(&self) -> Result<()>;
29
30 fn name(&self) -> &str;
32}
33
34type EventFactory = dyn Fn() -> Event + Send + Sync;
36
37pub struct CronSource {
42 name: String,
43 interval: std::time::Duration,
44 factory: Arc<EventFactory>,
45 stop_signal: Arc<Notify>,
46}
47
48impl CronSource {
49 pub fn new<F>(
55 name: impl Into<String>,
56 interval: std::time::Duration,
57 factory: F,
58 ) -> Self
59 where
60 F: Fn() -> Event + Send + Sync + 'static,
61 {
62 Self {
63 name: name.into(),
64 interval,
65 factory: Arc::new(factory),
66 stop_signal: Arc::new(Notify::new()),
67 }
68 }
69}
70
71#[async_trait]
72impl EventSource for CronSource {
73 async fn start(&self, sender: mpsc::Sender<Event>) -> Result<()> {
74 let mut interval = tokio::time::interval(self.interval);
75 let factory = self.factory.clone();
76 let stop = self.stop_signal.clone();
77 let name = self.name.clone();
78
79 interval.tick().await;
81
82 loop {
83 tokio::select! {
84 _ = interval.tick() => {
85 let event = (factory)();
86 if sender.send(event).await.is_err() {
87 tracing::debug!(source = %name, "CronSource sender closed, stopping");
88 break;
89 }
90 }
91 _ = stop.notified() => {
92 tracing::debug!(source = %name, "CronSource received stop signal");
93 break;
94 }
95 }
96 }
97
98 Ok(())
99 }
100
101 async fn stop(&self) -> Result<()> {
102 self.stop_signal.notify_one();
103 Ok(())
104 }
105
106 fn name(&self) -> &str {
107 &self.name
108 }
109}
110
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    /// Upper bound on how long any test waits for `start()` to return, so a
    /// regression fails the test instead of hanging the suite.
    const JOIN_TIMEOUT: Duration = Duration::from_secs(1);

    /// Runs the real `CronSource::start` on a background task.
    ///
    /// The source is shared through an `Arc` so the test can still call
    /// `stop()` on it. The task resolves to whether `start()` returned `Ok`;
    /// a `bool` is used so nothing is assumed about the crate's error type
    /// being `Send`.
    fn spawn_start(
        source: &Arc<CronSource>,
        tx: mpsc::Sender<Event>,
    ) -> tokio::task::JoinHandle<bool> {
        let source = Arc::clone(source);
        tokio::spawn(async move { source.start(tx).await.is_ok() })
    }

    /// Waits (bounded) for a task spawned by `spawn_start` and returns its result.
    async fn join_bounded(handle: tokio::task::JoinHandle<bool>) -> bool {
        tokio::time::timeout(JOIN_TIMEOUT, handle)
            .await
            .expect("start() did not return in time")
            .expect("start() task panicked")
    }

    #[tokio::test]
    async fn test_cron_source_emits_events() {
        let counter = Arc::new(AtomicU32::new(0));
        let factory_counter = Arc::clone(&counter);

        let source = Arc::new(CronSource::new(
            "test-cron",
            Duration::from_millis(50),
            move || {
                let n = factory_counter.fetch_add(1, Ordering::SeqCst);
                Event::new(
                    format!("events.cron.tick.{}", n),
                    "cron",
                    format!("Tick {}", n),
                    "cron-source",
                    serde_json::json!({"tick": n}),
                )
            },
        ));

        assert_eq!(source.name(), "test-cron");

        let (tx, mut rx) = mpsc::channel(100);
        let runner = spawn_start(&source, tx);

        // 180ms at a 50ms period leaves room for ticks at ~50, ~100 and ~150ms.
        tokio::time::sleep(Duration::from_millis(180)).await;
        source.stop().await.unwrap();
        assert!(join_bounded(runner).await);

        // `start()` has returned and dropped its sender, so everything that
        // was emitted is already buffered in the channel.
        let mut events = Vec::new();
        while let Ok(event) = rx.try_recv() {
            events.push(event);
        }

        assert!(events.len() >= 2, "Expected >= 2 events, got {}", events.len());
        assert!(events[0].subject.starts_with("events.cron.tick."));
    }

    #[tokio::test]
    async fn test_cron_source_stop() {
        let source = Arc::new(CronSource::new(
            "stoppable",
            Duration::from_millis(10),
            || Event::new("events.cron.a", "cron", "A", "src", serde_json::json!({})),
        ));

        // Keep the receiver alive so only `stop()` can end the loop.
        let (tx, _rx) = mpsc::channel(100);
        let runner = spawn_start(&source, tx);

        tokio::time::sleep(Duration::from_millis(50)).await;
        source.stop().await.unwrap();

        assert!(join_bounded(runner).await);
    }

    #[tokio::test]
    async fn test_cron_source_sender_closed() {
        let source = CronSource::new(
            "closed-sender",
            Duration::from_millis(10),
            || Event::new("events.cron.a", "cron", "A", "src", serde_json::json!({})),
        );

        let (tx, rx) = mpsc::channel(1);
        // With the receiver gone, the first send fails and `start()` must
        // return on its own.
        drop(rx);

        let result = tokio::time::timeout(JOIN_TIMEOUT, source.start(tx))
            .await
            .expect("start() did not return after the receiver was dropped");
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_cron_source_name() {
        let source = CronSource::new(
            "health-sweep",
            Duration::from_secs(30),
            || Event::new("events.health.sweep", "health", "Sweep", "src", serde_json::json!({})),
        );
        assert_eq!(source.name(), "health-sweep");
    }

    #[test]
    fn test_event_source_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<CronSource>();
    }
}