1use std::sync::Arc;
2use std::time::Duration;
3
4use serde_json::Value;
5use tokio::sync::Semaphore;
6
7use crate::api::{AppState, execute_scheduled_agent_task, subagent_integrity};
8
9pub async fn run_cron_worker(state: AppState, instance_id: String) {
10 tokio::time::sleep(Duration::from_secs(90)).await;
14
15 let mut interval = tokio::time::interval(Duration::from_secs(60));
16 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
17 let initial_limit = state.config.read().await.server.cron_max_concurrency as usize;
18 let mut concurrency = Arc::new(Semaphore::new(initial_limit));
19 let mut current_limit = initial_limit;
20 tracing::info!("Server cron worker started (after 90s startup delay)");
21
22 loop {
23 interval.tick().await;
24
25 let configured_limit = state.config.read().await.server.cron_max_concurrency as usize;
29 if configured_limit != current_limit {
30 tracing::info!(
31 old = current_limit,
32 new = configured_limit,
33 "cron concurrency limit changed via hot-reload"
34 );
35 concurrency = Arc::new(Semaphore::new(configured_limit));
36 current_limit = configured_limit;
37 }
38 let jobs = match roboticus_db::cron::list_jobs(&state.db) {
39 Ok(j) => j,
40 Err(e) => {
41 tracing::error!(error = %e, "Failed to list cron jobs; ALL scheduled jobs are paused this tick");
42 continue;
43 }
44 };
45 let now = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
46 for job in &jobs {
47 if !job.enabled {
48 continue;
49 }
50 let kind = match job.schedule_kind.as_str() {
51 "interval" => "every",
52 "once" => "at",
53 other => other,
54 };
55 let due = match kind {
56 "cron" => match job.schedule_expr.as_deref() {
57 Some(expr) => roboticus_schedule::DurableScheduler::evaluate_cron(
58 expr,
59 job.last_run_at.as_deref(),
60 &now,
61 ),
62 None => {
63 tracing::warn!(job_id = %job.id, job_name = %job.name,
64 "cron-type job has no schedule_expr; will never fire");
65 false
66 }
67 },
68 "every" => {
69 let raw_interval = job
70 .schedule_every_ms
71 .or_else(|| {
72 parse_interval_expr_to_ms(job.schedule_expr.as_deref().unwrap_or(""))
73 })
74 .unwrap_or(60_000);
75 let interval_ms = if raw_interval < 1_000 {
77 tracing::warn!(
78 job_id = %job.id, job_name = %job.name,
79 raw_interval_ms = raw_interval,
80 "clamping dangerously low interval to 60s minimum"
81 );
82 60_000
83 } else {
84 raw_interval
85 };
86 roboticus_schedule::DurableScheduler::evaluate_interval(
87 job.last_run_at.as_deref(),
88 interval_ms,
89 &now,
90 )
91 }
92 "at" => match job.schedule_expr.as_deref() {
93 Some(expr) => {
94 if job.last_run_at.is_some() {
96 false } else {
98 roboticus_schedule::DurableScheduler::evaluate_at(expr, &now)
99 }
100 }
101 None => {
102 tracing::warn!(job_id = %job.id, job_name = %job.name,
103 "once-type job has no schedule_expr; auto-disabling");
104 let _ = roboticus_db::cron::update_job(
105 &state.db,
106 &job.id,
107 None,
108 None,
109 None,
110 Some(false),
111 );
112 false
113 }
114 },
115 other_kind => {
116 tracing::warn!(job_id = %job.id, job_name = %job.name, schedule_kind = other_kind,
117 "unrecognized schedule_kind; job will not be scheduled");
118 false
119 }
120 };
121 if !due {
122 continue;
123 }
124 let lease_acquired =
125 match roboticus_db::cron::acquire_lease(&state.db, &job.id, &instance_id) {
126 Ok(acquired) => acquired,
127 Err(e) => {
128 tracing::error!(job_id = %job.id, job_name = %job.name, error = %e,
129 "failed to acquire cron lease due to database error");
130 continue;
131 }
132 };
133 if !lease_acquired {
134 continue;
135 }
136 let Ok(permit) = concurrency.clone().try_acquire_owned() else {
137 if let Err(e) = roboticus_db::cron::release_lease(&state.db, &job.id, &instance_id)
138 {
139 tracing::error!(job_id = %job.id, job_name = %job.name, error = %e,
140 "failed to release cron lease after semaphore saturation; job may freeze until lease expiry");
141 }
142 tracing::warn!(job=%job.name, "Cron worker saturated; deferring leased job to next tick");
143 continue;
144 };
145 let state_clone = state.clone();
146 let job_clone = job.clone();
147 let instance_id_clone = instance_id.clone();
148 let kind = kind.to_string();
149 tokio::spawn(async move {
150 let _permit = permit;
151 let start = std::time::Instant::now();
152 let result = execute_cron_job_once(&state_clone, &job_clone).await;
153 let duration = start.elapsed().as_millis() as i64;
154 if let Err(e) = roboticus_db::cron::record_run(
155 &state_clone.db,
156 &job_clone.id,
157 result.status,
158 Some(duration),
159 result.error.as_deref(),
160 result.output.as_deref(),
161 ) {
162 tracing::error!(
163 job_id = %job_clone.id, job_name = %job_clone.name, error = %e,
164 "CRITICAL: failed to record cron run audit trail"
165 );
166 }
167 let now_str = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string();
168 let next_kind = match kind.as_str() {
171 "every" => "interval",
172 other => other,
173 };
174 let resolved_every_ms = job_clone.schedule_every_ms.or_else(|| {
180 parse_interval_expr_to_ms(job_clone.schedule_expr.as_deref().unwrap_or(""))
181 });
182 let next = roboticus_schedule::DurableScheduler::calculate_next_run(
183 next_kind,
184 job_clone.schedule_expr.as_deref(),
185 resolved_every_ms,
186 &now_str,
187 );
188 if let Err(e) = roboticus_db::cron::update_next_run_at(
189 &state_clone.db,
190 &job_clone.id,
191 next.as_deref(),
192 ) {
193 tracing::error!(
194 job_id = %job_clone.id, job_name = %job_clone.name, error = %e,
195 "CRITICAL: failed to update next_run_at; job may re-fire prematurely"
196 );
197 }
198 if next_kind == "at" {
200 if let Err(e) = roboticus_db::cron::update_job(
201 &state_clone.db,
202 &job_clone.id,
203 None,
204 None,
205 None,
206 Some(false),
207 ) {
208 tracing::error!(
209 job_id = %job_clone.id, job_name = %job_clone.name, error = %e,
210 "CRITICAL: failed to auto-disable once job after execution"
211 );
212 } else {
213 tracing::info!(
214 job_id = %job_clone.id, job_name = %job_clone.name,
215 "once job auto-disabled after successful execution"
216 );
217 }
218 }
219 if let Err(e) = roboticus_db::cron::release_lease(
220 &state_clone.db,
221 &job_clone.id,
222 &instance_id_clone,
223 ) {
224 tracing::error!(
225 job_id = %job_clone.id, job_name = %job_clone.name, error = %e,
226 "CRITICAL: failed to release cron lease; job may freeze until lease expiry"
227 );
228 }
229 });
230 }
231 }
232}
233
/// Outcome of running a cron job once.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct CronExecutionResult {
    /// Outcome label; the code in this file sets `"success"` or `"error"`.
    pub status: &'static str,
    /// Failure description; `None` on success.
    pub error: Option<String>,
    /// Output produced by the job, when there is any.
    pub output: Option<String>,
}
239
240pub(crate) async fn execute_cron_job_once(
241 state: &AppState,
242 job: &roboticus_db::cron::CronJob,
243) -> CronExecutionResult {
244 let payload: Value = match serde_json::from_str(&job.payload_json) {
245 Ok(v) => v,
246 Err(e) => {
247 return CronExecutionResult {
248 status: "error",
249 error: Some(format!("invalid payload: {e}")),
250 output: None,
251 };
252 }
253 };
254 let action = payload
255 .get("action")
256 .and_then(|v| v.as_str())
257 .unwrap_or("unknown");
258 match action {
259 "agent_task" => execute_agent_task_for_job(state, job, &payload).await,
260 "log" => {
261 if let Some(task) = implied_agent_task(job, &payload) {
262 execute_named_agent_task(state, &job.agent_id, &task, Some(job.name.as_str())).await
263 } else {
264 let message = payload
265 .get("message")
266 .and_then(|v| v.as_str())
267 .unwrap_or("cron heartbeat");
268 tracing::info!(job = %job.name, message, "cron job executed");
269 CronExecutionResult {
270 status: "success",
271 error: None,
272 output: Some(message.to_string()),
273 }
274 }
275 }
276 "metric_snapshot" => {
277 let snapshot = serde_json::json!({"job_id": job.id, "job_name": job.name, "schedule_kind": job.schedule_kind, "timestamp": chrono::Utc::now().to_rfc3339()});
278 match roboticus_db::metrics::record_metric_snapshot(&state.db, &snapshot.to_string()) {
279 Ok(_) => CronExecutionResult {
280 status: "success",
281 error: None,
282 output: Some("metric snapshot recorded".to_string()),
283 },
284 Err(e) => CronExecutionResult {
285 status: "error",
286 error: Some(format!("metric_snapshot failed: {e}")),
287 output: None,
288 },
289 }
290 }
291 "expire_sessions" => {
292 let ttl_seconds = payload
293 .get("ttl_seconds")
294 .and_then(|v| v.as_u64())
295 .unwrap_or(86_400);
296 match roboticus_db::sessions::expire_stale_sessions(&state.db, ttl_seconds) {
297 Ok(expired) => CronExecutionResult {
298 status: "success",
299 error: None,
300 output: Some(format!("expired {expired} stale sessions")),
301 },
302 Err(e) => CronExecutionResult {
303 status: "error",
304 error: Some(format!("expire_sessions failed: {e}")),
305 output: None,
306 },
307 }
308 }
309 "record_transaction" => {
310 let tx_type = payload
311 .get("tx_type")
312 .and_then(|v| v.as_str())
313 .unwrap_or("cron");
314 let Some(amount) = payload.get("amount").and_then(|v| v.as_f64()) else {
315 return CronExecutionResult {
316 status: "error",
317 error: Some(
318 "record_transaction payload missing or invalid 'amount' field".to_string(),
319 ),
320 output: None,
321 };
322 };
323 if !amount.is_finite() {
324 return CronExecutionResult {
325 status: "error",
326 error: Some("record_transaction amount must be finite".to_string()),
327 output: None,
328 };
329 }
330 let currency = payload
331 .get("currency")
332 .and_then(|v| v.as_str())
333 .unwrap_or("USD");
334 let counterparty = payload.get("counterparty").and_then(|v| v.as_str());
335 let tx_hash = payload.get("tx_hash").and_then(|v| v.as_str());
336 match roboticus_db::metrics::record_transaction(
337 &state.db,
338 tx_type,
339 amount,
340 currency,
341 counterparty,
342 tx_hash,
343 ) {
344 Ok(_) => CronExecutionResult {
345 status: "success",
346 error: None,
347 output: Some(format!("transaction recorded: {amount} {currency}")),
348 },
349 Err(e) => CronExecutionResult {
350 status: "error",
351 error: Some(format!("record_transaction failed: {e}")),
352 output: None,
353 },
354 }
355 }
356 "noop" => CronExecutionResult {
357 status: "success",
358 error: None,
359 output: None,
360 },
361 other => CronExecutionResult {
362 status: "error",
363 error: Some(format!("unknown action: {other}")),
364 output: None,
365 },
366 }
367}
368
369async fn execute_agent_task_for_job(
370 state: &AppState,
371 job: &roboticus_db::cron::CronJob,
372 payload: &Value,
373) -> CronExecutionResult {
374 let task = payload
375 .get("task")
376 .and_then(|v| v.as_str())
377 .or_else(|| payload.get("prompt").and_then(|v| v.as_str()))
378 .or_else(|| payload.get("message").and_then(|v| v.as_str()))
379 .map(str::trim)
380 .filter(|s| !s.is_empty())
381 .or(job
382 .description
383 .as_deref()
384 .map(str::trim)
385 .filter(|s| !s.is_empty()));
386 let Some(task) = task else {
387 return CronExecutionResult {
388 status: "error",
389 error: Some("agent_task payload missing task/prompt/message".to_string()),
390 output: None,
391 };
392 };
393 execute_named_agent_task(state, &job.agent_id, task, Some(job.name.as_str())).await
394}
395
396async fn execute_named_agent_task(
397 state: &AppState,
398 agent_id: &str,
399 task: &str,
400 job_name: Option<&str>,
401) -> CronExecutionResult {
402 match roboticus_db::agents::list_sub_agents(&state.db) {
403 Ok(subagents) => {
404 if let Some(sa) = subagents
405 .into_iter()
406 .find(|sa| sa.name.eq_ignore_ascii_case(agent_id) && sa.enabled)
407 && let Err(err) =
408 subagent_integrity::ensure_taskable_subagent_ready(state, &sa).await
409 {
410 return CronExecutionResult {
411 status: "error",
412 error: Some(format!("subagent integrity repair failed: {err}")),
413 output: None,
414 };
415 }
416 }
417 Err(e) => {
418 tracing::error!(agent_id, error = %e, "failed to list sub-agents for cron task; proceeding without integrity check");
419 }
420 }
421 match execute_scheduled_agent_task(state, agent_id, task, job_name).await {
422 Ok(output) => CronExecutionResult {
423 status: "success",
424 error: None,
425 output: Some(output),
426 },
427 Err(err) => CronExecutionResult {
428 status: "error",
429 error: Some(err),
430 output: None,
431 },
432 }
433}
434
435fn implied_agent_task(job: &roboticus_db::cron::CronJob, payload: &Value) -> Option<String> {
436 let description = job
437 .description
438 .as_deref()
439 .map(str::trim)
440 .filter(|s| !s.is_empty())?;
441 let message = payload
442 .get("message")
443 .and_then(|v| v.as_str())
444 .map(str::trim)
445 .unwrap_or("");
446 if message.eq_ignore_ascii_case(description)
447 || message.to_ascii_lowercase().starts_with("scheduled job:")
448 {
449 return Some(description.to_string());
450 }
451 None
452}
453
/// Parses a compact interval expression such as `"30s"`, `"5m"` or `"2h"` into
/// milliseconds.
///
/// The expression is an integer quantity followed by a single unit character:
/// `s` (seconds), `m` (minutes) or `h` (hours), in either case. Whitespace around the
/// expression and between the quantity and the unit is ignored.
///
/// Returns `None` when the expression is empty, the quantity is not a valid `i64`,
/// the unit is unsupported, or the resulting duration is not strictly positive.
/// Very large quantities saturate at `i64::MAX` instead of overflowing.
fn parse_interval_expr_to_ms(expr: &str) -> Option<i64> {
    // Tolerate stray whitespace so " 5m " is not silently rejected.
    let expr = expr.trim();
    let unit = expr.chars().next_back()?;
    // Slice on the unit's UTF-8 width so a multi-byte trailing char cannot split a boundary.
    let quantity_text = expr[..expr.len() - unit.len_utf8()].trim_end();
    let qty: i64 = quantity_text.parse().ok()?;
    let unit_ms: i64 = match unit.to_ascii_lowercase() {
        's' => 1_000,
        'm' => 60_000,
        'h' => 3_600_000,
        _ => return None,
    };
    let ms = qty.saturating_mul(unit_ms);
    // Zero and negative intervals are meaningless for scheduling.
    (ms > 0).then_some(ms)
}