#![allow(
renamed_and_removed_lints,
clippy::new_without_default,
clippy::unneeded_field_pattern,
clippy::match_like_matches_macro,
clippy::manual_strip,
clippy::if_same_then_else,
clippy::unknown_clippy_lints,
clippy::len_without_is_empty,
clippy::should_implement_trait
)]
#![warn(
missing_docs,
trivial_casts,
trivial_numeric_casts,
unused_extern_crates,
unused_qualifications,
clippy::pattern_type_mismatch
)]
pub mod arc;
pub mod util;
use self::arc::Linearc;
use crossbeam_deque::{Injector, Steal};
use std::{
fmt, mem, ops,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex, RwLock,
},
thread, time,
};
const BITS_PER_BYTE: usize = 8;
const MAX_WORKERS: usize = mem::size_of::<usize>() * BITS_PER_BYTE;
#[derive(Debug, Default)]
struct Continuation {
forks: usize,
dependents: Vec<Linearc<Task>>,
waiting_threads: Vec<thread::Thread>,
}
#[derive(Debug)]
pub struct Notifier {
serial_id: usize,
name: String,
parent: Option<Arc<Notifier>>,
continuation: Mutex<Option<Continuation>>,
}
impl fmt::Display for Notifier {
fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
write!(formatter, "'{}'@{}", self.name, self.serial_id)
}
}
pub struct ExecutionContext<'a> {
conductor: &'a Arc<Conductor>,
notifier: &'a Arc<Notifier>,
}
impl ExecutionContext<'_> {
pub fn fork(&self, name: &str) -> ProtoTask {
log::trace!("Forking task {} as '{}'", self.notifier, name);
ProtoTask {
conductor: self.conductor,
notifier: Notifier {
serial_id: self.notifier.serial_id,
name: name.to_string(),
parent: Some(Arc::clone(self.notifier)),
continuation: Mutex::new(Some(Continuation::default())),
},
}
}
}
pub type SubIndex = u32;
enum Functor {
Dummy,
Once(Box<dyn FnOnce(ExecutionContext) + Send + 'static>),
Multi(
ops::Range<SubIndex>,
Linearc<dyn Fn(ExecutionContext, SubIndex) + Send + Sync + 'static>,
),
}
impl fmt::Debug for Functor {
fn fmt(&self, serializer: &mut fmt::Formatter) -> fmt::Result {
match *self {
Self::Dummy => serializer.debug_struct("Dummy").finish(),
Self::Once(_) => serializer.debug_struct("Once").finish(),
Self::Multi(ref range, _) => serializer
.debug_struct("Multi")
.field("range", range)
.finish(),
}
}
}
unsafe impl Sync for Functor {}
#[derive(Debug)]
struct Task {
functor: Functor,
notifier: Arc<Notifier>,
}
struct Worker {
name: String,
alive: AtomicBool,
}
struct WorkerContext {
thread: thread::Thread,
}
struct WorkerPool {
contexts: [Option<WorkerContext>; MAX_WORKERS],
}
struct Conductor {
injector: Injector<Task>,
workers: RwLock<WorkerPool>,
parked_mask: AtomicUsize,
}
impl Conductor {
fn schedule(&self, task: Task) {
log::trace!("Task {} is scheduled", task.notifier);
self.injector.push(task);
let mask = self.parked_mask.load(Ordering::Acquire);
if mask != 0 {
let index = mask.trailing_zeros() as usize;
profiling::scope!("unpark");
log::trace!("\twaking up thread[{}]", index);
let pool = self.workers.read().unwrap();
if let Some(context) = pool.contexts[index].as_ref() {
context.thread.unpark();
}
}
}
fn execute(self: &Arc<Self>, task: Task, worker_index: usize) -> Option<Arc<Notifier>> {
let execontext = ExecutionContext {
conductor: self,
notifier: &task.notifier,
};
match task.functor {
Functor::Dummy => {
log::debug!(
"Task {} (dummy) runs on thread[{}]",
task.notifier,
worker_index
);
Some(task.notifier)
}
Functor::Once(fun) => {
log::debug!("Task {} runs on thread[{}]", task.notifier, worker_index);
profiling::scope!("execute");
(fun)(execontext);
Some(task.notifier)
}
Functor::Multi(mut sub_range, fun) => {
log::debug!(
"Task {} ({}) runs on thread[{}]",
task.notifier,
sub_range.start,
worker_index,
);
debug_assert!(sub_range.start < sub_range.end);
let middle = (sub_range.end + sub_range.start) >> 1;
if middle != sub_range.start {
let mask = self.parked_mask.load(Ordering::Acquire);
if mask != 0 {
self.injector.push(Task {
functor: Functor::Multi(middle..sub_range.end, Linearc::clone(&fun)),
notifier: Arc::clone(&task.notifier),
});
let index = mask.trailing_zeros() as usize;
log::trace!(
"\tsplit out {:?} for thread[{}]",
middle..sub_range.end,
index
);
sub_range.end = middle;
let pool = self.workers.read().unwrap();
if let Some(context) = pool.contexts[index].as_ref() {
context.thread.unpark();
}
}
}
(fun)(execontext, sub_range.start);
sub_range.start += 1;
if sub_range.start == sub_range.end {
if Linearc::drop_last(fun) {
Some(task.notifier)
} else {
None
}
} else {
self.injector.push(Task {
functor: Functor::Multi(sub_range, fun),
notifier: task.notifier,
});
None
}
}
}
}
fn finish(&self, notifier: &Notifier) {
profiling::scope!("unblock");
log::trace!("Finishing task {}", notifier);
let continuation = {
let mut guard = notifier.continuation.lock().unwrap();
if let Some(ref mut cont) = *guard {
if cont.forks != 0 {
cont.forks -= 1;
return;
}
}
guard.take().unwrap()
};
for thread in continuation.waiting_threads {
log::trace!("\tresolving a join");
thread.unpark();
}
for dependent in continuation.dependents {
if let Some(ready) = Linearc::into_inner(dependent) {
self.schedule(ready);
}
}
}
fn work_loop(self: &Arc<Self>, worker: &Worker) {
profiling::register_thread!();
let index = {
let mut pool = self.workers.write().unwrap();
let index = pool.contexts.iter_mut().position(|c| c.is_none()).unwrap();
pool.contexts[index] = Some(WorkerContext {
thread: thread::current(),
});
index
};
log::info!("Thread[{}] = '{}' started", index, worker.name);
while worker.alive.load(Ordering::Acquire) {
match self.injector.steal() {
Steal::Empty => {
log::trace!("Thread[{}] sleeps", index);
let mask = 1 << index;
self.parked_mask.fetch_or(mask, Ordering::Release);
if self.injector.is_empty() {
profiling::scope!("park");
thread::park();
}
self.parked_mask.fetch_and(!mask, Ordering::Release);
}
Steal::Success(task) => {
let notifier_maybe = self.execute(task, index);
let mut notifier_ref = notifier_maybe.as_ref();
while let Some(notifier) = notifier_ref {
self.finish(notifier);
notifier_ref = notifier.parent.as_ref();
}
}
Steal::Retry => {}
}
}
log::info!("Thread '{}' dies", worker.name);
self.workers.write().unwrap().contexts[index] = None;
}
}
pub struct Choir {
conductor: Arc<Conductor>,
next_id: AtomicUsize,
}
pub struct WorkerHandle {
worker: Arc<Worker>,
join_handle: Option<thread::JoinHandle<()>>,
}
enum MaybeArc<T> {
Unique(T),
Shared(Linearc<T>),
Null,
}
impl<T> MaybeArc<T> {
const NULL_ERROR: &'static str = "Value is gone!";
fn new(value: T) -> Self {
Self::Unique(value)
}
fn share(&mut self) -> Linearc<T> {
let arc = match mem::replace(self, Self::Null) {
Self::Unique(value) => Linearc::new(value),
Self::Shared(arc) => arc,
Self::Null => panic!("{}", Self::NULL_ERROR),
};
*self = Self::Shared(Linearc::clone(&arc));
arc
}
fn as_ref(&self) -> &T {
match *self {
Self::Unique(ref value) => value,
Self::Shared(ref arc) => arc,
Self::Null => panic!("{}", Self::NULL_ERROR),
}
}
fn extract(&mut self) -> Option<T> {
match mem::replace(self, Self::Null) {
Self::Unique(value) => Some(value),
Self::Shared(arc) => Linearc::into_inner(arc),
Self::Null => None,
}
}
}
pub struct ProtoTask<'c> {
conductor: &'c Arc<Conductor>,
notifier: Notifier,
}
pub struct IdleTask {
conductor: Arc<Conductor>,
task: MaybeArc<Task>,
}
impl AsRef<Notifier> for IdleTask {
fn as_ref(&self) -> &Notifier {
&self.task.as_ref().notifier
}
}
impl Linearc<dyn Fn(ExecutionContext, SubIndex) + Send + Sync + 'static> {
fn new_unsized(fun: impl Fn(ExecutionContext, SubIndex) + Send + Sync + 'static) -> Self {
Self::from_inner(Box::new(arc::LinearcInner {
ref_count: AtomicUsize::new(1),
data: fun,
}))
}
}
impl ProtoTask<'_> {
fn fill(self, functor: Functor) -> IdleTask {
if let Some(ref parent_notifier) = self.notifier.parent {
parent_notifier
.continuation
.lock()
.unwrap()
.as_mut()
.unwrap()
.forks += 1;
}
IdleTask {
conductor: Arc::clone(self.conductor),
task: MaybeArc::new(Task {
functor,
notifier: Arc::new(self.notifier),
}),
}
}
pub fn init_dummy(self) -> IdleTask {
self.fill(Functor::Dummy)
}
pub fn init<F: FnOnce(ExecutionContext) + Send + 'static>(self, fun: F) -> IdleTask {
self.fill(Functor::Once(Box::new(fun)))
}
pub fn init_multi<F: Fn(ExecutionContext, SubIndex) + Send + Sync + 'static>(
self,
count: SubIndex,
fun: F,
) -> IdleTask {
self.fill(if count == 0 {
Functor::Dummy
} else {
Functor::Multi(0..count, Linearc::new_unsized(fun))
})
}
pub fn init_iter<I, F>(self, iter: I, fun: F) -> IdleTask
where
I: Iterator,
I::Item: Send + 'static,
F: Fn(I::Item) + Send + Sync + 'static,
{
let task_data = iter.collect::<util::PerTaskData<_>>();
self.init_multi(task_data.len(), move |_, index| unsafe {
fun(task_data.take(index))
})
}
}
#[derive(Clone, Debug)]
pub struct RunningTask {
notifier: Arc<Notifier>,
}
impl AsRef<Notifier> for RunningTask {
fn as_ref(&self) -> &Notifier {
&self.notifier
}
}
impl RunningTask {
#[profiling::function]
pub fn join(self) {
log::debug!("Joining {}", self.notifier);
match *self.notifier.continuation.lock().unwrap() {
Some(ref mut cont) => {
cont.waiting_threads.push(thread::current());
}
None => return,
}
loop {
log::trace!("Parking for {}", self.notifier);
thread::park();
if self.notifier.continuation.lock().unwrap().is_none() {
return;
}
}
}
pub fn join_debug(self, timeout: time::Duration) {
log::debug!("Joining {}", self.notifier);
match *self.notifier.continuation.lock().unwrap() {
Some(ref mut cont) => {
cont.waiting_threads.push(thread::current());
}
None => return,
}
let instant = time::Instant::now();
loop {
log::trace!("Parking for {}", self.notifier);
thread::park_timeout(timeout);
if self.notifier.continuation.lock().unwrap().is_none() {
return;
}
if instant.elapsed() > timeout {
println!("Join timeout reached for {}", self.notifier);
println!(
"Continuation: {:?}",
self.notifier.continuation.lock().unwrap()
);
panic!("");
}
}
}
}
impl Default for Choir {
fn default() -> Self {
Self::new()
}
}
impl Choir {
pub fn new() -> Self {
const NO_WORKER: Option<WorkerContext> = None;
let injector = Injector::new();
Self {
conductor: Arc::new(Conductor {
injector,
workers: RwLock::new(WorkerPool {
contexts: [NO_WORKER; MAX_WORKERS],
}),
parked_mask: AtomicUsize::new(0),
}),
next_id: AtomicUsize::new(1),
}
}
pub fn add_worker(&mut self, name: &str) -> WorkerHandle {
let worker = Arc::new(Worker {
name: name.to_string(),
alive: AtomicBool::new(true),
});
let conductor = Arc::clone(&self.conductor);
let worker_clone = Arc::clone(&worker);
let join_handle = thread::Builder::new()
.name(name.to_string())
.spawn(move || conductor.work_loop(&worker_clone))
.unwrap();
WorkerHandle {
worker,
join_handle: Some(join_handle),
}
}
#[profiling::function]
pub fn spawn(&self, name: &str) -> ProtoTask {
let id = self.next_id.fetch_add(1, Ordering::AcqRel);
log::trace!("Creating task '{}'@{}", name, id);
ProtoTask {
conductor: &self.conductor,
notifier: Notifier {
serial_id: id,
name: name.to_string(),
parent: None,
continuation: Mutex::new(Some(Continuation::default())),
},
}
}
}
impl Drop for WorkerHandle {
fn drop(&mut self) {
self.worker.alive.store(false, Ordering::Release);
let handle = self.join_handle.take().unwrap();
handle.thread().unpark();
let _ = handle.join();
}
}
impl IdleTask {
pub fn run(self) -> RunningTask {
let task = self.task.as_ref();
RunningTask {
notifier: Arc::clone(&task.notifier),
}
}
pub fn depend_on<D: AsRef<Notifier>>(&mut self, dependency: &D) {
if let Some(ref mut cont) = *dependency.as_ref().continuation.lock().unwrap() {
cont.dependents.push(self.task.share());
}
}
}
impl Drop for IdleTask {
fn drop(&mut self) {
if let Some(ready) = self.task.extract() {
self.conductor.schedule(ready);
}
}
}