roboticus-db 0.11.1

SQLite persistence layer with 28 tables, FTS5 search, WAL mode, and migration system
Documentation
//! # traces
//!
//! Per-turn pipeline trace persistence.
//!
//! `PipelineTraceRow` stores per-stage timing for a single pipeline turn and is
//! written by the API layer at the end of each turn's execution.
//!
//! Migration 024 creates the `pipeline_traces` table.
//! Migration 027 (`027_flight_recorder.sql`) adds the `react_trace_json TEXT`
//! column for within-inference ReAct detail (tool calls, retrieval snapshots,
//! guard outcomes).

use crate::{Database, DbResultExt};
use roboticus_core::Result;
use rusqlite::OptionalExtension;

// ---------------------------------------------------------------------------
// PipelineTraceRow
// ---------------------------------------------------------------------------

/// One row of the `pipeline_traces` table: the recorded trace of a single
/// pipeline turn.
///
/// Every field is a plain owned value (`String` / `i64`), so rows can be
/// cloned and compared by value.  `PartialEq`/`Eq` are derived so that callers
/// and tests can check a round-tripped row with a single `assert_eq!` instead
/// of comparing field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PipelineTraceRow {
    /// Unique identifier for this trace record.
    pub id: String,
    /// The turn this trace covers.
    pub turn_id: String,
    /// Session the turn belongs to.
    pub session_id: String,
    /// Channel the message arrived on (e.g. `"api"`, `"telegram"`).
    pub channel: String,
    /// Wall-clock duration of the entire pipeline turn in milliseconds.
    pub total_ms: i64,
    /// JSON-serialised array of stage spans produced by `PipelineTrace`.
    pub stages_json: String,
    /// When this trace was recorded.  Stored and returned as text; the read
    /// paths in this module substitute an empty string for a NULL column.
    pub created_at: String,
}

// ---------------------------------------------------------------------------
// get_pipeline_trace
// ---------------------------------------------------------------------------

/// Look up the pipeline trace recorded for `turn_id`.
///
/// Issues a single-row query against `pipeline_traces` filtered on `turn_id`.
/// A NULL `created_at` is read back as an empty string (via `COALESCE`).
///
/// # Returns
///
/// `Ok(Some(row))` when a trace exists, `Ok(None)` when the query matches no
/// row.
///
/// # Errors
///
/// Any other `rusqlite` failure, converted through `db_err()`.
///
/// NOTE(review): the query has no `ORDER BY`; if more than one trace can exist
/// for a turn, which one is returned is unspecified — confirm that `turn_id`
/// is effectively unique in `pipeline_traces`.
pub fn get_pipeline_trace(db: &Database, turn_id: &str) -> Result<Option<PipelineTraceRow>> {
    let connection = db.conn();
    let lookup = connection.query_row(
        "SELECT id, turn_id, session_id, channel, total_ms, stages_json, \
                COALESCE(created_at, '') as created_at \
         FROM pipeline_traces WHERE turn_id = ?1",
        [turn_id],
        |r| {
            // Column indices follow the order of the SELECT list above.
            let id: String = r.get(0)?;
            let turn_id: String = r.get(1)?;
            let session_id: String = r.get(2)?;
            let channel: String = r.get(3)?;
            let total_ms: i64 = r.get(4)?;
            let stages_json: String = r.get(5)?;
            let created_at: String = r.get(6)?;
            Ok(PipelineTraceRow {
                id,
                turn_id,
                session_id,
                channel,
                total_ms,
                stages_json,
                created_at,
            })
        },
    );
    // `optional()` turns the "no rows" outcome into `Ok(None)`; everything
    // else is still an error and goes through `db_err()`.
    lookup.optional().db_err()
}

// ---------------------------------------------------------------------------
// list_pipeline_traces
// ---------------------------------------------------------------------------

