Skip to main content

oxios_kernel/
agent_log_db.rs

1//! Agent history log — SQLite-backed query engine for past agent records.
2//!
3//! # Architecture
4//!
5//! Two-tier storage:
6//! - **Filesystem JSON** (`state/agents/<uuid>.json`): source of truth.
7//!   Human-readable, backup-friendly, rebuildable.
8//! - **SQLite** (`state/agent_log.db`): query index with indexes, FTS5.
9//!   Fast filtering, sorting, search, aggregation.
10//!
11//! SQLite DB is rebuildable from filesystem JSON at any time
12//! via [`AgentLogDb::reindex_all`].
13//!
14//! # Feature gate
15//!
16//! When `sqlite-memory` feature is disabled, all query operations
17//! fall back to filesystem-only scan mode. Degraded but functional.
18
19use std::path::Path;
20
21use anyhow::{Context, Result};
22use chrono::{DateTime, Utc};
23
24use crate::config::AgentLogConfig;
25use crate::state_store::StateStore;
26use crate::types::{AgentInfo, AgentStatus, ToolCallRecord};
27
28// ===========================================================================
29// Filter / Query Types
30// ===========================================================================
31
32/// Field to search against.
33#[derive(Debug, Clone, Copy, PartialEq, Eq)]
34pub enum SearchField {
35    All,
36    Name,
37    Error,
38    ToolName,
39    ToolOutput,
40}
41
42impl SearchField {
43    #[allow(dead_code)]
44    fn as_str(&self) -> &'static str {
45        match self {
46            Self::All => "all",
47            Self::Name => "name",
48            Self::Error => "error",
49            Self::ToolName => "tool_name",
50            Self::ToolOutput => "tool_output",
51        }
52    }
53
54    /// Lenient best-effort parser with a default fallback (unknown → All).
55    /// Not a `FromStr` impl because it is infallible by design.
56    #[allow(clippy::should_implement_trait)]
57    pub fn from_str(s: &str) -> Self {
58        match s {
59            "name" => Self::Name,
60            "error" => Self::Error,
61            "tool_name" => Self::ToolName,
62            "tool_output" => Self::ToolOutput,
63            _ => Self::All,
64        }
65    }
66}
67
68/// Sort field.
69#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub enum SortBy {
71    CreatedAt,
72    Cost,
73    Duration,
74    Tokens,
75    Name,
76}
77
78impl SortBy {
79    pub fn as_str(&self) -> &'static str {
80        match self {
81            Self::CreatedAt => "created_at",
82            Self::Cost => "cost_usd",
83            Self::Duration => "duration_secs",
84            Self::Tokens => "tokens_total",
85            Self::Name => "name",
86        }
87    }
88
89    /// Lenient best-effort parser with a default fallback (unknown → CreatedAt).
90    #[allow(clippy::should_implement_trait)]
91    pub fn from_str(s: &str) -> Self {
92        match s {
93            "cost" => Self::Cost,
94            "duration" => Self::Duration,
95            "tokens" => Self::Tokens,
96            "name" => Self::Name,
97            _ => Self::CreatedAt,
98        }
99    }
100}
101
102/// Sort direction.
103#[derive(Debug, Clone, Copy, PartialEq, Eq)]
104pub enum SortDir {
105    Asc,
106    Desc,
107}
108
109impl SortDir {
110    pub fn as_str(&self) -> &'static str {
111        match self {
112            Self::Asc => "ASC",
113            Self::Desc => "DESC",
114        }
115    }
116
117    /// Lenient best-effort parser with a default fallback (unknown → Desc).
118    #[allow(clippy::should_implement_trait)]
119    pub fn from_str(s: &str) -> Self {
120        match s {
121            "asc" => Self::Asc,
122            _ => Self::Desc,
123        }
124    }
125}
126
127/// Full filter specification for querying agent history.
128#[derive(Debug, Clone)]
129pub struct AgentListFilter {
130    pub q: Option<String>,
131    pub search_field: SearchField,
132    pub status: Option<AgentStatusFilter>,
133    pub session_id: Option<String>,
134    pub project_id: Option<String>,
135    pub seed_id: Option<String>,
136    pub model_id: Option<String>,
137    pub tool: Option<String>,
138    pub has_error: Option<bool>,
139    pub date_from: Option<DateTime<Utc>>,
140    pub date_to: Option<DateTime<Utc>>,
141    pub cost_min: Option<f64>,
142    pub cost_max: Option<f64>,
143    pub tokens_min: Option<u64>,
144    pub tokens_max: Option<u64>,
145    pub duration_min: Option<u64>,
146    pub duration_max: Option<u64>,
147    pub sort_by: SortBy,
148    pub sort_dir: SortDir,
149    pub page: u32,
150    pub per_page: u32,
151}
152
153impl Default for AgentListFilter {
154    fn default() -> Self {
155        Self {
156            q: None,
157            search_field: SearchField::All,
158            status: None,
159            session_id: None,
160            project_id: None,
161            seed_id: None,
162            model_id: None,
163            tool: None,
164            has_error: None,
165            date_from: None,
166            date_to: None,
167            cost_min: None,
168            cost_max: None,
169            tokens_min: None,
170            tokens_max: None,
171            duration_min: None,
172            duration_max: None,
173            sort_by: SortBy::CreatedAt,
174            sort_dir: SortDir::Desc,
175            page: 1,
176            per_page: 50,
177        }
178    }
179}
180
181/// Single status value that can be used to filter.
182#[derive(Debug, Clone, Copy, PartialEq, Eq)]
183pub enum AgentStatusFilter {
184    Running,
185    Completed,
186    Failed,
187    Stopped,
188    Starting,
189    Idle,
190}
191
192impl AgentStatusFilter {
193    pub fn as_sql(&self) -> &'static str {
194        match self {
195            Self::Running => "running",
196            Self::Completed => "completed",
197            Self::Failed => "failed",
198            Self::Stopped => "stopped",
199            Self::Starting => "starting",
200            Self::Idle => "idle",
201        }
202    }
203
204    /// Lenient best-effort parser; unknown values map to `None`.
205    #[allow(clippy::should_implement_trait)]
206    pub fn from_str(s: &str) -> Option<Self> {
207        match s {
208            "running" => Some(Self::Running),
209            "completed" => Some(Self::Completed),
210            "failed" => Some(Self::Failed),
211            "stopped" => Some(Self::Stopped),
212            "starting" => Some(Self::Starting),
213            "idle" => Some(Self::Idle),
214            _ => None,
215        }
216    }
217}
218
219// ===========================================================================
220// Filtered stats
221// ===========================================================================
222
223/// Aggregated stats computed from a query result set.
224#[derive(Debug, Clone, Default)]
225pub struct FilteredStats {
226    pub total_cost_usd: f64,
227    pub total_tokens: u64,
228    pub avg_duration_secs: f64,
229    pub count_running: u64,
230    pub count_completed: u64,
231    pub count_failed: u64,
232}
233
234/// Global aggregate stats (unfiltered).
235#[derive(Debug, Clone, Default)]
236pub struct AgentStats {
237    pub total_agents: u64,
238    pub running: u64,
239    pub completed: u64,
240    pub failed: u64,
241    pub total_cost_usd: f64,
242    pub total_tokens: u64,
243    pub total_duration_secs: u64,
244    pub avg_duration_secs: f64,
245    pub avg_cost_usd: f64,
246    pub total_sessions: u64,
247    pub oldest_agent_at: Option<DateTime<Utc>>,
248    pub newest_agent_at: Option<DateTime<Utc>>,
249}
250
251/// Query result with pagination and filtered stats.
252#[derive(Debug, Clone)]
253pub struct QueryResult {
254    pub items: Vec<AgentInfo>,
255    pub total: u64,
256    pub page: u32,
257    pub per_page: u32,
258    pub total_pages: u32,
259    pub stats: FilteredStats,
260}
261
262// ===========================================================================
263// Rebuild report
264// ===========================================================================
265
266#[derive(Debug, Clone, Default)]
267pub struct RebuildReport {
268    pub reindexed: u64,
269    pub orphaned: u64,
270    pub errors: u64,
271}
272
273// ===========================================================================
274// AgentLogDb
275// ===========================================================================
276
277/// SQLite-backed agent history query engine.
278///
279/// Interior mutability via `parking_lot::Mutex` so all methods take `&self`,
280/// compatible with `Arc<AgentLogDb>` shared across tokio tasks.
281#[cfg(feature = "sqlite-memory")]
282pub struct AgentLogDb {
283    conn: parking_lot::Mutex<rusqlite::Connection>,
284}
285
286#[cfg(feature = "sqlite-memory")]
287impl AgentLogDb {
288    pub fn open(path: &Path) -> Result<Self> {
289        let conn = rusqlite::Connection::open(path)
290            .with_context(|| format!("Failed to open agent log database at {}", path.display()))?;
291        let db = Self {
292            conn: parking_lot::Mutex::new(conn),
293        };
294        db.migrate()?;
295        db.configure_wal()?;
296        Ok(db)
297    }
298
299    fn migrate(&self) -> Result<()> {
300        let conn = self.conn.lock();
301        conn.execute_batch(
302            "
303            CREATE TABLE IF NOT EXISTS agents (
304                id              TEXT PRIMARY KEY,
305                name            TEXT NOT NULL,
306                status          TEXT NOT NULL,
307                created_at      TEXT NOT NULL,
308                started_at      TEXT,
309                completed_at    TEXT,
310                session_id      TEXT,
311                seed_id         TEXT,
312                project_id      TEXT,
313                model_id        TEXT NOT NULL DEFAULT '',
314                error           TEXT,
315                steps_completed INTEGER NOT NULL DEFAULT 0,
316                steps_total     INTEGER,
317                tokens_input    INTEGER NOT NULL DEFAULT 0,
318                tokens_output   INTEGER NOT NULL DEFAULT 0,
319                cost_usd        REAL NOT NULL DEFAULT 0.0,
320                duration_secs   INTEGER
321            );
322
323            CREATE TABLE IF NOT EXISTS agent_tool_calls (
324                id          INTEGER PRIMARY KEY AUTOINCREMENT,
325                agent_id    TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
326                seq         INTEGER NOT NULL,
327                tool_name   TEXT NOT NULL,
328                input       TEXT NOT NULL DEFAULT '',
329                output      TEXT NOT NULL DEFAULT '',
330                duration_ms INTEGER NOT NULL DEFAULT 0,
331                is_error    INTEGER NOT NULL DEFAULT 0,
332                timestamp   TEXT,
333                tool_call_id TEXT NOT NULL DEFAULT ''
334            );
335
336            CREATE INDEX IF NOT EXISTS idx_agents_status_created ON agents(status, created_at DESC);
337            CREATE INDEX IF NOT EXISTS idx_agents_session     ON agents(session_id);
338            CREATE INDEX IF NOT EXISTS idx_agents_project     ON agents(project_id);
339            CREATE INDEX IF NOT EXISTS idx_agents_seed        ON agents(seed_id);
340            CREATE INDEX IF NOT EXISTS idx_agents_model       ON agents(model_id);
341            CREATE INDEX IF NOT EXISTS idx_agents_cost        ON agents(cost_usd);
342            CREATE INDEX IF NOT EXISTS idx_agents_duration    ON agents(duration_secs);
343            CREATE INDEX IF NOT EXISTS idx_agents_name        ON agents(name);
344            CREATE INDEX IF NOT EXISTS idx_tool_calls_agent   ON agent_tool_calls(agent_id, seq);
345            CREATE INDEX IF NOT EXISTS idx_tool_calls_name    ON agent_tool_calls(tool_name);
346
347            CREATE TABLE IF NOT EXISTS agent_log_meta (
348                key   TEXT PRIMARY KEY,
349                value TEXT NOT NULL
350            );
351        ",
352        )
353        .context("Failed to run agent log SQLite migration")?;
354
355        // FTS5 virtual table (separately, catch errors if FTS5 not available)
356        let _ = conn.execute_batch(
357            "CREATE VIRTUAL TABLE IF NOT EXISTS agent_tool_calls_fts USING fts5(
358                tool_name, input, output,
359                content='agent_tool_calls',
360                content_rowid='id'
361            );",
362        );
363
364        Ok(())
365    }
366
367    fn configure_wal(&self) -> Result<()> {
368        let conn = self.conn.lock();
369        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")
370            .context("Failed to configure WAL mode")
371    }
372
373    // ── Upsert ──────────────────────────────────────────────────────
374
375    pub fn upsert_agent(&self, info: &AgentInfo) -> Result<()> {
376        let mut conn = self.conn.lock();
377        let tx = conn
378            .transaction()
379            .context("Failed to begin agent upsert transaction")?;
380
381        let duration_secs = match (info.started_at, info.completed_at) {
382            (Some(start), Some(end)) => Some((end - start).num_seconds().max(0)),
383            _ => None,
384        };
385
386        tx.execute(
387            "INSERT INTO agents (id, name, status, created_at, started_at, completed_at,
388                 session_id, seed_id, project_id, model_id, error,
389                 steps_completed, steps_total, tokens_input, tokens_output, cost_usd, duration_secs)
390             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)
391             ON CONFLICT(id) DO UPDATE SET
392                 status=excluded.status,
393                 completed_at=excluded.completed_at,
394                 error=excluded.error,
395                 steps_completed=excluded.steps_completed,
396                 steps_total=excluded.steps_total,
397                 tokens_input=excluded.tokens_input,
398                 tokens_output=excluded.tokens_output,
399                 cost_usd=excluded.cost_usd,
400                 duration_secs=excluded.duration_secs",
401            rusqlite::params![
402                info.id.to_string(),
403                info.name,
404                info.status.to_string(),
405                info.created_at.to_rfc3339(),
406                info.started_at.map(|t| t.to_rfc3339()),
407                info.completed_at.map(|t| t.to_rfc3339()),
408                info.session_id,
409                info.seed_id.map(|s| s.to_string()),
410                info.project_id.map(|p| p.to_string()),
411                info.model_id,
412                info.error,
413                info.steps_completed as i64,
414                info.steps_total.map(|s| s as i64),
415                info.tokens_input as i64,
416                info.tokens_output as i64,
417                info.cost_usd,
418                duration_secs,
419            ],
420        )?;
421
422        // Replace tool calls
423        tx.execute(
424            "DELETE FROM agent_tool_calls WHERE agent_id = ?1",
425            rusqlite::params![info.id.to_string()],
426        )?;
427
428        for (i, tc) in info.tool_calls.iter().enumerate() {
429            tx.execute(
430                "INSERT INTO agent_tool_calls (agent_id, seq, tool_name, input, output,
431                     duration_ms, is_error, timestamp, tool_call_id)
432                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
433                rusqlite::params![
434                    info.id.to_string(),
435                    i as i64,
436                    tc.tool,
437                    tc.input,
438                    tc.output,
439                    tc.duration_ms as i64,
440                    tc.is_error as i64,
441                    tc.timestamp.map(|t| t.to_rfc3339()),
442                    tc.tool_call_id,
443                ],
444            )?;
445        }
446
447        tx.commit().context("Failed to commit agent upsert")?;
448
449        // Rebuild FTS
450        let _ = conn.execute_batch(
451            "INSERT INTO agent_tool_calls_fts(agent_tool_calls_fts) VALUES('rebuild');",
452        );
453
454        Ok(())
455    }
456
457    // ── Query ───────────────────────────────────────────────────────
458
459    pub fn query(&self, filter: &AgentListFilter) -> Result<QueryResult> {
460        let (where_clause, params) = self.build_where(filter);
461        let offset = ((filter.page.max(1) - 1) * filter.per_page) as i64;
462        let limit = filter.per_page.min(200) as i64;
463
464        let conn = self.conn.lock();
465
466        // Count
467        let count_sql = format!("SELECT COUNT(*) FROM agents WHERE {}", where_clause);
468        let total: u64 = conn
469            .query_row(
470                &count_sql,
471                rusqlite::params_from_iter(params.iter()),
472                |row| row.get(0),
473            )
474            .context("Failed to count agents")?;
475
476        // Data
477        let safe_sort_col = filter.sort_by.as_str();
478        let safe_sort_dir = filter.sort_dir.as_str();
479        let param_count = params.len();
480        let data_sql = format!(
481            "SELECT * FROM agents WHERE {} ORDER BY {} {} LIMIT ?{} OFFSET ?{}",
482            where_clause,
483            safe_sort_col,
484            safe_sort_dir,
485            param_count + 1,
486            param_count + 2,
487        );
488
489        let mut stmt = conn.prepare(&data_sql)?;
490        let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = params
491            .into_iter()
492            .map(|p| -> Box<dyn rusqlite::types::ToSql> { p })
493            .collect();
494        all_params.push(Box::new(limit));
495        all_params.push(Box::new(offset));
496
497        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
498            all_params.iter().map(|p| p.as_ref()).collect();
499
500        let items: Vec<AgentInfo> = stmt
501            .query_map(param_refs.as_slice(), Self::row_to_agent)?
502            .collect::<Result<Vec<_>, _>>()
503            .context("Failed to collect agent query results")?;
504
505        // Stats for filtered set
506        let stats = self.filtered_stats_inner(filter, &conn)?;
507
508        let total_pages = if total == 0 {
509            1
510        } else {
511            ((total as f64) / filter.per_page as f64).ceil() as u32
512        };
513
514        Ok(QueryResult {
515            items,
516            total,
517            page: filter.page,
518            per_page: filter.per_page,
519            total_pages,
520            stats,
521        })
522    }
523
524    pub fn stats(&self) -> Result<AgentStats> {
525        let conn = self.conn.lock();
526
527        let mut s = AgentStats::default();
528
529        let row = conn
530            .query_row(
531                "SELECT
532                    COUNT(*) as total,
533                    COALESCE(SUM(CASE WHEN status='running' THEN 1 ELSE 0 END), 0),
534                    COALESCE(SUM(CASE WHEN status IN ('completed','stopped') THEN 1 ELSE 0 END), 0),
535                    COALESCE(SUM(CASE WHEN status='failed' THEN 1 ELSE 0 END), 0),
536                    COALESCE(SUM(cost_usd), 0.0),
537                    COALESCE(SUM(tokens_input + tokens_output), 0),
538                    COALESCE(SUM(duration_secs), 0),
539                    COALESCE(AVG(duration_secs), 0.0),
540                    COALESCE(AVG(cost_usd), 0.0),
541                    COUNT(DISTINCT session_id),
542                    MIN(created_at),
543                    MAX(created_at)
544                 FROM agents",
545                [],
546                |row| {
547                    Ok((
548                        row.get::<_, i64>(0)? as u64,
549                        row.get::<_, i64>(1)? as u64,
550                        row.get::<_, i64>(2)? as u64,
551                        row.get::<_, i64>(3)? as u64,
552                        row.get::<_, f64>(4)?,
553                        row.get::<_, i64>(5)? as u64,
554                        row.get::<_, i64>(6)? as u64,
555                        row.get::<_, f64>(7)?,
556                        row.get::<_, f64>(8)?,
557                        row.get::<_, i64>(9)? as u64,
558                        row.get::<_, Option<String>>(10)?,
559                        row.get::<_, Option<String>>(11)?,
560                    ))
561                },
562            )
563            .context("Failed to query agent stats")?;
564
565        s.total_agents = row.0;
566        s.running = row.1;
567        s.completed = row.2;
568        s.failed = row.3;
569        s.total_cost_usd = row.4;
570        s.total_tokens = row.5;
571        s.total_duration_secs = row.6;
572        s.avg_duration_secs = if s.total_agents > 0 { row.7 } else { 0.0 };
573        s.avg_cost_usd = if s.total_agents > 0 { row.8 } else { 0.0 };
574        s.total_sessions = row.9;
575        s.oldest_agent_at = row
576            .10
577            .as_deref()
578            .and_then(|ts| DateTime::parse_from_rfc3339(ts).ok())
579            .map(|dt| dt.with_timezone(&Utc));
580        s.newest_agent_at = row
581            .11
582            .as_deref()
583            .and_then(|ts| DateTime::parse_from_rfc3339(ts).ok())
584            .map(|dt| dt.with_timezone(&Utc));
585
586        Ok(s)
587    }
588
589    pub fn get(&self, id: &str) -> Result<Option<AgentInfo>> {
590        let conn = self.conn.lock();
591        let mut stmt = conn
592            .prepare("SELECT * FROM agents WHERE id = ?1")
593            .context("Failed to prepare agent get statement")?;
594
595        let mut rows = stmt
596            .query_map(rusqlite::params![id], Self::row_to_agent)
597            .context("Failed to query agent by id")?;
598
599        match rows.next() {
600            Some(Ok(mut agent)) => {
601                agent.tool_calls = Self::get_tool_calls_inner(&conn, id)?;
602                Ok(Some(agent))
603            }
604            Some(Err(e)) => Err(e.into()),
605            None => Ok(None),
606        }
607    }
608
609    pub fn get_tool_calls(&self, agent_id: &str) -> Result<Vec<ToolCallRecord>> {
610        let conn = self.conn.lock();
611        Self::get_tool_calls_inner(&conn, agent_id)
612    }
613
614    pub fn delete(&self, id: &str) -> Result<bool> {
615        let conn = self.conn.lock();
616        let changes = conn
617            .execute("DELETE FROM agents WHERE id = ?1", rusqlite::params![id])
618            .context("Failed to delete agent")?;
619        Ok(changes > 0)
620    }
621
622    // ── Pruning ─────────────────────────────────────────────────────
623
624    pub fn prune(&self, config: &AgentLogConfig) -> Result<usize> {
625        let mut pruned = 0usize;
626
627        if config.ttl_hours > 0 || config.max_entries > 0 {
628            let conn = self.conn.lock();
629
630            // 1. TTL-based
631            if config.ttl_hours > 0 {
632                let cutoff = Utc::now() - chrono::Duration::hours(config.ttl_hours as i64);
633                let cutoff_str = cutoff.to_rfc3339();
634                let deleted = conn
635                    .execute(
636                        "DELETE FROM agents WHERE created_at < ?1",
637                        rusqlite::params![cutoff_str],
638                    )
639                    .context("Failed to prune agents by TTL")?;
640                pruned += deleted;
641            }
642
643            // 2. Count-based
644            if config.max_entries > 0 {
645                let count: i64 = conn
646                    .query_row("SELECT COUNT(*) FROM agents", [], |row| row.get(0))
647                    .context("Failed to count agents for pruning")?;
648
649                if count > config.max_entries as i64 {
650                    let excess = count - config.max_entries as i64;
651                    let to_delete = excess.min(config.prune_batch_size as i64);
652                    conn.execute(
653                        "DELETE FROM agents WHERE id IN (
654                            SELECT id FROM agents ORDER BY created_at ASC LIMIT ?1
655                        )",
656                        rusqlite::params![to_delete],
657                    )?;
658                    pruned += to_delete as usize;
659                }
660            }
661        }
662
663        if pruned > 0 {
664            tracing::info!(pruned = pruned, "Agent log SQLite pruning completed");
665        }
666
667        Ok(pruned)
668    }
669
670    // ── Recovery ────────────────────────────────────────────────────
671
672    pub async fn reindex_all(&self, state_store: &StateStore) -> Result<RebuildReport> {
673        let agent_names = state_store
674            .list_category("agents")
675            .await
676            .unwrap_or_default();
677        let mut report = RebuildReport::default();
678
679        {
680            let conn = self.conn.lock();
681            conn.execute_batch("DELETE FROM agent_tool_calls; DELETE FROM agents;")
682                .context("Failed to clear agent tables for reindex")?;
683        }
684
685        for name in &agent_names {
686            match state_store.load_json::<AgentInfo>("agents", name).await {
687                Ok(Some(agent)) => {
688                    if let Err(e) = self.upsert_agent(&agent) {
689                        tracing::warn!(agent_id = %name, error = %e, "Failed to reindex agent");
690                        report.errors += 1;
691                    } else {
692                        report.reindexed += 1;
693                    }
694                }
695                _ => {
696                    report.errors += 1;
697                }
698            }
699        }
700
701        Ok(report)
702    }
703
704    // ── Internal Helpers ────────────────────────────────────────────
705
706    fn row_to_agent(row: &rusqlite::Row) -> rusqlite::Result<AgentInfo> {
707        let id_str: String = row.get("id")?;
708        let seed_id_str: Option<String> = row.get("seed_id")?;
709        let project_id_str: Option<String> = row.get("project_id")?;
710        let created_at_str: String = row.get("created_at")?;
711        let started_at_str: Option<String> = row.get("started_at")?;
712        let completed_at_str: Option<String> = row.get("completed_at")?;
713
714        Ok(AgentInfo {
715            id: uuid::Uuid::parse_str(&id_str).unwrap_or_default(),
716            name: row.get("name")?,
717            status: Self::parse_status(&row.get::<_, String>("status")?),
718            created_at: DateTime::parse_from_rfc3339(&created_at_str)
719                .map(|dt| dt.with_timezone(&Utc))
720                .unwrap_or_else(|_| Utc::now()),
721            seed_id: seed_id_str.and_then(|s| uuid::Uuid::parse_str(&s).ok()),
722            project_id: project_id_str.and_then(|s| uuid::Uuid::parse_str(&s).ok()),
723            started_at: started_at_str
724                .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
725                .map(|dt| dt.with_timezone(&Utc)),
726            completed_at: completed_at_str
727                .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
728                .map(|dt| dt.with_timezone(&Utc)),
729            error: row.get("error")?,
730            steps_completed: row.get::<_, i64>("steps_completed")? as usize,
731            steps_total: row
732                .get::<_, Option<i64>>("steps_total")?
733                .map(|v| v as usize),
734            tool_calls: vec![],
735            tokens_input: row.get::<_, i64>("tokens_input")? as u64,
736            tokens_output: row.get::<_, i64>("tokens_output")? as u64,
737            cost_usd: row.get("cost_usd")?,
738            model_id: row.get("model_id")?,
739            session_id: row.get("session_id")?,
740        })
741    }
742
743    fn get_tool_calls_inner(
744        conn: &rusqlite::Connection,
745        agent_id: &str,
746    ) -> Result<Vec<ToolCallRecord>> {
747        let mut stmt = conn
748            .prepare(
749                "SELECT tool_name, input, output, duration_ms, is_error, timestamp, tool_call_id
750                 FROM agent_tool_calls WHERE agent_id = ?1 ORDER BY seq",
751            )
752            .context("Failed to prepare tool calls statement")?;
753
754        let calls = stmt
755            .query_map(rusqlite::params![agent_id], |row| {
756                let ts: Option<String> = row.get(5)?;
757                Ok(ToolCallRecord {
758                    tool: row.get(0)?,
759                    input: row.get(1)?,
760                    output: row.get(2)?,
761                    duration_ms: row.get::<_, i64>(3)? as u64,
762                    is_error: row.get::<_, i64>(4)? != 0,
763                    tool_call_id: row.get(6)?,
764                    timestamp: ts
765                        .and_then(|s| DateTime::parse_from_rfc3339(&s as &str).ok())
766                        .map(|dt| dt.with_timezone(&Utc)),
767                })
768            })
769            .context("Failed to query tool calls")?
770            .collect::<Result<Vec<_>, _>>()
771            .context("Failed to collect tool calls")?;
772
773        Ok(calls)
774    }
775
776    fn parse_status(s: &str) -> AgentStatus {
777        match s {
778            "starting" => AgentStatus::Starting,
779            "running" => AgentStatus::Running,
780            "idle" => AgentStatus::Idle,
781            "stopped" => AgentStatus::Stopped,
782            "failed" => AgentStatus::Failed,
783            "completed" => AgentStatus::Completed,
784            _ => AgentStatus::Idle,
785        }
786    }
787
788    /// Build WHERE clause and params from the filter.
789    fn build_where(
790        &self,
791        filter: &AgentListFilter,
792    ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
793        let mut conditions: Vec<String> = Vec::new();
794        let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
795
796        // Status filter
797        if let Some(status) = filter.status {
798            let idx = params.len() + 1;
799            conditions.push(format!("status = ?{}", idx));
800            params.push(Box::new(status.as_sql().to_string()));
801        }
802
803        // Date range
804        if let Some(from) = filter.date_from {
805            let idx = params.len() + 1;
806            conditions.push(format!("created_at >= ?{}", idx));
807            params.push(Box::new(from.to_rfc3339()));
808        }
809        if let Some(to) = filter.date_to {
810            let idx = params.len() + 1;
811            conditions.push(format!("created_at <= ?{}", idx));
812            params.push(Box::new(to.to_rfc3339()));
813        }
814
815        // Session / project / seed
816        if let Some(ref sid) = filter.session_id {
817            let idx = params.len() + 1;
818            conditions.push(format!("session_id = ?{}", idx));
819            params.push(Box::new(sid.clone()));
820        }
821        if let Some(ref pid) = filter.project_id {
822            let idx = params.len() + 1;
823            conditions.push(format!("project_id = ?{}", idx));
824            params.push(Box::new(pid.clone()));
825        }
826        if let Some(ref sid) = filter.seed_id {
827            let idx = params.len() + 1;
828            conditions.push(format!("seed_id = ?{}", idx));
829            params.push(Box::new(sid.clone()));
830        }
831
832        // Model (substring)
833        if let Some(ref model) = filter.model_id {
834            let idx = params.len() + 1;
835            conditions.push(format!("model_id LIKE ?{}", idx));
836            params.push(Box::new(format!("%{}%", model)));
837        }
838
839        // Cost range
840        if let Some(min) = filter.cost_min {
841            let idx = params.len() + 1;
842            conditions.push(format!("cost_usd >= ?{}", idx));
843            params.push(Box::new(min));
844        }
845        if let Some(max) = filter.cost_max {
846            let idx = params.len() + 1;
847            conditions.push(format!("cost_usd <= ?{}", idx));
848            params.push(Box::new(max));
849        }
850
851        // Tokens range
852        if filter.tokens_min.is_some() || filter.tokens_max.is_some() {
853            let total_expr = "(tokens_input + tokens_output)";
854            if let Some(min) = filter.tokens_min {
855                let idx = params.len() + 1;
856                conditions.push(format!("{} >= ?{}", total_expr, idx));
857                params.push(Box::new(min as i64));
858            }
859            if let Some(max) = filter.tokens_max {
860                let idx = params.len() + 1;
861                conditions.push(format!("{} <= ?{}", total_expr, idx));
862                params.push(Box::new(max as i64));
863            }
864        }
865
866        // Duration range
867        if let Some(min) = filter.duration_min {
868            let idx = params.len() + 1;
869            conditions.push(format!("duration_secs >= ?{}", idx));
870            params.push(Box::new(min as i64));
871        }
872        if let Some(max) = filter.duration_max {
873            let idx = params.len() + 1;
874            conditions.push(format!("duration_secs <= ?{}", idx));
875            params.push(Box::new(max as i64));
876        }
877
878        // Error filter
879        if let Some(has_err) = filter.has_error {
880            if has_err {
881                conditions.push("error IS NOT NULL AND error != ''".to_string());
882            } else {
883                conditions.push("(error IS NULL OR error = '')".to_string());
884            }
885        }
886
887        // Tool filter (JOIN subquery)
888        if let Some(ref tool) = filter.tool {
889            let idx = params.len() + 1;
890            conditions.push(format!(
891                "id IN (SELECT DISTINCT agent_id FROM agent_tool_calls WHERE tool_name LIKE ?{})",
892                idx
893            ));
894            params.push(Box::new(format!("%{}%", tool)));
895        }
896
897        // Full-text search via FTS5
898        let has_fts = matches!(
899            filter.search_field,
900            SearchField::All | SearchField::ToolName | SearchField::ToolOutput
901        ) && filter.q.is_some();
902
903        if has_fts {
904            if let Some(ref q) = filter.q {
905                let escaped = q.replace('\'', "''");
906                conditions.push(format!(
907                    "id IN (SELECT DISTINCT agent_id FROM agent_tool_calls_fts WHERE agent_tool_calls_fts MATCH '{}')",
908                    escaped
909                ));
910            }
911        } else if let Some(ref q) = filter.q {
912            match filter.search_field {
913                SearchField::Name => {
914                    let idx = params.len() + 1;
915                    conditions.push(format!("name LIKE ?{}", idx));
916                    params.push(Box::new(format!("%{}%", q)));
917                }
918                SearchField::Error => {
919                    let idx = params.len() + 1;
920                    conditions.push(format!("error LIKE ?{}", idx));
921                    params.push(Box::new(format!("%{}%", q)));
922                }
923                _ => {
924                    let idx1 = params.len() + 1;
925                    let idx2 = params.len() + 2;
926                    conditions.push(format!("(name LIKE ?{} OR error LIKE ?{})", idx1, idx2));
927                    params.push(Box::new(format!("%{}%", q)));
928                    params.push(Box::new(format!("%{}%", q)));
929                }
930            }
931        }
932
933        if conditions.is_empty() {
934            ("1=1".to_string(), params)
935        } else {
936            (conditions.join(" AND "), params)
937        }
938    }
939
940    /// Aggregate stats constrained to a filter (acquires the connection lock).
941    /// Reserved for filtered dashboard endpoints; currently `list()` reuses the
942    /// inner helper directly to avoid a double lock.
943    #[allow(dead_code)]
944    fn filtered_stats(&self, filter: &AgentListFilter) -> Result<FilteredStats> {
945        let conn = self.conn.lock();
946        self.filtered_stats_inner(filter, &conn)
947    }
948
949    fn filtered_stats_inner(
950        &self,
951        filter: &AgentListFilter,
952        conn: &rusqlite::Connection,
953    ) -> Result<FilteredStats> {
954        let (where_clause, params) = self.build_where(filter);
955
956        let sql = format!(
957            "SELECT
958                COALESCE(SUM(cost_usd), 0.0),
959                COALESCE(SUM(tokens_input + tokens_output), 0),
960                COALESCE(AVG(duration_secs), 0.0),
961                COALESCE(SUM(CASE WHEN status='running' THEN 1 ELSE 0 END), 0),
962                COALESCE(SUM(CASE WHEN status IN ('completed','stopped') THEN 1 ELSE 0 END), 0),
963                COALESCE(SUM(CASE WHEN status='failed' THEN 1 ELSE 0 END), 0)
964             FROM agents WHERE {}",
965            where_clause
966        );
967
968        let mut stmt = conn.prepare(&sql)?;
969        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
970            params.iter().map(|p| p.as_ref()).collect();
971        let row = stmt
972            .query_row(param_refs.as_slice(), |row| {
973                Ok(FilteredStats {
974                    total_cost_usd: row.get(0)?,
975                    total_tokens: row.get::<_, i64>(1)? as u64,
976                    avg_duration_secs: row.get(2)?,
977                    count_running: row.get::<_, i64>(3)? as u64,
978                    count_completed: row.get::<_, i64>(4)? as u64,
979                    count_failed: row.get::<_, i64>(5)? as u64,
980                })
981            })
982            .context("Failed to compute filtered stats")?;
983
984        Ok(row)
985    }
986}
987
988// ===========================================================================
989// Tests
990// ===========================================================================
991
992#[cfg(test)]
993mod tests {
994    use super::*;
995    use crate::types::AgentStatus;
996
997    fn make_test_db() -> (AgentLogDb, tempfile::TempDir) {
998        let dir = tempfile::tempdir().unwrap();
999        let path = dir.path().join("agent_log.db");
1000        let db = AgentLogDb::open(&path).unwrap();
1001        (db, dir)
1002    }
1003
1004    fn sample_agent(id: &str, status: AgentStatus, created: &str, cost: f64) -> AgentInfo {
1005        AgentInfo {
1006            id: uuid::Uuid::parse_str(id).unwrap(),
1007            name: format!("test-agent-{}", &id[..8]),
1008            status,
1009            created_at: DateTime::parse_from_rfc3339(created)
1010                .map(|dt| dt.with_timezone(&Utc))
1011                .unwrap(),
1012            seed_id: None,
1013            project_id: None,
1014            started_at: None,
1015            completed_at: None,
1016            error: None,
1017            steps_completed: 5,
1018            steps_total: Some(10),
1019            tool_calls: vec![],
1020            tokens_input: 1000,
1021            tokens_output: 500,
1022            cost_usd: cost,
1023            model_id: "anthropic/claude-sonnet-4".into(),
1024            session_id: None,
1025        }
1026    }
1027
1028    #[test]
1029    fn test_upsert_and_query() {
1030        let (db, _dir) = make_test_db();
1031
1032        let agent = sample_agent(
1033            "550e8400-e29b-41d4-a716-446655440000",
1034            AgentStatus::Completed,
1035            "2026-06-01T00:00:00Z",
1036            0.05,
1037        );
1038
1039        db.upsert_agent(&agent).unwrap();
1040
1041        let filter = AgentListFilter::default();
1042        let result = db.query(&filter).unwrap();
1043        assert_eq!(result.total, 1);
1044        assert_eq!(result.items[0].name, "test-agent-550e8400");
1045        assert_eq!(result.items[0].status, AgentStatus::Completed);
1046    }
1047
1048    #[test]
1049    fn test_filter_by_status() {
1050        let (db, _dir) = make_test_db();
1051
1052        for i in 0..5 {
1053            let status = if i % 2 == 0 {
1054                AgentStatus::Completed
1055            } else {
1056                AgentStatus::Failed
1057            };
1058            db.upsert_agent(&sample_agent(
1059                &format!("550e8400-e29b-41d4-a716-44665544000{}", i),
1060                status,
1061                "2026-06-01T00:00:00Z",
1062                0.01,
1063            ))
1064            .unwrap();
1065        }
1066
1067        let mut filter = AgentListFilter::default();
1068        filter.status = Some(AgentStatusFilter::Failed);
1069        let result = db.query(&filter).unwrap();
1070        assert_eq!(result.total, 2);
1071    }
1072
1073    #[test]
1074    fn test_filter_by_date_range() {
1075        let (db, _dir) = make_test_db();
1076
1077        for (i, (created, cost)) in [
1078            ("2026-06-01T00:00:00Z", 0.01),
1079            ("2026-06-10T00:00:00Z", 0.02),
1080            ("2026-06-20T00:00:00Z", 0.03),
1081        ]
1082        .iter()
1083        .enumerate()
1084        {
1085            db.upsert_agent(&sample_agent(
1086                &format!("550e8400-e29b-41d4-a716-44665544000{}", i),
1087                AgentStatus::Completed,
1088                created,
1089                *cost,
1090            ))
1091            .unwrap();
1092        }
1093
1094        let mut filter = AgentListFilter::default();
1095        filter.date_from = Some(
1096            DateTime::parse_from_rfc3339("2026-06-05T00:00:00Z")
1097                .map(|dt| dt.with_timezone(&Utc))
1098                .unwrap(),
1099        );
1100        filter.date_to = Some(
1101            DateTime::parse_from_rfc3339("2026-06-15T00:00:00Z")
1102                .map(|dt| dt.with_timezone(&Utc))
1103                .unwrap(),
1104        );
1105        let result = db.query(&filter).unwrap();
1106        assert_eq!(result.total, 1);
1107    }
1108
1109    #[test]
1110    fn test_search_by_name() {
1111        let (db, _dir) = make_test_db();
1112
1113        let mut agent = sample_agent(
1114            "550e8400-e29b-41d4-a716-446655440000",
1115            AgentStatus::Completed,
1116            "2026-06-01T00:00:00Z",
1117            0.01,
1118        );
1119        agent.name = "Refactor authentication module".into();
1120        db.upsert_agent(&agent).unwrap();
1121
1122        let mut agent2 = sample_agent(
1123            "550e8400-e29b-41d4-a716-446655440001",
1124            AgentStatus::Failed,
1125            "2026-06-02T00:00:00Z",
1126            0.02,
1127        );
1128        agent2.name = "Fix build error".into();
1129        db.upsert_agent(&agent2).unwrap();
1130
1131        let mut filter = AgentListFilter::default();
1132        filter.q = Some("Refactor".into());
1133        filter.search_field = SearchField::Name;
1134        let result = db.query(&filter).unwrap();
1135        assert_eq!(result.total, 1);
1136        assert!(result.items[0].name.contains("Refactor"));
1137    }
1138
1139    #[test]
1140    fn test_sorting() {
1141        let (db, _dir) = make_test_db();
1142
1143        for (i, cost) in [0.10, 0.01, 0.50].iter().enumerate() {
1144            db.upsert_agent(&sample_agent(
1145                &format!("550e8400-e29b-41d4-a716-44665544000{}", i),
1146                AgentStatus::Completed,
1147                "2026-06-01T00:00:00Z",
1148                *cost,
1149            ))
1150            .unwrap();
1151        }
1152
1153        let mut filter = AgentListFilter::default();
1154        filter.sort_by = SortBy::Cost;
1155        filter.sort_dir = SortDir::Desc;
1156        let result = db.query(&filter).unwrap();
1157        assert_eq!(result.items[0].cost_usd, 0.50);
1158        assert_eq!(result.items[1].cost_usd, 0.10);
1159        assert_eq!(result.items[2].cost_usd, 0.01);
1160    }
1161
1162    #[test]
1163    fn test_pagination() {
1164        let (db, _dir) = make_test_db();
1165
1166        for i in 0..10 {
1167            db.upsert_agent(&sample_agent(
1168                &format!("550e8400-e29b-41d4-a716-44665544000{}", i),
1169                AgentStatus::Completed,
1170                &format!("2026-06-{:02}T00:00:00Z", i + 1),
1171                0.01,
1172            ))
1173            .unwrap();
1174        }
1175
1176        let mut filter = AgentListFilter::default();
1177        filter.per_page = 3;
1178        filter.page = 1;
1179        let result = db.query(&filter).unwrap();
1180        assert_eq!(result.items.len(), 3);
1181        assert_eq!(result.total, 10);
1182        assert_eq!(result.total_pages, 4);
1183
1184        filter.page = 2;
1185        let result = db.query(&filter).unwrap();
1186        assert_eq!(result.items.len(), 3);
1187    }
1188
1189    #[test]
1190    fn test_stats() {
1191        let (db, _dir) = make_test_db();
1192
1193        for (i, (status, cost)) in [
1194            (AgentStatus::Completed, 0.05),
1195            (AgentStatus::Completed, 0.03),
1196            (AgentStatus::Failed, 0.01),
1197            (AgentStatus::Running, 0.02),
1198        ]
1199        .iter()
1200        .enumerate()
1201        {
1202            db.upsert_agent(&sample_agent(
1203                &format!("550e8400-e29b-41d4-a716-44665544000{}", i),
1204                *status,
1205                "2026-06-01T00:00:00Z",
1206                *cost,
1207            ))
1208            .unwrap();
1209        }
1210
1211        let stats = db.stats().unwrap();
1212        assert_eq!(stats.total_agents, 4);
1213        assert_eq!(stats.completed, 2);
1214        assert_eq!(stats.failed, 1);
1215        assert_eq!(stats.running, 1);
1216        assert!((stats.total_cost_usd - 0.11).abs() < 0.001);
1217    }
1218
1219    #[test]
1220    fn test_prune_by_ttl() {
1221        let (db, _dir) = make_test_db();
1222
1223        db.upsert_agent(&sample_agent(
1224            "11111111-1111-4114-a716-446655440000",
1225            AgentStatus::Completed,
1226            "2026-03-01T00:00:00Z",
1227            0.01,
1228        ))
1229        .unwrap();
1230
1231        db.upsert_agent(&sample_agent(
1232            "22222222-2222-4114-a716-446655440000",
1233            AgentStatus::Completed,
1234            "2026-06-01T00:00:00Z",
1235            0.02,
1236        ))
1237        .unwrap();
1238
1239        let config = AgentLogConfig {
1240            ttl_hours: 720,
1241            ..Default::default()
1242        };
1243        let pruned = db.prune(&config).unwrap();
1244        assert!(pruned > 0);
1245
1246        let result = db.query(&AgentListFilter::default()).unwrap();
1247        assert_eq!(result.total, 1);
1248        assert_eq!(
1249            result.items[0].id.to_string(),
1250            "22222222-2222-4114-a716-446655440000"
1251        );
1252    }
1253
1254    #[test]
1255    fn test_delete() {
1256        let (db, _dir) = make_test_db();
1257
1258        db.upsert_agent(&sample_agent(
1259            "550e8400-e29b-41d4-a716-446655440000",
1260            AgentStatus::Completed,
1261            "2026-06-01T00:00:00Z",
1262            0.01,
1263        ))
1264        .unwrap();
1265
1266        assert!(db.delete("550e8400-e29b-41d4-a716-446655440000").unwrap());
1267        assert!(!db.delete("nonexistent").unwrap());
1268
1269        let result = db.query(&AgentListFilter::default()).unwrap();
1270        assert_eq!(result.total, 0);
1271    }
1272
1273    #[test]
1274    fn test_get_tool_calls() {
1275        let (db, _dir) = make_test_db();
1276
1277        let mut agent = sample_agent(
1278            "550e8400-e29b-41d4-a716-446655440000",
1279            AgentStatus::Completed,
1280            "2026-06-01T00:00:00Z",
1281            0.01,
1282        );
1283        agent.tool_calls = vec![
1284            ToolCallRecord {
1285                tool: "bash".into(),
1286                input: "ls -la".into(),
1287                output: "total 42".into(),
1288                duration_ms: 150,
1289                is_error: false,
1290                tool_call_id: "call_1".into(),
1291                timestamp: Some(Utc::now()),
1292            },
1293            ToolCallRecord {
1294                tool: "read".into(),
1295                input: "file.rs".into(),
1296                output: "fn main()".into(),
1297                duration_ms: 5,
1298                is_error: false,
1299                tool_call_id: "call_2".into(),
1300                timestamp: Some(Utc::now()),
1301            },
1302        ];
1303        db.upsert_agent(&agent).unwrap();
1304
1305        let calls = db
1306            .get_tool_calls("550e8400-e29b-41d4-a716-446655440000")
1307            .unwrap();
1308        assert_eq!(calls.len(), 2);
1309        assert_eq!(calls[0].tool, "bash");
1310        assert_eq!(calls[1].tool, "read");
1311    }
1312}