use crate::{
error::Result,
events::{OrchestratorEvent, TerminationReason},
reaction_engine::{parse_duration, status_to_reaction_key, ReactionEngine},
reactions::{ReactionAction, ReactionOutcome},
scm::{CheckStatus, CiStatus, MergeReadiness, PrState, PullRequest, ReviewDecision},
scm_transitions::{derive_scm_status, ScmObservation},
session_manager::SessionManager,
traits::{Agent, Runtime, Scm, Workspace},
types::{ActivityState, Session, SessionId, SessionStatus},
};
use std::{
collections::{HashMap, HashSet},
hash::{Hash, Hasher},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex,
},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
mod scm_poll;
mod stuck;
mod tick;
mod transition;
const EVENT_CHANNEL_CAPACITY: usize = 1024;
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(10);
const REVIEW_BACKLOG_THROTTLE: Duration = Duration::from_secs(120);
pub struct LifecycleManager {
pub(super) sessions: Arc<SessionManager>,
pub(super) runtime: Arc<dyn Runtime>,
pub(super) agent: Arc<dyn Agent>,
pub(super) events_tx: broadcast::Sender<OrchestratorEvent>,
poll_interval: Duration,
pub(super) reaction_engine: Option<Arc<ReactionEngine>>,
pub(super) scm: Option<Arc<dyn Scm>>,
pub(super) workspace: Option<Arc<dyn Workspace>>,
pub(super) idle_since: Mutex<HashMap<SessionId, Instant>>,
pub(super) pr_enrichment_cache: Mutex<HashMap<String, ScmObservation>>,
pub(super) last_review_backlog_check: Mutex<HashMap<SessionId, Instant>>,
pub(super) detected_prs_cache: Mutex<HashMap<SessionId, Option<PullRequest>>>,
pub(super) startup_ms: AtomicU64,
pub(super) all_complete_fired: AtomicBool,
}
impl LifecycleManager {
pub fn new(
sessions: Arc<SessionManager>,
runtime: Arc<dyn Runtime>,
agent: Arc<dyn Agent>,
) -> Self {
let (events_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
Self {
sessions,
runtime,
agent,
events_tx,
poll_interval: DEFAULT_POLL_INTERVAL,
reaction_engine: None,
scm: None,
workspace: None,
idle_since: Mutex::new(HashMap::new()),
pr_enrichment_cache: Mutex::new(HashMap::new()),
last_review_backlog_check: Mutex::new(HashMap::new()),
detected_prs_cache: Mutex::new(HashMap::new()),
startup_ms: AtomicU64::new(0),
all_complete_fired: AtomicBool::new(false),
}
}
pub fn with_poll_interval(mut self, interval: Duration) -> Self {
self.poll_interval = interval;
self
}
pub fn with_reaction_engine(mut self, engine: Arc<ReactionEngine>) -> Self {
self.reaction_engine = Some(engine);
self
}
pub fn with_scm(mut self, scm: Arc<dyn Scm>) -> Self {
self.scm = Some(scm);
self
}
pub fn with_workspace(mut self, workspace: Arc<dyn Workspace>) -> Self {
self.workspace = Some(workspace);
self
}
pub fn events_sender(&self) -> broadcast::Sender<OrchestratorEvent> {
self.events_tx.clone()
}
pub fn subscribe(&self) -> broadcast::Receiver<OrchestratorEvent> {
self.events_tx.subscribe()
}
pub fn spawn(self: Arc<Self>) -> LifecycleHandle {
let token = CancellationToken::new();
let child_token = token.clone();
let this = self.clone();
let join = tokio::spawn(async move {
this.run_loop(child_token).await;
});
LifecycleHandle { join, token }
}
async fn run_loop(self: Arc<Self>, token: CancellationToken) {
let mut seen: HashSet<SessionId> = HashSet::new();
let startup_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
self.startup_ms.store(startup_ms.max(1), Ordering::Relaxed);
self.sweep_merged_worktrees().await;
let mut ticker = tokio::time::interval(self.poll_interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = token.cancelled() => {
tracing::debug!("lifecycle loop received cancel");
return;
}
_ = ticker.tick() => {
if let Err(e) = self.tick(&mut seen).await {
tracing::error!("lifecycle tick failed: {e}");
}
}
}
}
}
pub async fn tick(&self, seen: &mut HashSet<SessionId>) -> Result<()> {
let sessions = self.sessions.list().await?;
{
let mut cache = self.pr_enrichment_cache.lock().unwrap_or_else(|e| {
tracing::error!("pr_enrichment_cache mutex poisoned; recovering inner state: {e}");
e.into_inner()
});
cache.clear();
}
let mut detected_prs: HashMap<SessionId, Option<PullRequest>> = HashMap::new();
if let Some(scm) = self.scm.as_ref() {
let mut prs_for_batch = Vec::new();
for session in &sessions {
if session.is_terminal() {
continue;
}
let id = session.id.clone();
match scm.detect_pr(session).await {
Ok(pr) => {
if let Some(ref p) = pr {
prs_for_batch.push(p.clone());
}
detected_prs.insert(id, pr);
}
Err(e) => {
self.emit(OrchestratorEvent::TickError {
id: id.clone(),
message: format!("scm.detect_pr: {e}"),
});
detected_prs.insert(id, None);
}
}
}
if !prs_for_batch.is_empty() {
match scm.enrich_prs_batch(&prs_for_batch).await {
Ok(enrichment) => {
if !enrichment.is_empty() {
tracing::debug!(
"[batch enrichment] cached {} PR observations",
enrichment.len()
);
let mut cache = self
.pr_enrichment_cache
.lock()
.unwrap_or_else(|e| {
tracing::error!("pr_enrichment_cache mutex poisoned; recovering inner state: {e}");
e.into_inner()
});
*cache = enrichment;
}
}
Err(e) => {
tracing::warn!("[batch enrichment] failed: {e}");
}
}
}
}
{
let mut cache = self.detected_prs_cache.lock().unwrap_or_else(|e| {
tracing::error!("detected_prs_cache mutex poisoned; recovering inner state: {e}");
e.into_inner()
});
*cache = detected_prs;
}
let startup_ms = self.startup_ms.load(Ordering::Relaxed);
let mut any_active = false;
for session in sessions {
let id = session.id.clone();
if seen.insert(id.clone()) {
if startup_ms != 0 && session.created_at < startup_ms {
self.emit(OrchestratorEvent::SessionRestored {
id: id.clone(),
project_id: session.project_id.clone(),
status: session.status,
});
} else {
self.emit(OrchestratorEvent::Spawned {
id,
project_id: session.project_id.clone(),
});
}
}
if session.is_terminal() {
continue;
}
any_active = true;
self.all_complete_fired.store(false, Ordering::Relaxed);
if let Err(e) = self.poll_one(session).await {
tracing::warn!("poll_one failed: {e}");
}
}
if !any_active && !seen.is_empty() && !self.all_complete_fired.load(Ordering::Relaxed) {
if let Some(engine) = self.reaction_engine.as_ref() {
let sentinel = all_complete_sentinel();
match engine.dispatch(&sentinel, "all-complete").await {
Ok(_) => {
self.all_complete_fired.store(true, Ordering::Relaxed);
}
Err(e) => {
tracing::warn!(error = %e, "all-complete dispatch failed");
}
}
}
}
Ok(())
}
pub(super) fn update_idle_since(&self, session_id: &SessionId, activity: ActivityState) {
let mut map = self.idle_since.lock().unwrap_or_else(|e| {
tracing::error!("lifecycle idle_since mutex poisoned; recovering inner state: {e}");
e.into_inner()
});
match activity {
ActivityState::Idle | ActivityState::Blocked => {
map.entry(session_id.clone()).or_insert_with(Instant::now);
}
_ => {
map.remove(session_id);
}
}
}
pub(super) fn emit(&self, event: OrchestratorEvent) {
let _ = self.events_tx.send(event);
}
}
pub(super) fn assemble_observation(
state: Result<PrState>,
ci: Result<CiStatus>,
review: Result<ReviewDecision>,
readiness: Result<MergeReadiness>,
) -> std::result::Result<ScmObservation, String> {
match (state, ci, review, readiness) {
(Ok(state), Ok(ci), Ok(review), Ok(readiness)) => Ok(ScmObservation {
state,
ci,
review,
readiness,
}),
(state, ci, review, readiness) => {
let parts: Vec<String> = [
state.err().map(|e| format!("pr_state: {e}")),
ci.err().map(|e| format!("ci_status: {e}")),
review.err().map(|e| format!("review_decision: {e}")),
readiness.err().map(|e| format!("mergeability: {e}")),
]
.into_iter()
.flatten()
.collect();
Err(parts.join("; "))
}
}
}
pub(super) fn should_park_in_merge_failed(
engine: &ReactionEngine,
session: &Session,
to: SessionStatus,
reaction_key: &str,
outcome: &ReactionOutcome,
) -> bool {
to == SessionStatus::Mergeable
&& reaction_key == "approved-and-green"
&& engine
.resolve_reaction_config(session, reaction_key)
.is_some_and(|c| c.action == ReactionAction::AutoMerge)
&& !outcome.success
&& !outcome.escalated
}
pub(super) fn clear_tracker_on_transition(
engine: &ReactionEngine,
session_id: &SessionId,
from: SessionStatus,
to: SessionStatus,
) {
let parking_loop_edge = matches!(
(from, to),
(SessionStatus::Mergeable, SessionStatus::MergeFailed)
| (SessionStatus::MergeFailed, SessionStatus::Mergeable)
);
if parking_loop_edge {
return;
}
if from == SessionStatus::MergeFailed {
engine.clear_tracker(session_id, "approved-and-green");
return;
}
if from == SessionStatus::CiFailed {
engine.clear_tracker(session_id, "ci-failed");
return;
}
if let Some(prev_key) = status_to_reaction_key(from) {
engine.clear_tracker(session_id, prev_key);
}
}
pub(super) const fn is_review_stable(status: SessionStatus) -> bool {
matches!(
status,
SessionStatus::ChangesRequested | SessionStatus::ReviewPending | SessionStatus::Approved
)
}
pub(super) const fn is_orchestrator_notifiable(status: SessionStatus) -> bool {
matches!(
status,
SessionStatus::PrOpen
| SessionStatus::ReviewPending
| SessionStatus::CiFailed
| SessionStatus::ChangesRequested
| SessionStatus::Approved
| SessionStatus::Merged
| SessionStatus::MergeFailed
| SessionStatus::Killed
| SessionStatus::Terminated
| SessionStatus::Errored
| SessionStatus::NeedsInput
| SessionStatus::Stuck
)
}
pub(super) fn format_orchestrator_notification(worker: &Session, to: SessionStatus) -> String {
let short: String = worker.id.0.chars().take(8).collect();
let pr = worker
.claimed_pr_url
.as_deref()
.or(worker.issue_url.as_deref())
.unwrap_or("none");
format!(
"[ao-rs] worker {short} is now {to} — branch: {branch}, url: {pr}",
branch = worker.branch,
)
}
pub(super) const fn is_stuck_eligible(status: SessionStatus) -> bool {
match status {
SessionStatus::Working
| SessionStatus::PrOpen
| SessionStatus::CiFailed
| SessionStatus::ReviewPending
| SessionStatus::ChangesRequested
| SessionStatus::Approved
| SessionStatus::Mergeable => true,
SessionStatus::Spawning
| SessionStatus::Idle
| SessionStatus::NeedsInput
| SessionStatus::Stuck
| SessionStatus::MergeFailed
| SessionStatus::Killed
| SessionStatus::Terminated
| SessionStatus::Done
| SessionStatus::Cleanup
| SessionStatus::Errored
| SessionStatus::Merged => false,
}
}
pub struct LifecycleHandle {
join: tokio::task::JoinHandle<()>,
token: CancellationToken,
}
impl LifecycleHandle {
pub async fn stop(self) {
self.token.cancel();
let _ = self.join.await;
}
pub fn cancellation_token(&self) -> CancellationToken {
self.token.clone()
}
}
pub(super) fn fingerprint_comments(comments: &[crate::scm::ReviewComment]) -> u64 {
use std::collections::hash_map::DefaultHasher;
let mut keys: Vec<(&str, &str, &str)> = comments
.iter()
.map(|c| (c.author.as_str(), c.body.as_str(), c.url.as_str()))
.collect();
keys.sort_unstable();
let mut h = DefaultHasher::new();
keys.hash(&mut h);
h.finish()
}
pub(super) fn all_complete_sentinel() -> Session {
use crate::types::{now_ms, SessionId};
Session {
id: SessionId("__all_complete__".into()),
project_id: String::new(),
status: SessionStatus::Done,
agent: String::new(),
agent_config: None,
branch: String::new(),
task: String::new(),
workspace_path: None,
runtime_handle: None,
runtime: String::new(),
activity: None,
created_at: now_ms(),
cost: None,
issue_id: None,
issue_url: None,
claimed_pr_number: None,
claimed_pr_url: None,
initial_prompt_override: None,
spawned_by: None,
last_merge_conflict_dispatched: None,
last_review_backlog_fingerprint: None,
}
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::scm::{
CheckRun, CiStatus, MergeMethod, MergeReadiness, PrState, PullRequest, Review,
ReviewComment, ReviewDecision,
};
use crate::traits::Workspace;
use crate::types::{now_ms, SessionId, WorkspaceCreateConfig};
use async_trait::async_trait;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
pub(crate) fn unique_temp_dir(label: &str) -> PathBuf {
static COUNTER: AtomicUsize = AtomicUsize::new(0);
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos();
let n = COUNTER.fetch_add(1, Ordering::Relaxed);
std::env::temp_dir().join(format!("ao-rs-lifecycle-{label}-{nanos}-{n}"))
}
pub(crate) fn fake_session(id: &str, project: &str) -> Session {
Session {
id: SessionId(id.into()),
project_id: project.into(),
status: SessionStatus::Spawning,
agent: "claude-code".into(),
agent_config: None,
branch: format!("ao-{id}"),
task: "test task".into(),
workspace_path: Some(PathBuf::from("/tmp/ws")),
runtime_handle: Some(format!("runtime-{id}")),
runtime: "tmux".into(),
activity: None,
created_at: now_ms(),
cost: None,
issue_id: None,
issue_url: None,
claimed_pr_number: None,
claimed_pr_url: None,
initial_prompt_override: None,
spawned_by: None,
last_merge_conflict_dispatched: None,
last_review_backlog_fingerprint: None,
}
}
pub(crate) struct MockRuntime {
pub(crate) alive: AtomicBool,
pub(crate) sends: Mutex<Vec<(String, String)>>,
pub(crate) destroys: Mutex<Vec<String>>,
}
impl MockRuntime {
pub(crate) fn new(alive: bool) -> Self {
Self {
alive: AtomicBool::new(alive),
sends: Mutex::new(Vec::new()),
destroys: Mutex::new(Vec::new()),
}
}
pub(crate) fn sends(&self) -> Vec<(String, String)> {
self.sends.lock().unwrap().clone()
}
#[allow(dead_code)]
pub(crate) fn destroyed_handles(&self) -> Vec<String> {
self.destroys.lock().unwrap().clone()
}
}
#[async_trait]
impl Runtime for MockRuntime {
async fn create(
&self,
_session_id: &str,
_cwd: &Path,
_launch_command: &str,
_env: &[(String, String)],
) -> Result<String> {
Ok("mock-handle".into())
}
async fn send_message(&self, handle: &str, msg: &str) -> Result<()> {
self.sends
.lock()
.unwrap()
.push((handle.to_string(), msg.to_string()));
Ok(())
}
async fn is_alive(&self, _handle: &str) -> Result<bool> {
Ok(self.alive.load(Ordering::SeqCst))
}
async fn destroy(&self, handle: &str) -> Result<()> {
self.destroys.lock().unwrap().push(handle.to_string());
Ok(())
}
}
pub(crate) struct MockAgent {
next: Mutex<ActivityState>,
}
impl MockAgent {
pub(crate) fn new(initial: ActivityState) -> Self {
Self {
next: Mutex::new(initial),
}
}
pub(crate) fn set(&self, state: ActivityState) {
*self.next.lock().unwrap() = state;
}
}
#[async_trait]
impl Agent for MockAgent {
fn launch_command(&self, _session: &Session) -> String {
"mock".into()
}
fn environment(&self, _session: &Session) -> Vec<(String, String)> {
vec![]
}
fn initial_prompt(&self, _session: &Session) -> String {
"".into()
}
async fn detect_activity(&self, _session: &Session) -> Result<ActivityState> {
Ok(*self.next.lock().unwrap())
}
}
#[allow(dead_code)]
pub(crate) struct MockWorkspace {
destroyed: Mutex<Vec<PathBuf>>,
}
#[allow(dead_code)]
impl MockWorkspace {
pub(crate) fn new() -> Self {
Self {
destroyed: Mutex::new(Vec::new()),
}
}
pub(crate) fn destroyed_paths(&self) -> Vec<PathBuf> {
self.destroyed.lock().unwrap().clone()
}
}
#[async_trait]
impl Workspace for MockWorkspace {
async fn create(&self, _cfg: &WorkspaceCreateConfig) -> Result<PathBuf> {
Ok(PathBuf::from("/tmp/ws"))
}
async fn destroy(&self, workspace_path: &Path) -> Result<()> {
self.destroyed
.lock()
.unwrap()
.push(workspace_path.to_path_buf());
Ok(())
}
}
pub(crate) struct MockScm {
pub(crate) pr: Mutex<Option<PullRequest>>,
pub(crate) state: Mutex<PrState>,
pub(crate) ci: Mutex<CiStatus>,
pub(crate) review: Mutex<ReviewDecision>,
pub(crate) readiness: Mutex<MergeReadiness>,
pub(crate) detect_calls: AtomicUsize,
pub(crate) detect_pr_errors: AtomicBool,
pub(crate) pr_state_errors: AtomicBool,
pub(crate) ci_status_errors: AtomicBool,
pub(crate) review_decision_errors: AtomicBool,
pub(crate) mergeability_errors: AtomicBool,
pub(crate) merge_errors: AtomicBool,
pub(crate) merge_calls: Mutex<Vec<(u32, Option<MergeMethod>)>>,
pub(crate) pending_comments_result: Mutex<Vec<ReviewComment>>,
pub(crate) ci_checks_result: Mutex<Vec<CheckRun>>,
}
impl MockScm {
pub(crate) fn new() -> Self {
Self {
pr: Mutex::new(None),
state: Mutex::new(PrState::Open),
ci: Mutex::new(CiStatus::Pending),
review: Mutex::new(ReviewDecision::None),
readiness: Mutex::new(MergeReadiness {
mergeable: false,
ci_passing: false,
approved: false,
no_conflicts: true,
blockers: vec!["pending".into()],
}),
detect_calls: AtomicUsize::new(0),
detect_pr_errors: AtomicBool::new(false),
pr_state_errors: AtomicBool::new(false),
ci_status_errors: AtomicBool::new(false),
review_decision_errors: AtomicBool::new(false),
mergeability_errors: AtomicBool::new(false),
merge_errors: AtomicBool::new(false),
merge_calls: Mutex::new(Vec::new()),
pending_comments_result: Mutex::new(vec![]),
ci_checks_result: Mutex::new(vec![]),
}
}
pub(crate) fn merges(&self) -> Vec<(u32, Option<MergeMethod>)> {
self.merge_calls.lock().unwrap().clone()
}
pub(crate) fn set_pending_comments(&self, comments: Vec<ReviewComment>) {
*self.pending_comments_result.lock().unwrap() = comments;
}
pub(crate) fn set_ci_checks(&self, checks: Vec<CheckRun>) {
*self.ci_checks_result.lock().unwrap() = checks;
}
pub(crate) fn set_pr(&self, pr: Option<PullRequest>) {
*self.pr.lock().unwrap() = pr;
}
pub(crate) fn set_state(&self, s: PrState) {
*self.state.lock().unwrap() = s;
}
pub(crate) fn set_ci(&self, c: CiStatus) {
*self.ci.lock().unwrap() = c;
}
pub(crate) fn set_review(&self, r: ReviewDecision) {
*self.review.lock().unwrap() = r;
}
pub(crate) fn set_readiness(&self, r: MergeReadiness) {
*self.readiness.lock().unwrap() = r;
}
}
#[async_trait]
impl Scm for MockScm {
fn name(&self) -> &str {
"mock"
}
async fn detect_pr(&self, _session: &Session) -> Result<Option<PullRequest>> {
self.detect_calls.fetch_add(1, Ordering::SeqCst);
if self.detect_pr_errors.load(Ordering::SeqCst) {
return Err(crate::error::AoError::Runtime("mock detect_pr".into()));
}
Ok(self.pr.lock().unwrap().clone())
}
async fn pr_state(&self, _pr: &PullRequest) -> Result<PrState> {
if self.pr_state_errors.load(Ordering::SeqCst) {
return Err(crate::error::AoError::Runtime("mock pr_state".into()));
}
Ok(*self.state.lock().unwrap())
}
async fn ci_checks(&self, _pr: &PullRequest) -> Result<Vec<CheckRun>> {
Ok(self.ci_checks_result.lock().unwrap().clone())
}
async fn ci_status(&self, _pr: &PullRequest) -> Result<CiStatus> {
if self.ci_status_errors.load(Ordering::SeqCst) {
return Err(crate::error::AoError::Runtime("mock ci_status".into()));
}
Ok(*self.ci.lock().unwrap())
}
async fn reviews(&self, _pr: &PullRequest) -> Result<Vec<Review>> {
Ok(vec![])
}
async fn review_decision(&self, _pr: &PullRequest) -> Result<ReviewDecision> {
if self.review_decision_errors.load(Ordering::SeqCst) {
return Err(crate::error::AoError::Runtime(
"mock review_decision".into(),
));
}
Ok(*self.review.lock().unwrap())
}
async fn pending_comments(&self, _pr: &PullRequest) -> Result<Vec<ReviewComment>> {
Ok(self.pending_comments_result.lock().unwrap().clone())
}
async fn mergeability(&self, _pr: &PullRequest) -> Result<MergeReadiness> {
if self.mergeability_errors.load(Ordering::SeqCst) {
return Err(crate::error::AoError::Runtime("mock mergeability".into()));
}
Ok(self.readiness.lock().unwrap().clone())
}
async fn merge(&self, pr: &PullRequest, method: Option<MergeMethod>) -> Result<()> {
if self.merge_errors.load(Ordering::SeqCst) {
return Err(crate::error::AoError::Runtime("mock merge".into()));
}
self.merge_calls.lock().unwrap().push((pr.number, method));
Ok(())
}
}
pub(crate) fn fake_pr(number: u32, branch: &str) -> PullRequest {
PullRequest {
number,
url: format!("https://github.com/acme/widgets/pull/{number}"),
title: "fix the widgets".into(),
owner: "acme".into(),
repo: "widgets".into(),
branch: branch.into(),
base_branch: "main".into(),
is_draft: false,
}
}
pub(crate) async fn setup(
label: &str,
initial_activity: ActivityState,
) -> (
Arc<LifecycleManager>,
Arc<SessionManager>,
Arc<MockRuntime>,
Arc<MockAgent>,
PathBuf,
) {
use crate::session_manager::SessionManager;
let base = unique_temp_dir(label);
let sessions = Arc::new(SessionManager::new(base.clone()));
let runtime = Arc::new(MockRuntime::new(true));
let agent = Arc::new(MockAgent::new(initial_activity));
let lifecycle = Arc::new(LifecycleManager::new(
sessions.clone(),
runtime.clone() as Arc<dyn Runtime>,
agent.clone() as Arc<dyn Agent>,
));
(lifecycle, sessions, runtime, agent, base)
}
pub(crate) async fn recv_timeout(
rx: &mut broadcast::Receiver<OrchestratorEvent>,
) -> Option<OrchestratorEvent> {
tokio::time::timeout(Duration::from_millis(100), rx.recv())
.await
.ok()
.and_then(|r| r.ok())
}
pub(crate) async fn drain_events(
rx: &mut broadcast::Receiver<OrchestratorEvent>,
) -> Vec<OrchestratorEvent> {
let mut out = Vec::new();
while let Some(e) = recv_timeout(rx).await {
out.push(e);
}
out
}
pub(crate) fn rewind_idle_since(
lifecycle: &LifecycleManager,
session_id: &SessionId,
by: Duration,
) {
let mut map = lifecycle.idle_since.lock().unwrap_or_else(|e| {
tracing::error!("idle_since mutex poisoned; recovering inner state: {e}");
e.into_inner()
});
let rewound = Instant::now()
.checked_sub(by)
.expect("test clock rewind underflowed Instant");
map.insert(session_id.clone(), rewound);
}
pub(crate) async fn setup_with_scm(
label: &str,
) -> (
Arc<LifecycleManager>,
Arc<SessionManager>,
Arc<MockScm>,
PathBuf,
) {
use crate::session_manager::SessionManager;
let base = unique_temp_dir(label);
let sessions = Arc::new(SessionManager::new(base.clone()));
let runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
let scm = Arc::new(MockScm::new());
let lifecycle = Arc::new(
LifecycleManager::new(sessions.clone(), runtime, agent)
.with_scm(scm.clone() as Arc<dyn Scm>),
);
(lifecycle, sessions, scm, base)
}
pub(crate) fn script_ready_pr(scm: &MockScm, pr_number: u32) {
scm.set_pr(Some(fake_pr(pr_number, "ao-s1")));
scm.set_state(PrState::Open);
scm.set_ci(CiStatus::Passing);
scm.set_review(ReviewDecision::Approved);
scm.set_readiness(MergeReadiness {
mergeable: true,
ci_passing: true,
approved: true,
no_conflicts: true,
blockers: vec![],
});
}
pub(crate) async fn setup_with_scm_and_auto_merge_engine(
label: &str,
retries: Option<u32>,
) -> (
Arc<LifecycleManager>,
Arc<SessionManager>,
Arc<MockScm>,
Arc<ReactionEngine>,
PathBuf,
) {
use crate::reactions::ReactionConfig;
use crate::session_manager::SessionManager;
let base = unique_temp_dir(label);
let sessions = Arc::new(SessionManager::new(base.clone()));
let runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
let scm = Arc::new(MockScm::new());
let lifecycle = LifecycleManager::new(sessions.clone(), runtime, agent);
let mut cfg = ReactionConfig::new(ReactionAction::AutoMerge);
cfg.retries = retries;
let mut map = std::collections::HashMap::new();
map.insert("approved-and-green".into(), cfg);
let engine_runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
let engine = Arc::new(
ReactionEngine::new(map, engine_runtime, lifecycle.events_sender())
.with_scm(scm.clone() as Arc<dyn Scm>),
);
let lifecycle = Arc::new(
lifecycle
.with_reaction_engine(engine.clone())
.with_scm(scm.clone() as Arc<dyn Scm>),
);
(lifecycle, sessions, scm, engine, base)
}
pub(crate) async fn setup_with_merge_conflicts_engine(
label: &str,
) -> (
Arc<LifecycleManager>,
Arc<SessionManager>,
Arc<MockScm>,
Arc<MockRuntime>,
Arc<ReactionEngine>,
PathBuf,
) {
use crate::reactions::ReactionConfig;
use crate::session_manager::SessionManager;
let base = unique_temp_dir(label);
let sessions = Arc::new(SessionManager::new(base.clone()));
let runtime = Arc::new(MockRuntime::new(true));
let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
let scm = Arc::new(MockScm::new());
let lifecycle =
LifecycleManager::new(sessions.clone(), runtime.clone() as Arc<dyn Runtime>, agent);
let mut cfg = ReactionConfig::new(ReactionAction::SendToAgent);
cfg.message = Some("please rebase".into());
let mut map = std::collections::HashMap::new();
map.insert("merge-conflicts".into(), cfg);
let engine_runtime: Arc<dyn Runtime> = runtime.clone() as Arc<dyn Runtime>;
let engine = Arc::new(ReactionEngine::new(
map,
engine_runtime,
lifecycle.events_sender(),
));
let lifecycle = Arc::new(
lifecycle
.with_reaction_engine(engine.clone())
.with_scm(scm.clone() as Arc<dyn Scm>),
);
(lifecycle, sessions, scm, runtime, engine, base)
}
pub(crate) async fn setup_stuck(
label: &str,
threshold: Option<&str>,
) -> (
Arc<LifecycleManager>,
Arc<SessionManager>,
Arc<MockAgent>,
PathBuf,
) {
use crate::reactions::ReactionConfig;
use crate::session_manager::SessionManager;
let base = unique_temp_dir(label);
let sessions = Arc::new(SessionManager::new(base.clone()));
let runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
let agent = Arc::new(MockAgent::new(ActivityState::Idle));
let lifecycle =
LifecycleManager::new(sessions.clone(), runtime, agent.clone() as Arc<dyn Agent>);
let mut cfg = ReactionConfig::new(ReactionAction::Notify);
cfg.message = Some("stuck!".into());
cfg.threshold = threshold.map(String::from);
let mut map = std::collections::HashMap::new();
map.insert("agent-stuck".into(), cfg);
let engine_runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
let engine = Arc::new(ReactionEngine::new(
map,
engine_runtime,
lifecycle.events_sender(),
));
let lifecycle = Arc::new(lifecycle.with_reaction_engine(engine));
(lifecycle, sessions, agent, base)
}
pub(crate) fn build_engine_with_ci_failed(
lifecycle: &LifecycleManager,
message: &str,
) -> Arc<ReactionEngine> {
use crate::reactions::ReactionConfig;
let mut cfg = ReactionConfig::new(ReactionAction::SendToAgent);
cfg.message = Some(message.into());
let mut map = std::collections::HashMap::new();
map.insert("ci-failed".into(), cfg);
let runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
Arc::new(ReactionEngine::new(map, runtime, lifecycle.events_sender()))
}
}