Skip to main content

envoy/monitor/
ci.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use parking_lot::Mutex;
5
6use crate::event::{EventSeverity, EventType};
7use crate::http::SharedState;
8
9/// Background CI poller. Polls gh CLI every `interval_secs` and emits events on state changes.
10pub async fn run_ci_monitor(
11    state: SharedState,
12    project: String,
13    owner_repo: String,
14    interval_secs: u64,
15) {
16    let last_state = Arc::new(Mutex::new(HashMap::<String, String>::new()));
17
18    loop {
19        tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
20
21        let output = tokio::process::Command::new("gh")
22            .args([
23                "run",
24                "list",
25                "--repo",
26                &owner_repo,
27                "--limit",
28                "5",
29                "--json",
30                "status,conclusion,headBranch,displayTitle,databaseId",
31            ])
32            .output()
33            .await;
34
35        let stdout = match output {
36            Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout).to_string(),
37            _ => continue,
38        };
39
40        let runs: Vec<serde_json::Value> = serde_json::from_str(&stdout).unwrap_or_default();
41
42        // Collect changed runs under lock, then release before async work
43        let changed: Vec<(String, String, String, String, String)> = {
44            let mut cache = last_state.lock();
45            let mut changes = Vec::new();
46            for run in &runs {
47                let run_id = run["databaseId"].to_string();
48                let conclusion = run["conclusion"]
49                    .as_str()
50                    .unwrap_or("in_progress")
51                    .to_string();
52                let status = run["status"].as_str().unwrap_or("").to_string();
53                let display_title = run["displayTitle"].as_str().unwrap_or("").to_string();
54                let head_branch = run["headBranch"].as_str().unwrap_or("").to_string();
55
56                let prev = cache.get(&run_id).cloned().unwrap_or_default();
57                if prev != conclusion {
58                    cache.insert(run_id.clone(), conclusion.clone());
59                    changes.push((run_id, conclusion, status, display_title, head_branch));
60                }
61            }
62            changes
63        };
64        // cache lock dropped here — safe to .await
65
66        for (run_id, conclusion, status, display_title, head_branch) in changed {
67            let state_fb = state.clone();
68            let project_fb = project.clone();
69            let project_fb2 = project.clone();
70            let _ = tokio::task::spawn_blocking(move || {
71                let engine = state_fb.engine.lock();
72                let severity = match conclusion.as_str() {
73                    "success" => EventSeverity::Info,
74                    "failure" => EventSeverity::Blocking,
75                    _ => EventSeverity::Info,
76                };
77                if let Ok(event) = state_fb.event_bus.ingest(
78                    engine.graph(),
79                    project_fb,
80                    EventType::CiStatus,
81                    severity,
82                    "ci:github".into(),
83                    format!("CI {}: {}", run_id, conclusion),
84                    serde_json::json!({
85                        "run_id": run_id,
86                        "status": status,
87                        "conclusion": conclusion,
88                        "head_branch": head_branch,
89                        "display_title": display_title,
90                    }),
91                ) {
92                    let event_json = serde_json::to_value(&event).unwrap_or_default();
93                    let subs = state_fb
94                        .subscription_store
95                        .subscribers(engine.graph(), &project_fb2)
96                        .unwrap_or_default();
97                    for agent_id in &subs {
98                        let delivered =
99                            state_fb
100                                .ws_registry
101                                .send_json(agent_id, "ci_event", &event_json);
102                        if delivered {
103                            let _ = state_fb.delivery_tracker.record_delivery(
104                                engine.graph(),
105                                agent_id,
106                                &event.id,
107                            );
108                        }
109                    }
110                }
111            })
112            .await;
113        }
114    }
115}