use std::collections::HashMap;
use serde_json::Value;
use tokio::process::Command;
use tracing::{debug, info, instrument, warn};
use crate::agent::AgentId;
use crate::channel::Channel;
use crate::error::Result;
use crate::message::{Message, MessageType};
use crate::mission::storage::MissionStorage;
use crate::mission::types::{MissionId, TriggerAction, WatchId, WatchItem, WatchKind, WatchStatus};
use crate::task::Task;
/// Summary of a pull request's check, mergeability, and review state, as
/// returned by `GitHubClient::get_pr_checks`.
#[derive(Debug, Clone)]
pub struct PrCheckResult {
/// Pull request number that was queried.
pub pr_number: u64,
/// Repository identifier; `GhCliGitHubClient` fills this as `owner/repo`.
pub repo: String,
/// Aggregate status across all entries in `checks`.
pub status: CheckStatus,
/// Per-check breakdown that `status` was derived from.
pub checks: Vec<CheckDetail>,
/// Whether the pull request was reported as mergeable.
pub mergeable: bool,
/// Overall review decision for the pull request.
pub review_state: ReviewState,
/// Bodies of reviews whose state is `CHANGES_REQUESTED`.
pub blocking_comments: Vec<String>,
}
/// Normalized state of a single check or of a whole check rollup.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckStatus {
/// The check passed.
Success,
/// The check failed.
Failure,
/// The check has not finished yet.
Pending,
/// The reported state was not recognized; also the rollup value when a
/// pull request has no checks at all.
Unknown,
}
/// Outcome of one individual check on a pull request.
#[derive(Debug, Clone)]
pub struct CheckDetail {
/// Check name (or status context name).
pub name: String,
/// Normalized status of this check.
pub status: CheckStatus,
/// Optional free-form detail text; `GhCliGitHubClient` always leaves this `None`.
pub details: Option<String>,
}
/// Overall review decision of a pull request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReviewState {
/// The pull request has been approved.
Approved,
/// At least one reviewer requested changes.
ChangesRequested,
/// A review is still outstanding.
Pending,
/// NOTE(review): presumably means no review is required for this pull
/// request — confirm against the consumers of `PrCheckResult::review_state`.
NotRequired,
}
/// Outcome of evaluating a single watch during one engine tick.
#[derive(Debug, Clone)]
pub struct WatchTickResult {
/// Watch that was evaluated.
pub watch_id: WatchId,
/// Mission the watch belongs to.
pub mission_id: MissionId,
/// Whether the watch condition fired on this tick.
pub triggered: bool,
/// The watch's configured trigger action, set whenever `triggered` is true.
pub action_taken: Option<TriggerAction>,
/// Status of the watch after this tick.
pub new_status: WatchStatus,
/// Error text when the target reference was invalid or the check failed.
pub error: Option<String>,
}
/// Aggregate counters and per-watch results for one engine tick.
#[derive(Debug, Clone, Default)]
pub struct WatchEngineTickResult {
/// Number of watches whose processing returned a `WatchTickResult`.
pub watches_processed: usize,
/// Number of processed watches whose condition fired.
pub watches_triggered: usize,
/// Number of processed watches that ended the tick in `WatchStatus::Done`.
pub watches_completed: usize,
/// Number of watches that reported an error, plus watches whose processing
/// itself returned `Err`.
pub watches_failed: usize,
/// Individual results; watches whose processing returned `Err` are not listed.
pub results: Vec<WatchTickResult>,
}
/// Read-only access to the pull-request data the watch engine polls.
///
/// Implemented in this file by `GhCliGitHubClient` (shells out to `gh`) and
/// `MockGitHubClient` (in-memory maps).
#[async_trait::async_trait]
pub trait GitHubClient: Send + Sync {
/// Fetches the check rollup, mergeability, and review summary of a pull request.
async fn get_pr_checks(&self, owner: &str, repo: &str, pr_number: u64)
-> Result<PrCheckResult>;
/// Fetches the reviews submitted on a pull request.
async fn get_pr_reviews(
&self,
owner: &str,
repo: &str,
pr_number: u64,
) -> Result<Vec<ReviewComment>>;
/// Fetches the comments on a pull request that were identified as bug-bot output.
async fn get_bugbot_comments(
&self,
owner: &str,
repo: &str,
pr_number: u64,
) -> Result<Vec<BugbotComment>>;
}
/// A single review submitted on a pull request.
#[derive(Debug, Clone)]
pub struct ReviewComment {
/// Login of the reviewer; `GhCliGitHubClient` substitutes `"reviewer"` when
/// the login is missing.
pub author: String,
/// Review body text (may be empty).
pub body: String,
/// Whether the review demands follow-up work; `GhCliGitHubClient` sets this
/// for reviews in the `CHANGES_REQUESTED` state.
pub is_actionable: bool,
}
/// A pull-request comment that was classified as coming from a bug-finding bot.
#[derive(Debug, Clone)]
pub struct BugbotComment {
/// Login of the comment author.
pub bot_name: String,
/// Severity label; `GhCliGitHubClient` produces `"critical"`, `"high"`, or
/// `"unknown"` from keywords in the comment body.
pub severity: String,
/// Full comment body.
pub description: String,
/// File the finding refers to, when known; `GhCliGitHubClient` always
/// leaves this `None`.
pub file_path: Option<String>,
}
/// Tunables for `WatchEngine`.
#[derive(Debug, Clone)]
pub struct WatchEngineConfig {
/// Number of seconds a watch is snoozed after it triggers without completing.
pub default_interval_secs: u64,
/// Number of consecutive check failures at which the watched work item is
/// marked as blocked.
pub max_failures: u32,
}
impl Default for WatchEngineConfig {
    /// Returns the stock configuration: a 180-second snooze interval and a
    /// limit of 5 consecutive failures.
    fn default() -> Self {
        let default_interval_secs = 180;
        let max_failures = 5;
        Self {
            default_interval_secs,
            max_failures,
        }
    }
}
/// Polls due watches, evaluates them against GitHub, and runs the configured
/// trigger actions.
pub struct WatchEngine<G: GitHubClient> {
// Persistence for watches, work items, and the mission event log.
storage: MissionStorage,
// Channel used to store tasks, list agents, and send messages to them.
channel: Channel,
// Source of pull-request state.
github: G,
// Snooze interval and failure threshold.
config: WatchEngineConfig,
}
impl<G: GitHubClient> WatchEngine<G> {
pub fn new(
storage: MissionStorage,
channel: Channel,
github: G,
config: WatchEngineConfig,
) -> Self {
Self {
storage,
channel,
github,
config,
}
}
/// Builds an engine that uses `WatchEngineConfig::default()`.
pub fn with_defaults(storage: MissionStorage, channel: Channel, github: G) -> Self {
    let config = WatchEngineConfig::default();
    Self::new(storage, channel, github, config)
}
/// Processes every watch that storage reports as due.
///
/// # Errors
/// Fails if the due-watch listing cannot be loaded from storage.
#[instrument(skip(self))]
pub async fn tick(&self) -> Result<WatchEngineTickResult> {
    let pending = self.storage.list_due_watches().await?;
    let summary = self.process_due_watches(pending).await;
    summary
}
/// Processes the due watches of the given missions only.
///
/// Missions are visited in slice order; within each mission, every watch
/// for which `is_due()` holds is collected and then processed.
///
/// # Errors
/// Fails if the watch list of any mission cannot be loaded from storage.
#[instrument(skip(self, mission_ids))]
pub async fn tick_missions(&self, mission_ids: &[MissionId]) -> Result<WatchEngineTickResult> {
    let mut selected = Vec::new();
    for &mission_id in mission_ids {
        let watches = self.storage.list_watch_items(mission_id).await?;
        selected.extend(watches.into_iter().filter(|candidate| candidate.is_due()));
    }
    self.process_due_watches(selected).await
}
/// Evaluates each watch in turn and folds the outcomes into a tick summary.
///
/// A watch whose processing returns `Err` is logged and counted as failed,
/// but does not abort the tick and is not counted as processed.
async fn process_due_watches(
    &self,
    due_watches: Vec<WatchItem>,
) -> Result<WatchEngineTickResult> {
    debug!("Watch engine tick: {} due watches", due_watches.len());

    let mut summary = WatchEngineTickResult::default();
    for watch in due_watches {
        let outcome = match self.process_watch(&watch).await {
            Ok(outcome) => outcome,
            Err(e) => {
                warn!("Error processing watch {}: {}", watch.id, e);
                summary.watches_failed += 1;
                continue;
            }
        };

        summary.watches_processed += 1;
        summary.watches_triggered += usize::from(outcome.triggered);
        summary.watches_completed += usize::from(outcome.new_status == WatchStatus::Done);
        summary.watches_failed += usize::from(outcome.error.is_some());
        summary.results.push(outcome);
    }

    info!(
        "Watch engine tick: {} processed, {} triggered, {} completed, {} failed",
        summary.watches_processed,
        summary.watches_triggered,
        summary.watches_completed,
        summary.watches_failed
    );
    Ok(summary)
}
/// Evaluates one watch against GitHub and persists its updated state.
///
/// Returns `Ok` with `error` set when the target reference cannot be parsed
/// or the GitHub check itself fails. Returns `Err` when running a trigger
/// action, blocking the work item, or saving the watch fails; in those cases
/// the `?` propagates before (or at) the save, so the updated watch may not
/// have been persisted.
#[instrument(skip(self), fields(watch_id = %watch.id, kind = ?watch.kind))]
async fn process_watch(&self, watch: &WatchItem) -> Result<WatchTickResult> {
// Start from "nothing happened"; the branches below fill in what changed.
let mut result = WatchTickResult {
watch_id: watch.id,
mission_id: watch.mission_id,
triggered: false,
action_taken: None,
new_status: watch.status,
error: None,
};
// An unparseable reference is reported in the result; the watch itself is
// left untouched in storage.
let Some((owner, repo, pr_number)) = parse_pr_ref(&watch.target_ref) else {
warn!("Invalid PR reference: {}", watch.target_ref);
result.error = Some(format!("Invalid PR reference: {}", watch.target_ref));
return Ok(result);
};
// Every check yields `(triggered, should_complete)`.
let check_result = match watch.kind {
WatchKind::PrChecks => self.check_pr_status(&owner, &repo, pr_number).await,
WatchKind::BugbotComments => self.check_bugbot(&owner, &repo, pr_number).await,
WatchKind::ReviewComments => self.check_reviews(&owner, &repo, pr_number).await,
WatchKind::Mergeability => self.check_mergeability(&owner, &repo, pr_number).await,
};
match check_result {
Ok((triggered, should_complete)) => {
let mut updated_watch = watch.clone();
if triggered {
result.triggered = true;
result.action_taken = Some(watch.on_trigger);
// The action is skipped only for `AdvancePipeline` on a watch that is
// completing in this same tick; `action_taken` is still reported.
if watch.on_trigger != TriggerAction::AdvancePipeline || !should_complete {
self.execute_trigger_action(watch).await?;
}
// Triggered but not finished: check again after the configured interval.
if !should_complete {
updated_watch.snooze(self.config.default_interval_secs);
result.new_status = WatchStatus::Snoozed;
}
}
if should_complete {
updated_watch.complete();
result.new_status = WatchStatus::Done;
} else if !triggered {
// Neither triggered nor complete: just note that a check took place.
updated_watch.record_check();
}
self.storage.save_watch_item(&updated_watch).await?;
}
Err(e) => {
let mut updated_watch = watch.clone();
updated_watch.record_failure();
// Once the failure count reaches the limit, the work item is blocked.
if updated_watch.consecutive_failures >= self.config.max_failures {
self.mark_work_blocked(watch, &format!("Watch check failed: {}", e))
.await?;
}
self.storage.save_watch_item(&updated_watch).await?;
result.error = Some(e.to_string());
}
}
Ok(result)
}
/// Evaluates a `PrChecks` watch; returns `(triggered, should_complete)`.
///
/// Successful checks complete the watch without triggering, failed checks
/// trigger without completing, and pending or unknown checks do neither.
async fn check_pr_status(
    &self,
    owner: &str,
    repo: &str,
    pr_number: u64,
) -> Result<(bool, bool)> {
    let status = self
        .github
        .get_pr_checks(owner, repo, pr_number)
        .await?
        .status;
    let outcome = match status {
        CheckStatus::Success => (false, true),
        CheckStatus::Failure => (true, false),
        CheckStatus::Pending | CheckStatus::Unknown => (false, false),
    };
    Ok(outcome)
}
/// Evaluates a `BugbotComments` watch; returns `(triggered, should_complete)`.
///
/// Any bug-bot comment triggers the watch. With no such comments the watch
/// completes only once the PR checks have succeeded.
async fn check_bugbot(&self, owner: &str, repo: &str, pr_number: u64) -> Result<(bool, bool)> {
    let findings = self
        .github
        .get_bugbot_comments(owner, repo, pr_number)
        .await?;
    if !findings.is_empty() {
        return Ok((true, false));
    }
    let checks_green = self.pr_checks_succeeded(owner, repo, pr_number).await?;
    Ok((false, checks_green))
}
/// Evaluates a `ReviewComments` watch; returns `(triggered, should_complete)`.
///
/// Any actionable review triggers the watch. With none, the watch completes
/// only once the PR checks have succeeded.
async fn check_reviews(&self, owner: &str, repo: &str, pr_number: u64) -> Result<(bool, bool)> {
    let reviews = self.github.get_pr_reviews(owner, repo, pr_number).await?;
    for review in &reviews {
        if review.is_actionable {
            return Ok((true, false));
        }
    }
    let checks_green = self.pr_checks_succeeded(owner, repo, pr_number).await?;
    Ok((false, checks_green))
}
/// Evaluates a `Mergeability` watch; returns `(triggered, should_complete)`.
///
/// The watch both triggers and completes when the PR is mergeable and its
/// checks have succeeded; otherwise it does neither.
async fn check_mergeability(
    &self,
    owner: &str,
    repo: &str,
    pr_number: u64,
) -> Result<(bool, bool)> {
    let pr = self.github.get_pr_checks(owner, repo, pr_number).await?;
    let ready = pr.mergeable && pr.status == CheckStatus::Success;
    Ok((ready, ready))
}
/// Reports whether the PR's aggregate check status is `CheckStatus::Success`.
async fn pr_checks_succeeded(&self, owner: &str, repo: &str, pr_number: u64) -> Result<bool> {
    let status = self
        .github
        .get_pr_checks(owner, repo, pr_number)
        .await?
        .status;
    Ok(status == CheckStatus::Success)
}
/// Runs the watch's configured trigger action, then records it in the
/// mission event log.
///
/// # Errors
/// Propagates any failure from the action itself or from writing the event;
/// the event is not written when the action fails.
#[instrument(skip(self))]
async fn execute_trigger_action(&self, watch: &WatchItem) -> Result<()> {
    let action = watch.on_trigger;
    match action {
        TriggerAction::CreateFixTask => self.create_fix_task(watch).await?,
        TriggerAction::NotifyReviewer => self.notify_reviewer(watch).await?,
        TriggerAction::AdvancePipeline => self.advance_pipeline(watch).await?,
    }

    let event = format!(
        "Watch {} triggered action {:?} for {}",
        watch.id, watch.on_trigger, watch.target_ref
    );
    self.storage.log_event(watch.mission_id, &event).await?;
    Ok(())
}
/// Creates a fix task for the watched work item and assigns it to the
/// agent that currently owns that work item.
///
/// Does nothing (and returns `Ok`) when the work item cannot be found — a
/// warning is logged — or when it has no assigned agent.
async fn create_fix_task(&self, watch: &WatchItem) -> Result<()> {
    let lookup = self
        .storage
        .get_work_item(watch.mission_id, watch.work_item_id)
        .await?;
    let Some(work_item) = lookup else {
        warn!("Work item {} not found for watch", watch.work_item_id);
        return Ok(());
    };
    let Some(agent_id) = work_item.assigned_to else {
        return Ok(());
    };

    // Describe the problem and tag the task so it can be traced back to the
    // mission, work item, and watch that produced it.
    let description = format!(
        "[Mission Fix Required] {}\n\nWatch type: {:?}\nTarget: {}\n\nInvestigate the reported issue, update the branch/PR, and complete this task with the refreshed PR URL or ref.",
        work_item.title, watch.kind, watch.target_ref
    );
    let tags = [
        "mission-fix-task".to_string(),
        format!("mission:{}", watch.mission_id),
        format!("work-item:{}", watch.work_item_id),
        format!("watch:{}", watch.id),
    ];
    let mut task = Task::new(description).with_tags(tags);
    task.assign(agent_id);
    let task_id = task.id;
    self.channel.set_task(&task).await?;

    // Tell the agent about its new assignment.
    let assignment = Message::new(
        AgentId::supervisor(),
        agent_id,
        MessageType::TaskAssign {
            task_id: task_id.to_string(),
        },
    );
    self.channel.send(&assignment).await?;

    let event = format!(
        "Created fix task {} for work item '{}' due to {:?}",
        task_id, work_item.title, watch.kind
    );
    self.storage.log_event(watch.mission_id, &event).await?;
    info!(
        "Created fix task for work item '{}' assigned to agent {:?}",
        work_item.title, agent_id
    );
    Ok(())
}
/// Sends an informational message about the triggered watch to every agent
/// whose lower-cased name contains "review" or "audit".
///
/// Logs a warning and returns `Ok` when no such agent exists.
async fn notify_reviewer(&self, watch: &WatchItem) -> Result<()> {
    let agents = self.channel.list_agents().await?;

    let mut reviewers = Vec::new();
    for agent in agents.iter() {
        let lowered = agent.name.to_lowercase();
        if lowered.contains("review") || lowered.contains("audit") {
            reviewers.push(agent);
        }
    }
    if reviewers.is_empty() {
        warn!("No reviewer agents found to notify");
        return Ok(());
    }

    let notification = format!(
        "[Mission Notification] Watch triggered for {}\n\nType: {:?}\nTarget: {}",
        watch.work_item_id, watch.kind, watch.target_ref
    );
    for reviewer in reviewers {
        let payload = MessageType::Informational {
            summary: notification.clone(),
        };
        let msg = Message::new(AgentId::supervisor(), reviewer.id, payload);
        self.channel.send(&msg).await?;
    }

    info!("Notified reviewers about watch trigger");
    Ok(())
}
/// Records the watch target as an artifact of the work item (if not already
/// present), saves the work item, and logs that it is eligible to advance.
///
/// Returns `Ok` without doing anything when the work item is not found.
async fn advance_pipeline(&self, watch: &WatchItem) -> Result<()> {
    let lookup = self
        .storage
        .get_work_item(watch.mission_id, watch.work_item_id)
        .await?;
    let Some(mut work_item) = lookup else {
        return Ok(());
    };

    let already_recorded = work_item
        .artifact_refs
        .iter()
        .any(|existing| *existing == watch.target_ref);
    if !already_recorded {
        work_item.artifact_refs.push(watch.target_ref.clone());
    }
    // The work item is saved even when the artifact list did not change.
    self.storage.save_work_item(&work_item).await?;

    let event = format!(
        "Work item '{}' is eligible for scheduler-driven pipeline advance",
        work_item.title
    );
    self.storage.log_event(watch.mission_id, &event).await?;
    info!(
        "Advanced pipeline: work item '{}' is ready for scheduler gates",
        work_item.title
    );
    Ok(())
}
/// Marks the watched work item as blocked, saves it, and logs the reason.
///
/// Returns `Ok` without doing anything when the work item is not found.
async fn mark_work_blocked(&self, watch: &WatchItem, reason: &str) -> Result<()> {
    let lookup = self
        .storage
        .get_work_item(watch.mission_id, watch.work_item_id)
        .await?;
    let Some(mut work_item) = lookup else {
        return Ok(());
    };

    work_item.block();
    self.storage.save_work_item(&work_item).await?;

    let event = format!("Work item '{}' blocked: {}", work_item.title, reason);
    self.storage.log_event(watch.mission_id, &event).await?;
    warn!("Blocked work item '{}': {}", work_item.title, reason);
    Ok(())
}
}
/// `GitHubClient` implementation that shells out to the `gh` command-line tool.
#[derive(Debug, Default, Clone, Copy)]
pub struct GhCliGitHubClient;
/// Subset of the `gh pr view --json ...` output that this file reads.
/// Every field falls back to its default when absent from the JSON.
#[derive(serde::Deserialize)]
struct GhPrView {
// Raw check entries; interpreted by `parse_check_status`.
#[serde(rename = "statusCheckRollup", default)]
status_check_rollup: Vec<Value>,
// Raw mergeability string; interpreted by `parse_mergeable`.
#[serde(default)]
mergeable: Option<String>,
// Raw review decision string; interpreted by `parse_review_state`.
#[serde(rename = "reviewDecision", default)]
review_decision: Option<String>,
// Reviews submitted on the pull request.
#[serde(default)]
reviews: Vec<GhReview>,
}
/// One entry of the `reviews` array in the `gh pr view` JSON output.
#[derive(serde::Deserialize)]
struct GhReview {
// Review state string; compared case-insensitively with `CHANGES_REQUESTED`.
#[serde(default)]
state: Option<String>,
// Review body text.
#[serde(default)]
body: Option<String>,
// Reviewer account, if present.
#[serde(default)]
author: Option<GhNamedAuthor>,
}
/// Account object carrying a login name; used for both review authors and
/// issue-comment users.
#[derive(serde::Deserialize)]
struct GhNamedAuthor {
// Account login, if present.
#[serde(default)]
login: Option<String>,
}
/// One element of the `repos/{owner}/{repo}/issues/{n}/comments` response
/// fetched through `gh api`.
#[derive(serde::Deserialize)]
struct GhIssueComment {
// Comment body text.
#[serde(default)]
body: Option<String>,
// Account that posted the comment.
#[serde(default)]
user: Option<GhNamedAuthor>,
}
impl GhCliGitHubClient {
async fn gh_json<T: serde::de::DeserializeOwned>(args: &[String]) -> Result<T> {
let output = Command::new("gh").args(args).output().await?;
if !output.status.success() {
return Err(crate::error::Error::Config(format!(
"gh command failed: {}",
String::from_utf8_lossy(&output.stderr).trim()
)));
}
Ok(serde_json::from_slice(&output.stdout)?)
}
/// Normalizes the raw `statusCheckRollup` entries into per-check details and
/// one aggregate status.
///
/// For each entry the state is read from the first non-empty field among
/// `conclusion`, `state`, and `status`. Empty strings are skipped because a
/// check run that is still running can carry an empty `conclusion` next to a
/// meaningful `status`; taking the empty string would classify a running
/// check as `Unknown` and let the rollup report success too early.
///
/// Aggregate precedence: any `Failure` wins, then `Pending`, then `Success`.
/// `Unknown` entries never override a known status, and an empty input
/// yields `Unknown`.
fn parse_check_status(checks: &[Value]) -> (CheckStatus, Vec<CheckDetail>) {
    // Reads a string field, treating an empty string as absent.
    fn non_empty_str<'a>(check: &'a Value, key: &str) -> Option<&'a str> {
        check
            .get(key)
            .and_then(Value::as_str)
            .filter(|text| !text.is_empty())
    }

    let mut overall = CheckStatus::Unknown;
    let mut details = Vec::with_capacity(checks.len());
    for check in checks {
        // Check runs carry `name`; commit status contexts carry `context`.
        let name = check
            .get("name")
            .and_then(Value::as_str)
            .or_else(|| check.get("context").and_then(Value::as_str))
            .unwrap_or("check")
            .to_string();
        let raw = non_empty_str(check, "conclusion")
            .or_else(|| non_empty_str(check, "state"))
            .or_else(|| non_empty_str(check, "status"))
            .unwrap_or("unknown")
            .to_lowercase();
        let status = match raw.as_str() {
            "success" | "neutral" | "skipped" => CheckStatus::Success,
            "queued" | "pending" | "in_progress" | "expected" | "waiting" | "requested" => {
                CheckStatus::Pending
            }
            "failure" | "failed" | "error" | "timed_out" | "action_required" | "cancelled"
            | "startup_failure" => CheckStatus::Failure,
            // Anything else (for example "stale" or "completed" without a
            // conclusion) is left unclassified.
            _ => CheckStatus::Unknown,
        };
        overall = match (overall, status) {
            (_, CheckStatus::Failure) => CheckStatus::Failure,
            (CheckStatus::Unknown, other) => other,
            (CheckStatus::Success, CheckStatus::Pending)
            | (CheckStatus::Pending, CheckStatus::Success) => CheckStatus::Pending,
            (current, _) => current,
        };
        details.push(CheckDetail {
            name,
            status,
            details: None,
        });
    }
    (overall, details)
}
/// Returns `true` when the raw mergeability string, compared without regard
/// to ASCII case, is one of `MERGEABLE`, `CLEAN`, `HAS_HOOKS`, or `UNSTABLE`.
/// A missing value is treated as not mergeable.
fn parse_mergeable(value: Option<String>) -> bool {
    const MERGEABLE_STATES: [&str; 4] = ["MERGEABLE", "CLEAN", "HAS_HOOKS", "UNSTABLE"];
    match value {
        Some(raw) => MERGEABLE_STATES
            .iter()
            .any(|state| raw.eq_ignore_ascii_case(state)),
        None => false,
    }
}
/// Maps the raw `reviewDecision` string onto `ReviewState`.
///
/// * `APPROVED` → `Approved`
/// * `CHANGES_REQUESTED` → `ChangesRequested`
/// * missing or blank → `NotRequired` (no review decision was reported)
/// * anything else, including `REVIEW_REQUIRED` → `Pending`
///
/// Matching ignores ASCII case and surrounding whitespace.
fn parse_review_state(value: Option<String>) -> ReviewState {
    let decision = value.as_deref().map(str::trim).unwrap_or("");
    if decision.is_empty() {
        // No decision at all is distinct from an outstanding review.
        return ReviewState::NotRequired;
    }
    match decision.to_ascii_uppercase().as_str() {
        "APPROVED" => ReviewState::Approved,
        "CHANGES_REQUESTED" => ReviewState::ChangesRequested,
        // REVIEW_REQUIRED and any unrecognized value: stay conservative.
        _ => ReviewState::Pending,
    }
}
}
#[async_trait::async_trait]
impl GitHubClient for GhCliGitHubClient {
async fn get_pr_checks(
&self,
owner: &str,
repo: &str,
pr_number: u64,
) -> Result<PrCheckResult> {
let payload: GhPrView = Self::gh_json(&[
"pr".to_string(),
"view".to_string(),
pr_number.to_string(),
"--repo".to_string(),
format!("{owner}/{repo}"),
"--json".to_string(),
"statusCheckRollup,mergeable,reviewDecision,reviews".to_string(),
])
.await?;
let (status, checks) = Self::parse_check_status(&payload.status_check_rollup);
let review_state = Self::parse_review_state(payload.review_decision);
let blocking_comments = payload
.reviews
.iter()
.filter_map(|review| {
let state = review.state.as_deref()?.to_ascii_uppercase();
if state == "CHANGES_REQUESTED" {
review.body.clone()
} else {
None
}
})
.collect();
Ok(PrCheckResult {
pr_number,
repo: format!("{owner}/{repo}"),
status,
checks,
mergeable: Self::parse_mergeable(payload.mergeable),
review_state,
blocking_comments,
})
}
async fn get_pr_reviews(
&self,
owner: &str,
repo: &str,
pr_number: u64,
) -> Result<Vec<ReviewComment>> {
let payload: GhPrView = Self::gh_json(&[
"pr".to_string(),
"view".to_string(),
pr_number.to_string(),
"--repo".to_string(),
format!("{owner}/{repo}"),
"--json".to_string(),
"reviews,reviewDecision".to_string(),
])
.await?;
Ok(payload
.reviews
.into_iter()
.map(|review| {
let state = review.state.unwrap_or_default().to_ascii_uppercase();
let body = review.body.unwrap_or_default();
ReviewComment {
author: review
.author
.and_then(|author| author.login)
.unwrap_or_else(|| "reviewer".to_string()),
is_actionable: state == "CHANGES_REQUESTED",
body,
}
})
.collect())
}
/// Fetches the pull request's issue comments through `gh api` and keeps
/// those that look like bug-bot output.
///
/// A comment is kept when its author login contains "bugbot" or "cursor",
/// or its body contains "bugbot" (all ASCII case-insensitive). Comments
/// without an author login are dropped. Severity is derived from the body:
/// "critical" if it contains that word, else "high" if it contains that
/// word, else "unknown".
///
/// The request asks for 100 comments per page (the API maximum) instead of
/// the default page size of 30, so bot comments on busy pull requests are
/// not silently cut off.
/// NOTE(review): comments beyond the first 100 are still not fetched; full
/// pagination would be needed for very long threads.
async fn get_bugbot_comments(
    &self,
    owner: &str,
    repo: &str,
    pr_number: u64,
) -> Result<Vec<BugbotComment>> {
    let comments: Vec<GhIssueComment> = Self::gh_json(&[
        "api".to_string(),
        format!("repos/{owner}/{repo}/issues/{pr_number}/comments?per_page=100"),
    ])
    .await?;
    Ok(comments
        .into_iter()
        .filter_map(|comment| {
            let author = comment.user.and_then(|user| user.login)?;
            let body = comment.body.unwrap_or_default();
            let lower_author = author.to_ascii_lowercase();
            let lower_body = body.to_ascii_lowercase();
            // "cursor bugbot" in the body is already covered by "bugbot".
            let looks_like_bot = lower_author.contains("bugbot")
                || lower_author.contains("cursor")
                || lower_body.contains("bugbot");
            if !looks_like_bot {
                return None;
            }
            let severity = if lower_body.contains("critical") {
                "critical"
            } else if lower_body.contains("high") {
                "high"
            } else {
                "unknown"
            };
            Some(BugbotComment {
                bot_name: author,
                severity: severity.to_string(),
                description: body,
                file_path: None,
            })
        })
        .collect())
}
}
/// Parses a pull-request reference into `(owner, repo, number)`.
///
/// Accepted forms:
/// * short form — `owner/repo#123`
/// * URL form — anything containing `github.com` with an
///   `owner/repo/pull/123` (or `.../pulls/123`) path after the host. A
///   trailing slash, extra path segments such as `/files`, a query string,
///   and a fragment are all tolerated.
///
/// URLs without a `pull`/`pulls` segment fall back to the positional rule
/// "number is the last segment, owner and repo are the fourth- and
/// third-to-last segments", so references that parsed before still parse.
///
/// Returns `None` when the reference matches neither form, when owner or
/// repo is empty, or when the number is not a valid `u64`.
fn parse_pr_ref(target_ref: &str) -> Option<(String, String, u64)> {
    let target_ref = target_ref.trim();

    if !target_ref.contains("github.com") {
        // Short form: `owner/repo#number`.
        let (repo_part, pr_part) = target_ref.split_once('#')?;
        let (owner, repo) = repo_part.split_once('/')?;
        if owner.is_empty() || repo.is_empty() || repo.contains('/') {
            return None;
        }
        let pr_number = pr_part.parse().ok()?;
        return Some((owner.to_string(), repo.to_string(), pr_number));
    }

    // URL form: drop the query string / fragment and any trailing slash so
    // they cannot end up inside the number segment.
    let path = target_ref
        .split(|c: char| c == '?' || c == '#')
        .next()
        .unwrap_or(target_ref)
        .trim_end_matches('/');
    let parts: Vec<&str> = path.split('/').collect();

    // Preferred: anchor on the `pull`/`pulls` segment after the host, which
    // keeps working when the URL carries suffixes like `/files`.
    let host_idx = parts
        .iter()
        .position(|segment| segment.contains("github.com"))?;
    let anchored = parts[host_idx + 1..].windows(4).find_map(|window| {
        let (owner, repo, kind, number) = (window[0], window[1], window[2], window[3]);
        if owner.is_empty() || repo.is_empty() || !matches!(kind, "pull" | "pulls") {
            return None;
        }
        let pr_number = number.parse::<u64>().ok()?;
        Some((owner.to_string(), repo.to_string(), pr_number))
    });
    if anchored.is_some() {
        return anchored;
    }

    // Fallback: positional parsing from the end of the path.
    if parts.len() >= 5 {
        let pr_number = parts[parts.len() - 1].parse().ok()?;
        let owner = parts[parts.len() - 4].to_string();
        let repo = parts[parts.len() - 3].to_string();
        return Some((owner, repo, pr_number));
    }
    None
}
/// In-memory `GitHubClient` backed by maps; every map is keyed by
/// `owner/repo#number`.
#[derive(Default)]
pub struct MockGitHubClient {
/// Canned results for `get_pr_checks`; a missing key makes that call fail.
pub pr_checks: HashMap<String, PrCheckResult>,
/// Canned results for `get_pr_reviews`; a missing key yields an empty list.
pub reviews: HashMap<String, Vec<ReviewComment>>,
/// Canned results for `get_bugbot_comments`; a missing key yields an empty list.
pub bugbot_comments: HashMap<String, Vec<BugbotComment>>,
}
impl MockGitHubClient {
    /// Creates a mock with no canned data.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers the result that `get_pr_checks` returns for the given PR,
    /// replacing any earlier entry for it.
    pub fn set_pr_checks(&mut self, owner: &str, repo: &str, pr: u64, result: PrCheckResult) {
        let key = Self::get_key(owner, repo, pr);
        self.pr_checks.insert(key, result);
    }

