use crate::{Database, DbResultExt};
use roboticus_core::Result;
use rusqlite::OptionalExtension;
use serde_json::{Value, json};
/// Converts a raw `tasks.source` column value into structured JSON.
///
/// * `None`, empty, or whitespace-only input yields `Value::Null`.
/// * Input that parses as a JSON *string* is unwrapped once and its contents are handed to
///   `parse_inner_or_origin` (this is how double-encoded sources are recovered).
/// * Input that parses as any other JSON value is returned unchanged.
/// * Input that is not valid JSON is passed (trimmed) to `parse_inner_or_origin`.
pub fn normalize_task_source_value(raw: Option<&str>) -> Value {
    let text = match raw.map(str::trim) {
        Some(t) if !t.is_empty() => t,
        _ => return Value::Null,
    };
    match serde_json::from_str::<Value>(text) {
        Err(_) => parse_inner_or_origin(text),
        Ok(Value::String(unwrapped)) => parse_inner_or_origin(&unwrapped),
        Ok(structured) => structured,
    }
}
/// Interprets already-unwrapped source text as a task-source JSON value.
///
/// Resolution order, applied to the trimmed text:
/// 1. empty → `Value::Null`;
/// 2. valid JSON that is itself a string → unwrapped again, recursively, so a source that was
///    encoded several times is fully decoded in a single pass instead of one layer per
///    normalization run (otherwise canonicalizing an already-canonical value could change it);
/// 3. any other valid JSON → that value;
/// 4. text shaped like an origin identifier (see `looks_like_origin`) → `{"origin": text}`;
/// 5. otherwise → the trimmed text as a JSON string.
fn parse_inner_or_origin(raw: &str) -> Value {
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        return Value::Null;
    }
    match serde_json::from_str::<Value>(trimmed) {
        // Decoding a JSON string literal always drops at least its two surrounding quotes,
        // so each recursive call sees strictly shorter input and the recursion terminates.
        Ok(Value::String(inner)) => parse_inner_or_origin(&inner),
        Ok(parsed) => parsed,
        Err(_) if looks_like_origin(trimmed) => json!({ "origin": trimmed }),
        // Use the trimmed text like every other branch; surrounding whitespace is not
        // significant (whitespace-only input already maps to Null above).
        Err(_) => Value::String(trimmed.to_string()),
    }
}
/// Returns `true` when `raw` looks like a bare origin identifier such as `pg:mentat:tasks`.
///
/// An origin must contain at least one `:` and must not contain whitespace of any kind
/// (spaces, tabs, newlines, ...) or JSON bracket characters (`{`, `}`, `[`, `]`), which would
/// indicate free-form text or a malformed JSON fragment rather than an identifier.
fn looks_like_origin(raw: &str) -> bool {
    raw.contains(':')
        && !raw
            .chars()
            .any(|c| c.is_whitespace() || matches!(c, '{' | '}' | '[' | ']'))
}
pub fn canonical_task_source_json(raw: Option<&str>) -> Option<String> {
let normalized = normalize_task_source_value(raw);
if normalized.is_null() {
None
} else {
Some(
serde_json::to_string(&normalized)
.expect("serde_json::to_string on a parsed Value cannot fail"),
)
}
}
/// Heuristically decides whether a task is revenue-related.
///
/// Matches (ASCII case-insensitively) when the title mentions `bounty:`, `audit:`,
/// `self-funding`, `monetization` or `trading`. Otherwise it checks whether the compact JSON
/// text of `source` contains `"type":"revenue"`, `immunefi`, `bounty` or `mentat:tasks`.
/// The source is only serialized when no title marker matched.
pub fn task_is_revenue_like(title: &str, source: &Value) -> bool {
    const TITLE_MARKERS: [&str; 5] = [
        "bounty:",
        "audit:",
        "self-funding",
        "monetization",
        "trading",
    ];
    const SOURCE_MARKERS: [&str; 4] = [
        "\"type\":\"revenue\"",
        "immunefi",
        "bounty",
        "mentat:tasks",
    ];

    let lowered_title = title.to_ascii_lowercase();
    if TITLE_MARKERS
        .iter()
        .any(|marker| lowered_title.contains(*marker))
    {
        return true;
    }
    let lowered_source = source.to_string().to_ascii_lowercase();
    SOURCE_MARKERS
        .iter()
        .any(|marker| lowered_source.contains(*marker))
}
/// Returns `true` when a task is clearly test chatter rather than real work.
///
/// The title is trimmed and ASCII-lowercased. Empty titles are never noise. A task is
/// noise when the title exactly equals one of the known canned prompts, or when the compact
/// JSON text of `source` (lowercased) contains `agentic_bot:tasks` and the title mentions
/// `just a test` or `saphoo`.
pub fn task_is_obvious_noise(title: &str, source: &Value) -> bool {
    // Compared against the trimmed, ASCII-lowercased title, so entries must be lowercase.
    // NOTE(review): this list previously held the "thank you" entry twice; if a second
    // variant (e.g. different punctuation) was intended, add it here.
    const CANNED_TITLES: [&str; 2] = [
        "what is the juice of saphoo?",
        "no, that was just a test. thank you",
    ];

    let title_lc = title.trim().to_ascii_lowercase();
    if title_lc.is_empty() {
        return false;
    }
    if CANNED_TITLES.contains(&title_lc.as_str()) {
        return true;
    }
    let source_text = source.to_string().to_ascii_lowercase();
    source_text.contains("agentic_bot:tasks")
        && (title_lc.contains("just a test") || title_lc.contains("saphoo"))
}
/// Rewrites every non-empty `tasks.source` into its canonical JSON form.
///
/// A row is rewritten only when [`canonical_task_source_json`] differs from the trimmed
/// stored text. Sources that normalize to null are set to SQL `NULL`. Returns the number of
/// rows updated.
///
/// # Errors
/// Returns an error if the query, row decoding, or any update fails.
pub fn normalize_task_sources_in_db(db: &Database) -> Result<i64> {
    let conn = db.conn();
    // Decide every rewrite and let the SELECT finish before issuing any UPDATE. SQLite
    // documents that a row updated while a query over the same table is still being
    // stepped on that connection may be returned again, which could double-count here.
    let pending: Vec<(String, Option<String>)> = {
        let mut stmt = conn
            .prepare(
                "SELECT id, source FROM tasks WHERE source IS NOT NULL AND trim(source) != ''",
            )
            .db_err()?;
        let rows = stmt
            .query_map([], |row| {
                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
            })
            .db_err()?;
        let mut pending = Vec::new();
        for row in rows {
            let (id, source) = row.db_err()?;
            let normalized = canonical_task_source_json(Some(&source));
            if normalized.as_deref() != Some(source.trim()) {
                pending.push((id, normalized));
            }
        }
        pending
    };

    let mut updated = 0i64;
    for (id, normalized) in pending {
        updated += conn
            .execute(
                "UPDATE tasks SET source = ?2 WHERE id = ?1",
                rusqlite::params![id, normalized],
            )
            .db_err()? as i64;
    }
    Ok(updated)
}
/// Counts tasks whose non-empty `source` is not yet in canonical form.
///
/// A source needs normalization when [`canonical_task_source_json`] differs from its
/// trimmed text. This covers sources that normalize to null (canonical form `None`).
pub fn count_task_sources_needing_normalization(db: &Database) -> Result<i64> {
    let conn = db.conn();
    let mut stmt = conn
        .prepare("SELECT source FROM tasks WHERE source IS NOT NULL AND trim(source) != ''")
        .db_err()?;
    let sources = stmt
        .query_map([], |row| row.get::<_, String>(0))
        .db_err()?;

    let mut pending = 0i64;
    for stored in sources {
        let stored = stored.db_err()?;
        let already_canonical =
            canonical_task_source_json(Some(&stored)).as_deref() == Some(stored.trim());
        if !already_canonical {
            pending += 1;
        }
    }
    Ok(pending)
}
/// Counts open tasks that look revenue-related and that look like obvious noise.
///
/// Scans every `pending` / `in_progress` task (status compared via `lower()`), normalizes its
/// source, and tallies [`task_is_revenue_like`] and [`task_is_obvious_noise`] independently.
/// A single task may contribute to both counters.
///
/// Returns `(revenue_like, obvious_noise)`.
pub fn classify_open_tasks(db: &Database) -> Result<(i64, i64)> {
    let conn = db.conn();
    let mut stmt = conn
        .prepare(
            "SELECT title, source \
             FROM tasks \
             WHERE lower(status) IN ('pending','in_progress')",
        )
        .db_err()?;
    let open_tasks = stmt
        .query_map([], |row| {
            let title: String = row.get(0)?;
            let source: Option<String> = row.get(1)?;
            Ok((title, source))
        })
        .db_err()?;

    let (mut revenue_like, mut obvious_noise) = (0i64, 0i64);
    for task in open_tasks {
        let (title, raw_source) = task.db_err()?;
        let source = normalize_task_source_value(raw_source.as_deref());
        revenue_like += i64::from(task_is_revenue_like(&title, &source));
        obvious_noise += i64::from(task_is_obvious_noise(&title, &source));
    }
    Ok((revenue_like, obvious_noise))
}
/// Counts stale revenue-like tasks without modifying them.
///
/// A task qualifies when:
/// * its status is `in_progress` (compared via `lower()`);
/// * `updated_at`, falling back to `created_at`, is more than 24 hours in the past;
/// * [`task_is_revenue_like`] accepts it;
/// * the compact JSON of its normalized source does not contain `revenue_swap`
///   (case-sensitive).
pub fn count_stale_revenue_tasks(db: &Database) -> Result<i64> {
    let conn = db.conn();
    let mut stmt = conn
        .prepare(
            "SELECT title, source \
             FROM tasks \
             WHERE lower(status) = 'in_progress' \
             AND datetime(COALESCE(updated_at, created_at)) < datetime('now','-24 hours')",
        )
        .db_err()?;
    let candidates = stmt
        .query_map([], |row| {
            let title: String = row.get(0)?;
            let source: Option<String> = row.get(1)?;
            Ok((title, source))
        })
        .db_err()?;

    let mut stale = 0i64;
    for candidate in candidates {
        let (title, raw_source) = candidate.db_err()?;
        let source = normalize_task_source_value(raw_source.as_deref());
        if !task_is_revenue_like(&title, &source) {
            continue;
        }
        if source.to_string().contains("revenue_swap") {
            continue;
        }
        stale += 1;
    }
    Ok(stale)
}
/// Moves stale revenue-like tasks to `needs_review` and returns how many rows were updated.
///
/// Candidates match [`count_stale_revenue_tasks`]:
/// * status `in_progress`;
/// * `updated_at` (falling back to `created_at`) more than 24 hours old;
/// * accepted by [`task_is_revenue_like`];
/// * normalized source does not contain `revenue_swap`.
///
/// Each match gets `status = 'needs_review'` and `updated_at = datetime('now')`.
///
/// # Errors
/// Returns an error if the query, row decoding, or any update fails.
pub fn mark_stale_revenue_tasks_needs_review(db: &Database) -> Result<i64> {
    let conn = db.conn();
    // Select the targets and let the SELECT finish before writing. SQLite leaves it
    // undefined whether rows updated mid-scan on the same connection are returned again.
    let stale_ids: Vec<String> = {
        let mut stmt = conn
            .prepare(
                "SELECT id, title, source \
                 FROM tasks \
                 WHERE lower(status) = 'in_progress' \
                 AND datetime(COALESCE(updated_at, created_at)) < datetime('now','-24 hours')",
            )
            .db_err()?;
        let rows = stmt
            .query_map([], |row| {
                Ok((
                    row.get::<_, String>(0)?,
                    row.get::<_, String>(1)?,
                    row.get::<_, Option<String>>(2)?,
                ))
            })
            .db_err()?;
        let mut ids = Vec::new();
        for row in rows {
            let (id, title, source_raw) = row.db_err()?;
            let source = normalize_task_source_value(source_raw.as_deref());
            if task_is_revenue_like(&title, &source)
                && !source.to_string().contains("revenue_swap")
            {
                ids.push(id);
            }
        }
        ids
    };

    let mut updated = 0i64;
    for id in stale_ids {
        updated += conn
            .execute(
                "UPDATE tasks SET status = 'needs_review', updated_at = datetime('now') WHERE id = ?1",
                [id],
            )
            .db_err()? as i64;
    }
    Ok(updated)
}
/// Dismisses open tasks that [`task_is_obvious_noise`] flags and returns how many were updated.
///
/// Only `pending` / `in_progress` tasks (status compared via `lower()`) are considered.
/// Matches get `status = 'dismissed'` and `updated_at = datetime('now')`.
///
/// # Errors
/// Returns an error if the query, row decoding, or any update fails.
pub fn dismiss_obvious_noise_tasks(db: &Database) -> Result<i64> {
    let conn = db.conn();
    // Select the targets and let the SELECT finish before writing. SQLite leaves it
    // undefined whether rows updated mid-scan on the same connection are returned again.
    let noise_ids: Vec<String> = {
        let mut stmt = conn
            .prepare(
                "SELECT id, title, source \
                 FROM tasks \
                 WHERE lower(status) IN ('pending','in_progress')",
            )
            .db_err()?;
        let rows = stmt
            .query_map([], |row| {
                Ok((
                    row.get::<_, String>(0)?,
                    row.get::<_, String>(1)?,
                    row.get::<_, Option<String>>(2)?,
                ))
            })
            .db_err()?;
        let mut ids = Vec::new();
        for row in rows {
            let (id, title, source_raw) = row.db_err()?;
            let source = normalize_task_source_value(source_raw.as_deref());
            if task_is_obvious_noise(&title, &source) {
                ids.push(id);
            }
        }
        ids
    };

    let mut updated = 0i64;
    for id in noise_ids {
        updated += conn
            .execute(
                "UPDATE tasks SET status = 'dismissed', updated_at = datetime('now') WHERE id = ?1",
                [id],
            )
            .db_err()? as i64;
    }
    Ok(updated)
}
/// Returns every `pending` / `in_progress` task (status compared via `lower()`) as JSON.
///
/// Each object carries `id`, `title`, `description`, `status`, `priority`, `source`,
/// `created_at` and `updated_at`. `source` is the raw column text and is not normalized.
/// Results are ordered by descending priority, then ascending creation time.
///
/// # Errors
/// Fails if the query cannot be prepared or any row cannot be decoded.
pub fn list_open_tasks(db: &Database) -> Result<Vec<Value>> {
    const OPEN_TASKS_SQL: &str =
        "SELECT id, title, description, status, priority, source, created_at, updated_at \
         FROM tasks \
         WHERE lower(status) IN ('pending', 'in_progress') \
         ORDER BY priority DESC, created_at ASC";

    let conn = db.conn();
    let mut stmt = conn.prepare(OPEN_TASKS_SQL).db_err()?;
    let mapped = stmt
        .query_map([], |row| {
            Ok(json!({
                "id": row.get::<_, String>(0)?,
                "title": row.get::<_, String>(1)?,
                "description": row.get::<_, Option<String>>(2)?,
                "status": row.get::<_, String>(3)?,
                "priority": row.get::<_, i64>(4)?,
                "source": row.get::<_, Option<String>>(5)?,
                "created_at": row.get::<_, String>(6)?,
                "updated_at": row.get::<_, String>(7)?,
            }))
        })
        .db_err()?;

    let mut tasks = Vec::new();
    for task in mapped {
        tasks.push(task.db_err()?);
    }
    Ok(tasks)
}
/// Lists open tasks (`pending`, `in_progress`, `needs_review`) together with their latest lifecycle event.
///
/// For each task, the most recent `task_events` row (by `created_at`, via `ROW_NUMBER()`) is
/// LEFT JOINed, so tasks without events are still returned. The event columns map to the
/// optional keys `lifecycle_state`, `lifecycle_summary`, `lifecycle_percentage`,
/// `retry_count`, `lifecycle_updated_at` and `assigned_to`. A key is omitted when its
/// column is NULL or fails to convert to the expected type. Results are ordered by
/// descending priority, then ascending creation time.
///
/// # Errors
/// Fails if the query cannot be prepared or a core task column cannot be decoded.
pub fn list_open_tasks_with_events(db: &Database) -> Result<Vec<Value>> {
    let conn = db.conn();
    let mut stmt = conn
        .prepare(
            "SELECT t.id, t.title, t.status, t.priority, t.source, t.created_at, t.updated_at, \
             e.event_type, e.summary, e.percentage, e.retry_count, e.created_at AS event_at, \
             e.assigned_to \
             FROM tasks t \
             LEFT JOIN ( \
             SELECT task_id, assigned_to, event_type, summary, percentage, retry_count, created_at, \
             ROW_NUMBER() OVER (PARTITION BY task_id ORDER BY created_at DESC) AS rn \
             FROM task_events \
             ) e ON t.id = e.task_id AND e.rn = 1 \
             WHERE lower(t.status) IN ('pending', 'in_progress', 'needs_review') \
             ORDER BY t.priority DESC, t.created_at ASC",
        )
        .db_err()?;

    let tasks = stmt
        .query_map([], |row| {
            let mut task = json!({
                "id": row.get::<_, String>(0)?,
                "title": row.get::<_, String>(1)?,
                "status": row.get::<_, String>(2)?,
                "priority": row.get::<_, i64>(3)?,
                "source": row.get::<_, Option<String>>(4)?,
                "created_at": row.get::<_, String>(5)?,
                "updated_at": row.get::<_, String>(6)?,
            });
            // Event columns are best-effort: NULLs (no event) and conversion failures are
            // both skipped rather than failing the whole listing.
            if let Some(state) = row.get::<_, Option<String>>(7).ok().flatten() {
                task["lifecycle_state"] = json!(state);
            }
            if let Some(summary) = row.get::<_, Option<String>>(8).ok().flatten() {
                task["lifecycle_summary"] = json!(summary);
            }
            if let Some(percentage) = row.get::<_, Option<f64>>(9).ok().flatten() {
                task["lifecycle_percentage"] = json!(percentage);
            }
            if let Some(retries) = row.get::<_, Option<i32>>(10).ok().flatten() {
                task["retry_count"] = json!(retries);
            }
            if let Some(event_at) = row.get::<_, Option<String>>(11).ok().flatten() {
                task["lifecycle_updated_at"] = json!(event_at);
            }
            if let Some(assignee) = row.get::<_, Option<String>>(12).ok().flatten() {
                task["assigned_to"] = json!(assignee);
            }
            Ok(task)
        })
        .db_err()?
        .collect::<std::result::Result<Vec<_>, _>>()
        .db_err()?;
    Ok(tasks)
}
pub fn get_task_source(db: &Database, id: &str) -> Result<Option<Value>> {
let conn = db.conn();
let source = conn
.query_row("SELECT source FROM tasks WHERE id = ?1", [id], |row| {
row.get::<_, Option<String>>(0)
})
.optional()
.db_err()?
.flatten();
Ok(Some(normalize_task_source_value(source.as_deref())).filter(|v| !v.is_null()))
}
#[cfg(test)]
mod tests {
    use super::*;

