// ralph-workflow 0.7.18
//
// PROMPT-driven multi-agent orchestrator for git repos.
// Documentation
use crate::common::{format_argv_for_log, split_command, truncate_text};
use crate::pipeline::idle_timeout::{
    monitor_idle_timeout_with_interval_and_kill_config_and_observer, new_activity_timestamp,
    time_since_activity, ActivityTrackingReader, MonitorResult,
};
use crate::pipeline::prompt::io::streaming::stream_agent_output_from_handle;
use crate::pipeline::prompt::io_process_wait::wait_for_completion_and_collect_stderr;
use crate::pipeline::prompt::io_stderr_collector::{
    cancel_and_join_stderr_collector, collect_stderr_with_cap_and_drain,
};
use crate::pipeline::prompt::runtime::{cleanup_after_agent_failure, terminate_child_best_effort};
use crate::pipeline::types::{CommandResult, IdleTimeoutCause, TimeoutContext};
use std::io::{self, BufReader};
use std::path::Path;
use std::sync::Arc;

use super::types::{PipelineRuntime, PromptCommand};

/// Test-only agent runner that exposes the idle-timeout monitor settings as parameters.
///
/// The function splits `cmd.cmd_str` into an argv, spawns the agent through
/// `runtime.executor`, streams the agent's stdout on the calling thread via
/// `stream_agent_output_from_handle`, and runs three helper threads alongside it:
///
/// * a watcher that requests stdout cancellation on user interrupt or shortly after the
///   child has exited,
/// * an idle-timeout monitor configured with `idle_timeout`, `monitor_check_interval`
///   and `kill_config`,
/// * a stderr collector that reads the child's stderr with a 512 KiB byte limit.
///
/// After streaming ends it waits for the child, reconciles the monitor's verdict with the
/// process exit code, and returns a [`CommandResult`].
///
/// # Arguments
///
/// * `cmd` - Command string, prompt, logfile path, extra env vars and parser type.
/// * `runtime` - Logger, colors, workspace and process executor used for all side effects.
/// * `anthropic_env_vars_to_sanitize` - Names forwarded to `sanitize_command_env`.
/// * `idle_timeout` - Idle duration handed to the monitor (`MonitorConfig::timeout`).
/// * `monitor_check_interval` - Polling interval handed to the monitor.
/// * `kill_config` - Termination settings used by the monitor and by the cleanup paths.
///
/// # Returns
///
/// `Ok(CommandResult)` for every run that got as far as attempting a spawn, including
/// spawn failures (reported through `exit_code`/`stderr` rather than as `Err`) and idle
/// timeouts (reported with `SIGTERM_EXIT_CODE` and a populated `timeout_context`).
///
/// # Errors
///
/// Returns `Err` when `split_command` fails, when the command is empty or whitespace-only
/// (`io::ErrorKind::InvalidInput`), when the logfile or its parent directory cannot be
/// created through `runtime.workspace`, or when streaming stdout or waiting for the child
/// fails (in the latter two cases `cleanup_after_agent_failure` runs first).
///
/// # Panics
///
/// Panics if the child-activity observer mutex is poisoned. The detached watcher thread
/// panics on its own if the shared child mutex is poisoned.
#[cfg(test)]
pub fn run_with_agent_spawn_with_monitor_config(
    cmd: &PromptCommand<'_>,
    runtime: &PipelineRuntime<'_>,
    anthropic_env_vars_to_sanitize: &[&str],
    idle_timeout: std::time::Duration,
    monitor_check_interval: std::time::Duration,
    kill_config: crate::pipeline::idle_timeout::KillConfig,
) -> io::Result<CommandResult> {
    use std::sync::atomic::{AtomicBool, Ordering};

    // Parse the command string into argv and reject commands with nothing to execute.
    let argv = split_command(cmd.cmd_str)?;
    if argv.is_empty() || cmd.cmd_str.trim().is_empty() {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "Agent command is empty or contains only whitespace",
        ));
    }

    // Log the command with a "<PROMPT>" placeholder in place of the actual prompt text,
    // shortened by `truncate_text` with a limit of 160.
    let mut argv_for_log = argv.clone();
    argv_for_log.push("<PROMPT>".to_string());
    let display_cmd = truncate_text(&format_argv_for_log(&argv_for_log), 160);
    runtime.logger.info(&format!(
        "Executing: {}{}{}",
        runtime.colors.dim(),
        display_cmd,
        runtime.colors.reset()
    ));

    // Make sure the logfile's parent directory exists (skipped for bare file names, whose
    // parent is the empty path) and write an empty logfile before the agent starts.
    let logfile_path = Path::new(cmd.logfile);
    if let Some(parent) = logfile_path.parent().filter(|p| !p.as_os_str().is_empty()) {
        runtime.workspace.create_dir_all(parent)?;
    }
    runtime.workspace.write(logfile_path, "")?;

    // Child environment: the current process environment followed by `cmd.env_vars`,
    // then passed through `sanitize_command_env`.
    let complete_env = super::environment::sanitize_command_env(
        std::env::vars()
            .chain(cmd.env_vars.iter().map(|(k, v)| (k.clone(), v.clone())))
            .collect(),
        cmd.env_vars,
        anthropic_env_vars_to_sanitize,
    );

    // `argv` is known to be non-empty here (checked above), so indexing `argv[0]` is safe.
    let config = crate::executor::AgentSpawnConfig {
        command: argv[0].clone(),
        args: argv[1..].to_vec(),
        env: complete_env,
        prompt: cmd.prompt.to_string(),
        logfile: cmd.logfile.to_string(),
        parser_type: cmd.parser_type,
    };

    // Spawn failures are reported as a successful call carrying a non-zero exit code so the
    // caller sees them like any other failed command.
    let agent_handle = match runtime.executor.spawn_agent(&config) {
        Ok(handle) => handle,
        Err(e) => {
            // 127 / 126 follow the usual shell convention for "not found" / "not executable".
            // NOTE(review): 7, 22 and 12 appear to mirror the E2BIG, EINVAL and ENOMEM errno
            // values — confirm this is the intended mapping.
            let (exit_code, detail) = match e.kind() {
                io::ErrorKind::NotFound => (127, "command not found"),
                io::ErrorKind::PermissionDenied => (126, "permission denied"),
                io::ErrorKind::ArgumentListTooLong => {
                    (7, "argument list too long (prompt exceeds OS limit)")
                }
                io::ErrorKind::InvalidInput => (22, "invalid input"),
                io::ErrorKind::OutOfMemory => (12, "out of memory"),
                _ => (1, "spawn failed"),
            };

            return Ok(CommandResult {
                exit_code,
                stderr: format!("{}: {} - {}", argv[0], detail, e),
                session_id: None,
                child_status_at_timeout: None,
                timeout_context: None,
            });
        }
    };

    // Take the handle apart: stdout is streamed on this thread, stderr goes to the collector
    // thread, and the child itself is shared behind a mutex.
    let stdout = agent_handle.stdout;
    let stderr = agent_handle.stderr;
    let inner = agent_handle.inner;

    let child_shared = Arc::new(std::sync::Mutex::new(inner));
    let child_for_monitor = Arc::clone(&child_shared);

    // Shared state for the helper threads:
    // * `activity_timestamp`        - last-activity marker read by the watcher and monitor.
    // * `stdout_cancel`             - set to make the stdout streaming stop.
    // * `monitor_should_stop`       - set to make the monitor and the watcher exit.
    // * `child_activity_suppressed` - slot handed to the monitor as its observer argument.
    let activity_timestamp = new_activity_timestamp();
    let stdout_cancel = Arc::new(AtomicBool::new(false));
    let stdout_cancel_for_monitor = Arc::clone(&stdout_cancel);
    let stdout_cancel_for_watcher = Arc::clone(&stdout_cancel);
    let monitor_should_stop = Arc::new(AtomicBool::new(false));
    let monitor_should_stop_clone = Arc::clone(&monitor_should_stop);
    let monitor_should_stop_for_watcher = Arc::clone(&monitor_should_stop);
    let activity_timestamp_clone = activity_timestamp.clone();
    let activity_timestamp_for_watcher = activity_timestamp.clone();
    let child_activity_suppressed = Arc::new(std::sync::Mutex::new(None));
    let child_activity_suppressed_for_monitor = Arc::clone(&child_activity_suppressed);
    let child_for_watcher = Arc::clone(&child_shared);

    // Watcher thread (detached; its JoinHandle is dropped). Every 50 ms it:
    //   1. exits if `monitor_should_stop` is set;
    //   2. checks whether the child has exited via `try_wait` (lock held only for that call);
    //   3. sets `stdout_cancel` and exits if a user interrupt was requested, or if the child
    //      has exited and at least 200 ms have passed since the last recorded activity.
    std::thread::spawn(move || {
        const CHILD_EXIT_STDOUT_DRAIN_GRACE: std::time::Duration =
            std::time::Duration::from_millis(200);
        let poll = std::time::Duration::from_millis(50);
        loop {
            if monitor_should_stop_for_watcher.load(Ordering::Acquire) {
                return;
            }

            // A `try_wait` error is treated the same as "still running".
            let child_exited = {
                let mut child = child_for_watcher
                    .lock()
                    .expect("child process mutex poisoned - indicates panic in another thread");
                matches!(child.try_wait(), Ok(Some(_)))
            };

            let may_cancel_for_child_exit = child_exited
                && crate::pipeline::idle_timeout::time_since_activity(
                    &activity_timestamp_for_watcher,
                ) >= CHILD_EXIT_STDOUT_DRAIN_GRACE;

            if crate::interrupt::is_user_interrupt_requested() || may_cancel_for_child_exit {
                stdout_cancel_for_watcher.store(true, Ordering::Release);
                return;
            }

            std::thread::sleep(poll);
        }
    });

    let monitor_executor: Arc<dyn crate::executor::ProcessExecutor> =
        std::sync::Arc::clone(&runtime.executor_arc);

    // Mirror production wiring: create a shared tool-activity counter so parser events (tool-start,
    // tool-complete) suppress or resume the idle-timeout monitor during long tool operations.
    // Non-zero = at least one tool is actively executing.
    let tool_active = Arc::new(std::sync::atomic::AtomicU32::new(0));
    let tool_active_for_check = Arc::clone(&tool_active);
    let tool_activity_check: Option<Arc<dyn Fn() -> bool + Send + Sync>> =
        Some(Arc::new(move || {
            tool_active_for_check.load(std::sync::atomic::Ordering::Acquire) > 0
        }));
    let tool_activity_tracker: Option<crate::pipeline::prompt::io::streaming::ToolActivityTracker> =
        Some(Arc::clone(&tool_active));

    // Idle-timeout monitor thread. The handle is kept in an `Option` so that either the wait
    // helper or the code below can `take()` and join it exactly once. When the monitor reports
    // a timeout it also sets `stdout_cancel` so the streaming loop on this thread stops.
    let mut monitor_handle: Option<std::thread::JoinHandle<MonitorResult>> =
        Some(std::thread::spawn(move || {
            let result = monitor_idle_timeout_with_interval_and_kill_config_and_observer(
                &activity_timestamp_clone,
                None, // No file activity config
                &child_for_monitor,
                &monitor_should_stop_clone,
                &monitor_executor,
                crate::pipeline::idle_timeout::MonitorConfig {
                    timeout: idle_timeout,
                    check_interval: monitor_check_interval,
                    kill_config,
                    required_idle_confirmations: 2,
                    check_child_processes: true,
                    completion_check: None,

                    partial_completion_check: None,
                    tool_activity_check,
                    max_tool_suppression_ticks: 20,
                },
                Some(&child_activity_suppressed_for_monitor),
            );
            if matches!(result, MonitorResult::TimedOut { .. }) {
                stdout_cancel_for_monitor.store(true, Ordering::Release);
            }
            result
        }));

    // Stderr collector thread. The reader is wrapped in `ActivityTrackingReader` together with
    // the shared activity timestamp, and collection is given a 512 KiB byte limit and its own
    // cancel flag.
    let stderr_activity_timestamp = activity_timestamp.clone();
    let stderr_cancel = Arc::new(AtomicBool::new(false));
    let stderr_cancel_for_thread = Arc::clone(&stderr_cancel);
    let mut stderr_join_handle = Some(std::thread::spawn(move || -> io::Result<String> {
        const STDERR_MAX_BYTES: usize = 512 * 1024;
        let tracked_stderr = ActivityTrackingReader::new(stderr, stderr_activity_timestamp);
        let reader = BufReader::new(tracked_stderr);
        collect_stderr_with_cap_and_drain(
            reader,
            STDERR_MAX_BYTES,
            stderr_cancel_for_thread.as_ref(),
        )
    }));

    // Keep a clone for the timeout log message below: `activity_timestamp` itself is moved
    // into the streaming call.
    let activity_timestamp_for_timeout = activity_timestamp.clone();
    // Stream stdout on the calling thread. On failure, tear everything down before
    // propagating the error.
    if let Err(e) = stream_agent_output_from_handle(
        stdout,
        cmd,
        runtime,
        activity_timestamp,
        &stdout_cancel,
        tool_activity_tracker,
    ) {
        cleanup_after_agent_failure(
            &child_shared,
            &monitor_should_stop,
            &mut monitor_handle,
            &mut stderr_join_handle,
            &stderr_cancel,
            runtime.executor_arc.as_ref(),
            kill_config,
        );
        return Err(e);
    }

    // Wait for the child and gather stderr. The helper may also hand back a monitor result
    // it already obtained (`monitor_result_early`).
    let (exit_code, stderr_output, monitor_result_early) =
        match wait_for_completion_and_collect_stderr(
            &child_shared,
            &mut stderr_join_handle,
            &mut monitor_handle,
            runtime,
        ) {
            Ok(v) => v,
            Err(e) => {
                cleanup_after_agent_failure(
                    &child_shared,
                    &monitor_should_stop,
                    &mut monitor_handle,
                    &mut stderr_join_handle,
                    &stderr_cancel,
                    runtime.executor_arc.as_ref(),
                    kill_config,
                );
                return Err(e);
            }
        };

    // Timeout already known: make a best-effort attempt to terminate the child, signal the
    // stop flag only if the child is confirmed gone, and give the stderr collector 250 ms
    // to finish after cancelling it.
    if matches!(monitor_result_early, Some(MonitorResult::TimedOut { .. })) {
        let exited =
            terminate_child_best_effort(&child_shared, runtime.executor_arc.as_ref(), kill_config);
        if exited {
            monitor_should_stop.store(true, Ordering::Release);
        }
        cancel_and_join_stderr_collector(
            &stderr_cancel,
            &mut stderr_join_handle,
            std::time::Duration::from_millis(250),
        );
    }

    // No early timeout: tell the monitor (and the watcher) to stop so the join below returns.
    if !matches!(monitor_result_early, Some(MonitorResult::TimedOut { .. })) {
        monitor_should_stop.store(true, Ordering::Release);
    }

    // Prefer the early result; otherwise join the monitor thread if its handle is still here.
    // A missing handle or a failed join (monitor thread panicked) falls back to
    // `ProcessCompleted`.
    let monitor_result: MonitorResult = monitor_result_early
        .or_else(|| monitor_handle.take().and_then(|handle| handle.join().ok()))
        .unwrap_or(MonitorResult::ProcessCompleted);

    // Copy the observer slot's current value out from under the lock.
    let child_activity_suppression_info = *child_activity_suppressed
        .lock()
        .expect("child activity observer mutex poisoned");

    // Translate the monitor verdict into the final exit code, child status and timeout context.
    let (final_exit_code, child_status, timeout_context) = match monitor_result {
        MonitorResult::TimedOut {
            escalated,
            child_status_at_timeout,
        } => {
            let idle_duration = time_since_activity(&activity_timestamp_for_timeout);
            // Platform-specific wording for the case where termination had to be escalated.
            let escalation_msg = if escalated {
                if cfg!(windows) {
                    ", force killed (taskkill /F)"
                } else {
                    ", escalated to SIGKILL after SIGTERM grace period"
                }
            } else {
                ""
            };
            // Classify the timeout from the child-process snapshot: stalled children take
            // precedence over currently-active ones; no snapshot or neither condition means
            // `NoQualifying`.
            let idle_timeout_cause =
                child_status_at_timeout.map_or(IdleTimeoutCause::NoQualifying, |info| {
                    if info.has_stalled_children() {
                        IdleTimeoutCause::Stalled(info)
                    } else if info.has_currently_active_children() {
                        IdleTimeoutCause::StaleActive(info)
                    } else {
                        IdleTimeoutCause::NoQualifying
                    }
                });
            let child_msg = match idle_timeout_cause {
                IdleTimeoutCause::NoQualifying => ", no active child processes".to_string(),
                IdleTimeoutCause::Stalled(info) => {
                    format!(
                        ", child processes present but not currently active (0 active of {} total, CPU at {}ms)",
                        info.child_count, info.cpu_time_ms
                    )
                }
                IdleTimeoutCause::StaleActive(info) => {
                    format!(
                        ", child processes still looked active but showed no fresh progress ({} active of {} total, CPU stalled at {}ms)",
                        info.active_child_count, info.child_count, info.cpu_time_ms
                    )
                }
            };
            // The process's own exit code is only logged; the reported exit code for a
            // timeout is always `SIGTERM_EXIT_CODE`.
            runtime.logger.warn(&format!(
                "Agent killed due to idle timeout (no stdout/stderr for {:.1} seconds, \
                 last activity {:.1}s ago, process exit code was {}{}{}, \
                 kill reason: IDLE_TIMEOUT_MONITOR)",
                idle_timeout.as_secs_f64(),
                idle_duration.as_secs_f64(),
                exit_code,
                escalation_msg,
                child_msg
            ));
            (
                super::SIGTERM_EXIT_CODE,
                child_status_at_timeout,
                Some(TimeoutContext {
                    escalated,
                    child_status_at_timeout,
                }),
            )
        }
        MonitorResult::ProcessCompleted => {
            // If the observer slot was filled during the run, record that in the log; the
            // exit code is passed through unchanged.
            if let Some(info) = child_activity_suppression_info {
                runtime.logger.info(&format!(
                    "idle timeout suppression: child processes showed fresh progress and remained relevant \
                     ({} active of {} total, CPU at {}ms, signature {})",
                    info.active_child_count,
                    info.child_count,
                    info.cpu_time_ms,
                    info.descendant_pid_signature
                ));
            }
            (exit_code, None, None)
        }
        MonitorResult::CompleteButWaiting => {
            // The process exit code is deliberately replaced with 0 in this case.
            runtime.logger.info(
                "Agent output ready; process was idle-but-done and was forcibly terminated \
                 (complete-but-waiting). Treating as success.",
            );
            (0, None, None)
        }
    };

    // Recover the session id (if any) from what was written to the logfile.
    let session_id =
        super::io_streaming::extract_session_id_from_logfile(cmd.logfile, runtime.workspace);

    Ok(CommandResult {
        exit_code: final_exit_code,
        stderr: stderr_output,
        session_id,
        child_status_at_timeout: child_status,
        timeout_context,
    })
}