thread_tree/
job.rs

1use std::any::Any;
2use std::cell::UnsafeCell;
3use std::mem;
4use std::sync::atomic::{AtomicBool, Ordering};
5use super::unwind;
6
// from rayon
/// Outcome slot of a job.
///
/// Starts out as [`JobResult::None`] and is overwritten with either the
/// closure's return value or the payload of the panic it raised once the
/// job has been executed.
pub enum JobResult<T> {
    /// No outcome has been stored yet.
    None,
    /// The job's closure returned normally with this value.
    Ok(T),
    /// The job's closure panicked; this is the payload that was caught.
    Panic(Box<dyn Any + Send>),
}
13
// from rayon
/// A job that stores its closure, its result slot and its completion flag
/// inline.
///
/// The fields use interior mutability because `Job::execute` only receives
/// a `*const Self` and still has to take the closure and write the result.
pub struct StackJob<F, R> {
    /// The closure to run: `Some` after construction, `None` once it has
    /// been taken for execution.
    func: UnsafeCell<Option<F>>,
    /// Where the outcome of running `func` is stored.
    result: UnsafeCell<JobResult<R>>,
    /// Completion flag: `false` initially, set to `true` (with `Release`
    /// ordering) after `result` has been written.
    latch: AtomicBool,
}
20
21impl<F, R> StackJob<F, R> {
22    pub fn new(f: F) -> Self
23        where F: FnOnce() -> R + Send
24    {
25        Self {
26            func: UnsafeCell::new(Some(f)),
27            result: UnsafeCell::new(JobResult::None),
28            latch: AtomicBool::new(false),
29        }
30    }
31
32    #[inline]
33    pub fn into_result(self) -> R {
34        unsafe {
35            debug_assert!((*self.func.get()).is_none());
36            match mem::replace(&mut *self.result.get(), JobResult::None) {
37                JobResult::None => unreachable!(),
38                JobResult::Ok(r) => r,
39                JobResult::Panic(x) => unwind::resume_unwinding(x),
40            }
41        }
42    }
43    pub fn probe(&self) -> bool {
44        self.latch.load(Ordering::Acquire)
45    }
46}
47
48impl<F, R> Job for StackJob<F, R>
49where
50    //L: Latch + Sync,
51    F: FnOnce() -> R + Send,
52    R: Send,
53{
54    #[inline]
55    unsafe fn execute(this: *const Self) {
56        let this = &*this;
57        let abort = unwind::AbortIfPanic;
58        let func = (*this.func.get()).take().unwrap();
59        (*this.result.get()) = match unwind::halt_unwinding(|| func()) {
60            Ok(x) => JobResult::Ok(x),
61            Err(x) => JobResult::Panic(x),
62        };
63        this.latch.store(true, Ordering::Release);
64        //this.latch.set();
65        mem::forget(abort);
66    }
67}
68
69
/// A `Job` is a unit of work that can be run through a raw pointer,
/// possibly by a thread other than the one that created it.
///
/// The original description, inherited from rayon: a `Job` is used to
/// advertise work for other threads that they may want to steal; jobs are
/// arranged in a deque, so that thieves can take from the top of the
/// deque while the main worker manages the bottom of the deque, and this
/// deque is managed by the `thread_pool` module.
/// NOTE(review): confirm whether this crate still uses a deque and a
/// `thread_pool` module; neither is visible from this file.
pub trait Job {
    /// Runs the job behind `this`.
    ///
    /// # Safety
    ///
    /// This may be called from a different thread than the one
    /// which scheduled the job, so the implementer must ensure the
    /// appropriate traits are met, whether `Send`, `Sync`, or both.
    /// The caller must pass a pointer that the implementation can
    /// dereference.
    unsafe fn execute(this: *const Self);
}
81
/// Effectively a `Job` trait object. Each `JobRef` **must** be executed
/// exactly once, or else data may leak.
///
/// Internally, we store the job's data in a `*const ()` pointer. The
/// true type is something like `*const StackJob<...>`, but we hide
/// it. We also carry the "execute fn" from the `Job` trait.
///
/// The handle is `Copy`, so the type system does not enforce the
/// exactly-once rule; that is up to the code using it.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct JobRef {
    /// Type-erased pointer to the job's data.
    pointer: *const (),
    /// The job type's `Job::execute`, with its argument type erased to match
    /// `pointer`.
    execute_fn: unsafe fn(*const ()),
}
93
// SAFETY: a `JobRef` is only a raw pointer plus a function pointer.
// `JobRef::new` accepts only `T: Job + Send` and makes the caller promise
// that the pointee stays valid until the job is executed.
// NOTE(review): soundness rests entirely on that caller contract and on
// each job being executed once; nothing in this file enforces either.
unsafe impl Send for JobRef {}
unsafe impl Sync for JobRef {}
96
97impl JobRef {
98    /// Unsafe: caller asserts that `data` will remain valid until the
99    /// job is executed.
100    pub unsafe fn new<T>(data: *const T) -> JobRef
101    where
102        T: Job + Send,
103    {
104        let fn_ptr: unsafe fn(*const T) = <T as Job>::execute;
105
106        // erase types:
107        let fn_ptr: unsafe fn(*const ()) = mem::transmute(fn_ptr);
108        let pointer = data as *const ();
109
110        JobRef {
111            pointer: pointer,
112            execute_fn: fn_ptr,
113        }
114    }
115
116    #[inline]
117    pub unsafe fn execute(&self) {
118        (self.execute_fn)(self.pointer)
119    }
120}
121