    /// Builds the `owner/repo#number` key shared by all lookup maps.
    fn get_key(owner: &str, repo: &str, pr: u64) -> String {
        format!("{owner}/{repo}#{pr}")
    }
}
#[async_trait::async_trait]
impl GitHubClient for MockGitHubClient {
async fn get_pr_checks(
&self,
owner: &str,
repo: &str,
pr_number: u64,
) -> Result<PrCheckResult> {
let key = Self::get_key(owner, repo, pr_number);
self.pr_checks.get(&key).cloned().ok_or_else(|| {
crate::error::Error::Config(format!("No mock PR check result for {}", key))
})
}
async fn get_pr_reviews(
&self,
owner: &str,
repo: &str,
pr_number: u64,
) -> Result<Vec<ReviewComment>> {
let key = Self::get_key(owner, repo, pr_number);
Ok(self.reviews.get(&key).cloned().unwrap_or_default())
}
async fn get_bugbot_comments(
&self,
owner: &str,
repo: &str,
pr_number: u64,
) -> Result<Vec<BugbotComment>> {
let key = Self::get_key(owner, repo, pr_number);
Ok(self.bugbot_comments.get(&key).cloned().unwrap_or_default())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The short `owner/repo#number` form is split into its three parts.
    #[test]
    fn test_parse_pr_ref_short_format() {
        let parsed = parse_pr_ref("redis-field-engineering/tinytown#23");
        assert_eq!(
            parsed,
            Some((
                "redis-field-engineering".to_string(),
                "tinytown".to_string(),
                23
            ))
        );
    }

    /// A full pull-request URL yields the same three parts.
    #[test]
    fn test_parse_pr_ref_url_format() {
        let parsed = parse_pr_ref("https://github.com/redis-field-engineering/tinytown/pull/23");
        assert_eq!(
            parsed,
            Some((
                "redis-field-engineering".to_string(),
                "tinytown".to_string(),
                23
            ))
        );
    }

    /// References without a PR number are rejected.
    #[test]
    fn test_parse_pr_ref_invalid() {
        for bad in ["invalid", "owner/repo"] {
            assert!(parse_pr_ref(bad).is_none());
        }
    }

    /// The default configuration uses a 180 s interval and 5 allowed failures.
    #[test]
    fn test_watch_engine_config_defaults() {
        let WatchEngineConfig {
            default_interval_secs,
            max_failures,
        } = WatchEngineConfig::default();
        assert_eq!(default_interval_secs, 180);
        assert_eq!(max_failures, 5);
    }

    /// `set_pr_checks` stores the result under the `owner/repo#number` key.
    #[test]
    fn test_mock_github_client() {
        let canned = PrCheckResult {
            pr_number: 1,
            repo: "test/repo".to_string(),
            status: CheckStatus::Success,
            checks: vec![],
            mergeable: true,
            review_state: ReviewState::Approved,
            blocking_comments: vec![],
        };
        let mut client = MockGitHubClient::new();
        client.set_pr_checks("test", "repo", 1, canned);
        assert!(client.pr_checks.contains_key("test/repo#1"));
    }
}