roboticus-db 0.11.4

SQLite persistence layer with 28 tables, FTS5 search, WAL mode, and migration system
Documentation
//! # task_events
//!
//! Task lifecycle event persistence for delegation observability.
//!
//! This module records the full lifecycle of delegated tasks — from initial
//! `pending` state through assignment, execution, retries, and terminal states
//! (`completed`, `failed`, `cancelled`). Each state transition is stored as an
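//! immutable event row, enabling audit trails, retry tracking, and live dashboards
//! without in-place mutation of event rows. The one in-place write is a
//! best-effort sync of `tasks.status` when a state-changing event is inserted
//! (see `insert_task_event`).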

use crate::{Database, DbResultExt};
use roboticus_core::Result;
use rusqlite::OptionalExtension;
use serde::{Deserialize, Serialize};
use std::fmt;

// ---------------------------------------------------------------------------
// TaskLifecycleState
// ---------------------------------------------------------------------------

/// The state of a delegated task at a point in time.
///
/// Stored in the `task_events.event_type` column using the lowercase spelling
/// returned by [`TaskLifecycleState::as_str`]; serde uses the same snake_case
/// names via `#[serde(rename_all = "snake_case")]`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TaskLifecycleState {
    /// Initial state of a task. Syncs `tasks.status` to `"pending"`.
    Pending,
    /// Task has been assigned. Syncs `tasks.status` to `"in_progress"`.
    Assigned,
    /// Task is executing. Syncs `tasks.status` to `"in_progress"`.
    Running,
    /// Informational progress update; does not change `tasks.status`.
    Progress,
    /// Terminal: task finished successfully.
    Completed,
    /// Terminal: task failed.
    Failed,
    /// Terminal: task was cancelled.
    Cancelled,
    /// Informational retry marker; does not change `tasks.status`.
    Retry,
}

impl TaskLifecycleState {
    /// Lowercase spelling of this state; this exact text is what gets written
    /// to the `task_events.event_type` column.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Pending => "pending",
            Self::Assigned => "assigned",
            Self::Running => "running",
            Self::Progress => "progress",
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Cancelled => "cancelled",
            Self::Retry => "retry",
        }
    }

    /// Reverse of [`as_str`](Self::as_str).
    ///
    /// Yields `None` for any text that is not one of the exact, case-sensitive
    /// spellings produced by `as_str`.
    pub fn from_str_opt(s: &str) -> Option<Self> {
        let state = match s {
            "pending" => Self::Pending,
            "assigned" => Self::Assigned,
            "running" => Self::Running,
            "progress" => Self::Progress,
            "completed" => Self::Completed,
            "failed" => Self::Failed,
            "cancelled" => Self::Cancelled,
            "retry" => Self::Retry,
            _ => return None,
        };
        Some(state)
    }

    /// Whether this state ends the task's lifecycle (`Completed`, `Failed`, or
    /// `Cancelled`), i.e. no further transitions are expected.
    pub fn is_terminal(&self) -> bool {
        // Listed exhaustively so a newly added variant forces a decision here.
        match self {
            Self::Completed | Self::Failed | Self::Cancelled => true,
            Self::Pending | Self::Assigned | Self::Running | Self::Progress | Self::Retry => false,
        }
    }
}

impl fmt::Display for TaskLifecycleState {
    /// Renders the same lowercase spelling that is stored in the database.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let spelling = self.as_str();
        write!(f, "{spelling}")
    }
}

// ---------------------------------------------------------------------------
// TaskEventRow
// ---------------------------------------------------------------------------

/// A single task lifecycle event record persisted in the `task_events` table.
///
/// Rows are read back by `row_to_task_event`, which expects the columns in
/// the same order as the fields below.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskEventRow {
    /// Unique identifier of this event row (e.g. a UUID).
    pub id: String,
    /// Task this event belongs to; also matched against `tasks.id` when an
    /// insert syncs the task's status.
    pub task_id: String,
    /// Parent task, if this event belongs to a subtask.
    pub parent_task_id: Option<String>,
    /// Agent the task is assigned to, if any.
    pub assigned_to: Option<String>,
    /// Lifecycle state recorded by this event.
    pub event_type: TaskLifecycleState,
    /// Optional summary text.
    pub summary: Option<String>,
    /// Optional detail payload, stored verbatim (presumably JSON given the
    /// name — it is not validated here).
    pub detail_json: Option<String>,
    /// Optional progress percentage; the range is not enforced by this module.
    pub percentage: Option<f64>,
    /// Retry counter recorded with this event.
    pub retry_count: i32,
    /// Event timestamp. An empty string on insert is replaced by SQLite's
    /// `datetime('now')`; reads order by `datetime(created_at)`.
    pub created_at: String,
}

// ---------------------------------------------------------------------------
// Shared row mapper
// ---------------------------------------------------------------------------

/// Converts one `task_events` row into a [`TaskEventRow`].
///
/// Expects columns in the order `id, task_id, parent_task_id, assigned_to,
/// event_type, summary, detail_json, percentage, retry_count, created_at`.
/// An unparseable `event_type` is logged and mapped to `Pending` instead of
/// failing the whole query.
fn row_to_task_event(row: &rusqlite::Row<'_>) -> rusqlite::Result<TaskEventRow> {
    let event_type = {
        let stored: String = row.get(4)?;
        match TaskLifecycleState::from_str_opt(&stored) {
            Some(state) => state,
            None => {
                tracing::warn!(
                    event_type = %stored,
                    "task_events: unrecognized event_type encountered in row; defaulting to Pending"
                );
                TaskLifecycleState::Pending
            }
        }
    };

    Ok(TaskEventRow {
        id: row.get(0)?,
        task_id: row.get(1)?,
        parent_task_id: row.get(2)?,
        assigned_to: row.get(3)?,
        event_type,
        summary: row.get(5)?,
        detail_json: row.get(6)?,
        percentage: row.get(7)?,
        retry_count: row.get(8)?,
        created_at: row.get(9)?,
    })
}

// ---------------------------------------------------------------------------
// Write operations
// ---------------------------------------------------------------------------

/// Translates a lifecycle event into the value written to `tasks.status`.
///
/// `Assigned` and `Running` both collapse to `"in_progress"`. `Progress` and
/// `Retry` yield `None`: they are informational and must leave the canonical
/// task status untouched.
fn lifecycle_to_task_status(state: TaskLifecycleState) -> Option<&'static str> {
    let status = match state {
        TaskLifecycleState::Progress | TaskLifecycleState::Retry => return None,
        TaskLifecycleState::Pending => "pending",
        TaskLifecycleState::Assigned | TaskLifecycleState::Running => "in_progress",
        TaskLifecycleState::Completed => "completed",
        TaskLifecycleState::Failed => "failed",
        TaskLifecycleState::Cancelled => "cancelled",
    };
    Some(status)
}

/// Inserts a new task lifecycle event row.
///
/// The `row.id` must be a unique identifier (e.g. a UUID).
///
/// **Reverse sync:** When the event represents a meaningful state change (not
/// `Progress` or `Retry`), the corresponding `tasks.status` is also updated
/// if a row with the matching `task_id` exists.  This keeps the `tasks` table
/// and `task_events` stream consistent.
pub fn insert_task_event(db: &Database, row: &TaskEventRow) -> Result<()> {
    let conn = db.conn();
    // Use an explicit created_at when provided (non-empty), otherwise rely on the
    // column DEFAULT of datetime('now').  COALESCE + NULLIF ensures the DEFAULT
    // path is taken for empty strings, which is the normal production path.
    conn.execute(
        "INSERT INTO task_events \
         (id, task_id, parent_task_id, assigned_to, event_type, summary, detail_json, \
          percentage, retry_count, created_at) \
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, \
                 COALESCE(NULLIF(?10, ''), datetime('now')))",
        rusqlite::params![
            row.id,
            row.task_id,
            row.parent_task_id,
            row.assigned_to,
            row.event_type.as_str(),
            row.summary,
            row.detail_json,
            row.percentage,
            row.retry_count,
            row.created_at,
        ],
    )
    .db_err()?;

    // Reverse sync: keep tasks.status consistent with the event stream.
    if let Some(status) = lifecycle_to_task_status(row.event_type) {
        let _ = conn.execute(
            "UPDATE tasks SET status = ?1, updated_at = datetime('now') WHERE id = ?2",
            rusqlite::params![status, row.task_id],
        );
        // Ignore errors: task_id may be a synthetic pipeline ID (e.g. "{turn_id}-sub-0")
        // that doesn't exist in the tasks table — that's fine.
    }

    Ok(())
}

// ---------------------------------------------------------------------------
// Read operations
// ---------------------------------------------------------------------------

/// Returns every event recorded for `task_id`, oldest first.
///
/// Events are ordered by `datetime(created_at)` ascending; insertion order
/// (`rowid`) breaks ties between events that share a timestamp.
pub fn task_events_for_task(db: &Database, task_id: &str) -> Result<Vec<TaskEventRow>> {
    const QUERY: &str = "SELECT id, task_id, parent_task_id, assigned_to, event_type, \
                         summary, detail_json, percentage, retry_count, created_at \
                         FROM task_events \
                         WHERE task_id = ?1 \
                         ORDER BY datetime(created_at) ASC, rowid ASC";

    let conn = db.conn();
    let mut stmt = conn.prepare(QUERY).db_err()?;
    let mapped = stmt.query_map([task_id], row_to_task_event).db_err()?;
    mapped.collect::<std::result::Result<Vec<_>, _>>().db_err()
}

/// Returns every event whose `assigned_to` equals `agent_name`, newest first.
///
/// Ordering is `datetime(created_at)` descending, with `rowid` descending as
/// the tie-breaker. Events with no `assigned_to` value are never returned.
pub fn task_events_for_agent(db: &Database, agent_name: &str) -> Result<Vec<TaskEventRow>> {
    const QUERY: &str = "SELECT id, task_id, parent_task_id, assigned_to, event_type, \
                         summary, detail_json, percentage, retry_count, created_at \
                         FROM task_events \
                         WHERE assigned_to = ?1 \
                         ORDER BY datetime(created_at) DESC, rowid DESC";

    let conn = db.conn();
    let mut stmt = conn.prepare(QUERY).db_err()?;
    let mapped = stmt.query_map([agent_name], row_to_task_event).db_err()?;
    mapped.collect::<std::result::Result<Vec<_>, _>>().db_err()
}

/// Returns the newest event for `task_id`, or `None` when the task has no events.
///
/// "Newest" means greatest `datetime(created_at)`; among events that share a
/// timestamp, the one inserted last (highest `rowid`) wins.
pub fn latest_task_event(db: &Database, task_id: &str) -> Result<Option<TaskEventRow>> {
    let conn = db.conn();
    conn.query_row(
        "SELECT id, task_id, parent_task_id, assigned_to, event_type, \
         summary, detail_json, percentage, retry_count, created_at \
         FROM task_events \
         WHERE task_id = ?1 \
         ORDER BY datetime(created_at) DESC, rowid DESC \
         LIMIT 1",
        [task_id],
        row_to_task_event,
    )
    .optional()
    .db_err()
}

/// Returns the lifecycle state carried by the newest event for `task_id`.
///
/// Uses the same "newest" ordering as `latest_task_event`. Returns `None` if
/// the task has no events, or if the stored `event_type` is not a recognised
/// spelling (in which case a warning is logged).
pub fn current_task_state(db: &Database, task_id: &str) -> Result<Option<TaskLifecycleState>> {
    let conn = db.conn();
    let stored: Option<String> = conn
        .query_row(
            "SELECT event_type FROM task_events WHERE task_id = ?1 \
             ORDER BY datetime(created_at) DESC, rowid DESC LIMIT 1",
            [task_id],
            |r| r.get(0),
        )
        .optional()
        .db_err()?;

    let state = match stored {
        None => None,
        Some(text) => match TaskLifecycleState::from_str_opt(&text) {
            Some(parsed) => Some(parsed),
            None => {
                tracing::warn!(event_type = %text, "current_task_state: unrecognized event_type in DB");
                None
            }
        },
    };
    Ok(state)
}

/// Returns the highest `retry_count` recorded for `task_id`, or `0` when the
/// task has no events.
pub fn retry_count_for_task(db: &Database, task_id: &str) -> Result<i32> {
    let conn = db.conn();
    // MAX() over zero matching rows still yields one row holding NULL, so the
    // column value is read as Option<i32>; the outer Option comes from
    // `.optional()`.
    let highest: Option<Option<i32>> = conn
        .query_row(
            "SELECT MAX(retry_count) FROM task_events WHERE task_id = ?1",
            [task_id],
            |r| r.get(0),
        )
        .optional()
        .db_err()?;
    Ok(highest.flatten().unwrap_or_default())
}

/// Returns up to `limit` of the newest task events across all tasks.
///
/// Rows are ordered by `datetime(created_at)` descending, newest first, with
/// `rowid` descending as the tie-breaker. `limit` is bound directly to SQLite's
/// `LIMIT` clause, where a negative value means "no upper bound".
pub fn recent_task_events(db: &Database, limit: i64) -> Result<Vec<TaskEventRow>> {
    const QUERY: &str = "SELECT id, task_id, parent_task_id, assigned_to, event_type, \
                         summary, detail_json, percentage, retry_count, created_at \
                         FROM task_events \
                         ORDER BY datetime(created_at) DESC, rowid DESC LIMIT ?1";

    let conn = db.conn();
    let mut stmt = conn.prepare(QUERY).db_err()?;
    let mapped = stmt.query_map([limit], row_to_task_event).db_err()?;
    mapped.collect::<std::result::Result<Vec<_>, _>>().db_err()
}

/// Returns the latest event for each task whose current state is non-terminal.
///
/// A correlated subquery selects exactly one "latest" event per task. It uses
/// the same ordering as [`latest_task_event`] and [`current_task_state`]:
/// `datetime(created_at)` descending, then `rowid` descending to break ties.
/// Tasks whose latest event is `completed`, `failed` or `cancelled` are
/// filtered out. Results are ordered newest first.
///
/// Choosing a single row by `rowid` matters because `datetime('now')` has
/// one-second resolution, so several events of one task often share a
/// timestamp. Matching on `created_at = MAX(created_at)` would return every
/// tied row. That duplicates the task and can report a task as active even
/// though its final event in the same second was terminal.
pub fn active_task_summaries(db: &Database) -> Result<Vec<TaskEventRow>> {
    let conn = db.conn();
    let mut stmt = conn
        .prepare(
            "SELECT te.id, te.task_id, te.parent_task_id, te.assigned_to, te.event_type, \
             te.summary, te.detail_json, te.percentage, te.retry_count, te.created_at \
             FROM task_events te \
             WHERE te.rowid = ( \
                 SELECT latest.rowid FROM task_events latest \
                 WHERE latest.task_id = te.task_id \
                 ORDER BY datetime(latest.created_at) DESC, latest.rowid DESC \
                 LIMIT 1 \
             ) \
             AND te.event_type NOT IN ('completed', 'failed', 'cancelled') \
             ORDER BY datetime(te.created_at) DESC, te.rowid DESC",
        )
        .db_err()?;
    let rows = stmt.query_map([], row_to_task_event).db_err()?;
    rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
}

/// Returns the latest event for each direct subtask of `parent_task_id`.
///
/// The latest event of each subtask is picked with the same ordering used by
/// [`latest_task_event`]: `datetime(created_at)` descending, then `rowid`
/// descending. Both functions therefore agree even when events were inserted
/// with explicit, out-of-order `created_at` values. Previously `MAX(rowid)`
/// was used, which reflects insertion order only.
///
/// A subtask is included only when that latest event itself carries this
/// `parent_task_id`. Results are ordered newest first.
pub fn subtask_events_for_parent(db: &Database, parent_task_id: &str) -> Result<Vec<TaskEventRow>> {
    let conn = db.conn();
    let mut stmt = conn
        .prepare(
            "SELECT te.id, te.task_id, te.parent_task_id, te.assigned_to, te.event_type, \
             te.summary, te.detail_json, te.percentage, te.retry_count, te.created_at \
             FROM task_events te \
             WHERE te.parent_task_id = ?1 \
               AND te.rowid = ( \
                   SELECT latest.rowid FROM task_events latest \
                   WHERE latest.task_id = te.task_id \
                   ORDER BY datetime(latest.created_at) DESC, latest.rowid DESC \
                   LIMIT 1 \
               ) \
             ORDER BY datetime(te.created_at) DESC, te.rowid DESC",
        )
        .db_err()?;
    let rows = stmt
        .query_map([parent_task_id], row_to_task_event)
        .db_err()?;
    rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

// Unit tests for this module live in the sibling file `task_events_tests.rs`.
#[cfg(test)]
#[path = "task_events_tests.rs"]
mod tests;