1use std::collections::{BTreeMap, BTreeSet};
8use std::path::Path;
9
10use anyhow::{Context, Result};
11use rusqlite::{Connection, params};
12use serde::{Deserialize, Serialize};
13
14use super::events::TeamEvent;
15use super::metrics::TaskCycleTimeRecord;
16use super::test_results::{TestFailure, TestResults};
17
18const DB_FILENAME: &str = "telemetry.db";
20
21struct SchemaColumn {
22 name: &'static str,
23 definition: &'static str,
24}
25
26const TASK_METRICS_COLUMNS: &[SchemaColumn] = &[
27 SchemaColumn {
28 name: "started_at",
29 definition: "started_at INTEGER",
30 },
31 SchemaColumn {
32 name: "completed_at",
33 definition: "completed_at INTEGER",
34 },
35 SchemaColumn {
36 name: "retries",
37 definition: "retries INTEGER NOT NULL DEFAULT 0",
38 },
39 SchemaColumn {
40 name: "narration_rejections",
41 definition: "narration_rejections INTEGER NOT NULL DEFAULT 0",
42 },
43 SchemaColumn {
44 name: "escalations",
45 definition: "escalations INTEGER NOT NULL DEFAULT 0",
46 },
47 SchemaColumn {
48 name: "context_restart_count",
49 definition: "context_restart_count INTEGER NOT NULL DEFAULT 0",
50 },
51 SchemaColumn {
52 name: "handoff_attempts",
53 definition: "handoff_attempts INTEGER NOT NULL DEFAULT 0",
54 },
55 SchemaColumn {
56 name: "handoff_successes",
57 definition: "handoff_successes INTEGER NOT NULL DEFAULT 0",
58 },
59 SchemaColumn {
60 name: "carry_forward_effective",
61 definition: "carry_forward_effective INTEGER",
62 },
63 SchemaColumn {
64 name: "merge_time_secs",
65 definition: "merge_time_secs INTEGER",
66 },
67 SchemaColumn {
68 name: "confidence_score",
69 definition: "confidence_score REAL",
70 },
71];
72
73const SESSION_SUMMARY_COLUMNS: &[SchemaColumn] = &[
74 SchemaColumn {
75 name: "started_at",
76 definition: "started_at INTEGER NOT NULL",
77 },
78 SchemaColumn {
79 name: "ended_at",
80 definition: "ended_at INTEGER",
81 },
82 SchemaColumn {
83 name: "tasks_completed",
84 definition: "tasks_completed INTEGER NOT NULL DEFAULT 0",
85 },
86 SchemaColumn {
87 name: "total_merges",
88 definition: "total_merges INTEGER NOT NULL DEFAULT 0",
89 },
90 SchemaColumn {
91 name: "total_events",
92 definition: "total_events INTEGER NOT NULL DEFAULT 0",
93 },
94 SchemaColumn {
95 name: "discord_events_sent",
96 definition: "discord_events_sent INTEGER NOT NULL DEFAULT 0",
97 },
98 SchemaColumn {
99 name: "verification_passes",
100 definition: "verification_passes INTEGER NOT NULL DEFAULT 0",
101 },
102 SchemaColumn {
103 name: "verification_failures",
104 definition: "verification_failures INTEGER NOT NULL DEFAULT 0",
105 },
106 SchemaColumn {
107 name: "notification_isolations",
108 definition: "notification_isolations INTEGER NOT NULL DEFAULT 0",
109 },
110 SchemaColumn {
111 name: "notification_latency_total_secs",
112 definition: "notification_latency_total_secs INTEGER NOT NULL DEFAULT 0",
113 },
114 SchemaColumn {
115 name: "notification_latency_samples",
116 definition: "notification_latency_samples INTEGER NOT NULL DEFAULT 0",
117 },
118];
119
120#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize)]
121struct SchemaRepairReport {
122 repaired_columns: BTreeMap<String, Vec<String>>,
123}
124
125impl SchemaRepairReport {
126 fn record(&mut self, table: &str, column: &str) {
127 self.repaired_columns
128 .entry(table.to_string())
129 .or_default()
130 .push(column.to_string());
131 }
132
133 fn is_empty(&self) -> bool {
134 self.repaired_columns.is_empty()
135 }
136}
137
138pub fn open(project_root: &Path) -> Result<Connection> {
140 let db_path = project_root.join(".batty").join(DB_FILENAME);
141 let conn = Connection::open(&db_path)
142 .with_context(|| format!("failed to open telemetry db at {}", db_path.display()))?;
143
144 conn.pragma_update(None, "journal_mode", "WAL")?;
146
147 init_schema(&conn)?;
148 Ok(conn)
149}
150
151#[cfg(test)]
153pub fn open_in_memory() -> Result<Connection> {
154 let conn = Connection::open_in_memory()?;
155 init_schema(&conn)?;
156 Ok(conn)
157}
158
159fn init_schema(conn: &Connection) -> Result<()> {
160 conn.execute_batch(
161 "
162 CREATE TABLE IF NOT EXISTS events (
163 id INTEGER PRIMARY KEY AUTOINCREMENT,
164 timestamp INTEGER NOT NULL,
165 event_type TEXT NOT NULL,
166 role TEXT,
167 task_id TEXT,
168 payload TEXT NOT NULL
169 );
170
171 CREATE INDEX IF NOT EXISTS idx_events_ts ON events(timestamp);
172 CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type);
173 CREATE INDEX IF NOT EXISTS idx_events_role ON events(role);
174
175 CREATE TABLE IF NOT EXISTS agent_metrics (
176 role TEXT PRIMARY KEY,
177 completions INTEGER NOT NULL DEFAULT 0,
178 failures INTEGER NOT NULL DEFAULT 0,
179 restarts INTEGER NOT NULL DEFAULT 0,
180 total_cycle_secs INTEGER NOT NULL DEFAULT 0,
181 idle_polls INTEGER NOT NULL DEFAULT 0,
182 working_polls INTEGER NOT NULL DEFAULT 0
183 );
184
185 CREATE TABLE IF NOT EXISTS task_metrics (
186 task_id TEXT PRIMARY KEY,
187 started_at INTEGER,
188 completed_at INTEGER,
189 retries INTEGER NOT NULL DEFAULT 0,
190 narration_rejections INTEGER NOT NULL DEFAULT 0,
191 escalations INTEGER NOT NULL DEFAULT 0,
192 context_restart_count INTEGER NOT NULL DEFAULT 0,
193 handoff_attempts INTEGER NOT NULL DEFAULT 0,
194 handoff_successes INTEGER NOT NULL DEFAULT 0,
195 carry_forward_effective INTEGER,
196 merge_time_secs INTEGER,
197 confidence_score REAL
198 );
199
200 CREATE TABLE IF NOT EXISTS session_summary (
201 session_id TEXT PRIMARY KEY,
202 started_at INTEGER NOT NULL,
203 ended_at INTEGER,
204 tasks_completed INTEGER NOT NULL DEFAULT 0,
205 total_merges INTEGER NOT NULL DEFAULT 0,
206 total_events INTEGER NOT NULL DEFAULT 0,
207 discord_events_sent INTEGER NOT NULL DEFAULT 0,
208 verification_passes INTEGER NOT NULL DEFAULT 0,
209 verification_failures INTEGER NOT NULL DEFAULT 0,
210 notification_isolations INTEGER NOT NULL DEFAULT 0,
211 notification_latency_total_secs INTEGER NOT NULL DEFAULT 0,
212 notification_latency_samples INTEGER NOT NULL DEFAULT 0
213 );
214
215 CREATE TABLE IF NOT EXISTS test_case_metrics (
216 framework TEXT NOT NULL,
217 test_name TEXT NOT NULL,
218 failures INTEGER NOT NULL DEFAULT 0,
219 flaky_passes INTEGER NOT NULL DEFAULT 0,
220 last_task_id TEXT,
221 last_engineer TEXT,
222 last_seen_at INTEGER NOT NULL DEFAULT 0,
223 PRIMARY KEY (framework, test_name)
224 );
225
226 CREATE TABLE IF NOT EXISTS task_cycle_times (
227 task_id TEXT PRIMARY KEY,
228 engineer TEXT,
229 priority TEXT NOT NULL,
230 cycle_time_mins INTEGER,
231 lead_time_mins INTEGER,
232 completed_at INTEGER NOT NULL
233 );
234 ",
235 )
236 .context("failed to initialize telemetry schema")?;
237 let repairs = repair_legacy_schema(conn)?;
238 if !repairs.is_empty() {
239 record_schema_repair_event(conn, &repairs)?;
240 }
241 Ok(())
242}
243
244fn repair_legacy_schema(conn: &Connection) -> Result<SchemaRepairReport> {
245 let mut repairs = SchemaRepairReport::default();
246 ensure_table_columns(conn, "task_metrics", TASK_METRICS_COLUMNS, &mut repairs)?;
247 ensure_table_columns(
248 conn,
249 "session_summary",
250 SESSION_SUMMARY_COLUMNS,
251 &mut repairs,
252 )?;
253 Ok(repairs)
254}
255
256fn ensure_table_columns(
257 conn: &Connection,
258 table: &str,
259 columns: &[SchemaColumn],
260 repairs: &mut SchemaRepairReport,
261) -> Result<()> {
262 let mut existing = query_table_columns(conn, table)?;
263 for column in columns {
264 if existing.contains(column.name) {
265 continue;
266 }
267 let sql = format!("ALTER TABLE {table} ADD COLUMN {}", column.definition);
268 conn.execute(&sql, []).with_context(|| {
269 format!(
270 "failed to add {}.{} to telemetry schema",
271 table, column.name
272 )
273 })?;
274 existing.insert(column.name.to_string());
275 repairs.record(table, column.name);
276 }
277 Ok(())
278}
279
280fn query_table_columns(conn: &Connection, table: &str) -> Result<BTreeSet<String>> {
281 let sql = format!("PRAGMA table_info({table})");
282 let mut stmt = conn
283 .prepare(&sql)
284 .with_context(|| format!("failed to inspect telemetry schema for table {table}"))?;
285 let columns = stmt
286 .query_map([], |row| row.get::<_, String>(1))?
287 .collect::<std::result::Result<BTreeSet<_>, _>>()?;
288 Ok(columns)
289}
290
291fn record_schema_repair_event(conn: &Connection, repairs: &SchemaRepairReport) -> Result<()> {
292 let payload = serde_json::to_string(repairs).context("failed to serialize schema repair")?;
293 conn.execute(
294 "INSERT INTO events (timestamp, event_type, role, task_id, payload)
295 VALUES (?1, ?2, NULL, NULL, ?3)",
296 params![
297 chrono::Utc::now().timestamp(),
298 "telemetry_schema_repaired",
299 payload
300 ],
301 )
302 .context("failed to record telemetry schema repair event")?;
303 Ok(())
304}
305
306#[cfg(test)]
307pub(crate) fn install_legacy_schema_for_tests(conn: &Connection) -> Result<()> {
308 conn.execute_batch(
309 "
310 DROP TABLE IF EXISTS events;
311 DROP TABLE IF EXISTS agent_metrics;
312 DROP TABLE IF EXISTS task_metrics;
313 DROP TABLE IF EXISTS session_summary;
314 DROP TABLE IF EXISTS test_case_metrics;
315 DROP TABLE IF EXISTS task_cycle_times;
316
317 CREATE TABLE events (
318 id INTEGER PRIMARY KEY AUTOINCREMENT,
319 timestamp INTEGER NOT NULL,
320 event_type TEXT NOT NULL,
321 role TEXT,
322 task_id TEXT,
323 payload TEXT NOT NULL
324 );
325
326 CREATE TABLE agent_metrics (
327 role TEXT PRIMARY KEY,
328 completions INTEGER NOT NULL DEFAULT 0,
329 failures INTEGER NOT NULL DEFAULT 0,
330 restarts INTEGER NOT NULL DEFAULT 0,
331 total_cycle_secs INTEGER NOT NULL DEFAULT 0,
332 idle_polls INTEGER NOT NULL DEFAULT 0,
333 working_polls INTEGER NOT NULL DEFAULT 0
334 );
335
336 CREATE TABLE task_metrics (
337 task_id TEXT PRIMARY KEY,
338 started_at INTEGER,
339 completed_at INTEGER,
340 retries INTEGER NOT NULL DEFAULT 0,
341 escalations INTEGER NOT NULL DEFAULT 0,
342 merge_time_secs INTEGER
343 );
344
345 CREATE TABLE session_summary (
346 session_id TEXT PRIMARY KEY,
347 started_at INTEGER NOT NULL,
348 ended_at INTEGER,
349 tasks_completed INTEGER NOT NULL DEFAULT 0,
350 total_merges INTEGER NOT NULL DEFAULT 0,
351 total_events INTEGER NOT NULL DEFAULT 0
352 );
353 ",
354 )
355 .context("failed to install legacy telemetry schema fixture")?;
356 Ok(())
357}
358
359#[derive(Debug, Clone, PartialEq, Eq)]
360pub struct TaskCycleTimeRow {
361 pub task_id: String,
362 pub engineer: Option<String>,
363 pub priority: String,
364 pub cycle_time_mins: Option<i64>,
365 pub lead_time_mins: Option<i64>,
366 pub completed_at: i64,
367}
368
369#[derive(Debug, Clone, PartialEq)]
370pub struct PriorityCycleTimeRow {
371 pub priority: String,
372 pub average_cycle_time_mins: f64,
373 pub completed_tasks: i64,
374}
375
376#[derive(Debug, Clone, PartialEq)]
377pub struct EngineerThroughputRow {
378 pub engineer: String,
379 pub completed_tasks: i64,
380 pub average_cycle_time_mins: Option<f64>,
381 pub average_lead_time_mins: Option<f64>,
382}
383
384#[derive(Debug, Clone, PartialEq, Eq)]
385pub struct HourlyThroughputRow {
386 pub hour_start: i64,
387 pub completed_tasks: i64,
388}
389
390pub fn record_test_results(
391 conn: &Connection,
392 task_id: u32,
393 engineer: &str,
394 results: &TestResults,
395 flaky_failures: &[TestFailure],
396) -> Result<()> {
397 let task_id = task_id.to_string();
398 let now = chrono::Utc::now().timestamp();
399
400 for failure in flaky_failures {
401 conn.execute(
402 "INSERT INTO test_case_metrics (framework, test_name, flaky_passes, last_task_id, last_engineer, last_seen_at)
403 VALUES (?1, ?2, 1, ?3, ?4, ?5)
404 ON CONFLICT(framework, test_name) DO UPDATE SET
405 flaky_passes = flaky_passes + 1,
406 last_task_id = excluded.last_task_id,
407 last_engineer = excluded.last_engineer,
408 last_seen_at = excluded.last_seen_at",
409 params![results.framework, failure.test_name, task_id, engineer, now],
410 )?;
411 }
412
413 for failure in &results.failures {
414 conn.execute(
415 "INSERT INTO test_case_metrics (framework, test_name, failures, last_task_id, last_engineer, last_seen_at)
416 VALUES (?1, ?2, 1, ?3, ?4, ?5)
417 ON CONFLICT(framework, test_name) DO UPDATE SET
418 failures = failures + 1,
419 last_task_id = excluded.last_task_id,
420 last_engineer = excluded.last_engineer,
421 last_seen_at = excluded.last_seen_at",
422 params![results.framework, failure.test_name, task_id, engineer, now],
423 )?;
424 }
425
426 Ok(())
427}
428
429#[cfg(test)]
430fn query_test_case_metric(
431 conn: &Connection,
432 framework: &str,
433 test_name: &str,
434) -> Result<(u32, u32)> {
435 conn.query_row(
436 "SELECT failures, flaky_passes FROM test_case_metrics WHERE framework = ?1 AND test_name = ?2",
437 params![framework, test_name],
438 |row| Ok((row.get::<_, u32>(0)?, row.get::<_, u32>(1)?)),
439 )
440 .context("failed to query test case metric")
441}
442
443pub fn insert_event(conn: &Connection, event: &TeamEvent) -> Result<()> {
449 let payload =
450 serde_json::to_string(event).context("failed to serialize event for telemetry")?;
451
452 conn.execute(
453 "INSERT INTO events (timestamp, event_type, role, task_id, payload) VALUES (?1, ?2, ?3, ?4, ?5)",
454 params![
455 event.ts as i64,
456 event.event,
457 event.role,
458 event.task,
459 payload,
460 ],
461 )
462 .context("failed to insert telemetry event")?;
463
464 update_metrics_for_event(conn, event)?;
466
467 conn.execute(
470 "UPDATE session_summary SET total_events = total_events + 1
471 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
472 [],
473 )?;
474
475 Ok(())
476}
477
478fn update_metrics_for_event(conn: &Connection, event: &TeamEvent) -> Result<()> {
479 match event.event.as_str() {
480 "task_completed" => {
481 if let Some(role) = &event.role {
482 upsert_agent_counter(conn, role, "completions")?;
483 }
484 if let Some(task) = &event.task {
485 conn.execute(
486 "INSERT INTO task_metrics (task_id, completed_at) VALUES (?1, ?2)
487 ON CONFLICT(task_id) DO UPDATE SET completed_at = ?2",
488 params![task, event.ts as i64],
489 )?;
490 conn.execute(
491 "UPDATE task_metrics
492 SET carry_forward_effective = CASE
493 WHEN context_restart_count = 0 THEN carry_forward_effective
494 WHEN context_restart_count = 1 THEN 1
495 ELSE 0
496 END
497 WHERE task_id = ?1",
498 params![task],
499 )?;
500 }
501 conn.execute(
503 "UPDATE session_summary SET tasks_completed = tasks_completed + 1
504 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
505 [],
506 )?;
507 }
508 "task_assigned" => {
509 if let Some(task) = &event.task {
510 conn.execute(
511 "INSERT INTO task_metrics (task_id, started_at) VALUES (?1, ?2)
512 ON CONFLICT(task_id) DO UPDATE SET started_at = COALESCE(task_metrics.started_at, ?2)",
513 params![task, event.ts as i64],
514 )?;
515 }
516 }
517 "task_escalated" | "narration_restart" => {
518 if let Some(task) = &event.task {
519 conn.execute(
520 "INSERT INTO task_metrics (task_id, escalations) VALUES (?1, 1)
521 ON CONFLICT(task_id) DO UPDATE SET escalations = escalations + 1",
522 params![task],
523 )?;
524 }
525 }
526 "narration_rejection" => {
527 if let Some(task) = &event.task {
528 conn.execute(
529 "INSERT INTO task_metrics (task_id, narration_rejections) VALUES (?1, 1)
530 ON CONFLICT(task_id) DO UPDATE SET narration_rejections = narration_rejections + 1",
531 params![task],
532 )?;
533 }
534 }
535 "member_crashed" | "pane_death" | "delivery_failed" => {
536 if let Some(role) = &event.role {
537 upsert_agent_counter(conn, role, "failures")?;
538 }
539 }
540 "pane_respawned" | "agent_restarted" | "context_exhausted" => {
541 if let Some(role) = &event.role {
542 upsert_agent_counter(conn, role, "restarts")?;
543 }
544 if event.event == "agent_restarted"
545 && event.reason.as_deref() == Some("context_exhausted")
546 && let Some(task) = &event.task
547 {
548 conn.execute(
549 "INSERT INTO task_metrics (task_id, context_restart_count) VALUES (?1, 1)
550 ON CONFLICT(task_id) DO UPDATE SET context_restart_count = context_restart_count + 1",
551 params![task],
552 )?;
553 }
554 }
555 "agent_handoff" => {
556 if let Some(task) = &event.task {
557 let success = if event.success == Some(true) { 1 } else { 0 };
558 conn.execute(
559 "INSERT INTO task_metrics (task_id, handoff_attempts, handoff_successes) VALUES (?1, 1, ?2)
560 ON CONFLICT(task_id) DO UPDATE SET
561 handoff_attempts = handoff_attempts + 1,
562 handoff_successes = handoff_successes + ?2",
563 params![task, success],
564 )?;
565 }
566 }
567 "task_auto_merged" | "task_manual_merged" => {
568 if let Some(task) = &event.task {
569 conn.execute(
570 "INSERT INTO task_metrics (task_id, merge_time_secs) VALUES (?1, ?2)
571 ON CONFLICT(task_id) DO UPDATE SET merge_time_secs = ?2 - COALESCE(task_metrics.started_at, ?2)",
572 params![task, event.ts as i64],
573 )?;
574 }
575 conn.execute(
577 "UPDATE session_summary SET total_merges = total_merges + 1
578 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
579 [],
580 )?;
581 }
582 "merge_confidence_scored" => {
583 if let Some(task) = &event.task {
584 if let Some(confidence) = event.load {
585 conn.execute(
586 "INSERT INTO task_metrics (task_id, confidence_score) VALUES (?1, ?2)
587 ON CONFLICT(task_id) DO UPDATE SET confidence_score = ?2",
588 params![task, confidence],
589 )?;
590 }
591 }
592 }
593 "daemon_started" => {
594 let session_id = format!("session-{}", event.ts);
595 conn.execute(
596 "INSERT OR IGNORE INTO session_summary (session_id, started_at) VALUES (?1, ?2)",
597 params![session_id, event.ts as i64],
598 )?;
599 }
600 "discord_event_sent" => {
601 conn.execute(
602 "UPDATE session_summary SET discord_events_sent = discord_events_sent + 1
603 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
604 [],
605 )?;
606 }
607 "auto_merge_post_verify_result" => match event.success {
608 Some(true) => {
609 conn.execute(
610 "UPDATE session_summary SET verification_passes = verification_passes + 1
611 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
612 [],
613 )?;
614 }
615 Some(false) => {
616 conn.execute(
617 "UPDATE session_summary SET verification_failures = verification_failures + 1
618 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
619 [],
620 )?;
621 }
622 None => {}
623 },
624 "inbox_message_deduplicated" | "inbox_batch_delivered" => {
625 conn.execute(
626 "UPDATE session_summary SET notification_isolations = notification_isolations + 1
627 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
628 [],
629 )?;
630 }
631 "notification_delivery_sample" => {
632 let latency_secs = event.uptime_secs.unwrap_or(0) as i64;
633 conn.execute(
634 "UPDATE session_summary
635 SET notification_latency_total_secs = notification_latency_total_secs + ?1,
636 notification_latency_samples = notification_latency_samples + 1
637 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
638 params![latency_secs],
639 )?;
640 }
641 "daemon_stopped" => {
644 conn.execute(
645 "UPDATE session_summary SET ended_at = ?1
646 WHERE rowid = (SELECT rowid FROM session_summary ORDER BY started_at DESC LIMIT 1)",
647 params![event.ts as i64],
648 )?;
649 }
650 _ => {}
651 }
652 Ok(())
653}
654
655fn upsert_agent_counter(conn: &Connection, role: &str, column: &str) -> Result<()> {
656 let sql = format!(
658 "INSERT INTO agent_metrics (role, {column}) VALUES (?1, 1)
659 ON CONFLICT(role) DO UPDATE SET {column} = {column} + 1"
660 );
661 conn.execute(&sql, params![role])?;
662 Ok(())
663}
664
665pub fn record_agent_poll_state(
670 conn: &Connection,
671 role: &str,
672 is_working: bool,
673 poll_interval_secs: u64,
674) -> Result<()> {
675 if is_working {
676 conn.execute(
677 "INSERT INTO agent_metrics (role, working_polls, total_cycle_secs)
678 VALUES (?1, 1, ?2)
679 ON CONFLICT(role) DO UPDATE SET
680 working_polls = working_polls + 1,
681 total_cycle_secs = total_cycle_secs + ?2",
682 params![role, poll_interval_secs as i64],
683 )?;
684 } else {
685 conn.execute(
686 "INSERT INTO agent_metrics (role, idle_polls) VALUES (?1, 1)
687 ON CONFLICT(role) DO UPDATE SET idle_polls = idle_polls + 1",
688 params![role],
689 )?;
690 }
691 Ok(())
692}
693
694#[derive(Debug, Clone)]
700pub struct SessionSummaryRow {
701 pub session_id: String,
702 pub started_at: i64,
703 pub ended_at: Option<i64>,
704 pub tasks_completed: i64,
705 pub total_merges: i64,
706 pub total_events: i64,
707 pub discord_events_sent: i64,
708 pub verification_passes: i64,
709 pub verification_failures: i64,
710 pub notification_isolations: i64,
711 pub notification_latency_total_secs: i64,
712 pub notification_latency_samples: i64,
713}
714
715pub fn query_session_summaries(conn: &Connection) -> Result<Vec<SessionSummaryRow>> {
716 let mut stmt = conn.prepare(
717 "SELECT session_id, started_at, ended_at, tasks_completed, total_merges, total_events,
718 discord_events_sent, verification_passes, verification_failures,
719 notification_isolations, notification_latency_total_secs, notification_latency_samples
720 FROM session_summary ORDER BY started_at DESC LIMIT 20",
721 )?;
722 let rows = stmt
723 .query_map([], |row| {
724 Ok(SessionSummaryRow {
725 session_id: row.get(0)?,
726 started_at: row.get(1)?,
727 ended_at: row.get(2)?,
728 tasks_completed: row.get(3)?,
729 total_merges: row.get(4)?,
730 total_events: row.get(5)?,
731 discord_events_sent: row.get(6)?,
732 verification_passes: row.get(7)?,
733 verification_failures: row.get(8)?,
734 notification_isolations: row.get(9)?,
735 notification_latency_total_secs: row.get(10)?,
736 notification_latency_samples: row.get(11)?,
737 })
738 })?
739 .collect::<std::result::Result<Vec<_>, _>>()?;
740 Ok(rows)
741}
742
743#[derive(Debug, Clone)]
745pub struct AgentMetricsRow {
746 pub role: String,
747 pub completions: i64,
748 pub failures: i64,
749 pub restarts: i64,
750 pub total_cycle_secs: i64,
751 pub idle_polls: i64,
752 pub working_polls: i64,
753}
754
755pub fn query_agent_metrics(conn: &Connection) -> Result<Vec<AgentMetricsRow>> {
756 let mut stmt = conn.prepare(
757 "SELECT role, completions, failures, restarts, total_cycle_secs, idle_polls, working_polls
758 FROM agent_metrics ORDER BY role",
759 )?;
760 let rows = stmt
761 .query_map([], |row| {
762 Ok(AgentMetricsRow {
763 role: row.get(0)?,
764 completions: row.get(1)?,
765 failures: row.get(2)?,
766 restarts: row.get(3)?,
767 total_cycle_secs: row.get(4)?,
768 idle_polls: row.get(5)?,
769 working_polls: row.get(6)?,
770 })
771 })?
772 .collect::<std::result::Result<Vec<_>, _>>()?;
773 Ok(rows)
774}
775
776#[derive(Debug, Clone)]
778pub struct TaskMetricsRow {
779 pub task_id: String,
780 pub started_at: Option<i64>,
781 pub completed_at: Option<i64>,
782 pub retries: i64,
783 pub narration_rejections: i64,
784 pub escalations: i64,
785 pub context_restart_count: i64,
786 pub handoff_attempts: i64,
787 pub handoff_successes: i64,
788 pub carry_forward_effective: Option<bool>,
789 pub merge_time_secs: Option<i64>,
790 pub confidence_score: Option<f64>,
791}
792
793pub fn query_task_metrics(conn: &Connection) -> Result<Vec<TaskMetricsRow>> {
794 let mut stmt = conn.prepare(
795 "SELECT task_id, started_at, completed_at, retries, narration_rejections, escalations,
796 context_restart_count, handoff_attempts, handoff_successes,
797 carry_forward_effective, merge_time_secs, confidence_score
798 FROM task_metrics ORDER BY started_at DESC NULLS LAST LIMIT 50",
799 )?;
800 let rows = stmt
801 .query_map([], |row| {
802 Ok(TaskMetricsRow {
803 task_id: row.get(0)?,
804 started_at: row.get(1)?,
805 completed_at: row.get(2)?,
806 retries: row.get(3)?,
807 narration_rejections: row.get(4)?,
808 escalations: row.get(5)?,
809 context_restart_count: row.get(6)?,
810 handoff_attempts: row.get(7)?,
811 handoff_successes: row.get(8)?,
812 carry_forward_effective: row.get::<_, Option<i64>>(9)?.map(|value| value != 0),
813 merge_time_secs: row.get(10)?,
814 confidence_score: row.get(11)?,
815 })
816 })?
817 .collect::<std::result::Result<Vec<_>, _>>()?;
818 Ok(rows)
819}
820
821pub fn replace_task_cycle_times(conn: &Connection, rows: &[TaskCycleTimeRecord]) -> Result<()> {
822 conn.execute("DELETE FROM task_cycle_times", [])?;
823
824 let mut insert = conn.prepare(
825 "INSERT INTO task_cycle_times
826 (task_id, engineer, priority, cycle_time_mins, lead_time_mins, completed_at)
827 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
828 )?;
829
830 for row in rows {
831 let Some(completed_at) = row.completed_at else {
832 continue;
833 };
834 insert.execute(params![
835 row.task_id.to_string(),
836 row.engineer.as_deref(),
837 row.priority.as_str(),
838 row.cycle_time_minutes,
839 row.lead_time_minutes,
840 completed_at,
841 ])?;
842 }
843
844 Ok(())
845}
846
847pub fn query_task_cycle_times(conn: &Connection) -> Result<Vec<TaskCycleTimeRow>> {
848 let mut stmt = conn.prepare(
849 "SELECT task_id, engineer, priority, cycle_time_mins, lead_time_mins, completed_at
850 FROM task_cycle_times
851 ORDER BY completed_at DESC",
852 )?;
853 let rows = stmt
854 .query_map([], |row| {
855 Ok(TaskCycleTimeRow {
856 task_id: row.get(0)?,
857 engineer: row.get(1)?,
858 priority: row.get(2)?,
859 cycle_time_mins: row.get(3)?,
860 lead_time_mins: row.get(4)?,
861 completed_at: row.get(5)?,
862 })
863 })?
864 .collect::<std::result::Result<Vec<_>, _>>()?;
865 Ok(rows)
866}
867
868pub fn query_average_cycle_time_by_priority(
869 conn: &Connection,
870) -> Result<Vec<PriorityCycleTimeRow>> {
871 let mut stmt = conn.prepare(
872 "SELECT priority,
873 AVG(cycle_time_mins) AS avg_cycle_time_mins,
874 COUNT(*) AS completed_tasks
875 FROM task_cycle_times
876 WHERE cycle_time_mins IS NOT NULL
877 GROUP BY priority
878 ORDER BY CASE priority
879 WHEN 'critical' THEN 0
880 WHEN 'high' THEN 1
881 WHEN 'medium' THEN 2
882 WHEN 'low' THEN 3
883 ELSE 4
884 END, priority",
885 )?;
886 let rows = stmt
887 .query_map([], |row| {
888 Ok(PriorityCycleTimeRow {
889 priority: row.get(0)?,
890 average_cycle_time_mins: row.get(1)?,
891 completed_tasks: row.get(2)?,
892 })
893 })?
894 .collect::<std::result::Result<Vec<_>, _>>()?;
895 Ok(rows)
896}
897
898pub fn query_engineer_throughput(conn: &Connection) -> Result<Vec<EngineerThroughputRow>> {
899 let mut stmt = conn.prepare(
900 "SELECT engineer,
901 COUNT(*) AS completed_tasks,
902 AVG(cycle_time_mins) AS avg_cycle_time_mins,
903 AVG(lead_time_mins) AS avg_lead_time_mins
904 FROM task_cycle_times
905 WHERE engineer IS NOT NULL
906 GROUP BY engineer
907 ORDER BY completed_tasks DESC, engineer ASC",
908 )?;
909 let rows = stmt
910 .query_map([], |row| {
911 Ok(EngineerThroughputRow {
912 engineer: row.get(0)?,
913 completed_tasks: row.get(1)?,
914 average_cycle_time_mins: row.get(2)?,
915 average_lead_time_mins: row.get(3)?,
916 })
917 })?
918 .collect::<std::result::Result<Vec<_>, _>>()?;
919 Ok(rows)
920}
921
922pub fn query_hourly_throughput(
923 conn: &Connection,
924 window_start: i64,
925) -> Result<Vec<HourlyThroughputRow>> {
926 let mut stmt = conn.prepare(
927 "WITH RECURSIVE hours(hour_start) AS (
928 SELECT (?1 / 3600) * 3600
929 UNION ALL
930 SELECT hour_start + 3600
931 FROM hours
932 WHERE hour_start + 3600 <= (strftime('%s', 'now', 'localtime') / 3600) * 3600
933 )
934 SELECT hours.hour_start,
935 COALESCE(counts.completed_tasks, 0) AS completed_tasks
936 FROM hours
937 LEFT JOIN (
938 SELECT (completed_at / 3600) * 3600 AS hour_start,
939 COUNT(*) AS completed_tasks
940 FROM task_cycle_times
941 WHERE completed_at >= ?1
942 GROUP BY 1
943 ) counts ON counts.hour_start = hours.hour_start
944 ORDER BY hours.hour_start",
945 )?;
946 let rows = stmt
947 .query_map(params![window_start], |row| {
948 Ok(HourlyThroughputRow {
949 hour_start: row.get(0)?,
950 completed_tasks: row.get(1)?,
951 })
952 })?
953 .collect::<std::result::Result<Vec<_>, _>>()?;
954 Ok(rows)
955}
956
957#[derive(Debug, Clone)]
959pub struct EventRow {
960 pub timestamp: i64,
961 pub event_type: String,
962 pub role: Option<String>,
963 pub task_id: Option<String>,
964}
965
966pub fn query_recent_events(conn: &Connection, limit: usize) -> Result<Vec<EventRow>> {
967 let mut stmt = conn.prepare(
968 "SELECT timestamp, event_type, role, task_id
969 FROM events ORDER BY timestamp DESC LIMIT ?1",
970 )?;
971 let rows = stmt
972 .query_map(params![limit as i64], |row| {
973 Ok(EventRow {
974 timestamp: row.get(0)?,
975 event_type: row.get(1)?,
976 role: row.get(2)?,
977 task_id: row.get(3)?,
978 })
979 })?
980 .collect::<std::result::Result<Vec<_>, _>>()?;
981 Ok(rows)
982}
983
984#[derive(Debug, Clone)]
986pub struct ReviewMetricsRow {
987 pub auto_merge_count: i64,
988 pub manual_merge_count: i64,
989 pub direct_root_merge_count: i64,
990 pub isolated_integration_merge_count: i64,
991 pub direct_root_failure_count: i64,
992 pub isolated_integration_failure_count: i64,
993 pub rework_count: i64,
994 pub review_nudge_count: i64,
995 pub review_escalation_count: i64,
996 pub avg_review_latency_secs: Option<f64>,
997 pub accepted_decision_count: i64,
998 pub rejected_decision_count: i64,
999 pub rejection_reasons: Vec<AutoMergeReasonRow>,
1000 pub post_merge_verify_pass_count: i64,
1001 pub post_merge_verify_fail_count: i64,
1002 pub post_merge_verify_skip_count: i64,
1003}
1004
1005#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1006pub struct AutoMergeReasonRow {
1007 pub reason: String,
1008 pub count: i64,
1009}
1010
1011#[derive(Debug, Clone, PartialEq, Serialize)]
1012pub struct EngineerPerformanceProfileRow {
1013 pub role: String,
1014 pub completed_tasks: i64,
1015 pub avg_task_completion_secs: Option<f64>,
1016 pub lines_per_hour: Option<f64>,
1017 pub first_pass_test_rate: Option<f64>,
1018 pub context_exhaustion_frequency: Option<f64>,
1019}
1020
1021pub fn query_engineer_performance_profiles(
1022 conn: &Connection,
1023) -> Result<Vec<EngineerPerformanceProfileRow>> {
1024 let mut completion_stmt = conn.prepare(
1025 "SELECT role, task_id,
1026 json_extract(payload, '$.time_to_completion_secs') AS time_to_completion_secs,
1027 json_extract(payload, '$.first_pass_test_rate') AS first_pass_test_rate
1028 FROM events
1029 WHERE event_type = 'quality_metrics_recorded'
1030 AND role IS NOT NULL
1031 AND task_id IS NOT NULL
1032 ORDER BY role, task_id",
1033 )?;
1034
1035 #[derive(Default)]
1036 struct Accumulator {
1037 completed_tasks: i64,
1038 total_completion_secs: f64,
1039 completion_secs_samples: i64,
1040 first_pass_sum: f64,
1041 first_pass_samples: i64,
1042 context_exhausted_tasks: i64,
1043 loc_lines: i64,
1044 loc_hours: f64,
1045 }
1046
1047 let mut by_role = std::collections::BTreeMap::<String, Accumulator>::new();
1048 let mut task_durations = std::collections::HashMap::<String, (String, f64)>::new();
1049
1050 let completion_rows = completion_stmt.query_map([], |row| {
1051 Ok((
1052 row.get::<_, String>(0)?,
1053 row.get::<_, String>(1)?,
1054 row.get::<_, Option<f64>>(2)?,
1055 row.get::<_, Option<f64>>(3)?,
1056 ))
1057 })?;
1058
1059 for row in completion_rows {
1060 let (role, task_id, completion_secs, first_pass_rate) = row?;
1061 let entry = by_role.entry(role.clone()).or_default();
1062 entry.completed_tasks += 1;
1063 if let Some(completion_secs) = completion_secs {
1064 entry.total_completion_secs += completion_secs;
1065 entry.completion_secs_samples += 1;
1066 task_durations.insert(task_id.clone(), (role.clone(), completion_secs));
1067 }
1068 if let Some(first_pass_rate) = first_pass_rate {
1069 entry.first_pass_sum += first_pass_rate;
1070 entry.first_pass_samples += 1;
1071 }
1072 }
1073
1074 let mut ctx_stmt = conn.prepare(
1075 "SELECT task_id, context_restart_count
1076 FROM task_metrics
1077 WHERE context_restart_count > 0",
1078 )?;
1079 let ctx_rows = ctx_stmt.query_map([], |row| {
1080 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
1081 })?;
1082 for row in ctx_rows {
1083 let (task_id, _) = row?;
1084 if let Some((role, _)) = task_durations.get(&task_id) {
1085 by_role
1086 .entry(role.clone())
1087 .or_default()
1088 .context_exhausted_tasks += 1;
1089 }
1090 }
1091
1092 let mut merge_stmt = conn.prepare(
1093 "SELECT role, task_id, json_extract(payload, '$.reason') AS reason
1094 FROM events
1095 WHERE event_type = 'task_auto_merged'
1096 AND role IS NOT NULL
1097 AND task_id IS NOT NULL",
1098 )?;
1099 let merge_rows = merge_stmt.query_map([], |row| {
1100 Ok((
1101 row.get::<_, String>(0)?,
1102 row.get::<_, String>(1)?,
1103 row.get::<_, Option<String>>(2)?,
1104 ))
1105 })?;
1106 for row in merge_rows {
1107 let (role, task_id, reason) = row?;
1108 let Some(lines_changed) = reason
1109 .as_deref()
1110 .and_then(parse_lines_changed_from_merge_reason)
1111 else {
1112 continue;
1113 };
1114 let Some((task_role, completion_secs)) = task_durations.get(&task_id) else {
1115 continue;
1116 };
1117 if task_role != &role {
1118 continue;
1119 }
1120 let entry = by_role.entry(role).or_default();
1121 entry.loc_lines += lines_changed as i64;
1122 entry.loc_hours += completion_secs / 3600.0;
1123 }
1124
1125 Ok(by_role
1126 .into_iter()
1127 .map(|(role, acc)| EngineerPerformanceProfileRow {
1128 role,
1129 completed_tasks: acc.completed_tasks,
1130 avg_task_completion_secs: (acc.completion_secs_samples > 0)
1131 .then(|| acc.total_completion_secs / acc.completion_secs_samples as f64),
1132 lines_per_hour: (acc.loc_hours > 0.0).then(|| acc.loc_lines as f64 / acc.loc_hours),
1133 first_pass_test_rate: (acc.first_pass_samples > 0)
1134 .then(|| acc.first_pass_sum / acc.first_pass_samples as f64),
1135 context_exhaustion_frequency: (acc.completed_tasks > 0)
1136 .then(|| acc.context_exhausted_tasks as f64 / acc.completed_tasks as f64),
1137 })
1138 .collect())
1139}
1140
1141fn parse_lines_changed_from_merge_reason(reason: &str) -> Option<u64> {
1142 reason
1143 .split_whitespace()
1144 .find_map(|token| token.strip_prefix("lines="))
1145 .and_then(|value| value.parse::<u64>().ok())
1146}
1147
1148fn load_events_by_type(conn: &Connection, event_type: &str) -> Result<Vec<TeamEvent>> {
1149 let mut stmt =
1150 conn.prepare("SELECT payload FROM events WHERE event_type = ?1 ORDER BY timestamp ASC")?;
1151 let rows = stmt.query_map(params![event_type], |row| row.get::<_, String>(0))?;
1152 rows.map(|row| {
1153 let payload = row?;
1154 serde_json::from_str::<TeamEvent>(&payload)
1155 .context("failed to deserialize telemetry event payload")
1156 })
1157 .collect()
1158}
1159
1160fn extract_auto_merge_reasons(event: &TeamEvent) -> Vec<String> {
1161 if let Some(details) = event.details.as_deref()
1162 && let Ok(record) =
1163 serde_json::from_str::<crate::team::auto_merge::AutoMergeDecisionRecord>(details)
1164 {
1165 return record.reasons;
1166 }
1167
1168 event
1169 .reason
1170 .as_deref()
1171 .and_then(|reason| reason.split("reasons: ").nth(1))
1172 .map(|reasons| {
1173 reasons
1174 .split("; ")
1175 .map(str::trim)
1176 .filter(|reason| !reason.is_empty())
1177 .map(str::to_string)
1178 .collect::<Vec<_>>()
1179 })
1180 .unwrap_or_default()
1181}
1182
1183fn increment_merge_mode_counts(
1184 direct_root_count: &mut i64,
1185 isolated_integration_count: &mut i64,
1186 merge_mode: Option<&str>,
1187) {
1188 match merge_mode {
1189 Some("direct_root") => *direct_root_count += 1,
1190 Some("isolated_integration") => *isolated_integration_count += 1,
1191 _ => {}
1192 }
1193}
1194
1195pub fn query_review_metrics(conn: &Connection) -> Result<ReviewMetricsRow> {
1197 let count_event = |event_type: &str| -> Result<i64> {
1198 let n: i64 = conn.query_row(
1199 "SELECT COUNT(*) FROM events WHERE event_type = ?1",
1200 params![event_type],
1201 |row| row.get(0),
1202 )?;
1203 Ok(n)
1204 };
1205
1206 let auto_merge_count = count_event("task_auto_merged")?;
1207 let manual_merge_count = count_event("task_manual_merged")?;
1208 let rework_count = count_event("task_reworked")?;
1209 let review_nudge_count = count_event("review_nudge_sent")?;
1210 let review_escalation_count = count_event("review_escalated")?;
1211 let decision_events = load_events_by_type(conn, "auto_merge_decision_recorded")?;
1212 let post_verify_events = load_events_by_type(conn, "auto_merge_post_verify_result")?;
1213 let auto_merge_events = load_events_by_type(conn, "task_auto_merged")?;
1214 let manual_merge_events = load_events_by_type(conn, "task_manual_merged")?;
1215 let merge_failure_events = load_events_by_type(conn, "task_merge_failed")?;
1216
1217 let mut direct_root_merge_count = 0;
1218 let mut isolated_integration_merge_count = 0;
1219 for event in auto_merge_events.iter().chain(manual_merge_events.iter()) {
1220 increment_merge_mode_counts(
1221 &mut direct_root_merge_count,
1222 &mut isolated_integration_merge_count,
1223 event.merge_mode.as_deref(),
1224 );
1225 }
1226
1227 let mut direct_root_failure_count = 0;
1228 let mut isolated_integration_failure_count = 0;
1229 for event in &merge_failure_events {
1230 increment_merge_mode_counts(
1231 &mut direct_root_failure_count,
1232 &mut isolated_integration_failure_count,
1233 event.merge_mode.as_deref(),
1234 );
1235 }
1236
1237 let mut accepted_decision_count = 0;
1238 let mut rejected_decision_count = 0;
1239 let mut rejection_reasons = BTreeMap::<String, i64>::new();
1240 for event in decision_events {
1241 match event.action_type.as_deref() {
1242 Some("accepted") => accepted_decision_count += 1,
1243 Some("manual_review") => {
1244 rejected_decision_count += 1;
1245 for reason in extract_auto_merge_reasons(&event) {
1246 *rejection_reasons.entry(reason).or_insert(0) += 1;
1247 }
1248 }
1249 _ => {}
1250 }
1251 }
1252
1253 let mut post_merge_verify_pass_count = 0;
1254 let mut post_merge_verify_fail_count = 0;
1255 let mut post_merge_verify_skip_count = 0;
1256 for event in post_verify_events {
1257 match event.success {
1258 Some(true) => post_merge_verify_pass_count += 1,
1259 Some(false) => post_merge_verify_fail_count += 1,
1260 None => post_merge_verify_skip_count += 1,
1261 }
1262 }
1263
1264 let avg_review_latency_secs: Option<f64> = conn
1267 .query_row(
1268 "SELECT AVG(m.timestamp - c.timestamp)
1269 FROM events c
1270 JOIN events m ON c.task_id = m.task_id
1271 AND m.event_type IN ('task_auto_merged', 'task_manual_merged')
1272 WHERE c.event_type = 'task_completed'
1273 AND c.task_id IS NOT NULL
1274 AND m.timestamp >= c.timestamp",
1275 [],
1276 |row| row.get(0),
1277 )
1278 .unwrap_or(None);
1279
1280 let mut rejection_reasons = rejection_reasons
1281 .into_iter()
1282 .map(|(reason, count)| AutoMergeReasonRow { reason, count })
1283 .collect::<Vec<_>>();
1284 rejection_reasons.sort_by(|left, right| {
1285 right
1286 .count
1287 .cmp(&left.count)
1288 .then_with(|| left.reason.cmp(&right.reason))
1289 });
1290
1291 Ok(ReviewMetricsRow {
1292 auto_merge_count,
1293 manual_merge_count,
1294 direct_root_merge_count,
1295 isolated_integration_merge_count,
1296 direct_root_failure_count,
1297 isolated_integration_failure_count,
1298 rework_count,
1299 review_nudge_count,
1300 review_escalation_count,
1301 avg_review_latency_secs,
1302 accepted_decision_count,
1303 rejected_decision_count,
1304 rejection_reasons,
1305 post_merge_verify_pass_count,
1306 post_merge_verify_fail_count,
1307 post_merge_verify_skip_count,
1308 })
1309}
1310
1311pub fn query_merge_queue_depth(conn: &Connection) -> Result<i64> {
1312 conn.query_row(
1313 "SELECT COUNT(*)
1314 FROM task_metrics tm
1315 WHERE tm.completed_at IS NOT NULL
1316 AND NOT EXISTS (
1317 SELECT 1
1318 FROM events e
1319 WHERE e.task_id = tm.task_id
1320 AND e.event_type IN ('task_auto_merged', 'task_manual_merged', 'task_reworked')
1321 AND e.timestamp >= tm.completed_at
1322 )",
1323 [],
1324 |row| row.get(0),
1325 )
1326 .context("failed to query merge queue depth")
1327}
1328
1329#[cfg(test)]
1330mod tests {
1331 use super::*;
1332 use crate::team::events::TeamEvent;
1333
1334 #[test]
1335 fn schema_creation_succeeds() {
1336 let conn = open_in_memory().unwrap();
1337 let count: i64 = conn
1339 .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))
1340 .unwrap();
1341 assert_eq!(count, 0);
1342 }
1343
1344 #[test]
1345 fn idempotent_schema_creation() {
1346 let conn = open_in_memory().unwrap();
1347 init_schema(&conn).unwrap();
1349 }
1350
1351 #[test]
1352 fn legacy_schema_repairs_missing_columns_once() {
1353 let conn = Connection::open_in_memory().unwrap();
1354 install_legacy_schema_for_tests(&conn).unwrap();
1355
1356 init_schema(&conn).unwrap();
1357 init_schema(&conn).unwrap();
1358
1359 let task_columns = query_table_columns(&conn, "task_metrics").unwrap();
1360 assert!(task_columns.contains("narration_rejections"));
1361 assert!(task_columns.contains("confidence_score"));
1362 assert!(task_columns.contains("context_restart_count"));
1363
1364 let session_columns = query_table_columns(&conn, "session_summary").unwrap();
1365 assert!(session_columns.contains("verification_passes"));
1366 assert!(session_columns.contains("notification_latency_samples"));
1367
1368 let repair_events: i64 = conn
1369 .query_row(
1370 "SELECT COUNT(*) FROM events WHERE event_type = 'telemetry_schema_repaired'",
1371 [],
1372 |row| row.get(0),
1373 )
1374 .unwrap();
1375 assert_eq!(repair_events, 1);
1376
1377 let payload: String = conn
1378 .query_row(
1379 "SELECT payload FROM events WHERE event_type = 'telemetry_schema_repaired'",
1380 [],
1381 |row| row.get(0),
1382 )
1383 .unwrap();
1384 assert!(payload.contains("narration_rejections"));
1385 assert!(payload.contains("confidence_score"));
1386 assert!(payload.contains("verification_passes"));
1387 }
1388
1389 #[test]
1390 fn insert_and_query_event_round_trip() {
1391 let conn = open_in_memory().unwrap();
1392 let event = TeamEvent::daemon_started();
1393 insert_event(&conn, &event).unwrap();
1394
1395 let events = query_recent_events(&conn, 10).unwrap();
1396 assert_eq!(events.len(), 1);
1397 assert_eq!(events[0].event_type, "daemon_started");
1398 }
1399
1400 #[test]
1401 fn task_assigned_creates_task_metric() {
1402 let conn = open_in_memory().unwrap();
1403 let event = TeamEvent::task_assigned("eng-1", "42");
1404 insert_event(&conn, &event).unwrap();
1405
1406 let tasks = query_task_metrics(&conn).unwrap();
1407 assert_eq!(tasks.len(), 1);
1408 assert_eq!(tasks[0].task_id, "42");
1409 assert!(tasks[0].started_at.is_some());
1410 }
1411
1412 #[test]
1413 fn task_completed_updates_agent_and_task_metrics() {
1414 let conn = open_in_memory().unwrap();
1415
1416 let assign = TeamEvent::task_assigned("eng-1", "42");
1417 insert_event(&conn, &assign).unwrap();
1418
1419 let complete = TeamEvent::task_completed("eng-1", Some("42"));
1420 insert_event(&conn, &complete).unwrap();
1421
1422 let agents = query_agent_metrics(&conn).unwrap();
1423 assert_eq!(agents.len(), 1);
1424 assert_eq!(agents[0].role, "eng-1");
1425 assert_eq!(agents[0].completions, 1);
1426
1427 let tasks = query_task_metrics(&conn).unwrap();
1428 assert_eq!(tasks.len(), 1);
1429 assert!(tasks[0].completed_at.is_some());
1430 }
1431
1432 #[test]
1433 fn escalation_increments_task_escalations() {
1434 let conn = open_in_memory().unwrap();
1435 let event = TeamEvent::task_escalated("eng-1", "42", None);
1436 insert_event(&conn, &event).unwrap();
1437 insert_event(&conn, &event).unwrap();
1438
1439 let tasks = query_task_metrics(&conn).unwrap();
1440 assert_eq!(tasks[0].escalations, 2);
1441 }
1442
1443 #[test]
1444 fn meta_conversation_escalation_increments_task_escalations() {
1445 let conn = open_in_memory().unwrap();
1446 insert_event(&conn, &TeamEvent::narration_restart("eng-1", Some(42))).unwrap();
1447 insert_event(&conn, &TeamEvent::narration_restart("eng-1", Some(42))).unwrap();
1448
1449 let tasks = query_task_metrics(&conn).unwrap();
1450 assert_eq!(tasks.len(), 1);
1451 assert_eq!(tasks[0].task_id, "42");
1452 assert_eq!(tasks[0].escalations, 2);
1453 }
1454
1455 #[test]
1456 fn narration_rejection_increments_task_metric() {
1457 let conn = open_in_memory().unwrap();
1458 insert_event(&conn, &TeamEvent::narration_rejection("eng-1", 42, 1)).unwrap();
1459 insert_event(&conn, &TeamEvent::narration_rejection("eng-1", 42, 2)).unwrap();
1460
1461 let tasks = query_task_metrics(&conn).unwrap();
1462 assert_eq!(tasks.len(), 1);
1463 assert_eq!(tasks[0].task_id, "42");
1464 assert_eq!(tasks[0].narration_rejections, 2);
1465 }
1466
1467 #[test]
1468 fn legacy_schema_accepts_new_task_metric_event_writes_after_repair() {
1469 let conn = Connection::open_in_memory().unwrap();
1470 install_legacy_schema_for_tests(&conn).unwrap();
1471 init_schema(&conn).unwrap();
1472
1473 insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
1474 insert_event(&conn, &TeamEvent::narration_rejection("eng-1", 42, 1)).unwrap();
1475 insert_event(
1476 &conn,
1477 &TeamEvent::merge_confidence_scored(&crate::team::events::MergeConfidenceInfo {
1478 engineer: "eng-1",
1479 task: "42",
1480 confidence: 0.82,
1481 files_changed: 3,
1482 lines_changed: 40,
1483 has_migrations: false,
1484 has_config_changes: false,
1485 rename_count: 0,
1486 }),
1487 )
1488 .unwrap();
1489
1490 let tasks = query_task_metrics(&conn).unwrap();
1491 assert_eq!(tasks.len(), 1);
1492 assert_eq!(tasks[0].task_id, "42");
1493 assert_eq!(tasks[0].narration_rejections, 1);
1494 assert_eq!(tasks[0].confidence_score, Some(0.82));
1495 }
1496
1497 #[test]
1498 fn legacy_schema_accepts_new_session_summary_counters_after_repair() {
1499 let conn = Connection::open_in_memory().unwrap();
1500 install_legacy_schema_for_tests(&conn).unwrap();
1501 init_schema(&conn).unwrap();
1502
1503 insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
1504 insert_event(
1505 &conn,
1506 &TeamEvent::auto_merge_post_verify_result("eng-1", "42", Some(true), "passed", None),
1507 )
1508 .unwrap();
1509 insert_event(
1510 &conn,
1511 &TeamEvent::notification_delivery_sample("daemon", "manager", 12, "digest"),
1512 )
1513 .unwrap();
1514
1515 let summaries = query_session_summaries(&conn).unwrap();
1516 assert_eq!(summaries.len(), 1);
1517 assert_eq!(summaries[0].verification_passes, 1);
1518 assert_eq!(summaries[0].verification_failures, 0);
1519 assert_eq!(summaries[0].notification_latency_total_secs, 12);
1520 assert_eq!(summaries[0].notification_latency_samples, 1);
1521 }
1522
1523 #[test]
1524 fn context_exhausted_restart_increments_task_restart_count() {
1525 let conn = open_in_memory().unwrap();
1526 insert_event(
1527 &conn,
1528 &TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 1),
1529 )
1530 .unwrap();
1531 insert_event(
1532 &conn,
1533 &TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 2),
1534 )
1535 .unwrap();
1536
1537 let tasks = query_task_metrics(&conn).unwrap();
1538 assert_eq!(tasks.len(), 1);
1539 assert_eq!(tasks[0].task_id, "42");
1540 assert_eq!(tasks[0].context_restart_count, 2);
1541 assert_eq!(tasks[0].handoff_attempts, 0);
1542 assert_eq!(tasks[0].carry_forward_effective, None);
1543 }
1544
1545 #[test]
1546 fn agent_handoff_updates_attempt_and_success_counts() {
1547 let conn = open_in_memory().unwrap();
1548 insert_event(
1549 &conn,
1550 &TeamEvent::agent_handoff("eng-1", "42", "stalled", true),
1551 )
1552 .unwrap();
1553 insert_event(
1554 &conn,
1555 &TeamEvent::agent_handoff("eng-1", "42", "shim_crash", false),
1556 )
1557 .unwrap();
1558
1559 let tasks = query_task_metrics(&conn).unwrap();
1560 assert_eq!(tasks.len(), 1);
1561 assert_eq!(tasks[0].task_id, "42");
1562 assert_eq!(tasks[0].handoff_attempts, 2);
1563 assert_eq!(tasks[0].handoff_successes, 1);
1564 }
1565
1566 #[test]
1567 fn task_completion_marks_single_restart_carry_forward_effective() {
1568 let conn = open_in_memory().unwrap();
1569 insert_event(
1570 &conn,
1571 &TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 1),
1572 )
1573 .unwrap();
1574 insert_event(&conn, &TeamEvent::task_completed("eng-1", Some("42"))).unwrap();
1575
1576 let tasks = query_task_metrics(&conn).unwrap();
1577 assert_eq!(tasks[0].context_restart_count, 1);
1578 assert_eq!(tasks[0].carry_forward_effective, Some(true));
1579 }
1580
1581 #[test]
1582 fn task_completion_marks_multi_restart_carry_forward_ineffective() {
1583 let conn = open_in_memory().unwrap();
1584 insert_event(
1585 &conn,
1586 &TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 1),
1587 )
1588 .unwrap();
1589 insert_event(
1590 &conn,
1591 &TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 2),
1592 )
1593 .unwrap();
1594 insert_event(&conn, &TeamEvent::task_completed("eng-1", Some("42"))).unwrap();
1595
1596 let tasks = query_task_metrics(&conn).unwrap();
1597 assert_eq!(tasks[0].context_restart_count, 2);
1598 assert_eq!(tasks[0].carry_forward_effective, Some(false));
1599 }
1600
1601 #[test]
1602 fn engineer_performance_profiles_aggregate_completion_quality_and_context() {
1603 let conn = open_in_memory().unwrap();
1604
1605 insert_event(
1606 &conn,
1607 &TeamEvent::quality_metrics_recorded(&crate::team::events::QualityMetricsInfo {
1608 backend: "codex",
1609 role: "eng-1",
1610 task: "41",
1611 narration_ratio: 0.1,
1612 commit_frequency: 1.0,
1613 first_pass_test_rate: 1.0,
1614 retry_rate: 0.0,
1615 time_to_completion_secs: 3_600,
1616 }),
1617 )
1618 .unwrap();
1619 insert_event(
1620 &conn,
1621 &TeamEvent::quality_metrics_recorded(&crate::team::events::QualityMetricsInfo {
1622 backend: "codex",
1623 role: "eng-1",
1624 task: "42",
1625 narration_ratio: 0.2,
1626 commit_frequency: 2.0,
1627 first_pass_test_rate: 0.0,
1628 retry_rate: 1.0,
1629 time_to_completion_secs: 1_800,
1630 }),
1631 )
1632 .unwrap();
1633 insert_event(
1634 &conn,
1635 &TeamEvent::agent_restarted("eng-1", "42", "context_exhausted", 1),
1636 )
1637 .unwrap();
1638 insert_event(
1639 &conn,
1640 &TeamEvent::task_auto_merged("eng-1", "41", 0.9, 2, 90),
1641 )
1642 .unwrap();
1643 insert_event(
1644 &conn,
1645 &TeamEvent::task_auto_merged("eng-1", "42", 0.9, 2, 30),
1646 )
1647 .unwrap();
1648
1649 let rows = query_engineer_performance_profiles(&conn).unwrap();
1650 assert_eq!(rows.len(), 1);
1651 let row = &rows[0];
1652 assert_eq!(row.role, "eng-1");
1653 assert_eq!(row.completed_tasks, 2);
1654 assert_eq!(row.avg_task_completion_secs, Some(2_700.0));
1655 assert_eq!(row.first_pass_test_rate, Some(0.5));
1656 assert_eq!(row.context_exhaustion_frequency, Some(0.5));
1657 assert_eq!(row.lines_per_hour, Some(80.0));
1658 }
1659
1660 #[test]
1661 fn pane_death_increments_failures() {
1662 let conn = open_in_memory().unwrap();
1663 let event = TeamEvent::pane_death("eng-1");
1664 insert_event(&conn, &event).unwrap();
1665
1666 let agents = query_agent_metrics(&conn).unwrap();
1667 assert_eq!(agents[0].failures, 1);
1668 }
1669
1670 #[test]
1671 fn pane_respawned_increments_restarts() {
1672 let conn = open_in_memory().unwrap();
1673 let event = TeamEvent::pane_respawned("eng-1");
1674 insert_event(&conn, &event).unwrap();
1675
1676 let agents = query_agent_metrics(&conn).unwrap();
1677 assert_eq!(agents[0].restarts, 1);
1678 }
1679
1680 #[test]
1681 fn delivery_failed_increments_failures() {
1682 let conn = open_in_memory().unwrap();
1683 let event = TeamEvent::delivery_failed("eng-1", "manager", "pane not ready");
1684 insert_event(&conn, &event).unwrap();
1685
1686 let agents = query_agent_metrics(&conn).unwrap();
1687 assert_eq!(agents.len(), 1);
1688 assert_eq!(agents[0].role, "eng-1");
1689 assert_eq!(agents[0].failures, 1);
1690 }
1691
1692 #[test]
1693 fn context_exhausted_increments_restarts() {
1694 let conn = open_in_memory().unwrap();
1695 let event = TeamEvent::context_exhausted("eng-1", Some(42), Some(500_000));
1696 insert_event(&conn, &event).unwrap();
1697
1698 let agents = query_agent_metrics(&conn).unwrap();
1699 assert_eq!(agents.len(), 1);
1700 assert_eq!(agents[0].role, "eng-1");
1701 assert_eq!(agents[0].restarts, 1);
1702 }
1703
1704 #[test]
1705 fn all_failure_event_types_accumulate() {
1706 let conn = open_in_memory().unwrap();
1707 insert_event(&conn, &TeamEvent::pane_death("eng-1")).unwrap();
1708 insert_event(&conn, &TeamEvent::member_crashed("eng-1", true)).unwrap();
1709 insert_event(
1710 &conn,
1711 &TeamEvent::delivery_failed("eng-1", "manager", "timeout"),
1712 )
1713 .unwrap();
1714
1715 let agents = query_agent_metrics(&conn).unwrap();
1716 assert_eq!(agents[0].failures, 3);
1717 }
1718
1719 #[test]
1720 fn all_restart_event_types_accumulate() {
1721 let conn = open_in_memory().unwrap();
1722 insert_event(&conn, &TeamEvent::pane_respawned("eng-1")).unwrap();
1723 insert_event(
1724 &conn,
1725 &TeamEvent::agent_restarted("eng-1", "42", "stall", 1),
1726 )
1727 .unwrap();
1728 insert_event(
1729 &conn,
1730 &TeamEvent::context_exhausted("eng-1", Some(42), None),
1731 )
1732 .unwrap();
1733
1734 let agents = query_agent_metrics(&conn).unwrap();
1735 assert_eq!(agents[0].restarts, 3);
1736 }
1737
1738 #[test]
1739 fn daemon_started_creates_session_summary() {
1740 let conn = open_in_memory().unwrap();
1741 let event = TeamEvent::daemon_started();
1742 insert_event(&conn, &event).unwrap();
1743
1744 let summaries = query_session_summaries(&conn).unwrap();
1745 assert_eq!(summaries.len(), 1);
1746 assert!(summaries[0].session_id.starts_with("session-"));
1747 }
1748
1749 #[test]
1750 fn multiple_events_for_same_agent_accumulate() {
1751 let conn = open_in_memory().unwrap();
1752 let c1 = TeamEvent::task_completed("eng-1", Some("1"));
1753 let c2 = TeamEvent::task_completed("eng-1", Some("2"));
1754 insert_event(&conn, &c1).unwrap();
1755 insert_event(&conn, &c2).unwrap();
1756
1757 let agents = query_agent_metrics(&conn).unwrap();
1758 assert_eq!(agents[0].completions, 2);
1759 }
1760
1761 #[test]
1762 fn query_recent_events_respects_limit() {
1763 let conn = open_in_memory().unwrap();
1764 for _ in 0..5 {
1765 insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
1766 }
1767 let events = query_recent_events(&conn, 3).unwrap();
1768 assert_eq!(events.len(), 3);
1769 }
1770
1771 #[test]
1772 fn concurrent_writes_to_same_connection() {
1773 let conn = open_in_memory().unwrap();
1776 for i in 0..50 {
1777 let event = TeamEvent::task_assigned("eng-1", &i.to_string());
1778 insert_event(&conn, &event).unwrap();
1779 }
1780 let events = query_recent_events(&conn, 100).unwrap();
1781 assert_eq!(events.len(), 50);
1782 }
1783
1784 #[test]
1785 fn review_metrics_empty_db() {
1786 let conn = open_in_memory().unwrap();
1787 let row = query_review_metrics(&conn).unwrap();
1788 assert_eq!(row.auto_merge_count, 0);
1789 assert_eq!(row.manual_merge_count, 0);
1790 assert_eq!(row.direct_root_merge_count, 0);
1791 assert_eq!(row.isolated_integration_merge_count, 0);
1792 assert_eq!(row.direct_root_failure_count, 0);
1793 assert_eq!(row.isolated_integration_failure_count, 0);
1794 assert_eq!(row.rework_count, 0);
1795 assert_eq!(row.review_nudge_count, 0);
1796 assert_eq!(row.review_escalation_count, 0);
1797 assert!(row.avg_review_latency_secs.is_none());
1798 assert_eq!(row.accepted_decision_count, 0);
1799 assert_eq!(row.rejected_decision_count, 0);
1800 assert!(row.rejection_reasons.is_empty());
1801 assert_eq!(row.post_merge_verify_pass_count, 0);
1802 assert_eq!(row.post_merge_verify_fail_count, 0);
1803 assert_eq!(row.post_merge_verify_skip_count, 0);
1804 }
1805
1806 #[test]
1807 fn review_metrics_counts_all_event_types() {
1808 let conn = open_in_memory().unwrap();
1809 insert_event(
1810 &conn,
1811 &TeamEvent::task_auto_merged_with_mode(
1812 "eng-1",
1813 "1",
1814 0.9,
1815 2,
1816 30,
1817 Some(crate::team::merge::MergeMode::DirectRoot),
1818 ),
1819 )
1820 .unwrap();
1821 insert_event(
1822 &conn,
1823 &TeamEvent::task_auto_merged_with_mode(
1824 "eng-1",
1825 "2",
1826 0.9,
1827 2,
1828 30,
1829 Some(crate::team::merge::MergeMode::IsolatedIntegration),
1830 ),
1831 )
1832 .unwrap();
1833 insert_event(
1834 &conn,
1835 &TeamEvent::task_manual_merged_with_mode(
1836 "3",
1837 Some(crate::team::merge::MergeMode::DirectRoot),
1838 ),
1839 )
1840 .unwrap();
1841 insert_event(
1842 &conn,
1843 &TeamEvent::task_merge_failed(
1844 "eng-1",
1845 "8",
1846 Some(crate::team::merge::MergeMode::IsolatedIntegration),
1847 "isolated merge path failed: integration checkout broke",
1848 ),
1849 )
1850 .unwrap();
1851 insert_event(&conn, &TeamEvent::task_reworked("eng-1", "4")).unwrap();
1852 insert_event(&conn, &TeamEvent::review_nudge_sent("manager", "5")).unwrap();
1853 insert_event(&conn, &TeamEvent::review_nudge_sent("manager", "6")).unwrap();
1854 insert_event(&conn, &TeamEvent::review_escalated_by_role("manager", "7")).unwrap();
1855 insert_event(
1856 &conn,
1857 &TeamEvent::auto_merge_decision_recorded(&crate::team::events::AutoMergeDecisionInfo {
1858 engineer: "eng-1",
1859 task: "1",
1860 action_type: "accepted",
1861 confidence: 0.9,
1862 reason: "accepted for auto-merge: confidence 0.90; 2 files, 30 lines, 1 modules; reasons: confidence 0.90 meets threshold 0.80",
1863 details: r#"{"decision":"accepted","reasons":["confidence 0.90 meets threshold 0.80"],"files_changed":2,"lines_changed":30,"modules_touched":1,"has_migrations":false,"has_config_changes":false,"has_unsafe":false,"has_conflicts":false,"rename_count":0,"tests_passed":true,"override_forced":null,"diff_available":true}"#,
1864 }),
1865 )
1866 .unwrap();
1867 insert_event(
1868 &conn,
1869 &TeamEvent::auto_merge_decision_recorded(&crate::team::events::AutoMergeDecisionInfo {
1870 engineer: "eng-1",
1871 task: "3",
1872 action_type: "manual_review",
1873 confidence: 0.52,
1874 reason: "routed to manual review: confidence 0.52; 6 files, 220 lines, 3 modules; reasons: touches sensitive paths; 6 files changed (max 5)",
1875 details: r#"{"decision":"manual_review","reasons":["touches sensitive paths","6 files changed (max 5)"],"files_changed":6,"lines_changed":220,"modules_touched":3,"has_migrations":false,"has_config_changes":false,"has_unsafe":false,"has_conflicts":false,"rename_count":0,"tests_passed":true,"override_forced":null,"diff_available":true}"#,
1876 }),
1877 )
1878 .unwrap();
1879 insert_event(
1880 &conn,
1881 &TeamEvent::auto_merge_post_verify_result(
1882 "eng-1",
1883 "1",
1884 Some(true),
1885 "passed",
1886 Some("post-merge verification on main passed"),
1887 ),
1888 )
1889 .unwrap();
1890 insert_event(
1891 &conn,
1892 &TeamEvent::auto_merge_post_verify_result(
1893 "eng-1",
1894 "2",
1895 Some(false),
1896 "failed",
1897 Some("post-merge verification on main failed"),
1898 ),
1899 )
1900 .unwrap();
1901 insert_event(
1902 &conn,
1903 &TeamEvent::auto_merge_post_verify_result(
1904 "eng-1",
1905 "3",
1906 None,
1907 "skipped",
1908 Some("post-merge verification was not requested for this merge"),
1909 ),
1910 )
1911 .unwrap();
1912
1913 let row = query_review_metrics(&conn).unwrap();
1914 assert_eq!(row.auto_merge_count, 2);
1915 assert_eq!(row.manual_merge_count, 1);
1916 assert_eq!(row.direct_root_merge_count, 2);
1917 assert_eq!(row.isolated_integration_merge_count, 1);
1918 assert_eq!(row.direct_root_failure_count, 0);
1919 assert_eq!(row.isolated_integration_failure_count, 1);
1920 assert_eq!(row.rework_count, 1);
1921 assert_eq!(row.review_nudge_count, 2);
1922 assert_eq!(row.review_escalation_count, 1);
1923 assert_eq!(row.accepted_decision_count, 1);
1924 assert_eq!(row.rejected_decision_count, 1);
1925 assert_eq!(
1926 row.rejection_reasons,
1927 vec![
1928 AutoMergeReasonRow {
1929 reason: "6 files changed (max 5)".to_string(),
1930 count: 1,
1931 },
1932 AutoMergeReasonRow {
1933 reason: "touches sensitive paths".to_string(),
1934 count: 1,
1935 },
1936 ]
1937 );
1938 assert_eq!(row.post_merge_verify_pass_count, 1);
1939 assert_eq!(row.post_merge_verify_fail_count, 1);
1940 assert_eq!(row.post_merge_verify_skip_count, 1);
1941 }
1942
1943 #[test]
1944 fn review_metrics_computes_avg_latency() {
1945 let conn = open_in_memory().unwrap();
1946
1947 let mut c1 = TeamEvent::task_completed("eng-1", Some("10"));
1949 c1.ts = 1000;
1950 insert_event(&conn, &c1).unwrap();
1951 let mut m1 = TeamEvent::task_auto_merged_with_mode(
1952 "eng-1",
1953 "10",
1954 0.9,
1955 2,
1956 30,
1957 Some(crate::team::merge::MergeMode::DirectRoot),
1958 );
1959 m1.ts = 1100;
1960 insert_event(&conn, &m1).unwrap();
1961
1962 let mut c2 = TeamEvent::task_completed("eng-2", Some("20"));
1964 c2.ts = 2000;
1965 insert_event(&conn, &c2).unwrap();
1966 let mut m2 = TeamEvent::task_manual_merged_with_mode(
1967 "20",
1968 Some(crate::team::merge::MergeMode::IsolatedIntegration),
1969 );
1970 m2.ts = 2300;
1971 insert_event(&conn, &m2).unwrap();
1972
1973 let row = query_review_metrics(&conn).unwrap();
1974 let avg = row.avg_review_latency_secs.unwrap();
1976 assert!((avg - 200.0).abs() < 0.01);
1977 }
1978
1979 #[test]
1980 fn task_cycle_time_rows_replace_existing_snapshot() {
1981 let conn = open_in_memory().unwrap();
1982 let rows = vec![
1983 TaskCycleTimeRecord {
1984 task_id: 1,
1985 title: "One".to_string(),
1986 engineer: Some("eng-1".to_string()),
1987 priority: "high".to_string(),
1988 status: "done".to_string(),
1989 created_at: Some(100),
1990 started_at: Some(160),
1991 completed_at: Some(460),
1992 cycle_time_minutes: Some(5),
1993 lead_time_minutes: Some(6),
1994 },
1995 TaskCycleTimeRecord {
1996 task_id: 2,
1997 title: "Two".to_string(),
1998 engineer: Some("eng-2".to_string()),
1999 priority: "low".to_string(),
2000 status: "done".to_string(),
2001 created_at: Some(200),
2002 started_at: Some(260),
2003 completed_at: Some(560),
2004 cycle_time_minutes: Some(5),
2005 lead_time_minutes: Some(6),
2006 },
2007 ];
2008
2009 replace_task_cycle_times(&conn, &rows).unwrap();
2010 let stored = query_task_cycle_times(&conn).unwrap();
2011 assert_eq!(stored.len(), 2);
2012
2013 replace_task_cycle_times(&conn, &rows[..1]).unwrap();
2014 let stored = query_task_cycle_times(&conn).unwrap();
2015 assert_eq!(stored.len(), 1);
2016 assert_eq!(stored[0].task_id, "1");
2017 }
2018
2019 #[test]
2020 fn task_cycle_time_queries_aggregate_priority_and_engineer() {
2021 let conn = open_in_memory().unwrap();
2022 replace_task_cycle_times(
2023 &conn,
2024 &[
2025 TaskCycleTimeRecord {
2026 task_id: 1,
2027 title: "One".to_string(),
2028 engineer: Some("eng-1".to_string()),
2029 priority: "high".to_string(),
2030 status: "done".to_string(),
2031 created_at: Some(100),
2032 started_at: Some(160),
2033 completed_at: Some(460),
2034 cycle_time_minutes: Some(5),
2035 lead_time_minutes: Some(6),
2036 },
2037 TaskCycleTimeRecord {
2038 task_id: 2,
2039 title: "Two".to_string(),
2040 engineer: Some("eng-1".to_string()),
2041 priority: "high".to_string(),
2042 status: "done".to_string(),
2043 created_at: Some(200),
2044 started_at: Some(260),
2045 completed_at: Some(860),
2046 cycle_time_minutes: Some(10),
2047 lead_time_minutes: Some(11),
2048 },
2049 TaskCycleTimeRecord {
2050 task_id: 3,
2051 title: "Three".to_string(),
2052 engineer: Some("eng-2".to_string()),
2053 priority: "medium".to_string(),
2054 status: "done".to_string(),
2055 created_at: Some(300),
2056 started_at: Some(360),
2057 completed_at: Some(660),
2058 cycle_time_minutes: Some(5),
2059 lead_time_minutes: Some(6),
2060 },
2061 ],
2062 )
2063 .unwrap();
2064
2065 let by_priority = query_average_cycle_time_by_priority(&conn).unwrap();
2066 assert_eq!(by_priority[0].priority, "high");
2067 assert!((by_priority[0].average_cycle_time_mins - 7.5).abs() < f64::EPSILON);
2068
2069 let by_engineer = query_engineer_throughput(&conn).unwrap();
2070 assert_eq!(by_engineer[0].engineer, "eng-1");
2071 assert_eq!(by_engineer[0].completed_tasks, 2);
2072 }
2073
2074 #[test]
2075 fn record_test_results_tracks_failures_and_flakes() {
2076 let conn = open_in_memory().unwrap();
2077 let failed_results = TestResults {
2078 framework: "cargo".to_string(),
2079 total: Some(2),
2080 passed: 1,
2081 failed: 1,
2082 ignored: 0,
2083 failures: vec![super::super::test_results::TestFailure {
2084 test_name: "tests::fails".to_string(),
2085 message: Some("assertion failed".to_string()),
2086 location: Some("src/lib.rs:9".to_string()),
2087 }],
2088 summary: None,
2089 };
2090 record_test_results(&conn, 42, "eng-1", &failed_results, &[]).unwrap();
2091
2092 let (failures, flaky_passes) =
2093 query_test_case_metric(&conn, "cargo", "tests::fails").unwrap();
2094 assert_eq!(failures, 1);
2095 assert_eq!(flaky_passes, 0);
2096
2097 let passed_results = TestResults {
2098 framework: "cargo".to_string(),
2099 total: Some(2),
2100 passed: 2,
2101 failed: 0,
2102 ignored: 0,
2103 failures: vec![],
2104 summary: None,
2105 };
2106 record_test_results(
2107 &conn,
2108 42,
2109 "eng-1",
2110 &passed_results,
2111 &failed_results.failures,
2112 )
2113 .unwrap();
2114
2115 let (failures, flaky_passes) =
2116 query_test_case_metric(&conn, "cargo", "tests::fails").unwrap();
2117 assert_eq!(failures, 1);
2118 assert_eq!(flaky_passes, 1);
2119 }
2120
2121 #[test]
2124 fn tasks_completed_increments_on_task_completed() {
2125 let conn = open_in_memory().unwrap();
2126 insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
2127
2128 insert_event(&conn, &TeamEvent::task_completed("eng-1", Some("1"))).unwrap();
2129 insert_event(&conn, &TeamEvent::task_completed("eng-2", Some("2"))).unwrap();
2130
2131 let summaries = query_session_summaries(&conn).unwrap();
2132 assert_eq!(summaries[0].tasks_completed, 2);
2133 }
2134
2135 #[test]
2138 fn total_merges_increments_on_auto_and_manual_merge() {
2139 let conn = open_in_memory().unwrap();
2140 insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
2141
2142 insert_event(
2143 &conn,
2144 &TeamEvent::task_auto_merged_with_mode(
2145 "eng-1",
2146 "1",
2147 0.9,
2148 2,
2149 30,
2150 Some(crate::team::merge::MergeMode::DirectRoot),
2151 ),
2152 )
2153 .unwrap();
2154 insert_event(
2155 &conn,
2156 &TeamEvent::task_manual_merged_with_mode(
2157 "2",
2158 Some(crate::team::merge::MergeMode::IsolatedIntegration),
2159 ),
2160 )
2161 .unwrap();
2162 insert_event(
2163 &conn,
2164 &TeamEvent::task_auto_merged_with_mode(
2165 "eng-1",
2166 "3",
2167 0.8,
2168 1,
2169 10,
2170 Some(crate::team::merge::MergeMode::DirectRoot),
2171 ),
2172 )
2173 .unwrap();
2174
2175 let summaries = query_session_summaries(&conn).unwrap();
2176 assert_eq!(summaries[0].total_merges, 3);
2177 }
2178
2179 #[test]
2182 fn total_events_increments_on_every_insert() {
2183 let conn = open_in_memory().unwrap();
2184 insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
2186 insert_event(&conn, &TeamEvent::task_assigned("eng-1", "1")).unwrap();
2187 insert_event(&conn, &TeamEvent::task_completed("eng-1", Some("1"))).unwrap();
2188
2189 let summaries = query_session_summaries(&conn).unwrap();
2190 assert_eq!(summaries[0].total_events, 3);
2193 }
2194
2195 #[test]
2198 fn ended_at_set_on_daemon_stopped() {
2199 let conn = open_in_memory().unwrap();
2200 insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
2201
2202 let summaries = query_session_summaries(&conn).unwrap();
2203 assert!(summaries[0].ended_at.is_none());
2204
2205 let mut stop = TeamEvent::daemon_stopped_with_reason("shutdown", 3600);
2206 stop.ts = 9999;
2207 insert_event(&conn, &stop).unwrap();
2208
2209 let summaries = query_session_summaries(&conn).unwrap();
2210 assert_eq!(summaries[0].ended_at, Some(9999));
2211 }
2212
2213 #[test]
2214 fn ended_at_set_on_plain_daemon_stopped() {
2215 let conn = open_in_memory().unwrap();
2216 insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
2217
2218 let mut stop = TeamEvent::daemon_stopped();
2219 stop.ts = 5000;
2220 insert_event(&conn, &stop).unwrap();
2221
2222 let summaries = query_session_summaries(&conn).unwrap();
2223 assert_eq!(summaries[0].ended_at, Some(5000));
2224 }
2225
2226 #[test]
2227 fn session_summary_tracks_discord_verification_and_notification_counters() {
2228 let conn = open_in_memory().unwrap();
2229 insert_event(&conn, &TeamEvent::daemon_started()).unwrap();
2230 insert_event(
2231 &conn,
2232 &TeamEvent::discord_event_sent("events", "task_completed"),
2233 )
2234 .unwrap();
2235 insert_event(
2236 &conn,
2237 &TeamEvent::auto_merge_post_verify_result("eng-1", "42", Some(true), "passed", None),
2238 )
2239 .unwrap();
2240 insert_event(
2241 &conn,
2242 &TeamEvent::auto_merge_post_verify_result("eng-1", "43", Some(false), "failed", None),
2243 )
2244 .unwrap();
2245 insert_event(
2246 &conn,
2247 &TeamEvent::inbox_message_deduplicated("manager", "eng-1", 0xfeed),
2248 )
2249 .unwrap();
2250 insert_event(
2251 &conn,
2252 &TeamEvent::notification_delivery_sample("daemon", "manager", 12, "digest"),
2253 )
2254 .unwrap();
2255
2256 let summaries = query_session_summaries(&conn).unwrap();
2257 assert_eq!(summaries[0].discord_events_sent, 1);
2258 assert_eq!(summaries[0].verification_passes, 1);
2259 assert_eq!(summaries[0].verification_failures, 1);
2260 assert_eq!(summaries[0].notification_isolations, 1);
2261 assert_eq!(summaries[0].notification_latency_total_secs, 12);
2262 assert_eq!(summaries[0].notification_latency_samples, 1);
2263 }
2264
2265 #[test]
2266 fn merge_queue_depth_counts_completed_tasks_awaiting_merge() {
2267 let conn = open_in_memory().unwrap();
2268 let mut started = TeamEvent::daemon_started();
2269 started.ts = 100;
2270 insert_event(&conn, &started).unwrap();
2271
2272 let mut completed = TeamEvent::task_completed("eng-1", Some("42"));
2273 completed.ts = 200;
2274 insert_event(&conn, &completed).unwrap();
2275
2276 assert_eq!(query_merge_queue_depth(&conn).unwrap(), 1);
2277
2278 let mut merged = TeamEvent::task_auto_merged("eng-1", "42", 0.9, 2, 10);
2279 merged.ts = 250;
2280 insert_event(&conn, &merged).unwrap();
2281
2282 assert_eq!(query_merge_queue_depth(&conn).unwrap(), 0);
2283 }
2284
2285 #[test]
2288 fn record_agent_poll_state_tracks_idle_polls() {
2289 let conn = open_in_memory().unwrap();
2290 record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
2291 record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
2292 record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
2293
2294 let agents = query_agent_metrics(&conn).unwrap();
2295 assert_eq!(agents[0].idle_polls, 3);
2296 assert_eq!(agents[0].working_polls, 0);
2297 assert_eq!(agents[0].total_cycle_secs, 0);
2298 }
2299
2300 #[test]
2301 fn record_agent_poll_state_tracks_working_polls() {
2302 let conn = open_in_memory().unwrap();
2303 record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
2304 record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
2305
2306 let agents = query_agent_metrics(&conn).unwrap();
2307 assert_eq!(agents[0].working_polls, 2);
2308 assert_eq!(agents[0].idle_polls, 0);
2309 }
2310
2311 #[test]
2314 fn record_agent_poll_state_accumulates_cycle_secs_for_working() {
2315 let conn = open_in_memory().unwrap();
2316 record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
2317 record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
2318 record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
2319
2320 let agents = query_agent_metrics(&conn).unwrap();
2321 assert_eq!(agents[0].total_cycle_secs, 15); }
2323
2324 #[test]
2325 fn record_agent_poll_state_idle_does_not_accumulate_cycle_secs() {
2326 let conn = open_in_memory().unwrap();
2327 record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
2328 record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
2329
2330 let agents = query_agent_metrics(&conn).unwrap();
2331 assert_eq!(agents[0].total_cycle_secs, 0);
2332 }
2333
2334 #[test]
2335 fn record_agent_poll_state_mixed_idle_and_working() {
2336 let conn = open_in_memory().unwrap();
2337 record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
2338 record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
2339 record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
2340 record_agent_poll_state(&conn, "eng-1", false, 5).unwrap();
2341
2342 let agents = query_agent_metrics(&conn).unwrap();
2343 assert_eq!(agents[0].working_polls, 2);
2344 assert_eq!(agents[0].idle_polls, 2);
2345 assert_eq!(agents[0].total_cycle_secs, 10); }
2347
2348 #[test]
2349 fn record_agent_poll_state_multiple_agents() {
2350 let conn = open_in_memory().unwrap();
2351 record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
2352 record_agent_poll_state(&conn, "eng-2", false, 5).unwrap();
2353 record_agent_poll_state(&conn, "eng-1", true, 5).unwrap();
2354 record_agent_poll_state(&conn, "eng-2", true, 5).unwrap();
2355
2356 let agents = query_agent_metrics(&conn).unwrap();
2357 let eng1 = agents.iter().find(|a| a.role == "eng-1").unwrap();
2358 let eng2 = agents.iter().find(|a| a.role == "eng-2").unwrap();
2359 assert_eq!(eng1.working_polls, 2);
2360 assert_eq!(eng1.total_cycle_secs, 10);
2361 assert_eq!(eng2.idle_polls, 1);
2362 assert_eq!(eng2.working_polls, 1);
2363 assert_eq!(eng2.total_cycle_secs, 5);
2364 }
2365
2366 #[test]
2369 fn session_counters_noop_without_session() {
2370 let conn = open_in_memory().unwrap();
2373 insert_event(&conn, &TeamEvent::task_completed("eng-1", Some("1"))).unwrap();
2374 insert_event(
2375 &conn,
2376 &TeamEvent::task_auto_merged("eng-1", "1", 0.9, 2, 30),
2377 )
2378 .unwrap();
2379 let summaries = query_session_summaries(&conn).unwrap();
2380 assert!(summaries.is_empty());
2381 }
2382}