thal 0.0.1

Reactive semantic runtime — molecules, reactions, and effect actors for building LLM-backed applications as dataflow programs.
Documentation
use super::SourceActor;
use crate::runtime::{Molecule, ReactorHandle};
use crate::value::Value;
use crate::Error;
use async_trait::async_trait;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio_util::sync::CancellationToken;

/// Source actor that periodically emits `Tick` molecules.
///
/// The type carries no state of its own; the tick period is read from the
/// `interval` field of the configuration molecule passed to `run`.
pub struct TimerActor;

#[async_trait]
impl SourceActor for TimerActor {
    fn kind_name(&self) -> &'static str {
        "Timer"
    }

    async fn run(
        &self,
        config: Molecule,
        handle: ReactorHandle,
        cancel: CancellationToken,
    ) -> Result<(), Error> {
        let interval_ms = config
            .fields
            .get("interval")
            .ok_or_else(|| Error::Runtime("Timer: missing `interval` field".into()))?
            .as_duration_ms()?;

        let mut sequence: i64 = 0;
        let mut interval =
            tokio::time::interval(Duration::from_millis(interval_ms.max(1) as u64));

        loop {
            tokio::select! {
                _ = cancel.cancelled() => return Ok(()),
                _ = interval.tick() => {
                    sequence += 1;
                    let tick = Molecule::builder("Tick")
                        .field("sequence", Value::Int(sequence))
                        .field("ts", Value::Timestamp(now_ms()))
                        .build();
                    handle.emit(tick).await?;
                }
            }
        }
    }
}

/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reads earlier than the epoch, and
/// saturates at `i64::MAX` instead of wrapping if the millisecond count
/// does not fit in an `i64`.
fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` yields a `u128`; clamping first makes the narrowing
        // cast lossless rather than silently truncating.
        Ok(elapsed) => elapsed.as_millis().min(i64::MAX as u128) as i64,
        // Clock is set before 1970: fall back to the epoch itself.
        Err(_) => 0,
    }
}