//! A global, auto-scaling, preemptive scheduler using work-balancing.
//!
//! ## What? Another executor?
//!
//! `smolscale` is a **work-balancing** executor based on [async-task], designed as a drop-in replacement for `smol` and `async-global-executor`. It rests on the thesis that work-stealing, the usual approach in async executors like `async-executor` and `tokio`, is not the right algorithm for scheduling huge numbers of tiny, interdependent work units, which is what message-passing futures end up being. Instead, `smolscale` uses *work-balancing*, an approach also found in Erlang: a global "balancer" thread periodically balances work between workers, and workers never attempt to steal tasks from each other. This avoids the extremely frequent stealing attempts that work-stealing schedulers generate when applied to async tasks.
//!
//! `smolscale`'s approach especially excels in two circumstances:
//! - **When the CPU cores are not fully loaded**: Traditional work stealing optimizes for the case where most workers have work to do, which only holds in fully loaded scenarios. When workers often wake up and go back to sleep, however, a lot of CPU time is wasted stealing work. `smolscale` drastically reduces CPU usage in these circumstances: an `async-executor` app that takes 80% of CPU time may now take only 20%. Although this does not improve fully loaded throughput, it significantly reduces power consumption and does increase throughput when multiple thread pools compete for CPU time.
//! - **When a lot of message-passing is happening**: Message-passing workloads often involve tasks quickly waking up and going back to sleep. In a work-stealing scheduler, this again floods the scheduler with stealing requests. `smolscale` can significantly improve throughput, especially compared to executors like `async-executor` that do not special-case message passing.
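//!
//! The balancing idea can be sketched with plain `VecDeque`s. This is only an illustration, not the balancer this crate actually uses (that lives in the `executor` module); the `rebalance` function and its even-split policy are assumptions made for the sketch.
//!
//! ```rust
//! use std::collections::VecDeque;
//!
//! /// Hypothetical balancer step: evens out queue lengths so that they
//! /// differ by at most one task. Only the balancer calls this; the
//! /// workers themselves never steal from each other.
//! fn rebalance<T>(queues: &mut [VecDeque<T>]) {
//!     if queues.is_empty() {
//!         return;
//!     }
//!     let total: usize = queues.iter().map(VecDeque::len).sum();
//!     let base = total / queues.len();
//!     let extra = total % queues.len();
//!     // Pass 1: collect surplus tasks from overloaded queues.
//!     let mut surplus = Vec::new();
//!     for (i, queue) in queues.iter_mut().enumerate() {
//!         let target = base + usize::from(i < extra);
//!         while queue.len() > target {
//!             surplus.push(queue.pop_back().unwrap());
//!         }
//!     }
//!     // Pass 2: hand the surplus to underloaded queues.
//!     for (i, queue) in queues.iter_mut().enumerate() {
//!         let target = base + usize::from(i < extra);
//!         while queue.len() < target {
//!             queue.push_back(surplus.pop().unwrap());
//!         }
//!     }
//! }
//! ```
//!
//! With three queues holding 10, 0, and 0 tasks, `rebalance` leaves them holding 4, 3, and 3.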
//!
//! Furthermore, `smolscale` has a **preemptive** thread pool that ensures that tasks cannot block other tasks no matter what. This means that you can do things like run expensive computations or even do blocking I/O within a task without worrying about causing deadlocks. Even with "traditional" tasks that do not block, this approach can reduce worst-case latency. Preemption is heavily inspired by Stjepan Glavina's [previous work on async-std](https://async.rs/blog/stop-worrying-about-blocking-the-new-async-std-runtime/).
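//!
//! The decision to add a thread can be sketched as a pure function. It mirrors the check made in this crate's monitor loop, but `Sample`, `TokenBucket`, and `should_spawn_thread` are names invented for this sketch and are not part of the public API.
//!
//! ```rust
//! /// Hypothetical snapshot of the counters sampled around one monitor sleep.
//! struct Sample {
//!     polls_before: usize,
//!     polls_after: usize,
//!     running_threads: usize,
//!     futures_being_polled: usize,
//! }
//!
//! /// Hypothetical token bucket that rate-limits thread creation.
//! struct TokenBucket {
//!     tokens: u32,
//!     capacity: u32,
//! }
//!
//! impl TokenBucket {
//!     /// Adds one token unless the bucket is already full.
//!     fn refill_one(&mut self) {
//!         if self.tokens < self.capacity {
//!             self.tokens += 1;
//!         }
//!     }
//!
//!     /// Takes one token if any is left.
//!     fn try_take(&mut self) -> bool {
//!         if self.tokens > 0 {
//!             self.tokens -= 1;
//!             true
//!         } else {
//!             false
//!         }
//!     }
//! }
//!
//! /// Returns true when a temporary worker thread should be started.
//! fn should_spawn_thread(s: &Sample, max_threads: usize, bucket: &mut TokenBucket) -> bool {
//!     // No poll started during the sleep, so the workers look stuck.
//!     let stalled = s.polls_after == s.polls_before;
//!     // Every running thread is occupied polling a future.
//!     let all_busy = s.futures_being_polled >= s.running_threads;
//!     // `&&` short-circuits, so a token is spent only when a thread is needed.
//!     stalled && all_busy && s.running_threads <= max_threads && bucket.try_take()
//! }
//! ```
//!
//! Keeping the token check last means that an idle or progressing pool never drains the bucket.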
//!
//! `smolscale` also experimentally includes `Nursery`, a helper for [structured concurrency](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/) on the `smolscale` global executor.
//!
//! ## Show me the benchmarks!
//!
//! Right now, `smolscale` uses a very naive implementation (for example, stealable local queues are implemented as SPSC queues with a spinlock on the consumer side, and worker parking is done naively through `event-listener`), and its performance is expected to increase drastically. However, on most tasks it is already much faster than `async-global-executor` (the de-facto standard "non-Tokio-world" executor, which powers `async-std`), sometimes by an order of magnitude. Here are some unscientific benchmark results; percentages are relative to `async-global-executor`:
//! ```text
//! spawn_one time: [105.08 ns 105.21 ns 105.36 ns]
//! change: [-98.570% -98.549% -98.530%] (p = 0.00 < 0.05)
//! Performance has improved.
//!
//! spawn_many time: [3.0585 ms 3.0598 ms 3.0613 ms]
//! change: [-87.576% -87.291% -86.948%] (p = 0.00 < 0.05)
//! Performance has improved.
//!
//! yield_now time: [4.1676 ms 4.1917 ms 4.2166 ms]
//! change: [-50.455% -49.994% -49.412%] (p = 0.00 < 0.05)
//! Performance has improved.
//!
//! ping_pong time: [8.5389 ms 8.6990 ms 8.8525 ms]
//! change: [+12.264% +14.548% +16.917%] (p = 0.00 < 0.05)
//! Performance has regressed.
//!
//! Benchmarking spawn_executors_recursively:
//! spawn_executors_recursively
//! time: [180.26 ms 180.40 ms 180.56 ms]
//! change: [+497.14% +500.08% +502.97%] (p = 0.00 < 0.05)
//! Performance has regressed.
//!
//! context_switch_quiet time: [100.67 us 102.05 us 103.07 us]
//! change: [-42.789% -41.170% -39.490%] (p = 0.00 < 0.05)
//! Performance has improved.
//!
//! context_switch_busy time: [8.7637 ms 8.9012 ms 9.0561 ms]
//! change: [+3.3147% +5.5719% +7.6684%] (p = 0.00 < 0.05)
//! Performance has regressed.
//! ```
use fastcounter::FastCounter;
use futures_lite::prelude::*;
use once_cell::sync::{Lazy, OnceCell};
use std::{
    pin::Pin,
    sync::atomic::{AtomicBool, AtomicUsize, Ordering},
    task::{Context, Poll},
    time::Duration,
};
mod executor;
mod fastcounter;
mod nursery;
mod sp2c;
pub use executor::*;
pub use nursery::*;
//const CHANGE_THRESH: u32 = 10;
const MONITOR_MS: u64 = 5;
const MAX_THREADS: usize = 1500;
// thread_local! {
// static LEXEC: Rc<async_executor::LocalExecutor<'static>> = Rc::new(async_executor::LocalExecutor::new())
// }
static EXEC: Lazy<Executor> = Lazy::new(Executor::new);
static FUTURES_BEING_POLLED: Lazy<FastCounter> = Lazy::new(Default::default);
static POLL_COUNT: Lazy<FastCounter> = Lazy::new(Default::default);
static THREAD_COUNT: AtomicUsize = AtomicUsize::new(0);
static MONITOR: OnceCell<std::thread::JoinHandle<()>> = OnceCell::new();
static SINGLE_THREAD: AtomicBool = AtomicBool::new(false);
/// Irrevocably puts smolscale into single-threaded mode.
pub fn permanently_single_threaded() {
    SINGLE_THREAD.store(true, Ordering::Relaxed);
}

fn start_monitor() {
    MONITOR.get_or_init(|| {
        std::thread::Builder::new()
            .name("sscale-mon".into())
            .spawn(monitor_loop)
            .unwrap()
    });
}
fn monitor_loop() {
    // Starts one worker thread. `exitable` workers stop after 3 seconds;
    // `process_io` workers also drive the `async-io` reactor while blocked.
    fn start_thread(exitable: bool, process_io: bool) {
        THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
        std::thread::Builder::new()
            .name(
                if exitable {
                    "sscale-wkr-e"
                } else {
                    "sscale-wkr-c"
                }
                .into(),
            )
            .stack_size(10 * 1024 * 1024)
            .spawn(move || {
                // let local_exec = LEXEC.with(|v| Rc::clone(v));
                let future = async {
                    // Decrement the thread count when this worker finishes.
                    scopeguard::defer!({
                        THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
                    });
                    // let run_local = local_exec.run(futures_lite::future::pending::<()>());
                    if exitable {
                        EXEC.worker()
                            .run()
                            .or(async {
                                async_io::Timer::after(Duration::from_secs(3)).await;
                            })
                            .await;
                    } else {
                        EXEC.worker().run().await;
                    };
                };
                if process_io {
                    async_io::block_on(future)
                } else {
                    futures_lite::future::block_on(future)
                }
            })
            .unwrap();
    }
    if SINGLE_THREAD.load(Ordering::Relaxed) {
        start_thread(false, true);
        return;
    } else {
        for _ in 0..num_cpus::get() {
            start_thread(false, true);
        }
    }
    // Token bucket that rate-limits the spawning of extra threads. It starts
    // full at 1000 tokens and regains one token every 100 monitor iterations
    // (at least 500 ms apart with MONITOR_MS = 5).
    let mut token_bucket = 1000;
    for count in 0u64.. {
        if count % 100 == 0 && token_bucket < 1000 {
            token_bucket += 1
        }
        EXEC.rebalance();
        if SINGLE_THREAD.load(Ordering::Relaxed) {
            return;
        }
        let before_sleep = POLL_COUNT.count();
        std::thread::sleep(Duration::from_millis(MONITOR_MS));
        let after_sleep = POLL_COUNT.count();
        let running_threads = THREAD_COUNT.load(Ordering::Relaxed);
        let full_running = FUTURES_BEING_POLLED.count() >= running_threads;
        // No poll started during the sleep while every worker is busy:
        // assume the workers are blocked and add a temporary thread.
        if after_sleep == before_sleep
            && running_threads <= MAX_THREADS
            && full_running
            && token_bucket > 0
        {
            start_thread(true, false);
            token_bucket -= 1;
        }
    }
    unreachable!()
}
/// Spawns a future onto the global executor and immediately blocks on it.
pub fn block_on<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> T {
    futures_lite::future::block_on(spawn(future))
}
/// Spawns a task onto the lazily-initialized global executor.
///
/// The task can block or run CPU-intensive code if needed; it will not block other tasks.
pub fn spawn<T: Send + 'static>(
    future: impl Future<Output = T> + Send + 'static,
) -> async_executor::Task<T> {
    start_monitor();
    EXEC.spawn(WrappedFuture::new(future))
    // async_global_executor::spawn(future)
}
// /// Spawns a task onto the lazily-initialized thread-local executor.
// ///
// /// The task should **NOT** block or run CPU-intensive code
// pub fn spawn<T: 'static>(
// future: impl Future<Output = T> + 'static,
// ) -> async_executor::Task<T> {
// start_monitor();
// LEXEC.with(|v| v.spawn(future))
// // async_global_executor::spawn(future)
// }
/// Wraps a spawned future so that its polls and its lifetime can be counted.
struct WrappedFuture<T, F: Future<Output = T>> {
    fut: F,
}
static ACTIVE_TASKS: Lazy<FastCounter> = Lazy::new(Default::default);
/// Returns the number of active tasks, that is, tasks that have been spawned and not yet dropped.
pub fn active_task_count() -> usize {
    ACTIVE_TASKS.count()
}
/// Returns the number of tasks that are currently being polled.
pub fn running_task_count() -> usize {
    FUTURES_BEING_POLLED.count()
}
impl<T, F: Future<Output = T>> Drop for WrappedFuture<T, F> {
    fn drop(&mut self) {
        ACTIVE_TASKS.decr();
    }
}
impl<T, F: Future<Output = T>> Future for WrappedFuture<T, F> {
    type Output = T;
    #[inline]
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // FUTURES_BEING_POLLED.incr();
        POLL_COUNT.incr();
        // scopeguard::defer!({
        // FUTURES_BEING_POLLED.decr();
        // });
        // SAFETY: `fut` is never moved out of the pinned `self`; it is only
        // polled in place, and the `Drop` impl does not touch it, so
        // projecting the pin onto the field is sound.
        let fut = unsafe { self.map_unchecked_mut(|v| &mut v.fut) };
        fut.poll(cx)
    }
}
impl<T, F: Future<Output = T> + 'static> WrappedFuture<T, F> {
    pub fn new(fut: F) -> Self {
        ACTIVE_TASKS.incr();
        WrappedFuture { fut }
    }
}