Skip to main content

ninox_core/lifecycle/
poller.rs

1use crate::{
2    config::AppConfig,
3    events::{Engine, Event},
4    github::{split_repo, CheckRun},
5    hooks,
6    lifecycle::{
7        enrichment::EnrichmentCache,
8        probe::is_pid_alive,
9    },
10    types::{
11        CIStatus, Comment, Notification, NotificationKind, PrId, SessionStatus, PR,
12    },
13};
14use std::{collections::HashMap, sync::Arc, time::Duration};
15use tokio_util::sync::CancellationToken;
16
17pub struct Poller {
18    engine:           Arc<Engine>,
19    enrichment_cache: Arc<std::sync::Mutex<EnrichmentCache>>,
20}
21
22impl Poller {
23    pub fn new(engine: Arc<Engine>) -> Self {
24        Self {
25            engine,
26            enrichment_cache: Arc::new(std::sync::Mutex::new(HashMap::new())),
27        }
28    }
29
30    pub async fn start(self, token: CancellationToken) {
31        let mut pid_interval    = tokio::time::interval(Duration::from_secs(5));
32        let mut github_interval = tokio::time::interval(Duration::from_secs(30));
33        // Prevent a missed tick from causing back-to-back polls.
34        github_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
35        loop {
36            tokio::select! {
37                _ = token.cancelled()      => break,
38                _ = pid_interval.tick()    => self.poll_pids().await,
39                _ = github_interval.tick() => self.poll_github().await,
40            }
41        }
42    }
43
44    // ── PID liveness ────────────────────────────────────────────────────────
45
46    async fn poll_pids(&self) {
47        let Ok(sessions) = self.engine.store.list_sessions() else { return };
48        for mut session in sessions {
49            if matches!(session.status, SessionStatus::Done | SessionStatus::Terminated) {
50                continue;
51            }
52            if let Some(pid) = session.pid {
53                if !is_pid_alive(pid) {
54                    session.status = SessionStatus::Terminated;
55                    let _ = self.engine.store.upsert_session(&session);
56                    self.engine.emit(Event::SessionUpdated(session));
57                    continue;
58                }
59            }
60
61            // Poll metadata files for PR number on working sessions that have none yet.
62            if matches!(session.status, SessionStatus::Working | SessionStatus::Spawning)
63                && session.pr_number.is_none()
64            {
65                let sessions_dir = AppConfig::sessions_dir();
66                if let Ok(meta) = hooks::read_session_metadata(&sessions_dir, &session.id) {
67                    if let Some(pr_num) = meta.pr_number {
68                        session.pr_number = Some(pr_num);
69                        session.status    = SessionStatus::PrOpen;
70                        let _ = self.engine.store.upsert_session(&session);
71                        self.engine.emit(Event::SessionUpdated(session.clone()));
72                        tracing::info!(
73                            "session {} PR #{pr_num} detected via metadata hook",
74                            session.id
75                        );
76                    }
77                }
78            }
79        }
80    }
81
82    // ── GitHub enrichment ────────────────────────────────────────────────────
83
84    async fn poll_github(&self) {
85        let Some(gh) = &self.engine.github else { return };
86        let Ok(sessions) = self.engine.store.list_sessions() else { return };
87
88        for session in sessions {
89            if matches!(session.status, SessionStatus::Done | SessionStatus::Terminated) {
90                continue;
91            }
92            let Some(pr_number) = session.pr_number else { continue };
93            let Some((owner, repo)) = split_repo(&session.repo) else { continue };
94
95            // -- PR state --
96            let pr_status = match gh.get_pr_status(&owner, &repo, pr_number).await {
97                Ok(s)  => s,
98                Err(e) => { tracing::warn!("github pr status: {e}"); continue }
99            };
100
101            let pr_id: PrId = pr_number as i64;
102
103            // -- Merge detection — handle before CI (no point polling CI on merged PR) --
104            if pr_status.merged && !matches!(session.status, SessionStatus::Done) {
105                self.engine.emit(Event::Notification(Notification {
106                    id:         format!("merged-{}", session.id),
107                    kind:       NotificationKind::WorkerDone,
108                    title:      format!("PR merged — {}", session.name),
109                    body:       format!("#{} merged successfully", pr_number),
110                    session_id: Some(session.id.clone()),
111                }));
112                if let Err(e) = self.engine.cleanup_session(&session.id).await {
113                    tracing::warn!("cleanup_session {}: {e}", session.id);
114                }
115                // Remove enrichment state for this session — it's done
116                {
117                    let mut cache = self.enrichment_cache.lock().unwrap();
118                    cache.remove(&session.id);
119                }
120                continue; // skip further enrichment for this session
121            }
122
123            // Upsert PR record — only when not merged (merged sessions stay Done after cleanup)
124            {
125                let pr = PR {
126                    id:         pr_id,
127                    number:     pr_number,
128                    title:      pr_status.title.clone(),
129                    url:        format!("https://github.com/{owner}/{repo}/pull/{pr_number}"),
130                    body:       String::new(),
131                    session_id: session.id.clone(),
132                };
133                let _ = self.engine.store.upsert_pr(&pr);
134                self.engine.emit(Event::PrOpened { session_id: session.id.clone(), pr });
135            }
136
137            // -- CI checks --
138            let checks = match gh.get_ci_checks(&owner, &repo, &pr_status.head_sha).await {
139                Ok(c)  => c,
140                Err(e) => { tracing::warn!("github ci checks: {e}"); vec![] }
141            };
142            let ci = summarize_checks(pr_id, &checks);
143            let _ = self.engine.store.upsert_ci_status(&ci);
144            self.engine.emit(Event::CiUpdated { pr_id, status: ci.clone() });
145
146            // -- Detect CI transition and update session status --
147            let (newly_failing, ci_reaction_already_sent) = {
148                let mut cache = self.enrichment_cache.lock().unwrap();
149                let state = cache.entry(session.id.clone()).or_default();
150
151                let newly_failing = state.prev_failing.is_none_or(|p| p == 0)
152                    && ci.failing > 0;
153                state.prev_failing = Some(ci.failing);
154
155                let already_sent = state.ci_reaction_sent;
156                if newly_failing && !already_sent {
157                    state.ci_reaction_sent = true;
158                }
159                if ci.failing == 0 {
160                    state.ci_reaction_sent = false;
161                }
162                (newly_failing, already_sent)
163            };
164
165            if newly_failing && !ci_reaction_already_sent {
166                self.engine.emit(Event::Notification(Notification {
167                    id:         format!("ci-{}", session.id),
168                    kind:       NotificationKind::CiFailure,
169                    title:      format!("CI failing — {}", session.name),
170                    body:       format!("{}/{} checks failing", ci.failing, ci.total),
171                    session_id: Some(session.id.clone()),
172                }));
173                // Send reaction to the agent in the tmux session
174                let failing_names: Vec<String> = checks.iter()
175                    .filter(|c| c.conclusion.as_deref() == Some("failure")
176                             || c.conclusion.as_deref() == Some("timed_out"))
177                    .map(|c| c.name.clone())
178                    .collect();
179                let msg = crate::lifecycle::reactions::format_ci_reaction(
180                    &session, &ci, &failing_names
181                );
182                if let Err(e) = self.engine.send_to_session(&session.id, &msg).await {
183                    tracing::warn!("send ci reaction to {}: {e}", session.id);
184                }
185            }
186
187            // -- Review threads (throttled via seen_comment_ids) --
188            let threads = match gh.get_review_threads(&owner, &repo, pr_number).await {
189                Ok(t)  => t,
190                Err(e) => { tracing::warn!("github review threads: {e}"); vec![] }
191            };
192
193            let has_changes_requested = threads.iter().any(|t| t.state == "CHANGES_REQUESTED");
194
195            let (has_new, review_reaction_already_sent, new_comments) = {
196                let mut cache = self.enrichment_cache.lock().unwrap();
197                let state = cache.entry(session.id.clone()).or_default();
198                let mut has_new = false;
199                let mut new_comments: Vec<Comment> = Vec::new();
200
201                for thread in &threads {
202                    if thread.state == "CHANGES_REQUESTED"
203                        && !state.seen_comment_ids.contains(&thread.id)
204                    {
205                        state.seen_comment_ids.insert(thread.id);
206                        has_new = true;
207                        let comment = Comment {
208                            id:         thread.id,
209                            pr_id,
210                            author:     thread.author.clone(),
211                            body:       thread.body.clone(),
212                            path:       thread.path.clone(),
213                            line:       thread.line,
214                            created_at: 0,
215                        };
216                        let _ = self.engine.store.upsert_comment(&comment);
217                        self.engine.emit(Event::ReviewComment { pr_id, comment: comment.clone() });
218                        new_comments.push(comment);
219                    }
220                }
221
222                let already_sent = state.review_reaction_sent;
223                if has_new && !already_sent {
224                    state.review_reaction_sent = true;
225                }
226                // Reset when all CHANGES_REQUESTED are resolved
227                if !has_changes_requested {
228                    state.review_reaction_sent = false;
229                }
230                (has_new, already_sent, new_comments)
231            };
232
233            // Update session status in DB (after review threads so has_changes_requested is known)
234            let new_status = derive_session_status(&session.status, &pr_status, &ci, has_changes_requested);
235            let mut updated = session.clone();
236            updated.status = new_status;
237            if updated.status != session.status {
238                let _ = self.engine.store.upsert_session(&updated);
239                self.engine.emit(Event::SessionUpdated(updated.clone()));
240            }
241
242            if has_new && !review_reaction_already_sent {
243                self.engine.emit(Event::Notification(Notification {
244                    id:         format!("review-{}", session.id),
245                    kind:       NotificationKind::PrNeedsAttention,
246                    title:      format!("Review comments — {}", session.name),
247                    body:       "Changes requested on your PR".to_string(),
248                    session_id: Some(session.id.clone()),
249                }));
250                if !new_comments.is_empty() {
251                    let msg = crate::lifecycle::reactions::format_review_reaction(
252                        &session, &new_comments
253                    );
254                    if let Err(e) = self.engine.send_to_session(&session.id, &msg).await {
255                        tracing::warn!("send review reaction to {}: {e}", session.id);
256                    }
257                }
258            }
259        }
260    }
261}
262
263// ── Helpers ──────────────────────────────────────────────────────────────────
264
265fn summarize_checks(pr_id: PrId, checks: &[CheckRun]) -> CIStatus {
266    let total   = checks.len() as u32;
267    let failing = checks.iter().filter(|c| {
268        c.conclusion.as_deref() == Some("failure")
269            || c.conclusion.as_deref() == Some("timed_out")
270    }).count() as u32;
271    let passing = checks.iter().filter(|c| {
272        c.conclusion.as_deref() == Some("success")
273    }).count() as u32;
274    let pending = total - failing - passing;
275    CIStatus { pr_id, total, failing, passing, pending }
276}
277
278fn derive_session_status(
279    current:               &SessionStatus,
280    pr_status:             &crate::github::PrStatus,
281    ci:                    &CIStatus,
282    has_changes_requested: bool,
283) -> SessionStatus {
284    // Terminal states are never overwritten.
285    if matches!(current, SessionStatus::Done | SessionStatus::Terminated) {
286        return current.clone();
287    }
288    if pr_status.merged {
289        return SessionStatus::Done;
290    }
291    if ci.failing > 0 {
292        return SessionStatus::CiFailed;
293    }
294    if has_changes_requested {
295        return SessionStatus::ReviewPending;
296    }
297    if pr_status.mergeable == Some(true) && ci.failing == 0 && ci.pending == 0 {
298        return SessionStatus::Mergeable;
299    }
300    SessionStatus::PrOpen
301}
302
303#[cfg(test)]
304mod tests {
305    use super::*;
306    use crate::types::SessionStatus;
307
308    #[test]
309    fn summarize_checks_counts_failures() {
310        let checks = vec![
311            CheckRun { name: "lint".into(), status: "completed".into(), conclusion: Some("success".into()) },
312            CheckRun { name: "test".into(), status: "completed".into(), conclusion: Some("failure".into()) },
313            CheckRun { name: "build".into(), status: "in_progress".into(), conclusion: None },
314        ];
315        let ci = summarize_checks(1, &checks);
316        assert_eq!(ci.total,   3);
317        assert_eq!(ci.passing, 1);
318        assert_eq!(ci.failing, 1);
319        assert_eq!(ci.pending, 1);
320    }
321
322    #[test]
323    fn derive_status_merged_becomes_done() {
324        let pr = crate::github::PrStatus {
325            merged: true, state: "closed".into(), mergeable: None,
326            title: "t".into(), number: 1, head_sha: String::new(),
327        };
328        let ci = CIStatus { pr_id: 1, total: 0, failing: 0, passing: 0, pending: 0 };
329        let s  = derive_session_status(&SessionStatus::PrOpen, &pr, &ci, false);
330        assert!(matches!(s, SessionStatus::Done));
331    }
332
333    #[test]
334    fn derive_status_ci_failure_overrides_open() {
335        let pr = crate::github::PrStatus {
336            merged: false, state: "open".into(), mergeable: Some(true),
337            title: "t".into(), number: 1, head_sha: String::new(),
338        };
339        let ci = CIStatus { pr_id: 1, total: 3, failing: 1, passing: 2, pending: 0 };
340        let s  = derive_session_status(&SessionStatus::PrOpen, &pr, &ci, false);
341        assert!(matches!(s, SessionStatus::CiFailed));
342    }
343
344    #[test]
345    fn derive_status_all_green_becomes_mergeable() {
346        let pr = crate::github::PrStatus {
347            merged: false, state: "open".into(), mergeable: Some(true),
348            title: "t".into(), number: 1, head_sha: String::new(),
349        };
350        let ci = CIStatus { pr_id: 1, total: 3, failing: 0, passing: 3, pending: 0 };
351        let s  = derive_session_status(&SessionStatus::PrOpen, &pr, &ci, false);
352        assert!(matches!(s, SessionStatus::Mergeable));
353    }
354
355    #[test]
356    fn derive_status_preserves_done() {
357        let pr = crate::github::PrStatus {
358            merged: false, state: "open".into(), mergeable: Some(true),
359            title: "t".into(), number: 1, head_sha: String::new(),
360        };
361        let ci = CIStatus { pr_id: 1, total: 0, failing: 0, passing: 0, pending: 0 };
362        let s  = derive_session_status(&SessionStatus::Done, &pr, &ci, false);
363        assert!(matches!(s, SessionStatus::Done));
364    }
365
366    #[test]
367    fn derive_status_preserves_terminated() {
368        let pr = crate::github::PrStatus {
369            merged: true, state: "closed".into(), mergeable: None,   // merged=true!
370            title: "t".into(), number: 1, head_sha: String::new(),
371        };
372        let ci = CIStatus { pr_id: 1, total: 0, failing: 0, passing: 0, pending: 0 };
373        let s  = derive_session_status(&SessionStatus::Terminated, &pr, &ci, false);
374        assert!(matches!(s, SessionStatus::Terminated));  // must not become Done
375    }
376
377    #[test]
378    fn derive_status_changes_requested_becomes_review_pending() {
379        let pr = crate::github::PrStatus {
380            merged: false, state: "open".into(), mergeable: Some(true),
381            title: "t".into(), number: 1, head_sha: String::new(),
382        };
383        let ci = CIStatus { pr_id: 1, total: 3, failing: 0, passing: 3, pending: 0 };
384        let s  = derive_session_status(&SessionStatus::PrOpen, &pr, &ci, true);
385        assert!(matches!(s, SessionStatus::ReviewPending));
386    }
387}