#![allow(
renamed_and_removed_lints,
clippy::new_without_default,
clippy::unneeded_field_pattern,
clippy::match_like_matches_macro,
clippy::manual_strip,
clippy::if_same_then_else,
clippy::unknown_clippy_lints,
clippy::len_without_is_empty,
clippy::should_implement_trait
)]
#![warn(
missing_docs,
trivial_casts,
trivial_numeric_casts,
unused_extern_crates,
unused_qualifications,
clippy::pattern_type_mismatch
)]
pub mod arc;
pub mod util;
use self::arc::Linearc;
use crossbeam_deque::{Injector, Steal};
use std::{
borrow::Cow,
fmt, mem, ops,
sync::{
atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering},
Arc, Condvar, Mutex, RwLock,
},
thread, time,
};
const BITS_PER_BYTE: usize = 8;
const MAX_WORKERS: usize = mem::size_of::<usize>() * BITS_PER_BYTE;
pub type Name = Cow<'static, str>;
#[derive(Debug)]
struct WaitingThread {
condvar: *const Condvar,
}
unsafe impl Send for WaitingThread {}
unsafe impl Sync for WaitingThread {}
#[derive(Debug, Default)]
struct Continuation {
parents: Vec<Arc<Notifier>>,
forks: usize,
dependents: Vec<Linearc<Task>>,
waiting_threads: Vec<WaitingThread>,
}
impl Continuation {
fn unpark_waiting(&mut self) {
for wt in self.waiting_threads.drain(..) {
log::trace!("\tresolving a join");
unsafe {
(*wt.condvar).notify_all();
}
}
}
}
#[derive(Debug)]
pub struct Notifier {
name: Name,
continuation: Mutex<Option<Continuation>>,
}
impl fmt::Display for Notifier {
fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
write!(formatter, "'{}'", self.name)
}
}
pub struct ExecutionContext<'a> {
choir: &'a Arc<Choir>,
notifier: &'a Arc<Notifier>,
worker_index: isize,
}
impl<'a> ExecutionContext<'a> {
pub fn self_task(&self) -> RunningTask {
RunningTask {
choir: Arc::clone(self.choir),
notifier: Arc::clone(self.notifier),
}
}
pub fn choir(&self) -> &'a Arc<Choir> {
self.choir
}
pub fn fork<N: Into<Name>>(&self, name: N) -> ProtoTask {
let name = name.into();
log::trace!("Forking task {} as '{}'", self.notifier, name);
ProtoTask {
choir: self.choir,
name,
parent: Some(Arc::clone(self.notifier)),
dependents: Vec::new(),
}
}
pub fn add_fork<D: AsRef<Notifier>>(&self, other: &D) {
if let Some(ref mut continuation) = *other.as_ref().continuation.lock().unwrap() {
continuation.parents.push(Arc::clone(self.notifier));
self.notifier
.continuation
.lock()
.unwrap()
.as_mut()
.unwrap()
.forks += 1;
}
}
}
impl Drop for ExecutionContext<'_> {
fn drop(&mut self) {
if thread::panicking() {
self.choir.issue_panic(self.worker_index);
let mut guard = self.notifier.continuation.lock().unwrap();
if let Some(mut cont) = guard.take() {
cont.unpark_waiting();
}
}
}
}
pub type SubIndex = u32;
enum Functor {
Dummy,
Once(Box<dyn FnOnce(ExecutionContext) + Send + 'static>),
Multi(
ops::Range<SubIndex>,
Linearc<dyn Fn(ExecutionContext, SubIndex) + Send + Sync + 'static>,
),
}
impl fmt::Debug for Functor {
fn fmt(&self, serializer: &mut fmt::Formatter) -> fmt::Result {
match *self {
Self::Dummy => serializer.debug_struct("Dummy").finish(),
Self::Once(_) => serializer.debug_struct("Once").finish(),
Self::Multi(ref range, _) => serializer
.debug_struct("Multi")
.field("range", range)
.finish(),
}
}
}
unsafe impl Sync for Functor {}
#[derive(Debug)]
struct Task {
functor: Functor,
notifier: Arc<Notifier>,
}
struct Worker {
name: String,
alive: AtomicBool,
}
struct WorkerContext {}
struct WorkerPool {
contexts: [Option<WorkerContext>; MAX_WORKERS],
}
pub struct MaybePanic {
worker_index: isize,
}
impl Drop for MaybePanic {
fn drop(&mut self) {
assert_eq!(
self.worker_index, -1,
"Panic occurred on worker {}",
self.worker_index,
);
}
}
pub struct Choir {
injector: Injector<Task>,
condvar: Condvar,
parked_mask_mutex: Mutex<usize>,
workers: RwLock<WorkerPool>,
panic_worker: AtomicIsize,
}
impl Default for Choir {
fn default() -> Self {
const NO_WORKER: Option<WorkerContext> = None;
let injector = Injector::new();
Self {
injector,
condvar: Condvar::default(),
parked_mask_mutex: Mutex::new(0),
workers: RwLock::new(WorkerPool {
contexts: [NO_WORKER; MAX_WORKERS],
}),
panic_worker: AtomicIsize::new(-1),
}
}
}
impl Choir {
pub fn new() -> Arc<Self> {
Arc::new(Self::default())
}
pub fn add_worker(self: &Arc<Self>, name: &str) -> WorkerHandle {
let worker = Arc::new(Worker {
name: name.to_string(),
alive: AtomicBool::new(true),
});
let worker_clone = Arc::clone(&worker);
let choir = Arc::clone(self);
let join_handle = thread::Builder::new()
.name(name.to_string())
.spawn(move || choir.work_loop(&worker_clone))
.unwrap();
WorkerHandle {
worker,
join_handle: Some(join_handle),
choir: Arc::clone(self),
}
}
pub fn spawn<'a, N: Into<Name>>(self: &'a Arc<Self>, name: N) -> ProtoTask<'a> {
let name = name.into();
log::trace!("Creating task '{}'", name);
ProtoTask {
choir: self,
name,
parent: None,
dependents: Vec::new(),
}
}
fn schedule(&self, task: Task) {
log::trace!("Task {} is scheduled", task.notifier);
self.injector.push(task);
self.condvar.notify_one();
}
fn execute(self: &Arc<Self>, task: Task, worker_index: isize) {
let execontext = ExecutionContext {
choir: self,
notifier: &task.notifier,
worker_index,
};
match task.functor {
Functor::Dummy => {
log::debug!(
"Task {} (dummy) runs on thread[{}]",
task.notifier,
worker_index
);
drop(execontext);
}
Functor::Once(fun) => {
log::debug!("Task {} runs on thread[{}]", task.notifier, worker_index);
profiling::scope!(task.notifier.name.as_ref());
(fun)(execontext);
}
Functor::Multi(mut sub_range, fun) => {
log::debug!(
"Task {} ({}) runs on thread[{}]",
task.notifier,
sub_range.start,
worker_index,
);
debug_assert!(sub_range.start < sub_range.end);
let middle = (sub_range.end + sub_range.start) >> 1;
if middle != sub_range.start && *self.parked_mask_mutex.lock().unwrap() != 0 {
self.injector.push(Task {
functor: Functor::Multi(middle..sub_range.end, Linearc::clone(&fun)),
notifier: Arc::clone(&task.notifier),
});
log::trace!("\tsplit out {:?}", middle..sub_range.end);
sub_range.end = middle;
self.condvar.notify_one();
}
{
profiling::scope!(task.notifier.name.as_ref());
(fun)(execontext, sub_range.start);
}
sub_range.start += 1;
if sub_range.start == sub_range.end {
if !Linearc::drop_last(fun) {
return;
}
} else {
self.injector.push(Task {
functor: Functor::Multi(sub_range, fun),
notifier: task.notifier,
});
return;
}
}
}
let mut notifiers = self.finish(&task.notifier);
while let Some(notifier) = notifiers.pop() {
notifiers.extend(self.finish(¬ifier));
}
}
#[profiling::function]
fn finish(&self, notifier: &Notifier) -> Vec<Arc<Notifier>> {
log::trace!("Finishing task {}", notifier);
let continuation = {
let mut guard = notifier.continuation.lock().unwrap();
if let Some(ref mut cont) = *guard {
if cont.forks != 0 {
log::trace!("\t{} forks are still alive", cont.forks);
cont.forks -= 1;
return Vec::new();
}
}
let mut cont = guard.take().unwrap();
cont.unpark_waiting();
cont
};
for dependent in continuation.dependents {
if let Some(ready) = Linearc::into_inner(dependent) {
self.schedule(ready);
}
}
continuation.parents
}
fn register(&self) -> Option<usize> {
let mut pool = self.workers.write().unwrap();
let index = pool.contexts.iter_mut().position(|c| c.is_none())?;
pool.contexts[index] = Some(WorkerContext {});
Some(index)
}
fn unregister(&self, index: usize) {
self.workers.write().unwrap().contexts[index] = None;
self.condvar.notify_one();
}
fn work_loop(self: &Arc<Self>, worker: &Worker) {
profiling::register_thread!();
let index = self.register().unwrap();
log::info!("Thread[{}] = '{}' started", index, worker.name);
while worker.alive.load(Ordering::Acquire) {
match self.injector.steal() {
Steal::Empty => {
log::trace!("Thread[{}] sleeps", index);
let mask = 1 << index;
let mut parked_mask = self.parked_mask_mutex.lock().unwrap();
*parked_mask |= mask;
parked_mask = self
.condvar
.wait_while(parked_mask, |_| {
worker.alive.load(Ordering::Acquire) && self.injector.is_empty()
})
.unwrap();
*parked_mask &= !mask;
}
Steal::Success(task) => {
self.execute(task, index as isize);
}
Steal::Retry => {}
}
}
log::info!("Thread '{}' dies", worker.name);
self.unregister(index);
}
fn flush_queue(&self) {
let mut num_tasks = 0;
loop {
match self.injector.steal() {
Steal::Empty => {
break;
}
Steal::Success(task) => {
num_tasks += 1;
let mut guard = task.notifier.continuation.lock().unwrap();
if let Some(mut cont) = guard.take() {
cont.unpark_waiting();
}
}
Steal::Retry => {}
}
}
log::trace!("\tflushed {} tasks down the drain", num_tasks);
}
fn issue_panic(&self, worker_index: isize) {
log::debug!("panic on worker {}", worker_index);
self.panic_worker.store(worker_index, Ordering::Release);
self.flush_queue();
}
pub fn check_panic(&self) -> MaybePanic {
let worker_index = self.panic_worker.load(Ordering::Acquire);
MaybePanic { worker_index }
}
}
pub struct WorkerHandle {
worker: Arc<Worker>,
join_handle: Option<thread::JoinHandle<()>>,
choir: Arc<Choir>,
}
enum MaybeArc<T> {
Unique(T),
Shared(Linearc<T>),
Null,
}
impl<T> MaybeArc<T> {
const NULL_ERROR: &'static str = "Value is gone!";
fn new(value: T) -> Self {
Self::Unique(value)
}
fn share(&mut self) -> Linearc<T> {
let arc = match mem::replace(self, Self::Null) {
Self::Unique(value) => Linearc::new(value),
Self::Shared(arc) => arc,
Self::Null => panic!("{}", Self::NULL_ERROR),
};
*self = Self::Shared(Linearc::clone(&arc));
arc
}
fn as_ref(&self) -> &T {
match *self {
Self::Unique(ref value) => value,
Self::Shared(ref arc) => arc,
Self::Null => panic!("{}", Self::NULL_ERROR),
}
}
fn extract(&mut self) -> Option<T> {
match mem::replace(self, Self::Null) {
Self::Unique(value) => Some(value),
Self::Shared(arc) => Linearc::into_inner(arc),
Self::Null => None,
}
}
}
pub struct ProtoTask<'c> {
choir: &'c Arc<Choir>,
name: Name,
parent: Option<Arc<Notifier>>,
dependents: Vec<Linearc<Task>>,
}
pub struct IdleTask {
choir: Arc<Choir>,
task: MaybeArc<Task>,
}
impl AsRef<Notifier> for IdleTask {
fn as_ref(&self) -> &Notifier {
&self.task.as_ref().notifier
}
}
impl<'a> Linearc<dyn Fn(ExecutionContext, SubIndex) + Send + Sync + 'a> {
fn new_unsized(fun: impl Fn(ExecutionContext, SubIndex) + Send + Sync + 'a) -> Self {
Self::from_inner(Box::new(arc::LinearcInner {
ref_count: AtomicUsize::new(1),
data: fun,
}))
}
}
impl Drop for ProtoTask<'_> {
fn drop(&mut self) {
for dependent in self.dependents.drain(..) {
if let Some(ready) = Linearc::into_inner(dependent) {
self.choir.schedule(ready);
}
}
}
}
impl ProtoTask<'_> {
pub fn as_blocker_for(mut self, other: &mut IdleTask) -> Self {
self.dependents.push(other.task.share());
self
}
fn fill(mut self, functor: Functor) -> IdleTask {
if let Some(ref parent_notifier) = self.parent {
parent_notifier
.continuation
.lock()
.unwrap()
.as_mut()
.unwrap()
.forks += 1;
}
IdleTask {
choir: Arc::clone(self.choir),
task: MaybeArc::new(Task {
functor,
notifier: Arc::new(Notifier {
name: mem::take(&mut self.name),
continuation: Mutex::new(Some(Continuation {
parents: self.parent.take().into_iter().collect(),
forks: 0,
dependents: mem::take(&mut self.dependents),
waiting_threads: Vec::new(),
})),
}),
}),
}
}
pub fn init_dummy(self) -> IdleTask {
self.fill(Functor::Dummy)
}
pub fn init<F: FnOnce(ExecutionContext) + Send + 'static>(self, fun: F) -> IdleTask {
let b: Box<dyn FnOnce(ExecutionContext) + Send + 'static> = Box::new(fun);
self.fill(Functor::Once(unsafe { mem::transmute(b) }))
}
pub fn init_multi<F: Fn(ExecutionContext, SubIndex) + Send + Sync + 'static>(
self,
count: SubIndex,
fun: F,
) -> IdleTask {
self.fill(if count == 0 {
Functor::Dummy
} else {
let arc: Linearc<dyn Fn(ExecutionContext, SubIndex) + Send + Sync + 'static> =
Linearc::new_unsized(fun);
Functor::Multi(0..count, unsafe { mem::transmute(arc) })
})
}
pub fn init_iter<I, F>(self, iter: I, fun: F) -> IdleTask
where
I: Iterator,
I::Item: Send + 'static,
F: Fn(ExecutionContext, I::Item) + Send + Sync + 'static,
{
let task_data = iter.collect::<util::PerTaskData<_>>();
self.init_multi(task_data.len(), move |exe_context, index| unsafe {
fun(exe_context, task_data.take(index))
})
}
}
#[derive(Clone)]
pub struct RunningTask {
choir: Arc<Choir>,
notifier: Arc<Notifier>,
}
impl fmt::Debug for RunningTask {
fn fmt(&self, serializer: &mut fmt::Formatter) -> fmt::Result {
self.notifier.fmt(serializer)
}
}
impl AsRef<Notifier> for RunningTask {
fn as_ref(&self) -> &Notifier {
&self.notifier
}
}
impl RunningTask {
pub fn is_done(&self) -> bool {
self.notifier.continuation.lock().unwrap().is_none()
}
#[profiling::function]
pub fn join(&self) -> MaybePanic {
log::debug!("Joining {}", self.notifier);
let mut guard = self.notifier.continuation.lock().unwrap();
if let Some(ref mut cont) = *guard {
let condvar = Condvar::new();
cont.waiting_threads
.push(WaitingThread { condvar: &condvar });
let _ = condvar.wait_while(guard, |cont| cont.is_some());
}
self.choir.check_panic()
}
#[profiling::function]
pub fn join_active(&self) -> MaybePanic {
let condvar;
match *self.notifier.continuation.lock().unwrap() {
Some(ref mut cont) => {
condvar = Condvar::new();
cont.waiting_threads
.push(WaitingThread { condvar: &condvar });
}
None => return self.choir.check_panic(),
}
let index = self.choir.register().unwrap();
log::info!("Join thread[{}] started", index);
loop {
let is_done = match self.choir.injector.steal() {
Steal::Empty => {
log::trace!("Thread[{}] sleeps", index);
let guard = self.notifier.continuation.lock().unwrap();
if guard.is_some() {
let guard = condvar.wait(guard).unwrap();
guard.is_none()
} else {
false
}
}
Steal::Success(task) => {
self.choir.execute(task, index as isize);
self.is_done()
}
Steal::Retry => false,
};
if is_done {
break;
}
}
log::info!("Thread[{}] is released", index);
self.choir.unregister(index);
self.choir.check_panic()
}
pub fn join_debug(&self, timeout: time::Duration) -> MaybePanic {
log::debug!("Joining {}", self.notifier);
let mut guard = self.notifier.continuation.lock().unwrap();
if let Some(ref mut cont) = *guard {
let condvar = Condvar::new();
cont.waiting_threads
.push(WaitingThread { condvar: &condvar });
let (guard, wait_result) = condvar
.wait_timeout_while(guard, timeout, |cont| cont.is_some())
.unwrap();
if wait_result.timed_out() {
println!("Join timeout reached for {}", self.notifier);
println!("Continuation: {:?}", guard);
panic!("");
}
}
self.choir.check_panic()
}
}
impl Drop for WorkerHandle {
fn drop(&mut self) {
self.worker.alive.store(false, Ordering::Release);
let handle = self.join_handle.take().unwrap();
if let Ok(_guard) = self.choir.parked_mask_mutex.lock() {
self.choir.condvar.notify_all();
}
let _ = handle.join();
}
}
impl IdleTask {
pub fn run(self) -> RunningTask {
let task = self.task.as_ref();
RunningTask {
choir: Arc::clone(&self.choir),
notifier: Arc::clone(&task.notifier),
}
}
pub fn run_attached(mut self) {
let task = self.task.as_ref();
let notifier = Arc::clone(&task.notifier);
if let Some(ready) = self.task.extract() {
self.choir.execute(ready, -1);
} else {
RunningTask {
choir: Arc::clone(&self.choir),
notifier,
}
.join_active();
}
}
#[profiling::function]
pub fn depend_on<D: AsRef<Notifier>>(&mut self, dependency: &D) {
if let Some(ref mut cont) = *dependency.as_ref().continuation.lock().unwrap() {
cont.dependents.push(self.task.share());
}
}
}
impl Drop for IdleTask {
fn drop(&mut self) {
if let Some(ready) = self.task.extract() {
self.choir.schedule(ready);
}
}
}