1use crate::agents::{validate_model_flag, AgentConfig, AgentRegistry, AgentRole, JsonParserType};
4use crate::common::split_command;
5use std::sync::Arc;
6
7use super::fallback::try_agent_with_retries;
8use super::fallback::TryAgentResult;
9use super::model_flag::resolve_model_with_provider;
10use super::prompt::PipelineRuntime;
11
/// Builds the ordered list of agents to attempt.
///
/// The primary agent always comes first. Each configured fallback follows in its original
/// order, skipping the primary agent and any name that is already queued.
fn build_agents_to_try<'a>(fallbacks: &'a [&'a str], primary_agent: &'a str) -> Vec<&'a str> {
    let mut ordered: Vec<&'a str> = Vec::with_capacity(fallbacks.len() + 1);
    ordered.push(primary_agent);
    for &candidate in fallbacks {
        let already_queued = candidate == primary_agent || ordered.contains(&candidate);
        if !already_queued {
            ordered.push(candidate);
        }
    }
    ordered
}
22
23fn get_cli_overrides(
25 role: AgentRole,
26 runtime: &PipelineRuntime<'_>,
27) -> (Option<String>, Option<String>) {
28 match role {
29 AgentRole::Developer => (
30 runtime.config.developer_model.clone(),
31 runtime.config.developer_provider.clone(),
32 ),
33 AgentRole::Reviewer => (
34 runtime.config.reviewer_model.clone(),
35 runtime.config.reviewer_provider.clone(),
36 ),
37 AgentRole::Commit => (None, None), }
39}
40
41struct ModelFlagBuildContext<'a> {
43 agent_index: usize,
44 cli_model_override: Option<&'a String>,
45 cli_provider_override: Option<&'a String>,
46 agent_config: &'a AgentConfig,
47 agent_name: &'a str,
48 fallback_config: &'a crate::agents::fallback::FallbackConfig,
49 display_name: &'a str,
50 runtime: &'a PipelineRuntime<'a>,
51}
52
53fn build_model_flags_list(ctx: &ModelFlagBuildContext<'_>) -> Vec<Option<String>> {
55 let mut model_flags_to_try: Vec<Option<String>> = Vec::new();
56
57 if ctx.agent_index == 0
60 && (ctx.cli_model_override.is_some() || ctx.cli_provider_override.is_some())
61 {
62 let resolved = resolve_model_with_provider(
63 ctx.cli_provider_override.map(std::string::String::as_str),
64 ctx.cli_model_override.map(std::string::String::as_str),
65 ctx.agent_config.model_flag.as_deref(),
66 );
67 if resolved.is_some() {
68 model_flags_to_try.push(resolved);
69 }
70 }
71
72 if model_flags_to_try.is_empty() {
74 model_flags_to_try.push(None);
75 }
76
77 if ctx.fallback_config.has_provider_fallbacks(ctx.agent_name) {
79 let provider_fallbacks = ctx.fallback_config.get_provider_fallbacks(ctx.agent_name);
80 ctx.runtime.logger.info(&format!(
81 "Agent '{}' has {} provider fallback(s) configured",
82 ctx.display_name,
83 provider_fallbacks.len()
84 ));
85 for model in provider_fallbacks {
86 model_flags_to_try.push(Some(model.clone()));
87 }
88 }
89
90 model_flags_to_try
91}
92
93fn build_command_for_model(ctx: &TryModelContext<'_>, runtime: &PipelineRuntime<'_>) -> String {
95 let model_ref = ctx.model_flag.map(std::string::String::as_str);
96 let yolo = matches!(ctx.role, AgentRole::Developer)
99 || (ctx.role == AgentRole::Commit && ctx.base_label.starts_with("fix"))
100 || (ctx.role == AgentRole::Reviewer && ctx.base_label.starts_with("fix"));
101
102 if ctx.agent_index == 0 && ctx.cycle == 0 && ctx.model_index == 0 {
103 match ctx.role {
105 AgentRole::Developer => runtime.config.developer_cmd.clone().unwrap_or_else(|| {
106 ctx.agent_config
107 .build_cmd_with_model(true, true, true, model_ref)
108 }),
109 AgentRole::Reviewer => runtime.config.reviewer_cmd.clone().unwrap_or_else(|| {
110 ctx.agent_config
111 .build_cmd_with_model(true, true, yolo, model_ref)
112 }),
113 AgentRole::Commit => runtime.config.commit_cmd.clone().unwrap_or_else(|| {
114 ctx.agent_config
115 .build_cmd_with_model(true, true, yolo, model_ref)
116 }),
117 }
118 } else {
119 ctx.agent_config
120 .build_cmd_with_model(true, true, yolo, model_ref)
121 }
122}
123
124fn validate_glm_print_flag(
130 agent_name: &str,
131 agent_config: &AgentConfig,
132 cmd_str: &str,
133 agent_index: usize,
134 cycle: u32,
135 model_index: usize,
136 runtime: &PipelineRuntime<'_>,
137) {
138 if !crate::agents::is_glm_like_agent(agent_name)
141 || agent_index != 0
142 || cycle != 0
143 || model_index != 0
144 {
145 return;
146 }
147
148 let cmd_argv = split_command(cmd_str).ok();
149 let has_print_flag = cmd_argv
150 .as_ref()
151 .is_some_and(|argv| argv.iter().any(|arg| arg == "-p"));
152 if !has_print_flag {
153 if agent_config.print_flag.is_empty() {
154 runtime.logger.warn(&format!(
155 "GLM agent '{agent_name}' is missing '-p' flag: print_flag is empty in configuration. \
156 Add 'print_flag = \"-p\"' to [ccs] section in ~/.config/ralph-workflow.toml"
157 ));
158 } else {
159 runtime.logger.warn(&format!(
160 "GLM agent '{agent_name}' may be missing '-p' flag in command. Check configuration."
161 ));
162 }
163 }
164}
165
/// Derives the display strings and logfile path for one attempt.
///
/// Returns `(label, logfile, display_name_with_suffix)`:
/// - `display_name_with_suffix` is `display_name` followed by ` [model]` when a model
///   flag is present.
/// - `label` is `"{base_label} ({display_name_with_suffix})"`.
/// - `logfile` is `"{logfile_prefix}_{agent}_{model_index}.log"`, where every `/` in
///   the agent name is replaced by `-`.
///
/// NOTE(review): the logfile name includes neither the cycle nor the agent index, so
/// the same agent/model index in different cycles maps to the same file. Confirm this
/// is intended.
fn build_execution_metadata(
    model_flag: Option<&String>,
    display_name: &str,
    base_label: &str,
    agent_name: &str,
    logfile_prefix: &str,
    model_index: usize,
) -> (String, String, String) {
    let shown_name = match model_flag {
        Some(model) => format!("{display_name} [{model}]"),
        None => display_name.to_owned(),
    };
    let path_safe_agent: String = agent_name
        .chars()
        .map(|c| if c == '/' { '-' } else { c })
        .collect();
    let logfile = format!("{logfile_prefix}_{path_safe_agent}_{model_index}.log");
    let label = format!("{base_label} ({shown_name})");
    (label, logfile, shown_name)
}
184
/// Outcome of trying one model, or all models, of a single agent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TrySingleAgentResult {
    /// The attempt succeeded; the fallback loop stops and reports exit code 0.
    Success,
    /// Stop trying this agent and move on to the next agent in the chain.
    Fallback,
    /// This model is exhausted; continue with the agent's next model flag.
    NoRetry,
    /// Stop the whole fallback loop and report the contained exit code.
    Unrecoverable(i32),
}
197
198struct TryModelContext<'a> {
200 agent_config: &'a AgentConfig,
201 agent_name: &'a str,
202 display_name: &'a str,
203 agent_index: usize,
204 cycle: u32,
205 model_index: usize,
206 role: AgentRole,
207 model_flag: Option<&'a String>,
208 base_label: &'a str,
209 prompt: &'a str,
210 logfile_prefix: &'a str,
211 fallback_config: &'a crate::agents::fallback::FallbackConfig,
212 output_validator: Option<crate::pipeline::fallback::OutputValidator>,
213 retry_timer: Arc<dyn crate::agents::RetryTimerProvider>,
214}
215
216fn try_single_model(
218 ctx: &TryModelContext<'_>,
219 runtime: &mut PipelineRuntime<'_>,
220) -> std::io::Result<TrySingleAgentResult> {
221 let mut parser_type = ctx.agent_config.json_parser;
222
223 if ctx.role == AgentRole::Reviewer {
224 if let Some(ref parser_override) = runtime.config.reviewer_json_parser {
225 parser_type = JsonParserType::parse(parser_override);
226 if ctx.agent_index == 0 && ctx.cycle == 0 && ctx.model_index == 0 {
227 runtime.logger.info(&format!(
228 "Using JSON parser override '{parser_override}' for reviewer"
229 ));
230 }
231 }
232 }
233
234 let cmd_str = build_command_for_model(ctx, runtime);
235
236 validate_glm_print_flag(
237 ctx.agent_name,
238 ctx.agent_config,
239 &cmd_str,
240 ctx.agent_index,
241 ctx.cycle,
242 ctx.model_index,
243 runtime,
244 );
245
246 let (label, logfile, display_name_with_suffix) = build_execution_metadata(
247 ctx.model_flag,
248 ctx.display_name,
249 ctx.base_label,
250 ctx.agent_name,
251 ctx.logfile_prefix,
252 ctx.model_index,
253 );
254
255 let attempt_config = crate::pipeline::fallback::AgentAttemptConfig {
256 agent_name: ctx.agent_name,
257 model_flag: ctx.model_flag.map(std::string::String::as_str),
258 label: &label,
259 display_name: &display_name_with_suffix,
260 cmd_str: &cmd_str,
261 prompt: ctx.prompt,
262 logfile: &logfile,
263 logfile_prefix: ctx.logfile_prefix,
264 parser_type,
265 env_vars: &ctx.agent_config.env_vars,
266 model_index: ctx.model_index,
267 agent_index: ctx.agent_index,
268 cycle: ctx.cycle as usize,
269 fallback_config: ctx.fallback_config,
270 output_validator: ctx.output_validator,
271 retry_timer: Arc::clone(&ctx.retry_timer),
272 };
273 let result = try_agent_with_retries(&attempt_config, runtime)?;
274
275 match result {
276 TryAgentResult::Success => Ok(TrySingleAgentResult::Success),
277 TryAgentResult::Unrecoverable(exit_code) => {
278 Ok(TrySingleAgentResult::Unrecoverable(exit_code))
279 }
280 TryAgentResult::Fallback => Ok(TrySingleAgentResult::Fallback),
281 TryAgentResult::NoRetry => Ok(TrySingleAgentResult::NoRetry),
282 }
283}
284
285struct TryAgentContext<'a> {
287 agent_name: &'a str,
288 agent_index: usize,
289 cycle: u32,
290 role: AgentRole,
291 base_label: &'a str,
292 prompt: &'a str,
293 logfile_prefix: &'a str,
294 cli_model_override: Option<&'a String>,
295 cli_provider_override: Option<&'a String>,
296 output_validator: Option<crate::pipeline::fallback::OutputValidator>,
297 retry_timer: Arc<dyn crate::agents::RetryTimerProvider>,
298}
299
300fn try_single_agent(
302 ctx: &TryAgentContext<'_>,
303 runtime: &mut PipelineRuntime<'_>,
304 registry: &AgentRegistry,
305 fallback_config: &crate::agents::fallback::FallbackConfig,
306) -> std::io::Result<TrySingleAgentResult> {
307 let Some(agent_config) = registry.resolve_config(ctx.agent_name) else {
308 runtime.logger.warn(&format!(
309 "Agent '{}' not found in registry, skipping",
310 ctx.agent_name
311 ));
312 return Ok(TrySingleAgentResult::Fallback);
313 };
314
315 let display_name = registry.display_name(ctx.agent_name);
316 let model_ctx = ModelFlagBuildContext {
317 agent_index: ctx.agent_index,
318 cli_model_override: ctx.cli_model_override,
319 cli_provider_override: ctx.cli_provider_override,
320 agent_config: &agent_config,
321 agent_name: ctx.agent_name,
322 fallback_config,
323 display_name: &display_name,
324 runtime,
325 };
326 let model_flags_to_try = build_model_flags_list(&model_ctx);
327
328 if ctx.agent_index == 0 && ctx.cycle == 0 {
329 for model_flag in model_flags_to_try.iter().flatten() {
330 for warning in validate_model_flag(model_flag) {
331 runtime.logger.warn(&warning);
332 }
333 }
334 }
335
336 for (model_index, model_flag) in model_flags_to_try.iter().enumerate() {
337 let model_ctx = TryModelContext {
338 agent_config: &agent_config,
339 agent_name: ctx.agent_name,
340 display_name: &display_name,
341 agent_index: ctx.agent_index,
342 cycle: ctx.cycle,
343 model_index,
344 role: ctx.role,
345 model_flag: model_flag.as_ref(),
346 base_label: ctx.base_label,
347 prompt: ctx.prompt,
348 logfile_prefix: ctx.logfile_prefix,
349 fallback_config,
350 output_validator: ctx.output_validator,
351 retry_timer: Arc::clone(&ctx.retry_timer),
352 };
353 let result = try_single_model(&model_ctx, runtime)?;
354
355 match result {
356 TrySingleAgentResult::Success => return Ok(TrySingleAgentResult::Success),
357 TrySingleAgentResult::Unrecoverable(exit_code) => {
358 return Ok(TrySingleAgentResult::Unrecoverable(exit_code))
359 }
360 TrySingleAgentResult::Fallback => return Ok(TrySingleAgentResult::Fallback),
361 TrySingleAgentResult::NoRetry => {}
362 }
363 }
364
365 Ok(TrySingleAgentResult::NoRetry)
366}
367
368pub struct FallbackConfig<'a, 'b> {
370 pub role: AgentRole,
371 pub base_label: &'a str,
372 pub prompt: &'a str,
373 pub logfile_prefix: &'a str,
374 pub runtime: &'a mut PipelineRuntime<'b>,
375 pub registry: &'a AgentRegistry,
376 pub primary_agent: &'a str,
377 pub output_validator: Option<crate::pipeline::fallback::OutputValidator>,
378}
379
380pub fn run_with_fallback(
382 role: AgentRole,
383 base_label: &str,
384 prompt: &str,
385 logfile_prefix: &str,
386 runtime: &mut PipelineRuntime<'_>,
387 registry: &AgentRegistry,
388 primary_agent: &str,
389) -> std::io::Result<i32> {
390 let mut config = FallbackConfig {
391 role,
392 base_label,
393 prompt,
394 logfile_prefix,
395 runtime,
396 registry,
397 primary_agent,
398 output_validator: None,
399 };
400 run_with_fallback_internal(&mut config)
401}
402
403pub fn run_with_fallback_and_validator(
411 config: &mut FallbackConfig<'_, '_>,
412) -> std::io::Result<i32> {
413 run_with_fallback_internal(config)
414}
415
416fn run_with_fallback_internal(config: &mut FallbackConfig<'_, '_>) -> std::io::Result<i32> {
421 let fallback_config = config.registry.fallback_config();
422 let fallbacks = config.registry.available_fallbacks(config.role);
423 if fallback_config.has_fallbacks(config.role) {
424 config.runtime.logger.info(&format!(
425 "Agent fallback chain for {}: {}",
426 config.role,
427 fallbacks.join(", ")
428 ));
429 } else {
430 config.runtime.logger.info(&format!(
431 "No configured fallbacks for {}, using primary only",
432 config.role
433 ));
434 }
435
436 let agents_to_try = build_agents_to_try(&fallbacks, config.primary_agent);
437 let (cli_model_override, cli_provider_override) =
438 get_cli_overrides(config.role, config.runtime);
439
440 for cycle in 0..fallback_config.max_cycles {
441 if cycle > 0 {
442 let backoff_ms = fallback_config.calculate_backoff(cycle - 1);
443 config.runtime.logger.info(&format!(
444 "Cycle {}/{}: All agents exhausted, waiting {}ms before retry (exponential backoff)...",
445 cycle + 1,
446 fallback_config.max_cycles,
447 backoff_ms
448 ));
449 config
450 .registry
451 .retry_timer()
452 .sleep(std::time::Duration::from_millis(backoff_ms));
453 }
454
455 for (agent_index, agent_name) in agents_to_try.iter().enumerate() {
456 let ctx = TryAgentContext {
457 agent_name,
458 agent_index,
459 cycle,
460 role: config.role,
461 base_label: config.base_label,
462 prompt: config.prompt,
463 logfile_prefix: config.logfile_prefix,
464 cli_model_override: cli_model_override.as_ref(),
465 cli_provider_override: cli_provider_override.as_ref(),
466 output_validator: config.output_validator,
467 retry_timer: config.registry.retry_timer(),
468 };
469 let result = try_single_agent(&ctx, config.runtime, config.registry, fallback_config)?;
470
471 match result {
472 TrySingleAgentResult::Success => return Ok(0),
473 TrySingleAgentResult::Unrecoverable(exit_code) => return Ok(exit_code),
474 TrySingleAgentResult::Fallback | TrySingleAgentResult::NoRetry => {}
475 }
476 }
477 }
478
479 config.runtime.logger.error(&format!(
480 "All agents exhausted after {} cycles with exponential backoff",
481 fallback_config.max_cycles
482 ));
483 Ok(1)
484}
485
#[cfg(test)]
mod tests {
    use std::ffi::OsString;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serializes tests that mutate the process environment.
    static ENV_MUTEX: Mutex<()> = Mutex::new(());

