Skip to main content

batty_cli/team/
telemetry_db.rs

1//! SQLite-backed telemetry database for agent performance tracking.
2//!
3//! Stores events, per-agent metrics, per-task metrics, and session summaries
4//! in `.batty/telemetry.db`. All tables use `CREATE TABLE IF NOT EXISTS` —
5//! no migration framework needed.
6
7use std::path::Path;
8
9use anyhow::{Context, Result};
10use rusqlite::{Connection, params};
11
12use super::events::TeamEvent;
13
14/// Database file name under `.batty/`.
15const DB_FILENAME: &str = "telemetry.db";
16
17/// Open or create the telemetry database, initializing the schema.
18pub fn open(project_root: &Path) -> Result<Connection> {
19    let db_path = project_root.join(".batty").join(DB_FILENAME);
20    let conn = Connection::open(&db_path)
21        .with_context(|| format!("failed to open telemetry db at {}", db_path.display()))?;
22
23    // WAL mode for better concurrent read/write performance.
24    conn.pragma_update(None, "journal_mode", "WAL")?;
25
26    init_schema(&conn)?;
27    Ok(conn)
28}
29
/// Create a throwaway in-memory telemetry database with the full schema.
///
/// Only compiled for tests.
#[cfg(test)]
pub fn open_in_memory() -> Result<Connection> {
    let db = Connection::open_in_memory()?;
    init_schema(&db).map(|()| db)
}
37
38fn init_schema(conn: &Connection) -> Result<()> {
39    conn.execute_batch(
40        "
41        CREATE TABLE IF NOT EXISTS events (
42            id          INTEGER PRIMARY KEY AUTOINCREMENT,
43            timestamp   INTEGER NOT NULL,
44            event_type  TEXT NOT NULL,
45            role        TEXT,
46            task_id     TEXT,
47            payload     TEXT NOT NULL
48        );
49
50        CREATE INDEX IF NOT EXISTS idx_events_ts ON events(timestamp);
51        CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type);
52        CREATE INDEX IF NOT EXISTS idx_events_role ON events(role);
53
54        CREATE TABLE IF NOT EXISTS agent_metrics (
55            role            TEXT PRIMARY KEY,
56            completions     INTEGER NOT NULL DEFAULT 0,
57            failures        INTEGER NOT NULL DEFAULT 0,
58            restarts        INTEGER NOT NULL DEFAULT 0,
59            total_cycle_secs INTEGER NOT NULL DEFAULT 0,
60            idle_polls      INTEGER NOT NULL DEFAULT 0,
61            working_polls   INTEGER NOT NULL DEFAULT 0
62        );
63
64        CREATE TABLE IF NOT EXISTS task_metrics (
65            task_id          TEXT PRIMARY KEY,
66            started_at       INTEGER,
67            completed_at     INTEGER,
68            retries          INTEGER NOT NULL DEFAULT 0,
69            escalations      INTEGER NOT NULL DEFAULT 0,
70            merge_time_secs  INTEGER,
71            confidence_score REAL
72        );
73
74        CREATE TABLE IF NOT EXISTS session_summary (
75            session_id      TEXT PRIMARY KEY,
76            started_at      INTEGER NOT NULL,
77            ended_at        INTEGER,
78            tasks_completed INTEGER NOT NULL DEFAULT 0,
79            total_merges    INTEGER NOT NULL DEFAULT 0,
80            total_events    INTEGER NOT NULL DEFAULT 0
81        );
82        ",
83    )
84    .context("failed to initialize telemetry schema")?;
85    Ok(())
86}
87
88// ---------------------------------------------------------------------------
89// Insert helpers
90// ---------------------------------------------------------------------------
91
92/// Insert a raw event into the events table. Also updates derived metrics.
93pub fn insert_event(conn: &Connection, event: &TeamEvent) -> Result<()> {
94    let payload =
95        serde_json::to_string(event).context("failed to serialize event for telemetry")?;
96
97    conn.execute(
98        "INSERT INTO events (timestamp, event_type, role, task_id, payload) VALUES (?1, ?2, ?3, ?4, ?5)",
99        params![
100            event.ts as i64,
101            event.event,
102            event.role,
103            event.task,
104            payload,
105        ],
106    )
107    .context("failed to insert telemetry event")?;
108
109    // Update derived metrics based on event type (may create session row).
110    update_metrics_for_event(conn, event)?;
111
112    // Fix #3: Increment total_events on every insert (after update_metrics
113    // so that daemon_started can create the session row first).
114    conn.execute(
115        "UPDATE session_summary SET total_events = total_events + 1
116         WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
117        [],
118    )?;
119
120    Ok(())
121}
122
123fn update_metrics_for_event(conn: &Connection, event: &TeamEvent) -> Result<()> {
124    match event.event.as_str() {
125        "task_completed" => {
126            if let Some(role) = &event.role {
127                upsert_agent_counter(conn, role, "completions")?;
128            }
129            if let Some(task) = &event.task {
130                conn.execute(
131                    "INSERT INTO task_metrics (task_id, completed_at) VALUES (?1, ?2)
132                     ON CONFLICT(task_id) DO UPDATE SET completed_at = ?2",
133                    params![task, event.ts as i64],
134                )?;
135            }
136            // Fix #1: Increment tasks_completed on latest session.
137            conn.execute(
138                "UPDATE session_summary SET tasks_completed = tasks_completed + 1
139                 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
140                [],
141            )?;
142        }
143        "task_assigned" => {
144            if let Some(task) = &event.task {
145                conn.execute(
146                    "INSERT INTO task_metrics (task_id, started_at) VALUES (?1, ?2)
147                     ON CONFLICT(task_id) DO UPDATE SET started_at = COALESCE(task_metrics.started_at, ?2)",
148                    params![task, event.ts as i64],
149                )?;
150            }
151        }
152        "task_escalated" => {
153            if let Some(task) = &event.task {
154                conn.execute(
155                    "INSERT INTO task_metrics (task_id, escalations) VALUES (?1, 1)
156                     ON CONFLICT(task_id) DO UPDATE SET escalations = escalations + 1",
157                    params![task],
158                )?;
159            }
160        }
161        "member_crashed" | "pane_death" | "delivery_failed" => {
162            if let Some(role) = &event.role {
163                upsert_agent_counter(conn, role, "failures")?;
164            }
165        }
166        "pane_respawned" | "agent_restarted" | "context_exhausted" => {
167            if let Some(role) = &event.role {
168                upsert_agent_counter(conn, role, "restarts")?;
169            }
170        }
171        "task_auto_merged" | "task_manual_merged" => {
172            if let Some(task) = &event.task {
173                conn.execute(
174                    "INSERT INTO task_metrics (task_id, merge_time_secs) VALUES (?1, ?2)
175                     ON CONFLICT(task_id) DO UPDATE SET merge_time_secs = ?2 - COALESCE(task_metrics.started_at, ?2)",
176                    params![task, event.ts as i64],
177                )?;
178            }
179            // Fix #2: Increment total_merges on latest session.
180            conn.execute(
181                "UPDATE session_summary SET total_merges = total_merges + 1
182                 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
183                [],
184            )?;
185        }
186        "merge_confidence_scored" => {
187            if let Some(task) = &event.task {
188                if let Some(confidence) = event.load {
189                    conn.execute(
190                        "INSERT INTO task_metrics (task_id, confidence_score) VALUES (?1, ?2)
191                         ON CONFLICT(task_id) DO UPDATE SET confidence_score = ?2",
192                        params![task, confidence],
193                    )?;
194                }
195            }
196        }
197        "daemon_started" => {
198            let session_id = format!("session-{}", event.ts);
199            conn.execute(
200                "INSERT OR IGNORE INTO session_summary (session_id, started_at) VALUES (?1, ?2)",
201                params![session_id, event.ts as i64],
202            )?;
203        }
204        // Fix #4: Set ended_at on latest session when daemon stops.
205        // Both daemon_stopped() and daemon_stopped_with_reason() use "daemon_stopped" as event name.
206        "daemon_stopped" => {
207            conn.execute(
208                "UPDATE session_summary SET ended_at = ?1
209                 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
210                params![event.ts as i64],
211            )?;
212        }
213        _ => {}
214    }
215    Ok(())
216}
217
218fn upsert_agent_counter(conn: &Connection, role: &str, column: &str) -> Result<()> {
219    // column is a known static string, safe to interpolate.
220    let sql = format!(
221        "INSERT INTO agent_metrics (role, {column}) VALUES (?1, 1)
222         ON CONFLICT(role) DO UPDATE SET {column} = {column} + 1"
223    );
224    conn.execute(&sql, params![role])?;
225    Ok(())
226}
227
228/// Record an agent's poll state (idle or working) and accumulate cycle time.
229///
230/// Fix #5: Upserts idle_polls or working_polls for the given role.
231/// Fix #6: Increments total_cycle_secs by `poll_interval_secs` when working.
232pub fn record_agent_poll_state(
233    conn: &Connection,
234    role: &str,
235    is_working: bool,
236    poll_interval_secs: u64,
237) -> Result<()> {
238    if is_working {
239        conn.execute(
240            "INSERT INTO agent_metrics (role, working_polls, total_cycle_secs)
241             VALUES (?1, 1, ?2)
242             ON CONFLICT(role) DO UPDATE SET
243                working_polls = working_polls + 1,
244                total_cycle_secs = total_cycle_secs + ?2",
245            params![role, poll_interval_secs as i64],
246        )?;
247    } else {
248        conn.execute(
249            "INSERT INTO agent_metrics (role, idle_polls) VALUES (?1, 1)
250             ON CONFLICT(role) DO UPDATE SET idle_polls = idle_polls + 1",
251            params![role],
252        )?;
253    }
254    Ok(())
255}
256
257// ---------------------------------------------------------------------------
258// Query helpers
259// ---------------------------------------------------------------------------
260
/// Summary row for `batty telemetry summary`.
///
/// One row of the `session_summary` table. Implements `PartialEq`/`Eq` so
/// rows can be compared directly (e.g. with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionSummaryRow {
    /// Primary key of the session row.
    pub session_id: String,
    /// Value of the `started_at` column.
    pub started_at: i64,
    /// Value of the `ended_at` column; `None` while the column is NULL.
    pub ended_at: Option<i64>,
    /// Value of the `tasks_completed` counter.
    pub tasks_completed: i64,
    /// Value of the `total_merges` counter.
    pub total_merges: i64,
    /// Value of the `total_events` counter.
    pub total_events: i64,
}
271
272pub fn query_session_summaries(conn: &Connection) -> Result<Vec<SessionSummaryRow>> {
273    let mut stmt = conn.prepare(
274        "SELECT session_id, started_at, ended_at, tasks_completed, total_merges, total_events
275         FROM session_summary ORDER BY started_at DESC LIMIT 20",
276    )?;
277    let rows = stmt
278        .query_map([], |row| {
279            Ok(SessionSummaryRow {
280                session_id: row.get(0)?,
281                started_at: row.get(1)?,
282                ended_at: row.get(2)?,
283                tasks_completed: row.get(3)?,
284                total_merges: row.get(4)?,
285                total_events: row.get(5)?,
286            })
287        })?
288        .collect::<std::result::Result<Vec<_>, _>>()?;
289    Ok(rows)
290}
291
/// Agent metrics row for `batty telemetry agents`.
///
/// One row of the `agent_metrics` table. Implements `PartialEq`/`Eq` so rows
/// can be compared directly (e.g. with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AgentMetricsRow {
    /// Primary key: the agent's role name.
    pub role: String,
    /// Value of the `completions` counter.
    pub completions: i64,
    /// Value of the `failures` counter.
    pub failures: i64,
    /// Value of the `restarts` counter.
    pub restarts: i64,
    /// Value of the `total_cycle_secs` column.
    pub total_cycle_secs: i64,
    /// Value of the `idle_polls` counter.
    pub idle_polls: i64,
    /// Value of the `working_polls` counter.
    pub working_polls: i64,
}
303
304pub fn query_agent_metrics(conn: &Connection) -> Result<Vec<AgentMetricsRow>> {
305    let mut stmt = conn.prepare(
306        "SELECT role, completions, failures, restarts, total_cycle_secs, idle_polls, working_polls
307         FROM agent_metrics ORDER BY role",
308    )?;
309    let rows = stmt
310        .query_map([], |row| {
311            Ok(AgentMetricsRow {
312                role: row.get(0)?,
313                completions: row.get(1)?,
314                failures: row.get(2)?,
315                restarts: row.get(3)?,
316                total_cycle_secs: row.get(4)?,
317                idle_polls: row.get(5)?,
318                working_polls: row.get(6)?,
319            })
320        })?
321        .collect::<std::result::Result<Vec<_>, _>>()?;
322    Ok(rows)
323}
324
/// Task metrics row for `batty telemetry tasks`.
///
/// One row of the `task_metrics` table. Implements `PartialEq` so rows can be
/// compared directly; `Eq` is not derived because of the floating-point
/// `confidence_score` field.
#[derive(Debug, Clone, PartialEq)]
pub struct TaskMetricsRow {
    /// Primary key: the task identifier.
    pub task_id: String,
    /// Value of the `started_at` column; `None` while the column is NULL.
    pub started_at: Option<i64>,
    /// Value of the `completed_at` column; `None` while the column is NULL.
    pub completed_at: Option<i64>,
    /// Value of the `retries` counter.
    pub retries: i64,
    /// Value of the `escalations` counter.
    pub escalations: i64,
    /// Value of the `merge_time_secs` column; `None` while the column is NULL.
    pub merge_time_secs: Option<i64>,
    /// Value of the `confidence_score` column; `None` while the column is NULL.
    pub confidence_score: Option<f64>,
}
336
337pub fn query_task_metrics(conn: &Connection) -> Result<Vec<TaskMetricsRow>> {
338    let mut stmt = conn.prepare(
339        "SELECT task_id, started_at, completed_at, retries, escalations, merge_time_secs, confidence_score
340         FROM task_metrics ORDER BY started_at DESC NULLS LAST LIMIT 50",
341    )?;
342    let rows = stmt
343        .query_map([], |row| {
344            Ok(TaskMetricsRow {
345                task_id: row.get(0)?,
346                started_at: row.get(1)?,
347                completed_at: row.get(2)?,
348                retries: row.get(3)?,
349                escalations: row.get(4)?,
350                merge_time_secs: row.get(5)?,
351                confidence_score: row.get(6)?,
352            })
353        })?
354        .collect::<std::result::Result<Vec<_>, _>>()?;
355    Ok(rows)
356}
357
/// Recent events row for `batty telemetry events`.
///
/// A projection of the `events` table without the `id` and `payload`
/// columns. Implements `PartialEq`/`Eq` so rows can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventRow {
    /// Value of the `timestamp` column.
    pub timestamp: i64,
    /// Value of the `event_type` column (the event name).
    pub event_type: String,
    /// Value of the `role` column; `None` when the column is NULL.
    pub role: Option<String>,
    /// Value of the `task_id` column; `None` when the column is NULL.
    pub task_id: Option<String>,
}
366
367pub fn query_recent_events(conn: &Connection, limit: usize) -> Result<Vec<EventRow>> {
368    let mut stmt = conn.prepare(
369        "SELECT timestamp, event_type, role, task_id
370         FROM events ORDER BY timestamp DESC LIMIT ?1",
371    )?;
372    let rows = stmt
373        .query_map(params![limit as i64], |row| {
374            Ok(EventRow {
375                timestamp: row.get(0)?,
376                event_type: row.get(1)?,
377                role: row.get(2)?,
378                task_id: row.get(3)?,
379            })
380        })?
381        .collect::<std::result::Result<Vec<_>, _>>()?;
382    Ok(rows)
383}
384
/// Review pipeline metrics aggregated from the events table.
///
/// Implements `PartialEq` so results can be compared directly; `Eq` is not
/// derived because of the floating-point average.
#[derive(Debug, Clone, PartialEq)]
pub struct ReviewMetricsRow {
    /// Number of `task_auto_merged` events.
    pub auto_merge_count: i64,
    /// Number of `task_manual_merged` events.
    pub manual_merge_count: i64,
    /// Number of `task_reworked` events.
    pub rework_count: i64,
    /// Number of `review_nudge_sent` events.
    pub review_nudge_count: i64,
    /// Number of `review_escalated` events.
    pub review_escalation_count: i64,
    /// Average difference between a task's merge event timestamp and its
    /// `task_completed` timestamp; `None` when no such pair exists.
    pub avg_review_latency_secs: Option<f64>,
}
395
396/// Query aggregated review pipeline metrics from the events table.
397pub fn query_review_metrics(conn: &Connection) -> Result<ReviewMetricsRow> {
398    let count_event = |event_type: &str| -> Result<i64> {
399        let n: i64 = conn.query_row(
400            "SELECT COUNT(*) FROM events WHERE event_type = ?1",
401            params![event_type],
402            |row| row.get(0),
403        )?;
404        Ok(n)
405    };
406
407    let auto_merge_count = count_event("task_auto_merged")?;
408    let manual_merge_count = count_event("task_manual_merged")?;
409    let rework_count = count_event("task_reworked")?;
410    let review_nudge_count = count_event("review_nudge_sent")?;
411    let review_escalation_count = count_event("review_escalated")?;
412
413    // Compute average review latency: time between task_completed and its
414    // corresponding merge event for each task.
415    let avg_review_latency_secs: Option<f64> = conn
416        .query_row(
417            "SELECT AVG(m.timestamp - c.timestamp)
418             FROM events c
419             JOIN events m ON c.task_id = m.task_id
420               AND m.event_type IN ('task_auto_merged', 'task_manual_merged')
421             WHERE c.event_type = 'task_completed'
422               AND c.task_id IS NOT NULL
423               AND m.timestamp >= c.timestamp",
424            [],
425            |row| row.get(0),
426        )
427        .unwrap_or(None);
428
429    Ok(ReviewMetricsRow {
430        auto_merge_count,
431        manual_merge_count,
432        rework_count,
433        review_nudge_count,
434        review_escalation_count,
435        avg_review_latency_secs,
436    })
437}
438
#[cfg(test)]
mod tests {
    use super::*;
    use crate::team::events::TeamEvent;

