use crate::config::{PkgsrcEnv, WrkObjKind};
use crate::makejobs::PkgMakeJobs;
use crate::sandbox::{CommandSetsid, SHUTDOWN_POLL_INTERVAL, SandboxScope, wait_with_shutdown};
use crate::scan::ResolvedPackage;
use crate::scheduler::Scheduler;
use crate::tui::{Progress, REFRESH_INTERVAL};
use crate::{Config, RunState, Sandbox};
use crate::{PackageCounts, PackageState, PackageStateKind};
use anyhow::{Context, bail};
use crossterm::event;
use glob::Pattern;
use indexmap::IndexMap;
use pkgsrc::archive::BinaryPackage;
use pkgsrc::digest::Digest;
use pkgsrc::metadata::FileRead;
use pkgsrc::{PkgName, PkgPath};
use std::collections::{HashMap, HashSet};
use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::process::{Command, ExitStatus, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, mpsc, mpsc::Sender};
use std::task::Poll;
use std::time::{Duration, Instant};
use tracing::{debug, error, info, info_span, trace, warn};
// Flush interval for batching child-process output lines before they are
// forwarded to the UI channel (see the tee thread in
// run_command_logged_with_env).
const OUTPUT_BATCH_INTERVAL: Duration = Duration::from_millis(100);
// NOTE(review): name suggests the pause between worker scheduling attempts;
// its usage is outside this chunk — confirm.
const WORKER_BACKOFF_INTERVAL: Duration = Duration::from_millis(100);
/// Why a package needs (re)building, as determined by `pkg_up_to_date`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum BuildReason {
    /// No binary package file exists yet.
    PackageNotFound,
    /// A file recorded in BUILD_VERSION no longer exists in pkgsrc.
    BuildFileRemoved(String),
    /// A file recorded in BUILD_VERSION has a changed CVS ID or digest.
    BuildFileChanged(String),
    /// A single dependency was added.
    DependencyAdded(String),
    /// Several dependencies were added.
    DependenciesAdded(Vec<String>),
    /// A single dependency was removed.
    DependencyRemoved(String),
    /// Several dependencies were removed.
    DependenciesRemoved(Vec<String>),
    /// One dependency changed version: (pkgbase, old version, new version).
    DependencyUpdated(String, String, String),
    /// Several dependencies changed version.
    DependenciesUpdated(Vec<(String, String, String)>),
    /// A mixed set of dependency changes.
    DependenciesChanged {
        updated: Vec<(String, String, String)>,
        added: Vec<String>,
        removed: Vec<String>,
    },
    /// A recorded dependency's binary package file is missing.
    DependencyMissing(String),
    /// A recorded dependency's binary package is newer than ours.
    DependencyRefresh(String),
}
impl std::fmt::Display for BuildReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use BuildReason::*;
        match self {
            PackageNotFound => f.write_str("Package not found"),
            BuildFileRemoved(file) => write!(f, "Build file removed: {file}"),
            BuildFileChanged(file) => write!(f, "Build file changed: {file}"),
            DependencyAdded(dep) => write!(f, "Dependency added: {dep}"),
            DependenciesAdded(deps) => {
                write!(f, "Dependencies added: {}", deps.join(", "))
            }
            DependencyRemoved(dep) => write!(f, "Dependency removed: {dep}"),
            DependenciesRemoved(deps) => {
                write!(f, "Dependencies removed: {}", deps.join(", "))
            }
            DependencyUpdated(base, old, new) => {
                write!(f, "Dependency updated: {base} {old} -> {new}")
            }
            DependenciesUpdated(updates) => {
                let rendered: Vec<String> = updates
                    .iter()
                    .map(|(base, old, new)| format!("{base} {old} -> {new}"))
                    .collect();
                write!(f, "Dependencies updated: {}", rendered.join(", "))
            }
            DependenciesChanged {
                updated,
                added,
                removed,
            } => {
                // Removals first, then additions, then version bumps.
                let rendered: Vec<String> = removed
                    .iter()
                    .map(|r| format!("-{r}"))
                    .chain(added.iter().map(|a| format!("+{a}")))
                    .chain(
                        updated
                            .iter()
                            .map(|(base, old, new)| format!("{base} {old} -> {new}")),
                    )
                    .collect();
                write!(f, "Dependencies changed: {}", rendered.join(", "))
            }
            DependencyMissing(dep) => write!(f, "Dependency missing: {dep}"),
            DependencyRefresh(dep) => write!(f, "Dependency refreshed: {dep}"),
        }
    }
}
/// Return the disk usage of `path` in bytes, as reported by `du -sk`.
///
/// Returns `None` when `du` cannot be spawned, exits unsuccessfully (e.g.
/// the path does not exist), or its output cannot be parsed. The KiB figure
/// is converted with a checked multiply so a pathological value cannot
/// silently wrap in release builds.
fn dir_disk_usage(path: &Path) -> Option<u64> {
    let output = std::process::Command::new("du")
        .arg("-sk")
        .arg(path)
        .output()
        .ok()?;
    if !output.status.success() {
        return None;
    }
    // Output format is "<kbytes>\t<path>"; take the leading number.
    let stdout = std::str::from_utf8(&output.stdout).ok()?;
    let kb: u64 = stdout.split_whitespace().next()?.parse().ok()?;
    kb.checked_mul(1024)
}
/// Borrow `pkgdir` as UTF-8, failing with a descriptive error otherwise.
fn pkgdir_as_str(pkgdir: &Path) -> anyhow::Result<&str> {
    match pkgdir.to_str() {
        Some(utf8) => Ok(utf8),
        None => Err(anyhow::anyhow!(
            "pkgdir path is not valid UTF-8: {}",
            pkgdir.display()
        )),
    }
}
/// Decide whether the binary package for `pkgname` needs to be rebuilt.
///
/// Returns `Ok(None)` when the existing package in `packages_dir` is still
/// current, otherwise `Ok(Some(reason))` with the first staleness trigger
/// found. Checks in order:
/// 1. the package file `<pkgname>.tgz` exists;
/// 2. every file recorded in the package's BUILD_VERSION still exists and
///    matches (CVS `$NetBSD` ID, or SHA256 digest otherwise);
/// 3. no recorded build dependency has been dropped or version-changed
///    relative to `depends`;
/// 4. no recorded dependency's package file is newer than this one.
///
/// NOTE(review): dependencies present in `depends` but absent from the
/// recorded list do not by themselves trigger a rebuild here (the `added`
/// field is always empty) — presumably such additions change a Makefile
/// covered by the BUILD_VERSION check; confirm.
pub fn pkg_up_to_date(
    pkgname: &str,
    depends: &[&str],
    packages_dir: &Path,
    pkgsrc_dir: &Path,
) -> anyhow::Result<Option<BuildReason>> {
    let pkgfile = packages_dir.join(format!("{}.tgz", pkgname));
    // Missing or unreadable package file => must build.
    let pkgfile_mtime = match pkgfile.metadata().and_then(|m| m.modified()) {
        Ok(t) => t,
        Err(_) => {
            debug!(path = %pkgfile.display(), "Package file not found");
            return Ok(Some(BuildReason::PackageNotFound));
        }
    };
    let pkg = BinaryPackage::open(&pkgfile)
        .with_context(|| format!("Failed to open package {}", pkgfile.display()))?;
    let build_version = pkg
        .build_version()
        .context("Failed to read BUILD_VERSION")?
        .unwrap_or_default();
    debug!(
        lines = build_version.lines().count(),
        "Checking BUILD_VERSION"
    );
    // Each BUILD_VERSION line is "<file>:<id>", where <id> is either a CVS
    // "$NetBSD...$" keyword or a SHA256 digest of the file.
    for line in build_version.lines() {
        let Some((file, file_id)) = line.split_once(':') else {
            continue;
        };
        let file_id = file_id.trim();
        if file.is_empty() || file_id.is_empty() {
            continue;
        }
        let src_file = pkgsrc_dir.join(file);
        if !src_file.exists() {
            debug!(file, "File removed");
            return Ok(Some(BuildReason::BuildFileRemoved(file.to_string())));
        }
        if file_id.starts_with("$NetBSD") {
            // Unreadable (e.g. non-UTF-8) file is treated like a removal.
            let Ok(content) = std::fs::read_to_string(&src_file) else {
                return Ok(Some(BuildReason::BuildFileRemoved(file.to_string())));
            };
            // Extract the "$NetBSD...$" span: `end` is relative to
            // `start + 1`, so the closing '$' sits at start + end + 1 and
            // the inclusive slice ends at start + end + 2.
            let id = content.lines().find_map(|line| {
                let start = line.find("$NetBSD")?;
                let end = line[start + 1..].find('$')?;
                Some(&line[start..start + end + 2])
            });
            if id != Some(file_id) {
                debug!(file, "CVS ID mismatch");
                return Ok(Some(BuildReason::BuildFileChanged(file.to_string())));
            }
        } else {
            let mut f = File::open(&src_file)
                .with_context(|| format!("Failed to open {}", src_file.display()))?;
            let hash = Digest::SHA256
                .hash_file(&mut f)
                .with_context(|| format!("Failed to digest {file}"))?;
            if hash != file_id {
                debug!(
                    file,
                    path = %src_file.display(),
                    expected = file_id,
                    actual = hash,
                    "Hash mismatch"
                );
                return Ok(Some(BuildReason::BuildFileChanged(file.to_string())));
            }
        }
    }
    // Compare the dependency list recorded in the package against the
    // currently-resolved `depends`.
    let recorded_deps: HashSet<&str> = pkg
        .plist()
        .build_depends()
        .filter(|l| !l.is_empty())
        .collect();
    let expected_deps: HashSet<&str> = depends.iter().copied().collect();
    let removed_set: HashSet<&str> = recorded_deps.difference(&expected_deps).copied().collect();
    if !removed_set.is_empty() {
        // Index expected deps by pkgbase so a recorded dep whose base still
        // exists is classified as a version update, not a removal.
        let expected_by_base: HashMap<String, (&str, String)> = expected_deps
            .iter()
            .map(|&name| {
                let pkg = PkgName::new(name);
                (
                    pkg.pkgbase().to_string(),
                    (name, pkg.pkgversion().to_string()),
                )
            })
            .collect();
        let mut updated = Vec::new();
        let mut removed = Vec::new();
        for &name in &removed_set {
            let pkg = PkgName::new(name);
            if let Some((_, new_ver)) = expected_by_base.get(pkg.pkgbase()) {
                updated.push((
                    pkg.pkgbase().to_string(),
                    pkg.pkgversion().to_string(),
                    new_ver.clone(),
                ));
            } else {
                removed.push(name.to_string());
            }
        }
        debug!(?updated, ?removed, "Dependency list changed");
        // Pick the most specific reason variant for the change set.
        let reason = if updated.is_empty() {
            if removed.len() == 1 {
                BuildReason::DependencyRemoved(removed.swap_remove(0))
            } else {
                BuildReason::DependenciesRemoved(removed)
            }
        } else if removed.is_empty() {
            if updated.len() == 1 {
                let (base, old, new) = updated.swap_remove(0);
                BuildReason::DependencyUpdated(base, old, new)
            } else {
                BuildReason::DependenciesUpdated(updated)
            }
        } else {
            BuildReason::DependenciesChanged {
                updated,
                added: Vec::new(),
                removed,
            }
        };
        return Ok(Some(reason));
    }
    // Dependency set is unchanged; rebuild if any dependency's package file
    // is missing or newer than ours.
    for dep in &recorded_deps {
        let dep_pkg = packages_dir.join(format!("{}.tgz", dep));
        let dep_mtime = match dep_pkg.metadata().and_then(|m| m.modified()) {
            Ok(t) => t,
            Err(_) => {
                debug!(dep, "Dependency package missing");
                return Ok(Some(BuildReason::DependencyMissing((*dep).to_string())));
            }
        };
        if dep_mtime > pkgfile_mtime {
            debug!(dep, "Dependency is newer");
            return Ok(Some(BuildReason::DependencyRefresh((*dep).to_string())));
        }
    }
    debug!(pkgname, "Package is up-to-date");
    Ok(None)
}
/// Build pipeline stages, in execution order.
///
/// Discriminants give a stable numeric encoding (usable via `FromRepr`);
/// the kebab-case naming is shared across clap, serde, and strum so the
/// same spelling appears on the CLI, in serialized output, and in display
/// strings (`into_str`).
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    clap::ValueEnum,
    serde::Serialize,
    serde::Deserialize,
    strum::EnumProperty,
    strum::FromRepr,
    strum::IntoStaticStr,
    strum::VariantArray,
)]
#[clap(rename_all = "kebab-case")]
#[serde(rename_all = "kebab-case")]
#[strum(serialize_all = "kebab-case", const_into_str)]
#[repr(i32)]
pub enum Stage {
    PreClean = 1,
    Depends = 2,
    Checksum = 3,
    Configure = 4,
    Build = 5,
    Install = 6,
    Package = 7,
    Deinstall = 8,
    Clean = 9,
}
impl crate::ColumnAlign for Stage {
    // Stage cells are rendered right-aligned in tabular output.
    fn align(&self) -> crate::Align {
        crate::Align::Right
    }
}
/// Metrics collected while building a single package.
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct PkgBuildStats {
    /// MAKE_JOBS setting used for the build.
    pub make_jobs: PkgMakeJobs,
    /// Last stage entered (the failing stage for failed builds).
    pub stage: Option<Stage>,
    /// Wall-clock time spent per stage.
    pub stage_durations: Vec<(Stage, Duration)>,
    /// Child-process CPU time per stage.
    pub stage_cpu_times: Vec<(Stage, Duration)>,
    /// WRKDIR size in bytes, measured via `du`, when available.
    pub disk_usage: Option<u64>,
    /// WRKOBJDIR override used for this package, if any.
    pub wrkobjdir: Option<WrkObjKind>,
    /// Total build duration. NOTE(review): not written anywhere in this
    /// chunk — presumably filled in by the caller; confirm.
    pub duration: Duration,
    /// Build timestamp. NOTE(review): set outside this chunk; presumably
    /// epoch seconds — confirm.
    pub timestamp: i64,
}
/// Terminal outcome of one package build, carrying the collected stats.
#[derive(Debug)]
enum PkgBuildResult {
    Success(PkgBuildStats),
    Failed(PkgBuildStats),
}
impl std::fmt::Display for PkgBuildResult {
    /// Render the outcome as the lowercase word used in logs.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Success(_) => "success",
            Self::Failed(_) => "failed",
        };
        f.write_str(label)
    }
}
/// Identity a sandboxed command is run as.
#[derive(Debug, Clone, Copy)]
enum RunAs {
    /// Run the command directly (sandbox default).
    Root,
    /// Wrap the command in `su <build_user> -c` (see build_shell_command).
    User,
}
/// Receiver for stage-transition notifications during a package build.
trait BuildCallback: Send {
    /// Called when the build enters `stage`; the string may carry a
    /// jobs suffix (e.g. "build -j4").
    fn stage(&mut self, stage: &str);
}
/// Immutable state shared by all workers for one build run.
#[derive(Debug)]
struct BuildSession {
    // Global configuration (paths, make binary, build user, patterns, ...).
    config: Config,
    // Derived pkgsrc environment (packages dir, PKG_DBDIR, pkgtools path).
    pkgsrc_env: PkgsrcEnv,
    // Sandbox manager used to spawn commands inside sandboxes.
    sandbox: Sandbox,
    // Run-wide state; consulted for the shutdown flag.
    state: RunState,
    // Per-package WRKOBJDIR override, keyed by package name.
    wrkobjdir_map: HashMap<PkgName, WrkObjKind>,
}
/// Per-package build executor: runs the make stages for one package inside
/// an optional sandbox, streaming output to `output_tx`.
struct PkgBuilder<'a> {
    session: &'a BuildSession,
    // `None` means run directly on the host rather than in a sandbox.
    sandbox_id: Option<usize>,
    // Identifies this worker in UI channel messages.
    worker_id: usize,
    pkginfo: &'a ResolvedPackage,
    // Per-package log directory: <logdir>/<pkgname>.
    logdir: PathBuf,
    // When set, build/configure/install stages run as this user via `su`.
    build_user: Option<String>,
    // Base environment applied to every spawned command.
    envs: Vec<(String, String)>,
    output_tx: Sender<ChannelCommand>,
    make_jobs: PkgMakeJobs,
    // Host-side path of the package's WRKDIR, when known.
    wrkdir: Option<PathBuf>,
}
impl<'a> PkgBuilder<'a> {
/// Construct a builder for one package.
///
/// Derives the per-package log directory from the configured log root and
/// captures the optional build user from the session configuration.
#[allow(clippy::too_many_arguments)]
fn new(
    session: &'a BuildSession,
    sandbox_id: Option<usize>,
    worker_id: usize,
    pkginfo: &'a ResolvedPackage,
    envs: Vec<(String, String)>,
    output_tx: Sender<ChannelCommand>,
    make_jobs: PkgMakeJobs,
    wrkdir: Option<PathBuf>,
) -> Self {
    Self {
        logdir: session
            .config
            .logdir()
            .join(pkginfo.index.pkgname.pkgname()),
        build_user: session.config.build_user().map(|u| u.to_string()),
        session,
        sandbox_id,
        worker_id,
        pkginfo,
        envs,
        output_tx,
        make_jobs,
        wrkdir,
    }
}
/// Run the complete build pipeline for one package.
///
/// Stages run in order: pre-clean, depends, checksum, configure, build,
/// install (stage-install), package (stage-package-create), a test
/// pkg_add/pkg_delete cycle (skipped for bootstrap packages), copy of the
/// resulting package file into the repository, then a final clean.
///
/// Per-stage wall-clock and CPU times are appended to `stats`, which is
/// moved into the returned value with `std::mem::take`. `callback.stage`
/// fires on every stage transition. A stage failure yields `Ok(Failed(..))`;
/// `Err` is reserved for infrastructure errors.
fn build<C: BuildCallback>(
    &self,
    stats: &mut PkgBuildStats,
    callback: &mut C,
) -> anyhow::Result<PkgBuildResult> {
    let pkgname_str = self.pkginfo.pkgname().pkgname();
    let pkgpath = &self.pkginfo.pkgpath;
    let pkgdir = self.session.config.pkgsrc().join(pkgpath.as_path());
    // Pre-clean: remove any stale WRKDIR; exit status deliberately ignored.
    let stage_start = Instant::now();
    stats.stage = Some(Stage::PreClean);
    callback.stage(Stage::PreClean.into_str());
    let (_, cpu_time) =
        self.run_make_stage(Stage::PreClean, &pkgdir, &["clean"], RunAs::Root, false)?;
    stats
        .stage_durations
        .push((Stage::PreClean, stage_start.elapsed()));
    stats.stage_cpu_times.push((Stage::PreClean, cpu_time));
    // Depends: install pre-built dependency packages with pkg_add.
    if !self.pkginfo.depends().is_empty() {
        let stage_start = Instant::now();
        stats.stage = Some(Stage::Depends);
        callback.stage(Stage::Depends.into_str());
        if !self.install_dependencies()? {
            stats
                .stage_durations
                .push((Stage::Depends, stage_start.elapsed()));
            return Ok(PkgBuildResult::Failed(std::mem::take(stats)));
        }
        stats
            .stage_durations
            .push((Stage::Depends, stage_start.elapsed()));
    }
    // Checksum: run the `checksum` target (distfile verification).
    let stage_start = Instant::now();
    stats.stage = Some(Stage::Checksum);
    callback.stage(Stage::Checksum.into_str());
    let (ok, cpu_time) =
        self.run_make_stage(Stage::Checksum, &pkgdir, &["checksum"], RunAs::Root, true)?;
    stats
        .stage_durations
        .push((Stage::Checksum, stage_start.elapsed()));
    stats.stage_cpu_times.push((Stage::Checksum, cpu_time));
    if !ok {
        return Ok(PkgBuildResult::Failed(std::mem::take(stats)));
    }
    // Suffix shown next to the build stage, e.g. " -j4". The trailing "*"
    // marks a job count not flagged safe (see PkgMakeJobs::safe).
    let jobs_suffix = match (self.make_jobs.safe(), self.make_jobs.jobs()) {
        (false, Some(j)) => format!(" -j{}*", j),
        (true, Some(j)) => format!(" -j{}", j),
        (_, None) => String::new(),
    };
    stats.make_jobs = self.make_jobs;
    // Configure, preceded by create-usergroup when the package asks for it.
    let stage_start = Instant::now();
    stats.stage = Some(Stage::Configure);
    callback.stage(Stage::Configure.into_str());
    let configure_log = self.logdir.join("configure.log");
    if !self.run_usergroup_if_needed(Stage::Configure, &pkgdir, &configure_log)? {
        stats
            .stage_durations
            .push((Stage::Configure, stage_start.elapsed()));
        return Ok(PkgBuildResult::Failed(std::mem::take(stats)));
    }
    let (ok, cpu_time) = self.run_make_stage(
        Stage::Configure,
        &pkgdir,
        &["configure"],
        self.build_run_as(),
        true,
    )?;
    stats
        .stage_durations
        .push((Stage::Configure, stage_start.elapsed()));
    stats.stage_cpu_times.push((Stage::Configure, cpu_time));
    if !ok {
        return Ok(PkgBuildResult::Failed(std::mem::take(stats)));
    }
    // Build: the `all` target, as the build user when one is configured.
    let build_phase_start = Instant::now();
    stats.stage = Some(Stage::Build);
    callback.stage(&format!("{}{}", Stage::Build.into_str(), jobs_suffix));
    let build_log = self.logdir.join("build.log");
    if !self.run_usergroup_if_needed(Stage::Build, &pkgdir, &build_log)? {
        stats
            .stage_durations
            .push((Stage::Build, build_phase_start.elapsed()));
        return Ok(PkgBuildResult::Failed(std::mem::take(stats)));
    }
    let (build_ok, cpu_time) =
        self.run_make_stage(Stage::Build, &pkgdir, &["all"], self.build_run_as(), true)?;
    stats
        .stage_durations
        .push((Stage::Build, build_phase_start.elapsed()));
    stats.stage_cpu_times.push((Stage::Build, cpu_time));
    if !build_ok {
        return Ok(PkgBuildResult::Failed(std::mem::take(stats)));
    }
    // Install into the stage directory.
    let stage_start = Instant::now();
    stats.stage = Some(Stage::Install);
    callback.stage(Stage::Install.into_str());
    let install_log = self.logdir.join("install.log");
    if !self.run_usergroup_if_needed(Stage::Install, &pkgdir, &install_log)? {
        stats
            .stage_durations
            .push((Stage::Install, stage_start.elapsed()));
        return Ok(PkgBuildResult::Failed(std::mem::take(stats)));
    }
    let (ok, cpu_time) = self.run_make_stage(
        Stage::Install,
        &pkgdir,
        &["stage-install"],
        self.build_run_as(),
        true,
    )?;
    stats
        .stage_durations
        .push((Stage::Install, stage_start.elapsed()));
    stats.stage_cpu_times.push((Stage::Install, cpu_time));
    if !ok {
        return Ok(PkgBuildResult::Failed(std::mem::take(stats)));
    }
    // Package: create the binary package from the stage directory.
    let stage_start = Instant::now();
    stats.stage = Some(Stage::Package);
    callback.stage(Stage::Package.into_str());
    let (ok, cpu_time) = self.run_make_stage(
        Stage::Package,
        &pkgdir,
        &["stage-package-create"],
        RunAs::Root,
        true,
    )?;
    stats
        .stage_durations
        .push((Stage::Package, stage_start.elapsed()));
    stats.stage_cpu_times.push((Stage::Package, cpu_time));
    if !ok {
        return Ok(PkgBuildResult::Failed(std::mem::take(stats)));
    }
    let pkgfile = self.get_make_var(&pkgdir, "STAGE_PKGFILE")?;
    // Sanity cycle: pkg_add then pkg_delete the freshly-created package.
    // Bootstrap packages are exempt.
    let is_bootstrap = self.pkginfo.bootstrap_pkg() == Some("yes");
    if !is_bootstrap {
        if !self.pkg_add(&pkgfile)? {
            return Ok(PkgBuildResult::Failed(std::mem::take(stats)));
        }
        let stage_start = Instant::now();
        stats.stage = Some(Stage::Deinstall);
        callback.stage(Stage::Deinstall.into_str());
        if !self.pkg_delete(pkgname_str)? {
            stats
                .stage_durations
                .push((Stage::Deinstall, stage_start.elapsed()));
            return Ok(PkgBuildResult::Failed(std::mem::take(stats)));
        }
        stats
            .stage_durations
            .push((Stage::Deinstall, stage_start.elapsed()));
    }
    // Publish: copy the package file (host view of the sandbox path) into
    // the repository's All/ directory.
    let packages_dir = self.session.pkgsrc_env.packages.join("All");
    fs::create_dir_all(&packages_dir)?;
    let dest = packages_dir.join(
        Path::new(&pkgfile)
            .file_name()
            .context("Invalid package file path")?,
    );
    let host_pkgfile = match self.sandbox_id {
        Some(id) => self
            .session
            .sandbox
            .path(id)
            .join(pkgfile.trim_start_matches('/')),
        None => PathBuf::from(&pkgfile),
    };
    fs::copy(&host_pkgfile, &dest)?;
    // Record WRKDIR disk usage before it is cleaned away.
    match self.wrkdir {
        Some(ref wrkdir) => match dir_disk_usage(wrkdir) {
            Some(size) => {
                debug!(wrkdir = %wrkdir.display(), size, "Measured WRKDIR disk usage");
                stats.disk_usage = Some(size);
            }
            None => {
                debug!(wrkdir = %wrkdir.display(), "Failed to measure disk usage")
            }
        },
        None => debug!("No WRKDIR available for disk usage measurement"),
    }
    // Final clean; exit status ignored, like pre-clean.
    let stage_start = Instant::now();
    stats.stage = Some(Stage::Clean);
    callback.stage(Stage::Clean.into_str());
    let (_, cpu_time) =
        self.run_make_stage(Stage::Clean, &pkgdir, &["clean"], RunAs::Root, false)?;
    stats
        .stage_durations
        .push((Stage::Clean, stage_start.elapsed()));
    stats.stage_cpu_times.push((Stage::Clean, cpu_time));
    // Successful builds keep no logs.
    let _ = fs::remove_dir_all(&self.logdir);
    Ok(PkgBuildResult::Success(std::mem::take(stats)))
}
/// Root unless a dedicated build user is configured.
fn build_run_as(&self) -> RunAs {
    match self.build_user {
        Some(_) => RunAs::User,
        None => RunAs::Root,
    }
}
/// Run a make stage with no extra flags; see `run_make_stage_with_flags`.
///
/// Returns `(exit success, child CPU time)`.
fn run_make_stage(
    &self,
    stage: Stage,
    pkgdir: &Path,
    targets: &[&str],
    run_as: RunAs,
    include_make_flags: bool,
) -> anyhow::Result<(bool, Duration)> {
    self.run_make_stage_with_flags(stage, pkgdir, targets, run_as, include_make_flags, &[])
}
/// Run `make` in `pkgdir` for `targets`, logging output to `<stage>.log`.
///
/// When `include_make_flags` is set, the standard batch flags (BATCH=1,
/// DEPENDS_TARGET=/nonexistent and any multi-version flags) are appended;
/// see `make_args`. Returns `(exit success, child CPU time)`.
fn run_make_stage_with_flags(
    &self,
    stage: Stage,
    pkgdir: &Path,
    targets: &[&str],
    run_as: RunAs,
    include_make_flags: bool,
    extra_flags: &[&str],
) -> anyhow::Result<(bool, Duration)> {
    let logfile = self.logdir.join(format!("{}.log", stage.into_str()));
    let owned_args = self.make_args(pkgdir, targets, include_make_flags, extra_flags)?;
    // run_command_logged takes &[&str]; reborrow the owned strings.
    let args: Vec<&str> = owned_args.iter().map(|s| s.as_str()).collect();
    info!(stage = stage.into_str(), "Running make stage");
    let (status, cpu_time) =
        self.run_command_logged(self.session.config.make(), &args, run_as, &logfile)?;
    Ok((status.success(), cpu_time))
}
/// Convenience wrapper around `run_command_logged_with_env` with no extra
/// environment variables.
fn run_command_logged(
    &self,
    cmd: &Path,
    args: &[&str],
    run_as: RunAs,
    logfile: &Path,
) -> anyhow::Result<(ExitStatus, Duration)> {
    self.run_command_logged_with_env(cmd, args, run_as, logfile, &[])
}
/// Spawn `cmd args` via `/bin/sh -c` in the sandbox, teeing its combined
/// output both to `logfile` and, in batches, to the UI channel.
///
/// The child gets its own session so the whole process group can be killed
/// later. Returns the exit status and child CPU time as reported by
/// `wait_with_shutdown`.
fn run_command_logged_with_env(
    &self,
    cmd: &Path,
    args: &[&str],
    run_as: RunAs,
    logfile: &Path,
    extra_envs: &[(&str, &str)],
) -> anyhow::Result<(ExitStatus, Duration)> {
    let mut log = OpenOptions::new().create(true).append(true).open(logfile)?;
    // Write an invocation header; log-write failures are non-fatal.
    let _ = writeln!(log, "=> {:?} {:?}", cmd, args);
    let _ = log.flush();
    let shell_cmd = self.build_shell_command(cmd, args, run_as, extra_envs);
    let mut child = self
        .session
        .sandbox
        .command(self.sandbox_id, Path::new("/bin/sh"))
        .new_session()
        .arg("-c")
        .arg(&shell_cmd)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        // stderr is already folded into stdout by the "2>&1" appended in
        // build_shell_command.
        .stderr(Stdio::null())
        .spawn()
        .context("Failed to spawn shell command")?;
    let stdout = child.stdout.take().unwrap();
    let output_tx = self.output_tx.clone();
    let worker_id = self.worker_id;
    let (tee_done_tx, tee_done_rx) = mpsc::sync_channel::<()>(1);
    // Tee thread: copy child output to the log file and forward it to the
    // UI channel in batches to bound message traffic.
    let tee_handle = std::thread::spawn(move || {
        let mut reader = BufReader::new(stdout);
        let mut buf = Vec::new();
        let mut batch = Vec::with_capacity(50);
        let mut last_send = Instant::now();
        let send_interval = OUTPUT_BATCH_INTERVAL;
        loop {
            buf.clear();
            // read_until tolerates non-UTF-8 output (lossy-decoded below).
            match reader.read_until(b'\n', &mut buf) {
                Ok(0) => break,
                Ok(_) => {}
                Err(_) => break,
            };
            let _ = log.write_all(&buf);
            let line = String::from_utf8_lossy(&buf);
            let line = line.trim_end_matches('\n').to_string();
            batch.push(line);
            // Flush either on the timer or when the batch is full.
            if last_send.elapsed() >= send_interval || batch.len() >= 50 {
                let _ = output_tx.send(ChannelCommand::OutputLines(
                    worker_id,
                    std::mem::take(&mut batch),
                ));
                last_send = Instant::now();
            }
        }
        if !batch.is_empty() {
            let _ = output_tx.send(ChannelCommand::OutputLines(worker_id, batch));
        }
        let _ = tee_done_tx.send(());
    });
    let (status, cpu_time) = wait_with_shutdown(&mut child, &self.session.state)?;
    // An orphaned grandchild can keep the pipe open after the direct child
    // exits; give the tee thread a grace period, then leave it detached.
    if tee_done_rx.recv_timeout(Duration::from_secs(5)).is_ok() {
        let _ = tee_handle.join();
    } else {
        warn!(
            pkg = %self.pkginfo.index.pkgname,
            "Tee thread stuck on pipe held by orphaned process, detaching"
        );
    }
    trace!(?cmd, ?status, "Command completed");
    Ok((status, cpu_time))
}
/// Query a single make variable via the `show-var` target.
///
/// Unlike `MakeQuery::vars`, a failure here is an error rather than an
/// empty result. The value is returned with surrounding whitespace trimmed.
fn get_make_var(&self, pkgdir: &Path, varname: &str) -> anyhow::Result<String> {
    let mut cmd = self
        .session
        .sandbox
        .command(self.sandbox_id, self.session.config.make());
    cmd.new_session();
    self.apply_envs(&mut cmd, &[]);
    let make_args = self.make_args(
        pkgdir,
        &["show-var", &format!("VARNAME={}", varname)],
        true,
        &[],
    )?;
    let output = cmd.args(&make_args).stderr(Stdio::piped()).output()?;
    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        bail!("Failed to get make variable {varname}: {}", stderr.trim());
    }
    Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
}
/// Install all of the package's dependencies with a single pkg_add call,
/// pulling packages from the repository's All/ directory.
///
/// Output is appended to depends.log; returns `Ok(true)` when pkg_add
/// exits successfully. (If the package has no dependencies, pkg_add is
/// still invoked with no package arguments — callers guard on
/// `depends().is_empty()` first.)
fn install_dependencies(&self) -> anyhow::Result<bool> {
    // Own the rendered names, then reborrow as &str for the argv slice —
    // built idiomatically with collect rather than a manual push loop.
    let deps: Vec<String> = self
        .pkginfo
        .depends()
        .iter()
        .map(|d| d.to_string())
        .collect();
    let args: Vec<&str> = deps.iter().map(String::as_str).collect();
    let pkg_path = self.session.pkgsrc_env.packages.join("All");
    let logfile = self.logdir.join("depends.log");
    let (status, _) = self.run_pkg_add_with_path(&args, &pkg_path, &logfile)?;
    Ok(status.success())
}
/// Run pkg_add for `packages` with PKG_PATH pointing at `pkg_path`.
///
/// `-K` pins the package database to the session's PKG_DBDIR; output is
/// appended to `logfile`.
fn run_pkg_add_with_path(
    &self,
    packages: &[&str],
    pkg_path: &Path,
    logfile: &Path,
) -> anyhow::Result<(ExitStatus, Duration)> {
    let pkg_add = self.session.pkgsrc_env.pkgtools.join("pkg_add");
    let pkg_dbdir = self.session.pkgsrc_env.pkg_dbdir.to_string_lossy();
    // PKG_PATH must outlive the borrow taken in extra_envs.
    let pkg_path_value = pkg_path.to_string_lossy().to_string();
    let extra_envs = [("PKG_PATH", pkg_path_value.as_str())];
    let mut args = vec!["-K", &*pkg_dbdir];
    args.extend(packages.iter().copied());
    self.run_command_logged_with_env(&pkg_add, &args, RunAs::Root, logfile, &extra_envs)
}
/// Install a single binary package file via pkg_add, logging to package.log.
fn pkg_add(&self, pkgfile: &str) -> anyhow::Result<bool> {
    let tool = self.session.pkgsrc_env.pkgtools.join("pkg_add");
    let dbdir = self.session.pkgsrc_env.pkg_dbdir.to_string_lossy();
    let logfile = self.logdir.join("package.log");
    let args = ["-K", &*dbdir, pkgfile];
    let (status, _) = self.run_command_logged(&tool, &args, RunAs::Root, &logfile)?;
    Ok(status.success())
}
/// Remove an installed package via pkg_delete, logging to deinstall.log.
fn pkg_delete(&self, pkgname: &str) -> anyhow::Result<bool> {
    let tool = self.session.pkgsrc_env.pkgtools.join("pkg_delete");
    let dbdir = self.session.pkgsrc_env.pkg_dbdir.to_string_lossy();
    let logfile = self.logdir.join("deinstall.log");
    let args = ["-K", &*dbdir, pkgname];
    let (status, _) = self.run_command_logged(&tool, &args, RunAs::Root, &logfile)?;
    Ok(status.success())
}
/// Run the `create-usergroup` target before `stage` when the package's
/// USERGROUP_PHASE requests it; returns `Ok(true)` when nothing needed to
/// run or the target succeeded.
fn run_usergroup_if_needed(
    &self,
    stage: Stage,
    pkgdir: &Path,
    logfile: &Path,
) -> anyhow::Result<bool> {
    let usergroup_phase = self.pkginfo.usergroup_phase().unwrap_or("");
    let should_run = match stage {
        Stage::Configure => usergroup_phase.ends_with("configure"),
        Stage::Build => usergroup_phase.ends_with("build"),
        // NOTE(review): exact match here, unlike the ends_with matching
        // above — presumably intentional; confirm against pkgsrc's
        // USERGROUP_PHASE semantics.
        Stage::Install => usergroup_phase == "pre-install",
        _ => false,
    };
    if !should_run {
        return Ok(true);
    }
    let pkgdir_str = pkgdir_as_str(pkgdir)?;
    let mut args = vec!["-C", pkgdir_str, "create-usergroup"];
    // The configure-phase invocation also cleans afterwards.
    if stage == Stage::Configure {
        args.push("clean");
    }
    let (status, _) =
        self.run_command_logged(self.session.config.make(), &args, RunAs::Root, logfile)?;
    Ok(status.success())
}
/// Assemble the argument vector for a make invocation in `pkgdir`.
///
/// Fails only when `pkgdir` is not valid UTF-8. With `include_make_flags`
/// set, the standard batch flags and any multi-version flags are appended
/// before `extra_flags`.
fn make_args(
    &self,
    pkgdir: &Path,
    targets: &[&str],
    include_make_flags: bool,
    extra_flags: &[&str],
) -> anyhow::Result<Vec<String>> {
    let mut args = vec!["-C".to_string(), pkgdir_as_str(pkgdir)?.to_string()];
    args.extend(targets.iter().map(|t| t.to_string()));
    if include_make_flags {
        // Non-interactive build; dependencies are pre-installed, so any
        // attempt by make to recurse into them must fail loudly.
        args.push("BATCH=1".to_string());
        args.push("DEPENDS_TARGET=/nonexistent".to_string());
        if let Some(flags) = self.pkginfo.multi_version() {
            args.extend(flags.iter().cloned());
        }
    }
    args.extend(extra_flags.iter().map(|f| f.to_string()));
    Ok(args)
}
/// Apply the builder's base environment, then `extra_envs`, to `cmd`.
/// Later entries win on duplicate names (Command::env overwrites).
fn apply_envs(&self, cmd: &mut Command, extra_envs: &[(&str, &str)]) {
    for (name, value) in &self.envs {
        cmd.env(name, value);
    }
    for (name, value) in extra_envs {
        cmd.env(name, value);
    }
}
/// Quote `value` for safe inclusion in a `/bin/sh -c` command line.
///
/// Values consisting only of shell-neutral characters pass through
/// unchanged; anything else is single-quoted, with embedded single quotes
/// rewritten as `'\''`. The empty string becomes `''`.
fn shell_escape(value: &str) -> String {
    const SAFE_PUNCT: &str = "-_.,/:=+@";
    if value.is_empty() {
        return String::from("''");
    }
    let is_plain = value
        .chars()
        .all(|c| c.is_ascii_alphanumeric() || SAFE_PUNCT.contains(c));
    if is_plain {
        value.to_string()
    } else {
        format!("'{}'", value.replace('\'', "'\\''"))
    }
}
/// Compose the full `/bin/sh -c` command string: env assignments, then the
/// (possibly `su`-wrapped) command, with stderr folded into stdout.
fn build_shell_command(
    &self,
    cmd: &Path,
    args: &[&str],
    run_as: RunAs,
    extra_envs: &[(&str, &str)],
) -> String {
    let mut parts = Vec::new();
    // Env assignments come first so they prefix the command invocation.
    for (key, value) in &self.envs {
        parts.push(format!("{}={}", key, Self::shell_escape(value)));
    }
    for (key, value) in extra_envs {
        parts.push(format!("{}={}", key, Self::shell_escape(value)));
    }
    let cmd_str = Self::shell_escape(&cmd.to_string_lossy());
    let args_str: Vec<String> = args.iter().map(|a| Self::shell_escape(a)).collect();
    match run_as {
        RunAs::Root => {
            parts.push(cmd_str);
            parts.extend(args_str);
        }
        RunAs::User => {
            // RunAs::User is only chosen when a build user is configured
            // (see build_run_as), so this unwrap cannot fire.
            let user = self.build_user.as_ref().unwrap();
            // Escape the already-escaped command once more so it survives
            // the inner shell started by `su -c`.
            let inner_cmd = std::iter::once(cmd_str)
                .chain(args_str)
                .collect::<Vec<_>>()
                .join(" ");
            parts.push("su".to_string());
            parts.push(Self::shell_escape(user));
            parts.push("-c".to_string());
            parts.push(Self::shell_escape(&inner_cmd));
        }
    }
    // Merge stderr into stdout so a single pipe captures everything.
    parts.push("2>&1".to_string());
    parts.join(" ")
}
}
/// `BuildCallback` that forwards stage changes over the status channel.
struct ChannelCallback<'a> {
    // NOTE(review): named sandbox_id, but the caller in PackageBuild::build
    // passes worker_id — the value is used as the StageUpdate key; confirm
    // whether the name or the call site is wrong.
    sandbox_id: usize,
    status_tx: &'a Sender<ChannelCommand>,
}
impl<'a> ChannelCallback<'a> {
    fn new(sandbox_id: usize, status_tx: &'a Sender<ChannelCommand>) -> Self {
        Self {
            sandbox_id,
            status_tx,
        }
    }
}
impl<'a> BuildCallback for ChannelCallback<'a> {
    // Send errors are ignored: if the receiving side hung up there is
    // nobody left to notify.
    fn stage(&mut self, stage: &str) {
        let _ = self.status_tx.send(ChannelCommand::StageUpdate(
            self.sandbox_id,
            Some(stage.to_string()),
        ));
    }
}
/// Serializable record of one package's build outcome.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct BuildResult {
    pub pkgname: PkgName,
    pub pkgpath: Option<PkgPath>,
    pub state: PackageState,
    // Directory holding retained logs, if any were kept.
    pub log_dir: Option<PathBuf>,
    // Flattened so the stats fields serialize alongside the ones above.
    #[serde(flatten, default)]
    pub build_stats: PkgBuildStats,
}
impl BuildResult {
    /// Convert this result into a history record.
    ///
    /// Returns `None` when the package never actually ran (state is still
    /// `Pending`) or when no PKGPATH was resolved for it.
    pub fn history_input(&self) -> Option<crate::History> {
        if self.state == PackageState::Pending {
            return None;
        }
        Some(crate::History {
            timestamp: self.build_stats.timestamp,
            // `?`: no resolved PKGPATH means no history entry.
            pkgpath: self.pkgpath.as_ref()?.to_string(),
            pkgname: self.pkgname.pkgname().to_string(),
            pkgbase: self.pkgname.pkgbase().to_string(),
            outcome: self.state.clone(),
            stage: self.build_stats.stage,
            make_jobs: self.build_stats.make_jobs.jobs(),
            duration: self.build_stats.duration,
            disk_usage: self.build_stats.disk_usage,
            wrkobjdir: self.build_stats.wrkobjdir.clone(),
            stage_durations: self.build_stats.stage_durations.clone(),
            stage_cpu_times: self.build_stats.stage_cpu_times.clone(),
            build_id: None,
        })
    }
}
/// Aggregated tallies for a build run.
#[derive(Clone, Debug, Default)]
pub struct BuildCounts {
    // Per-PackageState counters.
    pub states: PackageCounts,
    // Number of packages that failed during the scan phase.
    pub scanfail: usize,
}
/// Outcome of an entire build run.
#[derive(Clone, Debug)]
pub struct BuildSummary {
    pub duration: Duration,
    pub results: Vec<BuildResult>,
    // Packages that failed to scan: (pkgpath, error message).
    pub scanfail: Vec<(PkgPath, String)>,
}
impl BuildSummary {
pub fn counts(&self) -> BuildCounts {
let mut c = BuildCounts {
scanfail: self.scanfail.len(),
..Default::default()
};
for r in &self.results {
c.states.add(&r.state);
}
c
}
pub fn failed(&self) -> Vec<&BuildResult> {
self.results
.iter()
.filter(|r| matches!(r.state, PackageState::Failed(_)))
.collect()
}
pub fn succeeded(&self) -> Vec<&BuildResult> {
self.results
.iter()
.filter(|r| matches!(r.state, PackageState::Success))
.collect()
}
pub fn skipped(&self) -> Vec<&BuildResult> {
self.results.iter().filter(|r| r.state.is_skip()).collect()
}
}
/// Top-level build orchestrator state. Its constructor and run loop live
/// outside this chunk.
#[derive(Debug)]
pub struct Build {
    config: Config,
    pkgsrc_env: PkgsrcEnv,
    // Owns the sandboxes used for this run.
    scope: SandboxScope,
    // Packages resolved by the scan phase, keyed by package name.
    scanpkgs: IndexMap<PkgName, ResolvedPackage>,
    // NOTE(review): populated outside this chunk; presumably results
    // reused from a previous run (up-to-date packages) — confirm.
    cached: IndexMap<PkgName, BuildResult>,
}
/// One unit of work handed to a worker: build `pkginfo` in a sandbox slot.
#[derive(Debug)]
struct PackageBuild {
    session: Arc<BuildSession>,
    // `None` means build directly on the host (no sandbox path prefix).
    sandbox_id: Option<usize>,
    worker_id: usize,
    pkginfo: ResolvedPackage,
    make_jobs: PkgMakeJobs,
}
/// Helper for querying make variables of one package inside the sandbox.
struct MakeQuery<'a> {
    session: &'a BuildSession,
    sandbox_id: Option<usize>,
    pkgpath: &'a PkgPath,
    // Environment applied to each spawned make process.
    env: &'a HashMap<String, String>,
}
impl<'a> MakeQuery<'a> {
    /// Bundle the context needed to query make variables for one package.
    fn new(
        session: &'a BuildSession,
        sandbox_id: Option<usize>,
        pkgpath: &'a PkgPath,
        env: &'a HashMap<String, String>,
    ) -> Self {
        Self {
            session,
            sandbox_id,
            pkgpath,
            env,
        }
    }
    /// Fetch several make variables at once via the `show-vars` target.
    ///
    /// Returns name -> trimmed value; variables that evaluate to the empty
    /// string are omitted. Any failure logs a warning and yields an empty
    /// map rather than an error.
    fn vars(&self, names: &[&str]) -> HashMap<String, String> {
        let pkgdir = self.session.config.pkgsrc().join(self.pkgpath.as_path());
        let varnames_arg = names.join(" ");
        let mut cmd = self
            .session
            .sandbox
            .command(self.sandbox_id, self.session.config.make());
        cmd.new_session();
        cmd.arg("-C")
            .arg(&pkgdir)
            .arg("show-vars")
            .arg(format!("VARNAMES={}", varnames_arg));
        for (key, value) in self.env {
            cmd.env(key, value);
        }
        cmd.stderr(Stdio::piped());
        let output = match cmd.output() {
            Ok(o) if o.status.success() => o,
            Ok(o) => {
                let stderr = String::from_utf8_lossy(&o.stderr);
                warn!(
                    status = ?o.status.code(),
                    stderr = %stderr.trim(),
                    ?names,
                    "show-vars failed"
                );
                return HashMap::new();
            }
            Err(e) => {
                warn!(error = format!("{e:#}"), ?names, "show-vars exec error");
                return HashMap::new();
            }
        };
        let stdout = String::from_utf8_lossy(&output.stdout);
        let lines: Vec<&str> = stdout.lines().collect();
        let mut result = HashMap::new();
        // Pairing is positional: assumes show-vars emits exactly one line
        // per requested variable, in request order — confirm against the
        // pkgsrc show-vars target.
        for (name, value) in names.iter().zip(&lines) {
            let value = value.trim();
            if !value.is_empty() {
                result.insert(name.to_string(), value.to_string());
            }
        }
        result
    }
    /// Map a sandbox-internal absolute path to its host-visible location.
    fn resolve_path(&self, path: &Path) -> PathBuf {
        match self.sandbox_id {
            Some(id) => self
                .session
                .sandbox
                .path(id)
                .join(path.strip_prefix("/").unwrap_or(path)),
            None => path.to_path_buf(),
        }
    }
}
impl PackageBuild {
/// Drive one complete package build on a worker thread.
///
/// Prepares a fresh log directory, assembles the environment (WRKOBJDIR
/// override, MAKE_JOBS), runs the sandbox pre-build hook, queries
/// WRKDIR/_MAKE_JOBS_N, delegates the stages to `PkgBuilder::build`, runs
/// failure cleanup when needed, and always finishes with the post-build
/// hook. Returns `Err` only for infrastructure failures (or when shutting
/// down); build failures come back as `Ok(Failed(..))`.
fn build(&mut self, status_tx: &Sender<ChannelCommand>) -> anyhow::Result<PkgBuildResult> {
    let pkgname = self.pkginfo.index.pkgname.pkgname();
    let logdir = self.session.config.logdir();
    let pkg_logdir = logdir.join(pkgname);
    // Start from a fresh per-package log directory.
    if pkg_logdir.exists() {
        fs::remove_dir_all(&pkg_logdir)?;
    }
    fs::create_dir_all(&pkg_logdir)?;
    info!("Starting package build");
    let pkgpath = &self.pkginfo.pkgpath;
    let mut envs = self.session.sandbox.script_env();
    // Apply a per-package WRKOBJDIR override when one is mapped.
    let wrkobjdir_kind =
        if let Some(kind) = self.session.wrkobjdir_map.get(&self.pkginfo.index.pkgname) {
            envs.push(("WRKOBJDIR".to_string(), kind.path().display().to_string()));
            Some(kind)
        } else {
            None
        };
    let patterns = self.session.config.save_wrkdir_patterns();
    if let Err(e) = self.session.sandbox.run_pre_build(self.sandbox_id) {
        // Attempt post-build to undo whatever pre-build managed to set up.
        if let Err(post_e) = self.session.sandbox.run_post_build(self.sandbox_id) {
            warn!(
                error = format!("{post_e:#}"),
                "post-build error during pre-build cleanup"
            );
        }
        return Err(e.context("pre-build failed"));
    }
    if let Some(jobs) = self.make_jobs.allocated() {
        envs.push(("MAKE_JOBS".to_string(), jobs.to_string()));
    }
    let env_map: HashMap<String, String> = envs.iter().cloned().collect();
    let make = MakeQuery::new(&self.session, self.sandbox_id, pkgpath, &env_map);
    let vars = make.vars(&["_MAKE_JOBS_N", "WRKDIR"]);
    // WRKDIR is required; translate it to the host view of the sandbox.
    let wrkdir = Some(
        make.resolve_path(Path::new(
            vars.get("WRKDIR")
                .ok_or_else(|| anyhow::anyhow!("failed to query WRKDIR"))?,
        )),
    );
    // Record the job count make actually resolved for this package.
    if let Some(n) = vars.get("_MAKE_JOBS_N").and_then(|v| v.parse().ok()) {
        self.make_jobs.set_jobs(n);
    }
    let builder = PkgBuilder::new(
        &self.session,
        self.sandbox_id,
        self.worker_id,
        &self.pkginfo,
        envs.clone(),
        status_tx.clone(),
        self.make_jobs,
        wrkdir.clone(),
    );
    let mut callback = ChannelCallback::new(self.worker_id, status_tx);
    let mut stats = PkgBuildStats {
        make_jobs: self.make_jobs,
        ..PkgBuildStats::default()
    };
    let result = builder.build(&mut stats, &mut callback);
    let _ = status_tx.send(ChannelCommand::StageUpdate(
        self.worker_id,
        Some("post-build destroy hooks".to_string()),
    ));
    // Used on failure paths only; the success path measures WRKDIR inside
    // PkgBuilder::build before the final clean.
    let measure_wrkdir = || -> Option<u64> {
        let w = wrkdir.as_ref()?;
        dir_disk_usage(w)
    };
    let wrkobjdir = wrkobjdir_kind.cloned();
    let result = match result {
        Ok(PkgBuildResult::Success(mut stats)) => {
            info!("Package build completed successfully");
            stats.wrkobjdir = wrkobjdir;
            PkgBuildResult::Success(stats)
        }
        Ok(PkgBuildResult::Failed(mut stats)) => {
            error!("Package build failed");
            stats.disk_usage = measure_wrkdir();
            stats.wrkobjdir = wrkobjdir;
            self.cleanup_after_failure(
                status_tx,
                pkgname,
                pkgpath,
                logdir,
                patterns,
                &envs,
                wrkdir.as_deref(),
            );
            PkgBuildResult::Failed(stats)
        }
        Err(e) => {
            // During shutdown, propagate instead of recording a failure.
            if self.session.state.is_shutdown() {
                return Err(e);
            }
            error!(error = format!("{e:#}"), "Package build error");
            stats.disk_usage = measure_wrkdir();
            stats.wrkobjdir = wrkobjdir;
            self.cleanup_after_failure(
                status_tx,
                pkgname,
                pkgpath,
                logdir,
                patterns,
                &envs,
                wrkdir.as_deref(),
            );
            PkgBuildResult::Failed(stats)
        }
    };
    // Post-build hook always runs, success or failure.
    if let Err(e) = self.session.sandbox.run_post_build(self.sandbox_id) {
        warn!(error = format!("{e:#}"), "post-build error");
    }
    Ok(result)
}
/// Best-effort cleanup after a failed build: kill leftover processes,
/// preserve work.log and any requested WRKDIR files, then `make clean`.
/// Every step is non-fatal; errors are logged or swallowed.
#[allow(clippy::too_many_arguments)]
fn cleanup_after_failure(
    &self,
    status_tx: &Sender<ChannelCommand>,
    pkgname: &str,
    pkgpath: &PkgPath,
    logdir: &Path,
    patterns: &[String],
    envs: &[(String, String)],
    wrkdir: Option<&Path>,
) {
    let _ = status_tx.send(ChannelCommand::StageUpdate(
        self.worker_id,
        Some("cleanup".to_string()),
    ));
    // Kill anything still running in the sandbox before touching files.
    let kill_start = Instant::now();
    self.session.sandbox.kill_processes_by_id(self.sandbox_id);
    trace!(
        elapsed_ms = kill_start.elapsed().as_millis(),
        "kill_processes_by_id completed"
    );
    if let Some(wrkdir_path) = wrkdir {
        // Keep the make log for post-mortem inspection.
        let src = wrkdir_path.join(".work.log");
        let dest = logdir.join(pkgname).join("work.log");
        if src.exists() {
            let _ = fs::copy(&src, &dest);
        }
        if !patterns.is_empty() {
            self.save_wrkdir_files(pkgname, logdir, wrkdir_path, patterns);
        }
    }
    let clean_start = Instant::now();
    self.run_clean(pkgpath, envs);
    trace!(
        elapsed_ms = clean_start.elapsed().as_millis(),
        "run_clean completed"
    );
}
/// Copy files matching the given glob `patterns` out of the failed
/// build's WRKDIR into `logdir/<pkgname>/wrkdir-files/` so they survive
/// the subsequent `make clean`.
fn save_wrkdir_files(
    &self,
    pkgname: &str,
    logdir: &Path,
    wrkdir_path: &Path,
    patterns: &[String],
) {
    // Nothing to save if the WRKDIR has already been removed.
    if !wrkdir_path.exists() {
        debug!(%pkgname, wrkdir = %wrkdir_path.display(), "WRKDIR does not exist, skipping file save");
        return;
    }
    let save_dir = logdir.join(pkgname).join("wrkdir-files");
    if let Err(e) = fs::create_dir_all(&save_dir) {
        warn!(%pkgname, error = format!("{e:#}"), "Failed to create wrkdir-files directory");
        return;
    }
    // Compile the glob patterns, logging and dropping any that fail to
    // parse rather than aborting the whole save.
    let mut compiled: Vec<Pattern> = Vec::with_capacity(patterns.len());
    for raw in patterns {
        match Pattern::new(raw) {
            Ok(pattern) => compiled.push(pattern),
            Err(_) => warn!(pattern = %raw, "Invalid glob pattern"),
        }
    }
    if compiled.is_empty() {
        return;
    }
    let mut saved = 0;
    if let Err(e) = walk_and_save(
        wrkdir_path,
        wrkdir_path,
        &save_dir,
        &compiled,
        &mut saved,
    ) {
        warn!(%pkgname, error = format!("{e:#}"), "Error while saving wrkdir files");
    }
    if saved > 0 {
        info!(%pkgname, count = saved, dest = %save_dir.display(), "Saved wrkdir files");
    }
}
fn run_clean(&self, pkgpath: &PkgPath, envs: &[(String, String)]) {
let pkgdir = self.session.config.pkgsrc().join(pkgpath.as_path());
let mut cmd = self
.session
.sandbox
.command(self.sandbox_id, self.session.config.make());
cmd.new_session();
cmd.arg("-C").arg(&pkgdir).arg("clean");
for (key, value) in envs {
cmd.env(key, value);
}
let result = cmd
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.status();
if let Err(e) = result {
debug!(error = format!("{e:#}"), "Failed to run bmake clean");
}
}
}
/// Recursively walk `current` (rooted at `base`) and copy every regular
/// file matching any of `patterns` into `save_dir`, preserving its path
/// relative to `base`. Symlinks are never followed. `saved_count` is
/// incremented once per file successfully copied.
fn walk_and_save(
    base: &Path,
    current: &Path,
    save_dir: &Path,
    patterns: &[Pattern],
    saved_count: &mut usize,
) -> std::io::Result<()> {
    // symlink_metadata so a symlink to a directory is not descended into.
    if !current.symlink_metadata()?.is_dir() {
        return Ok(());
    }
    for entry in fs::read_dir(current)? {
        let entry = entry?;
        let file_type = entry.file_type()?;
        let entry_path = entry.path();
        if file_type.is_dir() {
            walk_and_save(base, &entry_path, save_dir, patterns, saved_count)?;
            continue;
        }
        if !file_type.is_file() {
            // Skip symlinks and special files.
            continue;
        }
        let Ok(relative) = entry_path.strip_prefix(base) else {
            continue;
        };
        let rel_text = relative.to_string_lossy();
        let name_text = entry_path
            .file_name()
            .unwrap_or_default()
            .to_string_lossy()
            .into_owned();
        // A pattern may match either the path relative to base or just
        // the file name; each file is copied at most once.
        let matched = patterns
            .iter()
            .any(|pat| pat.matches(&rel_text) || pat.matches(&name_text));
        if !matched {
            continue;
        }
        let target = save_dir.join(relative);
        if let Some(dir) = target.parent() {
            fs::create_dir_all(dir)?;
        }
        match fs::copy(&entry_path, &target) {
            Ok(_) => {
                debug!(src = %entry_path.display(),
                    dest = %target.display(),
                    "Saved wrkdir file"
                );
                *saved_count += 1;
            }
            Err(e) => {
                warn!(src = %entry_path.display(),
                    dest = %target.display(),
                    error = format!("{e:#}"),
                    "Failed to copy file"
                );
            }
        }
    }
    Ok(())
}
/// Messages exchanged between the manager thread and the build workers.
#[derive(Debug)]
enum ChannelCommand {
/// Worker with this id is idle and asking for work (worker -> manager).
ClientReady(usize),
/// No job is runnable right now; back off briefly, then ask again
/// (manager -> worker).
ComeBackLater,
/// A package build job to execute; boxed to keep the enum small
/// (manager -> worker).
JobData(Box<PackageBuild>),
/// A build completed successfully (worker -> manager).
JobSuccess(BuildResult),
/// A build failed or errored (worker -> manager).
JobFailed(BuildResult),
/// No more work is available; exit cleanly (manager -> worker).
Quit,
/// A shutdown was requested; exit immediately (manager -> worker).
Shutdown,
/// Update the displayed build stage for the given worker id; `None`
/// clears the stage (worker -> manager).
StageUpdate(usize, Option<String>),
/// A batch of captured build output lines for the given worker's
/// output buffer (worker -> manager).
OutputLines(usize, Vec<String>),
}
/// Mutable build-queue state owned by the manager thread while a build
/// run is in progress.
struct BuildJobs {
/// Resolved scan information for every package, keyed by package name.
scanpkgs: IndexMap<PkgName, ResolvedPackage>,
/// Dependency-aware scheduler that decides which package builds next.
scheduler: Scheduler<PkgName>,
/// Accumulated results, including indirect (dependency) failures.
results: Vec<BuildResult>,
/// Root directory for per-package build logs.
logdir: PathBuf,
}
impl BuildJobs {
    /// Record a successful build and let the scheduler release any
    /// packages that were waiting on it.
    fn mark_success(&mut self, result: BuildResult) {
        self.scheduler.mark_success(&result.pkgname);
        self.results.push(result);
    }
    /// Record a failed build, plus an `IndirectFailed` result for every
    /// package the scheduler now reports as unbuildable because of it.
    fn mark_failure(&mut self, result: BuildResult) {
        trace!(pkgname = %result.pkgname.pkgname(), "mark_failure called");
        let started = std::time::Instant::now();
        let broken = self.scheduler.mark_failure(&result.pkgname);
        trace!(pkgname = %result.pkgname.pkgname(), broken_count = broken.len() + 1, elapsed_ms = started.elapsed().as_millis(), "mark_failure found broken packages");
        let failed_pkg = result.pkgname.clone();
        self.results.push(result);
        for dependent in broken {
            let reason = format!("dependency {} failed", failed_pkg.pkgname());
            let pkgpath = self
                .scanpkgs
                .get(&dependent)
                .map(|resolved| resolved.pkgpath.clone());
            let log_dir = Some(self.logdir.join(dependent.pkgname()));
            self.results.push(BuildResult {
                pkgname: dependent,
                pkgpath,
                state: PackageState::IndirectFailed(reason),
                log_dir,
                build_stats: PkgBuildStats::default(),
            });
        }
        trace!(pkgname = %failed_pkg.pkgname(), total_results = self.results.len(), elapsed_ms = started.elapsed().as_millis(), "mark_failure completed");
    }
}
impl Build {
/// Construct a [`Build`] from the configuration, pkgsrc environment,
/// sandbox scope, and the resolved package scan.
///
/// Also pushes the pkgsrc environment into the sandbox so sandboxed
/// commands run with the same settings.
pub fn new(
    config: &Config,
    pkgsrc_env: PkgsrcEnv,
    scope: SandboxScope,
    scanpkgs: IndexMap<PkgName, ResolvedPackage>,
) -> Build {
    info!(
        package_count = scanpkgs.len(),
        sandbox_enabled = scope.enabled(),
        build_threads = config.build_threads(),
        "Creating new Build instance"
    );
    let sandbox = scope.sandbox();
    sandbox.set_pkgsrc_env(pkgsrc_env.clone());
    Build {
        cached: IndexMap::new(),
        config: config.clone(),
        pkgsrc_env,
        scanpkgs,
        scope,
    }
}
/// Load previously stored build results from the database, keeping only
/// those for packages present in the current scan.
///
/// Returns the number of results that were cached.
pub fn load_cached_from_db(&mut self, db: &crate::db::Database) -> anyhow::Result<usize> {
    let mut loaded = 0usize;
    for cached in db.get_all_build_results()? {
        // Results for packages outside this run are ignored.
        if !self.scanpkgs.contains_key(&cached.pkgname) {
            continue;
        }
        self.cached.insert(cached.pkgname.clone(), cached);
        loaded += 1;
    }
    if loaded > 0 {
        info!(
            cached_count = loaded,
            "Loaded cached build results from database"
        );
    }
    Ok(loaded)
}
/// Run the build: seed the scheduler from the scan and any cached
/// results, spin up worker threads plus a manager thread connected by
/// mpsc channels, persist results to the database as they complete, and
/// return a [`BuildSummary`] for the whole run.
pub fn start(
mut self,
state: &RunState,
db: &crate::db::Database,
) -> anyhow::Result<BuildSummary> {
let started = Instant::now();
info!(package_count = self.scanpkgs.len(), "Build::start() called");
// Shared run-state handle, cloned into every thread for shutdown checks.
let state_flag = state.clone();
let results: Vec<BuildResult> = Vec::new();
let mut scheduler = Scheduler::new(db)?;
// Packages the scheduler knows about but that are not part of this scan
// are marked successful so they never block anything in the queue.
let all_pkgs: Vec<PkgName> = scheduler.iter().map(|sp| sp.pkg).collect();
for pkg in &all_pkgs {
if !self.scanpkgs.contains_key(pkg) {
scheduler.mark_success(pkg);
}
}
// Replay cached results into the scheduler: successes unblock their
// dependents, failures knock out dependents (counted as indirect fails
// unless they themselves have a cached result).
let mut cached_count = 0usize;
let mut indirect_failed_count = 0usize;
for (pkgname, result) in &self.cached {
match result.state {
PackageState::Success | PackageState::UpToDate => {
scheduler.mark_success(pkgname);
}
_ => {
let indirect = scheduler.mark_failure(pkgname);
indirect_failed_count += indirect
.iter()
.filter(|p| !self.cached.contains_key(*p))
.count();
}
}
cached_count += 1;
}
if cached_count > 0 {
println!("Loaded {} cached build results", cached_count);
}
info!(
queued_count = scheduler.queued_count(),
scanpkgs_count = self.scanpkgs.len(),
cached_count = cached_count,
"BuildJobs populated"
);
// Nothing left to build: return an empty summary immediately.
if scheduler.queued_count() == 0 {
return Ok(BuildSummary {
duration: started.elapsed(),
results,
scanfail: Vec::new(),
});
}
// Worker count: configured build threads, capped at the queue size.
// Create any sandboxes still missing for that many workers.
let n = self.config.build_threads().min(scheduler.queued_count());
if self.scope.enabled() && n > self.scope.count() {
let to_create = n - self.scope.count();
let msg = if to_create == 1 {
"Creating sandbox".to_string()
} else {
format!("Creating {} sandboxes", to_create)
};
crate::print_status(&msg);
let start = std::time::Instant::now();
self.scope.ensure(n)?;
crate::print_elapsed(&msg, start.elapsed());
}
// Per-package WRKOBJDIR routing based on recorded disk usage from
// build history: packages on the always_disk list get the disk
// location; otherwise w.route() decides from the historical usage
// (failed builds only count if under the configured threshold).
let build_history = db.build_history_by_pkg_all(None);
let wrkobjdir_map: HashMap<PkgName, WrkObjKind> = if let Some(w) = self.config.wrkobjdir() {
let success = Some(PackageStateKind::Success);
debug!(
total_packages = self.scanpkgs.len(),
history_entries = build_history.len(),
"WRKOBJDIR routing query results"
);
let mut map = HashMap::new();
for (pkgname, resolved) in &self.scanpkgs {
let pkgpath = resolved.pkgpath.as_str();
if w.always_disk.iter().any(|p| p == pkgpath) {
if let Some(disk) = w.disk.clone() {
map.insert(pkgname.clone(), WrkObjKind::Disk(disk));
}
continue;
}
let du = build_history.get(pkgname.pkgbase()).and_then(|h| {
if h.outcome == success {
h.disk_usage
} else {
match (h.disk_usage, w.failed_threshold) {
(Some(size), Some(ft)) if size <= ft => Some(size),
_ => None,
}
}
});
if let Some(kind) = w.route(du) {
map.insert(pkgname.clone(), kind);
}
}
map
} else {
HashMap::new()
};
// Optional MAKE_JOBS allocator shared across the worker pool.
if let Some(jobs) = self.config.jobs() {
scheduler.set_allocator(crate::makejobs::Allocator::new(n, jobs));
}
let logdir = self.config.logdir().clone();
let total_packages = self.scanpkgs.len();
let jobs = BuildJobs {
scanpkgs: self.scanpkgs,
scheduler,
results,
logdir,
};
// Background CPU usage sampler (optional; samples saved at the end).
let cpu_sampler = crate::cpu::start_cpu_sampler();
if cpu_sampler.is_some() {
debug!("CPU usage sampler started");
}
println!("Building packages...");
// Progress display, shared (behind a mutex) between the refresh
// thread and the manager thread.
let progress = Arc::new(Mutex::new(
Progress::new("Building", "Built", total_packages, n, self.config.tui())
.context("Failed to initialize progress display")?,
));
if cached_count > 0 || indirect_failed_count > 0 {
if let Ok(mut p) = progress.lock() {
p.state_mut().cached = cached_count;
p.state_mut().skipped = indirect_failed_count;
}
}
// Refresh thread: re-renders the progress display on an interval and,
// in TUI mode, also feeds input events to it.
let stop_refresh = Arc::new(AtomicBool::new(false));
let progress_refresh = Arc::clone(&progress);
let stop_flag = Arc::clone(&stop_refresh);
let state_for_refresh = state_flag.clone();
let is_plain = progress.lock().map(|p| p.is_plain()).unwrap_or(false);
let refresh_thread = std::thread::spawn(move || {
while !stop_flag.load(Ordering::Relaxed) && !state_for_refresh.is_shutdown() {
if is_plain {
std::thread::sleep(REFRESH_INTERVAL);
if let Ok(mut p) = progress_refresh.lock() {
let _ = p.render();
}
} else {
let has_event = event::poll(REFRESH_INTERVAL).unwrap_or(false);
if let Ok(mut p) = progress_refresh.lock() {
if has_event {
let _ = p.handle_event();
}
let _ = p.render();
}
}
}
});
// Worker threads: each announces readiness to the manager, waits for a
// command on its private channel, runs the build, and reports the
// outcome back on the shared manager channel.
let (manager_tx, manager_rx) = mpsc::channel::<ChannelCommand>();
let mut threads = vec![];
let mut clients: HashMap<usize, Sender<ChannelCommand>> = HashMap::new();
for i in 0..n {
let (client_tx, client_rx) = mpsc::channel::<ChannelCommand>();
clients.insert(i, client_tx);
let manager_tx = manager_tx.clone();
let state_for_worker = state_flag.clone();
let thread = std::thread::spawn(move || {
loop {
if state_for_worker.is_shutdown() {
break;
}
if manager_tx.send(ChannelCommand::ClientReady(i)).is_err() {
break;
}
let Ok(msg) = client_rx.recv() else {
break;
};
match msg {
ChannelCommand::ComeBackLater => {
std::thread::sleep(WORKER_BACKOFF_INTERVAL);
continue;
}
ChannelCommand::JobData(mut pkg) => {
let pkgname = pkg.pkginfo.index.pkgname.clone();
let pkgpath = pkg.pkginfo.pkgpath.clone();
let span = info_span!(
"build",
sandbox_id = pkg.sandbox_id,
pkgpath = %pkgpath,
pkgname = %pkgname.pkgname(),
logdir = %pkg.session.config.logdir().display(),
);
let _guard = span.enter();
let _ = manager_tx.send(ChannelCommand::StageUpdate(
i,
Some("pre-build create hooks".to_string()),
));
let log_dir = pkg.session.config.logdir().join(pkgname.pkgname());
let timestamp = crate::epoch_secs().unwrap_or(0);
let build_start = Instant::now();
let result = pkg.build(&manager_tx);
let duration = build_start.elapsed();
trace!(
elapsed_ms = duration.as_millis(),
result = %result.as_ref().map_or("error".to_string(), |r| r.to_string()),
"Build finished"
);
// Stats come from either outcome; errors get defaults.
let mut build_stats = match &result {
Ok(PkgBuildResult::Success(s) | PkgBuildResult::Failed(s)) => {
s.clone()
}
Err(_) => PkgBuildStats::default(),
};
build_stats.duration = duration;
build_stats.timestamp = timestamp;
match result {
Ok(PkgBuildResult::Success(_)) => {
let _ =
manager_tx.send(ChannelCommand::JobSuccess(BuildResult {
pkgname,
pkgpath: Some(pkgpath),
state: PackageState::Success,
log_dir: Some(log_dir),
build_stats,
}));
}
Ok(PkgBuildResult::Failed(_)) => {
let _ =
manager_tx.send(ChannelCommand::JobFailed(BuildResult {
pkgname,
pkgpath: Some(pkgpath),
state: PackageState::Failed("Build failed".to_string()),
log_dir: Some(log_dir),
build_stats,
}));
}
Err(e) => {
// During shutdown an error is expected; only report
// it as a failure when we are not shutting down.
if !state_for_worker.is_shutdown() {
tracing::error!(
error = format!("{e:#}"),
pkgname = %pkgname.pkgname(),
"Build error"
);
let _ = manager_tx.send(ChannelCommand::JobFailed(
BuildResult {
pkgname,
pkgpath: Some(pkgpath),
state: PackageState::Failed(e.to_string()),
log_dir: Some(log_dir),
build_stats,
},
));
}
}
}
if state_for_worker.is_shutdown() {
break;
}
continue;
}
ChannelCommand::Quit | ChannelCommand::Shutdown => {
break;
}
_ => break,
}
}
});
threads.push(thread);
}
// Immutable per-run context shared with every dispatched job.
let session = Arc::new(BuildSession {
config: self.config.clone(),
pkgsrc_env: self.pkgsrc_env.clone(),
sandbox: self.scope.sandbox().clone(),
state: state_flag.clone(),
wrkobjdir_map,
});
let sandbox_ids = self.scope.ids().map(|ids| ids.to_vec());
let progress_clone = Arc::clone(&progress);
let state_for_manager = state_flag.clone();
let (results_tx, results_rx) = mpsc::channel::<Vec<BuildResult>>();
let (completed_tx, completed_rx) = mpsc::channel::<BuildResult>();
// Manager thread: dispatches jobs from the scheduler to ready workers,
// tracks which worker runs which package, updates the progress display,
// and streams completed results out on completed_tx. The final result
// vector is sent on results_tx when the loop ends.
let manager = std::thread::spawn(move || {
let sandbox_ids = sandbox_ids;
let mut clients = clients.clone();
let mut jobs = jobs;
let mut announced_interrupt = false;
let mut thread_packages: HashMap<usize, PkgName> = HashMap::new();
loop {
if state_for_manager.is_shutdown() {
let was_first = if let Ok(mut p) = progress_clone.lock() {
p.finish_interrupted().unwrap_or(false)
} else {
false
};
if was_first {
eprintln!("Interrupted, shutting down...");
}
for (_, client) in clients.drain() {
let _ = client.send(ChannelCommand::Shutdown);
}
break;
} else if state_for_manager.is_stopping() && !announced_interrupt {
if let Ok(mut p) = progress_clone.lock() {
p.announce_interrupt();
}
announced_interrupt = true;
}
// Poll with a timeout so shutdown is noticed promptly even when
// no commands are arriving.
let command = match manager_rx.recv_timeout(SHUTDOWN_POLL_INTERVAL) {
Ok(cmd) => cmd,
Err(mpsc::RecvTimeoutError::Timeout) => continue,
Err(mpsc::RecvTimeoutError::Disconnected) => break,
};
match command {
// While stopping, no new work is handed out: quit each worker
// as it reports ready, and stop once all are gone.
ChannelCommand::ClientReady(c) if state_for_manager.is_stopping() => {
if let Ok(mut p) = progress_clone.lock() {
p.clear_output_buffer(c);
p.state_mut().set_worker_idle(c);
let _ = p.render();
}
if let Some(client) = clients.get(&c) {
let _ = client.send(ChannelCommand::Quit);
}
clients.remove(&c);
if clients.is_empty() {
break;
}
}
ChannelCommand::ClientReady(c) => {
let client = clients.get(&c).expect("client not in map");
match jobs.scheduler.poll() {
// A package is ready: dispatch it to this worker.
Poll::Ready(Some(sp)) => {
let pkginfo = jobs
.scanpkgs
.get(&sp.pkg)
.expect("pkg not in scanpkgs")
.clone();
thread_packages.insert(c, sp.pkg.clone());
let hist = build_history.get(sp.pkg.pkgbase());
let wrkobjdir =
session.wrkobjdir_map.get(&sp.pkg).map(|k| k.to_string());
info!(
pkgname = %sp.pkg.pkgname(),
make_jobs = sp.make_jobs.jobs(),
make_jobs_safe = sp.make_jobs.safe(),
wrkobjdir = wrkobjdir.as_deref(),
history = hist.is_some() || sp.cpu_time > 0,
previous_status = hist.and_then(|h| h.outcome).map(|o| -> &str { o.into() }),
previous_disk_usage = hist.and_then(|h| h.disk_usage),
"Scheduler decision"
);
if let Ok(mut p) = progress_clone.lock() {
p.clear_output_buffer(c);
p.state_mut().set_worker_active(c, sp.pkg.pkgname());
p.state_mut().increment_dispatched();
if p.is_plain() {
let _ = p.print_status(
"Building",
sp.pkg.pkgname(),
None,
None,
);
}
let _ = p.render();
}
let _ =
client.send(ChannelCommand::JobData(Box::new(PackageBuild {
session: Arc::clone(&session),
sandbox_id: sandbox_ids.as_ref().map(|ids| ids[c]),
worker_id: c,
pkginfo,
make_jobs: sp.make_jobs,
})));
}
// Queue exhausted: retire this worker.
Poll::Ready(None) => {
if let Ok(mut p) = progress_clone.lock() {
p.clear_output_buffer(c);
p.state_mut().set_worker_idle(c);
let _ = p.render();
}
let _ = client.send(ChannelCommand::Quit);
clients.remove(&c);
if clients.is_empty() {
break;
}
}
// Work remains but nothing is runnable yet (dependencies
// still building): tell the worker to retry shortly.
Poll::Pending => {
if let Ok(mut p) = progress_clone.lock() {
p.clear_output_buffer(c);
p.state_mut().set_worker_idle(c);
let _ = p.render();
}
let _ = client.send(ChannelCommand::ComeBackLater);
}
}
}
ChannelCommand::JobSuccess(result) => {
let pkgname = result.pkgname.clone();
let duration = result.build_stats.duration;
jobs.mark_success(result);
// Find which worker was building this package.
let sid = thread_packages
.iter()
.find(|(_, p)| *p == &pkgname)
.map(|(t, _)| *t);
if let Some(r) = jobs.results.last() {
let _ = completed_tx.send(r.clone());
}
if let Ok(mut p) = progress_clone.lock() {
let _ =
p.print_status("Built", pkgname.pkgname(), Some(duration), None);
p.state_mut().increment_completed();
if let Some(sid) = sid {
p.clear_output_buffer(sid);
p.state_mut().set_worker_idle(sid);
}
let _ = p.render();
}
if let Some(sid) = sid {
thread_packages.remove(&sid);
}
}
ChannelCommand::JobFailed(result) => {
let pkgname = result.pkgname.clone();
let duration = result.build_stats.duration;
let results_before = jobs.results.len();
// mark_failure appends the direct failure plus one result
// per indirectly-broken dependent.
jobs.mark_failure(result);
let sid = thread_packages
.iter()
.find(|(_, p)| *p == &pkgname)
.map(|(t, _)| *t);
for r in jobs.results.iter().skip(results_before) {
let _ = completed_tx.send(r.clone());
}
let indirect_count = jobs.results.len() - results_before - 1;
let dep_count = jobs.scheduler.dep_count(&pkgname);
if let Ok(mut p) = progress_clone.lock() {
let _ = p.print_status(
"Failed",
pkgname.pkgname(),
Some(duration),
Some(dep_count),
);
p.state_mut().increment_failed();
p.state_mut().skipped += indirect_count;
if let Some(sid) = sid {
p.clear_output_buffer(sid);
p.state_mut().set_worker_idle(sid);
}
let _ = p.render();
}
if let Some(sid) = sid {
thread_packages.remove(&sid);
}
}
ChannelCommand::StageUpdate(tid, stage) => {
if let Ok(mut p) = progress_clone.lock() {
p.state_mut().set_worker_stage(tid, stage.as_deref());
let _ = p.render();
}
}
ChannelCommand::OutputLines(tid, lines) => {
if let Ok(mut p) = progress_clone.lock() {
if let Some(buf) = p.output_buffer_mut(tid) {
for line in lines {
buf.push(line);
}
}
}
}
// Manager-to-worker commands are not expected here; ignore.
ChannelCommand::ComeBackLater
| ChannelCommand::JobData(_)
| ChannelCommand::Quit
| ChannelCommand::Shutdown => {}
}
}
debug!(
result_count = jobs.results.len(),
"Manager sending results back"
);
let _ = results_tx.send(jobs.results);
});
threads.push(manager);
// Persist results as they complete. This loop ends when the manager
// thread finishes and drops completed_tx. Only the first DB error is
// kept; later ones are logged.
let mut saved_count = 0;
let mut db_error: Option<anyhow::Error> = None;
let build_id = db.build_id().ok();
while let Ok(result) = completed_rx.recv() {
if let Err(e) = db.store_build_by_name(&result) {
warn!(
pkgname = %result.pkgname.pkgname(),
error = format!("{e:#}"),
"Failed to save build result"
);
if db_error.is_none() {
db_error = Some(e);
}
} else {
saved_count += 1;
}
if let Some(mut input) = result.history_input() {
input.build_id = build_id.clone();
if let Err(e) = db.record_history(&input) {
warn!(
pkgname = %result.pkgname.pkgname(),
error = format!("{e:#}"),
"Failed to save build history"
);
}
}
}
if saved_count > 0 {
debug!(saved_count, "Saved build results to database");
}
// Join workers and the manager.
debug!("Joining worker threads");
let join_start = Instant::now();
for thread in threads {
if let Err(e) = thread.join() {
warn!("Worker thread panicked: {:?}", e);
}
}
debug!(
elapsed_ms = join_start.elapsed().as_millis(),
"Worker threads completed"
);
// Stop the CPU sampler and persist its samples (best-effort).
if let Some(sampler) = cpu_sampler {
let samples = sampler.stop();
if !samples.is_empty() {
if let Err(e) = db.store_cpu_usage(&samples) {
warn!(error = format!("{e:#}"), "Failed to save CPU usage samples");
} else {
debug!(count = samples.len(), "Saved CPU usage samples");
}
}
}
// Tear down the progress display.
stop_refresh.store(true, Ordering::Relaxed);
let _ = refresh_thread.join();
if let Ok(mut p) = progress.lock() {
if state_flag.interrupted() {
let _ = p.finish_interrupted();
} else {
let _ = p.finish();
}
}
debug!("Collecting results from manager");
let results = results_rx.recv().unwrap_or_default();
debug!(
result_count = results.len(),
"Collected results from manager"
);
let summary = BuildSummary {
duration: started.elapsed(),
results,
scanfail: Vec::new(),
};
// Surface the first persistence error only after all threads are done.
if let Some(e) = db_error {
return Err(e.context("Failed to persist build results to database"));
}
Ok(summary)
}
}