ralph_workflow/pipeline/
runner.rs

1//! Command execution helpers and fallback orchestration.
2
3use crate::agents::{validate_model_flag, AgentConfig, AgentRegistry, AgentRole, JsonParserType};
4use crate::common::split_command;
5
6use super::fallback::try_agent_with_retries;
7use super::fallback::TryAgentResult;
8use super::model_flag::resolve_model_with_provider;
9use super::prompt::PipelineRuntime;
10
/// Build the ordered, de-duplicated list of agents to try.
///
/// The primary agent always comes first. Each entry of `fallbacks` is then
/// appended in its original order, skipping any name that is already in the
/// list (including the primary agent itself).
///
/// This function only builds the list; it performs no logging.
fn build_agents_to_try<'a>(fallbacks: &'a [&'a str], primary_agent: &'a str) -> Vec<&'a str> {
    let mut agents_to_try: Vec<&'a str> = Vec::with_capacity(fallbacks.len() + 1);
    agents_to_try.push(primary_agent);
    for &fallback in fallbacks {
        // The list is seeded with the primary agent, so a single membership
        // check rejects both the primary and repeated fallbacks.
        if !agents_to_try.contains(&fallback) {
            agents_to_try.push(fallback);
        }
    }
    agents_to_try
}
21
22/// Get CLI model/provider overrides based on role.
23fn get_cli_overrides(
24    role: AgentRole,
25    runtime: &PipelineRuntime<'_>,
26) -> (Option<String>, Option<String>) {
27    match role {
28        AgentRole::Developer => (
29            runtime.config.developer_model.clone(),
30            runtime.config.developer_provider.clone(),
31        ),
32        AgentRole::Reviewer => (
33            runtime.config.reviewer_model.clone(),
34            runtime.config.reviewer_provider.clone(),
35        ),
36        AgentRole::Commit => (None, None), // Commit role doesn't have CLI overrides
37    }
38}
39
40/// Context for building model flags.
41struct ModelFlagBuildContext<'a> {
42    agent_index: usize,
43    cli_model_override: Option<&'a String>,
44    cli_provider_override: Option<&'a String>,
45    agent_config: &'a AgentConfig,
46    agent_name: &'a str,
47    fallback_config: &'a crate::agents::fallback::FallbackConfig,
48    display_name: &'a str,
49    runtime: &'a PipelineRuntime<'a>,
50}
51
52/// Build the list of model flags to try for an agent.
53fn build_model_flags_list(ctx: &ModelFlagBuildContext<'_>) -> Vec<Option<String>> {
54    let mut model_flags_to_try: Vec<Option<String>> = Vec::new();
55
56    // CLI override takes highest priority for primary agent
57    // Provider override can modify the model's provider prefix
58    if ctx.agent_index == 0
59        && (ctx.cli_model_override.is_some() || ctx.cli_provider_override.is_some())
60    {
61        let resolved = resolve_model_with_provider(
62            ctx.cli_provider_override.map(std::string::String::as_str),
63            ctx.cli_model_override.map(std::string::String::as_str),
64            ctx.agent_config.model_flag.as_deref(),
65        );
66        if resolved.is_some() {
67            model_flags_to_try.push(resolved);
68        }
69    }
70
71    // Add the agent's default model (None means use agent's configured model_flag or no model)
72    if model_flags_to_try.is_empty() {
73        model_flags_to_try.push(None);
74    }
75
76    // Add provider fallback models for this agent
77    if ctx.fallback_config.has_provider_fallbacks(ctx.agent_name) {
78        let provider_fallbacks = ctx.fallback_config.get_provider_fallbacks(ctx.agent_name);
79        ctx.runtime.logger.info(&format!(
80            "Agent '{}' has {} provider fallback(s) configured",
81            ctx.display_name,
82            provider_fallbacks.len()
83        ));
84        for model in provider_fallbacks {
85            model_flags_to_try.push(Some(model.clone()));
86        }
87    }
88
89    model_flags_to_try
90}
91
92/// Build the command string for a specific model configuration.
93fn build_command_for_model(
94    agent_index: usize,
95    cycle: u32,
96    model_index: usize,
97    role: AgentRole,
98    model_flag: Option<&str>,
99    agent_config: &AgentConfig,
100    runtime: &PipelineRuntime<'_>,
101) -> String {
102    let model_ref = model_flag;
103    if agent_index == 0 && cycle == 0 && model_index == 0 {
104        // For primary agent on first cycle, respect env var command overrides
105        match role {
106            AgentRole::Developer => {
107                runtime.config.developer_cmd.clone().unwrap_or_else(|| {
108                    agent_config.build_cmd_with_model(true, true, true, model_ref)
109                })
110            }
111            AgentRole::Reviewer => {
112                runtime.config.reviewer_cmd.clone().unwrap_or_else(|| {
113                    agent_config.build_cmd_with_model(true, true, false, model_ref)
114                })
115            }
116            AgentRole::Commit => {
117                // Commit role doesn't have cmd override, use default
118                agent_config.build_cmd_with_model(true, true, false, model_ref)
119            }
120        }
121    } else {
122        agent_config.build_cmd_with_model(true, true, role == AgentRole::Developer, model_ref)
123    }
124}
125
126/// GLM-specific validation for print flag.
127fn validate_glm_print_flag(
128    agent_name: &str,
129    agent_config: &AgentConfig,
130    cmd_str: &str,
131    agent_index: usize,
132    cycle: u32,
133    model_index: usize,
134    runtime: &PipelineRuntime<'_>,
135) {
136    if !crate::agents::is_glm_like_agent(agent_name)
137        || agent_index != 0
138        || cycle != 0
139        || model_index != 0
140    {
141        return;
142    }
143
144    let cmd_argv = split_command(cmd_str).ok();
145    let has_print_flag = cmd_argv
146        .as_ref()
147        .is_some_and(|argv| argv.iter().any(|arg| arg == "-p"));
148    if !has_print_flag {
149        if agent_config.print_flag.is_empty() {
150            runtime.logger.warn(&format!(
151                "GLM agent '{agent_name}' is missing '-p' flag: print_flag is empty in configuration. \
152                 Add 'print_flag = \"-p\"' to [ccs] section in ~/.config/ralph-workflow.toml"
153            ));
154        } else {
155            runtime.logger.warn(&format!(
156                "GLM agent '{agent_name}' may be missing '-p' flag in command. Check configuration."
157            ));
158        }
159    }
160}
161
/// Derive the display strings and log file path for one attempt.
///
/// Returns `(label, logfile, display_name_with_suffix)` where:
/// - `display_name_with_suffix` is `display_name`, followed by ` [model]`
///   when a model flag is given;
/// - `label` is `base_label (display_name_with_suffix)`;
/// - `logfile` is `{logfile_prefix}_{agent}_{model_index}.log`, with every
///   `/` in the agent name turned into `-`.
fn build_execution_metadata(
    model_flag: Option<&String>,
    display_name: &str,
    base_label: &str,
    agent_name: &str,
    logfile_prefix: &str,
    model_index: usize,
) -> (String, String, String) {
    let model_suffix = model_flag.map_or_else(String::new, |m| format!(" [{m}]"));
    let display_name_with_suffix = format!("{display_name}{model_suffix}");

    // Agent names such as "ccs/glm" would otherwise introduce a subdirectory
    // into the log path, so they are flattened to "ccs-glm".
    let safe_agent_name: String = agent_name
        .chars()
        .map(|ch| if ch == '/' { '-' } else { ch })
        .collect();

    let logfile = format!("{logfile_prefix}_{safe_agent_name}_{model_index}.log");
    let label = format!("{base_label} ({display_name_with_suffix})");
    (label, logfile, display_name_with_suffix)
}
180
/// Outcome of attempting one agent (or one of its model configurations).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TrySingleAgentResult {
    /// The agent succeeded; the whole run reports success.
    Success,
    /// The failure cannot be recovered from; abort right away with this exit code.
    Unrecoverable(i32),
    /// Give up on this agent and move on to the next one.
    Fallback,
    /// No retry for this model; continue with the next model.
    NoRetry,
}
193
194/// Context for trying a single model.
195struct TryModelContext<'a> {
196    agent_config: &'a AgentConfig,
197    agent_name: &'a str,
198    display_name: &'a str,
199    agent_index: usize,
200    cycle: u32,
201    model_index: usize,
202    role: AgentRole,
203    model_flag: Option<&'a String>,
204    base_label: &'a str,
205    prompt: &'a str,
206    logfile_prefix: &'a str,
207    fallback_config: &'a crate::agents::fallback::FallbackConfig,
208    output_validator: Option<crate::pipeline::fallback::OutputValidator>,
209}
210
211/// Try a single model configuration for an agent.
212fn try_single_model(
213    ctx: &TryModelContext<'_>,
214    runtime: &mut PipelineRuntime<'_>,
215) -> std::io::Result<TrySingleAgentResult> {
216    let mut parser_type = ctx.agent_config.json_parser;
217
218    if ctx.role == AgentRole::Reviewer {
219        if let Some(ref parser_override) = runtime.config.reviewer_json_parser {
220            parser_type = JsonParserType::parse(parser_override);
221            if ctx.agent_index == 0 && ctx.cycle == 0 && ctx.model_index == 0 {
222                runtime.logger.info(&format!(
223                    "Using JSON parser override '{parser_override}' for reviewer"
224                ));
225            }
226        }
227    }
228
229    let cmd_str = build_command_for_model(
230        ctx.agent_index,
231        ctx.cycle,
232        ctx.model_index,
233        ctx.role,
234        ctx.model_flag.map(std::string::String::as_str),
235        ctx.agent_config,
236        runtime,
237    );
238
239    validate_glm_print_flag(
240        ctx.agent_name,
241        ctx.agent_config,
242        &cmd_str,
243        ctx.agent_index,
244        ctx.cycle,
245        ctx.model_index,
246        runtime,
247    );
248
249    let (label, logfile, display_name_with_suffix) = build_execution_metadata(
250        ctx.model_flag,
251        ctx.display_name,
252        ctx.base_label,
253        ctx.agent_name,
254        ctx.logfile_prefix,
255        ctx.model_index,
256    );
257
258    let attempt_config = crate::pipeline::fallback::AgentAttemptConfig {
259        agent_name: ctx.agent_name,
260        model_flag: ctx.model_flag.map(std::string::String::as_str),
261        label: &label,
262        display_name: &display_name_with_suffix,
263        cmd_str: &cmd_str,
264        prompt: ctx.prompt,
265        logfile: &logfile,
266        logfile_prefix: ctx.logfile_prefix,
267        parser_type,
268        env_vars: &ctx.agent_config.env_vars,
269        model_index: ctx.model_index,
270        agent_index: ctx.agent_index,
271        cycle: ctx.cycle as usize,
272        fallback_config: ctx.fallback_config,
273        output_validator: ctx.output_validator,
274    };
275    let result = try_agent_with_retries(&attempt_config, runtime)?;
276
277    match result {
278        TryAgentResult::Success => Ok(TrySingleAgentResult::Success),
279        TryAgentResult::Unrecoverable(exit_code) => {
280            Ok(TrySingleAgentResult::Unrecoverable(exit_code))
281        }
282        TryAgentResult::Fallback => Ok(TrySingleAgentResult::Fallback),
283        TryAgentResult::NoRetry => Ok(TrySingleAgentResult::NoRetry),
284    }
285}
286
287/// Context for trying a single agent.
288struct TryAgentContext<'a> {
289    agent_name: &'a str,
290    agent_index: usize,
291    cycle: u32,
292    role: AgentRole,
293    base_label: &'a str,
294    prompt: &'a str,
295    logfile_prefix: &'a str,
296    cli_model_override: Option<&'a String>,
297    cli_provider_override: Option<&'a String>,
298    output_validator: Option<crate::pipeline::fallback::OutputValidator>,
299}
300
301/// Try a single agent with all its model configurations.
302fn try_single_agent(
303    ctx: &TryAgentContext<'_>,
304    runtime: &mut PipelineRuntime<'_>,
305    registry: &AgentRegistry,
306    fallback_config: &crate::agents::fallback::FallbackConfig,
307) -> std::io::Result<TrySingleAgentResult> {
308    let Some(agent_config) = registry.resolve_config(ctx.agent_name) else {
309        runtime.logger.warn(&format!(
310            "Agent '{}' not found in registry, skipping",
311            ctx.agent_name
312        ));
313        return Ok(TrySingleAgentResult::Fallback);
314    };
315
316    let display_name = registry.display_name(ctx.agent_name);
317    let model_ctx = ModelFlagBuildContext {
318        agent_index: ctx.agent_index,
319        cli_model_override: ctx.cli_model_override,
320        cli_provider_override: ctx.cli_provider_override,
321        agent_config: &agent_config,
322        agent_name: ctx.agent_name,
323        fallback_config,
324        display_name: &display_name,
325        runtime,
326    };
327    let model_flags_to_try = build_model_flags_list(&model_ctx);
328
329    if ctx.agent_index == 0 && ctx.cycle == 0 {
330        for model_flag in model_flags_to_try.iter().flatten() {
331            for warning in validate_model_flag(model_flag) {
332                runtime.logger.warn(&warning);
333            }
334        }
335    }
336
337    for (model_index, model_flag) in model_flags_to_try.iter().enumerate() {
338        let model_ctx = TryModelContext {
339            agent_config: &agent_config,
340            agent_name: ctx.agent_name,
341            display_name: &display_name,
342            agent_index: ctx.agent_index,
343            cycle: ctx.cycle,
344            model_index,
345            role: ctx.role,
346            model_flag: model_flag.as_ref(),
347            base_label: ctx.base_label,
348            prompt: ctx.prompt,
349            logfile_prefix: ctx.logfile_prefix,
350            fallback_config,
351            output_validator: ctx.output_validator,
352        };
353        let result = try_single_model(&model_ctx, runtime)?;
354
355        match result {
356            TrySingleAgentResult::Success => return Ok(TrySingleAgentResult::Success),
357            TrySingleAgentResult::Unrecoverable(exit_code) => {
358                return Ok(TrySingleAgentResult::Unrecoverable(exit_code))
359            }
360            TrySingleAgentResult::Fallback => return Ok(TrySingleAgentResult::Fallback),
361            TrySingleAgentResult::NoRetry => {}
362        }
363    }
364
365    Ok(TrySingleAgentResult::NoRetry)
366}
367
368/// Configuration for running with fallback.
369pub struct FallbackConfig<'a, 'b> {
370    pub role: AgentRole,
371    pub base_label: &'a str,
372    pub prompt: &'a str,
373    pub logfile_prefix: &'a str,
374    pub runtime: &'a mut PipelineRuntime<'b>,
375    pub registry: &'a AgentRegistry,
376    pub primary_agent: &'a str,
377    pub output_validator: Option<crate::pipeline::fallback::OutputValidator>,
378}
379
380/// Run a command with automatic fallback to alternative agents on failure.
381pub fn run_with_fallback(
382    role: AgentRole,
383    base_label: &str,
384    prompt: &str,
385    logfile_prefix: &str,
386    runtime: &mut PipelineRuntime<'_>,
387    registry: &AgentRegistry,
388    primary_agent: &str,
389) -> std::io::Result<i32> {
390    let mut config = FallbackConfig {
391        role,
392        base_label,
393        prompt,
394        logfile_prefix,
395        runtime,
396        registry,
397        primary_agent,
398        output_validator: None,
399    };
400    run_with_fallback_internal(&mut config)
401}
402
403/// Run a command with automatic fallback to alternative agents on failure.
404///
405/// Includes an optional output validator callback that checks if the agent
406/// produced valid output after `exit_code=0`. If validation fails, triggers fallback.
407///
408/// This variant takes a `FallbackConfig` directly for cases where you need
409/// to specify an output validator.
410pub fn run_with_fallback_and_validator(
411    config: &mut FallbackConfig<'_, '_>,
412) -> std::io::Result<i32> {
413    run_with_fallback_internal(config)
414}
415
416/// Run a command with automatic fallback to alternative agents on failure.
417///
418/// Includes an optional output validator callback that checks if the agent
419/// produced valid output after `exit_code=0`. If validation fails, triggers fallback.
420fn run_with_fallback_internal(config: &mut FallbackConfig<'_, '_>) -> std::io::Result<i32> {
421    let fallback_config = config.registry.fallback_config();
422    let fallbacks = config.registry.available_fallbacks(config.role);
423    if fallback_config.has_fallbacks(config.role) {
424        config.runtime.logger.info(&format!(
425            "Agent fallback chain for {}: {}",
426            config.role,
427            fallbacks.join(", ")
428        ));
429    } else {
430        config.runtime.logger.info(&format!(
431            "No configured fallbacks for {}, using primary only",
432            config.role
433        ));
434    }
435
436    let agents_to_try = build_agents_to_try(&fallbacks, config.primary_agent);
437    let (cli_model_override, cli_provider_override) =
438        get_cli_overrides(config.role, config.runtime);
439
440    for cycle in 0..fallback_config.max_cycles {
441        if cycle > 0 {
442            let backoff_ms = fallback_config.calculate_backoff(cycle - 1);
443            config.runtime.logger.info(&format!(
444                "Cycle {}/{}: All agents exhausted, waiting {}ms before retry (exponential backoff)...",
445                cycle + 1,
446                fallback_config.max_cycles,
447                backoff_ms
448            ));
449            std::thread::sleep(std::time::Duration::from_millis(backoff_ms));
450        }
451
452        for (agent_index, agent_name) in agents_to_try.iter().enumerate() {
453            let ctx = TryAgentContext {
454                agent_name,
455                agent_index,
456                cycle,
457                role: config.role,
458                base_label: config.base_label,
459                prompt: config.prompt,
460                logfile_prefix: config.logfile_prefix,
461                cli_model_override: cli_model_override.as_ref(),
462                cli_provider_override: cli_provider_override.as_ref(),
463                output_validator: config.output_validator,
464            };
465            let result = try_single_agent(&ctx, config.runtime, config.registry, fallback_config)?;
466
467            match result {
468                TrySingleAgentResult::Success => return Ok(0),
469                TrySingleAgentResult::Unrecoverable(exit_code) => return Ok(exit_code),
470                TrySingleAgentResult::Fallback | TrySingleAgentResult::NoRetry => {}
471            }
472        }
473    }
474
475    config.runtime.logger.error(&format!(
476        "All agents exhausted after {} cycles with exponential backoff",
477        fallback_config.max_cycles
478    ));
479    Ok(1)
480}
481
#[cfg(test)]
mod tests {
    use std::ffi::OsString;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serializes tests that mutate the process-wide environment.
    static ENV_MUTEX: Mutex<()> = Mutex::new(());

