ralph_workflow/pipeline/
runner.rs

1//! Command execution helpers and fallback orchestration.
2
3use crate::agents::{validate_model_flag, AgentConfig, AgentRegistry, AgentRole, JsonParserType};
4use crate::common::split_command;
5use std::sync::Arc;
6
7use super::fallback::try_agent_with_retries;
8use super::fallback::TryAgentResult;
9use super::model_flag::resolve_model_with_provider;
10use super::prompt::PipelineRuntime;
11
/// Build the ordered list of agents to try: the primary agent first,
/// followed by each configured fallback, skipping duplicates and any
/// fallback entry equal to the primary.
fn build_agents_to_try<'a>(fallbacks: &'a [&'a str], primary_agent: &'a str) -> Vec<&'a str> {
    let mut chain: Vec<&'a str> = Vec::with_capacity(fallbacks.len() + 1);
    chain.push(primary_agent);
    for &candidate in fallbacks {
        if candidate != primary_agent && !chain.contains(&candidate) {
            chain.push(candidate);
        }
    }
    chain
}
22
23/// Get CLI model/provider overrides based on role.
24fn get_cli_overrides(
25    role: AgentRole,
26    runtime: &PipelineRuntime<'_>,
27) -> (Option<String>, Option<String>) {
28    match role {
29        AgentRole::Developer => (
30            runtime.config.developer_model.clone(),
31            runtime.config.developer_provider.clone(),
32        ),
33        AgentRole::Reviewer => (
34            runtime.config.reviewer_model.clone(),
35            runtime.config.reviewer_provider.clone(),
36        ),
37        AgentRole::Commit => (None, None), // Commit role doesn't have CLI overrides
38    }
39}
40
/// Context for building model flags.
///
/// Bundles everything `build_model_flags_list` needs to decide which model
/// flags to attempt for one agent, in priority order.
struct ModelFlagBuildContext<'a> {
    /// Position of the agent in the fallback chain; 0 = primary agent,
    /// the only one for which CLI overrides are honored.
    agent_index: usize,
    /// `--model`-style override from the CLI, if any.
    cli_model_override: Option<&'a String>,
    /// Provider override from the CLI; may rewrite the model's provider prefix.
    cli_provider_override: Option<&'a String>,
    /// Resolved configuration for the agent being attempted.
    agent_config: &'a AgentConfig,
    /// Canonical agent name used for fallback-config lookups.
    agent_name: &'a str,
    /// Fallback configuration consulted for provider fallback models.
    fallback_config: &'a crate::agents::fallback::FallbackConfig,
    /// Human-readable agent name used in log messages.
    display_name: &'a str,
    /// Pipeline runtime, used here for logging only.
    runtime: &'a PipelineRuntime<'a>,
}
52
53/// Build the list of model flags to try for an agent.
54fn build_model_flags_list(ctx: &ModelFlagBuildContext<'_>) -> Vec<Option<String>> {
55    let mut model_flags_to_try: Vec<Option<String>> = Vec::new();
56
57    // CLI override takes highest priority for primary agent
58    // Provider override can modify the model's provider prefix
59    if ctx.agent_index == 0
60        && (ctx.cli_model_override.is_some() || ctx.cli_provider_override.is_some())
61    {
62        let resolved = resolve_model_with_provider(
63            ctx.cli_provider_override.map(std::string::String::as_str),
64            ctx.cli_model_override.map(std::string::String::as_str),
65            ctx.agent_config.model_flag.as_deref(),
66        );
67        if resolved.is_some() {
68            model_flags_to_try.push(resolved);
69        }
70    }
71
72    // Add the agent's default model (None means use agent's configured model_flag or no model)
73    if model_flags_to_try.is_empty() {
74        model_flags_to_try.push(None);
75    }
76
77    // Add provider fallback models for this agent
78    if ctx.fallback_config.has_provider_fallbacks(ctx.agent_name) {
79        let provider_fallbacks = ctx.fallback_config.get_provider_fallbacks(ctx.agent_name);
80        ctx.runtime.logger.info(&format!(
81            "Agent '{}' has {} provider fallback(s) configured",
82            ctx.display_name,
83            provider_fallbacks.len()
84        ));
85        for model in provider_fallbacks {
86            model_flags_to_try.push(Some(model.clone()));
87        }
88    }
89
90    model_flags_to_try
91}
92
93/// Build the command string for a specific model configuration.
94fn build_command_for_model(ctx: &TryModelContext<'_>, runtime: &PipelineRuntime<'_>) -> String {
95    let model_ref = ctx.model_flag.map(std::string::String::as_str);
96    // Enable yolo for Developer role always.
97    // For Reviewer and Commit, only enable in fix mode (detected via base_label starting with "fix").
98    let yolo = matches!(ctx.role, AgentRole::Developer)
99        || (ctx.role == AgentRole::Commit && ctx.base_label.starts_with("fix"))
100        || (ctx.role == AgentRole::Reviewer && ctx.base_label.starts_with("fix"));
101
102    if ctx.agent_index == 0 && ctx.cycle == 0 && ctx.model_index == 0 {
103        // For primary agent on first cycle, respect env var command overrides
104        match ctx.role {
105            AgentRole::Developer => runtime.config.developer_cmd.clone().unwrap_or_else(|| {
106                ctx.agent_config
107                    .build_cmd_with_model(true, true, true, model_ref)
108            }),
109            AgentRole::Reviewer => runtime.config.reviewer_cmd.clone().unwrap_or_else(|| {
110                ctx.agent_config
111                    .build_cmd_with_model(true, true, yolo, model_ref)
112            }),
113            AgentRole::Commit => runtime.config.commit_cmd.clone().unwrap_or_else(|| {
114                ctx.agent_config
115                    .build_cmd_with_model(true, true, yolo, model_ref)
116            }),
117        }
118    } else {
119        ctx.agent_config
120            .build_cmd_with_model(true, true, yolo, model_ref)
121    }
122}
123
124/// GLM-specific validation for print flag.
125///
126/// This validation only applies to CCS/Claude-based GLM agents that use the `-p` flag
127/// for non-interactive mode. OpenCode agents are excluded because they use
128/// `--auto-approve` for non-interactive mode instead.
129fn validate_glm_print_flag(
130    agent_name: &str,
131    agent_config: &AgentConfig,
132    cmd_str: &str,
133    agent_index: usize,
134    cycle: u32,
135    model_index: usize,
136    runtime: &PipelineRuntime<'_>,
137) {
138    // Skip validation for non-CCS/Claude GLM agents
139    // is_glm_like_agent only matches CCS/Claude-based GLM agents, not OpenCode
140    if !crate::agents::is_glm_like_agent(agent_name)
141        || agent_index != 0
142        || cycle != 0
143        || model_index != 0
144    {
145        return;
146    }
147
148    let cmd_argv = split_command(cmd_str).ok();
149    let has_print_flag = cmd_argv
150        .as_ref()
151        .is_some_and(|argv| argv.iter().any(|arg| arg == "-p"));
152    if !has_print_flag {
153        if agent_config.print_flag.is_empty() {
154            runtime.logger.warn(&format!(
155                "GLM agent '{agent_name}' is missing '-p' flag: print_flag is empty in configuration. \
156                 Add 'print_flag = \"-p\"' to [ccs] section in ~/.config/ralph-workflow.toml"
157            ));
158        } else {
159            runtime.logger.warn(&format!(
160                "GLM agent '{agent_name}' may be missing '-p' flag in command. Check configuration."
161            ));
162        }
163    }
164}
165
/// Build the label, log-file path, and decorated display name for one
/// execution attempt.
///
/// The display name gains a ` [model]` suffix when a model flag is present.
/// The agent name is sanitized for use in the log-file path: "/" becomes
/// "-" so names like "ccs/glm" do not create subdirectories.
fn build_execution_metadata(
    model_flag: Option<&String>,
    display_name: &str,
    base_label: &str,
    agent_name: &str,
    logfile_prefix: &str,
    model_index: usize,
) -> (String, String, String) {
    let decorated_name = match model_flag {
        Some(model) => format!("{display_name} [{model}]"),
        None => display_name.to_string(),
    };
    let label = format!("{base_label} ({decorated_name})");
    let safe_agent_name = agent_name.replace('/', "-");
    let logfile = format!("{logfile_prefix}_{safe_agent_name}_{model_index}.log");
    (label, logfile, decorated_name)
}
184
/// Outcome of a single agent attempt, used to drive the fallback loop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TrySingleAgentResult {
    /// Agent succeeded - return success
    Success,
    /// Unrecoverable error (carries the exit code) - abort immediately
    Unrecoverable(i32),
    /// Should fall back to next agent
    Fallback,
    /// Continue to next model (no retry)
    NoRetry,
}
197
/// Context for trying a single model.
///
/// Carries everything `try_single_model` needs to build, validate, and run
/// the command for one (agent, model) combination.
struct TryModelContext<'a> {
    /// Resolved configuration for the agent being attempted.
    agent_config: &'a AgentConfig,
    /// Canonical agent name (may contain "/", e.g. "ccs/glm").
    agent_name: &'a str,
    /// Human-readable agent name for labels and log messages.
    display_name: &'a str,
    /// Position in the fallback chain; 0 = primary agent.
    agent_index: usize,
    /// Current retry cycle (0-based).
    cycle: u32,
    /// Index into the model-flags list for this agent.
    model_index: usize,
    /// Pipeline role this command runs as.
    role: AgentRole,
    /// Model flag to pass, or `None` for the agent's configured default.
    model_flag: Option<&'a String>,
    /// Base label for log output; "fix…" labels enable fix-mode behavior.
    base_label: &'a str,
    /// Prompt text passed to the agent.
    prompt: &'a str,
    /// Prefix for per-attempt log file paths.
    logfile_prefix: &'a str,
    /// Fallback configuration forwarded to the retry layer.
    fallback_config: &'a crate::agents::fallback::FallbackConfig,
    /// Optional callback validating agent output after a zero exit code.
    output_validator: Option<crate::pipeline::fallback::OutputValidator>,
    /// Timer used for retry backoff sleeps.
    retry_timer: Arc<dyn crate::agents::RetryTimerProvider>,
}
215
216/// Try a single model configuration for an agent.
217fn try_single_model(
218    ctx: &TryModelContext<'_>,
219    runtime: &mut PipelineRuntime<'_>,
220) -> std::io::Result<TrySingleAgentResult> {
221    let mut parser_type = ctx.agent_config.json_parser;
222
223    if ctx.role == AgentRole::Reviewer {
224        if let Some(ref parser_override) = runtime.config.reviewer_json_parser {
225            parser_type = JsonParserType::parse(parser_override);
226            if ctx.agent_index == 0 && ctx.cycle == 0 && ctx.model_index == 0 {
227                runtime.logger.info(&format!(
228                    "Using JSON parser override '{parser_override}' for reviewer"
229                ));
230            }
231        }
232    }
233
234    let cmd_str = build_command_for_model(ctx, runtime);
235
236    validate_glm_print_flag(
237        ctx.agent_name,
238        ctx.agent_config,
239        &cmd_str,
240        ctx.agent_index,
241        ctx.cycle,
242        ctx.model_index,
243        runtime,
244    );
245
246    let (label, logfile, display_name_with_suffix) = build_execution_metadata(
247        ctx.model_flag,
248        ctx.display_name,
249        ctx.base_label,
250        ctx.agent_name,
251        ctx.logfile_prefix,
252        ctx.model_index,
253    );
254
255    let attempt_config = crate::pipeline::fallback::AgentAttemptConfig {
256        agent_name: ctx.agent_name,
257        model_flag: ctx.model_flag.map(std::string::String::as_str),
258        label: &label,
259        display_name: &display_name_with_suffix,
260        cmd_str: &cmd_str,
261        prompt: ctx.prompt,
262        logfile: &logfile,
263        logfile_prefix: ctx.logfile_prefix,
264        parser_type,
265        env_vars: &ctx.agent_config.env_vars,
266        model_index: ctx.model_index,
267        agent_index: ctx.agent_index,
268        cycle: ctx.cycle as usize,
269        fallback_config: ctx.fallback_config,
270        output_validator: ctx.output_validator,
271        retry_timer: Arc::clone(&ctx.retry_timer),
272    };
273    let result = try_agent_with_retries(&attempt_config, runtime)?;
274
275    match result {
276        TryAgentResult::Success => Ok(TrySingleAgentResult::Success),
277        TryAgentResult::Unrecoverable(exit_code) => {
278            Ok(TrySingleAgentResult::Unrecoverable(exit_code))
279        }
280        TryAgentResult::Fallback => Ok(TrySingleAgentResult::Fallback),
281        TryAgentResult::NoRetry => Ok(TrySingleAgentResult::NoRetry),
282    }
283}
284
/// Context for trying a single agent.
///
/// Carries everything `try_single_agent` needs to iterate over one agent's
/// model configurations.
struct TryAgentContext<'a> {
    /// Canonical agent name used for registry and fallback lookups.
    agent_name: &'a str,
    /// Position in the fallback chain; 0 = primary agent.
    agent_index: usize,
    /// Current retry cycle (0-based).
    cycle: u32,
    /// Pipeline role this command runs as.
    role: AgentRole,
    /// Base label for log output; "fix…" labels enable fix-mode behavior.
    base_label: &'a str,
    /// Prompt text passed to the agent.
    prompt: &'a str,
    /// Prefix for per-attempt log file paths.
    logfile_prefix: &'a str,
    /// CLI model override; honored only for the primary agent.
    cli_model_override: Option<&'a String>,
    /// CLI provider override; honored only for the primary agent.
    cli_provider_override: Option<&'a String>,
    /// Optional callback validating agent output after a zero exit code.
    output_validator: Option<crate::pipeline::fallback::OutputValidator>,
    /// Timer used for retry backoff sleeps.
    retry_timer: Arc<dyn crate::agents::RetryTimerProvider>,
}
299
300/// Try a single agent with all its model configurations.
301fn try_single_agent(
302    ctx: &TryAgentContext<'_>,
303    runtime: &mut PipelineRuntime<'_>,
304    registry: &AgentRegistry,
305    fallback_config: &crate::agents::fallback::FallbackConfig,
306) -> std::io::Result<TrySingleAgentResult> {
307    let Some(agent_config) = registry.resolve_config(ctx.agent_name) else {
308        runtime.logger.warn(&format!(
309            "Agent '{}' not found in registry, skipping",
310            ctx.agent_name
311        ));
312        return Ok(TrySingleAgentResult::Fallback);
313    };
314
315    let display_name = registry.display_name(ctx.agent_name);
316    let model_ctx = ModelFlagBuildContext {
317        agent_index: ctx.agent_index,
318        cli_model_override: ctx.cli_model_override,
319        cli_provider_override: ctx.cli_provider_override,
320        agent_config: &agent_config,
321        agent_name: ctx.agent_name,
322        fallback_config,
323        display_name: &display_name,
324        runtime,
325    };
326    let model_flags_to_try = build_model_flags_list(&model_ctx);
327
328    if ctx.agent_index == 0 && ctx.cycle == 0 {
329        for model_flag in model_flags_to_try.iter().flatten() {
330            for warning in validate_model_flag(model_flag) {
331                runtime.logger.warn(&warning);
332            }
333        }
334    }
335
336    for (model_index, model_flag) in model_flags_to_try.iter().enumerate() {
337        let model_ctx = TryModelContext {
338            agent_config: &agent_config,
339            agent_name: ctx.agent_name,
340            display_name: &display_name,
341            agent_index: ctx.agent_index,
342            cycle: ctx.cycle,
343            model_index,
344            role: ctx.role,
345            model_flag: model_flag.as_ref(),
346            base_label: ctx.base_label,
347            prompt: ctx.prompt,
348            logfile_prefix: ctx.logfile_prefix,
349            fallback_config,
350            output_validator: ctx.output_validator,
351            retry_timer: Arc::clone(&ctx.retry_timer),
352        };
353        let result = try_single_model(&model_ctx, runtime)?;
354
355        match result {
356            TrySingleAgentResult::Success => return Ok(TrySingleAgentResult::Success),
357            TrySingleAgentResult::Unrecoverable(exit_code) => {
358                return Ok(TrySingleAgentResult::Unrecoverable(exit_code))
359            }
360            TrySingleAgentResult::Fallback => return Ok(TrySingleAgentResult::Fallback),
361            TrySingleAgentResult::NoRetry => {}
362        }
363    }
364
365    Ok(TrySingleAgentResult::NoRetry)
366}
367
/// Configuration for running with fallback.
pub struct FallbackConfig<'a, 'b> {
    /// Pipeline role (Developer/Reviewer/Commit) the command runs as.
    pub role: AgentRole,
    /// Base label for log output; "fix…" labels enable fix-mode behavior.
    pub base_label: &'a str,
    /// Prompt text passed to the agent.
    pub prompt: &'a str,
    /// Prefix for per-attempt log file paths.
    pub logfile_prefix: &'a str,
    /// Mutable pipeline runtime (configuration, logger, …).
    pub runtime: &'a mut PipelineRuntime<'b>,
    /// Registry used to resolve agents, fallbacks, and the retry timer.
    pub registry: &'a AgentRegistry,
    /// Name of the agent tried first, before any fallbacks.
    pub primary_agent: &'a str,
    /// Optional callback validating agent output after a zero exit code;
    /// a failed validation triggers fallback.
    pub output_validator: Option<crate::pipeline::fallback::OutputValidator>,
}
379
380/// Run a command with automatic fallback to alternative agents on failure.
381pub fn run_with_fallback(
382    role: AgentRole,
383    base_label: &str,
384    prompt: &str,
385    logfile_prefix: &str,
386    runtime: &mut PipelineRuntime<'_>,
387    registry: &AgentRegistry,
388    primary_agent: &str,
389) -> std::io::Result<i32> {
390    let mut config = FallbackConfig {
391        role,
392        base_label,
393        prompt,
394        logfile_prefix,
395        runtime,
396        registry,
397        primary_agent,
398        output_validator: None,
399    };
400    run_with_fallback_internal(&mut config)
401}
402
/// Run a command with automatic fallback to alternative agents on failure.
///
/// Includes an optional output validator callback that checks if the agent
/// produced valid output after `exit_code=0`. If validation fails, triggers fallback.
///
/// This variant takes a `FallbackConfig` directly for cases where you need
/// to specify an output validator.
///
/// Returns the final exit code: `0` on success, the failing agent's exit
/// code on an unrecoverable error, or `1` once all agents and retry cycles
/// are exhausted.
pub fn run_with_fallback_and_validator(
    config: &mut FallbackConfig<'_, '_>,
) -> std::io::Result<i32> {
    run_with_fallback_internal(config)
}
415
416/// Run a command with automatic fallback to alternative agents on failure.
417///
418/// Includes an optional output validator callback that checks if the agent
419/// produced valid output after `exit_code=0`. If validation fails, triggers fallback.
420fn run_with_fallback_internal(config: &mut FallbackConfig<'_, '_>) -> std::io::Result<i32> {
421    let fallback_config = config.registry.fallback_config();
422    let fallbacks = config.registry.available_fallbacks(config.role);
423    if fallback_config.has_fallbacks(config.role) {
424        config.runtime.logger.info(&format!(
425            "Agent fallback chain for {}: {}",
426            config.role,
427            fallbacks.join(", ")
428        ));
429    } else {
430        config.runtime.logger.info(&format!(
431            "No configured fallbacks for {}, using primary only",
432            config.role
433        ));
434    }
435
436    let agents_to_try = build_agents_to_try(&fallbacks, config.primary_agent);
437    let (cli_model_override, cli_provider_override) =
438        get_cli_overrides(config.role, config.runtime);
439
440    for cycle in 0..fallback_config.max_cycles {
441        if cycle > 0 {
442            let backoff_ms = fallback_config.calculate_backoff(cycle - 1);
443            config.runtime.logger.info(&format!(
444                "Cycle {}/{}: All agents exhausted, waiting {}ms before retry (exponential backoff)...",
445                cycle + 1,
446                fallback_config.max_cycles,
447                backoff_ms
448            ));
449            config
450                .registry
451                .retry_timer()
452                .sleep(std::time::Duration::from_millis(backoff_ms));
453        }
454
455        for (agent_index, agent_name) in agents_to_try.iter().enumerate() {
456            let ctx = TryAgentContext {
457                agent_name,
458                agent_index,
459                cycle,
460                role: config.role,
461                base_label: config.base_label,
462                prompt: config.prompt,
463                logfile_prefix: config.logfile_prefix,
464                cli_model_override: cli_model_override.as_ref(),
465                cli_provider_override: cli_provider_override.as_ref(),
466                output_validator: config.output_validator,
467                retry_timer: config.registry.retry_timer(),
468            };
469            let result = try_single_agent(&ctx, config.runtime, config.registry, fallback_config)?;
470
471            match result {
472                TrySingleAgentResult::Success => return Ok(0),
473                TrySingleAgentResult::Unrecoverable(exit_code) => return Ok(exit_code),
474                TrySingleAgentResult::Fallback | TrySingleAgentResult::NoRetry => {}
475            }
476        }
477    }
478
479    config.runtime.logger.error(&format!(
480        "All agents exhausted after {} cycles with exponential backoff",
481        fallback_config.max_cycles
482    ));
483    Ok(1)
484}
485
#[cfg(test)]
mod tests {
    use std::sync::{Mutex, MutexGuard};

