roboticus-cli 0.11.4

CLI commands and migration engine for the Roboticus agent runtime
Documentation
use std::path::Path;

#[derive(Debug, Default, Clone, Copy)]
pub struct StateHygieneReport {
    pub changed: bool,
    pub changed_rows: u64,
    pub subagent_rows_normalized: u64,
    pub cron_payload_rows_repaired: u64,
    pub cron_jobs_disabled_invalid_expr: u64,
}

pub fn run_state_hygiene(
    state_db_path: &Path,
) -> Result<StateHygieneReport, Box<dyn std::error::Error>> {
    if !state_db_path.exists() {
        return Ok(StateHygieneReport::default());
    }
    let conn = rusqlite::Connection::open(state_db_path)?;
    let mut report = StateHygieneReport::default();
    let has_column = |table: &str, column: &str| -> rusqlite::Result<bool> {
        let mut stmt = conn.prepare(&format!(
            "PRAGMA table_info(\"{}\")",
            table.replace('"', "\"\"")
        ))?;
        let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
        for col in rows {
            if col? == column {
                return Ok(true);
            }
        }
        Ok(false)
    };
    let has_table = |table: &str| -> rusqlite::Result<bool> {
        conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1)",
            [table],
            |row| row.get::<_, i64>(0),
        )
        .map(|exists| exists != 0)
    };

    conn.execute_batch("BEGIN;")?;

    conn.execute(
        "UPDATE sub_agents SET role='subagent' WHERE lower(trim(role))='specialist'",
        [],
    )?;
    let n = conn.changes();
    report.subagent_rows_normalized += n;
    report.changed_rows += n;

    conn.execute(
        "DELETE FROM sub_agents WHERE lower(trim(role))='commander'",
        [],
    )?;
    let n = conn.changes();
    report.subagent_rows_normalized += n;
    report.changed_rows += n;

    conn.execute(
        "UPDATE sub_agents SET skills_json='[]' WHERE skills_json IS NULL",
        [],
    )?;
    let n = conn.changes();
    report.subagent_rows_normalized += n;
    report.changed_rows += n;

    if has_column("sub_agents", "fallback_models_json")? {
        conn.execute(
            "UPDATE sub_agents SET fallback_models_json='[]' WHERE fallback_models_json IS NULL OR trim(fallback_models_json)=''",
            [],
        )?;
        let n = conn.changes();
        report.subagent_rows_normalized += n;
        report.changed_rows += n;
    }
    if has_column("sub_agents", "model")? {
        conn.execute(
            "UPDATE sub_agents
             SET model='auto'
             WHERE lower(trim(role))='subagent'
               AND lower(trim(model)) IN ('ollama-gpu/qwen3:14b','ollama-gpu/qwen3.5:35b-a3b')",
            [],
        )?;
        let n = conn.changes();
        report.subagent_rows_normalized += n;
        report.changed_rows += n;

        conn.execute(
            "UPDATE sub_agents
             SET model='auto'
             WHERE lower(trim(role))='subagent'
               AND trim(model) <> ''
               AND lower(trim(model)) NOT IN ('auto','orchestrator')
               AND instr(trim(model), '/') = 0",
            [],
        )?;
        let n = conn.changes();
        report.subagent_rows_normalized += n;
        report.changed_rows += n;
    }

    if has_table("cron_jobs")?
        && has_column("cron_jobs", "payload_json")?
        && has_column("cron_jobs", "id")?
    {
        let mut stmt = conn.prepare("SELECT id, description, payload_json FROM cron_jobs")?;
        let rows = stmt.query_map([], |row| {
            Ok((
                row.get::<_, String>(0)?,
                row.get::<_, Option<String>>(1)?,
                row.get::<_, String>(2)?,
            ))
        })?;
        for row in rows {
            let (id, description, payload_raw) = row?;
            if let Some(payload_json) =
                normalize_cron_payload_json(description.as_deref(), &payload_raw)
            {
                conn.execute(
                    "UPDATE cron_jobs SET payload_json=?1 WHERE id=?2",
                    rusqlite::params![payload_json, id],
                )?;
                let n = conn.changes();
                report.cron_payload_rows_repaired += n;
                report.changed_rows += n;
            }
        }
    }

    if has_table("cron_jobs")?
        && has_column("cron_jobs", "id")?
        && has_column("cron_jobs", "enabled")?
        && has_column("cron_jobs", "schedule_kind")?
        && has_column("cron_jobs", "schedule_expr")?
    {
        let mut stmt = conn.prepare(
            "SELECT id, schedule_expr
             FROM cron_jobs
             WHERE enabled=1 AND lower(trim(schedule_kind))='cron'",
        )?;
        let rows = stmt.query_map([], |row| {
            Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
        })?;
        for row in rows {
            let (id, expr_opt) = row?;
            let valid = expr_opt
                .as_deref()
                .map(roboticus_schedule::DurableScheduler::is_valid_cron_expression)
                .unwrap_or(false);
            if !valid {
                conn.execute(
                    "UPDATE cron_jobs SET enabled=0 WHERE id=?1",
                    rusqlite::params![id],
                )?;
                let n = conn.changes();
                report.cron_jobs_disabled_invalid_expr += n;
                report.changed_rows += n;
            }
        }
    }

    conn.execute_batch("COMMIT;")?;
    report.changed = report.changed_rows > 0;
    Ok(report)
}

