1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
use std::any::Any;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use std::{mem, ptr};
use super::job::HeapJob;
use super::latch::{CountLatch, Latch};
use super::scheduler::{Scheduler, WorkerThread};
use super::unwind;
pub struct Scope<'s> {
scheduler: Option<Arc<Scheduler>>,
latch: CountLatch,
marker: PhantomData<Box<FnOnce(&Scope<'s>) + Send + Sync + 's>>,
panic: AtomicPtr<Box<Any + Send + 'static>>,
}
impl<'s> Scope<'s> {
pub fn new(scheduler: Option<Arc<Scheduler>>) -> Self {
Scope {
scheduler,
latch: CountLatch::new(),
marker: PhantomData::default(),
panic: AtomicPtr::new(ptr::null_mut()),
}
}
pub fn spawn<F>(&self, func: F)
where
F: FnOnce(&Scope<'s>) + Send + 's,
{
unsafe {
if let Some(ref scheduler) = self.scheduler {
self.latch.increment();
let job = Box::new(HeapJob::new(move || {
let _v = self.execute(func);
}))
.transmute();
scheduler.inject_or_push(job);
} else {
func(self);
}
}
}
pub(crate) unsafe fn execute<F, R>(&self, func: F) -> Option<R>
where
F: FnOnce(&Scope<'s>) -> R + 's,
{
match unwind::halt_unwinding(move || func(self)) {
Ok(r) => {
self.latch.set();
Some(r)
}
Err(err) => {
let nil = ptr::null_mut();
let mut err = Box::new(err); if self
.panic
.compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed)
.is_ok()
{
mem::forget(err); }
self.latch.set();
None
}
}
}
pub(crate) unsafe fn wait_until_completed(&self, worker: &WorkerThread) {
worker.wait_until(&self.latch);
let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
if !panic.is_null() {
let value: Box<Box<Any + Send + 'static>> = mem::transmute(panic);
unwind::resume_unwinding(*value);
}
}
}