1use crate::scan::ResolvedIndex;
67use crate::scan::ScanFailure;
68use crate::status::{self, StatusMessage};
69use crate::tui::{MultiProgress, format_duration};
70use crate::{Config, RunContext, Sandbox};
71use anyhow::{Context, bail};
72use glob::Pattern;
73use pkgsrc::{PkgName, PkgPath};
74use std::collections::{HashMap, HashSet};
75use std::fs;
76use std::path::{Path, PathBuf};
77use std::process::Command;
78use std::sync::atomic::{AtomicBool, Ordering};
79use std::sync::{Arc, Mutex, mpsc, mpsc::Sender};
80use std::time::{Duration, Instant};
81use tracing::{debug, error, info, trace, warn};
82
/// Renders a resolved scan index entry as text using its `to_string()`
/// implementation.
///
/// `PackageBuild::build` passes the returned string to the `pkg-build`
/// script as its stdin payload.
fn format_scan_index(idx: &ResolvedIndex) -> String {
    idx.to_string()
}
87
/// Final outcome recorded for a single package in a [`BuildResult`].
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub enum BuildOutcome {
    /// The package was built successfully.
    Success,
    /// The build failed; the string carries a human-readable reason
    /// (this file records `"Build failed"`).
    Failed(String),
    /// The package was not built; the string says why (this file records
    /// `"up-to-date"` or `"Dependency <name> failed"`).
    Skipped(String),
}
110
/// Result record for one package, produced by the build manager.
#[derive(Clone, Debug)]
pub struct BuildResult {
    /// Name of the package this record describes.
    pub pkgname: PkgName,
    /// Package location taken from the scan index (`pkg_location`), if the
    /// index has an entry with one.
    pub pkgpath: Option<PkgPath>,
    /// What happened to the package.
    pub outcome: BuildOutcome,
    /// Time spent building; `Duration::ZERO` is recorded for skipped packages.
    pub duration: Duration,
    /// Per-package log directory (`<logdir>/<pkgname>` when set by this file).
    pub log_dir: Option<PathBuf>,
}
130
/// Aggregate result of a whole build run, returned by `Build::start`.
#[derive(Clone, Debug)]
pub struct BuildSummary {
    /// Wall-clock time of the whole run.
    pub duration: Duration,
    /// One entry per package that was built, failed, or skipped.
    pub results: Vec<BuildResult>,
    /// Packages that failed during scanning. `Build::start` always returns
    /// this empty; presumably callers fill it in afterwards — confirm.
    pub scan_failed: Vec<ScanFailure>,
}
159
160impl BuildSummary {
161 pub fn success_count(&self) -> usize {
163 self.results
164 .iter()
165 .filter(|r| matches!(r.outcome, BuildOutcome::Success))
166 .count()
167 }
168
169 pub fn failed_count(&self) -> usize {
171 self.results
172 .iter()
173 .filter(|r| matches!(r.outcome, BuildOutcome::Failed(_)))
174 .count()
175 }
176
177 pub fn skipped_count(&self) -> usize {
179 self.results
180 .iter()
181 .filter(|r| matches!(r.outcome, BuildOutcome::Skipped(_)))
182 .count()
183 }
184
185 pub fn scan_failed_count(&self) -> usize {
187 self.scan_failed.len()
188 }
189
190 pub fn failed(&self) -> Vec<&BuildResult> {
192 self.results
193 .iter()
194 .filter(|r| matches!(r.outcome, BuildOutcome::Failed(_)))
195 .collect()
196 }
197
198 pub fn succeeded(&self) -> Vec<&BuildResult> {
200 self.results
201 .iter()
202 .filter(|r| matches!(r.outcome, BuildOutcome::Success))
203 .collect()
204 }
205
206 pub fn skipped(&self) -> Vec<&BuildResult> {
208 self.results
209 .iter()
210 .filter(|r| matches!(r.outcome, BuildOutcome::Skipped(_)))
211 .collect()
212 }
213}
214
/// Top-level driver for building a set of scanned packages.
#[derive(Debug, Default)]
pub struct Build {
    /// Configuration cloned from the caller in `Build::new`.
    config: Config,
    /// Sandbox handle created from `config`; used to create and destroy one
    /// sandbox per build thread when sandboxing is enabled.
    sandbox: Sandbox,
    /// Packages to build, keyed by package name, with their resolved scan
    /// index (including dependencies).
    scanpkgs: HashMap<PkgName, ResolvedIndex>,
}
230
/// Everything a worker thread needs to build one package.
#[derive(Debug)]
struct PackageBuild {
    /// Worker index; also used as the sandbox id for every command run.
    id: usize,
    /// Configuration (scripts, environment, log directory, ...).
    config: Config,
    /// Scan index entry of the package to build.
    pkginfo: ResolvedIndex,
    /// Sandbox handle used to execute the build scripts.
    sandbox: Sandbox,
}
238
/// Borrowed context for asking `make` about variables of one package.
struct MakeQuery<'a> {
    /// Supplies the pkgsrc directory and the `make` binary.
    config: &'a Config,
    /// Decides whether `make` is run through `chroot` and where.
    sandbox: &'a Sandbox,
    /// Id of the sandbox to run in / resolve paths against.
    sandbox_id: usize,
    /// Package directory relative to the pkgsrc root.
    pkgpath: &'a PkgPath,
    /// Extra environment variables set on every `make` invocation.
    env: &'a HashMap<String, String>,
}
247
248impl<'a> MakeQuery<'a> {
249 fn new(
250 config: &'a Config,
251 sandbox: &'a Sandbox,
252 sandbox_id: usize,
253 pkgpath: &'a PkgPath,
254 env: &'a HashMap<String, String>,
255 ) -> Self {
256 Self { config, sandbox, sandbox_id, pkgpath, env }
257 }
258
259 fn var(&self, name: &str) -> Option<String> {
261 let pkgdir = self.config.pkgsrc().join(self.pkgpath.as_path());
262
263 let mut cmd = if self.sandbox.enabled() {
264 let mut c = Command::new("/usr/sbin/chroot");
265 c.arg(self.sandbox.path(self.sandbox_id)).arg(self.config.make());
266 c
267 } else {
268 Command::new(self.config.make())
269 };
270
271 cmd.arg("-C")
272 .arg(&pkgdir)
273 .arg("show-var")
274 .arg(format!("VARNAME={}", name));
275
276 for (key, value) in self.env {
278 cmd.env(key, value);
279 }
280
281 let output = cmd.output().ok()?;
282
283 if !output.status.success() {
284 return None;
285 }
286
287 let value = String::from_utf8_lossy(&output.stdout).trim().to_string();
288
289 if value.is_empty() { None } else { Some(value) }
290 }
291
292 fn var_path(&self, name: &str) -> Option<PathBuf> {
294 self.var(name).map(PathBuf::from)
295 }
296
297 fn wrkdir(&self) -> Option<PathBuf> {
299 self.var_path("WRKDIR")
300 }
301
302 #[allow(dead_code)]
304 fn wrksrc(&self) -> Option<PathBuf> {
305 self.var_path("WRKSRC")
306 }
307
308 #[allow(dead_code)]
310 fn destdir(&self) -> Option<PathBuf> {
311 self.var_path("DESTDIR")
312 }
313
314 #[allow(dead_code)]
316 fn prefix(&self) -> Option<PathBuf> {
317 self.var_path("PREFIX")
318 }
319
320 fn resolve_path(&self, path: &Path) -> PathBuf {
323 if self.sandbox.enabled() {
324 self.sandbox
325 .path(self.sandbox_id)
326 .join(path.strip_prefix("/").unwrap_or(path))
327 } else {
328 path.to_path_buf()
329 }
330 }
331}
332
/// Outcome of `PackageBuild::build` as seen by the worker thread.
#[derive(Debug)]
enum PackageBuildResult {
    /// `pkg-build` exited with status 0.
    Success,
    /// `pkg-build` exited non-zero or was terminated by a signal.
    Failed,
    /// `pkg-build` reported a `Skipped` status message (package up-to-date).
    Skipped,
}
343
344impl PackageBuild {
345 fn build(
346 &self,
347 status_tx: &Sender<ChannelCommand>,
348 ) -> anyhow::Result<PackageBuildResult> {
349 let pkgname = self.pkginfo.pkgname.pkgname();
350 info!(pkgname = %pkgname,
351 sandbox_id = self.id,
352 "Starting package build"
353 );
354
355 let Some(pkgpath) = &self.pkginfo.pkg_location else {
356 error!(pkgname = %pkgname, "Could not get PKGPATH for package");
357 bail!("Could not get PKGPATH for {}", pkgname);
358 };
359
360 let logdir = self.config.logdir();
361
362 let mut envs = self.config.script_env();
364
365 if let Some(path) = self.config.script("pkg-up-to-date") {
367 envs.push((
368 "PKG_UP_TO_DATE".to_string(),
369 format!("{}", path.display()),
370 ));
371 }
372
373 let pkg_env = match self.config.get_pkg_env(&self.pkginfo) {
375 Ok(env) => {
376 for (key, value) in &env {
377 envs.push((key.clone(), value.clone()));
378 }
379 env
380 }
381 Err(e) => {
382 error!(pkgname = %pkgname, error = %e, "Failed to get env from Lua config");
383 HashMap::new()
384 }
385 };
386
387 let patterns = self.config.save_wrkdir_patterns();
389 if !patterns.is_empty() {
390 envs.push(("SKIP_CLEAN".to_string(), "1".to_string()));
391 }
392
393 let Some(pkg_build_script) = self.config.script("pkg-build") else {
394 error!(pkgname = %pkgname, "No pkg-build script defined");
395 bail!("No pkg-build script defined");
396 };
397
398 let stdin_data = format_scan_index(&self.pkginfo);
400
401 debug!(pkgname = %pkgname,
402 env_count = envs.len(),
403 "Executing build scripts"
404 );
405 trace!(pkgname = %pkgname,
406 envs = ?envs,
407 stdin = %stdin_data,
408 "Build environment variables"
409 );
410
411 if let Some(pre_build) = self.config.script("pre-build") {
413 debug!(pkgname = %pkgname, "Running pre-build script");
414 let child = self.sandbox.execute(
415 self.id,
416 pre_build,
417 envs.clone(),
418 None,
419 None,
420 )?;
421 let output = child
422 .wait_with_output()
423 .context("Failed to wait for pre-build")?;
424 if !output.status.success() {
425 warn!(pkgname = %pkgname, exit_code = ?output.status.code(), "pre-build script failed");
426 }
427 }
428
429 let (mut status_reader, status_writer) =
431 status::channel().context("Failed to create status channel")?;
432 let status_fd = status_writer.fd();
433
434 let (mut output_reader, output_writer) = status::output_channel()
435 .context("Failed to create output channel")?;
436 let output_fd = output_writer.fd();
437
438 envs.push(("bob_output_fd".to_string(), output_fd.to_string()));
440
441 let mut child = self.sandbox.execute(
442 self.id,
443 pkg_build_script,
444 envs.clone(),
445 Some(&stdin_data),
446 Some(status_fd),
447 )?;
448
449 status_writer.close();
451 output_writer.close();
452
453 let mut was_skipped = false;
455
456 loop {
458 for msg in status_reader.read_all() {
460 match msg {
461 StatusMessage::Stage(stage) => {
462 let _ = status_tx.send(ChannelCommand::StageUpdate(
463 self.id,
464 Some(stage),
465 ));
466 }
467 StatusMessage::Skipped => {
468 was_skipped = true;
469 }
470 }
471 }
472
473 let output_lines = output_reader.read_all_lines();
475 if !output_lines.is_empty() {
476 let _ = status_tx
477 .send(ChannelCommand::OutputLines(self.id, output_lines));
478 }
479
480 match child.try_wait() {
482 Ok(Some(_status)) => break,
483 Ok(None) => {
484 std::thread::sleep(Duration::from_millis(10));
486 }
487 Err(e) => {
488 return Err(e).context("Failed to wait for pkg-build");
489 }
490 }
491 }
492
493 let remaining = output_reader.read_all_lines();
495 if !remaining.is_empty() {
496 let _ =
497 status_tx.send(ChannelCommand::OutputLines(self.id, remaining));
498 }
499
500 let _ = status_tx.send(ChannelCommand::StageUpdate(self.id, None));
502
503 let status =
505 child.wait().context("Failed to get pkg-build exit status")?;
506
507 let result = if was_skipped {
508 info!(pkgname = %pkgname,
509 "pkg-build skipped (up-to-date)"
510 );
511 PackageBuildResult::Skipped
512 } else {
513 match status.code() {
514 Some(0) => {
515 info!(pkgname = %pkgname,
516 "pkg-build completed successfully"
517 );
518 PackageBuildResult::Success
519 }
520 Some(code) => {
521 error!(pkgname = %pkgname,
522 exit_code = code,
523 "pkg-build failed"
524 );
525
526 if !patterns.is_empty() {
528 self.save_wrkdir_files(
529 pkgname, pkgpath, logdir, patterns, &pkg_env,
530 );
531 self.run_clean(pkgpath);
532 }
533 PackageBuildResult::Failed
534 }
535 None => {
536 warn!(pkgname = %pkgname,
538 "pkg-build terminated by signal"
539 );
540 PackageBuildResult::Failed
541 }
542 }
543 };
544
545 if let Some(post_build) = self.config.script("post-build") {
547 debug!(pkgname = %pkgname, "Running post-build script");
548 if let Ok(child) =
549 self.sandbox.execute(self.id, post_build, envs, None, None)
550 {
551 match child.wait_with_output() {
552 Ok(output) if !output.status.success() => {
553 warn!(pkgname = %pkgname, exit_code = ?output.status.code(), "post-build script failed");
554 }
555 Err(e) => {
556 warn!(pkgname = %pkgname, error = %e, "Failed to wait for post-build");
557 }
558 _ => {}
559 }
560 }
561 }
562
563 Ok(result)
564 }
565
566 fn save_wrkdir_files(
568 &self,
569 pkgname: &str,
570 pkgpath: &PkgPath,
571 logdir: &Path,
572 patterns: &[String],
573 pkg_env: &HashMap<String, String>,
574 ) {
575 let make = MakeQuery::new(
576 &self.config,
577 &self.sandbox,
578 self.id,
579 pkgpath,
580 pkg_env,
581 );
582
583 let wrkdir = match make.wrkdir() {
585 Some(w) => w,
586 None => {
587 debug!(pkgname = %pkgname, "Could not determine WRKDIR, skipping file save");
588 return;
589 }
590 };
591
592 let wrkdir_path = make.resolve_path(&wrkdir);
594
595 if !wrkdir_path.exists() {
596 debug!(pkgname = %pkgname,
597 wrkdir = %wrkdir_path.display(),
598 "WRKDIR does not exist, skipping file save"
599 );
600 return;
601 }
602
603 let save_dir = logdir.join(pkgname).join("wrkdir-files");
604 if let Err(e) = fs::create_dir_all(&save_dir) {
605 warn!(pkgname = %pkgname,
606 error = %e,
607 "Failed to create wrkdir-files directory"
608 );
609 return;
610 }
611
612 let compiled_patterns: Vec<Pattern> = patterns
614 .iter()
615 .filter_map(|p| {
616 Pattern::new(p).ok().or_else(|| {
617 warn!(pattern = %p, "Invalid glob pattern");
618 None
619 })
620 })
621 .collect();
622
623 if compiled_patterns.is_empty() {
624 return;
625 }
626
627 let mut saved_count = 0;
629 if let Err(e) = walk_and_save(
630 &wrkdir_path,
631 &wrkdir_path,
632 &save_dir,
633 &compiled_patterns,
634 &mut saved_count,
635 ) {
636 warn!(pkgname = %pkgname,
637 error = %e,
638 "Error while saving wrkdir files"
639 );
640 }
641
642 if saved_count > 0 {
643 info!(pkgname = %pkgname,
644 count = saved_count,
645 dest = %save_dir.display(),
646 "Saved wrkdir files"
647 );
648 }
649 }
650
651 fn run_clean(&self, pkgpath: &PkgPath) {
653 let pkgdir = self.config.pkgsrc().join(pkgpath.as_path());
654
655 let result = if self.sandbox.enabled() {
656 Command::new("/usr/sbin/chroot")
657 .arg(self.sandbox.path(self.id))
658 .arg(self.config.make())
659 .arg("-C")
660 .arg(&pkgdir)
661 .arg("clean")
662 .stdout(std::process::Stdio::null())
663 .stderr(std::process::Stdio::null())
664 .status()
665 } else {
666 Command::new(self.config.make())
667 .arg("-C")
668 .arg(&pkgdir)
669 .arg("clean")
670 .stdout(std::process::Stdio::null())
671 .stderr(std::process::Stdio::null())
672 .status()
673 };
674
675 if let Err(e) = result {
676 debug!(error = %e, "Failed to run bmake clean");
677 }
678 }
679}
680
681fn walk_and_save(
683 base: &Path,
684 current: &Path,
685 save_dir: &Path,
686 patterns: &[Pattern],
687 saved_count: &mut usize,
688) -> std::io::Result<()> {
689 if !current.is_dir() {
690 return Ok(());
691 }
692
693 for entry in fs::read_dir(current)? {
694 let entry = entry?;
695 let path = entry.path();
696
697 if path.is_dir() {
698 walk_and_save(base, &path, save_dir, patterns, saved_count)?;
699 } else if path.is_file() {
700 let rel_path = path.strip_prefix(base).unwrap_or(&path);
702 let rel_str = rel_path.to_string_lossy();
703
704 for pattern in patterns {
706 if pattern.matches(&rel_str)
707 || pattern.matches(
708 path.file_name()
709 .unwrap_or_default()
710 .to_string_lossy()
711 .as_ref(),
712 )
713 {
714 let dest_path = save_dir.join(rel_path);
716 if let Some(parent) = dest_path.parent() {
717 fs::create_dir_all(parent)?;
718 }
719
720 if let Err(e) = fs::copy(&path, &dest_path) {
722 warn!(src = %path.display(),
723 dest = %dest_path.display(),
724 error = %e,
725 "Failed to copy file"
726 );
727 } else {
728 debug!(src = %path.display(),
729 dest = %dest_path.display(),
730 "Saved wrkdir file"
731 );
732 *saved_count += 1;
733 }
734 break; }
736 }
737 }
738 }
739
740 Ok(())
741}
742
/// Messages exchanged between the build manager thread and the worker
/// threads (both directions share this one type).
#[derive(Debug)]
enum ChannelCommand {
    /// Worker -> manager: worker with the given id is ready for a job.
    ClientReady(usize),
    /// Manager -> worker: nothing is buildable right now; the worker sleeps
    /// briefly and announces itself again.
    ComeBackLater,
    /// Manager -> worker: build this package.
    JobData(Box<PackageBuild>),
    /// Worker -> manager: the package built successfully in the given time.
    JobSuccess(PkgName, Duration),
    /// Worker -> manager: the package build failed after the given time.
    JobFailed(PkgName, Duration),
    /// Worker -> manager: the package was skipped (up-to-date).
    JobSkipped(PkgName),
    /// Worker -> manager: the build could not be carried out; carries the
    /// package, elapsed time and the error.
    JobError((PkgName, Duration, anyhow::Error)),
    /// Manager -> worker: no work is left; the worker thread exits.
    Quit,
    /// Manager -> worker: a shutdown was requested; the worker thread exits.
    Shutdown,
    /// Worker -> manager: current build stage of the given worker, or `None`
    /// to clear it.
    StageUpdate(usize, Option<String>),
    /// Worker -> manager: output lines produced by the given worker's build.
    OutputLines(usize, Vec<String>),
}
793
/// Answer of `BuildJobs::get_next_build` to "what should be built next?".
#[derive(Debug)]
enum BuildStatus {
    /// This package has no outstanding dependencies and can be built now.
    Available(PkgName),
    /// Packages are still queued, but all of them wait on dependencies.
    NoneAvailable,
    /// The incoming queue is empty; nothing is left to hand out.
    Done,
}
813
/// Scheduling state owned by the build manager thread.
#[derive(Clone, Debug)]
struct BuildJobs {
    /// Scan index of every package in this run.
    scanpkgs: HashMap<PkgName, ResolvedIndex>,
    /// Packages not yet handed out, mapped to the dependencies they are
    /// still waiting for. An empty set means "ready to build".
    incoming: HashMap<PkgName, HashSet<PkgName>>,
    /// Packages currently being built by a worker.
    running: HashSet<PkgName>,
    /// Packages that finished successfully or were skipped as up-to-date.
    done: HashSet<PkgName>,
    /// Packages that failed, plus everything that depended on them.
    failed: HashSet<PkgName>,
    /// Result records accumulated so far.
    results: Vec<BuildResult>,
    /// Base log directory; per-package log dirs are `<logdir>/<pkgname>`.
    logdir: PathBuf,
}
824
825impl BuildJobs {
826 fn mark_success(&mut self, pkgname: &PkgName, duration: Duration) {
830 self.mark_done(pkgname, BuildOutcome::Success, duration);
831 }
832
833 fn mark_skipped(&mut self, pkgname: &PkgName) {
837 self.mark_done(
838 pkgname,
839 BuildOutcome::Skipped("up-to-date".to_string()),
840 Duration::ZERO,
841 );
842 }
843
844 fn mark_done(
845 &mut self,
846 pkgname: &PkgName,
847 outcome: BuildOutcome,
848 duration: Duration,
849 ) {
850 for dep in self.incoming.values_mut() {
856 if dep.contains(pkgname) {
857 dep.remove(pkgname);
858 }
859 }
860 self.done.insert(pkgname.clone());
865
866 let scanpkg = self.scanpkgs.get(pkgname);
868 let log_dir = Some(self.logdir.join(pkgname.pkgname()));
869 self.results.push(BuildResult {
870 pkgname: pkgname.clone(),
871 pkgpath: scanpkg.and_then(|s| s.pkg_location.clone()),
872 outcome,
873 duration,
874 log_dir,
875 });
876 }
877
878 fn mark_failure(&mut self, pkgname: &PkgName, duration: Duration) {
882 let mut broken: HashSet<PkgName> = HashSet::new();
883 let mut to_check: Vec<PkgName> = vec![];
884 to_check.push(pkgname.clone());
885 loop {
890 let Some(badpkg) = to_check.pop() else {
892 break;
893 };
894 if broken.contains(&badpkg) {
896 continue;
897 }
898 for (pkg, deps) in &self.incoming {
899 if deps.contains(&badpkg) {
900 to_check.push(pkg.clone());
901 }
902 }
903 broken.insert(badpkg);
904 }
905 let is_original = |p: &PkgName| p == pkgname;
912 for pkg in broken {
913 self.incoming.remove(&pkg);
914 self.failed.insert(pkg.clone());
915
916 let scanpkg = self.scanpkgs.get(&pkg);
918 let log_dir = Some(self.logdir.join(pkg.pkgname()));
919 let (outcome, dur) = if is_original(&pkg) {
920 (BuildOutcome::Failed("Build failed".to_string()), duration)
921 } else {
922 (
923 BuildOutcome::Skipped(format!(
924 "Dependency {} failed",
925 pkgname.pkgname()
926 )),
927 Duration::ZERO,
928 )
929 };
930 self.results.push(BuildResult {
931 pkgname: pkg,
932 pkgpath: scanpkg.and_then(|s| s.pkg_location.clone()),
933 outcome,
934 duration: dur,
935 log_dir,
936 });
937 }
938 }
939
940 fn get_next_build(&self) -> BuildStatus {
944 if self.incoming.is_empty() {
948 return BuildStatus::Done;
949 }
950
951 let mut pkgs: Vec<(PkgName, usize)> = self
958 .incoming
959 .iter()
960 .filter(|(_, v)| v.is_empty())
961 .map(|(k, _)| {
962 (
963 k.clone(),
964 self.scanpkgs
965 .get(k)
966 .unwrap()
967 .pbulk_weight
968 .clone()
969 .unwrap_or("100".to_string())
970 .parse()
971 .unwrap_or(100),
972 )
973 })
974 .collect();
975
976 if pkgs.is_empty() {
982 return BuildStatus::NoneAvailable;
983 }
984
985 pkgs.sort_by_key(|&(_, weight)| std::cmp::Reverse(weight));
989 BuildStatus::Available(pkgs[0].0.clone())
990 }
991}
992
993impl Build {
994 pub fn new(
995 config: &Config,
996 scanpkgs: HashMap<PkgName, ResolvedIndex>,
997 ) -> Build {
998 let sandbox = Sandbox::new(config);
999 info!(
1000 package_count = scanpkgs.len(),
1001 sandbox_enabled = sandbox.enabled(),
1002 build_threads = config.build_threads(),
1003 "Creating new Build instance"
1004 );
1005 for (pkgname, index) in &scanpkgs {
1006 debug!(pkgname = %pkgname.pkgname(),
1007 pkgpath = ?index.pkg_location,
1008 depends_count = index.depends.len(),
1009 depends = ?index.depends.iter().map(|d| d.pkgname()).collect::<Vec<_>>(),
1010 "Package in build queue"
1011 );
1012 }
1013 Build { config: config.clone(), sandbox, scanpkgs }
1014 }
1015
1016 pub fn start(&mut self, ctx: &RunContext) -> anyhow::Result<BuildSummary> {
1017 let started = Instant::now();
1018
1019 info!(package_count = self.scanpkgs.len(), "Build::start() called");
1020
1021 let shutdown_flag = Arc::clone(&ctx.shutdown);
1022 let stats = ctx.stats.clone();
1023
1024 debug!("Populating BuildJobs from scanpkgs");
1028 let mut incoming: HashMap<PkgName, HashSet<PkgName>> = HashMap::new();
1029 for (pkgname, index) in &self.scanpkgs {
1030 let mut deps: HashSet<PkgName> = HashSet::new();
1031 for dep in &index.depends {
1032 deps.insert(dep.clone());
1033 }
1034 trace!(pkgname = %pkgname.pkgname(),
1035 deps_count = deps.len(),
1036 deps = ?deps.iter().map(|d| d.pkgname()).collect::<Vec<_>>(),
1037 "Adding package to incoming build queue"
1038 );
1039 incoming.insert(pkgname.clone(), deps);
1040 }
1041
1042 info!(
1043 incoming_count = incoming.len(),
1044 scanpkgs_count = self.scanpkgs.len(),
1045 "BuildJobs populated"
1046 );
1047
1048 let running: HashSet<PkgName> = HashSet::new();
1049 let done: HashSet<PkgName> = HashSet::new();
1050 let failed: HashSet<PkgName> = HashSet::new();
1051 let results: Vec<BuildResult> = Vec::new();
1052 let logdir = self.config.logdir().clone();
1053 let jobs = BuildJobs {
1054 scanpkgs: self.scanpkgs.clone(),
1055 incoming,
1056 running,
1057 done,
1058 failed,
1059 results,
1060 logdir,
1061 };
1062
1063 if self.sandbox.enabled() {
1065 println!("Creating sandboxes...");
1066 for i in 0..self.config.build_threads() {
1067 if let Err(e) = self.sandbox.create(i) {
1068 for j in (0..=i).rev() {
1070 if let Err(destroy_err) = self.sandbox.destroy(j) {
1071 eprintln!(
1072 "Warning: failed to destroy sandbox {}: {}",
1073 j, destroy_err
1074 );
1075 }
1076 }
1077 return Err(e);
1078 }
1079 }
1080 }
1081
1082 println!("Building packages...");
1083
1084 let progress = Arc::new(Mutex::new(
1086 MultiProgress::new(
1087 "Building",
1088 "Built",
1089 self.scanpkgs.len(),
1090 self.config.build_threads(),
1091 true,
1092 )
1093 .expect("Failed to initialize progress display"),
1094 ));
1095
1096 let stop_refresh = Arc::new(AtomicBool::new(false));
1098
1099 let progress_refresh = Arc::clone(&progress);
1101 let stop_flag = Arc::clone(&stop_refresh);
1102 let shutdown_for_refresh = Arc::clone(&shutdown_flag);
1103 let refresh_thread = std::thread::spawn(move || {
1104 while !stop_flag.load(Ordering::Relaxed)
1105 && !shutdown_for_refresh.load(Ordering::SeqCst)
1106 {
1107 if let Ok(mut p) = progress_refresh.lock() {
1108 let _ = p.poll_events();
1110 let _ = p.render_throttled();
1111 }
1112 std::thread::sleep(Duration::from_millis(50));
1113 }
1114 });
1115
1116 let (manager_tx, manager_rx) = mpsc::channel::<ChannelCommand>();
1121
1122 let mut threads = vec![];
1128 let mut clients: HashMap<usize, Sender<ChannelCommand>> =
1129 HashMap::new();
1130 for i in 0..self.config.build_threads() {
1131 let (client_tx, client_rx) = mpsc::channel::<ChannelCommand>();
1132 clients.insert(i, client_tx);
1133 let manager_tx = manager_tx.clone();
1134 let thread = std::thread::spawn(move || {
1135 loop {
1136 if manager_tx.send(ChannelCommand::ClientReady(i)).is_err()
1138 {
1139 break;
1140 }
1141
1142 let Ok(msg) = client_rx.recv() else {
1143 break;
1144 };
1145
1146 match msg {
1147 ChannelCommand::ComeBackLater => {
1148 std::thread::sleep(Duration::from_millis(100));
1149 continue;
1150 }
1151 ChannelCommand::JobData(pkg) => {
1152 let pkgname = pkg.pkginfo.pkgname.clone();
1153 let build_start = Instant::now();
1154 match pkg.build(&manager_tx) {
1155 Ok(PackageBuildResult::Success) => {
1156 let duration = build_start.elapsed();
1157 let _ = manager_tx.send(
1158 ChannelCommand::JobSuccess(
1159 pkgname, duration,
1160 ),
1161 );
1162 }
1163 Ok(PackageBuildResult::Skipped) => {
1164 let _ = manager_tx.send(
1165 ChannelCommand::JobSkipped(pkgname),
1166 );
1167 }
1168 Ok(PackageBuildResult::Failed) => {
1169 let duration = build_start.elapsed();
1170 let _ = manager_tx.send(
1171 ChannelCommand::JobFailed(
1172 pkgname, duration,
1173 ),
1174 );
1175 }
1176 Err(e) => {
1177 let duration = build_start.elapsed();
1178 let _ = manager_tx.send(
1179 ChannelCommand::JobError((
1180 pkgname, duration, e,
1181 )),
1182 );
1183 }
1184 }
1185 continue;
1186 }
1187 ChannelCommand::Quit | ChannelCommand::Shutdown => {
1188 break;
1189 }
1190 _ => todo!(),
1191 }
1192 }
1193 });
1194 threads.push(thread);
1195 }
1196
1197 let config = self.config.clone();
1202 let sandbox = self.sandbox.clone();
1203 let progress_clone = Arc::clone(&progress);
1204 let shutdown_for_manager = Arc::clone(&shutdown_flag);
1205 let stats_for_manager = stats.clone();
1206 let (results_tx, results_rx) = mpsc::channel::<Vec<BuildResult>>();
1207 let (interrupted_tx, interrupted_rx) = mpsc::channel::<bool>();
1208 let manager = std::thread::spawn(move || {
1209 let mut clients = clients.clone();
1210 let config = config.clone();
1211 let sandbox = sandbox.clone();
1212 let mut jobs = jobs.clone();
1213 let mut was_interrupted = false;
1214 let stats = stats_for_manager;
1215
1216 let mut thread_packages: HashMap<usize, PkgName> = HashMap::new();
1218
1219 loop {
1220 if shutdown_for_manager.load(Ordering::SeqCst) {
1222 if let Ok(mut p) = progress_clone.lock() {
1224 p.state_mut().suppress();
1225 }
1226 for (_, client) in clients.drain() {
1228 let _ = client.send(ChannelCommand::Shutdown);
1229 }
1230 was_interrupted = true;
1231 break;
1232 }
1233
1234 let command =
1236 match manager_rx.recv_timeout(Duration::from_millis(50)) {
1237 Ok(cmd) => cmd,
1238 Err(mpsc::RecvTimeoutError::Timeout) => continue,
1239 Err(mpsc::RecvTimeoutError::Disconnected) => break,
1240 };
1241
1242 match command {
1243 ChannelCommand::ClientReady(c) => {
1244 let client = clients.get(&c).unwrap();
1245 match jobs.get_next_build() {
1246 BuildStatus::Available(pkg) => {
1247 let pkginfo = jobs.scanpkgs.get(&pkg).unwrap();
1248 jobs.incoming.remove(&pkg);
1249 jobs.running.insert(pkg.clone());
1250
1251 thread_packages.insert(c, pkg.clone());
1253 if let Ok(mut p) = progress_clone.lock() {
1254 p.clear_output_buffer(c);
1255 p.state_mut()
1256 .set_worker_active(c, pkg.pkgname());
1257 let _ = p.render_throttled();
1258 }
1259
1260 let _ = client.send(ChannelCommand::JobData(
1261 Box::new(PackageBuild {
1262 id: c,
1263 config: config.clone(),
1264 pkginfo: pkginfo.clone(),
1265 sandbox: sandbox.clone(),
1266 }),
1267 ));
1268 }
1269 BuildStatus::NoneAvailable => {
1270 if let Ok(mut p) = progress_clone.lock() {
1271 p.clear_output_buffer(c);
1272 p.state_mut().set_worker_idle(c);
1273 let _ = p.render_throttled();
1274 }
1275 let _ =
1276 client.send(ChannelCommand::ComeBackLater);
1277 }
1278 BuildStatus::Done => {
1279 if let Ok(mut p) = progress_clone.lock() {
1280 p.clear_output_buffer(c);
1281 p.state_mut().set_worker_idle(c);
1282 let _ = p.render_throttled();
1283 }
1284 let _ = client.send(ChannelCommand::Quit);
1285 clients.remove(&c);
1286 if clients.is_empty() {
1287 break;
1288 }
1289 }
1290 };
1291 }
1292 ChannelCommand::JobSuccess(pkgname, duration) => {
1293 if shutdown_for_manager.load(Ordering::SeqCst) {
1295 continue;
1296 }
1297
1298 if let Some(ref s) = stats {
1300 let pkgpath = jobs
1301 .scanpkgs
1302 .get(&pkgname)
1303 .and_then(|idx| idx.pkg_location.as_ref())
1304 .map(|p| {
1305 p.as_path().to_string_lossy().to_string()
1306 });
1307 s.build(
1308 pkgname.pkgname(),
1309 pkgpath.as_deref(),
1310 duration,
1311 "success",
1312 );
1313 }
1314
1315 jobs.mark_success(&pkgname, duration);
1316 jobs.running.remove(&pkgname);
1317
1318 if let Ok(mut p) = progress_clone.lock() {
1320 let _ = p.print_status(&format!(
1321 " Built {} ({})",
1322 pkgname.pkgname(),
1323 format_duration(duration)
1324 ));
1325 p.state_mut().increment_completed();
1326 for (tid, pkg) in &thread_packages {
1327 if pkg == &pkgname {
1328 p.clear_output_buffer(*tid);
1329 p.state_mut().set_worker_idle(*tid);
1330 break;
1331 }
1332 }
1333 let _ = p.render_throttled();
1334 }
1335 }
1336 ChannelCommand::JobSkipped(pkgname) => {
1337 if shutdown_for_manager.load(Ordering::SeqCst) {
1339 continue;
1340 }
1341
1342 if let Some(ref s) = stats {
1344 let pkgpath = jobs
1345 .scanpkgs
1346 .get(&pkgname)
1347 .and_then(|idx| idx.pkg_location.as_ref())
1348 .map(|p| {
1349 p.as_path().to_string_lossy().to_string()
1350 });
1351 s.build(
1352 pkgname.pkgname(),
1353 pkgpath.as_deref(),
1354 Duration::ZERO,
1355 "skipped",
1356 );
1357 }
1358
1359 jobs.mark_skipped(&pkgname);
1360 jobs.running.remove(&pkgname);
1361
1362 if let Ok(mut p) = progress_clone.lock() {
1364 let _ = p.print_status(&format!(
1365 " Skipped {} (up-to-date)",
1366 pkgname.pkgname()
1367 ));
1368 p.state_mut().increment_skipped();
1369 for (tid, pkg) in &thread_packages {
1370 if pkg == &pkgname {
1371 p.clear_output_buffer(*tid);
1372 p.state_mut().set_worker_idle(*tid);
1373 break;
1374 }
1375 }
1376 let _ = p.render_throttled();
1377 }
1378 }
1379 ChannelCommand::JobFailed(pkgname, duration) => {
1380 if shutdown_for_manager.load(Ordering::SeqCst) {
1382 continue;
1383 }
1384
1385 if let Some(ref s) = stats {
1387 let pkgpath = jobs
1388 .scanpkgs
1389 .get(&pkgname)
1390 .and_then(|idx| idx.pkg_location.as_ref())
1391 .map(|p| {
1392 p.as_path().to_string_lossy().to_string()
1393 });
1394 s.build(
1395 pkgname.pkgname(),
1396 pkgpath.as_deref(),
1397 duration,
1398 "failed",
1399 );
1400 }
1401
1402 jobs.mark_failure(&pkgname, duration);
1403 jobs.running.remove(&pkgname);
1404
1405 if let Ok(mut p) = progress_clone.lock() {
1407 let _ = p.print_status(&format!(
1408 " Failed {} ({})",
1409 pkgname.pkgname(),
1410 format_duration(duration)
1411 ));
1412 p.state_mut().increment_failed();
1413 for (tid, pkg) in &thread_packages {
1414 if pkg == &pkgname {
1415 p.clear_output_buffer(*tid);
1416 p.state_mut().set_worker_idle(*tid);
1417 break;
1418 }
1419 }
1420 let _ = p.render_throttled();
1421 }
1422 }
1423 ChannelCommand::JobError((pkgname, duration, e)) => {
1424 if shutdown_for_manager.load(Ordering::SeqCst) {
1426 continue;
1427 }
1428
1429 if let Some(ref s) = stats {
1431 let pkgpath = jobs
1432 .scanpkgs
1433 .get(&pkgname)
1434 .and_then(|idx| idx.pkg_location.as_ref())
1435 .map(|p| {
1436 p.as_path().to_string_lossy().to_string()
1437 });
1438 s.build(
1439 pkgname.pkgname(),
1440 pkgpath.as_deref(),
1441 duration,
1442 "error",
1443 );
1444 }
1445
1446 jobs.mark_failure(&pkgname, duration);
1447 jobs.running.remove(&pkgname);
1448
1449 if let Ok(mut p) = progress_clone.lock() {
1451 let _ = p.print_status(&format!(
1452 " Failed {} ({})",
1453 pkgname.pkgname(),
1454 format_duration(duration)
1455 ));
1456 p.state_mut().increment_failed();
1457 for (tid, pkg) in &thread_packages {
1458 if pkg == &pkgname {
1459 p.clear_output_buffer(*tid);
1460 p.state_mut().set_worker_idle(*tid);
1461 break;
1462 }
1463 }
1464 let _ = p.render_throttled();
1465 }
1466 tracing::error!(error = %e, pkgname = %pkgname.pkgname(), "Build error");
1467 }
1468 ChannelCommand::StageUpdate(tid, stage) => {
1469 if let Ok(mut p) = progress_clone.lock() {
1470 p.state_mut()
1471 .set_worker_stage(tid, stage.as_deref());
1472 let _ = p.render_throttled();
1473 }
1474 }
1475 ChannelCommand::OutputLines(tid, lines) => {
1476 if let Ok(mut p) = progress_clone.lock() {
1477 if let Some(buf) = p.output_buffer_mut(tid) {
1478 for line in lines {
1479 buf.push(line);
1480 }
1481 }
1482 }
1483 }
1484 _ => {}
1485 }
1486 }
1487
1488 let _ = results_tx.send(jobs.results);
1490 let _ = interrupted_tx.send(was_interrupted);
1491 });
1492
1493 threads.push(manager);
1494 for thread in threads {
1495 thread.join().expect("thread panicked");
1496 }
1497
1498 stop_refresh.store(true, Ordering::Relaxed);
1500 let _ = refresh_thread.join();
1501
1502 let was_interrupted = interrupted_rx.recv().unwrap_or(false);
1504
1505 if let Ok(mut p) = progress.lock() {
1507 if was_interrupted {
1508 let _ = p.finish_interrupted();
1509 } else {
1510 let _ = p.finish();
1511 }
1512 }
1513
1514 let results = results_rx.recv().unwrap_or_default();
1516 let summary = BuildSummary {
1517 duration: started.elapsed(),
1518 results,
1519 scan_failed: Vec::new(),
1520 };
1521
1522 if self.sandbox.enabled() {
1523 self.sandbox.destroy_all(self.config.build_threads())?;
1524 }
1525
1526 Ok(summary)
1527 }
1528}