varpulis_runtime/
testing.rs1use std::time::Duration;
23
24use chrono::{DateTime, Duration as ChronoDuration, Utc};
25use tokio::sync::mpsc;
26
27use crate::engine::Engine;
28use crate::event::Event;
29
30#[derive(Debug)]
32struct TimerState {
33 interval_ns: u64,
35 event_type: String,
37 next_fire: DateTime<Utc>,
39}
40
41#[derive(Debug)]
46pub struct SimulatedScheduler {
47 timers: Vec<TimerState>,
48 current_time: DateTime<Utc>,
49}
50
51impl SimulatedScheduler {
52 pub const fn new(start_time: DateTime<Utc>) -> Self {
54 Self {
55 timers: Vec::new(),
56 current_time: start_time,
57 }
58 }
59
60 pub fn register_timers(&mut self, configs: Vec<(u64, Option<u64>, String)>) {
64 for (interval_ns, initial_delay_ns, event_type) in configs {
65 let delay_ns = initial_delay_ns.unwrap_or(0);
66 let first_fire = self.current_time
67 + ChronoDuration::nanoseconds(delay_ns as i64)
68 + ChronoDuration::nanoseconds(interval_ns as i64);
69
70 self.timers.push(TimerState {
71 interval_ns,
72 event_type,
73 next_fire: first_fire,
74 });
75 }
76 }
77
78 pub fn advance(&mut self, duration: Duration) -> Vec<Event> {
83 let target_time =
84 self.current_time + ChronoDuration::nanoseconds(duration.as_nanos() as i64);
85 let mut events = Vec::new();
86
87 for timer in &mut self.timers {
88 while timer.next_fire <= target_time {
89 let mut event = Event::new(timer.event_type.clone());
90 event.timestamp = timer.next_fire;
91 event.data.insert(
92 "timestamp".into(),
93 varpulis_core::Value::Int(timer.next_fire.timestamp_millis()),
94 );
95 events.push((timer.next_fire, event));
96 timer.next_fire += ChronoDuration::nanoseconds(timer.interval_ns as i64);
97 }
98 }
99
100 events.sort_by_key(|(t, _)| *t);
102
103 self.current_time = target_time;
104
105 events.into_iter().map(|(_, e)| e).collect()
106 }
107
108 pub const fn now(&self) -> DateTime<Utc> {
110 self.current_time
111 }
112}
113
114#[derive(Debug)]
119pub struct TestUniverse {
120 engine: Engine,
121 scheduler: SimulatedScheduler,
122 output_rx: mpsc::Receiver<Event>,
123}
124
125impl TestUniverse {
126 pub fn from_source(vpl_source: &str) -> Result<Self, String> {
131 let program =
132 varpulis_parser::parse(vpl_source).map_err(|e| format!("parse error: {e}"))?;
133
134 let (output_tx, output_rx) = mpsc::channel(10_000);
135 let mut engine = Engine::new(output_tx);
136 engine
137 .load(&program)
138 .map_err(|e| format!("load error: {e}"))?;
139
140 let start_time = Utc::now();
141 let mut scheduler = SimulatedScheduler::new(start_time);
142
143 let timer_configs = engine.get_timers();
145 scheduler.register_timers(timer_configs);
146
147 Ok(Self {
148 engine,
149 scheduler,
150 output_rx,
151 })
152 }
153
154 pub async fn send(&mut self, event: Event) -> Result<(), String> {
156 self.engine
157 .process(event)
158 .await
159 .map_err(|e| format!("process error: {e}"))?;
160 Ok(())
161 }
162
163 pub async fn send_at_now(&mut self, event_type: &str) -> Result<(), String> {
165 let event = Event::new(event_type);
166 self.send(event).await
167 }
168
169 pub async fn advance(&mut self, duration: Duration) -> Vec<Event> {
173 let timer_events = self.scheduler.advance(duration);
174
175 for event in timer_events {
177 let _ = self.engine.process(event).await;
178 }
179
180 self.drain_output()
182 }
183
184 pub fn drain_output(&mut self) -> Vec<Event> {
186 let mut events = Vec::new();
187 while let Ok(event) = self.output_rx.try_recv() {
188 events.push(event);
189 }
190 events
191 }
192
193 pub const fn now(&self) -> DateTime<Utc> {
195 self.scheduler.now()
196 }
197
198 pub const fn engine(&self) -> &Engine {
200 &self.engine
201 }
202
203 pub const fn engine_mut(&mut self) -> &mut Engine {
205 &mut self.engine
206 }
207}
208
209#[cfg(test)]
210mod tests {
211 use super::*;
212
213 #[test]
214 fn test_simulated_scheduler_basic() {
215 let start = Utc::now();
216 let mut scheduler = SimulatedScheduler::new(start);
217 scheduler.register_timers(vec![(
218 10_000_000, None,
220 "Timer".to_string(),
221 )]);
222
223 let events = scheduler.advance(Duration::from_millis(35));
225 assert_eq!(events.len(), 3);
226 for event in &events {
227 assert_eq!(event.event_type.as_ref(), "Timer");
228 }
229 }
230
231 #[test]
232 fn test_simulated_scheduler_with_delay() {
233 let start = Utc::now();
234 let mut scheduler = SimulatedScheduler::new(start);
235 scheduler.register_timers(vec![(
236 10_000_000, Some(20_000_000), "Delayed".to_string(),
239 )]);
240
241 let events = scheduler.advance(Duration::from_millis(25));
243 assert_eq!(events.len(), 0);
244
245 let events = scheduler.advance(Duration::from_millis(10));
247 assert_eq!(events.len(), 1);
248 }
249
250 #[test]
251 fn test_simulated_scheduler_multiple_timers() {
252 let start = Utc::now();
253 let mut scheduler = SimulatedScheduler::new(start);
254 scheduler.register_timers(vec![
255 (10_000_000, None, "Fast".to_string()),
256 (20_000_000, None, "Slow".to_string()),
257 ]);
258
259 let events = scheduler.advance(Duration::from_millis(25));
261 assert_eq!(events.len(), 3);
262
263 let fast_count = events
264 .iter()
265 .filter(|e| e.event_type.as_ref() == "Fast")
266 .count();
267 let slow_count = events
268 .iter()
269 .filter(|e| e.event_type.as_ref() == "Slow")
270 .count();
271 assert_eq!(fast_count, 2);
272 assert_eq!(slow_count, 1);
273 }
274
275 #[test]
276 fn test_simulated_scheduler_zero_advance() {
277 let start = Utc::now();
278 let mut scheduler = SimulatedScheduler::new(start);
279 scheduler.register_timers(vec![(10_000_000, None, "Timer".to_string())]);
280
281 let events = scheduler.advance(Duration::ZERO);
282 assert_eq!(events.len(), 0);
283 }
284
285 #[test]
286 fn test_simulated_scheduler_now_advances() {
287 let start = Utc::now();
288 let mut scheduler = SimulatedScheduler::new(start);
289 assert_eq!(scheduler.now(), start);
290
291 scheduler.advance(Duration::from_secs(1));
292 assert!(scheduler.now() > start);
293 }
294
295 #[tokio::test]
296 async fn test_universe_basic_stream() {
297 let mut universe = TestUniverse::from_source(
298 r"
299 stream HighTemp = SensorReading
300 .where(temperature > 100)
301 .emit(sensor: sensor_id, temp: temperature)
302 ",
303 )
304 .unwrap();
305
306 let event = Event::new("SensorReading")
307 .with_field("temperature", 105)
308 .with_field("sensor_id", "S1");
309 universe.send(event).await.unwrap();
310
311 let outputs = universe.drain_output();
312 assert_eq!(outputs.len(), 1);
313 assert_eq!(outputs[0].event_type.as_ref(), "HighTemp");
314 }
315
316 #[tokio::test]
317 async fn test_universe_filters_correctly() {
318 let mut universe = TestUniverse::from_source(
319 r"
320 stream HighTemp = SensorReading
321 .where(temperature > 100)
322 .emit(temp: temperature)
323 ",
324 )
325 .unwrap();
326
327 let event = Event::new("SensorReading").with_field("temperature", 50);
329 universe.send(event).await.unwrap();
330
331 let outputs = universe.drain_output();
332 assert_eq!(outputs.len(), 0);
333 }
334}