Skip to main content

batty_cli/team/
telemetry_db.rs

1//! SQLite-backed telemetry database for agent performance tracking.
2//!
3//! Stores events, per-agent metrics, per-task metrics, and session summaries
4//! in `.batty/telemetry.db`. All tables use `CREATE TABLE IF NOT EXISTS` —
5//! no migration framework needed.
6
7use std::collections::{BTreeMap, BTreeSet};
8use std::path::Path;
9
10use anyhow::{Context, Result};
11use rusqlite::{Connection, params};
12use serde::{Deserialize, Serialize};
13
14use super::events::TeamEvent;
15use super::metrics::TaskCycleTimeRecord;
16use super::test_results::{TestFailure, TestResults};
17
18/// Database file name under `.batty/`.
19const DB_FILENAME: &str = "telemetry.db";
20
/// One column of a telemetry table: the bare name (matched against
/// `PRAGMA table_info` output) plus the full definition that can be appended
/// to `ALTER TABLE ... ADD COLUMN` when repairing a legacy database.
struct SchemaColumn {
    /// Bare column name, used for existence checks.
    name: &'static str,
    /// Complete column definition (name, type, constraints, default).
    definition: &'static str,
}
25
/// Columns that later versions added to `task_metrics`. Databases created by
/// older builds may lack some of these; `repair_legacy_schema` adds any that
/// are missing via `ALTER TABLE ... ADD COLUMN`. Definitions here must match
/// the `CREATE TABLE` statement in `init_schema`.
const TASK_METRICS_COLUMNS: &[SchemaColumn] = &[
    SchemaColumn {
        name: "started_at",
        definition: "started_at INTEGER",
    },
    SchemaColumn {
        name: "completed_at",
        definition: "completed_at INTEGER",
    },
    SchemaColumn {
        name: "retries",
        definition: "retries INTEGER NOT NULL DEFAULT 0",
    },
    SchemaColumn {
        name: "narration_rejections",
        definition: "narration_rejections INTEGER NOT NULL DEFAULT 0",
    },
    SchemaColumn {
        name: "escalations",
        definition: "escalations INTEGER NOT NULL DEFAULT 0",
    },
    SchemaColumn {
        name: "context_restart_count",
        definition: "context_restart_count INTEGER NOT NULL DEFAULT 0",
    },
    SchemaColumn {
        name: "handoff_attempts",
        definition: "handoff_attempts INTEGER NOT NULL DEFAULT 0",
    },
    SchemaColumn {
        name: "handoff_successes",
        definition: "handoff_successes INTEGER NOT NULL DEFAULT 0",
    },
    SchemaColumn {
        name: "carry_forward_effective",
        definition: "carry_forward_effective INTEGER",
    },
    SchemaColumn {
        name: "merge_time_secs",
        definition: "merge_time_secs INTEGER",
    },
    SchemaColumn {
        name: "confidence_score",
        definition: "confidence_score REAL",
    },
];
72
/// Columns that later versions added to `session_summary`, used by
/// `repair_legacy_schema` to upgrade old databases in place. Definitions here
/// must match the `CREATE TABLE` statement in `init_schema`.
const SESSION_SUMMARY_COLUMNS: &[SchemaColumn] = &[
    SchemaColumn {
        name: "started_at",
        definition: "started_at INTEGER NOT NULL",
    },
    SchemaColumn {
        name: "ended_at",
        definition: "ended_at INTEGER",
    },
    SchemaColumn {
        name: "tasks_completed",
        definition: "tasks_completed INTEGER NOT NULL DEFAULT 0",
    },
    SchemaColumn {
        name: "total_merges",
        definition: "total_merges INTEGER NOT NULL DEFAULT 0",
    },
    SchemaColumn {
        name: "total_events",
        definition: "total_events INTEGER NOT NULL DEFAULT 0",
    },
    SchemaColumn {
        name: "discord_events_sent",
        definition: "discord_events_sent INTEGER NOT NULL DEFAULT 0",
    },
    SchemaColumn {
        name: "verification_passes",
        definition: "verification_passes INTEGER NOT NULL DEFAULT 0",
    },
    SchemaColumn {
        name: "verification_failures",
        definition: "verification_failures INTEGER NOT NULL DEFAULT 0",
    },
    SchemaColumn {
        name: "notification_isolations",
        definition: "notification_isolations INTEGER NOT NULL DEFAULT 0",
    },
    SchemaColumn {
        name: "notification_latency_total_secs",
        definition: "notification_latency_total_secs INTEGER NOT NULL DEFAULT 0",
    },
    SchemaColumn {
        name: "notification_latency_samples",
        definition: "notification_latency_samples INTEGER NOT NULL DEFAULT 0",
    },
];
119
120#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize)]
121struct SchemaRepairReport {
122    repaired_columns: BTreeMap<String, Vec<String>>,
123}
124
125impl SchemaRepairReport {
126    fn record(&mut self, table: &str, column: &str) {
127        self.repaired_columns
128            .entry(table.to_string())
129            .or_default()
130            .push(column.to_string());
131    }
132
133    fn is_empty(&self) -> bool {
134        self.repaired_columns.is_empty()
135    }
136}
137
138/// Open or create the telemetry database, initializing the schema.
139pub fn open(project_root: &Path) -> Result<Connection> {
140    let db_path = project_root.join(".batty").join(DB_FILENAME);
141    let conn = Connection::open(&db_path)
142        .with_context(|| format!("failed to open telemetry db at {}", db_path.display()))?;
143
144    // WAL mode for better concurrent read/write performance.
145    conn.pragma_update(None, "journal_mode", "WAL")?;
146
147    init_schema(&conn)?;
148    Ok(conn)
149}
150
/// Build a throwaway in-memory telemetry database with the full schema
/// installed (test-only helper).
#[cfg(test)]
pub fn open_in_memory() -> Result<Connection> {
    let conn = Connection::open_in_memory()?;
    init_schema(&conn).map(|()| conn)
}
158
/// Create all telemetry tables and indexes, then repair legacy databases.
///
/// Every statement is `IF NOT EXISTS`, so calling this on an existing
/// database is a no-op for objects already present. Afterward, columns that
/// newer versions added to `task_metrics`/`session_summary` are bolted onto
/// older databases in place, and a repair event is recorded if anything
/// changed.
fn init_schema(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        "
        CREATE TABLE IF NOT EXISTS events (
            id          INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp   INTEGER NOT NULL,
            event_type  TEXT NOT NULL,
            role        TEXT,
            task_id     TEXT,
            payload     TEXT NOT NULL
        );

        CREATE INDEX IF NOT EXISTS idx_events_ts ON events(timestamp);
        CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type);
        CREATE INDEX IF NOT EXISTS idx_events_role ON events(role);

        CREATE TABLE IF NOT EXISTS agent_metrics (
            role            TEXT PRIMARY KEY,
            completions     INTEGER NOT NULL DEFAULT 0,
            failures        INTEGER NOT NULL DEFAULT 0,
            restarts        INTEGER NOT NULL DEFAULT 0,
            total_cycle_secs INTEGER NOT NULL DEFAULT 0,
            idle_polls      INTEGER NOT NULL DEFAULT 0,
            working_polls   INTEGER NOT NULL DEFAULT 0
        );

        CREATE TABLE IF NOT EXISTS task_metrics (
            task_id          TEXT PRIMARY KEY,
            started_at       INTEGER,
            completed_at     INTEGER,
            retries          INTEGER NOT NULL DEFAULT 0,
            narration_rejections INTEGER NOT NULL DEFAULT 0,
            escalations      INTEGER NOT NULL DEFAULT 0,
            context_restart_count INTEGER NOT NULL DEFAULT 0,
            handoff_attempts INTEGER NOT NULL DEFAULT 0,
            handoff_successes INTEGER NOT NULL DEFAULT 0,
            carry_forward_effective INTEGER,
            merge_time_secs  INTEGER,
            confidence_score REAL
        );

        CREATE TABLE IF NOT EXISTS session_summary (
            session_id      TEXT PRIMARY KEY,
            started_at      INTEGER NOT NULL,
            ended_at        INTEGER,
            tasks_completed INTEGER NOT NULL DEFAULT 0,
            total_merges    INTEGER NOT NULL DEFAULT 0,
            total_events    INTEGER NOT NULL DEFAULT 0,
            discord_events_sent INTEGER NOT NULL DEFAULT 0,
            verification_passes INTEGER NOT NULL DEFAULT 0,
            verification_failures INTEGER NOT NULL DEFAULT 0,
            notification_isolations INTEGER NOT NULL DEFAULT 0,
            notification_latency_total_secs INTEGER NOT NULL DEFAULT 0,
            notification_latency_samples INTEGER NOT NULL DEFAULT 0
        );

        CREATE TABLE IF NOT EXISTS test_case_metrics (
            framework       TEXT NOT NULL,
            test_name       TEXT NOT NULL,
            failures        INTEGER NOT NULL DEFAULT 0,
            flaky_passes    INTEGER NOT NULL DEFAULT 0,
            last_task_id    TEXT,
            last_engineer   TEXT,
            last_seen_at    INTEGER NOT NULL DEFAULT 0,
            PRIMARY KEY (framework, test_name)
        );

        CREATE TABLE IF NOT EXISTS task_cycle_times (
            task_id          TEXT PRIMARY KEY,
            engineer         TEXT,
            priority         TEXT NOT NULL,
            cycle_time_mins  INTEGER,
            lead_time_mins   INTEGER,
            completed_at     INTEGER NOT NULL
        );
        ",
    )
    .context("failed to initialize telemetry schema")?;
    // Legacy databases may predate some columns; add them and record what
    // was repaired so operators can see the upgrade happened.
    let repairs = repair_legacy_schema(conn)?;
    if !repairs.is_empty() {
        record_schema_repair_event(conn, &repairs)?;
    }
    Ok(())
}
243
244fn repair_legacy_schema(conn: &Connection) -> Result<SchemaRepairReport> {
245    let mut repairs = SchemaRepairReport::default();
246    ensure_table_columns(conn, "task_metrics", TASK_METRICS_COLUMNS, &mut repairs)?;
247    ensure_table_columns(
248        conn,
249        "session_summary",
250        SESSION_SUMMARY_COLUMNS,
251        &mut repairs,
252    )?;
253    Ok(repairs)
254}
255
256fn ensure_table_columns(
257    conn: &Connection,
258    table: &str,
259    columns: &[SchemaColumn],
260    repairs: &mut SchemaRepairReport,
261) -> Result<()> {
262    let mut existing = query_table_columns(conn, table)?;
263    for column in columns {
264        if existing.contains(column.name) {
265            continue;
266        }
267        let sql = format!("ALTER TABLE {table} ADD COLUMN {}", column.definition);
268        conn.execute(&sql, []).with_context(|| {
269            format!(
270                "failed to add {}.{} to telemetry schema",
271                table, column.name
272            )
273        })?;
274        existing.insert(column.name.to_string());
275        repairs.record(table, column.name);
276    }
277    Ok(())
278}
279
280fn query_table_columns(conn: &Connection, table: &str) -> Result<BTreeSet<String>> {
281    let sql = format!("PRAGMA table_info({table})");
282    let mut stmt = conn
283        .prepare(&sql)
284        .with_context(|| format!("failed to inspect telemetry schema for table {table}"))?;
285    let columns = stmt
286        .query_map([], |row| row.get::<_, String>(1))?
287        .collect::<std::result::Result<BTreeSet<_>, _>>()?;
288    Ok(columns)
289}
290
291fn record_schema_repair_event(conn: &Connection, repairs: &SchemaRepairReport) -> Result<()> {
292    let payload = serde_json::to_string(repairs).context("failed to serialize schema repair")?;
293    conn.execute(
294        "INSERT INTO events (timestamp, event_type, role, task_id, payload)
295         VALUES (?1, ?2, NULL, NULL, ?3)",
296        params![
297            chrono::Utc::now().timestamp(),
298            "telemetry_schema_repaired",
299            payload
300        ],
301    )
302    .context("failed to record telemetry schema repair event")?;
303    Ok(())
304}
305
/// Replace the current schema with a pre-repair "legacy" one (test fixture).
///
/// Drops everything and recreates only the tables/columns an old build would
/// have: `task_metrics` without the handoff/narration/confidence columns and
/// `session_summary` without the discord/verification/notification columns.
/// Tests use this to exercise `repair_legacy_schema`.
#[cfg(test)]
pub(crate) fn install_legacy_schema_for_tests(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        "
        DROP TABLE IF EXISTS events;
        DROP TABLE IF EXISTS agent_metrics;
        DROP TABLE IF EXISTS task_metrics;
        DROP TABLE IF EXISTS session_summary;
        DROP TABLE IF EXISTS test_case_metrics;
        DROP TABLE IF EXISTS task_cycle_times;

        CREATE TABLE events (
            id          INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp   INTEGER NOT NULL,
            event_type  TEXT NOT NULL,
            role        TEXT,
            task_id     TEXT,
            payload     TEXT NOT NULL
        );

        CREATE TABLE agent_metrics (
            role            TEXT PRIMARY KEY,
            completions     INTEGER NOT NULL DEFAULT 0,
            failures        INTEGER NOT NULL DEFAULT 0,
            restarts        INTEGER NOT NULL DEFAULT 0,
            total_cycle_secs INTEGER NOT NULL DEFAULT 0,
            idle_polls      INTEGER NOT NULL DEFAULT 0,
            working_polls   INTEGER NOT NULL DEFAULT 0
        );

        CREATE TABLE task_metrics (
            task_id         TEXT PRIMARY KEY,
            started_at      INTEGER,
            completed_at    INTEGER,
            retries         INTEGER NOT NULL DEFAULT 0,
            escalations     INTEGER NOT NULL DEFAULT 0,
            merge_time_secs INTEGER
        );

        CREATE TABLE session_summary (
            session_id      TEXT PRIMARY KEY,
            started_at      INTEGER NOT NULL,
            ended_at        INTEGER,
            tasks_completed INTEGER NOT NULL DEFAULT 0,
            total_merges    INTEGER NOT NULL DEFAULT 0,
            total_events    INTEGER NOT NULL DEFAULT 0
        );
        ",
    )
    .context("failed to install legacy telemetry schema fixture")?;
    Ok(())
}
358
/// One row of the `task_cycle_times` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskCycleTimeRow {
    pub task_id: String,
    /// Engineer assigned to the task, when known.
    pub engineer: Option<String>,
    pub priority: String,
    /// Minutes from start to completion, when both timestamps were recorded.
    pub cycle_time_mins: Option<i64>,
    pub lead_time_mins: Option<i64>,
    /// Unix timestamp of completion.
    pub completed_at: i64,
}
368
/// Aggregated cycle-time statistics for one task priority level.
#[derive(Debug, Clone, PartialEq)]
pub struct PriorityCycleTimeRow {
    pub priority: String,
    pub average_cycle_time_mins: f64,
    pub completed_tasks: i64,
}
375
/// Per-engineer throughput: completed-task count plus average cycle and lead
/// times (None when no timed samples exist).
#[derive(Debug, Clone, PartialEq)]
pub struct EngineerThroughputRow {
    pub engineer: String,
    pub completed_tasks: i64,
    pub average_cycle_time_mins: Option<f64>,
    pub average_lead_time_mins: Option<f64>,
}
383
/// Tasks completed within one hour bucket.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HourlyThroughputRow {
    /// Unix timestamp of the start of the hour bucket.
    pub hour_start: i64,
    pub completed_tasks: i64,
}
389
/// Record per-test-case outcomes for a task run in `test_case_metrics`.
///
/// Each entry in `flaky_failures` increments that test's `flaky_passes`
/// counter (presumably failures that later passed on retry — confirm against
/// the caller), and each entry in `results.failures` increments `failures`.
/// Both paths refresh the last-seen task/engineer/timestamp columns via the
/// upsert's `excluded.*` values.
pub fn record_test_results(
    conn: &Connection,
    task_id: u32,
    engineer: &str,
    results: &TestResults,
    flaky_failures: &[TestFailure],
) -> Result<()> {
    // last_task_id is a TEXT column, so stringify the numeric id once.
    let task_id = task_id.to_string();
    let now = chrono::Utc::now().timestamp();

    for failure in flaky_failures {
        conn.execute(
            "INSERT INTO test_case_metrics (framework, test_name, flaky_passes, last_task_id, last_engineer, last_seen_at)
             VALUES (?1, ?2, 1, ?3, ?4, ?5)
             ON CONFLICT(framework, test_name) DO UPDATE SET
               flaky_passes = flaky_passes + 1,
               last_task_id = excluded.last_task_id,
               last_engineer = excluded.last_engineer,
               last_seen_at = excluded.last_seen_at",
            params![results.framework, failure.test_name, task_id, engineer, now],
        )?;
    }

    for failure in &results.failures {
        conn.execute(
            "INSERT INTO test_case_metrics (framework, test_name, failures, last_task_id, last_engineer, last_seen_at)
             VALUES (?1, ?2, 1, ?3, ?4, ?5)
             ON CONFLICT(framework, test_name) DO UPDATE SET
               failures = failures + 1,
               last_task_id = excluded.last_task_id,
               last_engineer = excluded.last_engineer,
               last_seen_at = excluded.last_seen_at",
            params![results.framework, failure.test_name, task_id, engineer, now],
        )?;
    }

    Ok(())
}
428
/// Fetch the `(failures, flaky_passes)` counters for one test case
/// (test-only helper; errors if the row does not exist).
#[cfg(test)]
fn query_test_case_metric(
    conn: &Connection,
    framework: &str,
    test_name: &str,
) -> Result<(u32, u32)> {
    let counts = conn.query_row(
        "SELECT failures, flaky_passes FROM test_case_metrics WHERE framework = ?1 AND test_name = ?2",
        params![framework, test_name],
        |row| {
            let failures: u32 = row.get(0)?;
            let flaky_passes: u32 = row.get(1)?;
            Ok((failures, flaky_passes))
        },
    );
    counts.context("failed to query test case metric")
}
442
443// ---------------------------------------------------------------------------
444// Insert helpers
445// ---------------------------------------------------------------------------
446
447/// Insert a raw event into the events table. Also updates derived metrics.
448pub fn insert_event(conn: &Connection, event: &TeamEvent) -> Result<()> {
449    let payload =
450        serde_json::to_string(event).context("failed to serialize event for telemetry")?;
451
452    conn.execute(
453        "INSERT INTO events (timestamp, event_type, role, task_id, payload) VALUES (?1, ?2, ?3, ?4, ?5)",
454        params![
455            event.ts as i64,
456            event.event,
457            event.role,
458            event.task,
459            payload,
460        ],
461    )
462    .context("failed to insert telemetry event")?;
463
464    // Update derived metrics based on event type (may create session row).
465    update_metrics_for_event(conn, event)?;
466
467    // Fix #3: Increment total_events on every insert (after update_metrics
468    // so that daemon_started can create the session row first).
469    conn.execute(
470        "UPDATE session_summary SET total_events = total_events + 1
471         WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
472        [],
473    )?;
474
475    Ok(())
476}
477
/// Update derived per-agent, per-task, and per-session metrics for one event.
///
/// Dispatches on the event name. Task rows are created lazily via upserts,
/// so events may arrive in any order. Session counters always target the
/// most recently started `session_summary` row; if no session row exists
/// yet, those UPDATEs silently match nothing.
fn update_metrics_for_event(conn: &Connection, event: &TeamEvent) -> Result<()> {
    match event.event.as_str() {
        "task_completed" => {
            if let Some(role) = &event.role {
                upsert_agent_counter(conn, role, "completions")?;
            }
            if let Some(task) = &event.task {
                conn.execute(
                    "INSERT INTO task_metrics (task_id, completed_at) VALUES (?1, ?2)
                     ON CONFLICT(task_id) DO UPDATE SET completed_at = ?2",
                    params![task, event.ts as i64],
                )?;
                // carry_forward_effective: exactly one context restart during
                // the task counts as effective (1), more than one as not (0);
                // zero restarts leaves the flag untouched.
                conn.execute(
                    "UPDATE task_metrics
                     SET carry_forward_effective = CASE
                         WHEN context_restart_count = 0 THEN carry_forward_effective
                         WHEN context_restart_count = 1 THEN 1
                         ELSE 0
                     END
                     WHERE task_id = ?1",
                    params![task],
                )?;
            }
            // Bump tasks_completed on the latest session.
            conn.execute(
                "UPDATE session_summary SET tasks_completed = tasks_completed + 1
                 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
                [],
            )?;
        }
        "task_assigned" => {
            if let Some(task) = &event.task {
                // COALESCE keeps the first assignment time if the task is
                // re-assigned later.
                conn.execute(
                    "INSERT INTO task_metrics (task_id, started_at) VALUES (?1, ?2)
                     ON CONFLICT(task_id) DO UPDATE SET started_at = COALESCE(task_metrics.started_at, ?2)",
                    params![task, event.ts as i64],
                )?;
            }
        }
        // Both event names count as an escalation on the task.
        "task_escalated" | "narration_restart" => {
            if let Some(task) = &event.task {
                conn.execute(
                    "INSERT INTO task_metrics (task_id, escalations) VALUES (?1, 1)
                     ON CONFLICT(task_id) DO UPDATE SET escalations = escalations + 1",
                    params![task],
                )?;
            }
        }
        "narration_rejection" => {
            if let Some(task) = &event.task {
                conn.execute(
                    "INSERT INTO task_metrics (task_id, narration_rejections) VALUES (?1, 1)
                     ON CONFLICT(task_id) DO UPDATE SET narration_rejections = narration_rejections + 1",
                    params![task],
                )?;
            }
        }
        // Any of these counts as a failure for the agent's role.
        "member_crashed" | "pane_death" | "delivery_failed" => {
            if let Some(role) = &event.role {
                upsert_agent_counter(conn, role, "failures")?;
            }
        }
        "pane_respawned" | "agent_restarted" | "context_exhausted" => {
            if let Some(role) = &event.role {
                upsert_agent_counter(conn, role, "restarts")?;
            }
            // A restart specifically caused by context exhaustion also bumps
            // the task's context_restart_count (feeds carry_forward_effective
            // at completion time).
            if event.event == "agent_restarted"
                && event.reason.as_deref() == Some("context_exhausted")
                && let Some(task) = &event.task
            {
                conn.execute(
                    "INSERT INTO task_metrics (task_id, context_restart_count) VALUES (?1, 1)
                     ON CONFLICT(task_id) DO UPDATE SET context_restart_count = context_restart_count + 1",
                    params![task],
                )?;
            }
        }
        "agent_handoff" => {
            if let Some(task) = &event.task {
                // Attempts always increment; successes only when the event
                // carries success == true.
                let success = if event.success == Some(true) { 1 } else { 0 };
                conn.execute(
                    "INSERT INTO task_metrics (task_id, handoff_attempts, handoff_successes) VALUES (?1, 1, ?2)
                     ON CONFLICT(task_id) DO UPDATE SET
                       handoff_attempts = handoff_attempts + 1,
                       handoff_successes = handoff_successes + ?2",
                    params![task, success],
                )?;
            }
        }
        "task_auto_merged" | "task_manual_merged" => {
            if let Some(task) = &event.task {
                // merge_time_secs = merge timestamp minus started_at (0 when
                // no start was recorded, since COALESCE falls back to ?2).
                conn.execute(
                    "INSERT INTO task_metrics (task_id, merge_time_secs) VALUES (?1, ?2)
                     ON CONFLICT(task_id) DO UPDATE SET merge_time_secs = ?2 - COALESCE(task_metrics.started_at, ?2)",
                    params![task, event.ts as i64],
                )?;
            }
            // Bump total_merges on the latest session.
            conn.execute(
                "UPDATE session_summary SET total_merges = total_merges + 1
                 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
                [],
            )?;
        }
        "merge_confidence_scored" => {
            if let Some(task) = &event.task {
                // NOTE(review): the confidence value rides in the generic
                // `load` field of TeamEvent — confirm against the emitter.
                if let Some(confidence) = event.load {
                    conn.execute(
                        "INSERT INTO task_metrics (task_id, confidence_score) VALUES (?1, ?2)
                         ON CONFLICT(task_id) DO UPDATE SET confidence_score = ?2",
                        params![task, confidence],
                    )?;
                }
            }
        }
        "daemon_started" => {
            // Session id derives from the start timestamp; OR IGNORE makes a
            // duplicate daemon_started at the same second a no-op.
            let session_id = format!("session-{}", event.ts);
            conn.execute(
                "INSERT OR IGNORE INTO session_summary (session_id, started_at) VALUES (?1, ?2)",
                params![session_id, event.ts as i64],
            )?;
        }
        "discord_event_sent" => {
            conn.execute(
                "UPDATE session_summary SET discord_events_sent = discord_events_sent + 1
                 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
                [],
            )?;
        }
        // Post-merge verification outcome: success flag picks the counter;
        // an absent flag records nothing.
        "auto_merge_post_verify_result" => match event.success {
            Some(true) => {
                conn.execute(
                    "UPDATE session_summary SET verification_passes = verification_passes + 1
                     WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
                    [],
                )?;
            }
            Some(false) => {
                conn.execute(
                    "UPDATE session_summary SET verification_failures = verification_failures + 1
                     WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
                    [],
                )?;
            }
            None => {}
        },
        "inbox_message_deduplicated" | "inbox_batch_delivered" => {
            conn.execute(
                "UPDATE session_summary SET notification_isolations = notification_isolations + 1
                 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
                [],
            )?;
        }
        "notification_delivery_sample" => {
            // Latency is carried in uptime_secs; missing samples count as 0
            // seconds but still increment the sample count.
            let latency_secs = event.uptime_secs.unwrap_or(0) as i64;
            conn.execute(
                "UPDATE session_summary
                 SET notification_latency_total_secs = notification_latency_total_secs + ?1,
                     notification_latency_samples = notification_latency_samples + 1
                 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
                params![latency_secs],
            )?;
        }
        // Close out the latest session. Both daemon_stopped() and
        // daemon_stopped_with_reason() emit "daemon_stopped".
        "daemon_stopped" => {
            conn.execute(
                "UPDATE session_summary SET ended_at = ?1
                 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
                params![event.ts as i64],
            )?;
        }
        // All other event types are stored raw but drive no derived metrics.
        _ => {}
    }
    Ok(())
}
654
655fn upsert_agent_counter(conn: &Connection, role: &str, column: &str) -> Result<()> {
656    // column is a known static string, safe to interpolate.
657    let sql = format!(
658        "INSERT INTO agent_metrics (role, {column}) VALUES (?1, 1)
659         ON CONFLICT(role) DO UPDATE SET {column} = {column} + 1"
660    );
661    conn.execute(&sql, params![role])?;
662    Ok(())
663}
664
665/// Record an agent's poll state (idle or working) and accumulate cycle time.
666///
667/// Fix #5: Upserts idle_polls or working_polls for the given role.
668/// Fix #6: Increments total_cycle_secs by `poll_interval_secs` when working.
669pub fn record_agent_poll_state(
670    conn: &Connection,
671    role: &str,
672    is_working: bool,
673    poll_interval_secs: u64,
674) -> Result<()> {
675    if is_working {
676        conn.execute(
677            "INSERT INTO agent_metrics (role, working_polls, total_cycle_secs)
678             VALUES (?1, 1, ?2)
679             ON CONFLICT(role) DO UPDATE SET
680                working_polls = working_polls + 1,
681                total_cycle_secs = total_cycle_secs + ?2",
682            params![role, poll_interval_secs as i64],
683        )?;
684    } else {
685        conn.execute(
686            "INSERT INTO agent_metrics (role, idle_polls) VALUES (?1, 1)
687             ON CONFLICT(role) DO UPDATE SET idle_polls = idle_polls + 1",
688            params![role],
689        )?;
690    }
691    Ok(())
692}
693
694// ---------------------------------------------------------------------------
695// Query helpers
696// ---------------------------------------------------------------------------
697
/// Summary row for `batty telemetry summary`.
///
/// Mirrors one row of the `session_summary` table; see `init_schema` for
/// column semantics and defaults.
#[derive(Debug, Clone)]
pub struct SessionSummaryRow {
    pub session_id: String,
    /// Unix timestamp when the session started.
    pub started_at: i64,
    /// Unix timestamp when the daemon stopped; None for a live session.
    pub ended_at: Option<i64>,
    pub tasks_completed: i64,
    pub total_merges: i64,
    pub total_events: i64,
    pub discord_events_sent: i64,
    pub verification_passes: i64,
    pub verification_failures: i64,
    pub notification_isolations: i64,
    /// Sum of latency samples; divide by `notification_latency_samples`
    /// for the average.
    pub notification_latency_total_secs: i64,
    pub notification_latency_samples: i64,
}
714
715pub fn query_session_summaries(conn: &Connection) -> Result<Vec<SessionSummaryRow>> {
716    let mut stmt = conn.prepare(
717        "SELECT session_id, started_at, ended_at, tasks_completed, total_merges, total_events,
718                discord_events_sent, verification_passes, verification_failures,
719                notification_isolations, notification_latency_total_secs, notification_latency_samples
720         FROM session_summary ORDER BY started_at DESC LIMIT 20",
721    )?;
722    let rows = stmt
723        .query_map([], |row| {
724            Ok(SessionSummaryRow {
725                session_id: row.get(0)?,
726                started_at: row.get(1)?,
727                ended_at: row.get(2)?,
728                tasks_completed: row.get(3)?,
729                total_merges: row.get(4)?,
730                total_events: row.get(5)?,
731                discord_events_sent: row.get(6)?,
732                verification_passes: row.get(7)?,
733                verification_failures: row.get(8)?,
734                notification_isolations: row.get(9)?,
735                notification_latency_total_secs: row.get(10)?,
736                notification_latency_samples: row.get(11)?,
737            })
738        })?
739        .collect::<std::result::Result<Vec<_>, _>>()?;
740    Ok(rows)
741}
742
/// Agent metrics row for `batty telemetry agents`.
///
/// Mirrors one row of the `agent_metrics` table (one row per role).
#[derive(Debug, Clone)]
pub struct AgentMetricsRow {
    pub role: String,
    pub completions: i64,
    pub failures: i64,
    pub restarts: i64,
    /// Accumulated working time in seconds (poll interval per working poll).
    pub total_cycle_secs: i64,
    pub idle_polls: i64,
    pub working_polls: i64,
}
754
755pub fn query_agent_metrics(conn: &Connection) -> Result<Vec<AgentMetricsRow>> {
756    let mut stmt = conn.prepare(
757        "SELECT role, completions, failures, restarts, total_cycle_secs, idle_polls, working_polls
758         FROM agent_metrics ORDER BY role",
759    )?;
760    let rows = stmt
761        .query_map([], |row| {
762            Ok(AgentMetricsRow {
763                role: row.get(0)?,
764                completions: row.get(1)?,
765                failures: row.get(2)?,
766                restarts: row.get(3)?,
767                total_cycle_secs: row.get(4)?,
768                idle_polls: row.get(5)?,
769                working_polls: row.get(6)?,
770            })
771        })?
772        .collect::<std::result::Result<Vec<_>, _>>()?;
773    Ok(rows)
774}
775
/// Task metrics row for `batty telemetry tasks`.
///
/// Mirrors one row of the `task_metrics` table; see `init_schema` for
/// column semantics.
#[derive(Debug, Clone)]
pub struct TaskMetricsRow {
    pub task_id: String,
    /// Unix timestamp of first assignment, if recorded.
    pub started_at: Option<i64>,
    /// Unix timestamp of completion, if recorded.
    pub completed_at: Option<i64>,
    pub retries: i64,
    pub narration_rejections: i64,
    pub escalations: i64,
    pub context_restart_count: i64,
    pub handoff_attempts: i64,
    pub handoff_successes: i64,
    /// Stored as 0/1/NULL in SQLite; decoded to Option<bool> on read.
    pub carry_forward_effective: Option<bool>,
    pub merge_time_secs: Option<i64>,
    pub confidence_score: Option<f64>,
}
792
793pub fn query_task_metrics(conn: &Connection) -> Result<Vec<TaskMetricsRow>> {
794    let mut stmt = conn.prepare(
795        "SELECT task_id, started_at, completed_at, retries, narration_rejections, escalations,
796                context_restart_count, handoff_attempts, handoff_successes,
797                carry_forward_effective, merge_time_secs, confidence_score
798         FROM task_metrics ORDER BY started_at DESC NULLS LAST LIMIT 50",
799    )?;
800    let rows = stmt
801        .query_map([], |row| {
802            Ok(TaskMetricsRow {
803                task_id: row.get(0)?,
804                started_at: row.get(1)?,
805                completed_at: row.get(2)?,
806                retries: row.get(3)?,
807                narration_rejections: row.get(4)?,
808                escalations: row.get(5)?,
809                context_restart_count: row.get(6)?,
810                handoff_attempts: row.get(7)?,
811                handoff_successes: row.get(8)?,
812                carry_forward_effective: row.get::<_, Option<i64>>(9)?.map(|value| value != 0),
813                merge_time_secs: row.get(10)?,
814                confidence_score: row.get(11)?,
815            })
816        })?
817        .collect::<std::result::Result<Vec<_>, _>>()?;
818    Ok(rows)
819}
820
821pub fn replace_task_cycle_times(conn: &Connection, rows: &[TaskCycleTimeRecord]) -> Result<()> {
822    conn.execute("DELETE FROM task_cycle_times", [])?;
823
824    let mut insert = conn.prepare(
825        "INSERT INTO task_cycle_times
826         (task_id, engineer, priority, cycle_time_mins, lead_time_mins, completed_at)
827         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
828    )?;
829
830    for row in rows {
831        let Some(completed_at) = row.completed_at else {
832            continue;
833        };
834        insert.execute(params![
835            row.task_id.to_string(),
836            row.engineer.as_deref(),
837            row.priority.as_str(),
838            row.cycle_time_minutes,
839            row.lead_time_minutes,
840            completed_at,
841        ])?;
842    }
843
844    Ok(())
845}
846
847pub fn query_task_cycle_times(conn: &Connection) -> Result<Vec<TaskCycleTimeRow>> {
848    let mut stmt = conn.prepare(
849        "SELECT task_id, engineer, priority, cycle_time_mins, lead_time_mins, completed_at
850         FROM task_cycle_times
851         ORDER BY completed_at DESC",
852    )?;
853    let rows = stmt
854        .query_map([], |row| {
855            Ok(TaskCycleTimeRow {
856                task_id: row.get(0)?,
857                engineer: row.get(1)?,
858                priority: row.get(2)?,
859                cycle_time_mins: row.get(3)?,
860                lead_time_mins: row.get(4)?,
861                completed_at: row.get(5)?,
862            })
863        })?
864        .collect::<std::result::Result<Vec<_>, _>>()?;
865    Ok(rows)
866}
867
868pub fn query_average_cycle_time_by_priority(
869    conn: &Connection,
870) -> Result<Vec<PriorityCycleTimeRow>> {
871    let mut stmt = conn.prepare(
872        "SELECT priority,
873                AVG(cycle_time_mins) AS avg_cycle_time_mins,
874                COUNT(*) AS completed_tasks
875         FROM task_cycle_times
876         WHERE cycle_time_mins IS NOT NULL
877         GROUP BY priority
878         ORDER BY CASE priority
879             WHEN 'critical' THEN 0
880             WHEN 'high' THEN 1
881             WHEN 'medium' THEN 2
882             WHEN 'low' THEN 3
883             ELSE 4
884         END, priority",
885    )?;
886    let rows = stmt
887        .query_map([], |row| {
888            Ok(PriorityCycleTimeRow {
889                priority: row.get(0)?,
890                average_cycle_time_mins: row.get(1)?,
891                completed_tasks: row.get(2)?,
892            })
893        })?
894        .collect::<std::result::Result<Vec<_>, _>>()?;
895    Ok(rows)
896}
897
898pub fn query_engineer_throughput(conn: &Connection) -> Result<Vec<EngineerThroughputRow>> {
899    let mut stmt = conn.prepare(
900        "SELECT engineer,
901                COUNT(*) AS completed_tasks,
902                AVG(cycle_time_mins) AS avg_cycle_time_mins,
903                AVG(lead_time_mins) AS avg_lead_time_mins
904         FROM task_cycle_times
905         WHERE engineer IS NOT NULL
906         GROUP BY engineer
907         ORDER BY completed_tasks DESC, engineer ASC",
908    )?;
909    let rows = stmt
910        .query_map([], |row| {
911            Ok(EngineerThroughputRow {
912                engineer: row.get(0)?,
913                completed_tasks: row.get(1)?,
914                average_cycle_time_mins: row.get(2)?,
915                average_lead_time_mins: row.get(3)?,
916            })
917        })?
918        .collect::<std::result::Result<Vec<_>, _>>()?;
919    Ok(rows)
920}
921
/// Completed-task counts bucketed per hour from `window_start` (rounded
/// down to the hour) through the current hour.
///
/// The recursive CTE materializes every hour bucket in the window so hours
/// with zero completions still come back as explicit zero rows.
pub fn query_hourly_throughput(
    conn: &Connection,
    window_start: i64,
) -> Result<Vec<HourlyThroughputRow>> {
    // NOTE(review): the CTE's upper bound is strftime('%s','now','localtime'),
    // which yields the UTC epoch shifted by the local UTC offset, while
    // `window_start` and `completed_at` are compared to it unshifted. If
    // those values are plain UTC epochs the bucket range is skewed by the
    // local offset — confirm all three use the same convention.
    let mut stmt = conn.prepare(
        "WITH RECURSIVE hours(hour_start) AS (
            SELECT (?1 / 3600) * 3600
            UNION ALL
            SELECT hour_start + 3600
            FROM hours
            WHERE hour_start + 3600 <= (strftime('%s', 'now', 'localtime') / 3600) * 3600
         )
         SELECT hours.hour_start,
                COALESCE(counts.completed_tasks, 0) AS completed_tasks
         FROM hours
         LEFT JOIN (
            SELECT (completed_at / 3600) * 3600 AS hour_start,
                   COUNT(*) AS completed_tasks
            FROM task_cycle_times
            WHERE completed_at >= ?1
            GROUP BY 1
         ) counts ON counts.hour_start = hours.hour_start
         ORDER BY hours.hour_start",
    )?;
    let rows = stmt
        .query_map(params![window_start], |row| {
            Ok(HourlyThroughputRow {
                hour_start: row.get(0)?,
                completed_tasks: row.get(1)?,
            })
        })?
        .collect::<std::result::Result<Vec<_>, _>>()?;
    Ok(rows)
}
956
/// Recent events row for `batty telemetry events`.
#[derive(Debug, Clone)]
pub struct EventRow {
    /// When the event was recorded (integer timestamp; latency queries in
    /// this module treat differences between timestamps as seconds).
    pub timestamp: i64,
    /// Event type discriminator, e.g. `"daemon_started"`.
    pub event_type: String,
    /// Agent role that produced the event, when one applies.
    pub role: Option<String>,
    /// Task the event refers to, when one applies.
    pub task_id: Option<String>,
}
965
966pub fn query_recent_events(conn: &Connection, limit: usize) -> Result<Vec<EventRow>> {
967    let mut stmt = conn.prepare(
968        "SELECT timestamp, event_type, role, task_id
969         FROM events ORDER BY timestamp DESC LIMIT ?1",
970    )?;
971    let rows = stmt
972        .query_map(params![limit as i64], |row| {
973            Ok(EventRow {
974                timestamp: row.get(0)?,
975                event_type: row.get(1)?,
976                role: row.get(2)?,
977                task_id: row.get(3)?,
978            })
979        })?
980        .collect::<std::result::Result<Vec<_>, _>>()?;
981    Ok(rows)
982}
983
/// Review pipeline metrics aggregated from the events table.
///
/// Produced by [`query_review_metrics`]; each counter is derived from one
/// or more event types.
#[derive(Debug, Clone)]
pub struct ReviewMetricsRow {
    /// Count of `task_auto_merged` events.
    pub auto_merge_count: i64,
    /// Count of `task_manual_merged` events.
    pub manual_merge_count: i64,
    /// Merges (auto or manual) whose merge_mode was `direct_root`.
    pub direct_root_merge_count: i64,
    /// Merges (auto or manual) whose merge_mode was `isolated_integration`.
    pub isolated_integration_merge_count: i64,
    /// `task_merge_failed` events in `direct_root` mode.
    pub direct_root_failure_count: i64,
    /// `task_merge_failed` events in `isolated_integration` mode.
    pub isolated_integration_failure_count: i64,
    /// Count of `task_reworked` events.
    pub rework_count: i64,
    /// Count of `review_nudge_sent` events.
    pub review_nudge_count: i64,
    /// Count of `review_escalated` events.
    pub review_escalation_count: i64,
    /// Mean seconds between `task_completed` and the matching merge event;
    /// `None` when no completed/merged pair exists.
    pub avg_review_latency_secs: Option<f64>,
    /// Auto-merge decisions with action `accepted`.
    pub accepted_decision_count: i64,
    /// Auto-merge decisions routed to `manual_review`.
    pub rejected_decision_count: i64,
    /// Rejection reasons, sorted by count descending then reason.
    pub rejection_reasons: Vec<AutoMergeReasonRow>,
    /// Post-merge verification outcomes: success == Some(true).
    pub post_merge_verify_pass_count: i64,
    /// Post-merge verification outcomes: success == Some(false).
    pub post_merge_verify_fail_count: i64,
    /// Post-merge verification events with no success value (skipped).
    pub post_merge_verify_skip_count: i64,
}
1004
/// One auto-merge rejection reason and how often it was cited.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AutoMergeReasonRow {
    /// Reason string extracted from an auto-merge decision event.
    pub reason: String,
    /// Number of rejected decisions that cited this reason.
    pub count: i64,
}
1010
/// Aggregated per-role performance profile derived from quality-metrics,
/// task-metrics, and auto-merge events.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct EngineerPerformanceProfileRow {
    /// Engineer role name.
    pub role: String,
    /// Number of `quality_metrics_recorded` events seen for this role.
    pub completed_tasks: i64,
    /// Mean `time_to_completion_secs`; `None` when no samples carried one.
    pub avg_task_completion_secs: Option<f64>,
    /// Merged lines per hour of completion time, from auto-merge reasons;
    /// `None` when no merge carried a parsable `lines=` token.
    pub lines_per_hour: Option<f64>,
    /// Mean first-pass test rate across sampled tasks.
    pub first_pass_test_rate: Option<f64>,
    /// Fraction of completed tasks that recorded >=1 context restart.
    pub context_exhaustion_frequency: Option<f64>,
}
1020
/// Build per-role performance profiles by joining three data sources:
/// quality-metrics events (completions), `task_metrics` (context
/// restarts), and auto-merge events (lines changed).
pub fn query_engineer_performance_profiles(
    conn: &Connection,
) -> Result<Vec<EngineerPerformanceProfileRow>> {
    // Pass 1 source: one row per quality_metrics_recorded event, with the
    // two payload fields we aggregate pulled out via json_extract.
    let mut completion_stmt = conn.prepare(
        "SELECT role, task_id,
                json_extract(payload, '$.time_to_completion_secs') AS time_to_completion_secs,
                json_extract(payload, '$.first_pass_test_rate') AS first_pass_test_rate
         FROM events
         WHERE event_type = 'quality_metrics_recorded'
           AND role IS NOT NULL
           AND task_id IS NOT NULL
         ORDER BY role, task_id",
    )?;

    // Running totals per role; converted to averages/rates at the end.
    #[derive(Default)]
    struct Accumulator {
        completed_tasks: i64,
        total_completion_secs: f64,
        completion_secs_samples: i64,
        first_pass_sum: f64,
        first_pass_samples: i64,
        context_exhausted_tasks: i64,
        loc_lines: i64,
        loc_hours: f64,
    }

    // BTreeMap keeps the final output deterministically ordered by role.
    let mut by_role = std::collections::BTreeMap::<String, Accumulator>::new();
    // task_id -> (role, completion secs); used by passes 2 and 3 to
    // attribute per-task data back to a role.
    // NOTE(review): keyed by task alone — if two quality events share a
    // task_id (retry, or two roles on one task) the later row overwrites
    // the earlier; confirm task_ids are unique per recording.
    let mut task_durations = std::collections::HashMap::<String, (String, f64)>::new();

    let completion_rows = completion_stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, Option<f64>>(2)?,
            row.get::<_, Option<f64>>(3)?,
        ))
    })?;

    // Pass 1: completion counts, completion-time and first-pass averages.
    // Either payload field may be absent; each average only counts rows
    // that carried its field.
    for row in completion_rows {
        let (role, task_id, completion_secs, first_pass_rate) = row?;
        let entry = by_role.entry(role.clone()).or_default();
        entry.completed_tasks += 1;
        if let Some(completion_secs) = completion_secs {
            entry.total_completion_secs += completion_secs;
            entry.completion_secs_samples += 1;
            task_durations.insert(task_id.clone(), (role.clone(), completion_secs));
        }
        if let Some(first_pass_rate) = first_pass_rate {
            entry.first_pass_sum += first_pass_rate;
            entry.first_pass_samples += 1;
        }
    }

    // Pass 2: count tasks that hit at least one context restart. The
    // restart count itself is ignored — a task is either exhausted or not.
    let mut ctx_stmt = conn.prepare(
        "SELECT task_id, context_restart_count
         FROM task_metrics
         WHERE context_restart_count > 0",
    )?;
    let ctx_rows = ctx_stmt.query_map([], |row| {
        Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
    })?;
    for row in ctx_rows {
        let (task_id, _) = row?;
        // Only tasks we saw a completion duration for can be attributed.
        if let Some((role, _)) = task_durations.get(&task_id) {
            by_role
                .entry(role.clone())
                .or_default()
                .context_exhausted_tasks += 1;
        }
    }

    // Pass 3: lines-per-hour, from the `lines=N` token embedded in the
    // auto-merge reason string, paired with the task's completion time.
    let mut merge_stmt = conn.prepare(
        "SELECT role, task_id, json_extract(payload, '$.reason') AS reason
         FROM events
         WHERE event_type = 'task_auto_merged'
           AND role IS NOT NULL
           AND task_id IS NOT NULL",
    )?;
    let merge_rows = merge_stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, Option<String>>(2)?,
        ))
    })?;
    for row in merge_rows {
        let (role, task_id, reason) = row?;
        // Skip merges whose reason has no parsable `lines=` token.
        let Some(lines_changed) = reason
            .as_deref()
            .and_then(parse_lines_changed_from_merge_reason)
        else {
            continue;
        };
        // Skip merges for tasks without a recorded completion duration.
        let Some((task_role, completion_secs)) = task_durations.get(&task_id) else {
            continue;
        };
        // Guard against cross-role attribution when the merge event's role
        // differs from the role that completed the task.
        if task_role != &role {
            continue;
        }
        let entry = by_role.entry(role).or_default();
        entry.loc_lines += lines_changed as i64;
        entry.loc_hours += completion_secs / 3600.0;
    }

    // Finalize: convert sums to averages, guarding each against a zero
    // sample count (None instead of NaN/divide-by-zero).
    Ok(by_role
        .into_iter()
        .map(|(role, acc)| EngineerPerformanceProfileRow {
            role,
            completed_tasks: acc.completed_tasks,
            avg_task_completion_secs: (acc.completion_secs_samples > 0)
                .then(|| acc.total_completion_secs / acc.completion_secs_samples as f64),
            lines_per_hour: (acc.loc_hours > 0.0).then(|| acc.loc_lines as f64 / acc.loc_hours),
            first_pass_test_rate: (acc.first_pass_samples > 0)
                .then(|| acc.first_pass_sum / acc.first_pass_samples as f64),
            context_exhaustion_frequency: (acc.completed_tasks > 0)
                .then(|| acc.context_exhausted_tasks as f64 / acc.completed_tasks as f64),
        })
        .collect())
}
1140
/// Pull the numeric value out of the first whitespace-separated `lines=N`
/// token in an auto-merge reason string; `None` if the token is missing or
/// its value does not parse as an unsigned integer.
fn parse_lines_changed_from_merge_reason(reason: &str) -> Option<u64> {
    for token in reason.split_whitespace() {
        if let Some(value) = token.strip_prefix("lines=") {
            // First matching token decides the outcome, parsable or not.
            return value.parse::<u64>().ok();
        }
    }
    None
}
1147
1148fn load_events_by_type(conn: &Connection, event_type: &str) -> Result<Vec<TeamEvent>> {
1149    let mut stmt =
1150        conn.prepare("SELECT payload FROM events WHERE event_type = ?1 ORDER BY timestamp ASC")?;
1151    let rows = stmt.query_map(params![event_type], |row| row.get::<_, String>(0))?;
1152    rows.map(|row| {
1153        let payload = row?;
1154        serde_json::from_str::<TeamEvent>(&payload)
1155            .context("failed to deserialize telemetry event payload")
1156    })
1157    .collect()
1158}
1159
1160fn extract_auto_merge_reasons(event: &TeamEvent) -> Vec<String> {
1161    if let Some(details) = event.details.as_deref()
1162        && let Ok(record) =
1163            serde_json::from_str::<crate::team::auto_merge::AutoMergeDecisionRecord>(details)
1164    {
1165        return record.reasons;
1166    }
1167
1168    event
1169        .reason
1170        .as_deref()
1171        .and_then(|reason| reason.split("reasons: ").nth(1))
1172        .map(|reasons| {
1173            reasons
1174                .split("; ")
1175                .map(str::trim)
1176                .filter(|reason| !reason.is_empty())
1177                .map(str::to_string)
1178                .collect::<Vec<_>>()
1179        })
1180        .unwrap_or_default()
1181}
1182
/// Bump the counter matching `merge_mode`; unknown or absent modes are
/// counted nowhere.
fn increment_merge_mode_counts(
    direct_root_count: &mut i64,
    isolated_integration_count: &mut i64,
    merge_mode: Option<&str>,
) {
    if let Some(mode) = merge_mode {
        if mode == "direct_root" {
            *direct_root_count += 1;
        } else if mode == "isolated_integration" {
            *isolated_integration_count += 1;
        }
    }
}
1194
/// Query aggregated review pipeline metrics from the events table.
pub fn query_review_metrics(conn: &Connection) -> Result<ReviewMetricsRow> {
    // COUNT(*) helper for event types where only the total matters.
    let count_event = |event_type: &str| -> Result<i64> {
        let n: i64 = conn.query_row(
            "SELECT COUNT(*) FROM events WHERE event_type = ?1",
            params![event_type],
            |row| row.get(0),
        )?;
        Ok(n)
    };

    let auto_merge_count = count_event("task_auto_merged")?;
    let manual_merge_count = count_event("task_manual_merged")?;
    let rework_count = count_event("task_reworked")?;
    let review_nudge_count = count_event("review_nudge_sent")?;
    let review_escalation_count = count_event("review_escalated")?;
    // These need per-event fields (merge_mode, action, success), so load
    // full deserialized events rather than counts.
    let decision_events = load_events_by_type(conn, "auto_merge_decision_recorded")?;
    let post_verify_events = load_events_by_type(conn, "auto_merge_post_verify_result")?;
    let auto_merge_events = load_events_by_type(conn, "task_auto_merged")?;
    let manual_merge_events = load_events_by_type(conn, "task_manual_merged")?;
    let merge_failure_events = load_events_by_type(conn, "task_merge_failed")?;

    // Successful merges (auto + manual) split by merge mode.
    let mut direct_root_merge_count = 0;
    let mut isolated_integration_merge_count = 0;
    for event in auto_merge_events.iter().chain(manual_merge_events.iter()) {
        increment_merge_mode_counts(
            &mut direct_root_merge_count,
            &mut isolated_integration_merge_count,
            event.merge_mode.as_deref(),
        );
    }

    // Failed merges split by merge mode.
    let mut direct_root_failure_count = 0;
    let mut isolated_integration_failure_count = 0;
    for event in &merge_failure_events {
        increment_merge_mode_counts(
            &mut direct_root_failure_count,
            &mut isolated_integration_failure_count,
            event.merge_mode.as_deref(),
        );
    }

    // Auto-merge decisions: accepted vs routed-to-manual-review, tallying
    // each rejection reason. Other action types are ignored.
    let mut accepted_decision_count = 0;
    let mut rejected_decision_count = 0;
    let mut rejection_reasons = BTreeMap::<String, i64>::new();
    for event in decision_events {
        match event.action_type.as_deref() {
            Some("accepted") => accepted_decision_count += 1,
            Some("manual_review") => {
                rejected_decision_count += 1;
                for reason in extract_auto_merge_reasons(&event) {
                    *rejection_reasons.entry(reason).or_insert(0) += 1;
                }
            }
            _ => {}
        }
    }

    // Post-merge verification: a missing success value means the
    // verification was skipped, not failed.
    let mut post_merge_verify_pass_count = 0;
    let mut post_merge_verify_fail_count = 0;
    let mut post_merge_verify_skip_count = 0;
    for event in post_verify_events {
        match event.success {
            Some(true) => post_merge_verify_pass_count += 1,
            Some(false) => post_merge_verify_fail_count += 1,
            None => post_merge_verify_skip_count += 1,
        }
    }

    // Compute average review latency: time between task_completed and its
    // corresponding merge event for each task.
    // Best-effort: `unwrap_or(None)` deliberately turns any query error
    // into "no latency data" rather than failing the whole report.
    let avg_review_latency_secs: Option<f64> = conn
        .query_row(
            "SELECT AVG(m.timestamp - c.timestamp)
             FROM events c
             JOIN events m ON c.task_id = m.task_id
               AND m.event_type IN ('task_auto_merged', 'task_manual_merged')
             WHERE c.event_type = 'task_completed'
               AND c.task_id IS NOT NULL
               AND m.timestamp >= c.timestamp",
            [],
            |row| row.get(0),
        )
        .unwrap_or(None);

    // Shadow the BTreeMap with its Vec form, sorted by count descending
    // then reason ascending for stable output.
    let mut rejection_reasons = rejection_reasons
        .into_iter()
        .map(|(reason, count)| AutoMergeReasonRow { reason, count })
        .collect::<Vec<_>>();
    rejection_reasons.sort_by(|left, right| {
        right
            .count
            .cmp(&left.count)
            .then_with(|| left.reason.cmp(&right.reason))
    });

    Ok(ReviewMetricsRow {
        auto_merge_count,
        manual_merge_count,
        direct_root_merge_count,
        isolated_integration_merge_count,
        direct_root_failure_count,
        isolated_integration_failure_count,
        rework_count,
        review_nudge_count,
        review_escalation_count,
        avg_review_latency_secs,
        accepted_decision_count,
        rejected_decision_count,
        rejection_reasons,
        post_merge_verify_pass_count,
        post_merge_verify_fail_count,
        post_merge_verify_skip_count,
    })
}
1310
1311pub fn query_merge_queue_depth(conn: &Connection) -> Result<i64> {
1312    conn.query_row(
1313        "SELECT COUNT(*)
1314         FROM task_metrics tm
1315         WHERE tm.completed_at IS NOT NULL
1316           AND NOT EXISTS (
1317               SELECT 1
1318               FROM events e
1319               WHERE e.task_id = tm.task_id
1320                 AND e.event_type IN ('task_auto_merged', 'task_manual_merged', 'task_reworked')
1321                 AND e.timestamp >= tm.completed_at
1322           )",
1323        [],
1324        |row| row.get(0),
1325    )
1326    .context("failed to query merge queue depth")
1327}
1328
1329#[cfg(test)]
1330mod tests {
1331    use super::*;
1332    use crate::team::events::TeamEvent;
1333
    #[test]
    fn schema_creation_succeeds() {
        // A fresh in-memory DB must come up with an empty, queryable events table.
        let conn = open_in_memory().unwrap();
        // Verify tables exist by querying them.
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }
1343
    #[test]
    fn idempotent_schema_creation() {
        // Schema uses CREATE TABLE IF NOT EXISTS, so re-running init is a no-op.
        let conn = open_in_memory().unwrap();
        // Running init_schema again should not fail.
        init_schema(&conn).unwrap();
    }
1350
    #[test]
    fn legacy_schema_repairs_missing_columns_once() {
        // Start from an old schema missing newer columns; init_schema must add
        // them, and running it twice must record exactly one repair event.
        let conn = Connection::open_in_memory().unwrap();
        install_legacy_schema_for_tests(&conn).unwrap();

        init_schema(&conn).unwrap();
        init_schema(&conn).unwrap();

        // Newer task_metrics columns exist after repair.
        let task_columns = query_table_columns(&conn, "task_metrics").unwrap();
        assert!(task_columns.contains("narration_rejections"));
        assert!(task_columns.contains("confidence_score"));
        assert!(task_columns.contains("context_restart_count"));

        // Newer session_summary columns exist after repair.
        let session_columns = query_table_columns(&conn, "session_summary").unwrap();
        assert!(session_columns.contains("verification_passes"));
        assert!(session_columns.contains("notification_latency_samples"));

        // The repair is logged once, not once per init_schema call.
        let repair_events: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM events WHERE event_type = 'telemetry_schema_repaired'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(repair_events, 1);

        // The repair payload names the columns that were added.
        let payload: String = conn
            .query_row(
                "SELECT payload FROM events WHERE event_type = 'telemetry_schema_repaired'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert!(payload.contains("narration_rejections"));
        assert!(payload.contains("confidence_score"));
        assert!(payload.contains("verification_passes"));
    }
1388
    #[test]
    fn insert_and_query_event_round_trip() {
        // An inserted event comes back from query_recent_events with its type intact.
        let conn = open_in_memory().unwrap();
        let event = TeamEvent::daemon_started();
        insert_event(&conn, &event).unwrap();

        let events = query_recent_events(&conn, 10).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_type, "daemon_started");
    }
1399
    #[test]
    fn task_assigned_creates_task_metric() {
        // task_assigned seeds a task_metrics row and stamps started_at.
        let conn = open_in_memory().unwrap();
        let event = TeamEvent::task_assigned("eng-1", "42");
        insert_event(&conn, &event).unwrap();

        let tasks = query_task_metrics(&conn).unwrap();
        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].task_id, "42");
        assert!(tasks[0].started_at.is_some());
    }
