Skip to main content

roboticus_db/
tasks.rs

1use crate::{Database, DbResultExt};
2use roboticus_core::Result;
3use rusqlite::OptionalExtension;
4use serde_json::{Value, json};
5
6pub fn normalize_task_source_value(raw: Option<&str>) -> Value {
7    let Some(raw) = raw else {
8        return Value::Null;
9    };
10    let trimmed = raw.trim();
11    if trimmed.is_empty() {
12        return Value::Null;
13    }
14
15    match serde_json::from_str::<Value>(trimmed) {
16        Ok(Value::String(inner)) => parse_inner_or_origin(&inner),
17        Ok(parsed) => parsed,
18        Err(_) => parse_inner_or_origin(trimmed),
19    }
20}
21
22fn parse_inner_or_origin(raw: &str) -> Value {
23    let trimmed = raw.trim();
24    if trimmed.is_empty() {
25        return Value::Null;
26    }
27    if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
28        return parsed;
29    }
30    if looks_like_origin(trimmed) {
31        return json!({ "origin": trimmed });
32    }
33    Value::String(raw.to_string())
34}
35
/// Returns `true` when `raw` resembles a bare origin tag such as `pg:agentic_bot:tasks`.
///
/// Such a tag contains at least one `:` and no space, brace (`{` `}`) or
/// bracket (`[` `]`). Only the ASCII space is rejected; other whitespace is not checked.
fn looks_like_origin(raw: &str) -> bool {
    let has_structural_char = raw
        .chars()
        .any(|c| matches!(c, ' ' | '{' | '}' | '[' | ']'));
    raw.contains(':') && !has_structural_char
}
44
45pub fn canonical_task_source_json(raw: Option<&str>) -> Option<String> {
46    let normalized = normalize_task_source_value(raw);
47    if normalized.is_null() {
48        None
49    } else {
50        Some(
51            serde_json::to_string(&normalized)
52                .expect("serde_json::to_string on a parsed Value cannot fail"),
53        )
54    }
55}
56
57pub fn task_is_revenue_like(title: &str, source: &Value) -> bool {
58    let title_lc = title.to_ascii_lowercase();
59    if title_lc.contains("bounty:")
60        || title_lc.contains("audit:")
61        || title_lc.contains("self-funding")
62        || title_lc.contains("monetization")
63        || title_lc.contains("trading")
64    {
65        return true;
66    }
67    let haystack = source.to_string().to_ascii_lowercase();
68    haystack.contains("\"type\":\"revenue\"")
69        || haystack.contains("immunefi")
70        || haystack.contains("bounty")
71        || haystack.contains("mentat:tasks")
72}
73
74pub fn task_is_obvious_noise(title: &str, source: &Value) -> bool {
75    let title_lc = title.trim().to_ascii_lowercase();
76    if title_lc.is_empty() {
77        return false;
78    }
79    let canned = [
80        "what is the juice of saphoo?",
81        "no, that was just a test.  thank you",
82        "no, that was just a test. thank you",
83    ];
84    if canned.iter().any(|item| title_lc == *item) {
85        return true;
86    }
87    let source_text = source.to_string().to_ascii_lowercase();
88    source_text.contains("agentic_bot:tasks")
89        && (title_lc.contains("just a test") || title_lc.contains("saphoo"))
90}
91
92pub fn normalize_task_sources_in_db(db: &Database) -> Result<i64> {
93    let conn = db.conn();
94    let mut stmt = conn
95        .prepare("SELECT id, source FROM tasks WHERE source IS NOT NULL AND trim(source) != ''")
96        .db_err()?;
97    let rows = stmt
98        .query_map([], |row| {
99            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
100        })
101        .db_err()?;
102    let mut updated = 0i64;
103    for row in rows {
104        let (id, source) = row.db_err()?;
105        let normalized = canonical_task_source_json(Some(&source));
106        if normalized.as_deref() != Some(source.trim()) {
107            updated += conn
108                .execute(
109                    "UPDATE tasks SET source = ?2 WHERE id = ?1",
110                    rusqlite::params![id, normalized],
111                )
112                .db_err()? as i64;
113        }
114    }
115    Ok(updated)
116}
117
118pub fn count_task_sources_needing_normalization(db: &Database) -> Result<i64> {
119    let conn = db.conn();
120    let mut stmt = conn
121        .prepare("SELECT source FROM tasks WHERE source IS NOT NULL AND trim(source) != ''")
122        .db_err()?;
123    let rows = stmt.query_map([], |row| row.get::<_, String>(0)).db_err()?;
124    let mut count = 0i64;
125    for row in rows {
126        let source = row.db_err()?;
127        let normalized = canonical_task_source_json(Some(&source));
128        if normalized.as_deref() != Some(source.trim()) {
129            count += 1;
130        }
131    }
132    Ok(count)
133}
134
135pub fn classify_open_tasks(db: &Database) -> Result<(i64, i64)> {
136    let conn = db.conn();
137    let mut stmt = conn
138        .prepare(
139            "SELECT title, source \
140             FROM tasks \
141             WHERE lower(status) IN ('pending','in_progress')",
142        )
143        .db_err()?;
144    let rows = stmt
145        .query_map([], |row| {
146            Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
147        })
148        .db_err()?;
149    let mut revenue_like = 0i64;
150    let mut obvious_noise = 0i64;
151    for row in rows {
152        let (title, source_raw) = row.db_err()?;
153        let source = normalize_task_source_value(source_raw.as_deref());
154        if task_is_revenue_like(&title, &source) {
155            revenue_like += 1;
156        }
157        if task_is_obvious_noise(&title, &source) {
158            obvious_noise += 1;
159        }
160    }
161    Ok((revenue_like, obvious_noise))
162}
163
164pub fn count_stale_revenue_tasks(db: &Database) -> Result<i64> {
165    let conn = db.conn();
166    let mut stmt = conn
167        .prepare(
168            "SELECT title, source \
169             FROM tasks \
170             WHERE lower(status) = 'in_progress' \
171               AND datetime(COALESCE(updated_at, created_at)) < datetime('now','-24 hours')",
172        )
173        .db_err()?;
174    let rows = stmt
175        .query_map([], |row| {
176            Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
177        })
178        .db_err()?;
179    let mut count = 0i64;
180    for row in rows {
181        let (title, source_raw) = row.db_err()?;
182        let source = normalize_task_source_value(source_raw.as_deref());
183        if task_is_revenue_like(&title, &source) && !source.to_string().contains("revenue_swap") {
184            count += 1;
185        }
186    }
187    Ok(count)
188}
189
190pub fn mark_stale_revenue_tasks_needs_review(db: &Database) -> Result<i64> {
191    let conn = db.conn();
192    let mut stmt = conn
193        .prepare(
194            "SELECT id, title, source \
195             FROM tasks \
196             WHERE lower(status) = 'in_progress' \
197               AND datetime(COALESCE(updated_at, created_at)) < datetime('now','-24 hours')",
198        )
199        .db_err()?;
200    let rows = stmt
201        .query_map([], |row| {
202            Ok((
203                row.get::<_, String>(0)?,
204                row.get::<_, String>(1)?,
205                row.get::<_, Option<String>>(2)?,
206            ))
207        })
208        .db_err()?;
209    let mut updated = 0i64;
210    for row in rows {
211        let (id, title, source_raw) = row.db_err()?;
212        let source = normalize_task_source_value(source_raw.as_deref());
213        if task_is_revenue_like(&title, &source) && !source.to_string().contains("revenue_swap") {
214            updated += conn
215                .execute(
216                    "UPDATE tasks SET status = 'needs_review', updated_at = datetime('now') WHERE id = ?1",
217                    [id],
218                )
219                .db_err()? as i64;
220        }
221    }
222    Ok(updated)
223}
224
225pub fn dismiss_obvious_noise_tasks(db: &Database) -> Result<i64> {
226    let conn = db.conn();
227    let mut stmt = conn
228        .prepare(
229            "SELECT id, title, source \
230             FROM tasks \
231             WHERE lower(status) IN ('pending','in_progress')",
232        )
233        .db_err()?;
234    let rows = stmt
235        .query_map([], |row| {
236            Ok((
237                row.get::<_, String>(0)?,
238                row.get::<_, String>(1)?,
239                row.get::<_, Option<String>>(2)?,
240            ))
241        })
242        .db_err()?;
243    let mut updated = 0i64;
244    for row in rows {
245        let (id, title, source_raw) = row.db_err()?;
246        let source = normalize_task_source_value(source_raw.as_deref());
247        if task_is_obvious_noise(&title, &source) {
248            updated += conn
249                .execute(
250                    "UPDATE tasks SET status = 'dismissed', updated_at = datetime('now') WHERE id = ?1",
251                    [id],
252                )
253                .db_err()? as i64;
254        }
255    }
256    Ok(updated)
257}
258
259pub fn get_task_source(db: &Database, id: &str) -> Result<Option<Value>> {
260    let conn = db.conn();
261    let source = conn
262        .query_row("SELECT source FROM tasks WHERE id = ?1", [id], |row| {
263            row.get::<_, Option<String>>(0)
264        })
265        .optional()
266        .db_err()?
267        .flatten();
268    Ok(Some(normalize_task_source_value(source.as_deref())).filter(|v| !v.is_null()))
269}
270
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `sql`, which must yield exactly one TEXT cell, against the test database.
    fn fetch_text(db: &Database, sql: &str) -> String {
        let conn = db.conn();
        conn.query_row(sql, [], |row| row.get(0)).unwrap()
    }

    #[test]
    fn normalize_task_source_handles_json_string_wrapped_object() {
        let wrapped = "\"{\\\"origin\\\":\\\"pg:mentat:tasks\\\",\\\"metadata\\\":{\\\"type\\\":\\\"revenue\\\"}}\"";
        let value = normalize_task_source_value(Some(wrapped));
        assert_eq!(value["origin"], "pg:mentat:tasks");
        assert_eq!(value["metadata"]["type"], "revenue");
    }

    #[test]
    fn normalize_task_source_wraps_origin_strings() {
        let value = normalize_task_source_value(Some("pg:agentic_bot:tasks"));
        assert_eq!(value["origin"], "pg:agentic_bot:tasks");
    }

    #[test]
    fn repair_classifies_and_cleans_revenue_and_noise_tasks() {
        let db = Database::new(":memory:").unwrap();
        // Release the seeding connection before the helpers below call `db.conn()`.
        {
            let conn = db.conn();
            conn.execute(
                "INSERT INTO tasks (id, title, status, priority, source, created_at, updated_at) VALUES \
                 ('t1','Bounty: SSV Network','in_progress',85,'\"{\\\"origin\\\":\\\"pg:mentat:tasks\\\",\\\"metadata\\\":{\\\"type\\\":\\\"revenue\\\"}}\"',datetime('now','-2 days'),datetime('now','-2 days')), \
                 ('t2','What is the juice of saphoo?','pending',5,'pg:agentic_bot:tasks',datetime('now'),datetime('now'))",
                [],
            )
            .unwrap();
        }

        assert_eq!(count_task_sources_needing_normalization(&db).unwrap(), 2);
        assert_eq!(normalize_task_sources_in_db(&db).unwrap(), 2);
        assert_eq!(mark_stale_revenue_tasks_needs_review(&db).unwrap(), 1);
        assert_eq!(dismiss_obvious_noise_tasks(&db).unwrap(), 1);

        let t1_status = fetch_text(&db, "SELECT status FROM tasks WHERE id='t1'");
        let t2_status = fetch_text(&db, "SELECT status FROM tasks WHERE id='t2'");
        let t1_source = fetch_text(&db, "SELECT source FROM tasks WHERE id='t1'");
        assert_eq!(t1_status, "needs_review");
        assert_eq!(t2_status, "dismissed");
        assert!(t1_source.contains("\"origin\":\"pg:mentat:tasks\""));
    }
}