ralph_workflow/reducer/fault_tolerant_executor/mod.rs
1//! Fault-tolerant agent executor.
2//!
//! This module provides a bulletproof agent execution wrapper that:
4//! - Catches all panics from subprocess execution
5//! - Catches all I/O errors and non-zero exit codes
6//! - Never returns errors - always emits `PipelineEvents`
7//! - Provides detailed error classification for reducer-driven retry/fallback policy
8//! - Logs all failures but continues pipeline execution
9//!
10//! Key design principle: **Agent failures should NEVER crash the pipeline**.
11
12mod error_classification;
13
14#[cfg(test)]
15mod tests;
16
17use crate::agents::{AgentRole, JsonParserType};
18use crate::common::domain_types::AgentName;
19use crate::logger::Loggable;
20use crate::pipeline::{run_with_prompt, PipelineRuntime, PromptCommand};
21use crate::reducer::event::{AgentErrorKind, PipelineEvent, TimeoutOutputKind};
22use crate::workspace::Workspace;
23use anyhow::Result;
24
25// Re-export error classification functions for use by other modules
26pub use error_classification::{
27 classify_agent_error, classify_io_error, is_auth_error, is_rate_limit_error,
28 is_retriable_agent_error, is_timeout_error,
29};
30
/// Maximum number of characters of an error message that is included in the
/// rate-limit log line (the limit passed to `build_error_preview`).
const ERROR_PREVIEW_MAX_CHARS: usize = 100;
32
/// Result of executing an agent.
///
/// Bundles the pipeline event describing the outcome with an optional
/// `session_id` used for session continuation.
///
/// In this module `session_id` is only ever populated on the success path
/// (exit code 0); every failure, rate-limit, auth and timeout result carries
/// `None`.
///
/// # Session ID Handling
///
/// When `session_id` is `Some`, the handler MUST emit a separate `SessionEstablished`
/// event to the reducer. This is the proper way to handle session IDs in the reducer
/// architecture - each piece of information is communicated via a dedicated event.
///
/// The handler should:
/// 1. Process `event` through the reducer
/// 2. If `session_id.is_some()`, emit `SessionEstablished` and process it
///
/// This two-event approach ensures:
/// - Clean separation of concerns (success vs session establishment)
/// - Proper state transitions in the reducer
/// - Session ID is stored in `agent_chain.last_session_id` for XSD retry reuse
pub struct AgentExecutionResult {
    /// The pipeline event from agent execution (success or failure).
    pub event: PipelineEvent,
    /// Session ID from agent's init event, for XSD retry session continuation.
    ///
    /// When present, handler must emit `SessionEstablished` event separately.
    pub session_id: Option<String>,
}
59
/// Configuration for fault-tolerant agent execution.
///
/// Every field is either a borrowed reference or a small value, which is what
/// allows the struct to be `Copy` and passed by value into the executor.
/// Apart from `role`, the fields are forwarded into the `PromptCommand`
/// handed to `run_with_prompt`.
#[derive(Clone, Copy)]
pub struct AgentExecutionConfig<'a> {
    /// Agent role (developer, reviewer, commit agent).
    ///
    /// Attached to every emitted pipeline event.
    pub role: AgentRole,
    /// Agent name from registry.
    ///
    /// Used both as the `PromptCommand` label and as the agent name attached
    /// to emitted events.
    pub agent_name: &'a str,
    /// Agent command to execute
    pub cmd_str: &'a str,
    /// JSON parser type
    pub parser_type: JsonParserType,
    /// Environment variables for agent
    pub env_vars: &'a std::collections::HashMap<String, String>,
    /// Prompt to send to agent.
    ///
    /// Also copied into the rate-limited event so the prompt context is
    /// preserved.
    pub prompt: &'a str,
    /// Display name for logging
    pub display_name: &'a str,
    /// Log prefix (without extension) used to associate artifacts.
    ///
    /// Example: `.agent/logs/planning_1`.
    pub log_prefix: &'a str,
    /// Model fallback index for attribution.
    pub model_index: usize,
    /// Attempt counter for attribution.
    pub attempt: u32,
    /// Log file path.
    ///
    /// Read back through the workspace after a non-zero exit to extract
    /// stdout-side errors and to classify timeout output.
    pub logfile: &'a str,
}
88
89/// Execute an agent with bulletproof error handling.
90///
91/// This function:
92/// 1. Uses `catch_unwind` to catch panics from subprocess
93/// 2. Catches all I/O errors and non-zero exit codes
94/// 3. Never returns errors - always emits `PipelineEvents`
95/// 4. Classifies errors for retry/fallback decisions
96/// 5. Logs failures but continues pipeline
97///
98/// # Arguments
99///
100/// * `config` - Agent execution configuration
101/// * `runtime` - Pipeline runtime
102///
103/// # Returns
104///
105/// Returns `Ok(AgentExecutionResult)` with:
106/// - `event`: `AgentInvocationSucceeded` or `AgentInvocationFailed`
107/// - `session_id`: Optional session ID for XSD retry session continuation
108///
109/// The handler MUST emit `SessionEstablished` as a separate event when `session_id`
110/// is present. This ensures proper state management in the reducer.
111///
112/// This function never returns `Err` - all errors are converted to events.
113///
114/// # Errors
115///
116/// Returns error if the operation fails.
117pub fn execute_agent_fault_tolerantly(
118 config: AgentExecutionConfig<'_>,
119 runtime: &mut PipelineRuntime<'_>,
120) -> Result<AgentExecutionResult> {
121 let role = config.role;
122 let agent_name = AgentName::from(config.agent_name.to_string());
123
124 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
125 try_agent_execution(config, runtime)
126 }));
127
128 Ok(result.unwrap_or_else(|_| {
129 let error_kind = AgentErrorKind::InternalError;
130 let retriable = is_retriable_agent_error(&error_kind);
131
132 AgentExecutionResult {
133 event: PipelineEvent::agent_invocation_failed(
134 role,
135 agent_name.clone(),
136 1,
137 error_kind,
138 retriable,
139 ),
140 session_id: None,
141 }
142 }))
143}
144
145/// Try to execute agent without panic catching.
146///
147/// This function does the actual agent execution and returns
148/// either success or failure events. It's wrapped by
149/// `execute_agent_fault_tolerantly` which handles panics.
150fn try_agent_execution(
151 config: AgentExecutionConfig<'_>,
152 runtime: &mut PipelineRuntime<'_>,
153) -> AgentExecutionResult {
154 let agent_name = AgentName::from(config.agent_name.to_string());
155 let prompt_cmd = PromptCommand {
156 label: config.agent_name,
157 display_name: config.display_name,
158 cmd_str: config.cmd_str,
159 prompt: config.prompt,
160 log_prefix: config.log_prefix,
161 model_index: Some(config.model_index),
162 attempt: Some(config.attempt),
163 logfile: config.logfile,
164 parser_type: config.parser_type,
165 env_vars: config.env_vars,
166 };
167
168 match run_with_prompt(&prompt_cmd, runtime) {
169 Ok(result) if result.exit_code == 0 => AgentExecutionResult {
170 event: PipelineEvent::agent_invocation_succeeded(config.role, agent_name.clone()),
171 session_id: result.session_id,
172 },
173 Ok(result) => {
174 let exit_code = result.exit_code;
175
176 // Extract error message from logfile (stdout) for agents that emit errors as JSON
177 // This is critical for OpenCode and similar agents that don't use stderr for errors
178 //
179 // OpenCode Detection Flow:
180 // 1. Extract JSON error from stdout logfile (OpenCode emits {"type":"error",...})
181 // 2. Pass to classify_agent_error() which checks both stderr and stdout_error
182 // 3. Pattern matching detects usage limit errors from multiple sources:
183 // - Structured codes: insufficient_quota, usage_limit_exceeded, quota_exceeded
184 // - Message patterns: "usage limit reached", "anthropic: usage limit", etc.
185 // 4. If detected, emit AgentEvent::RateLimited for immediate agent fallback
186 let stdout_error = crate::pipeline::extract_error_identifier_from_logfile(
187 config.logfile,
188 runtime.workspace,
189 );
190
191 // Log extracted stdout errors only in debug verbosity.
192 //
193 // This is diagnostic-only and can be noisy in normal runs.
194 if runtime.config.verbosity.is_debug() {
195 if let Some(ref err_msg) = stdout_error {
196 runtime.logger.log(&format!(
197 "[DEBUG] [OpenCode] Extracted error from logfile for agent '{}': {}",
198 config.agent_name, err_msg
199 ));
200 }
201 }
202
203 let error_kind =
204 classify_agent_error(exit_code, &result.stderr, stdout_error.as_deref());
205
206 // Special handling for rate limit: emit fact event with prompt context
207 //
208 // Rate limit detection supports both stderr and stdout error sources:
209 // - stderr: Traditional error output (most agents)
210 // - stdout: JSON error events (OpenCode, multi-provider gateways)
211 //
212 // When detected, immediately emit AgentEvent::RateLimited to trigger
213 // agent fallback without retry attempts on the same agent.
214 if is_rate_limit_error(&error_kind) {
215 // Log rate limit detection with error source and message preview
216 let error_source = if stdout_error.is_some() {
217 "stdout"
218 } else {
219 "stderr"
220 };
221 let error_preview = stdout_error
222 .as_deref()
223 .or(Some(result.stderr.as_str()))
224 .unwrap_or("");
225 let preview = build_error_preview(error_preview, ERROR_PREVIEW_MAX_CHARS);
226 runtime.logger.info(&format!(
227 "[OpenCode] Rate limit detected for agent '{}' (source: {}): {}",
228 config.agent_name, error_source, preview
229 ));
230
231 return AgentExecutionResult {
232 event: PipelineEvent::agent_rate_limited(
233 config.role,
234 agent_name.clone(),
235 Some(config.prompt.to_string()),
236 ),
237 session_id: None,
238 };
239 }
240
241 // Special handling for auth failure: emit fact event without prompt context
242 if is_auth_error(&error_kind) {
243 return AgentExecutionResult {
244 event: PipelineEvent::agent_auth_failed(config.role, agent_name.clone()),
245 session_id: None,
246 };
247 }
248
249 // Special handling for timeout: emit fact event (reducer decides retry/fallback)
250 // Unlike rate limits, timeouts do not preserve prompt context.
251 // Determine output_kind by reading the logfile via workspace.
252 if is_timeout_error(&error_kind) {
253 let output_kind = determine_timeout_output_kind(config.logfile, runtime.workspace);
254 return AgentExecutionResult {
255 event: PipelineEvent::agent_timed_out(
256 config.role,
257 agent_name.clone(),
258 output_kind,
259 Some(config.logfile.to_string()),
260 result.child_status_at_timeout,
261 ),
262 session_id: None,
263 };
264 }
265
266 let retriable = is_retriable_agent_error(&error_kind);
267
268 AgentExecutionResult {
269 event: PipelineEvent::agent_invocation_failed(
270 config.role,
271 agent_name.clone(),
272 exit_code,
273 error_kind,
274 retriable,
275 ),
276 session_id: None,
277 }
278 }
279 Err(e) => {
280 // `run_with_prompt` returns `io::Error` directly. Classify based on the error kind
281 // instead of attempting to downcast the inner error payload.
282 let error_kind = classify_io_error(&e);
283
284 // Mirror special-case handling from the non-zero exit path.
285 // If `run_with_prompt` itself returns an error classified as Timeout,
286 // emit TimedOut so the reducer can decide retry vs fallback deterministically.
287 // In the Err case, the execution failed before completion, so we default to NoOutput.
288 if is_timeout_error(&error_kind) {
289 // When run_with_prompt itself errors, the agent never produced any output
290 // (the logfile may not even exist), so default to NoOutput.
291 return AgentExecutionResult {
292 event: PipelineEvent::agent_timed_out(
293 config.role,
294 agent_name.clone(),
295 TimeoutOutputKind::NoOutput,
296 Some(config.logfile.to_string()),
297 None, // No CommandResult available in error path
298 ),
299 session_id: None,
300 };
301 }
302 let retriable = is_retriable_agent_error(&error_kind);
303
304 AgentExecutionResult {
305 event: PipelineEvent::agent_invocation_failed(
306 config.role,
307 agent_name.clone(),
308 1,
309 error_kind,
310 retriable,
311 ),
312 session_id: None,
313 }
314 }
315 }
316}
317
/// Return at most the first `max_chars` characters of `message`.
///
/// The limit is counted in Unicode scalar values (`char`s), not bytes, so the
/// result is always valid UTF-8. Messages that are already short enough are
/// returned unchanged (as an owned copy).
fn build_error_preview(message: &str, max_chars: usize) -> String {
    // Byte offset of the first character that must be dropped, or the full
    // length when the message has no more than `max_chars` characters.
    let cut = message
        .char_indices()
        .nth(max_chars)
        .map_or(message.len(), |(byte_offset, _)| byte_offset);

    // `cut` comes from `char_indices` (or is the string length), so it is
    // always on a character boundary and the slice cannot panic.
    message[..cut].to_owned()
}
321
322/// Minimum non-whitespace characters to classify as meaningful output.
323///
324/// This threshold distinguishes between:
325/// - **`NoOutput`**: Agent produced nothing useful (empty, whitespace-only, or trivial fragments)
326/// - **`PartialOutput`**: Agent was doing real work before being cut off
327///
328/// The value of 10 is chosen to exclude noise (a few stray characters, partial words)
329/// while being low enough to recognize any substantive work.
330const MEANINGFUL_OUTPUT_THRESHOLD: usize = 10;
331
332/// Determine whether a timed-out agent produced meaningful output by reading the logfile.
333///
334/// This is pure I/O observation - no policy is encoded here.
335/// The reducer decides what to do with this information.
336///
337/// # Classification Logic
338///
339/// - `NoOutput`: Logfile is missing, empty, or contains fewer than ~10 non-whitespace characters
340/// - `PartialOutput`: Logfile contains at least 10 non-whitespace characters (indicates real work)
341///
342/// # Fail-Safe Behavior
343///
344/// If the logfile cannot be read, the function returns `NoOutput` to trigger
345/// immediate agent switching rather than retrying a potentially broken agent.
346fn determine_timeout_output_kind(
347 logfile_path: &str,
348 workspace: &dyn Workspace,
349) -> TimeoutOutputKind {
350 let Some(content) = workspace.read(std::path::Path::new(logfile_path)).ok() else {
351 return TimeoutOutputKind::NoOutput;
352 };
353
354 let non_whitespace_count = content.chars().filter(|c| !c.is_whitespace()).count();
355
356 if non_whitespace_count >= MEANINGFUL_OUTPUT_THRESHOLD {
357 TimeoutOutputKind::PartialOutput
358 } else {
359 TimeoutOutputKind::NoOutput
360 }
361}