robotrt-cli 0.1.0-beta.2

RobotRT modular robotics runtime and middleware components.
use super::*;

/// Determines the effective parallelism limit for a run.
///
/// `override_value` takes precedence when present; otherwise the limit comes
/// from `config.max_parallel`.
///
/// # Errors
///
/// Returns an error when the chosen limit is `0`.
pub(super) fn resolve_max_parallel(
    config: &OrchestrationConfig,
    override_value: Option<usize>,
) -> Result<usize, String> {
    match override_value.unwrap_or(config.max_parallel) {
        0 => Err(String::from("max_parallel must be >= 1")),
        limit => Ok(limit),
    }
}

pub(super) fn run_oneshot_parallel(
    config: &OrchestrationConfig,
    ordered: &[String],
    max_parallel: usize,
    global_continue_on_error: bool,
) -> Result<(String, Vec<TaskRunResult>, Option<String>), String> {
    let logs_dir = PathBuf::from(DEFAULT_RUN_LOG_DIR);
    std::fs::create_dir_all(&logs_dir)
        .map_err(|err| format!("create log directory {} failed: {err}", logs_dir.display()))?;

    let task_index = build_task_index(config)?;
    let selected: HashSet<String> = ordered.iter().cloned().collect();
    let order_pos: HashMap<String, usize> = ordered
        .iter()
        .enumerate()
        .map(|(idx, name)| (name.clone(), idx))
        .collect();

    let mut indegree: HashMap<String, usize> =
        selected.iter().map(|name| (name.clone(), 0usize)).collect();
    let mut adjacency: HashMap<String, Vec<String>> = selected
        .iter()
        .map(|name| (name.clone(), Vec::new()))
        .collect();

    for name in ordered {
        let task = &config.tasks[*task_index
            .get(name)
            .ok_or_else(|| format!("internal error: task not found: {name}"))?];
        for dep in &task.depends_on {
            if selected.contains(dep) {
                *indegree
                    .get_mut(name)
                    .ok_or_else(|| format!("internal error: missing indegree for {name}"))? += 1;
                adjacency
                    .get_mut(dep)
                    .ok_or_else(|| format!("internal error: missing adjacency for {dep}"))?
                    .push(name.clone());
            }
        }
    }

    let mut ready: VecDeque<String> = ordered
        .iter()
        .filter(|name| indegree.get(*name).copied().unwrap_or(0) == 0)
        .cloned()
        .collect();
    let mut active: HashMap<String, RunningTask> = HashMap::new();
    let mut finished = HashSet::new();
    let mut results = Vec::new();
    let mut failure_message = None;
    let mut block_new_tasks = false;

    while !ready.is_empty() || !active.is_empty() {
        while !block_new_tasks && active.len() < max_parallel && !ready.is_empty() {
            let name = ready
                .pop_front()
                .ok_or_else(|| String::from("internal queue error"))?;
            let task = &config.tasks[*task_index
                .get(&name)
                .ok_or_else(|| format!("internal error: task not found: {name}"))?];
            let running = spawn_oneshot_task(task, &logs_dir)?;
            active.insert(name, running);
        }

        if active.is_empty() {
            break;
        }

        let keys = active.keys().cloned().collect::<Vec<_>>();
        let mut progressed = false;
        for key in keys {
            let done = if let Some(running) = active.get_mut(&key) {
                running
                    .child
                    .try_wait()
                    .map_err(|err| format!("poll task {key} failed: {err}"))?
                    .is_some()
            } else {
                false
            };

            if !done {
                continue;
            }

            let running = active
                .remove(&key)
                .ok_or_else(|| format!("internal error: missing running task {key}"))?;
            let output = running
                .child
                .wait_with_output()
                .map_err(|err| format!("collect output for task {key} failed: {err}"))?;
            append_task_log(&running.log_file, &output.stdout, &output.stderr)?;

            let success = output.status.success();
            let task = &config.tasks[*task_index
                .get(&key)
                .ok_or_else(|| format!("internal error: task not found: {key}"))?];

            results.push(TaskRunResult {
                name: key.clone(),
                status: if success {
                    String::from("pass")
                } else {
                    String::from("fail")
                },
                exit_code: output.status.code(),
                elapsed_ms: running.started_at.elapsed().as_millis() as u64,
                log_file: Some(running.log_file.clone()),
            });
            finished.insert(key.clone());
            progressed = true;

            if !success && !global_continue_on_error && !task.continue_on_error {
                block_new_tasks = true;
                if failure_message.is_none() {
                    failure_message = Some(format!(
                        "task {} failed with status {:?}",
                        task.name, output.status
                    ));
                }
                continue;
            }

            if let Some(dependents) = adjacency.get(&key) {
                for next in dependents {
                    let value = indegree
                        .get_mut(next)
                        .ok_or_else(|| format!("internal error: missing indegree for {next}"))?;
                    *value = value.saturating_sub(1);
                    if *value == 0 && !ready.iter().any(|task_name| task_name == next) {
                        ready.push_back(next.clone());
                    }
                }

                let mut sorted = ready.into_iter().collect::<Vec<_>>();
                sorted.sort_by_key(|name| order_pos.get(name).copied().unwrap_or(usize::MAX));
                ready = sorted.into();
            }
        }

        if !progressed {
            std::thread::sleep(Duration::from_millis(10));
        }
    }

    let mut skipped = ordered
        .iter()
        .filter(|name| !finished.contains(*name))
        .cloned()
        .collect::<Vec<_>>();
    skipped.sort_by_key(|name| order_pos.get(name).copied().unwrap_or(usize::MAX));
    for name in skipped {
        results.push(TaskRunResult {
            name,
            status: String::from("skipped"),
            exit_code: None,
            elapsed_ms: 0,
            log_file: None,
        });
    }

    let status = if failure_message.is_some() {
        String::from("fail")
    } else {
        String::from("pass")
    };
    Ok((status, results, failure_message))
}