1411
    #[test]
    fn task_completed_updates_agent_and_task_metrics() {
        // task_completed bumps the agent's completion counter and stamps
        // completed_at on the task row created by the earlier assignment.
        let conn = open_in_memory().unwrap();

        let assign = TeamEvent::task_assigned("eng-1", "42");
        insert_event(&conn, &assign).unwrap();

        let complete = TeamEvent::task_completed("eng-1", Some("42"));
        insert_event(&conn, &complete).unwrap();

        let agents = query_agent_metrics(&conn).unwrap();
        assert_eq!(agents.len(), 1);
        assert_eq!(agents[0].role, "eng-1");
        assert_eq!(agents[0].completions, 1);

        let tasks = query_task_metrics(&conn).unwrap();
        assert_eq!(tasks.len(), 1);
        assert!(tasks[0].completed_at.is_some());
    }
1431
    #[test]
    fn escalation_increments_task_escalations() {
        // Each task_escalated event adds one to the task's escalation counter.
        let conn = open_in_memory().unwrap();
        let event = TeamEvent::task_escalated("eng-1", "42", None);
        insert_event(&conn, &event).unwrap();
        insert_event(&conn, &event).unwrap();

        let tasks = query_task_metrics(&conn).unwrap();
        assert_eq!(tasks[0].escalations, 2);
    }