    /// Poll interval used by every `record_agent_poll_state` test.
    const POLL_SECS: u64 = 5;

    /// Fresh in-memory database with the schema applied.
    fn fresh_db() -> Connection {
        open_in_memory().unwrap()
    }

    /// Insert `event`, panicking (at the caller's location) on failure.
    #[track_caller]
    fn record_event(conn: &Connection, event: &TeamEvent) {
        insert_event(conn, event).unwrap();
    }

    /// Record one poll for `role` with the shared interval.
    #[track_caller]
    fn poll(conn: &Connection, role: &str, is_working: bool) {
        record_agent_poll_state(conn, role, is_working, POLL_SECS).unwrap();
    }

    // --- Schema ---

    #[test]
    fn schema_creation_succeeds() {
        let db = fresh_db();
        // The events table must exist and start out empty.
        let n: i64 = db
            .query_row("SELECT COUNT(*) FROM events", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n, 0);
    }

    #[test]
    fn idempotent_schema_creation() {
        let db = fresh_db();
        // A second schema pass over the same connection must not fail.
        init_schema(&db).unwrap();
    }

    // --- Event inserts and derived metrics ---

    #[test]
    fn insert_and_query_event_round_trip() {
        let db = fresh_db();
        record_event(&db, &TeamEvent::daemon_started());

        let recent = query_recent_events(&db, 10).unwrap();
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].event_type, "daemon_started");
    }

    #[test]
    fn task_assigned_creates_task_metric() {
        let db = fresh_db();
        record_event(&db, &TeamEvent::task_assigned("eng-1", "42"));

        let task_rows = query_task_metrics(&db).unwrap();
        assert_eq!(task_rows.len(), 1);
        assert_eq!(task_rows[0].task_id, "42");
        assert!(task_rows[0].started_at.is_some());
    }

    #[test]
    fn task_completed_updates_agent_and_task_metrics() {
        let db = fresh_db();

        record_event(&db, &TeamEvent::task_assigned("eng-1", "42"));
        record_event(&db, &TeamEvent::task_completed("eng-1", Some("42")));

        let agent_rows = query_agent_metrics(&db).unwrap();
        assert_eq!(agent_rows.len(), 1);
        assert_eq!(agent_rows[0].role, "eng-1");
        assert_eq!(agent_rows[0].completions, 1);

        let task_rows = query_task_metrics(&db).unwrap();
        assert_eq!(task_rows.len(), 1);
        assert!(task_rows[0].completed_at.is_some());
    }

    #[test]
    fn escalation_increments_task_escalations() {
        let db = fresh_db();
        let escalation = TeamEvent::task_escalated("eng-1", "42", None);
        for _ in 0..2 {
            record_event(&db, &escalation);
        }

        let task_rows = query_task_metrics(&db).unwrap();
        assert_eq!(task_rows[0].escalations, 2);
    }

    #[test]
    fn pane_death_increments_failures() {
        let db = fresh_db();
        record_event(&db, &TeamEvent::pane_death("eng-1"));

        let agent_rows = query_agent_metrics(&db).unwrap();
        assert_eq!(agent_rows[0].failures, 1);
    }

    #[test]
    fn pane_respawned_increments_restarts() {
        let db = fresh_db();
        record_event(&db, &TeamEvent::pane_respawned("eng-1"));

        let agent_rows = query_agent_metrics(&db).unwrap();
        assert_eq!(agent_rows[0].restarts, 1);
    }

    #[test]
    fn delivery_failed_increments_failures() {
        let db = fresh_db();
        record_event(
            &db,
            &TeamEvent::delivery_failed("eng-1", "manager", "pane not ready"),
        );

        let agent_rows = query_agent_metrics(&db).unwrap();
        assert_eq!(agent_rows.len(), 1);
        assert_eq!(agent_rows[0].role, "eng-1");
        assert_eq!(agent_rows[0].failures, 1);
    }

    #[test]
    fn context_exhausted_increments_restarts() {
        let db = fresh_db();
        record_event(
            &db,
            &TeamEvent::context_exhausted("eng-1", Some(42), Some(500_000)),
        );

        let agent_rows = query_agent_metrics(&db).unwrap();
        assert_eq!(agent_rows.len(), 1);
        assert_eq!(agent_rows[0].role, "eng-1");
        assert_eq!(agent_rows[0].restarts, 1);
    }

    #[test]
    fn all_failure_event_types_accumulate() {
        let db = fresh_db();
        record_event(&db, &TeamEvent::pane_death("eng-1"));
        record_event(&db, &TeamEvent::member_crashed("eng-1", true));
        record_event(
            &db,
            &TeamEvent::delivery_failed("eng-1", "manager", "timeout"),
        );

        let agent_rows = query_agent_metrics(&db).unwrap();
        assert_eq!(agent_rows[0].failures, 3);
    }

    #[test]
    fn all_restart_event_types_accumulate() {
        let db = fresh_db();
        record_event(&db, &TeamEvent::pane_respawned("eng-1"));
        record_event(&db, &TeamEvent::agent_restarted("eng-1", "42", "stall", 1));
        record_event(&db, &TeamEvent::context_exhausted("eng-1", Some(42), None));

        let agent_rows = query_agent_metrics(&db).unwrap();
        assert_eq!(agent_rows[0].restarts, 3);
    }

    #[test]
    fn daemon_started_creates_session_summary() {
        let db = fresh_db();
        record_event(&db, &TeamEvent::daemon_started());

        let sessions = query_session_summaries(&db).unwrap();
        assert_eq!(sessions.len(), 1);
        assert!(sessions[0].session_id.starts_with("session-"));
    }

    #[test]
    fn multiple_events_for_same_agent_accumulate() {
        let db = fresh_db();
        for task in ["1", "2"] {
            record_event(&db, &TeamEvent::task_completed("eng-1", Some(task)));
        }

        let agent_rows = query_agent_metrics(&db).unwrap();
        assert_eq!(agent_rows[0].completions, 2);
    }

    #[test]
    fn query_recent_events_respects_limit() {
        let db = fresh_db();
        for _ in 0..5 {
            record_event(&db, &TeamEvent::daemon_started());
        }
        let recent = query_recent_events(&db, 3).unwrap();
        assert_eq!(recent.len(), 3);
    }

    #[test]
    fn concurrent_writes_to_same_connection() {
        // Despite the name, this exercises many back-to-back sequential
        // writes through a single connection and checks none are lost.
        let db = fresh_db();
        for i in 0..50 {
            record_event(&db, &TeamEvent::task_assigned("eng-1", &i.to_string()));
        }
        let recent = query_recent_events(&db, 100).unwrap();
        assert_eq!(recent.len(), 50);
    }

    // --- Review metrics ---

    #[test]
    fn review_metrics_empty_db() {
        let db = fresh_db();
        let metrics = query_review_metrics(&db).unwrap();
        assert_eq!(metrics.auto_merge_count, 0);
        assert_eq!(metrics.manual_merge_count, 0);
        assert_eq!(metrics.rework_count, 0);
        assert_eq!(metrics.review_nudge_count, 0);
        assert_eq!(metrics.review_escalation_count, 0);
        assert!(metrics.avg_review_latency_secs.is_none());
    }

    #[test]
    fn review_metrics_counts_all_event_types() {
        let db = fresh_db();
        let events = [
            TeamEvent::task_auto_merged("eng-1", "1", 0.9, 2, 30),
            TeamEvent::task_auto_merged("eng-1", "2", 0.9, 2, 30),
            TeamEvent::task_manual_merged("3"),
            TeamEvent::task_reworked("eng-1", "4"),
            TeamEvent::review_nudge_sent("manager", "5"),
            TeamEvent::review_nudge_sent("manager", "6"),
            TeamEvent::review_escalated_by_role("manager", "7"),
        ];
        for event in &events {
            record_event(&db, event);
        }

        let metrics = query_review_metrics(&db).unwrap();
        assert_eq!(metrics.auto_merge_count, 2);
        assert_eq!(metrics.manual_merge_count, 1);
        assert_eq!(metrics.rework_count, 1);
        assert_eq!(metrics.review_nudge_count, 2);
        assert_eq!(metrics.review_escalation_count, 1);
    }

    #[test]
    fn review_metrics_computes_avg_latency() {
        let db = fresh_db();

        // Task 10: completed at ts=1000, merged at ts=1100 → 100s latency
        let mut completed_10 = TeamEvent::task_completed("eng-1", Some("10"));
        completed_10.ts = 1000;
        record_event(&db, &completed_10);
        let mut merged_10 = TeamEvent::task_auto_merged("eng-1", "10", 0.9, 2, 30);
        merged_10.ts = 1100;
        record_event(&db, &merged_10);

        // Task 20: completed at ts=2000, merged at ts=2300 → 300s latency
        let mut completed_20 = TeamEvent::task_completed("eng-2", Some("20"));
        completed_20.ts = 2000;
        record_event(&db, &completed_20);
        let mut merged_20 = TeamEvent::task_manual_merged("20");
        merged_20.ts = 2300;
        record_event(&db, &merged_20);

        let metrics = query_review_metrics(&db).unwrap();
        // avg = (100 + 300) / 2 = 200
        let avg = metrics.avg_review_latency_secs.unwrap();
        assert!((avg - 200.0).abs() < 0.01);
    }

    // --- Session counters: tasks_completed ---

    #[test]
    fn tasks_completed_increments_on_task_completed() {
        let db = fresh_db();
        record_event(&db, &TeamEvent::daemon_started());

        record_event(&db, &TeamEvent::task_completed("eng-1", Some("1")));
        record_event(&db, &TeamEvent::task_completed("eng-2", Some("2")));

        let sessions = query_session_summaries(&db).unwrap();
        assert_eq!(sessions[0].tasks_completed, 2);
    }

    // --- Session counters: total_merges ---

    #[test]
    fn total_merges_increments_on_auto_and_manual_merge() {
        let db = fresh_db();
        record_event(&db, &TeamEvent::daemon_started());

        record_event(&db, &TeamEvent::task_auto_merged("eng-1", "1", 0.9, 2, 30));
        record_event(&db, &TeamEvent::task_manual_merged("2"));
        record_event(&db, &TeamEvent::task_auto_merged("eng-1", "3", 0.8, 1, 10));

        let sessions = query_session_summaries(&db).unwrap();
        assert_eq!(sessions[0].total_merges, 3);
    }

    // --- Session counters: total_events ---

    #[test]
    fn total_events_increments_on_every_insert() {
        let db = fresh_db();
        // The first event creates the session row and is itself counted.
        record_event(&db, &TeamEvent::daemon_started());
        record_event(&db, &TeamEvent::task_assigned("eng-1", "1"));
        record_event(&db, &TeamEvent::task_completed("eng-1", Some("1")));

        let sessions = query_session_summaries(&db).unwrap();
        // daemon_started + task_assigned + task_completed.
        assert_eq!(sessions[0].total_events, 3);
    }

    // --- Session end: ended_at ---

    #[test]
    fn ended_at_set_on_daemon_stopped() {
        let db = fresh_db();
        record_event(&db, &TeamEvent::daemon_started());

        let before = query_session_summaries(&db).unwrap();
        assert!(before[0].ended_at.is_none());

        let mut stop = TeamEvent::daemon_stopped_with_reason("shutdown", 3600);
        stop.ts = 9999;
        record_event(&db, &stop);

        let after = query_session_summaries(&db).unwrap();
        assert_eq!(after[0].ended_at, Some(9999));
    }

    #[test]
    fn ended_at_set_on_plain_daemon_stopped() {
        let db = fresh_db();
        record_event(&db, &TeamEvent::daemon_started());

        let mut stop = TeamEvent::daemon_stopped();
        stop.ts = 5000;
        record_event(&db, &stop);

        let sessions = query_session_summaries(&db).unwrap();
        assert_eq!(sessions[0].ended_at, Some(5000));
    }

    // --- Poll state: idle_polls / working_polls ---

    #[test]
    fn record_agent_poll_state_tracks_idle_polls() {
        let db = fresh_db();
        for _ in 0..3 {
            poll(&db, "eng-1", false);
        }

        let agent_rows = query_agent_metrics(&db).unwrap();
        assert_eq!(agent_rows[0].idle_polls, 3);
        assert_eq!(agent_rows[0].working_polls, 0);
        assert_eq!(agent_rows[0].total_cycle_secs, 0);
    }

    #[test]
    fn record_agent_poll_state_tracks_working_polls() {
        let db = fresh_db();
        for _ in 0..2 {
            poll(&db, "eng-1", true);
        }

        let agent_rows = query_agent_metrics(&db).unwrap();
        assert_eq!(agent_rows[0].working_polls, 2);
        assert_eq!(agent_rows[0].idle_polls, 0);
    }

    // --- Poll state: total_cycle_secs ---

    #[test]
    fn record_agent_poll_state_accumulates_cycle_secs_for_working() {
        let db = fresh_db();
        for _ in 0..3 {
            poll(&db, "eng-1", true);
        }

        let agent_rows = query_agent_metrics(&db).unwrap();
        assert_eq!(agent_rows[0].total_cycle_secs, 15); // 3 * 5
    }

    #[test]
    fn record_agent_poll_state_idle_does_not_accumulate_cycle_secs() {
        let db = fresh_db();
        for _ in 0..2 {
            poll(&db, "eng-1", false);
        }

        let agent_rows = query_agent_metrics(&db).unwrap();
        assert_eq!(agent_rows[0].total_cycle_secs, 0);
    }

    #[test]
    fn record_agent_poll_state_mixed_idle_and_working() {
        let db = fresh_db();
        for is_working in [true, false, true, false] {
            poll(&db, "eng-1", is_working);
        }

        let agent_rows = query_agent_metrics(&db).unwrap();
        assert_eq!(agent_rows[0].working_polls, 2);
        assert_eq!(agent_rows[0].idle_polls, 2);
        assert_eq!(agent_rows[0].total_cycle_secs, 10); // 2 * 5
    }

    #[test]
    fn record_agent_poll_state_multiple_agents() {
        let db = fresh_db();
        let sequence = [
            ("eng-1", true),
            ("eng-2", false),
            ("eng-1", true),
            ("eng-2", true),
        ];
        for (role, is_working) in sequence {
            poll(&db, role, is_working);
        }

        let agent_rows = query_agent_metrics(&db).unwrap();
        let eng1 = agent_rows.iter().find(|a| a.role == "eng-1").unwrap();
        let eng2 = agent_rows.iter().find(|a| a.role == "eng-2").unwrap();
        assert_eq!(eng1.working_polls, 2);
        assert_eq!(eng1.total_cycle_secs, 10);
        assert_eq!(eng2.idle_polls, 1);
        assert_eq!(eng2.working_polls, 1);
        assert_eq!(eng2.total_cycle_secs, 5);
    }

    // --- Edge cases: session counters without a session ---

    #[test]
    fn session_counters_noop_without_session() {
        // Without a daemon_started event there is no session row, so the
        // session UPDATE statements match zero rows and must not error.
        let db = fresh_db();
        record_event(&db, &TeamEvent::task_completed("eng-1", Some("1")));
        record_event(&db, &TeamEvent::task_auto_merged("eng-1", "1", 0.9, 2, 30));

        let sessions = query_session_summaries(&db).unwrap();
        assert!(sessions.is_empty());
    }
}