1use crate::{
2 config::AppConfig,
3 events::{Engine, Event},
4 github::{split_repo, CheckRun},
5 hooks,
6 lifecycle::{
7 enrichment::EnrichmentCache,
8 probe::is_pid_alive,
9 },
10 types::{
11 CIStatus, Comment, Notification, NotificationKind, PrId, SessionStatus, PR,
12 },
13};
14use std::{collections::HashMap, sync::Arc, time::Duration};
15use tokio_util::sync::CancellationToken;
16
17pub struct Poller {
18 engine: Arc<Engine>,
19 enrichment_cache: Arc<std::sync::Mutex<EnrichmentCache>>,
20}
21
22impl Poller {
23 pub fn new(engine: Arc<Engine>) -> Self {
24 Self {
25 engine,
26 enrichment_cache: Arc::new(std::sync::Mutex::new(HashMap::new())),
27 }
28 }
29
30 pub async fn start(self, token: CancellationToken) {
31 let mut pid_interval = tokio::time::interval(Duration::from_secs(5));
32 let mut github_interval = tokio::time::interval(Duration::from_secs(30));
33 github_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
35 loop {
36 tokio::select! {
37 _ = token.cancelled() => break,
38 _ = pid_interval.tick() => self.poll_pids().await,
39 _ = github_interval.tick() => self.poll_github().await,
40 }
41 }
42 }
43
44 async fn poll_pids(&self) {
47 let Ok(sessions) = self.engine.store.list_sessions() else { return };
48 for mut session in sessions {
49 if matches!(session.status, SessionStatus::Done | SessionStatus::Terminated) {
50 continue;
51 }
52 if let Some(pid) = session.pid {
53 if !is_pid_alive(pid) {
54 session.status = SessionStatus::Terminated;
55 let _ = self.engine.store.upsert_session(&session);
56 self.engine.emit(Event::SessionUpdated(session));
57 continue;
58 }
59 }
60
61 if matches!(session.status, SessionStatus::Working | SessionStatus::Spawning)
63 && session.pr_number.is_none()
64 {
65 let sessions_dir = AppConfig::sessions_dir();
66 if let Ok(meta) = hooks::read_session_metadata(&sessions_dir, &session.id) {
67 if let Some(pr_num) = meta.pr_number {
68 session.pr_number = Some(pr_num);
69 session.status = SessionStatus::PrOpen;
70 let _ = self.engine.store.upsert_session(&session);
71 self.engine.emit(Event::SessionUpdated(session.clone()));
72 tracing::info!(
73 "session {} PR #{pr_num} detected via metadata hook",
74 session.id
75 );
76 }
77 }
78 }
79 }
80 }
81
82 async fn poll_github(&self) {
85 let Some(gh) = &self.engine.github else { return };
86 let Ok(sessions) = self.engine.store.list_sessions() else { return };
87
88 for session in sessions {
89 if matches!(session.status, SessionStatus::Done | SessionStatus::Terminated) {
90 continue;
91 }
92 let Some(pr_number) = session.pr_number else { continue };
93 let Some((owner, repo)) = split_repo(&session.repo) else { continue };
94
95 let pr_status = match gh.get_pr_status(&owner, &repo, pr_number).await {
97 Ok(s) => s,
98 Err(e) => { tracing::warn!("github pr status: {e}"); continue }
99 };
100
101 let pr_id: PrId = pr_number as i64;
102
103 if pr_status.merged && !matches!(session.status, SessionStatus::Done) {
105 self.engine.emit(Event::Notification(Notification {
106 id: format!("merged-{}", session.id),
107 kind: NotificationKind::WorkerDone,
108 title: format!("PR merged — {}", session.name),
109 body: format!("#{} merged successfully", pr_number),
110 session_id: Some(session.id.clone()),
111 }));
112 if let Err(e) = self.engine.cleanup_session(&session.id).await {
113 tracing::warn!("cleanup_session {}: {e}", session.id);
114 }
115 {
117 let mut cache = self.enrichment_cache.lock().unwrap();
118 cache.remove(&session.id);
119 }
120 continue; }
122
123 {
125 let pr = PR {
126 id: pr_id,
127 number: pr_number,
128 title: pr_status.title.clone(),
129 url: format!("https://github.com/{owner}/{repo}/pull/{pr_number}"),
130 body: String::new(),
131 session_id: session.id.clone(),
132 };
133 let _ = self.engine.store.upsert_pr(&pr);
134 self.engine.emit(Event::PrOpened { session_id: session.id.clone(), pr });
135 }
136
137 let checks = match gh.get_ci_checks(&owner, &repo, &pr_status.head_sha).await {
139 Ok(c) => c,
140 Err(e) => { tracing::warn!("github ci checks: {e}"); vec![] }
141 };
142 let ci = summarize_checks(pr_id, &checks);
143 let _ = self.engine.store.upsert_ci_status(&ci);
144 self.engine.emit(Event::CiUpdated { pr_id, status: ci.clone() });
145
146 let (newly_failing, ci_reaction_already_sent) = {
148 let mut cache = self.enrichment_cache.lock().unwrap();
149 let state = cache.entry(session.id.clone()).or_default();
150
151 let newly_failing = state.prev_failing.is_none_or(|p| p == 0)
152 && ci.failing > 0;
153 state.prev_failing = Some(ci.failing);
154
155 let already_sent = state.ci_reaction_sent;
156 if newly_failing && !already_sent {
157 state.ci_reaction_sent = true;
158 }
159 if ci.failing == 0 {
160 state.ci_reaction_sent = false;
161 }
162 (newly_failing, already_sent)
163 };
164
165 if newly_failing && !ci_reaction_already_sent {
166 self.engine.emit(Event::Notification(Notification {
167 id: format!("ci-{}", session.id),
168 kind: NotificationKind::CiFailure,
169 title: format!("CI failing — {}", session.name),
170 body: format!("{}/{} checks failing", ci.failing, ci.total),
171 session_id: Some(session.id.clone()),
172 }));
173 let failing_names: Vec<String> = checks.iter()
175 .filter(|c| c.conclusion.as_deref() == Some("failure")
176 || c.conclusion.as_deref() == Some("timed_out"))
177 .map(|c| c.name.clone())
178 .collect();
179 let msg = crate::lifecycle::reactions::format_ci_reaction(
180 &session, &ci, &failing_names
181 );
182 if let Err(e) = self.engine.send_to_session(&session.id, &msg).await {
183 tracing::warn!("send ci reaction to {}: {e}", session.id);
184 }
185 }
186
187 let threads = match gh.get_review_threads(&owner, &repo, pr_number).await {
189 Ok(t) => t,
190 Err(e) => { tracing::warn!("github review threads: {e}"); vec![] }
191 };
192
193 let has_changes_requested = threads.iter().any(|t| t.state == "CHANGES_REQUESTED");
194
195 let (has_new, review_reaction_already_sent, new_comments) = {
196 let mut cache = self.enrichment_cache.lock().unwrap();
197 let state = cache.entry(session.id.clone()).or_default();
198 let mut has_new = false;
199 let mut new_comments: Vec<Comment> = Vec::new();
200
201 for thread in &threads {
202 if thread.state == "CHANGES_REQUESTED"
203 && !state.seen_comment_ids.contains(&thread.id)
204 {
205 state.seen_comment_ids.insert(thread.id);
206 has_new = true;
207 let comment = Comment {
208 id: thread.id,
209 pr_id,
210 author: thread.author.clone(),
211 body: thread.body.clone(),
212 path: thread.path.clone(),
213 line: thread.line,
214 created_at: 0,
215 };
216 let _ = self.engine.store.upsert_comment(&comment);
217 self.engine.emit(Event::ReviewComment { pr_id, comment: comment.clone() });
218 new_comments.push(comment);
219 }
220 }
221
222 let already_sent = state.review_reaction_sent;
223 if has_new && !already_sent {
224 state.review_reaction_sent = true;
225 }
226 if !has_changes_requested {
228 state.review_reaction_sent = false;
229 }
230 (has_new, already_sent, new_comments)
231 };
232
233 let new_status = derive_session_status(&session.status, &pr_status, &ci, has_changes_requested);
235 let mut updated = session.clone();
236 updated.status = new_status;
237 if updated.status != session.status {
238 let _ = self.engine.store.upsert_session(&updated);
239 self.engine.emit(Event::SessionUpdated(updated.clone()));
240 }
241
242 if has_new && !review_reaction_already_sent {
243 self.engine.emit(Event::Notification(Notification {
244 id: format!("review-{}", session.id),
245 kind: NotificationKind::PrNeedsAttention,
246 title: format!("Review comments — {}", session.name),
247 body: "Changes requested on your PR".to_string(),
248 session_id: Some(session.id.clone()),
249 }));
250 if !new_comments.is_empty() {
251 let msg = crate::lifecycle::reactions::format_review_reaction(
252 &session, &new_comments
253 );
254 if let Err(e) = self.engine.send_to_session(&session.id, &msg).await {
255 tracing::warn!("send review reaction to {}: {e}", session.id);
256 }
257 }
258 }
259 }
260 }
261}
262
263fn summarize_checks(pr_id: PrId, checks: &[CheckRun]) -> CIStatus {
266 let total = checks.len() as u32;
267 let failing = checks.iter().filter(|c| {
268 c.conclusion.as_deref() == Some("failure")
269 || c.conclusion.as_deref() == Some("timed_out")
270 }).count() as u32;
271 let passing = checks.iter().filter(|c| {
272 c.conclusion.as_deref() == Some("success")
273 }).count() as u32;
274 let pending = total - failing - passing;
275 CIStatus { pr_id, total, failing, passing, pending }
276}
277
278fn derive_session_status(
279 current: &SessionStatus,
280 pr_status: &crate::github::PrStatus,
281 ci: &CIStatus,
282 has_changes_requested: bool,
283) -> SessionStatus {
284 if matches!(current, SessionStatus::Done | SessionStatus::Terminated) {
286 return current.clone();
287 }
288 if pr_status.merged {
289 return SessionStatus::Done;
290 }
291 if ci.failing > 0 {
292 return SessionStatus::CiFailed;
293 }
294 if has_changes_requested {
295 return SessionStatus::ReviewPending;
296 }
297 if pr_status.mergeable == Some(true) && ci.failing == 0 && ci.pending == 0 {
298 return SessionStatus::Mergeable;
299 }
300 SessionStatus::PrOpen
301}
302
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::SessionStatus;

    /// Builds a `PrStatus` fixture. Merged PRs are reported as "closed",
    /// everything else as "open".
    fn pr_fixture(merged: bool, mergeable: Option<bool>) -> crate::github::PrStatus {
        let state = if merged { "closed" } else { "open" };
        crate::github::PrStatus {
            merged,
            state: state.into(),
            mergeable,
            title: "t".into(),
            number: 1,
            head_sha: String::new(),
        }
    }

    /// Builds a `CIStatus` fixture for PR 1 with no pending checks.
    fn ci_fixture(total: u32, failing: u32, passing: u32) -> CIStatus {
        CIStatus { pr_id: 1, total, failing, passing, pending: 0 }
    }

    /// Builds a `CheckRun` fixture.
    fn check(name: &str, status: &str, conclusion: Option<&str>) -> CheckRun {
        CheckRun {
            name: name.into(),
            status: status.into(),
            conclusion: conclusion.map(Into::into),
        }
    }

    #[test]
    fn summarize_checks_counts_failures() {
        let checks = vec![
            check("lint", "completed", Some("success")),
            check("test", "completed", Some("failure")),
            check("build", "in_progress", None),
        ];
        let ci = summarize_checks(1, &checks);
        assert_eq!(ci.total, 3);
        assert_eq!(ci.passing, 1);
        assert_eq!(ci.failing, 1);
        assert_eq!(ci.pending, 1);
    }

    #[test]
    fn derive_status_merged_becomes_done() {
        let pr = pr_fixture(true, None);
        let ci = ci_fixture(0, 0, 0);
        let status = derive_session_status(&SessionStatus::PrOpen, &pr, &ci, false);
        assert!(matches!(status, SessionStatus::Done));
    }

    #[test]
    fn derive_status_ci_failure_overrides_open() {
        let pr = pr_fixture(false, Some(true));
        let ci = ci_fixture(3, 1, 2);
        let status = derive_session_status(&SessionStatus::PrOpen, &pr, &ci, false);
        assert!(matches!(status, SessionStatus::CiFailed));
    }

    #[test]
    fn derive_status_all_green_becomes_mergeable() {
        let pr = pr_fixture(false, Some(true));
        let ci = ci_fixture(3, 0, 3);
        let status = derive_session_status(&SessionStatus::PrOpen, &pr, &ci, false);
        assert!(matches!(status, SessionStatus::Mergeable));
    }

    #[test]
    fn derive_status_preserves_done() {
        let pr = pr_fixture(false, Some(true));
        let ci = ci_fixture(0, 0, 0);
        let status = derive_session_status(&SessionStatus::Done, &pr, &ci, false);
        assert!(matches!(status, SessionStatus::Done));
    }

    #[test]
    fn derive_status_preserves_terminated() {
        let pr = pr_fixture(true, None);
        let ci = ci_fixture(0, 0, 0);
        let status = derive_session_status(&SessionStatus::Terminated, &pr, &ci, false);
        assert!(matches!(status, SessionStatus::Terminated));
    }

    #[test]
    fn derive_status_changes_requested_becomes_review_pending() {
        let pr = pr_fixture(false, Some(true));
        let ci = ci_fixture(3, 0, 3);
        let status = derive_session_status(&SessionStatus::PrOpen, &pr, &ci, true);
        assert!(matches!(status, SessionStatus::ReviewPending));
    }
}