Skip to main content

oxios_kernel/
agent_log_db.rs

1//! Agent history log — SQLite-backed query engine for past agent records.
2//!
3//! # Architecture
4//!
5//! Two-tier storage:
6//! - **Filesystem JSON** (`state/agents/<uuid>.json`): source of truth.
7//!   Human-readable, backup-friendly, rebuildable.
8//! - **SQLite** (`state/agent_log.db`): query index with indexes, FTS5.
9//!   Fast filtering, sorting, search, aggregation.
10//!
11//! SQLite DB is rebuildable from filesystem JSON at any time
12//! via [`AgentLogDb::reindex_all`].
13//!
14//! # Feature gate
15//!
16//! When `sqlite-memory` feature is disabled, all query operations
17//! fall back to filesystem-only scan mode. Degraded but functional.
18
19use std::path::Path;
20
21use anyhow::{Context, Result};
22use chrono::{DateTime, Utc};
23
24use crate::config::AgentLogConfig;
25use crate::state_store::StateStore;
26use crate::types::{AgentInfo, AgentStatus, ToolCallRecord};
27
28// ===========================================================================
29// Filter / Query Types
30// ===========================================================================
31
/// Selects which field a search query should target.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SearchField {
    /// Keyed as `"all"`; also the result for any unrecognised key.
    All,
    /// Keyed as `"name"`.
    Name,
    /// Keyed as `"error"`.
    Error,
    /// Keyed as `"tool_name"`.
    ToolName,
    /// Keyed as `"tool_output"`.
    ToolOutput,
}

impl SearchField {
    /// Key/variant pairs for every variant except the `All` fallback.
    const KEYED: [(&'static str, Self); 4] = [
        ("name", Self::Name),
        ("error", Self::Error),
        ("tool_name", Self::ToolName),
        ("tool_output", Self::ToolOutput),
    ];

    /// Returns the string key of this variant (`"all"` for [`SearchField::All`]).
    #[allow(dead_code)]
    fn as_str(&self) -> &'static str {
        Self::KEYED
            .iter()
            .find(|(_, field)| field == self)
            .map_or("all", |(key, _)| *key)
    }

    /// Infallible, exact-match parser: any key that is not listed maps to
    /// [`SearchField::All`].
    ///
    /// Deliberately an inherent method rather than a `FromStr` impl, because
    /// it can never fail.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Self {
        Self::KEYED
            .iter()
            .find(|(key, _)| *key == s)
            .map_or(Self::All, |(_, field)| *field)
    }
}
67
/// Column a result set can be ordered by.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortBy {
    /// Orders by `created_at`; also the fallback for unrecognised keys.
    CreatedAt,
    /// Orders by `cost_usd`.
    Cost,
    /// Orders by `duration_secs`.
    Duration,
    /// Orders by `tokens_total`.
    Tokens,
    /// Orders by `name`.
    Name,
}

impl SortBy {
    /// Returns the identifier that is spliced into the `ORDER BY` clause.
    ///
    /// The value always comes from this fixed list, never from caller input.
    pub fn as_str(&self) -> &'static str {
        match *self {
            SortBy::Name => "name",
            SortBy::Tokens => "tokens_total",
            SortBy::Duration => "duration_secs",
            SortBy::Cost => "cost_usd",
            SortBy::CreatedAt => "created_at",
        }
    }

    /// Infallible, exact-match parser for the short keys `"cost"`,
    /// `"duration"`, `"tokens"` and `"name"`; everything else yields
    /// [`SortBy::CreatedAt`].
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Self {
        const KEYS: [(&str, SortBy); 4] = [
            ("cost", SortBy::Cost),
            ("duration", SortBy::Duration),
            ("tokens", SortBy::Tokens),
            ("name", SortBy::Name),
        ];

        KEYS.iter()
            .find(|(key, _)| *key == s)
            .map_or(SortBy::CreatedAt, |&(_, sort)| sort)
    }
}
101
/// Sort direction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortDir {
    /// Ascending order (`ASC`).
    Asc,
    /// Descending order (`DESC`); also the fallback for unrecognised input.
    Desc,
}

impl SortDir {
    /// Returns the SQL keyword for this direction (`"ASC"` or `"DESC"`).
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Asc => "ASC",
            Self::Desc => "DESC",
        }
    }

    /// Lenient best-effort parser with a default fallback (unknown → Desc).
    ///
    /// Surrounding whitespace and ASCII case are ignored, so `"asc"`, `"ASC"`
    /// and `" Asc "` all select [`SortDir::Asc`]. This also makes the parser
    /// accept the keyword produced by [`SortDir::as_str`].
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Self {
        if s.trim().eq_ignore_ascii_case("asc") {
            Self::Asc
        } else {
            Self::Desc
        }
    }
}
126
127/// Full filter specification for querying agent history.
128#[derive(Debug, Clone)]
129pub struct AgentListFilter {
130    pub q: Option<String>,
131    pub search_field: SearchField,
132    pub status: Option<AgentStatusFilter>,
133    pub session_id: Option<String>,
134    pub project_id: Option<String>,
135    pub seed_id: Option<String>,
136    pub model_id: Option<String>,
137    pub tool: Option<String>,
138    pub has_error: Option<bool>,
139    pub date_from: Option<DateTime<Utc>>,
140    pub date_to: Option<DateTime<Utc>>,
141    pub cost_min: Option<f64>,
142    pub cost_max: Option<f64>,
143    pub tokens_min: Option<u64>,
144    pub tokens_max: Option<u64>,
145    pub duration_min: Option<u64>,
146    pub duration_max: Option<u64>,
147    pub sort_by: SortBy,
148    pub sort_dir: SortDir,
149    pub page: u32,
150    pub per_page: u32,
151}
152
153impl Default for AgentListFilter {
154    fn default() -> Self {
155        Self {
156            q: None,
157            search_field: SearchField::All,
158            status: None,
159            session_id: None,
160            project_id: None,
161            seed_id: None,
162            model_id: None,
163            tool: None,
164            has_error: None,
165            date_from: None,
166            date_to: None,
167            cost_min: None,
168            cost_max: None,
169            tokens_min: None,
170            tokens_max: None,
171            duration_min: None,
172            duration_max: None,
173            sort_by: SortBy::CreatedAt,
174            sort_dir: SortDir::Desc,
175            page: 1,
176            per_page: 50,
177        }
178    }
179}
180
/// One agent status that a query can be restricted to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentStatusFilter {
    /// Stored as `"running"`.
    Running,
    /// Stored as `"completed"`.
    Completed,
    /// Stored as `"failed"`.
    Failed,
    /// Stored as `"stopped"`.
    Stopped,
    /// Stored as `"starting"`.
    Starting,
    /// Stored as `"idle"`.
    Idle,
}

