//! A simple adaptive threadpool that returns a oneshot future.
use log::warn;
use super::OneShot;
/// Spawn a function on the threadpool.
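///
/// # Examples
///
/// A minimal usage sketch, assuming `OneShot` exposes a blocking `wait()`
/// that returns `Option<R>` once the promise has been filled:
///
/// ```ignore
/// // Off-load a blocking computation and wait for its result.
/// let promise = spawn(|| 21 * 2);
/// assert_eq!(promise.wait(), Some(42));
/// ```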
pub fn spawn<F, R>(work: F) -> OneShot<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    let (promise_filler, promise) = OneShot::pair();
    let task = move || {
        let result = (work)();
        promise_filler.fill(result);
    };
    // On Windows, Linux, and macOS, send it to the threadpool.
    // Otherwise, just execute it immediately, because we may
    // not support threads at all!
    #[cfg(not(any(windows, target_os = "linux", target_os = "macos")))]
    {
        (task)();
        return promise;
    }
    #[cfg(any(windows, target_os = "linux", target_os = "macos"))]
    {
        use std::sync::atomic::{AtomicU64, Ordering};
        use std::thread;
        use std::time::Duration;

        use crossbeam_channel::{bounded, Receiver, Sender};

        use super::{debug_delay, Lazy};

        const MAX_THREADS: u64 = 128;

        static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);

        struct Pool {
            sender: Sender<Box<dyn FnOnce() + Send + 'static>>,
            receiver: Receiver<Box<dyn FnOnce() + Send + 'static>>,
        }

        static POOL: Lazy<Pool, fn() -> Pool> = Lazy::new(init_pool);

        fn init_pool() -> Pool {
            for _ in 0..2 {
                thread::Builder::new()
                    .name("sled-io".to_string())
                    .spawn(|| {
                        for task in &POOL.receiver {
                            debug_delay();
                            (task)()
                        }
                    })
                    .expect("cannot start a thread driving blocking tasks");
            }

            // We want to use an unbuffered channel here to help
            // us drive our dynamic control. In effect, the
            // kernel's scheduler becomes the queue, reducing
            // the number of buffers that work must flow through
            // before being acted on by a core. This helps keep
            // latency snappy in the overall async system by
            // reducing bufferbloat.
            let (sender, receiver) = bounded(0);
            Pool { sender, receiver }
        }
        // Create up to MAX_THREADS dynamic blocking task worker threads.
        // Dynamic threads will terminate themselves if they don't
        // receive any work after one second.
        fn maybe_create_another_blocking_thread() {
            // We use a `Relaxed` atomic operation because the count
            // is just a heuristic, and we would not lose correctness
            // even if the value were stale or random.
            let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed);
            if workers >= MAX_THREADS {
                return;
            }

            let spawn_res = thread::Builder::new()
                .name("sled-io".to_string())
                .spawn(|| {
                    let wait_limit = Duration::from_secs(1);

                    DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
                    while let Ok(task) = POOL.receiver.recv_timeout(wait_limit)
                    {
                        debug_delay();
                        (task)();
                    }
                    DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
                });

            if let Err(e) = spawn_res {
                warn!(
                    "Failed to dynamically increase the threadpool size: {:?}. \
                     Currently have {} dynamic threads",
                    e, workers
                );
            }
        }
        let first_try_result = POOL.sender.try_send(Box::new(task));
        match first_try_result {
            Ok(()) => {
                // The task was handed off to an idle worker without blocking.
            }
            Err(crossbeam_channel::TrySendError::Full(task)) => {
                // We were not able to send to the channel without
                // blocking. Try to spin up another thread and then
                // retry sending while blocking.
                maybe_create_another_blocking_thread();
                POOL.sender.send(task).unwrap()
            }
            Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
                panic!(
                    "unable to send to blocking threadpool \
                     due to receiver disconnection"
                );
            }
        }

        promise
    }
}
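
// A minimal smoke-test sketch, not part of the original module. It assumes
// `OneShot::wait` blocks until the promise is filled and returns `Option<R>`;
// it illustrates the pool absorbing a burst of tasks rather than asserting
// anything about how many worker threads get created.
#[cfg(test)]
mod tests {
    use super::spawn;

    #[test]
    fn burst_of_blocking_tasks_all_complete() {
        // Queue more tasks than the two fixed workers can take at once so
        // that maybe_create_another_blocking_thread() has a chance to fire.
        let promises: Vec<_> =
            (0..64usize).map(|i| spawn(move || i * 2)).collect();
        for (i, promise) in promises.into_iter().enumerate() {
            assert_eq!(promise.wait(), Some(i * 2));
        }
    }
}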