    // A source stored as a JSON string that itself contains an encoded object is unwrapped
    // into that object.
    #[test]
    fn normalize_task_source_handles_json_string_wrapped_object() {
        let raw = "\"{\\\"origin\\\":\\\"pg:mentat:tasks\\\",\\\"metadata\\\":{\\\"type\\\":\\\"revenue\\\"}}\"";
        let normalized = normalize_task_source_value(Some(raw));
        assert_eq!(normalized["origin"], "pg:mentat:tasks");
        assert_eq!(normalized["metadata"]["type"], "revenue");
    }

    // A bare, non-JSON origin identifier is wrapped as {"origin": ...}.
    #[test]
    fn normalize_task_source_wraps_origin_strings() {
        let normalized = normalize_task_source_value(Some("pg:agentic_bot:tasks"));
        assert_eq!(normalized["origin"], "pg:agentic_bot:tasks");
    }

    // End-to-end repair flow on an in-memory database:
    //   t1: in_progress "Bounty:" task, last updated 2 days ago, with a double-encoded source
    //       → source gets normalized, then the task is marked needs_review.
    //   t2: pending canned "saphoo" prompt with a bare origin source
    //       → source gets normalized, then the task is dismissed.
    #[test]
    fn repair_classifies_and_cleans_revenue_and_noise_tasks() {
        let db = Database::new(":memory:").unwrap();
        let conn = db.conn();
        conn.execute(
            "INSERT INTO tasks (id, title, status, priority, source, created_at, updated_at) VALUES \
             ('t1','Bounty: SSV Network','in_progress',85,'\"{\\\"origin\\\":\\\"pg:mentat:tasks\\\",\\\"metadata\\\":{\\\"type\\\":\\\"revenue\\\"}}\"',datetime('now','-2 days'),datetime('now','-2 days')), \
             ('t2','What is the juice of saphoo?','pending',5,'pg:agentic_bot:tasks',datetime('now'),datetime('now'))",
            [],
        )
        .unwrap();
        // Release this handle before the helpers below call `db.conn()` themselves
        // (presumably a lock guard — TODO confirm against `Database::conn`).
        drop(conn);
        // Both sources differ from their canonical form, so both are counted and rewritten.
        assert_eq!(count_task_sources_needing_normalization(&db).unwrap(), 2);
        assert_eq!(normalize_task_sources_in_db(&db).unwrap(), 2);
        assert_eq!(mark_stale_revenue_tasks_needs_review(&db).unwrap(), 1);
        assert_eq!(dismiss_obvious_noise_tasks(&db).unwrap(), 1);
        let conn = db.conn();
        let t1_status: String = conn
            .query_row("SELECT status FROM tasks WHERE id='t1'", [], |row| {
                row.get(0)
            })
            .unwrap();
        let t2_status: String = conn
            .query_row("SELECT status FROM tasks WHERE id='t2'", [], |row| {
                row.get(0)
            })
            .unwrap();
        let t1_source: String = conn
            .query_row("SELECT source FROM tasks WHERE id='t1'", [], |row| {
                row.get(0)
            })
            .unwrap();
        assert_eq!(t1_status, "needs_review");
        assert_eq!(t2_status, "dismissed");
        // The stored source is now a plain JSON object rather than a JSON-encoded string.
        assert!(t1_source.contains("\"origin\":\"pg:mentat:tasks\""));
    }

