use crate::{Database, DbResultExt};
use roboticus_core::Result;
use rusqlite::OptionalExtension;
use serde::{Deserialize, Serialize};
use std::fmt;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TaskLifecycleState {
Pending,
Assigned,
Running,
Progress,
Completed,
Failed,
Cancelled,
Retry,
}
impl TaskLifecycleState {
pub fn as_str(&self) -> &'static str {
match self {
Self::Pending => "pending",
Self::Assigned => "assigned",
Self::Running => "running",
Self::Progress => "progress",
Self::Completed => "completed",
Self::Failed => "failed",
Self::Cancelled => "cancelled",
Self::Retry => "retry",
}
}
pub fn from_str_opt(s: &str) -> Option<Self> {
match s {
"pending" => Some(Self::Pending),
"assigned" => Some(Self::Assigned),
"running" => Some(Self::Running),
"progress" => Some(Self::Progress),
"completed" => Some(Self::Completed),
"failed" => Some(Self::Failed),
"cancelled" => Some(Self::Cancelled),
"retry" => Some(Self::Retry),
_ => None,
}
}
pub fn is_terminal(&self) -> bool {
matches!(self, Self::Completed | Self::Failed | Self::Cancelled)
}
}
impl fmt::Display for TaskLifecycleState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.as_str())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskEventRow {
pub id: String,
pub task_id: String,
pub parent_task_id: Option<String>,
pub assigned_to: Option<String>,
pub event_type: TaskLifecycleState,
pub summary: Option<String>,
pub detail_json: Option<String>,
pub percentage: Option<f64>,
pub retry_count: i32,
pub created_at: String,
}
fn row_to_task_event(row: &rusqlite::Row<'_>) -> rusqlite::Result<TaskEventRow> {
let raw: String = row.get(4)?;
let event_type = TaskLifecycleState::from_str_opt(&raw).unwrap_or_else(|| {
tracing::warn!(
event_type = %raw,
"task_events: unrecognized event_type encountered in row; defaulting to Pending"
);
TaskLifecycleState::Pending
});
Ok(TaskEventRow {
id: row.get(0)?,
task_id: row.get(1)?,
parent_task_id: row.get(2)?,
assigned_to: row.get(3)?,
event_type,
summary: row.get(5)?,
detail_json: row.get(6)?,
percentage: row.get(7)?,
retry_count: row.get(8)?,
created_at: row.get(9)?,
})
}
fn lifecycle_to_task_status(state: TaskLifecycleState) -> Option<&'static str> {
match state {
TaskLifecycleState::Pending => Some("pending"),
TaskLifecycleState::Assigned | TaskLifecycleState::Running => Some("in_progress"),
TaskLifecycleState::Completed => Some("completed"),
TaskLifecycleState::Failed => Some("failed"),
TaskLifecycleState::Cancelled => Some("cancelled"),
TaskLifecycleState::Progress | TaskLifecycleState::Retry => None,
}
}
pub fn insert_task_event(db: &Database, row: &TaskEventRow) -> Result<()> {
let conn = db.conn();
conn.execute(
"INSERT INTO task_events \
(id, task_id, parent_task_id, assigned_to, event_type, summary, detail_json, \
percentage, retry_count, created_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, \
COALESCE(NULLIF(?10, ''), datetime('now')))",
rusqlite::params![
row.id,
row.task_id,
row.parent_task_id,
row.assigned_to,
row.event_type.as_str(),
row.summary,
row.detail_json,
row.percentage,
row.retry_count,
row.created_at,
],
)
.db_err()?;
if let Some(status) = lifecycle_to_task_status(row.event_type) {
let _ = conn.execute(
"UPDATE tasks SET status = ?1, updated_at = datetime('now') WHERE id = ?2",
rusqlite::params![status, row.task_id],
);
}
Ok(())
}
pub fn task_events_for_task(db: &Database, task_id: &str) -> Result<Vec<TaskEventRow>> {
let conn = db.conn();
let mut stmt = conn
.prepare(
"SELECT id, task_id, parent_task_id, assigned_to, event_type, summary, detail_json, \
percentage, retry_count, created_at \
FROM task_events WHERE task_id = ?1 ORDER BY datetime(created_at) ASC, rowid ASC",
)
.db_err()?;
let rows = stmt.query_map([task_id], row_to_task_event).db_err()?;
rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
}
pub fn task_events_for_agent(db: &Database, agent_name: &str) -> Result<Vec<TaskEventRow>> {
let conn = db.conn();
let mut stmt = conn
.prepare(
"SELECT id, task_id, parent_task_id, assigned_to, event_type, summary, detail_json, \
percentage, retry_count, created_at \
FROM task_events WHERE assigned_to = ?1 ORDER BY datetime(created_at) DESC, rowid DESC",
)
.db_err()?;
let rows = stmt.query_map([agent_name], row_to_task_event).db_err()?;
rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
}
pub fn latest_task_event(db: &Database, task_id: &str) -> Result<Option<TaskEventRow>> {
let conn = db.conn();
let mut stmt = conn
.prepare(
"SELECT id, task_id, parent_task_id, assigned_to, event_type, summary, detail_json, \
percentage, retry_count, created_at \
FROM task_events WHERE task_id = ?1 \
ORDER BY datetime(created_at) DESC, rowid DESC LIMIT 1",
)
.db_err()?;
stmt.query_row([task_id], row_to_task_event)
.optional()
.db_err()
}
pub fn current_task_state(db: &Database, task_id: &str) -> Result<Option<TaskLifecycleState>> {
let conn = db.conn();
let raw: Option<String> = conn
.query_row(
"SELECT event_type FROM task_events WHERE task_id = ?1 \
ORDER BY datetime(created_at) DESC, rowid DESC LIMIT 1",
[task_id],
|row| row.get(0),
)
.optional()
.db_err()?;
Ok(raw.and_then(|s| {
let state = TaskLifecycleState::from_str_opt(&s);
if state.is_none() {
tracing::warn!(event_type = %s, "current_task_state: unrecognized event_type in DB");
}
state
}))
}
pub fn retry_count_for_task(db: &Database, task_id: &str) -> Result<i32> {
let conn = db.conn();
let count: Option<i32> = conn
.query_row(
"SELECT MAX(retry_count) FROM task_events WHERE task_id = ?1",
[task_id],
|row| row.get(0),
)
.optional()
.db_err()?
.flatten();
Ok(count.unwrap_or(0))
}
pub fn recent_task_events(db: &Database, limit: i64) -> Result<Vec<TaskEventRow>> {
let conn = db.conn();
let mut stmt = conn
.prepare(
"SELECT id, task_id, parent_task_id, assigned_to, event_type, summary, detail_json, \
percentage, retry_count, created_at \
FROM task_events ORDER BY datetime(created_at) DESC, rowid DESC LIMIT ?1",
)
.db_err()?;
let rows = stmt.query_map([limit], row_to_task_event).db_err()?;
rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
}
pub fn active_task_summaries(db: &Database) -> Result<Vec<TaskEventRow>> {
let conn = db.conn();
let mut stmt = conn
.prepare(
"SELECT id, task_id, parent_task_id, assigned_to, event_type, summary, detail_json, \
percentage, retry_count, created_at \
FROM task_events te \
WHERE created_at = ( \
SELECT MAX(created_at) FROM task_events WHERE task_id = te.task_id \
) \
AND event_type NOT IN ('completed', 'failed', 'cancelled') \
ORDER BY created_at DESC",
)
.db_err()?;
let rows = stmt.query_map([], row_to_task_event).db_err()?;
rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
}
pub fn subtask_events_for_parent(db: &Database, parent_task_id: &str) -> Result<Vec<TaskEventRow>> {
let conn = db.conn();
let mut stmt = conn
.prepare(
"SELECT id, task_id, parent_task_id, assigned_to, event_type, summary, detail_json, \
percentage, retry_count, created_at \
FROM task_events te \
WHERE parent_task_id = ?1 \
AND rowid = ( \
SELECT MAX(rowid) FROM task_events WHERE task_id = te.task_id \
) \
ORDER BY datetime(created_at) DESC, rowid DESC",
)
.db_err()?;
let rows = stmt
.query_map([parent_task_id], row_to_task_event)
.db_err()?;
rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
}
#[cfg(test)]
#[path = "task_events_tests.rs"]
mod tests;