Skip to main content

batty_cli/team/
grafana.rs

1//! Grafana monitoring: bundled dashboard template, alert provisioning, and CLI commands.
2//!
3//! The JSON is compiled into the binary via `include_str!()` so `batty init`
4//! can write it without needing a network fetch or external file.
5
6use anyhow::{Context, Result, bail};
7use serde_json::json;
8use std::fs::OpenOptions;
9use std::io::{Read, Write};
10use std::net::{TcpListener, TcpStream};
11use std::path::{Path, PathBuf};
12use std::process::{Command as ProcessCommand, Stdio};
13use std::time::Duration;
14
/// Raw JSON for the bundled Grafana dashboard template.
///
/// Embedded at compile time via `include_str!`, so no file lookup happens at runtime.
pub const DASHBOARD_JSON: &str = include_str!("grafana/dashboard.json");

/// Default port used to reach the Grafana HTTP server.
pub const DEFAULT_PORT: u16 = 3000;
/// Port of the local webhook receiver used by the file-backed contact point.
pub const ALERT_WEBHOOK_PORT: u16 = 8787;
22
/// Expected row titles in the dashboard (used for validation).
///
/// The unit tests assert that every title listed here appears as a panel of
/// type `row` in [`DASHBOARD_JSON`].
pub const REQUIRED_ROWS: &[&str] = &[
    "Session Overview",
    "Pipeline Health",
    "Activity Over Time",
    "Agent Performance",
    "Event Breakdown",
    "Recent Activity",
    "Throughput Over Time",
    "Tact Engine",
    "Health Signals",
    "Board Health",
];

