ralph_workflow/reducer/fault_tolerant_executor/mod.rs
1//! Fault-tolerant agent executor.
2//!
//! This module provides a bulletproof agent execution wrapper that:
4//! - Catches all panics from subprocess execution
5//! - Catches all I/O errors and non-zero exit codes
6//! - Never returns errors - always emits PipelineEvents
7//! - Provides detailed error classification for reducer-driven retry/fallback policy
8//! - Logs all failures but continues pipeline execution
9//!
10//! Key design principle: **Agent failures should NEVER crash the pipeline**.
11
12mod error_classification;
13
14#[cfg(test)]
15mod tests;
16
17use crate::agents::{AgentRole, JsonParserType};
18use crate::pipeline::{run_with_prompt, PipelineRuntime, PromptCommand};
19use crate::reducer::event::{AgentErrorKind, PipelineEvent};
20use anyhow::Result;
21
22// Re-export error classification functions for use by other modules
23pub use error_classification::{
24 classify_agent_error, classify_io_error, is_auth_error, is_rate_limit_error,
25 is_retriable_agent_error, is_timeout_error,
26};
27
28/// Result of executing an agent.
29///
30/// Contains the pipeline event and optional session_id for session continuation.
31///
32/// # Session ID Handling
33///
34/// When `session_id` is `Some`, the handler MUST emit a separate `SessionEstablished`
35/// event to the reducer. This is the proper way to handle session IDs in the reducer
36/// architecture - each piece of information is communicated via a dedicated event.
37///
38/// The handler should:
39/// 1. Process `event` through the reducer
40/// 2. If `session_id.is_some()`, emit `SessionEstablished` and process it
41///
42/// This two-event approach ensures:
43/// - Clean separation of concerns (success vs session establishment)
44/// - Proper state transitions in the reducer
45/// - Session ID is stored in agent_chain.last_session_id for XSD retry reuse
46pub struct AgentExecutionResult {
47 /// The pipeline event from agent execution (success or failure).
48 pub event: PipelineEvent,
49 /// Session ID from agent's init event, for XSD retry session continuation.
50 ///
51 /// When present, handler must emit `SessionEstablished` event separately.
52 pub session_id: Option<String>,
53}
54
55/// Configuration for fault-tolerant agent execution.
56#[derive(Clone, Copy)]
57pub struct AgentExecutionConfig<'a> {
58 /// Agent role (developer, reviewer, commit agent)
59 pub role: AgentRole,
60 /// Agent name from registry
61 pub agent_name: &'a str,
62 /// Agent command to execute
63 pub cmd_str: &'a str,
64 /// JSON parser type
65 pub parser_type: JsonParserType,
66 /// Environment variables for agent
67 pub env_vars: &'a std::collections::HashMap<String, String>,
68 /// Prompt to send to agent
69 pub prompt: &'a str,
70 /// Display name for logging
71 pub display_name: &'a str,
72 /// Log prefix (without extension) used to associate artifacts.
73 ///
74 /// Example: `.agent/logs/planning_1`.
75 pub log_prefix: &'a str,
76 /// Model fallback index for attribution.
77 pub model_index: usize,
78 /// Attempt counter for attribution.
79 pub attempt: u32,
80 /// Log file path
81 pub logfile: &'a str,
82}
83
84/// Execute an agent with bulletproof error handling.
85///
86/// This function:
87/// 1. Uses `catch_unwind` to catch panics from subprocess
88/// 2. Catches all I/O errors and non-zero exit codes
89/// 3. Never returns errors - always emits PipelineEvents
90/// 4. Classifies errors for retry/fallback decisions
91/// 5. Logs failures but continues pipeline
92///
93/// # Arguments
94///
95/// * `config` - Agent execution configuration
96/// * `runtime` - Pipeline runtime
97///
98/// # Returns
99///
100/// Returns `Ok(AgentExecutionResult)` with:
101/// - `event`: `AgentInvocationSucceeded` or `AgentInvocationFailed`
102/// - `session_id`: Optional session ID for XSD retry session continuation
103///
104/// The handler MUST emit `SessionEstablished` as a separate event when session_id
105/// is present. This ensures proper state management in the reducer.
106///
107/// This function never returns `Err` - all errors are converted to events.
108pub fn execute_agent_fault_tolerantly(
109 config: AgentExecutionConfig<'_>,
110 runtime: &mut PipelineRuntime<'_>,
111) -> Result<AgentExecutionResult> {
112 let role = config.role;
113
114 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
115 try_agent_execution(config, runtime)
116 }));
117
118 match result {
119 Ok(event_result) => event_result,
120 Err(_) => {
121 let error_kind = AgentErrorKind::InternalError;
122 let retriable = is_retriable_agent_error(&error_kind);
123
124 Ok(AgentExecutionResult {
125 event: PipelineEvent::agent_invocation_failed(
126 role,
127 config.agent_name.to_string(),
128 1,
129 error_kind,
130 retriable,
131 ),
132 session_id: None,
133 })
134 }
135 }
136}
137
138/// Try to execute agent without panic catching.
139///
140/// This function does the actual agent execution and returns
141/// either success or failure events. It's wrapped by
142/// `execute_agent_fault_tolerantly` which handles panics.
143fn try_agent_execution(
144 config: AgentExecutionConfig<'_>,
145 runtime: &mut PipelineRuntime<'_>,
146) -> Result<AgentExecutionResult> {
147 let prompt_cmd = PromptCommand {
148 label: config.agent_name,
149 display_name: config.display_name,
150 cmd_str: config.cmd_str,
151 prompt: config.prompt,
152 log_prefix: config.log_prefix,
153 model_index: Some(config.model_index),
154 attempt: Some(config.attempt),
155 logfile: config.logfile,
156 parser_type: config.parser_type,
157 env_vars: config.env_vars,
158 };
159
160 match run_with_prompt(&prompt_cmd, runtime) {
161 Ok(result) if result.exit_code == 0 => Ok(AgentExecutionResult {
162 event: PipelineEvent::agent_invocation_succeeded(
163 config.role,
164 config.agent_name.to_string(),
165 ),
166 session_id: result.session_id,
167 }),
168 Ok(result) => {
169 let exit_code = result.exit_code;
170 let error_kind = classify_agent_error(exit_code, &result.stderr);
171
172 // Special handling for rate limit: emit fact event with prompt context
173 if is_rate_limit_error(&error_kind) {
174 return Ok(AgentExecutionResult {
175 event: PipelineEvent::agent_rate_limited(
176 config.role,
177 config.agent_name.to_string(),
178 Some(config.prompt.to_string()),
179 ),
180 session_id: None,
181 });
182 }
183
184 // Special handling for auth failure: emit fact event without prompt context
185 if is_auth_error(&error_kind) {
186 return Ok(AgentExecutionResult {
187 event: PipelineEvent::agent_auth_failed(
188 config.role,
189 config.agent_name.to_string(),
190 ),
191 session_id: None,
192 });
193 }
194
195 // Special handling for timeout: emit fact event (reducer decides retry/fallback)
196 // Unlike rate limits, timeouts do not preserve prompt context.
197 if is_timeout_error(&error_kind) {
198 return Ok(AgentExecutionResult {
199 event: PipelineEvent::agent_timed_out(
200 config.role,
201 config.agent_name.to_string(),
202 ),
203 session_id: None,
204 });
205 }
206
207 let retriable = is_retriable_agent_error(&error_kind);
208
209 Ok(AgentExecutionResult {
210 event: PipelineEvent::agent_invocation_failed(
211 config.role,
212 config.agent_name.to_string(),
213 exit_code,
214 error_kind,
215 retriable,
216 ),
217 session_id: None,
218 })
219 }
220 Err(e) => {
221 // `run_with_prompt` returns `io::Error` directly. Classify based on the error kind
222 // instead of attempting to downcast the inner error payload.
223 let error_kind = classify_io_error(&e);
224
225 // Mirror special-case handling from the non-zero exit path.
226 // If `run_with_prompt` itself returns an error classified as Timeout,
227 // emit TimedOut so the reducer can decide retry vs fallback deterministically.
228 if is_timeout_error(&error_kind) {
229 return Ok(AgentExecutionResult {
230 event: PipelineEvent::agent_timed_out(
231 config.role,
232 config.agent_name.to_string(),
233 ),
234 session_id: None,
235 });
236 }
237 let retriable = is_retriable_agent_error(&error_kind);
238
239 Ok(AgentExecutionResult {
240 event: PipelineEvent::agent_invocation_failed(
241 config.role,
242 config.agent_name.to_string(),
243 1,
244 error_kind,
245 retriable,
246 ),
247 session_id: None,
248 })
249 }
250 }
251}