use crate::config::PkgsrcEnv;
use crate::sandbox::{SHUTDOWN_POLL_INTERVAL, SandboxScope, wait_with_shutdown};
use crate::scan::{ResolvedPackage, SkipReason, SkippedCounts};
use crate::tui::{MultiProgress, REFRESH_INTERVAL, format_duration};
use crate::{Config, RunContext, Sandbox};
use anyhow::{Context, bail};
use crossterm::event;
use glob::Pattern;
use indexmap::IndexMap;
use pkgsrc::{PkgName, PkgPath};
use std::collections::{HashMap, HashSet, VecDeque};
use std::fs::{self, File, OpenOptions};
use std::path::{Path, PathBuf};
use std::process::{Command, ExitStatus, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, mpsc, mpsc::Sender};
use std::time::{Duration, Instant};
use tracing::{debug, error, info, info_span, trace, warn};
/// Time threshold for batching captured command output: the output-forwarding
/// thread sends its pending lines once at least this long has passed since the
/// previous send (checked each time a new line arrives).
const OUTPUT_BATCH_INTERVAL: Duration = Duration::from_millis(100);
/// NOTE(review): not referenced in the visible part of this file; presumably the
/// pause a worker takes before asking for work again — confirm at its use site.
const WORKER_BACKOFF_INTERVAL: Duration = Duration::from_millis(100);
/// The phases a single package build walks through, in execution order.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Stage {
    PreClean,
    Depends,
    Checksum,
    Configure,
    Build,
    Install,
    Package,
    Deinstall,
    Clean,
}

