1use crate::agents::{validate_model_flag, AgentConfig, AgentRegistry, AgentRole, JsonParserType};
4use crate::common::split_command;
5use std::sync::Arc;
6
7use super::fallback::try_agent_with_retries;
8use super::fallback::TryAgentResult;
9use super::model_flag::resolve_model_with_provider;
10use super::prompt::PipelineRuntime;
11
/// Builds the ordered, de-duplicated list of agents to attempt.
///
/// The primary agent always comes first. Each fallback then follows in its
/// configured order, skipping the primary agent and any name already queued.
fn build_agents_to_try<'a>(fallbacks: &'a [&'a str], primary_agent: &'a str) -> Vec<&'a str> {
    let mut ordered = Vec::with_capacity(fallbacks.len() + 1);
    ordered.push(primary_agent);
    for &candidate in fallbacks {
        // The primary agent is already in `ordered`, so this single check
        // also filters it out of the fallback list.
        if !ordered.contains(&candidate) {
            ordered.push(candidate);
        }
    }
    ordered
}
22
23fn get_cli_overrides(
25 role: AgentRole,
26 runtime: &PipelineRuntime<'_>,
27) -> (Option<String>, Option<String>) {
28 match role {
29 AgentRole::Developer => (
30 runtime.config.developer_model.clone(),
31 runtime.config.developer_provider.clone(),
32 ),
33 AgentRole::Reviewer => (
34 runtime.config.reviewer_model.clone(),
35 runtime.config.reviewer_provider.clone(),
36 ),
37 AgentRole::Commit => (None, None), }
39}
40
41struct ModelFlagBuildContext<'a> {
43 agent_index: usize,
44 cli_model_override: Option<&'a String>,
45 cli_provider_override: Option<&'a String>,
46 agent_config: &'a AgentConfig,
47 agent_name: &'a str,
48 fallback_config: &'a crate::agents::fallback::FallbackConfig,
49 display_name: &'a str,
50 runtime: &'a PipelineRuntime<'a>,
51}
52
53fn build_model_flags_list(ctx: &ModelFlagBuildContext<'_>) -> Vec<Option<String>> {
55 let mut model_flags_to_try: Vec<Option<String>> = Vec::new();
56
57 if ctx.agent_index == 0
60 && (ctx.cli_model_override.is_some() || ctx.cli_provider_override.is_some())
61 {
62 let resolved = resolve_model_with_provider(
63 ctx.cli_provider_override.map(std::string::String::as_str),
64 ctx.cli_model_override.map(std::string::String::as_str),
65 ctx.agent_config.model_flag.as_deref(),
66 );
67 if resolved.is_some() {
68 model_flags_to_try.push(resolved);
69 }
70 }
71
72 if model_flags_to_try.is_empty() {
74 model_flags_to_try.push(None);
75 }
76
77 if ctx.fallback_config.has_provider_fallbacks(ctx.agent_name) {
79 let provider_fallbacks = ctx.fallback_config.get_provider_fallbacks(ctx.agent_name);
80 ctx.runtime.logger.info(&format!(
81 "Agent '{}' has {} provider fallback(s) configured",
82 ctx.display_name,
83 provider_fallbacks.len()
84 ));
85 for model in provider_fallbacks {
86 model_flags_to_try.push(Some(model.clone()));
87 }
88 }
89
90 model_flags_to_try
91}
92
93fn build_command_for_model(ctx: &TryModelContext<'_>, runtime: &PipelineRuntime<'_>) -> String {
95 let model_ref = ctx.model_flag.map(std::string::String::as_str);
96 let yolo = matches!(ctx.role, AgentRole::Developer)
99 || (ctx.role == AgentRole::Commit && ctx.base_label.starts_with("fix"))
100 || (ctx.role == AgentRole::Reviewer && ctx.base_label.starts_with("fix"));
101
102 if ctx.agent_index == 0 && ctx.cycle == 0 && ctx.model_index == 0 {
103 match ctx.role {
105 AgentRole::Developer => runtime.config.developer_cmd.clone().unwrap_or_else(|| {
106 ctx.agent_config
107 .build_cmd_with_model(true, true, true, model_ref)
108 }),
109 AgentRole::Reviewer => runtime.config.reviewer_cmd.clone().unwrap_or_else(|| {
110 ctx.agent_config
111 .build_cmd_with_model(true, true, yolo, model_ref)
112 }),
113 AgentRole::Commit => runtime.config.commit_cmd.clone().unwrap_or_else(|| {
114 ctx.agent_config
115 .build_cmd_with_model(true, true, yolo, model_ref)
116 }),
117 }
118 } else {
119 ctx.agent_config
120 .build_cmd_with_model(true, true, yolo, model_ref)
121 }
122}
123
124fn validate_glm_print_flag(
126 agent_name: &str,
127 agent_config: &AgentConfig,
128 cmd_str: &str,
129 agent_index: usize,
130 cycle: u32,
131 model_index: usize,
132 runtime: &PipelineRuntime<'_>,
133) {
134 if !crate::agents::is_glm_like_agent(agent_name)
135 || agent_index != 0
136 || cycle != 0
137 || model_index != 0
138 {
139 return;
140 }
141
142 let cmd_argv = split_command(cmd_str).ok();
143 let has_print_flag = cmd_argv
144 .as_ref()
145 .is_some_and(|argv| argv.iter().any(|arg| arg == "-p"));
146 if !has_print_flag {
147 if agent_config.print_flag.is_empty() {
148 runtime.logger.warn(&format!(
149 "GLM agent '{agent_name}' is missing '-p' flag: print_flag is empty in configuration. \
150 Add 'print_flag = \"-p\"' to [ccs] section in ~/.config/ralph-workflow.toml"
151 ));
152 } else {
153 runtime.logger.warn(&format!(
154 "GLM agent '{agent_name}' may be missing '-p' flag in command. Check configuration."
155 ));
156 }
157 }
158}
159
/// Derives the label, log file path and decorated display name for one model attempt.
///
/// Returns `(label, logfile, display_name_with_suffix)`:
/// - `display_name_with_suffix` is `display_name`, plus ` [model]` when a
///   model flag is given.
/// - `label` is `"{base_label} ({display_name_with_suffix})"`.
/// - `logfile` is `"{logfile_prefix}_{agent}_{model_index}.log"`, where `/` in
///   the agent name is replaced by `-` so it does not add a directory level.
fn build_execution_metadata(
    model_flag: Option<&String>,
    display_name: &str,
    base_label: &str,
    agent_name: &str,
    logfile_prefix: &str,
    model_index: usize,
) -> (String, String, String) {
    let decorated_name = match model_flag {
        Some(model) => format!("{display_name} [{model}]"),
        None => display_name.to_string(),
    };
    let label = format!("{base_label} ({decorated_name})");
    let logfile = format!(
        "{logfile_prefix}_{}_{model_index}.log",
        agent_name.replace('/', "-")
    );
    (label, logfile, decorated_name)
}
178
/// Outcome of trying one model of an agent, or one agent as a whole.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TrySingleAgentResult {
    /// The attempt succeeded; the fallback loop stops and reports exit code 0.
    Success,
    /// The attempt failed unrecoverably; the fallback loop stops and returns
    /// this exit code.
    Unrecoverable(i32),
    /// Stop trying this agent's remaining models and move on to the next agent.
    Fallback,
    /// Move on to the agent's next model flag. When returned for an agent as a
    /// whole, the fallback loop moves on to the next agent.
    NoRetry,
}
191
192struct TryModelContext<'a> {
194 agent_config: &'a AgentConfig,
195 agent_name: &'a str,
196 display_name: &'a str,
197 agent_index: usize,
198 cycle: u32,
199 model_index: usize,
200 role: AgentRole,
201 model_flag: Option<&'a String>,
202 base_label: &'a str,
203 prompt: &'a str,
204 logfile_prefix: &'a str,
205 fallback_config: &'a crate::agents::fallback::FallbackConfig,
206 output_validator: Option<crate::pipeline::fallback::OutputValidator>,
207 retry_timer: Arc<dyn crate::agents::RetryTimerProvider>,
208}
209
210fn try_single_model(
212 ctx: &TryModelContext<'_>,
213 runtime: &mut PipelineRuntime<'_>,
214) -> std::io::Result<TrySingleAgentResult> {
215 let mut parser_type = ctx.agent_config.json_parser;
216
217 if ctx.role == AgentRole::Reviewer {
218 if let Some(ref parser_override) = runtime.config.reviewer_json_parser {
219 parser_type = JsonParserType::parse(parser_override);
220 if ctx.agent_index == 0 && ctx.cycle == 0 && ctx.model_index == 0 {
221 runtime.logger.info(&format!(
222 "Using JSON parser override '{parser_override}' for reviewer"
223 ));
224 }
225 }
226 }
227
228 let cmd_str = build_command_for_model(ctx, runtime);
229
230 validate_glm_print_flag(
231 ctx.agent_name,
232 ctx.agent_config,
233 &cmd_str,
234 ctx.agent_index,
235 ctx.cycle,
236 ctx.model_index,
237 runtime,
238 );
239
240 let (label, logfile, display_name_with_suffix) = build_execution_metadata(
241 ctx.model_flag,
242 ctx.display_name,
243 ctx.base_label,
244 ctx.agent_name,
245 ctx.logfile_prefix,
246 ctx.model_index,
247 );
248
249 let attempt_config = crate::pipeline::fallback::AgentAttemptConfig {
250 agent_name: ctx.agent_name,
251 model_flag: ctx.model_flag.map(std::string::String::as_str),
252 label: &label,
253 display_name: &display_name_with_suffix,
254 cmd_str: &cmd_str,
255 prompt: ctx.prompt,
256 logfile: &logfile,
257 logfile_prefix: ctx.logfile_prefix,
258 parser_type,
259 env_vars: &ctx.agent_config.env_vars,
260 model_index: ctx.model_index,
261 agent_index: ctx.agent_index,
262 cycle: ctx.cycle as usize,
263 fallback_config: ctx.fallback_config,
264 output_validator: ctx.output_validator,
265 retry_timer: Arc::clone(&ctx.retry_timer),
266 };
267 let result = try_agent_with_retries(&attempt_config, runtime)?;
268
269 match result {
270 TryAgentResult::Success => Ok(TrySingleAgentResult::Success),
271 TryAgentResult::Unrecoverable(exit_code) => {
272 Ok(TrySingleAgentResult::Unrecoverable(exit_code))
273 }
274 TryAgentResult::Fallback => Ok(TrySingleAgentResult::Fallback),
275 TryAgentResult::NoRetry => Ok(TrySingleAgentResult::NoRetry),
276 }
277}
278
279struct TryAgentContext<'a> {
281 agent_name: &'a str,
282 agent_index: usize,
283 cycle: u32,
284 role: AgentRole,
285 base_label: &'a str,
286 prompt: &'a str,
287 logfile_prefix: &'a str,
288 cli_model_override: Option<&'a String>,
289 cli_provider_override: Option<&'a String>,
290 output_validator: Option<crate::pipeline::fallback::OutputValidator>,
291 retry_timer: Arc<dyn crate::agents::RetryTimerProvider>,
292}
293
294fn try_single_agent(
296 ctx: &TryAgentContext<'_>,
297 runtime: &mut PipelineRuntime<'_>,
298 registry: &AgentRegistry,
299 fallback_config: &crate::agents::fallback::FallbackConfig,
300) -> std::io::Result<TrySingleAgentResult> {
301 let Some(agent_config) = registry.resolve_config(ctx.agent_name) else {
302 runtime.logger.warn(&format!(
303 "Agent '{}' not found in registry, skipping",
304 ctx.agent_name
305 ));
306 return Ok(TrySingleAgentResult::Fallback);
307 };
308
309 let display_name = registry.display_name(ctx.agent_name);
310 let model_ctx = ModelFlagBuildContext {
311 agent_index: ctx.agent_index,
312 cli_model_override: ctx.cli_model_override,
313 cli_provider_override: ctx.cli_provider_override,
314 agent_config: &agent_config,
315 agent_name: ctx.agent_name,
316 fallback_config,
317 display_name: &display_name,
318 runtime,
319 };
320 let model_flags_to_try = build_model_flags_list(&model_ctx);
321
322 if ctx.agent_index == 0 && ctx.cycle == 0 {
323 for model_flag in model_flags_to_try.iter().flatten() {
324 for warning in validate_model_flag(model_flag) {
325 runtime.logger.warn(&warning);
326 }
327 }
328 }
329
330 for (model_index, model_flag) in model_flags_to_try.iter().enumerate() {
331 let model_ctx = TryModelContext {
332 agent_config: &agent_config,
333 agent_name: ctx.agent_name,
334 display_name: &display_name,
335 agent_index: ctx.agent_index,
336 cycle: ctx.cycle,
337 model_index,
338 role: ctx.role,
339 model_flag: model_flag.as_ref(),
340 base_label: ctx.base_label,
341 prompt: ctx.prompt,
342 logfile_prefix: ctx.logfile_prefix,
343 fallback_config,
344 output_validator: ctx.output_validator,
345 retry_timer: Arc::clone(&ctx.retry_timer),
346 };
347 let result = try_single_model(&model_ctx, runtime)?;
348
349 match result {
350 TrySingleAgentResult::Success => return Ok(TrySingleAgentResult::Success),
351 TrySingleAgentResult::Unrecoverable(exit_code) => {
352 return Ok(TrySingleAgentResult::Unrecoverable(exit_code))
353 }
354 TrySingleAgentResult::Fallback => return Ok(TrySingleAgentResult::Fallback),
355 TrySingleAgentResult::NoRetry => {}
356 }
357 }
358
359 Ok(TrySingleAgentResult::NoRetry)
360}
361
362pub struct FallbackConfig<'a, 'b> {
364 pub role: AgentRole,
365 pub base_label: &'a str,
366 pub prompt: &'a str,
367 pub logfile_prefix: &'a str,
368 pub runtime: &'a mut PipelineRuntime<'b>,
369 pub registry: &'a AgentRegistry,
370 pub primary_agent: &'a str,
371 pub output_validator: Option<crate::pipeline::fallback::OutputValidator>,
372}
373
374pub fn run_with_fallback(
376 role: AgentRole,
377 base_label: &str,
378 prompt: &str,
379 logfile_prefix: &str,
380 runtime: &mut PipelineRuntime<'_>,
381 registry: &AgentRegistry,
382 primary_agent: &str,
383) -> std::io::Result<i32> {
384 let mut config = FallbackConfig {
385 role,
386 base_label,
387 prompt,
388 logfile_prefix,
389 runtime,
390 registry,
391 primary_agent,
392 output_validator: None,
393 };
394 run_with_fallback_internal(&mut config)
395}
396
397pub fn run_with_fallback_and_validator(
405 config: &mut FallbackConfig<'_, '_>,
406) -> std::io::Result<i32> {
407 run_with_fallback_internal(config)
408}
409
410fn run_with_fallback_internal(config: &mut FallbackConfig<'_, '_>) -> std::io::Result<i32> {
415 let fallback_config = config.registry.fallback_config();
416 let fallbacks = config.registry.available_fallbacks(config.role);
417 if fallback_config.has_fallbacks(config.role) {
418 config.runtime.logger.info(&format!(
419 "Agent fallback chain for {}: {}",
420 config.role,
421 fallbacks.join(", ")
422 ));
423 } else {
424 config.runtime.logger.info(&format!(
425 "No configured fallbacks for {}, using primary only",
426 config.role
427 ));
428 }
429
430 let agents_to_try = build_agents_to_try(&fallbacks, config.primary_agent);
431 let (cli_model_override, cli_provider_override) =
432 get_cli_overrides(config.role, config.runtime);
433
434 for cycle in 0..fallback_config.max_cycles {
435 if cycle > 0 {
436 let backoff_ms = fallback_config.calculate_backoff(cycle - 1);
437 config.runtime.logger.info(&format!(
438 "Cycle {}/{}: All agents exhausted, waiting {}ms before retry (exponential backoff)...",
439 cycle + 1,
440 fallback_config.max_cycles,
441 backoff_ms
442 ));
443 config
444 .registry
445 .retry_timer()
446 .sleep(std::time::Duration::from_millis(backoff_ms));
447 }
448
449 for (agent_index, agent_name) in agents_to_try.iter().enumerate() {
450 let ctx = TryAgentContext {
451 agent_name,
452 agent_index,
453 cycle,
454 role: config.role,
455 base_label: config.base_label,
456 prompt: config.prompt,
457 logfile_prefix: config.logfile_prefix,
458 cli_model_override: cli_model_override.as_ref(),
459 cli_provider_override: cli_provider_override.as_ref(),
460 output_validator: config.output_validator,
461 retry_timer: config.registry.retry_timer(),
462 };
463 let result = try_single_agent(&ctx, config.runtime, config.registry, fallback_config)?;
464
465 match result {
466 TrySingleAgentResult::Success => return Ok(0),
467 TrySingleAgentResult::Unrecoverable(exit_code) => return Ok(exit_code),
468 TrySingleAgentResult::Fallback | TrySingleAgentResult::NoRetry => {}
469 }
470 }
471 }
472
473 config.runtime.logger.error(&format!(
474 "All agents exhausted after {} cycles with exponential backoff",
475 fallback_config.max_cycles
476 ));
477 Ok(1)
478}
479
#[cfg(test)]
mod tests {
    use std::ffi::OsString;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serializes the tests in this module that mutate the process environment.
    static ENV_MUTEX: Mutex<()> = Mutex::new(());

