Skip to main content

varpulis_runtime/
timer.rs

1//! Timer module for periodic event generation
2//!
3//! This module provides functionality for spawning timer tasks that
4//! periodically generate events, similar to Apama's `on wait(period)`.
5
6use std::time::Duration;
7
8use tokio::sync::mpsc;
9use tokio::task::JoinHandle;
10use tracing::debug;
11
12use crate::event::Event;
13
14/// Spawn a timer task that periodically sends timer events
15///
16/// # Arguments
17/// * `interval_ns` - Interval between timer fires in nanoseconds
18/// * `initial_delay_ns` - Optional initial delay before first fire in nanoseconds
19/// * `timer_event_type` - Event type name for timer events
20/// * `event_tx` - Channel to send timer events
21///
22/// # Returns
23/// A JoinHandle for the spawned timer task
24pub fn spawn_timer(
25    interval_ns: u64,
26    initial_delay_ns: Option<u64>,
27    timer_event_type: String,
28    event_tx: mpsc::Sender<Event>,
29) -> JoinHandle<()> {
30    tokio::spawn(async move {
31        // Apply initial delay if specified
32        if let Some(delay_ns) = initial_delay_ns {
33            let delay = Duration::from_nanos(delay_ns);
34            debug!(
35                "Timer {} waiting for initial delay: {:?}",
36                timer_event_type, delay
37            );
38            tokio::time::sleep(delay).await;
39        }
40
41        let interval = Duration::from_nanos(interval_ns);
42        debug!(
43            "Timer {} starting with interval: {:?}",
44            timer_event_type, interval
45        );
46
47        let mut interval_timer = tokio::time::interval(interval);
48        // Skip the immediate first tick
49        interval_timer.tick().await;
50
51        loop {
52            interval_timer.tick().await;
53
54            // Create timer event
55            let mut event = Event::new(timer_event_type.clone());
56            event.data.insert(
57                "timestamp".into(),
58                varpulis_core::Value::Int(chrono::Utc::now().timestamp_millis()),
59            );
60
61            debug!("Timer {} fired", timer_event_type);
62
63            // Send timer event
64            if event_tx.send(event).await.is_err() {
65                debug!("Timer {} stopping: channel closed", timer_event_type);
66                break;
67            }
68        }
69    })
70}
71
72/// Timer manager that tracks spawned timer tasks
73#[derive(Debug)]
74pub struct TimerManager {
75    handles: Vec<JoinHandle<()>>,
76}
77
78impl TimerManager {
79    pub const fn new() -> Self {
80        Self {
81            handles: Vec::new(),
82        }
83    }
84
85    /// Spawn timers from engine configuration
86    pub fn spawn_timers(
87        &mut self,
88        timers: Vec<(u64, Option<u64>, String)>,
89        event_tx: mpsc::Sender<Event>,
90    ) {
91        for (interval_ns, initial_delay_ns, timer_event_type) in timers {
92            let handle = spawn_timer(
93                interval_ns,
94                initial_delay_ns,
95                timer_event_type,
96                event_tx.clone(),
97            );
98            self.handles.push(handle);
99        }
100    }
101
102    /// Stop all timer tasks
103    pub fn stop_all(&mut self) {
104        for handle in self.handles.drain(..) {
105            handle.abort();
106        }
107    }
108}
109
110impl Default for TimerManager {
111    fn default() -> Self {
112        Self::new()
113    }
114}