Skip to main content

ralph_workflow/reducer/fault_tolerant_executor/
mod.rs

//! Fault-tolerant agent executor.
//!
//! This module provides a bulletproof agent execution wrapper that:
//! - Catches all panics from subprocess execution
//! - Catches all I/O errors and non-zero exit codes
//! - Never returns errors - always emits a `PipelineEvent`
//! - Provides detailed error classification for reducer-driven retry/fallback policy
//! - Logs all failures but continues pipeline execution
//!
//! Key design principle: **Agent failures should NEVER crash the pipeline**.
11
12mod error_classification;
13
14#[cfg(test)]
15mod tests;
16
17use crate::agents::{AgentRole, JsonParserType};
18use crate::common::domain_types::AgentName;
19use crate::logger::Loggable;
20use crate::pipeline::{run_with_prompt, PipelineRuntime, PromptCommand};
21use crate::reducer::event::{AgentErrorKind, PipelineEvent, TimeoutOutputKind};
22use crate::workspace::Workspace;
23use anyhow::Result;
24
25// Re-export error classification functions for use by other modules
26pub use error_classification::{
27    classify_agent_error, classify_io_error, is_auth_error, is_rate_limit_error,
28    is_retriable_agent_error, is_timeout_error,
29};
30
/// Maximum number of characters of agent error text echoed into the rate-limit log line.
const ERROR_PREVIEW_MAX_CHARS: usize = 100;
32
33/// Result of executing an agent.
34///
35/// Contains the pipeline event and optional `session_id` for session continuation.
36///
37/// # Session ID Handling
38///
39/// When `session_id` is `Some`, the handler MUST emit a separate `SessionEstablished`
40/// event to the reducer. This is the proper way to handle session IDs in the reducer
41/// architecture - each piece of information is communicated via a dedicated event.
42///
43/// The handler should:
44/// 1. Process `event` through the reducer
45/// 2. If `session_id.is_some()`, emit `SessionEstablished` and process it
46///
47/// This two-event approach ensures:
48/// - Clean separation of concerns (success vs session establishment)
49/// - Proper state transitions in the reducer
50/// - Session ID is stored in `agent_chain.last_session_id` for XSD retry reuse
51pub struct AgentExecutionResult {
52    /// The pipeline event from agent execution (success or failure).
53    pub event: PipelineEvent,
54    /// Session ID from agent's init event, for XSD retry session continuation.
55    ///
56    /// When present, handler must emit `SessionEstablished` event separately.
57    pub session_id: Option<String>,
58}
59
60/// Configuration for fault-tolerant agent execution.
61#[derive(Clone, Copy)]
62pub struct AgentExecutionConfig<'a> {
63    /// Agent role (developer, reviewer, commit agent)
64    pub role: AgentRole,
65    /// Agent name from registry
66    pub agent_name: &'a str,
67    /// Agent command to execute
68    pub cmd_str: &'a str,
69    /// JSON parser type
70    pub parser_type: JsonParserType,
71    /// Environment variables for agent
72    pub env_vars: &'a std::collections::HashMap<String, String>,
73    /// Prompt to send to agent
74    pub prompt: &'a str,
75    /// Display name for logging
76    pub display_name: &'a str,
77    /// Log prefix (without extension) used to associate artifacts.
78    ///
79    /// Example: `.agent/logs/planning_1`.
80    pub log_prefix: &'a str,
81    /// Model fallback index for attribution.
82    pub model_index: usize,
83    /// Attempt counter for attribution.
84    pub attempt: u32,
85    /// Log file path
86    pub logfile: &'a str,
87    /// Path to the file this phase is expected to produce.
88    ///
89    /// When set, the idle timeout monitor uses its existence as a
90    /// "complete-but-waiting" signal: if the file exists and the process is
91    /// idle, the process is killed and the phase advances as success.
92    pub completion_output_path: Option<&'a std::path::Path>,
93}
94
95/// Execute an agent with bulletproof error handling.
96///
97/// This function:
98/// 1. Uses `catch_unwind` to catch panics from subprocess
99/// 2. Catches all I/O errors and non-zero exit codes
100/// 3. Never returns errors - always emits `PipelineEvents`
101/// 4. Classifies errors for retry/fallback decisions
102/// 5. Logs failures but continues pipeline
103///
104/// # Arguments
105///
106/// * `config` - Agent execution configuration
107/// * `runtime` - Pipeline runtime
108///
109/// # Returns
110///
111/// Returns `Ok(AgentExecutionResult)` with:
112/// - `event`: `AgentInvocationSucceeded` or `AgentInvocationFailed`
113/// - `session_id`: Optional session ID for XSD retry session continuation
114///
115/// The handler MUST emit `SessionEstablished` as a separate event when `session_id`
116/// is present. This ensures proper state management in the reducer.
117///
118/// This function never returns `Err` - all errors are converted to events.
119///
120/// # Errors
121///
122/// Returns error if the operation fails.
123pub fn execute_agent_fault_tolerantly(
124    config: AgentExecutionConfig<'_>,
125    runtime: &mut PipelineRuntime<'_>,
126) -> Result<AgentExecutionResult> {
127    let role = config.role;
128    let agent_name = AgentName::from(config.agent_name.to_string());
129
130    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
131        try_agent_execution(config, runtime)
132    }));
133
134    Ok(result.unwrap_or_else(|_| {
135        let error_kind = AgentErrorKind::InternalError;
136        let retriable = is_retriable_agent_error(&error_kind);
137
138        AgentExecutionResult {
139            event: PipelineEvent::agent_invocation_failed(
140                role,
141                agent_name.clone(),
142                1,
143                error_kind,
144                retriable,
145            ),
146            session_id: None,
147        }
148    }))
149}
150
151/// Try to execute agent without panic catching.
152///
153/// This function does the actual agent execution and returns
154/// either success or failure events. It's wrapped by
155/// `execute_agent_fault_tolerantly` which handles panics.
156fn try_agent_execution(
157    config: AgentExecutionConfig<'_>,
158    runtime: &mut PipelineRuntime<'_>,
159) -> AgentExecutionResult {
160    let agent_name = AgentName::from(config.agent_name.to_string());
161    let prompt_cmd = PromptCommand {
162        label: config.agent_name,
163        display_name: config.display_name,
164        cmd_str: config.cmd_str,
165        prompt: config.prompt,
166        log_prefix: config.log_prefix,
167        model_index: Some(config.model_index),
168        attempt: Some(config.attempt),
169        logfile: config.logfile,
170        parser_type: config.parser_type,
171        env_vars: config.env_vars,
172        completion_output_path: config.completion_output_path,
173    };
174
175    match run_with_prompt(&prompt_cmd, runtime) {
176        Ok(result) if result.exit_code == 0 => AgentExecutionResult {
177            event: PipelineEvent::agent_invocation_succeeded(config.role, agent_name.clone()),
178            session_id: result.session_id,
179        },
180        Ok(result) => classify_nonzero_command_result(result, &config, &agent_name, runtime),
181        Err(e) => {
182            // `run_with_prompt` returns `io::Error` directly. Classify based on the error kind
183            // instead of attempting to downcast the inner error payload.
184            let error_kind = classify_io_error(&e);
185
186            // Result-file pre-check: a valid result file means the agent completed its work
187            // before run_with_prompt failed. Treat as success regardless of the I/O error.
188            if let Some(path) = config.completion_output_path {
189                if crate::files::llm_output_extraction::has_valid_xml_output(
190                    runtime.workspace,
191                    path,
192                ) {
193                    return AgentExecutionResult {
194                        event: PipelineEvent::agent_invocation_succeeded(
195                            config.role,
196                            agent_name.clone(),
197                        ),
198                        session_id: None,
199                    };
200                }
201            }
202
203            // I/O errors from run_with_prompt itself (e.g., prompt file write failure, spawn
204            // failure) are NOT wall-clock timeouts — they never carry `timeout_context`.
205            //
206            // TIMEOUT CONTRACT: only `timeout_context: Some(_)` (set by the idle-timeout
207            // monitor in the Ok path) constitutes definitive timeout evidence. An I/O error
208            // classified as `AgentErrorKind::Timeout` (e.g., `io::ErrorKind::TimedOut` from a
209            // filesystem write) is an infrastructure failure, not a wall-clock idle timeout.
210            // Always emit InvocationFailed here — never TimedOut.
211            let retriable = is_retriable_agent_error(&error_kind);
212
213            AgentExecutionResult {
214                event: PipelineEvent::agent_invocation_failed(
215                    config.role,
216                    agent_name.clone(),
217                    1,
218                    error_kind,
219                    retriable,
220                ),
221                session_id: None,
222            }
223        }
224    }
225}
226
227/// Classify a non-zero (and non-success) `CommandResult` into an `AgentExecutionResult`.
228///
229/// This function implements the full classification matrix for non-zero exits:
230///
231/// | timeout_context | Valid result file | Expected event          |
232/// |-----------------|------------------|-------------------------|
233/// | None            | Yes              | InvocationSucceeded     |
234/// | None            | No               | InvocationFailed        |
235/// | Some            | Yes              | InvocationSucceeded     |
236/// | Some            | No (absent)      | TimedOut(NoResult)      |
237/// | Some            | No (invalid XML) | TimedOut(PartialResult) |
238///
239/// # Classification order (mandatory)
240///
241/// 1. **Result file check (highest priority)** — a valid result file overrides ALL other
242///    signals, including rate limit, auth failure, and explicit timeout evidence.
243/// 2. Rate limit — emit `AgentRateLimited` for immediate agent fallback.
244/// 3. Auth failure — emit `AgentAuthFailed` for immediate agent fallback.
245/// 4. Explicit timeout — emit `TimedOut` with output kind classification.
246/// 5. Anything else — emit `InvocationFailed` with classified error kind.
247///
248/// Note: the exit_code == 0 case is handled by the caller before reaching this function.
249pub(crate) fn classify_nonzero_command_result(
250    result: crate::pipeline::CommandResult,
251    config: &AgentExecutionConfig<'_>,
252    agent_name: &AgentName,
253    runtime: &PipelineRuntime<'_>,
254) -> AgentExecutionResult {
255    let exit_code = result.exit_code;
256    let has_explicit_timeout = result.timeout_context.is_some();
257
258    // Extract error message from logfile (stdout) for agents that emit errors as JSON.
259    // This is critical for OpenCode and similar agents that don't use stderr for errors.
260    let stdout_error =
261        crate::pipeline::extract_error_identifier_from_logfile(config.logfile, runtime.workspace);
262
263    // Log extracted stdout errors only in debug verbosity.
264    if runtime.config.verbosity.is_debug() {
265        if let Some(ref err_msg) = stdout_error {
266            runtime.logger.log(&format!(
267                "[DEBUG] [OpenCode] Extracted error from logfile for agent '{}': {}",
268                config.agent_name, err_msg
269            ));
270        }
271    }
272
273    let error_kind = classify_agent_error(exit_code, &result.stderr, stdout_error.as_deref());
274
275    // RESULT FILE PRE-CHECK — highest priority, overrides all error signals.
276    //
277    // A valid result file is definitive evidence that the agent completed its
278    // work successfully, regardless of what error signal fired after the fact:
279    //
280    // - Proprietary exit codes (e.g., reason:91 from OpenCode) — Bug 1
281    // - Rate limit detected in stderr/stdout after the agent finished writing
282    // - Auth failure signal emitted after work was already done
283    // - Explicit timeout (wall-clock exceeded) after a valid result was produced
284    //
285    // This check MUST happen before any error classification early-returns so
286    // that a completed result cannot be misclassified as a failure.
287    if let Some(path) = config.completion_output_path {
288        if crate::files::llm_output_extraction::has_valid_xml_output(runtime.workspace, path) {
289            return AgentExecutionResult {
290                event: PipelineEvent::agent_invocation_succeeded(config.role, agent_name.clone()),
291                session_id: result.session_id,
292            };
293        }
294    }
295
296    // Rate limit: emit fact event with prompt context for immediate agent fallback.
297    if is_rate_limit_error(&error_kind) {
298        let error_source = if stdout_error.is_some() {
299            "stdout"
300        } else {
301            "stderr"
302        };
303        let error_preview = stdout_error
304            .as_deref()
305            .or(Some(result.stderr.as_str()))
306            .unwrap_or("");
307        let preview = build_error_preview(error_preview, ERROR_PREVIEW_MAX_CHARS);
308        runtime.logger.info(&format!(
309            "[OpenCode] Rate limit detected for agent '{}' (source: {}): {}",
310            config.agent_name, error_source, preview
311        ));
312
313        return AgentExecutionResult {
314            event: PipelineEvent::agent_rate_limited(
315                config.role,
316                agent_name.clone(),
317                Some(config.prompt.to_string()),
318            ),
319            session_id: None,
320        };
321    }
322
323    // Auth failure: emit fact event without prompt context.
324    if is_auth_error(&error_kind) {
325        return AgentExecutionResult {
326            event: PipelineEvent::agent_auth_failed(config.role, agent_name.clone()),
327            session_id: None,
328        };
329    }
330
331    // Explicit timeout: emit fact event with output kind classification.
332    //
333    // TIMEOUT CLASSIFICATION RULE:
334    // `result.timeout_context.is_some()` is the DEFINITIVE signal that a real wall-clock
335    // timeout was enforced by the idle-timeout monitor. Exit code 143 (SIGTERM) alone
336    // is NOT sufficient evidence — SIGTERM may be sent for other reasons.
337    //
338    // The result-file pre-check above has already promoted a valid result to success.
339    // Reaching here means the result file is absent or invalid — classify by output kind.
340    if has_explicit_timeout {
341        let output_kind = determine_timeout_output_kind(
342            config.logfile,
343            config.completion_output_path,
344            runtime.workspace,
345        );
346        return AgentExecutionResult {
347            event: PipelineEvent::agent_timed_out(
348                config.role,
349                agent_name.clone(),
350                output_kind,
351                Some(config.logfile.to_string()),
352                result.child_status_at_timeout,
353            ),
354            session_id: None,
355        };
356    }
357
358    // If we reach here, no explicit timeout evidence exists.
359    // SIGTERM (exit 143) without `timeout_context` is classified by
360    // `classify_agent_error` as `AgentErrorKind::Timeout`, but it must NOT be
361    // promoted to a `TimedOut` event — fall through to `InvocationFailed`.
362    let retriable = is_retriable_agent_error(&error_kind);
363
364    AgentExecutionResult {
365        event: PipelineEvent::agent_invocation_failed(
366            config.role,
367            agent_name.clone(),
368            exit_code,
369            error_kind,
370            retriable,
371        ),
372        session_id: None,
373    }
374}
375
/// Return at most the first `max_chars` characters of `message` as an owned `String`.
///
/// Truncation counts Unicode scalar values, not bytes, so a multi-byte character is
/// never split. Messages with `max_chars` characters or fewer are returned whole.
fn build_error_preview(message: &str, max_chars: usize) -> String {
    // `nth(max_chars)` yields the byte offset of the first character that must be
    // dropped; `None` means the message already fits.
    match message.char_indices().nth(max_chars) {
        Some((cut, _)) => message[..cut].to_owned(),
        None => message.to_owned(),
    }
}
379
380/// Minimum non-whitespace characters to classify as meaningful output.
381///
382/// Used only in the logfile-heuristic fallback path (when no `completion_output_path`
383/// is configured, e.g. Analysis drain).
384const MEANINGFUL_OUTPUT_THRESHOLD: usize = 10;
385
386/// Determine whether a timed-out agent produced a valid result.
387///
388/// When a `completion_output_path` is provided, classification is based on
389/// whether that file exists on disk:
390/// - File present (even if invalid XML) → `PartialResult`
391/// - File absent → `NoResult`
392///
393/// Note: callers MUST check `has_valid_xml_output` BEFORE calling this function
394/// and promote a valid result to success. By the time this function is reached,
395/// the valid-result case has already been handled.
396///
397/// When no `completion_output_path` is provided (e.g., Analysis drain), falls
398/// back to logfile content heuristic.
399///
400/// # Fail-Safe Behavior
401///
402/// If the logfile cannot be read in the fallback path, returns `NoResult` to
403/// trigger immediate agent switching rather than retrying a potentially broken agent.
404fn determine_timeout_output_kind(
405    logfile_path: &str,
406    completion_output_path: Option<&std::path::Path>,
407    workspace: &dyn Workspace,
408) -> TimeoutOutputKind {
409    // When a completion file is expected, classify based on file existence.
410    // The valid-file case is handled upstream; here we distinguish missing vs. present-but-invalid.
411    if let Some(path) = completion_output_path {
412        return if workspace.exists(path) {
413            TimeoutOutputKind::PartialResult
414        } else {
415            TimeoutOutputKind::NoResult
416        };
417    }
418
419    // Fallback: no completion path configured (e.g. Analysis drain).
420    // Use logfile content as a proxy for whether the agent did any real work.
421    let Some(content) = workspace.read(std::path::Path::new(logfile_path)).ok() else {
422        return TimeoutOutputKind::NoResult;
423    };
424
425    let non_whitespace_count = content.chars().filter(|c| !c.is_whitespace()).count();
426    if non_whitespace_count >= MEANINGFUL_OUTPUT_THRESHOLD {
427        TimeoutOutputKind::PartialResult
428    } else {
429        TimeoutOutputKind::NoResult
430    }
431}