#![allow(dead_code)]
use crate::agent::AgentRunner;
use crate::error::{OrchestratorError, Result};
use crate::history::OutputCollector;
use crate::hooks::{HookContext, HookRunner, HookType};
use crate::openspec::Change;
use crate::process_manager::TerminationOutcome;
use std::time::Duration;
use tracing::info;
use super::output::OutputHandler;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ApplyResult {
Success,
Failed { error: String },
Cancelled,
}
impl ApplyResult {
pub fn is_success(&self) -> bool {
matches!(self, ApplyResult::Success)
}
}
#[derive(Debug, Clone)]
pub struct ApplyContext {
pub changes_processed: usize,
pub total_changes: usize,
pub remaining_changes: usize,
pub apply_count: u32,
}
impl ApplyContext {
pub fn new(
changes_processed: usize,
total_changes: usize,
remaining_changes: usize,
apply_count: u32,
) -> Self {
Self {
changes_processed,
total_changes,
remaining_changes,
apply_count,
}
}
}
pub async fn apply_change<O: OutputHandler>(
change: &Change,
agent: &mut AgentRunner,
ai_runner: &crate::ai_command_runner::AiCommandRunner,
hooks: &HookRunner,
context: &ApplyContext,
output: &O,
) -> Result<ApplyResult> {
info!("Applying change: {}", change.id);
let hook_ctx = HookContext::new(
context.changes_processed,
context.total_changes,
context.remaining_changes,
false,
)
.with_change(&change.id, change.completed_tasks, change.total_tasks)
.with_apply_count(context.apply_count);
if let Err(e) = hooks.run_hook(HookType::PreApply, &hook_ctx).await {
output.on_warn(&format!("pre_apply hook failed: {}", e));
return Err(e);
}
output.on_info(&format!("Applying: {}", change.id));
let status = agent.run_apply_with_runner(&change.id, ai_runner).await?;
if !status.success() {
let error_msg = format!("Apply command failed with exit code: {:?}", status.code());
let error_ctx = hook_ctx.clone().with_error(&error_msg);
let _ = hooks.run_hook(HookType::OnError, &error_ctx).await;
output.on_error(&error_msg);
return Ok(ApplyResult::Failed { error: error_msg });
}
if let Err(e) = hooks.run_hook(HookType::PostApply, &hook_ctx).await {
output.on_warn(&format!("post_apply hook failed: {}", e));
return Err(e);
}
info!("Successfully applied: {}", change.id);
output.on_success(&format!("Applied: {}", change.id));
Ok(ApplyResult::Success)
}
pub async fn apply_change_streaming<O, F>(
change: &Change,
agent: &mut AgentRunner,
hooks: &HookRunner,
context: &ApplyContext,
output: &O,
ai_runner: &crate::ai_command_runner::AiCommandRunner,
cancel_check: F,
) -> Result<ApplyResult>
where
O: OutputHandler,
F: Fn() -> bool,
{
use crate::agent::OutputLine;
info!("Applying change (streaming): {}", change.id);
let hook_ctx = HookContext::new(
context.changes_processed,
context.total_changes,
context.remaining_changes,
false,
)
.with_change(&change.id, change.completed_tasks, change.total_tasks)
.with_apply_count(context.apply_count);
if let Err(e) = hooks.run_hook(HookType::PreApply, &hook_ctx).await {
output.on_warn(&format!("pre_apply hook failed: {}", e));
return Err(e);
}
output.on_info(&format!("Applying: {}", change.id));
let (mut child, mut output_rx, start_time, _command) = agent
.run_apply_streaming_with_runner(&change.id, ai_runner, None)
.await?;
let mut output_collector = OutputCollector::new();
loop {
if cancel_check() {
let termination = child.terminate_with_timeout(Duration::from_secs(5)).await;
let message = match termination {
Ok(TerminationOutcome::Exited(_)) => {
"Process terminated due to cancellation".to_string()
}
Ok(TerminationOutcome::ForceKilled(_)) => {
"Process force killed after cancellation timeout".to_string()
}
Ok(TerminationOutcome::TimedOut) => {
"Process still running after force kill timeout".to_string()
}
Err(e) => format!("Failed to terminate process after cancellation: {}", e),
};
output.on_warn(&message);
return Ok(ApplyResult::Cancelled);
}
match output_rx.try_recv() {
Ok(OutputLine::Stdout(s)) => {
output_collector.add_stdout(&s);
output.on_stdout(&s);
}
Ok(OutputLine::Stderr(s)) => {
output_collector.add_stderr(&s);
output.on_agent_stderr(&s);
}
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
}
}
let status = child.wait().await.map_err(|e| {
OrchestratorError::AgentCommand(format!(
"Failed to wait for apply command for change '{}': {}",
change.id, e
))
})?;
agent.record_apply_attempt(
&change.id,
&status,
start_time,
output_collector.stdout_tail(),
output_collector.stderr_tail(),
);
if !status.success() {
let error_msg = format!(
"Apply command failed for change '{}' with exit code: {:?}",
change.id,
status.code()
);
let error_ctx = hook_ctx.clone().with_error(&error_msg);
let _ = hooks.run_hook(HookType::OnError, &error_ctx).await;
output.on_error(&error_msg);
return Ok(ApplyResult::Failed { error: error_msg });
}
if let Err(e) = hooks.run_hook(HookType::PostApply, &hook_ctx).await {
output.on_warn(&format!("post_apply hook failed: {}", e));
return Err(e);
}
info!("Successfully applied: {}", change.id);
output.on_success(&format!("Applied: {}", change.id));
Ok(ApplyResult::Success)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_apply_result_is_success() {
assert!(ApplyResult::Success.is_success());
assert!(!ApplyResult::Failed {
error: "test".to_string()
}
.is_success());
assert!(!ApplyResult::Cancelled.is_success());
}
#[test]
fn test_apply_context_new() {
let ctx = ApplyContext::new(1, 5, 4, 2);
assert_eq!(ctx.changes_processed, 1);
assert_eq!(ctx.total_changes, 5);
assert_eq!(ctx.remaining_changes, 4);
assert_eq!(ctx.apply_count, 2);
}
}