use latch::{LatchProbe, SpinLatch};
#[allow(unused_imports)]
use log::Event::*;
use job::StackJob;
use registry::{self, WorkerThread};
use std::any::Any;
use unwind;
#[cfg(test)]
mod test;
pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
where A: FnOnce() -> RA + Send,
B: FnOnce() -> RB + Send,
RA: Send,
RB: Send
{
registry::in_worker(|worker_thread| unsafe {
log!(Join { worker: worker_thread.index() });
let job_b = StackJob::new(oper_b, SpinLatch::new());
let job_b_ref = job_b.as_job_ref();
worker_thread.push(job_b_ref);
let result_a = match unwind::halt_unwinding(oper_a) {
Ok(v) => v,
Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err),
};
while !job_b.latch.probe() {
if let Some(job) = worker_thread.take_local_job() {
if job == job_b_ref {
log!(PoppedRhs { worker: worker_thread.index() });
let result_b = job_b.run_inline();
return (result_a, result_b);
} else {
log!(PoppedJob { worker: worker_thread.index() });
worker_thread.execute(job);
}
} else {
log!(LostJob { worker: worker_thread.index() });
worker_thread.wait_until(&job_b.latch);
debug_assert!(job_b.latch.probe());
break;
}
}
return (result_a, job_b.into_result());
})
}
#[cold] unsafe fn join_recover_from_panic(worker_thread: &WorkerThread,
job_b_latch: &SpinLatch,
err: Box<Any + Send>)
-> !
{
worker_thread.wait_until(job_b_latch);
unwind::resume_unwinding(err)
}