Skip to main content

batty_cli/team/
telemetry_db.rs

1//! SQLite-backed telemetry database for agent performance tracking.
2//!
3//! Stores events, per-agent metrics, per-task metrics, and session summaries
4//! in `.batty/telemetry.db`. All tables use `CREATE TABLE IF NOT EXISTS` —
5//! no migration framework needed.
6
7use std::path::Path;
8
9use anyhow::{Context, Result};
10use rusqlite::{Connection, params};
11
12use super::events::TeamEvent;
13
14/// Database file name under `.batty/`.
15const DB_FILENAME: &str = "telemetry.db";
16
17/// Open or create the telemetry database, initializing the schema.
18pub fn open(project_root: &Path) -> Result<Connection> {
19    let db_path = project_root.join(".batty").join(DB_FILENAME);
20    let conn = Connection::open(&db_path)
21        .with_context(|| format!("failed to open telemetry db at {}", db_path.display()))?;
22
23    // WAL mode for better concurrent read/write performance.
24    conn.pragma_update(None, "journal_mode", "WAL")?;
25
26    init_schema(&conn)?;
27    Ok(conn)
28}
29
30/// Open an in-memory database (for tests).
31#[cfg(test)]
32pub fn open_in_memory() -> Result<Connection> {
33    let conn = Connection::open_in_memory()?;
34    init_schema(&conn)?;
35    Ok(conn)
36}
37
38fn init_schema(conn: &Connection) -> Result<()> {
39    conn.execute_batch(
40        "
41        CREATE TABLE IF NOT EXISTS events (
42            id          INTEGER PRIMARY KEY AUTOINCREMENT,
43            timestamp   INTEGER NOT NULL,
44            event_type  TEXT NOT NULL,
45            role        TEXT,
46            task_id     TEXT,
47            payload     TEXT NOT NULL
48        );
49
50        CREATE INDEX IF NOT EXISTS idx_events_ts ON events(timestamp);
51        CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type);
52        CREATE INDEX IF NOT EXISTS idx_events_role ON events(role);
53
54        CREATE TABLE IF NOT EXISTS agent_metrics (
55            role            TEXT PRIMARY KEY,
56            completions     INTEGER NOT NULL DEFAULT 0,
57            failures        INTEGER NOT NULL DEFAULT 0,
58            restarts        INTEGER NOT NULL DEFAULT 0,
59            total_cycle_secs INTEGER NOT NULL DEFAULT 0,
60            idle_polls      INTEGER NOT NULL DEFAULT 0,
61            working_polls   INTEGER NOT NULL DEFAULT 0
62        );
63
64        CREATE TABLE IF NOT EXISTS task_metrics (
65            task_id          TEXT PRIMARY KEY,
66            started_at       INTEGER,
67            completed_at     INTEGER,
68            retries          INTEGER NOT NULL DEFAULT 0,
69            narration_rejections INTEGER NOT NULL DEFAULT 0,
70            escalations      INTEGER NOT NULL DEFAULT 0,
71            merge_time_secs  INTEGER,
72            confidence_score REAL
73        );
74
75        CREATE TABLE IF NOT EXISTS session_summary (
76            session_id      TEXT PRIMARY KEY,
77            started_at      INTEGER NOT NULL,
78            ended_at        INTEGER,
79            tasks_completed INTEGER NOT NULL DEFAULT 0,
80            total_merges    INTEGER NOT NULL DEFAULT 0,
81            total_events    INTEGER NOT NULL DEFAULT 0
82        );
83        ",
84    )
85    .context("failed to initialize telemetry schema")?;
86    Ok(())
87}
88
89// ---------------------------------------------------------------------------
90// Insert helpers
91// ---------------------------------------------------------------------------
92
93/// Insert a raw event into the events table. Also updates derived metrics.
94pub fn insert_event(conn: &Connection, event: &TeamEvent) -> Result<()> {
95    let payload =
96        serde_json::to_string(event).context("failed to serialize event for telemetry")?;
97
98    conn.execute(
99        "INSERT INTO events (timestamp, event_type, role, task_id, payload) VALUES (?1, ?2, ?3, ?4, ?5)",
100        params![
101            event.ts as i64,
102            event.event,
103            event.role,
104            event.task,
105            payload,
106        ],
107    )
108    .context("failed to insert telemetry event")?;
109
110    // Update derived metrics based on event type (may create session row).
111    update_metrics_for_event(conn, event)?;
112
113    // Fix #3: Increment total_events on every insert (after update_metrics
114    // so that daemon_started can create the session row first).
115    conn.execute(
116        "UPDATE session_summary SET total_events = total_events + 1
117         WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
118        [],
119    )?;
120
121    Ok(())
122}
123
124fn update_metrics_for_event(conn: &Connection, event: &TeamEvent) -> Result<()> {
125    match event.event.as_str() {
126        "task_completed" => {
127            if let Some(role) = &event.role {
128                upsert_agent_counter(conn, role, "completions")?;
129            }
130            if let Some(task) = &event.task {
131                conn.execute(
132                    "INSERT INTO task_metrics (task_id, completed_at) VALUES (?1, ?2)
133                     ON CONFLICT(task_id) DO UPDATE SET completed_at = ?2",
134                    params![task, event.ts as i64],
135                )?;
136            }
137            // Fix #1: Increment tasks_completed on latest session.
138            conn.execute(
139                "UPDATE session_summary SET tasks_completed = tasks_completed + 1
140                 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
141                [],
142            )?;
143        }
144        "task_assigned" => {
145            if let Some(task) = &event.task {
146                conn.execute(
147                    "INSERT INTO task_metrics (task_id, started_at) VALUES (?1, ?2)
148                     ON CONFLICT(task_id) DO UPDATE SET started_at = COALESCE(task_metrics.started_at, ?2)",
149                    params![task, event.ts as i64],
150                )?;
151            }
152        }
153        "task_escalated" => {
154            if let Some(task) = &event.task {
155                conn.execute(
156                    "INSERT INTO task_metrics (task_id, escalations) VALUES (?1, 1)
157                     ON CONFLICT(task_id) DO UPDATE SET escalations = escalations + 1",
158                    params![task],
159                )?;
160            }
161        }
162        "narration_rejection" => {
163            if let Some(task) = &event.task {
164                conn.execute(
165                    "INSERT INTO task_metrics (task_id, narration_rejections) VALUES (?1, 1)
166                     ON CONFLICT(task_id) DO UPDATE SET narration_rejections = narration_rejections + 1",
167                    params![task],
168                )?;
169            }
170        }
171        "member_crashed" | "pane_death" | "delivery_failed" => {
172            if let Some(role) = &event.role {
173                upsert_agent_counter(conn, role, "failures")?;
174            }
175        }
176        "pane_respawned" | "agent_restarted" | "context_exhausted" => {
177            if let Some(role) = &event.role {
178                upsert_agent_counter(conn, role, "restarts")?;
179            }
180        }
181        "task_auto_merged" | "task_manual_merged" => {
182            if let Some(task) = &event.task {
183                conn.execute(
184                    "INSERT INTO task_metrics (task_id, merge_time_secs) VALUES (?1, ?2)
185                     ON CONFLICT(task_id) DO UPDATE SET merge_time_secs = ?2 - COALESCE(task_metrics.started_at, ?2)",
186                    params![task, event.ts as i64],
187                )?;
188            }
189            // Fix #2: Increment total_merges on latest session.
190            conn.execute(
191                "UPDATE session_summary SET total_merges = total_merges + 1
192                 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
193                [],
194            )?;
195        }
196        "merge_confidence_scored" => {
197            if let Some(task) = &event.task {
198                if let Some(confidence) = event.load {
199                    conn.execute(
200                        "INSERT INTO task_metrics (task_id, confidence_score) VALUES (?1, ?2)
201                         ON CONFLICT(task_id) DO UPDATE SET confidence_score = ?2",
202                        params![task, confidence],
203                    )?;
204                }
205            }
206        }
207        "daemon_started" => {
208            let session_id = format!("session-{}", event.ts);
209            conn.execute(
210                "INSERT OR IGNORE INTO session_summary (session_id, started_at) VALUES (?1, ?2)",
211                params![session_id, event.ts as i64],
212            )?;
213        }
214        // Fix #4: Set ended_at on latest session when daemon stops.
215        // Both daemon_stopped() and daemon_stopped_with_reason() use "daemon_stopped" as event name.
216        "daemon_stopped" => {
217            conn.execute(
218                "UPDATE session_summary SET ended_at = ?1
219                 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
220                params![event.ts as i64],
221            )?;
222        }
223        _ => {}
224    }
225    Ok(())
226}
227
228fn upsert_agent_counter(conn: &Connection, role: &str, column: &str) -> Result<()> {
229    // column is a known static string, safe to interpolate.
230    let sql = format!(
231        "INSERT INTO agent_metrics (role, {column}) VALUES (?1, 1)
232         ON CONFLICT(role) DO UPDATE SET {column} = {column} + 1"
233    );
234    conn.execute(&sql, params![role])?;
235    Ok(())
236}
237
238/// Record an agent's poll state (idle or working) and accumulate cycle time.
239///
240/// Fix #5: Upserts idle_polls or working_polls for the given role.
241/// Fix #6: Increments total_cycle_secs by `poll_interval_secs` when working.
242pub fn record_agent_poll_state(
243    conn: &Connection,
244    role: &str,
245    is_working: bool,
246    poll_interval_secs: u64,
247) -> Result<()> {
248    if is_working {
249        conn.execute(
250            "INSERT INTO agent_metrics (role, working_polls, total_cycle_secs)
251             VALUES (?1, 1, ?2)
252             ON CONFLICT(role) DO UPDATE SET
253                working_polls = working_polls + 1,
254                total_cycle_secs = total_cycle_secs + ?2",
255            params![role, poll_interval_secs as i64],
256        )?;
257    } else {
258        conn.execute(
259            "INSERT INTO agent_metrics (role, idle_polls) VALUES (?1, 1)
260             ON CONFLICT(role) DO UPDATE SET idle_polls = idle_polls + 1",
261            params![role],
262        )?;
263    }
264    Ok(())
265}
266
267// ---------------------------------------------------------------------------
268// Query helpers
269// ---------------------------------------------------------------------------
270
271/// Summary row for `batty telemetry summary`.
272#[derive(Debug, Clone)]
273pub struct SessionSummaryRow {
274    pub session_id: String,
275    pub started_at: i64,
276    pub ended_at: Option<i64>,
277    pub tasks_completed: i64,
278    pub total_merges: i64,
279    pub total_events: i64,
280}
281
282pub fn query_session_summaries(conn: &Connection) -> Result<Vec<SessionSummaryRow>> {
283    let mut stmt = conn.prepare(
284        "SELECT session_id, started_at, ended_at, tasks_completed, total_merges, total_events
285         FROM session_summary ORDER BY started_at DESC LIMIT 20",
286    )?;
287    let rows = stmt
288        .query_map([], |row| {
289            Ok(SessionSummaryRow {
290                session_id: row.get(0)?,
291                started_at: row.get(1)?,
292                ended_at: row.get(2)?,
293                tasks_completed: row.get(3)?,
294                total_merges: row.get(4)?,
295                total_events: row.get(5)?,
296            })
297        })?
298        .collect::<std::result::Result<Vec<_>, _>>()?;
299    Ok(rows)
300}
301
302/// Agent metrics row for `batty telemetry agents`.
303#[derive(Debug, Clone)]
304pub struct AgentMetricsRow {
305    pub role: String,
306    pub completions: i64,
307    pub failures: i64,
308    pub restarts: i64,
309    pub total_cycle_secs: i64,
310    pub idle_polls: i64,
311    pub working_polls: i64,
312}
313
314pub fn query_agent_metrics(conn: &Connection) -> Result<Vec<AgentMetricsRow>> {
315    let mut stmt = conn.prepare(
316        "SELECT role, completions, failures, restarts, total_cycle_secs, idle_polls, working_polls
317         FROM agent_metrics ORDER BY role",
318    )?;
319    let rows = stmt
320        .query_map([], |row| {
321            Ok(AgentMetricsRow {
322                role: row.get(0)?,
323                completions: row.get(1)?,
324                failures: row.get(2)?,
325                restarts: row.get(3)?,
326                total_cycle_secs: row.get(4)?,
327                idle_polls: row.get(5)?,
328                working_polls: row.get(6)?,
329            })
330        })?
331        .collect::<std::result::Result<Vec<_>, _>>()?;
332    Ok(rows)
333}
334
335/// Task metrics row for `batty telemetry tasks`.
336#[derive(Debug, Clone)]
337pub struct TaskMetricsRow {
338    pub task_id: String,
339    pub started_at: Option<i64>,
340    pub completed_at: Option<i64>,
341    pub retries: i64,
342    pub narration_rejections: i64,
343    pub escalations: i64,
344    pub merge_time_secs: Option<i64>,
345    pub confidence_score: Option<f64>,
346}
347
348pub fn query_task_metrics(conn: &Connection) -> Result<Vec<TaskMetricsRow>> {
349    let mut stmt = conn.prepare(
350        "SELECT task_id, started_at, completed_at, retries, narration_rejections, escalations, merge_time_secs, confidence_score
351         FROM task_metrics ORDER BY started_at DESC NULLS LAST LIMIT 50",
352    )?;
353    let rows = stmt
354        .query_map([], |row| {
355            Ok(TaskMetricsRow {
356                task_id: row.get(0)?,
357                started_at: row.get(1)?,
358                completed_at: row.get(2)?,
359                retries: row.get(3)?,
360                narration_rejections: row.get(4)?,
361                escalations: row.get(5)?,
362                merge_time_secs: row.get(6)?,
363                confidence_score: row.get(7)?,
364            })
365        })?
366        .collect::<std::result::Result<Vec<_>, _>>()?;
367    Ok(rows)
368}
369
370/// Recent events row for `batty telemetry events`.
371#[derive(Debug, Clone)]
372pub struct EventRow {
373    pub timestamp: i64,
374    pub event_type: String,
375    pub role: Option<String>,
376    pub task_id: Option<String>,
377}
378
379pub fn query_recent_events(conn: &Connection, limit: usize) -> Result<Vec<EventRow>> {
380    let mut stmt = conn.prepare(
381        "SELECT timestamp, event_type, role, task_id
382         FROM events ORDER BY timestamp DESC LIMIT ?1",
383    )?;
384    let rows = stmt
385        .query_map(params![limit as i64], |row| {
386            Ok(EventRow {
387                timestamp: row.get(0)?,
388                event_type: row.get(1)?,
389                role: row.get(2)?,
390                task_id: row.get(3)?,
391            })
392        })?
393        .collect::<std::result::Result<Vec<_>, _>>()?;
394    Ok(rows)
395}
396
397/// Review pipeline metrics aggregated from the events table.
398#[derive(Debug, Clone)]
399pub struct ReviewMetricsRow {
400    pub auto_merge_count: i64,
401    pub manual_merge_count: i64,
402    pub rework_count: i64,
403    pub review_nudge_count: i64,
404    pub review_escalation_count: i64,
405    pub avg_review_latency_secs: Option<f64>,
406}
407
408/// Query aggregated review pipeline metrics from the events table.
409pub fn query_review_metrics(conn: &Connection) -> Result<ReviewMetricsRow> {
410    let count_event = |event_type: &str| -> Result<i64> {
411        let n: i64 = conn.query_row(
412            "SELECT COUNT(*) FROM events WHERE event_type = ?1",
413            params![event_type],
414            |row| row.get(0),
415        )?;
416        Ok(n)
417    };
418
419    let auto_merge_count = count_event("task_auto_merged")?;
420    let manual_merge_count = count_event("task_manual_merged")?;
421    let rework_count = count_event("task_reworked")?;
422    let review_nudge_count = count_event("review_nudge_sent")?;
423    let review_escalation_count = count_event("review_escalated")?;
424
425    // Compute average review latency: time between task_completed and its
426    // corresponding merge event for each task.
427    let avg_review_latency_secs: Option<f64> = conn
428        .query_row(
429            "SELECT AVG(m.timestamp - c.timestamp)
430             FROM events c
431             JOIN events m ON c.task_id = m.task_id
432               AND m.event_type IN ('task_auto_merged', 'task_manual_merged')
433             WHERE c.event_type = 'task_completed'
434               AND c.task_id IS NOT NULL
435               AND m.timestamp >= c.timestamp",
436            [],
437            |row| row.get(0),
438        )
439        .unwrap_or(None);
440
441    Ok(ReviewMetricsRow {
442        auto_merge_count,
443        manual_merge_count,
444        rework_count,
445        review_nudge_count,
446        review_escalation_count,
447        avg_review_latency_secs,
448    })
449}
450
451#[cfg(test)]
452mod tests {
453    use super::*;
454    use crate::team::events::TeamEvent;
455
456    #[test]
457    fn schema_creation_succeeds() {
458        let conn = open_in_memory().unwrap();
459        // Verify tables exist by querying them.
460        let count: i64 = conn
461            .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))
462            .unwrap();
463        assert_eq!(count, 0);
464    }
465
466    #[test]
467    fn idempotent_schema_creation() {
468        let conn = open_in_memory().unwrap();
469        // Running init_schema again should not fail.
470        init_schema(&conn).unwrap();
471    }
472
473    #[test]
474    fn insert_and_query_event_round_trip() {
475        let conn = open_in_memory().unwrap();
476        let event = TeamEvent::daemon_started();
477        insert_event(&conn, &event).unwrap();
478
479        let events = query_recent_events(&conn, 10).unwrap();
480        assert_eq!(events.len(), 1);
481        assert_eq!(events[0].event_type, "daemon_started");
482    }
483
484    #[test]
485    fn task_assigned_creates_task_metric() {
486        let conn = open_in_memory().unwrap();
487        let event = TeamEvent::task_assigned("eng-1", "42");
488        insert_event(&conn, &event).unwrap();
489
490        let tasks = query_task_metrics(&conn).unwrap();
491        assert_eq!(tasks.len(), 1);
492        assert_eq!(tasks[0].task_id, "42");
493        assert!(tasks[0].started_at.is_some());
494    }
495
496    #[test]
497    fn task_completed_updates_agent_and_task_metrics() {
498        let conn = open_in_memory().unwrap();
499
500        let assign = TeamEvent::task_assigned("eng-1", "42");
501        insert_event(&conn, &assign).unwrap();
502
503        let complete = TeamEvent::task_completed("eng-1", Some("42"));
504        insert_event(&conn, &complete).unwrap();
505
506        let agents = query_agent_metrics(&conn).unwrap();
507        assert_eq!(agents.len(), 1);
508        assert_eq!(agents[0].role, "eng-1");
509        assert_eq!(agents[0].completions, 1);
510
511        let tasks = query_task_metrics(&conn).unwrap();
512        assert_eq!(tasks.len(), 1);
513        assert!(tasks[0].completed_at.is_some());
514    }
515
516    #[test]
517    fn escalation_increments_task_escalations() {
518        let conn = open_in_memory().unwrap();
519        let event = TeamEvent::task_escalated("eng-1", "42", None);
520        insert_event(&conn, &event).unwrap();
521        insert_event(&conn, &event).unwrap();
522
523        let tasks = query_task_metrics(&conn).unwrap();
524        assert_eq!(tasks[0].escalations, 2);
525    }
526
527    #[test]
528    fn narration_rejection_increments_task_metric() {
529        let conn = open_in_memory().unwrap();
530        insert_event(&conn, &TeamEvent::narration_rejection("eng-1", 42, 1)).unwrap();
531        insert_event(&conn, &TeamEvent::narration_rejection("eng-1", 42, 2)).unwrap();
532
533        let tasks = query_task_metrics(&conn).unwrap();
534        assert_eq!(tasks.len(), 1);
535        assert_eq!(tasks[0].task_id, "42");
536        assert_eq!(tasks[0].narration_rejections, 2);
537    }
538
539    #[test]
540    fn pane_death_increments_failures() {
541        let conn = open_in_memory().unwrap();
542        let event = TeamEvent::pane_death("eng-1");
543        insert_event(&conn, &event).unwrap();
544
545        let agents = query_agent_metrics(&conn).unwrap();
546        assert_eq!(agents[0].failures, 1);
547    }
548
549    #[test]
550    fn pane_respawned_increments_restarts() {
551        let conn = open_in_memory().unwrap();
552        let event = TeamEvent::pane_respawned("eng-1");
553        insert_event(&conn, &event).unwrap();
554
555        let agents = query_agent_metrics(&conn).unwrap();
556        assert_eq!(agents[0].restarts, 1);
557    }
558
559    #[test]
560    fn delivery_failed_increments_failures() {
561        let conn = open_in_memory().unwrap();
562        let event = TeamEvent::delivery_failed("eng-1", "manager", "pane not ready");
563        insert_event(&conn, &event).unwrap();
564
565        let agents = query_agent_metrics(&conn).unwrap();
566        assert_eq!(agents.len(), 1);
567        assert_eq!(agents[0].role, "eng-1");
568        assert_eq!(agents[0].failures, 1);
569    }
570
571    #[test]
572    fn context_exhausted_increments_restarts() {
573        let conn = open_in_memory().unwrap();
574        let event = TeamEvent::context_exhausted("eng-1", Some(42), Some(500_000));
575        insert_event(&conn, &event).unwrap();
576
577        let agents = query_agent_metrics(&conn).unwrap();
578        assert_eq!(agents.len(), 1);
579        assert_eq!(agents[0].role, "eng-1");
580        assert_eq!(agents[0].restarts, 1);
581    }
582
583    #[test]
584    fn all_failure_event_types_accumulate() {
585        let conn = open_in_memory().unwrap();
586        insert_event(&conn, &TeamEvent::pane_death("eng-1")).unwrap();
587        insert_event(&conn, &TeamEvent::member_crashed("eng-1", true)).unwrap();
588        insert_event(
589            &conn,
590            &TeamEvent::delivery_failed("eng-1", "manager", "timeout"),
591        )
592        .unwrap();
593
594        let agents = query_agent_metrics(&conn).unwrap();
595        assert_eq!(agents[0].failures, 3);
596    }
597
598    #[test]
599    fn all_restart_event_types_accumulate() {
600        let conn = open_in_memory().unwrap();
601        insert_event(&conn, &TeamEvent::pane_respawned("eng-1")).unwrap();
602        insert_event(
603            &conn,
604            &TeamEvent::agent_restarted("eng-1", "42", "stall", 1),
605        )
606        .unwrap();
607        insert_event(
608            &conn,
609            &TeamEvent::context_exhausted("eng-1", Some(42), None),
610        )
611        .unwrap();
612
613        let agents = query_agent_metrics(&conn).unwrap();
614        assert_eq!(agents[0].restarts, 3);
615    }
616
617    #[test]
618    fn daemon_started_creates_session_summary() {
619        let conn = open_in_memory().unwrap();
620        let event = TeamEvent::daemon_started();
621        insert_event(&conn, &event).unwrap();
622
623        let summaries = query_session_summaries(&conn).unwrap();
624        assert_eq!(summaries.len(), 1);
625        assert!(summaries[0].session_id.starts_with("session-"));
626    }
627
628    #[test]
629    fn multiple_events_for_same_agent_accumulate() {
630        let conn = open_in_memory().unwrap();
631        let c1 = TeamEvent::task_completed("eng-1", Some("1"));
632        let c2 = TeamEvent::task_completed("eng-1", Some("2"));
633        insert_event(&conn, &c1).unwrap();
634        insert_event(&conn, &c2).unwrap();
635
636        let agents = query_agent_metrics(&conn).unwrap();
637        assert_eq!(agents[0].completions, 2);
638    }
639
640    #[test]
641    fn query_recent_events_respects_limit() {
642        let conn = open_in_memory().unwrap();
643        for _ in 0..5 {
644            insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
645        }
646        let events = query_recent_events(&conn, 3).unwrap();
647        assert_eq!(events.len(), 3);
648    }
649
650    #[test]
651    fn concurrent_writes_to_same_connection() {
652        // rusqlite Connection is not Send/Sync, but we verify sequential
653        // writes to the same connection work without errors.
654        let conn = open_in_memory().unwrap();
655        for i in 0..50 {
656            let event = TeamEvent::task_assigned("eng-1", &i.to_string());
657            insert_event(&conn, &event).unwrap();
658        }
659        let events = query_recent_events(&conn, 100).unwrap();
660        assert_eq!(events.len(), 50);
661    }
662
663    #[test]
664    fn review_metrics_empty_db() {
665        let conn = open_in_memory().unwrap();
666        let row = query_review_metrics(&conn).unwrap();
667        assert_eq!(row.auto_merge_count, 0);
668        assert_eq!(row.manual_merge_count, 0);
669        assert_eq!(row.rework_count, 0);
670        assert_eq!(row.review_nudge_count, 0);
671        assert_eq!(row.review_escalation_count, 0);
672        assert!(row.avg_review_latency_secs.is_none());
673    }
674
675    #[test]
676    fn review_metrics_counts_all_event_types() {
677        let conn = open_in_memory().unwrap();
678        insert_event(
679            &conn,
680            &TeamEvent::task_auto_merged("eng-1", "1", 0.9, 2, 30),
681        )
682        .unwrap();
683        insert_event(
684            &conn,
685            &TeamEvent::task_auto_merged("eng-1", "2", 0.9, 2, 30),
686        )
687        .unwrap();
688        insert_event(&conn, &TeamEvent::task_manual_merged("3")).unwrap();
689        insert_event(&conn, &TeamEvent::task_reworked("eng-1", "4")).unwrap();
690        insert_event(&conn, &TeamEvent::review_nudge_sent("manager", "5")).unwrap();
691        insert_event(&conn, &TeamEvent::review_nudge_sent("manager", "6")).unwrap();
692        insert_event(&conn, &TeamEvent::review_escalated_by_role("manager", "7")).unwrap();
693
694        let row = query_review_metrics(&conn).unwrap();
695        assert_eq!(row.auto_merge_count, 2);
696        assert_eq!(row.manual_merge_count, 1);
697        assert_eq!(row.rework_count, 1);
698        assert_eq!(row.review_nudge_count, 2);
699        assert_eq!(row.review_escalation_count, 1);
700    }
701
702    #[test]
703    fn review_metrics_computes_avg_latency() {
704        let conn = open_in_memory().unwrap();
705
706        // Task 10: completed at ts=1000, merged at ts=1100 → 100s latency
707        let mut c1 = TeamEvent::task_completed("eng-1", Some("10"));
708        c1.ts = 1000;
709        insert_event(&conn, &c1).unwrap();
710        let mut m1 = TeamEvent::task_auto_merged("eng-1", "10", 0.9, 2, 30);
711        m1.ts = 1100;
712        insert_event(&conn, &m1).unwrap();
713
714        // Task 20: completed at ts=2000, merged at ts=2300 → 300s latency
715        let mut c2 = TeamEvent::task_completed("eng-2", Some("20"));
716        c2.ts = 2000;
717        insert_event(&conn, &c2).unwrap();
718        let mut m2 = TeamEvent::task_manual_merged("20");
719        m2.ts = 2300;
720        insert_event(&conn, &m2).unwrap();
721
722        let row = query_review_metrics(&conn).unwrap();
723        // avg = (100 + 300) / 2 = 200
724        let avg = row.avg_review_latency_secs.unwrap();
725        assert!((avg - 200.0).abs() < 0.01);
726    }
727
728    // --- Fix #1: tasks_completed incremented on task_completed ---
729
730    #[test]
731    fn tasks_completed_increments_on_task_completed() {
732        let conn = open_in_memory().unwrap();
733        insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
734
735        insert_event(&conn, &TeamEvent::task_completed("eng-1", Some("1"))).unwrap();
736        insert_event(&conn, &TeamEvent::task_completed("eng-2", Some("2"))).unwrap();
737
738        let summaries = query_session_summaries(&conn).unwrap();
739        assert_eq!(summaries[0].tasks_completed, 2);
740    }
741
742    // --- Fix #2: total_merges incremented on merge events ---
743
744    #[test]
745    fn total_merges_increments_on_auto_and_manual_merge() {
746        let conn = open_in_memory().unwrap();
747        insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
748
749        insert_event(
750            &conn,
751            &TeamEvent::task_auto_merged("eng-1", "1", 0.9, 2, 30),
752        )
753        .unwrap();
754        insert_event(&conn, &TeamEvent::task_manual_merged("2")).unwrap();
755        insert_event(
756            &conn,
757            &TeamEvent::task_auto_merged("eng-1", "3", 0.8, 1, 10),
758        )
759        .unwrap();
760
761        let summaries = query_session_summaries(&conn).unwrap();
762        assert_eq!(summaries[0].total_merges, 3);
763    }
764
765    // --- Fix #3: total_events incremented on every insert ---
766
767    #[test]
768    fn total_events_increments_on_every_insert() {
769        let conn = open_in_memory().unwrap();
770        // daemon_started is the first event, creating the session and then incrementing.
771        insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
772        insert_event(&conn, &TeamEvent::task_assigned("eng-1", "1")).unwrap();
773        insert_event(&conn, &TeamEvent::task_completed("eng-1", Some("1"))).unwrap();
774
775        let summaries = query_session_summaries(&conn).unwrap();
776        // 3 events inserted after session was created (daemon_started creates the row
777        // then total_events is incremented for it too).
778        assert_eq!(summaries[0].total_events, 3);
779    }
780
781    // --- Fix #4: ended_at set on daemon_stopped ---
782
783    #[test]
784    fn ended_at_set_on_daemon_stopped() {
785        let conn = open_in_memory().unwrap();
786        insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
787
788        let summaries = query_session_summaries(&conn).unwrap();
789        assert!(summaries[0].ended_at.is_none());
790
791        let mut stop = TeamEvent::daemon_stopped_with_reason("shutdown", 3600);
792        stop.ts = 9999;
793        insert_event(&conn, &stop).unwrap();
794
795        let summaries = query_session_summaries(&conn).unwrap();
796        assert_eq!(summaries[0].ended_at, Some(9999));
797    }
798
799    #[test]
800    fn ended_at_set_on_plain_daemon_stopped() {
801        let conn = open_in_memory().unwrap();
802        insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
803
804        let mut stop = TeamEvent::daemon_stopped();
805        stop.ts = 5000;
806        insert_event(&conn, &stop).unwrap();
807
808        let summaries = query_session_summaries(&conn).unwrap();
809        assert_eq!(summaries[0].ended_at, Some(5000));
810    }
811
812    // --- Fix #5: idle_polls / working_polls updated ---
813
814    #[test]
815    fn record_agent_poll_state_tracks_idle_polls() {
816        let conn = open_in_memory().unwrap();
817        record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
818        record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
819        record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
820
821        let agents = query_agent_metrics(&conn).unwrap();
822        assert_eq!(agents[0].idle_polls, 3);
823        assert_eq!(agents[0].working_polls, 0);
824        assert_eq!(agents[0].total_cycle_secs, 0);
825    }
826
827    #[test]
828    fn record_agent_poll_state_tracks_working_polls() {
829        let conn = open_in_memory().unwrap();
830        record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
831        record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
832
833        let agents = query_agent_metrics(&conn).unwrap();
834        assert_eq!(agents[0].working_polls, 2);
835        assert_eq!(agents[0].idle_polls, 0);
836    }
837
838    // --- Fix #6: total_cycle_secs incremented for working agents ---
839
840    #[test]
841    fn record_agent_poll_state_accumulates_cycle_secs_for_working() {
842        let conn = open_in_memory().unwrap();
843        record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
844        record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
845        record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
846
847        let agents = query_agent_metrics(&conn).unwrap();
848        assert_eq!(agents[0].total_cycle_secs, 15); // 3 * 5
849    }
850
851    #[test]
852    fn record_agent_poll_state_idle_does_not_accumulate_cycle_secs() {
853        let conn = open_in_memory().unwrap();
854        record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
855        record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
856
857        let agents = query_agent_metrics(&conn).unwrap();
858        assert_eq!(agents[0].total_cycle_secs, 0);
859    }
860
861    #[test]
862    fn record_agent_poll_state_mixed_idle_and_working() {
863        let conn = open_in_memory().unwrap();
864        record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
865        record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
866        record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
867        record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
868
869        let agents = query_agent_metrics(&conn).unwrap();
870        assert_eq!(agents[0].working_polls, 2);
871        assert_eq!(agents[0].idle_polls, 2);
872        assert_eq!(agents[0].total_cycle_secs, 10); // 2 * 5
873    }
874
875    #[test]
876    fn record_agent_poll_state_multiple_agents() {
877        let conn = open_in_memory().unwrap();
878        record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
879        record_agent_poll_state(&conn, "eng-2", false, 5).unwrap();
880        record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
881        record_agent_poll_state(&conn, "eng-2", true, 5).unwrap();
882
883        let agents = query_agent_metrics(&conn).unwrap();
884        let eng1 = agents.iter().find(|a| a.role == "eng-1").unwrap();
885        let eng2 = agents.iter().find(|a| a.role == "eng-2").unwrap();
886        assert_eq!(eng1.working_polls, 2);
887        assert_eq!(eng1.total_cycle_secs, 10);
888        assert_eq!(eng2.idle_polls, 1);
889        assert_eq!(eng2.working_polls, 1);
890        assert_eq!(eng2.total_cycle_secs, 5);
891    }
892
893    // --- Edge cases: session counters without a session ---
894
895    #[test]
896    fn session_counters_noop_without_session() {
897        // If no daemon_started event has been emitted, no session row exists.
898        // The UPDATE statements should just affect 0 rows — no error.
899        let conn = open_in_memory().unwrap();
900        insert_event(&conn, &TeamEvent::task_completed("eng-1", Some("1"))).unwrap();
901        insert_event(
902            &conn,
903            &TeamEvent::task_auto_merged("eng-1", "1", 0.9, 2, 30),
904        )
905        .unwrap();
906        let summaries = query_session_summaries(&conn).unwrap();
907        assert!(summaries.is_empty());
908    }
909}