/// Expected alert names provisioned alongside the dashboard.
///
/// Must stay in sync with the `title` fields of `ALERT_RULES`; a unit test
/// checks both the names and the count.
pub const REQUIRED_ALERTS: &[&str] = &[
    "Zero Activity",
    "Crash Spike",
    "Dispatch Starvation",
    "Merge Queue Depth",
];
44
/// Name of the rule group that holds every provisioned alert rule.
const ALERT_RULE_GROUP: &str = "batty-operational-anomalies";
/// Grafana folder the rule group is provisioned into.
const ALERT_FOLDER: &str = "Batty";
/// Name of the webhook contact point; also used as the receiver of the
/// notification policy.
const ALERT_CONTACT_POINT: &str = "batty-file-log";
48
/// Static description of one alert rule, turned into Grafana provisioning
/// data by `render_alert_rules_yaml`.
#[derive(Clone, Copy)]
struct AlertRuleDefinition {
    /// Emitted as the rule's `uid`.
    uid: &'static str,
    /// Emitted as the rule's `title`; compared against `REQUIRED_ALERTS` in tests.
    title: &'static str,
    /// Emitted as the `severity` label.
    severity: &'static str,
    /// Look-back window in seconds, emitted as `relativeTimeRange.from`.
    window_secs: u64,
    /// Duration string emitted as the rule's `for` field.
    for_duration: &'static str,
    /// Value the `gt` evaluator compares the query result against.
    threshold: f64,
    /// Query text sent to the SQLite datasource (emitted as `queryText`,
    /// `rawQueryText`, and `rawSql`).
    sql: &'static str,
    /// Emitted as the `summary` annotation.
    description: &'static str,
}
60
/// Built-in alert rules rendered by `render_alert_rules_yaml`.
///
/// Every query returns a single `value` column; the rule's condition is
/// `value > threshold`.
const ALERT_RULES: &[AlertRuleDefinition] = &[
    // The query yields 1 when no matching events exist in the window, so the
    // 0.5 threshold turns it into a boolean "nothing happened" check.
    AlertRuleDefinition {
        uid: "batty_zero_activity",
        title: "Zero Activity",
        severity: "warning",
        window_secs: 30 * 60,
        for_duration: "5m",
        threshold: 0.5,
        sql: "SELECT CASE WHEN COUNT(*) = 0 THEN 1 ELSE 0 END AS value \
              FROM events \
              WHERE event_type IN ('task_assigned', 'task_completed') \
                AND timestamp BETWEEN $__from / 1000 AND $__to / 1000;",
        description: "No task assignment or completion activity in the last 30 minutes.",
    },
    // Raw crash count in the window; alerts when it exceeds three.
    AlertRuleDefinition {
        uid: "batty_crash_spike",
        title: "Crash Spike",
        severity: "critical",
        window_secs: 10 * 60,
        for_duration: "2m",
        threshold: 3.0,
        sql: "SELECT COUNT(*) AS value \
              FROM events \
              WHERE event_type = 'member_crashed' \
                AND timestamp BETWEEN $__from / 1000 AND $__to / 1000;",
        description: "More than three agent crashes in the last 10 minutes.",
    },
    // The query yields 1 when at least one starvation event exists in the window.
    AlertRuleDefinition {
        uid: "batty_dispatch_starvation",
        title: "Dispatch Starvation",
        severity: "warning",
        window_secs: 10 * 60,
        for_duration: "2m",
        threshold: 0.5,
        sql: "SELECT CASE WHEN COUNT(*) > 0 THEN 1 ELSE 0 END AS value \
              FROM events \
              WHERE event_type = 'pipeline_starvation_detected' \
                AND timestamp BETWEEN $__from / 1000 AND $__to / 1000;",
        description: "Idle engineers outnumber runnable work and dispatch is starving the lane.",
    },
    // Counts completed tasks with no later merge/rework event. The query does
    // not reference `$__from`/`$__to`, so `window_secs` does not filter its rows.
    AlertRuleDefinition {
        uid: "batty_merge_queue_depth",
        title: "Merge Queue Depth",
        severity: "warning",
        window_secs: 60 * 60,
        for_duration: "2m",
        threshold: 3.0,
        sql: "SELECT COUNT(*) AS value \
              FROM task_metrics tm \
              WHERE tm.completed_at IS NOT NULL \
                AND NOT EXISTS ( \
                    SELECT 1 \
                    FROM events e \
                    WHERE e.task_id = tm.task_id \
                      AND e.event_type IN ('task_auto_merged', 'task_manual_merged', 'task_reworked') \
                      AND e.timestamp >= tm.completed_at \
                );",
        description: "More than three completed tasks are still waiting for merge or rework.",
    },
];
121
122/// Write the bundled Grafana dashboard JSON to a file.
123pub fn write_dashboard(path: &Path) -> anyhow::Result<()> {
124    if let Some(parent) = path.parent() {
125        std::fs::create_dir_all(parent)?;
126    }
127    std::fs::write(path, DASHBOARD_JSON)?;
128    Ok(())
129}
130
/// Return the base URL (`http://localhost:<port>`) of a local Grafana server.
pub fn grafana_url(port: u16) -> String {
    let mut url = String::from("http://localhost:");
    url.push_str(&port.to_string());
    url
}
135
/// Return the loopback URL on which the alert webhook receiver accepts POSTs.
fn alert_webhook_url(port: u16) -> String {
    let mut url = String::from("http://127.0.0.1:");
    url.push_str(&port.to_string());
    url.push_str("/grafana-alerts");
    url
}
139
/// Return `<project_root>/.batty/alerts.log`, the file alert notifications are appended to.
fn alerts_log_path(project_root: &Path) -> PathBuf {
    let mut log_path = project_root.to_path_buf();
    log_path.push(".batty");
    log_path.push("alerts.log");
    log_path
}
143
/// Path of the plain-text orchestrator log that alert lines are mirrored into.
///
/// Delegates to `crate::team::orchestrator_log_path`; the unit tests expect
/// the result to be `.batty/orchestrator.log` under the project root.
fn orchestrator_alert_log_path(project_root: &Path) -> PathBuf {
    crate::team::orchestrator_log_path(project_root)
}
147
/// Path of the ANSI orchestrator log that alert lines are mirrored into.
///
/// Delegates to `crate::team::orchestrator_ansi_log_path`; the unit tests
/// expect the result to be `.batty/orchestrator.ansi.log` under the project root.
fn orchestrator_alert_ansi_log_path(project_root: &Path) -> PathBuf {
    crate::team::orchestrator_ansi_log_path(project_root)
}
151
/// Return `<project_root>/.batty/grafana/alerting`, the project-local copy of
/// the alerting provisioning files.
fn project_alerting_dir(project_root: &Path) -> PathBuf {
    [".batty", "grafana", "alerting"]
        .iter()
        .fold(project_root.to_path_buf(), |dir, segment| dir.join(segment))
}
155
/// Locate Grafana's provisioning directory.
///
/// Resolution order:
/// 1. `BATTY_GRAFANA_PROVISIONING_DIR`, when set to a non-empty value. The
///    value is read with `var_os`, so paths that are not valid UTF-8 are
///    accepted as-is.
/// 2. The first of a fixed list of well-known install locations that exists.
///
/// Returns `None` when neither yields a directory. An empty override is
/// treated as unset; otherwise it would resolve to the empty path and send
/// provisioning files to a directory relative to the current working directory.
fn grafana_provisioning_dir() -> Option<PathBuf> {
    const KNOWN_LOCATIONS: [&str; 3] = [
        "/opt/homebrew/etc/grafana/provisioning",
        "/usr/local/etc/grafana/provisioning",
        "/etc/grafana/provisioning",
    ];

    let from_env = std::env::var_os("BATTY_GRAFANA_PROVISIONING_DIR")
        .filter(|value| !value.is_empty())
        .map(PathBuf::from);
    if from_env.is_some() {
        // An explicit override wins even if the directory does not exist yet.
        return from_env;
    }

    KNOWN_LOCATIONS
        .iter()
        .map(PathBuf::from)
        .find(|path| path.exists())
}
170
171fn provisioning_target_dir(project_root: &Path) -> PathBuf {
172    grafana_provisioning_dir().unwrap_or_else(|| project_root.join(".batty").join("grafana"))
173}
174
175/// Install Grafana, provision alerting resources, and start the service.
176pub fn setup(project_root: &Path, port: u16) -> Result<()> {
177    println!("Installing Grafana via Homebrew...");
178    run_cmd("brew", &["install", "grafana"])?;
179
180    println!("Installing SQLite datasource plugin...");
181    let plugin_result = ProcessCommand::new("grafana")
182        .args([
183            "cli",
184            "--homepath",
185            "/opt/homebrew/opt/grafana/share/grafana",
186            "--pluginsDir",
187            "/opt/homebrew/var/lib/grafana/plugins",
188            "plugins",
189            "install",
190            "frser-sqlite-datasource",
191        ])
192        .status();
193    if plugin_result.is_err() || !plugin_result.unwrap().success() {
194        let _ = run_cmd(
195            "grafana-cli",
196            &["plugins", "install", "frser-sqlite-datasource"],
197        );
198    }
199
200    println!("Provisioning alert rules and notification policy...");
201    provision_alerting(project_root, ALERT_WEBHOOK_PORT)?;
202    ensure_alert_webhook_running(project_root, ALERT_WEBHOOK_PORT)?;
203
204    println!("Starting Grafana service...");
205    run_cmd("brew", &["services", "start", "grafana"])?;
206
207    println!("Waiting for Grafana to start...");
208    for _ in 0..10 {
209        std::thread::sleep(Duration::from_secs(1));
210        if check_health(&format!("{}/api/health", grafana_url(port))).is_ok() {
211            break;
212        }
213    }
214
215    let _ = reload_alerting_provisioning(port);
216
217    let db_path = project_root.join(".batty").join("telemetry.db");
218    if db_path.exists() {
219        provision_dashboard(project_root, port)?;
220    } else {
221        println!(
222            "telemetry.db not found at {}. Alerting files were written, but dashboard import \
223will complete after `batty start` creates the database.",
224            db_path.display()
225        );
226    }
227
228    println!("Grafana setup complete. Dashboard at {}", grafana_url(port));
229    Ok(())
230}
231
232/// Provision a SQLite datasource and import the bundled dashboard for a project.
233pub fn provision_dashboard(project_root: &Path, port: u16) -> Result<()> {
234    let db_path = project_root.join(".batty").join("telemetry.db");
235    if !db_path.exists() {
236        bail!(
237            "telemetry.db not found at {}. Run `batty start` first.",
238            db_path.display()
239        );
240    }
241    let base_url = grafana_url(port);
242
243    let _ = ProcessCommand::new("sqlite3")
244        .args([
245            db_path.to_str().unwrap_or(""),
246            "PRAGMA wal_checkpoint(TRUNCATE);",
247        ])
248        .status();
249
250    println!("Creating SQLite datasource...");
251    let ds_body = format!(
252        r#"{{"name":"Batty Telemetry","uid":"batty-telemetry","type":"frser-sqlite-datasource","access":"proxy","jsonData":{{"path":"{}"}}}}"#,
253        db_path.display()
254    );
255    let ds_result = ProcessCommand::new("curl")
256        .args([
257            "-sf",
258            "-X",
259            "POST",
260            &format!("{base_url}/api/datasources"),
261            "-H",
262            "Content-Type: application/json",
263            "-u",
264            "admin:admin",
265            "-d",
266            &ds_body,
267        ])
268        .output();
269    match ds_result {
270        Ok(out) if out.status.success() => println!("Datasource created."),
271        _ => println!("Datasource may already exist (continuing)."),
272    }
273
274    println!("Importing dashboard...");
275    let dashboard_payload = format!(
276        r#"{{"dashboard":{},"overwrite":true,"folderId":0}}"#,
277        DASHBOARD_JSON
278    );
279    let tmp_file = std::env::temp_dir().join("batty-grafana-import.json");
280    std::fs::write(&tmp_file, &dashboard_payload)?;
281    let import_result = ProcessCommand::new("curl")
282        .args([
283            "-sf",
284            "-X",
285            "POST",
286            &format!("{base_url}/api/dashboards/db"),
287            "-H",
288            "Content-Type: application/json",
289            "-u",
290            "admin:admin",
291            "-d",
292            &format!("@{}", tmp_file.display()),
293        ])
294        .output();
295    let _ = std::fs::remove_file(&tmp_file);
296    match import_result {
297        Ok(out) if out.status.success() => {
298            let body = String::from_utf8_lossy(&out.stdout);
299            if body.contains("\"status\":\"success\"") {
300                println!("Dashboard imported successfully.");
301            } else {
302                println!("Dashboard import response: {body}");
303            }
304        }
305        _ => {
306            println!("Dashboard import may have failed. Check Grafana UI.");
307        }
308    }
309
310    let url = format!("{base_url}/d/batty-project");
311    println!("Dashboard at: {url}");
312    Ok(())
313}
314
315/// Check whether the Grafana server is reachable by hitting `/api/health`.
316pub fn status(port: u16) -> Result<()> {
317    let url = format!("{}/api/health", grafana_url(port));
318    match check_health(&url) {
319        Ok(body) => {
320            println!("Grafana is running at {}", grafana_url(port));
321            println!("{body}");
322            Ok(())
323        }
324        Err(e) => bail!("Grafana is not reachable at {}: {e}", grafana_url(port)),
325    }
326}
327
328/// Open the Grafana dashboard in the default browser.
329pub fn open(port: u16) -> Result<()> {
330    let url = grafana_url(port);
331    open_browser(&url).context("failed to open browser")?;
332    println!("Opened {url}");
333    Ok(())
334}
335
336/// Run the local webhook receiver used by Grafana's webhook contact point.
337pub fn run_alert_webhook(project_root: &Path, port: u16) -> Result<()> {
338    let listener = TcpListener::bind(("127.0.0.1", port))
339        .with_context(|| format!("failed to bind Grafana alert webhook on port {port}"))?;
340    for stream in listener.incoming() {
341        match stream {
342            Ok(stream) => {
343                if let Err(error) = handle_alert_webhook(stream, project_root) {
344                    eprintln!("grafana alert webhook error: {error}");
345                }
346            }
347            Err(error) => eprintln!("grafana alert webhook accept failed: {error}"),
348        }
349    }
350    Ok(())
351}
352
353pub fn render_alert_rules_yaml() -> Result<String> {
354    let rules = ALERT_RULES
355        .iter()
356        .map(|rule| {
357            json!({
358                "uid": rule.uid,
359                "title": rule.title,
360                "condition": "B",
361                "data": [
362                    {
363                        "refId": "A",
364                        "datasourceUid": "batty-telemetry",
365                        "relativeTimeRange": {
366                            "from": rule.window_secs,
367                            "to": 0
368                        },
369                        "model": {
370                            "datasource": {
371                                "type": "frser-sqlite-datasource",
372                                "uid": "batty-telemetry"
373                            },
374                            "intervalMs": 1000,
375                            "maxDataPoints": 43200,
376                            "queryText": rule.sql,
377                            "queryType": "table",
378                            "rawQueryText": rule.sql,
379                            "rawSql": rule.sql,
380                            "refId": "A"
381                        }
382                    },
383                    {
384                        "refId": "B",
385                        "datasourceUid": "__expr__",
386                        "relativeTimeRange": {
387                            "from": rule.window_secs,
388                            "to": 0
389                        },
390                        "model": {
391                            "conditions": [
392                                {
393                                    "evaluator": {
394                                        "params": [rule.threshold],
395                                        "type": "gt"
396                                    },
397                                    "operator": {
398                                        "type": "and"
399                                    },
400                                    "query": {
401                                        "params": ["A"]
402                                    },
403                                    "reducer": {
404                                        "type": "last"
405                                    },
406                                    "type": "query"
407                                }
408                            ],
409                            "datasource": {
410                                "type": "__expr__",
411                                "uid": "__expr__"
412                            },
413                            "expression": "A",
414                            "intervalMs": 1000,
415                            "maxDataPoints": 43200,
416                            "refId": "B",
417                            "type": "classic_conditions"
418                        }
419                    }
420                ],
421                "noDataState": "OK",
422                "execErrState": "Alerting",
423                "for": rule.for_duration,
424                "annotations": {
425                    "summary": rule.description
426                },
427                "labels": {
428                    "severity": rule.severity,
429                    "team": "batty"
430                }
431            })
432        })
433        .collect::<Vec<_>>();
434
435    serde_yaml::to_string(&json!({
436        "apiVersion": 1,
437        "groups": [
438            {
439                "orgId": 1,
440                "name": ALERT_RULE_GROUP,
441                "folder": ALERT_FOLDER,
442                "interval": "60s",
443                "rules": rules
444            }
445        ]
446    }))
447    .context("failed to render Grafana alert rules YAML")
448}
449
450fn render_contact_points_yaml(port: u16) -> Result<String> {
451    serde_yaml::to_string(&json!({
452        "apiVersion": 1,
453        "contactPoints": [
454            {
455                "orgId": 1,
456                "name": ALERT_CONTACT_POINT,
457                "receivers": [
458                    {
459                        "uid": "batty_file_log",
460                        "type": "webhook",
461                        "disableResolveMessage": false,
462                        "settings": {
463                            "url": alert_webhook_url(port),
464                            "httpMethod": "POST",
465                            "title": "{{ template \"default.title\" . }}",
466                            "message": "{{ template \"default.message\" . }}"
467                        }
468                    }
469                ]
470            }
471        ]
472    }))
473    .context("failed to render Grafana contact points YAML")
474}
475
476fn render_notification_policies_yaml() -> Result<String> {
477    serde_yaml::to_string(&json!({
478        "apiVersion": 1,
479        "policies": [
480            {
481                "orgId": 1,
482                "receiver": ALERT_CONTACT_POINT,
483                "group_by": ["alertname", "severity"],
484                "group_wait": "30s",
485                "group_interval": "5m",
486                "repeat_interval": "4h"
487            }
488        ]
489    }))
490    .context("failed to render Grafana notification policy YAML")
491}
492
493fn provision_alerting(project_root: &Path, port: u16) -> Result<()> {
494    let project_dir = project_alerting_dir(project_root);
495    std::fs::create_dir_all(&project_dir)?;
496    if let Some(parent) = alerts_log_path(project_root).parent() {
497        std::fs::create_dir_all(parent)?;
498    }
499    let _ = OpenOptions::new()
500        .create(true)
501        .append(true)
502        .open(alerts_log_path(project_root))?;
503
504    let rules_yaml = render_alert_rules_yaml()?;
505    let contact_points_yaml = render_contact_points_yaml(port)?;
506    let policies_yaml = render_notification_policies_yaml()?;
507
508    write_alerting_file(&project_dir.join("rules.yaml"), &rules_yaml)?;
509    write_alerting_file(
510        &project_dir.join("contact-points.yaml"),
511        &contact_points_yaml,
512    )?;
513    write_alerting_file(&project_dir.join("policies.yaml"), &policies_yaml)?;
514
515    let target_dir = provisioning_target_dir(project_root).join("alerting");
516    std::fs::create_dir_all(&target_dir)?;
517    write_alerting_file(&target_dir.join("batty-rules.yaml"), &rules_yaml)?;
518    write_alerting_file(
519        &target_dir.join("batty-contact-points.yaml"),
520        &contact_points_yaml,
521    )?;
522    write_alerting_file(&target_dir.join("batty-policies.yaml"), &policies_yaml)?;
523    Ok(())
524}
525
526fn write_alerting_file(path: &Path, content: &str) -> Result<()> {
527    if let Some(parent) = path.parent() {
528        std::fs::create_dir_all(parent)?;
529    }
530    std::fs::write(path, content)
531        .with_context(|| format!("failed to write alerting file {}", path.display()))
532}
533
534fn ensure_alert_webhook_running(project_root: &Path, port: u16) -> Result<()> {
535    if TcpStream::connect(("127.0.0.1", port)).is_ok() {
536        return Ok(());
537    }
538
539    let current_exe = std::env::current_exe().context("failed to resolve current batty binary")?;
540    ProcessCommand::new(current_exe)
541        .args([
542            "grafana-webhook",
543            "--project-root",
544            &project_root.display().to_string(),
545            "--port",
546            &port.to_string(),
547        ])
548        .stdin(Stdio::null())
549        .stdout(Stdio::null())
550        .stderr(Stdio::null())
551        .spawn()
552        .context("failed to start Grafana webhook receiver")?;
553    std::thread::sleep(Duration::from_millis(250));
554    Ok(())
555}
556
557fn handle_alert_webhook(mut stream: TcpStream, project_root: &Path) -> Result<()> {
558    stream.set_read_timeout(Some(Duration::from_secs(2)))?;
559    let mut buf = Vec::new();
560    let _ = stream.read_to_end(&mut buf);
561    let request = String::from_utf8_lossy(&buf);
562    let body = request
563        .split_once("\r\n\r\n")
564        .map(|(_, body)| body)
565        .unwrap_or_default();
566    let line = format_alert_notification(body);
567    append_alert_logs(project_root, &line)?;
568    if let Err(error) = maybe_send_telegram_alert(&line) {
569        eprintln!("grafana alert telegram delivery failed: {error}");
570    }
571    stream.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok")?;
572    Ok(())
573}
574
575fn append_alert_logs(project_root: &Path, line: &str) -> Result<()> {
576    if let Some(parent) = alerts_log_path(project_root).parent() {
577        std::fs::create_dir_all(parent)?;
578    }
579    let mut alerts = OpenOptions::new()
580        .create(true)
581        .append(true)
582        .open(alerts_log_path(project_root))
583        .with_context(|| {
584            format!(
585                "failed to open alert log at {}",
586                alerts_log_path(project_root).display()
587            )
588        })?;
589    writeln!(alerts, "{line}")?;
590
591    let orchestrator_line = format!("alert: {line}");
592    let plain_path = orchestrator_alert_log_path(project_root);
593    let ansi_path = orchestrator_alert_ansi_log_path(project_root);
594    let mut plain = crate::team::open_log_for_append(&plain_path)?;
595    writeln!(plain, "{orchestrator_line}")?;
596    let mut ansi = crate::team::open_log_for_append(&ansi_path)?;
597    writeln!(ansi, "{orchestrator_line}")?;
598    Ok(())
599}
600
601fn maybe_send_telegram_alert(message: &str) -> Result<()> {
602    let bot_token = match std::env::var("BATTY_TELEGRAM_BOT_TOKEN") {
603        Ok(token) => token,
604        Err(_) => return Ok(()),
605    };
606    let chat_id = match std::env::var("BATTY_GRAFANA_ALERT_CHAT_ID")
607        .or_else(|_| std::env::var("BATTY_TELEGRAM_ALERT_CHAT_ID"))
608    {
609        Ok(chat_id) => chat_id,
610        Err(_) => return Ok(()),
611    };
612
613    let bot = crate::team::telegram::TelegramBot::new(bot_token, Vec::new());
614    bot.send_message(&chat_id, message)
615        .context("failed to send Grafana alert to Telegram")
616}
617
618fn format_alert_notification(body: &str) -> String {
619    let ts = crate::team::now_unix();
620    if let Ok(payload) = serde_json::from_str::<serde_json::Value>(body) {
621        let status = payload
622            .get("status")
623            .and_then(|v| v.as_str())
624            .unwrap_or("unknown");
625        let alerts = payload
626            .get("alerts")
627            .and_then(|v| v.as_array())
628            .cloned()
629            .unwrap_or_default();
630        let names = alerts
631            .iter()
632            .filter_map(|alert| {
633                alert
634                    .get("labels")
635                    .and_then(|labels| labels.get("alertname"))
636                    .and_then(|name| name.as_str())
637            })
638            .collect::<Vec<_>>();
639        if !names.is_empty() {
640            return format!(
641                "[{ts}] status={status} alerts={} raw={}",
642                names.join(","),
643                body.trim()
644            );
645        }
646    }
647    format!("[{ts}] raw={}", body.trim())
648}
649
650fn reload_alerting_provisioning(port: u16) -> Result<()> {
651    let output = ProcessCommand::new("curl")
652        .args([
653            "-sf",
654            "-X",
655            "POST",
656            &format!(
657                "{}/api/admin/provisioning/alerting/reload",
658                grafana_url(port)
659            ),
660            "-u",
661            "admin:admin",
662        ])
663        .output()
664        .context("failed to run curl for alerting reload")?;
665    if !output.status.success() {
666        bail!(
667            "Grafana alerting reload failed with status {}",
668            output.status
669        );
670    }
671    Ok(())
672}
673
674fn run_cmd(program: &str, args: &[&str]) -> Result<()> {
675    let status = ProcessCommand::new(program)
676        .args(args)
677        .status()
678        .with_context(|| format!("failed to run {program} — is it installed?"))?;
679    if !status.success() {
680        bail!("{program} exited with status {status}");
681    }
682    Ok(())
683}
684
685fn check_health(url: &str) -> Result<String> {
686    let output = ProcessCommand::new("curl")
687        .args(["-sf", url])
688        .output()
689        .context("failed to run curl")?;
690    if !output.status.success() {
691        bail!("HTTP request failed (status {})", output.status);
692    }
693    Ok(String::from_utf8_lossy(&output.stdout).to_string())
694}
695
696fn open_browser(url: &str) -> Result<()> {
697    #[cfg(target_os = "macos")]
698    let program = "open";
699    #[cfg(target_os = "linux")]
700    let program = "xdg-open";
701    #[cfg(not(any(target_os = "macos", target_os = "linux")))]
702    let program = "open";
703
704    run_cmd(program, &[url])
705}
706
/// Unit tests for the bundled dashboard, alert provisioning output, and the
/// small URL/path helpers.
#[cfg(test)]
mod tests {
    use super::*;

