1use crate::agents::{validate_model_flag, AgentConfig, AgentRegistry, AgentRole, JsonParserType};
4use crate::common::split_command;
5use std::sync::Arc;
6
7use super::fallback::try_agent_with_retries;
8use super::fallback::TryAgentResult;
9use super::model_flag::resolve_model_with_provider;
10use super::prompt::PipelineRuntime;
11
/// Builds the ordered list of agent names to attempt within one fallback cycle.
///
/// The primary agent is always first. Each fallback follows in its configured order,
/// skipping any name that is already queued (which includes a fallback equal to the
/// primary), so no agent appears twice.
fn build_agents_to_try<'a>(fallbacks: &'a [&'a str], primary_agent: &'a str) -> Vec<&'a str> {
    fallbacks
        .iter()
        .copied()
        .fold(vec![primary_agent], |mut ordered, candidate| {
            // `ordered` starts with the primary, so one membership check covers both
            // "same as primary" and "already queued".
            if !ordered.contains(&candidate) {
                ordered.push(candidate);
            }
            ordered
        })
}
22
23fn get_cli_overrides(
25 role: AgentRole,
26 runtime: &PipelineRuntime<'_>,
27) -> (Option<String>, Option<String>) {
28 match role {
29 AgentRole::Developer => (
30 runtime.config.developer_model.clone(),
31 runtime.config.developer_provider.clone(),
32 ),
33 AgentRole::Reviewer => (
34 runtime.config.reviewer_model.clone(),
35 runtime.config.reviewer_provider.clone(),
36 ),
37 AgentRole::Commit => (None, None), }
39}
40
41struct ModelFlagBuildContext<'a> {
43 agent_index: usize,
44 cli_model_override: Option<&'a String>,
45 cli_provider_override: Option<&'a String>,
46 agent_config: &'a AgentConfig,
47 agent_name: &'a str,
48 fallback_config: &'a crate::agents::fallback::FallbackConfig,
49 display_name: &'a str,
50 runtime: &'a PipelineRuntime<'a>,
51}
52
53fn build_model_flags_list(ctx: &ModelFlagBuildContext<'_>) -> Vec<Option<String>> {
55 let mut model_flags_to_try: Vec<Option<String>> = Vec::new();
56
57 if ctx.agent_index == 0
60 && (ctx.cli_model_override.is_some() || ctx.cli_provider_override.is_some())
61 {
62 let resolved = resolve_model_with_provider(
63 ctx.cli_provider_override.map(std::string::String::as_str),
64 ctx.cli_model_override.map(std::string::String::as_str),
65 ctx.agent_config.model_flag.as_deref(),
66 );
67 if resolved.is_some() {
68 model_flags_to_try.push(resolved);
69 }
70 }
71
72 if model_flags_to_try.is_empty() {
74 model_flags_to_try.push(None);
75 }
76
77 if ctx.fallback_config.has_provider_fallbacks(ctx.agent_name) {
79 let provider_fallbacks = ctx.fallback_config.get_provider_fallbacks(ctx.agent_name);
80 ctx.runtime.logger.info(&format!(
81 "Agent '{}' has {} provider fallback(s) configured",
82 ctx.display_name,
83 provider_fallbacks.len()
84 ));
85 for model in provider_fallbacks {
86 model_flags_to_try.push(Some(model.clone()));
87 }
88 }
89
90 model_flags_to_try
91}
92
93fn build_command_for_model(ctx: &TryModelContext<'_>, runtime: &PipelineRuntime<'_>) -> String {
95 let model_ref = ctx.model_flag.map(std::string::String::as_str);
96 let yolo = matches!(ctx.role, AgentRole::Developer)
99 || (ctx.role == AgentRole::Commit && ctx.base_label.starts_with("fix"))
100 || (ctx.role == AgentRole::Reviewer && ctx.base_label.starts_with("fix"));
101
102 if ctx.agent_index == 0 && ctx.cycle == 0 && ctx.model_index == 0 {
103 match ctx.role {
105 AgentRole::Developer => runtime.config.developer_cmd.clone().unwrap_or_else(|| {
106 ctx.agent_config
107 .build_cmd_with_model(true, true, true, model_ref)
108 }),
109 AgentRole::Reviewer => runtime.config.reviewer_cmd.clone().unwrap_or_else(|| {
110 ctx.agent_config
111 .build_cmd_with_model(true, true, yolo, model_ref)
112 }),
113 AgentRole::Commit => runtime.config.commit_cmd.clone().unwrap_or_else(|| {
114 ctx.agent_config
115 .build_cmd_with_model(true, true, yolo, model_ref)
116 }),
117 }
118 } else {
119 ctx.agent_config
120 .build_cmd_with_model(true, true, yolo, model_ref)
121 }
122}
123
124fn validate_glm_print_flag(
130 agent_name: &str,
131 agent_config: &AgentConfig,
132 cmd_str: &str,
133 agent_index: usize,
134 cycle: u32,
135 model_index: usize,
136 runtime: &PipelineRuntime<'_>,
137) {
138 if !crate::agents::is_glm_like_agent(agent_name)
141 || agent_index != 0
142 || cycle != 0
143 || model_index != 0
144 {
145 return;
146 }
147
148 let cmd_argv = split_command(cmd_str).ok();
149 let has_print_flag = cmd_argv
150 .as_ref()
151 .is_some_and(|argv| argv.iter().any(|arg| arg == "-p"));
152 if !has_print_flag {
153 if agent_config.print_flag.is_empty() {
154 runtime.logger.warn(&format!(
155 "GLM agent '{agent_name}' is missing '-p' flag: print_flag is empty in configuration. \
156 Add 'print_flag = \"-p\"' to [ccs] section in ~/.config/ralph-workflow.toml"
157 ));
158 } else {
159 runtime.logger.warn(&format!(
160 "GLM agent '{agent_name}' may be missing '-p' flag in command. Check configuration."
161 ));
162 }
163 }
164}
165
166fn build_execution_metadata(
168 model_flag: Option<&String>,
169 display_name: &str,
170 base_label: &str,
171 agent_name: &str,
172 logfile_prefix: &str,
173 model_index: usize,
174) -> (String, String, String) {
175 let model_suffix = model_flag.map(|m| format!(" [{m}]")).unwrap_or_default();
176 let display_name_with_suffix = format!("{display_name}{model_suffix}");
177 let label = format!("{base_label} ({display_name_with_suffix})");
178 let logfile = super::logfile::build_logfile_path(logfile_prefix, agent_name, model_index);
179 (label, logfile, display_name_with_suffix)
180}
181
/// Outcome of attempting one agent, or one model flag of an agent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TrySingleAgentResult {
    /// The attempt succeeded; the fallback loop returns exit code 0.
    Success,
    /// The attempt failed and no further agents are tried; the carried exit code is
    /// returned to the caller as-is.
    Unrecoverable(i32),
    /// Stop trying the current agent (remaining model flags are skipped) and move on to
    /// the next agent.
    Fallback,
    /// This model flag is done; continue with the agent's next model flag, or with the
    /// next agent once the flags are exhausted.
    NoRetry,
}
194
195struct TryModelContext<'a> {
197 agent_config: &'a AgentConfig,
198 agent_name: &'a str,
199 display_name: &'a str,
200 agent_index: usize,
201 cycle: u32,
202 model_index: usize,
203 role: AgentRole,
204 model_flag: Option<&'a String>,
205 base_label: &'a str,
206 prompt: &'a str,
207 logfile_prefix: &'a str,
208 fallback_config: &'a crate::agents::fallback::FallbackConfig,
209 output_validator: Option<crate::pipeline::fallback::OutputValidator>,
210 retry_timer: Arc<dyn crate::agents::RetryTimerProvider>,
211}
212
213fn try_single_model(
215 ctx: &TryModelContext<'_>,
216 runtime: &mut PipelineRuntime<'_>,
217) -> std::io::Result<TrySingleAgentResult> {
218 let mut parser_type = ctx.agent_config.json_parser;
219
220 if ctx.role == AgentRole::Reviewer {
221 if let Some(ref parser_override) = runtime.config.reviewer_json_parser {
222 parser_type = JsonParserType::parse(parser_override);
223 if ctx.agent_index == 0 && ctx.cycle == 0 && ctx.model_index == 0 {
224 runtime.logger.info(&format!(
225 "Using JSON parser override '{parser_override}' for reviewer"
226 ));
227 }
228 }
229 }
230
231 let cmd_str = build_command_for_model(ctx, runtime);
232
233 validate_glm_print_flag(
234 ctx.agent_name,
235 ctx.agent_config,
236 &cmd_str,
237 ctx.agent_index,
238 ctx.cycle,
239 ctx.model_index,
240 runtime,
241 );
242
243 let (label, logfile, display_name_with_suffix) = build_execution_metadata(
244 ctx.model_flag,
245 ctx.display_name,
246 ctx.base_label,
247 ctx.agent_name,
248 ctx.logfile_prefix,
249 ctx.model_index,
250 );
251
252 let attempt_config = crate::pipeline::fallback::AgentAttemptConfig {
253 agent_name: ctx.agent_name,
254 model_flag: ctx.model_flag.map(std::string::String::as_str),
255 label: &label,
256 display_name: &display_name_with_suffix,
257 cmd_str: &cmd_str,
258 prompt: ctx.prompt,
259 logfile: &logfile,
260 logfile_prefix: ctx.logfile_prefix,
261 parser_type,
262 env_vars: &ctx.agent_config.env_vars,
263 model_index: ctx.model_index,
264 agent_index: ctx.agent_index,
265 cycle: ctx.cycle as usize,
266 fallback_config: ctx.fallback_config,
267 output_validator: ctx.output_validator,
268 retry_timer: Arc::clone(&ctx.retry_timer),
269 };
270 let result = try_agent_with_retries(&attempt_config, runtime)?;
271
272 match result {
273 TryAgentResult::Success => Ok(TrySingleAgentResult::Success),
274 TryAgentResult::Unrecoverable(exit_code) => {
275 Ok(TrySingleAgentResult::Unrecoverable(exit_code))
276 }
277 TryAgentResult::Fallback => Ok(TrySingleAgentResult::Fallback),
278 TryAgentResult::NoRetry => Ok(TrySingleAgentResult::NoRetry),
279 }
280}
281
282struct TryAgentContext<'a> {
284 agent_name: &'a str,
285 agent_index: usize,
286 cycle: u32,
287 role: AgentRole,
288 base_label: &'a str,
289 prompt: &'a str,
290 logfile_prefix: &'a str,
291 cli_model_override: Option<&'a String>,
292 cli_provider_override: Option<&'a String>,
293 output_validator: Option<crate::pipeline::fallback::OutputValidator>,
294 retry_timer: Arc<dyn crate::agents::RetryTimerProvider>,
295}
296
297fn try_single_agent(
299 ctx: &TryAgentContext<'_>,
300 runtime: &mut PipelineRuntime<'_>,
301 registry: &AgentRegistry,
302 fallback_config: &crate::agents::fallback::FallbackConfig,
303) -> std::io::Result<TrySingleAgentResult> {
304 let Some(agent_config) = registry.resolve_config(ctx.agent_name) else {
305 runtime.logger.warn(&format!(
306 "Agent '{}' not found in registry, skipping",
307 ctx.agent_name
308 ));
309 return Ok(TrySingleAgentResult::Fallback);
310 };
311
312 let display_name = registry.display_name(ctx.agent_name);
313 let model_ctx = ModelFlagBuildContext {
314 agent_index: ctx.agent_index,
315 cli_model_override: ctx.cli_model_override,
316 cli_provider_override: ctx.cli_provider_override,
317 agent_config: &agent_config,
318 agent_name: ctx.agent_name,
319 fallback_config,
320 display_name: &display_name,
321 runtime,
322 };
323 let model_flags_to_try = build_model_flags_list(&model_ctx);
324
325 if ctx.agent_index == 0 && ctx.cycle == 0 {
326 for model_flag in model_flags_to_try.iter().flatten() {
327 for warning in validate_model_flag(model_flag) {
328 runtime.logger.warn(&warning);
329 }
330 }
331 }
332
333 for (model_index, model_flag) in model_flags_to_try.iter().enumerate() {
334 let model_ctx = TryModelContext {
335 agent_config: &agent_config,
336 agent_name: ctx.agent_name,
337 display_name: &display_name,
338 agent_index: ctx.agent_index,
339 cycle: ctx.cycle,
340 model_index,
341 role: ctx.role,
342 model_flag: model_flag.as_ref(),
343 base_label: ctx.base_label,
344 prompt: ctx.prompt,
345 logfile_prefix: ctx.logfile_prefix,
346 fallback_config,
347 output_validator: ctx.output_validator,
348 retry_timer: Arc::clone(&ctx.retry_timer),
349 };
350 let result = try_single_model(&model_ctx, runtime)?;
351
352 match result {
353 TrySingleAgentResult::Success => return Ok(TrySingleAgentResult::Success),
354 TrySingleAgentResult::Unrecoverable(exit_code) => {
355 return Ok(TrySingleAgentResult::Unrecoverable(exit_code))
356 }
357 TrySingleAgentResult::Fallback => return Ok(TrySingleAgentResult::Fallback),
358 TrySingleAgentResult::NoRetry => {}
359 }
360 }
361
362 Ok(TrySingleAgentResult::NoRetry)
363}
364
365pub struct FallbackConfig<'a, 'b> {
367 pub role: AgentRole,
368 pub base_label: &'a str,
369 pub prompt: &'a str,
370 pub logfile_prefix: &'a str,
371 pub runtime: &'a mut PipelineRuntime<'b>,
372 pub registry: &'a AgentRegistry,
373 pub primary_agent: &'a str,
374 pub output_validator: Option<crate::pipeline::fallback::OutputValidator>,
375}
376
377#[cfg_attr(not(test), allow(dead_code))] pub fn run_with_fallback(
380 role: AgentRole,
381 base_label: &str,
382 prompt: &str,
383 logfile_prefix: &str,
384 runtime: &mut PipelineRuntime<'_>,
385 registry: &AgentRegistry,
386 primary_agent: &str,
387) -> std::io::Result<i32> {
388 let mut config = FallbackConfig {
389 role,
390 base_label,
391 prompt,
392 logfile_prefix,
393 runtime,
394 registry,
395 primary_agent,
396 output_validator: None,
397 };
398 run_with_fallback_internal(&mut config)
399}
400
401pub fn run_with_fallback_and_validator(
409 config: &mut FallbackConfig<'_, '_>,
410) -> std::io::Result<i32> {
411 run_with_fallback_internal(config)
412}
413
414fn run_with_fallback_internal(config: &mut FallbackConfig<'_, '_>) -> std::io::Result<i32> {
419 let fallback_config = config.registry.fallback_config();
420 let fallbacks = config.registry.available_fallbacks(config.role);
421 if fallback_config.has_fallbacks(config.role) {
422 config.runtime.logger.info(&format!(
423 "Agent fallback chain for {}: {}",
424 config.role,
425 fallbacks.join(", ")
426 ));
427 } else {
428 config.runtime.logger.info(&format!(
429 "No configured fallbacks for {}, using primary only",
430 config.role
431 ));
432 }
433
434 let agents_to_try = build_agents_to_try(&fallbacks, config.primary_agent);
435 let (cli_model_override, cli_provider_override) =
436 get_cli_overrides(config.role, config.runtime);
437
438 for cycle in 0..fallback_config.max_cycles {
439 if cycle > 0 {
440 let backoff_ms = fallback_config.calculate_backoff(cycle - 1);
441 config.runtime.logger.info(&format!(
442 "Cycle {}/{}: All agents exhausted, waiting {}ms before retry (exponential backoff)...",
443 cycle + 1,
444 fallback_config.max_cycles,
445 backoff_ms
446 ));
447 config
448 .registry
449 .retry_timer()
450 .sleep(std::time::Duration::from_millis(backoff_ms));
451 }
452
453 for (agent_index, agent_name) in agents_to_try.iter().enumerate() {
454 let ctx = TryAgentContext {
455 agent_name,
456 agent_index,
457 cycle,
458 role: config.role,
459 base_label: config.base_label,
460 prompt: config.prompt,
461 logfile_prefix: config.logfile_prefix,
462 cli_model_override: cli_model_override.as_ref(),
463 cli_provider_override: cli_provider_override.as_ref(),
464 output_validator: config.output_validator,
465 retry_timer: config.registry.retry_timer(),
466 };
467 let result = try_single_agent(&ctx, config.runtime, config.registry, fallback_config)?;
468
469 match result {
470 TrySingleAgentResult::Success => return Ok(0),
471 TrySingleAgentResult::Unrecoverable(exit_code) => return Ok(exit_code),
472 TrySingleAgentResult::Fallback | TrySingleAgentResult::NoRetry => {}
473 }
474 }
475 }
476
477 config.runtime.logger.error(&format!(
478 "All agents exhausted after {} cycles with exponential backoff",
479 fallback_config.max_cycles
480 ));
481 Ok(1)
482}
483
/// Result of trying to continue an existing agent session.
#[derive(Debug)]
pub enum SessionContinuationResult {
    /// The continuation command was executed; `exit_code` is the command's exit code
    /// and `logfile` is the path its output was logged to.
    // Fields are not all read in this file, hence the lint allowance.
    #[allow(dead_code)]
    Ran { logfile: String, exit_code: i32 },
    /// Continuation was not possible or failed to run; start a new session instead.
    Fallback,
}
520
521pub struct XsdRetryConfig<'a, 'b> {
523 pub role: AgentRole,
525 pub base_label: &'a str,
527 pub prompt: &'a str,
529 pub logfile_prefix: &'a str,
531 pub runtime: &'a mut PipelineRuntime<'b>,
533 pub registry: &'a AgentRegistry,
535 pub primary_agent: &'a str,
537 pub session_info: Option<&'a crate::pipeline::session::SessionInfo>,
540 pub retry_num: usize,
542 pub output_validator: Option<crate::pipeline::fallback::OutputValidator>,
545}
546
547pub fn run_xsd_retry_with_session(config: &mut XsdRetryConfig<'_, '_>) -> std::io::Result<i32> {
578 if config.retry_num > 0 {
580 if let Some(session_info) = config.session_info {
581 config.runtime.logger.info(&format!(
583 " Attempting session continuation with {} (session: {}...)",
584 session_info.agent_name,
585 &session_info.session_id[..8.min(session_info.session_id.len())]
586 ));
587 match try_session_continuation(config, session_info) {
588 SessionContinuationResult::Ran {
589 logfile: _,
590 exit_code,
591 } => {
592 config
596 .runtime
597 .logger
598 .info(" Session continuation succeeded");
599 return Ok(exit_code);
600 }
601 SessionContinuationResult::Fallback => {
602 config
604 .runtime
605 .logger
606 .warn(" Session continuation failed, falling back to new session");
607 }
608 }
609 } else {
610 config
611 .runtime
612 .logger
613 .warn(" No session info available for retry, starting new session");
614 }
615 }
616
617 let mut fallback_config = FallbackConfig {
619 role: config.role,
620 base_label: config.base_label,
621 prompt: config.prompt,
622 logfile_prefix: config.logfile_prefix,
623 runtime: config.runtime,
624 registry: config.registry,
625 primary_agent: config.primary_agent,
626 output_validator: config.output_validator,
627 };
628 run_with_fallback_and_validator(&mut fallback_config)
629}
630
631fn try_session_continuation(
641 config: &mut XsdRetryConfig<'_, '_>,
642 session_info: &crate::pipeline::session::SessionInfo,
643) -> SessionContinuationResult {
644 let registry_name = config
649 .registry
650 .resolve_from_logfile_name(&session_info.agent_name)
651 .unwrap_or_else(|| session_info.agent_name.clone());
652
653 let agent_config = match config.registry.resolve_config(®istry_name) {
655 Some(cfg) => cfg,
656 None => {
657 return SessionContinuationResult::Fallback;
659 }
660 };
661
662 if !agent_config.supports_session_continuation() {
663 return SessionContinuationResult::Fallback;
665 }
666
667 let yolo = matches!(config.role, AgentRole::Developer);
669 let cmd_str = agent_config.build_cmd_with_session(
670 true, yolo, true, None, Some(&session_info.session_id),
675 );
676
677 let sanitized_agent = super::logfile::sanitize_agent_name(&session_info.agent_name);
680 let logfile = format!(
681 "{}_{}_session_{}.log",
682 config.logfile_prefix, sanitized_agent, config.retry_num
683 );
684
685 if config.runtime.config.verbosity.is_debug() {
687 config.runtime.logger.info(&format!(
688 " Attempting session continuation with {} (session: {})",
689 session_info.agent_name, session_info.session_id
690 ));
691 }
692
693 let cmd = crate::pipeline::PromptCommand {
695 cmd_str: &cmd_str,
696 prompt: config.prompt,
697 label: &format!("{} (session)", config.base_label),
698 display_name: &session_info.agent_name,
699 logfile: &logfile,
700 parser_type: agent_config.json_parser,
701 env_vars: &agent_config.env_vars,
702 };
703
704 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
707 crate::pipeline::run_with_prompt(&cmd, config.runtime)
708 }));
709
710 match result {
711 Ok(Ok(cmd_result)) => {
712 SessionContinuationResult::Ran {
715 logfile,
716 exit_code: cmd_result.exit_code,
717 }
718 }
719 Ok(Err(_io_error)) => {
720 SessionContinuationResult::Fallback
723 }
724 Err(_panic) => {
725 SessionContinuationResult::Fallback
728 }
729 }
730}
731
#[cfg(test)]
mod tests {
    use std::ffi::OsString;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serializes tests that mutate the process environment, which is shared by all
    /// test threads.
    static ENV_MUTEX: Mutex<()> = Mutex::new(());

