Skip to main content

roboticus_cli/
state_hygiene.rs

1use std::path::Path;
2
3#[derive(Debug, Default, Clone, Copy)]
4pub struct StateHygieneReport {
5    pub changed: bool,
6    pub changed_rows: u64,
7    pub subagent_rows_normalized: u64,
8    pub cron_payload_rows_repaired: u64,
9    pub cron_jobs_disabled_invalid_expr: u64,
10}
11
12pub fn run_state_hygiene(
13    state_db_path: &Path,
14) -> Result<StateHygieneReport, Box<dyn std::error::Error>> {
15    if !state_db_path.exists() {
16        return Ok(StateHygieneReport::default());
17    }
18    let conn = rusqlite::Connection::open(state_db_path)?;
19    let mut report = StateHygieneReport::default();
20    let has_column = |table: &str, column: &str| -> rusqlite::Result<bool> {
21        let mut stmt = conn.prepare(&format!(
22            "PRAGMA table_info(\"{}\")",
23            table.replace('"', "\"\"")
24        ))?;
25        let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
26        for col in rows {
27            if col? == column {
28                return Ok(true);
29            }
30        }
31        Ok(false)
32    };
33    let has_table = |table: &str| -> rusqlite::Result<bool> {
34        conn.query_row(
35            "SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1)",
36            [table],
37            |row| row.get::<_, i64>(0),
38        )
39        .map(|exists| exists != 0)
40    };
41
42    conn.execute_batch("BEGIN;")?;
43
44    conn.execute(
45        "UPDATE sub_agents SET role='subagent' WHERE lower(trim(role))='specialist'",
46        [],
47    )?;
48    let n = conn.changes();
49    report.subagent_rows_normalized += n;
50    report.changed_rows += n;
51
52    conn.execute(
53        "DELETE FROM sub_agents WHERE lower(trim(role))='commander'",
54        [],
55    )?;
56    let n = conn.changes();
57    report.subagent_rows_normalized += n;
58    report.changed_rows += n;
59
60    conn.execute(
61        "UPDATE sub_agents SET skills_json='[]' WHERE skills_json IS NULL",
62        [],
63    )?;
64    let n = conn.changes();
65    report.subagent_rows_normalized += n;
66    report.changed_rows += n;
67
68    if has_column("sub_agents", "fallback_models_json")? {
69        conn.execute(
70            "UPDATE sub_agents SET fallback_models_json='[]' WHERE fallback_models_json IS NULL OR trim(fallback_models_json)=''",
71            [],
72        )?;
73        let n = conn.changes();
74        report.subagent_rows_normalized += n;
75        report.changed_rows += n;
76    }
77    if has_column("sub_agents", "model")? {
78        conn.execute(
79            "UPDATE sub_agents
80             SET model='auto'
81             WHERE lower(trim(role))='subagent'
82               AND lower(trim(model)) IN ('ollama-gpu/qwen3:14b','ollama-gpu/qwen3.5:35b-a3b')",
83            [],
84        )?;
85        let n = conn.changes();
86        report.subagent_rows_normalized += n;
87        report.changed_rows += n;
88
89        conn.execute(
90            "UPDATE sub_agents
91             SET model='auto'
92             WHERE lower(trim(role))='subagent'
93               AND trim(model) <> ''
94               AND lower(trim(model)) NOT IN ('auto','orchestrator')
95               AND instr(trim(model), '/') = 0",
96            [],
97        )?;
98        let n = conn.changes();
99        report.subagent_rows_normalized += n;
100        report.changed_rows += n;
101    }
102
103    if has_table("cron_jobs")?
104        && has_column("cron_jobs", "payload_json")?
105        && has_column("cron_jobs", "id")?
106    {
107        let mut stmt = conn.prepare("SELECT id, description, payload_json FROM cron_jobs")?;
108        let rows = stmt.query_map([], |row| {
109            Ok((
110                row.get::<_, String>(0)?,
111                row.get::<_, Option<String>>(1)?,
112                row.get::<_, String>(2)?,
113            ))
114        })?;
115        for row in rows {
116            let (id, description, payload_raw) = row?;
117            if let Some(payload_json) =
118                normalize_cron_payload_json(description.as_deref(), &payload_raw)
119            {
120                conn.execute(
121                    "UPDATE cron_jobs SET payload_json=?1 WHERE id=?2",
122                    rusqlite::params![payload_json, id],
123                )?;
124                let n = conn.changes();
125                report.cron_payload_rows_repaired += n;
126                report.changed_rows += n;
127            }
128        }
129    }
130
131    if has_table("cron_jobs")?
132        && has_column("cron_jobs", "id")?
133        && has_column("cron_jobs", "enabled")?
134        && has_column("cron_jobs", "schedule_kind")?
135        && has_column("cron_jobs", "schedule_expr")?
136    {
137        let mut stmt = conn.prepare(
138            "SELECT id, schedule_expr
139             FROM cron_jobs
140             WHERE enabled=1 AND lower(trim(schedule_kind))='cron'",
141        )?;
142        let rows = stmt.query_map([], |row| {
143            Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
144        })?;
145        for row in rows {
146            let (id, expr_opt) = row?;
147            let valid = expr_opt
148                .as_deref()
149                .map(roboticus_schedule::DurableScheduler::is_valid_cron_expression)
150                .unwrap_or(false);
151            if !valid {
152                conn.execute(
153                    "UPDATE cron_jobs SET enabled=0 WHERE id=?1",
154                    rusqlite::params![id],
155                )?;
156                let n = conn.changes();
157                report.cron_jobs_disabled_invalid_expr += n;
158                report.changed_rows += n;
159            }
160        }
161    }
162
163    conn.execute_batch("COMMIT;")?;
164    report.changed = report.changed_rows > 0;
165    Ok(report)
166}
167
168pub(crate) fn normalize_cron_payload_json(description: Option<&str>, raw: &str) -> Option<String> {
169    let mut payload = match serde_json::from_str::<serde_json::Value>(raw) {
170        Ok(v) => v,
171        Err(_) => return Some(r#"{"action":"noop"}"#.to_string()),
172    };
173    let obj = match payload.as_object_mut() {
174        Some(v) => v,
175        None => return Some(r#"{"action":"noop"}"#.to_string()),
176    };
177    let mut changed = false;
178    if let Some(kind) = obj.get("kind").and_then(|v| v.as_str())
179        && obj.get("action").and_then(|v| v.as_str()).is_none()
180        && let Some(mapped) = legacy_kind_to_action(kind)
181    {
182        obj.insert(
183            "action".to_string(),
184            serde_json::Value::String(mapped.to_string()),
185        );
186        changed = true;
187    }
188    let action = obj
189        .get("action")
190        .and_then(|v| v.as_str())
191        .unwrap_or("unknown");
192    if action == "log"
193        && let Some(desc) = description.map(str::trim).filter(|d| !d.is_empty())
194    {
195        let message = obj
196            .get("message")
197            .and_then(|v| v.as_str())
198            .map(str::trim)
199            .unwrap_or("");
200        if message.eq_ignore_ascii_case(desc) || message.starts_with("scheduled job:") {
201            obj.insert(
202                "action".to_string(),
203                serde_json::Value::String("agent_task".to_string()),
204            );
205            obj.insert(
206                "task".to_string(),
207                serde_json::Value::String(desc.to_string()),
208            );
209            obj.remove("message");
210            return serde_json::to_string(&payload).ok();
211        }
212    }
213    if matches!(
214        action,
215        "log"
216            | "agent_task"
217            | "metric_snapshot"
218            | "expire_sessions"
219            | "record_transaction"
220            | "noop"
221    ) {
222        if changed {
223            return serde_json::to_string(&payload).ok();
224        }
225        return None;
226    }
227    obj.insert(
228        "action".to_string(),
229        serde_json::Value::String("noop".to_string()),
230    );
231    serde_json::to_string(&payload).ok()
232}
233
234fn legacy_kind_to_action(kind: &str) -> Option<&'static str> {
235    match kind {
236        "agentTurn" => Some("noop"),
237        "metricSnapshot" => Some("metric_snapshot"),
238        "expireSessions" => Some("expire_sessions"),
239        "recordTransaction" => Some("record_transaction"),
240        "log" => Some("log"),
241        "noop" => Some("noop"),
242        _ => None,
243    }
244}