use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::{mem, thread};
use crossbeam_channel::{Receiver, RecvError, Sender, bounded};
use crossbeam_utils::sync::WaitGroup;
use itertools::Itertools;
use pkgcraft::repo::PkgRepository;
use pkgcraft::restrict::{Restriction, Scope};
use crate::check::CheckRunner;
use crate::report::Report;
use crate::runner::{SyncCheckRunner, Target};
use crate::scan::ScannerRun;
use crate::source::SourceKind;
/// Messages sent over the report channel from scanning threads to [`ReportIter`].
#[derive(Debug)]
enum Item {
/// A single report produced while running or finishing a check.
Report(Report),
/// Sent by a worker after it ran all checks for a target via `run_checks`; the
/// `usize` is the index the producer assigned to that target when enumerating it.
Process(Target, usize),
/// Sent by a thread as its last action, carrying its own id so that the
/// consumer can look up and join the matching handle.
Finish(thread::ThreadId),
}
/// Sending half of the report channel, wrapping the raw [`Sender`] so that
/// scanning threads can only emit well-formed [`Item`] messages.
#[derive(Debug)]
pub(crate) struct ReportSender(Sender<Item>);
impl ReportSender {
    /// Forward a report to the consuming iterator.
    ///
    /// A failed send is deliberately ignored.
    pub(crate) fn report(&self, report: Report) {
        let _ = self.0.send(Item::Report(report));
    }

    /// Signal that every check has been run for `target`, tagging the message
    /// with the target's `id`.
    ///
    /// A failed send is deliberately ignored.
    fn process(&self, target: Target, id: usize) {
        let _ = self.0.send(Item::Process(target, id));
    }

