use crate::{
config::AppConfig,
events::{Engine, Event},
github::{split_repo, CheckRun},
hooks,
lifecycle::{
enrichment::EnrichmentCache,
probe::is_pid_alive,
},
types::{
CIStatus, Comment, Notification, NotificationKind, PrId, SessionStatus, PR,
},
};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio_util::sync::CancellationToken;
pub struct Poller {
engine: Arc<Engine>,
enrichment_cache: Arc<std::sync::Mutex<EnrichmentCache>>,
}
impl Poller {
pub fn new(engine: Arc<Engine>) -> Self {
Self {
engine,
enrichment_cache: Arc::new(std::sync::Mutex::new(HashMap::new())),
}
}
pub async fn start(self, token: CancellationToken) {
let mut pid_interval = tokio::time::interval(Duration::from_secs(5));
let mut github_interval = tokio::time::interval(Duration::from_secs(30));
github_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = token.cancelled() => break,
_ = pid_interval.tick() => self.poll_pids().await,
_ = github_interval.tick() => self.poll_github().await,
}
}
}
async fn poll_pids(&self) {
let Ok(sessions) = self.engine.store.list_sessions() else { return };
for mut session in sessions {
if matches!(session.status, SessionStatus::Done | SessionStatus::Terminated) {
continue;
}
if let Some(pid) = session.pid {
if !is_pid_alive(pid) {
session.status = SessionStatus::Terminated;
let _ = self.engine.store.upsert_session(&session);
self.engine.emit(Event::SessionUpdated(session));
continue;
}
}
if matches!(session.status, SessionStatus::Working | SessionStatus::Spawning)
&& session.pr_number.is_none()
{
let sessions_dir = AppConfig::sessions_dir();
if let Ok(meta) = hooks::read_session_metadata(&sessions_dir, &session.id) {
if let Some(pr_num) = meta.pr_number {
session.pr_number = Some(pr_num);
session.status = SessionStatus::PrOpen;
let _ = self.engine.store.upsert_session(&session);
self.engine.emit(Event::SessionUpdated(session.clone()));
tracing::info!(
"session {} PR #{pr_num} detected via metadata hook",
session.id
);
}
}
}
}
}
async fn poll_github(&self) {
let Some(gh) = &self.engine.github else { return };
let Ok(sessions) = self.engine.store.list_sessions() else { return };
for session in sessions {
if matches!(session.status, SessionStatus::Done | SessionStatus::Terminated) {
continue;
}
let Some(pr_number) = session.pr_number else { continue };
let Some((owner, repo)) = split_repo(&session.repo) else { continue };
let pr_status = match gh.get_pr_status(&owner, &repo, pr_number).await {
Ok(s) => s,
Err(e) => { tracing::warn!("github pr status: {e}"); continue }
};
let pr_id: PrId = pr_number as i64;
if pr_status.merged && !matches!(session.status, SessionStatus::Done) {
self.engine.emit(Event::Notification(Notification {
id: format!("merged-{}", session.id),
kind: NotificationKind::WorkerDone,
title: format!("PR merged — {}", session.name),
body: format!("#{} merged successfully", pr_number),
session_id: Some(session.id.clone()),
}));
if let Err(e) = self.engine.cleanup_session(&session.id).await {
tracing::warn!("cleanup_session {}: {e}", session.id);
}
{
let mut cache = self.enrichment_cache.lock().unwrap();
cache.remove(&session.id);
}
continue; }
{
let pr = PR {
id: pr_id,
number: pr_number,
title: pr_status.title.clone(),
url: format!("https://github.com/{owner}/{repo}/pull/{pr_number}"),
body: String::new(),
session_id: session.id.clone(),
};
let _ = self.engine.store.upsert_pr(&pr);
self.engine.emit(Event::PrOpened { session_id: session.id.clone(), pr });
}
let checks = match gh.get_ci_checks(&owner, &repo, &pr_status.head_sha).await {
Ok(c) => c,
Err(e) => { tracing::warn!("github ci checks: {e}"); vec![] }
};
let ci = summarize_checks(pr_id, &checks);
let _ = self.engine.store.upsert_ci_status(&ci);
self.engine.emit(Event::CiUpdated { pr_id, status: ci.clone() });
let (newly_failing, ci_reaction_already_sent) = {
let mut cache = self.enrichment_cache.lock().unwrap();
let state = cache.entry(session.id.clone()).or_default();
let newly_failing = state.prev_failing.is_none_or(|p| p == 0)
&& ci.failing > 0;
state.prev_failing = Some(ci.failing);
let already_sent = state.ci_reaction_sent;
if newly_failing && !already_sent {
state.ci_reaction_sent = true;
}
if ci.failing == 0 {
state.ci_reaction_sent = false;
}
(newly_failing, already_sent)
};
if newly_failing && !ci_reaction_already_sent {
self.engine.emit(Event::Notification(Notification {
id: format!("ci-{}", session.id),
kind: NotificationKind::CiFailure,
title: format!("CI failing — {}", session.name),
body: format!("{}/{} checks failing", ci.failing, ci.total),
session_id: Some(session.id.clone()),
}));
let failing_names: Vec<String> = checks.iter()
.filter(|c| c.conclusion.as_deref() == Some("failure")
|| c.conclusion.as_deref() == Some("timed_out"))
.map(|c| c.name.clone())
.collect();
let msg = crate::lifecycle::reactions::format_ci_reaction(
&session, &ci, &failing_names
);
if let Err(e) = self.engine.send_to_session(&session.id, &msg).await {
tracing::warn!("send ci reaction to {}: {e}", session.id);
}
}
let threads = match gh.get_review_threads(&owner, &repo, pr_number).await {
Ok(t) => t,
Err(e) => { tracing::warn!("github review threads: {e}"); vec![] }
};
let has_changes_requested = threads.iter().any(|t| t.state == "CHANGES_REQUESTED");
let (has_new, review_reaction_already_sent, new_comments) = {
let mut cache = self.enrichment_cache.lock().unwrap();
let state = cache.entry(session.id.clone()).or_default();
let mut has_new = false;
let mut new_comments: Vec<Comment> = Vec::new();
for thread in &threads {
if thread.state == "CHANGES_REQUESTED"
&& !state.seen_comment_ids.contains(&thread.id)
{
state.seen_comment_ids.insert(thread.id);
has_new = true;
let comment = Comment {
id: thread.id,
pr_id,
author: thread.author.clone(),
body: thread.body.clone(),
path: thread.path.clone(),
line: thread.line,
created_at: 0,
};
let _ = self.engine.store.upsert_comment(&comment);
self.engine.emit(Event::ReviewComment { pr_id, comment: comment.clone() });
new_comments.push(comment);
}
}
let already_sent = state.review_reaction_sent;
if has_new && !already_sent {
state.review_reaction_sent = true;
}
if !has_changes_requested {
state.review_reaction_sent = false;
}
(has_new, already_sent, new_comments)
};
let new_status = derive_session_status(&session.status, &pr_status, &ci, has_changes_requested);
let mut updated = session.clone();
updated.status = new_status;
if updated.status != session.status {
let _ = self.engine.store.upsert_session(&updated);
self.engine.emit(Event::SessionUpdated(updated.clone()));
}
if has_new && !review_reaction_already_sent {
self.engine.emit(Event::Notification(Notification {
id: format!("review-{}", session.id),
kind: NotificationKind::PrNeedsAttention,
title: format!("Review comments — {}", session.name),
body: "Changes requested on your PR".to_string(),
session_id: Some(session.id.clone()),
}));
if !new_comments.is_empty() {
let msg = crate::lifecycle::reactions::format_review_reaction(
&session, &new_comments
);
if let Err(e) = self.engine.send_to_session(&session.id, &msg).await {
tracing::warn!("send review reaction to {}: {e}", session.id);
}
}
}
}
}
}
fn summarize_checks(pr_id: PrId, checks: &[CheckRun]) -> CIStatus {
let total = checks.len() as u32;
let failing = checks.iter().filter(|c| {
c.conclusion.as_deref() == Some("failure")
|| c.conclusion.as_deref() == Some("timed_out")
}).count() as u32;
let passing = checks.iter().filter(|c| {
c.conclusion.as_deref() == Some("success")
}).count() as u32;
let pending = total - failing - passing;
CIStatus { pr_id, total, failing, passing, pending }
}
fn derive_session_status(
current: &SessionStatus,
pr_status: &crate::github::PrStatus,
ci: &CIStatus,
has_changes_requested: bool,
) -> SessionStatus {
if matches!(current, SessionStatus::Done | SessionStatus::Terminated) {
return current.clone();
}
if pr_status.merged {
return SessionStatus::Done;
}
if ci.failing > 0 {
return SessionStatus::CiFailed;
}
if has_changes_requested {
return SessionStatus::ReviewPending;
}
if pr_status.mergeable == Some(true) && ci.failing == 0 && ci.pending == 0 {
return SessionStatus::Mergeable;
}
SessionStatus::PrOpen
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::SessionStatus;
#[test]
fn summarize_checks_counts_failures() {
let checks = vec![
CheckRun { name: "lint".into(), status: "completed".into(), conclusion: Some("success".into()) },
CheckRun { name: "test".into(), status: "completed".into(), conclusion: Some("failure".into()) },
CheckRun { name: "build".into(), status: "in_progress".into(), conclusion: None },
];
let ci = summarize_checks(1, &checks);
assert_eq!(ci.total, 3);
assert_eq!(ci.passing, 1);
assert_eq!(ci.failing, 1);
assert_eq!(ci.pending, 1);
}
#[test]
fn derive_status_merged_becomes_done() {
let pr = crate::github::PrStatus {
merged: true, state: "closed".into(), mergeable: None,
title: "t".into(), number: 1, head_sha: String::new(),
};
let ci = CIStatus { pr_id: 1, total: 0, failing: 0, passing: 0, pending: 0 };
let s = derive_session_status(&SessionStatus::PrOpen, &pr, &ci, false);
assert!(matches!(s, SessionStatus::Done));
}
#[test]
fn derive_status_ci_failure_overrides_open() {
let pr = crate::github::PrStatus {
merged: false, state: "open".into(), mergeable: Some(true),
title: "t".into(), number: 1, head_sha: String::new(),
};
let ci = CIStatus { pr_id: 1, total: 3, failing: 1, passing: 2, pending: 0 };
let s = derive_session_status(&SessionStatus::PrOpen, &pr, &ci, false);
assert!(matches!(s, SessionStatus::CiFailed));
}
#[test]
fn derive_status_all_green_becomes_mergeable() {
let pr = crate::github::PrStatus {
merged: false, state: "open".into(), mergeable: Some(true),
title: "t".into(), number: 1, head_sha: String::new(),
};
let ci = CIStatus { pr_id: 1, total: 3, failing: 0, passing: 3, pending: 0 };
let s = derive_session_status(&SessionStatus::PrOpen, &pr, &ci, false);
assert!(matches!(s, SessionStatus::Mergeable));
}
#[test]
fn derive_status_preserves_done() {
let pr = crate::github::PrStatus {
merged: false, state: "open".into(), mergeable: Some(true),
title: "t".into(), number: 1, head_sha: String::new(),
};
let ci = CIStatus { pr_id: 1, total: 0, failing: 0, passing: 0, pending: 0 };
let s = derive_session_status(&SessionStatus::Done, &pr, &ci, false);
assert!(matches!(s, SessionStatus::Done));
}
#[test]
fn derive_status_preserves_terminated() {
let pr = crate::github::PrStatus {
merged: true, state: "closed".into(), mergeable: None, title: "t".into(), number: 1, head_sha: String::new(),
};
let ci = CIStatus { pr_id: 1, total: 0, failing: 0, passing: 0, pending: 0 };
let s = derive_session_status(&SessionStatus::Terminated, &pr, &ci, false);
assert!(matches!(s, SessionStatus::Terminated)); }
#[test]
fn derive_status_changes_requested_becomes_review_pending() {
let pr = crate::github::PrStatus {
merged: false, state: "open".into(), mergeable: Some(true),
title: "t".into(), number: 1, head_sha: String::new(),
};
let ci = CIStatus { pr_id: 1, total: 3, failing: 0, passing: 3, pending: 0 };
let s = derive_session_status(&SessionStatus::PrOpen, &pr, &ci, true);
assert!(matches!(s, SessionStatus::ReviewPending));
}
}