use std::future::Future;
use std::time::{Duration, Instant};
use crate::rhythm::Rhythm;
/// Metadata describing a single timing event handed to the operation domain.
///
/// The field descriptions below reflect how `SysTimer` fills the struct in
/// its `drive` loop.
#[derive(Debug, Clone, Copy)]
pub struct EventMeta {
    /// Instant sampled right after the timer woke up for this event.
    pub timestamp: Instant,
    /// Nominal period between events; `SysTimer` reports its configured
    /// interval here, not a measured elapsed time.
    pub delta: Duration,
    /// How far past the scheduled deadline the wake-up happened
    /// (zero when the wake-up was not late).
    pub jitter: Duration,
    /// Running event counter; the first event emitted by `SysTimer` carries `1`.
    pub sequence: u32,
    /// Set when `jitter` is greater than the configured interval.
    pub is_timeout: bool,
}
/// Fixed-period timer that produces one event per `interval`.
pub struct SysTimer {
    /// Nominal spacing between two consecutive events.
    interval: Duration,
}
impl SysTimer {
pub fn new(interval: Duration) -> Self {
Self { interval }
}
pub fn from_launch_params(params: &serde_yaml::Value) -> Self {
let ms = params
.get("interval_ms")
.and_then(|v| v.as_u64())
.unwrap_or(100);
Self::new(Duration::from_millis(ms))
}
}
impl Rhythm for SysTimer {
type Yield = EventMeta;
type Feed = ();
type Output = ();
type Error = crate::RoplatError;
async fn drive<N, F, Fut>(
&mut self,
mut nodes: N,
mut op_domain: F,
) -> (Self::Output, N)
where
N: Send,
F: FnMut(N, Self::Yield) -> Fut + Send,
Fut: Future<Output = (Self::Feed, N)> + Send,
{
let mut sequence = 0u32;
let start_time = Instant::now();
loop {
let next_target = start_time + self.interval * sequence;
tokio::time::sleep_until(next_target.into()).await;
sequence += 1;
let now = Instant::now();
let jitter = now
.checked_duration_since(next_target)
.unwrap_or(Duration::ZERO);
let event = EventMeta {
timestamp: now,
delta: self.interval,
jitter,
sequence,
is_timeout: jitter > self.interval,
};
let ((), returned_nodes) = op_domain(nodes, event).await;
nodes = returned_nodes;
}
}
}