#[cfg(feature = "test-utils")]
pub fn run_pipeline_with_effect_handler<'ctx, H>(
ctx: &PipelineContext,
effect_handler: &mut H,
) -> anyhow::Result<()>
where
H: crate::reducer::EffectHandler<'ctx> + crate::app::StatefulHandler,
{
use crate::app::config::{
create_initial_state_with_config, overlay_checkpoint_progress_onto_base_state,
EventLoopConfig,
};
use crate::app::finalization::finalize_pipeline;
use crate::app::resume::{handle_resume_with_validation, offer_resume_if_checkpoint_exists};
use crate::app::runner::pipeline_execution::write_defensive_completion_marker;
use crate::app::MAX_EVENT_LOOP_ITERATIONS;
use crate::checkpoint::{save_checkpoint_with_workspace, CheckpointBuilder, RunContext};
use crate::files::update_status_with_workspace;
use crate::pipeline::{AgentPhaseGuard, Timer};
use crate::reducer::PipelineState;
let resume_result = offer_resume_if_checkpoint_exists(
&ctx.args,
&ctx.config,
&ctx.registry,
&ctx.logger,
&ctx.developer_agent,
&ctx.reviewer_agent,
&*ctx.workspace,
);
let resume_result = match resume_result {
Some(result) => Some(result),
None => handle_resume_with_validation(
&ctx.args,
&ctx.config,
&ctx.registry,
&ctx.logger,
&ctx.developer_display,
&ctx.reviewer_display,
&*ctx.workspace,
)?,
};
let resume_checkpoint = resume_result.map(|r| r.checkpoint);
let run_context = resume_checkpoint
.as_ref()
.map_or_else(RunContext::new, RunContext::from_checkpoint);
let config = resume_checkpoint.as_ref().map_or_else(
|| ctx.config.clone(),
|checkpoint| {
use crate::checkpoint::apply_checkpoint_to_config;
let mut restored_config = ctx.config.clone();
apply_checkpoint_to_config(&mut restored_config, checkpoint);
restored_config
},
);
let mut git_helpers = crate::git_helpers::GitHelpers::new();
#[cfg(feature = "test-utils")]
{
use crate::git_helpers::{
cleanup_orphaned_marker_with_workspace, create_marker_with_workspace,
};
cleanup_orphaned_marker_with_workspace(&*ctx.workspace, &ctx.logger)?;
create_marker_with_workspace(&*ctx.workspace)?;
}
#[cfg(not(feature = "test-utils"))]
{
cleanup_orphaned_marker(&ctx.logger)?;
start_agent_phase(&mut git_helpers)?;
}
let mut agent_phase_guard =
AgentPhaseGuard::new(&mut git_helpers, &ctx.logger, &*ctx.workspace);
print_welcome_banner(ctx.colors, &ctx.developer_display, &ctx.reviewer_display);
print_pipeline_info_with_config(ctx, &config);
validate_prompt_and_setup_backup(ctx)?;
let mut prompt_monitor = setup_prompt_monitor(ctx);
let (_project_stack, review_guidelines) =
detect_project_stack(&config, &ctx.repo_root, &ctx.logger, ctx.colors);
print_review_guidelines(ctx, review_guidelines.as_ref());
println!();
let cloud_reporter = crate::cloud::NoopCloudReporter;
let mut timer = Timer::new();
let mut phase_ctx = create_phase_context_with_config(
ctx,
&config,
&mut timer,
review_guidelines.as_ref(),
&run_context,
resume_checkpoint.as_ref(),
&cloud_reporter,
);
save_start_commit_or_warn(ctx);
let initial_phase = resume_checkpoint
.as_ref()
.map_or(PipelinePhase::Planning, |checkpoint| checkpoint.phase);
let initial_prompt_history: std::collections::HashMap<
String,
crate::prompts::PromptHistoryEntry,
> = resume_checkpoint
.as_ref()
.and_then(|c| c.prompt_history.clone())
.unwrap_or_default();
setup_interrupt_context_for_pipeline(
initial_phase,
config.developer_iters,
config.reviewer_reviews,
&phase_ctx.execution_history,
&initial_prompt_history,
&run_context,
std::sync::Arc::clone(&ctx.workspace),
);
let _interrupt_guard = defer_clear_interrupt_context();
let should_run_rebase = ctx.args.rebase_flags.with_rebase;
update_interrupt_context_from_phase(
&phase_ctx.execution_history,
initial_prompt_history,
initial_phase,
config.developer_iters,
config.reviewer_reviews,
&run_context,
std::sync::Arc::clone(&ctx.workspace),
);
let mut initial_state = if let Some(ref checkpoint) = resume_checkpoint {
let base_state = create_initial_state_with_config(&phase_ctx);
let migrated = PipelineState::from_checkpoint_with_execution_history_limit(
checkpoint.clone(),
phase_ctx.config.execution_history_limit,
);
overlay_checkpoint_progress_onto_base_state(
base_state,
migrated,
phase_ctx.config.execution_history_limit,
)
} else {
create_initial_state_with_config(&phase_ctx)
};
if should_run_rebase {
if matches!(
initial_state.rebase,
crate::reducer::state::RebaseState::NotStarted
) {
let default_branch =
crate::git_helpers::get_default_branch().unwrap_or_else(|_| "main".to_string());
initial_state.rebase = crate::reducer::state::RebaseState::InProgress {
original_head: "HEAD".to_string(),
target_branch: default_branch,
};
}
} else if matches!(
initial_state.rebase,
crate::reducer::state::RebaseState::NotStarted
) {
initial_state.rebase = crate::reducer::state::RebaseState::Skipped;
}
let event_loop_config = EventLoopConfig {
max_iterations: MAX_EVENT_LOOP_ITERATIONS,
};
effect_handler.update_state(initial_state.clone());
let loop_result = {
use crate::app::core::run_event_loop_with_handler;
let phase_ctx_ref = &mut phase_ctx;
run_event_loop_with_handler(
phase_ctx_ref,
Some(initial_state),
event_loop_config,
effect_handler,
)
};
let loop_result = loop_result?;
if loop_result.completed {
ctx.logger
.success("Pipeline completed successfully via reducer event loop");
ctx.logger.info(&format!(
"Total events processed: {}",
loop_result.events_processed
));
} else {
ctx.logger.warn("Pipeline exited without completion marker");
write_defensive_completion_marker(&*ctx.workspace, &ctx.logger, loop_result.final_phase);
}
if config.features.checkpoint_enabled
&& should_write_complete_checkpoint(loop_result.final_phase)
{
let builder = CheckpointBuilder::new()
.phase(
PipelinePhase::Complete,
config.developer_iters,
config.developer_iters,
)
.reviewer_pass(config.reviewer_reviews, config.reviewer_reviews)
.capture_from_context(
&config,
&ctx.registry,
&ctx.developer_agent,
&ctx.reviewer_agent,
&ctx.logger,
&run_context,
)
.with_executor_from_context(std::sync::Arc::clone(&ctx.executor));
let builder = builder
.with_execution_history(phase_ctx.execution_history.clone())
.with_prompt_history(loop_result.final_state.prompt_history.clone());
if let Some(checkpoint) = builder.build_with_workspace(&*ctx.workspace) {
let mut checkpoint = checkpoint;
checkpoint.dev_fix_attempt_count = loop_result.final_state.dev_fix_attempt_count;
checkpoint.recovery_epoch = loop_result.final_state.recovery_epoch;
checkpoint.recovery_escalation_level =
loop_result.final_state.recovery_escalation_level;
checkpoint.failed_phase_for_recovery =
loop_result.final_state.failed_phase_for_recovery;
checkpoint.interrupted_by_user = loop_result.final_state.interrupted_by_user;
let _ = save_checkpoint_with_workspace(&*ctx.workspace, &checkpoint);
}
}
check_prompt_restoration(ctx, &mut prompt_monitor, "event loop");
update_status_with_workspace(&*ctx.workspace, "In progress.", config.isolation_mode)?;
finalize_pipeline(
&mut agent_phase_guard,
crate::app::finalization::FinalizeContext {
logger: &ctx.logger,
colors: ctx.colors,
config: &config,
timer: &timer,
workspace: &*ctx.workspace,
},
&loop_result.final_state,
prompt_monitor,
);
Ok(())
}