1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#![cfg_attr(docsrs, feature(doc_cfg))]
use super::*;
use std::{
cell::Cell,
marker::PhantomData,
thread::{self, Thread},
sync::{Arc, atomic::{AtomicUsize, Ordering}},
};
pub struct Scope<'pool, 'scope> {
pool: &'pool ThreadPool,
parent: Arc<(Thread, AtomicUsize)>,
phantom: PhantomData<Cell<&'scope ()>>,
}
impl<'pool, 'scope> Scope<'pool, 'scope> {
pub fn run<F: FnOnce() + Send + 'scope>(&self, f: F) {
let parent = self.parent.clone();
parent.1.fetch_add(1, Ordering::Acquire);
let f = unsafe { std::mem::transmute::<
Box<dyn FnOnce() + Send + 'scope>,
Box<dyn FnOnce() + Send + 'static>,
>(Box::new(f)) };
self.pool.run(move || {
let _guard = scopeguard::guard(parent, |parent| {
parent.1.fetch_sub(1, Ordering::Release);
parent.0.unpark();
});
f();
})
}
#[cfg(feature = "recv")]
#[cfg_attr(docsrs, doc(cfg(feature = "recv")))]
pub fn run_recv<F: FnOnce() -> R + Send + 'scope, R: Send + 'scope>(&self, f: F) -> recv::JobHandle<R> {
let (tx, rx) = oneshot::channel();
self.run(move || { let _ = tx.send(f()); });
recv::JobHandle::new(rx)
}
}
pub(crate) fn run<'pool, 'scope, R>(pool: &'pool ThreadPool, f: impl FnOnce(Scope<'pool, 'scope>) -> R) -> R {
let this = Arc::new((thread::current(), AtomicUsize::new(0)));
let _guard = scopeguard::guard(this.clone(), |this| {
while this.1.load(Ordering::SeqCst) > 0 {
thread::park();
}
});
f(Scope {
pool,
parent: this,
phantom: PhantomData,
})
}