    // The latest task_events row is surfaced as lifecycle_* / assigned_to keys.
    #[test]
    fn list_open_tasks_with_lifecycle_state() {
        let db = Database::new(":memory:").unwrap();
        let conn = db.conn();
        // IF NOT EXISTS: presumably guards against the default schema already defining
        // task_events — verify against the schema in `Database::new`.
        conn.execute_batch(
            "CREATE TABLE IF NOT EXISTS task_events ( \
             id TEXT PRIMARY KEY, \
             task_id TEXT NOT NULL, \
             parent_task_id TEXT, \
             assigned_to TEXT, \
             event_type TEXT NOT NULL, \
             summary TEXT, \
             detail_json TEXT, \
             percentage REAL, \
             retry_count INTEGER NOT NULL DEFAULT 0, \
             created_at TEXT NOT NULL DEFAULT (datetime('now')) \
             );",
        )
        .unwrap();
        conn.execute(
            "INSERT INTO tasks (id, title, status, priority, created_at, updated_at) VALUES \
             ('task-1', 'Test lifecycle task', 'pending', 10, datetime('now'), datetime('now'))",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO task_events (id, task_id, assigned_to, event_type, summary, percentage, retry_count, created_at) VALUES \
             ('evt-1', 'task-1', 'subagent-alpha', 'running', 'Working on it', 42.0, 0, datetime('now'))",
            [],
        )
        .unwrap();
        // Release this handle before `list_open_tasks_with_events` calls `db.conn()`.
        drop(conn);
        let tasks = list_open_tasks_with_events(&db).unwrap();
        assert_eq!(tasks.len(), 1, "expected one open task");
        let t = &tasks[0];
        assert_eq!(t["id"].as_str().unwrap(), "task-1");
        assert_eq!(t["lifecycle_state"].as_str().unwrap(), "running");
        assert_eq!(t["lifecycle_summary"].as_str().unwrap(), "Working on it");
        assert!((t["lifecycle_percentage"].as_f64().unwrap() - 42.0).abs() < f64::EPSILON);
        assert_eq!(t["assigned_to"].as_str().unwrap(), "subagent-alpha");
    }
}