    /// The embedded dashboard parses as a JSON object with a `panels` array.
    #[test]
    fn dashboard_json_valid() {
        let parsed: serde_json::Value =
            serde_json::from_str(DASHBOARD_JSON).expect("dashboard.json must be valid JSON");
        assert!(parsed.is_object(), "root must be an object");
        assert!(parsed["panels"].is_array(), "panels must be an array");
    }

    /// Every title in `REQUIRED_ROWS` exists as a `row` panel.
    #[test]
    fn dashboard_has_all_rows() {
        let parsed: serde_json::Value = serde_json::from_str(DASHBOARD_JSON).unwrap();
        let panels = parsed["panels"].as_array().unwrap();
        let row_titles: Vec<&str> = panels
            .iter()
            .filter(|p| p["type"].as_str() == Some("row"))
            .filter_map(|p| p["title"].as_str())
            .collect();

        for expected in REQUIRED_ROWS {
            assert!(
                row_titles.contains(expected),
                "missing row: {expected}. Found: {row_titles:?}"
            );
        }
    }

    /// `REQUIRED_ALERTS` and `ALERT_RULES` list the same alerts.
    #[test]
    fn required_alerts_match_rule_definitions() {
        let titles: Vec<&str> = ALERT_RULES.iter().map(|rule| rule.title).collect();
        for expected in REQUIRED_ALERTS {
            assert!(titles.contains(expected), "missing alert rule {expected}");
        }
        assert_eq!(titles.len(), REQUIRED_ALERTS.len());
    }

