ralph_workflow/reducer/fault_tolerant_executor/mod.rs
1//! Fault-tolerant agent executor.
2//!
//! This module provides a bulletproof agent execution wrapper that:
4//! - Catches all panics from subprocess execution
5//! - Catches all I/O errors and non-zero exit codes
6//! - Never returns errors - always emits `PipelineEvents`
7//! - Provides detailed error classification for reducer-driven retry/fallback policy
8//! - Logs all failures but continues pipeline execution
9//!
10//! Key design principle: **Agent failures should NEVER crash the pipeline**.
11
12mod error_classification;
13
14#[cfg(test)]
15mod tests;
16
17use crate::agents::{AgentRole, JsonParserType};
18use crate::logger::Loggable;
19use crate::pipeline::{run_with_prompt, PipelineRuntime, PromptCommand};
20use crate::reducer::event::{AgentErrorKind, PipelineEvent, TimeoutOutputKind};
21use crate::workspace::Workspace;
22use anyhow::Result;
23
24// Re-export error classification functions for use by other modules
25pub use error_classification::{
26 classify_agent_error, classify_io_error, is_auth_error, is_rate_limit_error,
27 is_retriable_agent_error, is_timeout_error,
28};
29
/// Maximum number of characters of an agent error message that is included in
/// the rate-limit log line (the message is truncated with `build_error_preview`).
const ERROR_PREVIEW_MAX_CHARS: usize = 100;
31
/// Result of executing an agent.
///
/// Pairs the pipeline event that describes the invocation with an optional
/// `session_id` for session continuation.
///
/// Within this module only the successful (exit code `0`) path forwards a
/// `session_id`; every failure path sets it to `None`.
///
/// # Session ID Handling
///
/// When `session_id` is `Some`, the handler MUST emit a separate `SessionEstablished`
/// event to the reducer. This is the proper way to handle session IDs in the reducer
/// architecture - each piece of information is communicated via a dedicated event.
///
/// The handler should:
/// 1. Process `event` through the reducer
/// 2. If `session_id.is_some()`, emit `SessionEstablished` and process it
///
/// This two-event approach ensures:
/// - Clean separation of concerns (success vs session establishment)
/// - Proper state transitions in the reducer
/// - Session ID is stored in `agent_chain.last_session_id` for XSD retry reuse
pub struct AgentExecutionResult {
    /// The pipeline event from agent execution (success or failure).
    pub event: PipelineEvent,
    /// Session ID from agent's init event, for XSD retry session continuation.
    ///
    /// When present, handler must emit `SessionEstablished` event separately.
    pub session_id: Option<String>,
}
58
/// Configuration for fault-tolerant agent execution.
///
/// All fields are borrowed or `Copy`, so the whole configuration is cheap to
/// copy and is passed by value. Most fields are forwarded unchanged into the
/// `PromptCommand` handed to `run_with_prompt`.
#[derive(Clone, Copy)]
pub struct AgentExecutionConfig<'a> {
    /// Agent role (developer, reviewer, commit agent)
    ///
    /// Attached to every event emitted for this invocation.
    pub role: AgentRole,
    /// Agent name from registry
    ///
    /// Used as the `PromptCommand` label and recorded in emitted events.
    pub agent_name: &'a str,
    /// Agent command to execute
    pub cmd_str: &'a str,
    /// JSON parser type
    pub parser_type: JsonParserType,
    /// Environment variables for agent
    pub env_vars: &'a std::collections::HashMap<String, String>,
    /// Prompt to send to agent
    ///
    /// Also copied into the rate-limited event so the prompt context is preserved.
    pub prompt: &'a str,
    /// Display name for logging
    pub display_name: &'a str,
    /// Log prefix (without extension) used to associate artifacts.
    ///
    /// Example: `.agent/logs/planning_1`.
    pub log_prefix: &'a str,
    /// Model fallback index for attribution.
    pub model_index: usize,
    /// Attempt counter for attribution.
    pub attempt: u32,
    /// Log file path
    ///
    /// After a failed run this file is read back through the workspace to extract
    /// a stdout error identifier and to classify timeout output.
    pub logfile: &'a str,
}
87
88/// Execute an agent with bulletproof error handling.
89///
90/// This function:
91/// 1. Uses `catch_unwind` to catch panics from subprocess
92/// 2. Catches all I/O errors and non-zero exit codes
93/// 3. Never returns errors - always emits `PipelineEvents`
94/// 4. Classifies errors for retry/fallback decisions
95/// 5. Logs failures but continues pipeline
96///
97/// # Arguments
98///
99/// * `config` - Agent execution configuration
100/// * `runtime` - Pipeline runtime
101///
102/// # Returns
103///
104/// Returns `Ok(AgentExecutionResult)` with:
105/// - `event`: `AgentInvocationSucceeded` or `AgentInvocationFailed`
106/// - `session_id`: Optional session ID for XSD retry session continuation
107///
108/// The handler MUST emit `SessionEstablished` as a separate event when `session_id`
109/// is present. This ensures proper state management in the reducer.
110///
111/// This function never returns `Err` - all errors are converted to events.
112///
113/// # Errors
114///
115/// Returns error if the operation fails.
116pub fn execute_agent_fault_tolerantly(
117 config: AgentExecutionConfig<'_>,
118 runtime: &mut PipelineRuntime<'_>,
119) -> Result<AgentExecutionResult> {
120 let role = config.role;
121
122 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
123 try_agent_execution(config, runtime)
124 }));
125
126 Ok(result.unwrap_or_else(|_| {
127 let error_kind = AgentErrorKind::InternalError;
128 let retriable = is_retriable_agent_error(&error_kind);
129
130 AgentExecutionResult {
131 event: PipelineEvent::agent_invocation_failed(
132 role,
133 config.agent_name.to_string(),
134 1,
135 error_kind,
136 retriable,
137 ),
138 session_id: None,
139 }
140 }))
141}
142
143/// Try to execute agent without panic catching.
144///
145/// This function does the actual agent execution and returns
146/// either success or failure events. It's wrapped by
147/// `execute_agent_fault_tolerantly` which handles panics.
148fn try_agent_execution(
149 config: AgentExecutionConfig<'_>,
150 runtime: &mut PipelineRuntime<'_>,
151) -> AgentExecutionResult {
152 let prompt_cmd = PromptCommand {
153 label: config.agent_name,
154 display_name: config.display_name,
155 cmd_str: config.cmd_str,
156 prompt: config.prompt,
157 log_prefix: config.log_prefix,
158 model_index: Some(config.model_index),
159 attempt: Some(config.attempt),
160 logfile: config.logfile,
161 parser_type: config.parser_type,
162 env_vars: config.env_vars,
163 };
164
165 match run_with_prompt(&prompt_cmd, runtime) {
166 Ok(result) if result.exit_code == 0 => AgentExecutionResult {
167 event: PipelineEvent::agent_invocation_succeeded(
168 config.role,
169 config.agent_name.to_string(),
170 ),
171 session_id: result.session_id,
172 },
173 Ok(result) => {
174 let exit_code = result.exit_code;
175
176 // Extract error message from logfile (stdout) for agents that emit errors as JSON
177 // This is critical for OpenCode and similar agents that don't use stderr for errors
178 //
179 // OpenCode Detection Flow:
180 // 1. Extract JSON error from stdout logfile (OpenCode emits {"type":"error",...})
181 // 2. Pass to classify_agent_error() which checks both stderr and stdout_error
182 // 3. Pattern matching detects usage limit errors from multiple sources:
183 // - Structured codes: insufficient_quota, usage_limit_exceeded, quota_exceeded
184 // - Message patterns: "usage limit reached", "anthropic: usage limit", etc.
185 // 4. If detected, emit AgentEvent::RateLimited for immediate agent fallback
186 let stdout_error = crate::pipeline::extract_error_identifier_from_logfile(
187 config.logfile,
188 runtime.workspace,
189 );
190
191 // Log extracted stdout errors only in debug verbosity.
192 //
193 // This is diagnostic-only and can be noisy in normal runs.
194 if runtime.config.verbosity.is_debug() {
195 if let Some(ref err_msg) = stdout_error {
196 runtime.logger.log(&format!(
197 "[DEBUG] [OpenCode] Extracted error from logfile for agent '{}': {}",
198 config.agent_name, err_msg
199 ));
200 }
201 }
202
203 let error_kind =
204 classify_agent_error(exit_code, &result.stderr, stdout_error.as_deref());
205
206 // Special handling for rate limit: emit fact event with prompt context
207 //
208 // Rate limit detection supports both stderr and stdout error sources:
209 // - stderr: Traditional error output (most agents)
210 // - stdout: JSON error events (OpenCode, multi-provider gateways)
211 //
212 // When detected, immediately emit AgentEvent::RateLimited to trigger
213 // agent fallback without retry attempts on the same agent.
214 if is_rate_limit_error(&error_kind) {
215 // Log rate limit detection with error source and message preview
216 let error_source = if stdout_error.is_some() {
217 "stdout"
218 } else {
219 "stderr"
220 };
221 let error_preview = stdout_error
222 .as_deref()
223 .or(Some(result.stderr.as_str()))
224 .unwrap_or("");
225 let preview = build_error_preview(error_preview, ERROR_PREVIEW_MAX_CHARS);
226 runtime.logger.info(&format!(
227 "[OpenCode] Rate limit detected for agent '{}' (source: {}): {}",
228 config.agent_name, error_source, preview
229 ));
230
231 return AgentExecutionResult {
232 event: PipelineEvent::agent_rate_limited(
233 config.role,
234 config.agent_name.to_string(),
235 Some(config.prompt.to_string()),
236 ),
237 session_id: None,
238 };
239 }
240
241 // Special handling for auth failure: emit fact event without prompt context
242 if is_auth_error(&error_kind) {
243 return AgentExecutionResult {
244 event: PipelineEvent::agent_auth_failed(
245 config.role,
246 config.agent_name.to_string(),
247 ),
248 session_id: None,
249 };
250 }
251
252 // Special handling for timeout: emit fact event (reducer decides retry/fallback)
253 // Unlike rate limits, timeouts do not preserve prompt context.
254 // Determine output_kind by reading the logfile via workspace.
255 if is_timeout_error(&error_kind) {
256 let output_kind = determine_timeout_output_kind(config.logfile, runtime.workspace);
257 return AgentExecutionResult {
258 event: PipelineEvent::agent_timed_out(
259 config.role,
260 config.agent_name.to_string(),
261 output_kind,
262 Some(config.logfile.to_string()),
263 result.child_status_at_timeout,
264 ),
265 session_id: None,
266 };
267 }
268
269 let retriable = is_retriable_agent_error(&error_kind);
270
271 AgentExecutionResult {
272 event: PipelineEvent::agent_invocation_failed(
273 config.role,
274 config.agent_name.to_string(),
275 exit_code,
276 error_kind,
277 retriable,
278 ),
279 session_id: None,
280 }
281 }
282 Err(e) => {
283 // `run_with_prompt` returns `io::Error` directly. Classify based on the error kind
284 // instead of attempting to downcast the inner error payload.
285 let error_kind = classify_io_error(&e);
286
287 // Mirror special-case handling from the non-zero exit path.
288 // If `run_with_prompt` itself returns an error classified as Timeout,
289 // emit TimedOut so the reducer can decide retry vs fallback deterministically.
290 // In the Err case, the execution failed before completion, so we default to NoOutput.
291 if is_timeout_error(&error_kind) {
292 // When run_with_prompt itself errors, the agent never produced any output
293 // (the logfile may not even exist), so default to NoOutput.
294 return AgentExecutionResult {
295 event: PipelineEvent::agent_timed_out(
296 config.role,
297 config.agent_name.to_string(),
298 TimeoutOutputKind::NoOutput,
299 Some(config.logfile.to_string()),
300 None, // No CommandResult available in error path
301 ),
302 session_id: None,
303 };
304 }
305 let retriable = is_retriable_agent_error(&error_kind);
306
307 AgentExecutionResult {
308 event: PipelineEvent::agent_invocation_failed(
309 config.role,
310 config.agent_name.to_string(),
311 1,
312 error_kind,
313 retriable,
314 ),
315 session_id: None,
316 }
317 }
318 }
319}
320
/// Return at most the first `max_chars` characters of `message`.
///
/// The limit counts `char`s rather than bytes, so the cut never lands inside a
/// multi-byte character. No truncation marker is appended.
fn build_error_preview(message: &str, max_chars: usize) -> String {
    // Byte offset of the first character that falls outside the preview; the
    // whole message is kept when it is not longer than `max_chars` characters.
    let end = message
        .char_indices()
        .nth(max_chars)
        .map_or(message.len(), |(offset, _)| offset);

    message[..end].to_owned()
}
324
/// Minimum non-whitespace characters to classify as meaningful output.
///
/// This threshold distinguishes between:
/// - **`NoOutput`**: Agent produced nothing useful (empty, whitespace-only, or trivial fragments)
/// - **`PartialOutput`**: Agent was doing real work before being cut off
///
/// The comparison is inclusive: a logfile with exactly this many non-whitespace
/// `char`s already counts as `PartialOutput`.
///
/// The value of 10 is chosen to exclude noise (a few stray characters, partial words)
/// while being low enough to recognize any substantive work.
const MEANINGFUL_OUTPUT_THRESHOLD: usize = 10;
334
335/// Determine whether a timed-out agent produced meaningful output by reading the logfile.
336///
337/// This is pure I/O observation - no policy is encoded here.
338/// The reducer decides what to do with this information.
339///
340/// # Classification Logic
341///
342/// - `NoOutput`: Logfile is missing, empty, or contains fewer than ~10 non-whitespace characters
343/// - `PartialOutput`: Logfile contains at least 10 non-whitespace characters (indicates real work)
344///
345/// # Fail-Safe Behavior
346///
347/// If the logfile cannot be read, the function returns `NoOutput` to trigger
348/// immediate agent switching rather than retrying a potentially broken agent.
349fn determine_timeout_output_kind(
350 logfile_path: &str,
351 workspace: &dyn Workspace,
352) -> TimeoutOutputKind {
353 let Some(content) = workspace.read(std::path::Path::new(logfile_path)).ok() else {
354 return TimeoutOutputKind::NoOutput;
355 };
356
357 let non_whitespace_count = content.chars().filter(|c| !c.is_whitespace()).count();
358
359 if non_whitespace_count >= MEANINGFUL_OUTPUT_THRESHOLD {
360 TimeoutOutputKind::PartialOutput
361 } else {
362 TimeoutOutputKind::NoOutput
363 }
364}