1use crate::status::{self, StatusMessage};
67use crate::tui::{MultiProgress, format_duration};
68use crate::{Config, Sandbox};
69use anyhow::{Context, bail};
70use glob::Pattern;
71use pkgsrc::{PkgName, PkgPath, ScanIndex};
72use std::collections::{HashMap, HashSet};
73use std::fs;
74use std::path::{Path, PathBuf};
75use std::process::Command;
76use std::sync::atomic::{AtomicBool, Ordering};
77use std::sync::{Arc, Mutex, mpsc, mpsc::Sender};
78use std::time::{Duration, Instant};
79use tracing::{debug, error, info, trace, warn};
80
81fn format_scan_index(idx: &ScanIndex) -> String {
84 let mut out = String::new();
85
86 out.push_str(&format!("PKGNAME={}\n", idx.pkgname.pkgname()));
87
88 if let Some(ref loc) = idx.pkg_location {
89 out.push_str(&format!("PKG_LOCATION={}\n", loc.as_path().display()));
90 }
91
92 if !idx.depends.is_empty() {
93 let deps: Vec<&str> = idx.depends.iter().map(|d| d.pkgname()).collect();
94 out.push_str(&format!("DEPENDS={}\n", deps.join(" ")));
95 }
96
97 if !idx.multi_version.is_empty() {
98 out.push_str(&format!(
99 "MULTI_VERSION={}\n",
100 idx.multi_version.join(" ")
101 ));
102 }
103
104 if let Some(ref bootstrap) = idx.bootstrap_pkg {
105 out.push_str(&format!("BOOTSTRAP_PKG={}\n", bootstrap));
106 }
107
108 if let Some(ref phase) = idx.usergroup_phase {
109 out.push_str(&format!("USERGROUP_PHASE={}\n", phase));
110 }
111
112 out
113}
114
/// Final outcome recorded for one package in a [`BuildResult`].
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub enum BuildOutcome {
    /// The package was built successfully.
    Success,
    /// The build failed; the string is a human-readable reason
    /// (this file records `"Build failed"`).
    Failed(String),
    /// The package was not built; the string says why.  This file records
    /// `"up-to-date"` and `"Dependency <pkgname> failed"`.
    Skipped(String),
}
137
/// Per-package record produced by a build run.
#[derive(Clone, Debug)]
pub struct BuildResult {
    /// Name of the package this result belongs to.
    pub pkgname: PkgName,
    /// Package location taken from the scan index, when one is known.
    pub pkgpath: Option<PkgPath>,
    /// What happened to the package.
    pub outcome: BuildOutcome,
    /// Time spent on the package; `Duration::ZERO` for skipped packages.
    pub duration: Duration,
    /// Directory under the configured log directory named after the package.
    pub log_dir: Option<PathBuf>,
}
157
/// Aggregate result of a whole [`Build::start`] run.
#[derive(Clone, Debug)]
pub struct BuildSummary {
    /// Wall-clock time of the entire run.
    pub duration: Duration,
    /// One entry per package that was built, failed or skipped.
    pub results: Vec<BuildResult>,
}
184
185impl BuildSummary {
186 pub fn success_count(&self) -> usize {
188 self.results
189 .iter()
190 .filter(|r| matches!(r.outcome, BuildOutcome::Success))
191 .count()
192 }
193
194 pub fn failed_count(&self) -> usize {
196 self.results
197 .iter()
198 .filter(|r| matches!(r.outcome, BuildOutcome::Failed(_)))
199 .count()
200 }
201
202 pub fn skipped_count(&self) -> usize {
204 self.results
205 .iter()
206 .filter(|r| matches!(r.outcome, BuildOutcome::Skipped(_)))
207 .count()
208 }
209
210 pub fn failed(&self) -> Vec<&BuildResult> {
212 self.results
213 .iter()
214 .filter(|r| matches!(r.outcome, BuildOutcome::Failed(_)))
215 .collect()
216 }
217
218 pub fn succeeded(&self) -> Vec<&BuildResult> {
220 self.results
221 .iter()
222 .filter(|r| matches!(r.outcome, BuildOutcome::Success))
223 .collect()
224 }
225
226 pub fn skipped(&self) -> Vec<&BuildResult> {
228 self.results
229 .iter()
230 .filter(|r| matches!(r.outcome, BuildOutcome::Skipped(_)))
231 .collect()
232 }
233}
234
/// Driver that builds a set of scanned packages, see [`Build::start`].
#[derive(Debug, Default)]
pub struct Build {
    /// Configuration the build runs with (a clone of what `new` was given).
    config: Config,
    /// Sandbox handle created from `config`; one sandbox slot is used per
    /// build thread when sandboxing is enabled.
    sandbox: Sandbox,
    /// Scan results for every package to build, keyed by package name.
    scanpkgs: HashMap<PkgName, ScanIndex>,
}
250
/// A single package build job handed from the manager to a worker thread.
#[derive(Debug)]
struct PackageBuild {
    /// Worker index; also used as the sandbox slot the build runs in.
    id: usize,
    /// Configuration (scripts, environment, paths) for the build.
    config: Config,
    /// Scan index of the package to build.
    pkginfo: ScanIndex,
    /// Sandbox handle used to execute the build scripts.
    sandbox: Sandbox,
}
258
/// Helper for querying make variables of one package via `show-var`.
struct MakeQuery<'a> {
    /// Supplies the pkgsrc directory and the make program.
    config: &'a Config,
    /// Decides whether queries run through chroot and maps paths to the host.
    sandbox: &'a Sandbox,
    /// Sandbox slot to query in when sandboxing is enabled.
    sandbox_id: usize,
    /// Package directory, relative to the pkgsrc root.
    pkgpath: &'a PkgPath,
    /// Extra environment variables set for every make invocation.
    env: &'a HashMap<String, String>,
}
267
268impl<'a> MakeQuery<'a> {
269 fn new(
270 config: &'a Config,
271 sandbox: &'a Sandbox,
272 sandbox_id: usize,
273 pkgpath: &'a PkgPath,
274 env: &'a HashMap<String, String>,
275 ) -> Self {
276 Self { config, sandbox, sandbox_id, pkgpath, env }
277 }
278
279 fn var(&self, name: &str) -> Option<String> {
281 let pkgdir = self.config.pkgsrc().join(self.pkgpath.as_path());
282
283 let mut cmd = if self.sandbox.enabled() {
284 let mut c = Command::new("/usr/sbin/chroot");
285 c.arg(self.sandbox.path(self.sandbox_id)).arg(self.config.make());
286 c
287 } else {
288 Command::new(self.config.make())
289 };
290
291 cmd.arg("-C")
292 .arg(&pkgdir)
293 .arg("show-var")
294 .arg(format!("VARNAME={}", name));
295
296 for (key, value) in self.env {
298 cmd.env(key, value);
299 }
300
301 let output = cmd.output().ok()?;
302
303 if !output.status.success() {
304 return None;
305 }
306
307 let value = String::from_utf8_lossy(&output.stdout).trim().to_string();
308
309 if value.is_empty() { None } else { Some(value) }
310 }
311
312 fn var_path(&self, name: &str) -> Option<PathBuf> {
314 self.var(name).map(PathBuf::from)
315 }
316
317 fn wrkdir(&self) -> Option<PathBuf> {
319 self.var_path("WRKDIR")
320 }
321
322 #[allow(dead_code)]
324 fn wrksrc(&self) -> Option<PathBuf> {
325 self.var_path("WRKSRC")
326 }
327
328 #[allow(dead_code)]
330 fn destdir(&self) -> Option<PathBuf> {
331 self.var_path("DESTDIR")
332 }
333
334 #[allow(dead_code)]
336 fn prefix(&self) -> Option<PathBuf> {
337 self.var_path("PREFIX")
338 }
339
340 fn resolve_path(&self, path: &Path) -> PathBuf {
343 if self.sandbox.enabled() {
344 self.sandbox
345 .path(self.sandbox_id)
346 .join(path.strip_prefix("/").unwrap_or(path))
347 } else {
348 path.to_path_buf()
349 }
350 }
351}
352
/// Outcome of [`PackageBuild::build`] as seen by the worker thread.
#[derive(Debug)]
enum PackageBuildResult {
    /// `pkg-build` exited with status 0.
    Success,
    /// `pkg-build` exited non-zero or was terminated by a signal.
    Failed,
    /// `pkg-build` reported the package as skipped over the status channel.
    Skipped,
}
363
364impl PackageBuild {
365 fn build(
366 &self,
367 status_tx: &Sender<ChannelCommand>,
368 ) -> anyhow::Result<PackageBuildResult> {
369 let pkgname = self.pkginfo.pkgname.pkgname();
370 info!(pkgname = %pkgname,
371 sandbox_id = self.id,
372 "Starting package build"
373 );
374
375 let Some(pkgpath) = &self.pkginfo.pkg_location else {
376 error!(pkgname = %pkgname, "Could not get PKGPATH for package");
377 bail!("Could not get PKGPATH for {}", pkgname);
378 };
379
380 let logdir = self.config.logdir();
381
382 let mut envs = self.config.script_env();
384
385 if let Some(path) = self.config.script("pkg-up-to-date") {
387 envs.push((
388 "PKG_UP_TO_DATE".to_string(),
389 format!("{}", path.display()),
390 ));
391 }
392
393 let pkg_env = match self.config.get_pkg_env(&self.pkginfo) {
395 Ok(env) => {
396 for (key, value) in &env {
397 envs.push((key.clone(), value.clone()));
398 }
399 env
400 }
401 Err(e) => {
402 error!(pkgname = %pkgname, error = %e, "Failed to get env from Lua config");
403 HashMap::new()
404 }
405 };
406
407 let patterns = self.config.save_wrkdir_patterns();
409 if !patterns.is_empty() {
410 envs.push(("SKIP_CLEAN".to_string(), "1".to_string()));
411 }
412
413 let Some(pkg_build_script) = self.config.script("pkg-build") else {
414 error!(pkgname = %pkgname, "No pkg-build script defined");
415 bail!("No pkg-build script defined");
416 };
417
418 let stdin_data = format_scan_index(&self.pkginfo);
420
421 debug!(pkgname = %pkgname,
422 env_count = envs.len(),
423 "Executing build scripts"
424 );
425 trace!(pkgname = %pkgname,
426 envs = ?envs,
427 stdin = %stdin_data,
428 "Build environment variables"
429 );
430
431 if let Some(pre_build) = self.config.script("pre-build") {
433 debug!(pkgname = %pkgname, "Running pre-build script");
434 let child = self.sandbox.execute(
435 self.id,
436 pre_build,
437 envs.clone(),
438 None,
439 None,
440 )?;
441 let output = child
442 .wait_with_output()
443 .context("Failed to wait for pre-build")?;
444 if !output.status.success() {
445 warn!(pkgname = %pkgname, exit_code = ?output.status.code(), "pre-build script failed");
446 }
447 }
448
449 let (mut status_reader, status_writer) =
451 status::channel().context("Failed to create status channel")?;
452 let status_fd = status_writer.fd();
453
454 let (mut output_reader, output_writer) = status::output_channel()
455 .context("Failed to create output channel")?;
456 let output_fd = output_writer.fd();
457
458 envs.push(("bob_output_fd".to_string(), output_fd.to_string()));
460
461 let mut child = self.sandbox.execute(
462 self.id,
463 pkg_build_script,
464 envs.clone(),
465 Some(&stdin_data),
466 Some(status_fd),
467 )?;
468
469 status_writer.close();
471 output_writer.close();
472
473 let mut was_skipped = false;
475
476 loop {
478 for msg in status_reader.read_all() {
480 match msg {
481 StatusMessage::Stage(stage) => {
482 let _ = status_tx.send(ChannelCommand::StageUpdate(
483 self.id,
484 Some(stage),
485 ));
486 }
487 StatusMessage::Skipped => {
488 was_skipped = true;
489 }
490 }
491 }
492
493 let output_lines = output_reader.read_all_lines();
495 if !output_lines.is_empty() {
496 let _ = status_tx
497 .send(ChannelCommand::OutputLines(self.id, output_lines));
498 }
499
500 match child.try_wait() {
502 Ok(Some(_status)) => break,
503 Ok(None) => {
504 std::thread::sleep(Duration::from_millis(10));
506 }
507 Err(e) => {
508 return Err(e).context("Failed to wait for pkg-build");
509 }
510 }
511 }
512
513 let remaining = output_reader.read_all_lines();
515 if !remaining.is_empty() {
516 let _ =
517 status_tx.send(ChannelCommand::OutputLines(self.id, remaining));
518 }
519
520 let _ = status_tx.send(ChannelCommand::StageUpdate(self.id, None));
522
523 let status =
525 child.wait().context("Failed to get pkg-build exit status")?;
526
527 let result = if was_skipped {
528 info!(pkgname = %pkgname,
529 "pkg-build skipped (up-to-date)"
530 );
531 PackageBuildResult::Skipped
532 } else {
533 match status.code() {
534 Some(0) => {
535 info!(pkgname = %pkgname,
536 "pkg-build completed successfully"
537 );
538 PackageBuildResult::Success
539 }
540 Some(code) => {
541 error!(pkgname = %pkgname,
542 exit_code = code,
543 "pkg-build failed"
544 );
545
546 if !patterns.is_empty() {
548 self.save_wrkdir_files(
549 pkgname, pkgpath, logdir, patterns, &pkg_env,
550 );
551 self.run_clean(pkgpath);
552 }
553 PackageBuildResult::Failed
554 }
555 None => {
556 warn!(pkgname = %pkgname,
558 "pkg-build terminated by signal"
559 );
560 PackageBuildResult::Failed
561 }
562 }
563 };
564
565 if let Some(post_build) = self.config.script("post-build") {
567 debug!(pkgname = %pkgname, "Running post-build script");
568 if let Ok(child) =
569 self.sandbox.execute(self.id, post_build, envs, None, None)
570 {
571 match child.wait_with_output() {
572 Ok(output) if !output.status.success() => {
573 warn!(pkgname = %pkgname, exit_code = ?output.status.code(), "post-build script failed");
574 }
575 Err(e) => {
576 warn!(pkgname = %pkgname, error = %e, "Failed to wait for post-build");
577 }
578 _ => {}
579 }
580 }
581 }
582
583 Ok(result)
584 }
585
586 fn save_wrkdir_files(
588 &self,
589 pkgname: &str,
590 pkgpath: &PkgPath,
591 logdir: &Path,
592 patterns: &[String],
593 pkg_env: &HashMap<String, String>,
594 ) {
595 let make = MakeQuery::new(
596 &self.config,
597 &self.sandbox,
598 self.id,
599 pkgpath,
600 pkg_env,
601 );
602
603 let wrkdir = match make.wrkdir() {
605 Some(w) => w,
606 None => {
607 debug!(pkgname = %pkgname, "Could not determine WRKDIR, skipping file save");
608 return;
609 }
610 };
611
612 let wrkdir_path = make.resolve_path(&wrkdir);
614
615 if !wrkdir_path.exists() {
616 debug!(pkgname = %pkgname,
617 wrkdir = %wrkdir_path.display(),
618 "WRKDIR does not exist, skipping file save"
619 );
620 return;
621 }
622
623 let save_dir = logdir.join(pkgname).join("wrkdir-files");
624 if let Err(e) = fs::create_dir_all(&save_dir) {
625 warn!(pkgname = %pkgname,
626 error = %e,
627 "Failed to create wrkdir-files directory"
628 );
629 return;
630 }
631
632 let compiled_patterns: Vec<Pattern> = patterns
634 .iter()
635 .filter_map(|p| {
636 Pattern::new(p).ok().or_else(|| {
637 warn!(pattern = %p, "Invalid glob pattern");
638 None
639 })
640 })
641 .collect();
642
643 if compiled_patterns.is_empty() {
644 return;
645 }
646
647 let mut saved_count = 0;
649 if let Err(e) = walk_and_save(
650 &wrkdir_path,
651 &wrkdir_path,
652 &save_dir,
653 &compiled_patterns,
654 &mut saved_count,
655 ) {
656 warn!(pkgname = %pkgname,
657 error = %e,
658 "Error while saving wrkdir files"
659 );
660 }
661
662 if saved_count > 0 {
663 info!(pkgname = %pkgname,
664 count = saved_count,
665 dest = %save_dir.display(),
666 "Saved wrkdir files"
667 );
668 }
669 }
670
671 fn run_clean(&self, pkgpath: &PkgPath) {
673 let pkgdir = self.config.pkgsrc().join(pkgpath.as_path());
674
675 let result = if self.sandbox.enabled() {
676 Command::new("/usr/sbin/chroot")
677 .arg(self.sandbox.path(self.id))
678 .arg(self.config.make())
679 .arg("-C")
680 .arg(&pkgdir)
681 .arg("clean")
682 .stdout(std::process::Stdio::null())
683 .stderr(std::process::Stdio::null())
684 .status()
685 } else {
686 Command::new(self.config.make())
687 .arg("-C")
688 .arg(&pkgdir)
689 .arg("clean")
690 .stdout(std::process::Stdio::null())
691 .stderr(std::process::Stdio::null())
692 .status()
693 };
694
695 if let Err(e) = result {
696 debug!(error = %e, "Failed to run bmake clean");
697 }
698 }
699}
700
701fn walk_and_save(
703 base: &Path,
704 current: &Path,
705 save_dir: &Path,
706 patterns: &[Pattern],
707 saved_count: &mut usize,
708) -> std::io::Result<()> {
709 if !current.is_dir() {
710 return Ok(());
711 }
712
713 for entry in fs::read_dir(current)? {
714 let entry = entry?;
715 let path = entry.path();
716
717 if path.is_dir() {
718 walk_and_save(base, &path, save_dir, patterns, saved_count)?;
719 } else if path.is_file() {
720 let rel_path = path.strip_prefix(base).unwrap_or(&path);
722 let rel_str = rel_path.to_string_lossy();
723
724 for pattern in patterns {
726 if pattern.matches(&rel_str)
727 || pattern.matches(
728 path.file_name()
729 .unwrap_or_default()
730 .to_string_lossy()
731 .as_ref(),
732 )
733 {
734 let dest_path = save_dir.join(rel_path);
736 if let Some(parent) = dest_path.parent() {
737 fs::create_dir_all(parent)?;
738 }
739
740 if let Err(e) = fs::copy(&path, &dest_path) {
742 warn!(src = %path.display(),
743 dest = %dest_path.display(),
744 error = %e,
745 "Failed to copy file"
746 );
747 } else {
748 debug!(src = %path.display(),
749 dest = %dest_path.display(),
750 "Saved wrkdir file"
751 );
752 *saved_count += 1;
753 }
754 break; }
756 }
757 }
758 }
759
760 Ok(())
761}
762
/// Messages exchanged between the manager thread and the worker threads.
#[derive(Debug)]
enum ChannelCommand {
    /// Worker -> manager: worker with this index is ready for a job.
    ClientReady(usize),
    /// Manager -> worker: nothing is buildable right now; ask again later.
    ComeBackLater,
    /// Manager -> worker: the package to build next.
    JobData(Box<PackageBuild>),
    /// Worker -> manager: the package built successfully in the given time.
    JobSuccess(PkgName, Duration),
    /// Worker -> manager: the package build failed after the given time.
    JobFailed(PkgName, Duration),
    /// Worker -> manager: the package was skipped by the build script.
    JobSkipped(PkgName),
    /// Worker -> manager: the build could not be run; carries the error.
    JobError((PkgName, Duration, anyhow::Error)),
    /// Manager -> worker: all work is done; exit the worker loop.
    Quit,
    /// Manager -> worker: shutdown was requested; exit the worker loop.
    Shutdown,
    /// Worker -> manager: current build stage of the worker with this index,
    /// or `None` to clear it.
    StageUpdate(usize, Option<String>),
    /// Worker -> manager: new build output lines from the worker with this
    /// index.
    OutputLines(usize, Vec<String>),
}
813
/// Answer of [`BuildJobs::get_next_build`].
#[derive(Debug)]
enum BuildStatus {
    /// This package has no outstanding dependencies and can be built now.
    Available(PkgName),
    /// Packages remain, but all of them still wait on dependencies.
    NoneAvailable,
    /// No packages remain in the incoming queue.
    Done,
}
833
/// Scheduling state owned by the manager thread.
#[derive(Clone, Debug)]
struct BuildJobs {
    /// Scan index of every package, keyed by package name.
    scanpkgs: HashMap<PkgName, ScanIndex>,
    /// Packages not yet started, each mapped to the dependencies it is still
    /// waiting for.  An empty set means the package is ready to build.
    incoming: HashMap<PkgName, HashSet<PkgName>>,
    /// Packages currently being built by a worker.
    running: HashSet<PkgName>,
    /// Packages that finished successfully or were skipped as up-to-date.
    done: HashSet<PkgName>,
    /// Packages that failed, plus everything that depended on them.
    failed: HashSet<PkgName>,
    /// Result records accumulated so far.
    results: Vec<BuildResult>,
    /// Base log directory; per-package log dirs are `<logdir>/<pkgname>`.
    logdir: PathBuf,
}
844
845impl BuildJobs {
846 fn mark_success(&mut self, pkgname: &PkgName, duration: Duration) {
850 self.mark_done(pkgname, BuildOutcome::Success, duration);
851 }
852
853 fn mark_skipped(&mut self, pkgname: &PkgName) {
857 self.mark_done(
858 pkgname,
859 BuildOutcome::Skipped("up-to-date".to_string()),
860 Duration::ZERO,
861 );
862 }
863
864 fn mark_done(
865 &mut self,
866 pkgname: &PkgName,
867 outcome: BuildOutcome,
868 duration: Duration,
869 ) {
870 for dep in self.incoming.values_mut() {
876 if dep.contains(pkgname) {
877 dep.remove(pkgname);
878 }
879 }
880 self.done.insert(pkgname.clone());
885
886 let scanpkg = self.scanpkgs.get(pkgname);
888 let log_dir = Some(self.logdir.join(pkgname.pkgname()));
889 self.results.push(BuildResult {
890 pkgname: pkgname.clone(),
891 pkgpath: scanpkg.and_then(|s| s.pkg_location.clone()),
892 outcome,
893 duration,
894 log_dir,
895 });
896 }
897
898 fn mark_failure(&mut self, pkgname: &PkgName, duration: Duration) {
902 let mut broken: HashSet<PkgName> = HashSet::new();
903 let mut to_check: Vec<PkgName> = vec![];
904 to_check.push(pkgname.clone());
905 loop {
910 let Some(badpkg) = to_check.pop() else {
912 break;
913 };
914 if broken.contains(&badpkg) {
916 continue;
917 }
918 for (pkg, deps) in &self.incoming {
919 if deps.contains(&badpkg) {
920 to_check.push(pkg.clone());
921 }
922 }
923 broken.insert(badpkg);
924 }
925 let is_original = |p: &PkgName| p == pkgname;
932 for pkg in broken {
933 self.incoming.remove(&pkg);
934 self.failed.insert(pkg.clone());
935
936 let scanpkg = self.scanpkgs.get(&pkg);
938 let log_dir = Some(self.logdir.join(pkg.pkgname()));
939 let (outcome, dur) = if is_original(&pkg) {
940 (BuildOutcome::Failed("Build failed".to_string()), duration)
941 } else {
942 (
943 BuildOutcome::Skipped(format!(
944 "Dependency {} failed",
945 pkgname.pkgname()
946 )),
947 Duration::ZERO,
948 )
949 };
950 self.results.push(BuildResult {
951 pkgname: pkg,
952 pkgpath: scanpkg.and_then(|s| s.pkg_location.clone()),
953 outcome,
954 duration: dur,
955 log_dir,
956 });
957 }
958 }
959
960 fn get_next_build(&self) -> BuildStatus {
964 if self.incoming.is_empty() {
968 return BuildStatus::Done;
969 }
970
971 let mut pkgs: Vec<(PkgName, usize)> = self
978 .incoming
979 .iter()
980 .filter(|(_, v)| v.is_empty())
981 .map(|(k, _)| {
982 (
983 k.clone(),
984 self.scanpkgs
985 .get(k)
986 .unwrap()
987 .pbulk_weight
988 .clone()
989 .unwrap_or("100".to_string())
990 .parse()
991 .unwrap_or(100),
992 )
993 })
994 .collect();
995
996 if pkgs.is_empty() {
1002 return BuildStatus::NoneAvailable;
1003 }
1004
1005 pkgs.sort_by_key(|&(_, weight)| std::cmp::Reverse(weight));
1009 BuildStatus::Available(pkgs[0].0.clone())
1010 }
1011}
1012
1013impl Build {
1014 pub fn new(
1015 config: &Config,
1016 scanpkgs: HashMap<PkgName, ScanIndex>,
1017 ) -> Build {
1018 let sandbox = Sandbox::new(config);
1019 info!(
1020 package_count = scanpkgs.len(),
1021 sandbox_enabled = sandbox.enabled(),
1022 build_threads = config.build_threads(),
1023 "Creating new Build instance"
1024 );
1025 for (pkgname, index) in &scanpkgs {
1026 debug!(pkgname = %pkgname.pkgname(),
1027 pkgpath = ?index.pkg_location,
1028 depends_count = index.depends.len(),
1029 depends = ?index.depends.iter().map(|d| d.pkgname()).collect::<Vec<_>>(),
1030 "Package in build queue"
1031 );
1032 }
1033 Build { config: config.clone(), sandbox, scanpkgs }
1034 }
1035
1036 pub fn start(
1037 &mut self,
1038 shutdown_flag: Arc<AtomicBool>,
1039 ) -> anyhow::Result<BuildSummary> {
1040 let started = Instant::now();
1041
1042 info!(package_count = self.scanpkgs.len(), "Build::start() called");
1043
1044 debug!("Populating BuildJobs from scanpkgs");
1048 let mut incoming: HashMap<PkgName, HashSet<PkgName>> = HashMap::new();
1049 for (pkgname, index) in &self.scanpkgs {
1050 let mut deps: HashSet<PkgName> = HashSet::new();
1051 for dep in &index.depends {
1052 deps.insert(dep.clone());
1053 }
1054 trace!(pkgname = %pkgname.pkgname(),
1055 deps_count = deps.len(),
1056 deps = ?deps.iter().map(|d| d.pkgname()).collect::<Vec<_>>(),
1057 "Adding package to incoming build queue"
1058 );
1059 incoming.insert(pkgname.clone(), deps);
1060 }
1061
1062 info!(
1063 incoming_count = incoming.len(),
1064 scanpkgs_count = self.scanpkgs.len(),
1065 "BuildJobs populated"
1066 );
1067
1068 let running: HashSet<PkgName> = HashSet::new();
1069 let done: HashSet<PkgName> = HashSet::new();
1070 let failed: HashSet<PkgName> = HashSet::new();
1071 let results: Vec<BuildResult> = Vec::new();
1072 let logdir = self.config.logdir().clone();
1073 let jobs = BuildJobs {
1074 scanpkgs: self.scanpkgs.clone(),
1075 incoming,
1076 running,
1077 done,
1078 failed,
1079 results,
1080 logdir,
1081 };
1082
1083 if self.sandbox.enabled() {
1085 println!("Creating sandboxes...");
1086 for i in 0..self.config.build_threads() {
1087 if let Err(e) = self.sandbox.create(i) {
1088 for j in (0..=i).rev() {
1090 if let Err(destroy_err) = self.sandbox.destroy(j) {
1091 eprintln!(
1092 "Warning: failed to destroy sandbox {}: {}",
1093 j, destroy_err
1094 );
1095 }
1096 }
1097 return Err(e);
1098 }
1099 }
1100 }
1101
1102 println!("Building packages...");
1103
1104 let progress = Arc::new(Mutex::new(
1106 MultiProgress::new(
1107 "Building",
1108 "Built",
1109 self.scanpkgs.len(),
1110 self.config.build_threads(),
1111 true,
1112 )
1113 .expect("Failed to initialize progress display"),
1114 ));
1115
1116 let stop_refresh = Arc::new(AtomicBool::new(false));
1118
1119 let progress_refresh = Arc::clone(&progress);
1121 let stop_flag = Arc::clone(&stop_refresh);
1122 let shutdown_for_refresh = Arc::clone(&shutdown_flag);
1123 let refresh_thread = std::thread::spawn(move || {
1124 while !stop_flag.load(Ordering::Relaxed)
1125 && !shutdown_for_refresh.load(Ordering::SeqCst)
1126 {
1127 if let Ok(mut p) = progress_refresh.lock() {
1128 let _ = p.poll_events();
1130 let _ = p.render_throttled();
1131 }
1132 std::thread::sleep(Duration::from_millis(50));
1133 }
1134 });
1135
1136 let (manager_tx, manager_rx) = mpsc::channel::<ChannelCommand>();
1141
1142 let mut threads = vec![];
1148 let mut clients: HashMap<usize, Sender<ChannelCommand>> =
1149 HashMap::new();
1150 for i in 0..self.config.build_threads() {
1151 let (client_tx, client_rx) = mpsc::channel::<ChannelCommand>();
1152 clients.insert(i, client_tx);
1153 let manager_tx = manager_tx.clone();
1154 let thread = std::thread::spawn(move || {
1155 loop {
1156 if manager_tx.send(ChannelCommand::ClientReady(i)).is_err()
1158 {
1159 break;
1160 }
1161
1162 let Ok(msg) = client_rx.recv() else {
1163 break;
1164 };
1165
1166 match msg {
1167 ChannelCommand::ComeBackLater => {
1168 std::thread::sleep(Duration::from_millis(100));
1169 continue;
1170 }
1171 ChannelCommand::JobData(pkg) => {
1172 let pkgname = pkg.pkginfo.pkgname.clone();
1173 let build_start = Instant::now();
1174 match pkg.build(&manager_tx) {
1175 Ok(PackageBuildResult::Success) => {
1176 let duration = build_start.elapsed();
1177 let _ = manager_tx.send(
1178 ChannelCommand::JobSuccess(
1179 pkgname, duration,
1180 ),
1181 );
1182 }
1183 Ok(PackageBuildResult::Skipped) => {
1184 let _ = manager_tx.send(
1185 ChannelCommand::JobSkipped(pkgname),
1186 );
1187 }
1188 Ok(PackageBuildResult::Failed) => {
1189 let duration = build_start.elapsed();
1190 let _ = manager_tx.send(
1191 ChannelCommand::JobFailed(
1192 pkgname, duration,
1193 ),
1194 );
1195 }
1196 Err(e) => {
1197 let duration = build_start.elapsed();
1198 let _ = manager_tx.send(
1199 ChannelCommand::JobError((
1200 pkgname, duration, e,
1201 )),
1202 );
1203 }
1204 }
1205 continue;
1206 }
1207 ChannelCommand::Quit | ChannelCommand::Shutdown => {
1208 break;
1209 }
1210 _ => todo!(),
1211 }
1212 }
1213 });
1214 threads.push(thread);
1215 }
1216
1217 let config = self.config.clone();
1222 let sandbox = self.sandbox.clone();
1223 let progress_clone = Arc::clone(&progress);
1224 let shutdown_for_manager = Arc::clone(&shutdown_flag);
1225 let (results_tx, results_rx) = mpsc::channel::<Vec<BuildResult>>();
1226 let (interrupted_tx, interrupted_rx) = mpsc::channel::<bool>();
1227 let manager = std::thread::spawn(move || {
1228 let mut clients = clients.clone();
1229 let config = config.clone();
1230 let sandbox = sandbox.clone();
1231 let mut jobs = jobs.clone();
1232 let mut was_interrupted = false;
1233
1234 let mut thread_packages: HashMap<usize, PkgName> = HashMap::new();
1236
1237 loop {
1238 if shutdown_for_manager.load(Ordering::SeqCst) {
1240 if let Ok(mut p) = progress_clone.lock() {
1242 p.state_mut().suppress();
1243 }
1244 for (_, client) in clients.drain() {
1246 let _ = client.send(ChannelCommand::Shutdown);
1247 }
1248 was_interrupted = true;
1249 break;
1250 }
1251
1252 let command =
1254 match manager_rx.recv_timeout(Duration::from_millis(50)) {
1255 Ok(cmd) => cmd,
1256 Err(mpsc::RecvTimeoutError::Timeout) => continue,
1257 Err(mpsc::RecvTimeoutError::Disconnected) => break,
1258 };
1259
1260 match command {
1261 ChannelCommand::ClientReady(c) => {
1262 let client = clients.get(&c).unwrap();
1263 match jobs.get_next_build() {
1264 BuildStatus::Available(pkg) => {
1265 let pkginfo = jobs.scanpkgs.get(&pkg).unwrap();
1266 jobs.incoming.remove(&pkg);
1267 jobs.running.insert(pkg.clone());
1268
1269 thread_packages.insert(c, pkg.clone());
1271 if let Ok(mut p) = progress_clone.lock() {
1272 p.clear_output_buffer(c);
1273 p.state_mut()
1274 .set_worker_active(c, pkg.pkgname());
1275 let _ = p.render_throttled();
1276 }
1277
1278 let _ = client.send(ChannelCommand::JobData(
1279 Box::new(PackageBuild {
1280 id: c,
1281 config: config.clone(),
1282 pkginfo: pkginfo.clone(),
1283 sandbox: sandbox.clone(),
1284 }),
1285 ));
1286 }
1287 BuildStatus::NoneAvailable => {
1288 if let Ok(mut p) = progress_clone.lock() {
1289 p.clear_output_buffer(c);
1290 p.state_mut().set_worker_idle(c);
1291 let _ = p.render_throttled();
1292 }
1293 let _ =
1294 client.send(ChannelCommand::ComeBackLater);
1295 }
1296 BuildStatus::Done => {
1297 if let Ok(mut p) = progress_clone.lock() {
1298 p.clear_output_buffer(c);
1299 p.state_mut().set_worker_idle(c);
1300 let _ = p.render_throttled();
1301 }
1302 let _ = client.send(ChannelCommand::Quit);
1303 clients.remove(&c);
1304 if clients.is_empty() {
1305 break;
1306 }
1307 }
1308 };
1309 }
1310 ChannelCommand::JobSuccess(pkgname, duration) => {
1311 if shutdown_for_manager.load(Ordering::SeqCst) {
1313 continue;
1314 }
1315
1316 jobs.mark_success(&pkgname, duration);
1317 jobs.running.remove(&pkgname);
1318
1319 if let Ok(mut p) = progress_clone.lock() {
1321 let _ = p.print_status(&format!(
1322 " Built {} ({})",
1323 pkgname.pkgname(),
1324 format_duration(duration)
1325 ));
1326 p.state_mut().increment_completed();
1327 for (tid, pkg) in &thread_packages {
1328 if pkg == &pkgname {
1329 p.clear_output_buffer(*tid);
1330 p.state_mut().set_worker_idle(*tid);
1331 break;
1332 }
1333 }
1334 let _ = p.render_throttled();
1335 }
1336 }
1337 ChannelCommand::JobSkipped(pkgname) => {
1338 if shutdown_for_manager.load(Ordering::SeqCst) {
1340 continue;
1341 }
1342
1343 jobs.mark_skipped(&pkgname);
1344 jobs.running.remove(&pkgname);
1345
1346 if let Ok(mut p) = progress_clone.lock() {
1348 let _ = p.print_status(&format!(
1349 " Skipped {} (up-to-date)",
1350 pkgname.pkgname()
1351 ));
1352 p.state_mut().increment_skipped();
1353 for (tid, pkg) in &thread_packages {
1354 if pkg == &pkgname {
1355 p.clear_output_buffer(*tid);
1356 p.state_mut().set_worker_idle(*tid);
1357 break;
1358 }
1359 }
1360 let _ = p.render_throttled();
1361 }
1362 }
1363 ChannelCommand::JobFailed(pkgname, duration) => {
1364 if shutdown_for_manager.load(Ordering::SeqCst) {
1366 continue;
1367 }
1368
1369 jobs.mark_failure(&pkgname, duration);
1370 jobs.running.remove(&pkgname);
1371
1372 if let Ok(mut p) = progress_clone.lock() {
1374 let _ = p.print_status(&format!(
1375 " Failed {} ({})",
1376 pkgname.pkgname(),
1377 format_duration(duration)
1378 ));
1379 p.state_mut().increment_failed();
1380 for (tid, pkg) in &thread_packages {
1381 if pkg == &pkgname {
1382 p.clear_output_buffer(*tid);
1383 p.state_mut().set_worker_idle(*tid);
1384 break;
1385 }
1386 }
1387 let _ = p.render_throttled();
1388 }
1389 }
1390 ChannelCommand::JobError((pkgname, duration, e)) => {
1391 if shutdown_for_manager.load(Ordering::SeqCst) {
1393 continue;
1394 }
1395
1396 jobs.mark_failure(&pkgname, duration);
1397 jobs.running.remove(&pkgname);
1398
1399 if let Ok(mut p) = progress_clone.lock() {
1401 let _ = p.print_status(&format!(
1402 " Failed {} ({})",
1403 pkgname.pkgname(),
1404 format_duration(duration)
1405 ));
1406 p.state_mut().increment_failed();
1407 for (tid, pkg) in &thread_packages {
1408 if pkg == &pkgname {
1409 p.clear_output_buffer(*tid);
1410 p.state_mut().set_worker_idle(*tid);
1411 break;
1412 }
1413 }
1414 let _ = p.render_throttled();
1415 }
1416 tracing::error!(error = %e, pkgname = %pkgname.pkgname(), "Build error");
1417 }
1418 ChannelCommand::StageUpdate(tid, stage) => {
1419 if let Ok(mut p) = progress_clone.lock() {
1420 p.state_mut()
1421 .set_worker_stage(tid, stage.as_deref());
1422 let _ = p.render_throttled();
1423 }
1424 }
1425 ChannelCommand::OutputLines(tid, lines) => {
1426 if let Ok(mut p) = progress_clone.lock() {
1427 if let Some(buf) = p.output_buffer_mut(tid) {
1428 for line in lines {
1429 buf.push(line);
1430 }
1431 }
1432 }
1433 }
1434 _ => {}
1435 }
1436 }
1437
1438 let _ = results_tx.send(jobs.results);
1440 let _ = interrupted_tx.send(was_interrupted);
1441 });
1442
1443 threads.push(manager);
1444 for thread in threads {
1445 thread.join().expect("thread panicked");
1446 }
1447
1448 stop_refresh.store(true, Ordering::Relaxed);
1450 let _ = refresh_thread.join();
1451
1452 let was_interrupted = interrupted_rx.recv().unwrap_or(false);
1454
1455 if let Ok(mut p) = progress.lock() {
1457 if was_interrupted {
1458 let _ = p.finish_interrupted();
1459 } else {
1460 let _ = p.finish();
1461 }
1462 }
1463
1464 let results = results_rx.recv().unwrap_or_default();
1466 let summary = BuildSummary { duration: started.elapsed(), results };
1467
1468 if self.sandbox.enabled() {
1469 self.sandbox.destroy_all(self.config.build_threads())?;
1470 }
1471
1472 Ok(summary)
1473 }
1474}