impl Stage {
    /// Returns the lowercase, hyphenated name of the stage.
    ///
    /// The name is used for progress reporting, for the `.stage` marker file
    /// and as the stem of the per-stage log file (`<name>.log`).
    fn as_str(&self) -> &'static str {
        match *self {
            Self::PreClean => "pre-clean",
            Self::Depends => "depends",
            Self::Checksum => "checksum",
            Self::Configure => "configure",
            Self::Build => "build",
            Self::Install => "install",
            Self::Package => "package",
            Self::Deinstall => "deinstall",
            Self::Clean => "clean",
        }
    }
}
/// Outcome reported by `PkgBuilder::build` for one package.
#[derive(Debug)]
enum PkgBuildResult {
/// Every stage completed and the package file was copied into place.
Success,
/// A stage's command exited unsuccessfully.
Failed,
/// The existing binary package was judged up-to-date, so nothing was built.
Skipped,
}
/// Identity under which a build command is executed.
#[derive(Debug, Clone, Copy)]
enum RunAs {
/// Run the command directly.
Root,
/// Run the command through `su <build_user> -c ...`.
User,
}
/// Receiver of progress notifications emitted while a package is being built.
trait BuildCallback: Send {
/// Called with the stage name (see `Stage::as_str`) as each stage begins.
fn stage(&mut self, stage: &str);
}
/// State shared by the package builds of one run.
#[derive(Debug)]
struct BuildSession {
/// Build configuration (log directory, pkgsrc path, make binary, build user, ...).
config: Config,
/// pkgsrc environment; supplies the `packages`, `pkgtools` and `pkg_dbdir` paths.
pkgsrc_env: PkgsrcEnv,
/// Sandbox handle used to create commands and resolve sandbox paths.
sandbox: Sandbox,
/// Options for this run (e.g. `force_rebuild`).
options: BuildOptions,
/// Shutdown flag handed to `wait_with_shutdown` while waiting on child processes.
shutdown: Arc<AtomicBool>,
}
/// Drives the build of a single package inside one sandbox.
struct PkgBuilder<'a> {
/// Shared run state.
session: &'a BuildSession,
/// Identifier of the sandbox in which commands are run.
sandbox_id: usize,
/// The package being built.
pkginfo: &'a ResolvedPackage,
/// Per-package log directory: `<config logdir>/<pkgname>`.
logdir: PathBuf,
/// Unprivileged user for `RunAs::User` commands, if one is configured.
build_user: Option<String>,
/// Environment variables applied to every command this builder runs.
envs: Vec<(String, String)>,
/// When set, command output is also forwarded as `ChannelCommand::OutputLines`.
output_tx: Option<Sender<ChannelCommand>>,
}
impl<'a> PkgBuilder<'a> {
/// Creates a builder for `pkginfo` that runs in sandbox `sandbox_id`.
///
/// The per-package log directory (`<logdir>/<pkgname>`) and the optional
/// build user are derived from the session configuration.
fn new(
    session: &'a BuildSession,
    sandbox_id: usize,
    pkginfo: &'a ResolvedPackage,
    envs: Vec<(String, String)>,
    output_tx: Option<Sender<ChannelCommand>>,
) -> Self {
    let config = &session.config;
    Self {
        session,
        sandbox_id,
        pkginfo,
        logdir: config.logdir().join(pkginfo.index.pkgname.pkgname()),
        build_user: config.build_user().map(|user| user.to_string()),
        envs,
        output_tx,
    }
}
/// Runs `cmd args...` in the sandbox with the builder's environment and
/// returns its stdout (lossily decoded) when it exits successfully.
///
/// A non-zero exit or a failure to execute is logged at debug level and
/// reported as `None`.
fn run_cmd(&self, cmd: &Path, args: &[&str]) -> Option<String> {
    let mut command = self.session.sandbox.command(self.sandbox_id, cmd);
    command.args(args);
    self.apply_envs(&mut command, &[]);

    let output = match command.output() {
        Ok(output) => output,
        Err(e) => {
            debug!(cmd = %cmd.display(), error = %e, "command execution error");
            return None;
        }
    };

    if output.status.success() {
        return Some(String::from_utf8_lossy(&output.stdout).into_owned());
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    debug!(
        cmd = %cmd.display(),
        exit_code = ?output.status.code(),
        stderr = %stderr.trim(),
        "command failed"
    );
    None
}
/// Decides whether the existing binary package for this package can be reused.
///
/// Returns `Ok(true)` only if `<packages>/All/<pkgname>.tgz` exists and passes
/// every check below; any missing file, mismatch or helper-command failure
/// yields `Ok(false)`. No path in this body returns `Err`.
///
/// Checks, in order:
/// 1. Each `file:id` line printed by `pkg_info -qb` must refer to a file that
///    exists under the pkgsrc directory. Ids starting with `$NetBSD` are
///    compared against the first `$NetBSD...$` tag found in the file; all
///    other ids are compared against the output of `pkg_admin digest`.
/// 2. The dependency set printed by `pkg_info -qN` must equal the package's
///    currently expected dependencies.
/// 3. Every dependency's package file must exist and must not have a newer
///    modification time than this package's file.
fn check_up_to_date(&self) -> anyhow::Result<bool> {
let pkgname = self.pkginfo.index.pkgname.pkgname();
let pkgfile = self
.session
.pkgsrc_env
.packages
.join("All")
.join(format!("{}.tgz", pkgname));
if !pkgfile.exists() {
debug!(path = %pkgfile.display(), "Package file not found");
return Ok(false);
}
let pkgfile_str = pkgfile.to_string_lossy();
let pkg_info = self.session.pkgsrc_env.pkgtools.join("pkg_info");
let pkg_admin = self.session.pkgsrc_env.pkgtools.join("pkg_admin");
let Some(build_info) = self.run_cmd(&pkg_info, &["-qb", &pkgfile_str]) else {
debug!("pkg_info -qb failed or returned empty");
return Ok(false);
};
debug!(lines = build_info.lines().count(), "Checking BUILD_INFO");
for line in build_info.lines() {
// Lines without a ':' separator, or with an empty side, are ignored.
let Some((file, file_id)) = line.split_once(':') else {
continue;
};
let file_id = file_id.trim();
if file.is_empty() || file_id.is_empty() {
continue;
}
let src_file = self.session.config.pkgsrc().join(file);
if !src_file.exists() {
debug!(file, "Source file missing");
return Ok(false);
}
if file_id.starts_with("$NetBSD") {
let Ok(content) = std::fs::read_to_string(&src_file) else {
return Ok(false);
};
// Extract the first `$NetBSD ... $` tag, including both '$' delimiters.
let id = content.lines().find_map(|line| {
if let Some(start) = line.find("$NetBSD") {
if let Some(end) = line[start + 1..].find('$') {
return Some(&line[start..start + 1 + end + 1]);
}
}
None
});
if id != Some(file_id) {
debug!(file, "CVS ID mismatch");
return Ok(false);
}
} else {
// Any other id is compared against the `pkg_admin digest` output for the file.
let src_file_str = src_file.to_string_lossy();
let Some(hash) = self.run_cmd(&pkg_admin, &["digest", &src_file_str]) else {
debug!(file, "pkg_admin digest failed");
return Ok(false);
};
let hash = hash.trim();
if hash != file_id {
debug!(
file,
path = %src_file.display(),
expected = file_id,
actual = hash,
"Hash mismatch"
);
return Ok(false);
}
}
}
// Compare the dependencies recorded in the package with the expected ones.
let Some(pkg_deps) = self.run_cmd(&pkg_info, &["-qN", &pkgfile_str]) else {
return Ok(false);
};
let recorded_deps: HashSet<&str> = pkg_deps
.lines()
.map(|l| l.trim())
.filter(|l| !l.is_empty())
.collect();
let expected_deps: HashSet<&str> =
self.pkginfo.depends().iter().map(|d| d.pkgname()).collect();
if recorded_deps != expected_deps {
debug!(
recorded = recorded_deps.len(),
expected = expected_deps.len(),
"Dependency list changed"
);
return Ok(false);
}
let pkgfile_mtime = match pkgfile.metadata().and_then(|m| m.modified()) {
Ok(t) => t,
Err(_) => return Ok(false),
};
// A dependency package newer than this package invalidates it.
for dep in &recorded_deps {
let dep_pkg = self
.session
.pkgsrc_env
.packages
.join("All")
.join(format!("{}.tgz", dep));
if !dep_pkg.exists() {
debug!(dep, "Dependency package missing");
return Ok(false);
}
let dep_mtime = match dep_pkg.metadata().and_then(|m| m.modified()) {
Ok(t) => t,
Err(_) => return Ok(false),
};
if dep_mtime > pkgfile_mtime {
debug!(dep, "Dependency is newer");
return Ok(false);
}
}
debug!("Package is up-to-date");
Ok(true)
}
/// Builds the package by running every stage in order.
///
/// Returns `Ok(Skipped)` when `force_rebuild` is off and the existing binary
/// package is up-to-date, `Ok(Failed)` as soon as a stage's command exits
/// unsuccessfully, and `Ok(Success)` once the package file has been copied
/// into `<packages>/All`. I/O errors and failures to spawn a command are
/// returned as `Err`.
///
/// `callback.stage` is invoked with the stage name before each stage runs.
fn build<C: BuildCallback>(&self, callback: &mut C) -> anyhow::Result<PkgBuildResult> {
let pkgname_str = self.pkginfo.pkgname().pkgname();
let pkgpath = &self.pkginfo.pkgpath;
if !self.session.options.force_rebuild && self.check_up_to_date()? {
return Ok(PkgBuildResult::Skipped);
}
// Start from an empty per-package log directory.
if self.logdir.exists() {
fs::remove_dir_all(&self.logdir)?;
}
fs::create_dir_all(&self.logdir)?;
let work_log = self.logdir.join("work.log");
File::create(&work_log)?;
// Best effort: hand work.log to the build user; the chown result is ignored.
if let Some(ref user) = self.build_user {
let bob_log = File::options()
.create(true)
.append(true)
.open(self.logdir.join("bob.log"))?;
let bob_log_err = bob_log.try_clone()?;
let _ = Command::new("chown")
.arg(user)
.arg(&work_log)
.stdout(bob_log)
.stderr(bob_log_err)
.status();
}
let pkgdir = self.session.config.pkgsrc().join(pkgpath.as_path());
callback.stage(Stage::PreClean.as_str());
// The pre-clean exit status is not checked; only spawn/I/O errors propagate.
self.run_make_stage(Stage::PreClean, &pkgdir, &["clean"], RunAs::Root, false)?;
if !self.pkginfo.depends().is_empty() {
callback.stage(Stage::Depends.as_str());
let _ = self.write_stage(Stage::Depends);
if !self.install_dependencies()? {
return Ok(PkgBuildResult::Failed);
}
}
callback.stage(Stage::Checksum.as_str());
if !self.run_make_stage(Stage::Checksum, &pkgdir, &["checksum"], RunAs::Root, true)? {
return Ok(PkgBuildResult::Failed);
}
callback.stage(Stage::Configure.as_str());
let configure_log = self.logdir.join("configure.log");
if !self.run_usergroup_if_needed(Stage::Configure, &pkgdir, &configure_log)? {
return Ok(PkgBuildResult::Failed);
}
if !self.run_make_stage(
Stage::Configure,
&pkgdir,
&["configure"],
self.build_run_as(),
true,
)? {
return Ok(PkgBuildResult::Failed);
}
callback.stage(Stage::Build.as_str());
let build_log = self.logdir.join("build.log");
if !self.run_usergroup_if_needed(Stage::Build, &pkgdir, &build_log)? {
return Ok(PkgBuildResult::Failed);
}
if !self.run_make_stage(Stage::Build, &pkgdir, &["all"], self.build_run_as(), true)? {
return Ok(PkgBuildResult::Failed);
}
callback.stage(Stage::Install.as_str());
let install_log = self.logdir.join("install.log");
if !self.run_usergroup_if_needed(Stage::Install, &pkgdir, &install_log)? {
return Ok(PkgBuildResult::Failed);
}
if !self.run_make_stage(
Stage::Install,
&pkgdir,
&["stage-install"],
self.build_run_as(),
true,
)? {
return Ok(PkgBuildResult::Failed);
}
callback.stage(Stage::Package.as_str());
if !self.run_make_stage(
Stage::Package,
&pkgdir,
&["stage-package-create"],
RunAs::Root,
true,
)? {
return Ok(PkgBuildResult::Failed);
}
let pkgfile = self.get_make_var(&pkgdir, "STAGE_PKGFILE")?;
// Unless the package is marked as a bootstrap package, install the new
// package and remove it again; either step failing fails the build.
let is_bootstrap = self.pkginfo.bootstrap_pkg() == Some("yes");
if !is_bootstrap {
if !self.pkg_add(&pkgfile)? {
return Ok(PkgBuildResult::Failed);
}
callback.stage(Stage::Deinstall.as_str());
let _ = self.write_stage(Stage::Deinstall);
if !self.pkg_delete(pkgname_str)? {
return Ok(PkgBuildResult::Failed);
}
}
let packages_dir = self.session.pkgsrc_env.packages.join("All");
fs::create_dir_all(&packages_dir)?;
let dest = packages_dir.join(
Path::new(&pkgfile)
.file_name()
.context("Invalid package file path")?,
);
// With the sandbox enabled, re-root the reported path under the sandbox directory.
let host_pkgfile = if self.session.sandbox.enabled() {
self.session
.sandbox
.path(self.sandbox_id)
.join(pkgfile.trim_start_matches('/'))
} else {
PathBuf::from(&pkgfile)
};
fs::copy(&host_pkgfile, &dest)?;
callback.stage(Stage::Clean.as_str());
// Final clean and log removal are best effort; their results are ignored.
let _ = self.run_make_stage(Stage::Clean, &pkgdir, &["clean"], RunAs::Root, false);
let _ = fs::remove_dir_all(&self.logdir);
Ok(PkgBuildResult::Success)
}
/// Identity used for the configure/build/install stages: the build user
/// when one is configured, otherwise root.
fn build_run_as(&self) -> RunAs {
    match self.build_user {
        Some(_) => RunAs::User,
        None => RunAs::Root,
    }
}
fn write_stage(&self, stage: Stage) -> anyhow::Result<()> {
let stage_file = self.logdir.join(".stage");
fs::write(&stage_file, stage.as_str())?;
Ok(())
}
/// Runs `make -C <pkgdir> <targets...>` for `stage`, logging to
/// `<logdir>/<stage>.log`.
///
/// The stage marker file is updated first (best effort). Returns whether
/// make exited successfully; spawn and I/O errors are returned as `Err`.
fn run_make_stage(
    &self,
    stage: Stage,
    pkgdir: &Path,
    targets: &[&str],
    run_as: RunAs,
    include_make_flags: bool,
) -> anyhow::Result<bool> {
    let _ = self.write_stage(stage);

    let stage_name = stage.as_str();
    let logfile = self.logdir.join(format!("{}.log", stage_name));
    let work_log = self.logdir.join("work.log");
    let make_args = self.make_args(pkgdir, targets, include_make_flags, &work_log);
    let arg_refs: Vec<&str> = make_args.iter().map(String::as_str).collect();

    info!(stage = stage_name, "Running make stage");
    self.run_command_logged(self.session.config.make(), &arg_refs, run_as, &logfile)
        .map(|status| status.success())
}
/// Runs a command with its output captured in `logfile`, using only the
/// builder's standard environment (no additional variables).
fn run_command_logged(
    &self,
    cmd: &Path,
    args: &[&str],
    run_as: RunAs,
    logfile: &Path,
) -> anyhow::Result<ExitStatus> {
    let no_extra_envs: &[(&str, &str)] = &[];
    self.run_command_logged_with_env(cmd, args, run_as, logfile, no_extra_envs)
}
/// Runs `cmd args...` as `run_as`, appending its output to `logfile`.
///
/// A `=> cmd args` header line is appended to the log first. Then:
///
/// * If `output_tx` is set, the command is run through `/bin/sh -c` using the
///   string from `build_shell_command` (which redirects stderr into stdout).
///   A helper thread reads stdout line by line, writes each line to the log
///   and forwards the lines in batches as `ChannelCommand::OutputLines`.
/// * Otherwise the command is spawned with stdout and stderr attached
///   directly to the log file (`spawn_command_to_file`).
///
/// `extra_envs` are set in addition to the builder's environment. Returns the
/// exit status obtained from `wait_with_shutdown`.
fn run_command_logged_with_env(
&self,
cmd: &Path,
args: &[&str],
run_as: RunAs,
logfile: &Path,
extra_envs: &[(&str, &str)],
) -> anyhow::Result<ExitStatus> {
use std::io::{BufRead, BufReader, Write};
let mut log = OpenOptions::new().create(true).append(true).open(logfile)?;
// Header write failures are ignored; logging is best effort here.
let _ = writeln!(log, "=> {:?} {:?}", cmd, args);
let _ = log.flush();
if let Some(ref output_tx) = self.output_tx {
let shell_cmd = self.build_shell_command(cmd, args, run_as, extra_envs);
// The shell's own stderr is discarded; the command's stderr reaches the
// pipe through the `2>&1` that build_shell_command appends.
let mut child = self
.session
.sandbox
.command(self.sandbox_id, Path::new("/bin/sh"))
.arg("-c")
.arg(&shell_cmd)
.stdout(Stdio::piped())
.stderr(Stdio::null())
.spawn()
.context("Failed to spawn shell command")?;
let stdout = child.stdout.take().unwrap();
let output_tx = output_tx.clone();
let sandbox_id = self.sandbox_id;
let tee_handle = std::thread::spawn(move || {
let mut reader = BufReader::new(stdout);
let mut buf = Vec::new();
let mut batch = Vec::with_capacity(50);
let mut last_send = Instant::now();
let send_interval = OUTPUT_BATCH_INTERVAL;
loop {
buf.clear();
// Stop at end of stream or on the first read error.
match reader.read_until(b'\n', &mut buf) {
Ok(0) => break,
Ok(_) => {}
Err(_) => break,
};
let _ = log.write_all(&buf);
let line = String::from_utf8_lossy(&buf);
let line = line.trim_end_matches('\n').to_string();
batch.push(line);
// Flush when the interval has elapsed or 50 lines are pending. The check
// only runs when a new line arrives.
if last_send.elapsed() >= send_interval || batch.len() >= 50 {
let _ = output_tx.send(ChannelCommand::OutputLines(
sandbox_id,
std::mem::take(&mut batch),
));
last_send = Instant::now();
}
}
// Send whatever is left once the stream ends.
if !batch.is_empty() {
let _ = output_tx.send(ChannelCommand::OutputLines(sandbox_id, batch));
}
});
let status = wait_with_shutdown(&mut child, &self.session.shutdown)?;
let _ = tee_handle.join();
trace!(?cmd, ?status, "Command completed");
Ok(status)
} else {
let status = self.spawn_command_to_file(cmd, args, run_as, extra_envs, log)?;
trace!(?cmd, ?status, "Command completed");
Ok(status)
}
}
/// Spawns `cmd args...` with stdout and stderr both attached to `log` and
/// waits for it (honouring the shutdown flag).
///
/// For `RunAs::Root` the command is executed directly. For `RunAs::User`
/// the shell-escaped command line is passed to `su <build_user> -c`.
///
/// Panics if `run_as` is `RunAs::User` but no build user is configured.
fn spawn_command_to_file(
    &self,
    cmd: &Path,
    args: &[&str],
    run_as: RunAs,
    extra_envs: &[(&str, &str)],
    log: File,
) -> anyhow::Result<ExitStatus> {
    let log_err = log.try_clone()?;

    // Build the command for the requested identity together with the
    // context message used if spawning it fails.
    let (mut command, spawn_context) = match run_as {
        RunAs::Root => {
            let mut direct = self.session.sandbox.command(self.sandbox_id, cmd);
            direct.args(args);
            self.apply_envs(&mut direct, extra_envs);
            (direct, format!("Failed to spawn {}", cmd.display()))
        }
        RunAs::User => {
            let user = self.build_user.as_ref().unwrap();
            let inner_cmd = std::iter::once(cmd.display().to_string())
                .chain(args.iter().map(|arg| arg.to_string()))
                .map(|word| Self::shell_escape(&word))
                .collect::<Vec<_>>()
                .join(" ");
            let mut via_su = self
                .session
                .sandbox
                .command(self.sandbox_id, Path::new("su"));
            via_su.arg(user).arg("-c").arg(&inner_cmd);
            self.apply_envs(&mut via_su, extra_envs);
            (via_su, "Failed to spawn su command".to_string())
        }
    };

    let mut child = command
        .stdout(Stdio::from(log))
        .stderr(Stdio::from(log_err))
        .spawn()
        .context(spawn_context)?;
    wait_with_shutdown(&mut child, &self.session.shutdown)
}
/// Queries a make variable via `make show-var VARNAME=<varname>` (with the
/// standard build flags) and returns its trimmed value.
///
/// stderr of the make invocation is appended to `<logdir>/bob.log`.
/// Fails if the log cannot be opened, make cannot be run, or make exits
/// unsuccessfully.
fn get_make_var(&self, pkgdir: &Path, varname: &str) -> anyhow::Result<String> {
    let mut make = self
        .session
        .sandbox
        .command(self.sandbox_id, self.session.config.make());
    self.apply_envs(&mut make, &[]);

    let work_log = self.logdir.join("work.log");
    let varname_arg = format!("VARNAME={}", varname);
    let make_args = self.make_args(pkgdir, &["show-var", &varname_arg], true, &work_log);

    let bob_log = File::options()
        .create(true)
        .append(true)
        .open(self.logdir.join("bob.log"))?;
    let output = make
        .args(&make_args)
        .stderr(Stdio::from(bob_log))
        .output()?;

    if output.status.success() {
        Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
    } else {
        bail!("Failed to get make variable {}", varname)
    }
}
/// Installs all of the package's dependencies with a single `pkg_add`
/// call, resolving them from `<packages>/All` and logging to
/// `<logdir>/depends.log`. Returns whether `pkg_add` succeeded.
fn install_dependencies(&self) -> anyhow::Result<bool> {
    let dep_names: Vec<String> = self
        .pkginfo
        .depends()
        .iter()
        .map(|dep| dep.to_string())
        .collect();
    let pkg_path = self.session.pkgsrc_env.packages.join("All");
    let logfile = self.logdir.join("depends.log");
    let dep_args: Vec<&str> = dep_names.iter().map(String::as_str).collect();
    self.run_pkg_add_with_path(&dep_args, &pkg_path, &logfile)
        .map(|status| status.success())
}
/// Runs `pkg_add -K <pkg_dbdir> <packages...>` as root with `PKG_PATH`
/// set to `pkg_path`, logging to `logfile`.
fn run_pkg_add_with_path(
    &self,
    packages: &[&str],
    pkg_path: &Path,
    logfile: &Path,
) -> anyhow::Result<ExitStatus> {
    let env = &self.session.pkgsrc_env;
    let pkg_add = env.pkgtools.join("pkg_add");
    let pkg_dbdir = env.pkg_dbdir.to_string_lossy();
    let pkg_path_value = pkg_path.to_string_lossy().to_string();

    let mut args: Vec<&str> = Vec::with_capacity(packages.len() + 2);
    args.push("-K");
    args.push(&*pkg_dbdir);
    args.extend_from_slice(packages);

    let extra_envs = [("PKG_PATH", pkg_path_value.as_str())];
    self.run_command_logged_with_env(&pkg_add, &args, RunAs::Root, logfile, &extra_envs)
}
/// Installs `pkgfile` with `pkg_add -K <pkg_dbdir>` as root, logging to
/// `<logdir>/package.log`. Returns whether the command succeeded.
fn pkg_add(&self, pkgfile: &str) -> anyhow::Result<bool> {
    let env = &self.session.pkgsrc_env;
    let tool = env.pkgtools.join("pkg_add");
    let dbdir = env.pkg_dbdir.to_string_lossy();
    let logfile = self.logdir.join("package.log");
    let args = ["-K", &*dbdir, pkgfile];
    self.run_command_logged(&tool, &args, RunAs::Root, &logfile)
        .map(|status| status.success())
}
/// Removes `pkgname` with `pkg_delete -K <pkg_dbdir>` as root, logging to
/// `<logdir>/deinstall.log`. Returns whether the command succeeded.
fn pkg_delete(&self, pkgname: &str) -> anyhow::Result<bool> {
    let env = &self.session.pkgsrc_env;
    let tool = env.pkgtools.join("pkg_delete");
    let dbdir = env.pkg_dbdir.to_string_lossy();
    let logfile = self.logdir.join("deinstall.log");
    let args = ["-K", &*dbdir, pkgname];
    self.run_command_logged(&tool, &args, RunAs::Root, &logfile)
        .map(|status| status.success())
}
/// Runs `make create-usergroup` before `stage` when the package's
/// usergroup phase asks for it.
///
/// The target runs for the configure stage when the phase ends in
/// `configure`, for the build stage when it ends in `build`, and for the
/// install stage when it is exactly `pre-install`. For the configure stage
/// the `clean` target is appended as well. Returns `Ok(true)` when nothing
/// had to be done, otherwise whether make succeeded.
///
/// Panics if `pkgdir` is not valid UTF-8.
fn run_usergroup_if_needed(
    &self,
    stage: Stage,
    pkgdir: &Path,
    logfile: &Path,
) -> anyhow::Result<bool> {
    let phase = self.pkginfo.usergroup_phase().unwrap_or("");
    let applies = match stage {
        Stage::Configure => phase.ends_with("configure"),
        Stage::Build => phase.ends_with("build"),
        Stage::Install => phase == "pre-install",
        _ => false,
    };

    if applies {
        let mut make_args = vec!["-C", pkgdir.to_str().unwrap(), "create-usergroup"];
        if stage == Stage::Configure {
            make_args.push("clean");
        }
        self.run_command_logged(
            self.session.config.make(),
            &make_args,
            RunAs::Root,
            logfile,
        )
        .map(|status| status.success())
    } else {
        Ok(true)
    }
}
/// Assembles the make argument list: `-C <pkgdir>`, then `targets`, and —
/// when `include_make_flags` is set — `BATCH=1`,
/// `DEPENDS_TARGET=/nonexistent`, the package's multi-version flags (if
/// any) and `WRKLOG=<work_log>`.
///
/// Panics if `pkgdir` is not valid UTF-8.
fn make_args(
    &self,
    pkgdir: &Path,
    targets: &[&str],
    include_make_flags: bool,
    work_log: &Path,
) -> Vec<String> {
    let mut argv = vec![String::from("-C"), pkgdir.to_str().unwrap().to_owned()];
    argv.extend(targets.iter().map(|target| target.to_string()));

    if !include_make_flags {
        return argv;
    }

    argv.push(String::from("BATCH=1"));
    argv.push(String::from("DEPENDS_TARGET=/nonexistent"));
    if let Some(multi_version) = self.pkginfo.multi_version() {
        argv.extend(multi_version.into_iter().map(|flag| flag.clone()));
    }
    argv.push(format!("WRKLOG={}", work_log.display()));
    argv
}
/// Sets the builder's environment variables on `cmd`, followed by
/// `extra_envs` (so an extra entry wins over a builder entry with the
/// same key).
fn apply_envs(&self, cmd: &mut Command, extra_envs: &[(&str, &str)]) {
    let base = self.envs.iter().map(|(k, v)| (k.as_str(), v.as_str()));
    for (name, val) in base.chain(extra_envs.iter().copied()) {
        cmd.env(name, val);
    }
}
/// Quotes `value` so that a POSIX shell reads it back as a single word.
///
/// * The empty string becomes `''`.
/// * Values made up only of ASCII alphanumerics and `-_.,/:=+@` are
///   returned unchanged.
/// * Anything else is wrapped in single quotes, with each embedded single
///   quote written as `'\''`.
fn shell_escape(value: &str) -> String {
    const SAFE_PUNCTUATION: &str = "-_.,/:=+@";
    let is_safe = |ch: char| ch.is_ascii_alphanumeric() || SAFE_PUNCTUATION.contains(ch);

    match value {
        "" => String::from("''"),
        plain if plain.chars().all(is_safe) => plain.to_owned(),
        other => {
            let mut quoted = String::with_capacity(other.len() + 2);
            quoted.push('\'');
            for ch in other.chars() {
                if ch == '\'' {
                    quoted.push_str("'\\''");
                } else {
                    quoted.push(ch);
                }
            }
            quoted.push('\'');
            quoted
        }
    }
}
/// Renders a single `/bin/sh -c` command line equivalent to running
/// `cmd args...` as `run_as`.
///
/// The line consists of `KEY=value` assignments for the builder's
/// environment and `extra_envs`, followed by the escaped command (wrapped
/// in `su <user> -c '<command>'` for `RunAs::User`) and a trailing `2>&1`.
///
/// Panics if `run_as` is `RunAs::User` but no build user is configured.
fn build_shell_command(
    &self,
    cmd: &Path,
    args: &[&str],
    run_as: RunAs,
    extra_envs: &[(&str, &str)],
) -> String {
    let builder_envs = self.envs.iter().map(|(k, v)| (k.as_str(), v.as_str()));
    let mut words: Vec<String> = builder_envs
        .chain(extra_envs.iter().copied())
        .map(|(key, value)| format!("{}={}", key, Self::shell_escape(value)))
        .collect();

    let program = Self::shell_escape(&cmd.to_string_lossy());
    let escaped_args: Vec<String> = args.iter().map(|arg| Self::shell_escape(arg)).collect();

    match run_as {
        RunAs::Root => {
            words.push(program);
            words.extend(escaped_args);
        }
        RunAs::User => {
            let user = self.build_user.as_ref().unwrap();
            let mut inner_words = Vec::with_capacity(escaped_args.len() + 1);
            inner_words.push(program);
            inner_words.extend(escaped_args);
            let inner_cmd = inner_words.join(" ");

            words.push(String::from("su"));
            words.push(Self::shell_escape(user));
            words.push(String::from("-c"));
            words.push(Self::shell_escape(&inner_cmd));
        }
    }

    words.push(String::from("2>&1"));
    words.join(" ")
}
}
/// `BuildCallback` implementation that forwards stage changes over a
/// channel as `ChannelCommand::StageUpdate` messages.
struct ChannelCallback<'a> {
    /// Sandbox the updates are reported for.
    sandbox_id: usize,
    /// Channel the updates are sent on.
    status_tx: &'a Sender<ChannelCommand>,
}

