1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
use std::any::Any;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use std::{mem, ptr};

use super::job::HeapJob;
use super::latch::{CountLatch, Latch};
use super::scheduler::{Scheduler, WorkerThread};
use super::unwind;

/// Represents a fork-join scope which can be used to spawn any number of tasks.
pub struct Scope<'s> {
    scheduler: Option<Arc<Scheduler>>,
    latch: CountLatch,
    marker: PhantomData<Box<FnOnce(&Scope<'s>) + Send + Sync + 's>>,
    /// if some job panicked, the error is stored here; it will be
    /// propagated to the one who created the scope
    panic: AtomicPtr<Box<Any + Send + 'static>>,
}

impl<'s> Scope<'s> {
    pub fn new(scheduler: Option<Arc<Scheduler>>) -> Self {
        Scope {
            scheduler,
            latch: CountLatch::new(),
            marker: PhantomData::default(),
            panic: AtomicPtr::new(ptr::null_mut()),
        }
    }

    /// Spawns a job into the fork-join scope `self`. This job will execute sometime before
    /// the fork-join scope completes.  The job is specified as a closure, and this closure
    /// receives its own reference to `self` as argument. This can be used to inject new jobs
    /// into `self`.
    pub fn spawn<F>(&self, func: F)
    where
        F: FnOnce(&Scope<'s>) + Send + 's,
    {
        unsafe {
            if let Some(ref scheduler) = self.scheduler {
                self.latch.increment();

                let job = Box::new(HeapJob::new(move || {
                    let _v = self.execute(func);
                }))
                .transmute();

                // Since `Scope` implements `Sync`, we can't be sure that we're still in a thread
                // of this pool, so we can't just push to the local worker thread.
                scheduler.inject_or_push(job);
            } else {
                func(self);
            }
        }
    }

    /// Executes `func` as a job in scope. Adjusts the "job completed" counters and
    /// also catches any panic and stores it into `scope`.
    pub(crate) unsafe fn execute<F, R>(&self, func: F) -> Option<R>
    where
        F: FnOnce(&Scope<'s>) -> R + 's,
    {
        match unwind::halt_unwinding(move || func(self)) {
            Ok(r) => {
                self.latch.set();
                Some(r)
            }
            Err(err) => {
                // capture the first error we see, free the rest
                let nil = ptr::null_mut();
                let mut err = Box::new(err); // box up the fat ptr
                if self
                    .panic
                    .compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    mem::forget(err); // ownership now transferred into self.panic
                }

                self.latch.set();
                None
            }
        }
    }

    pub(crate) unsafe fn wait_until_completed(&self, worker: &WorkerThread) {
        // wait for job counter to reach 0:
        worker.wait_until(&self.latch);

        // propagate panic, if any occurred; at this point, all outstanding jobs have completed,
        // so we can use a relaxed ordering:
        let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
        if !panic.is_null() {
            let value: Box<Box<Any + Send + 'static>> = mem::transmute(panic);
            unwind::resume_unwinding(*value);
        }
    }
}