use crossbeam_channel::SendError;
use parking_lot::{Mutex, MutexGuard};
use std::{
    fmt::Debug,
    iter,
    sync::{Arc, Barrier},
    thread::{self, JoinHandle},
};

use crate::{
    source::{
        util::{may_be_taken::SkipIterator, PriorityQueue},
        RecurringJob, SourceManager,
    },
    Job, Prioritised,
};
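/// Callback that returns, for a given job priority, the maximum number of workers allowed to be
/// running jobs concurrently, or `None` for no limit. For example (illustrative only, actual
/// limits depend on the application):
///
/// ```ignore
/// let limit = |priority: u8| if priority < 2 { Some(1) } else { None };
/// ```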
pub(crate) type ConcurrencyLimitFn<J> =
dyn Fn(<J as Prioritised>::Priority) -> Option<u8> + Send + Sync;
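/// Spawns `thread_num` runner threads which execute jobs from the shared `jobs` source while
/// respecting `concurrency_limit`. The threads synchronise on a barrier before starting; the
/// barrier leader becomes the initial supervisor and the rest wait to be sent jobs.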
pub(crate) fn spawn<J, R: RecurringJob<J> + Send + 'static>(
thread_num: usize,
jobs: Arc<Mutex<SourceManager<J, R>>>,
concurrency_limit: Box<ConcurrencyLimitFn<J>>,
) -> Vec<JoinHandle<()>>
where
J: Job + 'static,
<J as Prioritised>::Priority: Send,
{
let queue = jobs.lock().queue();
let barrier = Arc::new(Barrier::new(thread_num));
RunnerState::new(thread_num, concurrency_limit)
.map(move |(recv, state)| {
let jobs = jobs.clone();
let queue = queue.clone();
let barrier = barrier.clone();
thread::Builder::new()
.name(format!("gaffer#{}", state.worker_index))
.spawn(move || {
Runner::new(state, jobs, queue).run(barrier, recv);
})
.unwrap()
})
.collect()
}
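/// A single runner thread, which alternates between executing jobs (worker) and handing queued
/// jobs out to the other runners (supervisor).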
struct Runner<J: Job + 'static, R: RecurringJob<J> + Send + 'static> {
state: RunnerState<J>,
jobs: Arc<Mutex<SourceManager<J, R>>>,
queue: Arc<Mutex<PriorityQueue<J>>>,
}
impl<J, R> Runner<J, R>
where
J: Job + 'static,
R: RecurringJob<J> + Send,
{
fn new(
state: RunnerState<J>,
jobs: Arc<Mutex<SourceManager<J, R>>>,
queue: Arc<Mutex<PriorityQueue<J>>>,
) -> Self {
Self { state, jobs, queue }
}
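    /// Entry point of a runner thread. The barrier leader becomes the supervisor and obtains its
    /// first job by supervising; every other thread blocks until a job is sent to it.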
fn run(self, ready_barrier: Arc<Barrier>, recv: crossbeam_channel::Receiver<J>) -> ! {
let job = if ready_barrier.wait().is_leader() {
self.state.become_supervisor();
self.run_supervisor()
} else {
recv.recv()
.expect("Available worker is not connected to shared runner state")
};
drop(recv);
self.run_worker(job);
}
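    /// Executes `job`, then repeatedly fetches and executes the next job. Never returns.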
fn run_worker(self, mut job: J) -> ! {
loop {
            job.execute();
            job = self.next_job();
}
}
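    /// Obtains the next job after completing one: either takes another job directly from the
    /// queue, waits to be sent one as an available worker, or takes over as supervisor until a
    /// job can be claimed.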
fn next_job(&self) -> J {
let transition = self.state.completed_job(self.queue.lock().drain());
match transition {
PostJobTransition::BecomeAvailable(recv) => recv
.recv()
.expect("Available worker is not connected to shared runner state"),
PostJobTransition::BecomeSupervisor => self.run_supervisor(),
PostJobTransition::KeepWorking(job) => job,
}
}
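    /// Supervises the job source, assigning jobs to available workers until there is a job this
    /// thread can take for itself, which is then returned.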
fn run_supervisor(&self) -> J {
let mut wait_for_new = false;
let mut jobs = self.jobs.lock();
loop {
if let Some(job) = self.state.assign_jobs(jobs.get(wait_for_new)) {
return job;
}
wait_for_new = true;
}
}
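    /// Entry point for a replacement thread spawned after a runner panicked mid-job.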
fn panic_recover(self) -> ! {
let job = self.next_job();
self.run_worker(job);
}
}
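/// If a runner thread panics while executing a job, its `Drop` impl spawns a replacement runner
/// over the same shared state, so the pool keeps its full complement of threads.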
impl<J: Job + 'static, R: RecurringJob<J> + Send + 'static> Drop for Runner<J, R> {
fn drop(&mut self) {
if thread::panicking() {
let Runner {
state:
RunnerState {
workers,
worker_index,
concurrency_limit,
},
jobs,
queue,
} = self;
let state = RunnerState {
workers: workers.clone(),
worker_index: *worker_index,
concurrency_limit: concurrency_limit.clone(),
};
let runner = Runner::new(state, jobs.clone(), queue.clone());
thread::Builder::new()
.name(format!("gaffer#{}", worker_index))
.spawn(move || {
runner.panic_recover();
})
.unwrap();
}
}
}
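/// Per-runner handle onto the state shared by all runners in the pool.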
struct RunnerState<J: Job> {
workers: Arc<Mutex<Vec<WorkerState<J>>>>,
worker_index: usize,
concurrency_limit: Arc<ConcurrencyLimitFn<J>>,
}
impl<J: Job> RunnerState<J> {
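    /// Creates shared state for `num` runners, yielding for each one the receiver on which it is
    /// sent jobs while available, paired with its `RunnerState`.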
pub fn new(
num: usize,
concurrency_limit: impl Into<Arc<ConcurrencyLimitFn<J>>>,
) -> impl Iterator<Item = (crossbeam_channel::Receiver<J>, Self)> {
let (receivers, worker_state): (Vec<_>, _) =
iter::repeat_with(WorkerState::available).take(num).unzip();
let worker_state = Arc::new(Mutex::new(worker_state));
let concurrency_limit = concurrency_limit.into();
receivers.into_iter().enumerate().map(move |(idx, recv)| {
(
recv,
Self {
workers: worker_state.clone(),
worker_index: idx,
concurrency_limit: concurrency_limit.clone(),
},
)
})
}
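    /// Marks this runner as the supervisor. Panics if another runner is already supervising.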
fn become_supervisor(&self) {
let mut workers = self.workers();
assert!(!workers.iter().any(|worker| worker.is_supervisor()));
workers[self.worker_index] = WorkerState::Supervisor;
}
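    /// Called when this runner completes a job. Tries to take another job from `jobs` that does
    /// not violate an exclusion or the concurrency limit; otherwise this runner becomes
    /// available, or becomes the supervisor if there is none.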
fn completed_job(&self, mut jobs: impl SkipIterator<Item = J>) -> PostJobTransition<J> {
let mut workers = self.workers();
assert!(workers[self.worker_index].is_working());
log::debug!(
"{}: Job completed by worker",
std::thread::current().name().unwrap_or_default()
);
        let working_count = workers.iter().filter(|state| state.is_working()).count() - 1;
        while let Some(job) = jobs.maybe_next() {
if let Some(max_concurrency) = (self.concurrency_limit)(job.priority()) {
if working_count as u8 >= max_concurrency {
log::trace!(
"{}: > Can't continue onto this job as {} working and {} max concurrency",
std::thread::current().name().unwrap_or_default(),
working_count,
max_concurrency
);
continue;
}
}
if workers
.iter()
.any(|worker| worker.exclusion() == Some(job.exclusion()))
{
log::trace!(
"{}: > Can't continue onto this job as exclusion matches",
std::thread::current().name().unwrap_or_default()
);
continue;
}
return PostJobTransition::KeepWorking(job.into_inner());
}
if workers.iter().any(|worker| worker.is_supervisor()) {
let (send, recv) = crossbeam_channel::bounded(1);
workers[self.worker_index] = WorkerState::Available(send);
log::trace!(
"{}: > Supervisor found, becoming available",
std::thread::current().name().unwrap_or_default()
);
PostJobTransition::BecomeAvailable(recv)
} else {
log::trace!(
"{}: > No supervisor found, becoming supervisor",
std::thread::current().name().unwrap_or_default()
);
workers[self.worker_index] = WorkerState::Supervisor;
PostJobTransition::BecomeSupervisor
}
}
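    /// Called by the supervisor to hand runnable jobs from `jobs` to available workers. If a
    /// runnable job remains once no workers are available, the supervisor claims it and returns
    /// it to execute itself.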
fn assign_jobs(&self, mut jobs: impl SkipIterator<Item = J>) -> Option<J> {
let mut workers = self.workers();
assert!(workers[self.worker_index].is_supervisor());
let mut exclusions: Vec<_> = workers.iter().flat_map(|state| state.exclusion()).collect();
let mut working_count = workers.iter().filter(|state| state.is_working()).count();
log::debug!(
"{}: Supervisor to assign jobs, {} currently working",
std::thread::current().name().unwrap_or_default(),
working_count
);
let mut workers_iter = workers.iter_mut();
while let Some(job) = jobs.maybe_next() {
if let Some(max_concurrency) = (self.concurrency_limit)(job.priority()) {
if working_count as u8 >= max_concurrency {
continue;
}
}
if exclusions.contains(&job.exclusion()) {
continue;
}
working_count += 1;
exclusions.push(job.exclusion());
let mut job = job.into_inner();
loop {
if let Some(worker) = workers_iter.next() {
if let WorkerState::Available(send) = worker {
let exclusion = job.exclusion();
if let Err(SendError(returned_job)) = send.send(job) {
                            job = returned_job;
                        } else {
*worker = WorkerState::Working(exclusion);
break;
}
} else {
continue;
}
} else {
workers[self.worker_index] = WorkerState::Working(job.exclusion());
return Some(job);
}
}
}
None
}
fn workers(&self) -> MutexGuard<'_, Vec<WorkerState<J>>> {
self.workers.lock()
}
}
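/// What a runner should do next after completing a job.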
#[derive(Debug)]
enum PostJobTransition<J> {
BecomeSupervisor,
BecomeAvailable(crossbeam_channel::Receiver<J>),
KeepWorking(J),
}
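/// What each runner in the pool is currently doing. An `Available` entry holds the sending half
/// of the channel on which that worker is waiting for a job.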
#[derive(Debug)]
enum WorkerState<J: Job> {
Supervisor,
Working(J::Exclusion),
Available(crossbeam_channel::Sender<J>),
}
impl<J: Job> WorkerState<J> {
fn available() -> (crossbeam_channel::Receiver<J>, Self) {
let (send, recv) = crossbeam_channel::bounded(1);
(recv, Self::Available(send))
}
fn exclusion(&self) -> Option<J::Exclusion> {
if let Self::Working(exclusion) = self {
Some(*exclusion)
} else {
None
}
}
fn is_working(&self) -> bool {
matches!(self, Self::Working(_))
}
fn is_supervisor(&self) -> bool {
matches!(self, Self::Supervisor)
}
}
#[cfg(test)]
mod test {
use crate::{source::util::may_be_taken::VecSkipIter, Job, NoExclusion};
use super::*;
#[derive(Debug)]
struct ExcludedJob(u8);
impl Job for ExcludedJob {
type Exclusion = u8;
fn exclusion(&self) -> Self::Exclusion {
self.0
}
type Priority = ();
fn priority(&self) -> Self::Priority {}
fn execute(self) {}
}
struct PrioritisedJob(u8);
impl Job for PrioritisedJob {
type Exclusion = NoExclusion;
fn exclusion(&self) -> Self::Exclusion {
NoExclusion
}
type Priority = u8;
fn priority(&self) -> Self::Priority {
self.0
}
fn execute(self) {}
}
#[test]
fn working_to_available() {
let state = RunnerState::<ExcludedJob> {
workers: Arc::new(Mutex::new(vec![
WorkerState::Working(1),
WorkerState::Supervisor,
])),
worker_index: 0,
concurrency_limit: Arc::new(|()| None),
};
let job_recv = state.completed_job(PriorityQueue::new(None).drain());
assert!(matches!(job_recv, PostJobTransition::BecomeAvailable(_)));
let workers = state.workers.lock();
assert!(matches!(workers[0], WorkerState::Available(_)));
}
#[test]
fn working_to_supervisor() {
let state = RunnerState::<ExcludedJob> {
workers: Arc::new(Mutex::new(vec![
WorkerState::Working(1),
WorkerState::Working(2),
])),
worker_index: 0,
concurrency_limit: Arc::new(|()| None),
};
let job_recv = state.completed_job(PriorityQueue::new(None).drain());
assert!(matches!(job_recv, PostJobTransition::BecomeSupervisor));
let workers = state.workers.lock();
assert!(workers[0].is_supervisor());
}
#[test]
fn working_to_working() {
let state = RunnerState::<ExcludedJob> {
workers: Arc::new(Mutex::new(vec![
WorkerState::Working(1),
WorkerState::Working(2),
])),
worker_index: 0,
concurrency_limit: Arc::new(|()| None),
};
let mut queue = PriorityQueue::new(None);
queue.enqueue(ExcludedJob(3));
let job_recv = state.completed_job(queue.drain());
assert!(
matches!(job_recv, PostJobTransition::KeepWorking(ExcludedJob(3))),
"{:?}",
job_recv
);
let workers = state.workers.lock();
assert!(workers[0].is_working());
assert!(queue.is_empty());
}
#[test]
fn working_to_supervisor_excluded() {
let state = RunnerState::<ExcludedJob> {
workers: Arc::new(Mutex::new(vec![
WorkerState::Working(1),
WorkerState::Working(2),
])),
worker_index: 0,
concurrency_limit: Arc::new(|()| None),
};
let mut queue = PriorityQueue::new(None);
queue.enqueue(ExcludedJob(1));
let job_recv = state.completed_job(queue.drain());
assert!(matches!(job_recv, PostJobTransition::BecomeSupervisor));
let workers = state.workers.lock();
assert!(workers[0].is_supervisor());
assert!(!queue.is_empty());
}
#[test]
fn working_to_supervisor_throttled() {
let state = RunnerState::<PrioritisedJob> {
workers: Arc::new(Mutex::new(vec![
WorkerState::Working(NoExclusion),
WorkerState::Working(NoExclusion),
])),
worker_index: 0,
concurrency_limit: Arc::new(|num| Some(num)),
};
let mut queue = PriorityQueue::new(None);
queue.enqueue(PrioritisedJob(1));
let job_recv = state.completed_job(queue.drain());
assert!(matches!(job_recv, PostJobTransition::BecomeSupervisor));
let workers = state.workers.lock();
assert!(workers[0].is_supervisor());
assert!(!queue.is_empty());
}
#[test]
fn available_to_working() {
let (send, recv) = crossbeam_channel::unbounded();
let state = RunnerState::<ExcludedJob> {
workers: Arc::new(Mutex::new(vec![
WorkerState::Supervisor,
WorkerState::Available(send),
])),
worker_index: 0,
concurrency_limit: Arc::new(|()| None),
};
let mut jobs = vec![ExcludedJob(1)];
assert!(state.assign_jobs(VecSkipIter::new(&mut jobs)).is_none());
let workers = state.workers.lock();
assert!(workers[0].is_supervisor());
assert!(workers[1].is_working());
assert!(recv.try_recv().is_ok());
}
#[test]
fn supervisor_to_working() {
let state = RunnerState::<ExcludedJob> {
workers: Arc::new(Mutex::new(vec![
WorkerState::Supervisor,
WorkerState::Working(1),
])),
worker_index: 0,
concurrency_limit: Arc::new(|()| None),
};
assert!(state
.assign_jobs(VecSkipIter::new(&mut vec![ExcludedJob(2)]))
.is_some());
let workers = state.workers.lock();
assert!(workers[0].is_working());
assert!(workers[1].is_working());
}
#[test]
fn equal_exclusion_running() {
let (send, recv) = crossbeam_channel::unbounded();
let state = RunnerState::<ExcludedJob> {
workers: Arc::new(Mutex::new(vec![
WorkerState::Supervisor,
WorkerState::Working(1),
WorkerState::Available(send),
])),
worker_index: 0,
concurrency_limit: Arc::new(|()| None),
};
let mut jobs = vec![ExcludedJob(1)];
assert!(state.assign_jobs(VecSkipIter::new(&mut jobs)).is_none());
{
let workers = state.workers.lock();
assert!(workers[0].is_supervisor());
assert!(workers[1].is_working());
assert!(matches!(workers[2], WorkerState::Available(_)));
}
assert!(recv.try_recv().is_err());
assert_eq!(jobs.len(), 1);
}
#[test]
fn equal_exclusion_adding() {
let (send, recv) = crossbeam_channel::unbounded();
let state = RunnerState::<ExcludedJob> {
workers: Arc::new(Mutex::new(vec![
WorkerState::Supervisor,
WorkerState::Available(send.clone()),
WorkerState::Available(send),
])),
worker_index: 0,
concurrency_limit: Arc::new(|()| None),
};
let mut jobs = vec![ExcludedJob(1), ExcludedJob(1)];
assert!(state.assign_jobs(VecSkipIter::new(&mut jobs)).is_none());
{
let workers = state.workers.lock();
assert!(workers[0].is_supervisor());
assert!(workers[1].is_working());
assert!(matches!(workers[2], WorkerState::Available(_)));
}
assert!(recv.try_recv().is_ok());
assert!(recv.try_recv().is_err());
assert_eq!(jobs.len(), 1);
}
#[test]
fn parallelisation_1_running_1() {
let state = RunnerState::<PrioritisedJob> {
workers: Arc::new(Mutex::new(vec![
WorkerState::Supervisor,
WorkerState::Working(NoExclusion),
])),
worker_index: 0,
concurrency_limit: Arc::new(|priority| Some(priority)),
};
let mut jobs = vec![PrioritisedJob(1)];
assert!(state.assign_jobs(VecSkipIter::new(&mut jobs)).is_none());
{
let workers = state.workers.lock();
assert!(workers[0].is_supervisor());
assert!(workers[1].is_working());
}
assert_eq!(jobs.len(), 1);
}
#[test]
fn parallelisation_2_running_1() {
let state = RunnerState::<PrioritisedJob> {
workers: Arc::new(Mutex::new(vec![
WorkerState::Supervisor,
WorkerState::Working(NoExclusion),
])),
worker_index: 0,
concurrency_limit: Arc::new(|priority| Some(priority)),
};
assert!(state
.assign_jobs(VecSkipIter::new(&mut vec![PrioritisedJob(2)]))
.is_some());
{
let workers = state.workers.lock();
assert!(workers[0].is_working());
assert!(workers[1].is_working());
}
}
#[test]
fn parallelisation_2x2_running_1() {
let (send, recv) = crossbeam_channel::unbounded();
let state = RunnerState::<PrioritisedJob> {
workers: Arc::new(Mutex::new(vec![
WorkerState::Supervisor,
WorkerState::Working(NoExclusion),
WorkerState::Available(send),
])),
worker_index: 0,
concurrency_limit: Arc::new(|priority| Some(priority)),
};
let mut jobs = vec![PrioritisedJob(2), PrioritisedJob(2)];
assert!(state.assign_jobs(VecSkipIter::new(&mut jobs)).is_none());
{
let workers = state.workers.lock();
assert!(workers[0].is_supervisor());
assert!(workers[1].is_working());
assert!(workers[2].is_working());
}
assert!(recv.try_recv().is_ok());
assert!(recv.try_recv().is_err());
assert_eq!(jobs.len(), 1);
}
#[test]
fn unassigned_jobs_not_consumed() {
let mut jobs = vec![PrioritisedJob(100), PrioritisedJob(100)];
let state = RunnerState::<PrioritisedJob> {
workers: Arc::new(Mutex::new(vec![
WorkerState::Supervisor,
WorkerState::Working(NoExclusion),
])),
worker_index: 0,
concurrency_limit: Arc::new(|priority| Some(priority)),
};
assert!(state.assign_jobs(VecSkipIter::new(&mut jobs)).is_some());
assert_eq!(jobs.len(), 1);
}
}