1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use crate::io::reactor::Reactor;
use crate::parking;
use crate::waker_fn::waker_fn;
/// Scope guard: wraps a closure and invokes it when the guard is dropped.
///
/// Bind the guard to a named local (for example `_guard`) so that it lives
/// until the end of the enclosing scope; the closure then runs when that
/// scope is left.
struct CallOnDrop<Callback>(Callback)
where
    Callback: Fn();

impl<Callback> Drop for CallOnDrop<Callback>
where
    Callback: Fn(),
{
    /// Calls the wrapped closure.
    fn drop(&mut self) {
        // `Fn` closures are callable through a shared reference.
        let callback = &self.0;
        callback();
    }
}
/// Runs a future to completion on the current thread.
///
/// The calling thread repeatedly polls `future`. While the future is pending
/// it either drives the reactor itself (when it can take the reactor lock) or
/// parks until the future's waker delivers a notification.
///
/// # Returns
///
/// The value produced by `future` once it resolves.
///
/// # Memory ordering
///
/// `io_blocked` takes part in a store-then-load handshake with the parker's
/// notification state: this thread stores `io_blocked = true` and then checks
/// for a pending notification, while the waker posts a notification and then
/// loads `io_blocked`. At least one side must observe the other's write,
/// otherwise the waker could skip `Reactor::notify()` while this thread goes
/// on to wait in `react(None)`. Acquire/Release alone does not rule out both
/// sides reading the stale value, so every access to `io_blocked` uses
/// `Ordering::SeqCst`.
// NOTE(review): the guarantee above also relies on the parker's own atomics
// being sequentially consistent — confirm against `crate::parking`.
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
    // Register this call with the reactor's `block_on` counter.
    Reactor::get().add_block_on_count();

    // Whenever this function is left (the guard fires from `Drop`), undo the
    // registration and unpark through the reactor.
    let _guard = CallOnDrop(|| {
        Reactor::get().sub_block_on_count();
        Reactor::get().unpark();
    });

    // `p` parks the current thread; `u` is moved into the waker to unpark it.
    let (p, u) = parking::pair();

    // `true` while this thread is inside the blocking reactor loop below.
    let io_blocked = Arc::new(AtomicBool::new(false));

    thread_local! {
        // `true` while the current thread is processing reactor events.
        static IO_POLLING: Cell<bool> = const { Cell::new(false) };
    }

    // Build the waker handed to the future.
    let waker = waker_fn({
        let io_blocked = io_blocked.clone();
        move || {
            // Notify the reactor only when all three hold, checked in this order:
            //   1. `unpark()` returned `true`,
            //   2. the calling thread is not itself processing reactor events,
            //   3. the `block_on` thread has flagged itself as blocked on I/O.
            if u.unpark() && !IO_POLLING.with(Cell::get) && io_blocked.load(Ordering::SeqCst) {
                Reactor::get().notify();
            }
        }
    });
    let cx = &mut Context::from_waker(&waker);
    let mut future = pin!(future);

    loop {
        // Poll the future; finish as soon as it is ready.
        if let Poll::Ready(t) = future.as_mut().poll(cx) {
            return t;
        }

        // Check if a notification has been received (zero timeout: never blocks).
        if p.park_timeout(Some(Duration::from_secs(0))) {
            // Try grabbing a lock on the reactor to process I/O events.
            if let Some(reactor_lock) = Reactor::get().try_lock() {
                // Let wakers running on this thread know it is processing I/O
                // events; the guard clears the flag when this scope ends.
                IO_POLLING.with(|io| io.set(true));
                let _guard = CallOnDrop(|| {
                    IO_POLLING.with(|io| io.set(false));
                });

                // Process available I/O events without waiting; errors are
                // deliberately ignored (best effort).
                reactor_lock.react(Some(Duration::from_secs(0))).ok();
            }
            // A notification was consumed, so poll the future again.
            continue;
        }

        // No notification yet: try grabbing a lock on the reactor to wait on I/O.
        if let Some(reactor_lock) = Reactor::get().try_lock() {
            // Record the instant at which the lock was grabbed.
            let start = Instant::now();

            loop {
                // Let wakers know this thread is (about to be) blocked on I/O.
                // The guard resets both flags at the end of every iteration,
                // including when the loop is left through `break`.
                IO_POLLING.with(|io| io.set(true));
                io_blocked.store(true, Ordering::SeqCst);
                let _guard = CallOnDrop(|| {
                    IO_POLLING.with(|io| io.set(false));
                    io_blocked.store(false, Ordering::SeqCst);
                });

                // Check if a notification arrived before `io_blocked` was
                // published; in that case the waker did not notify the reactor
                // and waiting in `react(None)` could miss the wake-up.
                if p.park_timeout(Some(Duration::from_secs(0))) {
                    break;
                }

                // Wait on I/O events; errors are deliberately ignored.
                reactor_lock.react(None).ok();

                // Check if a notification has been received.
                if p.park_timeout(Some(Duration::from_secs(0))) {
                    break;
                }

                // Check if this thread has been handling I/O events for a long time.
                if start.elapsed() > Duration::from_micros(500) {
                    // This thread is clearly processing I/O events for some other threads
                    // because it didn't get a notification yet. It's best to stop hogging the
                    // reactor and give other threads a chance to process I/O events for
                    // themselves.
                    drop(reactor_lock);

                    // Unpark the epoll thread in case no other thread is ready to start
                    // processing I/O events. This way we prevent a potential latency spike.
                    Reactor::get().unpark();

                    // Wait for a notification.
                    p.park();
                    break;
                }
            }
        } else {
            // The reactor lock is held elsewhere: wait for a notification.
            p.park();
        }
    }
}