/// Fetch the pipeline traces of one session, newest first.
///
/// Rows are selected by `session_id`, sorted by `created_at DESC` (a textual
/// comparison of the stored value) and capped with SQL `LIMIT`.  A NULL
/// `created_at` is read back as an empty string.
///
/// # Errors
///
/// Fails if the statement cannot be prepared or executed, or if any row
/// cannot be decoded; the first failing row aborts the listing.
///
/// NOTE(review): `limit` is bound directly to SQL `LIMIT`; SQLite treats a
/// negative LIMIT as "no upper bound" — confirm callers never pass one
/// unintentionally.
pub fn list_pipeline_traces(
    db: &Database,
    session_id: &str,
    limit: i64,
) -> Result<Vec<PipelineTraceRow>> {
    let connection = db.conn();
    let mut statement = connection
        .prepare(
            "SELECT id, turn_id, session_id, channel, total_ms, stages_json, \
                    COALESCE(created_at, '') as created_at \
             FROM pipeline_traces WHERE session_id = ?1 \
             ORDER BY created_at DESC LIMIT ?2",
        )
        .db_err()?;

    let mapped = statement
        .query_map(rusqlite::params![session_id, limit], |r| {
            // Column indices follow the order of the SELECT list above.
            Ok(PipelineTraceRow {
                id: r.get::<_, String>(0)?,
                turn_id: r.get::<_, String>(1)?,
                session_id: r.get::<_, String>(2)?,
                channel: r.get::<_, String>(3)?,
                total_ms: r.get::<_, i64>(4)?,
                stages_json: r.get::<_, String>(5)?,
                created_at: r.get::<_, String>(6)?,
            })
        })
        .db_err()?;

    // Rows are decoded lazily while iterating, so each item is itself a
    // `Result`; bail out on the first one that fails.
    let mut traces = Vec::new();
    for entry in mapped {
        traces.push(entry.db_err()?);
    }
    Ok(traces)
}

// ---------------------------------------------------------------------------
// save_pipeline_trace
// ---------------------------------------------------------------------------

/// Insert `row` into the `pipeline_traces` table.
///
/// All seven fields of [`PipelineTraceRow`] are written as bound parameters,
/// in the column order given in the statement.  This is a plain `INSERT`, not
/// an upsert.
///
/// # Errors
///
/// Any `rusqlite` error raised by the insert, converted through `db_err()`.
pub fn save_pipeline_trace(db: &Database, row: &PipelineTraceRow) -> Result<()> {
    let insert_sql = "INSERT INTO pipeline_traces \
             (id, turn_id, session_id, channel, total_ms, stages_json, created_at) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)";
    let connection = db.conn();
    let outcome = connection.execute(
        insert_sql,
        rusqlite::params![
            row.id,
            row.turn_id,
            row.session_id,
            row.channel,
            row.total_ms,
            row.stages_json,
            row.created_at,
        ],
    );
    // The number of inserted rows is of no interest to callers.
    outcome.db_err().map(|_| ())
}

// ---------------------------------------------------------------------------
// save_react_trace / get_react_trace
// ---------------------------------------------------------------------------

/// Attach ReAct trace JSON to an existing pipeline trace.
///
/// Sets `react_trace_json` on the row whose `pipeline_traces.id` equals
/// `trace_id`.  The column is added by migration 027; calling this function
/// on an un-migrated database will return a runtime error, which is
/// acceptable.
///
/// The affected-row count is discarded, so this function does not itself
/// report whether any row matched `trace_id` — callers are expected to pass
/// the id of a trace that has already been saved.
///
/// # Errors
///
/// Any `rusqlite` error raised by the update, converted through `db_err()`.
pub fn save_react_trace(db: &Database, trace_id: &str, react_json: &str) -> Result<()> {
    let connection = db.conn();
    let _rows_changed: usize = connection
        .execute(
            "UPDATE pipeline_traces SET react_trace_json = ?1 WHERE id = ?2",
            rusqlite::params![react_json, trace_id],
        )
        .db_err()?;
    Ok(())
}

