greentic_runner_host/runner/
adapt_timer.rs1use std::str::FromStr;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, Result};
6use chrono::{DateTime, Utc};
7use cron::Schedule;
8use serde_json::json;
9use tokio::task::JoinHandle;
10use tokio::time::sleep;
11
12use crate::engine::runtime::IngressEnvelope;
13use crate::runtime::TenantRuntime;
14
15pub fn spawn_timers(runtime: Arc<TenantRuntime>) -> Result<Vec<JoinHandle<()>>> {
16 let mut handles = Vec::new();
17
18 for timer in runtime.config().timers.clone() {
19 let cron_expr = timer.cron.clone();
20 let normalized = normalize_cron(&cron_expr);
21 let schedule = Schedule::from_str(&normalized)
22 .with_context(|| format!("invalid cron expression for {}", timer.schedule_id()))?;
23 let flow_id = timer.flow_id.clone();
24 let schedule_id = timer.schedule_id().to_string();
25 let tenant = runtime.config().tenant.clone();
26 let runtime_clone = Arc::clone(&runtime);
27
28 let handle = tokio::spawn(async move {
29 tracing::info!(
30 flow_id = %flow_id,
31 schedule_id = %schedule_id,
32 cron = %cron_expr,
33 normalized_cron = %normalized,
34 "registered timer schedule"
35 );
36 for next in schedule.upcoming(Utc) {
37 if let Some(wait) = duration_until(next) {
38 sleep(wait).await;
39 } else {
40 continue;
41 }
42 let pack_id = match runtime_clone.engine().flow_by_id(&flow_id) {
43 Some(flow) => flow.pack_id.clone(),
44 None => {
45 tracing::error!(
46 flow_id = %flow_id,
47 schedule_id = %schedule_id,
48 "timer flow is ambiguous; pack_id is required"
49 );
50 continue;
51 }
52 };
53 let payload = json!({
54 "now": next.to_rfc3339(),
55 "schedule_id": schedule_id.clone(),
56 });
57 tracing::info!(
58 flow_id = %flow_id,
59 schedule_id = %schedule_id,
60 scheduled_for = %next,
61 "triggering timer flow"
62 );
63 let envelope = IngressEnvelope {
64 tenant: tenant.clone(),
65 env: None,
66 pack_id: Some(pack_id),
67 flow_id: flow_id.clone(),
68 flow_type: Some("timer".into()),
69 action: Some("timer".into()),
70 session_hint: Some(schedule_id.clone()),
71 provider: Some("timer".into()),
72 messaging_endpoint_id: None,
74 channel: Some(schedule_id.clone()),
75 conversation: Some(schedule_id.clone()),
76 user: None,
77 activity_id: Some(format!("{}@{}", schedule_id, next)),
78 timestamp: Some(next.to_rfc3339()),
79 payload,
80 metadata: None,
81 reply_scope: None,
82 }
83 .canonicalize();
84 match runtime_clone.state_machine().handle(envelope).await {
85 Ok(output) => {
86 tracing::info!(
87 flow_id = %flow_id,
88 schedule_id = %schedule_id,
89 now = %next,
90 response = %output,
91 "timer flow completed"
92 );
93 }
94 Err(err) => {
95 let chain = err.chain().map(|e| e.to_string()).collect::<Vec<_>>();
96 tracing::error!(
97 flow_id = %flow_id,
98 schedule_id = %schedule_id,
99 error.cause_chain = ?chain,
100 "timer flow execution failed"
101 );
102 }
103 }
104 }
105 tracing::info!(
106 flow_id = %flow_id,
107 schedule_id = %schedule_id,
108 "timer schedule completed"
109 );
110 });
111
112 handles.push(handle);
113 }
114
115 Ok(handles)
116}
117
118fn duration_until(next: DateTime<Utc>) -> Option<Duration> {
119 let now = Utc::now();
120 let duration = next - now;
121 if duration.num_milliseconds() <= 0 {
122 return Some(Duration::from_secs(0));
123 }
124 duration.to_std().ok()
125}
126
127fn normalize_cron(expr: &str) -> String {
128 if expr.split_whitespace().count() == 5 {
129 format!("0 {expr}")
130 } else {
131 expr.to_string()
132 }
133}
134
135#[cfg(test)]
136mod tests {
137 use super::*;
138 use std::sync::Arc;
139
140 #[test]
141 fn normalize_cron_adds_seconds_for_five_fields() {
142 assert_eq!(normalize_cron("*/5 * * * *"), "0 */5 * * * *");
143 assert_eq!(normalize_cron("0 */2 * * * *"), "0 */2 * * * *");
144 }
145
146 #[test]
147 fn duration_until_returns_zero_for_past_times() {
148 let past = Utc::now() - chrono::Duration::seconds(10);
149 assert_eq!(duration_until(past).unwrap(), Duration::from_secs(0));
150 }
151
152 #[test]
153 fn duration_until_returns_positive_duration_for_future_times() {
154 let future = Utc::now() + chrono::Duration::milliseconds(150);
155 let wait = duration_until(future).unwrap();
156 assert!(wait > Duration::from_secs(0));
157 assert!(wait <= Duration::from_secs(1));
158 }
159
160 #[test]
161 fn normalize_cron_preserves_non_five_field_expressions() {
162 assert_eq!(normalize_cron("@daily"), "@daily");
163 assert_eq!(normalize_cron("0 0 */2 * * *"), "0 0 */2 * * *");
164 }
165
166 #[tokio::test]
167 async fn spawn_timers_is_empty_when_config_has_no_timers() {
168 let (_workspace, runtime) = crate::test_support::build_test_runtime()
169 .await
170 .expect("runtime");
171 let timers = spawn_timers(Arc::clone(&runtime)).expect("spawn timers");
172 assert!(timers.is_empty());
173 }
174}