impl AgentStatusFilter {
    /// Returns the lowercase string this status is compared against in SQL.
    pub fn as_sql(&self) -> &'static str {
        match *self {
            AgentStatusFilter::Starting => "starting",
            AgentStatusFilter::Running => "running",
            AgentStatusFilter::Idle => "idle",
            AgentStatusFilter::Stopped => "stopped",
            AgentStatusFilter::Failed => "failed",
            AgentStatusFilter::Completed => "completed",
        }
    }

    /// Exact-match parser: returns the variant whose [`as_sql`](Self::as_sql)
    /// string equals `s`, or `None` when there is no such variant.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Option<Self> {
        const ALL: [AgentStatusFilter; 6] = [
            AgentStatusFilter::Running,
            AgentStatusFilter::Completed,
            AgentStatusFilter::Failed,
            AgentStatusFilter::Stopped,
            AgentStatusFilter::Starting,
            AgentStatusFilter::Idle,
        ];

        ALL.iter().copied().find(|status| status.as_sql() == s)
    }
}
218
219// ===========================================================================
220// Filtered stats
221// ===========================================================================
222
/// Aggregates describing the rows matched by a filtered query.
///
/// `Default` yields all-zero values.
#[derive(Clone, Debug, Default)]
pub struct FilteredStats {
    /// Summed cost, in US dollars.
    pub total_cost_usd: f64,
    /// Summed token count.
    pub total_tokens: u64,
    /// Mean duration, in seconds.
    pub avg_duration_secs: f64,
    /// Number of running agents.
    pub count_running: u64,
    /// Number of completed agents.
    pub count_completed: u64,
    /// Number of failed agents.
    pub count_failed: u64,
}
233
234/// Global aggregate stats (unfiltered).
235#[derive(Debug, Clone, Default)]
236pub struct AgentStats {
237    pub total_agents: u64,
238    pub running: u64,
239    pub completed: u64,
240    pub failed: u64,
241    pub total_cost_usd: f64,
242    pub total_tokens: u64,
243    pub total_duration_secs: u64,
244    pub avg_duration_secs: f64,
245    pub avg_cost_usd: f64,
246    pub total_sessions: u64,
247    pub oldest_agent_at: Option<DateTime<Utc>>,
248    pub newest_agent_at: Option<DateTime<Utc>>,
249}
250
251/// Query result with pagination and filtered stats.
252#[derive(Debug, Clone)]
253pub struct QueryResult {
254    pub items: Vec<AgentInfo>,
255    pub total: u64,
256    pub page: u32,
257    pub per_page: u32,
258    pub total_pages: u32,
259    pub stats: FilteredStats,
260}
261
262// ===========================================================================
263// Rebuild report
264// ===========================================================================
265
/// Counters summarising a rebuild of the SQLite index from stored agent
/// records.
#[derive(Clone, Debug, Default)]
pub struct RebuildReport {
    /// Records that were loaded and written to the index successfully.
    pub reindexed: u64,
    /// NOTE(review): nothing in the visible rebuild path updates this
    /// counter — confirm its intended meaning before relying on it.
    pub orphaned: u64,
    /// Records that could not be loaded or could not be written.
    pub errors: u64,
}
272
273// ===========================================================================
274// AgentLogDb
275// ===========================================================================
276
/// Query engine for agent history, backed by a single SQLite connection.
///
/// The connection sits behind a `parking_lot::Mutex`, which gives interior
/// mutability: every method takes `&self`, so the value can be shared as
/// `Arc<AgentLogDb>` across tokio tasks.
#[cfg(feature = "sqlite-memory")]
pub struct AgentLogDb {
    /// The one connection used for all reads and writes.
    conn: parking_lot::Mutex<rusqlite::Connection>,
}
285
286#[cfg(feature = "sqlite-memory")]
287impl AgentLogDb {
288    pub fn open(path: &Path) -> Result<Self> {
289        let conn = rusqlite::Connection::open(path)
290            .with_context(|| format!("Failed to open agent log database at {}", path.display()))?;
291        let db = Self {
292            conn: parking_lot::Mutex::new(conn),
293        };
294        db.migrate()?;
295        db.configure_wal()?;
296        Ok(db)
297    }
298
299    fn migrate(&self) -> Result<()> {
300        let conn = self.conn.lock();
301        conn.execute_batch(
302            "
303            CREATE TABLE IF NOT EXISTS agents (
304                id              TEXT PRIMARY KEY,
305                name            TEXT NOT NULL,
306                status          TEXT NOT NULL,
307                created_at      TEXT NOT NULL,
308                started_at      TEXT,
309                completed_at    TEXT,
310                session_id      TEXT,
311                seed_id         TEXT,
312                project_id      TEXT,
313                model_id        TEXT NOT NULL DEFAULT '',
314                error           TEXT,
315                steps_completed INTEGER NOT NULL DEFAULT 0,
316                steps_total     INTEGER,
317                tokens_input    INTEGER NOT NULL DEFAULT 0,
318                tokens_output   INTEGER NOT NULL DEFAULT 0,
319                cost_usd        REAL NOT NULL DEFAULT 0.0,
320                duration_secs   INTEGER
321            );
322
323            CREATE TABLE IF NOT EXISTS agent_tool_calls (
324                id          INTEGER PRIMARY KEY AUTOINCREMENT,
325                agent_id    TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
326                seq         INTEGER NOT NULL,
327                tool_name   TEXT NOT NULL,
328                input       TEXT NOT NULL DEFAULT '',
329                output      TEXT NOT NULL DEFAULT '',
330                duration_ms INTEGER NOT NULL DEFAULT 0,
331                is_error    INTEGER NOT NULL DEFAULT 0,
332                timestamp   TEXT,
333                tool_call_id TEXT NOT NULL DEFAULT ''
334            );
335
336            CREATE INDEX IF NOT EXISTS idx_agents_status_created ON agents(status, created_at DESC);
337            CREATE INDEX IF NOT EXISTS idx_agents_session     ON agents(session_id);
338            CREATE INDEX IF NOT EXISTS idx_agents_project     ON agents(project_id);
339            CREATE INDEX IF NOT EXISTS idx_agents_seed        ON agents(seed_id);
340            CREATE INDEX IF NOT EXISTS idx_agents_model       ON agents(model_id);
341            CREATE INDEX IF NOT EXISTS idx_agents_cost        ON agents(cost_usd);
342            CREATE INDEX IF NOT EXISTS idx_agents_duration    ON agents(duration_secs);
343            CREATE INDEX IF NOT EXISTS idx_agents_name        ON agents(name);
344            CREATE INDEX IF NOT EXISTS idx_tool_calls_agent   ON agent_tool_calls(agent_id, seq);
345            CREATE INDEX IF NOT EXISTS idx_tool_calls_name    ON agent_tool_calls(tool_name);
346
347            CREATE TABLE IF NOT EXISTS agent_log_meta (
348                key   TEXT PRIMARY KEY,
349                value TEXT NOT NULL
350            );
351        ",
352        )
353        .context("Failed to run agent log SQLite migration")?;
354
355        // FTS5 virtual table (separately, catch errors if FTS5 not available)
356        let _ = conn.execute_batch(
357            "CREATE VIRTUAL TABLE IF NOT EXISTS agent_tool_calls_fts USING fts5(
358                tool_name, input, output,
359                content='agent_tool_calls',
360                content_rowid='id'
361            );",
362        );
363
364        Ok(())
365    }
366
367    fn configure_wal(&self) -> Result<()> {
368        let conn = self.conn.lock();
369        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")
370            .context("Failed to configure WAL mode")
371    }
372
373    // ── Upsert ──────────────────────────────────────────────────────
374
375    pub fn upsert_agent(&self, info: &AgentInfo) -> Result<()> {
376        let mut conn = self.conn.lock();
377        let tx = conn
378            .transaction()
379            .context("Failed to begin agent upsert transaction")?;
380
381        let duration_secs = match (info.started_at, info.completed_at) {
382            (Some(start), Some(end)) => Some((end - start).num_seconds().max(0)),
383            _ => None,
384        };
385
386        tx.execute(
387            "INSERT INTO agents (id, name, status, created_at, started_at, completed_at,
388                 session_id, seed_id, project_id, model_id, error,
389                 steps_completed, steps_total, tokens_input, tokens_output, cost_usd, duration_secs)
390             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)
391             ON CONFLICT(id) DO UPDATE SET
392                 status=excluded.status,
393                 completed_at=excluded.completed_at,
394                 error=excluded.error,
395                 steps_completed=excluded.steps_completed,
396                 steps_total=excluded.steps_total,
397                 tokens_input=excluded.tokens_input,
398                 tokens_output=excluded.tokens_output,
399                 cost_usd=excluded.cost_usd,
400                 duration_secs=excluded.duration_secs",
401            rusqlite::params![
402                info.id.to_string(),
403                info.name,
404                info.status.to_string(),
405                info.created_at.to_rfc3339(),
406                info.started_at.map(|t| t.to_rfc3339()),
407                info.completed_at.map(|t| t.to_rfc3339()),
408                info.session_id,
409                info.seed_id.map(|s| s.to_string()),
410                info.project_id.map(|p| p.to_string()),
411                info.model_id,
412                info.error,
413                info.steps_completed as i64,
414                info.steps_total.map(|s| s as i64),
415                info.tokens_input as i64,
416                info.tokens_output as i64,
417                info.cost_usd,
418                duration_secs,
419            ],
420        )?;
421
422        // Replace tool calls
423        tx.execute(
424            "DELETE FROM agent_tool_calls WHERE agent_id = ?1",
425            rusqlite::params![info.id.to_string()],
426        )?;
427
428        for (i, tc) in info.tool_calls.iter().enumerate() {
429            tx.execute(
430                "INSERT INTO agent_tool_calls (agent_id, seq, tool_name, input, output,
431                     duration_ms, is_error, timestamp, tool_call_id)
432                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
433                rusqlite::params![
434                    info.id.to_string(),
435                    i as i64,
436                    tc.tool,
437                    tc.input,
438                    tc.output,
439                    tc.duration_ms as i64,
440                    tc.is_error as i64,
441                    tc.timestamp.map(|t| t.to_rfc3339()),
442                    tc.tool_call_id,
443                ],
444            )?;
445        }
446
447        tx.commit().context("Failed to commit agent upsert")?;
448
449        // Rebuild FTS
450        let _ = conn.execute_batch(
451            "INSERT INTO agent_tool_calls_fts(agent_tool_calls_fts) VALUES('rebuild');",
452        );
453
454        Ok(())
455    }
456
457    // ── Query ───────────────────────────────────────────────────────
458
459    pub fn query(&self, filter: &AgentListFilter) -> Result<QueryResult> {
460        let (where_clause, params) = self.build_where(filter);
461        let offset = ((filter.page.max(1) - 1) * filter.per_page) as i64;
462        let limit = filter.per_page.min(200) as i64;
463
464        let conn = self.conn.lock();
465
466        // Count
467        let count_sql = format!("SELECT COUNT(*) FROM agents WHERE {}", where_clause);
468        let total: u64 = conn
469            .query_row(
470                &count_sql,
471                rusqlite::params_from_iter(params.iter()),
472                |row| row.get(0),
473            )
474            .context("Failed to count agents")?;
475
476        // Data
477        let safe_sort_col = filter.sort_by.as_str();
478        let safe_sort_dir = filter.sort_dir.as_str();
479        let param_count = params.len();
480        let data_sql = format!(
481            "SELECT * FROM agents WHERE {} ORDER BY {} {} LIMIT ?{} OFFSET ?{}",
482            where_clause,
483            safe_sort_col,
484            safe_sort_dir,
485            param_count + 1,
486            param_count + 2,
487        );
488
489        let mut stmt = conn.prepare(&data_sql)?;
490        let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = params
491            .into_iter()
492            .map(|p| -> Box<dyn rusqlite::types::ToSql> { p })
493            .collect();
494        all_params.push(Box::new(limit));
495        all_params.push(Box::new(offset));
496
497        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
498            all_params.iter().map(|p| p.as_ref()).collect();
499
500        let items: Vec<AgentInfo> = stmt
501            .query_map(param_refs.as_slice(), Self::row_to_agent)?
502            .collect::<Result<Vec<_>, _>>()
503            .context("Failed to collect agent query results")?;
504
505        // Stats for filtered set
506        let stats = self.filtered_stats_inner(filter, &conn)?;
507
508        let total_pages = if total == 0 {
509            1
510        } else {
511            ((total as f64) / filter.per_page as f64).ceil() as u32
512        };
513
514        Ok(QueryResult {
515            items,
516            total,
517            page: filter.page,
518            per_page: filter.per_page,
519            total_pages,
520            stats,
521        })
522    }
523
524    pub fn stats(&self) -> Result<AgentStats> {
525        let conn = self.conn.lock();
526
527        let mut s = AgentStats::default();
528
529        let row = conn
530            .query_row(
531                "SELECT
532                    COUNT(*) as total,
533                    COALESCE(SUM(CASE WHEN status='running' THEN 1 ELSE 0 END), 0),
534                    COALESCE(SUM(CASE WHEN status IN ('completed','stopped') THEN 1 ELSE 0 END), 0),
535                    COALESCE(SUM(CASE WHEN status='failed' THEN 1 ELSE 0 END), 0),
536                    COALESCE(SUM(cost_usd), 0.0),
537                    COALESCE(SUM(tokens_input + tokens_output), 0),
538                    COALESCE(SUM(duration_secs), 0),
539                    COALESCE(AVG(duration_secs), 0.0),
540                    COALESCE(AVG(cost_usd), 0.0),
541                    COUNT(DISTINCT session_id),
542                    MIN(created_at),
543                    MAX(created_at)
544                 FROM agents",
545                [],
546                |row| {
547                    Ok((
548                        row.get::<_, i64>(0)? as u64,
549                        row.get::<_, i64>(1)? as u64,
550                        row.get::<_, i64>(2)? as u64,
551                        row.get::<_, i64>(3)? as u64,
552                        row.get::<_, f64>(4)?,
553                        row.get::<_, i64>(5)? as u64,
554                        row.get::<_, i64>(6)? as u64,
555                        row.get::<_, f64>(7)?,
556                        row.get::<_, f64>(8)?,
557                        row.get::<_, i64>(9)? as u64,
558                        row.get::<_, Option<String>>(10)?,
559                        row.get::<_, Option<String>>(11)?,
560                    ))
561                },
562            )
563            .context("Failed to query agent stats")?;
564
565        s.total_agents = row.0;
566        s.running = row.1;
567        s.completed = row.2;
568        s.failed = row.3;
569        s.total_cost_usd = row.4;
570        s.total_tokens = row.5;
571        s.total_duration_secs = row.6;
572        s.avg_duration_secs = if s.total_agents > 0 { row.7 } else { 0.0 };
573        s.avg_cost_usd = if s.total_agents > 0 { row.8 } else { 0.0 };
574        s.total_sessions = row.9;
575        s.oldest_agent_at = row
576            .10
577            .as_deref()
578            .and_then(|ts| DateTime::parse_from_rfc3339(ts).ok())
579            .map(|dt| dt.with_timezone(&Utc));
580        s.newest_agent_at = row
581            .11
582            .as_deref()
583            .and_then(|ts| DateTime::parse_from_rfc3339(ts).ok())
584            .map(|dt| dt.with_timezone(&Utc));
585
586        Ok(s)
587    }
588
589    pub fn get(&self, id: &str) -> Result<Option<AgentInfo>> {
590        let conn = self.conn.lock();
591        let mut stmt = conn
592            .prepare("SELECT * FROM agents WHERE id = ?1")
593            .context("Failed to prepare agent get statement")?;
594
595        let mut rows = stmt
596            .query_map(rusqlite::params![id], Self::row_to_agent)
597            .context("Failed to query agent by id")?;
598
599        match rows.next() {
600            Some(Ok(mut agent)) => {
601                agent.tool_calls = Self::get_tool_calls_inner(&conn, id)?;
602                Ok(Some(agent))
603            }
604            Some(Err(e)) => Err(e.into()),
605            None => Ok(None),
606        }
607    }
608
609    pub fn get_tool_calls(&self, agent_id: &str) -> Result<Vec<ToolCallRecord>> {
610        let conn = self.conn.lock();
611        Self::get_tool_calls_inner(&conn, agent_id)
612    }
613
614    pub fn delete(&self, id: &str) -> Result<bool> {
615        let conn = self.conn.lock();
616        let changes = conn
617            .execute("DELETE FROM agents WHERE id = ?1", rusqlite::params![id])
618            .context("Failed to delete agent")?;
619        Ok(changes > 0)
620    }
621
622    // ── Pruning ─────────────────────────────────────────────────────
623
624    pub fn prune(&self, config: &AgentLogConfig) -> Result<usize> {
625        let mut pruned = 0usize;
626
627        if config.ttl_hours > 0 || config.max_entries > 0 {
628            let conn = self.conn.lock();
629
630            // 1. TTL-based
631            if config.ttl_hours > 0 {
632                let cutoff = Utc::now() - chrono::Duration::hours(config.ttl_hours as i64);
633                let cutoff_str = cutoff.to_rfc3339();
634                let deleted = conn
635                    .execute(
636                        "DELETE FROM agents WHERE created_at < ?1",
637                        rusqlite::params![cutoff_str],
638                    )
639                    .context("Failed to prune agents by TTL")?;
640                pruned += deleted;
641            }
642
643            // 2. Count-based
644            if config.max_entries > 0 {
645                let count: i64 = conn
646                    .query_row("SELECT COUNT(*) FROM agents", [], |row| row.get(0))
647                    .context("Failed to count agents for pruning")?;
648
649                if count > config.max_entries as i64 {
650                    let excess = count - config.max_entries as i64;
651                    let to_delete = excess.min(config.prune_batch_size as i64);
652                    conn.execute(
653                        "DELETE FROM agents WHERE id IN (
654                            SELECT id FROM agents ORDER BY created_at ASC LIMIT ?1
655                        )",
656                        rusqlite::params![to_delete],
657                    )?;
658                    pruned += to_delete as usize;
659                }
660            }
661        }
662
663        if pruned > 0 {
664            tracing::info!(pruned = pruned, "Agent log SQLite pruning completed");
665        }
666
667        Ok(pruned)
668    }
669
670    // ── Recovery ────────────────────────────────────────────────────
671
672    pub async fn reindex_all(&self, state_store: &StateStore) -> Result<RebuildReport> {
673        let agent_names = state_store
674            .list_category("agents")
675            .await
676            .unwrap_or_default();
677        let mut report = RebuildReport::default();
678
679        {
680            let conn = self.conn.lock();
681            conn.execute_batch("DELETE FROM agent_tool_calls; DELETE FROM agents;")
682                .context("Failed to clear agent tables for reindex")?;
683        }
684
685        for name in &agent_names {
686            match state_store.load_json::<AgentInfo>("agents", name).await {
687                Ok(Some(agent)) => {
688                    if let Err(e) = self.upsert_agent(&agent) {
689                        tracing::warn!(agent_id = %name, error = %e, "Failed to reindex agent");
690                        report.errors += 1;
691                    } else {
692                        report.reindexed += 1;
693                    }
694                }
695                _ => {
696                    report.errors += 1;
697                }
698            }
699        }
700
701        Ok(report)
702    }
703
704    // ── Internal Helpers ────────────────────────────────────────────
705
706    fn row_to_agent(row: &rusqlite::Row) -> rusqlite::Result<AgentInfo> {
707        let id_str: String = row.get("id")?;
708        let seed_id_str: Option<String> = row.get("seed_id")?;
709        let project_id_str: Option<String> = row.get("project_id")?;
710        let created_at_str: String = row.get("created_at")?;
711        let started_at_str: Option<String> = row.get("started_at")?;
712        let completed_at_str: Option<String> = row.get("completed_at")?;
713
714        Ok(AgentInfo {
715            id: uuid::Uuid::parse_str(&id_str).unwrap_or_default(),
716            name: row.get("name")?,
717            status: Self::parse_status(&row.get::<_, String>("status")?),
718            created_at: DateTime::parse_from_rfc3339(&created_at_str)
719                .map(|dt| dt.with_timezone(&Utc))
720                .unwrap_or_else(|_| Utc::now()),
721            seed_id: seed_id_str.and_then(|s| uuid::Uuid::parse_str(&s).ok()),
722            project_id: project_id_str.and_then(|s| uuid::Uuid::parse_str(&s).ok()),
723            started_at: started_at_str
724                .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
725                .map(|dt| dt.with_timezone(&Utc)),
726            completed_at: completed_at_str
727                .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
728                .map(|dt| dt.with_timezone(&Utc)),
729            error: row.get("error")?,
730            steps_completed: row.get::<_, i64>("steps_completed")? as usize,
731            steps_total: row
732                .get::<_, Option<i64>>("steps_total")?
733                .map(|v| v as usize),
734            tool_calls: vec![],
735            tokens_input: row.get::<_, i64>("tokens_input")? as u64,
736            tokens_output: row.get::<_, i64>("tokens_output")? as u64,
737            cost_usd: row.get("cost_usd")?,
738            model_id: row.get("model_id")?,
739            session_id: row.get("session_id")?,
740        })
741    }
742
743    fn get_tool_calls_inner(
744        conn: &rusqlite::Connection,
745        agent_id: &str,
746    ) -> Result<Vec<ToolCallRecord>> {
747        let mut stmt = conn
748            .prepare(
749                "SELECT tool_name, input, output, duration_ms, is_error, timestamp, tool_call_id
750                 FROM agent_tool_calls WHERE agent_id = ?1 ORDER BY seq",
751            )
752            .context("Failed to prepare tool calls statement")?;
753
754        let calls = stmt
755            .query_map(rusqlite::params![agent_id], |row| {
756                let ts: Option<String> = row.get(5)?;
757                Ok(ToolCallRecord {
758                    tool: row.get(0)?,
759                    input: row.get(1)?,
760                    output: row.get(2)?,
761                    duration_ms: row.get::<_, i64>(3)? as u64,
762                    is_error: row.get::<_, i64>(4)? != 0,
763                    tool_call_id: row.get(6)?,
764                    timestamp: ts
765                        .and_then(|s| DateTime::parse_from_rfc3339(&s as &str).ok())
766                        .map(|dt| dt.with_timezone(&Utc)),
767                })
768            })
769            .context("Failed to query tool calls")?
770            .collect::<Result<Vec<_>, _>>()
771            .context("Failed to collect tool calls")?;
772
773        Ok(calls)
774    }
775
776    fn parse_status(s: &str) -> AgentStatus {
777        match s {
778            "starting" => AgentStatus::Starting,
779            "running" => AgentStatus::Running,
780            "idle" => AgentStatus::Idle,
781            "stopped" => AgentStatus::Stopped,
782            "failed" => AgentStatus::Failed,
783            "completed" => AgentStatus::Completed,
784            _ => AgentStatus::Idle,
785        }
786    }
787
788    /// Build WHERE clause and params from the filter.
789    fn build_where(
790        &self,
791        filter: &AgentListFilter,
792    ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
793        let mut conditions: Vec<String> = Vec::new();
794        let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
795
796        // Status filter
797        if let Some(status) = filter.status {
798            let idx = params.len() + 1;
799            conditions.push(format!("status = ?{}", idx));
800            params.push(Box::new(status.as_sql().to_string()));
801        }
802
803        // Date range
804        if let Some(from) = filter.date_from {
805            let idx = params.len() + 1;
806            conditions.push(format!("created_at >= ?{}", idx));
807            params.push(Box::new(from.to_rfc3339()));
808        }
809        if let Some(to) = filter.date_to {
810            let idx = params.len() + 1;
811            conditions.push(format!("created_at <= ?{}", idx));
812            params.push(Box::new(to.to_rfc3339()));
813        }
814
815        // Session / project / seed
816        if let Some(ref sid) = filter.session_id {
817            let idx = params.len() + 1;
818            conditions.push(format!("session_id = ?{}", idx));
819            params.push(Box::new(sid.clone()));
820        }
821        if let Some(ref pid) = filter.project_id {
822            let idx = params.len() + 1;
823            conditions.push(format!("project_id = ?{}", idx));
824            params.push(Box::new(pid.clone()));
825        }
826        if let Some(ref sid) = filter.seed_id {
827            let idx = params.len() + 1;
828            conditions.push(format!("seed_id = ?{}", idx));
829            params.push(Box::new(sid.clone()));
830        }
831
832        // Model (substring)
833        if let Some(ref model) = filter.model_id {
834            let idx = params.len() + 1;
835            conditions.push(format!("model_id LIKE ?{}", idx));
836            params.push(Box::new(format!("%{}%", model)));
837        }
838
839        // Cost range
840        if let Some(min) = filter.cost_min {
841            let idx = params.len() + 1;
842            conditions.push(format!("cost_usd >= ?{}", idx));
843            params.push(Box::new(min));
844        }
845        if let Some(max) = filter.cost_max {
846            let idx = params.len() + 1;
847            conditions.push(format!("cost_usd <= ?{}", idx));
848            params.push(Box::new(max));
849        }
850
851        // Tokens range
852        if filter.tokens_min.is_some() || filter.tokens_max.is_some() {
853            let total_expr = "(tokens_input + tokens_output)";
854            if let Some(min) = filter.tokens_min {
855                let idx = params.len() + 1;
856                conditions.push(format!("{} >= ?{}", total_expr, idx));
857                params.push(Box::new(min as i64));
858            }
859            if let Some(max) = filter.tokens_max {
860                let idx = params.len() + 1;
861                conditions.push(format!("{} <= ?{}", total_expr, idx));
862                params.push(Box::new(max as i64));
863            }
864        }
865
866        // Duration range
867        if let Some(min) = filter.duration_min {
868            let idx = params.len() + 1;
869            conditions.push(format!("duration_secs >= ?{}", idx));
870            params.push(Box::new(min as i64));
871        }
872        if let Some(max) = filter.duration_max {
873            let idx = params.len() + 1;
874            conditions.push(format!("duration_secs <= ?{}", idx));
875            params.push(Box::new(max as i64));
876        }
877
878        // Error filter
879        if let Some(has_err) = filter.has_error {
880            if has_err {
881                conditions.push("error IS NOT NULL AND error != ''".to_string());
882            } else {
883                conditions.push("(error IS NULL OR error = '')".to_string());
884            }
885        }
886
887        // Tool filter (JOIN subquery)
888        if let Some(ref tool) = filter.tool {
889            let idx = params.len() + 1;
890            conditions.push(format!(
891                "id IN (SELECT DISTINCT agent_id FROM agent_tool_calls WHERE tool_name LIKE ?{})",
892                idx
893            ));
894            params.push(Box::new(format!("%{}%", tool)));
895        }
896
897        // Full-text search via FTS5
898        let has_fts = matches!(
899            filter.search_field,
900            SearchField::All | SearchField::ToolName | SearchField::ToolOutput
901        ) && filter.q.is_some();
902
903        if has_fts {
904            if let Some(ref q) = filter.q {
905                let escaped = q.replace('\'', "''");
906                conditions.push(format!(
907                    "id IN (SELECT DISTINCT agent_id FROM agent_tool_calls_fts WHERE agent_tool_calls_fts MATCH '{}')",
908                    escaped
909                ));
910            }
911        } else if let Some(ref q) = filter.q {
912            match filter.search_field {
913                SearchField::Name => {
914                    let idx = params.len() + 1;
915                    conditions.push(format!("name LIKE ?{}", idx));
916                    params.push(Box::new(format!("%{}%", q)));
917                }
918                SearchField::Error => {
919                    let idx = params.len() + 1;
920                    conditions.push(format!("error LIKE ?{}", idx));
921                    params.push(Box::new(format!("%{}%", q)));
922                }
923                _ => {
924                    let idx1 = params.len() + 1;
925                    let idx2 = params.len() + 2;
926                    conditions.push(format!("(name LIKE ?{} OR error LIKE ?{})", idx1, idx2));
927                    params.push(Box::new(format!("%{}%", q)));
928                    params.push(Box::new(format!("%{}%", q)));
929                }
930            }
931        }
932
933        if conditions.is_empty() {
934            ("1=1".to_string(), params)
935        } else {
936            (conditions.join(" AND "), params)
937        }
938    }
939
940    /// Aggregate stats constrained to a filter (acquires the connection lock).
941    /// Reserved for filtered dashboard endpoints; currently `list()` reuses the
942    /// inner helper directly to avoid a double lock.
943    #[allow(dead_code)]
944    fn filtered_stats(&self, filter: &AgentListFilter) -> Result<FilteredStats> {
945        let conn = self.conn.lock();
946        self.filtered_stats_inner(filter, &conn)
947    }
948
949    fn filtered_stats_inner(
950        &self,
951        filter: &AgentListFilter,
952        conn: &rusqlite::Connection,
953    ) -> Result<FilteredStats> {
954        let (where_clause, params) = self.build_where(filter);
955
956        let sql = format!(
957            "SELECT
958                COALESCE(SUM(cost_usd), 0.0),
959                COALESCE(SUM(tokens_input + tokens_output), 0),
960                COALESCE(AVG(duration_secs), 0.0),
961                COALESCE(SUM(CASE WHEN status='running' THEN 1 ELSE 0 END), 0),
962                COALESCE(SUM(CASE WHEN status IN ('completed','stopped') THEN 1 ELSE 0 END), 0),
963                COALESCE(SUM(CASE WHEN status='failed' THEN 1 ELSE 0 END), 0)
964             FROM agents WHERE {}",
965            where_clause
966        );
967
968        let mut stmt = conn.prepare(&sql)?;
969        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
970            params.iter().map(|p| p.as_ref()).collect();
971        let row = stmt
972            .query_row(param_refs.as_slice(), |row| {
973                Ok(FilteredStats {
974                    total_cost_usd: row.get(0)?,
975                    total_tokens: row.get::<_, i64>(1)? as u64,
976                    avg_duration_secs: row.get(2)?,
977                    count_running: row.get::<_, i64>(3)? as u64,
978                    count_completed: row.get::<_, i64>(4)? as u64,
979                    count_failed: row.get::<_, i64>(5)? as u64,
980                })
981            })
982            .context("Failed to compute filtered stats")?;
983
984        Ok(row)
985    }
986}
987
988// ===========================================================================
989// Tests
990// ===========================================================================
991
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::AgentStatus;

    /// Opens a fresh database file inside a new temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the database so each test
    /// keeps the directory alive for as long as it uses the database.
    fn make_test_db() -> (AgentLogDb, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("agent_log.db");
        let db = AgentLogDb::open(&path).unwrap();
        (db, dir)
    }

    /// Parses an RFC 3339 timestamp into UTC, panicking on malformed input.
    fn utc(timestamp: &str) -> DateTime<Utc> {
        DateTime::parse_from_rfc3339(timestamp)
            .map(|dt| dt.with_timezone(&Utc))
            .unwrap()
    }

    /// Returns a valid UUID string whose last digit is `i`.
    ///
    /// Only produces a well-formed UUID for `i` in `0..=9`, which covers
    /// every loop in this module.
    fn numbered_id(i: usize) -> String {
        format!("550e8400-e29b-41d4-a716-44665544000{}", i)
    }

    /// Builds an `AgentInfo` fixture with the given identity, status,
    /// creation time (RFC 3339) and cost; all other fields are fixed.
    ///
    /// The name is `test-agent-` followed by the first eight characters of `id`.
    fn sample_agent(id: &str, status: AgentStatus, created: &str, cost: f64) -> AgentInfo {
        AgentInfo {
            id: uuid::Uuid::parse_str(id).unwrap(),
            name: format!("test-agent-{}", &id[..8]),
            status,
            created_at: utc(created),
            seed_id: None,
            project_id: None,
            started_at: None,
            completed_at: None,
            error: None,
            steps_completed: 5,
            steps_total: Some(10),
            tool_calls: vec![],
            tokens_input: 1000,
            tokens_output: 500,
            cost_usd: cost,
            model_id: "anthropic/claude-sonnet-4".into(),
            session_id: None,
        }
    }

    /// A single upserted agent is returned by an unfiltered query.
    #[test]
    fn test_upsert_and_query() {
        let (db, _dir) = make_test_db();

        let agent = sample_agent(
            "550e8400-e29b-41d4-a716-446655440000",
            AgentStatus::Completed,
            "2026-06-01T00:00:00Z",
            0.05,
        );

        db.upsert_agent(&agent).unwrap();

        let filter = AgentListFilter::default();
        let result = db.query(&filter).unwrap();
        assert_eq!(result.total, 1);
        assert_eq!(result.items[0].name, "test-agent-550e8400");
        assert_eq!(result.items[0].status, AgentStatus::Completed);
    }

    /// Of five agents alternating Completed/Failed, the status filter
    /// selects exactly the two failed ones.
    #[test]
    fn test_filter_by_status() {
        let (db, _dir) = make_test_db();

        for i in 0..5 {
            let status = if i % 2 == 0 {
                AgentStatus::Completed
            } else {
                AgentStatus::Failed
            };
            db.upsert_agent(&sample_agent(
                &numbered_id(i),
                status,
                "2026-06-01T00:00:00Z",
                0.01,
            ))
            .unwrap();
        }

        let filter = AgentListFilter {
            status: Some(AgentStatusFilter::Failed),
            ..Default::default()
        };
        let result = db.query(&filter).unwrap();
        assert_eq!(result.total, 2);
    }

    /// Only the agent created inside the `[date_from, date_to]` window matches.
    #[test]
    fn test_filter_by_date_range() {
        let (db, _dir) = make_test_db();

        for (i, (created, cost)) in [
            ("2026-06-01T00:00:00Z", 0.01),
            ("2026-06-10T00:00:00Z", 0.02),
            ("2026-06-20T00:00:00Z", 0.03),
        ]
        .iter()
        .enumerate()
        {
            db.upsert_agent(&sample_agent(
                &numbered_id(i),
                AgentStatus::Completed,
                created,
                *cost,
            ))
            .unwrap();
        }

        let filter = AgentListFilter {
            date_from: Some(utc("2026-06-05T00:00:00Z")),
            date_to: Some(utc("2026-06-15T00:00:00Z")),
            ..Default::default()
        };
        let result = db.query(&filter).unwrap();
        assert_eq!(result.total, 1);
    }

    /// A name-scoped search matches on a substring of the agent name.
    #[test]
    fn test_search_by_name() {
        let (db, _dir) = make_test_db();

        let mut agent = sample_agent(
            "550e8400-e29b-41d4-a716-446655440000",
            AgentStatus::Completed,
            "2026-06-01T00:00:00Z",
            0.01,
        );
        agent.name = "Refactor authentication module".into();
        db.upsert_agent(&agent).unwrap();

        let mut agent2 = sample_agent(
            "550e8400-e29b-41d4-a716-446655440001",
            AgentStatus::Failed,
            "2026-06-02T00:00:00Z",
            0.02,
        );
        agent2.name = "Fix build error".into();
        db.upsert_agent(&agent2).unwrap();

        let filter = AgentListFilter {
            q: Some("Refactor".into()),
            search_field: SearchField::Name,
            ..Default::default()
        };
        let result = db.query(&filter).unwrap();
        assert_eq!(result.total, 1);
        assert!(result.items[0].name.contains("Refactor"));
    }

    /// Sorting by cost, descending, orders items from most to least expensive.
    #[test]
    fn test_sorting() {
        let (db, _dir) = make_test_db();

        for (i, cost) in [0.10, 0.01, 0.50].iter().enumerate() {
            db.upsert_agent(&sample_agent(
                &numbered_id(i),
                AgentStatus::Completed,
                "2026-06-01T00:00:00Z",
                *cost,
            ))
            .unwrap();
        }

        let filter = AgentListFilter {
            sort_by: SortBy::Cost,
            sort_dir: SortDir::Desc,
            ..Default::default()
        };
        let result = db.query(&filter).unwrap();
        assert_eq!(result.items[0].cost_usd, 0.50);
        assert_eq!(result.items[1].cost_usd, 0.10);
        assert_eq!(result.items[2].cost_usd, 0.01);
    }

    /// Ten agents at three per page give four pages; pages 1 and 2 are full.
    #[test]
    fn test_pagination() {
        let (db, _dir) = make_test_db();

        for i in 0..10 {
            db.upsert_agent(&sample_agent(
                &numbered_id(i),
                AgentStatus::Completed,
                &format!("2026-06-{:02}T00:00:00Z", i + 1),
                0.01,
            ))
            .unwrap();
        }

        let mut filter = AgentListFilter {
            per_page: 3,
            page: 1,
            ..Default::default()
        };
        let result = db.query(&filter).unwrap();
        assert_eq!(result.items.len(), 3);
        assert_eq!(result.total_pages, 4);

        filter.page = 2;
        let result = db.query(&filter).unwrap();
        assert_eq!(result.items.len(), 3);
    }

    /// Global stats count agents per status and sum their cost.
    #[test]
    fn test_stats() {
        let (db, _dir) = make_test_db();

        for (i, (status, cost)) in [
            (AgentStatus::Completed, 0.05),
            (AgentStatus::Completed, 0.03),
            (AgentStatus::Failed, 0.01),
            (AgentStatus::Running, 0.02),
        ]
        .iter()
        .enumerate()
        {
            db.upsert_agent(&sample_agent(
                &numbered_id(i),
                *status,
                "2026-06-01T00:00:00Z",
                *cost,
            ))
            .unwrap();
        }

        let stats = db.stats().unwrap();
        assert_eq!(stats.total_agents, 4);
        assert_eq!(stats.completed, 2);
        assert_eq!(stats.failed, 1);
        assert_eq!(stats.running, 1);
        // Floating-point sum: compare with a tolerance instead of exactly.
        assert!((stats.total_cost_usd - 0.11).abs() < 0.001);
    }

    /// Pruning with a 30-day TTL removes the old record and keeps the recent one.
    #[test]
    fn test_prune_by_ttl() {
        let (db, _dir) = make_test_db();

        // Both timestamps are derived from the current time so the test does
        // not depend on the date it is run: with fixed calendar dates the
        // "recent" record eventually ages past the TTL as well.
        // NOTE(review): this assumes `prune` measures age against the
        // current time — confirm against its implementation.
        let now = Utc::now();
        let expired_created = (now - chrono::Duration::days(90)).to_rfc3339();
        let recent_created = (now - chrono::Duration::days(1)).to_rfc3339();

        db.upsert_agent(&sample_agent(
            "11111111-1111-4114-a716-446655440000",
            AgentStatus::Completed,
            &expired_created,
            0.01,
        ))
        .unwrap();

        db.upsert_agent(&sample_agent(
            "22222222-2222-4114-a716-446655440000",
            AgentStatus::Completed,
            &recent_created,
            0.02,
        ))
        .unwrap();

        // 720 hours = 30 days: between the two record ages (90 days, 1 day).
        let config = AgentLogConfig {
            ttl_hours: 720,
            ..Default::default()
        };
        let pruned = db.prune(&config).unwrap();
        assert!(pruned > 0);

        let result = db.query(&AgentListFilter::default()).unwrap();
        assert_eq!(result.total, 1);
        assert_eq!(
            result.items[0].id.to_string(),
            "22222222-2222-4114-a716-446655440000"
        );
    }

    /// `delete` reports whether a row was removed and leaves the table empty.
    #[test]
    fn test_delete() {
        let (db, _dir) = make_test_db();

        db.upsert_agent(&sample_agent(
            "550e8400-e29b-41d4-a716-446655440000",
            AgentStatus::Completed,
            "2026-06-01T00:00:00Z",
            0.01,
        ))
        .unwrap();

        assert!(db.delete("550e8400-e29b-41d4-a716-446655440000").unwrap());
        assert!(!db.delete("nonexistent").unwrap());

        let result = db.query(&AgentListFilter::default()).unwrap();
        assert_eq!(result.total, 0);
    }

    /// Tool calls stored with an agent are returned in insertion order.
    #[test]
    fn test_get_tool_calls() {
        let (db, _dir) = make_test_db();

        let mut agent = sample_agent(
            "550e8400-e29b-41d4-a716-446655440000",
            AgentStatus::Completed,
            "2026-06-01T00:00:00Z",
            0.01,
        );
        agent.tool_calls = vec![
            ToolCallRecord {
                tool: "bash".into(),
                input: "ls -la".into(),
                output: "total 42".into(),
                duration_ms: 150,
                is_error: false,
                tool_call_id: "call_1".into(),
                timestamp: Some(Utc::now()),
            },
            ToolCallRecord {
                tool: "read".into(),
                input: "file.rs".into(),
                output: "fn main()".into(),
                duration_ms: 5,
                is_error: false,
                tool_call_id: "call_2".into(),
                timestamp: Some(Utc::now()),
            },
        ];
        db.upsert_agent(&agent).unwrap();

        let calls = db
            .get_tool_calls("550e8400-e29b-41d4-a716-446655440000")
            .unwrap();
        assert_eq!(calls.len(), 2);
        assert_eq!(calls[0].tool, "bash");
        assert_eq!(calls[1].tool, "read");
    }
}