1use crate::{Database, DbResultExt};
2use roboticus_core::Result;
3use rusqlite::OptionalExtension;
4use serde_json::{Value, json};
5
6pub fn normalize_task_source_value(raw: Option<&str>) -> Value {
7 let Some(raw) = raw else {
8 return Value::Null;
9 };
10 let trimmed = raw.trim();
11 if trimmed.is_empty() {
12 return Value::Null;
13 }
14
15 match serde_json::from_str::<Value>(trimmed) {
16 Ok(Value::String(inner)) => parse_inner_or_origin(&inner),
17 Ok(parsed) => parsed,
18 Err(_) => parse_inner_or_origin(trimmed),
19 }
20}
21
22fn parse_inner_or_origin(raw: &str) -> Value {
23 let trimmed = raw.trim();
24 if trimmed.is_empty() {
25 return Value::Null;
26 }
27 if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
28 return parsed;
29 }
30 if looks_like_origin(trimmed) {
31 return json!({ "origin": trimmed });
32 }
33 Value::String(raw.to_string())
34}
35
/// Returns `true` when `raw` contains at least one `:` and none of the
/// characters ` ` (space), `{`, `}`, `[`, `]`.
fn looks_like_origin(raw: &str) -> bool {
    let mut saw_colon = false;
    for ch in raw.chars() {
        match ch {
            // Any of these disqualifies the text outright.
            ' ' | '{' | '}' | '[' | ']' => return false,
            ':' => saw_colon = true,
            _ => {}
        }
    }
    saw_colon
}
44
45pub fn canonical_task_source_json(raw: Option<&str>) -> Option<String> {
46 let normalized = normalize_task_source_value(raw);
47 if normalized.is_null() {
48 None
49 } else {
50 Some(
51 serde_json::to_string(&normalized)
52 .expect("serde_json::to_string on a parsed Value cannot fail"),
53 )
54 }
55}
56
57pub fn task_is_revenue_like(title: &str, source: &Value) -> bool {
58 let title_lc = title.to_ascii_lowercase();
59 if title_lc.contains("bounty:")
60 || title_lc.contains("audit:")
61 || title_lc.contains("self-funding")
62 || title_lc.contains("monetization")
63 || title_lc.contains("trading")
64 {
65 return true;
66 }
67 let haystack = source.to_string().to_ascii_lowercase();
68 haystack.contains("\"type\":\"revenue\"")
69 || haystack.contains("immunefi")
70 || haystack.contains("bounty")
71 || haystack.contains("mentat:tasks")
72}
73
74pub fn task_is_obvious_noise(title: &str, source: &Value) -> bool {
75 let title_lc = title.trim().to_ascii_lowercase();
76 if title_lc.is_empty() {
77 return false;
78 }
79 let canned = [
80 "what is the juice of saphoo?",
81 "no, that was just a test. thank you",
82 "no, that was just a test. thank you",
83 ];
84 if canned.iter().any(|item| title_lc == *item) {
85 return true;
86 }
87 let source_text = source.to_string().to_ascii_lowercase();
88 source_text.contains("agentic_bot:tasks")
89 && (title_lc.contains("just a test") || title_lc.contains("saphoo"))
90}
91
92pub fn normalize_task_sources_in_db(db: &Database) -> Result<i64> {
93 let conn = db.conn();
94 let mut stmt = conn
95 .prepare("SELECT id, source FROM tasks WHERE source IS NOT NULL AND trim(source) != ''")
96 .db_err()?;
97 let rows = stmt
98 .query_map([], |row| {
99 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
100 })
101 .db_err()?;
102 let mut updated = 0i64;
103 for row in rows {
104 let (id, source) = row.db_err()?;
105 let normalized = canonical_task_source_json(Some(&source));
106 if normalized.as_deref() != Some(source.trim()) {
107 updated += conn
108 .execute(
109 "UPDATE tasks SET source = ?2 WHERE id = ?1",
110 rusqlite::params![id, normalized],
111 )
112 .db_err()? as i64;
113 }
114 }
115 Ok(updated)
116}
117
118pub fn count_task_sources_needing_normalization(db: &Database) -> Result<i64> {
119 let conn = db.conn();
120 let mut stmt = conn
121 .prepare("SELECT source FROM tasks WHERE source IS NOT NULL AND trim(source) != ''")
122 .db_err()?;
123 let rows = stmt.query_map([], |row| row.get::<_, String>(0)).db_err()?;
124 let mut count = 0i64;
125 for row in rows {
126 let source = row.db_err()?;
127 let normalized = canonical_task_source_json(Some(&source));
128 if normalized.as_deref() != Some(source.trim()) {
129 count += 1;
130 }
131 }
132 Ok(count)
133}
134
135pub fn classify_open_tasks(db: &Database) -> Result<(i64, i64)> {
136 let conn = db.conn();
137 let mut stmt = conn
138 .prepare(
139 "SELECT title, source \
140 FROM tasks \
141 WHERE lower(status) IN ('pending','in_progress')",
142 )
143 .db_err()?;
144 let rows = stmt
145 .query_map([], |row| {
146 Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
147 })
148 .db_err()?;
149 let mut revenue_like = 0i64;
150 let mut obvious_noise = 0i64;
151 for row in rows {
152 let (title, source_raw) = row.db_err()?;
153 let source = normalize_task_source_value(source_raw.as_deref());
154 if task_is_revenue_like(&title, &source) {
155 revenue_like += 1;
156 }
157 if task_is_obvious_noise(&title, &source) {
158 obvious_noise += 1;
159 }
160 }
161 Ok((revenue_like, obvious_noise))
162}
163
164pub fn count_stale_revenue_tasks(db: &Database) -> Result<i64> {
165 let conn = db.conn();
166 let mut stmt = conn
167 .prepare(
168 "SELECT title, source \
169 FROM tasks \
170 WHERE lower(status) = 'in_progress' \
171 AND datetime(COALESCE(updated_at, created_at)) < datetime('now','-24 hours')",
172 )
173 .db_err()?;
174 let rows = stmt
175 .query_map([], |row| {
176 Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
177 })
178 .db_err()?;
179 let mut count = 0i64;
180 for row in rows {
181 let (title, source_raw) = row.db_err()?;
182 let source = normalize_task_source_value(source_raw.as_deref());
183 if task_is_revenue_like(&title, &source) && !source.to_string().contains("revenue_swap") {
184 count += 1;
185 }
186 }
187 Ok(count)
188}
189
190pub fn mark_stale_revenue_tasks_needs_review(db: &Database) -> Result<i64> {
191 let conn = db.conn();
192 let mut stmt = conn
193 .prepare(
194 "SELECT id, title, source \
195 FROM tasks \
196 WHERE lower(status) = 'in_progress' \
197 AND datetime(COALESCE(updated_at, created_at)) < datetime('now','-24 hours')",
198 )
199 .db_err()?;
200 let rows = stmt
201 .query_map([], |row| {
202 Ok((
203 row.get::<_, String>(0)?,
204 row.get::<_, String>(1)?,
205 row.get::<_, Option<String>>(2)?,
206 ))
207 })
208 .db_err()?;
209 let mut updated = 0i64;
210 for row in rows {
211 let (id, title, source_raw) = row.db_err()?;
212 let source = normalize_task_source_value(source_raw.as_deref());
213 if task_is_revenue_like(&title, &source) && !source.to_string().contains("revenue_swap") {
214 updated += conn
215 .execute(
216 "UPDATE tasks SET status = 'needs_review', updated_at = datetime('now') WHERE id = ?1",
217 [id],
218 )
219 .db_err()? as i64;
220 }
221 }
222 Ok(updated)
223}
224
225pub fn dismiss_obvious_noise_tasks(db: &Database) -> Result<i64> {
226 let conn = db.conn();
227 let mut stmt = conn
228 .prepare(
229 "SELECT id, title, source \
230 FROM tasks \
231 WHERE lower(status) IN ('pending','in_progress')",
232 )
233 .db_err()?;
234 let rows = stmt
235 .query_map([], |row| {
236 Ok((
237 row.get::<_, String>(0)?,
238 row.get::<_, String>(1)?,
239 row.get::<_, Option<String>>(2)?,
240 ))
241 })
242 .db_err()?;
243 let mut updated = 0i64;
244 for row in rows {
245 let (id, title, source_raw) = row.db_err()?;
246 let source = normalize_task_source_value(source_raw.as_deref());
247 if task_is_obvious_noise(&title, &source) {
248 updated += conn
249 .execute(
250 "UPDATE tasks SET status = 'dismissed', updated_at = datetime('now') WHERE id = ?1",
251 [id],
252 )
253 .db_err()? as i64;
254 }
255 }
256 Ok(updated)
257}
258
259pub fn get_task_source(db: &Database, id: &str) -> Result<Option<Value>> {
260 let conn = db.conn();
261 let source = conn
262 .query_row("SELECT source FROM tasks WHERE id = ?1", [id], |row| {
263 row.get::<_, Option<String>>(0)
264 })
265 .optional()
266 .db_err()?
267 .flatten();
268 Ok(Some(normalize_task_source_value(source.as_deref())).filter(|v| !v.is_null()))
269}
270
#[cfg(test)]
mod tests {
    use super::*;

