1use std::sync::{Arc, mpsc};
2use std::thread;
3use std::time::{Duration, Instant};
4
5use anyhow::Context;
6use chrono::Utc;
7use serde::{Deserialize, Serialize};
8use serde_json::{Value as JsonValue, json};
9use zip::ZipArchive;
10
11use crate::demo::event_router::route_events_to_default_flow;
12use crate::demo::ingress_types::EventEnvelopeV1;
13use crate::demo::runner_host::{DemoRunnerHost, OperatorContext};
14use crate::discovery;
15use crate::domains::Domain;
16use crate::operator_log;
17
/// Describes one periodic timer handler exposed by an events provider.
#[derive(Debug, Clone)]
pub struct TimerHandlerConfig {
    /// Provider identifier the timer operation is invoked on.
    pub provider: String,
    /// Operation id passed to the runner host on every tick.
    pub op_id: String,
    /// Handler identifier reported in the tick payload.
    pub handler_id: String,
    /// Number of seconds between two ticks of this handler.
    pub interval_seconds: u64,
}
25
26#[derive(Clone)]
27pub struct TimerSchedulerConfig {
28 pub runner_host: Arc<DemoRunnerHost>,
29 pub tenant: String,
30 pub team: Option<String>,
31 pub handlers: Vec<TimerHandlerConfig>,
32 pub debug_enabled: bool,
33}
34
35pub struct TimerScheduler {
36 shutdown: Option<mpsc::Sender<()>>,
37 handle: Option<thread::JoinHandle<anyhow::Result<()>>>,
38}
39
40impl TimerScheduler {
41 pub fn start(config: TimerSchedulerConfig) -> anyhow::Result<Self> {
42 let (tx, rx) = mpsc::channel::<()>();
43 let handle = thread::Builder::new()
44 .name("demo-events-timer".to_string())
45 .spawn(move || run_scheduler_loop(config, rx))
46 .context("spawn timer scheduler thread")?;
47 Ok(Self {
48 shutdown: Some(tx),
49 handle: Some(handle),
50 })
51 }
52
53 pub fn stop(mut self) -> anyhow::Result<()> {
54 if let Some(tx) = self.shutdown.take() {
55 let _ = tx.send(());
56 }
57 if let Some(handle) = self.handle.take() {
58 handle
59 .join()
60 .map_err(|err| anyhow::anyhow!("timer scheduler panicked: {err:?}"))??;
61 }
62 Ok(())
63 }
64}
65
66#[derive(Clone)]
67struct ScheduledTimer {
68 config: TimerHandlerConfig,
69 next_tick: Instant,
70 last_run_rfc3339: Option<String>,
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize)]
74struct TimerTickInputV1 {
75 v: u8,
76 domain: String,
77 provider: String,
78 handler_id: String,
79 tenant: String,
80 #[serde(default, skip_serializing_if = "Option::is_none")]
81 team: Option<String>,
82 occurred_at: String,
83 interval_seconds: u64,
84 #[serde(default, skip_serializing_if = "Option::is_none")]
85 last_run: Option<String>,
86}
87
88fn run_scheduler_loop(config: TimerSchedulerConfig, rx: mpsc::Receiver<()>) -> anyhow::Result<()> {
89 if config.handlers.is_empty() {
90 return Ok(());
91 }
92 let mut timers = config
93 .handlers
94 .iter()
95 .cloned()
96 .map(|handler| ScheduledTimer {
97 next_tick: Instant::now() + Duration::from_secs(handler.interval_seconds.max(1)),
98 config: handler,
99 last_run_rfc3339: None,
100 })
101 .collect::<Vec<_>>();
102
103 operator_log::info(
104 module_path!(),
105 format!(
106 "events timer scheduler started handlers={} tenant={} team={}",
107 timers.len(),
108 config.tenant,
109 config.team.as_deref().unwrap_or("default")
110 ),
111 );
112
113 loop {
114 let now = Instant::now();
115 for timer in &mut timers {
116 if now < timer.next_tick {
117 continue;
118 }
119 if let Err(err) = run_timer_handler(&config, timer) {
120 operator_log::error(module_path!(), format!("timer handler failed: {err}"));
121 }
122 timer.next_tick = Instant::now() + Duration::from_secs(timer.config.interval_seconds);
123 }
124
125 let sleep_for = timers
126 .iter()
127 .map(|timer| timer.next_tick.saturating_duration_since(Instant::now()))
128 .min()
129 .unwrap_or_else(|| Duration::from_millis(200))
130 .max(Duration::from_millis(50));
131 if rx.recv_timeout(sleep_for).is_ok() {
132 break;
133 }
134 }
135
136 operator_log::info(module_path!(), "events timer scheduler stopped");
137 Ok(())
138}
139
140fn run_timer_handler(
141 scheduler: &TimerSchedulerConfig,
142 timer: &mut ScheduledTimer,
143) -> anyhow::Result<()> {
144 let occurred_at = Utc::now().to_rfc3339();
145 let payload = TimerTickInputV1 {
146 v: 1,
147 domain: "events".to_string(),
148 provider: timer.config.provider.clone(),
149 handler_id: timer.config.handler_id.clone(),
150 tenant: scheduler.tenant.clone(),
151 team: scheduler.team.clone(),
152 occurred_at: occurred_at.clone(),
153 interval_seconds: timer.config.interval_seconds,
154 last_run: timer.last_run_rfc3339.clone(),
155 };
156 let bytes = greentic_types::cbor::canonical::to_canonical_cbor(&payload)
157 .map_err(|err| anyhow::anyhow!("{err}"))?;
158 let context = OperatorContext {
159 tenant: scheduler.tenant.clone(),
160 team: scheduler.team.clone(),
161 correlation_id: None,
162 };
163 let outcome = scheduler.runner_host.invoke_provider_op(
164 Domain::Events,
165 &timer.config.provider,
166 &timer.config.op_id,
167 &bytes,
168 &context,
169 )?;
170 if !outcome.success {
171 let message = outcome
172 .error
173 .or(outcome.raw)
174 .unwrap_or_else(|| "timer op failed".to_string());
175 anyhow::bail!(
176 "provider={} op={} handler={} failed: {}",
177 timer.config.provider,
178 timer.config.op_id,
179 timer.config.handler_id,
180 message
181 );
182 }
183 if scheduler.debug_enabled {
184 operator_log::debug(
185 module_path!(),
186 format!(
187 "[demo dev] timer tick provider={} op={} handler={} tenant={} team={}",
188 timer.config.provider,
189 timer.config.op_id,
190 timer.config.handler_id,
191 scheduler.tenant,
192 scheduler.team.as_deref().unwrap_or("default")
193 ),
194 );
195 }
196 let output = outcome.output.unwrap_or_else(|| json!({}));
197 let events = parse_events(&output)?;
198 if !events.is_empty() {
199 route_events_to_default_flow(scheduler.runner_host.bundle_root(), &context, &events)?;
200 }
201 timer.last_run_rfc3339 = Some(occurred_at);
202 Ok(())
203}
204
205fn parse_events(output: &JsonValue) -> anyhow::Result<Vec<EventEnvelopeV1>> {
206 let Some(array) = output.get("events").and_then(JsonValue::as_array) else {
207 return Ok(Vec::new());
208 };
209 let mut events = Vec::with_capacity(array.len());
210 for entry in array {
211 let event: EventEnvelopeV1 = serde_json::from_value(entry.clone())
212 .context("invalid EventEnvelopeV1 emitted by timer op")?;
213 events.push(event);
214 }
215 Ok(events)
216}
217
218pub fn discover_timer_handlers(
219 discovery: &discovery::DiscoveryResult,
220 default_interval_seconds: u64,
221) -> anyhow::Result<Vec<TimerHandlerConfig>> {
222 let mut handlers = Vec::new();
223 for provider in &discovery.providers {
224 if provider.domain != "events" {
225 continue;
226 }
227 let file = std::fs::File::open(&provider.pack_path)?;
228 let mut archive = ZipArchive::new(file)?;
229 let mut manifest = archive.by_name("manifest.cbor").with_context(|| {
230 format!("manifest.cbor missing in {}", provider.pack_path.display())
231 })?;
232 let mut bytes = Vec::new();
233 std::io::Read::read_to_end(&mut manifest, &mut bytes)?;
234 let manifest_json: JsonValue = serde_cbor::from_slice(&bytes)
235 .with_context(|| format!("decode manifest.cbor {}", provider.pack_path.display()))?;
236 let explicit = parse_explicit_timer_handlers(
237 &manifest_json,
238 &provider.provider_id,
239 default_interval_seconds.max(1),
240 )?;
241 if !explicit.is_empty() {
242 handlers.extend(explicit);
243 continue;
244 }
245 for op in parse_provider_ops(&manifest_json, &provider.provider_id)? {
247 if let Some((handler_id, interval_seconds)) =
248 parse_timer_op(&op, default_interval_seconds.max(1))
249 {
250 handlers.push(TimerHandlerConfig {
251 provider: provider.provider_id.clone(),
252 op_id: op,
253 handler_id,
254 interval_seconds,
255 });
256 }
257 }
258 }
259 Ok(handlers)
260}
261
262fn parse_explicit_timer_handlers(
263 manifest_json: &JsonValue,
264 default_provider: &str,
265 default_interval_seconds: u64,
266) -> anyhow::Result<Vec<TimerHandlerConfig>> {
267 let inline = provider_extension_inline_json(manifest_json)?;
268 let mut handlers = Vec::new();
269 for key in ["timer_handlers", "timers"] {
270 if let Some(values) = inline.get(key).and_then(JsonValue::as_array) {
271 for entry in values {
272 if let Some(handler) =
273 parse_timer_handler_entry(entry, default_provider, default_interval_seconds)
274 {
275 handlers.push(handler);
276 }
277 }
278 }
279 }
280 if !handlers.is_empty() {
281 return Ok(handlers);
282 }
283 if let Some(providers) = inline.get("providers").and_then(JsonValue::as_array) {
284 for provider in providers {
285 let provider_type = provider
286 .get("provider_type")
287 .and_then(JsonValue::as_str)
288 .unwrap_or(default_provider);
289 for key in ["timer_handlers", "timers"] {
290 if let Some(values) = provider.get(key).and_then(JsonValue::as_array) {
291 for entry in values {
292 if let Some(mut handler) = parse_timer_handler_entry(
293 entry,
294 default_provider,
295 default_interval_seconds,
296 ) {
297 if handler.provider == default_provider {
298 handler.provider = provider_type.to_string();
299 }
300 handlers.push(handler);
301 }
302 }
303 }
304 }
305 }
306 }
307 Ok(handlers)
308}
309
310fn parse_timer_handler_entry(
311 value: &JsonValue,
312 default_provider: &str,
313 default_interval_seconds: u64,
314) -> Option<TimerHandlerConfig> {
315 if let Some(op_id) = value.as_str() {
316 return Some(TimerHandlerConfig {
317 provider: default_provider.to_string(),
318 op_id: op_id.to_string(),
319 handler_id: "default".to_string(),
320 interval_seconds: default_interval_seconds,
321 });
322 }
323 let obj = value.as_object()?;
324 let op_id = obj
325 .get("op_id")
326 .and_then(JsonValue::as_str)
327 .or_else(|| obj.get("op").and_then(JsonValue::as_str))?
328 .to_string();
329 let handler_id = obj
330 .get("handler_id")
331 .and_then(JsonValue::as_str)
332 .or_else(|| obj.get("handler").and_then(JsonValue::as_str))
333 .unwrap_or("default")
334 .to_string();
335 let provider = obj
336 .get("provider_type")
337 .and_then(JsonValue::as_str)
338 .or_else(|| obj.get("provider").and_then(JsonValue::as_str))
339 .unwrap_or(default_provider)
340 .to_string();
341 let interval_seconds = obj
342 .get("interval_seconds")
343 .and_then(JsonValue::as_u64)
344 .or_else(|| obj.get("interval").and_then(JsonValue::as_u64))
345 .unwrap_or(default_interval_seconds)
346 .max(1);
347 Some(TimerHandlerConfig {
348 provider,
349 op_id,
350 handler_id,
351 interval_seconds,
352 })
353}
354
355fn parse_provider_ops(manifest_json: &JsonValue, provider_id: &str) -> anyhow::Result<Vec<String>> {
356 let inline = provider_extension_inline_json(manifest_json)?;
357 let mut ops = Vec::new();
358 if let Some(providers) = inline.get("providers").and_then(JsonValue::as_array) {
359 for provider in providers {
360 let provider_type = provider
361 .get("provider_type")
362 .and_then(JsonValue::as_str)
363 .unwrap_or_default();
364 if provider_type != provider_id {
365 continue;
366 }
367 if let Some(provider_ops) = provider.get("ops").and_then(JsonValue::as_array) {
368 for op in provider_ops {
369 if let Some(op_id) = op.as_str() {
370 ops.push(op_id.to_string());
371 }
372 }
373 }
374 }
375 }
376 Ok(ops)
377}
378
379fn provider_extension_inline_json(manifest_json: &JsonValue) -> anyhow::Result<&JsonValue> {
380 manifest_json
381 .get("extensions")
382 .and_then(JsonValue::as_object)
383 .and_then(|extensions| extensions.get("greentic.provider-extension.v1"))
384 .and_then(JsonValue::as_object)
385 .and_then(|ext| ext.get("inline"))
386 .ok_or_else(|| anyhow::anyhow!("provider extension inline payload missing"))
387}
388
/// Recognises timer operations by naming convention.
///
/// Returns `Some((handler_id, interval_seconds))` for a timer op and `None`
/// for anything else:
///
/// * `timer_tick` / `ingest_timer` (ASCII case-insensitive) and a bare
///   `timer_` map to the `"default"` handler with the default interval;
/// * `timer_<handler>_<n>` maps to `<handler>` with an interval of `n`
///   seconds, clamped to at least one;
/// * `timer_<n>` maps to the `"default"` handler with interval `n`;
/// * `timer_<handler>` maps to `<handler>` with the default interval.
///
/// An empty handler segment (for example `timer__5`) is reported as
/// `"default"` rather than as an empty handler id.
fn parse_timer_op(op: &str, default_interval_seconds: u64) -> Option<(String, u64)> {
    const DEFAULT_HANDLER: &str = "default";

    if op.eq_ignore_ascii_case("timer_tick") || op.eq_ignore_ascii_case("ingest_timer") {
        return Some((DEFAULT_HANDLER.to_string(), default_interval_seconds));
    }

    let tail = op.strip_prefix("timer_")?;
    if tail.is_empty() {
        return Some((DEFAULT_HANDLER.to_string(), default_interval_seconds));
    }

    // Split off a trailing `_<n>`; without an underscore the whole tail is
    // the candidate interval and there is no handler segment.
    let (name, suffix) = tail.rsplit_once('_').unwrap_or(("", tail));
    match suffix.parse::<u64>() {
        Ok(interval) => {
            let handler = if name.is_empty() { DEFAULT_HANDLER } else { name };
            Some((handler.to_string(), interval.max(1)))
        }
        // No numeric suffix: the whole tail is the handler name.
        Err(_) => Some((tail.to_string(), default_interval_seconds)),
    }
}
410
#[cfg(test)]
mod tests {
    use super::{parse_explicit_timer_handlers, parse_timer_op};
    use serde_json::json;

