varpulis_runtime/
timer.rs1use std::time::Duration;
7
8use tokio::sync::mpsc;
9use tokio::task::JoinHandle;
10use tracing::debug;
11
12use crate::event::Event;
13
14pub fn spawn_timer(
25 interval_ns: u64,
26 initial_delay_ns: Option<u64>,
27 timer_event_type: String,
28 event_tx: mpsc::Sender<Event>,
29) -> JoinHandle<()> {
30 tokio::spawn(async move {
31 if let Some(delay_ns) = initial_delay_ns {
33 let delay = Duration::from_nanos(delay_ns);
34 debug!(
35 "Timer {} waiting for initial delay: {:?}",
36 timer_event_type, delay
37 );
38 tokio::time::sleep(delay).await;
39 }
40
41 let interval = Duration::from_nanos(interval_ns);
42 debug!(
43 "Timer {} starting with interval: {:?}",
44 timer_event_type, interval
45 );
46
47 let mut interval_timer = tokio::time::interval(interval);
48 interval_timer.tick().await;
50
51 loop {
52 interval_timer.tick().await;
53
54 let mut event = Event::new(timer_event_type.clone());
56 event.data.insert(
57 "timestamp".into(),
58 varpulis_core::Value::Int(chrono::Utc::now().timestamp_millis()),
59 );
60
61 debug!("Timer {} fired", timer_event_type);
62
63 if event_tx.send(event).await.is_err() {
65 debug!("Timer {} stopping: channel closed", timer_event_type);
66 break;
67 }
68 }
69 })
70}
71
72#[derive(Debug)]
74pub struct TimerManager {
75 handles: Vec<JoinHandle<()>>,
76}
77
78impl TimerManager {
79 pub const fn new() -> Self {
80 Self {
81 handles: Vec::new(),
82 }
83 }
84
85 pub fn spawn_timers(
87 &mut self,
88 timers: Vec<(u64, Option<u64>, String)>,
89 event_tx: mpsc::Sender<Event>,
90 ) {
91 for (interval_ns, initial_delay_ns, timer_event_type) in timers {
92 let handle = spawn_timer(
93 interval_ns,
94 initial_delay_ns,
95 timer_event_type,
96 event_tx.clone(),
97 );
98 self.handles.push(handle);
99 }
100 }
101
102 pub fn stop_all(&mut self) {
104 for handle in self.handles.drain(..) {
105 handle.abort();
106 }
107 }
108}
109
110impl Default for TimerManager {
111 fn default() -> Self {
112 Self::new()
113 }
114}