1442
    #[test]
    fn meta_conversation_escalation_increments_task_escalations() {
        // narration_restart events also count toward the task's escalations.
        let conn = open_in_memory().unwrap();
        insert_event(&conn, &TeamEvent::narration_restart("eng-1", Some(42))).unwrap();
        insert_event(&conn, &TeamEvent::narration_restart("eng-1", Some(42))).unwrap();

        let tasks = query_task_metrics(&conn).unwrap();
        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].task_id, "42");
        assert_eq!(tasks[0].escalations, 2);
    }
1454
    #[test]
    fn narration_rejection_increments_task_metric() {
        // Each narration_rejection event adds one to narration_rejections.
        let conn = open_in_memory().unwrap();
        insert_event(&conn, &TeamEvent::narration_rejection("eng-1", 42, 1)).unwrap();
        insert_event(&conn, &TeamEvent::narration_rejection("eng-1", 42, 2)).unwrap();

        let tasks = query_task_metrics(&conn).unwrap();
        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].task_id, "42");
        assert_eq!(tasks[0].narration_rejections, 2);
    }
1466
    #[test]
    fn legacy_schema_accepts_new_task_metric_event_writes_after_repair() {
        // After repairing a legacy schema, events that write the newer
        // task_metrics columns (rejections, confidence) must succeed.
        let conn = Connection::open_in_memory().unwrap();
        install_legacy_schema_for_tests(&conn).unwrap();
        init_schema(&conn).unwrap();

        insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
        insert_event(&conn, &TeamEvent::narration_rejection("eng-1", 42, 1)).unwrap();
        insert_event(
            &conn,
            &TeamEvent::merge_confidence_scored(&crate::team::events::MergeConfidenceInfo {
                engineer: "eng-1",
                task: "42",
                confidence: 0.82,
                files_changed: 3,
                lines_changed: 40,
                has_migrations: false,
                has_config_changes: false,
                rename_count: 0,
            }),
        )
        .unwrap();

        let tasks = query_task_metrics(&conn).unwrap();
        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].task_id, "42");
        assert_eq!(tasks[0].narration_rejections, 1);
        assert_eq!(tasks[0].confidence_score, Some(0.82));
    }
