1use anyhow::{Context, Result, bail};
7use serde_json::json;
8use std::fs::OpenOptions;
9use std::io::{Read, Write};
10use std::net::{TcpListener, TcpStream};
11use std::path::{Path, PathBuf};
12use std::process::{Command as ProcessCommand, Stdio};
13use std::time::Duration;
14
/// Grafana dashboard definition, embedded at compile time from
/// `grafana/dashboard.json` (resolved relative to this source file).
pub const DASHBOARD_JSON: &str = include_str!("grafana/dashboard.json");

/// Default Grafana HTTP port (3000).
// NOTE(review): presumably the default callers pass as `port` to the functions
// in this module — confirm at the CLI layer.
pub const DEFAULT_PORT: u16 = 3000;
/// Port that `setup` provisions and starts the local alert webhook receiver on.
pub const ALERT_WEBHOOK_PORT: u16 = 8787;
22
/// Titles of the row panels the embedded dashboard is expected to contain.
///
/// The unit tests assert that every title listed here appears in
/// `DASHBOARD_JSON` as a panel whose `type` is `row`.
pub const REQUIRED_ROWS: &[&str] = &[
    "Session Overview",
    "Pipeline Health",
    "Activity Over Time",
    "Agent Performance",
    "Event Breakdown",
    "Recent Activity",
    "Throughput Over Time",
    "Tact Engine",
    "Health Signals",
    "Board Health",
];
36
/// Titles of the alert rules that must be provisioned.
///
/// The unit tests assert that this list and the titles in `ALERT_RULES`
/// contain the same entries (same membership and same length).
pub const REQUIRED_ALERTS: &[&str] = &[
    "Zero Activity",
    "Crash Spike",
    "Dispatch Starvation",
    "Merge Queue Depth",
];
44
/// Name of the single rule group emitted by `render_alert_rules_yaml`.
const ALERT_RULE_GROUP: &str = "batty-operational-anomalies";
/// Folder name written into the rendered rule group.
const ALERT_FOLDER: &str = "Batty";
/// Contact point name; used both for the webhook contact point and as the
/// receiver of the rendered notification policy, so the two stay linked.
const ALERT_CONTACT_POINT: &str = "batty-file-log";
48
/// Static description of one alert rule, turned into Grafana provisioning
/// data by `render_alert_rules_yaml`.
#[derive(Clone, Copy)]
struct AlertRuleDefinition {
    /// Rule `uid` in the rendered YAML.
    uid: &'static str,
    /// Rule `title` in the rendered YAML.
    title: &'static str,
    /// Value of the `severity` label attached to the rule.
    severity: &'static str,
    /// Emitted as `relativeTimeRange.from` for both the query and the condition.
    window_secs: u64,
    /// Emitted as the rule's `for` field (e.g. `"5m"`).
    for_duration: &'static str,
    /// Parameter of the `gt` evaluator applied to the last value of query `A`.
    threshold: f64,
    /// Query text sent to the `batty-telemetry` SQLite datasource.
    sql: &'static str,
    /// Emitted as the rule's `summary` annotation.
    description: &'static str,
}
60
/// The alert rules provisioned into Grafana.
///
/// Each rule's query returns a single `value` column; the rendered condition
/// fires when the last value is greater than `threshold`.
// NOTE(review): the queries divide `$__from` / `$__to` by 1000, which suggests
// those macros are epoch milliseconds and `events.timestamp` is epoch seconds —
// confirm against the datasource plugin and the telemetry schema.
const ALERT_RULES: &[AlertRuleDefinition] = &[
    // Query yields 1 when no assignment/completion events exist in the window,
    // else 0; with threshold 0.5 the condition is true only for the value 1.
    AlertRuleDefinition {
        uid: "batty_zero_activity",
        title: "Zero Activity",
        severity: "warning",
        window_secs: 30 * 60,
        for_duration: "5m",
        threshold: 0.5,
        sql: "SELECT CASE WHEN COUNT(*) = 0 THEN 1 ELSE 0 END AS value \
              FROM events \
              WHERE event_type IN ('task_assigned', 'task_completed') \
              AND timestamp BETWEEN $__from / 1000 AND $__to / 1000;",
        description: "No task assignment or completion activity in the last 30 minutes.",
    },
    // Raw crash count in the window; fires when the count exceeds 3.
    AlertRuleDefinition {
        uid: "batty_crash_spike",
        title: "Crash Spike",
        severity: "critical",
        window_secs: 10 * 60,
        for_duration: "2m",
        threshold: 3.0,
        sql: "SELECT COUNT(*) AS value \
              FROM events \
              WHERE event_type = 'member_crashed' \
              AND timestamp BETWEEN $__from / 1000 AND $__to / 1000;",
        description: "More than three agent crashes in the last 10 minutes.",
    },
    // Query yields 1 when at least one starvation event exists in the window.
    AlertRuleDefinition {
        uid: "batty_dispatch_starvation",
        title: "Dispatch Starvation",
        severity: "warning",
        window_secs: 10 * 60,
        for_duration: "2m",
        threshold: 0.5,
        sql: "SELECT CASE WHEN COUNT(*) > 0 THEN 1 ELSE 0 END AS value \
              FROM events \
              WHERE event_type = 'pipeline_starvation_detected' \
              AND timestamp BETWEEN $__from / 1000 AND $__to / 1000;",
        description: "Idle engineers outnumber runnable work and dispatch is starving the lane.",
    },
    // Counts completed tasks with no later merge/rework event. This query has
    // no `$__from` / `$__to` filter, so `window_secs` does not narrow its result.
    AlertRuleDefinition {
        uid: "batty_merge_queue_depth",
        title: "Merge Queue Depth",
        severity: "warning",
        window_secs: 60 * 60,
        for_duration: "2m",
        threshold: 3.0,
        sql: "SELECT COUNT(*) AS value \
              FROM task_metrics tm \
              WHERE tm.completed_at IS NOT NULL \
              AND NOT EXISTS ( \
              SELECT 1 \
              FROM events e \
              WHERE e.task_id = tm.task_id \
              AND e.event_type IN ('task_auto_merged', 'task_manual_merged', 'task_reworked') \
              AND e.timestamp >= tm.completed_at \
              );",
        description: "More than three completed tasks are still waiting for merge or rework.",
    },
];
121
122pub fn write_dashboard(path: &Path) -> anyhow::Result<()> {
124 if let Some(parent) = path.parent() {
125 std::fs::create_dir_all(parent)?;
126 }
127 std::fs::write(path, DASHBOARD_JSON)?;
128 Ok(())
129}
130
/// Returns the base URL of a Grafana instance on `localhost` at `port`
/// (no trailing slash).
pub fn grafana_url(port: u16) -> String {
    let mut url = String::from("http://localhost:");
    url.push_str(&port.to_string());
    url
}
135
/// Returns the loopback URL Grafana should POST alert notifications to.
fn alert_webhook_url(port: u16) -> String {
    ["http://127.0.0.1:", &port.to_string(), "/grafana-alerts"].concat()
}
139
/// Returns `<project_root>/.batty/alerts.log`, the file alert notifications
/// are appended to.
fn alerts_log_path(project_root: &Path) -> PathBuf {
    let mut log_path = project_root.to_path_buf();
    log_path.push(".batty");
    log_path.push("alerts.log");
    log_path
}
143
144fn orchestrator_alert_log_path(project_root: &Path) -> PathBuf {
145 crate::team::orchestrator_log_path(project_root)
146}
147
148fn orchestrator_alert_ansi_log_path(project_root: &Path) -> PathBuf {
149 crate::team::orchestrator_ansi_log_path(project_root)
150}
151
/// Returns `<project_root>/.batty/grafana/alerting`, the project-local copy
/// of the rendered alerting files.
fn project_alerting_dir(project_root: &Path) -> PathBuf {
    let mut dir = project_root.to_path_buf();
    dir.extend([".batty", "grafana", "alerting"]);
    dir
}
155
/// Locates Grafana's provisioning directory.
///
/// Resolution order:
/// 1. `BATTY_GRAFANA_PROVISIONING_DIR`, if set to a non-empty value. The value
///    is taken as an OS string, so non-UTF-8 paths are honoured instead of
///    being silently ignored. An empty value is treated as unset, because an
///    empty path would make callers write relative to the current directory.
/// 2. The first well-known install location that exists on disk.
///
/// Returns `None` when neither yields a directory.
fn grafana_provisioning_dir() -> Option<PathBuf> {
    const WELL_KNOWN_DIRS: [&str; 3] = [
        "/opt/homebrew/etc/grafana/provisioning",
        "/usr/local/etc/grafana/provisioning",
        "/etc/grafana/provisioning",
    ];

    let override_dir = std::env::var_os("BATTY_GRAFANA_PROVISIONING_DIR")
        .filter(|dir| !dir.is_empty());
    if let Some(dir) = override_dir {
        // The override is trusted as-is; it is not required to exist yet.
        return Some(PathBuf::from(dir));
    }

    WELL_KNOWN_DIRS
        .iter()
        .map(PathBuf::from)
        .find(|path| path.exists())
}
170
171fn provisioning_target_dir(project_root: &Path) -> PathBuf {
172 grafana_provisioning_dir().unwrap_or_else(|| project_root.join(".batty").join("grafana"))
173}
174
175pub fn setup(project_root: &Path, port: u16) -> Result<()> {
177 println!("Installing Grafana via Homebrew...");
178 run_cmd("brew", &["install", "grafana"])?;
179
180 println!("Installing SQLite datasource plugin...");
181 let plugin_result = ProcessCommand::new("grafana")
182 .args([
183 "cli",
184 "--homepath",
185 "/opt/homebrew/opt/grafana/share/grafana",
186 "--pluginsDir",
187 "/opt/homebrew/var/lib/grafana/plugins",
188 "plugins",
189 "install",
190 "frser-sqlite-datasource",
191 ])
192 .status();
193 if plugin_result.is_err() || !plugin_result.unwrap().success() {
194 let _ = run_cmd(
195 "grafana-cli",
196 &["plugins", "install", "frser-sqlite-datasource"],
197 );
198 }
199
200 println!("Provisioning alert rules and notification policy...");
201 provision_alerting(project_root, ALERT_WEBHOOK_PORT)?;
202 ensure_alert_webhook_running(project_root, ALERT_WEBHOOK_PORT)?;
203
204 println!("Starting Grafana service...");
205 run_cmd("brew", &["services", "start", "grafana"])?;
206
207 println!("Waiting for Grafana to start...");
208 for _ in 0..10 {
209 std::thread::sleep(Duration::from_secs(1));
210 if check_health(&format!("{}/api/health", grafana_url(port))).is_ok() {
211 break;
212 }
213 }
214
215 let _ = reload_alerting_provisioning(port);
216
217 let db_path = project_root.join(".batty").join("telemetry.db");
218 if db_path.exists() {
219 provision_dashboard(project_root, port)?;
220 } else {
221 println!(
222 "telemetry.db not found at {}. Alerting files were written, but dashboard import \
223will complete after `batty start` creates the database.",
224 db_path.display()
225 );
226 }
227
228 println!("Grafana setup complete. Dashboard at {}", grafana_url(port));
229 Ok(())
230}
231
232pub fn provision_dashboard(project_root: &Path, port: u16) -> Result<()> {
234 let db_path = project_root.join(".batty").join("telemetry.db");
235 if !db_path.exists() {
236 bail!(
237 "telemetry.db not found at {}. Run `batty start` first.",
238 db_path.display()
239 );
240 }
241 let base_url = grafana_url(port);
242
243 let _ = ProcessCommand::new("sqlite3")
244 .args([
245 db_path.to_str().unwrap_or(""),
246 "PRAGMA wal_checkpoint(TRUNCATE);",
247 ])
248 .status();
249
250 println!("Creating SQLite datasource...");
251 let ds_body = format!(
252 r#"{{"name":"Batty Telemetry","uid":"batty-telemetry","type":"frser-sqlite-datasource","access":"proxy","jsonData":{{"path":"{}"}}}}"#,
253 db_path.display()
254 );
255 let ds_result = ProcessCommand::new("curl")
256 .args([
257 "-sf",
258 "-X",
259 "POST",
260 &format!("{base_url}/api/datasources"),
261 "-H",
262 "Content-Type: application/json",
263 "-u",
264 "admin:admin",
265 "-d",
266 &ds_body,
267 ])
268 .output();
269 match ds_result {
270 Ok(out) if out.status.success() => println!("Datasource created."),
271 _ => println!("Datasource may already exist (continuing)."),
272 }
273
274 println!("Importing dashboard...");
275 let dashboard_payload = format!(
276 r#"{{"dashboard":{},"overwrite":true,"folderId":0}}"#,
277 DASHBOARD_JSON
278 );
279 let tmp_file = std::env::temp_dir().join("batty-grafana-import.json");
280 std::fs::write(&tmp_file, &dashboard_payload)?;
281 let import_result = ProcessCommand::new("curl")
282 .args([
283 "-sf",
284 "-X",
285 "POST",
286 &format!("{base_url}/api/dashboards/db"),
287 "-H",
288 "Content-Type: application/json",
289 "-u",
290 "admin:admin",
291 "-d",
292 &format!("@{}", tmp_file.display()),
293 ])
294 .output();
295 let _ = std::fs::remove_file(&tmp_file);
296 match import_result {
297 Ok(out) if out.status.success() => {
298 let body = String::from_utf8_lossy(&out.stdout);
299 if body.contains("\"status\":\"success\"") {
300 println!("Dashboard imported successfully.");
301 } else {
302 println!("Dashboard import response: {body}");
303 }
304 }
305 _ => {
306 println!("Dashboard import may have failed. Check Grafana UI.");
307 }
308 }
309
310 let url = format!("{base_url}/d/batty-project");
311 println!("Dashboard at: {url}");
312 Ok(())
313}
314
315pub fn status(port: u16) -> Result<()> {
317 let url = format!("{}/api/health", grafana_url(port));
318 match check_health(&url) {
319 Ok(body) => {
320 println!("Grafana is running at {}", grafana_url(port));
321 println!("{body}");
322 Ok(())
323 }
324 Err(e) => bail!("Grafana is not reachable at {}: {e}", grafana_url(port)),
325 }
326}
327
328pub fn open(port: u16) -> Result<()> {
330 let url = grafana_url(port);
331 open_browser(&url).context("failed to open browser")?;
332 println!("Opened {url}");
333 Ok(())
334}
335
336pub fn run_alert_webhook(project_root: &Path, port: u16) -> Result<()> {
338 let listener = TcpListener::bind(("127.0.0.1", port))
339 .with_context(|| format!("failed to bind Grafana alert webhook on port {port}"))?;
340 for stream in listener.incoming() {
341 match stream {
342 Ok(stream) => {
343 if let Err(error) = handle_alert_webhook(stream, project_root) {
344 eprintln!("grafana alert webhook error: {error}");
345 }
346 }
347 Err(error) => eprintln!("grafana alert webhook accept failed: {error}"),
348 }
349 }
350 Ok(())
351}
352
353pub fn render_alert_rules_yaml() -> Result<String> {
354 let rules = ALERT_RULES
355 .iter()
356 .map(|rule| {
357 json!({
358 "uid": rule.uid,
359 "title": rule.title,
360 "condition": "B",
361 "data": [
362 {
363 "refId": "A",
364 "datasourceUid": "batty-telemetry",
365 "relativeTimeRange": {
366 "from": rule.window_secs,
367 "to": 0
368 },
369 "model": {
370 "datasource": {
371 "type": "frser-sqlite-datasource",
372 "uid": "batty-telemetry"
373 },
374 "intervalMs": 1000,
375 "maxDataPoints": 43200,
376 "queryText": rule.sql,
377 "queryType": "table",
378 "rawQueryText": rule.sql,
379 "rawSql": rule.sql,
380 "refId": "A"
381 }
382 },
383 {
384 "refId": "B",
385 "datasourceUid": "__expr__",
386 "relativeTimeRange": {
387 "from": rule.window_secs,
388 "to": 0
389 },
390 "model": {
391 "conditions": [
392 {
393 "evaluator": {
394 "params": [rule.threshold],
395 "type": "gt"
396 },
397 "operator": {
398 "type": "and"
399 },
400 "query": {
401 "params": ["A"]
402 },
403 "reducer": {
404 "type": "last"
405 },
406 "type": "query"
407 }
408 ],
409 "datasource": {
410 "type": "__expr__",
411 "uid": "__expr__"
412 },
413 "expression": "A",
414 "intervalMs": 1000,
415 "maxDataPoints": 43200,
416 "refId": "B",
417 "type": "classic_conditions"
418 }
419 }
420 ],
421 "noDataState": "OK",
422 "execErrState": "Alerting",
423 "for": rule.for_duration,
424 "annotations": {
425 "summary": rule.description
426 },
427 "labels": {
428 "severity": rule.severity,
429 "team": "batty"
430 }
431 })
432 })
433 .collect::<Vec<_>>();
434
435 serde_yaml::to_string(&json!({
436 "apiVersion": 1,
437 "groups": [
438 {
439 "orgId": 1,
440 "name": ALERT_RULE_GROUP,
441 "folder": ALERT_FOLDER,
442 "interval": "60s",
443 "rules": rules
444 }
445 ]
446 }))
447 .context("failed to render Grafana alert rules YAML")
448}
449
450fn render_contact_points_yaml(port: u16) -> Result<String> {
451 serde_yaml::to_string(&json!({
452 "apiVersion": 1,
453 "contactPoints": [
454 {
455 "orgId": 1,
456 "name": ALERT_CONTACT_POINT,
457 "receivers": [
458 {
459 "uid": "batty_file_log",
460 "type": "webhook",
461 "disableResolveMessage": false,
462 "settings": {
463 "url": alert_webhook_url(port),
464 "httpMethod": "POST",
465 "title": "{{ template \"default.title\" . }}",
466 "message": "{{ template \"default.message\" . }}"
467 }
468 }
469 ]
470 }
471 ]
472 }))
473 .context("failed to render Grafana contact points YAML")
474}
475
476fn render_notification_policies_yaml() -> Result<String> {
477 serde_yaml::to_string(&json!({
478 "apiVersion": 1,
479 "policies": [
480 {
481 "orgId": 1,
482 "receiver": ALERT_CONTACT_POINT,
483 "group_by": ["alertname", "severity"],
484 "group_wait": "30s",
485 "group_interval": "5m",
486 "repeat_interval": "4h"
487 }
488 ]
489 }))
490 .context("failed to render Grafana notification policy YAML")
491}
492
493fn provision_alerting(project_root: &Path, port: u16) -> Result<()> {
494 let project_dir = project_alerting_dir(project_root);
495 std::fs::create_dir_all(&project_dir)?;
496 if let Some(parent) = alerts_log_path(project_root).parent() {
497 std::fs::create_dir_all(parent)?;
498 }
499 let _ = OpenOptions::new()
500 .create(true)
501 .append(true)
502 .open(alerts_log_path(project_root))?;
503
504 let rules_yaml = render_alert_rules_yaml()?;
505 let contact_points_yaml = render_contact_points_yaml(port)?;
506 let policies_yaml = render_notification_policies_yaml()?;
507
508 write_alerting_file(&project_dir.join("rules.yaml"), &rules_yaml)?;
509 write_alerting_file(
510 &project_dir.join("contact-points.yaml"),
511 &contact_points_yaml,
512 )?;
513 write_alerting_file(&project_dir.join("policies.yaml"), &policies_yaml)?;
514
515 let target_dir = provisioning_target_dir(project_root).join("alerting");
516 std::fs::create_dir_all(&target_dir)?;
517 write_alerting_file(&target_dir.join("batty-rules.yaml"), &rules_yaml)?;
518 write_alerting_file(
519 &target_dir.join("batty-contact-points.yaml"),
520 &contact_points_yaml,
521 )?;
522 write_alerting_file(&target_dir.join("batty-policies.yaml"), &policies_yaml)?;
523 Ok(())
524}
525
526fn write_alerting_file(path: &Path, content: &str) -> Result<()> {
527 if let Some(parent) = path.parent() {
528 std::fs::create_dir_all(parent)?;
529 }
530 std::fs::write(path, content)
531 .with_context(|| format!("failed to write alerting file {}", path.display()))
532}
533
534fn ensure_alert_webhook_running(project_root: &Path, port: u16) -> Result<()> {
535 if TcpStream::connect(("127.0.0.1", port)).is_ok() {
536 return Ok(());
537 }
538
539 let current_exe = std::env::current_exe().context("failed to resolve current batty binary")?;
540 ProcessCommand::new(current_exe)
541 .args([
542 "grafana-webhook",
543 "--project-root",
544 &project_root.display().to_string(),
545 "--port",
546 &port.to_string(),
547 ])
548 .stdin(Stdio::null())
549 .stdout(Stdio::null())
550 .stderr(Stdio::null())
551 .spawn()
552 .context("failed to start Grafana webhook receiver")?;
553 std::thread::sleep(Duration::from_millis(250));
554 Ok(())
555}
556
557fn handle_alert_webhook(mut stream: TcpStream, project_root: &Path) -> Result<()> {
558 stream.set_read_timeout(Some(Duration::from_secs(2)))?;
559 let mut buf = Vec::new();
560 let _ = stream.read_to_end(&mut buf);
561 let request = String::from_utf8_lossy(&buf);
562 let body = request
563 .split_once("\r\n\r\n")
564 .map(|(_, body)| body)
565 .unwrap_or_default();
566 let line = format_alert_notification(body);
567 append_alert_logs(project_root, &line)?;
568 if let Err(error) = maybe_send_telegram_alert(&line) {
569 eprintln!("grafana alert telegram delivery failed: {error}");
570 }
571 stream.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok")?;
572 Ok(())
573}
574
575fn append_alert_logs(project_root: &Path, line: &str) -> Result<()> {
576 if let Some(parent) = alerts_log_path(project_root).parent() {
577 std::fs::create_dir_all(parent)?;
578 }
579 let mut alerts = OpenOptions::new()
580 .create(true)
581 .append(true)
582 .open(alerts_log_path(project_root))
583 .with_context(|| {
584 format!(
585 "failed to open alert log at {}",
586 alerts_log_path(project_root).display()
587 )
588 })?;
589 writeln!(alerts, "{line}")?;
590
591 let orchestrator_line = format!("alert: {line}");
592 let plain_path = orchestrator_alert_log_path(project_root);
593 let ansi_path = orchestrator_alert_ansi_log_path(project_root);
594 let mut plain = crate::team::open_log_for_append(&plain_path)?;
595 writeln!(plain, "{orchestrator_line}")?;
596 let mut ansi = crate::team::open_log_for_append(&ansi_path)?;
597 writeln!(ansi, "{orchestrator_line}")?;
598 Ok(())
599}
600
601fn maybe_send_telegram_alert(message: &str) -> Result<()> {
602 let bot_token = match std::env::var("BATTY_TELEGRAM_BOT_TOKEN") {
603 Ok(token) => token,
604 Err(_) => return Ok(()),
605 };
606 let chat_id = match std::env::var("BATTY_GRAFANA_ALERT_CHAT_ID")
607 .or_else(|_| std::env::var("BATTY_TELEGRAM_ALERT_CHAT_ID"))
608 {
609 Ok(chat_id) => chat_id,
610 Err(_) => return Ok(()),
611 };
612
613 let bot = crate::team::telegram::TelegramBot::new(bot_token, Vec::new());
614 bot.send_message(&chat_id, message)
615 .context("failed to send Grafana alert to Telegram")
616}
617
618fn format_alert_notification(body: &str) -> String {
619 let ts = crate::team::now_unix();
620 if let Ok(payload) = serde_json::from_str::<serde_json::Value>(body) {
621 let status = payload
622 .get("status")
623 .and_then(|v| v.as_str())
624 .unwrap_or("unknown");
625 let alerts = payload
626 .get("alerts")
627 .and_then(|v| v.as_array())
628 .cloned()
629 .unwrap_or_default();
630 let names = alerts
631 .iter()
632 .filter_map(|alert| {
633 alert
634 .get("labels")
635 .and_then(|labels| labels.get("alertname"))
636 .and_then(|name| name.as_str())
637 })
638 .collect::<Vec<_>>();
639 if !names.is_empty() {
640 return format!(
641 "[{ts}] status={status} alerts={} raw={}",
642 names.join(","),
643 body.trim()
644 );
645 }
646 }
647 format!("[{ts}] raw={}", body.trim())
648}
649
650fn reload_alerting_provisioning(port: u16) -> Result<()> {
651 let output = ProcessCommand::new("curl")
652 .args([
653 "-sf",
654 "-X",
655 "POST",
656 &format!(
657 "{}/api/admin/provisioning/alerting/reload",
658 grafana_url(port)
659 ),
660 "-u",
661 "admin:admin",
662 ])
663 .output()
664 .context("failed to run curl for alerting reload")?;
665 if !output.status.success() {
666 bail!(
667 "Grafana alerting reload failed with status {}",
668 output.status
669 );
670 }
671 Ok(())
672}
673
674fn run_cmd(program: &str, args: &[&str]) -> Result<()> {
675 let status = ProcessCommand::new(program)
676 .args(args)
677 .status()
678 .with_context(|| format!("failed to run {program} — is it installed?"))?;
679 if !status.success() {
680 bail!("{program} exited with status {status}");
681 }
682 Ok(())
683}
684
685fn check_health(url: &str) -> Result<String> {
686 let output = ProcessCommand::new("curl")
687 .args(["-sf", url])
688 .output()
689 .context("failed to run curl")?;
690 if !output.status.success() {
691 bail!("HTTP request failed (status {})", output.status);
692 }
693 Ok(String::from_utf8_lossy(&output.stdout).to_string())
694}
695
696fn open_browser(url: &str) -> Result<()> {
697 #[cfg(target_os = "macos")]
698 let program = "open";
699 #[cfg(target_os = "linux")]
700 let program = "xdg-open";
701 #[cfg(not(any(target_os = "macos", target_os = "linux")))]
702 let program = "open";
703
704 run_cmd(program, &[url])
705}
706
/// Unit tests for the embedded dashboard, the rendered alerting documents and
/// the file-writing helpers.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn dashboard_json_valid() {
        let parsed: serde_json::Value =
            serde_json::from_str(DASHBOARD_JSON).expect("dashboard.json must be valid JSON");
        assert!(parsed.is_object(), "root must be an object");
        assert!(parsed["panels"].is_array(), "panels must be an array");
    }

    #[test]
    fn dashboard_has_all_rows() {
        let parsed: serde_json::Value = serde_json::from_str(DASHBOARD_JSON).unwrap();
        let panels = parsed["panels"].as_array().unwrap();
        let row_titles: Vec<&str> = panels
            .iter()
            .filter(|p| p["type"].as_str() == Some("row"))
            .filter_map(|p| p["title"].as_str())
            .collect();

        for expected in REQUIRED_ROWS {
            assert!(
                row_titles.contains(expected),
                "missing row: {expected}. Found: {row_titles:?}"
            );
        }
    }

    #[test]
    fn required_alerts_match_rule_definitions() {
        let titles: Vec<&str> = ALERT_RULES.iter().map(|rule| rule.title).collect();
        for expected in REQUIRED_ALERTS {
            assert!(titles.contains(expected), "missing alert rule {expected}");
        }
        assert_eq!(titles.len(), REQUIRED_ALERTS.len());
    }

    #[test]
    fn alert_rules_yaml_contains_expected_rules() {
        let yaml = render_alert_rules_yaml().unwrap();
        for expected in REQUIRED_ALERTS {
            assert!(yaml.contains(expected), "missing {expected} from YAML");
        }
        assert!(yaml.contains("pipeline_starvation_detected"));
        assert!(yaml.contains("task_auto_merged"));
        assert!(yaml.contains("task_manual_merged"));
        assert!(yaml.contains("task_reworked"));
    }

    #[test]
    fn alert_contact_points_yaml_uses_local_webhook() {
        let yaml = render_contact_points_yaml(ALERT_WEBHOOK_PORT).unwrap();
        assert!(yaml.contains("type: webhook"));
        assert!(yaml.contains("http://127.0.0.1:8787/grafana-alerts"));
        assert!(yaml.contains(ALERT_CONTACT_POINT));
    }

    #[test]
    fn notification_formatting_extracts_alert_names() {
        let body = r#"{"status":"firing","alerts":[{"labels":{"alertname":"Crash Spike"}},{"labels":{"alertname":"Zero Activity"}}]}"#;
        let formatted = format_alert_notification(body);
        assert!(formatted.contains("status=firing"));
        assert!(formatted.contains("Crash Spike,Zero Activity"));
    }

    #[test]
    fn provision_alerting_writes_expected_files() {
        let tmp = tempfile::tempdir().unwrap();
        provision_alerting(tmp.path(), 9911).unwrap();

        assert!(tmp.path().join(".batty/alerts.log").exists());
        assert!(
            tmp.path()
                .join(".batty/grafana/alerting/rules.yaml")
                .exists()
        );
        assert!(
            tmp.path()
                .join(".batty/grafana/alerting/contact-points.yaml")
                .exists()
        );
        assert!(
            tmp.path()
                .join(".batty/grafana/alerting/policies.yaml")
                .exists()
        );
    }

    #[test]
    fn append_alert_logs_writes_alert_and_orchestrator_logs() {
        let tmp = tempfile::tempdir().unwrap();

        append_alert_logs(tmp.path(), "[1] status=firing alerts=Crash Spike").unwrap();

        let alerts = std::fs::read_to_string(tmp.path().join(".batty/alerts.log")).unwrap();
        assert!(alerts.contains("Crash Spike"));

        let orchestrator =
            std::fs::read_to_string(tmp.path().join(".batty/orchestrator.log")).unwrap();
        assert!(orchestrator.contains("alert: [1] status=firing alerts=Crash Spike"));

        let orchestrator_ansi =
            std::fs::read_to_string(tmp.path().join(".batty/orchestrator.ansi.log")).unwrap();
        assert!(orchestrator_ansi.contains("alert: [1] status=firing alerts=Crash Spike"));
    }

    #[test]
    fn dashboard_has_required_panels() {
        let parsed: serde_json::Value = serde_json::from_str(DASHBOARD_JSON).unwrap();
        let panels = parsed["panels"].as_array().unwrap();
        let titles: Vec<&str> = panels.iter().filter_map(|p| p["title"].as_str()).collect();

        let required = [
            "Total Events",
            "Tasks Completed",
            "Auto-Merged",
            "Discord Events Sent",
            "Merge Queue Depth",
            "Notification Delivery Latency",
            "Verification Pass Rate Over Time",
            "Agent Metrics",
            "Event Type Distribution",
            "Recent Completions",
            "Events Per Hour",
            "Average Cycle and Lead Time Per Hour",
            "Tasks Completed Per Hour",
            "Narration Rejection Rate Per Hour (%)",
            "Planning Cycles Triggered Per Hour",
            "Tasks Created Per Planning Cycle",
            "Planning Cycle Latency",
            "Planning Cycle Success vs Failure",
            "Narration Events Detected Per Agent",
            "Context Pressure Warnings Per Agent",
            "Board Task Count by Status Over Time",
            "Tasks Archived Per Hour",
        ];
        for expected in required {
            assert!(
                titles.contains(&expected),
                "missing panel: {expected}. Found: {titles:?}"
            );
        }
    }

    #[test]
    fn dashboard_uses_consistent_datasource_uid() {
        let parsed: serde_json::Value = serde_json::from_str(DASHBOARD_JSON).unwrap();
        let panels = parsed["panels"].as_array().unwrap();
        for panel in panels {
            if let Some(uid) = panel["datasource"]["uid"].as_str() {
                assert_eq!(
                    uid,
                    "batty-telemetry",
                    "panel '{}' has unexpected datasource uid: {uid}",
                    panel["title"].as_str().unwrap_or("?")
                );
            }
        }
    }

    #[test]
    fn dashboard_alert_count_matches_expected() {
        assert_eq!(REQUIRED_ALERTS.len(), 4);
    }

    #[test]
    fn dashboard_panel_count() {
        let parsed: serde_json::Value = serde_json::from_str(DASHBOARD_JSON).unwrap();
        let panels = parsed["panels"].as_array().unwrap();
        let non_row_panels: Vec<_> = panels
            .iter()
            .filter(|p| p["type"].as_str() != Some("row"))
            .collect();
        assert!(
            non_row_panels.len() >= 28,
            "expected at least 28 data panels, got {}",
            non_row_panels.len()
        );
    }

    #[test]
    fn write_dashboard_creates_file() {
        // A unique temp directory (as in the other tests) avoids collisions
        // between concurrent test runs. The nested, not-yet-existing parent
        // still exercises directory creation.
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("nested").join("dashboard.json");

        write_dashboard(&path).expect("write_dashboard should succeed");
        assert!(path.exists());

        let content = std::fs::read_to_string(&path).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&content).unwrap();
        assert!(
            parsed["title"].as_str().is_some(),
            "dashboard must have a title"
        );
    }

    #[test]
    fn grafana_url_default_port() {
        assert_eq!(grafana_url(3000), "http://localhost:3000");
    }

    #[test]
    fn grafana_url_custom_port() {
        assert_eq!(grafana_url(9090), "http://localhost:9090");
    }

    #[test]
    fn default_port_is_3000() {
        assert_eq!(DEFAULT_PORT, 3000);
    }

    #[test]
    fn check_health_unreachable() {
        let result = check_health("http://localhost:1/api/health");
        assert!(result.is_err());
    }
}