    /// One environment variable plus the value it had before being overwritten.
    /// Dropping the guard restores that previous value (or removes the variable).
    struct EnvGuard {
        key: &'static str,
        prev: Option<OsString>,
    }

    /// A group of [`EnvGuard`]s that keeps `ENV_MUTEX` locked for as long as
    /// the overridden values are in effect.
    struct EnvGuardSet {
        guards: Vec<EnvGuard>,
        // Declared after `guards` and released only once `Drop::drop` has
        // restored the environment.
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvGuard {
        /// Set every `(key, value)` pair and return a guard set that restores
        /// the previous values on drop.
        ///
        /// The environment lock is stored in the returned set rather than in
        /// a local, so it stays held until the set is dropped. A poisoned
        /// mutex is recovered because the guarded data is `()`.
        fn set_multiple(vars: &[(&'static str, &str)]) -> EnvGuardSet {
            let lock = ENV_MUTEX.lock().unwrap_or_else(PoisonError::into_inner);
            let guards = vars
                .iter()
                .map(|&(key, value)| {
                    let prev = std::env::var_os(key);
                    std::env::set_var(key, value);
                    Self { key, prev }
                })
                .collect();
            EnvGuardSet {
                guards,
                _lock: lock,
            }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            match self.prev.take() {
                Some(v) => std::env::set_var(self.key, v),
                None => std::env::remove_var(self.key),
            }
        }
    }

    impl Drop for EnvGuardSet {
        fn drop(&mut self) {
            // Undo in reverse order so that a key set more than once ends up
            // with the value it had before the first override.
            while let Some(guard) = self.guards.pop() {
                drop(guard);
            }
        }
    }

    /// Run `cmd` (expected to be `printenv`) and return its stdout, or `None`
    /// when the command cannot be spawned.
    fn capture_stdout(cmd: &mut std::process::Command) -> Option<String> {
        match cmd.output() {
            Ok(output) => Some(String::from_utf8_lossy(&output.stdout).into_owned()),
            Err(e) => {
                // printenv might not be available on all systems
                eprintln!("Skipping test: printenv not available ({e})");
                None
            }
        }
    }

    /// Regression test for the guard itself: the lock must be held while the
    /// overrides are active, and values must be fully restored afterwards,
    /// even when the same key is overridden twice.
    #[test]
    fn test_env_guard_holds_lock_and_restores_values() {
        const KEY: &str = "RALPH_RUNNER_ENV_GUARD_REGRESSION";
        // Clear any value inherited from the parent environment so the
        // restore assertion below compares against a known state.
        std::env::remove_var(KEY);

        {
            let _guard = EnvGuard::set_multiple(&[(KEY, "first"), (KEY, "second")]);
            assert_eq!(std::env::var_os(KEY), Some(OsString::from("second")));
            assert!(ENV_MUTEX.try_lock().is_err());
        }

        assert_eq!(std::env::var_os(KEY), None);
    }

    /// Test that environment variable sanitization works correctly.
    ///
    /// This regression test ensures that when running agents with empty `env_vars`,
    /// GLM/CCS environment variables from the parent shell are NOT passed to
    /// the subprocess. This is critical for preventing "Invalid API key" errors
    /// when switching between GLM (CCS) and standard Claude agents.
    ///
    /// The test:
    /// 1. Sets GLM-like environment variables in the test process
    /// 2. Creates a Command that would be used for an agent with empty `env_vars`
    /// 3. Verifies that the problematic Anthropic env vars are cleared
    #[test]
    fn test_runner_sanitizes_anthropic_env_vars() {
        // Anthropic environment variables to sanitize
        const ANTHROPIC_ENV_VARS_TO_SANITIZE: &[&str] = &[
            "ANTHROPIC_API_KEY",
            "ANTHROPIC_BASE_URL",
            "ANTHROPIC_AUTH_TOKEN",
            "ANTHROPIC_MODEL",
            "ANTHROPIC_DEFAULT_HAIKU_MODEL",
            "ANTHROPIC_DEFAULT_OPUS_MODEL",
            "ANTHROPIC_DEFAULT_SONNET_MODEL",
        ];

        let _guard = EnvGuard::set_multiple(&[
            ("ANTHROPIC_API_KEY", "test-token-glm"),
            ("ANTHROPIC_BASE_URL", "https://glm.example.com"),
        ]);

        // Simulate running an agent with empty env_vars (like codex)
        // The ANTHROPIC_* vars should be sanitized from the parent environment
        let mut cmd = std::process::Command::new("printenv");
        for &var in ANTHROPIC_ENV_VARS_TO_SANITIZE {
            cmd.env_remove(var);
        }

        // Execute the command and check that GLM variables are NOT present
        let Some(stdout) = capture_stdout(&mut cmd) else {
            return;
        };

        // The GLM-set variables should NOT be in the subprocess environment
        // (they were sanitized by env_remove)
        assert!(!stdout.contains("test-token-glm"));
        assert!(!stdout.contains("https://glm.example.com"));
    }

    #[test]
    fn test_runner_does_not_sanitize_explicit_env_vars() {
        // If an agent explicitly sets ANTHROPIC_API_KEY in its env_vars,
        // that should NOT be sanitized

        let mut cmd = std::process::Command::new("printenv");

        // Simulate agent setting its own ANTHROPIC_API_KEY
        let agent_env_vars =
            std::collections::HashMap::from([("ANTHROPIC_API_KEY", "agent-specific-key")]);

        // First, sanitize all Anthropic vars
        for &var in &["ANTHROPIC_API_KEY", "ANTHROPIC_BASE_URL"] {
            cmd.env_remove(var);
        }

        // Then, apply agent's env_vars (which should NOT be sanitized)
        for (key, value) in &agent_env_vars {
            cmd.env(key, value);
        }

        let Some(stdout) = capture_stdout(&mut cmd) else {
            return;
        };

        // The agent-specific key should be present
        assert!(stdout.contains("agent-specific-key"));
    }
}