1496
    #[test]
    fn legacy_schema_accepts_new_session_summary_counters_after_repair() {
        // After repair, events feeding the newer session_summary counters
        // (verification results, notification latency) must be accepted.
        let conn = Connection::open_in_memory().unwrap();
        install_legacy_schema_for_tests(&conn).unwrap();
        init_schema(&conn).unwrap();

        insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
        insert_event(
            &conn,
            &TeamEvent::auto_merge_post_verify_result("eng-1", "42", Some(true), "passed", None),
        )
        .unwrap();
        insert_event(
            &conn,
            &TeamEvent::notification_delivery_sample("daemon", "manager", 12, "digest"),
        )
        .unwrap();

        let summaries = query_session_summaries(&conn).unwrap();
        assert_eq!(summaries.len(), 1);
        assert_eq!(summaries[0].verification_passes, 1);
        assert_eq!(summaries[0].verification_failures, 0);
        assert_eq!(summaries[0].notification_latency_total_secs, 12);
        assert_eq!(summaries[0].notification_latency_samples, 1);
    }
1522
    #[test]
    fn context_exhausted_restart_increments_task_restart_count() {
        // agent_restarted(context_exhausted) bumps only context_restart_count —
        // handoff counters and carry-forward stay untouched.
        let conn = open_in_memory().unwrap();
        insert_event(
            &conn,
            &TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 1),
        )
        .unwrap();
        insert_event(
            &conn,
            &TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 2),
        )
        .unwrap();

        let tasks = query_task_metrics(&conn).unwrap();
        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].task_id, "42");
        assert_eq!(tasks[0].context_restart_count, 2);
        assert_eq!(tasks[0].handoff_attempts, 0);
        assert_eq!(tasks[0].carry_forward_effective, None);
    }
