ralph_workflow/pipeline/runner.rs

//! Command execution helpers and fallback orchestration.

use crate::agents::{validate_model_flag, AgentConfig, AgentRegistry, AgentRole, JsonParserType};
use crate::common::split_command;
use std::sync::Arc;

use super::fallback::try_agent_with_retries;
use super::fallback::TryAgentResult;
use super::model_flag::resolve_model_with_provider;
use super::prompt::PipelineRuntime;

/// Build the deduplicated list of agents to try, starting with the primary agent.
fn build_agents_to_try<'a>(fallbacks: &'a [&'a str], primary_agent: &'a str) -> Vec<&'a str> {
    let mut agents_to_try: Vec<&'a str> = vec![primary_agent];
    for fb in fallbacks {
        if *fb != primary_agent && !agents_to_try.contains(fb) {
            agents_to_try.push(fb);
        }
    }
    agents_to_try
}
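
// Illustrative example (the agent names here are hypothetical): with primary
// "claude" and fallbacks ["claude", "codex"], the result is ["claude", "codex"];
// the primary stays first and any duplicate of it in the fallback list is skipped.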

/// Get CLI model/provider overrides based on role.
fn get_cli_overrides(
    role: AgentRole,
    runtime: &PipelineRuntime<'_>,
) -> (Option<String>, Option<String>) {
    match role {
        AgentRole::Developer => (
            runtime.config.developer_model.clone(),
            runtime.config.developer_provider.clone(),
        ),
        AgentRole::Reviewer => (
            runtime.config.reviewer_model.clone(),
            runtime.config.reviewer_provider.clone(),
        ),
        AgentRole::Commit => (None, None), // Commit role doesn't have CLI overrides
    }
}

/// Context for building model flags.
struct ModelFlagBuildContext<'a> {
    agent_index: usize,
    cli_model_override: Option<&'a String>,
    cli_provider_override: Option<&'a String>,
    agent_config: &'a AgentConfig,
    agent_name: &'a str,
    fallback_config: &'a crate::agents::fallback::FallbackConfig,
    display_name: &'a str,
    runtime: &'a PipelineRuntime<'a>,
}

/// Build the list of model flags to try for an agent.
fn build_model_flags_list(ctx: &ModelFlagBuildContext<'_>) -> Vec<Option<String>> {
    let mut model_flags_to_try: Vec<Option<String>> = Vec::new();

    // CLI override takes highest priority for primary agent
    // Provider override can modify the model's provider prefix
    if ctx.agent_index == 0
        && (ctx.cli_model_override.is_some() || ctx.cli_provider_override.is_some())
    {
        let resolved = resolve_model_with_provider(
            ctx.cli_provider_override.map(std::string::String::as_str),
            ctx.cli_model_override.map(std::string::String::as_str),
            ctx.agent_config.model_flag.as_deref(),
        );
        if resolved.is_some() {
            model_flags_to_try.push(resolved);
        }
    }

    // Add the agent's default model (None means use agent's configured model_flag or no model)
    if model_flags_to_try.is_empty() {
        model_flags_to_try.push(None);
    }

    // Add provider fallback models for this agent
    if ctx.fallback_config.has_provider_fallbacks(ctx.agent_name) {
        let provider_fallbacks = ctx.fallback_config.get_provider_fallbacks(ctx.agent_name);
        ctx.runtime.logger.info(&format!(
            "Agent '{}' has {} provider fallback(s) configured",
            ctx.display_name,
            provider_fallbacks.len()
        ));
        for model in provider_fallbacks {
            model_flags_to_try.push(Some(model.clone()));
        }
    }

    model_flags_to_try
}
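
// Illustrative ordering (a sketch of the resulting list, not an exhaustive spec):
// for the primary agent with a CLI model/provider override that resolves, the list
// is [Some(resolved override), provider fallbacks...]; without an override it is
// [None, provider fallbacks...], where None means "use the agent's configured
// model_flag or no model".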

/// Build the command string for a specific model configuration.
fn build_command_for_model(ctx: &TryModelContext<'_>, runtime: &PipelineRuntime<'_>) -> String {
    let model_ref = ctx.model_flag.map(std::string::String::as_str);
    // Enable yolo for Developer role always.
    // For Reviewer and Commit, only enable in fix mode (detected via base_label starting with "fix").
    let yolo = matches!(ctx.role, AgentRole::Developer)
        || (ctx.role == AgentRole::Commit && ctx.base_label.starts_with("fix"))
        || (ctx.role == AgentRole::Reviewer && ctx.base_label.starts_with("fix"));

    if ctx.agent_index == 0 && ctx.cycle == 0 && ctx.model_index == 0 {
        // For the primary agent's first attempt (first cycle, first model), respect env var command overrides
        match ctx.role {
            AgentRole::Developer => runtime.config.developer_cmd.clone().unwrap_or_else(|| {
                ctx.agent_config
                    .build_cmd_with_model(true, true, true, model_ref)
            }),
            AgentRole::Reviewer => runtime.config.reviewer_cmd.clone().unwrap_or_else(|| {
                ctx.agent_config
                    .build_cmd_with_model(true, true, yolo, model_ref)
            }),
            AgentRole::Commit => runtime.config.commit_cmd.clone().unwrap_or_else(|| {
                ctx.agent_config
                    .build_cmd_with_model(true, true, yolo, model_ref)
            }),
        }
    } else {
        ctx.agent_config
            .build_cmd_with_model(true, true, yolo, model_ref)
    }
}

/// GLM-specific validation for print flag.
fn validate_glm_print_flag(
    agent_name: &str,
    agent_config: &AgentConfig,
    cmd_str: &str,
    agent_index: usize,
    cycle: u32,
    model_index: usize,
    runtime: &PipelineRuntime<'_>,
) {
    if !crate::agents::is_glm_like_agent(agent_name)
        || agent_index != 0
        || cycle != 0
        || model_index != 0
    {
        return;
    }

    let cmd_argv = split_command(cmd_str).ok();
    let has_print_flag = cmd_argv
        .as_ref()
        .is_some_and(|argv| argv.iter().any(|arg| arg == "-p"));
    if !has_print_flag {
        if agent_config.print_flag.is_empty() {
            runtime.logger.warn(&format!(
                "GLM agent '{agent_name}' is missing '-p' flag: print_flag is empty in configuration. \
                 Add 'print_flag = \"-p\"' to [ccs] section in ~/.config/ralph-workflow.toml"
            ));
        } else {
            runtime.logger.warn(&format!(
                "GLM agent '{agent_name}' may be missing '-p' flag in command. Check configuration."
            ));
        }
    }
}

/// Build label and logfile paths for execution.
fn build_execution_metadata(
    model_flag: Option<&String>,
    display_name: &str,
    base_label: &str,
    agent_name: &str,
    logfile_prefix: &str,
    model_index: usize,
) -> (String, String, String) {
    let model_suffix = model_flag.map(|m| format!(" [{m}]")).unwrap_or_default();
    let display_name_with_suffix = format!("{display_name}{model_suffix}");
    let label = format!("{base_label} ({display_name_with_suffix})");
    // Sanitize agent name for log file path - replace "/" with "-" to avoid
    // creating subdirectories (e.g., "ccs/glm" -> "ccs-glm")
    let safe_agent_name = agent_name.replace('/', "-");
    let logfile = format!("{logfile_prefix}_{safe_agent_name}_{model_index}.log");
    (label, logfile, display_name_with_suffix)
}

/// Result of trying a single agent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TrySingleAgentResult {
    /// Agent succeeded - return success
    Success,
    /// Unrecoverable error - abort immediately
    Unrecoverable(i32),
    /// Should fall back to next agent
    Fallback,
    /// Continue to next model (no retry)
    NoRetry,
}

/// Context for trying a single model.
struct TryModelContext<'a> {
    agent_config: &'a AgentConfig,
    agent_name: &'a str,
    display_name: &'a str,
    agent_index: usize,
    cycle: u32,
    model_index: usize,
    role: AgentRole,
    model_flag: Option<&'a String>,
    base_label: &'a str,
    prompt: &'a str,
    logfile_prefix: &'a str,
    fallback_config: &'a crate::agents::fallback::FallbackConfig,
    output_validator: Option<crate::pipeline::fallback::OutputValidator>,
    retry_timer: Arc<dyn crate::agents::RetryTimerProvider>,
}

/// Try a single model configuration for an agent.
fn try_single_model(
    ctx: &TryModelContext<'_>,
    runtime: &mut PipelineRuntime<'_>,
) -> std::io::Result<TrySingleAgentResult> {
    let mut parser_type = ctx.agent_config.json_parser;

    if ctx.role == AgentRole::Reviewer {
        if let Some(ref parser_override) = runtime.config.reviewer_json_parser {
            parser_type = JsonParserType::parse(parser_override);
            if ctx.agent_index == 0 && ctx.cycle == 0 && ctx.model_index == 0 {
                runtime.logger.info(&format!(
                    "Using JSON parser override '{parser_override}' for reviewer"
                ));
            }
        }
    }

    let cmd_str = build_command_for_model(ctx, runtime);

    validate_glm_print_flag(
        ctx.agent_name,
        ctx.agent_config,
        &cmd_str,
        ctx.agent_index,
        ctx.cycle,
        ctx.model_index,
        runtime,
    );

    let (label, logfile, display_name_with_suffix) = build_execution_metadata(
        ctx.model_flag,
        ctx.display_name,
        ctx.base_label,
        ctx.agent_name,
        ctx.logfile_prefix,
        ctx.model_index,
    );

    let attempt_config = crate::pipeline::fallback::AgentAttemptConfig {
        agent_name: ctx.agent_name,
        model_flag: ctx.model_flag.map(std::string::String::as_str),
        label: &label,
        display_name: &display_name_with_suffix,
        cmd_str: &cmd_str,
        prompt: ctx.prompt,
        logfile: &logfile,
        logfile_prefix: ctx.logfile_prefix,
        parser_type,
        env_vars: &ctx.agent_config.env_vars,
        model_index: ctx.model_index,
        agent_index: ctx.agent_index,
        cycle: ctx.cycle as usize,
        fallback_config: ctx.fallback_config,
        output_validator: ctx.output_validator,
        retry_timer: Arc::clone(&ctx.retry_timer),
    };
    let result = try_agent_with_retries(&attempt_config, runtime)?;

    match result {
        TryAgentResult::Success => Ok(TrySingleAgentResult::Success),
        TryAgentResult::Unrecoverable(exit_code) => {
            Ok(TrySingleAgentResult::Unrecoverable(exit_code))
        }
        TryAgentResult::Fallback => Ok(TrySingleAgentResult::Fallback),
        TryAgentResult::NoRetry => Ok(TrySingleAgentResult::NoRetry),
    }
}

/// Context for trying a single agent.
struct TryAgentContext<'a> {
    agent_name: &'a str,
    agent_index: usize,
    cycle: u32,
    role: AgentRole,
    base_label: &'a str,
    prompt: &'a str,
    logfile_prefix: &'a str,
    cli_model_override: Option<&'a String>,
    cli_provider_override: Option<&'a String>,
    output_validator: Option<crate::pipeline::fallback::OutputValidator>,
    retry_timer: Arc<dyn crate::agents::RetryTimerProvider>,
}

/// Try a single agent with all its model configurations.
fn try_single_agent(
    ctx: &TryAgentContext<'_>,
    runtime: &mut PipelineRuntime<'_>,
    registry: &AgentRegistry,
    fallback_config: &crate::agents::fallback::FallbackConfig,
) -> std::io::Result<TrySingleAgentResult> {
    let Some(agent_config) = registry.resolve_config(ctx.agent_name) else {
        runtime.logger.warn(&format!(
            "Agent '{}' not found in registry, skipping",
            ctx.agent_name
        ));
        return Ok(TrySingleAgentResult::Fallback);
    };

    let display_name = registry.display_name(ctx.agent_name);
    let model_ctx = ModelFlagBuildContext {
        agent_index: ctx.agent_index,
        cli_model_override: ctx.cli_model_override,
        cli_provider_override: ctx.cli_provider_override,
        agent_config: &agent_config,
        agent_name: ctx.agent_name,
        fallback_config,
        display_name: &display_name,
        runtime,
    };
    let model_flags_to_try = build_model_flags_list(&model_ctx);

    if ctx.agent_index == 0 && ctx.cycle == 0 {
        for model_flag in model_flags_to_try.iter().flatten() {
            for warning in validate_model_flag(model_flag) {
                runtime.logger.warn(&warning);
            }
        }
    }

    for (model_index, model_flag) in model_flags_to_try.iter().enumerate() {
        let model_ctx = TryModelContext {
            agent_config: &agent_config,
            agent_name: ctx.agent_name,
            display_name: &display_name,
            agent_index: ctx.agent_index,
            cycle: ctx.cycle,
            model_index,
            role: ctx.role,
            model_flag: model_flag.as_ref(),
            base_label: ctx.base_label,
            prompt: ctx.prompt,
            logfile_prefix: ctx.logfile_prefix,
            fallback_config,
            output_validator: ctx.output_validator,
            retry_timer: Arc::clone(&ctx.retry_timer),
        };
        let result = try_single_model(&model_ctx, runtime)?;

        match result {
            TrySingleAgentResult::Success => return Ok(TrySingleAgentResult::Success),
            TrySingleAgentResult::Unrecoverable(exit_code) => {
                return Ok(TrySingleAgentResult::Unrecoverable(exit_code))
            }
            TrySingleAgentResult::Fallback => return Ok(TrySingleAgentResult::Fallback),
            TrySingleAgentResult::NoRetry => {}
        }
    }

    Ok(TrySingleAgentResult::NoRetry)
}

/// Configuration for running with fallback.
pub struct FallbackConfig<'a, 'b> {
    pub role: AgentRole,
    pub base_label: &'a str,
    pub prompt: &'a str,
    pub logfile_prefix: &'a str,
    pub runtime: &'a mut PipelineRuntime<'b>,
    pub registry: &'a AgentRegistry,
    pub primary_agent: &'a str,
    pub output_validator: Option<crate::pipeline::fallback::OutputValidator>,
}

/// Run a command with automatic fallback to alternative agents on failure.
pub fn run_with_fallback(
    role: AgentRole,
    base_label: &str,
    prompt: &str,
    logfile_prefix: &str,
    runtime: &mut PipelineRuntime<'_>,
    registry: &AgentRegistry,
    primary_agent: &str,
) -> std::io::Result<i32> {
    let mut config = FallbackConfig {
        role,
        base_label,
        prompt,
        logfile_prefix,
        runtime,
        registry,
        primary_agent,
        output_validator: None,
    };
    run_with_fallback_internal(&mut config)
}
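
// Illustrative call shape for `run_with_fallback` (a sketch only; `runtime`,
// `registry`, the prompt, the "develop" label, and the "claude" primary agent
// name are hypothetical values supplied by the caller):
//
//     let exit_code = run_with_fallback(
//         AgentRole::Developer,
//         "develop",
//         &prompt,
//         "logs/develop",
//         &mut runtime,
//         &registry,
//         "claude",
//     )?;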

/// Run a command with automatic fallback to alternative agents on failure.
///
/// Accepts an optional output validator callback that checks whether the agent
/// produced valid output even when the command exited with code 0; if validation
/// fails, fallback is triggered.
///
/// This variant takes a `FallbackConfig` directly for cases where you need
/// to specify an output validator.
pub fn run_with_fallback_and_validator(
    config: &mut FallbackConfig<'_, '_>,
) -> std::io::Result<i32> {
    run_with_fallback_internal(config)
}

/// Run a command with automatic fallback to alternative agents on failure.
///
/// Accepts an optional output validator callback that checks whether the agent
/// produced valid output even when the command exited with code 0; if validation
/// fails, fallback is triggered.
fn run_with_fallback_internal(config: &mut FallbackConfig<'_, '_>) -> std::io::Result<i32> {
    let fallback_config = config.registry.fallback_config();
    let fallbacks = config.registry.available_fallbacks(config.role);
    if fallback_config.has_fallbacks(config.role) {
        config.runtime.logger.info(&format!(
            "Agent fallback chain for {}: {}",
            config.role,
            fallbacks.join(", ")
        ));
    } else {
        config.runtime.logger.info(&format!(
            "No configured fallbacks for {}, using primary only",
            config.role
        ));
    }

    let agents_to_try = build_agents_to_try(&fallbacks, config.primary_agent);
    let (cli_model_override, cli_provider_override) =
        get_cli_overrides(config.role, config.runtime);

    for cycle in 0..fallback_config.max_cycles {
        if cycle > 0 {
            let backoff_ms = fallback_config.calculate_backoff(cycle - 1);
            config.runtime.logger.info(&format!(
                "Cycle {}/{}: All agents exhausted, waiting {}ms before retry (exponential backoff)...",
                cycle + 1,
                fallback_config.max_cycles,
                backoff_ms
            ));
            config
                .registry
                .retry_timer()
                .sleep(std::time::Duration::from_millis(backoff_ms));
        }

        for (agent_index, agent_name) in agents_to_try.iter().enumerate() {
            let ctx = TryAgentContext {
                agent_name,
                agent_index,
                cycle,
                role: config.role,
                base_label: config.base_label,
                prompt: config.prompt,
                logfile_prefix: config.logfile_prefix,
                cli_model_override: cli_model_override.as_ref(),
                cli_provider_override: cli_provider_override.as_ref(),
                output_validator: config.output_validator,
                retry_timer: config.registry.retry_timer(),
            };
            let result = try_single_agent(&ctx, config.runtime, config.registry, fallback_config)?;

            match result {
                TrySingleAgentResult::Success => return Ok(0),
                TrySingleAgentResult::Unrecoverable(exit_code) => return Ok(exit_code),
                TrySingleAgentResult::Fallback | TrySingleAgentResult::NoRetry => {}
            }
        }
    }

    config.runtime.logger.error(&format!(
        "All agents exhausted after {} cycles with exponential backoff",
        fallback_config.max_cycles
    ));
    Ok(1)
}

#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    static ENV_MUTEX: Mutex<()> = Mutex::new(());

    struct EnvGuard {
        key: &'static str,
        prev: Option<std::ffi::OsString>,
    }

    impl EnvGuard {
        fn set_multiple(vars: &[(&'static str, &str)]) -> Vec<Self> {
            let _lock = ENV_MUTEX
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            vars.iter()
                .map(|&(key, value)| {
                    let prev = std::env::var_os(key);
                    std::env::set_var(key, value);
                    Self { key, prev }
                })
                .collect()
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            match self.prev.take() {
                Some(v) => std::env::set_var(self.key, v),
                None => std::env::remove_var(self.key),
            }
        }
    }

    /// Test that environment variable sanitization works correctly.
    ///
    /// This regression test ensures that when running agents with empty `env_vars`,
    /// GLM/CCS environment variables from the parent shell are NOT passed to
    /// the subprocess. This is critical for preventing "Invalid API key" errors
    /// when switching between GLM (CCS) and standard Claude agents.
    ///
    /// The test:
    /// 1. Sets GLM-like environment variables in the test process
    /// 2. Creates a Command that would be used for an agent with empty `env_vars`
    /// 3. Verifies that the problematic Anthropic env vars are cleared
    #[test]
    fn test_runner_sanitizes_anthropic_env_vars() {
        // Anthropic environment variables to sanitize
        const ANTHROPIC_ENV_VARS_TO_SANITIZE: &[&str] = &[
            "ANTHROPIC_API_KEY",
            "ANTHROPIC_BASE_URL",
            "ANTHROPIC_AUTH_TOKEN",
            "ANTHROPIC_MODEL",
            "ANTHROPIC_DEFAULT_HAIKU_MODEL",
            "ANTHROPIC_DEFAULT_OPUS_MODEL",
            "ANTHROPIC_DEFAULT_SONNET_MODEL",
        ];

        let _guard = EnvGuard::set_multiple(&[
            ("ANTHROPIC_API_KEY", "test-token-glm"),
            ("ANTHROPIC_BASE_URL", "https://glm.example.com"),
        ]);

        // Simulate running an agent with empty env_vars (like codex)
        // The ANTHROPIC_* vars should be sanitized from the parent environment
        let mut cmd = std::process::Command::new("printenv");
        for &var in ANTHROPIC_ENV_VARS_TO_SANITIZE {
            cmd.env_remove(var);
        }

        // Execute the command and check that GLM variables are NOT present
        let output = match cmd.output() {
            Ok(o) => o,
            Err(e) => {
                // printenv might not be available on all systems
                eprintln!("Skipping test: printenv not available ({e})");
                return;
            }
        };
        let stdout = String::from_utf8_lossy(&output.stdout);

        // The GLM-set variables should NOT be in the subprocess environment
        // (they were sanitized by env_remove)
        assert!(!stdout.contains("test-token-glm"));
        assert!(!stdout.contains("https://glm.example.com"));
    }

    #[test]
    fn test_runner_does_not_sanitize_explicit_env_vars() {
        // If an agent explicitly sets ANTHROPIC_API_KEY in its env_vars,
        // that should NOT be sanitized

        let mut cmd = std::process::Command::new("printenv");

        // Simulate agent setting its own ANTHROPIC_API_KEY
        let agent_env_vars =
            std::collections::HashMap::from([("ANTHROPIC_API_KEY", "agent-specific-key")]);

        // First, sanitize all Anthropic vars
        for &var in &["ANTHROPIC_API_KEY", "ANTHROPIC_BASE_URL"] {
            cmd.env_remove(var);
        }

        // Then, apply agent's env_vars (which should NOT be sanitized)
        for (key, value) in &agent_env_vars {
            cmd.env(key, value);
        }

        let output = match cmd.output() {
            Ok(o) => o,
            Err(e) => {
                // printenv might not be available on all systems
                eprintln!("Skipping test: printenv not available ({e})");
                return;
            }
        };
        let stdout = String::from_utf8_lossy(&output.stdout);

        // The agent-specific key should be present
        assert!(stdout.contains("agent-specific-key"));
    }
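
    /// A minimal sketch of a regression test for `build_execution_metadata`; the
    /// agent/display names, label, and log prefix used here are arbitrary test values.
    #[test]
    fn test_build_execution_metadata_sanitizes_agent_name() {
        let model = "glm-4.5".to_string();
        let (label, logfile, display) = super::build_execution_metadata(
            Some(&model),
            "CCS",
            "review",
            "ccs/glm",
            "logs/review",
            1,
        );
        // Slashes in the agent name must not create log subdirectories.
        assert_eq!(logfile, "logs/review_ccs-glm_1.log");
        assert_eq!(display, "CCS [glm-4.5]");
        assert_eq!(label, "review (CCS [glm-4.5])");
    }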
}