use super::SourceActor;
use crate::runtime::{Molecule, ReactorHandle};
use crate::value::Value;
use crate::Error;
use async_trait::async_trait;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio_util::sync::CancellationToken;
/// Source actor that periodically emits `Tick` molecules.
///
/// The actor is stateless; its period is read from the `interval` field of
/// the configuration molecule passed to `run`.
pub struct TimerActor;
#[async_trait]
impl SourceActor for TimerActor {
fn kind_name(&self) -> &'static str {
"Timer"
}
async fn run(
&self,
config: Molecule,
handle: ReactorHandle,
cancel: CancellationToken,
) -> Result<(), Error> {
let interval_ms = config
.fields
.get("interval")
.ok_or_else(|| Error::Runtime("Timer: missing `interval` field".into()))?
.as_duration_ms()?;
let mut sequence: i64 = 0;
let mut interval =
tokio::time::interval(Duration::from_millis(interval_ms.max(1) as u64));
loop {
tokio::select! {
_ = cancel.cancelled() => return Ok(()),
_ = interval.tick() => {
sequence += 1;
let tick = Molecule::builder("Tick")
.field("sequence", Value::Int(sequence))
.field("ts", Value::Timestamp(now_ms()))
.build();
handle.emit(tick).await?;
}
}
}
}
}
/// Returns the current wall-clock time as whole milliseconds since the Unix
/// epoch.
///
/// Falls back to `0` when the system clock reports a time earlier than the
/// epoch. The millisecond count is clamped to `i64::MAX` before conversion so
/// an out-of-range value saturates instead of silently wrapping.
fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // `as_millis` yields a `u128`; clamp first so the cast is lossless.
        .map(|elapsed| elapsed.as_millis().min(i64::MAX as u128) as i64)
        .unwrap_or(0)
}