Skip to main content

oxios_kernel/
agent_log_db.rs

1//! Agent history log — SQLite-backed query engine for past agent records.
2//!
3//! # Architecture
4//!
5//! Two-tier storage:
6//! - **Filesystem JSON** (`state/agents/<uuid>.json`): source of truth.
7//!   Human-readable, backup-friendly, rebuildable.
8//! - **SQLite** (`state/agent_log.db`): query index with indexes, FTS5.
9//!   Fast filtering, sorting, search, aggregation.
10//!
11//! SQLite DB is rebuildable from filesystem JSON at any time
12//! via [`AgentLogDb::reindex_all`].
13//!
14//! # Feature gate
15//!
16//! When `sqlite-memory` feature is disabled, all query operations
17//! fall back to filesystem-only scan mode. Degraded but functional.
18
19use std::path::Path;
20
21use anyhow::{Context, Result};
22use chrono::{DateTime, Utc};
23
24use crate::config::AgentLogConfig;
25use crate::state_store::StateStore;
26use crate::types::{AgentInfo, AgentStatus, ToolCallRecord};
27
28// ===========================================================================
29// Filter / Query Types
30// ===========================================================================
31
32/// Field to search against.
33#[derive(Debug, Clone, Copy, PartialEq, Eq)]
34pub enum SearchField {
35    All,
36    Name,
37    Error,
38    ToolName,
39    ToolOutput,
40}
41
42impl SearchField {
43    #[allow(dead_code)]
44    fn as_str(&self) -> &'static str {
45        match self {
46            Self::All => "all",
47            Self::Name => "name",
48            Self::Error => "error",
49            Self::ToolName => "tool_name",
50            Self::ToolOutput => "tool_output",
51        }
52    }
53
54    /// Lenient best-effort parser with a default fallback (unknown → All).
55    /// Not a `FromStr` impl because it is infallible by design.
56    #[allow(clippy::should_implement_trait)]
57    pub fn from_str(s: &str) -> Self {
58        match s {
59            "name" => Self::Name,
60            "error" => Self::Error,
61            "tool_name" => Self::ToolName,
62            "tool_output" => Self::ToolOutput,
63            _ => Self::All,
64        }
65    }
66}
67
68/// Sort field.
69#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub enum SortBy {
71    CreatedAt,
72    Cost,
73    Duration,
74    Tokens,
75    Name,
76}
77
78impl SortBy {
79    pub fn as_str(&self) -> &'static str {
80        match self {
81            Self::CreatedAt => "created_at",
82            Self::Cost => "cost_usd",
83            Self::Duration => "duration_secs",
84            Self::Tokens => "tokens_total",
85            Self::Name => "name",
86        }
87    }
88
89    /// Lenient best-effort parser with a default fallback (unknown → CreatedAt).
90    #[allow(clippy::should_implement_trait)]
91    pub fn from_str(s: &str) -> Self {
92        match s {
93            "cost" => Self::Cost,
94            "duration" => Self::Duration,
95            "tokens" => Self::Tokens,
96            "name" => Self::Name,
97            _ => Self::CreatedAt,
98        }
99    }
100}
101
102/// Sort direction.
103#[derive(Debug, Clone, Copy, PartialEq, Eq)]
104pub enum SortDir {
105    Asc,
106    Desc,
107}
108
109impl SortDir {
110    pub fn as_str(&self) -> &'static str {
111        match self {
112            Self::Asc => "ASC",
113            Self::Desc => "DESC",
114        }
115    }
116
117    /// Lenient best-effort parser with a default fallback (unknown → Desc).
118    #[allow(clippy::should_implement_trait)]
119    pub fn from_str(s: &str) -> Self {
120        match s {
121            "asc" => Self::Asc,
122            _ => Self::Desc,
123        }
124    }
125}
126
127/// Full filter specification for querying agent history.
128#[derive(Debug, Clone)]
129pub struct AgentListFilter {
130    pub q: Option<String>,
131    pub search_field: SearchField,
132    pub status: Option<AgentStatusFilter>,
133    pub session_id: Option<String>,
134    pub project_id: Option<String>,
135    pub seed_id: Option<String>,
136    pub model_id: Option<String>,
137    pub tool: Option<String>,
138    pub has_error: Option<bool>,
139    pub date_from: Option<DateTime<Utc>>,
140    pub date_to: Option<DateTime<Utc>>,
141    pub cost_min: Option<f64>,
142    pub cost_max: Option<f64>,
143    pub tokens_min: Option<u64>,
144    pub tokens_max: Option<u64>,
145    pub duration_min: Option<u64>,
146    pub duration_max: Option<u64>,
147    pub sort_by: SortBy,
148    pub sort_dir: SortDir,
149    pub page: u32,
150    pub per_page: u32,
151}
152
153impl Default for AgentListFilter {
154    fn default() -> Self {
155        Self {
156            q: None,
157            search_field: SearchField::All,
158            status: None,
159            session_id: None,
160            project_id: None,
161            seed_id: None,
162            model_id: None,
163            tool: None,
164            has_error: None,
165            date_from: None,
166            date_to: None,
167            cost_min: None,
168            cost_max: None,
169            tokens_min: None,
170            tokens_max: None,
171            duration_min: None,
172            duration_max: None,
173            sort_by: SortBy::CreatedAt,
174            sort_dir: SortDir::Desc,
175            page: 1,
176            per_page: 50,
177        }
178    }
179}
180
181/// Single status value that can be used to filter.
182#[derive(Debug, Clone, Copy, PartialEq, Eq)]
183pub enum AgentStatusFilter {
184    Running,
185    Completed,
186    Failed,
187    Stopped,
188    Starting,
189    Idle,
190}
191
192impl AgentStatusFilter {
193    pub fn as_sql(&self) -> &'static str {
194        match self {
195            Self::Running => "running",
196            Self::Completed => "completed",
197            Self::Failed => "failed",
198            Self::Stopped => "stopped",
199            Self::Starting => "starting",
200            Self::Idle => "idle",
201        }
202    }
203
204    /// Lenient best-effort parser; unknown values map to `None`.
205    #[allow(clippy::should_implement_trait)]
206    pub fn from_str(s: &str) -> Option<Self> {
207        match s {
208            "running" => Some(Self::Running),
209            "completed" => Some(Self::Completed),
210            "failed" => Some(Self::Failed),
211            "stopped" => Some(Self::Stopped),
212            "starting" => Some(Self::Starting),
213            "idle" => Some(Self::Idle),
214            _ => None,
215        }
216    }
217}
218
219// ===========================================================================
220// Filtered stats
221// ===========================================================================
222
223/// Aggregated stats computed from a query result set.
224#[derive(Debug, Clone, Default)]
225pub struct FilteredStats {
226    pub total_cost_usd: f64,
227    pub total_tokens: u64,
228    pub avg_duration_secs: f64,
229    pub count_running: u64,
230    pub count_completed: u64,
231    pub count_failed: u64,
232}
233
234/// Global aggregate stats (unfiltered).
235#[derive(Debug, Clone, Default)]
236pub struct AgentStats {
237    pub total_agents: u64,
238    pub running: u64,
239    pub completed: u64,
240    pub failed: u64,
241    pub total_cost_usd: f64,
242    pub total_tokens: u64,
243    pub total_duration_secs: u64,
244    pub avg_duration_secs: f64,
245    pub avg_cost_usd: f64,
246    pub total_sessions: u64,
247    pub oldest_agent_at: Option<DateTime<Utc>>,
248    pub newest_agent_at: Option<DateTime<Utc>>,
249}
250
251/// Query result with pagination and filtered stats.
252#[derive(Debug, Clone)]
253pub struct QueryResult {
254    pub items: Vec<AgentInfo>,
255    pub total: u64,
256    pub page: u32,
257    pub per_page: u32,
258    pub total_pages: u32,
259    pub stats: FilteredStats,
260}
261
262// ===========================================================================
263// Rebuild report
264// ===========================================================================
265
266#[derive(Debug, Clone, Default)]
267pub struct RebuildReport {
268    pub reindexed: u64,
269    pub orphaned: u64,
270    pub errors: u64,
271}
272
273// ===========================================================================
274// AgentLogDb
275// ===========================================================================
276
277/// SQLite-backed agent history query engine.
278///
279/// Interior mutability via `parking_lot::Mutex` so all methods take `&self`,
280/// compatible with `Arc<AgentLogDb>` shared across tokio tasks.
281#[cfg(feature = "sqlite-memory")]
282pub struct AgentLogDb {
283    conn: parking_lot::Mutex<rusqlite::Connection>,
284}
285
286#[cfg(feature = "sqlite-memory")]
287impl AgentLogDb {
288    pub fn open(path: &Path) -> Result<Self> {
289        let conn = rusqlite::Connection::open(path)
290            .with_context(|| format!("Failed to open agent log database at {}", path.display()))?;
291        let db = Self {
292            conn: parking_lot::Mutex::new(conn),
293        };
294        db.migrate()?;
295        db.configure_wal()?;
296        Ok(db)
297    }
298
299    fn migrate(&self) -> Result<()> {
300        let conn = self.conn.lock();
301        conn.execute_batch(
302            "
303            CREATE TABLE IF NOT EXISTS agents (
304                id              TEXT PRIMARY KEY,
305                name            TEXT NOT NULL,
306                status          TEXT NOT NULL,
307                created_at      TEXT NOT NULL,
308                started_at      TEXT,
309                completed_at    TEXT,
310                session_id      TEXT,
311                seed_id         TEXT,
312                project_id      TEXT,
313                model_id        TEXT NOT NULL DEFAULT '',
314                error           TEXT,
315                steps_completed INTEGER NOT NULL DEFAULT 0,
316                steps_total     INTEGER,
317                tokens_input    INTEGER NOT NULL DEFAULT 0,
318                tokens_output   INTEGER NOT NULL DEFAULT 0,
319                cost_usd        REAL NOT NULL DEFAULT 0.0,
320                duration_secs   INTEGER
321            );
322
323            CREATE TABLE IF NOT EXISTS agent_tool_calls (
324                id          INTEGER PRIMARY KEY AUTOINCREMENT,
325                agent_id    TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
326                seq         INTEGER NOT NULL,
327                tool_name   TEXT NOT NULL,
328                input       TEXT NOT NULL DEFAULT '',
329                output      TEXT NOT NULL DEFAULT '',
330                duration_ms INTEGER NOT NULL DEFAULT 0,
331                is_error    INTEGER NOT NULL DEFAULT 0,
332                timestamp   TEXT,
333                tool_call_id TEXT NOT NULL DEFAULT ''
334            );
335
336            CREATE INDEX IF NOT EXISTS idx_agents_status_created ON agents(status, created_at DESC);
337            CREATE INDEX IF NOT EXISTS idx_agents_session     ON agents(session_id);
338            CREATE INDEX IF NOT EXISTS idx_agents_project     ON agents(project_id);
339            CREATE INDEX IF NOT EXISTS idx_agents_seed        ON agents(seed_id);
340            CREATE INDEX IF NOT EXISTS idx_agents_model       ON agents(model_id);
341            CREATE INDEX IF NOT EXISTS idx_agents_cost        ON agents(cost_usd);
342            CREATE INDEX IF NOT EXISTS idx_agents_duration    ON agents(duration_secs);
343            CREATE INDEX IF NOT EXISTS idx_agents_name        ON agents(name);
344            CREATE INDEX IF NOT EXISTS idx_tool_calls_agent   ON agent_tool_calls(agent_id, seq);
345            CREATE INDEX IF NOT EXISTS idx_tool_calls_name    ON agent_tool_calls(tool_name);
346
347            CREATE TABLE IF NOT EXISTS agent_log_meta (
348                key   TEXT PRIMARY KEY,
349                value TEXT NOT NULL
350            );
351        ",
352        )
353        .context("Failed to run agent log SQLite migration")?;
354
355        // FTS5 virtual table (separately, catch errors if FTS5 not available)
356        let _ = conn.execute_batch(
357            "CREATE VIRTUAL TABLE IF NOT EXISTS agent_tool_calls_fts USING fts5(
358                tool_name, input, output,
359                content='agent_tool_calls',
360                content_rowid='id'
361            );",
362        );
363
364        Ok(())
365    }
366
367    fn configure_wal(&self) -> Result<()> {
368        let conn = self.conn.lock();
369        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")
370            .context("Failed to configure WAL mode")
371    }
372
373    // ── Upsert ──────────────────────────────────────────────────────
374
375    pub fn upsert_agent(&self, info: &AgentInfo) -> Result<()> {
376        let mut conn = self.conn.lock();
377        let tx = conn
378            .transaction()
379            .context("Failed to begin agent upsert transaction")?;
380
381        let duration_secs = match (info.started_at, info.completed_at) {
382            (Some(start), Some(end)) => Some((end - start).num_seconds().max(0)),
383            _ => None,
384        };
385
386        tx.execute(
387            "INSERT INTO agents (id, name, status, created_at, started_at, completed_at,
388                 session_id, seed_id, project_id, model_id, error,
389                 steps_completed, steps_total, tokens_input, tokens_output, cost_usd, duration_secs)
390             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)
391             ON CONFLICT(id) DO UPDATE SET
392                 status=excluded.status,
393                 completed_at=excluded.completed_at,
394                 error=excluded.error,
395                 steps_completed=excluded.steps_completed,
396                 steps_total=excluded.steps_total,
397                 tokens_input=excluded.tokens_input,
398                 tokens_output=excluded.tokens_output,
399                 cost_usd=excluded.cost_usd,
400                 duration_secs=excluded.duration_secs",
401            rusqlite::params![
402                info.id.to_string(),
403                info.name,
404                info.status.to_string(),
405                info.created_at.to_rfc3339(),
406                info.started_at.map(|t| t.to_rfc3339()),
407                info.completed_at.map(|t| t.to_rfc3339()),
408                info.session_id,
409                info.seed_id.map(|s| s.to_string()),
410                info.project_id.map(|p| p.to_string()),
411                info.model_id,
412                info.error,
413                info.steps_completed as i64,
414                info.steps_total.map(|s| s as i64),
415                info.tokens_input as i64,
416                info.tokens_output as i64,
417                info.cost_usd,
418                duration_secs,
419            ],
420        )?;
421
422        // Replace tool calls
423        tx.execute(
424            "DELETE FROM agent_tool_calls WHERE agent_id = ?1",
425            rusqlite::params![info.id.to_string()],
426        )?;
427
428        for (i, tc) in info.tool_calls.iter().enumerate() {
429            tx.execute(
430                "INSERT INTO agent_tool_calls (agent_id, seq, tool_name, input, output,
431                     duration_ms, is_error, timestamp, tool_call_id)
432                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
433                rusqlite::params![
434                    info.id.to_string(),
435                    i as i64,
436                    tc.tool,
437                    tc.input,
438                    tc.output,
439                    tc.duration_ms as i64,
440                    tc.is_error as i64,
441                    tc.timestamp.map(|t| t.to_rfc3339()),
442                    tc.tool_call_id,
443                ],
444            )?;
445        }
446
447        tx.commit().context("Failed to commit agent upsert")?;
448
449        // Rebuild FTS
450        let _ = conn.execute_batch(
451            "INSERT INTO agent_tool_calls_fts(agent_tool_calls_fts) VALUES('rebuild');",
452        );
453
454        Ok(())
455    }
456
457    // ── Query ───────────────────────────────────────────────────────
458
459    pub fn query(&self, filter: &AgentListFilter) -> Result<QueryResult> {
460        let (where_clause, params) = self.build_where(filter);
461        let offset = ((filter.page.max(1) - 1) * filter.per_page) as i64;
462        let limit = filter.per_page.min(200) as i64;
463
464        let conn = self.conn.lock();
465
466        // Count
467        let count_sql = format!("SELECT COUNT(*) FROM agents WHERE {}", where_clause);
468        let total: u64 = conn
469            .query_row(
470                &count_sql,
471                rusqlite::params_from_iter(params.iter()),
472                |row| row.get(0),
473            )
474            .context("Failed to count agents")?;
475
476        // Data
477        let safe_sort_col = filter.sort_by.as_str();
478        let safe_sort_dir = filter.sort_dir.as_str();
479        let param_count = params.len();
480        let data_sql = format!(
481            "SELECT * FROM agents WHERE {} ORDER BY {} {} LIMIT ?{} OFFSET ?{}",
482            where_clause,
483            safe_sort_col,
484            safe_sort_dir,
485            param_count + 1,
486            param_count + 2,
487        );
488
489        let mut stmt = conn.prepare(&data_sql)?;
490        let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = params
491            .into_iter()
492            .map(|p| -> Box<dyn rusqlite::types::ToSql> { p })
493            .collect();
494        all_params.push(Box::new(limit));
495        all_params.push(Box::new(offset));
496
497        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
498            all_params.iter().map(|p| p.as_ref()).collect();
499
500        let items: Vec<AgentInfo> = stmt
501            .query_map(param_refs.as_slice(), Self::row_to_agent)?
502            .collect::<Result<Vec<_>, _>>()
503            .context("Failed to collect agent query results")?;
504
505        // Stats for filtered set
506        let stats = self.filtered_stats_inner(filter, &conn)?;
507
508        let total_pages = if total == 0 {
509            1
510        } else {
511            ((total as f64) / filter.per_page as f64).ceil() as u32
512        };
513
514        Ok(QueryResult {
515            items,
516            total,
517            page: filter.page,
518            per_page: filter.per_page,
519            total_pages,
520            stats,
521        })
522    }
523
524    pub fn stats(&self) -> Result<AgentStats> {
525        let conn = self.conn.lock();
526
527        let mut s = AgentStats::default();
528
529        let row = conn
530            .query_row(
531                "SELECT
532                    COUNT(*) as total,
533                    COALESCE(SUM(CASE WHEN status='running' THEN 1 ELSE 0 END), 0),
534                    COALESCE(SUM(CASE WHEN status IN ('completed','stopped') THEN 1 ELSE 0 END), 0),
535                    COALESCE(SUM(CASE WHEN status='failed' THEN 1 ELSE 0 END), 0),
536                    COALESCE(SUM(cost_usd), 0.0),
537                    COALESCE(SUM(tokens_input + tokens_output), 0),
538                    COALESCE(SUM(duration_secs), 0),
539                    COALESCE(AVG(duration_secs), 0.0),
540                    COALESCE(AVG(cost_usd), 0.0),
541                    COUNT(DISTINCT session_id),
542                    MIN(created_at),
543                    MAX(created_at)
544                 FROM agents",
545                [],
546                |row| {
547                    Ok((
548                        row.get::<_, i64>(0)? as u64,
549                        row.get::<_, i64>(1)? as u64,
550                        row.get::<_, i64>(2)? as u64,
551                        row.get::<_, i64>(3)? as u64,
552                        row.get::<_, f64>(4)?,
553                        row.get::<_, i64>(5)? as u64,
554                        row.get::<_, i64>(6)? as u64,
555                        row.get::<_, f64>(7)?,
556                        row.get::<_, f64>(8)?,
557                        row.get::<_, i64>(9)? as u64,
558                        row.get::<_, Option<String>>(10)?,
559                        row.get::<_, Option<String>>(11)?,
560                    ))
561                },
562            )
563            .context("Failed to query agent stats")?;
564
565        s.total_agents = row.0;
566        s.running = row.1;
567        s.completed = row.2;
568        s.failed = row.3;
569        s.total_cost_usd = row.4;
570        s.total_tokens = row.5;
571        s.total_duration_secs = row.6;
572        s.avg_duration_secs = if s.total_agents > 0 { row.7 } else { 0.0 };
573        s.avg_cost_usd = if s.total_agents > 0 { row.8 } else { 0.0 };
574        s.total_sessions = row.9;
575        s.oldest_agent_at = row
576            .10
577            .as_deref()
578            .and_then(|ts| DateTime::parse_from_rfc3339(ts).ok())
579            .map(|dt| dt.with_timezone(&Utc));
580        s.newest_agent_at = row
581            .11
582            .as_deref()
583            .and_then(|ts| DateTime::parse_from_rfc3339(ts).ok())
584            .map(|dt| dt.with_timezone(&Utc));
585
586        Ok(s)
587    }
588
589    pub fn get(&self, id: &str) -> Result<Option<AgentInfo>> {
590        let conn = self.conn.lock();
591        let mut stmt = conn
592            .prepare("SELECT * FROM agents WHERE id = ?1")
593            .context("Failed to prepare agent get statement")?;
594
595        let mut rows = stmt
596            .query_map(rusqlite::params![id], Self::row_to_agent)
597            .context("Failed to query agent by id")?;
598
599        match rows.next() {
600            Some(Ok(mut agent)) => {
601                agent.tool_calls = Self::get_tool_calls_inner(&conn, id)?;
602                Ok(Some(agent))
603            }
604            Some(Err(e)) => Err(e.into()),
605            None => Ok(None),
606        }
607    }
608
609    pub fn get_tool_calls(&self, agent_id: &str) -> Result<Vec<ToolCallRecord>> {
610        let conn = self.conn.lock();
611        Self::get_tool_calls_inner(&conn, agent_id)
612    }
613
614    pub fn delete(&self, id: &str) -> Result<bool> {
615        let conn = self.conn.lock();
616        let changes = conn
617            .execute("DELETE FROM agents WHERE id = ?1", rusqlite::params![id])
618            .context("Failed to delete agent")?;
619        Ok(changes > 0)
620    }
621
622    // ── Pruning ─────────────────────────────────────────────────────
623
624    pub fn prune(&self, config: &AgentLogConfig) -> Result<usize> {
625        let mut pruned = 0usize;
626
627        if config.ttl_hours > 0 || config.max_entries > 0 {
628            let conn = self.conn.lock();
629
630            // 1. TTL-based
631            if config.ttl_hours > 0 {
632                let cutoff = Utc::now() - chrono::Duration::hours(config.ttl_hours as i64);
633                let cutoff_str = cutoff.to_rfc3339();
634                let deleted = conn
635                    .execute(
636                        "DELETE FROM agents WHERE created_at < ?1",
637                        rusqlite::params![cutoff_str],
638                    )
639                    .context("Failed to prune agents by TTL")?;
640                pruned += deleted;
641            }
642
643            // 2. Count-based
644            if config.max_entries > 0 {
645                let count: i64 = conn
646                    .query_row("SELECT COUNT(*) FROM agents", [], |row| row.get(0))
647                    .context("Failed to count agents for pruning")?;
648
649                if count > config.max_entries as i64 {
650                    let excess = count - config.max_entries as i64;
651                    let to_delete = excess.min(config.prune_batch_size as i64);
652                    let deleted = conn
653                        .execute(
654                            "DELETE FROM agents WHERE id IN (
655                            SELECT id FROM agents ORDER BY created_at ASC LIMIT ?1
656                        )",
657                            rusqlite::params![to_delete],
658                        )
659                        .context("Failed to prune agents by count")?;
660                    // Report the rows actually deleted, not the estimate:
661                    // FK CASCADE or a concurrent prune can make the real
662                    // count differ from `to_delete`. (state-area F7.)
663                    pruned += deleted;
664                }
665            }
666        }
667
668        if pruned > 0 {
669            tracing::info!(pruned = pruned, "Agent log SQLite pruning completed");
670        }
671
672        Ok(pruned)
673    }
674
675    // ── Recovery ────────────────────────────────────────────────────
676
677    pub async fn reindex_all(&self, state_store: &StateStore) -> Result<RebuildReport> {
678        let agent_names = state_store
679            .list_category("agents")
680            .await
681            .unwrap_or_default();
682        let mut report = RebuildReport::default();
683
684        {
685            let conn = self.conn.lock();
686            conn.execute_batch("DELETE FROM agent_tool_calls; DELETE FROM agents;")
687                .context("Failed to clear agent tables for reindex")?;
688        }
689
690        for name in &agent_names {
691            match state_store.load_json::<AgentInfo>("agents", name).await {
692                Ok(Some(agent)) => {
693                    if let Err(e) = self.upsert_agent(&agent) {
694                        tracing::warn!(agent_id = %name, error = %e, "Failed to reindex agent");
695                        report.errors += 1;
696                    } else {
697                        report.reindexed += 1;
698                    }
699                }
700                _ => {
701                    report.errors += 1;
702                }
703            }
704        }
705
706        Ok(report)
707    }
708
709    // ── Internal Helpers ────────────────────────────────────────────
710
711    fn row_to_agent(row: &rusqlite::Row) -> rusqlite::Result<AgentInfo> {
712        let id_str: String = row.get("id")?;
713        let seed_id_str: Option<String> = row.get("seed_id")?;
714        let project_id_str: Option<String> = row.get("project_id")?;
715        let created_at_str: String = row.get("created_at")?;
716        let started_at_str: Option<String> = row.get("started_at")?;
717        let completed_at_str: Option<String> = row.get("completed_at")?;
718        Ok(AgentInfo {
719            id: uuid::Uuid::parse_str(&id_str).unwrap_or_else(|e| {
720                tracing::error!(agent_id = %id_str, error = %e, "Corrupt agent id in DB, substituting Nil");
721                uuid::Uuid::nil()
722            }),
723            name: row.get("name")?,
724            status: Self::parse_status(&row.get::<_, String>("status")?),
725            created_at: DateTime::parse_from_rfc3339(&created_at_str)
726                .map(|dt| dt.with_timezone(&Utc))
727                .unwrap_or_else(|e| {
728                    tracing::error!(
729                        agent_id = %id_str,
730                        created_at = %created_at_str,
731                        error = %e,
732                        "Corrupt created_at in DB, substituting Utc::now()"
733                    );
734                    Utc::now()
735                }),
736            seed_id: seed_id_str.and_then(|s| uuid::Uuid::parse_str(&s).ok()),
737            project_id: project_id_str.and_then(|s| uuid::Uuid::parse_str(&s).ok()),
738            started_at: started_at_str
739                .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
740                .map(|dt| dt.with_timezone(&Utc)),
741            completed_at: completed_at_str
742                .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
743                .map(|dt| dt.with_timezone(&Utc)),
744            error: row.get("error")?,
745            steps_completed: row.get::<_, i64>("steps_completed")? as usize,
746            steps_total: row
747                .get::<_, Option<i64>>("steps_total")?
748                .map(|v| v as usize),
749            tool_calls: vec![],
750            tokens_input: row.get::<_, i64>("tokens_input")? as u64,
751            tokens_output: row.get::<_, i64>("tokens_output")? as u64,
752            cost_usd: row.get("cost_usd")?,
753            model_id: row.get("model_id")?,
754            session_id: row.get("session_id")?,
755        })
756    }
757
758    fn get_tool_calls_inner(
759        conn: &rusqlite::Connection,
760        agent_id: &str,
761    ) -> Result<Vec<ToolCallRecord>> {
762        let mut stmt = conn
763            .prepare(
764                "SELECT tool_name, input, output, duration_ms, is_error, timestamp, tool_call_id
765                 FROM agent_tool_calls WHERE agent_id = ?1 ORDER BY seq",
766            )
767            .context("Failed to prepare tool calls statement")?;
768
769        let calls = stmt
770            .query_map(rusqlite::params![agent_id], |row| {
771                let ts: Option<String> = row.get(5)?;
772                Ok(ToolCallRecord {
773                    tool: row.get(0)?,
774                    input: row.get(1)?,
775                    output: row.get(2)?,
776                    duration_ms: row.get::<_, i64>(3)? as u64,
777                    is_error: row.get::<_, i64>(4)? != 0,
778                    tool_call_id: row.get(6)?,
779                    timestamp: ts
780                        .and_then(|s| DateTime::parse_from_rfc3339(&s as &str).ok())
781                        .map(|dt| dt.with_timezone(&Utc)),
782                })
783            })
784            .context("Failed to query tool calls")?
785            .collect::<Result<Vec<_>, _>>()
786            .context("Failed to collect tool calls")?;
787
788        Ok(calls)
789    }
790
791    fn parse_status(s: &str) -> AgentStatus {
792        match s {
793            "starting" => AgentStatus::Starting,
794            "running" => AgentStatus::Running,
795            "idle" => AgentStatus::Idle,
796            "stopped" => AgentStatus::Stopped,
797            "failed" => AgentStatus::Failed,
798            "completed" => AgentStatus::Completed,
799            _ => AgentStatus::Idle,
800        }
801    }
802
803    /// Build WHERE clause and params from the filter.
804    fn build_where(
805        &self,
806        filter: &AgentListFilter,
807    ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
808        let mut conditions: Vec<String> = Vec::new();
809        let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
810
811        // Status filter
812        if let Some(status) = filter.status {
813            let idx = params.len() + 1;
814            conditions.push(format!("status = ?{}", idx));
815            params.push(Box::new(status.as_sql().to_string()));
816        }
817
818        // Date range
819        if let Some(from) = filter.date_from {
820            let idx = params.len() + 1;
821            conditions.push(format!("created_at >= ?{}", idx));
822            params.push(Box::new(from.to_rfc3339()));
823        }
824        if let Some(to) = filter.date_to {
825            let idx = params.len() + 1;
826            conditions.push(format!("created_at <= ?{}", idx));
827            params.push(Box::new(to.to_rfc3339()));
828        }
829
830        // Session / project / seed
831        if let Some(ref sid) = filter.session_id {
832            let idx = params.len() + 1;
833            conditions.push(format!("session_id = ?{}", idx));
834            params.push(Box::new(sid.clone()));
835        }
836        if let Some(ref pid) = filter.project_id {
837            let idx = params.len() + 1;
838            conditions.push(format!("project_id = ?{}", idx));
839            params.push(Box::new(pid.clone()));
840        }
841        if let Some(ref sid) = filter.seed_id {
842            let idx = params.len() + 1;
843            conditions.push(format!("seed_id = ?{}", idx));
844            params.push(Box::new(sid.clone()));
845        }
846
847        // Model (substring)
848        if let Some(ref model) = filter.model_id {
849            let idx = params.len() + 1;
850            conditions.push(format!("model_id LIKE ?{}", idx));
851            params.push(Box::new(format!("%{}%", model)));
852        }
853
854        // Cost range
855        if let Some(min) = filter.cost_min {
856            let idx = params.len() + 1;
857            conditions.push(format!("cost_usd >= ?{}", idx));
858            params.push(Box::new(min));
859        }
860        if let Some(max) = filter.cost_max {
861            let idx = params.len() + 1;
862            conditions.push(format!("cost_usd <= ?{}", idx));
863            params.push(Box::new(max));
864        }
865
866        // Tokens range
867        if filter.tokens_min.is_some() || filter.tokens_max.is_some() {
868            let total_expr = "(tokens_input + tokens_output)";
869            if let Some(min) = filter.tokens_min {
870                let idx = params.len() + 1;
871                conditions.push(format!("{} >= ?{}", total_expr, idx));
872                params.push(Box::new(min as i64));
873            }
874            if let Some(max) = filter.tokens_max {
875                let idx = params.len() + 1;
876                conditions.push(format!("{} <= ?{}", total_expr, idx));
877                params.push(Box::new(max as i64));
878            }
879        }
880
881        // Duration range
882        if let Some(min) = filter.duration_min {
883            let idx = params.len() + 1;
884            conditions.push(format!("duration_secs >= ?{}", idx));
885            params.push(Box::new(min as i64));
886        }
887        if let Some(max) = filter.duration_max {
888            let idx = params.len() + 1;
889            conditions.push(format!("duration_secs <= ?{}", idx));
890            params.push(Box::new(max as i64));
891        }
892
893        // Error filter
894        if let Some(has_err) = filter.has_error {
895            if has_err {
896                conditions.push("error IS NOT NULL AND error != ''".to_string());
897            } else {
898                conditions.push("(error IS NULL OR error = '')".to_string());
899            }
900        }
901
902        // Tool filter (JOIN subquery)
903        if let Some(ref tool) = filter.tool {
904            let idx = params.len() + 1;
905            conditions.push(format!(
906                "id IN (SELECT DISTINCT agent_id FROM agent_tool_calls WHERE tool_name LIKE ?{})",
907                idx
908            ));
909            params.push(Box::new(format!("%{}%", tool)));
910        }
911
912        // Full-text search via FTS5
913        let has_fts = matches!(
914            filter.search_field,
915            SearchField::All | SearchField::ToolName | SearchField::ToolOutput
916        ) && filter.q.is_some();
917
918        if has_fts {
919            if let Some(ref q) = filter.q {
920                // Treat user input as an FTS5 *phrase* (double-quoted
921                // string) so FTS5 query syntax — `*` wildcards, `:`
922                // column qualifiers, `AND`/`OR`/`NEAR`, parentheses —
923                // is treated as literal text, not operators. Internal
924                // double-quotes are escaped by doubling (FTS5 rule).
925                // Bound as a parameter so it never reaches the SQL parser
926                // as code. (state-area F8.)
927                let phrase = format!("\"{}\"", q.replace('"', "\"\""));
928                let idx = params.len() + 1;
929                conditions.push(format!(
930                    "id IN (SELECT DISTINCT agent_id FROM agent_tool_calls_fts WHERE agent_tool_calls_fts MATCH ?{idx})"
931                ));
932                params.push(Box::new(phrase));
933            }
934        } else if let Some(ref q) = filter.q {
935            match filter.search_field {
936                SearchField::Name => {
937                    let idx = params.len() + 1;
938                    conditions.push(format!("name LIKE ?{}", idx));
939                    params.push(Box::new(format!("%{}%", q)));
940                }
941                SearchField::Error => {
942                    let idx = params.len() + 1;
943                    conditions.push(format!("error LIKE ?{}", idx));
944                    params.push(Box::new(format!("%{}%", q)));
945                }
946                _ => {
947                    let idx1 = params.len() + 1;
948                    let idx2 = params.len() + 2;
949                    conditions.push(format!("(name LIKE ?{} OR error LIKE ?{})", idx1, idx2));
950                    params.push(Box::new(format!("%{}%", q)));
951                    params.push(Box::new(format!("%{}%", q)));
952                }
953            }
954        }
955
956        if conditions.is_empty() {
957            ("1=1".to_string(), params)
958        } else {
959            (conditions.join(" AND "), params)
960        }
961    }
962
963    /// Aggregate stats constrained to a filter (acquires the connection lock).
964    /// Reserved for filtered dashboard endpoints; currently `list()` reuses the
965    /// inner helper directly to avoid a double lock.
966    #[allow(dead_code)]
967    fn filtered_stats(&self, filter: &AgentListFilter) -> Result<FilteredStats> {
968        let conn = self.conn.lock();
969        self.filtered_stats_inner(filter, &conn)
970    }
971
972    fn filtered_stats_inner(
973        &self,
974        filter: &AgentListFilter,
975        conn: &rusqlite::Connection,
976    ) -> Result<FilteredStats> {
977        let (where_clause, params) = self.build_where(filter);
978
979        let sql = format!(
980            "SELECT
981                COALESCE(SUM(cost_usd), 0.0),
982                COALESCE(SUM(tokens_input + tokens_output), 0),
983                COALESCE(AVG(duration_secs), 0.0),
984                COALESCE(SUM(CASE WHEN status='running' THEN 1 ELSE 0 END), 0),
985                COALESCE(SUM(CASE WHEN status IN ('completed','stopped') THEN 1 ELSE 0 END), 0),
986                COALESCE(SUM(CASE WHEN status='failed' THEN 1 ELSE 0 END), 0)
987             FROM agents WHERE {}",
988            where_clause
989        );
990
991        let mut stmt = conn.prepare(&sql)?;
992        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
993            params.iter().map(|p| p.as_ref()).collect();
994        let row = stmt
995            .query_row(param_refs.as_slice(), |row| {
996                Ok(FilteredStats {
997                    total_cost_usd: row.get(0)?,
998                    total_tokens: row.get::<_, i64>(1)? as u64,
999                    avg_duration_secs: row.get(2)?,
1000                    count_running: row.get::<_, i64>(3)? as u64,
1001                    count_completed: row.get::<_, i64>(4)? as u64,
1002                    count_failed: row.get::<_, i64>(5)? as u64,
1003                })
1004            })
1005            .context("Failed to compute filtered stats")?;
1006
1007        Ok(row)
1008    }
1009}
1010
1011// ===========================================================================
1012// Tests
1013// ===========================================================================
1014
1015#[cfg(test)]
1016mod tests {
1017    use super::*;
1018    use crate::types::AgentStatus;
1019
1020    fn make_test_db() -> (AgentLogDb, tempfile::TempDir) {
1021        let dir = tempfile::tempdir().unwrap();
1022        let path = dir.path().join("agent_log.db");
1023        let db = AgentLogDb::open(&path).unwrap();
1024        (db, dir)
1025    }
1026
1027    fn sample_agent(id: &str, status: AgentStatus, created: &str, cost: f64) -> AgentInfo {
1028        AgentInfo {
1029            id: uuid::Uuid::parse_str(id).unwrap(),
1030            name: format!("test-agent-{}", &id[..8]),
1031            status,
1032            created_at: DateTime::parse_from_rfc3339(created)
1033                .map(|dt| dt.with_timezone(&Utc))
1034                .unwrap(),
1035            seed_id: None,
1036            project_id: None,
1037            started_at: None,
1038            completed_at: None,
1039            error: None,
1040            steps_completed: 5,
1041            steps_total: Some(10),
1042            tool_calls: vec![],
1043            tokens_input: 1000,
1044            tokens_output: 500,
1045            cost_usd: cost,
1046            model_id: "anthropic/claude-sonnet-4".into(),
1047            session_id: None,
1048        }
1049    }
1050
1051    #[test]
1052    fn test_upsert_and_query() {
1053        let (db, _dir) = make_test_db();
1054
1055        let agent = sample_agent(
1056            "550e8400-e29b-41d4-a716-446655440000",
1057            AgentStatus::Completed,
1058            "2026-06-01T00:00:00Z",
1059            0.05,
1060        );
1061
1062        db.upsert_agent(&agent).unwrap();
1063
1064        let filter = AgentListFilter::default();
1065        let result = db.query(&filter).unwrap();
1066        assert_eq!(result.total, 1);
1067        assert_eq!(result.items[0].name, "test-agent-550e8400");
1068        assert_eq!(result.items[0].status, AgentStatus::Completed);
1069    }
1070
1071    #[test]
1072    fn test_filter_by_status() {
1073        let (db, _dir) = make_test_db();
1074
1075        for i in 0..5 {
1076            let status = if i % 2 == 0 {
1077                AgentStatus::Completed
1078            } else {
1079                AgentStatus::Failed
1080            };
1081            db.upsert_agent(&sample_agent(
1082                &format!("550e8400-e29b-41d4-a716-44665544000{}", i),
1083                status,
1084                "2026-06-01T00:00:00Z",
1085                0.01,
1086            ))
1087            .unwrap();
1088        }
1089
1090        let filter = AgentListFilter {
1091            status: Some(AgentStatusFilter::Failed),
1092            ..Default::default()
1093        };
1094        let result = db.query(&filter).unwrap();
1095        assert_eq!(result.total, 2);
1096    }
1097
1098    #[test]
1099    fn test_filter_by_date_range() {
1100        let (db, _dir) = make_test_db();
1101
1102        for (i, (created, cost)) in [
1103            ("2026-06-01T00:00:00Z", 0.01),
1104            ("2026-06-10T00:00:00Z", 0.02),
1105            ("2026-06-20T00:00:00Z", 0.03),
1106        ]
1107        .iter()
1108        .enumerate()
1109        {
1110            db.upsert_agent(&sample_agent(
1111                &format!("550e8400-e29b-41d4-a716-44665544000{}", i),
1112                AgentStatus::Completed,
1113                created,
1114                *cost,
1115            ))
1116            .unwrap();
1117        }
1118
1119        let filter = AgentListFilter {
1120            date_from: Some(
1121                DateTime::parse_from_rfc3339("2026-06-05T00:00:00Z")
1122                    .map(|dt| dt.with_timezone(&Utc))
1123                    .unwrap(),
1124            ),
1125            date_to: Some(
1126                DateTime::parse_from_rfc3339("2026-06-15T00:00:00Z")
1127                    .map(|dt| dt.with_timezone(&Utc))
1128                    .unwrap(),
1129            ),
1130            ..Default::default()
1131        };
1132        let result = db.query(&filter).unwrap();
1133        assert_eq!(result.total, 1);
1134    }
1135
1136    #[test]
1137    fn test_search_by_name() {
1138        let (db, _dir) = make_test_db();
1139
1140        let mut agent = sample_agent(
1141            "550e8400-e29b-41d4-a716-446655440000",
1142            AgentStatus::Completed,
1143            "2026-06-01T00:00:00Z",
1144            0.01,
1145        );
1146        agent.name = "Refactor authentication module".into();
1147        db.upsert_agent(&agent).unwrap();
1148
1149        let mut agent2 = sample_agent(
1150            "550e8400-e29b-41d4-a716-446655440001",
1151            AgentStatus::Failed,
1152            "2026-06-02T00:00:00Z",
1153            0.02,
1154        );
1155        agent2.name = "Fix build error".into();
1156        db.upsert_agent(&agent2).unwrap();
1157
1158        let filter = AgentListFilter {
1159            q: Some("Refactor".into()),
1160            search_field: SearchField::Name,
1161            ..Default::default()
1162        };
1163        let result = db.query(&filter).unwrap();
1164        assert_eq!(result.total, 1);
1165        assert!(result.items[0].name.contains("Refactor"));
1166    }
1167
1168    #[test]
1169    fn test_sorting() {
1170        let (db, _dir) = make_test_db();
1171
1172        for (i, cost) in [0.10, 0.01, 0.50].iter().enumerate() {
1173            db.upsert_agent(&sample_agent(
1174                &format!("550e8400-e29b-41d4-a716-44665544000{}", i),
1175                AgentStatus::Completed,
1176                "2026-06-01T00:00:00Z",
1177                *cost,
1178            ))
1179            .unwrap();
1180        }
1181
1182        let filter = AgentListFilter {
1183            sort_by: SortBy::Cost,
1184            sort_dir: SortDir::Desc,
1185            ..Default::default()
1186        };
1187        let result = db.query(&filter).unwrap();
1188        assert_eq!(result.items[0].cost_usd, 0.50);
1189        assert_eq!(result.items[1].cost_usd, 0.10);
1190        assert_eq!(result.items[2].cost_usd, 0.01);
1191    }
1192
1193    #[test]
1194    fn test_pagination() {
1195        let (db, _dir) = make_test_db();
1196
1197        for i in 0..10 {
1198            db.upsert_agent(&sample_agent(
1199                &format!("550e8400-e29b-41d4-a716-44665544000{}", i),
1200                AgentStatus::Completed,
1201                &format!("2026-06-{:02}T00:00:00Z", i + 1),
1202                0.01,
1203            ))
1204            .unwrap();
1205        }
1206
1207        let mut filter = AgentListFilter {
1208            per_page: 3,
1209            page: 1,
1210            ..Default::default()
1211        };
1212        let result = db.query(&filter).unwrap();
1213        assert_eq!(result.items.len(), 3);
1214        assert_eq!(result.total_pages, 4);
1215
1216        filter.page = 2;
1217        let result = db.query(&filter).unwrap();
1218        assert_eq!(result.items.len(), 3);
1219    }
1220
1221    #[test]
1222    fn test_stats() {
1223        let (db, _dir) = make_test_db();
1224
1225        for (i, (status, cost)) in [
1226            (AgentStatus::Completed, 0.05),
1227            (AgentStatus::Completed, 0.03),
1228            (AgentStatus::Failed, 0.01),
1229            (AgentStatus::Running, 0.02),
1230        ]
1231        .iter()
1232        .enumerate()
1233        {
1234            db.upsert_agent(&sample_agent(
1235                &format!("550e8400-e29b-41d4-a716-44665544000{}", i),
1236                *status,
1237                "2026-06-01T00:00:00Z",
1238                *cost,
1239            ))
1240            .unwrap();
1241        }
1242
1243        let stats = db.stats().unwrap();
1244        assert_eq!(stats.total_agents, 4);
1245        assert_eq!(stats.completed, 2);
1246        assert_eq!(stats.failed, 1);
1247        assert_eq!(stats.running, 1);
1248        assert!((stats.total_cost_usd - 0.11).abs() < 0.001);
1249    }
1250
1251    #[test]
1252    fn test_prune_by_ttl() {
1253        let (db, _dir) = make_test_db();
1254
1255        db.upsert_agent(&sample_agent(
1256            "11111111-1111-4114-a716-446655440000",
1257            AgentStatus::Completed,
1258            "2026-03-01T00:00:00Z",
1259            0.01,
1260        ))
1261        .unwrap();
1262
1263        db.upsert_agent(&sample_agent(
1264            "22222222-2222-4114-a716-446655440000",
1265            AgentStatus::Completed,
1266            "2026-06-01T00:00:00Z",
1267            0.02,
1268        ))
1269        .unwrap();
1270
1271        let config = AgentLogConfig {
1272            ttl_hours: 720,
1273            ..Default::default()
1274        };
1275        let pruned = db.prune(&config).unwrap();
1276        assert!(pruned > 0);
1277
1278        let result = db.query(&AgentListFilter::default()).unwrap();
1279        assert_eq!(result.total, 1);
1280        assert_eq!(
1281            result.items[0].id.to_string(),
1282            "22222222-2222-4114-a716-446655440000"
1283        );
1284    }
1285
1286    #[test]
1287    fn test_delete() {
1288        let (db, _dir) = make_test_db();
1289
1290        db.upsert_agent(&sample_agent(
1291            "550e8400-e29b-41d4-a716-446655440000",
1292            AgentStatus::Completed,
1293            "2026-06-01T00:00:00Z",
1294            0.01,
1295        ))
1296        .unwrap();
1297
1298        assert!(db.delete("550e8400-e29b-41d4-a716-446655440000").unwrap());
1299        assert!(!db.delete("nonexistent").unwrap());
1300
1301        let result = db.query(&AgentListFilter::default()).unwrap();
1302        assert_eq!(result.total, 0);
1303    }
1304
1305    #[test]
1306    fn test_get_tool_calls() {
1307        let (db, _dir) = make_test_db();
1308
1309        let mut agent = sample_agent(
1310            "550e8400-e29b-41d4-a716-446655440000",
1311            AgentStatus::Completed,
1312            "2026-06-01T00:00:00Z",
1313            0.01,
1314        );
1315        agent.tool_calls = vec![
1316            ToolCallRecord {
1317                tool: "bash".into(),
1318                input: "ls -la".into(),
1319                output: "total 42".into(),
1320                duration_ms: 150,
1321                is_error: false,
1322                tool_call_id: "call_1".into(),
1323                timestamp: Some(Utc::now()),
1324            },
1325            ToolCallRecord {
1326                tool: "read".into(),
1327                input: "file.rs".into(),
1328                output: "fn main()".into(),
1329                duration_ms: 5,
1330                is_error: false,
1331                tool_call_id: "call_2".into(),
1332                timestamp: Some(Utc::now()),
1333            },
1334        ];
1335        db.upsert_agent(&agent).unwrap();
1336
1337        let calls = db
1338            .get_tool_calls("550e8400-e29b-41d4-a716-446655440000")
1339            .unwrap();
1340        assert_eq!(calls.len(), 2);
1341        assert_eq!(calls[0].tool, "bash");
1342        assert_eq!(calls[1].tool, "read");
1343    }
1344}