Skip to main content

varpulis_runtime/
testing.rs

1//! Time-accelerated testing infrastructure
2//!
3//! Provides [`SimulatedScheduler`] for deterministic timer simulation and
4//! [`TestUniverse`] for end-to-end VPL program testing without real-time waits.
5//!
6//! # Example
7//!
8//! ```rust,no_run
9//! use varpulis_runtime::testing::TestUniverse;
10//! use std::time::Duration;
11//!
12//! # async fn example() {
13//! let mut universe = TestUniverse::from_source(r#"
14//!     stream heartbeat = timer(10ms).emit(type: "heartbeat")
15//! "#).unwrap();
16//!
17//! let events = universe.advance(Duration::from_millis(35)).await;
18//! assert_eq!(events.len(), 3); // 10ms, 20ms, 30ms — deterministic
19//! # }
20//! ```
21
22use std::time::Duration;
23
24use chrono::{DateTime, Duration as ChronoDuration, Utc};
25use tokio::sync::mpsc;
26
27use crate::engine::Engine;
28use crate::event::Event;
29
30/// Internal timer state for the simulated scheduler.
31#[derive(Debug)]
32struct TimerState {
33    /// Interval between fires in nanoseconds
34    interval_ns: u64,
35    /// Event type emitted on fire
36    event_type: String,
37    /// Absolute time of next fire
38    next_fire: DateTime<Utc>,
39}
40
41/// Simulated scheduler that generates timer events deterministically.
42///
43/// Instead of waiting for real time to pass, `advance()` jumps the clock
44/// forward and collects all timer events that would have fired in that window.
45#[derive(Debug)]
46pub struct SimulatedScheduler {
47    timers: Vec<TimerState>,
48    current_time: DateTime<Utc>,
49}
50
51impl SimulatedScheduler {
52    /// Create a new scheduler starting at the given time.
53    pub const fn new(start_time: DateTime<Utc>) -> Self {
54        Self {
55            timers: Vec::new(),
56            current_time: start_time,
57        }
58    }
59
60    /// Register timers from engine configuration.
61    ///
62    /// Each tuple is `(interval_ns, initial_delay_ns, event_type)`.
63    pub fn register_timers(&mut self, configs: Vec<(u64, Option<u64>, String)>) {
64        for (interval_ns, initial_delay_ns, event_type) in configs {
65            let delay_ns = initial_delay_ns.unwrap_or(0);
66            let first_fire = self.current_time
67                + ChronoDuration::nanoseconds(delay_ns as i64)
68                + ChronoDuration::nanoseconds(interval_ns as i64);
69
70            self.timers.push(TimerState {
71                interval_ns,
72                event_type,
73                next_fire: first_fire,
74            });
75        }
76    }
77
78    /// Advance the clock by `duration` and return all timer events that fire.
79    ///
80    /// Events are returned in chronological order. Multiple events from the
81    /// same timer are generated if the advance spans multiple intervals.
82    pub fn advance(&mut self, duration: Duration) -> Vec<Event> {
83        let target_time =
84            self.current_time + ChronoDuration::nanoseconds(duration.as_nanos() as i64);
85        let mut events = Vec::new();
86
87        for timer in &mut self.timers {
88            while timer.next_fire <= target_time {
89                let mut event = Event::new(timer.event_type.clone());
90                event.timestamp = timer.next_fire;
91                event.data.insert(
92                    "timestamp".into(),
93                    varpulis_core::Value::Int(timer.next_fire.timestamp_millis()),
94                );
95                events.push((timer.next_fire, event));
96                timer.next_fire += ChronoDuration::nanoseconds(timer.interval_ns as i64);
97            }
98        }
99
100        // Sort by fire time for deterministic ordering
101        events.sort_by_key(|(t, _)| *t);
102
103        self.current_time = target_time;
104
105        events.into_iter().map(|(_, e)| e).collect()
106    }
107
108    /// Get the current simulated time.
109    pub const fn now(&self) -> DateTime<Utc> {
110        self.current_time
111    }
112}
113
114/// Test universe for end-to-end VPL program testing.
115///
116/// Combines an [`Engine`] with a [`SimulatedScheduler`] to enable deterministic,
117/// instant testing of VPL programs that include timer-based streams.
118#[derive(Debug)]
119pub struct TestUniverse {
120    engine: Engine,
121    scheduler: SimulatedScheduler,
122    output_rx: mpsc::Receiver<Event>,
123}
124
125impl TestUniverse {
126    /// Create a test universe from VPL source code.
127    ///
128    /// Parses the source, creates an engine, and registers any timer streams
129    /// with the simulated scheduler.
130    pub fn from_source(vpl_source: &str) -> Result<Self, String> {
131        let program =
132            varpulis_parser::parse(vpl_source).map_err(|e| format!("parse error: {e}"))?;
133
134        let (output_tx, output_rx) = mpsc::channel(10_000);
135        let mut engine = Engine::new(output_tx);
136        engine
137            .load(&program)
138            .map_err(|e| format!("load error: {e}"))?;
139
140        let start_time = Utc::now();
141        let mut scheduler = SimulatedScheduler::new(start_time);
142
143        // Extract timer configs from the engine
144        let timer_configs = engine.get_timers();
145        scheduler.register_timers(timer_configs);
146
147        Ok(Self {
148            engine,
149            scheduler,
150            output_rx,
151        })
152    }
153
154    /// Send an event into the engine for processing.
155    pub async fn send(&mut self, event: Event) -> Result<(), String> {
156        self.engine
157            .process(event)
158            .await
159            .map_err(|e| format!("process error: {e}"))?;
160        Ok(())
161    }
162
163    /// Create and send an event with the given type (no extra fields).
164    pub async fn send_at_now(&mut self, event_type: &str) -> Result<(), String> {
165        let event = Event::new(event_type);
166        self.send(event).await
167    }
168
169    /// Advance simulated time and process all timer events that fire.
170    ///
171    /// Returns all output events produced during the advance.
172    pub async fn advance(&mut self, duration: Duration) -> Vec<Event> {
173        let timer_events = self.scheduler.advance(duration);
174
175        // Feed timer events through the engine
176        for event in timer_events {
177            let _ = self.engine.process(event).await;
178        }
179
180        // Drain output
181        self.drain_output()
182    }
183
184    /// Drain all currently buffered output events.
185    pub fn drain_output(&mut self) -> Vec<Event> {
186        let mut events = Vec::new();
187        while let Ok(event) = self.output_rx.try_recv() {
188            events.push(event);
189        }
190        events
191    }
192
193    /// Get the current simulated time.
194    pub const fn now(&self) -> DateTime<Utc> {
195        self.scheduler.now()
196    }
197
198    /// Get a reference to the underlying engine.
199    pub const fn engine(&self) -> &Engine {
200        &self.engine
201    }
202
203    /// Get a mutable reference to the underlying engine.
204    pub const fn engine_mut(&mut self) -> &mut Engine {
205        &mut self.engine
206    }
207}
208
209#[cfg(test)]
210mod tests {
211    use super::*;
212
213    #[test]
214    fn test_simulated_scheduler_basic() {
215        let start = Utc::now();
216        let mut scheduler = SimulatedScheduler::new(start);
217        scheduler.register_timers(vec![(
218            10_000_000, // 10ms
219            None,
220            "Timer".to_string(),
221        )]);
222
223        // Advance 35ms — should get 3 timer events (10ms, 20ms, 30ms)
224        let events = scheduler.advance(Duration::from_millis(35));
225        assert_eq!(events.len(), 3);
226        for event in &events {
227            assert_eq!(event.event_type.as_ref(), "Timer");
228        }
229    }
230
231    #[test]
232    fn test_simulated_scheduler_with_delay() {
233        let start = Utc::now();
234        let mut scheduler = SimulatedScheduler::new(start);
235        scheduler.register_timers(vec![(
236            10_000_000,       // 10ms interval
237            Some(20_000_000), // 20ms initial delay
238            "Delayed".to_string(),
239        )]);
240
241        // Advance 25ms — first fire at 30ms (20ms delay + 10ms), so 0 events
242        let events = scheduler.advance(Duration::from_millis(25));
243        assert_eq!(events.len(), 0);
244
245        // Advance 10 more ms (total 35ms) — first fire at 30ms, so 1 event
246        let events = scheduler.advance(Duration::from_millis(10));
247        assert_eq!(events.len(), 1);
248    }
249
250    #[test]
251    fn test_simulated_scheduler_multiple_timers() {
252        let start = Utc::now();
253        let mut scheduler = SimulatedScheduler::new(start);
254        scheduler.register_timers(vec![
255            (10_000_000, None, "Fast".to_string()),
256            (20_000_000, None, "Slow".to_string()),
257        ]);
258
259        // Advance 25ms: Fast fires at 10, 20; Slow fires at 20
260        let events = scheduler.advance(Duration::from_millis(25));
261        assert_eq!(events.len(), 3);
262
263        let fast_count = events
264            .iter()
265            .filter(|e| e.event_type.as_ref() == "Fast")
266            .count();
267        let slow_count = events
268            .iter()
269            .filter(|e| e.event_type.as_ref() == "Slow")
270            .count();
271        assert_eq!(fast_count, 2);
272        assert_eq!(slow_count, 1);
273    }
274
275    #[test]
276    fn test_simulated_scheduler_zero_advance() {
277        let start = Utc::now();
278        let mut scheduler = SimulatedScheduler::new(start);
279        scheduler.register_timers(vec![(10_000_000, None, "Timer".to_string())]);
280
281        let events = scheduler.advance(Duration::ZERO);
282        assert_eq!(events.len(), 0);
283    }
284
285    #[test]
286    fn test_simulated_scheduler_now_advances() {
287        let start = Utc::now();
288        let mut scheduler = SimulatedScheduler::new(start);
289        assert_eq!(scheduler.now(), start);
290
291        scheduler.advance(Duration::from_secs(1));
292        assert!(scheduler.now() > start);
293    }
294
295    #[tokio::test]
296    async fn test_universe_basic_stream() {
297        let mut universe = TestUniverse::from_source(
298            r"
299            stream HighTemp = SensorReading
300                .where(temperature > 100)
301                .emit(sensor: sensor_id, temp: temperature)
302            ",
303        )
304        .unwrap();
305
306        let event = Event::new("SensorReading")
307            .with_field("temperature", 105)
308            .with_field("sensor_id", "S1");
309        universe.send(event).await.unwrap();
310
311        let outputs = universe.drain_output();
312        assert_eq!(outputs.len(), 1);
313        assert_eq!(outputs[0].event_type.as_ref(), "HighTemp");
314    }
315
316    #[tokio::test]
317    async fn test_universe_filters_correctly() {
318        let mut universe = TestUniverse::from_source(
319            r"
320            stream HighTemp = SensorReading
321                .where(temperature > 100)
322                .emit(temp: temperature)
323            ",
324        )
325        .unwrap();
326
327        // Below threshold — should not produce output
328        let event = Event::new("SensorReading").with_field("temperature", 50);
329        universe.send(event).await.unwrap();
330
331        let outputs = universe.drain_output();
332        assert_eq!(outputs.len(), 0);
333    }
334}