1use std::path::Path;
2
/// Summary of what one `run_state_hygiene` pass changed in the state database.
///
/// Every counter is a number of affected rows. `changed_rows` accumulates the rows
/// counted by every pass, and `changed` is set from `changed_rows > 0`.
#[derive(Clone, Copy, Debug, Default)]
pub struct StateHygieneReport {
    /// Whether any row was updated or deleted.
    pub changed: bool,
    /// Total rows affected across all passes.
    pub changed_rows: u64,
    /// `sub_agents` rows updated or deleted while normalizing roles, JSON columns and models.
    pub subagent_rows_normalized: u64,
    /// `cron_jobs` rows whose `payload_json` was rewritten.
    pub cron_payload_rows_repaired: u64,
    /// Enabled cron jobs switched off because their schedule expression was invalid.
    pub cron_jobs_disabled_invalid_expr: u64,
}
11
12pub fn run_state_hygiene(
13 state_db_path: &Path,
14) -> Result<StateHygieneReport, Box<dyn std::error::Error>> {
15 if !state_db_path.exists() {
16 return Ok(StateHygieneReport::default());
17 }
18 let conn = rusqlite::Connection::open(state_db_path)?;
19 let mut report = StateHygieneReport::default();
20 let has_column = |table: &str, column: &str| -> rusqlite::Result<bool> {
21 let mut stmt = conn.prepare(&format!(
22 "PRAGMA table_info(\"{}\")",
23 table.replace('"', "\"\"")
24 ))?;
25 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
26 for col in rows {
27 if col? == column {
28 return Ok(true);
29 }
30 }
31 Ok(false)
32 };
33 let has_table = |table: &str| -> rusqlite::Result<bool> {
34 conn.query_row(
35 "SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1)",
36 [table],
37 |row| row.get::<_, i64>(0),
38 )
39 .map(|exists| exists != 0)
40 };
41
42 conn.execute_batch("BEGIN;")?;
43
44 conn.execute(
45 "UPDATE sub_agents SET role='subagent' WHERE lower(trim(role))='specialist'",
46 [],
47 )?;
48 let n = conn.changes();
49 report.subagent_rows_normalized += n;
50 report.changed_rows += n;
51
52 conn.execute(
53 "DELETE FROM sub_agents WHERE lower(trim(role))='commander'",
54 [],
55 )?;
56 let n = conn.changes();
57 report.subagent_rows_normalized += n;
58 report.changed_rows += n;
59
60 conn.execute(
61 "UPDATE sub_agents SET skills_json='[]' WHERE skills_json IS NULL",
62 [],
63 )?;
64 let n = conn.changes();
65 report.subagent_rows_normalized += n;
66 report.changed_rows += n;
67
68 if has_column("sub_agents", "fallback_models_json")? {
69 conn.execute(
70 "UPDATE sub_agents SET fallback_models_json='[]' WHERE fallback_models_json IS NULL OR trim(fallback_models_json)=''",
71 [],
72 )?;
73 let n = conn.changes();
74 report.subagent_rows_normalized += n;
75 report.changed_rows += n;
76 }
77 if has_column("sub_agents", "model")? {
78 conn.execute(
79 "UPDATE sub_agents
80 SET model='auto'
81 WHERE lower(trim(role))='subagent'
82 AND lower(trim(model)) IN ('ollama-gpu/qwen3:14b','ollama-gpu/qwen3.5:35b-a3b')",
83 [],
84 )?;
85 let n = conn.changes();
86 report.subagent_rows_normalized += n;
87 report.changed_rows += n;
88
89 conn.execute(
90 "UPDATE sub_agents
91 SET model='auto'
92 WHERE lower(trim(role))='subagent'
93 AND trim(model) <> ''
94 AND lower(trim(model)) NOT IN ('auto','orchestrator')
95 AND instr(trim(model), '/') = 0",
96 [],
97 )?;
98 let n = conn.changes();
99 report.subagent_rows_normalized += n;
100 report.changed_rows += n;
101 }
102
103 if has_table("cron_jobs")?
104 && has_column("cron_jobs", "payload_json")?
105 && has_column("cron_jobs", "id")?
106 {
107 let mut stmt = conn.prepare("SELECT id, description, payload_json FROM cron_jobs")?;
108 let rows = stmt.query_map([], |row| {
109 Ok((
110 row.get::<_, String>(0)?,
111 row.get::<_, Option<String>>(1)?,
112 row.get::<_, String>(2)?,
113 ))
114 })?;
115 for row in rows {
116 let (id, description, payload_raw) = row?;
117 if let Some(payload_json) =
118 normalize_cron_payload_json(description.as_deref(), &payload_raw)
119 {
120 conn.execute(
121 "UPDATE cron_jobs SET payload_json=?1 WHERE id=?2",
122 rusqlite::params![payload_json, id],
123 )?;
124 let n = conn.changes();
125 report.cron_payload_rows_repaired += n;
126 report.changed_rows += n;
127 }
128 }
129 }
130
131 if has_table("cron_jobs")?
132 && has_column("cron_jobs", "id")?
133 && has_column("cron_jobs", "enabled")?
134 && has_column("cron_jobs", "schedule_kind")?
135 && has_column("cron_jobs", "schedule_expr")?
136 {
137 let mut stmt = conn.prepare(
138 "SELECT id, schedule_expr
139 FROM cron_jobs
140 WHERE enabled=1 AND lower(trim(schedule_kind))='cron'",
141 )?;
142 let rows = stmt.query_map([], |row| {
143 Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
144 })?;
145 for row in rows {
146 let (id, expr_opt) = row?;
147 let valid = expr_opt
148 .as_deref()
149 .map(roboticus_schedule::DurableScheduler::is_valid_cron_expression)
150 .unwrap_or(false);
151 if !valid {
152 conn.execute(
153 "UPDATE cron_jobs SET enabled=0 WHERE id=?1",
154 rusqlite::params![id],
155 )?;
156 let n = conn.changes();
157 report.cron_jobs_disabled_invalid_expr += n;
158 report.changed_rows += n;
159 }
160 }
161 }
162
163 conn.execute_batch("COMMIT;")?;
164 report.changed = report.changed_rows > 0;
165 Ok(report)
166}
167
168pub(crate) fn normalize_cron_payload_json(description: Option<&str>, raw: &str) -> Option<String> {
169 let mut payload = match serde_json::from_str::<serde_json::Value>(raw) {
170 Ok(v) => v,
171 Err(_) => return Some(r#"{"action":"noop"}"#.to_string()),
172 };
173 let obj = match payload.as_object_mut() {
174 Some(v) => v,
175 None => return Some(r#"{"action":"noop"}"#.to_string()),
176 };
177 let mut changed = false;
178 if let Some(kind) = obj.get("kind").and_then(|v| v.as_str())
179 && obj.get("action").and_then(|v| v.as_str()).is_none()
180 && let Some(mapped) = legacy_kind_to_action(kind)
181 {
182 obj.insert(
183 "action".to_string(),
184 serde_json::Value::String(mapped.to_string()),
185 );
186 changed = true;
187 }
188 let action = obj
189 .get("action")
190 .and_then(|v| v.as_str())
191 .unwrap_or("unknown");
192 if action == "log"
193 && let Some(desc) = description.map(str::trim).filter(|d| !d.is_empty())
194 {
195 let message = obj
196 .get("message")
197 .and_then(|v| v.as_str())
198 .map(str::trim)
199 .unwrap_or("");
200 if message.eq_ignore_ascii_case(desc) || message.starts_with("scheduled job:") {
201 obj.insert(
202 "action".to_string(),
203 serde_json::Value::String("agent_task".to_string()),
204 );
205 obj.insert(
206 "task".to_string(),
207 serde_json::Value::String(desc.to_string()),
208 );
209 obj.remove("message");
210 return serde_json::to_string(&payload).ok();
211 }
212 }
213 if matches!(
214 action,
215 "log"
216 | "agent_task"
217 | "metric_snapshot"
218 | "expire_sessions"
219 | "record_transaction"
220 | "noop"
221 ) {
222 if changed {
223 return serde_json::to_string(&payload).ok();
224 }
225 return None;
226 }
227 obj.insert(
228 "action".to_string(),
229 serde_json::Value::String("noop".to_string()),
230 );
231 serde_json::to_string(&payload).ok()
232}
233
/// Maps a legacy payload `kind` value to the corresponding `action` name.
///
/// The match is exact (case-sensitive). Note that `agentTurn` maps to `noop`. Unknown
/// kinds yield `None`.
fn legacy_kind_to_action(kind: &str) -> Option<&'static str> {
    const LEGACY_KINDS: [(&str, &str); 6] = [
        ("agentTurn", "noop"),
        ("metricSnapshot", "metric_snapshot"),
        ("expireSessions", "expire_sessions"),
        ("recordTransaction", "record_transaction"),
        ("log", "log"),
        ("noop", "noop"),
    ];
    LEGACY_KINDS
        .iter()
        .find(|(legacy, _)| *legacy == kind)
        .map(|&(_, action)| action)
}