Skip to main content

greentic_runner_host/runner/
adapt_timer.rs

1use std::str::FromStr;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, Result};
6use chrono::{DateTime, Utc};
7use cron::Schedule;
8use serde_json::json;
9use tokio::task::JoinHandle;
10use tokio::time::sleep;
11
12use crate::engine::runtime::IngressEnvelope;
13use crate::runtime::TenantRuntime;
14
15pub fn spawn_timers(runtime: Arc<TenantRuntime>) -> Result<Vec<JoinHandle<()>>> {
16    let mut handles = Vec::new();
17
18    for timer in runtime.config().timers.clone() {
19        let cron_expr = timer.cron.clone();
20        let normalized = normalize_cron(&cron_expr);
21        let schedule = Schedule::from_str(&normalized)
22            .with_context(|| format!("invalid cron expression for {}", timer.schedule_id()))?;
23        let flow_id = timer.flow_id.clone();
24        let schedule_id = timer.schedule_id().to_string();
25        let tenant = runtime.config().tenant.clone();
26        let runtime_clone = Arc::clone(&runtime);
27
28        let handle = tokio::spawn(async move {
29            tracing::info!(
30                flow_id = %flow_id,
31                schedule_id = %schedule_id,
32                cron = %cron_expr,
33                normalized_cron = %normalized,
34                "registered timer schedule"
35            );
36            for next in schedule.upcoming(Utc) {
37                if let Some(wait) = duration_until(next) {
38                    sleep(wait).await;
39                } else {
40                    continue;
41                }
42                let pack_id = match runtime_clone.engine().flow_by_id(&flow_id) {
43                    Some(flow) => flow.pack_id.clone(),
44                    None => {
45                        tracing::error!(
46                            flow_id = %flow_id,
47                            schedule_id = %schedule_id,
48                            "timer flow is ambiguous; pack_id is required"
49                        );
50                        continue;
51                    }
52                };
53                let payload = json!({
54                    "now": next.to_rfc3339(),
55                    "schedule_id": schedule_id.clone(),
56                });
57                tracing::info!(
58                    flow_id = %flow_id,
59                    schedule_id = %schedule_id,
60                    scheduled_for = %next,
61                    "triggering timer flow"
62                );
63                let envelope = IngressEnvelope {
64                    tenant: tenant.clone(),
65                    env: None,
66                    pack_id: Some(pack_id),
67                    flow_id: flow_id.clone(),
68                    flow_type: Some("timer".into()),
69                    action: Some("timer".into()),
70                    session_hint: Some(schedule_id.clone()),
71                    provider: Some("timer".into()),
72                    // Timers fire without a messaging endpoint by definition.
73                    messaging_endpoint_id: None,
74                    channel: Some(schedule_id.clone()),
75                    conversation: Some(schedule_id.clone()),
76                    user: None,
77                    activity_id: Some(format!("{}@{}", schedule_id, next)),
78                    timestamp: Some(next.to_rfc3339()),
79                    payload,
80                    metadata: None,
81                    reply_scope: None,
82                }
83                .canonicalize();
84                match runtime_clone.state_machine().handle(envelope).await {
85                    Ok(output) => {
86                        tracing::info!(
87                            flow_id = %flow_id,
88                            schedule_id = %schedule_id,
89                            now = %next,
90                            response = %output,
91                            "timer flow completed"
92                        );
93                    }
94                    Err(err) => {
95                        let chain = err.chain().map(|e| e.to_string()).collect::<Vec<_>>();
96                        tracing::error!(
97                            flow_id = %flow_id,
98                            schedule_id = %schedule_id,
99                            error.cause_chain = ?chain,
100                            "timer flow execution failed"
101                        );
102                    }
103                }
104            }
105            tracing::info!(
106                flow_id = %flow_id,
107                schedule_id = %schedule_id,
108                "timer schedule completed"
109            );
110        });
111
112        handles.push(handle);
113    }
114
115    Ok(handles)
116}
117
118fn duration_until(next: DateTime<Utc>) -> Option<Duration> {
119    let now = Utc::now();
120    let duration = next - now;
121    if duration.num_milliseconds() <= 0 {
122        return Some(Duration::from_secs(0));
123    }
124    duration.to_std().ok()
125}
126
127fn normalize_cron(expr: &str) -> String {
128    if expr.split_whitespace().count() == 5 {
129        format!("0 {expr}")
130    } else {
131        expr.to_string()
132    }
133}
134
135#[cfg(test)]
136mod tests {
137    use super::*;
138    use std::sync::Arc;
139
140    #[test]
141    fn normalize_cron_adds_seconds_for_five_fields() {
142        assert_eq!(normalize_cron("*/5 * * * *"), "0 */5 * * * *");
143        assert_eq!(normalize_cron("0 */2 * * * *"), "0 */2 * * * *");
144    }
145
146    #[test]
147    fn duration_until_returns_zero_for_past_times() {
148        let past = Utc::now() - chrono::Duration::seconds(10);
149        assert_eq!(duration_until(past).unwrap(), Duration::from_secs(0));
150    }
151
152    #[test]
153    fn duration_until_returns_positive_duration_for_future_times() {
154        let future = Utc::now() + chrono::Duration::milliseconds(150);
155        let wait = duration_until(future).unwrap();
156        assert!(wait > Duration::from_secs(0));
157        assert!(wait <= Duration::from_secs(1));
158    }
159
160    #[test]
161    fn normalize_cron_preserves_non_five_field_expressions() {
162        assert_eq!(normalize_cron("@daily"), "@daily");
163        assert_eq!(normalize_cron("0 0 */2 * * *"), "0 0 */2 * * *");
164    }
165
166    #[tokio::test]
167    async fn spawn_timers_is_empty_when_config_has_no_timers() {
168        let (_workspace, runtime) = crate::test_support::build_test_runtime()
169            .await
170            .expect("runtime");
171        let timers = spawn_timers(Arc::clone(&runtime)).expect("spawn timers");
172        assert!(timers.is_empty());
173    }
174}