ralph_workflow/reducer/fault_tolerant_executor/mod.rs
1//! Fault-tolerant agent executor.
2//!
//! This module provides a bulletproof agent execution wrapper that:
4//! - Catches all panics from subprocess execution
5//! - Catches all I/O errors and non-zero exit codes
6//! - Never returns errors - always emits PipelineEvents
7//! - Provides detailed error classification for reducer-driven retry/fallback policy
8//! - Logs all failures but continues pipeline execution
9//!
10//! Key design principle: **Agent failures should NEVER crash the pipeline**.
11
12mod error_classification;
13
14#[cfg(test)]
15mod tests;
16
17use crate::agents::{AgentRole, JsonParserType};
18use crate::pipeline::{run_with_prompt, PipelineRuntime, PromptCommand};
19use crate::reducer::event::{AgentErrorKind, PipelineEvent};
20use anyhow::Result;
21
22// Re-export error classification functions for use by other modules
23pub use error_classification::{
24 classify_agent_error, classify_io_error, is_auth_error, is_rate_limit_error,
25 is_retriable_agent_error, is_timeout_error,
26};
27
28/// Result of executing an agent.
29///
30/// Contains the pipeline event and optional session_id for session continuation.
31///
32/// # Session ID Handling
33///
34/// When `session_id` is `Some`, the handler MUST emit a separate `SessionEstablished`
35/// event to the reducer. This is the proper way to handle session IDs in the reducer
36/// architecture - each piece of information is communicated via a dedicated event.
37///
38/// The handler should:
39/// 1. Process `event` through the reducer
40/// 2. If `session_id.is_some()`, emit `SessionEstablished` and process it
41///
42/// This two-event approach ensures:
43/// - Clean separation of concerns (success vs session establishment)
44/// - Proper state transitions in the reducer
45/// - Session ID is stored in agent_chain.last_session_id for XSD retry reuse
46pub struct AgentExecutionResult {
47 /// The pipeline event from agent execution (success or failure).
48 pub event: PipelineEvent,
49 /// Session ID from agent's init event, for XSD retry session continuation.
50 ///
51 /// When present, handler must emit `SessionEstablished` event separately.
52 pub session_id: Option<String>,
53}
54
55/// Configuration for fault-tolerant agent execution.
56#[derive(Clone, Copy)]
57pub struct AgentExecutionConfig<'a> {
58 /// Agent role (developer, reviewer, commit agent)
59 pub role: AgentRole,
60 /// Agent name from registry
61 pub agent_name: &'a str,
62 /// Agent command to execute
63 pub cmd_str: &'a str,
64 /// JSON parser type
65 pub parser_type: JsonParserType,
66 /// Environment variables for agent
67 pub env_vars: &'a std::collections::HashMap<String, String>,
68 /// Prompt to send to agent
69 pub prompt: &'a str,
70 /// Display name for logging
71 pub display_name: &'a str,
72 /// Log prefix (without extension) used to associate artifacts.
73 ///
74 /// Example: `.agent/logs/planning_1`.
75 pub log_prefix: &'a str,
76 /// Model fallback index for attribution.
77 pub model_index: usize,
78 /// Attempt counter for attribution.
79 pub attempt: u32,
80 /// Log file path
81 pub logfile: &'a str,
82}
83
84/// Execute an agent with bulletproof error handling.
85///
86/// This function:
87/// 1. Uses `catch_unwind` to catch panics from subprocess
88/// 2. Catches all I/O errors and non-zero exit codes
89/// 3. Never returns errors - always emits PipelineEvents
90/// 4. Classifies errors for retry/fallback decisions
91/// 5. Logs failures but continues pipeline
92///
93/// # Arguments
94///
95/// * `config` - Agent execution configuration
96/// * `runtime` - Pipeline runtime
97///
98/// # Returns
99///
100/// Returns `Ok(AgentExecutionResult)` with:
101/// - `event`: `AgentInvocationSucceeded` or `AgentInvocationFailed`
102/// - `session_id`: Optional session ID for XSD retry session continuation
103///
104/// The handler MUST emit `SessionEstablished` as a separate event when session_id
105/// is present. This ensures proper state management in the reducer.
106///
107/// This function never returns `Err` - all errors are converted to events.
108pub fn execute_agent_fault_tolerantly(
109 config: AgentExecutionConfig<'_>,
110 runtime: &mut PipelineRuntime<'_>,
111) -> Result<AgentExecutionResult> {
112 let role = config.role;
113
114 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
115 try_agent_execution(config, runtime)
116 }));
117
118 match result {
119 Ok(event_result) => event_result,
120 Err(_) => {
121 let error_kind = AgentErrorKind::InternalError;
122 let retriable = is_retriable_agent_error(&error_kind);
123
124 Ok(AgentExecutionResult {
125 event: PipelineEvent::agent_invocation_failed(
126 role,
127 config.agent_name.to_string(),
128 1,
129 error_kind,
130 retriable,
131 ),
132 session_id: None,
133 })
134 }
135 }
136}
137
138/// Try to execute agent without panic catching.
139///
140/// This function does the actual agent execution and returns
141/// either success or failure events. It's wrapped by
142/// `execute_agent_fault_tolerantly` which handles panics.
143fn try_agent_execution(
144 config: AgentExecutionConfig<'_>,
145 runtime: &mut PipelineRuntime<'_>,
146) -> Result<AgentExecutionResult> {
147 let prompt_cmd = PromptCommand {
148 label: config.agent_name,
149 display_name: config.display_name,
150 cmd_str: config.cmd_str,
151 prompt: config.prompt,
152 log_prefix: config.log_prefix,
153 model_index: Some(config.model_index),
154 attempt: Some(config.attempt),
155 logfile: config.logfile,
156 parser_type: config.parser_type,
157 env_vars: config.env_vars,
158 };
159
160 match run_with_prompt(&prompt_cmd, runtime) {
161 Ok(result) if result.exit_code == 0 => Ok(AgentExecutionResult {
162 event: PipelineEvent::agent_invocation_succeeded(
163 config.role,
164 config.agent_name.to_string(),
165 ),
166 session_id: result.session_id,
167 }),
168 Ok(result) => {
169 let exit_code = result.exit_code;
170
171 // Extract error message from logfile (stdout) for agents that emit errors as JSON
172 // This is critical for OpenCode and similar agents that don't use stderr for errors
173 let stdout_error = crate::pipeline::extract_error_message_from_logfile(
174 config.logfile,
175 runtime.workspace,
176 );
177
178 let error_kind =
179 classify_agent_error(exit_code, &result.stderr, stdout_error.as_deref());
180
181 // Special handling for rate limit: emit fact event with prompt context
182 if is_rate_limit_error(&error_kind) {
183 return Ok(AgentExecutionResult {
184 event: PipelineEvent::agent_rate_limited(
185 config.role,
186 config.agent_name.to_string(),
187 Some(config.prompt.to_string()),
188 ),
189 session_id: None,
190 });
191 }
192
193 // Special handling for auth failure: emit fact event without prompt context
194 if is_auth_error(&error_kind) {
195 return Ok(AgentExecutionResult {
196 event: PipelineEvent::agent_auth_failed(
197 config.role,
198 config.agent_name.to_string(),
199 ),
200 session_id: None,
201 });
202 }
203
204 // Special handling for timeout: emit fact event (reducer decides retry/fallback)
205 // Unlike rate limits, timeouts do not preserve prompt context.
206 if is_timeout_error(&error_kind) {
207 return Ok(AgentExecutionResult {
208 event: PipelineEvent::agent_timed_out(
209 config.role,
210 config.agent_name.to_string(),
211 ),
212 session_id: None,
213 });
214 }
215
216 let retriable = is_retriable_agent_error(&error_kind);
217
218 Ok(AgentExecutionResult {
219 event: PipelineEvent::agent_invocation_failed(
220 config.role,
221 config.agent_name.to_string(),
222 exit_code,
223 error_kind,
224 retriable,
225 ),
226 session_id: None,
227 })
228 }
229 Err(e) => {
230 // `run_with_prompt` returns `io::Error` directly. Classify based on the error kind
231 // instead of attempting to downcast the inner error payload.
232 let error_kind = classify_io_error(&e);
233
234 // Mirror special-case handling from the non-zero exit path.
235 // If `run_with_prompt` itself returns an error classified as Timeout,
236 // emit TimedOut so the reducer can decide retry vs fallback deterministically.
237 if is_timeout_error(&error_kind) {
238 return Ok(AgentExecutionResult {
239 event: PipelineEvent::agent_timed_out(
240 config.role,
241 config.agent_name.to_string(),
242 ),
243 session_id: None,
244 });
245 }
246 let retriable = is_retriable_agent_error(&error_kind);
247
248 Ok(AgentExecutionResult {
249 event: PipelineEvent::agent_invocation_failed(
250 config.role,
251 config.agent_name.to_string(),
252 1,
253 error_kind,
254 retriable,
255 ),
256 session_id: None,
257 })
258 }
259 }
260}