Skip to main content

ralph_workflow/reducer/handler/
mod.rs

1//
2// This module implements the EffectHandler trait to execute pipeline side effects
3// through the reducer architecture. Effect handlers perform actual work (agent
4// invocation, git operations, file I/O) and emit events.
5//
6// Handler responsibilities vs reducer responsibilities:
7// - Reducer: pure state transitions, policy decisions, phase progression
8// - Handler: effect execution, I/O, cleanup, validation
9//
10// Handlers execute exactly one effect and emit events. They must not perform
11// hidden cleanup, fallback, or retry logic beyond the effect being executed.
12// XML `.processed` files are archives only and are never read as inputs.
13
14mod agent;
15mod analysis;
16mod chain;
17mod checkpoint;
18mod commit;
19mod context;
20mod development;
21mod planning;
22mod rebase;
23mod retry_guidance;
24mod review;
25
26#[cfg(test)]
27mod tests;
28
29use crate::phases::PhaseContext;
30use crate::reducer::effect::{Effect, EffectHandler, EffectResult};
31use crate::reducer::event::{PipelineEvent, PipelinePhase};
32use crate::reducer::state::PipelineState;
33use crate::reducer::ui_event::UIEvent;
34use anyhow::Result;
35
36/// Main effect handler implementation.
37///
38/// This handler executes effects by calling pipeline subsystems and emitting reducer events.
39pub struct MainEffectHandler {
40    /// Current pipeline state
41    pub state: PipelineState,
42    /// Event log for replay/debugging
43    pub event_log: Vec<PipelineEvent>,
44}
45
46impl MainEffectHandler {
47    /// Create a new effect handler.
48    pub fn new(state: PipelineState) -> Self {
49        Self {
50            state,
51            event_log: Vec::new(),
52        }
53    }
54}
55
56impl<'ctx> EffectHandler<'ctx> for MainEffectHandler {
57    fn execute(&mut self, effect: Effect, ctx: &mut PhaseContext<'_>) -> Result<EffectResult> {
58        let result = self.execute_effect(effect, ctx)?;
59        self.event_log.push(result.event.clone());
60        self.event_log
61            .extend(result.additional_events.iter().cloned());
62        Ok(result)
63    }
64}
65
66impl crate::app::event_loop::StatefulHandler for MainEffectHandler {
67    fn update_state(&mut self, state: PipelineState) {
68        self.state = state;
69    }
70}
71
72impl MainEffectHandler {
73    /// Helper to create phase transition UI event.
74    fn phase_transition_ui(&self, to: PipelinePhase) -> UIEvent {
75        UIEvent::PhaseTransition {
76            from: Some(self.state.phase),
77            to,
78        }
79    }
80
81    fn write_completion_marker(ctx: &PhaseContext<'_>, content: &str, is_failure: bool) -> bool {
82        let marker_dir = std::path::Path::new(".agent/tmp");
83        if let Err(err) = ctx.workspace.create_dir_all(marker_dir) {
84            ctx.logger.warn(&format!(
85                "Failed to create completion marker directory: {}",
86                err
87            ));
88        }
89
90        let marker_path = std::path::Path::new(".agent/tmp/completion_marker");
91        match ctx.workspace.write(marker_path, content) {
92            Ok(()) => {
93                ctx.logger.info(&format!(
94                    "Completion marker written: {}",
95                    if is_failure { "failure" } else { "success" }
96                ));
97                true
98            }
99            Err(err) => {
100                ctx.logger
101                    .warn(&format!("Failed to write completion marker: {}", err));
102                false
103            }
104        }
105    }
106
107    fn execute_effect(
108        &mut self,
109        effect: Effect,
110        ctx: &mut PhaseContext<'_>,
111    ) -> Result<EffectResult> {
112        match effect {
113            Effect::AgentInvocation {
114                role,
115                agent,
116                model,
117                prompt,
118            } => self.invoke_agent(ctx, role, agent, model, prompt),
119
120            Effect::InitializeAgentChain { role } => self.initialize_agent_chain(ctx, role),
121
122            Effect::PreparePlanningPrompt {
123                iteration,
124                prompt_mode,
125            } => self.prepare_planning_prompt(ctx, iteration, prompt_mode),
126
127            Effect::MaterializePlanningInputs { iteration } => {
128                self.materialize_planning_inputs(ctx, iteration)
129            }
130
131            Effect::CleanupPlanningXml { iteration } => self.cleanup_planning_xml(ctx, iteration),
132
133            Effect::InvokePlanningAgent { iteration } => self.invoke_planning_agent(ctx, iteration),
134
135            Effect::ExtractPlanningXml { iteration } => self.extract_planning_xml(ctx, iteration),
136
137            Effect::ValidatePlanningXml { iteration } => self.validate_planning_xml(ctx, iteration),
138
139            Effect::WritePlanningMarkdown { iteration } => {
140                self.write_planning_markdown(ctx, iteration)
141            }
142
143            Effect::ArchivePlanningXml { iteration } => self.archive_planning_xml(ctx, iteration),
144
145            Effect::ApplyPlanningOutcome { iteration, valid } => {
146                self.apply_planning_outcome(ctx, iteration, valid)
147            }
148
149            Effect::PrepareDevelopmentContext { iteration } => {
150                self.prepare_development_context(ctx, iteration)
151            }
152
153            Effect::MaterializeDevelopmentInputs { iteration } => {
154                self.materialize_development_inputs(ctx, iteration)
155            }
156
157            Effect::PrepareDevelopmentPrompt {
158                iteration,
159                prompt_mode,
160            } => self.prepare_development_prompt(ctx, iteration, prompt_mode),
161
162            Effect::CleanupDevelopmentXml { iteration } => {
163                self.cleanup_development_xml(ctx, iteration)
164            }
165
166            Effect::InvokeDevelopmentAgent { iteration } => {
167                self.invoke_development_agent(ctx, iteration)
168            }
169
170            Effect::InvokeAnalysisAgent { iteration } => self.invoke_analysis_agent(ctx, iteration),
171
172            Effect::ExtractDevelopmentXml { iteration } => {
173                self.extract_development_xml(ctx, iteration)
174            }
175
176            Effect::ValidateDevelopmentXml { iteration } => {
177                self.validate_development_xml(ctx, iteration)
178            }
179
180            Effect::ApplyDevelopmentOutcome { iteration } => {
181                self.apply_development_outcome(ctx, iteration)
182            }
183
184            Effect::ArchiveDevelopmentXml { iteration } => {
185                self.archive_development_xml(ctx, iteration)
186            }
187
188            Effect::PrepareReviewContext { pass } => self.prepare_review_context(ctx, pass),
189
190            Effect::MaterializeReviewInputs { pass } => self.materialize_review_inputs(ctx, pass),
191
192            Effect::PrepareReviewPrompt { pass, prompt_mode } => {
193                self.prepare_review_prompt(ctx, pass, prompt_mode)
194            }
195
196            Effect::CleanupReviewIssuesXml { pass } => self.cleanup_review_issues_xml(ctx, pass),
197
198            Effect::InvokeReviewAgent { pass } => self.invoke_review_agent(ctx, pass),
199
200            Effect::ExtractReviewIssuesXml { pass } => self.extract_review_issues_xml(ctx, pass),
201
202            Effect::ValidateReviewIssuesXml { pass } => self.validate_review_issues_xml(ctx, pass),
203
204            Effect::WriteIssuesMarkdown { pass } => self.write_issues_markdown(ctx, pass),
205
206            Effect::ExtractReviewIssueSnippets { pass } => {
207                self.extract_review_issue_snippets(ctx, pass)
208            }
209
210            Effect::ArchiveReviewIssuesXml { pass } => self.archive_review_issues_xml(ctx, pass),
211
212            Effect::ApplyReviewOutcome {
213                pass,
214                issues_found,
215                clean_no_issues,
216            } => self.apply_review_outcome(ctx, pass, issues_found, clean_no_issues),
217
218            Effect::PrepareFixPrompt { pass, prompt_mode } => {
219                self.prepare_fix_prompt(ctx, pass, prompt_mode)
220            }
221
222            Effect::CleanupFixResultXml { pass } => self.cleanup_fix_result_xml(ctx, pass),
223
224            Effect::InvokeFixAgent { pass } => self.invoke_fix_agent(ctx, pass),
225
226            Effect::ExtractFixResultXml { pass } => self.extract_fix_result_xml(ctx, pass),
227
228            Effect::ValidateFixResultXml { pass } => self.validate_fix_result_xml(ctx, pass),
229
230            Effect::ApplyFixOutcome { pass } => self.apply_fix_outcome(ctx, pass),
231
232            Effect::ArchiveFixResultXml { pass } => self.archive_fix_result_xml(ctx, pass),
233
234            Effect::RunRebase {
235                phase,
236                target_branch,
237            } => self.run_rebase(ctx, phase, target_branch),
238
239            Effect::ResolveRebaseConflicts { strategy } => {
240                self.resolve_rebase_conflicts(ctx, strategy)
241            }
242
243            Effect::PrepareCommitPrompt { prompt_mode } => {
244                self.prepare_commit_prompt(ctx, prompt_mode)
245            }
246
247            Effect::CheckCommitDiff => self.check_commit_diff(ctx),
248
249            Effect::MaterializeCommitInputs { attempt } => {
250                self.materialize_commit_inputs(ctx, attempt)
251            }
252
253            Effect::InvokeCommitAgent => self.invoke_commit_agent(ctx),
254
255            Effect::CleanupCommitXml => self.cleanup_commit_xml(ctx),
256
257            Effect::ExtractCommitXml => self.extract_commit_xml(ctx),
258
259            Effect::ValidateCommitXml => self.validate_commit_xml(ctx),
260
261            Effect::ApplyCommitMessageOutcome => self.apply_commit_message_outcome(ctx),
262
263            Effect::ArchiveCommitXml => self.archive_commit_xml(ctx),
264
265            Effect::CreateCommit { message } => self.create_commit(ctx, message),
266
267            Effect::SkipCommit { reason } => self.skip_commit(ctx, reason),
268
269            Effect::BackoffWait {
270                role,
271                cycle,
272                duration_ms,
273            } => {
274                use std::time::Duration;
275                ctx.registry
276                    .retry_timer()
277                    .sleep(Duration::from_millis(duration_ms));
278                Ok(EffectResult::event(
279                    PipelineEvent::agent_retry_cycle_started(role, cycle),
280                ))
281            }
282
283            Effect::ReportAgentChainExhausted { role, phase, cycle } => {
284                use crate::reducer::event::ErrorEvent;
285                Err(ErrorEvent::AgentChainExhausted { role, phase, cycle }.into())
286            }
287
288            Effect::ValidateFinalState => self.validate_final_state(ctx),
289
290            Effect::SaveCheckpoint { trigger } => self.save_checkpoint(ctx, trigger),
291
292            Effect::CleanupContext => self.cleanup_context(ctx),
293
294            Effect::RestorePromptPermissions => self.restore_prompt_permissions(ctx),
295
296            Effect::WriteContinuationContext(ref data) => {
297                development::write_continuation_context_to_workspace(
298                    ctx.workspace,
299                    ctx.logger,
300                    data,
301                )?;
302                Ok(EffectResult::event(
303                    PipelineEvent::development_continuation_context_written(
304                        data.iteration,
305                        data.attempt,
306                    ),
307                ))
308            }
309
310            Effect::CleanupContinuationContext => self.cleanup_continuation_context(ctx),
311
312            Effect::TriggerLoopRecovery {
313                detected_loop,
314                loop_count,
315            } => self.trigger_loop_recovery(ctx, detected_loop, loop_count),
316
317            Effect::TriggerDevFixFlow {
318                failed_phase,
319                failed_role,
320                retry_cycle,
321            } => {
322                ctx.logger.error("⚠️  PIPELINE FAILURE DETECTED ⚠️");
323                ctx.logger.warn(&format!(
324                    "Pipeline failure detected (phase: {}, role: {:?}, cycle: {})",
325                    failed_phase, failed_role, retry_cycle
326                ));
327                ctx.logger.info("Entering AwaitingDevFix flow...");
328                ctx.logger
329                    .info("Dispatching dev-fix agent for remediation...");
330
331                let read_or_fallback = |path: &str, label: &str| -> String {
332                    match ctx.workspace.read(std::path::Path::new(path)) {
333                        Ok(content) => content,
334                        Err(err) => {
335                            ctx.logger.warn(&format!(
336                                "Dev-fix prompt fallback: failed to read {}: {}",
337                                label, err
338                            ));
339                            format!("(Missing {}: {})", label, err)
340                        }
341                    }
342                };
343
344                let prompt_content = read_or_fallback("PROMPT.md", "PROMPT.md");
345                let plan_content = read_or_fallback(".agent/PLAN.md", ".agent/PLAN.md");
346                let issues_content = format!(
347                    "# Issues\n\n- [High] Pipeline failure (phase: {}, role: {:?}, cycle: {}).\n  Diagnose the root cause and fix the failure.\n",
348                    failed_phase, failed_role, retry_cycle
349                );
350                let dev_fix_prompt = crate::prompts::prompt_fix_with_context(
351                    ctx.template_context,
352                    &prompt_content,
353                    &plan_content,
354                    &issues_content,
355                    ctx.workspace,
356                );
357
358                if let Err(err) = ctx.workspace.write(
359                    std::path::Path::new(".agent/tmp/dev_fix_prompt.txt"),
360                    &dev_fix_prompt,
361                ) {
362                    ctx.logger.warn(&format!(
363                        "Failed to write dev-fix prompt to workspace: {}",
364                        err
365                    ));
366                }
367
368                let agent = self
369                    .state
370                    .agent_chain
371                    .current_agent()
372                    .cloned()
373                    .unwrap_or_else(|| ctx.developer_agent.to_string());
374
375                let completion_marker_content = format!(
376                    "failure\nPipeline failure: phase={}, role={:?}, cycle={}",
377                    failed_phase, failed_role, retry_cycle
378                );
379                Self::write_completion_marker(ctx, &completion_marker_content, true);
380
381                /// Helper function to detect agent unavailability from error messages.
382                /// Checks for quota/usage/rate limit indicators in error text.
383                fn is_agent_unavailable_error(err_msg: &str) -> bool {
384                    let err_msg_lower = err_msg.to_lowercase();
385                    err_msg_lower.contains("usage limit")
386                        || err_msg_lower.contains("quota exceeded")
387                        || err_msg_lower.contains("rate limit")
388                }
389
390                let agent_result = match self.invoke_agent(
391                    ctx,
392                    crate::agents::AgentRole::Developer,
393                    agent,
394                    None,
395                    dev_fix_prompt,
396                ) {
397                    Ok(result) => Ok(result),
398                    Err(err) => {
399                        let unavailable = is_agent_unavailable_error(&err.to_string());
400
401                        if unavailable {
402                            ctx.logger.warn(&format!(
403                                "Dev-fix agent unavailable: {}. Pipeline will terminate with failure marker.",
404                                err
405                            ));
406                        } else {
407                            ctx.logger
408                                .warn(&format!("Dev-fix agent invocation failed: {}", err));
409                        }
410                        Err(err)
411                    }
412                };
413
414                let is_agent_unavailable = agent_result
415                    .as_ref()
416                    .err()
417                    .map(|err| is_agent_unavailable_error(&err.to_string()))
418                    .unwrap_or(false);
419
420                // Dev-fix success cannot be determined at invocation time - it requires
421                // extraction and validation of fix_result.xml. The InvocationSucceeded event
422                // only indicates the agent started successfully, not that the fix completed.
423                // DevFixCompleted will be emitted by the reducer after validation.
424
425                // Extract error reason for logging and summary
426                let error_reason = agent_result.as_ref().err().map(|e| e.to_string());
427
428                let mut result = match agent_result.as_ref() {
429                    Ok(result) => EffectResult::with_ui(
430                        PipelineEvent::AwaitingDevFix(
431                            crate::reducer::event::AwaitingDevFixEvent::DevFixTriggered {
432                                failed_phase,
433                                failed_role,
434                            },
435                        ),
436                        result.ui_events.clone(),
437                    ),
438                    Err(_) => EffectResult::event(PipelineEvent::AwaitingDevFix(
439                        crate::reducer::event::AwaitingDevFixEvent::DevFixTriggered {
440                            failed_phase,
441                            failed_role,
442                        },
443                    )),
444                };
445
446                // Add any additional events from the agent result if it succeeded
447                if let Ok(ref result_events) = agent_result {
448                    result = result.with_additional_event(result_events.event.clone());
449                    for event in &result_events.additional_events {
450                        result = result.with_additional_event(event.clone());
451                    }
452                }
453
454                // Emit appropriate event based on agent availability.
455                // CompletionMarkerEmitted is ALWAYS emitted because the marker is
456                // written unconditionally at the start of TriggerDevFixFlow.
457                if is_agent_unavailable {
458                    // Agent unavailable (quota/usage limit)
459                    result = result.with_additional_event(PipelineEvent::AwaitingDevFix(
460                        crate::reducer::event::AwaitingDevFixEvent::DevFixAgentUnavailable {
461                            failed_phase,
462                            reason: error_reason.unwrap_or_else(|| "unknown".to_string()),
463                        },
464                    ));
465                }
466                // Note: DevFixCompleted is NOT emitted here. The success of the dev-fix
467                // attempt can only be determined after fix_result.xml is extracted and
468                // validated, which happens in a later phase (during XML output extraction).
469                // The reducer will emit DevFixCompleted with the proper success status
470                // after validation succeeds or fails.
471
472                // Always emit CompletionMarkerEmitted since the marker is written
473                // unconditionally when entering TriggerDevFixFlow.
474                result = result.with_additional_event(PipelineEvent::AwaitingDevFix(
475                    crate::reducer::event::AwaitingDevFixEvent::CompletionMarkerEmitted {
476                        is_failure: true,
477                    },
478                ));
479
480                Ok(result)
481            }
482
483            Effect::EmitCompletionMarkerAndTerminate { is_failure, reason } => {
484                // Write completion marker to .agent/tmp/completion_marker
485                let content = if is_failure {
486                    format!(
487                        "failure\n{}",
488                        reason.unwrap_or_else(|| "unknown".to_string())
489                    )
490                } else {
491                    "success\n".to_string()
492                };
493
494                Self::write_completion_marker(ctx, &content, is_failure);
495
496                // Emit event to transition to Interrupted
497                Ok(EffectResult::event(PipelineEvent::AwaitingDevFix(
498                    crate::reducer::event::AwaitingDevFixEvent::CompletionMarkerEmitted {
499                        is_failure,
500                    },
501                )))
502            }
503        }
504    }
505}