pub(crate) fn normalize_cron_payload_json(description: Option<&str>, raw: &str) -> Option<String> {
    let mut payload = match serde_json::from_str::<serde_json::Value>(raw) {
        Ok(v) => v,
        Err(_) => return Some(r#"{"action":"noop"}"#.to_string()),
    };
    let obj = match payload.as_object_mut() {
        Some(v) => v,
        None => return Some(r#"{"action":"noop"}"#.to_string()),
    };
    let mut changed = false;
    if let Some(kind) = obj.get("kind").and_then(|v| v.as_str())
        && obj.get("action").and_then(|v| v.as_str()).is_none()
        && let Some(mapped) = legacy_kind_to_action(kind)
    {
        obj.insert(
            "action".to_string(),
            serde_json::Value::String(mapped.to_string()),
        );
        changed = true;
    }
    let action = obj
        .get("action")
        .and_then(|v| v.as_str())
        .unwrap_or("unknown");
    if action == "log"
        && let Some(desc) = description.map(str::trim).filter(|d| !d.is_empty())
    {
        let message = obj
            .get("message")
            .and_then(|v| v.as_str())
            .map(str::trim)
            .unwrap_or("");
        if message.eq_ignore_ascii_case(desc) || message.starts_with("scheduled job:") {
            obj.insert(
                "action".to_string(),
                serde_json::Value::String("agent_task".to_string()),
            );
            obj.insert(
                "task".to_string(),
                serde_json::Value::String(desc.to_string()),
            );
            obj.remove("message");
            return serde_json::to_string(&payload).ok();
        }
    }
    if matches!(
        action,
        "log"
            | "agent_task"
            | "metric_snapshot"
            | "expire_sessions"
            | "record_transaction"
            | "noop"
    ) {
        if changed {
            return serde_json::to_string(&payload).ok();
        }
        return None;
    }
    obj.insert(
        "action".to_string(),
        serde_json::Value::String("noop".to_string()),
    );
    serde_json::to_string(&payload).ok()
}

fn legacy_kind_to_action(kind: &str) -> Option<&'static str> {
    match kind {
        "agentTurn" => Some("noop"),
        "metricSnapshot" => Some("metric_snapshot"),
        "expireSessions" => Some("expire_sessions"),
        "recordTransaction" => Some("record_transaction"),
        "log" => Some("log"),
        "noop" => Some("noop"),
        _ => None,
    }
}