    /// Scoped override of process environment variables.
    ///
    /// The guard holds `ENV_MUTEX` for its entire lifetime, not only while the
    /// variables are being written, so env-mutating tests in this module cannot
    /// interleave.
    ///
    /// On drop, each variable is restored to the value it had before
    /// `set_multiple`. Restoration runs in reverse order, so a key listed more
    /// than once still ends up with its original value.
    struct EnvGuard {
        /// `(key, previous value)` pairs, in the order they were set.
        saved: Vec<(&'static str, Option<OsString>)>,
        /// Released only after `Drop::drop` has restored the environment.
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvGuard {
        /// Sets every `(key, value)` pair and remembers the previous values.
        fn set_multiple(vars: &[(&'static str, &str)]) -> Self {
            // A panicking test poisons the mutex. The `()` it guards cannot be
            // left inconsistent, so recover the guard instead of failing.
            let lock = ENV_MUTEX.lock().unwrap_or_else(PoisonError::into_inner);
            let saved = vars
                .iter()
                .map(|&(key, value)| {
                    let prev = std::env::var_os(key);
                    std::env::set_var(key, value);
                    (key, prev)
                })
                .collect();
            Self { saved, _lock: lock }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // `drop` runs before the fields (including `_lock`) are dropped, so
            // the environment is restored while the mutex is still held.
            // Popping from the end undoes the writes in reverse order.
            while let Some((key, prev)) = self.saved.pop() {
                match prev {
                    Some(value) => std::env::set_var(key, value),
                    None => std::env::remove_var(key),
                }
            }
        }
    }

    #[test]
    fn env_guard_restores_previous_values_in_reverse_order() {
        const KEY: &str = "RALPH_WORKFLOW_TEST_ENV_GUARD_RESTORE";
        let before = std::env::var_os(KEY);
        {
            let _guard = EnvGuard::set_multiple(&[(KEY, "first"), (KEY, "second")]);
            assert_eq!(std::env::var(KEY).as_deref(), Ok("second"));
        }
        assert_eq!(std::env::var_os(KEY), before);
    }

    #[test]
    fn test_runner_sanitizes_anthropic_env_vars() {
        const ANTHROPIC_ENV_VARS_TO_SANITIZE: &[&str] = &[
            "ANTHROPIC_API_KEY",
            "ANTHROPIC_BASE_URL",
            "ANTHROPIC_AUTH_TOKEN",
            "ANTHROPIC_MODEL",
            "ANTHROPIC_DEFAULT_HAIKU_MODEL",
            "ANTHROPIC_DEFAULT_OPUS_MODEL",
            "ANTHROPIC_DEFAULT_SONNET_MODEL",
        ];

        // Held until the end of the test so no other env-mutating test in
        // this module runs while these values are set.
        let _guard = EnvGuard::set_multiple(&[
            ("ANTHROPIC_API_KEY", "test-token-glm"),
            ("ANTHROPIC_BASE_URL", "https://glm.example.com"),
        ]);

        let mut cmd = std::process::Command::new("printenv");
        for &var in ANTHROPIC_ENV_VARS_TO_SANITIZE {
            cmd.env_remove(var);
        }

        let output = match cmd.output() {
            Ok(o) => o,
            Err(e) => {
                eprintln!("Skipping test: printenv not available ({e})");
                return;
            }
        };
        let stdout = String::from_utf8_lossy(&output.stdout);

        assert!(!stdout.contains("test-token-glm"));
        assert!(!stdout.contains("https://glm.example.com"));
    }

    #[test]
    fn test_runner_does_not_sanitize_explicit_env_vars() {
        let mut cmd = std::process::Command::new("printenv");

        let agent_env_vars =
            std::collections::HashMap::from([("ANTHROPIC_API_KEY", "agent-specific-key")]);

        for &var in &["ANTHROPIC_API_KEY", "ANTHROPIC_BASE_URL"] {
            cmd.env_remove(var);
        }

        for (key, value) in &agent_env_vars {
            cmd.env(key, value);
        }

        let output = match cmd.output() {
            Ok(o) => o,
            Err(e) => {
                eprintln!("Skipping test: printenv not available ({e})");
                return;
            }
        };
        let stdout = String::from_utf8_lossy(&output.stdout);

        assert!(stdout.contains("agent-specific-key"));
    }
}