1use crate::agents::{AgentRole, JsonParserType};
13use crate::pipeline::{run_with_prompt, PipelineRuntime, PromptCommand};
14use crate::reducer::event::{AgentErrorKind, PipelineEvent};
15use anyhow::Result;
16use serde_json::Value;
17use std::io;
18
19#[derive(Clone, Copy)]
21pub struct AgentExecutionConfig<'a> {
22 pub role: AgentRole,
24 pub agent_name: &'a str,
26 pub cmd_str: &'a str,
28 pub parser_type: JsonParserType,
30 pub env_vars: &'a std::collections::HashMap<String, String>,
32 pub prompt: &'a str,
34 pub display_name: &'a str,
36 pub logfile: &'a str,
38}
39
40pub fn execute_agent_fault_tolerantly(
62 config: AgentExecutionConfig<'_>,
63 runtime: &mut PipelineRuntime<'_>,
64) -> Result<PipelineEvent> {
65 let role = config.role;
66
67 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
68 try_agent_execution(config, runtime)
69 }));
70
71 match result {
72 Ok(event_result) => event_result,
73 Err(_) => {
74 let error_kind = AgentErrorKind::InternalError;
75 let retriable = is_retriable_agent_error(&error_kind);
76
77 Ok(PipelineEvent::AgentInvocationFailed {
78 role,
79 agent: config.agent_name.to_string(),
80 exit_code: 1,
81 error_kind,
82 retriable,
83 })
84 }
85 }
86}
87
88fn try_agent_execution(
94 config: AgentExecutionConfig<'_>,
95 runtime: &mut PipelineRuntime<'_>,
96) -> Result<PipelineEvent> {
97 let prompt_cmd = PromptCommand {
98 label: config.agent_name,
99 display_name: config.display_name,
100 cmd_str: config.cmd_str,
101 prompt: config.prompt,
102 logfile: config.logfile,
103 parser_type: config.parser_type,
104 env_vars: config.env_vars,
105 };
106
107 match run_with_prompt(&prompt_cmd, runtime) {
108 Ok(result) if result.exit_code == 0 => Ok(PipelineEvent::AgentInvocationSucceeded {
109 role: config.role,
110 agent: config.agent_name.to_string(),
111 }),
112 Ok(result) => {
113 let exit_code = result.exit_code;
114 let error_kind = classify_agent_error(exit_code, &result.stderr);
115
116 if is_rate_limit_error(&error_kind) {
118 return Ok(PipelineEvent::AgentRateLimitFallback {
119 role: config.role,
120 agent: config.agent_name.to_string(),
121 prompt_context: Some(config.prompt.to_string()),
122 });
123 }
124
125 let retriable = is_retriable_agent_error(&error_kind);
126
127 Ok(PipelineEvent::AgentInvocationFailed {
128 role: config.role,
129 agent: config.agent_name.to_string(),
130 exit_code,
131 error_kind,
132 retriable,
133 })
134 }
135 Err(e) => {
136 let error_kind = if let Ok(io_err) = e.downcast::<io::Error>() {
137 classify_io_error(&io_err)
138 } else {
139 AgentErrorKind::InternalError
140 };
141 let retriable = is_retriable_agent_error(&error_kind);
142
143 Ok(PipelineEvent::AgentInvocationFailed {
144 role: config.role,
145 agent: config.agent_name.to_string(),
146 exit_code: 1,
147 error_kind,
148 retriable,
149 })
150 }
151 }
152}
153
154fn classify_agent_error(exit_code: i32, stderr: &str) -> AgentErrorKind {
156 const SIGSEGV: i32 = 139;
157 const SIGABRT: i32 = 134;
158 const SIGTERM: i32 = 143;
159
160 match exit_code {
161 SIGSEGV | SIGABRT => AgentErrorKind::InternalError,
162 SIGTERM => AgentErrorKind::Timeout,
163 _ => {
164 let stderr_lower = stderr.to_lowercase();
165
166 if stderr_lower.contains("network")
167 || stderr_lower.contains("connection")
168 || stderr_lower.contains("timeout")
169 {
170 AgentErrorKind::Network
171 } else if stderr_lower.contains("auth")
172 || stderr_lower.contains("api key")
173 || stderr_lower.contains("unauthorized")
174 {
175 AgentErrorKind::Authentication
176 } else if is_rate_limit_stderr(&stderr_lower, stderr) {
177 AgentErrorKind::RateLimit
178 } else if stderr_lower.contains("model")
179 && (stderr_lower.contains("not found") || stderr_lower.contains("unavailable"))
180 {
181 AgentErrorKind::ModelUnavailable
182 } else if stderr_lower.contains("parse")
183 || stderr_lower.contains("invalid")
184 || stderr_lower.contains("malformed")
185 {
186 AgentErrorKind::ParsingError
187 } else if stderr_lower.contains("permission")
188 || stderr_lower.contains("access denied")
189 || stderr_lower.contains("file")
190 {
191 AgentErrorKind::FileSystem
192 } else {
193 AgentErrorKind::InternalError
194 }
195 }
196 }
197}
198
199fn is_rate_limit_stderr(stderr_lower: &str, stderr_raw: &str) -> bool {
200 if is_structured_rate_limit_error(stderr_raw) {
202 return true;
203 }
204
205 if stderr_lower.contains("rate limit reached") || stderr_lower.contains("rate limit exceeded") {
207 return true;
208 }
209
210 if stderr_lower.contains("too many requests") {
211 return true;
212 }
213
214 if stderr_lower.contains("http 429") || stderr_lower.contains("status 429") {
215 return stderr_lower.contains("rate limit") || stderr_lower.contains("too many requests");
216 }
217
218 if stderr_lower.contains("exceeded your current quota") {
219 return true;
220 }
221
222 false
223}
224
225fn is_structured_rate_limit_error(stderr: &str) -> bool {
226 let Some(value) = try_parse_json_object(stderr) else {
230 return false;
231 };
232
233 let code = extract_error_code(&value);
234 matches!(code.as_deref(), Some("rate_limit_exceeded"))
235}
236
237fn try_parse_json_object(text: &str) -> Option<Value> {
238 let start = text.find('{')?;
239 let end = text.rfind('}')?;
240 let json_str = text.get(start..=end)?;
241 serde_json::from_str(json_str).ok()
242}
243
244fn extract_error_code(value: &Value) -> Option<String> {
245 value
249 .pointer("/error/code")
250 .and_then(Value::as_str)
251 .map(|s| s.to_string())
252 .or_else(|| {
253 value
254 .pointer("/error/error/code")
255 .and_then(Value::as_str)
256 .map(|s| s.to_string())
257 })
258}
259
260fn classify_io_error(error: &io::Error) -> AgentErrorKind {
262 let error_msg = error.to_string().to_lowercase();
263
264 if error_msg.contains("timeout") {
265 AgentErrorKind::Timeout
266 } else if error_msg.contains("permission")
267 || error_msg.contains("access denied")
268 || error_msg.contains("no such file")
269 || error_msg.contains("not found")
270 {
271 AgentErrorKind::FileSystem
272 } else if error_msg.contains("broken pipe") || error_msg.contains("connection") {
273 AgentErrorKind::Network
274 } else {
275 AgentErrorKind::InternalError
276 }
277}
278
279fn is_retriable_agent_error(error_kind: &AgentErrorKind) -> bool {
289 matches!(
290 error_kind,
291 AgentErrorKind::Network | AgentErrorKind::Timeout | AgentErrorKind::ModelUnavailable
292 )
293}
294
295fn is_rate_limit_error(error_kind: &AgentErrorKind) -> bool {
300 matches!(error_kind, AgentErrorKind::RateLimit)
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    // --- classify_agent_error: signal-derived exit codes -------------------

    #[test]
    fn test_classify_agent_error_sigsegv() {
        assert_eq!(classify_agent_error(139, ""), AgentErrorKind::InternalError);
    }

    #[test]
    fn test_classify_agent_error_sigabrt() {
        assert_eq!(classify_agent_error(134, ""), AgentErrorKind::InternalError);
    }

    #[test]
    fn test_classify_agent_error_sigterm() {
        assert_eq!(classify_agent_error(143, ""), AgentErrorKind::Timeout);
    }

    // --- classify_agent_error: stderr heuristics ---------------------------

    #[test]
    fn test_classify_agent_error_network() {
        assert_eq!(
            classify_agent_error(1, "Connection timeout"),
            AgentErrorKind::Network
        );
    }

    #[test]
    fn test_classify_agent_error_rate_limit() {
        assert_eq!(
            classify_agent_error(1, "Rate limit exceeded"),
            AgentErrorKind::RateLimit
        );
    }

    #[test]
    fn test_classify_agent_error_rate_limit_matches_http_429() {
        let stderr = "HTTP 429: Rate limit reached for requests";
        assert_eq!(classify_agent_error(1, stderr), AgentErrorKind::RateLimit);
    }

    #[test]
    fn test_classify_agent_error_rate_limit_from_opencode_json_error() {
        let stderr = r#"✗ Error: {"type":"error","sequence_number":2,"error":{"type":"tokens","code":"rate_limit_exceeded","message":"Rate limit reached"}}"#;
        assert_eq!(classify_agent_error(1, stderr), AgentErrorKind::RateLimit);
    }

    #[test]
    fn test_classify_agent_error_does_not_treat_429_token_count_as_rate_limit() {
        let stderr = "Parse error: expected 429 tokens";
        assert_eq!(classify_agent_error(1, stderr), AgentErrorKind::ParsingError);
    }

    #[test]
    fn test_classify_agent_error_does_not_treat_quota_word_as_rate_limit() {
        let stderr = "quota.rs:1:1: syntax error";
        assert_ne!(classify_agent_error(1, stderr), AgentErrorKind::RateLimit);
    }

    #[test]
    fn test_classify_agent_error_authentication() {
        assert_eq!(
            classify_agent_error(1, "Invalid API key"),
            AgentErrorKind::Authentication
        );
    }

    #[test]
    fn test_classify_agent_error_model_unavailable() {
        assert_eq!(
            classify_agent_error(1, "Model not found"),
            AgentErrorKind::ModelUnavailable
        );
    }

    // --- error-kind predicates ----------------------------------------------

    #[test]
    fn test_is_retriable_agent_error() {
        let retriable = [
            AgentErrorKind::Network,
            AgentErrorKind::Timeout,
            AgentErrorKind::ModelUnavailable,
        ];
        let not_retriable = [
            AgentErrorKind::RateLimit,
            AgentErrorKind::Authentication,
            AgentErrorKind::ParsingError,
            AgentErrorKind::FileSystem,
            AgentErrorKind::InternalError,
        ];

        for kind in &retriable {
            assert!(is_retriable_agent_error(kind), "{kind:?} should be retriable");
        }
        for kind in &not_retriable {
            assert!(!is_retriable_agent_error(kind), "{kind:?} should not be retriable");
        }
    }

    #[test]
    fn test_is_rate_limit_error() {
        assert!(is_rate_limit_error(&AgentErrorKind::RateLimit));

        let others = [
            AgentErrorKind::Network,
            AgentErrorKind::Timeout,
            AgentErrorKind::ModelUnavailable,
            AgentErrorKind::Authentication,
            AgentErrorKind::ParsingError,
            AgentErrorKind::FileSystem,
            AgentErrorKind::InternalError,
        ];
        for kind in &others {
            assert!(!is_rate_limit_error(kind), "{kind:?} is not a rate limit");
        }
    }

    // --- classify_io_error ---------------------------------------------------

    #[test]
    fn test_classify_io_error_timeout() {
        let err = io::Error::new(io::ErrorKind::TimedOut, "Operation timeout");
        assert_eq!(classify_io_error(&err), AgentErrorKind::Timeout);
    }

    #[test]
    fn test_classify_io_error_filesystem() {
        let err = io::Error::new(io::ErrorKind::PermissionDenied, "Permission denied");
        assert_eq!(classify_io_error(&err), AgentErrorKind::FileSystem);
    }

    #[test]
    fn test_classify_io_error_network() {
        let err = io::Error::new(io::ErrorKind::BrokenPipe, "Broken pipe");
        assert_eq!(classify_io_error(&err), AgentErrorKind::Network);
    }
}