ironclad-api 0.9.8

HTTP routes, WebSocket, auth, rate limiting, and dashboard for the Ironclad agent runtime
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
use std::sync::Arc;
use std::time::Duration;

use serde_json::Value;
use tokio::sync::Semaphore;

use crate::api::{AppState, execute_scheduled_agent_task, subagent_integrity};

pub async fn run_cron_worker(state: AppState, instance_id: String) {
    let mut interval = tokio::time::interval(Duration::from_secs(60));
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
    let initial_limit = state.config.read().await.server.cron_max_concurrency as usize;
    let mut concurrency = Arc::new(Semaphore::new(initial_limit));
    let mut current_limit = initial_limit;
    tracing::info!("Server cron worker started");

    loop {
        interval.tick().await;

        // Re-read concurrency limit on each tick so hot-reloaded config takes
        // effect.  Outstanding permits from the old semaphore drain naturally;
        // new acquisitions use the updated limit.
        let configured_limit = state.config.read().await.server.cron_max_concurrency as usize;
        if configured_limit != current_limit {
            tracing::info!(
                old = current_limit,
                new = configured_limit,
                "cron concurrency limit changed via hot-reload"
            );
            concurrency = Arc::new(Semaphore::new(configured_limit));
            current_limit = configured_limit;
        }
        let jobs = match ironclad_db::cron::list_jobs(&state.db) {
            Ok(j) => j,
            Err(e) => {
                tracing::error!(error = %e, "Failed to list cron jobs; ALL scheduled jobs are paused this tick");
                continue;
            }
        };
        let now = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
        for job in &jobs {
            if !job.enabled {
                continue;
            }
            let kind = match job.schedule_kind.as_str() {
                "interval" => "every",
                "once" => "at",
                other => other,
            };
            let due = match kind {
                "cron" => match job.schedule_expr.as_deref() {
                    Some(expr) => ironclad_schedule::DurableScheduler::evaluate_cron(
                        expr,
                        job.last_run_at.as_deref(),
                        &now,
                    ),
                    None => {
                        tracing::warn!(job_id = %job.id, job_name = %job.name,
                            "cron-type job has no schedule_expr; will never fire");
                        false
                    }
                },
                "every" => {
                    let raw_interval = job
                        .schedule_every_ms
                        .or_else(|| {
                            parse_interval_expr_to_ms(job.schedule_expr.as_deref().unwrap_or(""))
                        })
                        .unwrap_or(60_000);
                    // Guard against zero/negative intervals that would fire every tick.
                    let interval_ms = if raw_interval < 1_000 {
                        tracing::warn!(
                            job_id = %job.id, job_name = %job.name,
                            raw_interval_ms = raw_interval,
                            "clamping dangerously low interval to 60s minimum"
                        );
                        60_000
                    } else {
                        raw_interval
                    };
                    ironclad_schedule::DurableScheduler::evaluate_interval(
                        job.last_run_at.as_deref(),
                        interval_ms,
                        &now,
                    )
                }
                "at" => match job.schedule_expr.as_deref() {
                    Some(expr) => {
                        // "once"/"at" jobs fire when now >= target and haven't run yet.
                        if job.last_run_at.is_some() {
                            false // already fired
                        } else {
                            ironclad_schedule::DurableScheduler::evaluate_at(expr, &now)
                        }
                    }
                    None => {
                        tracing::warn!(job_id = %job.id, job_name = %job.name,
                            "once-type job has no schedule_expr; auto-disabling");
                        let _ = ironclad_db::cron::update_job(
                            &state.db,
                            &job.id,
                            None,
                            None,
                            None,
                            Some(false),
                        );
                        false
                    }
                },
                other_kind => {
                    tracing::warn!(job_id = %job.id, job_name = %job.name, schedule_kind = other_kind,
                        "unrecognized schedule_kind; job will not be scheduled");
                    false
                }
            };
            if !due {
                continue;
            }
            let lease_acquired =
                match ironclad_db::cron::acquire_lease(&state.db, &job.id, &instance_id) {
                    Ok(acquired) => acquired,
                    Err(e) => {
                        tracing::error!(job_id = %job.id, job_name = %job.name, error = %e,
                        "failed to acquire cron lease due to database error");
                        continue;
                    }
                };
            if !lease_acquired {
                continue;
            }
            let Ok(permit) = concurrency.clone().try_acquire_owned() else {
                if let Err(e) = ironclad_db::cron::release_lease(&state.db, &job.id, &instance_id) {
                    tracing::error!(job_id = %job.id, job_name = %job.name, error = %e,
                        "failed to release cron lease after semaphore saturation; job may freeze until lease expiry");
                }
                tracing::warn!(job=%job.name, "Cron worker saturated; deferring leased job to next tick");
                continue;
            };
            let state_clone = state.clone();
            let job_clone = job.clone();
            let instance_id_clone = instance_id.clone();
            let kind = kind.to_string();
            tokio::spawn(async move {
                let _permit = permit;
                let start = std::time::Instant::now();
                let result = execute_cron_job_once(&state_clone, &job_clone).await;
                let duration = start.elapsed().as_millis() as i64;
                if let Err(e) = ironclad_db::cron::record_run(
                    &state_clone.db,
                    &job_clone.id,
                    result.status,
                    Some(duration),
                    result.error.as_deref(),
                    result.output.as_deref(),
                ) {
                    tracing::error!(
                        job_id = %job_clone.id, job_name = %job_clone.name, error = %e,
                        "CRITICAL: failed to record cron run audit trail"
                    );
                }
                let now_str = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string();
                // Map dispatch aliases back to DB-canonical kinds for
                // calculate_next_run: "every" → "interval", "at" → "at".
                let next_kind = match kind.as_str() {
                    "every" => "interval",
                    other => other,
                };
                // Resolve the effective interval_ms the same way the due-time
                // evaluation does: prefer schedule_every_ms, fall back to
                // parsing schedule_expr (e.g. "30m").  Without this, expr-based
                // interval jobs pass None and calculate_next_run returns None,
                // leaving next_run_at permanently NULL.
                let resolved_every_ms = job_clone.schedule_every_ms.or_else(|| {
                    parse_interval_expr_to_ms(job_clone.schedule_expr.as_deref().unwrap_or(""))
                });
                let next = ironclad_schedule::DurableScheduler::calculate_next_run(
                    next_kind,
                    job_clone.schedule_expr.as_deref(),
                    resolved_every_ms,
                    &now_str,
                );
                if let Err(e) = ironclad_db::cron::update_next_run_at(
                    &state_clone.db,
                    &job_clone.id,
                    next.as_deref(),
                ) {
                    tracing::error!(
                        job_id = %job_clone.id, job_name = %job_clone.name, error = %e,
                        "CRITICAL: failed to update next_run_at; job may re-fire prematurely"
                    );
                }
                // Auto-disable "once"/"at" jobs after their single execution.
                if next_kind == "at" {
                    if let Err(e) = ironclad_db::cron::update_job(
                        &state_clone.db,
                        &job_clone.id,
                        None,
                        None,
                        None,
                        Some(false),
                    ) {
                        tracing::error!(
                            job_id = %job_clone.id, job_name = %job_clone.name, error = %e,
                            "CRITICAL: failed to auto-disable once job after execution"
                        );
                    } else {
                        tracing::info!(
                            job_id = %job_clone.id, job_name = %job_clone.name,
                            "once job auto-disabled after successful execution"
                        );
                    }
                }
                if let Err(e) = ironclad_db::cron::release_lease(
                    &state_clone.db,
                    &job_clone.id,
                    &instance_id_clone,
                ) {
                    tracing::error!(
                        job_id = %job_clone.id, job_name = %job_clone.name, error = %e,
                        "CRITICAL: failed to release cron lease; job may freeze until lease expiry"
                    );
                }
            });
        }
    }
}

