Skip to main content

ralph_workflow/reducer/fault_tolerant_executor/
mod.rs

//! Fault-tolerant agent executor.
//!
//! This module provides a bulletproof agent execution wrapper that:
//! - Catches all panics from subprocess execution
//! - Catches all I/O errors and non-zero exit codes
//! - Never returns errors - always emits a `PipelineEvent`
//! - Provides detailed error classification for reducer-driven retry/fallback policy
//! - Logs all failures but continues pipeline execution
//!
//! Key design principle: **Agent failures should NEVER crash the pipeline**.
11
12mod error_classification;
13
14#[cfg(test)]
15mod tests;
16
17use crate::agents::{AgentRole, JsonParserType};
18use crate::logger::Loggable;
19use crate::pipeline::{run_with_prompt, PipelineRuntime, PromptCommand};
20use crate::reducer::event::{AgentErrorKind, PipelineEvent, TimeoutOutputKind};
21use crate::workspace::Workspace;
22use anyhow::Result;
23
24// Re-export error classification functions for use by other modules
25pub use error_classification::{
26    classify_agent_error, classify_io_error, is_auth_error, is_rate_limit_error,
27    is_retriable_agent_error, is_timeout_error,
28};
29
/// Maximum number of characters of an agent error message included in the
/// rate-limit log line (see `build_error_preview`).
const ERROR_PREVIEW_MAX_CHARS: usize = 100;
31
/// Result of executing an agent.
///
/// Bundles the pipeline event describing the outcome with an optional
/// `session_id` that allows a later invocation to continue the same session.
///
/// # Session ID Handling
///
/// When `session_id` is `Some`, the handler MUST emit a separate `SessionEstablished`
/// event to the reducer. This is the proper way to handle session IDs in the reducer
/// architecture - each piece of information is communicated via a dedicated event.
///
/// The handler should:
/// 1. Process `event` through the reducer
/// 2. If `session_id.is_some()`, emit `SessionEstablished` and process it
///
/// This two-event approach ensures:
/// - Clean separation of concerns (success vs session establishment)
/// - Proper state transitions in the reducer
/// - Session ID is stored in `agent_chain.last_session_id` for XSD retry reuse
///
/// Within this module only the successful (exit code 0) path forwards a session
/// id; every failure path sets `session_id` to `None`.
pub struct AgentExecutionResult {
    /// The pipeline event from agent execution (success or failure).
    pub event: PipelineEvent,
    /// Session ID from agent's init event, for XSD retry session continuation.
    ///
    /// When present, handler must emit `SessionEstablished` event separately.
    pub session_id: Option<String>,
}
58
/// Configuration for fault-tolerant agent execution.
///
/// All fields are either borrowed or small values, so the struct is `Copy` and
/// is passed by value. Most fields are forwarded unchanged into the
/// `PromptCommand` handed to `run_with_prompt`.
#[derive(Clone, Copy)]
pub struct AgentExecutionConfig<'a> {
    /// Agent role (developer, reviewer, commit agent).
    ///
    /// Attached to every event emitted for this invocation.
    pub role: AgentRole,
    /// Agent name from registry.
    ///
    /// Used as the `PromptCommand` label and as the agent name in emitted events.
    pub agent_name: &'a str,
    /// Agent command to execute
    pub cmd_str: &'a str,
    /// JSON parser type
    pub parser_type: JsonParserType,
    /// Environment variables for agent
    pub env_vars: &'a std::collections::HashMap<String, String>,
    /// Prompt to send to agent.
    ///
    /// A copy is also attached to the rate-limited event when a rate limit is detected.
    pub prompt: &'a str,
    /// Display name for logging
    pub display_name: &'a str,
    /// Log prefix (without extension) used to associate artifacts.
    ///
    /// Example: `.agent/logs/planning_1`.
    pub log_prefix: &'a str,
    /// Model fallback index for attribution.
    pub model_index: usize,
    /// Attempt counter for attribution.
    pub attempt: u32,
    /// Log file path.
    ///
    /// After a non-zero exit this file is read back through the workspace to
    /// extract an error identifier and, on timeout, to judge whether the agent
    /// produced any output.
    pub logfile: &'a str,
}
87
88/// Execute an agent with bulletproof error handling.
89///
90/// This function:
91/// 1. Uses `catch_unwind` to catch panics from subprocess
92/// 2. Catches all I/O errors and non-zero exit codes
93/// 3. Never returns errors - always emits `PipelineEvents`
94/// 4. Classifies errors for retry/fallback decisions
95/// 5. Logs failures but continues pipeline
96///
97/// # Arguments
98///
99/// * `config` - Agent execution configuration
100/// * `runtime` - Pipeline runtime
101///
102/// # Returns
103///
104/// Returns `Ok(AgentExecutionResult)` with:
105/// - `event`: `AgentInvocationSucceeded` or `AgentInvocationFailed`
106/// - `session_id`: Optional session ID for XSD retry session continuation
107///
108/// The handler MUST emit `SessionEstablished` as a separate event when `session_id`
109/// is present. This ensures proper state management in the reducer.
110///
111/// This function never returns `Err` - all errors are converted to events.
112///
113/// # Errors
114///
115/// Returns error if the operation fails.
116pub fn execute_agent_fault_tolerantly(
117    config: AgentExecutionConfig<'_>,
118    runtime: &mut PipelineRuntime<'_>,
119) -> Result<AgentExecutionResult> {
120    let role = config.role;
121
122    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
123        try_agent_execution(config, runtime)
124    }));
125
126    Ok(result.unwrap_or_else(|_| {
127        let error_kind = AgentErrorKind::InternalError;
128        let retriable = is_retriable_agent_error(&error_kind);
129
130        AgentExecutionResult {
131            event: PipelineEvent::agent_invocation_failed(
132                role,
133                config.agent_name.to_string(),
134                1,
135                error_kind,
136                retriable,
137            ),
138            session_id: None,
139        }
140    }))
141}
142
143/// Try to execute agent without panic catching.
144///
145/// This function does the actual agent execution and returns
146/// either success or failure events. It's wrapped by
147/// `execute_agent_fault_tolerantly` which handles panics.
148fn try_agent_execution(
149    config: AgentExecutionConfig<'_>,
150    runtime: &mut PipelineRuntime<'_>,
151) -> AgentExecutionResult {
152    let prompt_cmd = PromptCommand {
153        label: config.agent_name,
154        display_name: config.display_name,
155        cmd_str: config.cmd_str,
156        prompt: config.prompt,
157        log_prefix: config.log_prefix,
158        model_index: Some(config.model_index),
159        attempt: Some(config.attempt),
160        logfile: config.logfile,
161        parser_type: config.parser_type,
162        env_vars: config.env_vars,
163    };
164
165    match run_with_prompt(&prompt_cmd, runtime) {
166        Ok(result) if result.exit_code == 0 => AgentExecutionResult {
167            event: PipelineEvent::agent_invocation_succeeded(
168                config.role,
169                config.agent_name.to_string(),
170            ),
171            session_id: result.session_id,
172        },
173        Ok(result) => {
174            let exit_code = result.exit_code;
175
176            // Extract error message from logfile (stdout) for agents that emit errors as JSON
177            // This is critical for OpenCode and similar agents that don't use stderr for errors
178            //
179            // OpenCode Detection Flow:
180            // 1. Extract JSON error from stdout logfile (OpenCode emits {"type":"error",...})
181            // 2. Pass to classify_agent_error() which checks both stderr and stdout_error
182            // 3. Pattern matching detects usage limit errors from multiple sources:
183            //    - Structured codes: insufficient_quota, usage_limit_exceeded, quota_exceeded
184            //    - Message patterns: "usage limit reached", "anthropic: usage limit", etc.
185            // 4. If detected, emit AgentEvent::RateLimited for immediate agent fallback
186            let stdout_error = crate::pipeline::extract_error_identifier_from_logfile(
187                config.logfile,
188                runtime.workspace,
189            );
190
191            // Log extracted stdout errors only in debug verbosity.
192            //
193            // This is diagnostic-only and can be noisy in normal runs.
194            if runtime.config.verbosity.is_debug() {
195                if let Some(ref err_msg) = stdout_error {
196                    runtime.logger.log(&format!(
197                        "[DEBUG] [OpenCode] Extracted error from logfile for agent '{}': {}",
198                        config.agent_name, err_msg
199                    ));
200                }
201            }
202
203            let error_kind =
204                classify_agent_error(exit_code, &result.stderr, stdout_error.as_deref());
205
206            // Special handling for rate limit: emit fact event with prompt context
207            //
208            // Rate limit detection supports both stderr and stdout error sources:
209            // - stderr: Traditional error output (most agents)
210            // - stdout: JSON error events (OpenCode, multi-provider gateways)
211            //
212            // When detected, immediately emit AgentEvent::RateLimited to trigger
213            // agent fallback without retry attempts on the same agent.
214            if is_rate_limit_error(&error_kind) {
215                // Log rate limit detection with error source and message preview
216                let error_source = if stdout_error.is_some() {
217                    "stdout"
218                } else {
219                    "stderr"
220                };
221                let error_preview = stdout_error
222                    .as_deref()
223                    .or(Some(result.stderr.as_str()))
224                    .unwrap_or("");
225                let preview = build_error_preview(error_preview, ERROR_PREVIEW_MAX_CHARS);
226                runtime.logger.info(&format!(
227                    "[OpenCode] Rate limit detected for agent '{}' (source: {}): {}",
228                    config.agent_name, error_source, preview
229                ));
230
231                return AgentExecutionResult {
232                    event: PipelineEvent::agent_rate_limited(
233                        config.role,
234                        config.agent_name.to_string(),
235                        Some(config.prompt.to_string()),
236                    ),
237                    session_id: None,
238                };
239            }
240
241            // Special handling for auth failure: emit fact event without prompt context
242            if is_auth_error(&error_kind) {
243                return AgentExecutionResult {
244                    event: PipelineEvent::agent_auth_failed(
245                        config.role,
246                        config.agent_name.to_string(),
247                    ),
248                    session_id: None,
249                };
250            }
251
252            // Special handling for timeout: emit fact event (reducer decides retry/fallback)
253            // Unlike rate limits, timeouts do not preserve prompt context.
254            // Determine output_kind by reading the logfile via workspace.
255            if is_timeout_error(&error_kind) {
256                let output_kind = determine_timeout_output_kind(config.logfile, runtime.workspace);
257                return AgentExecutionResult {
258                    event: PipelineEvent::agent_timed_out(
259                        config.role,
260                        config.agent_name.to_string(),
261                        output_kind,
262                        Some(config.logfile.to_string()),
263                        result.child_status_at_timeout,
264                    ),
265                    session_id: None,
266                };
267            }
268
269            let retriable = is_retriable_agent_error(&error_kind);
270
271            AgentExecutionResult {
272                event: PipelineEvent::agent_invocation_failed(
273                    config.role,
274                    config.agent_name.to_string(),
275                    exit_code,
276                    error_kind,
277                    retriable,
278                ),
279                session_id: None,
280            }
281        }
282        Err(e) => {
283            // `run_with_prompt` returns `io::Error` directly. Classify based on the error kind
284            // instead of attempting to downcast the inner error payload.
285            let error_kind = classify_io_error(&e);
286
287            // Mirror special-case handling from the non-zero exit path.
288            // If `run_with_prompt` itself returns an error classified as Timeout,
289            // emit TimedOut so the reducer can decide retry vs fallback deterministically.
290            // In the Err case, the execution failed before completion, so we default to NoOutput.
291            if is_timeout_error(&error_kind) {
292                // When run_with_prompt itself errors, the agent never produced any output
293                // (the logfile may not even exist), so default to NoOutput.
294                return AgentExecutionResult {
295                    event: PipelineEvent::agent_timed_out(
296                        config.role,
297                        config.agent_name.to_string(),
298                        TimeoutOutputKind::NoOutput,
299                        Some(config.logfile.to_string()),
300                        None, // No CommandResult available in error path
301                    ),
302                    session_id: None,
303                };
304            }
305            let retriable = is_retriable_agent_error(&error_kind);
306
307            AgentExecutionResult {
308                event: PipelineEvent::agent_invocation_failed(
309                    config.role,
310                    config.agent_name.to_string(),
311                    1,
312                    error_kind,
313                    retriable,
314                ),
315                session_id: None,
316            }
317        }
318    }
319}
320
/// Return the first `max_chars` characters of `message` as an owned string.
///
/// Truncation counts Unicode scalar values (`char`s), not bytes, so the cut
/// never lands inside a multi-byte character. A message with `max_chars` or
/// fewer characters is returned in full.
fn build_error_preview(message: &str, max_chars: usize) -> String {
    // Byte offset where the (max_chars + 1)-th char starts, or the end of the
    // string when the message is short enough to keep whole.
    let cut = message
        .char_indices()
        .nth(max_chars)
        .map_or(message.len(), |(byte_idx, _)| byte_idx);

    message[..cut].to_owned()
}
324
325/// Minimum non-whitespace characters to classify as meaningful output.
326///
327/// This threshold distinguishes between:
328/// - **`NoOutput`**: Agent produced nothing useful (empty, whitespace-only, or trivial fragments)
329/// - **`PartialOutput`**: Agent was doing real work before being cut off
330///
331/// The value of 10 is chosen to exclude noise (a few stray characters, partial words)
332/// while being low enough to recognize any substantive work.
333const MEANINGFUL_OUTPUT_THRESHOLD: usize = 10;
334
335/// Determine whether a timed-out agent produced meaningful output by reading the logfile.
336///
337/// This is pure I/O observation - no policy is encoded here.
338/// The reducer decides what to do with this information.
339///
340/// # Classification Logic
341///
342/// - `NoOutput`: Logfile is missing, empty, or contains fewer than ~10 non-whitespace characters
343/// - `PartialOutput`: Logfile contains at least 10 non-whitespace characters (indicates real work)
344///
345/// # Fail-Safe Behavior
346///
347/// If the logfile cannot be read, the function returns `NoOutput` to trigger
348/// immediate agent switching rather than retrying a potentially broken agent.
349fn determine_timeout_output_kind(
350    logfile_path: &str,
351    workspace: &dyn Workspace,
352) -> TimeoutOutputKind {
353    let Some(content) = workspace.read(std::path::Path::new(logfile_path)).ok() else {
354        return TimeoutOutputKind::NoOutput;
355    };
356
357    let non_whitespace_count = content.chars().filter(|c| !c.is_whitespace()).count();
358
359    if non_whitespace_count >= MEANINGFUL_OUTPUT_THRESHOLD {
360        TimeoutOutputKind::PartialOutput
361    } else {
362        TimeoutOutputKind::NoOutput
363    }
364}