/// Launches `task` as a child process with stdout and stderr captured through
/// pipes.
///
/// The first element of `task.command` is the program, the rest are its
/// arguments. The task's `cwd` (when set) and `env` entries are applied to the
/// child. The returned `RunningTask` carries the child handle, the start
/// instant and the log path `<logs_dir>/<sanitized task name>.log`.
///
/// # Errors
///
/// Returns an error when the command is empty or the process cannot be
/// spawned.
fn spawn_oneshot_task(task: &OrchestrationTask, logs_dir: &Path) -> Result<RunningTask, String> {
    let (program, arguments) = match task.command.split_first() {
        Some(parts) => parts,
        None => return Err(format!("task {} has empty command", task.name)),
    };

    let mut process = Command::new(program);
    process.args(arguments);
    if let Some(dir) = task.cwd.as_ref() {
        process.current_dir(dir);
    }
    for (var, val) in &task.env {
        process.env(var, val);
    }
    process.stdout(Stdio::piped()).stderr(Stdio::piped());

    let child = match process.spawn() {
        Ok(handle) => handle,
        Err(err) => return Err(format!("spawn task {} failed: {err}", task.name)),
    };
    let file_name = format!("{}.log", sanitize_name(&task.name));
    let log_file = logs_dir.join(file_name);
    let started_at = Instant::now();

    Ok(RunningTask {
        child,
        started_at,
        log_file,
    })
}

/// Writes the captured output of a task to `log_file`.
///
/// The parent directory is created when missing. Any existing file is
/// truncated, then the file receives a `# STDOUT` section holding `stdout`
/// followed by a `# STDERR` section holding `stderr`.
///
/// # Errors
///
/// Returns an error when the directory cannot be created, the file cannot be
/// opened, or a write fails.
fn append_task_log(log_file: &Path, stdout: &[u8], stderr: &[u8]) -> Result<(), String> {
    if let Some(dir) = log_file.parent() {
        if let Err(err) = std::fs::create_dir_all(dir) {
            return Err(format!(
                "create log directory {} failed: {err}",
                dir.display()
            ));
        }
    }

    let mut options = OpenOptions::new();
    options.create(true).truncate(true).write(true);
    let mut sink = match options.open(log_file) {
        Ok(handle) => handle,
        Err(err) => {
            return Err(format!(
                "open log file {} failed: {err}",
                log_file.display()
            ))
        }
    };

    // Sections are emitted in order: header, stdout, header, stderr.
    let sections: [&[u8]; 4] = [b"# STDOUT\n", stdout, b"\n# STDERR\n", stderr];
    for chunk in sections.iter() {
        sink.write_all(chunk)
            .map_err(|err| format!("write log failed: {err}"))?;
    }
    Ok(())
}

