roboticus-db 0.11.4

SQLite persistence layer with 28 tables, FTS5 search, WAL mode, and migration system
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
//! # traces
//!
//! Per-turn pipeline trace persistence.
//!
//! `PipelineTraceRow` stores per-stage timing for a single pipeline turn and is
//! written by the API layer at the end of each turn's execution.
//!
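//! Migration 024 creates the `pipeline_traces` table.
//! Migration 027 (`027_flight_recorder.sql`) adds the `react_trace_json TEXT`
//! column for within-inference ReAct detail (tool calls, retrieval snapshots,
//! guard outcomes).
//! Migration 033 adds the `inference_params_json` column (model, temperature,
//! max_tokens), written by `save_inference_params` and read back by
//! `get_full_trace_for_export`.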

use crate::{Database, DbResultExt};
use roboticus_core::Result;
use rusqlite::OptionalExtension;

// ---------------------------------------------------------------------------
// PipelineTraceRow
// ---------------------------------------------------------------------------

/// One row of the `pipeline_traces` table: timing data for a single pipeline turn.
///
/// The optional flight-recorder columns (`react_trace_json`,
/// `inference_params_json`) are not part of this struct; they are read through
/// their own accessors.
#[derive(Debug, Clone)]
pub struct PipelineTraceRow {
    /// Primary key of the trace record.
    pub id: String,
    /// Identifier of the turn whose execution was traced.
    pub turn_id: String,
    /// Identifier of the session that owns the turn.
    pub session_id: String,
    /// Inbound channel name, such as `"api"` or `"telegram"`.
    pub channel: String,
    /// End-to-end wall-clock time of the turn, in milliseconds.
    pub total_ms: i64,
    /// Stage spans from `PipelineTrace`, serialised as a JSON array.
    pub stages_json: String,
    /// Recording timestamp as stored text; the read helpers in this module
    /// substitute an empty string when the column is NULL.
    pub created_at: String,
}

// ---------------------------------------------------------------------------
// get_pipeline_trace
// ---------------------------------------------------------------------------

/// Retrieve the pipeline trace row recorded for `turn_id`.
///
/// Returns `None` if no trace exists for the given turn. Nothing in this module
/// enforces one trace per turn, so when several rows share `turn_id` the newest
/// one (by `created_at`) is returned. Without an explicit `ORDER BY`, SQLite
/// may return any of the matching rows. A NULL `created_at` is surfaced as an
/// empty string.
///
/// # Errors
///
/// Returns an error if the query fails or a column cannot be decoded.
pub fn get_pipeline_trace(db: &Database, turn_id: &str) -> Result<Option<PipelineTraceRow>> {
    let conn = db.conn();
    conn.query_row(
        "SELECT id, turn_id, session_id, channel, total_ms, stages_json, \
                COALESCE(created_at, '') as created_at \
         FROM pipeline_traces WHERE turn_id = ?1 \
         ORDER BY created_at DESC LIMIT 1",
        [turn_id],
        |row| {
            Ok(PipelineTraceRow {
                id: row.get(0)?,
                turn_id: row.get(1)?,
                session_id: row.get(2)?,
                channel: row.get(3)?,
                total_ms: row.get(4)?,
                stages_json: row.get(5)?,
                created_at: row.get(6)?,
            })
        },
    )
    .optional()
    .db_err()
}

// ---------------------------------------------------------------------------
// list_pipeline_traces
// ---------------------------------------------------------------------------

/// Fetch the pipeline traces belonging to `session_id`, newest first.
///
/// At most `limit` rows are returned, ordered by `created_at DESC`. `limit`
/// is handed to SQLite's `LIMIT` unchanged. A NULL `created_at` is surfaced
/// as an empty string.
pub fn list_pipeline_traces(
    db: &Database,
    session_id: &str,
    limit: i64,
) -> Result<Vec<PipelineTraceRow>> {
    const SQL: &str = "SELECT id, turn_id, session_id, channel, total_ms, stages_json, \
                       COALESCE(created_at, '') as created_at \
                       FROM pipeline_traces WHERE session_id = ?1 \
                       ORDER BY created_at DESC LIMIT ?2";

    let conn = db.conn();
    let mut stmt = conn.prepare(SQL).db_err()?;
    let mapped = stmt
        .query_map(rusqlite::params![session_id, limit], |r| {
            Ok(PipelineTraceRow {
                id: r.get(0)?,
                turn_id: r.get(1)?,
                session_id: r.get(2)?,
                channel: r.get(3)?,
                total_ms: r.get(4)?,
                stages_json: r.get(5)?,
                created_at: r.get(6)?,
            })
        })
        .db_err()?;

    // Stop at the first row that fails to decode, as a fallible collect would.
    let mut traces = Vec::new();
    for item in mapped {
        traces.push(item.db_err()?);
    }
    Ok(traces)
}

// ---------------------------------------------------------------------------
// save_pipeline_trace
// ---------------------------------------------------------------------------

/// Persist a pipeline trace row.
pub fn save_pipeline_trace(db: &Database, row: &PipelineTraceRow) -> Result<()> {
    let conn = db.conn();
    conn.execute(
        "INSERT INTO pipeline_traces \
             (id, turn_id, session_id, channel, total_ms, stages_json, created_at) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
        rusqlite::params![
            row.id,
            row.turn_id,
            row.session_id,
            row.channel,
            row.total_ms,
            row.stages_json,
            row.created_at,
        ],
    )
    .db_err()?;
    Ok(())
}

// ---------------------------------------------------------------------------
// save_react_trace / get_react_trace
// ---------------------------------------------------------------------------

/// Store `react_json` as the ReAct trace of the pipeline trace with `id == trace_id`.
///
/// This writes the `react_trace_json` column introduced by migration 027.
/// On a database without that column the `UPDATE` fails at runtime, which is
/// acceptable. If no row has the given id, nothing is updated and the call
/// still returns `Ok(())`.
pub fn save_react_trace(db: &Database, trace_id: &str, react_json: &str) -> Result<()> {
    const SQL: &str = "UPDATE pipeline_traces SET react_trace_json = ?1 WHERE id = ?2";

    let conn = db.conn();
    let _rows_updated = conn
        .execute(SQL, rusqlite::params![react_json, trace_id])
        .db_err()?;
    Ok(())
}

/// Retrieve the ReAct trace JSON for a given turn.
///
/// Queries by `turn_id` (not trace id) since callers typically hold the turn
/// identifier. If several traces share the turn, the newest (by `created_at`)
/// is consulted, so the result does not depend on SQLite's unspecified row
/// order.
///
/// Returns `None` if no trace exists for the turn, or if that newest trace's
/// `react_trace_json` column is NULL.
///
/// # Errors
///
/// Returns an error if the query fails, including when the column added by
/// migration 027 is missing.
pub fn get_react_trace(db: &Database, turn_id: &str) -> Result<Option<String>> {
    let conn = db.conn();
    conn.query_row(
        "SELECT react_trace_json FROM pipeline_traces WHERE turn_id = ?1 \
         ORDER BY created_at DESC LIMIT 1",
        [turn_id],
        |row| row.get::<_, Option<String>>(0),
    )
    .optional()
    .db_err()
    // `optional()` turns "no rows" into the outer `None`; the inner `Option` is
    // the nullable column. Both cases collapse to a single `None`.
    .map(Option::flatten)
}

/// Attach serialised inference parameters (model, temperature, max_tokens) to
/// the pipeline trace whose `id` is `trace_id`.
///
/// Meant to be called once inference has completed. It writes the
/// `inference_params_json` column added by migration 033, so the `UPDATE`
/// fails on a database without that migration. An unknown `trace_id` updates
/// no rows and still returns `Ok(())`.
pub fn save_inference_params(db: &Database, trace_id: &str, params_json: &str) -> Result<()> {
    db.conn()
        .execute(
            "UPDATE pipeline_traces SET inference_params_json = ?1 WHERE id = ?2",
            rusqlite::params![params_json, trace_id],
        )
        .db_err()
        .map(|_| ())
}

/// Retrieve the full trace (stages + react + inference params) for export.
///
/// Returns `None` when no trace exists for `turn_id`. If several traces share
/// the turn, the newest (by `created_at`) is returned so the choice is
/// deterministic.
///
/// A NULL `created_at` is surfaced as an empty string, matching
/// [`get_pipeline_trace`] and [`list_pipeline_traces`]; decoding NULL straight
/// into `String` would otherwise fail.
///
/// # Errors
///
/// Returns an error if the query fails, including when the columns added by
/// migrations 027 (`react_trace_json`) or 033 (`inference_params_json`) are
/// missing.
pub fn get_full_trace_for_export(db: &Database, turn_id: &str) -> Result<Option<FullTraceExport>> {
    let conn = db.conn();
    conn.query_row(
        "SELECT id, turn_id, session_id, channel, total_ms, stages_json, \
                react_trace_json, inference_params_json, \
                COALESCE(created_at, '') as created_at \
         FROM pipeline_traces WHERE turn_id = ?1 \
         ORDER BY created_at DESC LIMIT 1",
        [turn_id],
        |row| {
            Ok(FullTraceExport {
                id: row.get(0)?,
                turn_id: row.get(1)?,
                session_id: row.get(2)?,
                channel: row.get(3)?,
                total_ms: row.get(4)?,
                stages_json: row.get(5)?,
                react_trace_json: row.get(6)?,
                inference_params_json: row.get(7)?,
                created_at: row.get(8)?,
            })
        },
    )
    .optional()
    .db_err()
}

/// Everything stored for one pipeline trace, shaped for serialisation.
///
/// Produced by [`get_full_trace_for_export`]. The `*_json` fields carry the
/// raw stored text and are not parsed here.
#[derive(Debug, serde::Serialize)]
pub struct FullTraceExport {
    /// Primary key of the trace record.
    pub id: String,
    /// Turn the trace covers.
    pub turn_id: String,
    /// Session that owns the turn.
    pub session_id: String,
    /// Inbound channel name.
    pub channel: String,
    /// End-to-end wall-clock time of the turn, in milliseconds.
    pub total_ms: i64,
    /// Stage spans serialised as a JSON array.
    pub stages_json: String,
    /// ReAct detail JSON (migration 027); `None` when the column is NULL.
    pub react_trace_json: Option<String>,
    /// Inference parameters JSON (migration 033); `None` when the column is NULL.
    pub inference_params_json: Option<String>,
    /// Recording timestamp as stored text.
    pub created_at: String,
}

// ---------------------------------------------------------------------------
// search_traces
// ---------------------------------------------------------------------------

/// Compact per-trace summary produced by [`search_traces`].
///
/// Field order follows the struct declaration, not the SQL column order.
#[derive(Debug, serde::Serialize)]
pub struct TraceSearchResult {
    /// Turn the trace covers.
    pub turn_id: String,
    /// Session that owns the turn.
    pub session_id: String,
    /// Inbound channel name.
    pub channel: String,
    /// End-to-end wall-clock time of the turn, in milliseconds.
    pub total_ms: i64,
    /// Recording timestamp as stored text.
    pub created_at: String,
    /// Stage spans serialised as a JSON array.
    pub stages_json: String,
}

/// Search traces using optional filters; results are ordered newest-first.
///
/// Every filter that is `Some` is AND-ed into the query:
///
/// * `tool_name` matches a substring of `stages_json`.
/// * `guard_name` matches a substring of `react_trace_json` *or*
///   `stages_json`. This needs the `react_trace_json` column from migration 027.
/// * `min_duration_ms` keeps traces with `total_ms >= min_duration_ms`.
/// * `since` keeps traces with `created_at >= since`. The comparison is on
///   text, so `since` should use the same timestamp format as the stored
///   `created_at` values.
///
/// At most `limit` rows are returned, ordered by `created_at DESC`. Values
/// above `i64::MAX` are clamped.
///
/// Substring filters use SQL `LIKE`. Any `%`, `_` or `\` in the search term is
/// escaped so the term matches literally; an unescaped `_` in a name like
/// `web_search` would match any single character. SQLite's `LIKE` is
/// case-insensitive for ASCII letters by default.
///
/// # Errors
///
/// Returns an error if the statement cannot be prepared or executed, or if a
/// row cannot be decoded.
pub fn search_traces(
    db: &Database,
    tool_name: Option<&str>,
    guard_name: Option<&str>,
    min_duration_ms: Option<i64>,
    since: Option<&str>,
    limit: usize,
) -> Result<Vec<TraceSearchResult>> {
    let mut sql = String::from(
        "SELECT turn_id, session_id, channel, total_ms, created_at, stages_json \
         FROM pipeline_traces WHERE 1=1",
    );
    let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();

    if let Some(tool) = tool_name {
        sql.push_str(" AND stages_json LIKE ? ESCAPE '\\'");
        params.push(Box::new(like_contains_pattern(tool)));
    }
    if let Some(guard) = guard_name {
        let pattern = like_contains_pattern(guard);
        sql.push_str(
            " AND (react_trace_json LIKE ? ESCAPE '\\' \
             OR stages_json LIKE ? ESCAPE '\\')",
        );
        params.push(Box::new(pattern.clone()));
        params.push(Box::new(pattern));
    }
    if let Some(ms) = min_duration_ms {
        sql.push_str(" AND total_ms >= ?");
        params.push(Box::new(ms));
    }
    if let Some(s) = since {
        sql.push_str(" AND created_at >= ?");
        params.push(Box::new(s.to_string()));
    }
    // An `as` cast would wrap values above `i64::MAX` to negative numbers,
    // which SQLite treats as "no limit". Clamp instead.
    sql.push_str(" ORDER BY created_at DESC LIMIT ?");
    params.push(Box::new(i64::try_from(limit).unwrap_or(i64::MAX)));

    let conn = db.conn();
    let mut stmt = conn.prepare(&sql).db_err()?;
    let param_refs: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect();
    let rows = stmt
        .query_map(param_refs.as_slice(), |row| {
            Ok(TraceSearchResult {
                turn_id: row.get(0)?,
                session_id: row.get(1)?,
                channel: row.get(2)?,
                total_ms: row.get(3)?,
                created_at: row.get(4)?,
                stages_json: row.get(5)?,
            })
        })
        .db_err()?
        .collect::<std::result::Result<Vec<_>, _>>()
        .db_err()?;
    Ok(rows)
}

/// Build a `LIKE` pattern that matches any text containing `needle` literally.
///
/// `\`, `%` and `_` are prefixed with `\`, so the pattern must be paired with
/// `ESCAPE '\'` in the SQL.
fn like_contains_pattern(needle: &str) -> String {
    let mut pattern = String::with_capacity(needle.len() + 2);
    pattern.push('%');
    for ch in needle.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            pattern.push('\\');
        }
        pattern.push(ch);
    }
    pattern.push('%');
    pattern
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;

    /// Bootstrap an in-memory database with the `pipeline_traces` DDL.
    ///
    /// The embedded schema (version 23) does not include `pipeline_traces`;
    /// that table is created by migration 024 at production startup.  In tests
    /// the migrations directory is not on the search path, so we apply the DDL
    /// inline — mirroring the pattern used in `task_events_tests.rs`.
    ///
    /// Migration 027 adds the `react_trace_json` column and migration 033 adds
    /// `inference_params_json`. Both are included here so the flight-recorder
    /// and export helpers can be exercised against the full schema.
    fn test_db() -> Database {
        let db = Database::new(":memory:").expect("in-memory db");
        // Seed a session and turn so FK constraints pass (sessions → turns
        // are part of the embedded schema). The connection guard is scoped so
        // it is released before the DDL below re-acquires it.
        {
            let conn = db.conn();
            conn.execute(
                "INSERT INTO sessions (id, agent_id) VALUES ('s-1', 'agent-1')",
                [],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO turns (id, session_id) VALUES ('t-1', 's-1')",
                [],
            )
            .unwrap();
        }
        // Apply pipeline_traces DDL (migrations 024 + 027 + 033).
        db.conn()
            .execute_batch(
                "CREATE TABLE IF NOT EXISTS pipeline_traces ( \
                    id TEXT PRIMARY KEY, \
                    turn_id TEXT NOT NULL, \
                    session_id TEXT NOT NULL, \
                    channel TEXT NOT NULL, \
                    total_ms INTEGER NOT NULL, \
                    stages_json TEXT NOT NULL, \
                    react_trace_json TEXT, \
                    inference_params_json TEXT, \
                    created_at TEXT NOT NULL DEFAULT (datetime('now')) \
                ); \
                CREATE INDEX IF NOT EXISTS idx_pipeline_traces_turn \
                    ON pipeline_traces(turn_id); \
                CREATE INDEX IF NOT EXISTS idx_pipeline_traces_session \
                    ON pipeline_traces(session_id);",
            )
            .expect("pipeline_traces DDL");
        db
    }

    /// Build a trace row in session `s-1` on channel `api` with no stages.
    fn trace_row(id: &str, turn_id: &str, total_ms: i64, created_at: &str) -> PipelineTraceRow {
        PipelineTraceRow {
            id: id.into(),
            turn_id: turn_id.into(),
            session_id: "s-1".into(),
            channel: "api".into(),
            total_ms,
            stages_json: "[]".into(),
            created_at: created_at.into(),
        }
    }

    /// Current time in the RFC 3339 form the production writers use.
    fn now() -> String {
        chrono::Utc::now().to_rfc3339()
    }

    #[test]
    fn save_pipeline_trace_inserts_row() {
        let db = test_db();
        save_pipeline_trace(&db, &trace_row("pt-1", "t-1", 350, &now())).unwrap();

        let count: i64 = db
            .conn()
            .query_row(
                "SELECT COUNT(*) FROM pipeline_traces WHERE id = 'pt-1'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(count, 1);
    }

    #[test]
    fn get_pipeline_trace_round_trips_saved_row() {
        let db = test_db();
        save_pipeline_trace(
            &db,
            &trace_row("pt-1", "t-1", 350, "2024-01-01T00:00:00+00:00"),
        )
        .unwrap();

        let row = get_pipeline_trace(&db, "t-1")
            .unwrap()
            .expect("trace for t-1");
        assert_eq!(row.id, "pt-1");
        assert_eq!(row.turn_id, "t-1");
        assert_eq!(row.session_id, "s-1");
        assert_eq!(row.channel, "api");
        assert_eq!(row.total_ms, 350);
        assert_eq!(row.stages_json, "[]");
        assert_eq!(row.created_at, "2024-01-01T00:00:00+00:00");
    }

    #[test]
    fn get_pipeline_trace_returns_none_for_missing_turn() {
        let db = test_db();
        assert!(get_pipeline_trace(&db, "nonexistent-turn")
            .unwrap()
            .is_none());
    }

    #[test]
    fn list_pipeline_traces_orders_newest_first_and_honours_limit() {
        let db = test_db();
        save_pipeline_trace(
            &db,
            &trace_row("pt-old", "t-1", 100, "2024-01-01T00:00:00+00:00"),
        )
        .unwrap();
        save_pipeline_trace(
            &db,
            &trace_row("pt-new", "t-2", 200, "2024-01-02T00:00:00+00:00"),
        )
        .unwrap();

        let all = list_pipeline_traces(&db, "s-1", 10).unwrap();
        let ids: Vec<&str> = all.iter().map(|r| r.id.as_str()).collect();
        assert_eq!(ids, ["pt-new", "pt-old"]);

        let limited = list_pipeline_traces(&db, "s-1", 1).unwrap();
        assert_eq!(limited.len(), 1);
        assert_eq!(limited[0].id, "pt-new");

        assert!(list_pipeline_traces(&db, "other-session", 10)
            .unwrap()
            .is_empty());
    }

    #[test]
    fn save_and_retrieve_react_trace() {
        let db = test_db();
        save_pipeline_trace(&db, &trace_row("pt-1", "t-1", 500, &now())).unwrap();

        let react_json = r#"{"turn_id":"t-1","steps":[{"type":"tool_call","tool_name":"web_search","parameters_redacted":true,"result_summary":"ok","duration_ms":100,"success":true,"source":"built_in"}]}"#;
        save_react_trace(&db, "pt-1", react_json).unwrap();

        let retrieved = get_react_trace(&db, "t-1").unwrap();
        assert!(retrieved.is_some());
        assert!(retrieved.unwrap().contains("web_search"));
    }

    #[test]
    fn get_react_trace_returns_none_when_null() {
        let db = test_db();
        save_pipeline_trace(&db, &trace_row("pt-2", "t-1", 100, &now())).unwrap();

        // No react trace saved — column should be NULL → returns None.
        let retrieved = get_react_trace(&db, "t-1").unwrap();
        assert!(retrieved.is_none());
    }

    #[test]
    fn get_react_trace_returns_none_for_missing_turn() {
        let db = test_db();
        let retrieved = get_react_trace(&db, "nonexistent-turn").unwrap();
        assert!(retrieved.is_none());
    }

    #[test]
    fn export_includes_react_trace_and_inference_params() {
        let db = test_db();
        save_pipeline_trace(
            &db,
            &trace_row("pt-1", "t-1", 500, "2024-01-01T00:00:00+00:00"),
        )
        .unwrap();
        save_react_trace(&db, "pt-1", r#"{"steps":[]}"#).unwrap();
        save_inference_params(&db, "pt-1", r#"{"model":"m"}"#).unwrap();

        let export = get_full_trace_for_export(&db, "t-1")
            .unwrap()
            .expect("export for t-1");
        assert_eq!(export.id, "pt-1");
        assert_eq!(export.total_ms, 500);
        assert_eq!(export.react_trace_json.as_deref(), Some(r#"{"steps":[]}"#));
        assert_eq!(
            export.inference_params_json.as_deref(),
            Some(r#"{"model":"m"}"#)
        );
        assert_eq!(export.created_at, "2024-01-01T00:00:00+00:00");

        assert!(get_full_trace_for_export(&db, "missing").unwrap().is_none());
    }

    #[test]
    fn search_traces_applies_filters() {
        let db = test_db();
        let mut slow = trace_row("pt-slow", "t-1", 900, "2024-01-02T00:00:00+00:00");
        slow.stages_json = r#"[{"name":"web_search"}]"#.into();
        save_pipeline_trace(&db, &slow).unwrap();
        save_pipeline_trace(
            &db,
            &trace_row("pt-fast", "t-2", 50, "2024-01-01T00:00:00+00:00"),
        )
        .unwrap();

        let by_tool = search_traces(&db, Some("web_search"), None, None, None, 10).unwrap();
        assert_eq!(by_tool.len(), 1);
        assert_eq!(by_tool[0].turn_id, "t-1");

        let by_guard = search_traces(&db, None, Some("web_search"), None, None, 10).unwrap();
        assert_eq!(by_guard.len(), 1);
        assert_eq!(by_guard[0].turn_id, "t-1");

        let by_duration = search_traces(&db, None, None, Some(100), None, 10).unwrap();
        assert_eq!(by_duration.len(), 1);
        assert_eq!(by_duration[0].total_ms, 900);

        let by_since = search_traces(&db, None, None, None, Some("2024-01-02"), 10).unwrap();
        assert_eq!(by_since.len(), 1);
        assert_eq!(by_since[0].turn_id, "t-1");

        let all = search_traces(&db, None, None, None, None, 10).unwrap();
        let turns: Vec<&str> = all.iter().map(|r| r.turn_id.as_str()).collect();
        assert_eq!(turns, ["t-1", "t-2"]);

        let limited = search_traces(&db, None, None, None, None, 1).unwrap();
        assert_eq!(limited.len(), 1);
    }
}