    /// The rendered rules YAML mentions every alert and the key event types.
    #[test]
    fn alert_rules_yaml_contains_expected_rules() {
        let yaml = render_alert_rules_yaml().unwrap();
        for expected in REQUIRED_ALERTS {
            assert!(yaml.contains(expected), "missing {expected} from YAML");
        }
        assert!(yaml.contains("pipeline_starvation_detected"));
        assert!(yaml.contains("task_auto_merged"));
        assert!(yaml.contains("task_manual_merged"));
        assert!(yaml.contains("task_reworked"));
    }

    /// The contact point is a webhook aimed at the local receiver.
    #[test]
    fn alert_contact_points_yaml_uses_local_webhook() {
        let yaml = render_contact_points_yaml(ALERT_WEBHOOK_PORT).unwrap();
        assert!(yaml.contains("type: webhook"));
        assert!(yaml.contains("http://127.0.0.1:8787/grafana-alerts"));
        assert!(yaml.contains(ALERT_CONTACT_POINT));
    }

    /// Alert names and status are extracted from a webhook payload.
    #[test]
    fn notification_formatting_extracts_alert_names() {
        let body = r#"{"status":"firing","alerts":[{"labels":{"alertname":"Crash Spike"}},{"labels":{"alertname":"Zero Activity"}}]}"#;
        let formatted = format_alert_notification(body);
        assert!(formatted.contains("status=firing"));
        assert!(formatted.contains("Crash Spike,Zero Activity"));
    }

