use crate::agent::{AgentRunner, OutputLine};
use crate::ai_command_runner::AiCommandRunner;
use crate::config::OrchestratorConfig;
use crate::error::Result;
use crate::execution::apply as common_apply;
use crate::hooks::{HookContext, HookRunner, HookType};
use crate::openspec::{self, Change};
use crate::orchestration::{
acceptance_test_streaming, archive_change, execute_rejection_flow,
handle_resume_apply_from_rejecting, run_rejection_review, AcceptanceResult, ArchiveContext,
ArchiveResult, OutputHandler, RejectionReviewVerdict,
};
use crate::stall::{StallDetector, StallPhase};
use crate::task_parser;
use crate::task_parser::TaskProgress;
use crate::vcs::VcsBackend;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
pub struct SerialRunService {
config: OrchestratorConfig,
repo_root: PathBuf,
apply_counts: HashMap<String, u32>,
current_change_id: Option<String>,
completed_change_ids: HashSet<String>,
stalled_change_ids: HashSet<String>,
stall_detector: StallDetector,
changes_processed: usize,
iteration: u32,
}
impl SerialRunService {
pub fn new(repo_root: PathBuf, config: OrchestratorConfig) -> Self {
let stall_config = config.get_stall_detection();
Self {
config,
repo_root,
apply_counts: HashMap::new(),
current_change_id: None,
completed_change_ids: HashSet::new(),
stalled_change_ids: HashSet::new(),
stall_detector: StallDetector::new(stall_config),
changes_processed: 0,
iteration: 0,
}
}
#[allow(dead_code)] pub fn repo_root(&self) -> &PathBuf {
&self.repo_root
}
#[allow(dead_code)] pub fn iteration(&self) -> u32 {
self.iteration
}
#[allow(dead_code)] pub fn changes_processed(&self) -> usize {
self.changes_processed
}
#[allow(dead_code)] pub fn current_change_id(&self) -> Option<&String> {
self.current_change_id.as_ref()
}
pub fn apply_count(&self, change_id: &str) -> u32 {
*self.apply_counts.get(change_id).unwrap_or(&0)
}
fn increment_apply_count(&mut self, change_id: &str) {
let count = self.apply_counts.entry(change_id.to_string()).or_insert(0);
*count += 1;
}
pub fn is_stalled(&self, change_id: &str) -> bool {
self.stalled_change_ids.contains(change_id)
}
pub fn is_completed(&self, change_id: &str) -> bool {
self.completed_change_ids.contains(change_id)
}
pub fn select_next_change<'a>(&self, changes: &'a [Change]) -> Option<&'a Change> {
let eligible: Vec<_> = changes
.iter()
.filter(|c| !self.is_completed(&c.id) && !self.is_stalled(&c.id))
.collect();
let filtered: Vec<_> = eligible
.iter()
.filter(|c| {
!c.dependencies
.iter()
.any(|dep| self.stalled_change_ids.contains(dep))
})
.copied()
.collect();
if filtered.is_empty() {
return None;
}
let incomplete: Vec<_> = filtered.iter().filter(|c| !c.is_complete()).collect();
if !incomplete.is_empty() {
return incomplete
.into_iter()
.max_by(|a, b| {
let a_progress = if a.total_tasks > 0 {
a.completed_tasks as f32 / a.total_tasks as f32
} else {
0.0
};
let b_progress = if b.total_tasks > 0 {
b.completed_tasks as f32 / b.total_tasks as f32
} else {
0.0
};
a_progress
.partial_cmp(&b_progress)
.unwrap_or(std::cmp::Ordering::Equal)
})
.copied();
}
filtered.first().copied()
}
pub fn mark_stalled(&mut self, change_id: &str, reason: &str) {
warn!("Marking {} as stalled: {}", change_id, reason);
self.stalled_change_ids.insert(change_id.to_string());
}
#[allow(clippy::too_many_arguments)]
pub async fn process_change<O: OutputHandler, F, G>(
&mut self,
change: &Change,
agent: &mut AgentRunner,
ai_runner: &AiCommandRunner,
hooks: &HookRunner,
output: &O,
total_changes: usize,
remaining_changes: usize,
cancel_check: F,
is_single_change_stopped: G,
operation_tracker: Option<std::sync::Arc<std::sync::RwLock<String>>>,
) -> Result<ChangeProcessResult>
where
F: Fn() -> bool + Clone + Send + 'static,
G: Fn() -> bool + Clone,
{
self.iteration += 1;
let change_id = &change.id;
let is_new_change = self.current_change_id.as_ref() != Some(change_id);
if is_new_change {
let change_start_context = HookContext::new(
self.changes_processed,
total_changes,
remaining_changes,
false,
)
.with_change(change_id, change.completed_tasks, change.total_tasks)
.with_apply_count(0);
hooks
.run_hook(HookType::OnChangeStart, &change_start_context)
.await?;
self.current_change_id = Some(change_id.clone());
}
let apply_count = self.apply_count(change_id);
if change.is_complete() {
self.archive_change_internal(
change,
agent,
ai_runner,
hooks,
output,
total_changes,
remaining_changes,
apply_count,
operation_tracker,
)
.await
} else {
self.apply_change_internal(
change,
agent,
ai_runner,
hooks,
output,
total_changes,
remaining_changes,
apply_count,
&cancel_check,
&is_single_change_stopped,
operation_tracker,
)
.await
}
}
#[allow(clippy::too_many_arguments)]
async fn archive_change_internal<O: OutputHandler>(
&mut self,
change: &Change,
agent: &mut AgentRunner,
ai_runner: &AiCommandRunner,
hooks: &HookRunner,
output: &O,
total_changes: usize,
remaining_changes: usize,
apply_count: u32,
operation_tracker: Option<std::sync::Arc<std::sync::RwLock<String>>>,
) -> Result<ChangeProcessResult> {
info!("Change {} is complete, archiving...", change.id);
Self::update_operation_tracker(&operation_tracker, "archive");
let archive_ctx = ArchiveContext::new(
self.changes_processed,
total_changes,
remaining_changes,
apply_count,
);
let stall_config = self.config.get_stall_detection();
match archive_change(
change,
agent,
ai_runner,
hooks,
&archive_ctx,
output,
None,
&stall_config,
)
.await
{
Ok(ArchiveResult::Success) => {
self.changes_processed += 1;
agent.clear_acceptance_history(&change.id);
let new_remaining = remaining_changes.saturating_sub(1);
let change_end_context =
HookContext::new(self.changes_processed, total_changes, new_remaining, false)
.with_change(&change.id, change.completed_tasks, change.total_tasks)
.with_apply_count(apply_count);
hooks
.run_hook(HookType::OnChangeEnd, &change_end_context)
.await?;
let merged_context =
HookContext::new(self.changes_processed, total_changes, new_remaining, false)
.with_change(&change.id, change.completed_tasks, change.total_tasks)
.with_apply_count(apply_count);
hooks.run_hook(HookType::OnMerged, &merged_context).await?;
self.completed_change_ids.insert(change.id.clone());
self.current_change_id = None;
self.apply_counts.remove(&change.id);
self.stall_detector.clear_change(&change.id);
Ok(ChangeProcessResult::Archived)
}
Ok(ArchiveResult::Stalled { error }) => {
self.mark_stalled(&change.id, &error);
Ok(ChangeProcessResult::Stalled { error })
}
Ok(ArchiveResult::Failed { error }) => Ok(ChangeProcessResult::Failed { error }),
Ok(ArchiveResult::Cancelled) => Ok(ChangeProcessResult::Cancelled),
Err(e) => Err(e),
}
}
#[allow(clippy::too_many_arguments)]
async fn apply_change_internal<O: OutputHandler, F, G>(
&mut self,
change: &Change,
agent: &mut AgentRunner,
ai_runner: &AiCommandRunner,
hooks: &HookRunner,
output: &O,
total_changes: usize,
remaining_changes: usize,
_apply_count: u32,
cancel_check: &F,
is_single_change_stopped: &G,
operation_tracker: Option<std::sync::Arc<std::sync::RwLock<String>>>,
) -> Result<ChangeProcessResult>
where
F: Fn() -> bool + Clone + Send + 'static,
G: Fn() -> bool + Clone,
{
info!("Applying change: {}", change.id);
let event_handler = SerialApplyEventHandler::new(output);
let hook_ctx = common_apply::ApplyLoopHookContext::serial(
self.changes_processed,
total_changes,
remaining_changes,
);
let cancel_token = CancellationToken::new();
let cancel_token_for_task = cancel_token.clone();
let cancel_check_clone = cancel_check.clone();
let cancel_task = tokio::spawn(async move {
loop {
if cancel_check_clone() {
cancel_token_for_task.cancel();
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
});
let apply_result = match common_apply::execute_apply_loop(
&change.id,
&self.repo_root,
&self.config,
agent,
VcsBackend::Git,
None, Some(hooks),
&hook_ctx,
&event_handler,
Some(&cancel_token), ai_runner,
|line| async move {
match &line {
OutputLine::Stdout(s) => output.on_stdout(s),
OutputLine::Stderr(s) => output.on_agent_stderr(s),
}
},
)
.await
{
Ok(result) => result,
Err(crate::error::OrchestratorError::PermissionBlocked {
denied_path,
guidance,
}) => {
cancel_task.abort();
let error_message = format!(
"Permission auto-rejected for: {}\n{}",
denied_path, guidance
);
self.mark_stalled(&change.id, &error_message);
return Ok(ChangeProcessResult::Stalled {
error: error_message,
});
}
Err(e) => {
cancel_task.abort();
return Err(e);
}
};
cancel_task.abort();
let apply_blocked_handoff = apply_result.blocked_handoff.clone();
if apply_result.completed || apply_blocked_handoff.is_some() {
if apply_result.completed {
info!(
"Apply loop completed for {} after {} iterations",
change.id, apply_result.iterations
);
} else if let Some(ref handoff) = apply_blocked_handoff {
warn!(
change_id = %change.id,
rejected_path = %handoff.rejected_path.display(),
iterations = apply_result.iterations,
"Apply blocked handoff detected; proceeding to rejecting review with incomplete tasks"
);
}
self.increment_apply_count(&change.id);
let (updated_change, is_complete) = Self::refetch_change_after_apply(&change.id);
if is_complete || apply_blocked_handoff.is_some() {
let updated_change = updated_change.unwrap_or_else(|| change.clone());
if let Some(ref handoff) = apply_blocked_handoff {
warn!(
change_id = %change.id,
rejected_path = %handoff.rejected_path.display(),
"Running rejecting review for apply-blocked handoff with unchecked tasks"
);
Self::update_operation_tracker(&operation_tracker, "rejecting");
match run_rejection_review(&change.id, &self.repo_root, &self.config, ai_runner)
.await?
{
RejectionReviewVerdict::Confirm => {
let reason = format!(
"Apply-blocked rejection confirmed by rejecting review (proposal: {})",
handoff.rejected_path.display()
);
let base_branch =
crate::vcs::git::commands::get_current_branch(&self.repo_root)
.await
.map_err(crate::error::OrchestratorError::from_vcs_error)?
.unwrap_or_else(|| "main".to_string());
execute_rejection_flow(
&change.id,
&reason,
&self.repo_root,
&base_branch,
&self.repo_root,
)
.await?;
Ok(ChangeProcessResult::Rejected { reason })
}
RejectionReviewVerdict::Resume => {
handle_resume_apply_from_rejecting(&change.id, &self.repo_root).await?;
Ok(ChangeProcessResult::ApplySuccessIncomplete)
}
}
} else {
info!(
"Tasks complete for {}, running acceptance test...",
change.id
);
Self::update_operation_tracker(&operation_tracker, "acceptance");
match acceptance_test_streaming(
&updated_change,
agent,
ai_runner,
&self.config,
output,
cancel_check,
)
.await
{
Ok((AcceptanceResult::Blocked, _attempt_number, _command)) => {
let reason = "Implementation blocker detected".to_string();
let base_branch =
crate::vcs::git::commands::get_current_branch(&self.repo_root)
.await
.map_err(crate::error::OrchestratorError::from_vcs_error)?
.unwrap_or_else(|| "main".to_string());
execute_rejection_flow(
&change.id,
&reason,
&self.repo_root,
&base_branch,
&self.repo_root,
)
.await?;
Ok(ChangeProcessResult::Rejected { reason })
}
Ok((result, _attempt_number, _command)) => Ok(self
.process_acceptance_result(
&change.id,
&self.repo_root,
agent,
result,
is_single_change_stopped,
)),
Err(e) => {
error!("Acceptance error for {}: {}", change.id, e);
Err(e)
}
}
}
} else {
info!(
"Apply completed for {}, but tasks not yet complete",
change.id
);
Ok(ChangeProcessResult::ApplySuccessIncomplete)
}
} else {
error!(
"Apply loop did not complete for {} after {} iterations",
change.id, apply_result.iterations
);
Ok(ChangeProcessResult::ApplyFailed {
error: format!(
"Apply loop did not complete after {} iterations",
apply_result.iterations
),
})
}
}
pub fn check_stall_after_apply(
&mut self,
change_id: &str,
progress: &TaskProgress,
is_empty_commit: Option<bool>,
) -> Option<String> {
if let Some(is_empty) = is_empty_commit {
if !is_progress_complete(progress)
&& self
.stall_detector
.register_commit(change_id, StallPhase::Apply, is_empty)
{
let count = self
.stall_detector
.current_count(change_id, StallPhase::Apply);
let threshold = self.stall_detector.config().threshold;
let message = format!(
"Stall detected for {} after {} empty WIP commits (apply)",
change_id, count
);
return Some(format!("{} (threshold {})", message, threshold));
}
}
None
}
fn refetch_change_after_apply(change_id: &str) -> (Option<Change>, bool) {
let updated_changes = openspec::list_changes_native().unwrap_or_default();
let updated_change = updated_changes.iter().find(|c| c.id == change_id).cloned();
let is_complete = updated_change.as_ref().is_some_and(|c| c.is_complete());
(updated_change, is_complete)
}
fn process_acceptance_result<F>(
&self,
change_id: &str,
workspace_path: &std::path::Path,
agent: &AgentRunner,
acceptance_result: AcceptanceResult,
is_single_change_stopped: F,
) -> ChangeProcessResult
where
F: Fn() -> bool,
{
match acceptance_result {
AcceptanceResult::Pass => {
info!("Acceptance passed for {}, ready for archive", change_id);
ChangeProcessResult::AcceptancePassed
}
AcceptanceResult::Continue => {
let continue_count = agent.count_consecutive_acceptance_continues(change_id);
let max_continues = self.config.get_acceptance_max_continues();
if continue_count >= max_continues {
warn!(
"Acceptance CONTINUE limit ({}) exceeded for {}, treating as FAIL",
max_continues, change_id
);
ChangeProcessResult::AcceptanceContinueExceeded
} else {
info!(
"Acceptance requires continuation for {} (attempt {}/{}), retrying...",
change_id, continue_count, max_continues
);
ChangeProcessResult::AcceptanceContinue
}
}
AcceptanceResult::Blocked => {
warn!(
"Acceptance blocked for {} - implementation blocker detected",
change_id
);
ChangeProcessResult::ApplyFailed {
error: "Blocked acceptance reached unexpected fallback path".to_string(),
}
}
AcceptanceResult::Fail { findings } => {
let blocking_gate_context = findings
.first()
.cloned()
.unwrap_or_else(|| "no acceptance findings captured".to_string());
warn!(
"Acceptance failed for {} ({} findings), blocking gate context: {}; will retry apply",
change_id,
findings.len(),
blocking_gate_context
);
let tasks_path = workspace_path
.join("openspec")
.join("changes")
.join(change_id)
.join("tasks.md");
if let Err(err) = task_parser::record_acceptance_follow_up(
&tasks_path,
agent.next_acceptance_attempt_number(change_id),
&findings,
) {
return ChangeProcessResult::AcceptanceCommandFailed {
error: format!(
"Failed to record acceptance follow-up tasks in {}: {}",
tasks_path.display(),
err
),
};
}
ChangeProcessResult::AcceptanceFailed { findings }
}
AcceptanceResult::CommandFailed {
error,
findings: _findings,
} => {
error!("Acceptance command failed for {}: {}", change_id, error);
ChangeProcessResult::AcceptanceCommandFailed { error }
}
AcceptanceResult::Cancelled => {
if is_single_change_stopped() {
info!("Single change {} stopped during acceptance", change_id);
ChangeProcessResult::ChangeStopped
} else {
info!("Acceptance cancelled for {} (global cancel)", change_id);
ChangeProcessResult::Cancelled
}
}
}
}
fn update_operation_tracker(
operation_tracker: &Option<std::sync::Arc<std::sync::RwLock<String>>>,
operation: &str,
) {
if let Some(ref tracker) = operation_tracker {
*tracker.write().unwrap() = operation.to_string();
}
}
}
#[derive(Debug, Clone)]
#[allow(dead_code)] pub enum ChangeProcessResult {
Archived,
Stalled { error: String },
Failed { error: String },
Cancelled,
ChangeStopped,
ApplySuccessIncomplete,
ApplyFailed { error: String },
AcceptancePassed,
AcceptanceFailed { findings: Vec<String> },
AcceptanceCommandFailed { error: String },
AcceptanceContinue,
AcceptanceContinueExceeded,
Rejected { reason: String },
}
fn is_progress_complete(progress: &TaskProgress) -> bool {
progress.total > 0 && progress.completed >= progress.total
}
struct SerialApplyEventHandler<'a, O: OutputHandler> {
#[allow(dead_code)] output: &'a O,
}
impl<'a, O: OutputHandler> SerialApplyEventHandler<'a, O> {
fn new(output: &'a O) -> Self {
Self { output }
}
}
impl<'a, O: OutputHandler> common_apply::ApplyEventHandler for SerialApplyEventHandler<'a, O> {
fn on_apply_started(&self, _change_id: &str, _command: &str) {
}
fn on_progress_updated(&self, _change_id: &str, _completed: u32, _total: u32) {
}
fn on_hook_started(&self, _change_id: &str, _hook_type: &str) {
}
fn on_hook_completed(&self, _change_id: &str, _hook_type: &str) {
}
fn on_hook_failed(&self, _change_id: &str, _hook_type: &str, _error: &str) {
}
fn on_apply_output(&self, _change_id: &str, _line: &OutputLine, _iteration: u32) {
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::OrchestratorConfig;
use crate::openspec::ProposalMetadata;
use tempfile::TempDir;
fn create_test_change(id: &str, completed: u32, total: u32) -> Change {
Change {
id: id.to_string(),
completed_tasks: completed,
total_tasks: total,
last_modified: "1m ago".to_string(),
dependencies: Vec::new(),
metadata: ProposalMetadata::default(),
}
}
#[test]
fn test_select_next_change_prioritizes_progress() {
let temp_dir = TempDir::new().unwrap();
let service =
SerialRunService::new(temp_dir.path().to_path_buf(), OrchestratorConfig::default());
let changes = vec![
create_test_change("a", 1, 10), create_test_change("b", 5, 10), create_test_change("c", 8, 10), ];
let next = service.select_next_change(&changes);
assert_eq!(next.map(|c| c.id.as_str()), Some("c"));
}
#[test]
fn test_select_next_change_excludes_stalled() {
let temp_dir = TempDir::new().unwrap();
let mut service =
SerialRunService::new(temp_dir.path().to_path_buf(), OrchestratorConfig::default());
service.mark_stalled("b", "test");
let changes = vec![
create_test_change("a", 1, 10),
create_test_change("b", 8, 10), create_test_change("c", 5, 10),
];
let next = service.select_next_change(&changes);
assert_eq!(next.map(|c| c.id.as_str()), Some("c")); }
#[test]
fn test_select_next_change_prioritizes_complete_for_archive() {
let temp_dir = TempDir::new().unwrap();
let service =
SerialRunService::new(temp_dir.path().to_path_buf(), OrchestratorConfig::default());
let changes = vec![
create_test_change("a", 5, 10), create_test_change("b", 10, 10), ];
let next = service.select_next_change(&changes);
assert_eq!(next.map(|c| c.id.as_str()), Some("a"));
}
#[test]
fn test_process_acceptance_result_archive_readiness_fail_blocks_archive_progression() {
use crate::agent::AgentRunner;
use crate::orchestration::AcceptanceResult;
let temp_dir = TempDir::new().unwrap();
let service =
SerialRunService::new(temp_dir.path().to_path_buf(), OrchestratorConfig::default());
let agent = AgentRunner::new(OrchestratorConfig::default());
let findings = vec![
"blocking gate: cargo clippy -- -D warnings".to_string(),
"src/orchestration/archive.rs:459".to_string(),
];
let change_dir = temp_dir
.path()
.join("openspec")
.join("changes")
.join("test-change");
std::fs::create_dir_all(&change_dir).unwrap();
std::fs::write(
change_dir.join("tasks.md"),
"## Implementation Tasks\n- [x] done\n",
)
.unwrap();
let result = service.process_acceptance_result(
"test-change",
temp_dir.path(),
&agent,
AcceptanceResult::Fail {
findings: findings.clone(),
},
|| false,
);
assert!(matches!(
result,
ChangeProcessResult::AcceptanceFailed { findings: returned }
if returned == findings
));
}
#[test]
fn test_process_acceptance_result_archive_readiness_pass_allows_archive_progression() {
use crate::agent::AgentRunner;
use crate::orchestration::AcceptanceResult;
let temp_dir = TempDir::new().unwrap();
let service =
SerialRunService::new(temp_dir.path().to_path_buf(), OrchestratorConfig::default());
let agent = AgentRunner::new(OrchestratorConfig::default());
let result = service.process_acceptance_result(
"test-change",
temp_dir.path(),
&agent,
AcceptanceResult::Pass,
|| false,
);
assert!(matches!(result, ChangeProcessResult::AcceptancePassed));
}
#[test]
fn test_process_acceptance_result_blocked_returns_fallback_error_variant() {
use crate::agent::AgentRunner;
use crate::orchestration::AcceptanceResult;
let temp_dir = TempDir::new().unwrap();
let service =
SerialRunService::new(temp_dir.path().to_path_buf(), OrchestratorConfig::default());
let agent = AgentRunner::new(OrchestratorConfig::default());
let result = service.process_acceptance_result(
"test-change",
temp_dir.path(),
&agent,
AcceptanceResult::Blocked,
|| false, );
assert!(matches!(
result,
ChangeProcessResult::ApplyFailed { ref error }
if error == "Blocked acceptance reached unexpected fallback path"
));
}
#[test]
fn test_mark_stalled_prevents_reselection() {
let temp_dir = TempDir::new().unwrap();
let mut service =
SerialRunService::new(temp_dir.path().to_path_buf(), OrchestratorConfig::default());
let changes = vec![
create_test_change("a", 5, 10),
create_test_change("b", 8, 10), ];
let next = service.select_next_change(&changes);
assert_eq!(next.map(|c| c.id.as_str()), Some("b"));
service.mark_stalled("b", "Implementation blocker detected");
let next = service.select_next_change(&changes);
assert_eq!(next.map(|c| c.id.as_str()), Some("a"));
assert!(service.is_stalled("b"));
assert!(!service.is_stalled("a"));
}
}