use crate::{Database, DbResultExt};
use roboticus_core::Result;
use rusqlite::OptionalExtension;
#[derive(Debug, Clone)]
pub struct PipelineTraceRow {
pub id: String,
pub turn_id: String,
pub session_id: String,
pub channel: String,
pub total_ms: i64,
pub stages_json: String,
pub created_at: String,
}
pub fn get_pipeline_trace(db: &Database, turn_id: &str) -> Result<Option<PipelineTraceRow>> {
let conn = db.conn();
conn.query_row(
"SELECT id, turn_id, session_id, channel, total_ms, stages_json, \
COALESCE(created_at, '') as created_at \
FROM pipeline_traces WHERE turn_id = ?1",
[turn_id],
|row| {
Ok(PipelineTraceRow {
id: row.get(0)?,
turn_id: row.get(1)?,
session_id: row.get(2)?,
channel: row.get(3)?,
total_ms: row.get(4)?,
stages_json: row.get(5)?,
created_at: row.get(6)?,
})
},
)
.optional()
.db_err()
}
pub fn list_pipeline_traces(
db: &Database,
session_id: &str,
limit: i64,
) -> Result<Vec<PipelineTraceRow>> {
let conn = db.conn();
let mut stmt = conn
.prepare(
"SELECT id, turn_id, session_id, channel, total_ms, stages_json, \
COALESCE(created_at, '') as created_at \
FROM pipeline_traces WHERE session_id = ?1 \
ORDER BY created_at DESC LIMIT ?2",
)
.db_err()?;
let rows = stmt
.query_map(rusqlite::params![session_id, limit], |row| {
Ok(PipelineTraceRow {
id: row.get(0)?,
turn_id: row.get(1)?,
session_id: row.get(2)?,
channel: row.get(3)?,
total_ms: row.get(4)?,
stages_json: row.get(5)?,
created_at: row.get(6)?,
})
})
.db_err()?;
rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
}
pub fn save_pipeline_trace(db: &Database, row: &PipelineTraceRow) -> Result<()> {
let conn = db.conn();
conn.execute(
"INSERT INTO pipeline_traces \
(id, turn_id, session_id, channel, total_ms, stages_json, created_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
rusqlite::params![
row.id,
row.turn_id,
row.session_id,
row.channel,
row.total_ms,
row.stages_json,
row.created_at,
],
)
.db_err()?;
Ok(())
}
pub fn save_react_trace(db: &Database, trace_id: &str, react_json: &str) -> Result<()> {
let conn = db.conn();
conn.execute(
"UPDATE pipeline_traces SET react_trace_json = ?1 WHERE id = ?2",
rusqlite::params![react_json, trace_id],
)
.db_err()?;
Ok(())
}
pub fn get_react_trace(db: &Database, turn_id: &str) -> Result<Option<String>> {
let conn = db.conn();
conn.query_row(
"SELECT react_trace_json FROM pipeline_traces WHERE turn_id = ?1",
[turn_id],
|row| row.get::<_, Option<String>>(0),
)
.optional()
.db_err()
.map(|outer| outer.flatten())
}
#[cfg(test)]
mod tests {
use super::*;
fn test_db() -> Database {
let db = Database::new(":memory:").expect("in-memory db");
{
let conn = db.conn();
conn.execute(
"INSERT INTO sessions (id, agent_id) VALUES ('s-1', 'agent-1')",
[],
)
.unwrap();
conn.execute(
"INSERT INTO turns (id, session_id) VALUES ('t-1', 's-1')",
[],
)
.unwrap();
}
db.conn()
.execute_batch(
"CREATE TABLE IF NOT EXISTS pipeline_traces ( \
id TEXT PRIMARY KEY, \
turn_id TEXT NOT NULL, \
session_id TEXT NOT NULL, \
channel TEXT NOT NULL, \
total_ms INTEGER NOT NULL, \
stages_json TEXT NOT NULL, \
react_trace_json TEXT, \
created_at TEXT NOT NULL DEFAULT (datetime('now')) \
); \
CREATE INDEX IF NOT EXISTS idx_pipeline_traces_turn \
ON pipeline_traces(turn_id); \
CREATE INDEX IF NOT EXISTS idx_pipeline_traces_session \
ON pipeline_traces(session_id);",
)
.expect("pipeline_traces DDL");
db
}
#[test]
fn save_pipeline_trace_inserts_row() {
let db = test_db();
save_pipeline_trace(
&db,
&PipelineTraceRow {
id: "pt-1".into(),
turn_id: "t-1".into(),
session_id: "s-1".into(),
channel: "api".into(),
total_ms: 350,
stages_json: "[]".into(),
created_at: chrono::Utc::now().to_rfc3339(),
},
)
.unwrap();
let count: i64 = db
.conn()
.query_row(
"SELECT COUNT(*) FROM pipeline_traces WHERE id = 'pt-1'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(count, 1);
}
#[test]
fn save_and_retrieve_react_trace() {
let db = test_db();
save_pipeline_trace(
&db,
&PipelineTraceRow {
id: "pt-1".into(),
turn_id: "t-1".into(),
session_id: "s-1".into(),
channel: "api".into(),
total_ms: 500,
stages_json: "[]".into(),
created_at: chrono::Utc::now().to_rfc3339(),
},
)
.unwrap();
let react_json = r#"{"turn_id":"t-1","steps":[{"type":"tool_call","tool_name":"web_search","parameters_redacted":true,"result_summary":"ok","duration_ms":100,"success":true,"source":"built_in"}]}"#;
save_react_trace(&db, "pt-1", react_json).unwrap();
let retrieved = get_react_trace(&db, "t-1").unwrap();
assert!(retrieved.is_some());
assert!(retrieved.unwrap().contains("web_search"));
}
#[test]
fn get_react_trace_returns_none_when_null() {
let db = test_db();
save_pipeline_trace(
&db,
&PipelineTraceRow {
id: "pt-2".into(),
turn_id: "t-1".into(),
session_id: "s-1".into(),
channel: "api".into(),
total_ms: 100,
stages_json: "[]".into(),
created_at: chrono::Utc::now().to_rfc3339(),
},
)
.unwrap();
let retrieved = get_react_trace(&db, "t-1").unwrap();
assert!(retrieved.is_none());
}
#[test]
fn get_react_trace_returns_none_for_missing_turn() {
let db = test_db();
let retrieved = get_react_trace(&db, "nonexistent-turn").unwrap();
assert!(retrieved.is_none());
}
}