Skip to main content

ralph_workflow/reducer/fault_tolerant_executor/
mod.rs

1//! Fault-tolerant agent executor.
2//!
//! This module provides a bulletproof agent execution wrapper that:
4//! - Catches all panics from subprocess execution
5//! - Catches all I/O errors and non-zero exit codes
6//! - Never returns errors - always emits `PipelineEvents`
7//! - Provides detailed error classification for reducer-driven retry/fallback policy
8//! - Logs all failures but continues pipeline execution
9//!
10//! Key design principle: **Agent failures should NEVER crash the pipeline**.
11
12mod error_classification;
13
14#[cfg(test)]
15mod tests;
16
17use crate::agents::{AgentRole, JsonParserType};
18use crate::common::domain_types::AgentName;
19use crate::logger::Loggable;
20use crate::pipeline::{run_with_prompt, PipelineRuntime, PromptCommand};
21use crate::reducer::event::{AgentErrorKind, PipelineEvent, TimeoutOutputKind};
22use crate::workspace::Workspace;
23use anyhow::Result;
24
25// Re-export error classification functions for use by other modules
26pub use error_classification::{
27    classify_agent_error, classify_io_error, is_auth_error, is_rate_limit_error,
28    is_retriable_agent_error, is_timeout_error,
29};
30
/// Maximum number of characters of an agent error message that is included
/// in the rate-limit log line (the message is truncated to this length).
const ERROR_PREVIEW_MAX_CHARS: usize = 100;
32
33/// Result of executing an agent.
34///
35/// Contains the pipeline event and optional `session_id` for session continuation.
36///
37/// # Session ID Handling
38///
39/// When `session_id` is `Some`, the handler MUST emit a separate `SessionEstablished`
40/// event to the reducer. This is the proper way to handle session IDs in the reducer
41/// architecture - each piece of information is communicated via a dedicated event.
42///
43/// The handler should:
44/// 1. Process `event` through the reducer
45/// 2. If `session_id.is_some()`, emit `SessionEstablished` and process it
46///
47/// This two-event approach ensures:
48/// - Clean separation of concerns (success vs session establishment)
49/// - Proper state transitions in the reducer
50/// - Session ID is stored in `agent_chain.last_session_id` for XSD retry reuse
51pub struct AgentExecutionResult {
52    /// The pipeline event from agent execution (success or failure).
53    pub event: PipelineEvent,
54    /// Session ID from agent's init event, for XSD retry session continuation.
55    ///
56    /// When present, handler must emit `SessionEstablished` event separately.
57    pub session_id: Option<String>,
58}
59
60/// Configuration for fault-tolerant agent execution.
61#[derive(Clone, Copy)]
62pub struct AgentExecutionConfig<'a> {
63    /// Agent role (developer, reviewer, commit agent)
64    pub role: AgentRole,
65    /// Agent name from registry
66    pub agent_name: &'a str,
67    /// Agent command to execute
68    pub cmd_str: &'a str,
69    /// JSON parser type
70    pub parser_type: JsonParserType,
71    /// Environment variables for agent
72    pub env_vars: &'a std::collections::HashMap<String, String>,
73    /// Prompt to send to agent
74    pub prompt: &'a str,
75    /// Display name for logging
76    pub display_name: &'a str,
77    /// Log prefix (without extension) used to associate artifacts.
78    ///
79    /// Example: `.agent/logs/planning_1`.
80    pub log_prefix: &'a str,
81    /// Model fallback index for attribution.
82    pub model_index: usize,
83    /// Attempt counter for attribution.
84    pub attempt: u32,
85    /// Log file path
86    pub logfile: &'a str,
87}
88
89/// Execute an agent with bulletproof error handling.
90///
91/// This function:
92/// 1. Uses `catch_unwind` to catch panics from subprocess
93/// 2. Catches all I/O errors and non-zero exit codes
94/// 3. Never returns errors - always emits `PipelineEvents`
95/// 4. Classifies errors for retry/fallback decisions
96/// 5. Logs failures but continues pipeline
97///
98/// # Arguments
99///
100/// * `config` - Agent execution configuration
101/// * `runtime` - Pipeline runtime
102///
103/// # Returns
104///
105/// Returns `Ok(AgentExecutionResult)` with:
106/// - `event`: `AgentInvocationSucceeded` or `AgentInvocationFailed`
107/// - `session_id`: Optional session ID for XSD retry session continuation
108///
109/// The handler MUST emit `SessionEstablished` as a separate event when `session_id`
110/// is present. This ensures proper state management in the reducer.
111///
112/// This function never returns `Err` - all errors are converted to events.
113///
114/// # Errors
115///
116/// Returns error if the operation fails.
117pub fn execute_agent_fault_tolerantly(
118    config: AgentExecutionConfig<'_>,
119    runtime: &mut PipelineRuntime<'_>,
120) -> Result<AgentExecutionResult> {
121    let role = config.role;
122    let agent_name = AgentName::from(config.agent_name.to_string());
123
124    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
125        try_agent_execution(config, runtime)
126    }));
127
128    Ok(result.unwrap_or_else(|_| {
129        let error_kind = AgentErrorKind::InternalError;
130        let retriable = is_retriable_agent_error(&error_kind);
131
132        AgentExecutionResult {
133            event: PipelineEvent::agent_invocation_failed(
134                role,
135                agent_name.clone(),
136                1,
137                error_kind,
138                retriable,
139            ),
140            session_id: None,
141        }
142    }))
143}
144
145/// Try to execute agent without panic catching.
146///
147/// This function does the actual agent execution and returns
148/// either success or failure events. It's wrapped by
149/// `execute_agent_fault_tolerantly` which handles panics.
150fn try_agent_execution(
151    config: AgentExecutionConfig<'_>,
152    runtime: &mut PipelineRuntime<'_>,
153) -> AgentExecutionResult {
154    let agent_name = AgentName::from(config.agent_name.to_string());
155    let prompt_cmd = PromptCommand {
156        label: config.agent_name,
157        display_name: config.display_name,
158        cmd_str: config.cmd_str,
159        prompt: config.prompt,
160        log_prefix: config.log_prefix,
161        model_index: Some(config.model_index),
162        attempt: Some(config.attempt),
163        logfile: config.logfile,
164        parser_type: config.parser_type,
165        env_vars: config.env_vars,
166    };
167
168    match run_with_prompt(&prompt_cmd, runtime) {
169        Ok(result) if result.exit_code == 0 => AgentExecutionResult {
170            event: PipelineEvent::agent_invocation_succeeded(config.role, agent_name.clone()),
171            session_id: result.session_id,
172        },
173        Ok(result) => {
174            let exit_code = result.exit_code;
175
176            // Extract error message from logfile (stdout) for agents that emit errors as JSON
177            // This is critical for OpenCode and similar agents that don't use stderr for errors
178            //
179            // OpenCode Detection Flow:
180            // 1. Extract JSON error from stdout logfile (OpenCode emits {"type":"error",...})
181            // 2. Pass to classify_agent_error() which checks both stderr and stdout_error
182            // 3. Pattern matching detects usage limit errors from multiple sources:
183            //    - Structured codes: insufficient_quota, usage_limit_exceeded, quota_exceeded
184            //    - Message patterns: "usage limit reached", "anthropic: usage limit", etc.
185            // 4. If detected, emit AgentEvent::RateLimited for immediate agent fallback
186            let stdout_error = crate::pipeline::extract_error_identifier_from_logfile(
187                config.logfile,
188                runtime.workspace,
189            );
190
191            // Log extracted stdout errors only in debug verbosity.
192            //
193            // This is diagnostic-only and can be noisy in normal runs.
194            if runtime.config.verbosity.is_debug() {
195                if let Some(ref err_msg) = stdout_error {
196                    runtime.logger.log(&format!(
197                        "[DEBUG] [OpenCode] Extracted error from logfile for agent '{}': {}",
198                        config.agent_name, err_msg
199                    ));
200                }
201            }
202
203            let error_kind =
204                classify_agent_error(exit_code, &result.stderr, stdout_error.as_deref());
205
206            // Special handling for rate limit: emit fact event with prompt context
207            //
208            // Rate limit detection supports both stderr and stdout error sources:
209            // - stderr: Traditional error output (most agents)
210            // - stdout: JSON error events (OpenCode, multi-provider gateways)
211            //
212            // When detected, immediately emit AgentEvent::RateLimited to trigger
213            // agent fallback without retry attempts on the same agent.
214            if is_rate_limit_error(&error_kind) {
215                // Log rate limit detection with error source and message preview
216                let error_source = if stdout_error.is_some() {
217                    "stdout"
218                } else {
219                    "stderr"
220                };
221                let error_preview = stdout_error
222                    .as_deref()
223                    .or(Some(result.stderr.as_str()))
224                    .unwrap_or("");
225                let preview = build_error_preview(error_preview, ERROR_PREVIEW_MAX_CHARS);
226                runtime.logger.info(&format!(
227                    "[OpenCode] Rate limit detected for agent '{}' (source: {}): {}",
228                    config.agent_name, error_source, preview
229                ));
230
231                return AgentExecutionResult {
232                    event: PipelineEvent::agent_rate_limited(
233                        config.role,
234                        agent_name.clone(),
235                        Some(config.prompt.to_string()),
236                    ),
237                    session_id: None,
238                };
239            }
240
241            // Special handling for auth failure: emit fact event without prompt context
242            if is_auth_error(&error_kind) {
243                return AgentExecutionResult {
244                    event: PipelineEvent::agent_auth_failed(config.role, agent_name.clone()),
245                    session_id: None,
246                };
247            }
248
249            // Special handling for timeout: emit fact event (reducer decides retry/fallback)
250            // Unlike rate limits, timeouts do not preserve prompt context.
251            // Determine output_kind by reading the logfile via workspace.
252            if is_timeout_error(&error_kind) {
253                let output_kind = determine_timeout_output_kind(config.logfile, runtime.workspace);
254                return AgentExecutionResult {
255                    event: PipelineEvent::agent_timed_out(
256                        config.role,
257                        agent_name.clone(),
258                        output_kind,
259                        Some(config.logfile.to_string()),
260                        result.child_status_at_timeout,
261                    ),
262                    session_id: None,
263                };
264            }
265
266            let retriable = is_retriable_agent_error(&error_kind);
267
268            AgentExecutionResult {
269                event: PipelineEvent::agent_invocation_failed(
270                    config.role,
271                    agent_name.clone(),
272                    exit_code,
273                    error_kind,
274                    retriable,
275                ),
276                session_id: None,
277            }
278        }
279        Err(e) => {
280            // `run_with_prompt` returns `io::Error` directly. Classify based on the error kind
281            // instead of attempting to downcast the inner error payload.
282            let error_kind = classify_io_error(&e);
283
284            // Mirror special-case handling from the non-zero exit path.
285            // If `run_with_prompt` itself returns an error classified as Timeout,
286            // emit TimedOut so the reducer can decide retry vs fallback deterministically.
287            // In the Err case, the execution failed before completion, so we default to NoOutput.
288            if is_timeout_error(&error_kind) {
289                // When run_with_prompt itself errors, the agent never produced any output
290                // (the logfile may not even exist), so default to NoOutput.
291                return AgentExecutionResult {
292                    event: PipelineEvent::agent_timed_out(
293                        config.role,
294                        agent_name.clone(),
295                        TimeoutOutputKind::NoOutput,
296                        Some(config.logfile.to_string()),
297                        None, // No CommandResult available in error path
298                    ),
299                    session_id: None,
300                };
301            }
302            let retriable = is_retriable_agent_error(&error_kind);
303
304            AgentExecutionResult {
305                event: PipelineEvent::agent_invocation_failed(
306                    config.role,
307                    agent_name.clone(),
308                    1,
309                    error_kind,
310                    retriable,
311                ),
312                session_id: None,
313            }
314        }
315    }
316}
317
/// Return the first `max_chars` characters of `message` as an owned string.
///
/// Truncation is counted in `char`s, not bytes, so the cut never lands inside
/// a multi-byte character. Messages that are already short enough are copied
/// unchanged.
fn build_error_preview(message: &str, max_chars: usize) -> String {
    // Byte offset at which character number `max_chars` starts, if the
    // message is long enough to have one.
    let cut = message.char_indices().nth(max_chars).map(|(offset, _)| offset);

    match cut {
        Some(offset) => message[..offset].to_owned(),
        None => message.to_owned(),
    }
}
321
322/// Minimum non-whitespace characters to classify as meaningful output.
323///
324/// This threshold distinguishes between:
325/// - **`NoOutput`**: Agent produced nothing useful (empty, whitespace-only, or trivial fragments)
326/// - **`PartialOutput`**: Agent was doing real work before being cut off
327///
328/// The value of 10 is chosen to exclude noise (a few stray characters, partial words)
329/// while being low enough to recognize any substantive work.
330const MEANINGFUL_OUTPUT_THRESHOLD: usize = 10;
331
332/// Determine whether a timed-out agent produced meaningful output by reading the logfile.
333///
334/// This is pure I/O observation - no policy is encoded here.
335/// The reducer decides what to do with this information.
336///
337/// # Classification Logic
338///
339/// - `NoOutput`: Logfile is missing, empty, or contains fewer than ~10 non-whitespace characters
340/// - `PartialOutput`: Logfile contains at least 10 non-whitespace characters (indicates real work)
341///
342/// # Fail-Safe Behavior
343///
344/// If the logfile cannot be read, the function returns `NoOutput` to trigger
345/// immediate agent switching rather than retrying a potentially broken agent.
346fn determine_timeout_output_kind(
347    logfile_path: &str,
348    workspace: &dyn Workspace,
349) -> TimeoutOutputKind {
350    let Some(content) = workspace.read(std::path::Path::new(logfile_path)).ok() else {
351        return TimeoutOutputKind::NoOutput;
352    };
353
354    let non_whitespace_count = content.chars().filter(|c| !c.is_whitespace()).count();
355
356    if non_whitespace_count >= MEANINGFUL_OUTPUT_THRESHOLD {
357        TimeoutOutputKind::PartialOutput
358    } else {
359        TimeoutOutputKind::NoOutput
360    }
361}