    /// Signal that the calling thread has completed its work.
    ///
    /// The message carries the id of the current thread. A failed send is
    /// deliberately ignored.
    fn finish(&self) {
        let current = thread::current().id();
        let _ = self.0.send(Item::Finish(current));
    }
}
/// Spawn the producer thread that feeds work to the `pkg_worker` threads.
///
/// The thread operates in two phases:
///
/// 1. Run phase: queue every runner with a repo source against `Target::Repo`,
///    then every runner with a category source against each category matching
///    the run's restriction, and finally -- when any runner has a scope
///    `<= Scope::Package` -- one check-less entry per matching package, tagged
///    with its enumeration index.
/// 2. Finish phase: after closing the run channel and waiting on the wait
///    group, queue per-category finalization followed by per-check
///    finalization.
///
/// Send errors are ignored throughout. The thread's last action is to announce
/// its completion through the run's report sender.
fn pkg_producer(
    run: Arc<ScannerRun>,
    wg: WaitGroup,
    tx: Sender<(Option<CheckRunner>, Target, usize)>,
    finish_tx: Sender<(CheckRunner, Option<Target>)>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        // repo-level checks
        run.runners
            .iter()
            .filter(|r| r.check.sources.contains(&SourceKind::Repo))
            .for_each(|r| {
                let _ = tx.send((Some(r.clone()), Target::Repo, 0));
            });

        // category-level checks: every category runner against every matching category
        let category_targets: Vec<_> = run
            .repo
            .categories()
            .into_iter()
            .filter(|x| run.restrict.matches(x))
            .map(Target::Category)
            .collect();
        let category_runners: Vec<_> = run
            .runners
            .iter()
            .filter(|r| r.check.sources.contains(&SourceKind::Category))
            .cloned()
            .collect();
        for target in &category_targets {
            category_runners.iter().for_each(|r| {
                let _ = tx.send((Some(r.clone()), target.clone(), 0));
            });
        }

        // package-level targets: workers run all applicable checks per target
        let wants_pkg_targets = run.runners.iter().any(|r| r.check.scope <= Scope::Package);
        if wants_pkg_targets {
            let cpns = run.repo.iter_cpn_restrict(&run.restrict);
            for (id, cpn) in cpns.enumerate() {
                let _ = tx.send((None, cpn.into(), id));
            }
        }

        // close the run channel, then block until every worker has released its handle
        drop(tx);
        wg.wait();

        // per-target finalization for category targets
        for target in &category_targets {
            category_runners
                .iter()
                .filter(|r| r.check.finish_target())
                .for_each(|r| {
                    let _ = finish_tx.send((r.clone(), Some(target.clone())));
                });
        }

        // per-check finalization
        run.runners
            .iter()
            .filter(|r| r.check.finish_check(run.scope))
            .for_each(|r| {
                let _ = finish_tx.send((r.clone(), None));
            });

        run.sender().finish();
    })
}
/// Spawn a worker thread that executes work queued by `pkg_producer`.
///
/// While the run channel yields entries, an entry carrying a check runs only
/// that check against its target, whereas an entry without one runs all checks
/// for the target via `run_checks` and then reports the target as processed.
/// Once the run channel is exhausted the worker releases its wait group handle
/// and services finalization requests until that channel is exhausted as well,
/// after which it announces its completion.
fn pkg_worker(
    run: Arc<ScannerRun>,
    runner: Arc<SyncCheckRunner>,
    wg: WaitGroup,
    rx: Receiver<(Option<CheckRunner>, Target, usize)>,
    finish_rx: Receiver<(CheckRunner, Option<Target>)>,
) -> thread::JoinHandle<()> {
    // in test builds, capture a tracing span to enter inside the spawned thread
    #[cfg(test)]
    let thread_span = tracing::debug_span!("thread").or_current();

    thread::spawn(move || {
        #[cfg(test)]
        let _entered = thread_span.clone().entered();

        // run phase
        for (check, target, id) in rx {
            match check {
                Some(check) => {
                    runner.run(&check, &target, &run);
                }
                None => {
                    runner.run_checks(&target, &run);
                    run.sender().process(target, id);
                }
            }
        }

        // signal the producer that this worker is done with the run phase
        drop(wg);

        // finish phase
        for (check, target) in finish_rx {
            match target {
                Some(target) => {
                    runner.finish_target(&check, &target, &run);
                }
                None => {
                    runner.finish_check(&check, &run);
                }
            }
        }

        run.sender().finish();
    })
}
/// Spawn the producer thread that feeds work to the `version_worker` threads.
///
/// All matching versioned and unversioned package targets are collected up
/// front. During the run phase every runner whose scope equals
/// `Scope::Version` is queued against each versioned target, followed by every
/// runner whose scope equals `Scope::Package` against each unversioned target.
/// After closing the run channel and waiting on the wait group, every runner
/// that requests target finalization is queued against the versioned targets
/// and then against the unversioned ones.
///
/// Send errors are ignored throughout. The thread's last action is to announce
/// its completion through the run's report sender.
fn version_producer(
    run: Arc<ScannerRun>,
    wg: WaitGroup,
    tx: Sender<(CheckRunner, Target)>,
    finish_tx: Sender<(CheckRunner, Target)>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let cpvs: Vec<_> = run.repo.iter_cpv_restrict(&run.restrict).collect();
        let cpns: Vec<_> = run.repo.iter_cpn_restrict(&run.restrict).collect();

        // run phase: version-scoped checks
        for cpv in &cpvs {
            run.runners
                .iter()
                .filter(|r| r.check.scope == Scope::Version)
                .for_each(|r| {
                    let _ = tx.send((r.clone(), cpv.clone().into()));
                });
        }

        // run phase: package-scoped checks
        for cpn in &cpns {
            run.runners
                .iter()
                .filter(|r| r.check.scope == Scope::Package)
                .for_each(|r| {
                    let _ = tx.send((r.clone(), cpn.clone().into()));
                });
        }

        // close the run channel, then block until every worker has released its handle
        drop(tx);
        wg.wait();

        // finish phase: versioned targets first, unversioned targets second
        for cpv in &cpvs {
            run.runners
                .iter()
                .filter(|r| r.check.finish_target())
                .for_each(|r| {
                    let _ = finish_tx.send((r.clone(), cpv.clone().into()));
                });
        }
        for cpn in &cpns {
            run.runners
                .iter()
                .filter(|r| r.check.finish_target())
                .for_each(|r| {
                    let _ = finish_tx.send((r.clone(), cpn.clone().into()));
                });
        }

        run.sender().finish();
    })
}
/// Spawn a worker thread that executes work queued by `version_producer`.
///
/// Each received (check, target) pair is run until the run channel is
/// exhausted. The worker then releases its wait group handle, services target
/// finalization requests until that channel is exhausted too, and finally
/// announces its completion.
fn version_worker(
    run: Arc<ScannerRun>,
    runner: Arc<SyncCheckRunner>,
    wg: WaitGroup,
    rx: Receiver<(CheckRunner, Target)>,
    finish_rx: Receiver<(CheckRunner, Target)>,
) -> thread::JoinHandle<()> {
    // in test builds, capture a tracing span to enter inside the spawned thread
    #[cfg(test)]
    let thread_span = tracing::debug_span!("thread").or_current();

    thread::spawn(move || {
        #[cfg(test)]
        let _entered = thread_span.clone().entered();

        // run phase
        rx.into_iter().for_each(|(check, target)| {
            runner.run(&check, &target, &run);
        });

        // signal the producer that this worker is done with the run phase
        drop(wg);

        // finish phase
        finish_rx.into_iter().for_each(|(check, target)| {
            runner.finish_target(&check, &target, &run);
        });

        run.sender().finish();
    })
}
/// Iterator over the reports produced by a scanning run.
///
/// Reports arrive over a channel from the scanning threads and are buffered
/// here before being yielded.
#[derive(Debug)]
pub struct ReportIter {
// receiving half of the channel the scanning threads send items on
rx: Receiver<Item>,
// handles of threads that are still running, keyed by thread id; an entry is
// removed and joined when the matching finish item is received
threads: HashMap<thread::ThreadId, thread::JoinHandle<()>>,
// id of the next report group to release from `id_cache` when sorting
id: usize,
// whether report groups are released in id order rather than arrival order
sort: bool,
// reports received so far, grouped by the target derived from their scope
target_cache: HashMap<Target, Vec<Report>>,
// sorted report groups of processed targets awaiting their turn, keyed by id
id_cache: HashMap<usize, Vec<Report>>,
// reports ready to be yielded, in order
reports: VecDeque<Report>,
}
impl ReportIter {
pub(crate) fn new(run: ScannerRun) -> Self {
let (reports_tx, reports_rx) = bounded(run.jobs);
run.sender
.set(ReportSender(reports_tx))
.expect("failed setting sender");
let wg = WaitGroup::new();
let runner = Arc::new(SyncCheckRunner::new(&run));
let run = Arc::new(run);
let mut threads = vec![];
if run.scope >= Scope::Category {
let (targets_tx, targets_rx) = bounded(run.jobs);
let (finish_tx, finish_rx) = bounded(run.jobs);
threads.extend((0..run.jobs).map(|_| {
pkg_worker(
run.clone(),
runner.clone(),
wg.clone(),
targets_rx.clone(),
finish_rx.clone(),
)
}));
threads.push(pkg_producer(run.clone(), wg, targets_tx, finish_tx));
} else {
let (targets_tx, targets_rx) = bounded(run.jobs);
let (finish_tx, finish_rx) = bounded(run.jobs);
threads.extend((0..run.jobs).map(|_| {
version_worker(
run.clone(),
runner.clone(),
wg.clone(),
targets_rx.clone(),
finish_rx.clone(),
)
}));
threads.push(version_producer(run.clone(), wg, targets_tx, finish_tx));
}
Self {
rx: reports_rx,
threads: threads.into_iter().map(|x| (x.thread().id(), x)).collect(),
id: Default::default(),
sort: run.sort,
target_cache: Default::default(),
id_cache: Default::default(),
reports: Default::default(),
}
}
fn receive(&mut self) -> Result<(), RecvError> {
self.rx.recv().map(|value| match value {
Item::Report(report) => {
self.target_cache
.entry(report.scope().into())
.or_default()
.push(report);
}
Item::Process(target, id) => {
let mut reports = self.target_cache.remove(&target).unwrap_or_default();
reports.sort();
if self.sort {
self.id_cache.insert(id, reports);
} else if !reports.is_empty() {
self.reports.extend(reports);
}
}
Item::Finish(id) => {
let thread = self
.threads
.remove(&id)
.unwrap_or_else(|| panic!("unknown thread: {id:?}"));
thread.join().unwrap();
if self.threads.is_empty() {
self.reports
.extend(self.target_cache.values_mut().flat_map(mem::take).sorted());
}
}
})
}
}
impl Iterator for ReportIter {
type Item = Report;
fn next(&mut self) -> Option<Self::Item> {
loop {
if self.sort
&& let Some(reports) = self.id_cache.remove(&self.id)
{
self.id += 1;
if reports.is_empty() {
continue;
}
self.reports.extend(reports);
}
if let Some(report) = self.reports.pop_front() {
return Some(report);
} else if self.receive().is_err() {
return None;
}
}
}
}