1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
use std::cell::Cell;
use std::future::Future;
use std::mem::{self, ManuallyDrop};
use std::sync::Arc;
use std::task::{RawWaker, RawWakerVTable};
use std::thread;

use crossbeam_utils::sync::Parker;
use kv_log_macro::trace;
use log::log_enabled;

use crate::task::{Context, Poll, Task, Waker};

/// Spawns a task and blocks the current thread on its result.
///
/// Calling this function is similar to [spawning] a thread and immediately [joining] it, except an
/// asynchronous task will be spawned.
///
/// See also: [`task::spawn_blocking`].
///
/// [`task::spawn_blocking`]: fn.spawn_blocking.html
///
/// [spawning]: https://doc.rust-lang.org/std/thread/fn.spawn.html
/// [joining]: https://doc.rust-lang.org/std/thread/struct.JoinHandle.html#method.join
///
/// # Examples
///
/// ```no_run
/// use async_std::task;
///
/// fn main() {
///     task::block_on(async {
///         println!("Hello, world!");
///     })
/// }
/// ```
pub fn block_on<F, T>(future: F) -> T
where
    F: Future<Output = T>,
{
    // Create a new task handle.
    let task = Task::new(None);

    // Log this `block_on` operation.
    if log_enabled!(log::Level::Trace) {
        trace!("block_on", {
            task_id: task.id().0,
            parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0),
        });
    }

    let future = async move {
        // Drop task-locals on exit.
        defer! {
            Task::get_current(|t| unsafe { t.drop_locals() });
        }

        // Log completion on exit.
        defer! {
            if log_enabled!(log::Level::Trace) {
                Task::get_current(|t| {
                    trace!("completed", {
                        task_id: t.id().0,
                    });
                });
            }
        }

        future.await
    };

    // Run the future as a task.
    unsafe { Task::set_current(&task, || run(future)) }
}

/// Blocks the current thread on a future's result.
fn run<F, T>(future: F) -> T
where
    F: Future<Output = T>,
{
    thread_local! {
        // May hold a pre-allocated parker that can be reused for efficiency.
        //
        // Note that each invocation of `block` needs its own parker. In particular, if `block`
        // recursively calls itself, we must make sure that each recursive call uses a distinct
        // parker instance.
        static CACHE: Cell<Option<Arc<Parker>>> = Cell::new(None);
    }

    // Virtual table for wakers based on `Arc<Parker>`.
    static VTABLE: RawWakerVTable = {
        unsafe fn clone_raw(ptr: *const ()) -> RawWaker {
            let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker));
            #[allow(clippy::redundant_clone)]
            mem::forget(arc.clone());
            RawWaker::new(ptr, &VTABLE)
        }

        unsafe fn wake_raw(ptr: *const ()) {
            let arc = Arc::from_raw(ptr as *const Parker);
            arc.unparker().unpark();
        }

        unsafe fn wake_by_ref_raw(ptr: *const ()) {
            let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker));
            arc.unparker().unpark();
        }

        unsafe fn drop_raw(ptr: *const ()) {
            drop(Arc::from_raw(ptr as *const Parker))
        }

        RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw)
    };

    // Pin the future on the stack.
    pin_utils::pin_mut!(future);

    CACHE.with(|cache| {
        // Reuse a cached parker or create a new one for this invocation of `block`.
        let arc_parker: Arc<Parker> = cache.take().unwrap_or_else(|| Arc::new(Parker::new()));
        let ptr = (&*arc_parker as *const Parker) as *const ();

        // Create a waker and task context.
        let waker = unsafe { ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &VTABLE))) };
        let cx = &mut Context::from_waker(&waker);

        let mut step = 0;
        loop {
            if let Poll::Ready(t) = future.as_mut().poll(cx) {
                // Save the parker for the next invocation of `block`.
                cache.set(Some(arc_parker));
                return t;
            }

            // Yield a few times or park the current thread.
            if step < 3 {
                thread::yield_now();
                step += 1;
            } else {
                arc_parker.park();
                step = 0;
            }
        }
    })
}