    /// The naming conventions map op ids to `(handler, interval)` pairs.
    #[test]
    fn timer_op_conventions_are_detected() {
        let tick = parse_timer_op("timer_tick", 30).expect("timer_tick");
        assert_eq!(tick, ("default".to_string(), 30));

        let named = parse_timer_op("timer_reminder", 30).expect("timer_reminder");
        assert_eq!(named, ("reminder".to_string(), 30));

        let with_interval = parse_timer_op("timer_reminder_10", 30).expect("timer_reminder_10");
        assert_eq!(with_interval, ("reminder".to_string(), 10));

        assert!(parse_timer_op("ingest_http", 30).is_none());
    }

    /// A top-level `timer_handlers` entry is parsed field by field.
    #[test]
    fn parses_explicit_timer_handlers_from_extension() {
        let entry = json!({
            "provider_type": "events-twilio",
            "op_id": "timer_poll",
            "handler_id": "poll",
            "interval_seconds": 15
        });
        let manifest = json!({
            "extensions": {
                "greentic.provider-extension.v1": {
                    "inline": {
                        "timer_handlers": [entry]
                    }
                }
            }
        });

        let handlers =
            parse_explicit_timer_handlers(&manifest, "events-twilio", 60).expect("parse explicit");

        assert_eq!(handlers.len(), 1);
        let handler = &handlers[0];
        assert_eq!(handler.provider, "events-twilio");
        assert_eq!(handler.op_id, "timer_poll");
        assert_eq!(handler.handler_id, "poll");
        assert_eq!(handler.interval_seconds, 15);
    }
}