ralph_workflow/pipeline/
runner.rs

1//! Command execution helpers and fallback orchestration.
2
3use crate::agents::{validate_model_flag, AgentConfig, AgentRegistry, AgentRole, JsonParserType};
4use crate::common::split_command;
5
6use super::fallback::try_agent_with_retries;
7use super::fallback::TryAgentResult;
8use super::model_flag::resolve_model_with_provider;
9use super::prompt::PipelineRuntime;
10
/// Assemble the ordered, duplicate-free list of agents to attempt.
///
/// The primary agent is always first. Each fallback is appended in its original
/// order unless it is already present in the list, which also filters out a
/// fallback that names the primary agent itself.
fn build_agents_to_try<'a>(fallbacks: &'a [&'a str], primary_agent: &'a str) -> Vec<&'a str> {
    fallbacks
        .iter()
        .copied()
        .fold(vec![primary_agent], |mut ordered, candidate| {
            // The list starts with the primary agent, so a single membership
            // check covers both "is the primary" and "already queued".
            if !ordered.contains(&candidate) {
                ordered.push(candidate);
            }
            ordered
        })
}
21
22/// Get CLI model/provider overrides based on role.
23fn get_cli_overrides(
24    role: AgentRole,
25    runtime: &PipelineRuntime<'_>,
26) -> (Option<String>, Option<String>) {
27    match role {
28        AgentRole::Developer => (
29            runtime.config.developer_model.clone(),
30            runtime.config.developer_provider.clone(),
31        ),
32        AgentRole::Reviewer => (
33            runtime.config.reviewer_model.clone(),
34            runtime.config.reviewer_provider.clone(),
35        ),
36        AgentRole::Commit => (None, None), // Commit role doesn't have CLI overrides
37    }
38}
39
40/// Context for building model flags.
41struct ModelFlagBuildContext<'a> {
42    agent_index: usize,
43    cli_model_override: Option<&'a String>,
44    cli_provider_override: Option<&'a String>,
45    agent_config: &'a AgentConfig,
46    agent_name: &'a str,
47    fallback_config: &'a crate::agents::fallback::FallbackConfig,
48    display_name: &'a str,
49    runtime: &'a PipelineRuntime<'a>,
50}
51
52/// Build the list of model flags to try for an agent.
53fn build_model_flags_list(ctx: &ModelFlagBuildContext<'_>) -> Vec<Option<String>> {
54    let mut model_flags_to_try: Vec<Option<String>> = Vec::new();
55
56    // CLI override takes highest priority for primary agent
57    // Provider override can modify the model's provider prefix
58    if ctx.agent_index == 0
59        && (ctx.cli_model_override.is_some() || ctx.cli_provider_override.is_some())
60    {
61        let resolved = resolve_model_with_provider(
62            ctx.cli_provider_override.map(std::string::String::as_str),
63            ctx.cli_model_override.map(std::string::String::as_str),
64            ctx.agent_config.model_flag.as_deref(),
65        );
66        if resolved.is_some() {
67            model_flags_to_try.push(resolved);
68        }
69    }
70
71    // Add the agent's default model (None means use agent's configured model_flag or no model)
72    if model_flags_to_try.is_empty() {
73        model_flags_to_try.push(None);
74    }
75
76    // Add provider fallback models for this agent
77    if ctx.fallback_config.has_provider_fallbacks(ctx.agent_name) {
78        let provider_fallbacks = ctx.fallback_config.get_provider_fallbacks(ctx.agent_name);
79        ctx.runtime.logger.info(&format!(
80            "Agent '{}' has {} provider fallback(s) configured",
81            ctx.display_name,
82            provider_fallbacks.len()
83        ));
84        for model in provider_fallbacks {
85            model_flags_to_try.push(Some(model.clone()));
86        }
87    }
88
89    model_flags_to_try
90}
91
92/// Build the command string for a specific model configuration.
93fn build_command_for_model(ctx: &TryModelContext<'_>, runtime: &PipelineRuntime<'_>) -> String {
94    let model_ref = ctx.model_flag.map(std::string::String::as_str);
95    // Enable yolo for Developer role always.
96    // For Reviewer and Commit, only enable in fix mode (detected via base_label starting with "fix").
97    let yolo = matches!(ctx.role, AgentRole::Developer)
98        || (ctx.role == AgentRole::Commit && ctx.base_label.starts_with("fix"))
99        || (ctx.role == AgentRole::Reviewer && ctx.base_label.starts_with("fix"));
100
101    if ctx.agent_index == 0 && ctx.cycle == 0 && ctx.model_index == 0 {
102        // For primary agent on first cycle, respect env var command overrides
103        match ctx.role {
104            AgentRole::Developer => runtime.config.developer_cmd.clone().unwrap_or_else(|| {
105                ctx.agent_config
106                    .build_cmd_with_model(true, true, true, model_ref)
107            }),
108            AgentRole::Reviewer => runtime.config.reviewer_cmd.clone().unwrap_or_else(|| {
109                ctx.agent_config
110                    .build_cmd_with_model(true, true, yolo, model_ref)
111            }),
112            AgentRole::Commit => {
113                // Commit role doesn't have cmd override, use default
114                ctx.agent_config
115                    .build_cmd_with_model(true, true, yolo, model_ref)
116            }
117        }
118    } else {
119        ctx.agent_config
120            .build_cmd_with_model(true, true, yolo, model_ref)
121    }
122}
123
124/// GLM-specific validation for print flag.
125fn validate_glm_print_flag(
126    agent_name: &str,
127    agent_config: &AgentConfig,
128    cmd_str: &str,
129    agent_index: usize,
130    cycle: u32,
131    model_index: usize,
132    runtime: &PipelineRuntime<'_>,
133) {
134    if !crate::agents::is_glm_like_agent(agent_name)
135        || agent_index != 0
136        || cycle != 0
137        || model_index != 0
138    {
139        return;
140    }
141
142    let cmd_argv = split_command(cmd_str).ok();
143    let has_print_flag = cmd_argv
144        .as_ref()
145        .is_some_and(|argv| argv.iter().any(|arg| arg == "-p"));
146    if !has_print_flag {
147        if agent_config.print_flag.is_empty() {
148            runtime.logger.warn(&format!(
149                "GLM agent '{agent_name}' is missing '-p' flag: print_flag is empty in configuration. \
150                 Add 'print_flag = \"-p\"' to [ccs] section in ~/.config/ralph-workflow.toml"
151            ));
152        } else {
153            runtime.logger.warn(&format!(
154                "GLM agent '{agent_name}' may be missing '-p' flag in command. Check configuration."
155            ));
156        }
157    }
158}
159
/// Derive the log label, log file path and display name for one attempt.
///
/// Returns `(label, logfile, display_name_with_suffix)` where:
/// - `display_name_with_suffix` is `display_name`, followed by ` [model]` when a
///   model flag is given;
/// - `label` is `"{base_label} ({display_name_with_suffix})"`;
/// - `logfile` is `"{logfile_prefix}_{agent}_{model_index}.log"`, with every `/`
///   in the agent name replaced by `-`.
fn build_execution_metadata(
    model_flag: Option<&String>,
    display_name: &str,
    base_label: &str,
    agent_name: &str,
    logfile_prefix: &str,
    model_index: usize,
) -> (String, String, String) {
    let display_name_with_suffix = match model_flag {
        Some(model) => format!("{display_name} [{model}]"),
        None => display_name.to_owned(),
    };
    let label = format!("{base_label} ({display_name_with_suffix})");

    // Keep the log file in a single directory: an agent name such as "ccs/glm"
    // must become "ccs-glm" rather than introduce a subdirectory.
    let safe_agent_name: String = agent_name
        .chars()
        .map(|ch| if ch == '/' { '-' } else { ch })
        .collect();
    let logfile = format!("{logfile_prefix}_{safe_agent_name}_{model_index}.log");

    (label, logfile, display_name_with_suffix)
}
178
/// Outcome of attempting one agent (or one of its model configurations).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TrySingleAgentResult {
    /// The attempt succeeded; the caller reports success.
    Success,
    /// Nothing more to do for this attempt; move on to the next model without retrying.
    NoRetry,
    /// Give up on the current agent and fall back to the next one.
    Fallback,
    /// Unrecoverable failure; abort immediately with the contained exit code.
    Unrecoverable(i32),
}
191
192/// Context for trying a single model.
193struct TryModelContext<'a> {
194    agent_config: &'a AgentConfig,
195    agent_name: &'a str,
196    display_name: &'a str,
197    agent_index: usize,
198    cycle: u32,
199    model_index: usize,
200    role: AgentRole,
201    model_flag: Option<&'a String>,
202    base_label: &'a str,
203    prompt: &'a str,
204    logfile_prefix: &'a str,
205    fallback_config: &'a crate::agents::fallback::FallbackConfig,
206    output_validator: Option<crate::pipeline::fallback::OutputValidator>,
207}
208
209/// Try a single model configuration for an agent.
210fn try_single_model(
211    ctx: &TryModelContext<'_>,
212    runtime: &mut PipelineRuntime<'_>,
213) -> std::io::Result<TrySingleAgentResult> {
214    let mut parser_type = ctx.agent_config.json_parser;
215
216    if ctx.role == AgentRole::Reviewer {
217        if let Some(ref parser_override) = runtime.config.reviewer_json_parser {
218            parser_type = JsonParserType::parse(parser_override);
219            if ctx.agent_index == 0 && ctx.cycle == 0 && ctx.model_index == 0 {
220                runtime.logger.info(&format!(
221                    "Using JSON parser override '{parser_override}' for reviewer"
222                ));
223            }
224        }
225    }
226
227    let cmd_str = build_command_for_model(ctx, runtime);
228
229    validate_glm_print_flag(
230        ctx.agent_name,
231        ctx.agent_config,
232        &cmd_str,
233        ctx.agent_index,
234        ctx.cycle,
235        ctx.model_index,
236        runtime,
237    );
238
239    let (label, logfile, display_name_with_suffix) = build_execution_metadata(
240        ctx.model_flag,
241        ctx.display_name,
242        ctx.base_label,
243        ctx.agent_name,
244        ctx.logfile_prefix,
245        ctx.model_index,
246    );
247
248    let attempt_config = crate::pipeline::fallback::AgentAttemptConfig {
249        agent_name: ctx.agent_name,
250        model_flag: ctx.model_flag.map(std::string::String::as_str),
251        label: &label,
252        display_name: &display_name_with_suffix,
253        cmd_str: &cmd_str,
254        prompt: ctx.prompt,
255        logfile: &logfile,
256        logfile_prefix: ctx.logfile_prefix,
257        parser_type,
258        env_vars: &ctx.agent_config.env_vars,
259        model_index: ctx.model_index,
260        agent_index: ctx.agent_index,
261        cycle: ctx.cycle as usize,
262        fallback_config: ctx.fallback_config,
263        output_validator: ctx.output_validator,
264    };
265    let result = try_agent_with_retries(&attempt_config, runtime)?;
266
267    match result {
268        TryAgentResult::Success => Ok(TrySingleAgentResult::Success),
269        TryAgentResult::Unrecoverable(exit_code) => {
270            Ok(TrySingleAgentResult::Unrecoverable(exit_code))
271        }
272        TryAgentResult::Fallback => Ok(TrySingleAgentResult::Fallback),
273        TryAgentResult::NoRetry => Ok(TrySingleAgentResult::NoRetry),
274    }
275}
276
277/// Context for trying a single agent.
278struct TryAgentContext<'a> {
279    agent_name: &'a str,
280    agent_index: usize,
281    cycle: u32,
282    role: AgentRole,
283    base_label: &'a str,
284    prompt: &'a str,
285    logfile_prefix: &'a str,
286    cli_model_override: Option<&'a String>,
287    cli_provider_override: Option<&'a String>,
288    output_validator: Option<crate::pipeline::fallback::OutputValidator>,
289}
290
291/// Try a single agent with all its model configurations.
292fn try_single_agent(
293    ctx: &TryAgentContext<'_>,
294    runtime: &mut PipelineRuntime<'_>,
295    registry: &AgentRegistry,
296    fallback_config: &crate::agents::fallback::FallbackConfig,
297) -> std::io::Result<TrySingleAgentResult> {
298    let Some(agent_config) = registry.resolve_config(ctx.agent_name) else {
299        runtime.logger.warn(&format!(
300            "Agent '{}' not found in registry, skipping",
301            ctx.agent_name
302        ));
303        return Ok(TrySingleAgentResult::Fallback);
304    };
305
306    let display_name = registry.display_name(ctx.agent_name);
307    let model_ctx = ModelFlagBuildContext {
308        agent_index: ctx.agent_index,
309        cli_model_override: ctx.cli_model_override,
310        cli_provider_override: ctx.cli_provider_override,
311        agent_config: &agent_config,
312        agent_name: ctx.agent_name,
313        fallback_config,
314        display_name: &display_name,
315        runtime,
316    };
317    let model_flags_to_try = build_model_flags_list(&model_ctx);
318
319    if ctx.agent_index == 0 && ctx.cycle == 0 {
320        for model_flag in model_flags_to_try.iter().flatten() {
321            for warning in validate_model_flag(model_flag) {
322                runtime.logger.warn(&warning);
323            }
324        }
325    }
326
327    for (model_index, model_flag) in model_flags_to_try.iter().enumerate() {
328        let model_ctx = TryModelContext {
329            agent_config: &agent_config,
330            agent_name: ctx.agent_name,
331            display_name: &display_name,
332            agent_index: ctx.agent_index,
333            cycle: ctx.cycle,
334            model_index,
335            role: ctx.role,
336            model_flag: model_flag.as_ref(),
337            base_label: ctx.base_label,
338            prompt: ctx.prompt,
339            logfile_prefix: ctx.logfile_prefix,
340            fallback_config,
341            output_validator: ctx.output_validator,
342        };
343        let result = try_single_model(&model_ctx, runtime)?;
344
345        match result {
346            TrySingleAgentResult::Success => return Ok(TrySingleAgentResult::Success),
347            TrySingleAgentResult::Unrecoverable(exit_code) => {
348                return Ok(TrySingleAgentResult::Unrecoverable(exit_code))
349            }
350            TrySingleAgentResult::Fallback => return Ok(TrySingleAgentResult::Fallback),
351            TrySingleAgentResult::NoRetry => {}
352        }
353    }
354
355    Ok(TrySingleAgentResult::NoRetry)
356}
357
358/// Configuration for running with fallback.
359pub struct FallbackConfig<'a, 'b> {
360    pub role: AgentRole,
361    pub base_label: &'a str,
362    pub prompt: &'a str,
363    pub logfile_prefix: &'a str,
364    pub runtime: &'a mut PipelineRuntime<'b>,
365    pub registry: &'a AgentRegistry,
366    pub primary_agent: &'a str,
367    pub output_validator: Option<crate::pipeline::fallback::OutputValidator>,
368}
369
370/// Run a command with automatic fallback to alternative agents on failure.
371pub fn run_with_fallback(
372    role: AgentRole,
373    base_label: &str,
374    prompt: &str,
375    logfile_prefix: &str,
376    runtime: &mut PipelineRuntime<'_>,
377    registry: &AgentRegistry,
378    primary_agent: &str,
379) -> std::io::Result<i32> {
380    let mut config = FallbackConfig {
381        role,
382        base_label,
383        prompt,
384        logfile_prefix,
385        runtime,
386        registry,
387        primary_agent,
388        output_validator: None,
389    };
390    run_with_fallback_internal(&mut config)
391}
392
393/// Run a command with automatic fallback to alternative agents on failure.
394///
395/// Includes an optional output validator callback that checks if the agent
396/// produced valid output after `exit_code=0`. If validation fails, triggers fallback.
397///
398/// This variant takes a `FallbackConfig` directly for cases where you need
399/// to specify an output validator.
400pub fn run_with_fallback_and_validator(
401    config: &mut FallbackConfig<'_, '_>,
402) -> std::io::Result<i32> {
403    run_with_fallback_internal(config)
404}
405
406/// Run a command with automatic fallback to alternative agents on failure.
407///
408/// Includes an optional output validator callback that checks if the agent
409/// produced valid output after `exit_code=0`. If validation fails, triggers fallback.
410fn run_with_fallback_internal(config: &mut FallbackConfig<'_, '_>) -> std::io::Result<i32> {
411    let fallback_config = config.registry.fallback_config();
412    let fallbacks = config.registry.available_fallbacks(config.role);
413    if fallback_config.has_fallbacks(config.role) {
414        config.runtime.logger.info(&format!(
415            "Agent fallback chain for {}: {}",
416            config.role,
417            fallbacks.join(", ")
418        ));
419    } else {
420        config.runtime.logger.info(&format!(
421            "No configured fallbacks for {}, using primary only",
422            config.role
423        ));
424    }
425
426    let agents_to_try = build_agents_to_try(&fallbacks, config.primary_agent);
427    let (cli_model_override, cli_provider_override) =
428        get_cli_overrides(config.role, config.runtime);
429
430    for cycle in 0..fallback_config.max_cycles {
431        if cycle > 0 {
432            let backoff_ms = fallback_config.calculate_backoff(cycle - 1);
433            config.runtime.logger.info(&format!(
434                "Cycle {}/{}: All agents exhausted, waiting {}ms before retry (exponential backoff)...",
435                cycle + 1,
436                fallback_config.max_cycles,
437                backoff_ms
438            ));
439            std::thread::sleep(std::time::Duration::from_millis(backoff_ms));
440        }
441
442        for (agent_index, agent_name) in agents_to_try.iter().enumerate() {
443            let ctx = TryAgentContext {
444                agent_name,
445                agent_index,
446                cycle,
447                role: config.role,
448                base_label: config.base_label,
449                prompt: config.prompt,
450                logfile_prefix: config.logfile_prefix,
451                cli_model_override: cli_model_override.as_ref(),
452                cli_provider_override: cli_provider_override.as_ref(),
453                output_validator: config.output_validator,
454            };
455            let result = try_single_agent(&ctx, config.runtime, config.registry, fallback_config)?;
456
457            match result {
458                TrySingleAgentResult::Success => return Ok(0),
459                TrySingleAgentResult::Unrecoverable(exit_code) => return Ok(exit_code),
460                TrySingleAgentResult::Fallback | TrySingleAgentResult::NoRetry => {}
461            }
462        }
463    }
464
465    config.runtime.logger.error(&format!(
466        "All agents exhausted after {} cycles with exponential backoff",
467        fallback_config.max_cycles
468    ));
469    Ok(1)
470}
471
#[cfg(test)]
mod tests {
    use std::ffi::{OsStr, OsString};
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serializes tests that mutate the process environment, which is global state.
    static ENV_MUTEX: Mutex<()> = Mutex::new(());