impl<'a> ChannelCallback<'a> {
    /// Creates a callback reporting for `sandbox_id` on `status_tx`.
    fn new(sandbox_id: usize, status_tx: &'a Sender<ChannelCommand>) -> Self {
        ChannelCallback {
            sandbox_id,
            status_tx,
        }
    }
}

impl<'a> BuildCallback for ChannelCallback<'a> {
    /// Sends the new stage name; a send failure is ignored.
    fn stage(&mut self, stage: &str) {
        let update = ChannelCommand::StageUpdate(self.sandbox_id, Some(stage.to_string()));
        let _ = self.status_tx.send(update);
    }
}
/// Final outcome recorded for a package.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum BuildOutcome {
/// The package was built successfully.
Success,
/// The build failed; carries a human-readable message.
Failed(String),
/// The existing binary package was up-to-date, so no build was needed.
UpToDate,
/// The package was not built, for the given reason.
Skipped(SkipReason),
}
/// Result record for a single package.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct BuildResult {
/// Name of the package.
pub pkgname: PkgName,
/// Package path, when the package is known to the scan results.
pub pkgpath: Option<PkgPath>,
/// What happened to the package.
pub outcome: BuildOutcome,
/// Time attributed to the build (zero for up-to-date and indirectly failed packages).
pub duration: Duration,
/// Directory holding the package's build logs, if any.
pub log_dir: Option<PathBuf>,
}
/// Per-outcome tallies produced by `BuildSummary::counts`.
#[derive(Clone, Debug, Default)]
pub struct BuildCounts {
/// Number of `BuildOutcome::Success` results.
pub success: usize,
/// Number of `BuildOutcome::Failed` results.
pub failed: usize,
/// Number of `BuildOutcome::UpToDate` results.
pub up_to_date: usize,
/// Skipped results, broken down by skip reason.
pub skipped: SkippedCounts,
/// Number of entries in the summary's `scanfail` list.
pub scanfail: usize,
}
/// Summary of a whole build run.
#[derive(Clone, Debug)]
pub struct BuildSummary {
/// Duration of the run.
pub duration: Duration,
/// One record per package.
pub results: Vec<BuildResult>,
/// NOTE(review): presumably package paths that failed scanning, with an
/// error message — confirm where this list is populated.
pub scanfail: Vec<(PkgPath, String)>,
}
impl BuildSummary {
pub fn counts(&self) -> BuildCounts {
let mut c = BuildCounts {
scanfail: self.scanfail.len(),
..Default::default()
};
for r in &self.results {
match &r.outcome {
BuildOutcome::Success => c.success += 1,
BuildOutcome::Failed(_) => c.failed += 1,
BuildOutcome::UpToDate => c.up_to_date += 1,
BuildOutcome::Skipped(SkipReason::PkgSkip(_)) => c.skipped.pkg_skip += 1,
BuildOutcome::Skipped(SkipReason::PkgFail(_)) => c.skipped.pkg_fail += 1,
BuildOutcome::Skipped(SkipReason::UnresolvedDep(_)) => c.skipped.unresolved += 1,
BuildOutcome::Skipped(SkipReason::IndirectFail(_)) => c.skipped.indirect_fail += 1,
BuildOutcome::Skipped(SkipReason::IndirectSkip(_)) => c.skipped.indirect_skip += 1,
}
}
c
}
pub fn failed(&self) -> Vec<&BuildResult> {
self.results
.iter()
.filter(|r| matches!(r.outcome, BuildOutcome::Failed(_)))
.collect()
}
pub fn succeeded(&self) -> Vec<&BuildResult> {
self.results
.iter()
.filter(|r| matches!(r.outcome, BuildOutcome::Success))
.collect()
}
pub fn skipped(&self) -> Vec<&BuildResult> {
self.results
.iter()
.filter(|r| matches!(r.outcome, BuildOutcome::Skipped(_)))
.collect()
}
}
/// Options controlling a build run.
#[derive(Clone, Debug, Default)]
pub struct BuildOptions {
/// When true, packages are rebuilt without checking whether the existing
/// binary package is up-to-date.
pub force_rebuild: bool,
}
/// Top-level build driver for a set of resolved packages.
#[derive(Debug)]
pub struct Build {
/// Build configuration (cloned from the caller's at construction).
config: Config,
/// pkgsrc environment.
pkgsrc_env: PkgsrcEnv,
/// Sandbox scope for the run.
scope: SandboxScope,
/// Packages to build, keyed by package name.
scanpkgs: IndexMap<PkgName, ResolvedPackage>,
/// Previously stored results, filled in by `load_cached_from_db`.
cached: IndexMap<PkgName, BuildResult>,
/// Options for this run.
options: BuildOptions,
}
/// A unit of work: one package to be built in one sandbox.
#[derive(Debug)]
struct PackageBuild {
/// Shared run state.
session: Arc<BuildSession>,
/// Sandbox in which the package is built.
sandbox_id: usize,
/// The package to build.
pkginfo: ResolvedPackage,
}
/// Helper for querying make variables of one package inside a sandbox.
struct MakeQuery<'a> {
/// Shared run state (sandbox, configuration).
session: &'a BuildSession,
/// Sandbox in which make is run.
sandbox_id: usize,
/// Package whose directory make is pointed at.
pkgpath: &'a PkgPath,
/// Environment variables set on each make invocation.
env: &'a HashMap<String, String>,
}
impl<'a> MakeQuery<'a> {
fn new(
session: &'a BuildSession,
sandbox_id: usize,
pkgpath: &'a PkgPath,
env: &'a HashMap<String, String>,
) -> Self {
Self {
session,
sandbox_id,
pkgpath,
env,
}
}
fn var(&self, name: &str) -> Option<String> {
let pkgdir = self.session.config.pkgsrc().join(self.pkgpath.as_path());
let mut cmd = self
.session
.sandbox
.command(self.sandbox_id, self.session.config.make());
cmd.arg("-C")
.arg(&pkgdir)
.arg("show-var")
.arg(format!("VARNAME={}", name));
for (key, value) in self.env {
cmd.env(key, value);
}
cmd.stderr(Stdio::null());
let output = cmd.output().ok()?;
if !output.status.success() {
return None;
}
let value = String::from_utf8_lossy(&output.stdout).trim().to_string();
if value.is_empty() { None } else { Some(value) }
}
fn var_path(&self, name: &str) -> Option<PathBuf> {
self.var(name).map(PathBuf::from)
}
fn wrkdir(&self) -> Option<PathBuf> {
self.var_path("WRKDIR")
}
fn resolve_path(&self, path: &Path) -> PathBuf {
if self.session.sandbox.enabled() {
self.session
.sandbox
.path(self.sandbox_id)
.join(path.strip_prefix("/").unwrap_or(path))
} else {
path.to_path_buf()
}
}
}
/// Outcome of `PackageBuild::build` as reported to the scheduler.
#[derive(Debug)]
enum PackageBuildResult {
    /// The package was built.
    Success,
    /// The build failed or hit an error.
    Failed,
    /// The package was already up-to-date.
    Skipped,
}

