Skip to main content

ralph_workflow/reducer/
fault_tolerant_executor.rs

//! Fault-tolerant agent executor.
//!
//! This module provides a bulletproof agent execution wrapper that:
//! - Catches all panics raised during subprocess execution
//! - Catches all I/O errors and non-zero exit codes
//! - Never returns errors; it always emits a `PipelineEvent` instead
//! - Provides detailed error classification for retry vs. fallback decisions
//! - Logs all failures but continues pipeline execution
//!
//! Key design principle: **Agent failures should NEVER crash the pipeline**.
11
12use crate::agents::{AgentRole, JsonParserType};
13use crate::pipeline::{run_with_prompt, PipelineRuntime, PromptCommand};
14use crate::reducer::event::{AgentErrorKind, PipelineEvent};
15use anyhow::Result;
16use serde_json::Value;
17use std::io;
18
19/// Configuration for fault-tolerant agent execution.
20#[derive(Clone, Copy)]
21pub struct AgentExecutionConfig<'a> {
22    /// Agent role (developer, reviewer, commit agent)
23    pub role: AgentRole,
24    /// Agent name from registry
25    pub agent_name: &'a str,
26    /// Agent command to execute
27    pub cmd_str: &'a str,
28    /// JSON parser type
29    pub parser_type: JsonParserType,
30    /// Environment variables for agent
31    pub env_vars: &'a std::collections::HashMap<String, String>,
32    /// Prompt to send to agent
33    pub prompt: &'a str,
34    /// Display name for logging
35    pub display_name: &'a str,
36    /// Log file path
37    pub logfile: &'a str,
38}
39
40/// Execute an agent with bulletproof error handling.
41///
42/// This function:
43/// 1. Uses `catch_unwind` to catch panics from subprocess
44/// 2. Catches all I/O errors and non-zero exit codes
45/// 3. Never returns errors - always emits PipelineEvents
46/// 4. Classifies errors for retry/fallback decisions
47/// 5. Logs failures but continues pipeline
48///
49/// # Arguments
50///
51/// * `config` - Agent execution configuration
52/// * `runtime` - Pipeline runtime
53///
54/// # Returns
55///
56/// Returns `Ok(PipelineEvent)` with either:
57/// - `AgentInvocationSucceeded` - agent completed successfully
58/// - `AgentInvocationFailed` - agent failed with error classification
59///
60/// This function never returns `Err` - all errors are converted to events.
61pub fn execute_agent_fault_tolerantly(
62    config: AgentExecutionConfig<'_>,
63    runtime: &mut PipelineRuntime<'_>,
64) -> Result<PipelineEvent> {
65    let role = config.role;
66
67    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
68        try_agent_execution(config, runtime)
69    }));
70
71    match result {
72        Ok(event_result) => event_result,
73        Err(_) => {
74            let error_kind = AgentErrorKind::InternalError;
75            let retriable = is_retriable_agent_error(&error_kind);
76
77            Ok(PipelineEvent::AgentInvocationFailed {
78                role,
79                agent: config.agent_name.to_string(),
80                exit_code: 1,
81                error_kind,
82                retriable,
83            })
84        }
85    }
86}
87
88/// Try to execute agent without panic catching.
89///
90/// This function does the actual agent execution and returns
91/// either success or failure events. It's wrapped by
92/// `execute_agent_fault_tolerantly` which handles panics.
93fn try_agent_execution(
94    config: AgentExecutionConfig<'_>,
95    runtime: &mut PipelineRuntime<'_>,
96) -> Result<PipelineEvent> {
97    let prompt_cmd = PromptCommand {
98        label: config.agent_name,
99        display_name: config.display_name,
100        cmd_str: config.cmd_str,
101        prompt: config.prompt,
102        logfile: config.logfile,
103        parser_type: config.parser_type,
104        env_vars: config.env_vars,
105    };
106
107    match run_with_prompt(&prompt_cmd, runtime) {
108        Ok(result) if result.exit_code == 0 => Ok(PipelineEvent::AgentInvocationSucceeded {
109            role: config.role,
110            agent: config.agent_name.to_string(),
111        }),
112        Ok(result) => {
113            let exit_code = result.exit_code;
114            let error_kind = classify_agent_error(exit_code, &result.stderr);
115
116            // Special handling for rate limit: emit fallback event with prompt context
117            if is_rate_limit_error(&error_kind) {
118                return Ok(PipelineEvent::AgentRateLimitFallback {
119                    role: config.role,
120                    agent: config.agent_name.to_string(),
121                    prompt_context: Some(config.prompt.to_string()),
122                });
123            }
124
125            let retriable = is_retriable_agent_error(&error_kind);
126
127            Ok(PipelineEvent::AgentInvocationFailed {
128                role: config.role,
129                agent: config.agent_name.to_string(),
130                exit_code,
131                error_kind,
132                retriable,
133            })
134        }
135        Err(e) => {
136            let error_kind = if let Ok(io_err) = e.downcast::<io::Error>() {
137                classify_io_error(&io_err)
138            } else {
139                AgentErrorKind::InternalError
140            };
141            let retriable = is_retriable_agent_error(&error_kind);
142
143            Ok(PipelineEvent::AgentInvocationFailed {
144                role: config.role,
145                agent: config.agent_name.to_string(),
146                exit_code: 1,
147                error_kind,
148                retriable,
149            })
150        }
151    }
152}
153
154/// Classify agent error from exit code and stderr.
155fn classify_agent_error(exit_code: i32, stderr: &str) -> AgentErrorKind {
156    const SIGSEGV: i32 = 139;
157    const SIGABRT: i32 = 134;
158    const SIGTERM: i32 = 143;
159
160    match exit_code {
161        SIGSEGV | SIGABRT => AgentErrorKind::InternalError,
162        SIGTERM => AgentErrorKind::Timeout,
163        _ => {
164            let stderr_lower = stderr.to_lowercase();
165
166            if stderr_lower.contains("network")
167                || stderr_lower.contains("connection")
168                || stderr_lower.contains("timeout")
169            {
170                AgentErrorKind::Network
171            } else if stderr_lower.contains("auth")
172                || stderr_lower.contains("api key")
173                || stderr_lower.contains("unauthorized")
174            {
175                AgentErrorKind::Authentication
176            } else if is_rate_limit_stderr(&stderr_lower, stderr) {
177                AgentErrorKind::RateLimit
178            } else if stderr_lower.contains("model")
179                && (stderr_lower.contains("not found") || stderr_lower.contains("unavailable"))
180            {
181                AgentErrorKind::ModelUnavailable
182            } else if stderr_lower.contains("parse")
183                || stderr_lower.contains("invalid")
184                || stderr_lower.contains("malformed")
185            {
186                AgentErrorKind::ParsingError
187            } else if stderr_lower.contains("permission")
188                || stderr_lower.contains("access denied")
189                || stderr_lower.contains("file")
190            {
191                AgentErrorKind::FileSystem
192            } else {
193                AgentErrorKind::InternalError
194            }
195        }
196    }
197}
198
199fn is_rate_limit_stderr(stderr_lower: &str, stderr_raw: &str) -> bool {
200    // Prefer structured formats when available.
201    if is_structured_rate_limit_error(stderr_raw) {
202        return true;
203    }
204
205    // Match documented OpenAI 429 wording (avoid broad substring matches like "429" or "quota").
206    if stderr_lower.contains("rate limit reached") || stderr_lower.contains("rate limit exceeded") {
207        return true;
208    }
209
210    if stderr_lower.contains("too many requests") {
211        return true;
212    }
213
214    if stderr_lower.contains("http 429") || stderr_lower.contains("status 429") {
215        return stderr_lower.contains("rate limit") || stderr_lower.contains("too many requests");
216    }
217
218    if stderr_lower.contains("exceeded your current quota") {
219        return true;
220    }
221
222    false
223}
224
225fn is_structured_rate_limit_error(stderr: &str) -> bool {
226    // OpenCode (and some providers) emit structured JSON errors containing a stable code.
227    // Example observed in CI:
228    //   "✗ Error: {\"type\":\"error\",...,\"error\":{\"code\":\"rate_limit_exceeded\",...}}"
229    let Some(value) = try_parse_json_object(stderr) else {
230        return false;
231    };
232
233    let code = extract_error_code(&value);
234    matches!(code.as_deref(), Some("rate_limit_exceeded"))
235}
236
237fn try_parse_json_object(text: &str) -> Option<Value> {
238    let start = text.find('{')?;
239    let end = text.rfind('}')?;
240    let json_str = text.get(start..=end)?;
241    serde_json::from_str(json_str).ok()
242}
243
244fn extract_error_code(value: &Value) -> Option<String> {
245    // Support a couple of common nestings.
246    // - OpenCode: {"error": {"code": "rate_limit_exceeded", ...}}
247    // - Some SDKs: {"error": {"error": {"code": "..."}}}
248    value
249        .pointer("/error/code")
250        .and_then(Value::as_str)
251        .map(|s| s.to_string())
252        .or_else(|| {
253            value
254                .pointer("/error/error/code")
255                .and_then(Value::as_str)
256                .map(|s| s.to_string())
257        })
258}
259
260/// Classify I/O error during agent execution.
261fn classify_io_error(error: &io::Error) -> AgentErrorKind {
262    let error_msg = error.to_string().to_lowercase();
263
264    if error_msg.contains("timeout") {
265        AgentErrorKind::Timeout
266    } else if error_msg.contains("permission")
267        || error_msg.contains("access denied")
268        || error_msg.contains("no such file")
269        || error_msg.contains("not found")
270    {
271        AgentErrorKind::FileSystem
272    } else if error_msg.contains("broken pipe") || error_msg.contains("connection") {
273        AgentErrorKind::Network
274    } else {
275        AgentErrorKind::InternalError
276    }
277}
278
279/// Determine if agent error is retriable.
280///
281/// Retriable errors should trigger model fallback (same agent, different model).
282/// Non-retriable errors should trigger agent fallback (different agent).
283///
284/// Note: RateLimit (429) is intentionally NOT retriable - it triggers immediate
285/// agent fallback to continue work without waiting. This is handled specially
286/// via the `AgentRateLimitFallback` event which switches to the next agent
287/// immediately rather than retrying with the same agent.
288fn is_retriable_agent_error(error_kind: &AgentErrorKind) -> bool {
289    matches!(
290        error_kind,
291        AgentErrorKind::Network | AgentErrorKind::Timeout | AgentErrorKind::ModelUnavailable
292    )
293}
294
295/// Check if an error kind represents a rate limit (429) error.
296///
297/// Rate limit errors get special handling - they trigger immediate agent
298/// fallback via `AgentRateLimitFallback` event instead of model fallback.
299fn is_rate_limit_error(error_kind: &AgentErrorKind) -> bool {
300    matches!(error_kind, AgentErrorKind::RateLimit)
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    // --- classify_agent_error: signal-style exit codes ---

    #[test]
    fn test_classify_agent_error_sigsegv() {
        assert_eq!(classify_agent_error(139, ""), AgentErrorKind::InternalError);
    }

    #[test]
    fn test_classify_agent_error_sigabrt() {
        assert_eq!(classify_agent_error(134, ""), AgentErrorKind::InternalError);
    }

    #[test]
    fn test_classify_agent_error_sigterm() {
        assert_eq!(classify_agent_error(143, ""), AgentErrorKind::Timeout);
    }

    // --- classify_agent_error: stderr heuristics ---

    #[test]
    fn test_classify_agent_error_network() {
        assert_eq!(
            classify_agent_error(1, "Connection timeout"),
            AgentErrorKind::Network
        );
    }

    #[test]
    fn test_classify_agent_error_rate_limit() {
        assert_eq!(
            classify_agent_error(1, "Rate limit exceeded"),
            AgentErrorKind::RateLimit
        );
    }

    #[test]
    fn test_classify_agent_error_rate_limit_matches_http_429() {
        assert_eq!(
            classify_agent_error(1, "HTTP 429: Rate limit reached for requests"),
            AgentErrorKind::RateLimit
        );
    }

    #[test]
    fn test_classify_agent_error_rate_limit_from_opencode_json_error() {
        let stderr = r#"✗ Error: {"type":"error","sequence_number":2,"error":{"type":"tokens","code":"rate_limit_exceeded","message":"Rate limit reached"}}"#;
        assert_eq!(classify_agent_error(1, stderr), AgentErrorKind::RateLimit);
    }

    #[test]
    fn test_classify_agent_error_does_not_treat_429_token_count_as_rate_limit() {
        assert_eq!(
            classify_agent_error(1, "Parse error: expected 429 tokens"),
            AgentErrorKind::ParsingError
        );
    }

    #[test]
    fn test_classify_agent_error_does_not_treat_quota_word_as_rate_limit() {
        assert_ne!(
            classify_agent_error(1, "quota.rs:1:1: syntax error"),
            AgentErrorKind::RateLimit
        );
    }

    #[test]
    fn test_classify_agent_error_authentication() {
        assert_eq!(
            classify_agent_error(1, "Invalid API key"),
            AgentErrorKind::Authentication
        );
    }

    #[test]
    fn test_classify_agent_error_model_unavailable() {
        assert_eq!(
            classify_agent_error(1, "Model not found"),
            AgentErrorKind::ModelUnavailable
        );
    }

    // --- retry / rate-limit predicates ---

    #[test]
    fn test_is_retriable_agent_error() {
        // Network, Timeout, ModelUnavailable are retriable (model fallback)
        for kind in [
            AgentErrorKind::Network,
            AgentErrorKind::Timeout,
            AgentErrorKind::ModelUnavailable,
        ] {
            assert!(is_retriable_agent_error(&kind));
        }

        // RateLimit is NOT retriable (immediate agent fallback); the remaining
        // kinds are non-retriable and trigger agent fallback as well.
        for kind in [
            AgentErrorKind::RateLimit,
            AgentErrorKind::Authentication,
            AgentErrorKind::ParsingError,
            AgentErrorKind::FileSystem,
            AgentErrorKind::InternalError,
        ] {
            assert!(!is_retriable_agent_error(&kind));
        }
    }

    #[test]
    fn test_is_rate_limit_error() {
        // Only RateLimit should match
        assert!(is_rate_limit_error(&AgentErrorKind::RateLimit));

        // All others should NOT be rate limit errors
        for kind in [
            AgentErrorKind::Network,
            AgentErrorKind::Timeout,
            AgentErrorKind::ModelUnavailable,
            AgentErrorKind::Authentication,
            AgentErrorKind::ParsingError,
            AgentErrorKind::FileSystem,
            AgentErrorKind::InternalError,
        ] {
            assert!(!is_rate_limit_error(&kind));
        }
    }

    // --- classify_io_error ---

    #[test]
    fn test_classify_io_error_timeout() {
        let error = io::Error::new(io::ErrorKind::TimedOut, "Operation timeout");
        assert_eq!(classify_io_error(&error), AgentErrorKind::Timeout);
    }

    #[test]
    fn test_classify_io_error_filesystem() {
        let error = io::Error::new(io::ErrorKind::PermissionDenied, "Permission denied");
        assert_eq!(classify_io_error(&error), AgentErrorKind::FileSystem);
    }

    #[test]
    fn test_classify_io_error_network() {
        let error = io::Error::new(io::ErrorKind::BrokenPipe, "Broken pipe");
        assert_eq!(classify_io_error(&error), AgentErrorKind::Network);
    }
}