    /// Scoped environment override.
    ///
    /// Holds `ENV_MUTEX` for its whole lifetime, so the variables it sets stay
    /// stable until the guard is dropped. On drop the previous values are
    /// restored and only then is the lock released.
    struct EnvGuard {
        /// Previous value of every variable that was set, in the order they were set.
        saved: Vec<(&'static str, Option<OsString>)>,
        /// Keeps the environment lock held until the guard is dropped.
        /// Struct fields are dropped after `Drop::drop` has run, so the lock
        /// outlives the restoration performed there.
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvGuard {
        /// Set every `(key, value)` pair in the process environment and return a
        /// guard that restores the previous state when dropped.
        fn set_multiple(vars: &[(&'static str, &str)]) -> Self {
            // A panic in another test must not prevent later tests from running,
            // so a poisoned lock is recovered rather than propagated.
            let lock = ENV_MUTEX.lock().unwrap_or_else(PoisonError::into_inner);
            let saved = vars
                .iter()
                .map(|&(key, value)| {
                    let prev = std::env::var_os(key);
                    std::env::set_var(key, value);
                    (key, prev)
                })
                .collect();
            Self { saved, _lock: lock }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // Restore in reverse order so that a key listed more than once ends
            // up with its original value rather than an intermediate one.
            for (key, prev) in self.saved.drain(..).rev() {
                match prev {
                    Some(value) => std::env::set_var(key, value),
                    None => std::env::remove_var(key),
                }
            }
        }
    }

    /// Regression test: the guard must keep the environment lock for as long as
    /// it is alive and must restore the previous value when dropped.
    #[test]
    fn test_env_guard_holds_lock_and_restores_on_drop() {
        const PROBE: &str = "RALPH_WORKFLOW_ENV_GUARD_PROBE";
        let before = std::env::var_os(PROBE);

        {
            let _guard = EnvGuard::set_multiple(&[(PROBE, "temporary")]);
            assert_eq!(
                std::env::var_os(PROBE).as_deref(),
                Some(OsStr::new("temporary"))
            );
            // The lock is still held by `_guard`, so it cannot be acquired again.
            assert!(ENV_MUTEX.try_lock().is_err());
        }

        assert_eq!(std::env::var_os(PROBE), before);
    }

    /// Test that environment variable sanitization works correctly.
    ///
    /// This regression test ensures that when running agents with empty `env_vars`,
    /// GLM/CCS environment variables from the parent shell are NOT passed to
    /// the subprocess. This is critical for preventing "Invalid API key" errors
    /// when switching between GLM (CCS) and standard Claude agents.
    ///
    /// The test:
    /// 1. Sets GLM-like environment variables in the test process
    /// 2. Creates a Command that would be used for an agent with empty `env_vars`
    /// 3. Verifies that the problematic Anthropic env vars are cleared
    #[test]
    fn test_runner_sanitizes_anthropic_env_vars() {
        // Anthropic environment variables to sanitize
        const ANTHROPIC_ENV_VARS_TO_SANITIZE: &[&str] = &[
            "ANTHROPIC_API_KEY",
            "ANTHROPIC_BASE_URL",
            "ANTHROPIC_AUTH_TOKEN",
            "ANTHROPIC_MODEL",
            "ANTHROPIC_DEFAULT_HAIKU_MODEL",
            "ANTHROPIC_DEFAULT_OPUS_MODEL",
            "ANTHROPIC_DEFAULT_SONNET_MODEL",
        ];

        // Held until the end of the test so the variables stay set (and the
        // environment stays locked) while the subprocess is spawned.
        let _guard = EnvGuard::set_multiple(&[
            ("ANTHROPIC_API_KEY", "test-token-glm"),
            ("ANTHROPIC_BASE_URL", "https://glm.example.com"),
        ]);

        // Simulate running an agent with empty env_vars (like codex)
        // The ANTHROPIC_* vars should be sanitized from the parent environment
        let mut cmd = std::process::Command::new("printenv");
        for &var in ANTHROPIC_ENV_VARS_TO_SANITIZE {
            cmd.env_remove(var);
        }

        // Execute the command and check that GLM variables are NOT present
        let output = match cmd.output() {
            Ok(o) => o,
            Err(e) => {
                // printenv might not be available on all systems
                eprintln!("Skipping test: printenv not available ({e})");
                return;
            }
        };
        let stdout = String::from_utf8_lossy(&output.stdout);

        // The GLM-set variables should NOT be in the subprocess environment
        // (they were sanitized by env_remove)
        assert!(!stdout.contains("test-token-glm"));
        assert!(!stdout.contains("https://glm.example.com"));
    }

    /// Test that explicitly configured agent variables survive sanitization.
    ///
    /// Variables removed with `env_remove` and then set again with `env` must be
    /// visible to the subprocess.
    #[test]
    fn test_runner_does_not_sanitize_explicit_env_vars() {
        // If an agent explicitly sets ANTHROPIC_API_KEY in its env_vars,
        // that should NOT be sanitized

        let mut cmd = std::process::Command::new("printenv");

        // Simulate agent setting its own ANTHROPIC_API_KEY
        let agent_env_vars =
            std::collections::HashMap::from([("ANTHROPIC_API_KEY", "agent-specific-key")]);

        // First, sanitize all Anthropic vars
        for &var in &["ANTHROPIC_API_KEY", "ANTHROPIC_BASE_URL"] {
            cmd.env_remove(var);
        }

        // Then, apply agent's env_vars (which should NOT be sanitized)
        for (key, value) in &agent_env_vars {
            cmd.env(key, value);
        }

        let output = match cmd.output() {
            Ok(o) => o,
            Err(e) => {
                // printenv might not be available on all systems
                eprintln!("Skipping test: printenv not available ({e})");
                return;
            }
        };
        let stdout = String::from_utf8_lossy(&output.stdout);

        // The agent-specific key should be present
        assert!(stdout.contains("agent-specific-key"));
    }
}