use crate::Database;
use roboticus_core::Result;
use serde_json::Value;
pub use crate::revenue_strategy_summary::revenue_strategy_summary as strategy_summary;
pub use crate::revenue_strategy_summary::revenue_strategy_profitability as strategy_profitability;
pub use crate::revenue_strategy_summary::revenue_audit_log as audit_log;
pub use crate::revenue_feedback::revenue_feedback_summary_by_strategy as feedback_summary;
pub fn pipeline_health(db: &Database) -> Result<Value> {
let conn = db.conn();
let mut stmt = conn
.prepare(
"SELECT status, COUNT(*) AS cnt \
FROM revenue_opportunities \
GROUP BY status \
ORDER BY cnt DESC",
)
.map_err(|e| roboticus_core::RoboticusError::Database(e.to_string()))?;
let status_counts: Vec<Value> = stmt
.query_map([], |row| {
Ok(serde_json::json!({
"status": row.get::<_, String>(0)?,
"count": row.get::<_, i64>(1)?,
}))
})
.map_err(|e| roboticus_core::RoboticusError::Database(e.to_string()))?
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| roboticus_core::RoboticusError::Database(e.to_string()))?;
let stale_intake: i64 = conn
.query_row(
"SELECT COUNT(*) FROM revenue_opportunities \
WHERE status = 'intake' \
AND created_at < datetime('now', '-24 hours')",
[],
|row| row.get(0),
)
.map_err(|e| roboticus_core::RoboticusError::Database(e.to_string()))?;
let mut stmt2 = conn
.prepare(
"SELECT strategy, COUNT(*) AS total \
FROM revenue_opportunities \
GROUP BY strategy \
HAVING SUM(CASE WHEN status = 'settled' THEN 1 ELSE 0 END) = 0 \
ORDER BY total DESC \
LIMIT 50",
)
.map_err(|e| roboticus_core::RoboticusError::Database(e.to_string()))?;
let zero_settlement_strategies: Vec<Value> = stmt2
.query_map([], |row| {
Ok(serde_json::json!({
"strategy": row.get::<_, String>(0)?,
"total_jobs": row.get::<_, i64>(1)?,
}))
})
.map_err(|e| roboticus_core::RoboticusError::Database(e.to_string()))?
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| roboticus_core::RoboticusError::Database(e.to_string()))?;
Ok(serde_json::json!({
"status_distribution": status_counts,
"stale_intake_count": stale_intake,
"zero_settlement_strategies": zero_settlement_strategies,
}))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn pipeline_health_with_mixed_statuses() {
let db = Database::new(":memory:").unwrap();
let conn = db.conn();
conn.execute(
"INSERT INTO revenue_opportunities (id, source, strategy, payload_json, expected_revenue_usdc, status) \
VALUES ('ro_1','a','oracle_feed','{}',5.0,'intake')",
[],
).unwrap();
conn.execute(
"INSERT INTO revenue_opportunities (id, source, strategy, payload_json, expected_revenue_usdc, status, settled_amount_usdc) \
VALUES ('ro_2','b','oracle_feed','{}',10.0,'settled',10.0)",
[],
).unwrap();
conn.execute(
"INSERT INTO revenue_opportunities (id, source, strategy, payload_json, expected_revenue_usdc, status) \
VALUES ('ro_3','c','micro_bounty','{}',2.0,'rejected')",
[],
).unwrap();
drop(conn);
let health = pipeline_health(&db).unwrap();
let statuses = health["status_distribution"].as_array().unwrap();
assert_eq!(statuses.len(), 3);
let zero = health["zero_settlement_strategies"].as_array().unwrap();
assert_eq!(zero.len(), 1);
assert_eq!(zero[0]["strategy"], "micro_bounty");
}
#[test]
fn pipeline_health_empty_db() {
let db = Database::new(":memory:").unwrap();
let health = pipeline_health(&db).unwrap();
assert_eq!(health["stale_intake_count"], 0);
assert!(health["status_distribution"].as_array().unwrap().is_empty());
}
}