Skip to main content

roboticus_api/
cron_runtime.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use serde_json::Value;
5use tokio::sync::Semaphore;
6
7use crate::api::{AppState, execute_scheduled_agent_task, subagent_integrity};
8
9pub async fn run_cron_worker(state: AppState, instance_id: String) {
10    // Delay cron job evaluation to let the system stabilize after startup.
11    // Without this, cron jobs fire immediately and can starve the HTTP server
12    // and Telegram poller of resources during the critical first minute.
13    tokio::time::sleep(Duration::from_secs(90)).await;
14
15    let mut interval = tokio::time::interval(Duration::from_secs(60));
16    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
17    let initial_limit = state.config.read().await.server.cron_max_concurrency as usize;
18    let mut concurrency = Arc::new(Semaphore::new(initial_limit));
19    let mut current_limit = initial_limit;
20    tracing::info!("Server cron worker started (after 90s startup delay)");
21
22    loop {
23        interval.tick().await;
24
25        // Re-read concurrency limit on each tick so hot-reloaded config takes
26        // effect.  Outstanding permits from the old semaphore drain naturally;
27        // new acquisitions use the updated limit.
28        let configured_limit = state.config.read().await.server.cron_max_concurrency as usize;
29        if configured_limit != current_limit {
30            tracing::info!(
31                old = current_limit,
32                new = configured_limit,
33                "cron concurrency limit changed via hot-reload"
34            );
35            concurrency = Arc::new(Semaphore::new(configured_limit));
36            current_limit = configured_limit;
37        }
38        let jobs = match roboticus_db::cron::list_jobs(&state.db) {
39            Ok(j) => j,
40            Err(e) => {
41                tracing::error!(error = %e, "Failed to list cron jobs; ALL scheduled jobs are paused this tick");
42                continue;
43            }
44        };
45        let now = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
46        for job in &jobs {
47            if !job.enabled {
48                continue;
49            }
50            let kind = match job.schedule_kind.as_str() {
51                "interval" => "every",
52                "once" => "at",
53                other => other,
54            };
55            let due = match kind {
56                "cron" => match job.schedule_expr.as_deref() {
57                    Some(expr) => roboticus_schedule::DurableScheduler::evaluate_cron(
58                        expr,
59                        job.last_run_at.as_deref(),
60                        &now,
61                    ),
62                    None => {
63                        tracing::warn!(job_id = %job.id, job_name = %job.name,
64                            "cron-type job has no schedule_expr; will never fire");
65                        false
66                    }
67                },
68                "every" => {
69                    let raw_interval = job
70                        .schedule_every_ms
71                        .or_else(|| {
72                            parse_interval_expr_to_ms(job.schedule_expr.as_deref().unwrap_or(""))
73                        })
74                        .unwrap_or(60_000);
75                    // Guard against zero/negative intervals that would fire every tick.
76                    let interval_ms = if raw_interval < 1_000 {
77                        tracing::warn!(
78                            job_id = %job.id, job_name = %job.name,
79                            raw_interval_ms = raw_interval,
80                            "clamping dangerously low interval to 60s minimum"
81                        );
82                        60_000
83                    } else {
84                        raw_interval
85                    };
86                    roboticus_schedule::DurableScheduler::evaluate_interval(
87                        job.last_run_at.as_deref(),
88                        interval_ms,
89                        &now,
90                    )
91                }
92                "at" => match job.schedule_expr.as_deref() {
93                    Some(expr) => {
94                        // "once"/"at" jobs fire when now >= target and haven't run yet.
95                        if job.last_run_at.is_some() {
96                            false // already fired
97                        } else {
98                            roboticus_schedule::DurableScheduler::evaluate_at(expr, &now)
99                        }
100                    }
101                    None => {
102                        tracing::warn!(job_id = %job.id, job_name = %job.name,
103                            "once-type job has no schedule_expr; auto-disabling");
104                        let _ = roboticus_db::cron::update_job(
105                            &state.db,
106                            &job.id,
107                            None,
108                            None,
109                            None,
110                            Some(false),
111                        );
112                        false
113                    }
114                },
115                other_kind => {
116                    tracing::warn!(job_id = %job.id, job_name = %job.name, schedule_kind = other_kind,
117                        "unrecognized schedule_kind; job will not be scheduled");
118                    false
119                }
120            };
121            if !due {
122                continue;
123            }
124            let lease_acquired =
125                match roboticus_db::cron::acquire_lease(&state.db, &job.id, &instance_id) {
126                    Ok(acquired) => acquired,
127                    Err(e) => {
128                        tracing::error!(job_id = %job.id, job_name = %job.name, error = %e,
129                        "failed to acquire cron lease due to database error");
130                        continue;
131                    }
132                };
133            if !lease_acquired {
134                continue;
135            }
136            let Ok(permit) = concurrency.clone().try_acquire_owned() else {
137                if let Err(e) = roboticus_db::cron::release_lease(&state.db, &job.id, &instance_id)
138                {
139                    tracing::error!(job_id = %job.id, job_name = %job.name, error = %e,
140                        "failed to release cron lease after semaphore saturation; job may freeze until lease expiry");
141                }
142                tracing::warn!(job=%job.name, "Cron worker saturated; deferring leased job to next tick");
143                continue;
144            };
145            let state_clone = state.clone();
146            let job_clone = job.clone();
147            let instance_id_clone = instance_id.clone();
148            let kind = kind.to_string();
149            tokio::spawn(async move {
150                let _permit = permit;
151                let start = std::time::Instant::now();
152                let result = execute_cron_job_once(&state_clone, &job_clone).await;
153                let duration = start.elapsed().as_millis() as i64;
154                if let Err(e) = roboticus_db::cron::record_run(
155                    &state_clone.db,
156                    &job_clone.id,
157                    result.status,
158                    Some(duration),
159                    result.error.as_deref(),
160                    result.output.as_deref(),
161                ) {
162                    tracing::error!(
163                        job_id = %job_clone.id, job_name = %job_clone.name, error = %e,
164                        "CRITICAL: failed to record cron run audit trail"
165                    );
166                }
167                let now_str = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string();
168                // Map dispatch aliases back to DB-canonical kinds for
169                // calculate_next_run: "every" → "interval", "at" → "at".
170                let next_kind = match kind.as_str() {
171                    "every" => "interval",
172                    other => other,
173                };
174                // Resolve the effective interval_ms the same way the due-time
175                // evaluation does: prefer schedule_every_ms, fall back to
176                // parsing schedule_expr (e.g. "30m").  Without this, expr-based
177                // interval jobs pass None and calculate_next_run returns None,
178                // leaving next_run_at permanently NULL.
179                let resolved_every_ms = job_clone.schedule_every_ms.or_else(|| {
180                    parse_interval_expr_to_ms(job_clone.schedule_expr.as_deref().unwrap_or(""))
181                });
182                let next = roboticus_schedule::DurableScheduler::calculate_next_run(
183                    next_kind,
184                    job_clone.schedule_expr.as_deref(),
185                    resolved_every_ms,
186                    &now_str,
187                );
188                if let Err(e) = roboticus_db::cron::update_next_run_at(
189                    &state_clone.db,
190                    &job_clone.id,
191                    next.as_deref(),
192                ) {
193                    tracing::error!(
194                        job_id = %job_clone.id, job_name = %job_clone.name, error = %e,
195                        "CRITICAL: failed to update next_run_at; job may re-fire prematurely"
196                    );
197                }
198                // Auto-disable "once"/"at" jobs after their single execution.
199                if next_kind == "at" {
200                    if let Err(e) = roboticus_db::cron::update_job(
201                        &state_clone.db,
202                        &job_clone.id,
203                        None,
204                        None,
205                        None,
206                        Some(false),
207                    ) {
208                        tracing::error!(
209                            job_id = %job_clone.id, job_name = %job_clone.name, error = %e,
210                            "CRITICAL: failed to auto-disable once job after execution"
211                        );
212                    } else {
213                        tracing::info!(
214                            job_id = %job_clone.id, job_name = %job_clone.name,
215                            "once job auto-disabled after successful execution"
216                        );
217                    }
218                }
219                if let Err(e) = roboticus_db::cron::release_lease(
220                    &state_clone.db,
221                    &job_clone.id,
222                    &instance_id_clone,
223                ) {
224                    tracing::error!(
225                        job_id = %job_clone.id, job_name = %job_clone.name, error = %e,
226                        "CRITICAL: failed to release cron lease; job may freeze until lease expiry"
227                    );
228                }
229            });
230        }
231    }
232}
233
/// Outcome of a single cron job execution.
///
/// Within this file `status` is always either `"success"` or `"error"`;
/// `error` carries the failure description and `output` the text produced by
/// a successful run (either may be absent).
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct CronExecutionResult {
    /// Short status label recorded with the run.
    pub status: &'static str,
    /// Human-readable failure reason, if the run failed.
    pub error: Option<String>,
    /// Output text produced by the run, if any.
    pub output: Option<String>,
}
239
240pub(crate) async fn execute_cron_job_once(
241    state: &AppState,
242    job: &roboticus_db::cron::CronJob,
243) -> CronExecutionResult {
244    let payload: Value = match serde_json::from_str(&job.payload_json) {
245        Ok(v) => v,
246        Err(e) => {
247            return CronExecutionResult {
248                status: "error",
249                error: Some(format!("invalid payload: {e}")),
250                output: None,
251            };
252        }
253    };
254    let action = payload
255        .get("action")
256        .and_then(|v| v.as_str())
257        .unwrap_or("unknown");
258    match action {
259        "agent_task" => execute_agent_task_for_job(state, job, &payload).await,
260        "log" => {
261            if let Some(task) = implied_agent_task(job, &payload) {
262                execute_named_agent_task(state, &job.agent_id, &task, Some(job.name.as_str())).await
263            } else {
264                let message = payload
265                    .get("message")
266                    .and_then(|v| v.as_str())
267                    .unwrap_or("cron heartbeat");
268                tracing::info!(job = %job.name, message, "cron job executed");
269                CronExecutionResult {
270                    status: "success",
271                    error: None,
272                    output: Some(message.to_string()),
273                }
274            }
275        }
276        "metric_snapshot" => {
277            let snapshot = serde_json::json!({"job_id": job.id, "job_name": job.name, "schedule_kind": job.schedule_kind, "timestamp": chrono::Utc::now().to_rfc3339()});
278            match roboticus_db::metrics::record_metric_snapshot(&state.db, &snapshot.to_string()) {
279                Ok(_) => CronExecutionResult {
280                    status: "success",
281                    error: None,
282                    output: Some("metric snapshot recorded".to_string()),
283                },
284                Err(e) => CronExecutionResult {
285                    status: "error",
286                    error: Some(format!("metric_snapshot failed: {e}")),
287                    output: None,
288                },
289            }
290        }
291        "expire_sessions" => {
292            let ttl_seconds = payload
293                .get("ttl_seconds")
294                .and_then(|v| v.as_u64())
295                .unwrap_or(86_400);
296            match roboticus_db::sessions::expire_stale_sessions(&state.db, ttl_seconds) {
297                Ok(expired) => CronExecutionResult {
298                    status: "success",
299                    error: None,
300                    output: Some(format!("expired {expired} stale sessions")),
301                },
302                Err(e) => CronExecutionResult {
303                    status: "error",
304                    error: Some(format!("expire_sessions failed: {e}")),
305                    output: None,
306                },
307            }
308        }
309        "record_transaction" => {
310            let tx_type = payload
311                .get("tx_type")
312                .and_then(|v| v.as_str())
313                .unwrap_or("cron");
314            let Some(amount) = payload.get("amount").and_then(|v| v.as_f64()) else {
315                return CronExecutionResult {
316                    status: "error",
317                    error: Some(
318                        "record_transaction payload missing or invalid 'amount' field".to_string(),
319                    ),
320                    output: None,
321                };
322            };
323            if !amount.is_finite() {
324                return CronExecutionResult {
325                    status: "error",
326                    error: Some("record_transaction amount must be finite".to_string()),
327                    output: None,
328                };
329            }
330            let currency = payload
331                .get("currency")
332                .and_then(|v| v.as_str())
333                .unwrap_or("USD");
334            let counterparty = payload.get("counterparty").and_then(|v| v.as_str());
335            let tx_hash = payload.get("tx_hash").and_then(|v| v.as_str());
336            match roboticus_db::metrics::record_transaction(
337                &state.db,
338                tx_type,
339                amount,
340                currency,
341                counterparty,
342                tx_hash,
343            ) {
344                Ok(_) => CronExecutionResult {
345                    status: "success",
346                    error: None,
347                    output: Some(format!("transaction recorded: {amount} {currency}")),
348                },
349                Err(e) => CronExecutionResult {
350                    status: "error",
351                    error: Some(format!("record_transaction failed: {e}")),
352                    output: None,
353                },
354            }
355        }
356        "noop" => CronExecutionResult {
357            status: "success",
358            error: None,
359            output: None,
360        },
361        other => CronExecutionResult {
362            status: "error",
363            error: Some(format!("unknown action: {other}")),
364            output: None,
365        },
366    }
367}
368
369async fn execute_agent_task_for_job(
370    state: &AppState,
371    job: &roboticus_db::cron::CronJob,
372    payload: &Value,
373) -> CronExecutionResult {
374    let task = payload
375        .get("task")
376        .and_then(|v| v.as_str())
377        .or_else(|| payload.get("prompt").and_then(|v| v.as_str()))
378        .or_else(|| payload.get("message").and_then(|v| v.as_str()))
379        .map(str::trim)
380        .filter(|s| !s.is_empty())
381        .or(job
382            .description
383            .as_deref()
384            .map(str::trim)
385            .filter(|s| !s.is_empty()));
386    let Some(task) = task else {
387        return CronExecutionResult {
388            status: "error",
389            error: Some("agent_task payload missing task/prompt/message".to_string()),
390            output: None,
391        };
392    };
393    execute_named_agent_task(state, &job.agent_id, task, Some(job.name.as_str())).await
394}
395
396async fn execute_named_agent_task(
397    state: &AppState,
398    agent_id: &str,
399    task: &str,
400    job_name: Option<&str>,
401) -> CronExecutionResult {
402    match roboticus_db::agents::list_sub_agents(&state.db) {
403        Ok(subagents) => {
404            if let Some(sa) = subagents
405                .into_iter()
406                .find(|sa| sa.name.eq_ignore_ascii_case(agent_id) && sa.enabled)
407                && let Err(err) =
408                    subagent_integrity::ensure_taskable_subagent_ready(state, &sa).await
409            {
410                return CronExecutionResult {
411                    status: "error",
412                    error: Some(format!("subagent integrity repair failed: {err}")),
413                    output: None,
414                };
415            }
416        }
417        Err(e) => {
418            tracing::error!(agent_id, error = %e, "failed to list sub-agents for cron task; proceeding without integrity check");
419        }
420    }
421    match execute_scheduled_agent_task(state, agent_id, task, job_name).await {
422        Ok(output) => CronExecutionResult {
423            status: "success",
424            error: None,
425            output: Some(output),
426        },
427        Err(err) => CronExecutionResult {
428            status: "error",
429            error: Some(err),
430            output: None,
431        },
432    }
433}
434
435fn implied_agent_task(job: &roboticus_db::cron::CronJob, payload: &Value) -> Option<String> {
436    let description = job
437        .description
438        .as_deref()
439        .map(str::trim)
440        .filter(|s| !s.is_empty())?;
441    let message = payload
442        .get("message")
443        .and_then(|v| v.as_str())
444        .map(str::trim)
445        .unwrap_or("");
446    if message.eq_ignore_ascii_case(description)
447        || message.to_ascii_lowercase().starts_with("scheduled job:")
448    {
449        return Some(description.to_string());
450    }
451    None
452}
453
/// Parses a compact interval expression such as `"30s"`, `"15m"` or `"2h"`
/// into milliseconds.
///
/// The expression is an integer quantity immediately followed by a single
/// unit character: `s` (seconds), `m` (minutes) or `h` (hours), in either
/// letter case.  Leading and trailing whitespace is ignored so that values
/// like `"30m "` are not silently rejected.
///
/// Returns `None` for empty input, an unknown unit, a non-integer quantity,
/// or a result that is not strictly positive.  Products that would overflow
/// `i64` saturate at `i64::MAX`.
fn parse_interval_expr_to_ms(expr: &str) -> Option<i64> {
    let expr = expr.trim();
    // `next_back` yields `None` for an empty string, covering the empty case.
    let unit = expr.chars().next_back()?;
    let ms_per_unit: i64 = match unit {
        's' | 'S' => 1_000,
        'm' | 'M' => 60_000,
        'h' | 'H' => 3_600_000,
        _ => return None,
    };
    // Slice on the unit's UTF-8 width so the boundary is always valid.
    let qty = expr[..expr.len() - unit.len_utf8()].parse::<i64>().ok()?;
    let ms = qty.saturating_mul(ms_per_unit);
    (ms > 0).then_some(ms)
}