    /// Provisioning creates the alert log and the project-local YAML files.
    #[test]
    fn provision_alerting_writes_expected_files() {
        let tmp = tempfile::tempdir().unwrap();
        provision_alerting(tmp.path(), 9911).unwrap();

        assert!(tmp.path().join(".batty/alerts.log").exists());
        assert!(
            tmp.path()
                .join(".batty/grafana/alerting/rules.yaml")
                .exists()
        );
        assert!(
            tmp.path()
                .join(".batty/grafana/alerting/contact-points.yaml")
                .exists()
        );
        assert!(
            tmp.path()
                .join(".batty/grafana/alerting/policies.yaml")
                .exists()
        );
    }

    /// An alert line lands in the alert log and both orchestrator logs.
    #[test]
    fn append_alert_logs_writes_alert_and_orchestrator_logs() {
        let tmp = tempfile::tempdir().unwrap();

        append_alert_logs(tmp.path(), "[1] status=firing alerts=Crash Spike").unwrap();

        let alerts = std::fs::read_to_string(tmp.path().join(".batty/alerts.log")).unwrap();
        assert!(alerts.contains("Crash Spike"));

        let orchestrator =
            std::fs::read_to_string(tmp.path().join(".batty/orchestrator.log")).unwrap();
        assert!(orchestrator.contains("alert: [1] status=firing alerts=Crash Spike"));

        let orchestrator_ansi =
            std::fs::read_to_string(tmp.path().join(".batty/orchestrator.ansi.log")).unwrap();
        assert!(orchestrator_ansi.contains("alert: [1] status=firing alerts=Crash Spike"));
    }