1544
    #[test]
    fn agent_handoff_updates_attempt_and_success_counts() {
        // Every handoff counts as an attempt; only successful ones also
        // increment handoff_successes.
        let conn = open_in_memory().unwrap();
        insert_event(
            &conn,
            &TeamEvent::agent_handoff("eng-1", "42", "stalled", true),
        )
        .unwrap();
        insert_event(
            &conn,
            &TeamEvent::agent_handoff("eng-1", "42", "shim_crash", false),
        )
        .unwrap();

        let tasks = query_task_metrics(&conn).unwrap();
        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].task_id, "42");
        assert_eq!(tasks[0].handoff_attempts, 2);
        assert_eq!(tasks[0].handoff_successes, 1);
    }
1565
    #[test]
    fn task_completion_marks_single_restart_carry_forward_effective() {
        // Completing after exactly one context restart marks carry-forward effective.
        let conn = open_in_memory().unwrap();
        insert_event(
            &conn,
            &TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 1),
        )
        .unwrap();
        insert_event(&conn, &TeamEvent::task_completed("eng-1", Some("42"))).unwrap();

        let tasks = query_task_metrics(&conn).unwrap();
        assert_eq!(tasks[0].context_restart_count, 1);
        assert_eq!(tasks[0].carry_forward_effective, Some(true));
    }
