Skip to main content

ralph_workflow/reducer/fault_tolerant_executor/
mod.rs

//! Fault-tolerant agent executor.
//!
//! This module provides a bulletproof agent execution wrapper that:
//! - Catches all panics from subprocess execution
//! - Catches all I/O errors and non-zero exit codes
//! - Never returns errors - always emits PipelineEvents
//! - Provides detailed error classification for reducer-driven retry/fallback policy
//! - Logs all failures but continues pipeline execution
//!
//! Key design principle: **Agent failures should NEVER crash the pipeline**.
11
12mod error_classification;
13
14#[cfg(test)]
15mod tests;
16
17use crate::agents::{AgentRole, JsonParserType};
18use crate::pipeline::{run_with_prompt, PipelineRuntime, PromptCommand};
19use crate::reducer::event::{AgentErrorKind, PipelineEvent};
20use anyhow::Result;
21
22// Re-export error classification functions for use by other modules
23pub use error_classification::{
24    classify_agent_error, classify_io_error, is_auth_error, is_rate_limit_error,
25    is_retriable_agent_error, is_timeout_error,
26};
27
28/// Result of executing an agent.
29///
30/// Contains the pipeline event and optional session_id for session continuation.
31///
32/// # Session ID Handling
33///
34/// When `session_id` is `Some`, the handler MUST emit a separate `SessionEstablished`
35/// event to the reducer. This is the proper way to handle session IDs in the reducer
36/// architecture - each piece of information is communicated via a dedicated event.
37///
38/// The handler should:
39/// 1. Process `event` through the reducer
40/// 2. If `session_id.is_some()`, emit `SessionEstablished` and process it
41///
42/// This two-event approach ensures:
43/// - Clean separation of concerns (success vs session establishment)
44/// - Proper state transitions in the reducer
45/// - Session ID is stored in agent_chain.last_session_id for XSD retry reuse
46pub struct AgentExecutionResult {
47    /// The pipeline event from agent execution (success or failure).
48    pub event: PipelineEvent,
49    /// Session ID from agent's init event, for XSD retry session continuation.
50    ///
51    /// When present, handler must emit `SessionEstablished` event separately.
52    pub session_id: Option<String>,
53}
54
55/// Configuration for fault-tolerant agent execution.
56#[derive(Clone, Copy)]
57pub struct AgentExecutionConfig<'a> {
58    /// Agent role (developer, reviewer, commit agent)
59    pub role: AgentRole,
60    /// Agent name from registry
61    pub agent_name: &'a str,
62    /// Agent command to execute
63    pub cmd_str: &'a str,
64    /// JSON parser type
65    pub parser_type: JsonParserType,
66    /// Environment variables for agent
67    pub env_vars: &'a std::collections::HashMap<String, String>,
68    /// Prompt to send to agent
69    pub prompt: &'a str,
70    /// Display name for logging
71    pub display_name: &'a str,
72    /// Log prefix (without extension) used to associate artifacts.
73    ///
74    /// Example: `.agent/logs/planning_1`.
75    pub log_prefix: &'a str,
76    /// Model fallback index for attribution.
77    pub model_index: usize,
78    /// Attempt counter for attribution.
79    pub attempt: u32,
80    /// Log file path
81    pub logfile: &'a str,
82}
83
84/// Execute an agent with bulletproof error handling.
85///
86/// This function:
87/// 1. Uses `catch_unwind` to catch panics from subprocess
88/// 2. Catches all I/O errors and non-zero exit codes
89/// 3. Never returns errors - always emits PipelineEvents
90/// 4. Classifies errors for retry/fallback decisions
91/// 5. Logs failures but continues pipeline
92///
93/// # Arguments
94///
95/// * `config` - Agent execution configuration
96/// * `runtime` - Pipeline runtime
97///
98/// # Returns
99///
100/// Returns `Ok(AgentExecutionResult)` with:
101/// - `event`: `AgentInvocationSucceeded` or `AgentInvocationFailed`
102/// - `session_id`: Optional session ID for XSD retry session continuation
103///
104/// The handler MUST emit `SessionEstablished` as a separate event when session_id
105/// is present. This ensures proper state management in the reducer.
106///
107/// This function never returns `Err` - all errors are converted to events.
108pub fn execute_agent_fault_tolerantly(
109    config: AgentExecutionConfig<'_>,
110    runtime: &mut PipelineRuntime<'_>,
111) -> Result<AgentExecutionResult> {
112    let role = config.role;
113
114    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
115        try_agent_execution(config, runtime)
116    }));
117
118    match result {
119        Ok(event_result) => event_result,
120        Err(_) => {
121            let error_kind = AgentErrorKind::InternalError;
122            let retriable = is_retriable_agent_error(&error_kind);
123
124            Ok(AgentExecutionResult {
125                event: PipelineEvent::agent_invocation_failed(
126                    role,
127                    config.agent_name.to_string(),
128                    1,
129                    error_kind,
130                    retriable,
131                ),
132                session_id: None,
133            })
134        }
135    }
136}
137
138/// Try to execute agent without panic catching.
139///
140/// This function does the actual agent execution and returns
141/// either success or failure events. It's wrapped by
142/// `execute_agent_fault_tolerantly` which handles panics.
143fn try_agent_execution(
144    config: AgentExecutionConfig<'_>,
145    runtime: &mut PipelineRuntime<'_>,
146) -> Result<AgentExecutionResult> {
147    let prompt_cmd = PromptCommand {
148        label: config.agent_name,
149        display_name: config.display_name,
150        cmd_str: config.cmd_str,
151        prompt: config.prompt,
152        log_prefix: config.log_prefix,
153        model_index: Some(config.model_index),
154        attempt: Some(config.attempt),
155        logfile: config.logfile,
156        parser_type: config.parser_type,
157        env_vars: config.env_vars,
158    };
159
160    match run_with_prompt(&prompt_cmd, runtime) {
161        Ok(result) if result.exit_code == 0 => Ok(AgentExecutionResult {
162            event: PipelineEvent::agent_invocation_succeeded(
163                config.role,
164                config.agent_name.to_string(),
165            ),
166            session_id: result.session_id,
167        }),
168        Ok(result) => {
169            let exit_code = result.exit_code;
170            let error_kind = classify_agent_error(exit_code, &result.stderr);
171
172            // Special handling for rate limit: emit fact event with prompt context
173            if is_rate_limit_error(&error_kind) {
174                return Ok(AgentExecutionResult {
175                    event: PipelineEvent::agent_rate_limited(
176                        config.role,
177                        config.agent_name.to_string(),
178                        Some(config.prompt.to_string()),
179                    ),
180                    session_id: None,
181                });
182            }
183
184            // Special handling for auth failure: emit fact event without prompt context
185            if is_auth_error(&error_kind) {
186                return Ok(AgentExecutionResult {
187                    event: PipelineEvent::agent_auth_failed(
188                        config.role,
189                        config.agent_name.to_string(),
190                    ),
191                    session_id: None,
192                });
193            }
194
195            // Special handling for timeout: emit fact event (reducer decides retry/fallback)
196            // Unlike rate limits, timeouts do not preserve prompt context.
197            if is_timeout_error(&error_kind) {
198                return Ok(AgentExecutionResult {
199                    event: PipelineEvent::agent_timed_out(
200                        config.role,
201                        config.agent_name.to_string(),
202                    ),
203                    session_id: None,
204                });
205            }
206
207            let retriable = is_retriable_agent_error(&error_kind);
208
209            Ok(AgentExecutionResult {
210                event: PipelineEvent::agent_invocation_failed(
211                    config.role,
212                    config.agent_name.to_string(),
213                    exit_code,
214                    error_kind,
215                    retriable,
216                ),
217                session_id: None,
218            })
219        }
220        Err(e) => {
221            // `run_with_prompt` returns `io::Error` directly. Classify based on the error kind
222            // instead of attempting to downcast the inner error payload.
223            let error_kind = classify_io_error(&e);
224
225            // Mirror special-case handling from the non-zero exit path.
226            // If `run_with_prompt` itself returns an error classified as Timeout,
227            // emit TimedOut so the reducer can decide retry vs fallback deterministically.
228            if is_timeout_error(&error_kind) {
229                return Ok(AgentExecutionResult {
230                    event: PipelineEvent::agent_timed_out(
231                        config.role,
232                        config.agent_name.to_string(),
233                    ),
234                    session_id: None,
235                });
236            }
237            let retriable = is_retriable_agent_error(&error_kind);
238
239            Ok(AgentExecutionResult {
240                event: PipelineEvent::agent_invocation_failed(
241                    config.role,
242                    config.agent_name.to_string(),
243                    1,
244                    error_kind,
245                    retriable,
246                ),
247                session_id: None,
248            })
249        }
250    }
251}