    /// The dashboard contains every panel the product relies on.
    #[test]
    fn dashboard_has_required_panels() {
        let parsed: serde_json::Value = serde_json::from_str(DASHBOARD_JSON).unwrap();
        let panels = parsed["panels"].as_array().unwrap();
        let titles: Vec<&str> = panels.iter().filter_map(|p| p["title"].as_str()).collect();

        let required = [
            "Total Events",
            "Tasks Completed",
            "Auto-Merged",
            "Discord Events Sent",
            "Merge Queue Depth",
            "Notification Delivery Latency",
            "Verification Pass Rate Over Time",
            "Agent Metrics",
            "Event Type Distribution",
            "Recent Completions",
            "Events Per Hour",
            "Average Cycle and Lead Time Per Hour",
            "Tasks Completed Per Hour",
            "Narration Rejection Rate Per Hour (%)",
            "Planning Cycles Triggered Per Hour",
            "Tasks Created Per Planning Cycle",
            "Planning Cycle Latency",
            "Planning Cycle Success vs Failure",
            "Narration Events Detected Per Agent",
            "Context Pressure Warnings Per Agent",
            "Board Task Count by Status Over Time",
            "Tasks Archived Per Hour",
        ];
        for expected in required {
            assert!(
                titles.contains(&expected),
                "missing panel: {expected}. Found: {titles:?}"
            );
        }
    }