1580
    #[test]
    fn task_completion_marks_multi_restart_carry_forward_ineffective() {
        // Needing more than one context restart before completion marks
        // carry-forward ineffective.
        let conn = open_in_memory().unwrap();
        insert_event(
            &conn,
            &TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 1),
        )
        .unwrap();
        insert_event(
            &conn,
            &TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 2),
        )
        .unwrap();
        insert_event(&conn, &TeamEvent::task_completed("eng-1", Some("42"))).unwrap();

        let tasks = query_task_metrics(&conn).unwrap();
        assert_eq!(tasks[0].context_restart_count, 2);
        assert_eq!(tasks[0].carry_forward_effective, Some(false));
    }
1600
    #[test]
    fn engineer_performance_profiles_aggregate_completion_quality_and_context() {
        // Two quality samples (3600s + 1800s, test rates 1.0 + 0.0), one
        // context restart on task 42, and two auto-merges (90 + 30 lines
        // over 1.5h total) should average out as asserted below.
        let conn = open_in_memory().unwrap();

        insert_event(
            &conn,
            &TeamEvent::quality_metrics_recorded(&crate::team::events::QualityMetricsInfo {
                backend: "codex",
                role: "eng-1",
                task: "41",
                narration_ratio: 0.1,
                commit_frequency: 1.0,
                first_pass_test_rate: 1.0,
                retry_rate: 0.0,
                time_to_completion_secs: 3_600,
            }),
        )
        .unwrap();
        insert_event(
            &conn,
            &TeamEvent::quality_metrics_recorded(&crate::team::events::QualityMetricsInfo {
                backend: "codex",
                role: "eng-1",
                task: "42",
                narration_ratio: 0.2,
                commit_frequency: 2.0,
                first_pass_test_rate: 0.0,
                retry_rate: 1.0,
                time_to_completion_secs: 1_800,
            }),
        )
        .unwrap();
        insert_event(
            &conn,
            &TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 1),
        )
        .unwrap();
        insert_event(
            &conn,
            &TeamEvent::task_auto_merged("eng-1", "41", 0.9, 2, 90),
        )
        .unwrap();
        insert_event(
            &conn,
            &TeamEvent::task_auto_merged("eng-1", "42", 0.9, 2, 30),
        )
        .unwrap();

        let rows = query_engineer_performance_profiles(&conn).unwrap();
        assert_eq!(rows.len(), 1);
        let row = &rows[0];
        assert_eq!(row.role, "eng-1");
        assert_eq!(row.completed_tasks, 2);
        assert_eq!(row.avg_task_completion_secs, Some(2_700.0));
        assert_eq!(row.first_pass_test_rate, Some(0.5));
        assert_eq!(row.context_exhaustion_frequency, Some(0.5));
        // (90 + 30) lines over (1.0 + 0.5) hours.
        assert_eq!(row.lines_per_hour, Some(80.0));
    }
