Skip to main content

greentic_operator/demo/
timer_scheduler.rs

1use std::sync::{Arc, mpsc};
2use std::thread;
3use std::time::{Duration, Instant};
4
5use anyhow::Context;
6use chrono::Utc;
7use serde::{Deserialize, Serialize};
8use serde_json::{Value as JsonValue, json};
9use zip::ZipArchive;
10
11use crate::demo::event_router::route_events_to_default_flow;
12use crate::demo::ingress_types::EventEnvelopeV1;
13use crate::demo::runner_host::{DemoRunnerHost, OperatorContext};
14use crate::discovery;
15use crate::domains::Domain;
16use crate::operator_log;
17
/// Describes one periodic timer handler: which provider op to invoke and how often.
#[derive(Clone, Debug)]
pub struct TimerHandlerConfig {
    /// Provider identifier handed to the runner host when the timer fires.
    pub provider: String,
    /// Identifier of the provider operation invoked on each tick.
    pub op_id: String,
    /// Handler label forwarded in the tick payload and used in log/error messages.
    pub handler_id: String,
    /// Requested number of seconds between ticks.
    pub interval_seconds: u64,
}
25
/// Everything the scheduler thread needs in order to run the configured timer handlers.
#[derive(Clone)]
pub struct TimerSchedulerConfig {
    /// Host used to invoke provider ops and to obtain the bundle root for event routing.
    pub runner_host: Arc<DemoRunnerHost>,
    /// Tenant copied into every tick payload and operator context.
    pub tenant: String,
    /// Optional team; log lines print `default` when it is absent.
    pub team: Option<String>,
    /// Timer handlers to schedule; with an empty list the scheduler loop returns immediately.
    pub handlers: Vec<TimerHandlerConfig>,
    /// When true, a debug log line is emitted after each successful provider invocation.
    pub debug_enabled: bool,
}
34
/// Handle to the background timer thread created by [`TimerScheduler::start`].
pub struct TimerScheduler {
    /// Sender used by `stop` to signal shutdown; taken (set to `None`) once used.
    shutdown: Option<mpsc::Sender<()>>,
    /// Join handle of the scheduler thread; taken (set to `None`) once joined.
    handle: Option<thread::JoinHandle<anyhow::Result<()>>>,
}
39
40impl TimerScheduler {
41    pub fn start(config: TimerSchedulerConfig) -> anyhow::Result<Self> {
42        let (tx, rx) = mpsc::channel::<()>();
43        let handle = thread::Builder::new()
44            .name("demo-events-timer".to_string())
45            .spawn(move || run_scheduler_loop(config, rx))
46            .context("spawn timer scheduler thread")?;
47        Ok(Self {
48            shutdown: Some(tx),
49            handle: Some(handle),
50        })
51    }
52
53    pub fn stop(mut self) -> anyhow::Result<()> {
54        if let Some(tx) = self.shutdown.take() {
55            let _ = tx.send(());
56        }
57        if let Some(handle) = self.handle.take() {
58            handle
59                .join()
60                .map_err(|err| anyhow::anyhow!("timer scheduler panicked: {err:?}"))??;
61        }
62        Ok(())
63    }
64}
65
/// Runtime state the scheduler loop keeps for one configured timer handler.
#[derive(Clone)]
struct ScheduledTimer {
    /// Static configuration of the handler.
    config: TimerHandlerConfig,
    /// Earliest instant at which the handler should run next.
    next_tick: Instant,
    /// RFC 3339 timestamp of the last run that completed without error, if any.
    last_run_rfc3339: Option<String>,
}
72
/// Payload encoded as canonical CBOR and passed to the provider op on every timer tick.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TimerTickInputV1 {
    /// Payload version; the scheduler always sends `1`.
    v: u8,
    /// Domain name; the scheduler always sends `"events"`.
    domain: String,
    /// Provider the tick is addressed to.
    provider: String,
    /// Handler label taken from the timer configuration.
    handler_id: String,
    /// Tenant taken from the scheduler configuration.
    tenant: String,
    /// Team taken from the scheduler configuration; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    team: Option<String>,
    /// RFC 3339 UTC timestamp captured when the tick was built.
    occurred_at: String,
    /// Configured interval of the handler, in seconds.
    interval_seconds: u64,
    /// Timestamp of the previous successful run; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    last_run: Option<String>,
}
87
88fn run_scheduler_loop(config: TimerSchedulerConfig, rx: mpsc::Receiver<()>) -> anyhow::Result<()> {
89    if config.handlers.is_empty() {
90        return Ok(());
91    }
92    let mut timers = config
93        .handlers
94        .iter()
95        .cloned()
96        .map(|handler| ScheduledTimer {
97            next_tick: Instant::now() + Duration::from_secs(handler.interval_seconds.max(1)),
98            config: handler,
99            last_run_rfc3339: None,
100        })
101        .collect::<Vec<_>>();
102
103    operator_log::info(
104        module_path!(),
105        format!(
106            "events timer scheduler started handlers={} tenant={} team={}",
107            timers.len(),
108            config.tenant,
109            config.team.as_deref().unwrap_or("default")
110        ),
111    );
112
113    loop {
114        let now = Instant::now();
115        for timer in &mut timers {
116            if now < timer.next_tick {
117                continue;
118            }
119            if let Err(err) = run_timer_handler(&config, timer) {
120                operator_log::error(module_path!(), format!("timer handler failed: {err}"));
121            }
122            timer.next_tick = Instant::now() + Duration::from_secs(timer.config.interval_seconds);
123        }
124
125        let sleep_for = timers
126            .iter()
127            .map(|timer| timer.next_tick.saturating_duration_since(Instant::now()))
128            .min()
129            .unwrap_or_else(|| Duration::from_millis(200))
130            .max(Duration::from_millis(50));
131        if rx.recv_timeout(sleep_for).is_ok() {
132            break;
133        }
134    }
135
136    operator_log::info(module_path!(), "events timer scheduler stopped");
137    Ok(())
138}
139
140fn run_timer_handler(
141    scheduler: &TimerSchedulerConfig,
142    timer: &mut ScheduledTimer,
143) -> anyhow::Result<()> {
144    let occurred_at = Utc::now().to_rfc3339();
145    let payload = TimerTickInputV1 {
146        v: 1,
147        domain: "events".to_string(),
148        provider: timer.config.provider.clone(),
149        handler_id: timer.config.handler_id.clone(),
150        tenant: scheduler.tenant.clone(),
151        team: scheduler.team.clone(),
152        occurred_at: occurred_at.clone(),
153        interval_seconds: timer.config.interval_seconds,
154        last_run: timer.last_run_rfc3339.clone(),
155    };
156    let bytes = greentic_types::cbor::canonical::to_canonical_cbor(&payload)
157        .map_err(|err| anyhow::anyhow!("{err}"))?;
158    let context = OperatorContext {
159        tenant: scheduler.tenant.clone(),
160        team: scheduler.team.clone(),
161        correlation_id: None,
162    };
163    let outcome = scheduler.runner_host.invoke_provider_op(
164        Domain::Events,
165        &timer.config.provider,
166        &timer.config.op_id,
167        &bytes,
168        &context,
169    )?;
170    if !outcome.success {
171        let message = outcome
172            .error
173            .or(outcome.raw)
174            .unwrap_or_else(|| "timer op failed".to_string());
175        anyhow::bail!(
176            "provider={} op={} handler={} failed: {}",
177            timer.config.provider,
178            timer.config.op_id,
179            timer.config.handler_id,
180            message
181        );
182    }
183    if scheduler.debug_enabled {
184        operator_log::debug(
185            module_path!(),
186            format!(
187                "[demo dev] timer tick provider={} op={} handler={} tenant={} team={}",
188                timer.config.provider,
189                timer.config.op_id,
190                timer.config.handler_id,
191                scheduler.tenant,
192                scheduler.team.as_deref().unwrap_or("default")
193            ),
194        );
195    }
196    let output = outcome.output.unwrap_or_else(|| json!({}));
197    let events = parse_events(&output)?;
198    if !events.is_empty() {
199        route_events_to_default_flow(scheduler.runner_host.bundle_root(), &context, &events)?;
200    }
201    timer.last_run_rfc3339 = Some(occurred_at);
202    Ok(())
203}
204
205fn parse_events(output: &JsonValue) -> anyhow::Result<Vec<EventEnvelopeV1>> {
206    let Some(array) = output.get("events").and_then(JsonValue::as_array) else {
207        return Ok(Vec::new());
208    };
209    let mut events = Vec::with_capacity(array.len());
210    for entry in array {
211        let event: EventEnvelopeV1 = serde_json::from_value(entry.clone())
212            .context("invalid EventEnvelopeV1 emitted by timer op")?;
213        events.push(event);
214    }
215    Ok(events)
216}
217
218pub fn discover_timer_handlers(
219    discovery: &discovery::DiscoveryResult,
220    default_interval_seconds: u64,
221) -> anyhow::Result<Vec<TimerHandlerConfig>> {
222    let mut handlers = Vec::new();
223    for provider in &discovery.providers {
224        if provider.domain != "events" {
225            continue;
226        }
227        let file = std::fs::File::open(&provider.pack_path)?;
228        let mut archive = ZipArchive::new(file)?;
229        let mut manifest = archive.by_name("manifest.cbor").with_context(|| {
230            format!("manifest.cbor missing in {}", provider.pack_path.display())
231        })?;
232        let mut bytes = Vec::new();
233        std::io::Read::read_to_end(&mut manifest, &mut bytes)?;
234        let manifest_json: JsonValue = serde_cbor::from_slice(&bytes)
235            .with_context(|| format!("decode manifest.cbor {}", provider.pack_path.display()))?;
236        let explicit = parse_explicit_timer_handlers(
237            &manifest_json,
238            &provider.provider_id,
239            default_interval_seconds.max(1),
240        )?;
241        if !explicit.is_empty() {
242            handlers.extend(explicit);
243            continue;
244        }
245        // Guarded fallback for legacy packs that do not declare explicit timer metadata.
246        for op in parse_provider_ops(&manifest_json, &provider.provider_id)? {
247            if let Some((handler_id, interval_seconds)) =
248                parse_timer_op(&op, default_interval_seconds.max(1))
249            {
250                handlers.push(TimerHandlerConfig {
251                    provider: provider.provider_id.clone(),
252                    op_id: op,
253                    handler_id,
254                    interval_seconds,
255                });
256            }
257        }
258    }
259    Ok(handlers)
260}
261
262fn parse_explicit_timer_handlers(
263    manifest_json: &JsonValue,
264    default_provider: &str,
265    default_interval_seconds: u64,
266) -> anyhow::Result<Vec<TimerHandlerConfig>> {
267    let inline = provider_extension_inline_json(manifest_json)?;
268    let mut handlers = Vec::new();
269    for key in ["timer_handlers", "timers"] {
270        if let Some(values) = inline.get(key).and_then(JsonValue::as_array) {
271            for entry in values {
272                if let Some(handler) =
273                    parse_timer_handler_entry(entry, default_provider, default_interval_seconds)
274                {
275                    handlers.push(handler);
276                }
277            }
278        }
279    }
280    if !handlers.is_empty() {
281        return Ok(handlers);
282    }
283    if let Some(providers) = inline.get("providers").and_then(JsonValue::as_array) {
284        for provider in providers {
285            let provider_type = provider
286                .get("provider_type")
287                .and_then(JsonValue::as_str)
288                .unwrap_or(default_provider);
289            for key in ["timer_handlers", "timers"] {
290                if let Some(values) = provider.get(key).and_then(JsonValue::as_array) {
291                    for entry in values {
292                        if let Some(mut handler) = parse_timer_handler_entry(
293                            entry,
294                            default_provider,
295                            default_interval_seconds,
296                        ) {
297                            if handler.provider == default_provider {
298                                handler.provider = provider_type.to_string();
299                            }
300                            handlers.push(handler);
301                        }
302                    }
303                }
304            }
305        }
306    }
307    Ok(handlers)
308}
309
310fn parse_timer_handler_entry(
311    value: &JsonValue,
312    default_provider: &str,
313    default_interval_seconds: u64,
314) -> Option<TimerHandlerConfig> {
315    if let Some(op_id) = value.as_str() {
316        return Some(TimerHandlerConfig {
317            provider: default_provider.to_string(),
318            op_id: op_id.to_string(),
319            handler_id: "default".to_string(),
320            interval_seconds: default_interval_seconds,
321        });
322    }
323    let obj = value.as_object()?;
324    let op_id = obj
325        .get("op_id")
326        .and_then(JsonValue::as_str)
327        .or_else(|| obj.get("op").and_then(JsonValue::as_str))?
328        .to_string();
329    let handler_id = obj
330        .get("handler_id")
331        .and_then(JsonValue::as_str)
332        .or_else(|| obj.get("handler").and_then(JsonValue::as_str))
333        .unwrap_or("default")
334        .to_string();
335    let provider = obj
336        .get("provider_type")
337        .and_then(JsonValue::as_str)
338        .or_else(|| obj.get("provider").and_then(JsonValue::as_str))
339        .unwrap_or(default_provider)
340        .to_string();
341    let interval_seconds = obj
342        .get("interval_seconds")
343        .and_then(JsonValue::as_u64)
344        .or_else(|| obj.get("interval").and_then(JsonValue::as_u64))
345        .unwrap_or(default_interval_seconds)
346        .max(1);
347    Some(TimerHandlerConfig {
348        provider,
349        op_id,
350        handler_id,
351        interval_seconds,
352    })
353}
354
355fn parse_provider_ops(manifest_json: &JsonValue, provider_id: &str) -> anyhow::Result<Vec<String>> {
356    let inline = provider_extension_inline_json(manifest_json)?;
357    let mut ops = Vec::new();
358    if let Some(providers) = inline.get("providers").and_then(JsonValue::as_array) {
359        for provider in providers {
360            let provider_type = provider
361                .get("provider_type")
362                .and_then(JsonValue::as_str)
363                .unwrap_or_default();
364            if provider_type != provider_id {
365                continue;
366            }
367            if let Some(provider_ops) = provider.get("ops").and_then(JsonValue::as_array) {
368                for op in provider_ops {
369                    if let Some(op_id) = op.as_str() {
370                        ops.push(op_id.to_string());
371                    }
372                }
373            }
374        }
375    }
376    Ok(ops)
377}
378
379fn provider_extension_inline_json(manifest_json: &JsonValue) -> anyhow::Result<&JsonValue> {
380    manifest_json
381        .get("extensions")
382        .and_then(JsonValue::as_object)
383        .and_then(|extensions| extensions.get("greentic.provider-extension.v1"))
384        .and_then(JsonValue::as_object)
385        .and_then(|ext| ext.get("inline"))
386        .ok_or_else(|| anyhow::anyhow!("provider extension inline payload missing"))
387}
388
/// Recognises timer ops by naming convention and derives `(handler_id, interval_seconds)`.
///
/// * `timer_tick` / `ingest_timer` (ASCII case-insensitive) and a bare `timer_` map to
///   the `"default"` handler with the default interval.
/// * `timer_<handler>_<n>` maps to `<handler>` with an interval of `n` seconds
///   (at least 1); `timer_<n>` uses the `"default"` handler.
/// * Any other `timer_<handler>` maps to `<handler>` with the default interval.
///
/// An empty handler segment (e.g. `timer__10`) falls back to `"default"` rather than
/// producing an empty handler id.
///
/// Returns `None` for ops that do not follow any of these conventions.
fn parse_timer_op(op: &str, default_interval_seconds: u64) -> Option<(String, u64)> {
    const DEFAULT_HANDLER: &str = "default";

    if op.eq_ignore_ascii_case("timer_tick") || op.eq_ignore_ascii_case("ingest_timer") {
        return Some((DEFAULT_HANDLER.to_string(), default_interval_seconds));
    }
    let tail = op.strip_prefix("timer_")?;
    if tail.is_empty() {
        return Some((DEFAULT_HANDLER.to_string(), default_interval_seconds));
    }
    // Split off the final `_`-separated segment; it may be a numeric interval.
    let (handler, last) = match tail.rsplit_once('_') {
        Some((handler, last)) => (Some(handler), last),
        None => (None, tail),
    };
    if let Ok(interval) = last.parse::<u64>() {
        let handler = handler
            .filter(|name| !name.is_empty())
            .unwrap_or(DEFAULT_HANDLER);
        return Some((handler.to_string(), interval.max(1)));
    }
    Some((tail.to_string(), default_interval_seconds))
}
410
#[cfg(test)]
mod tests {
    use super::{parse_explicit_timer_handlers, parse_timer_op};
    use serde_json::json;

