1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use crate::io::reactor::Reactor;
use crate::parking;
use crate::waker_fn::waker_fn;

/// Runs a closure when dropped.
struct CallOnDrop<F: Fn()>(F);

impl<F: Fn()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {
        (self.0)();
    }
}

/// Runs a future to completion on the current thread.
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
    Reactor::get().add_block_on_count();

    let _guard = CallOnDrop(|| {
        Reactor::get().sub_block_on_count();
        Reactor::get().unpark();
    });

    let (p, u) = parking::pair();
    let io_blocked = Arc::new(AtomicBool::new(false));
    thread_local! {
        static IO_POLLING: Cell<bool> = const { Cell::new(false) };
    }

    let waker = waker_fn({
        let io_blocked = io_blocked.clone();
        move || {
            if u.unpark() && !IO_POLLING.with(Cell::get) && io_blocked.load(Ordering::SeqCst) {
                Reactor::get().notify();
            }
        }
    });

    let cx = &mut Context::from_waker(&waker);
    let mut future = pin!(future);
    loop {
        if let Poll::Ready(t) = future.as_mut().poll(cx) {
            return t;
        }

        // Check if a notification has been received.
        if p.park_timeout(Some(Duration::from_secs(0))) {
            // Try grabbing a lock on the reactor to process I/O events.
            if let Some(reactor_lock) = Reactor::get().try_lock() {
                IO_POLLING.with(|io| io.set(true));
                let _guard = CallOnDrop(|| {
                    IO_POLLING.with(|io| io.set(false));
                });
                // Process available I/O events.
                reactor_lock.react(Some(Duration::from_secs(0))).ok();
            }
            continue;
        }

        // Try grabbing a lock on the reactor to process I/O events.
        if let Some(reactor_lock) = Reactor::get().try_lock() {
            // Hold the lock means all I/O events just handled.

            // Record the instant at which the lock was grabbed.
            let start = Instant::now();

            loop {
                IO_POLLING.with(|io| io.set(true));
                io_blocked.store(true, Ordering::SeqCst);
                let _guard = CallOnDrop(|| {
                    IO_POLLING.with(|io| io.set(false));
                    io_blocked.store(false, Ordering::SeqCst);
                });

                // Check if a notification has been received.
                if p.park_timeout(Some(Duration::from_secs(0))) {
                    break;
                }

                // Wait on I/O Events
                reactor_lock.react(None).ok();

                // Check if a notification has been received.
                if p.park_timeout(Some(Duration::from_secs(0))) {
                    break;
                }

                // Check if this thread been handling I/O events for a long time.
                if start.elapsed() > Duration::from_micros(500) {
                    // This thread is clearly processing I/O events for some other threads
                    // because it didn't get a notification yet. It's best to stop hogging the
                    // reactor and give other threads a chance to process I/O events for
                    // themselves.
                    drop(reactor_lock);

                    // Unpark the epoll thread in case no other thread is ready to start
                    // processing I/O events. This way we prevent a potential latency spike.
                    Reactor::get().unpark();

                    // Wait for a notification.
                    p.park();
                    break;
                }
            }
        } else {
            p.park();
        }
    }
}