greentic_runner_host/runner/
adapt_timer.rs1use std::str::FromStr;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, Result};
6use chrono::{DateTime, Utc};
7use cron::Schedule;
8use serde_json::json;
9use tokio::task::JoinHandle;
10use tokio::time::sleep;
11
12use crate::engine::runtime::IngressEnvelope;
13use crate::runtime::TenantRuntime;
14
15pub fn spawn_timers(runtime: Arc<TenantRuntime>) -> Result<Vec<JoinHandle<()>>> {
16 let mut handles = Vec::new();
17
18 for timer in runtime.config().timers.clone() {
19 let cron_expr = timer.cron.clone();
20 let normalized = normalize_cron(&cron_expr);
21 let schedule = Schedule::from_str(&normalized)
22 .with_context(|| format!("invalid cron expression for {}", timer.schedule_id()))?;
23 let flow_id = timer.flow_id.clone();
24 let schedule_id = timer.schedule_id().to_string();
25 let tenant = runtime.config().tenant.clone();
26 let runtime_clone = Arc::clone(&runtime);
27
28 let handle = tokio::spawn(async move {
29 tracing::info!(
30 flow_id = %flow_id,
31 schedule_id = %schedule_id,
32 cron = %cron_expr,
33 normalized_cron = %normalized,
34 "registered timer schedule"
35 );
36 for next in schedule.upcoming(Utc) {
37 if let Some(wait) = duration_until(next) {
38 sleep(wait).await;
39 } else {
40 continue;
41 }
42 let payload = json!({
43 "now": next.to_rfc3339(),
44 "schedule_id": schedule_id.clone(),
45 });
46 tracing::info!(
47 flow_id = %flow_id,
48 schedule_id = %schedule_id,
49 scheduled_for = %next,
50 "triggering timer flow"
51 );
52 let envelope = IngressEnvelope {
53 tenant: tenant.clone(),
54 env: None,
55 flow_id: flow_id.clone(),
56 flow_type: Some("timer".into()),
57 action: Some("timer".into()),
58 session_hint: Some(schedule_id.clone()),
59 provider: Some("timer".into()),
60 channel: Some(schedule_id.clone()),
61 conversation: Some(schedule_id.clone()),
62 user: None,
63 activity_id: Some(format!("{}@{}", schedule_id, next)),
64 timestamp: Some(next.to_rfc3339()),
65 payload,
66 metadata: None,
67 }
68 .canonicalize();
69 match runtime_clone.state_machine().handle(envelope).await {
70 Ok(output) => {
71 tracing::info!(
72 flow_id = %flow_id,
73 schedule_id = %schedule_id,
74 now = %next,
75 response = %output,
76 "timer flow completed"
77 );
78 }
79 Err(err) => {
80 let chain = err.chain().map(|e| e.to_string()).collect::<Vec<_>>();
81 tracing::error!(
82 flow_id = %flow_id,
83 schedule_id = %schedule_id,
84 error.cause_chain = ?chain,
85 "timer flow execution failed"
86 );
87 }
88 }
89 }
90 tracing::info!(
91 flow_id = %flow_id,
92 schedule_id = %schedule_id,
93 "timer schedule completed"
94 );
95 });
96
97 handles.push(handle);
98 }
99
100 Ok(handles)
101}
102
103fn duration_until(next: DateTime<Utc>) -> Option<Duration> {
104 let now = Utc::now();
105 let duration = next - now;
106 if duration.num_milliseconds() <= 0 {
107 return Some(Duration::from_secs(0));
108 }
109 duration.to_std().ok()
110}
111
/// Adapts a cron expression by adding a leading seconds field when needed.
///
/// An expression with exactly five whitespace-separated fields gets `0 `
/// prepended; its original text, including spacing, is kept verbatim after
/// the prefix. Any other field count is returned unchanged.
fn normalize_cron(expr: &str) -> String {
    let field_count = expr.split_whitespace().count();
    match field_count {
        5 => ["0", expr].join(" "),
        _ => expr.to_owned(),
    }
}
119
#[cfg(test)]
mod tests {
    use super::*;

    /// Five-field expressions gain a leading `0`; six-field ones pass through.
    #[test]
    fn normalize_cron_adds_seconds_for_five_fields() {
        let five_field = normalize_cron("*/5 * * * *");
        assert_eq!(five_field, "0 */5 * * * *");

        let six_field = "0 */2 * * * *";
        assert_eq!(normalize_cron(six_field), six_field);
    }

    /// A target time that has already passed must produce a zero wait.
    #[test]
    fn duration_until_returns_zero_for_past_times() {
        let ten_seconds_ago = Utc::now() - chrono::Duration::seconds(10);
        assert_eq!(
            duration_until(ten_seconds_ago),
            Some(Duration::from_secs(0))
        );
    }
}
135}