    // A source stored as a JSON string whose content is itself a JSON object
    // (double-encoded) must be unwrapped into that object.
    #[test]
    fn normalize_task_source_handles_json_string_wrapped_object() {
        let raw = "\"{\\\"origin\\\":\\\"pg:mentat:tasks\\\",\\\"metadata\\\":{\\\"type\\\":\\\"revenue\\\"}}\"";
        let normalized = normalize_task_source_value(Some(raw));
        assert_eq!(normalized["origin"], "pg:mentat:tasks");
        assert_eq!(normalized["metadata"]["type"], "revenue");
    }

    // A bare colon-separated identifier is not JSON; it must be wrapped as
    // `{"origin": ...}`.
    #[test]
    fn normalize_task_source_wraps_origin_strings() {
        let normalized = normalize_task_source_value(Some("pg:agentic_bot:tasks"));
        assert_eq!(normalized["origin"], "pg:agentic_bot:tasks");
    }

    // End-to-end repair flow on an in-memory database:
    //   t1 - revenue task, in progress, last touched two days ago, with a
    //        double-encoded JSON source;
    //   t2 - canned test question, pending, with a bare origin-string source.
    #[test]
    fn repair_classifies_and_cleans_revenue_and_noise_tasks() {
        let db = Database::new(":memory:").unwrap();
        let conn = db.conn();
        conn.execute(
            "INSERT INTO tasks (id, title, status, priority, source, created_at, updated_at) VALUES \
             ('t1','Bounty: SSV Network','in_progress',85,'\"{\\\"origin\\\":\\\"pg:mentat:tasks\\\",\\\"metadata\\\":{\\\"type\\\":\\\"revenue\\\"}}\"',datetime('now','-2 days'),datetime('now','-2 days')), \
             ('t2','What is the juice of saphoo?','pending',5,'pg:agentic_bot:tasks',datetime('now'),datetime('now'))",
            [],
        )
        .unwrap();
        // Release this handle before calling the functions under test, each
        // of which obtains its own via `db.conn()`.
        drop(conn);

        // Both stored sources differ from their canonical JSON form.
        assert_eq!(count_task_sources_needing_normalization(&db).unwrap(), 2);
        assert_eq!(normalize_task_sources_in_db(&db).unwrap(), 2);
        // Only t1 is in progress and older than 24 hours.
        assert_eq!(mark_stale_revenue_tasks_needs_review(&db).unwrap(), 1);
        // Only t2 carries a canned noise title.
        assert_eq!(dismiss_obvious_noise_tasks(&db).unwrap(), 1);
        let conn = db.conn();
        let t1_status: String = conn
            .query_row("SELECT status FROM tasks WHERE id='t1'", [], |row| {
                row.get(0)
            })
            .unwrap();
        let t2_status: String = conn
            .query_row("SELECT status FROM tasks WHERE id='t2'", [], |row| {
                row.get(0)
            })
            .unwrap();
        let t1_source: String = conn
            .query_row("SELECT source FROM tasks WHERE id='t1'", [], |row| {
                row.get(0)
            })
            .unwrap();
        assert_eq!(t1_status, "needs_review");
        assert_eq!(t2_status, "dismissed");
        // The stored source is now a plain JSON object, no longer a quoted string.
        assert!(t1_source.contains("\"origin\":\"pg:mentat:tasks\""));
    }
}