1use std::collections::HashMap;
2use std::sync::Arc;
3
4use parking_lot::Mutex;
5
6use crate::event::{EventSeverity, EventType};
7use crate::http::SharedState;
8
9pub async fn run_ci_monitor(
11 state: SharedState,
12 project: String,
13 owner_repo: String,
14 interval_secs: u64,
15) {
16 let last_state = Arc::new(Mutex::new(HashMap::<String, String>::new()));
17
18 loop {
19 tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
20
21 let output = tokio::process::Command::new("gh")
22 .args([
23 "run",
24 "list",
25 "--repo",
26 &owner_repo,
27 "--limit",
28 "5",
29 "--json",
30 "status,conclusion,headBranch,displayTitle,databaseId",
31 ])
32 .output()
33 .await;
34
35 let stdout = match output {
36 Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout).to_string(),
37 _ => continue,
38 };
39
40 let runs: Vec<serde_json::Value> = serde_json::from_str(&stdout).unwrap_or_default();
41
42 let changed: Vec<(String, String, String, String, String)> = {
44 let mut cache = last_state.lock();
45 let mut changes = Vec::new();
46 for run in &runs {
47 let run_id = run["databaseId"].to_string();
48 let conclusion = run["conclusion"]
49 .as_str()
50 .unwrap_or("in_progress")
51 .to_string();
52 let status = run["status"].as_str().unwrap_or("").to_string();
53 let display_title = run["displayTitle"].as_str().unwrap_or("").to_string();
54 let head_branch = run["headBranch"].as_str().unwrap_or("").to_string();
55
56 let prev = cache.get(&run_id).cloned().unwrap_or_default();
57 if prev != conclusion {
58 cache.insert(run_id.clone(), conclusion.clone());
59 changes.push((run_id, conclusion, status, display_title, head_branch));
60 }
61 }
62 changes
63 };
64 for (run_id, conclusion, status, display_title, head_branch) in changed {
67 let state_fb = state.clone();
68 let project_fb = project.clone();
69 let project_fb2 = project.clone();
70 let _ = tokio::task::spawn_blocking(move || {
71 let engine = state_fb.engine.lock();
72 let severity = match conclusion.as_str() {
73 "success" => EventSeverity::Info,
74 "failure" => EventSeverity::Blocking,
75 _ => EventSeverity::Info,
76 };
77 if let Ok(event) = state_fb.event_bus.ingest(
78 engine.graph(),
79 project_fb,
80 EventType::CiStatus,
81 severity,
82 "ci:github".into(),
83 format!("CI {}: {}", run_id, conclusion),
84 serde_json::json!({
85 "run_id": run_id,
86 "status": status,
87 "conclusion": conclusion,
88 "head_branch": head_branch,
89 "display_title": display_title,
90 }),
91 ) {
92 let event_json = serde_json::to_value(&event).unwrap_or_default();
93 let subs = state_fb
94 .subscription_store
95 .subscribers(engine.graph(), &project_fb2)
96 .unwrap_or_default();
97 for agent_id in &subs {
98 let delivered =
99 state_fb
100 .ws_registry
101 .send_json(agent_id, "ci_event", &event_json);
102 if delivered {
103 let _ = state_fb.delivery_tracker.record_delivery(
104 engine.graph(),
105 agent_id,
106 &event.id,
107 );
108 }
109 }
110 }
111 })
112 .await;
113 }
114 }
115}