    /// Every panel that names a datasource uses the `batty-telemetry` uid.
    #[test]
    fn dashboard_uses_consistent_datasource_uid() {
        let parsed: serde_json::Value = serde_json::from_str(DASHBOARD_JSON).unwrap();
        let panels = parsed["panels"].as_array().unwrap();
        for panel in panels {
            if let Some(uid) = panel["datasource"]["uid"].as_str() {
                assert_eq!(
                    uid,
                    "batty-telemetry",
                    "panel '{}' has unexpected datasource uid: {uid}",
                    panel["title"].as_str().unwrap_or("?")
                );
            }
        }
    }

    /// Guards against accidentally adding or removing a required alert.
    #[test]
    fn dashboard_alert_count_matches_expected() {
        assert_eq!(REQUIRED_ALERTS.len(), 4);
    }

    /// The dashboard keeps at least 28 non-row panels.
    #[test]
    fn dashboard_panel_count() {
        let parsed: serde_json::Value = serde_json::from_str(DASHBOARD_JSON).unwrap();
        let panels = parsed["panels"].as_array().unwrap();
        let non_row_panels: Vec<_> = panels
            .iter()
            .filter(|p| p["type"].as_str() != Some("row"))
            .collect();
        assert!(
            non_row_panels.len() >= 28,
            "expected at least 28 data panels, got {}",
            non_row_panels.len()
        );
    }

    /// `write_dashboard` creates missing parent directories and writes valid JSON.
    #[test]
    fn write_dashboard_creates_file() {
        // A unique temp dir (as in the other tests) avoids clashes on a fixed
        // shared path and is removed automatically when `tmp` is dropped.
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("nested").join("dashboard.json");

        write_dashboard(&path).expect("write_dashboard should succeed");
        assert!(path.exists());

        let content = std::fs::read_to_string(&path).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&content).unwrap();
        assert!(
            parsed["title"].as_str().is_some(),
            "dashboard must have a title"
        );
    }

    #[test]
    fn grafana_url_default_port() {
        assert_eq!(grafana_url(3000), "http://localhost:3000");
    }

    #[test]
    fn grafana_url_custom_port() {
        assert_eq!(grafana_url(9090), "http://localhost:9090");
    }

    #[test]
    fn default_port_is_3000() {
        assert_eq!(DEFAULT_PORT, 3000);
    }

    /// A health check against a port nothing listens on reports an error.
    #[test]
    fn check_health_unreachable() {
        let result = check_health("http://localhost:1/api/health");
        assert!(result.is_err());
    }
}