    /// Serializes tests that read or mutate the process-global environment.
    /// `std::env::set_var`/`remove_var` affect the whole process, and
    /// spawning a subprocess snapshots the environment, so concurrent tests
    /// must not interleave with environment mutation.
    static ENV_MUTEX: Mutex<()> = Mutex::new(());

    /// RAII guard that sets environment variables and restores the previous
    /// values when dropped.
    ///
    /// The guard keeps `ENV_MUTEX` locked for its entire lifetime. The
    /// previous implementation released the lock as soon as the variables
    /// were set, which made the mutex useless: another test could observe
    /// (or clobber) the modified environment while the guard was still
    /// alive, and the restore in `Drop` also ran unlocked.
    struct EnvGuard {
        /// `(key, previous value)` pairs to restore on drop.
        saved: Vec<(&'static str, Option<std::ffi::OsString>)>,
        /// Held until drop; `Drop::drop` restores `saved` before this field
        /// is dropped, so the restore itself is also serialized.
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvGuard {
        /// Set all `(key, value)` pairs, remembering the prior values, and
        /// return a single guard that restores them (and releases the
        /// environment lock) when dropped.
        fn set_multiple(vars: &[(&'static str, &str)]) -> Self {
            let lock = ENV_MUTEX
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            let saved = vars
                .iter()
                .map(|&(key, value)| {
                    let prev = std::env::var_os(key);
                    std::env::set_var(key, value);
                    (key, prev)
                })
                .collect();
            Self { saved, _lock: lock }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // Restore in reverse order so duplicate keys end up with their
            // earliest-saved (original) value. The mutex is still held here.
            for (key, prev) in self.saved.drain(..).rev() {
                match prev {
                    Some(v) => std::env::set_var(key, v),
                    None => std::env::remove_var(key),
                }
            }
        }
    }

    /// Test that environment variable sanitization works correctly.
    ///
    /// This regression test ensures that when running agents with empty `env_vars`,
    /// GLM/CCS environment variables from the parent shell are NOT passed to
    /// the subprocess. This is critical for preventing "Invalid API key" errors
    /// when switching between GLM (CCS) and standard Claude agents.
    ///
    /// The test:
    /// 1. Sets GLM-like environment variables in the test process
    /// 2. Creates a Command that would be used for an agent with empty `env_vars`
    /// 3. Verifies that the problematic Anthropic env vars are cleared
    #[test]
    fn test_runner_sanitizes_anthropic_env_vars() {
        // Anthropic environment variables to sanitize
        const ANTHROPIC_ENV_VARS_TO_SANITIZE: &[&str] = &[
            "ANTHROPIC_API_KEY",
            "ANTHROPIC_BASE_URL",
            "ANTHROPIC_AUTH_TOKEN",
            "ANTHROPIC_MODEL",
            "ANTHROPIC_DEFAULT_HAIKU_MODEL",
            "ANTHROPIC_DEFAULT_OPUS_MODEL",
            "ANTHROPIC_DEFAULT_SONNET_MODEL",
        ];

        // Holds ENV_MUTEX until the end of the test, so the subprocess spawn
        // below cannot race with other tests touching the environment.
        let _guard = EnvGuard::set_multiple(&[
            ("ANTHROPIC_API_KEY", "test-token-glm"),
            ("ANTHROPIC_BASE_URL", "https://glm.example.com"),
        ]);

        // Simulate running an agent with empty env_vars (like codex)
        // The ANTHROPIC_* vars should be sanitized from the parent environment
        let mut cmd = std::process::Command::new("printenv");
        for &var in ANTHROPIC_ENV_VARS_TO_SANITIZE {
            cmd.env_remove(var);
        }

        // Execute the command and check that GLM variables are NOT present
        let output = match cmd.output() {
            Ok(o) => o,
            Err(e) => {
                // printenv might not be available on all systems
                eprintln!("Skipping test: printenv not available ({e})");
                return;
            }
        };
        let stdout = String::from_utf8_lossy(&output.stdout);

        // The GLM-set variables should NOT be in the subprocess environment
        // (they were sanitized by env_remove)
        assert!(!stdout.contains("test-token-glm"));
        assert!(!stdout.contains("https://glm.example.com"));
    }

    #[test]
    fn test_runner_does_not_sanitize_explicit_env_vars() {
        // If an agent explicitly sets ANTHROPIC_API_KEY in its env_vars,
        // that should NOT be sanitized.
        //
        // Take the environment lock so the subprocess spawn below cannot
        // observe a half-modified environment from an EnvGuard-based test
        // running concurrently.
        let _lock = ENV_MUTEX
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        let mut cmd = std::process::Command::new("printenv");

        // Simulate agent setting its own ANTHROPIC_API_KEY
        let agent_env_vars =
            std::collections::HashMap::from([("ANTHROPIC_API_KEY", "agent-specific-key")]);

        // First, sanitize all Anthropic vars
        for &var in &["ANTHROPIC_API_KEY", "ANTHROPIC_BASE_URL"] {
            cmd.env_remove(var);
        }

        // Then, apply agent's env_vars (which should NOT be sanitized)
        for (key, value) in &agent_env_vars {
            cmd.env(key, value);
        }

        let output = match cmd.output() {
            Ok(o) => o,
            Err(e) => {
                // printenv might not be available on all systems
                eprintln!("Skipping test: printenv not available ({e})");
                return;
            }
        };
        let stdout = String::from_utf8_lossy(&output.stdout);

        // The agent-specific key should be present
        assert!(stdout.contains("agent-specific-key"));
    }
}