1659
1660    #[test]
1661    fn pane_death_increments_failures() {
1662        let conn = open_in_memory().unwrap();
1663        let event = TeamEvent::pane_death("eng-1");
1664        insert_event(&conn, &event).unwrap();
1665
1666        let agents = query_agent_metrics(&conn).unwrap();
1667        assert_eq!(agents[0].failures, 1);
1668    }
1669
1670    #[test]
1671    fn pane_respawned_increments_restarts() {
1672        let conn = open_in_memory().unwrap();
1673        let event = TeamEvent::pane_respawned("eng-1");
1674        insert_event(&conn, &event).unwrap();
1675
1676        let agents = query_agent_metrics(&conn).unwrap();
1677        assert_eq!(agents[0].restarts, 1);
1678    }
1679
1680    #[test]
1681    fn delivery_failed_increments_failures() {
1682        let conn = open_in_memory().unwrap();
1683        let event = TeamEvent::delivery_failed("eng-1", "manager", "pane not ready");
1684        insert_event(&conn, &event).unwrap();
1685
1686        let agents = query_agent_metrics(&conn).unwrap();
1687        assert_eq!(agents.len(), 1);
1688        assert_eq!(agents[0].role, "eng-1");
1689        assert_eq!(agents[0].failures, 1);
1690    }
1691
1692    #[test]
1693    fn context_exhausted_increments_restarts() {
1694        let conn = open_in_memory().unwrap();
1695        let event = TeamEvent::context_exhausted("eng-1", Some(42), Some(500_000));
1696        insert_event(&conn, &event).unwrap();
1697
1698        let agents = query_agent_metrics(&conn).unwrap();
1699        assert_eq!(agents.len(), 1);
1700        assert_eq!(agents[0].role, "eng-1");
1701        assert_eq!(agents[0].restarts, 1);
1702    }
1703
1704    #[test]
1705    fn all_failure_event_types_accumulate() {
1706        let conn = open_in_memory().unwrap();
1707        insert_event(&conn, &TeamEvent::pane_death("eng-1")).unwrap();
1708        insert_event(&conn, &TeamEvent::member_crashed("eng-1", true)).unwrap();
1709        insert_event(
1710            &conn,
1711            &TeamEvent::delivery_failed("eng-1", "manager", "timeout"),
1712        )
1713        .unwrap();
1714
1715        let agents = query_agent_metrics(&conn).unwrap();
1716        assert_eq!(agents[0].failures, 3);
1717    }
1718
1719    #[test]
1720    fn all_restart_event_types_accumulate() {
1721        let conn = open_in_memory().unwrap();
1722        insert_event(&conn, &TeamEvent::pane_respawned("eng-1")).unwrap();
1723        insert_event(
1724            &conn,
1725            &TeamEvent::agent_restarted("eng-1", "42", "stall", 1),
1726        )
1727        .unwrap();
1728        insert_event(
1729            &conn,
1730            &TeamEvent::context_exhausted("eng-1", Some(42), None),
1731        )
1732        .unwrap();
1733
1734        let agents = query_agent_metrics(&conn).unwrap();
1735        assert_eq!(agents[0].restarts, 3);
1736    }
1737
1738    #[test]
1739    fn daemon_started_creates_session_summary() {
1740        let conn = open_in_memory().unwrap();
1741        let event = TeamEvent::daemon_started();
1742        insert_event(&conn, &event).unwrap();
1743
1744        let summaries = query_session_summaries(&conn).unwrap();
1745        assert_eq!(summaries.len(), 1);
1746        assert!(summaries[0].session_id.starts_with("session-"));
1747    }
1748
1749    #[test]
1750    fn multiple_events_for_same_agent_accumulate() {
1751        let conn = open_in_memory().unwrap();
1752        let c1 = TeamEvent::task_completed("eng-1", Some("1"));
1753        let c2 = TeamEvent::task_completed("eng-1", Some("2"));
1754        insert_event(&conn, &c1).unwrap();
1755        insert_event(&conn, &c2).unwrap();
1756
1757        let agents = query_agent_metrics(&conn).unwrap();
1758        assert_eq!(agents[0].completions, 2);
1759    }
1760
1761    #[test]
1762    fn query_recent_events_respects_limit() {
1763        let conn = open_in_memory().unwrap();
1764        for _ in 0..5 {
1765            insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
1766        }
1767        let events = query_recent_events(&conn, 3).unwrap();
1768        assert_eq!(events.len(), 3);
1769    }
1770
1771    #[test]
1772    fn concurrent_writes_to_same_connection() {
1773        // rusqlite Connection is not Send/Sync, but we verify sequential
1774        // writes to the same connection work without errors.
1775        let conn = open_in_memory().unwrap();
1776        for i in 0..50 {
1777            let event = TeamEvent::task_assigned("eng-1", &i.to_string());
1778            insert_event(&conn, &event).unwrap();
1779        }
1780        let events = query_recent_events(&conn, 100).unwrap();
1781        assert_eq!(events.len(), 50);
1782    }
1783
1784    #[test]
1785    fn review_metrics_empty_db() {
1786        let conn = open_in_memory().unwrap();
1787        let row = query_review_metrics(&conn).unwrap();
1788        assert_eq!(row.auto_merge_count, 0);
1789        assert_eq!(row.manual_merge_count, 0);
1790        assert_eq!(row.direct_root_merge_count, 0);
1791        assert_eq!(row.isolated_integration_merge_count, 0);
1792        assert_eq!(row.direct_root_failure_count, 0);
1793        assert_eq!(row.isolated_integration_failure_count, 0);
1794        assert_eq!(row.rework_count, 0);
1795        assert_eq!(row.review_nudge_count, 0);
1796        assert_eq!(row.review_escalation_count, 0);
1797        assert!(row.avg_review_latency_secs.is_none());
1798        assert_eq!(row.accepted_decision_count, 0);
1799        assert_eq!(row.rejected_decision_count, 0);
1800        assert!(row.rejection_reasons.is_empty());
1801        assert_eq!(row.post_merge_verify_pass_count, 0);
1802        assert_eq!(row.post_merge_verify_fail_count, 0);
1803        assert_eq!(row.post_merge_verify_skip_count, 0);
1804    }
1805
    #[test]
    fn review_metrics_counts_all_event_types() {
        let conn = open_in_memory().unwrap();
        // Auto-merge via the direct-root path.
        insert_event(
            &conn,
            &TeamEvent::task_auto_merged_with_mode(
                "eng-1",
                "1",
                0.9,
                2,
                30,
                Some(crate::team::merge::MergeMode::DirectRoot),
            ),
        )
        .unwrap();
        // Auto-merge via the isolated-integration path.
        insert_event(
            &conn,
            &TeamEvent::task_auto_merged_with_mode(
                "eng-1",
                "2",
                0.9,
                2,
                30,
                Some(crate::team::merge::MergeMode::IsolatedIntegration),
            ),
        )
        .unwrap();
        // Manual merge, direct-root mode.
        insert_event(
            &conn,
            &TeamEvent::task_manual_merged_with_mode(
                "3",
                Some(crate::team::merge::MergeMode::DirectRoot),
            ),
        )
        .unwrap();
        // A merge failure on the isolated-integration path.
        insert_event(
            &conn,
            &TeamEvent::task_merge_failed(
                "eng-1",
                "8",
                Some(crate::team::merge::MergeMode::IsolatedIntegration),
                "isolated merge path failed: integration checkout broke",
            ),
        )
        .unwrap();
        // One rework, two nudges, one escalation.
        insert_event(&conn, &TeamEvent::task_reworked("eng-1", "4")).unwrap();
        insert_event(&conn, &TeamEvent::review_nudge_sent("manager", "5")).unwrap();
        insert_event(&conn, &TeamEvent::review_nudge_sent("manager", "6")).unwrap();
        insert_event(&conn, &TeamEvent::review_escalated_by_role("manager", "7")).unwrap();
        // One accepted auto-merge decision...
        insert_event(
            &conn,
            &TeamEvent::auto_merge_decision_recorded(&crate::team::events::AutoMergeDecisionInfo {
                engineer: "eng-1",
                task: "1",
                action_type: "accepted",
                confidence: 0.9,
                reason: "accepted for auto-merge: confidence 0.90; 2 files, 30 lines, 1 modules; reasons: confidence 0.90 meets threshold 0.80",
                details: r#"{"decision":"accepted","reasons":["confidence 0.90 meets threshold 0.80"],"files_changed":2,"lines_changed":30,"modules_touched":1,"has_migrations":false,"has_config_changes":false,"has_unsafe":false,"has_conflicts":false,"rename_count":0,"tests_passed":true,"override_forced":null,"diff_available":true}"#,
            }),
        )
        .unwrap();
        // ...and one routed to manual review carrying two rejection reasons.
        insert_event(
            &conn,
            &TeamEvent::auto_merge_decision_recorded(&crate::team::events::AutoMergeDecisionInfo {
                engineer: "eng-1",
                task: "3",
                action_type: "manual_review",
                confidence: 0.52,
                reason: "routed to manual review: confidence 0.52; 6 files, 220 lines, 3 modules; reasons: touches sensitive paths; 6 files changed (max 5)",
                details: r#"{"decision":"manual_review","reasons":["touches sensitive paths","6 files changed (max 5)"],"files_changed":6,"lines_changed":220,"modules_touched":3,"has_migrations":false,"has_config_changes":false,"has_unsafe":false,"has_conflicts":false,"rename_count":0,"tests_passed":true,"override_forced":null,"diff_available":true}"#,
            }),
        )
        .unwrap();
        // Post-merge verification: one pass, one failure, one skip.
        insert_event(
            &conn,
            &TeamEvent::auto_merge_post_verify_result(
                "eng-1",
                "1",
                Some(true),
                "passed",
                Some("post-merge verification on main passed"),
            ),
        )
        .unwrap();
        insert_event(
            &conn,
            &TeamEvent::auto_merge_post_verify_result(
                "eng-1",
                "2",
                Some(false),
                "failed",
                Some("post-merge verification on main failed"),
            ),
        )
        .unwrap();
        insert_event(
            &conn,
            &TeamEvent::auto_merge_post_verify_result(
                "eng-1",
                "3",
                None,
                "skipped",
                Some("post-merge verification was not requested for this merge"),
            ),
        )
        .unwrap();

        let row = query_review_metrics(&conn).unwrap();
        assert_eq!(row.auto_merge_count, 2);
        assert_eq!(row.manual_merge_count, 1);
        // Mode totals span both auto and manual merges: 2 direct-root, 1 isolated.
        assert_eq!(row.direct_root_merge_count, 2);
        assert_eq!(row.isolated_integration_merge_count, 1);
        assert_eq!(row.direct_root_failure_count, 0);
        assert_eq!(row.isolated_integration_failure_count, 1);
        assert_eq!(row.rework_count, 1);
        assert_eq!(row.review_nudge_count, 2);
        assert_eq!(row.review_escalation_count, 1);
        assert_eq!(row.accepted_decision_count, 1);
        assert_eq!(row.rejected_decision_count, 1);
        // Rejection reasons come from the manual_review decision, one count each.
        assert_eq!(
            row.rejection_reasons,
            vec![
                AutoMergeReasonRow {
                    reason: "6 files changed (max 5)".to_string(),
                    count: 1,
                },
                AutoMergeReasonRow {
                    reason: "touches sensitive paths".to_string(),
                    count: 1,
                },
            ]
        );
        assert_eq!(row.post_merge_verify_pass_count, 1);
        assert_eq!(row.post_merge_verify_fail_count, 1);
        assert_eq!(row.post_merge_verify_skip_count, 1);
    }