/// Runs `task` to completion in the foreground and checks its exit status.
///
/// The first element of `task.command` is the program, the rest are its
/// arguments. `inherited_env` is applied to the child first, then the task's
/// own `env`, so task entries win when a key appears in both. The standard
/// streams are not redirected.
///
/// # Errors
///
/// Returns an error when the command is empty, the process cannot be run, or
/// it exits unsuccessfully.
pub(super) fn run_oneshot_task(
    task: &OrchestrationTask,
    inherited_env: &HashMap<String, String>,
) -> Result<(), String> {
    let (program, arguments) = match task.command.split_first() {
        Some(parts) => parts,
        None => return Err(format!("task {} has empty command", task.name)),
    };

    let mut process = Command::new(program);
    process.args(arguments);
    if let Some(dir) = task.cwd.as_ref() {
        process.current_dir(dir);
    }
    // Inherited variables go first so that task-level entries override them.
    process.envs(inherited_env);
    for (var, val) in &task.env {
        process.env(var, val);
    }

    let exit = match process.status() {
        Ok(exit) => exit,
        Err(err) => return Err(format!("run task {} failed: {err}", task.name)),
    };
    if exit.success() {
        Ok(())
    } else {
        Err(format!(
            "task {} failed with status {:?}",
            task.name, exit
        ))
    }
}

/// Starts `task` as a long-running child whose stdout and stderr are both
/// appended to `log_file`.
///
/// The parent directory of `log_file` is created when missing. After spawning,
/// the function waits 60 ms and polls the child once so that a process which
/// exits immediately is reported as an error instead of being handed back.
///
/// # Errors
///
/// Returns an error when the command is empty, the log directory or file
/// cannot be prepared, the process cannot be spawned or polled, or it has
/// already exited by the time of the poll.
pub(super) fn spawn_service_task(
    task: &OrchestrationTask,
    log_file: &Path,
) -> Result<Child, String> {
    let (program, arguments) = match task.command.split_first() {
        Some(parts) => parts,
        None => return Err(format!("task {} has empty command", task.name)),
    };

    if let Some(dir) = log_file.parent() {
        if let Err(err) = std::fs::create_dir_all(dir) {
            return Err(format!(
                "create log directory {} failed: {err}",
                dir.display()
            ));
        }
    }

    // One append-mode handle per stream, both pointing at the same file.
    let out_handle = match OpenOptions::new().create(true).append(true).open(log_file) {
        Ok(handle) => handle,
        Err(err) => {
            return Err(format!(
                "open service log {} failed: {err}",
                log_file.display()
            ))
        }
    };
    let err_handle = match out_handle.try_clone() {
        Ok(handle) => handle,
        Err(err) => {
            return Err(format!(
                "clone service log {} failed: {err}",
                log_file.display()
            ))
        }
    };

    let mut process = Command::new(program);
    process.args(arguments);
    if let Some(dir) = task.cwd.as_ref() {
        process.current_dir(dir);
    }
    for (var, val) in &task.env {
        process.env(var, val);
    }
    process
        .stdout(Stdio::from(out_handle))
        .stderr(Stdio::from(err_handle));

    let mut child = match process.spawn() {
        Ok(handle) => handle,
        Err(err) => return Err(format!("spawn service task {} failed: {err}", task.name)),
    };

    // Short grace period before checking that the service is still alive.
    std::thread::sleep(Duration::from_millis(60));
    let early_exit = child
        .try_wait()
        .map_err(|err| format!("poll service task {} failed: {err}", task.name))?;
    match early_exit {
        Some(status) => Err(format!(
            "service task {} exited too early with status {:?}",
            task.name, status
        )),
        None => Ok(child),
    }
}