greentic_runner_host/runner/
adapt_timer.rs

1use std::str::FromStr;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, Result};
6use chrono::{DateTime, Utc};
7use cron::Schedule;
8use serde_json::json;
9use tokio::task::JoinHandle;
10use tokio::time::sleep;
11
12use crate::engine::runtime::IngressEnvelope;
13use crate::runtime::TenantRuntime;
14
15pub fn spawn_timers(runtime: Arc<TenantRuntime>) -> Result<Vec<JoinHandle<()>>> {
16    let mut handles = Vec::new();
17
18    for timer in runtime.config().timers.clone() {
19        let cron_expr = timer.cron.clone();
20        let normalized = normalize_cron(&cron_expr);
21        let schedule = Schedule::from_str(&normalized)
22            .with_context(|| format!("invalid cron expression for {}", timer.schedule_id()))?;
23        let flow_id = timer.flow_id.clone();
24        let schedule_id = timer.schedule_id().to_string();
25        let tenant = runtime.config().tenant.clone();
26        let runtime_clone = Arc::clone(&runtime);
27
28        let handle = tokio::spawn(async move {
29            tracing::info!(
30                flow_id = %flow_id,
31                schedule_id = %schedule_id,
32                cron = %cron_expr,
33                normalized_cron = %normalized,
34                "registered timer schedule"
35            );
36            for next in schedule.upcoming(Utc) {
37                if let Some(wait) = duration_until(next) {
38                    sleep(wait).await;
39                } else {
40                    continue;
41                }
42                let payload = json!({
43                    "now": next.to_rfc3339(),
44                    "schedule_id": schedule_id.clone(),
45                });
46                tracing::info!(
47                    flow_id = %flow_id,
48                    schedule_id = %schedule_id,
49                    scheduled_for = %next,
50                    "triggering timer flow"
51                );
52                let envelope = IngressEnvelope {
53                    tenant: tenant.clone(),
54                    env: None,
55                    flow_id: flow_id.clone(),
56                    flow_type: Some("timer".into()),
57                    action: Some("timer".into()),
58                    session_hint: Some(schedule_id.clone()),
59                    provider: Some("timer".into()),
60                    channel: Some(schedule_id.clone()),
61                    conversation: Some(schedule_id.clone()),
62                    user: None,
63                    activity_id: Some(format!("{}@{}", schedule_id, next)),
64                    timestamp: Some(next.to_rfc3339()),
65                    payload,
66                    metadata: None,
67                }
68                .canonicalize();
69                match runtime_clone.state_machine().handle(envelope).await {
70                    Ok(output) => {
71                        tracing::info!(
72                            flow_id = %flow_id,
73                            schedule_id = %schedule_id,
74                            now = %next,
75                            response = %output,
76                            "timer flow completed"
77                        );
78                    }
79                    Err(err) => {
80                        let chain = err.chain().map(|e| e.to_string()).collect::<Vec<_>>();
81                        tracing::error!(
82                            flow_id = %flow_id,
83                            schedule_id = %schedule_id,
84                            error.cause_chain = ?chain,
85                            "timer flow execution failed"
86                        );
87                    }
88                }
89            }
90            tracing::info!(
91                flow_id = %flow_id,
92                schedule_id = %schedule_id,
93                "timer schedule completed"
94            );
95        });
96
97        handles.push(handle);
98    }
99
100    Ok(handles)
101}
102
103fn duration_until(next: DateTime<Utc>) -> Option<Duration> {
104    let now = Utc::now();
105    let duration = next - now;
106    if duration.num_milliseconds() <= 0 {
107        return Some(Duration::from_secs(0));
108    }
109    duration.to_std().ok()
110}
111
/// Prepares a cron expression for parsing by ensuring it carries a seconds field.
///
/// An expression with exactly five whitespace-separated fields is prefixed with `"0 "`
/// (seconds = 0). Any other field count is returned unchanged, original whitespace included.
fn normalize_cron(expr: &str) -> String {
    let field_count = expr.split_whitespace().count();
    match field_count {
        5 => format!("0 {expr}"),
        _ => expr.to_string(),
    }
}
119
#[cfg(test)]
mod tests {
    use super::*;

    /// Five-field expressions gain a leading seconds field; six-field ones pass through as-is.
    #[test]
    fn normalize_cron_adds_seconds_for_five_fields() {
        let cases = [
            ("*/5 * * * *", "0 */5 * * * *"),
            ("0 */2 * * * *", "0 */2 * * * *"),
        ];
        for (input, expected) in cases {
            assert_eq!(normalize_cron(input), expected);
        }
    }

    /// A target time that already passed must produce a zero wait rather than `None`.
    #[test]
    fn duration_until_returns_zero_for_past_times() {
        let ten_seconds_ago = Utc::now() - chrono::Duration::seconds(10);
        let wait = duration_until(ten_seconds_ago);
        assert_eq!(wait, Some(Duration::from_secs(0)));
    }
}
135}