roboticus-cli 0.11.4

CLI commands and migration engine for the Roboticus agent runtime
Documentation
//! Cron job migration transforms.

use std::fs;
use std::path::Path;

use super::{AreaResult, LegacyConfig, LegacyCronJob, LegacyJobsFile, MigrationArea, err, uuid_v4};

pub(crate) fn import_cron(oc_root: &Path, ic_root: &Path) -> AreaResult {
    let mut jobs = Vec::new();
    let mut warnings = Vec::new();

    // Try both `jobs.json` (flat) and `cron/jobs.json` (subdirectory)
    for candidate in [oc_root.join("jobs.json"), oc_root.join("cron/jobs.json")] {
        if !candidate.exists() {
            continue;
        }
        match fs::read_to_string(&candidate) {
            Ok(c) => {
                if let Ok(wrapper) = serde_json::from_str::<LegacyJobsFile>(&c) {
                    jobs.extend(wrapper.jobs);
                } else if let Ok(parsed) = serde_json::from_str::<Vec<LegacyCronJob>>(&c) {
                    jobs.extend(parsed);
                } else {
                    warnings.push(format!("Failed to parse {}", candidate.display()));
                }
            }
            Err(e) => warnings.push(format!("Failed to read {}: {e}", candidate.display())),
        }
    }

    let config_path = oc_root.join("legacy.json");
    if config_path.exists()
        && let Ok(c) = fs::read_to_string(&config_path)
        && let Ok(cfg) = serde_json::from_str::<LegacyConfig>(&c)
        && let Some(cj) = cfg.cron
    {
        jobs.extend(cj);
    }

    if jobs.is_empty() {
        return AreaResult {
            area: MigrationArea::Cron,
            success: true,
            items_processed: 0,
            warnings: vec!["No cron jobs found to import".into()],
            error: None,
        };
    }

    let db_path = ic_root.join("state.db");
    let db = match roboticus_db::Database::new(&db_path.to_string_lossy()) {
        Ok(d) => d,
        Err(e) => return err(MigrationArea::Cron, format!("Failed to open database: {e}")),
    };

    let conn = db.conn();
    let mut items = 0;
    let mut seen_names = std::collections::HashSet::new();

    for job in &jobs {
        let name = job.name.as_deref().unwrap_or("unnamed");

        if !seen_names.insert(name.to_string()) {
            warnings.push(format!("Skipping duplicate cron job: {name}"));
            continue;
        }

        let id = job.id.clone().unwrap_or_else(uuid_v4);
        let (schedule_kind, schedule_expr) = match &job.schedule {
            Some(serde_json::Value::String(s)) => ("cron".to_string(), s.clone()),
            Some(serde_json::Value::Object(m)) => {
                let kind = m
                    .get("kind")
                    .and_then(|v| v.as_str())
                    .unwrap_or("cron")
                    .to_string();
                let expr = if kind == "cron" {
                    m.get("expr")
                        .and_then(|v| v.as_str())
                        .unwrap_or("0 * * * *")
                        .to_string()
                } else {
                    m.get("everyMs")
                        .and_then(|v| v.as_u64())
                        .map(|ms| format!("every {}s", ms / 1000))
                        .unwrap_or_else(|| "0 * * * *".into())
                };
                (kind, expr)
            }
            _ => ("cron".to_string(), "0 * * * *".to_string()),
        };
        let enabled = job.enabled.unwrap_or(true);
        let payload = job
            .payload
            .as_ref()
            .map(|p| p.to_string())
            .or_else(|| {
                job.command
                    .as_ref()
                    .map(|c| serde_json::json!({"command": c}).to_string())
            })
            .unwrap_or_else(|| "{}".to_string());

        if let Err(e) = conn.execute(
            "DELETE FROM cron_jobs WHERE name = ?1",
            rusqlite::params![name],
        ) {
            warnings.push(format!("Failed to clean existing cron job {name}: {e}"));
        }

        match conn.execute(
            "INSERT INTO cron_jobs (id, name, enabled, schedule_kind, schedule_expr, agent_id, payload_json) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
            rusqlite::params![id, name, enabled, schedule_kind, schedule_expr, "default", payload],
        ) {
            Ok(_) => items += 1,
            Err(e) => warnings.push(format!("Failed to insert cron job '{name}': {e}")),
        }
    }

    AreaResult {
        area: MigrationArea::Cron,
        success: true,
        items_processed: items,
        warnings,
        error: None,
    }
}

pub(crate) fn export_cron(ic_root: &Path, oc_root: &Path) -> AreaResult {
    let db_path = ic_root.join("state.db");
    if !db_path.exists() {
        return AreaResult {
            area: MigrationArea::Cron,
            success: true,
            items_processed: 0,
            warnings: vec!["No database found".into()],
            error: None,
        };
    }

    let db = match roboticus_db::Database::new(&db_path.to_string_lossy()) {
        Ok(d) => d,
        Err(e) => return err(MigrationArea::Cron, format!("Failed to open database: {e}")),
    };

    let conn = db.conn();
    let mut stmt = match conn
        .prepare("SELECT name, schedule_expr, payload_json, enabled FROM cron_jobs ORDER BY name")
    {
        Ok(s) => s,
        Err(e) => {
            return err(
                MigrationArea::Cron,
                format!("Failed to query cron jobs: {e}"),
            );
        }
    };

    let jobs: Vec<serde_json::Value> = stmt
        .query_map([], |row| {
            let payload_str: String = row.get(2)?;
            let payload: serde_json::Value = serde_json::from_str(&payload_str).unwrap_or_default();
            let command = payload
                .get("command")
                .and_then(|v| v.as_str())
                .unwrap_or("")
                .to_string();
            Ok(serde_json::json!({
                "name": row.get::<_, String>(0)?,
                "schedule": row.get::<_, Option<String>>(1)?.unwrap_or_default(),
                "command": command,
                "enabled": row.get::<_, bool>(3)?,
            }))
        })
        .map(|iter| {
            iter.filter_map(|r| {
                r.inspect_err(|e| tracing::warn!("skipping corrupted cron job row: {e}"))
                    .ok()
            })
            .collect()
        })
        .unwrap_or_else(|e| {
            tracing::warn!(error = %e, "failed to iterate cron job rows during export");
            vec![]
        });

    if let Err(e) = fs::create_dir_all(oc_root) {
        return err(
            MigrationArea::Cron,
            format!("Failed to create output dir: {e}"),
        );
    }
    let jobs_json = match serde_json::to_string_pretty(&jobs) {
        Ok(s) => s,
        Err(e) => {
            return err(
                MigrationArea::Cron,
                format!("Failed to serialize jobs.json: {e}"),
            );
        }
    };
    if let Err(e) = fs::write(oc_root.join("jobs.json"), &jobs_json) {
        return err(
            MigrationArea::Cron,
            format!("Failed to write jobs.json: {e}"),
        );
    }

    AreaResult {
        area: MigrationArea::Cron,
        success: true,
        items_processed: jobs.len(),
        warnings: vec![],
        error: None,
    }
}