    /// Sets environment variables for the duration of a test and restores them on drop.
    ///
    /// The guard keeps `ENV_MUTEX` locked for its whole lifetime. This serializes the
    /// test body and the restore step as well as the initial `set_var` calls, so
    /// concurrent `EnvGuard` users cannot observe or clobber each other's values.
    struct EnvGuard {
        // Original values, in the order the variables were set.
        saved: Vec<(&'static str, Option<OsString>)>,
        // `Drop::drop` runs before any field is dropped, so the environment is restored
        // while this lock is still held.
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvGuard {
        /// Locks `ENV_MUTEX` (recovering from poisoning), sets every `(key, value)` pair,
        /// and remembers each key's previous value for restoration.
        fn set_multiple(vars: &[(&'static str, &str)]) -> Self {
            let lock = ENV_MUTEX.lock().unwrap_or_else(PoisonError::into_inner);
            let saved = vars
                .iter()
                .map(|&(key, value)| {
                    let prev = std::env::var_os(key);
                    std::env::set_var(key, value);
                    (key, prev)
                })
                .collect();
            Self { saved, _lock: lock }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // Restore in reverse so a key set more than once ends at its original value.
            for (key, prev) in self.saved.drain(..).rev() {
                match prev {
                    Some(value) => std::env::set_var(key, value),
                    None => std::env::remove_var(key),
                }
            }
        }
    }

    /// Checks that removing the Anthropic variables from a `Command` hides the parent's
    /// values from the child process.
    ///
    /// NOTE(review): this drives `std::process::Command::env_remove` directly rather
    /// than the runner, so it documents the sanitization contract instead of testing it.
    #[test]
    fn test_runner_sanitizes_anthropic_env_vars() {
        const ANTHROPIC_ENV_VARS_TO_SANITIZE: &[&str] = &[
            "ANTHROPIC_API_KEY",
            "ANTHROPIC_BASE_URL",
            "ANTHROPIC_AUTH_TOKEN",
            "ANTHROPIC_MODEL",
            "ANTHROPIC_DEFAULT_HAIKU_MODEL",
            "ANTHROPIC_DEFAULT_OPUS_MODEL",
            "ANTHROPIC_DEFAULT_SONNET_MODEL",
        ];

        let _guard = EnvGuard::set_multiple(&[
            ("ANTHROPIC_API_KEY", "test-token-glm"),
            ("ANTHROPIC_BASE_URL", "https://glm.example.com"),
        ]);

        let mut cmd = std::process::Command::new("printenv");
        for &var in ANTHROPIC_ENV_VARS_TO_SANITIZE {
            cmd.env_remove(var);
        }

        let output = match cmd.output() {
            Ok(o) => o,
            Err(e) => {
                eprintln!("Skipping test: printenv not available ({e})");
                return;
            }
        };
        let stdout = String::from_utf8_lossy(&output.stdout);

        assert!(!stdout.contains("test-token-glm"));
        assert!(!stdout.contains("https://glm.example.com"));
    }

    /// Checks that variables explicitly set on the `Command` after sanitization still
    /// reach the child process.
    #[test]
    fn test_runner_does_not_sanitize_explicit_env_vars() {
        let mut cmd = std::process::Command::new("printenv");

        let agent_env_vars =
            std::collections::HashMap::from([("ANTHROPIC_API_KEY", "agent-specific-key")]);

        for &var in &["ANTHROPIC_API_KEY", "ANTHROPIC_BASE_URL"] {
            cmd.env_remove(var);
        }

        for (key, value) in &agent_env_vars {
            cmd.env(key, value);
        }

        let output = match cmd.output() {
            Ok(o) => o,
            Err(e) => {
                eprintln!("Skipping test: printenv not available ({e})");
                return;
            }
        };
        let stdout = String::from_utf8_lossy(&output.stdout);

        assert!(stdout.contains("agent-specific-key"));
    }
}