ralph_workflow/reducer/fault_tolerant_executor/mod.rs
1//! Fault-tolerant agent executor.
2//!
//! This module provides a bulletproof agent execution wrapper that:
4//! - Catches all panics from subprocess execution
5//! - Catches all I/O errors and non-zero exit codes
6//! - Never returns errors - always emits `PipelineEvents`
7//! - Provides detailed error classification for reducer-driven retry/fallback policy
8//! - Logs all failures but continues pipeline execution
9//!
10//! Key design principle: **Agent failures should NEVER crash the pipeline**.
11
12mod error_classification;
13
14#[cfg(test)]
15mod tests;
16
17use crate::agents::{AgentRole, JsonParserType};
18use crate::common::domain_types::AgentName;
19use crate::logger::Loggable;
20use crate::pipeline::{run_with_prompt, PipelineRuntime, PromptCommand};
21use crate::reducer::event::{AgentErrorKind, PipelineEvent, TimeoutOutputKind};
22use crate::workspace::Workspace;
23use anyhow::Result;
24
25// Re-export error classification functions for use by other modules
26pub use error_classification::{
27 classify_agent_error, classify_io_error, is_auth_error, is_rate_limit_error,
28 is_retriable_agent_error, is_timeout_error,
29};
30
/// Maximum number of characters of an agent error message that is echoed
/// into the rate-limit log line.
const ERROR_PREVIEW_MAX_CHARS: usize = 100;
32
33/// Result of executing an agent.
34///
35/// Contains the pipeline event and optional `session_id` for session continuation.
36///
37/// # Session ID Handling
38///
39/// When `session_id` is `Some`, the handler MUST emit a separate `SessionEstablished`
40/// event to the reducer. This is the proper way to handle session IDs in the reducer
41/// architecture - each piece of information is communicated via a dedicated event.
42///
43/// The handler should:
44/// 1. Process `event` through the reducer
45/// 2. If `session_id.is_some()`, emit `SessionEstablished` and process it
46///
47/// This two-event approach ensures:
48/// - Clean separation of concerns (success vs session establishment)
49/// - Proper state transitions in the reducer
50/// - Session ID is stored in `agent_chain.last_session_id` for XSD retry reuse
51pub struct AgentExecutionResult {
52 /// The pipeline event from agent execution (success or failure).
53 pub event: PipelineEvent,
54 /// Session ID from agent's init event, for XSD retry session continuation.
55 ///
56 /// When present, handler must emit `SessionEstablished` event separately.
57 pub session_id: Option<String>,
58}
59
60/// Configuration for fault-tolerant agent execution.
61#[derive(Clone, Copy)]
62pub struct AgentExecutionConfig<'a> {
63 /// Agent role (developer, reviewer, commit agent)
64 pub role: AgentRole,
65 /// Agent name from registry
66 pub agent_name: &'a str,
67 /// Agent command to execute
68 pub cmd_str: &'a str,
69 /// JSON parser type
70 pub parser_type: JsonParserType,
71 /// Environment variables for agent
72 pub env_vars: &'a std::collections::HashMap<String, String>,
73 /// Prompt to send to agent
74 pub prompt: &'a str,
75 /// Display name for logging
76 pub display_name: &'a str,
77 /// Log prefix (without extension) used to associate artifacts.
78 ///
79 /// Example: `.agent/logs/planning_1`.
80 pub log_prefix: &'a str,
81 /// Model fallback index for attribution.
82 pub model_index: usize,
83 /// Attempt counter for attribution.
84 pub attempt: u32,
85 /// Log file path
86 pub logfile: &'a str,
87 /// Path to the file this phase is expected to produce.
88 ///
89 /// When set, the idle timeout monitor uses its existence as a
90 /// "complete-but-waiting" signal: if the file exists and the process is
91 /// idle, the process is killed and the phase advances as success.
92 pub completion_output_path: Option<&'a std::path::Path>,
93}
94
95/// Execute an agent with bulletproof error handling.
96///
97/// This function:
98/// 1. Uses `catch_unwind` to catch panics from subprocess
99/// 2. Catches all I/O errors and non-zero exit codes
100/// 3. Never returns errors - always emits `PipelineEvents`
101/// 4. Classifies errors for retry/fallback decisions
102/// 5. Logs failures but continues pipeline
103///
104/// # Arguments
105///
106/// * `config` - Agent execution configuration
107/// * `runtime` - Pipeline runtime
108///
109/// # Returns
110///
111/// Returns `Ok(AgentExecutionResult)` with:
112/// - `event`: `AgentInvocationSucceeded` or `AgentInvocationFailed`
113/// - `session_id`: Optional session ID for XSD retry session continuation
114///
115/// The handler MUST emit `SessionEstablished` as a separate event when `session_id`
116/// is present. This ensures proper state management in the reducer.
117///
118/// This function never returns `Err` - all errors are converted to events.
119///
120/// # Errors
121///
122/// Returns error if the operation fails.
123pub fn execute_agent_fault_tolerantly(
124 config: AgentExecutionConfig<'_>,
125 runtime: &mut PipelineRuntime<'_>,
126) -> Result<AgentExecutionResult> {
127 let role = config.role;
128 let agent_name = AgentName::from(config.agent_name.to_string());
129
130 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
131 try_agent_execution(config, runtime)
132 }));
133
134 Ok(result.unwrap_or_else(|_| {
135 let error_kind = AgentErrorKind::InternalError;
136 let retriable = is_retriable_agent_error(&error_kind);
137
138 AgentExecutionResult {
139 event: PipelineEvent::agent_invocation_failed(
140 role,
141 agent_name.clone(),
142 1,
143 error_kind,
144 retriable,
145 ),
146 session_id: None,
147 }
148 }))
149}
150
151/// Try to execute agent without panic catching.
152///
153/// This function does the actual agent execution and returns
154/// either success or failure events. It's wrapped by
155/// `execute_agent_fault_tolerantly` which handles panics.
156fn try_agent_execution(
157 config: AgentExecutionConfig<'_>,
158 runtime: &mut PipelineRuntime<'_>,
159) -> AgentExecutionResult {
160 let agent_name = AgentName::from(config.agent_name.to_string());
161 let prompt_cmd = PromptCommand {
162 label: config.agent_name,
163 display_name: config.display_name,
164 cmd_str: config.cmd_str,
165 prompt: config.prompt,
166 log_prefix: config.log_prefix,
167 model_index: Some(config.model_index),
168 attempt: Some(config.attempt),
169 logfile: config.logfile,
170 parser_type: config.parser_type,
171 env_vars: config.env_vars,
172 completion_output_path: config.completion_output_path,
173 };
174
175 match run_with_prompt(&prompt_cmd, runtime) {
176 Ok(result) if result.exit_code == 0 => AgentExecutionResult {
177 event: PipelineEvent::agent_invocation_succeeded(config.role, agent_name.clone()),
178 session_id: result.session_id,
179 },
180 Ok(result) => classify_nonzero_command_result(result, &config, &agent_name, runtime),
181 Err(e) => {
182 // `run_with_prompt` returns `io::Error` directly. Classify based on the error kind
183 // instead of attempting to downcast the inner error payload.
184 let error_kind = classify_io_error(&e);
185
186 // Result-file pre-check: a valid result file means the agent completed its work
187 // before run_with_prompt failed. Treat as success regardless of the I/O error.
188 if let Some(path) = config.completion_output_path {
189 if crate::files::llm_output_extraction::has_valid_xml_output(
190 runtime.workspace,
191 path,
192 ) {
193 return AgentExecutionResult {
194 event: PipelineEvent::agent_invocation_succeeded(
195 config.role,
196 agent_name.clone(),
197 ),
198 session_id: None,
199 };
200 }
201 }
202
203 // I/O errors from run_with_prompt itself (e.g., prompt file write failure, spawn
204 // failure) are NOT wall-clock timeouts — they never carry `timeout_context`.
205 //
206 // TIMEOUT CONTRACT: only `timeout_context: Some(_)` (set by the idle-timeout
207 // monitor in the Ok path) constitutes definitive timeout evidence. An I/O error
208 // classified as `AgentErrorKind::Timeout` (e.g., `io::ErrorKind::TimedOut` from a
209 // filesystem write) is an infrastructure failure, not a wall-clock idle timeout.
210 // Always emit InvocationFailed here — never TimedOut.
211 let retriable = is_retriable_agent_error(&error_kind);
212
213 AgentExecutionResult {
214 event: PipelineEvent::agent_invocation_failed(
215 config.role,
216 agent_name.clone(),
217 1,
218 error_kind,
219 retriable,
220 ),
221 session_id: None,
222 }
223 }
224 }
225}
226
227/// Classify a non-zero (and non-success) `CommandResult` into an `AgentExecutionResult`.
228///
229/// This function implements the full classification matrix for non-zero exits:
230///
231/// | timeout_context | Valid result file | Expected event |
232/// |-----------------|------------------|-------------------------|
233/// | None | Yes | InvocationSucceeded |
234/// | None | No | InvocationFailed |
235/// | Some | Yes | InvocationSucceeded |
236/// | Some | No (absent) | TimedOut(NoResult) |
237/// | Some | No (invalid XML) | TimedOut(PartialResult) |
238///
239/// # Classification order (mandatory)
240///
241/// 1. **Result file check (highest priority)** — a valid result file overrides ALL other
242/// signals, including rate limit, auth failure, and explicit timeout evidence.
243/// 2. Rate limit — emit `AgentRateLimited` for immediate agent fallback.
244/// 3. Auth failure — emit `AgentAuthFailed` for immediate agent fallback.
245/// 4. Explicit timeout — emit `TimedOut` with output kind classification.
246/// 5. Anything else — emit `InvocationFailed` with classified error kind.
247///
248/// Note: the exit_code == 0 case is handled by the caller before reaching this function.
249pub(crate) fn classify_nonzero_command_result(
250 result: crate::pipeline::CommandResult,
251 config: &AgentExecutionConfig<'_>,
252 agent_name: &AgentName,
253 runtime: &PipelineRuntime<'_>,
254) -> AgentExecutionResult {
255 let exit_code = result.exit_code;
256 let has_explicit_timeout = result.timeout_context.is_some();
257
258 // Extract error message from logfile (stdout) for agents that emit errors as JSON.
259 // This is critical for OpenCode and similar agents that don't use stderr for errors.
260 let stdout_error =
261 crate::pipeline::extract_error_identifier_from_logfile(config.logfile, runtime.workspace);
262
263 // Log extracted stdout errors only in debug verbosity.
264 if runtime.config.verbosity.is_debug() {
265 if let Some(ref err_msg) = stdout_error {
266 runtime.logger.log(&format!(
267 "[DEBUG] [OpenCode] Extracted error from logfile for agent '{}': {}",
268 config.agent_name, err_msg
269 ));
270 }
271 }
272
273 let error_kind = classify_agent_error(exit_code, &result.stderr, stdout_error.as_deref());
274
275 // RESULT FILE PRE-CHECK — highest priority, overrides all error signals.
276 //
277 // A valid result file is definitive evidence that the agent completed its
278 // work successfully, regardless of what error signal fired after the fact:
279 //
280 // - Proprietary exit codes (e.g., reason:91 from OpenCode) — Bug 1
281 // - Rate limit detected in stderr/stdout after the agent finished writing
282 // - Auth failure signal emitted after work was already done
283 // - Explicit timeout (wall-clock exceeded) after a valid result was produced
284 //
285 // This check MUST happen before any error classification early-returns so
286 // that a completed result cannot be misclassified as a failure.
287 if let Some(path) = config.completion_output_path {
288 if crate::files::llm_output_extraction::has_valid_xml_output(runtime.workspace, path) {
289 return AgentExecutionResult {
290 event: PipelineEvent::agent_invocation_succeeded(config.role, agent_name.clone()),
291 session_id: result.session_id,
292 };
293 }
294 }
295
296 // Rate limit: emit fact event with prompt context for immediate agent fallback.
297 if is_rate_limit_error(&error_kind) {
298 let error_source = if stdout_error.is_some() {
299 "stdout"
300 } else {
301 "stderr"
302 };
303 let error_preview = stdout_error
304 .as_deref()
305 .or(Some(result.stderr.as_str()))
306 .unwrap_or("");
307 let preview = build_error_preview(error_preview, ERROR_PREVIEW_MAX_CHARS);
308 runtime.logger.info(&format!(
309 "[OpenCode] Rate limit detected for agent '{}' (source: {}): {}",
310 config.agent_name, error_source, preview
311 ));
312
313 return AgentExecutionResult {
314 event: PipelineEvent::agent_rate_limited(
315 config.role,
316 agent_name.clone(),
317 Some(config.prompt.to_string()),
318 ),
319 session_id: None,
320 };
321 }
322
323 // Auth failure: emit fact event without prompt context.
324 if is_auth_error(&error_kind) {
325 return AgentExecutionResult {
326 event: PipelineEvent::agent_auth_failed(config.role, agent_name.clone()),
327 session_id: None,
328 };
329 }
330
331 // Explicit timeout: emit fact event with output kind classification.
332 //
333 // TIMEOUT CLASSIFICATION RULE:
334 // `result.timeout_context.is_some()` is the DEFINITIVE signal that a real wall-clock
335 // timeout was enforced by the idle-timeout monitor. Exit code 143 (SIGTERM) alone
336 // is NOT sufficient evidence — SIGTERM may be sent for other reasons.
337 //
338 // The result-file pre-check above has already promoted a valid result to success.
339 // Reaching here means the result file is absent or invalid — classify by output kind.
340 if has_explicit_timeout {
341 let output_kind = determine_timeout_output_kind(
342 config.logfile,
343 config.completion_output_path,
344 runtime.workspace,
345 );
346 return AgentExecutionResult {
347 event: PipelineEvent::agent_timed_out(
348 config.role,
349 agent_name.clone(),
350 output_kind,
351 Some(config.logfile.to_string()),
352 result.child_status_at_timeout,
353 ),
354 session_id: None,
355 };
356 }
357
358 // If we reach here, no explicit timeout evidence exists.
359 // SIGTERM (exit 143) without `timeout_context` is classified by
360 // `classify_agent_error` as `AgentErrorKind::Timeout`, but it must NOT be
361 // promoted to a `TimedOut` event — fall through to `InvocationFailed`.
362 let retriable = is_retriable_agent_error(&error_kind);
363
364 AgentExecutionResult {
365 event: PipelineEvent::agent_invocation_failed(
366 config.role,
367 agent_name.clone(),
368 exit_code,
369 error_kind,
370 retriable,
371 ),
372 session_id: None,
373 }
374}
375
/// Return at most the first `max_chars` characters of `message`.
///
/// Truncation counts Unicode scalar values (`char`s), not bytes, so the cut
/// never lands inside a multi-byte character. A message that is already short
/// enough is returned unchanged.
fn build_error_preview(message: &str, max_chars: usize) -> String {
    // Byte offset at which the character with index `max_chars` starts, or the
    // full length when the message has no such character.
    let cut = message
        .char_indices()
        .nth(max_chars)
        .map_or(message.len(), |(byte_offset, _)| byte_offset);
    message[..cut].to_owned()
}
379
/// Number of non-whitespace characters a logfile must contain before its
/// content counts as meaningful output.
///
/// Only consulted by the logfile-heuristic fallback, i.e. when no
/// `completion_output_path` is configured (e.g. Analysis drain).
const MEANINGFUL_OUTPUT_THRESHOLD: usize = 10;
385
386/// Determine whether a timed-out agent produced a valid result.
387///
388/// When a `completion_output_path` is provided, classification is based on
389/// whether that file exists on disk:
390/// - File present (even if invalid XML) → `PartialResult`
391/// - File absent → `NoResult`
392///
393/// Note: callers MUST check `has_valid_xml_output` BEFORE calling this function
394/// and promote a valid result to success. By the time this function is reached,
395/// the valid-result case has already been handled.
396///
397/// When no `completion_output_path` is provided (e.g., Analysis drain), falls
398/// back to logfile content heuristic.
399///
400/// # Fail-Safe Behavior
401///
402/// If the logfile cannot be read in the fallback path, returns `NoResult` to
403/// trigger immediate agent switching rather than retrying a potentially broken agent.
404fn determine_timeout_output_kind(
405 logfile_path: &str,
406 completion_output_path: Option<&std::path::Path>,
407 workspace: &dyn Workspace,
408) -> TimeoutOutputKind {
409 // When a completion file is expected, classify based on file existence.
410 // The valid-file case is handled upstream; here we distinguish missing vs. present-but-invalid.
411 if let Some(path) = completion_output_path {
412 return if workspace.exists(path) {
413 TimeoutOutputKind::PartialResult
414 } else {
415 TimeoutOutputKind::NoResult
416 };
417 }
418
419 // Fallback: no completion path configured (e.g. Analysis drain).
420 // Use logfile content as a proxy for whether the agent did any real work.
421 let Some(content) = workspace.read(std::path::Path::new(logfile_path)).ok() else {
422 return TimeoutOutputKind::NoResult;
423 };
424
425 let non_whitespace_count = content.chars().filter(|c| !c.is_whitespace()).count();
426 if non_whitespace_count >= MEANINGFUL_OUTPUT_THRESHOLD {
427 TimeoutOutputKind::PartialResult
428 } else {
429 TimeoutOutputKind::NoResult
430 }
431}