use parking_lot::{Condvar, Mutex};
use std::{collections::VecDeque, sync::Arc, thread::sleep, time::SystemTime};
pub(crate) struct JobBroker<Job> {
has_new_jobs: Arc<Condvar>,
market: Arc<Mutex<JobMarket<Job>>>,
}
impl<Job> Clone for JobBroker<Job> {
fn clone(&self) -> Self {
Self {
has_new_jobs: Arc::clone(&self.has_new_jobs),
market: Arc::clone(&self.market),
}
}
}
impl<Job> Drop for JobBroker<Job> {
fn drop(&mut self) {
let mut market = self.market.lock();
log::trace!(
"{}: Dropped, closing the market.",
std::thread::current().name().unwrap_or_default()
);
market.open = false;
market.job_batches.clear();
market.open_count = market.open_count.saturating_sub(1);
self.has_new_jobs.notify_all();
}
}
struct JobMarket<Job> {
open: bool,
thread_count: usize,
open_count: usize,
job_batches: Vec<VecDeque<Job>>,
}
impl<Job> JobBroker<Job>
where
Job: Send + 'static,
{
pub fn new(thread_count: usize, close_at: Option<SystemTime>) -> Self {
let s = Self {
has_new_jobs: Arc::new(Condvar::new()),
market: Arc::new(Mutex::new(JobMarket {
open: true,
thread_count,
open_count: thread_count,
job_batches: Vec::new(),
})),
};
if let Some(closing_time) = close_at {
let s1 = s.clone();
std::thread::Builder::new()
.name("timeout".to_owned())
.spawn(move || {
if let Ok(time_to_sleep) = closing_time.duration_since(SystemTime::now()) {
sleep(time_to_sleep);
}
let mut market = s1.market.lock();
log::debug!("Reached timeout, triggering shutdown");
market.open = false;
})
.unwrap();
}
s
}
}
impl<Job> JobBroker<Job> {
pub fn pop(&mut self) -> VecDeque<Job> {
let mut market = self.market.lock();
if !market.open {
return VecDeque::new();
}
loop {
if let Some(jobs) = market.job_batches.pop() {
log::trace!(
"{}: Got jobs. Working.",
std::thread::current().name().unwrap_or_default()
);
return jobs;
} else {
market.open_count = market.open_count.saturating_sub(1);
if market.open_count == 0 {
log::trace!(
"{}: No jobs. Last running thread.",
std::thread::current().name().unwrap_or_default()
);
self.has_new_jobs.notify_all();
market.open = false;
return VecDeque::new();
}
log::trace!(
"{}: No jobs. Awaiting. running={}",
std::thread::current().name().unwrap_or_default(),
market.open_count
);
self.has_new_jobs.wait(&mut market);
market.open_count += 1;
}
}
}
pub fn push(&mut self, jobs: VecDeque<Job>) {
let mut market = self.market.lock();
if !market.open {
return;
}
market.job_batches.push(jobs);
log::trace!(
"{}: Pushing jobs. running={}",
std::thread::current().name().unwrap_or_default(),
market.open_count
);
self.has_new_jobs.notify_one();
}
pub fn split_and_push(&mut self, jobs: &mut VecDeque<Job>) {
let mut market = self.market.lock();
if !market.open {
jobs.clear();
return;
}
let pieces = 1 + std::cmp::min(
market.thread_count.saturating_sub(market.open_count),
jobs.len(),
);
let size = jobs.len() / pieces;
log::trace!(
"{}: Sharing work. pieces={} size={} running={}",
std::thread::current().name().unwrap_or_default(),
pieces,
size,
market.open_count
);
for _ in 1..pieces {
let to_share = jobs.split_off(jobs.len() - size);
if to_share.is_empty() {
continue;
}
market.job_batches.push(to_share);
self.has_new_jobs.notify_one();
}
}
pub fn is_closed(&self) -> bool {
let market = self.market.lock();
!market.open && market.job_batches.is_empty() && market.open_count == 0
}
}