pub(crate) struct CronExecutionResult {
    pub status: &'static str,
    pub error: Option<String>,
    pub output: Option<String>,
}

pub(crate) async fn execute_cron_job_once(
    state: &AppState,
    job: &ironclad_db::cron::CronJob,
) -> CronExecutionResult {
    let payload: Value = match serde_json::from_str(&job.payload_json) {
        Ok(v) => v,
        Err(e) => {
            return CronExecutionResult {
                status: "error",
                error: Some(format!("invalid payload: {e}")),
                output: None,
            };
        }
    };
    let action = payload
        .get("action")
        .and_then(|v| v.as_str())
        .unwrap_or("unknown");
    match action {
        "agent_task" => execute_agent_task_for_job(state, job, &payload).await,
        "log" => {
            if let Some(task) = implied_agent_task(job, &payload) {
                execute_named_agent_task(state, &job.agent_id, &task).await
            } else {
                let message = payload
                    .get("message")
                    .and_then(|v| v.as_str())
                    .unwrap_or("cron heartbeat");
                tracing::info!(job = %job.name, message, "cron job executed");
                CronExecutionResult {
                    status: "success",
                    error: None,
                    output: Some(message.to_string()),
                }
            }
        }
        "metric_snapshot" => {
            let snapshot = serde_json::json!({"job_id": job.id, "job_name": job.name, "schedule_kind": job.schedule_kind, "timestamp": chrono::Utc::now().to_rfc3339()});
            match ironclad_db::metrics::record_metric_snapshot(&state.db, &snapshot.to_string()) {
                Ok(_) => CronExecutionResult {
                    status: "success",
                    error: None,
                    output: Some("metric snapshot recorded".to_string()),
                },
                Err(e) => CronExecutionResult {
                    status: "error",
                    error: Some(format!("metric_snapshot failed: {e}")),
                    output: None,
                },
            }
        }
        "expire_sessions" => {
            let ttl_seconds = payload
                .get("ttl_seconds")
                .and_then(|v| v.as_u64())
                .unwrap_or(86_400);
            match ironclad_db::sessions::expire_stale_sessions(&state.db, ttl_seconds) {
                Ok(expired) => CronExecutionResult {
                    status: "success",
                    error: None,
                    output: Some(format!("expired {expired} stale sessions")),
                },
                Err(e) => CronExecutionResult {
                    status: "error",
                    error: Some(format!("expire_sessions failed: {e}")),
                    output: None,
                },
            }
        }
        "record_transaction" => {
            let tx_type = payload
                .get("tx_type")
                .and_then(|v| v.as_str())
                .unwrap_or("cron");
            let Some(amount) = payload.get("amount").and_then(|v| v.as_f64()) else {
                return CronExecutionResult {
                    status: "error",
                    error: Some(
                        "record_transaction payload missing or invalid 'amount' field".to_string(),
                    ),
                    output: None,
                };
            };
            if !amount.is_finite() {
                return CronExecutionResult {
                    status: "error",
                    error: Some("record_transaction amount must be finite".to_string()),
                    output: None,
                };
            }
            let currency = payload
                .get("currency")
                .and_then(|v| v.as_str())
                .unwrap_or("USD");
            let counterparty = payload.get("counterparty").and_then(|v| v.as_str());
            let tx_hash = payload.get("tx_hash").and_then(|v| v.as_str());
            match ironclad_db::metrics::record_transaction(
                &state.db,
                tx_type,
                amount,
                currency,
                counterparty,
                tx_hash,
            ) {
                Ok(_) => CronExecutionResult {
                    status: "success",
                    error: None,
                    output: Some(format!("transaction recorded: {amount} {currency}")),
                },
                Err(e) => CronExecutionResult {
                    status: "error",
                    error: Some(format!("record_transaction failed: {e}")),
                    output: None,
                },
            }
        }
        "noop" => CronExecutionResult {
            status: "success",
            error: None,
            output: None,
        },
        other => CronExecutionResult {
            status: "error",
            error: Some(format!("unknown action: {other}")),
            output: None,
        },
    }
}