    /// Sets environment variables and restores their previous values on drop.
    ///
    /// The guard owns the `ENV_MUTEX` lock for its whole lifetime, so the variables stay
    /// protected until they have been restored, not just while they are being set.
    struct EnvGuard {
        /// Each key that was set, with the value it had before (`None` if unset).
        saved: Vec<(&'static str, Option<OsString>)>,
        /// Held until the guard is dropped; released after the values are restored.
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvGuard {
        /// Takes the environment lock and sets every `(key, value)` pair in order.
        fn set_multiple(vars: &[(&'static str, &str)]) -> Self {
            // A poisoned lock only means another test panicked; the `()` payload cannot
            // be left in a bad state, so recover the guard.
            let lock = ENV_MUTEX.lock().unwrap_or_else(PoisonError::into_inner);
            let saved = vars
                .iter()
                .map(|&(key, value)| {
                    let prev = std::env::var_os(key);
                    std::env::set_var(key, value);
                    (key, prev)
                })
                .collect();
            Self { saved, _lock: lock }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // Restore in reverse so a key that was set twice ends at its original value.
            for (key, prev) in self.saved.drain(..).rev() {
                match prev {
                    Some(value) => std::env::set_var(key, value),
                    None => std::env::remove_var(key),
                }
            }
            // `_lock` is dropped after this body returns, i.e. after restoration.
        }
    }

    /// The guard must put back the original state, even when a key is set twice.
    #[test]
    fn test_env_guard_restores_previous_values() {
        const KEY: &str = "RUNNER_ENV_GUARD_SELF_TEST";
        let before = std::env::var_os(KEY);
        {
            let _guard = EnvGuard::set_multiple(&[(KEY, "first"), (KEY, "second")]);
            assert_eq!(std::env::var_os(KEY), Some(OsString::from("second")));
        }
        assert_eq!(std::env::var_os(KEY), before);
    }

    /// Variables removed from a child command must not leak from the parent process.
    #[test]
    fn test_runner_sanitizes_anthropic_env_vars() {
        const ANTHROPIC_ENV_VARS_TO_SANITIZE: &[&str] = &[
            "ANTHROPIC_API_KEY",
            "ANTHROPIC_BASE_URL",
            "ANTHROPIC_AUTH_TOKEN",
            "ANTHROPIC_MODEL",
            "ANTHROPIC_DEFAULT_HAIKU_MODEL",
            "ANTHROPIC_DEFAULT_OPUS_MODEL",
            "ANTHROPIC_DEFAULT_SONNET_MODEL",
        ];

        let _guard = EnvGuard::set_multiple(&[
            ("ANTHROPIC_API_KEY", "test-token-glm"),
            ("ANTHROPIC_BASE_URL", "https://glm.example.com"),
        ]);

        let mut cmd = std::process::Command::new("printenv");
        for &var in ANTHROPIC_ENV_VARS_TO_SANITIZE {
            cmd.env_remove(var);
        }

        let output = match cmd.output() {
            Ok(o) => o,
            Err(e) => {
                eprintln!("Skipping test: printenv not available ({e})");
                return;
            }
        };
        let stdout = String::from_utf8_lossy(&output.stdout);

        assert!(!stdout.contains("test-token-glm"));
        assert!(!stdout.contains("https://glm.example.com"));
    }

    /// A variable set explicitly on the command after removal must reach the child.
    #[test]
    fn test_runner_does_not_sanitize_explicit_env_vars() {
        let mut cmd = std::process::Command::new("printenv");

        let agent_env_vars =
            std::collections::HashMap::from([("ANTHROPIC_API_KEY", "agent-specific-key")]);

        for &var in &["ANTHROPIC_API_KEY", "ANTHROPIC_BASE_URL"] {
            cmd.env_remove(var);
        }

        for (key, value) in &agent_env_vars {
            cmd.env(key, value);
        }

        let output = match cmd.output() {
            Ok(o) => o,
            Err(e) => {
                eprintln!("Skipping test: printenv not available ({e})");
                return;
            }
        };
        let stdout = String::from_utf8_lossy(&output.stdout);

        assert!(stdout.contains("agent-specific-key"));
    }
}