1942
1943    #[test]
1944    fn review_metrics_computes_avg_latency() {
1945        let conn = open_in_memory().unwrap();
1946
1947        // Task 10: completed at ts=1000, merged at ts=1100 → 100s latency
1948        let mut c1 = TeamEvent::task_completed("eng-1", Some("10"));
1949        c1.ts = 1000;
1950        insert_event(&conn, &c1).unwrap();
1951        let mut m1 = TeamEvent::task_auto_merged_with_mode(
1952            "eng-1",
1953            "10",
1954            0.9,
1955            2,
1956            30,
1957            Some(crate::team::merge::MergeMode::DirectRoot),
1958        );
1959        m1.ts = 1100;
1960        insert_event(&conn, &m1).unwrap();
1961
1962        // Task 20: completed at ts=2000, merged at ts=2300 → 300s latency
1963        let mut c2 = TeamEvent::task_completed("eng-2", Some("20"));
1964        c2.ts = 2000;
1965        insert_event(&conn, &c2).unwrap();
1966        let mut m2 = TeamEvent::task_manual_merged_with_mode(
1967            "20",
1968            Some(crate::team::merge::MergeMode::IsolatedIntegration),
1969        );
1970        m2.ts = 2300;
1971        insert_event(&conn, &m2).unwrap();
1972
1973        let row = query_review_metrics(&conn).unwrap();
1974        // avg = (100 + 300) / 2 = 200
1975        let avg = row.avg_review_latency_secs.unwrap();
1976        assert!((avg - 200.0).abs() < 0.01);
1977    }
1978
1979    #[test]
1980    fn task_cycle_time_rows_replace_existing_snapshot() {
1981        let conn = open_in_memory().unwrap();
1982        let rows = vec![
1983            TaskCycleTimeRecord {
1984                task_id: 1,
1985                title: "One".to_string(),
1986                engineer: Some("eng-1".to_string()),
1987                priority: "high".to_string(),
1988                status: "done".to_string(),
1989                created_at: Some(100),
1990                started_at: Some(160),
1991                completed_at: Some(460),
1992                cycle_time_minutes: Some(5),
1993                lead_time_minutes: Some(6),
1994            },
1995            TaskCycleTimeRecord {
1996                task_id: 2,
1997                title: "Two".to_string(),
1998                engineer: Some("eng-2".to_string()),
1999                priority: "low".to_string(),
2000                status: "done".to_string(),
2001                created_at: Some(200),
2002                started_at: Some(260),
2003                completed_at: Some(560),
2004                cycle_time_minutes: Some(5),
2005                lead_time_minutes: Some(6),
2006            },
2007        ];
2008
2009        replace_task_cycle_times(&conn, &rows).unwrap();
2010        let stored = query_task_cycle_times(&conn).unwrap();
2011        assert_eq!(stored.len(), 2);
2012
2013        replace_task_cycle_times(&conn, &rows[..1]).unwrap();
2014        let stored = query_task_cycle_times(&conn).unwrap();
2015        assert_eq!(stored.len(), 1);
2016        assert_eq!(stored[0].task_id, "1");
2017    }
2018
2019    #[test]
2020    fn task_cycle_time_queries_aggregate_priority_and_engineer() {
2021        let conn = open_in_memory().unwrap();
2022        replace_task_cycle_times(
2023            &conn,
2024            &[
2025                TaskCycleTimeRecord {
2026                    task_id: 1,
2027                    title: "One".to_string(),
2028                    engineer: Some("eng-1".to_string()),
2029                    priority: "high".to_string(),
2030                    status: "done".to_string(),
2031                    created_at: Some(100),
2032                    started_at: Some(160),
2033                    completed_at: Some(460),
2034                    cycle_time_minutes: Some(5),
2035                    lead_time_minutes: Some(6),
2036                },
2037                TaskCycleTimeRecord {
2038                    task_id: 2,
2039                    title: "Two".to_string(),
2040                    engineer: Some("eng-1".to_string()),
2041                    priority: "high".to_string(),
2042                    status: "done".to_string(),
2043                    created_at: Some(200),
2044                    started_at: Some(260),
2045                    completed_at: Some(860),
2046                    cycle_time_minutes: Some(10),
2047                    lead_time_minutes: Some(11),
2048                },
2049                TaskCycleTimeRecord {
2050                    task_id: 3,
2051                    title: "Three".to_string(),
2052                    engineer: Some("eng-2".to_string()),
2053                    priority: "medium".to_string(),
2054                    status: "done".to_string(),
2055                    created_at: Some(300),
2056                    started_at: Some(360),
2057                    completed_at: Some(660),
2058                    cycle_time_minutes: Some(5),
2059                    lead_time_minutes: Some(6),
2060                },
2061            ],
2062        )
2063        .unwrap();
2064
2065        let by_priority = query_average_cycle_time_by_priority(&conn).unwrap();
2066        assert_eq!(by_priority[0].priority, "high");
2067        assert!((by_priority[0].average_cycle_time_mins - 7.5).abs() < f64::EPSILON);
2068
2069        let by_engineer = query_engineer_throughput(&conn).unwrap();
2070        assert_eq!(by_engineer[0].engineer, "eng-1");
2071        assert_eq!(by_engineer[0].completed_tasks, 2);
2072    }
2073
2074    #[test]
2075    fn record_test_results_tracks_failures_and_flakes() {
2076        let conn = open_in_memory().unwrap();
2077        let failed_results = TestResults {
2078            framework: "cargo".to_string(),
2079            total: Some(2),
2080            passed: 1,
2081            failed: 1,
2082            ignored: 0,
2083            failures: vec![super::super::test_results::TestFailure {
2084                test_name: "tests::fails".to_string(),
2085                message: Some("assertion failed".to_string()),
2086                location: Some("src/lib.rs:9".to_string()),
2087            }],
2088            summary: None,
2089        };
2090        record_test_results(&conn, 42, "eng-1", &failed_results, &[]).unwrap();
2091
2092        let (failures, flaky_passes) =
2093            query_test_case_metric(&conn, "cargo", "tests::fails").unwrap();
2094        assert_eq!(failures, 1);
2095        assert_eq!(flaky_passes, 0);
2096
2097        let passed_results = TestResults {
2098            framework: "cargo".to_string(),
2099            total: Some(2),
2100            passed: 2,
2101            failed: 0,
2102            ignored: 0,
2103            failures: vec![],
2104            summary: None,
2105        };
2106        record_test_results(
2107            &conn,
2108            42,
2109            "eng-1",
2110            &passed_results,
2111            &failed_results.failures,
2112        )
2113        .unwrap();
2114
2115        let (failures, flaky_passes) =
2116            query_test_case_metric(&conn, "cargo", "tests::fails").unwrap();
2117        assert_eq!(failures, 1);
2118        assert_eq!(flaky_passes, 1);
2119    }
2120
2121    // --- Fix #1: tasks_completed incremented on task_completed ---
2122
2123    #[test]
2124    fn tasks_completed_increments_on_task_completed() {
2125        let conn = open_in_memory().unwrap();
2126        insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
2127
2128        insert_event(&conn, &TeamEvent::task_completed("eng-1", Some("1"))).unwrap();
2129        insert_event(&conn, &TeamEvent::task_completed("eng-2", Some("2"))).unwrap();
2130
2131        let summaries = query_session_summaries(&conn).unwrap();
2132        assert_eq!(summaries[0].tasks_completed, 2);
2133    }
2134
2135    // --- Fix #2: total_merges incremented on merge events ---
2136
2137    #[test]
2138    fn total_merges_increments_on_auto_and_manual_merge() {
2139        let conn = open_in_memory().unwrap();
2140        insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
2141
2142        insert_event(
2143            &conn,
2144            &TeamEvent::task_auto_merged_with_mode(
2145                "eng-1",
2146                "1",
2147                0.9,
2148                2,
2149                30,
2150                Some(crate::team::merge::MergeMode::DirectRoot),
2151            ),
2152        )
2153        .unwrap();
2154        insert_event(
2155            &conn,
2156            &TeamEvent::task_manual_merged_with_mode(
2157                "2",
2158                Some(crate::team::merge::MergeMode::IsolatedIntegration),
2159            ),
2160        )
2161        .unwrap();
2162        insert_event(
2163            &conn,
2164            &TeamEvent::task_auto_merged_with_mode(
2165                "eng-1",
2166                "3",
2167                0.8,
2168                1,
2169                10,
2170                Some(crate::team::merge::MergeMode::DirectRoot),
2171            ),
2172        )
2173        .unwrap();
2174
2175        let summaries = query_session_summaries(&conn).unwrap();
2176        assert_eq!(summaries[0].total_merges, 3);
2177    }
2178
2179    // --- Fix #3: total_events incremented on every insert ---
2180
2181    #[test]
2182    fn total_events_increments_on_every_insert() {
2183        let conn = open_in_memory().unwrap();
2184        // daemon_started is the first event, creating the session and then incrementing.
2185        insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
2186        insert_event(&conn, &TeamEvent::task_assigned("eng-1", "1")).unwrap();
2187        insert_event(&conn, &TeamEvent::task_completed("eng-1", Some("1"))).unwrap();
2188
2189        let summaries = query_session_summaries(&conn).unwrap();
2190        // 3 events inserted after session was created (daemon_started creates the row
2191        // then total_events is incremented for it too).
2192        assert_eq!(summaries[0].total_events, 3);
2193    }
2194
2195    // --- Fix #4: ended_at set on daemon_stopped ---
2196
2197    #[test]
2198    fn ended_at_set_on_daemon_stopped() {
2199        let conn = open_in_memory().unwrap();
2200        insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
2201
2202        let summaries = query_session_summaries(&conn).unwrap();
2203        assert!(summaries[0].ended_at.is_none());
2204
2205        let mut stop = TeamEvent::daemon_stopped_with_reason("shutdown", 3600);
2206        stop.ts = 9999;
2207        insert_event(&conn, &stop).unwrap();
2208
2209        let summaries = query_session_summaries(&conn).unwrap();
2210        assert_eq!(summaries[0].ended_at, Some(9999));
2211    }
2212
2213    #[test]
2214    fn ended_at_set_on_plain_daemon_stopped() {
2215        let conn = open_in_memory().unwrap();
2216        insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
2217
2218        let mut stop = TeamEvent::daemon_stopped();
2219        stop.ts = 5000;
2220        insert_event(&conn, &stop).unwrap();
2221
2222        let summaries = query_session_summaries(&conn).unwrap();
2223        assert_eq!(summaries[0].ended_at, Some(5000));
2224    }
2225
2226    #[test]
2227    fn session_summary_tracks_discord_verification_and_notification_counters() {
2228        let conn = open_in_memory().unwrap();
2229        insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
2230        insert_event(
2231            &conn,
2232            &TeamEvent::discord_event_sent("events", "task_completed"),
2233        )
2234        .unwrap();
2235        insert_event(
2236            &conn,
2237            &TeamEvent::auto_merge_post_verify_result("eng-1", "42", Some(true), "passed", None),
2238        )
2239        .unwrap();
2240        insert_event(
2241            &conn,
2242            &TeamEvent::auto_merge_post_verify_result("eng-1", "43", Some(false), "failed", None),
2243        )
2244        .unwrap();
2245        insert_event(
2246            &conn,
2247            &TeamEvent::inbox_message_deduplicated("manager", "eng-1", 0xfeed),
2248        )
2249        .unwrap();
2250        insert_event(
2251            &conn,
2252            &TeamEvent::notification_delivery_sample("daemon", "manager", 12, "digest"),
2253        )
2254        .unwrap();
2255
2256        let summaries = query_session_summaries(&conn).unwrap();
2257        assert_eq!(summaries[0].discord_events_sent, 1);
2258        assert_eq!(summaries[0].verification_passes, 1);
2259        assert_eq!(summaries[0].verification_failures, 1);
2260        assert_eq!(summaries[0].notification_isolations, 1);
2261        assert_eq!(summaries[0].notification_latency_total_secs, 12);
2262        assert_eq!(summaries[0].notification_latency_samples, 1);
2263    }
2264
2265    #[test]
2266    fn merge_queue_depth_counts_completed_tasks_awaiting_merge() {
2267        let conn = open_in_memory().unwrap();
2268        let mut started = TeamEvent::daemon_started();
2269        started.ts = 100;
2270        insert_event(&conn, &started).unwrap();
2271
2272        let mut completed = TeamEvent::task_completed("eng-1", Some("42"));
2273        completed.ts = 200;
2274        insert_event(&conn, &completed).unwrap();
2275
2276        assert_eq!(query_merge_queue_depth(&conn).unwrap(), 1);
2277
2278        let mut merged = TeamEvent::task_auto_merged("eng-1", "42", 0.9, 2, 10);
2279        merged.ts = 250;
2280        insert_event(&conn, &merged).unwrap();
2281
2282        assert_eq!(query_merge_queue_depth(&conn).unwrap(), 0);
2283    }
2284
2285    // --- Fix #5: idle_polls / working_polls updated ---
2286
2287    #[test]
2288    fn record_agent_poll_state_tracks_idle_polls() {
2289        let conn = open_in_memory().unwrap();
2290        record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
2291        record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
2292        record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
2293
2294        let agents = query_agent_metrics(&conn).unwrap();
2295        assert_eq!(agents[0].idle_polls, 3);
2296        assert_eq!(agents[0].working_polls, 0);
2297        assert_eq!(agents[0].total_cycle_secs, 0);
2298    }
2299
2300    #[test]
2301    fn record_agent_poll_state_tracks_working_polls() {
2302        let conn = open_in_memory().unwrap();
2303        record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
2304        record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
2305
2306        let agents = query_agent_metrics(&conn).unwrap();
2307        assert_eq!(agents[0].working_polls, 2);
2308        assert_eq!(agents[0].idle_polls, 0);
2309    }
2310
2311    // --- Fix #6: total_cycle_secs incremented for working agents ---
2312
2313    #[test]
2314    fn record_agent_poll_state_accumulates_cycle_secs_for_working() {
2315        let conn = open_in_memory().unwrap();
2316        record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
2317        record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
2318        record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
2319
2320        let agents = query_agent_metrics(&conn).unwrap();
2321        assert_eq!(agents[0].total_cycle_secs, 15); // 3 * 5
2322    }
2323
2324    #[test]
2325    fn record_agent_poll_state_idle_does_not_accumulate_cycle_secs() {
2326        let conn = open_in_memory().unwrap();
2327        record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
2328        record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
2329
2330        let agents = query_agent_metrics(&conn).unwrap();
2331        assert_eq!(agents[0].total_cycle_secs, 0);
2332    }
2333
2334    #[test]
2335    fn record_agent_poll_state_mixed_idle_and_working() {
2336        let conn = open_in_memory().unwrap();
2337        record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
2338        record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
2339        record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
2340        record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
2341
2342        let agents = query_agent_metrics(&conn).unwrap();
2343        assert_eq!(agents[0].working_polls, 2);
2344        assert_eq!(agents[0].idle_polls, 2);
2345        assert_eq!(agents[0].total_cycle_secs, 10); // 2 * 5
2346    }
2347
2348    #[test]
2349    fn record_agent_poll_state_multiple_agents() {
2350        let conn = open_in_memory().unwrap();
2351        record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
2352        record_agent_poll_state(&conn, "eng-2", false, 5).unwrap();
2353        record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
2354        record_agent_poll_state(&conn, "eng-2", true, 5).unwrap();
2355
2356        let agents = query_agent_metrics(&conn).unwrap();
2357        let eng1 = agents.iter().find(|a| a.role == "eng-1").unwrap();
2358        let eng2 = agents.iter().find(|a| a.role == "eng-2").unwrap();
2359        assert_eq!(eng1.working_polls, 2);
2360        assert_eq!(eng1.total_cycle_secs, 10);
2361        assert_eq!(eng2.idle_polls, 1);
2362        assert_eq!(eng2.working_polls, 1);
2363        assert_eq!(eng2.total_cycle_secs, 5);
2364    }
2365
2366    // --- Edge cases: session counters without a session ---
2367
2368    #[test]
2369    fn session_counters_noop_without_session() {
2370        // If no daemon_started event has been emitted, no session row exists.
2371        // The UPDATE statements should just affect 0 rows — no error.
2372        let conn = open_in_memory().unwrap();
2373        insert_event(&conn, &TeamEvent::task_completed("eng-1", Some("1"))).unwrap();
2374        insert_event(
2375            &conn,
2376            &TeamEvent::task_auto_merged("eng-1", "1", 0.9, 2, 30),
2377        )
2378        .unwrap();
2379        let summaries = query_session_summaries(&conn).unwrap();
2380        assert!(summaries.is_empty());
2381    }
2382}