1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
//! Implementation of [`run()`].
//!
//! This function is the entry point to the smol executor.

use std::future::Future;
use std::task::{Context, Poll};
use std::time::Duration;

use once_cell::sync::Lazy;

use crate::context;
use crate::multitask;
use crate::parking::Parker;
use scoped_tls::scoped_thread_local;

/// The global task queue.
pub(crate) static QUEUE: Lazy<multitask::Queue> = Lazy::new(|| multitask::Queue::new());

scoped_thread_local! {
    /// Thread-local worker queue.
    pub(crate) static WORKER: multitask::Worker
}

/// Runs executors and polls the reactor.
///
/// This function simultaneously runs the thread-local executor, runs the work-stealing
/// executor, and polls the reactor for I/O events and timers. At least one thread has to be
/// calling [`run()`] in order for futures waiting on I/O and timers to get notified.
///
/// # Examples
///
/// Single-threaded executor:
///
/// ```
/// // Run the thread-local and work-stealing executor on the current thread.
/// smol::run(async {
///     println!("Hello from the smol executor!");
/// });
/// ```
///
/// Multi-threaded executor:
///
/// ```no_run
/// use futures::future;
/// use smol::Task;
/// use std::thread;
///
/// // Same number of threads as there are CPU cores.
/// let num_threads = num_cpus::get().max(1);
///
/// // Run the thread-local and work-stealing executor on a thread pool.
/// for _ in 0..num_threads {
///     // A pending future is one that simply yields forever.
///     thread::spawn(|| smol::run(future::pending::<()>()));
/// }
///
/// // No need to `run()`, now we can just block on the main future.
/// smol::block_on(async {
///     Task::spawn(async {
///         println!("Hello from an executor thread!");
///     })
///     .await;
/// });
/// ```
///
/// Stoppable multi-threaded executor:
///
/// ```
/// use smol::Task;
/// use std::thread;
///
/// // Same number of threads as there are CPU cores.
/// let num_threads = num_cpus::get().max(1);
///
/// // A channel that sends the shutdown signal.
/// let (s, r) = piper::chan::<()>(0);
/// let mut threads = Vec::new();
///
/// // Create an executor thread pool.
/// for _ in 0..num_threads {
///     // Spawn an executor thread that waits for the shutdown signal.
///     let r = r.clone();
///     threads.push(thread::spawn(move || smol::run(r.recv())));
/// }
///
/// // No need to `run()`, now we can just block on the main future.
/// smol::block_on(async {
///     Task::spawn(async {
///         println!("Hello from an executor thread!");
///     })
///     .await;
/// });
///
/// // Send a shutdown signal.
/// drop(s);
///
/// // Wait for threads to finish.
/// for t in threads {
///     t.join().unwrap();
/// }
/// ```
pub fn run<T>(future: impl Future<Output = T>) -> T {
    let parker = Parker::new();

    let unparker = parker.unparker();
    let worker = QUEUE.worker(move || unparker.unpark());

    // Create a waker that triggers an I/O event in the thread-local scheduler.
    let unparker = parker.unparker();
    let waker = async_task::waker_fn(move || unparker.unpark());
    let cx = &mut Context::from_waker(&waker);
    futures_util::pin_mut!(future);

    // Set up tokio if enabled.
    context::enter(|| {
        WORKER.set(&worker, || {
            'start: loop {
                // Poll the main future.
                if let Poll::Ready(val) = future.as_mut().poll(cx) {
                    return val;
                }

                for _ in 0..200 {
                    if !worker.tick() {
                        parker.park();
                        continue 'start;
                    }
                }

                // Process ready I/O events without blocking.
                parker.park_timeout(Duration::from_secs(0));
            }
        })
    })
}