impl std::fmt::Display for PackageBuildResult {
    /// Writes the lowercase outcome name (`success`, `failed`, `skipped`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            PackageBuildResult::Success => "success",
            PackageBuildResult::Failed => "failed",
            PackageBuildResult::Skipped => "skipped",
        };
        f.write_str(label)
    }
}
impl PackageBuild {
/// Builds this package in its sandbox and reports progress on `status_tx`.
///
/// Steps:
/// 1. Assemble the environment: the configuration's script environment
///    plus the per-package environment (an error obtaining the latter is
///    logged and treated as an empty environment).
/// 2. Run the pre-build script (a failing script is only warned about; an
///    error running it is returned).
/// 3. Run the actual build through `PkgBuilder`.
/// 4. On failure or error, clean up via `cleanup_after_failure`.
/// 5. Run the post-build script (failures and errors are only warned about).
///
/// Note that an `Err` from the builder is logged and mapped to
/// `PackageBuildResult::Failed`; it is not propagated to the caller.
fn build(&self, status_tx: &Sender<ChannelCommand>) -> anyhow::Result<PackageBuildResult> {
    info!("Starting package build");
    let pkg_env = match self.session.config.get_pkg_env(&self.pkginfo) {
        Ok(env) => env,
        Err(e) => {
            error!(error = %e, "Failed to get env from Lua config");
            HashMap::new()
        }
    };
    let mut envs = self
        .session
        .config
        .script_env(Some(&self.session.pkgsrc_env));
    for (key, value) in &pkg_env {
        envs.push((key.clone(), value.clone()));
    }
    let patterns = self.session.config.save_wrkdir_patterns();
    if !self.session.sandbox.run_pre_build(
        self.sandbox_id,
        &self.session.config,
        envs.clone(),
    )? {
        warn!("pre-build script failed");
    }
    let builder = PkgBuilder::new(
        &self.session,
        self.sandbox_id,
        &self.pkginfo,
        envs.clone(),
        Some(status_tx.clone()),
    );
    let mut callback = ChannelCallback::new(self.sandbox_id, status_tx);
    let result = builder.build(&mut callback);
    // Clear the stage shown for this sandbox now that the build has finished.
    let _ = status_tx.send(ChannelCommand::StageUpdate(self.sandbox_id, None));
    let result = match &result {
        Ok(PkgBuildResult::Success) => {
            info!("Package build completed successfully");
            PackageBuildResult::Success
        }
        Ok(PkgBuildResult::Skipped) => {
            info!("Package build skipped (up-to-date)");
            PackageBuildResult::Skipped
        }
        Ok(PkgBuildResult::Failed) => {
            error!("Package build failed");
            self.cleanup_after_failure(status_tx, patterns, &pkg_env, &envs);
            PackageBuildResult::Failed
        }
        Err(e) => {
            error!(error = %e, "Package build error");
            self.cleanup_after_failure(status_tx, patterns, &pkg_env, &envs);
            PackageBuildResult::Failed
        }
    };
    match self
        .session
        .sandbox
        .run_post_build(self.sandbox_id, &self.session.config, envs)
    {
        Ok(true) => {}
        Ok(false) => warn!("post-build script failed"),
        Err(e) => {
            warn!(error = %e, "post-build script error")
        }
    }
    Ok(result)
}

/// Cleanup shared by the failure and error paths of `build`.
///
/// Reports a `cleanup` stage, kills processes left in the sandbox, saves
/// WRKDIR files matching `patterns` (when any are configured) and finally
/// runs `make clean` for the package.
fn cleanup_after_failure(
    &self,
    status_tx: &Sender<ChannelCommand>,
    patterns: &[String],
    pkg_env: &HashMap<String, String>,
    envs: &[(String, String)],
) {
    let pkgname = self.pkginfo.index.pkgname.pkgname();
    let pkgpath = &self.pkginfo.pkgpath;

    let _ = status_tx.send(ChannelCommand::StageUpdate(
        self.sandbox_id,
        Some("cleanup".to_string()),
    ));

    let kill_start = Instant::now();
    self.session.sandbox.kill_processes_by_id(self.sandbox_id);
    trace!(
        elapsed_ms = kill_start.elapsed().as_millis(),
        "kill_processes_by_id completed"
    );

    // Files must be saved before the clean below removes the work directory.
    if !patterns.is_empty() {
        let save_start = Instant::now();
        let logdir = self.session.config.logdir();
        self.save_wrkdir_files(pkgname, pkgpath, logdir, patterns, pkg_env);
        trace!(
            elapsed_ms = save_start.elapsed().as_millis(),
            "save_wrkdir_files completed"
        );
    }

    let clean_start = Instant::now();
    self.run_clean(pkgpath, envs);
    trace!(
        elapsed_ms = clean_start.elapsed().as_millis(),
        "run_clean completed"
    );
}
/// Copies files from the package's WRKDIR that match any of `patterns`
/// into `<logdir>/<pkgname>/wrkdir-files`, preserving relative paths.
///
/// This is best effort: if WRKDIR cannot be determined or does not exist,
/// the destination cannot be created, or no pattern is valid, the function
/// returns without saving anything. Invalid patterns are warned about and
/// skipped.
fn save_wrkdir_files(
    &self,
    pkgname: &str,
    pkgpath: &PkgPath,
    logdir: &Path,
    patterns: &[String],
    pkg_env: &HashMap<String, String>,
) {
    let make = MakeQuery::new(&self.session, self.sandbox_id, pkgpath, pkg_env);

    let Some(wrkdir) = make.wrkdir() else {
        debug!(%pkgname, "Could not determine WRKDIR, skipping file save");
        return;
    };
    let wrkdir_path = make.resolve_path(&wrkdir);
    if !wrkdir_path.exists() {
        debug!(%pkgname, wrkdir = %wrkdir_path.display(), "WRKDIR does not exist, skipping file save");
        return;
    }

    let save_dir = logdir.join(pkgname).join("wrkdir-files");
    if let Err(e) = fs::create_dir_all(&save_dir) {
        warn!(%pkgname, error = %e, "Failed to create wrkdir-files directory");
        return;
    }

    let mut compiled_patterns: Vec<Pattern> = Vec::with_capacity(patterns.len());
    for p in patterns {
        match Pattern::new(p) {
            Ok(compiled) => compiled_patterns.push(compiled),
            Err(_) => warn!(pattern = %p, "Invalid glob pattern"),
        }
    }
    if compiled_patterns.is_empty() {
        return;
    }

    let mut saved_count = 0;
    let walked = walk_and_save(
        &wrkdir_path,
        &wrkdir_path,
        &save_dir,
        &compiled_patterns,
        &mut saved_count,
    );
    if let Err(e) = walked {
        warn!(%pkgname, error = %e, "Error while saving wrkdir files");
    }
    if saved_count > 0 {
        info!(%pkgname, count = saved_count, dest = %save_dir.display(), "Saved wrkdir files");
    }
}
/// Runs `make -C <pkgdir> clean` in the sandbox with `envs` set and all
/// output discarded. Only a failure to run make is logged (at debug
/// level); the exit status is ignored.
fn run_clean(&self, pkgpath: &PkgPath, envs: &[(String, String)]) {
    let config = &self.session.config;
    let pkgdir = config.pkgsrc().join(pkgpath.as_path());

    let mut make = self.session.sandbox.command(self.sandbox_id, config.make());
    make.arg("-C").arg(&pkgdir).arg("clean");
    for (name, val) in envs {
        make.env(name, val);
    }
    make.stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null());

