use crate::agents::{AgentDrain, DrainMode};
use crate::reducer::event::{PipelinePhase, ReviewEvent};
use crate::reducer::state::{
AgentChainState, CommitState, ContinuationState, FixStatus, FixValidatedOutcome, PipelineState,
};
fn clear_fix_drain_progress(state: PipelineState) -> PipelineState {
PipelineState {
review_issues_found: false,
fix_prompt_prepared_pass: None,
fix_required_files_cleaned_pass: None,
fix_agent_invoked_pass: None,
fix_analysis_agent_invoked_pass: None,
fix_result_xml_extracted_pass: None,
fix_validated_outcome: None,
fix_result_xml_archived_pass: None,
..state
}
}
fn transition_to_commit_after_fix(
state: PipelineState,
pass: u32,
increment_review_passes_completed: bool,
) -> PipelineState {
let state = clear_fix_drain_progress(state);
PipelineState {
phase: PipelinePhase::CommitMessage,
previous_phase: Some(PipelinePhase::Review),
reviewer_pass: pass,
agent_chain: state.agent_chain.with_mode(DrainMode::Normal),
commit: CommitState::NotStarted,
commit_prompt_prepared: false,
commit_diff_prepared: false,
commit_diff_empty: false,
commit_diff_content_id_sha256: None,
commit_agent_invoked: false,
commit_required_files_cleaned: false,
commit_xml_extracted: false,
commit_validated_outcome: None,
commit_xml_archived: false,
commit_selected_files: Vec::new(),
commit_excluded_files: Vec::new(),
commit_residual_retry_pass: 0,
continuation: state.continuation.reset(),
metrics: if increment_review_passes_completed {
state.metrics.increment_review_passes_completed()
} else {
state.metrics
},
..state
}
}
pub(super) fn reduce_fix_attempt_started(state: PipelineState) -> PipelineState {
PipelineState {
agent_chain: AgentChainState::initial()
.with_max_cycles(state.agent_chain.max_cycles)
.with_backoff_policy(
state.agent_chain.retry_delay_ms,
state.agent_chain.backoff_multiplier,
state.agent_chain.max_backoff_ms,
)
.reset_for_drain(AgentDrain::Fix),
continuation: ContinuationState {
invalid_output_attempts: 0,
fix_continue_pending: false,
xsd_retry_pending: false,
same_agent_retry_pending: false,
same_agent_retry_reason: None,
last_fix_xsd_error: None,
..state.continuation
},
fix_prompt_prepared_pass: None,
fix_required_files_cleaned_pass: None,
fix_agent_invoked_pass: None,
fix_analysis_agent_invoked_pass: None,
fix_result_xml_extracted_pass: None,
fix_validated_outcome: None,
fix_result_xml_archived_pass: None,
..state
}
}
pub(super) fn reduce_fix_prompt_prepared(state: PipelineState, pass: u32) -> PipelineState {
PipelineState {
agent_chain: state.agent_chain.with_drain(AgentDrain::Fix),
fix_prompt_prepared_pass: Some(pass),
continuation: ContinuationState {
xsd_retry_pending: false,
xsd_retry_session_reuse_pending: state.continuation.xsd_retry_session_reuse_pending,
same_agent_retry_pending: false,
same_agent_retry_reason: None,
fix_continue_pending: false,
..state.continuation
},
..state
}
}
pub(super) fn reduce_fix_result_xml_cleaned(state: PipelineState, pass: u32) -> PipelineState {
PipelineState {
fix_required_files_cleaned_pass: Some(pass),
..state
}
}
pub(super) fn reduce_fix_agent_invoked(state: PipelineState, pass: u32) -> PipelineState {
PipelineState {
agent_chain: state.agent_chain.with_drain(AgentDrain::Fix),
fix_agent_invoked_pass: Some(pass),
continuation: ContinuationState {
xsd_retry_pending: false,
xsd_retry_session_reuse_pending: false,
same_agent_retry_pending: false,
same_agent_retry_reason: None,
..state.continuation
},
metrics: state.metrics.increment_fix_runs_total(),
..state
}
}
pub(super) fn reduce_fix_analysis_agent_invoked(state: PipelineState, pass: u32) -> PipelineState {
PipelineState {
agent_chain: state.agent_chain.with_drain(AgentDrain::Analysis),
fix_analysis_agent_invoked_pass: Some(pass),
continuation: ContinuationState {
xsd_retry_pending: false,
xsd_retry_session_reuse_pending: false,
same_agent_retry_pending: false,
same_agent_retry_reason: None,
..state.continuation
},
metrics: state.metrics.increment_fix_analysis_runs_total(),
..state
}
}
pub(super) fn reduce_fix_result_xml_extracted(state: PipelineState, pass: u32) -> PipelineState {
PipelineState {
fix_result_xml_extracted_pass: Some(pass),
..state
}
}
pub(super) fn reduce_fix_result_xml_validated(
state: PipelineState,
pass: u32,
status: FixStatus,
summary: Option<String>,
) -> PipelineState {
PipelineState {
fix_validated_outcome: Some(FixValidatedOutcome {
pass,
status,
summary,
}),
continuation: ContinuationState {
last_fix_xsd_error: None,
..state.continuation
},
..state
}
}
pub(super) fn reduce_fix_result_xml_archived(state: PipelineState, pass: u32) -> PipelineState {
PipelineState {
fix_result_xml_archived_pass: Some(pass),
..state
}
}
pub(super) fn reduce_fix_outcome_applied(state: PipelineState, pass: u32) -> PipelineState {
let Some(outcome) = state
.fix_validated_outcome
.as_ref()
.filter(|o| o.pass == pass)
else {
return state;
};
let next_event = if outcome.status.needs_continuation() {
let next_attempt = state.continuation.fix_continuation_attempt + 1;
if next_attempt >= state.continuation.max_fix_continue_count {
ReviewEvent::FixContinuationBudgetExhausted {
pass,
total_attempts: next_attempt,
last_status: outcome.status,
}
} else {
ReviewEvent::FixContinuationTriggered {
pass,
status: outcome.status,
summary: outcome.summary.clone(),
}
}
} else {
let changes_made = matches!(outcome.status, FixStatus::AllIssuesAddressed);
ReviewEvent::FixAttemptCompleted { pass, changes_made }
};
super::reduce_review_event(state, next_event)
}
pub(super) fn reduce_fix_attempt_completed(
state: PipelineState,
pass: u32,
_changes_made: bool,
) -> PipelineState {
transition_to_commit_after_fix(state, pass, true)
}
pub(super) fn reduce_fix_continuation_triggered(
state: PipelineState,
pass: u32,
status: FixStatus,
summary: Option<String>,
) -> PipelineState {
PipelineState {
agent_chain: state
.agent_chain
.with_drain(AgentDrain::Fix)
.with_mode(DrainMode::Continuation),
reviewer_pass: pass,
fix_prompt_prepared_pass: None,
fix_required_files_cleaned_pass: None,
fix_agent_invoked_pass: None,
fix_analysis_agent_invoked_pass: None,
fix_result_xml_extracted_pass: None,
fix_validated_outcome: None,
fix_result_xml_archived_pass: None,
continuation: state.continuation.trigger_fix_continuation(status, summary),
metrics: state
.metrics
.increment_fix_continuations_total()
.increment_fix_continuation_attempt(),
..state
}
}
pub(super) fn reduce_fix_continuation_succeeded(
state: PipelineState,
pass: u32,
_total_attempts: u32,
) -> PipelineState {
transition_to_commit_after_fix(state, pass, true)
}
pub(super) fn reduce_fix_continuation_budget_exhausted(
state: PipelineState,
pass: u32,
_total_attempts: u32,
_last_status: FixStatus,
) -> PipelineState {
transition_to_commit_after_fix(state, pass, false)
}
pub(super) fn reduce_fix_output_validation_failed(
state: PipelineState,
pass: u32,
attempt: u32,
error_detail: Option<String>,
) -> PipelineState {
let new_xsd_count = state.continuation.xsd_retry_count + 1;
let will_retry = new_xsd_count < state.continuation.max_xsd_retry_count;
if new_xsd_count >= state.continuation.max_xsd_retry_count {
let new_agent_chain = state
.agent_chain
.with_drain(AgentDrain::Fix)
.switch_to_next_agent()
.clear_session_id();
PipelineState {
phase: PipelinePhase::Review,
reviewer_pass: pass,
agent_chain: new_agent_chain
.with_drain(AgentDrain::Fix)
.with_mode(DrainMode::Normal),
continuation: ContinuationState {
invalid_output_attempts: 0,
xsd_retry_count: 0,
xsd_retry_pending: false,
xsd_retry_session_reuse_pending: false,
last_fix_xsd_error: None,
..state.continuation
},
fix_prompt_prepared_pass: None,
fix_agent_invoked_pass: None,
fix_analysis_agent_invoked_pass: None,
fix_required_files_cleaned_pass: None,
metrics: if will_retry {
state.metrics.increment_xsd_retry_fix()
} else {
state.metrics
},
..state
}
} else {
PipelineState {
phase: PipelinePhase::Review,
reviewer_pass: pass,
agent_chain: state
.agent_chain
.with_drain(AgentDrain::Fix)
.with_mode(DrainMode::XsdRetry),
continuation: ContinuationState {
invalid_output_attempts: attempt + 1,
xsd_retry_count: new_xsd_count,
xsd_retry_pending: true,
xsd_retry_session_reuse_pending: true,
last_fix_xsd_error: error_detail,
..state.continuation
},
fix_prompt_prepared_pass: None,
fix_agent_invoked_pass: None,
fix_analysis_agent_invoked_pass: None,
fix_required_files_cleaned_pass: None,
metrics: if will_retry {
state.metrics.increment_xsd_retry_fix()
} else {
state.metrics
},
..state
}
}
}