/// Fetch the stored ReAct trace JSON for a turn.
///
/// The lookup key is `turn_id` rather than the trace id, since callers
/// typically hold the turn identifier.
///
/// # Returns
///
/// `Ok(Some(json))` when a trace row exists and its `react_trace_json` column
/// is populated; `Ok(None)` when no row matches the turn or the column is
/// NULL.
///
/// # Errors
///
/// Any other `rusqlite` failure, converted through `db_err()`.
pub fn get_react_trace(db: &Database, turn_id: &str) -> Result<Option<String>> {
    let connection = db.conn();
    // Outer `Option`: did any row match?  Inner `Option`: was the column
    // non-NULL?
    let fetched: Option<Option<String>> = connection
        .query_row(
            "SELECT react_trace_json FROM pipeline_traces WHERE turn_id = ?1",
            [turn_id],
            |r| r.get::<_, Option<String>>(0),
        )
        .optional()
        .db_err()?;
    // Callers do not distinguish the two "absent" cases, so collapse them.
    Ok(fetched.flatten())
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;

    /// Build an in-memory database that carries the `pipeline_traces` table.
    ///
    /// The embedded schema (version 23) does not include `pipeline_traces`;
    /// that table is created by migration 024 at production startup.  In tests
    /// the migrations directory is not on the search path, so the DDL is
    /// applied inline — mirroring the pattern used in `task_events_tests.rs`.
    ///
    /// The `react_trace_json` column from migration 027 is included as well so
    /// the flight-recorder persistence tests can exercise the full schema.
    ///
    /// One session (`s-1`) and one turn (`t-1`) are seeded; every test row
    /// refers to them.
    fn test_db() -> Database {
        let db = Database::new(":memory:").expect("in-memory db");

        // Seed a session and turn so FK constraints pass (sessions → turns
        // are part of the embedded schema).  The connection handle is scoped
        // so it is released before `db.conn()` is called again below.
        {
            let conn = db.conn();
            for seed_sql in [
                "INSERT INTO sessions (id, agent_id) VALUES ('s-1', 'agent-1')",
                "INSERT INTO turns (id, session_id) VALUES ('t-1', 's-1')",
            ] {
                conn.execute(seed_sql, []).unwrap();
            }
        }

        // pipeline_traces DDL (migration 024 + 027).
        let ddl = "CREATE TABLE IF NOT EXISTS pipeline_traces ( \
                    id TEXT PRIMARY KEY, \
                    turn_id TEXT NOT NULL, \
                    session_id TEXT NOT NULL, \
                    channel TEXT NOT NULL, \
                    total_ms INTEGER NOT NULL, \
                    stages_json TEXT NOT NULL, \
                    react_trace_json TEXT, \
                    created_at TEXT NOT NULL DEFAULT (datetime('now')) \
                ); \
                CREATE INDEX IF NOT EXISTS idx_pipeline_traces_turn \
                    ON pipeline_traces(turn_id); \
                CREATE INDEX IF NOT EXISTS idx_pipeline_traces_session \
                    ON pipeline_traces(session_id);";
        db.conn().execute_batch(ddl).expect("pipeline_traces DDL");
        db
    }

    /// Persist a trace for the seeded turn `t-1` / session `s-1` on the
    /// `"api"` channel with an empty stage list, stamped with the current
    /// time.  Only the trace id and total duration vary between tests.
    fn insert_trace(db: &Database, id: &str, total_ms: i64) {
        let row = PipelineTraceRow {
            id: id.into(),
            turn_id: "t-1".into(),
            session_id: "s-1".into(),
            channel: "api".into(),
            total_ms,
            stages_json: "[]".into(),
            created_at: chrono::Utc::now().to_rfc3339(),
        };
        save_pipeline_trace(db, &row).unwrap();
    }

    /// Saving a row makes exactly one record with that id visible.
    #[test]
    fn save_pipeline_trace_inserts_row() {
        let db = test_db();
        insert_trace(&db, "pt-1", 350);

        let conn = db.conn();
        let matching: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM pipeline_traces WHERE id = 'pt-1'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(matching, 1);
    }

    /// ReAct JSON saved by trace id can be read back by turn id.
    #[test]
    fn save_and_retrieve_react_trace() {
        let db = test_db();
        insert_trace(&db, "pt-1", 500);

        let react_json = r#"{"turn_id":"t-1","steps":[{"type":"tool_call","tool_name":"web_search","parameters_redacted":true,"result_summary":"ok","duration_ms":100,"success":true,"source":"built_in"}]}"#;
        save_react_trace(&db, "pt-1", react_json).unwrap();

        let retrieved = get_react_trace(&db, "t-1").unwrap();
        // Must be present and carry the payload that was stored.
        assert!(retrieved.map_or(false, |json| json.contains("web_search")));
    }

    /// A trace row without ReAct data reads back as `None`.
    #[test]
    fn get_react_trace_returns_none_when_null() {
        let db = test_db();
        insert_trace(&db, "pt-2", 100);

        // No react trace saved — column should be NULL → returns None.
        let retrieved = get_react_trace(&db, "t-1").unwrap();
        assert_eq!(retrieved, None);
    }

    /// A turn with no trace row at all also reads back as `None`.
    #[test]
    fn get_react_trace_returns_none_for_missing_turn() {
        let db = test_db();
        let retrieved = get_react_trace(&db, "nonexistent-turn").unwrap();
        assert_eq!(retrieved, None);
    }
}