async fn execute_agent_task_for_job(
    state: &AppState,
    job: &ironclad_db::cron::CronJob,
    payload: &Value,
) -> CronExecutionResult {
    let task = payload
        .get("task")
        .and_then(|v| v.as_str())
        .or_else(|| payload.get("prompt").and_then(|v| v.as_str()))
        .or_else(|| payload.get("message").and_then(|v| v.as_str()))
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .or(job
            .description
            .as_deref()
            .map(str::trim)
            .filter(|s| !s.is_empty()));
    let Some(task) = task else {
        return CronExecutionResult {
            status: "error",
            error: Some("agent_task payload missing task/prompt/message".to_string()),
            output: None,
        };
    };
    execute_named_agent_task(state, &job.agent_id, task).await
}

async fn execute_named_agent_task(
    state: &AppState,
    agent_id: &str,
    task: &str,
) -> CronExecutionResult {
    match ironclad_db::agents::list_sub_agents(&state.db) {
        Ok(subagents) => {
            if let Some(sa) = subagents
                .into_iter()
                .find(|sa| sa.name.eq_ignore_ascii_case(agent_id) && sa.enabled)
                && let Err(err) =
                    subagent_integrity::ensure_taskable_subagent_ready(state, &sa).await
            {
                return CronExecutionResult {
                    status: "error",
                    error: Some(format!("subagent integrity repair failed: {err}")),
                    output: None,
                };
            }
        }
        Err(e) => {
            tracing::error!(agent_id, error = %e, "failed to list sub-agents for cron task; proceeding without integrity check");
        }
    }
    match execute_scheduled_agent_task(state, agent_id, task).await {
        Ok(output) => CronExecutionResult {
            status: "success",
            error: None,
            output: Some(output),
        },
        Err(err) => CronExecutionResult {
            status: "error",
            error: Some(err),
            output: None,
        },
    }
}

fn implied_agent_task(job: &ironclad_db::cron::CronJob, payload: &Value) -> Option<String> {
    let description = job
        .description
        .as_deref()
        .map(str::trim)
        .filter(|s| !s.is_empty())?;
    let message = payload
        .get("message")
        .and_then(|v| v.as_str())
        .map(str::trim)
        .unwrap_or("");
    if message.eq_ignore_ascii_case(description)
        || message.to_ascii_lowercase().starts_with("scheduled job:")
    {
        return Some(description.to_string());
    }
    None
}

fn parse_interval_expr_to_ms(expr: &str) -> Option<i64> {
    if expr.is_empty() {
        return None;
    }
    let (unit_byte_offset, unit) = expr.char_indices().last()?;
    let qty = expr[..unit_byte_offset].parse::<i64>().ok()?;
    let ms = match unit {
        's' | 'S' => qty.saturating_mul(1_000),
        'm' | 'M' => qty.saturating_mul(60_000),
        'h' | 'H' => qty.saturating_mul(3_600_000),
        _ => return None,
    };
    if ms > 0 { Some(ms) } else { None }
}