    /// Op-name conventions map to the expected handler id and interval; unrelated ops
    /// are rejected.
    #[test]
    fn timer_op_conventions_are_detected() {
        let default_interval = 30;
        let recognised = [
            ("timer_tick", "default", 30),
            ("timer_reminder", "reminder", 30),
            ("timer_reminder_10", "reminder", 10),
        ];
        for (op, handler, interval) in recognised {
            assert_eq!(
                parse_timer_op(op, default_interval),
                Some((handler.to_string(), interval)),
                "op {op}"
            );
        }
        assert!(parse_timer_op("ingest_http", default_interval).is_none());
    }

    /// A handler declared at the top level of the inline extension payload is parsed
    /// with all of its explicit fields.
    #[test]
    fn parses_explicit_timer_handlers_from_extension() {
        let declared = json!({
            "provider_type": "events-twilio",
            "op_id": "timer_poll",
            "handler_id": "poll",
            "interval_seconds": 15
        });
        let manifest = json!({
            "extensions": {
                "greentic.provider-extension.v1": {
                    "inline": { "timer_handlers": [declared] }
                }
            }
        });

        let handlers =
            parse_explicit_timer_handlers(&manifest, "events-twilio", 60).expect("parse explicit");

        let [handler] = handlers.as_slice() else {
            panic!("expected exactly one handler, got {}", handlers.len());
        };
        assert_eq!(handler.provider, "events-twilio");
        assert_eq!(handler.op_id, "timer_poll");
        assert_eq!(handler.handler_id, "poll");
        assert_eq!(handler.interval_seconds, 15);
    }
}
450        assert_eq!(handlers[0].op_id, "timer_poll");
451        assert_eq!(handlers[0].handler_id, "poll");
452        assert_eq!(handlers[0].interval_seconds, 15);
453    }
454}