    if let Err(e) = make.status() {
        debug!(error = %e, "Failed to run bmake clean");
    }
}
}
/// Recursively copies every regular file under `current` whose path
/// relative to `base`, or whose bare file name, matches one of `patterns`
/// into `save_dir`, keeping the relative path. `saved_count` is incremented
/// for each file copied.
///
/// Only real directories are descended into: the directory check uses
/// `DirEntry::file_type`, which does not follow symlinks, so a symlink that
/// points at a directory (possibly back up the tree) is not traversed and
/// cannot cause repeated copies. Symlinks to regular files are still saved.
///
/// A failed copy is logged and skipped. Errors from reading a directory or
/// creating a destination directory abort the walk and are returned.
fn walk_and_save(
    base: &Path,
    current: &Path,
    save_dir: &Path,
    patterns: &[Pattern],
    saved_count: &mut usize,
) -> std::io::Result<()> {
    if !current.is_dir() {
        return Ok(());
    }
    for entry in fs::read_dir(current)? {
        let entry = entry?;
        let path = entry.path();

        // file_type() reports the entry itself, not a symlink's target. If it
        // cannot be determined the entry is treated as "not a directory".
        let is_real_dir = entry
            .file_type()
            .map(|kind| kind.is_dir())
            .unwrap_or(false);
        if is_real_dir {
            walk_and_save(base, &path, save_dir, patterns, saved_count)?;
            continue;
        }
        // is_file() follows symlinks, so links to regular files still qualify.
        if !path.is_file() {
            continue;
        }

        let rel_path = path.strip_prefix(base).unwrap_or(&path);
        let rel_str = rel_path.to_string_lossy();
        let file_name = path.file_name().unwrap_or_default().to_string_lossy();
        let wanted = patterns
            .iter()
            .any(|pattern| pattern.matches(&rel_str) || pattern.matches(&file_name));
        if !wanted {
            continue;
        }

        let dest_path = save_dir.join(rel_path);
        if let Some(parent) = dest_path.parent() {
            fs::create_dir_all(parent)?;
        }
        match fs::copy(&path, &dest_path) {
            Ok(_) => {
                debug!(src = %path.display(),
                    dest = %dest_path.display(),
                    "Saved wrkdir file"
                );
                *saved_count += 1;
            }
            Err(e) => {
                warn!(src = %path.display(),
                    dest = %dest_path.display(),
                    error = %e,
                    "Failed to copy file"
                );
            }
        }
    }
    Ok(())
}
/// Messages exchanged over the build channels.
///
/// NOTE(review): only `StageUpdate` and `OutputLines` are produced in the
/// visible part of this file; the descriptions of the other variants are
/// inferred from their names and payloads — confirm against the manager and
/// worker loops.
#[derive(Debug)]
enum ChannelCommand {
/// Presumably: the worker with this id is ready for work.
ClientReady(usize),
/// Presumably: no job is available right now; ask again later.
ComeBackLater,
/// A package build job.
JobData(Box<PackageBuild>),
/// Presumably: the named package built successfully in the given time.
JobSuccess(PkgName, Duration),
/// Presumably: the named package failed to build after the given time.
JobFailed(PkgName, Duration),
/// Presumably: the named package was skipped as up-to-date.
JobSkipped(PkgName),
/// Presumably: the named package's build ended with an error.
JobError((PkgName, Duration, anyhow::Error)),
/// Presumably: the receiver should stop.
Quit,
/// Presumably: a shutdown was requested.
Shutdown,
/// Stage change for a sandbox: `Some(name)` when a stage starts, `None`
/// once the build in that sandbox has finished.
StageUpdate(usize, Option<String>),
/// A batch of output lines captured from a command in the given sandbox.
OutputLines(usize, Vec<String>),
}
/// Answer returned by `BuildJobs::get_next_build`.
#[derive(Debug)]
enum BuildStatus {
/// The named package has no outstanding dependencies and can be built.
Available(PkgName),
/// Packages remain, but each still waits on at least one dependency.
NoneAvailable,
/// No packages remain in the incoming queue.
Done,
}
/// Scheduling state for a build run.
#[derive(Clone, Debug)]
struct BuildJobs {
/// All packages of the run, keyed by name.
scanpkgs: IndexMap<PkgName, ResolvedPackage>,
/// Pending packages mapped to the dependencies they are still waiting on.
incoming: HashMap<PkgName, HashSet<PkgName>>,
/// For each package, the packages that depend on it.
reverse_deps: HashMap<PkgName, HashSet<PkgName>>,
/// Scheduling weight per package; higher is picked first (100 when absent).
effective_weights: HashMap<PkgName, usize>,
/// NOTE(review): not touched in the visible code; presumably the packages
/// currently being built — confirm.
running: HashSet<PkgName>,
/// Packages that finished successfully or were up-to-date.
done: HashSet<PkgName>,
/// Packages that failed, plus everything that depends on them.
failed: HashSet<PkgName>,
/// Result records accumulated so far.
results: Vec<BuildResult>,
/// Base log directory; per-package logs live in `<logdir>/<pkgname>`.
logdir: PathBuf,
}
impl BuildJobs {
/// Records `pkgname` as successfully built in `duration`.
fn mark_success(&mut self, pkgname: &PkgName, duration: Duration) {
self.mark_done(pkgname, BuildOutcome::Success, duration);
}
/// Records `pkgname` as up-to-date (no build performed, zero duration).
fn mark_up_to_date(&mut self, pkgname: &PkgName) {
self.mark_done(pkgname, BuildOutcome::UpToDate, Duration::ZERO);
}
/// Marks `pkgname` as finished with `outcome`.
///
/// The package is removed from every pending package's outstanding
/// dependency set, added to `done`, and a result record is appended.
/// The package's own entry in `incoming` is not touched here.
fn mark_done(&mut self, pkgname: &PkgName, outcome: BuildOutcome, duration: Duration) {
    self.incoming.values_mut().for_each(|waiting_on| {
        waiting_on.remove(pkgname);
    });
    self.done.insert(pkgname.clone());

    let pkgpath = self
        .scanpkgs
        .get(pkgname)
        .map(|pkg| pkg.pkgpath.clone());
    let log_dir = Some(self.logdir.join(pkgname.pkgname()));
    self.results.push(BuildResult {
        pkgname: pkgname.clone(),
        pkgpath,
        outcome,
        duration,
        log_dir,
    });
}
/// Records the failure of `pkgname` and propagates it to everything that
/// depends on it, directly or transitively.
///
/// `pkgname` itself gets a `Failed` result with `duration`. Every dependent
/// gets a `Skipped(IndirectFail)` result with zero duration. All of them are
/// removed from `incoming` and added to `failed`.
///
/// A dependent that is already in `failed` (because another of its
/// dependencies failed earlier) is not recorded a second time, so each
/// package ends up with a single result.
fn mark_failure(&mut self, pkgname: &PkgName, duration: Duration) {
    trace!(pkgname = %pkgname.pkgname(), "mark_failure called");
    let start = std::time::Instant::now();

    // Collect pkgname plus the transitive closure of its reverse dependencies.
    let mut broken: HashSet<PkgName> = HashSet::new();
    let mut to_check: Vec<PkgName> = vec![pkgname.clone()];
    while let Some(badpkg) = to_check.pop() {
        if broken.contains(&badpkg) {
            continue;
        }
        if let Some(dependents) = self.reverse_deps.get(&badpkg) {
            to_check.extend(dependents.iter().cloned());
        }
        broken.insert(badpkg);
    }
    trace!(pkgname = %pkgname.pkgname(), broken_count = broken.len(), elapsed_ms = start.elapsed().as_millis(), "mark_failure found broken packages");

    for pkg in broken {
        self.incoming.remove(&pkg);
        // insert() returns false when the package was already marked failed.
        let newly_failed = self.failed.insert(pkg.clone());
        let is_original = &pkg == pkgname;
        if !is_original && !newly_failed {
            // Already reported as an indirect failure of an earlier breakage.
            continue;
        }

        let pkgpath = self.scanpkgs.get(&pkg).map(|s| s.pkgpath.clone());
        let log_dir = Some(self.logdir.join(pkg.pkgname()));
        let (outcome, dur) = if is_original {
            (BuildOutcome::Failed("Build failed".to_string()), duration)
        } else {
            (
                BuildOutcome::Skipped(SkipReason::IndirectFail(format!(
                    "dependency {} failed",
                    pkgname.pkgname()
                ))),
                Duration::ZERO,
            )
        };
        self.results.push(BuildResult {
            pkgname: pkg,
            pkgpath,
            outcome,
            duration: dur,
            log_dir,
        });
    }
    trace!(pkgname = %pkgname.pkgname(), total_results = self.results.len(), elapsed_ms = start.elapsed().as_millis(), "mark_failure completed");
}
/// Picks the next package to build.
///
/// Returns `Done` when no packages are pending, `NoneAvailable` when every
/// pending package still waits on a dependency, and otherwise `Available`
/// with the ready package that has the highest effective weight (packages
/// without a recorded weight count as 100).
///
/// The choice is made in one pass without cloning every candidate name.
/// `min_by_key` over `Reverse(weight)` returns the first of several
/// equally-weighted candidates, which matches taking the first element
/// after a stable descending sort.
fn get_next_build(&self) -> BuildStatus {
    if self.incoming.is_empty() {
        return BuildStatus::Done;
    }
    let best = self
        .incoming
        .iter()
        .filter(|(_, waiting_on)| waiting_on.is_empty())
        .map(|(name, _)| {
            let weight = self.effective_weights.get(name).copied().unwrap_or(100);
            (name, weight)
        })
        .min_by_key(|&(_, weight)| std::cmp::Reverse(weight));
    match best {
        Some((name, _)) => BuildStatus::Available(name.clone()),
        None => BuildStatus::NoneAvailable,
    }
}
}
impl Build {
pub fn new(
config: &Config,
pkgsrc_env: PkgsrcEnv,
scope: SandboxScope,
scanpkgs: IndexMap<PkgName, ResolvedPackage>,
options: BuildOptions,
) -> Build {
info!(
package_count = scanpkgs.len(),
sandbox_enabled = scope.enabled(),
build_threads = config.build_threads(),
?options,
"Creating new Build instance"
);
for (pkgname, index) in &scanpkgs {
debug!(pkgname = %pkgname.pkgname(),
pkgpath = ?index.pkgpath,
depends_count = index.depends().len(),
depends = ?index.depends().iter().map(|d| d.pkgname()).collect::<Vec<_>>(),
"Package in build queue"
);
}
Build {
config: config.clone(),
pkgsrc_env,
scope,
scanpkgs,
cached: IndexMap::new(),
options,
}
}
/// Populate the cache of previous build results from `db`.
///
/// For every package name in `scanpkgs`, the package is looked up by
/// name and, if it exists and has a stored build result, that result is
/// inserted into `cached`.
///
/// Returns the number of results loaded.
///
/// # Errors
///
/// Propagates any error returned by the database lookups.
pub fn load_cached_from_db(&mut self, db: &crate::db::Database) -> anyhow::Result<usize> {
    let mut count = 0;
    for name in self.scanpkgs.keys() {
        // Packages unknown to the database have nothing to load.
        let Some(pkg) = db.get_package_by_name(name.pkgname())? else {
            continue;
        };
        // Known package, but no build result has been stored for it.
        let Some(result) = db.get_build_result(pkg.id)? else {
            continue;
        };
        self.cached.insert(name.clone(), result);
        count += 1;
    }
    if count > 0 {
        info!(
            cached_count = count,
            "Loaded cached build results from database"
        );
    }
    Ok(count)
}
/// Run the build for every package in `scanpkgs` that is not already
/// covered by a cached result.
///
/// Outline:
/// 1. Build the dependency graph (`incoming`: package -> unmet
///    dependencies, `reverse_deps`: package -> dependents), restricted to
///    packages present in `scanpkgs`.
/// 2. Apply cached results: cached successes are treated as done, cached
///    failures/skips as failed, and failure is propagated to dependents.
/// 3. Compute an effective weight per package (own weight plus the
///    effective weights of its dependents) used to prioritise scheduling.
/// 4. Spawn one worker thread per build thread, a manager thread that
///    hands out jobs and tracks results, and a progress refresh thread.
/// 5. After all threads have joined, store the completed results in `db`
///    and return them in a [`BuildSummary`].
///
/// Failures to store an individual result are logged and do not abort
/// the run. `scanfail` in the returned summary is always empty.
///
/// # Errors
///
/// Returns an error if `self.scope.ensure(..)` fails.
///
/// # Panics
///
/// Panics if the progress display cannot be initialised or if a worker
/// or the manager thread panicked.
pub fn start(
    &mut self,
    ctx: &RunContext,
    db: &crate::db::Database,
) -> anyhow::Result<BuildSummary> {
    let started = Instant::now();
    info!(package_count = self.scanpkgs.len(), "Build::start() called");
    let shutdown_flag = Arc::clone(&ctx.shutdown);
    debug!("Populating BuildJobs from scanpkgs");

    // Dependency graph, limited to dependencies that are themselves part
    // of this build.
    let mut incoming: HashMap<PkgName, HashSet<PkgName>> = HashMap::new();
    let mut reverse_deps: HashMap<PkgName, HashSet<PkgName>> = HashMap::new();
    for (pkgname, index) in &self.scanpkgs {
        let mut deps: HashSet<PkgName> = HashSet::new();
        for dep in index.depends() {
            if !self.scanpkgs.contains_key(dep) {
                continue;
            }
            deps.insert(dep.clone());
            reverse_deps
                .entry(dep.clone())
                .or_default()
                .insert(pkgname.clone());
        }
        trace!(pkgname = %pkgname.pkgname(),
            deps_count = deps.len(),
            deps = ?deps.iter().map(|d| d.pkgname()).collect::<Vec<_>>(),
            "Adding package to incoming build queue"
        );
        incoming.insert(pkgname.clone(), deps);
    }

    // Apply cached results from previous runs.
    let mut done: HashSet<PkgName> = HashSet::new();
    let mut failed: HashSet<PkgName> = HashSet::new();
    let results: Vec<BuildResult> = Vec::new();
    let mut cached_count = 0usize;
    for (pkgname, result) in &self.cached {
        match result.outcome {
            BuildOutcome::Success | BuildOutcome::UpToDate => {
                incoming.remove(pkgname);
                done.insert(pkgname.clone());
                // Only dependents can list this package as an unmet
                // dependency, so there is no need to scan every entry.
                if let Some(dependents) = reverse_deps.get(pkgname) {
                    for dependent in dependents {
                        if let Some(deps) = incoming.get_mut(dependent) {
                            deps.remove(pkgname);
                        }
                    }
                }
                cached_count += 1;
            }
            BuildOutcome::Failed(_) | BuildOutcome::Skipped(_) => {
                incoming.remove(pkgname);
                failed.insert(pkgname.clone());
                cached_count += 1;
            }
        }
    }

    // Propagate failure: anything still queued that depends on a failed
    // package is failed too. Repeat until no further package is affected.
    loop {
        let mut newly_failed: Vec<PkgName> = Vec::new();
        for (pkgname, deps) in &incoming {
            for dep in deps {
                if failed.contains(dep) {
                    newly_failed.push(pkgname.clone());
                    break;
                }
            }
        }
        if newly_failed.is_empty() {
            break;
        }
        for pkgname in newly_failed {
            incoming.remove(&pkgname);
            failed.insert(pkgname);
        }
    }
    if cached_count > 0 {
        println!("Loaded {} cached build results", cached_count);
    }
    info!(
        incoming_count = incoming.len(),
        scanpkgs_count = self.scanpkgs.len(),
        cached_count = cached_count,
        "BuildJobs populated"
    );

    // Nothing left to build: return before creating sandboxes or threads.
    if incoming.is_empty() {
        return Ok(BuildSummary {
            duration: started.elapsed(),
            results,
            scanfail: Vec::new(),
        });
    }
    self.scope.ensure(self.config.build_threads())?;

    // Own weight of a package; 100 when unset or unparsable.
    let get_weight = |pkg: &PkgName| -> usize {
        self.scanpkgs
            .get(pkg)
            .and_then(|idx| idx.pbulk_weight())
            .and_then(|w| w.parse().ok())
            .unwrap_or(100)
    };

    // Effective weight = own weight + effective weights of dependents,
    // computed in reverse topological order (dependents first).
    let mut effective_weights: HashMap<PkgName, usize> = HashMap::new();
    // Number of dependents whose weight is still outstanding. Only
    // dependents that are still queued are counted: dependents removed
    // from `incoming` (cached or failed) are never processed below, so
    // counting them would leave the counter stuck above zero and the
    // package (and everything it depends on) without a computed weight.
    let mut pending: HashMap<&PkgName, usize> = incoming
        .keys()
        .map(|p| {
            let queued_dependents = reverse_deps.get(p).map_or(0, |s| {
                s.iter().filter(|d| incoming.contains_key(*d)).count()
            });
            (p, queued_dependents)
        })
        .collect();
    let mut queue: VecDeque<&PkgName> = pending
        .iter()
        .filter(|(_, c)| **c == 0)
        .map(|(&p, _)| p)
        .collect();
    while let Some(pkg) = queue.pop_front() {
        let mut total = get_weight(pkg);
        if let Some(dependents) = reverse_deps.get(pkg) {
            for dep in dependents {
                // Dependents that are no longer queued contribute nothing.
                total += effective_weights.get(dep).unwrap_or(&0);
            }
        }
        effective_weights.insert(pkg.clone(), total);
        for dep in incoming.get(pkg).iter().flat_map(|s| s.iter()) {
            if let Some(c) = pending.get_mut(dep) {
                *c -= 1;
                if *c == 0 {
                    queue.push_back(dep);
                }
            }
        }
    }

    let running: HashSet<PkgName> = HashSet::new();
    let logdir = self.config.logdir().clone();
    let jobs = BuildJobs {
        scanpkgs: self.scanpkgs.clone(),
        incoming,
        reverse_deps,
        effective_weights,
        running,
        done,
        failed,
        results,
        logdir,
    };
    println!("Building packages...");
    let progress = Arc::new(Mutex::new(
        MultiProgress::new(
            "Building",
            "Built",
            self.scanpkgs.len(),
            self.config.build_threads(),
        )
        .expect("Failed to initialize progress display"),
    ));
    if cached_count > 0 {
        if let Ok(mut p) = progress.lock() {
            p.state_mut().cached = cached_count;
        }
    }

    // Progress refresh thread: polls for terminal events and re-renders
    // until told to stop or a shutdown is requested.
    let stop_refresh = Arc::new(AtomicBool::new(false));
    let progress_refresh = Arc::clone(&progress);
    let stop_flag = Arc::clone(&stop_refresh);
    let shutdown_for_refresh = Arc::clone(&shutdown_flag);
    let refresh_thread = std::thread::spawn(move || {
        while !stop_flag.load(Ordering::Relaxed) && !shutdown_for_refresh.load(Ordering::SeqCst)
        {
            let has_event = event::poll(REFRESH_INTERVAL).unwrap_or(false);
            if let Ok(mut p) = progress_refresh.lock() {
                if has_event {
                    let _ = p.handle_event();
                }
                let _ = p.render();
            }
        }
    });

    // Worker threads: each announces readiness to the manager, waits for
    // a command on its own channel, and reports the job outcome back.
    let (manager_tx, manager_rx) = mpsc::channel::<ChannelCommand>();
    let mut threads = vec![];
    let mut clients: HashMap<usize, Sender<ChannelCommand>> = HashMap::new();
    for i in 0..self.config.build_threads() {
        let (client_tx, client_rx) = mpsc::channel::<ChannelCommand>();
        clients.insert(i, client_tx);
        let manager_tx = manager_tx.clone();
        let shutdown_for_worker = Arc::clone(&shutdown_flag);
        let thread = std::thread::spawn(move || {
            loop {
                if shutdown_for_worker.load(Ordering::SeqCst) {
                    break;
                }
                if manager_tx.send(ChannelCommand::ClientReady(i)).is_err() {
                    break;
                }
                let Ok(msg) = client_rx.recv() else {
                    break;
                };
                match msg {
                    ChannelCommand::ComeBackLater => {
                        std::thread::sleep(WORKER_BACKOFF_INTERVAL);
                        continue;
                    }
                    ChannelCommand::JobData(pkg) => {
                        let pkgname = pkg.pkginfo.index.pkgname.clone();
                        let pkgpath = &pkg.pkginfo.pkgpath;
                        let span = info_span!(
                            "build",
                            sandbox_id = pkg.sandbox_id,
                            pkgpath = %pkgpath,
                            pkgname = %pkgname.pkgname(),
                        );
                        let _guard = span.enter();
                        let build_start = Instant::now();
                        let result = pkg.build(&manager_tx);
                        let duration = build_start.elapsed();
                        trace!(
                            elapsed_ms = duration.as_millis(),
                            result = %result.as_ref().map_or("error".to_string(), |r| r.to_string()),
                            "Build finished"
                        );
                        match result {
                            Ok(PackageBuildResult::Success) => {
                                let _ = manager_tx
                                    .send(ChannelCommand::JobSuccess(pkgname, duration));
                            }
                            Ok(PackageBuildResult::Skipped) => {
                                let _ = manager_tx.send(ChannelCommand::JobSkipped(pkgname));
                            }
                            Ok(PackageBuildResult::Failed) => {
                                let _ = manager_tx
                                    .send(ChannelCommand::JobFailed(pkgname, duration));
                            }
                            Err(e) => {
                                // Errors seen while shutting down are not
                                // reported as build failures.
                                if !shutdown_for_worker.load(Ordering::SeqCst) {
                                    let _ = manager_tx
                                        .send(ChannelCommand::JobError((pkgname, duration, e)));
                                }
                            }
                        }
                        if shutdown_for_worker.load(Ordering::SeqCst) {
                            break;
                        }
                        continue;
                    }
                    ChannelCommand::Quit | ChannelCommand::Shutdown => {
                        break;
                    }
                    _ => {
                        // The remaining commands are worker -> manager
                        // messages. Ignore one that arrives here rather
                        // than panicking the worker and the whole run.
                        warn!(worker = i, "Worker ignoring unexpected command");
                        continue;
                    }
                }
            }
        });
        threads.push(thread);
    }

    let session = Arc::new(BuildSession {
        config: self.config.clone(),
        pkgsrc_env: self.pkgsrc_env.clone(),
        sandbox: self.scope.sandbox().clone(),
        options: self.options.clone(),
        shutdown: Arc::clone(&shutdown_flag),
    });
    let progress_clone = Arc::clone(&progress);
    let shutdown_for_manager = Arc::clone(&shutdown_flag);
    let (results_tx, results_rx) = mpsc::channel::<Vec<BuildResult>>();
    let (interrupted_tx, interrupted_rx) = mpsc::channel::<bool>();
    let (completed_tx, completed_rx) = mpsc::channel::<BuildResult>();

    // Manager thread: owns the job state, answers worker requests and
    // keeps the progress display up to date.
    let manager = std::thread::spawn(move || {
        // Both values are moved into this closure; no copy is needed.
        let mut clients = clients;
        let mut jobs = jobs;
        let mut was_interrupted = false;
        // Package most recently assigned to each worker.
        let mut thread_packages: HashMap<usize, PkgName> = HashMap::new();
        loop {
            if shutdown_for_manager.load(Ordering::SeqCst) {
                if let Ok(mut p) = progress_clone.lock() {
                    p.state_mut().suppress();
                }
                for (_, client) in clients.drain() {
                    let _ = client.send(ChannelCommand::Shutdown);
                }
                was_interrupted = true;
                break;
            }
            // Wake up periodically so a shutdown request is noticed even
            // when no worker is sending anything.
            let command = match manager_rx.recv_timeout(SHUTDOWN_POLL_INTERVAL) {
                Ok(cmd) => cmd,
                Err(mpsc::RecvTimeoutError::Timeout) => continue,
                Err(mpsc::RecvTimeoutError::Disconnected) => break,
            };
            match command {
                ChannelCommand::ClientReady(c) => {
                    let client = clients.get(&c).unwrap();
                    match jobs.get_next_build() {
                        BuildStatus::Available(pkg) => {
                            let pkginfo = jobs.scanpkgs.get(&pkg).unwrap();
                            jobs.incoming.remove(&pkg);
                            jobs.running.insert(pkg.clone());
                            thread_packages.insert(c, pkg.clone());
                            if let Ok(mut p) = progress_clone.lock() {
                                p.clear_output_buffer(c);
                                p.state_mut().set_worker_active(c, pkg.pkgname());
                                let _ = p.render();
                            }
                            let _ =
                                client.send(ChannelCommand::JobData(Box::new(PackageBuild {
                                    session: Arc::clone(&session),
                                    sandbox_id: c,
                                    pkginfo: pkginfo.clone(),
                                })));
                        }
                        BuildStatus::NoneAvailable => {
                            if let Ok(mut p) = progress_clone.lock() {
                                p.clear_output_buffer(c);
                                p.state_mut().set_worker_idle(c);
                                let _ = p.render();
                            }
                            let _ = client.send(ChannelCommand::ComeBackLater);
                        }
                        BuildStatus::Done => {
                            if let Ok(mut p) = progress_clone.lock() {
                                p.clear_output_buffer(c);
                                p.state_mut().set_worker_idle(c);
                                let _ = p.render();
                            }
                            let _ = client.send(ChannelCommand::Quit);
                            clients.remove(&c);
                            // The manager is finished once every worker
                            // has been told to quit.
                            if clients.is_empty() {
                                break;
                            }
                        }
                    };
                }
                ChannelCommand::JobSuccess(pkgname, duration) => {
                    jobs.mark_success(&pkgname, duration);
                    jobs.running.remove(&pkgname);
                    if let Some(result) = jobs.results.last() {
                        let _ = completed_tx.send(result.clone());
                    }
                    if let Ok(mut p) = progress_clone.lock() {
                        let _ = p.print_status(&format!(
                            "       Built {} ({})",
                            pkgname.pkgname(),
                            format_duration(duration)
                        ));
                        p.state_mut().increment_completed();
                        for (tid, pkg) in &thread_packages {
                            if pkg == &pkgname {
                                p.clear_output_buffer(*tid);
                                p.state_mut().set_worker_idle(*tid);
                                break;
                            }
                        }
                        let _ = p.render();
                    }
                }
                ChannelCommand::JobSkipped(pkgname) => {
                    jobs.mark_up_to_date(&pkgname);
                    jobs.running.remove(&pkgname);
                    if let Some(result) = jobs.results.last() {
                        let _ = completed_tx.send(result.clone());
                    }
                    if let Ok(mut p) = progress_clone.lock() {
                        let _ = p.print_status(&format!(
                            "     Skipped {} (up-to-date)",
                            pkgname.pkgname()
                        ));
                        p.state_mut().increment_skipped();
                        for (tid, pkg) in &thread_packages {
                            if pkg == &pkgname {
                                p.clear_output_buffer(*tid);
                                p.state_mut().set_worker_idle(*tid);
                                break;
                            }
                        }
                        let _ = p.render();
                    }
                }
                ChannelCommand::JobFailed(pkgname, duration) => {
                    // A failure may add several results (the package and
                    // its dependents); forward every new entry.
                    let results_before = jobs.results.len();
                    jobs.mark_failure(&pkgname, duration);
                    jobs.running.remove(&pkgname);
                    for result in jobs.results.iter().skip(results_before) {
                        let _ = completed_tx.send(result.clone());
                    }
                    if let Ok(mut p) = progress_clone.lock() {
                        let _ = p.print_status(&format!(
                            "      Failed {} ({})",
                            pkgname.pkgname(),
                            format_duration(duration)
                        ));
                        p.state_mut().increment_failed();
                        for (tid, pkg) in &thread_packages {
                            if pkg == &pkgname {
                                p.clear_output_buffer(*tid);
                                p.state_mut().set_worker_idle(*tid);
                                break;
                            }
                        }
                        let _ = p.render();
                    }
                }
                ChannelCommand::JobError((pkgname, duration, e)) => {
                    // Handled like a failed build, plus an error log.
                    let results_before = jobs.results.len();
                    jobs.mark_failure(&pkgname, duration);
                    jobs.running.remove(&pkgname);
                    for result in jobs.results.iter().skip(results_before) {
                        let _ = completed_tx.send(result.clone());
                    }
                    if let Ok(mut p) = progress_clone.lock() {
                        let _ = p.print_status(&format!(
                            "      Failed {} ({})",
                            pkgname.pkgname(),
                            format_duration(duration)
                        ));
                        p.state_mut().increment_failed();
                        for (tid, pkg) in &thread_packages {
                            if pkg == &pkgname {
                                p.clear_output_buffer(*tid);
                                p.state_mut().set_worker_idle(*tid);
                                break;
                            }
                        }
                        let _ = p.render();
                    }
                    tracing::error!(error = %e, pkgname = %pkgname.pkgname(), "Build error");
                }
                ChannelCommand::StageUpdate(tid, stage) => {
                    if let Ok(mut p) = progress_clone.lock() {
                        p.state_mut().set_worker_stage(tid, stage.as_deref());
                        let _ = p.render();
                    }
                }
                ChannelCommand::OutputLines(tid, lines) => {
                    if let Ok(mut p) = progress_clone.lock() {
                        if let Some(buf) = p.output_buffer_mut(tid) {
                            for line in lines {
                                buf.push(line);
                            }
                        }
                    }
                }
                _ => {}
            }
        }
        debug!(
            result_count = jobs.results.len(),
            "Manager sending results back"
        );
        let _ = results_tx.send(jobs.results);
        let _ = interrupted_tx.send(was_interrupted);
    });
    threads.push(manager);

    debug!("Waiting for worker threads to complete");
    let join_start = Instant::now();
    for thread in threads {
        thread.join().expect("thread panicked");
    }
    debug!(
        elapsed_ms = join_start.elapsed().as_millis(),
        "Worker threads completed"
    );

    // Results are written to the database only here, after every worker
    // and the manager have finished. A failed write is logged and the
    // remaining results are still processed.
    let mut saved_count = 0;
    while let Ok(result) = completed_rx.try_recv() {
        if let Err(e) = db.store_build_by_name(&result) {
            warn!(
                pkgname = %result.pkgname.pkgname(),
                error = %e,
                "Failed to save build result"
            );
        } else {
            saved_count += 1;
        }
    }
    if saved_count > 0 {
        debug!(saved_count, "Saved build results to database");
    }

    stop_refresh.store(true, Ordering::Relaxed);
    let _ = refresh_thread.join();
    let was_interrupted = interrupted_rx.recv().unwrap_or(false);
    if let Ok(mut p) = progress.lock() {
        if was_interrupted {
            let _ = p.finish_interrupted();
        } else {
            let _ = p.finish();
        }
    }
    debug!("Collecting results from manager");
    let results = results_rx.recv().unwrap_or_default();
    debug!(
        result_count = results.len(),
        "Collected results from manager"
    );
    let summary = BuildSummary {
        duration: started.elapsed(),
        results,
        scanfail: Vec::new(),
    };
    Ok(summary)
}
}