// bastion_executor/blocking.rs

//!
//! Pool of threads to run heavy processes
//!
//! We spawn futures onto the pool with [`spawn_blocking`] method of global run queue or
//! with corresponding [`Worker`]'s spawn method.
//!
//! [`Worker`]: crate::run_queue::Worker

use crate::thread_manager::{DynamicPoolManager, DynamicRunner};
use crossbeam_channel::{unbounded, Receiver, Sender};
use lazy_static::lazy_static;
use lightproc::lightproc::LightProc;
use lightproc::proc_stack::ProcStack;
use lightproc::recoverable_handle::RecoverableHandle;
use once_cell::sync::{Lazy, OnceCell};
use std::future::Future;
use std::iter::Iterator;
use std::sync::Arc;
use std::time::Duration;
use std::{env, thread};
use tracing::trace;

/// If low watermark isn't configured this is the default scaler value.
/// This value is used for the heuristics of the scaler
const DEFAULT_LOW_WATERMARK: u64 = 2;

/// How long a worker thread blocks on the task queue before concluding the
/// queue is (momentarily) empty and parking or exiting (see `DynamicRunner` impl).
const THREAD_RECV_TIMEOUT: Duration = Duration::from_millis(100);

29/// Spawns a blocking task.
30///
31/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
32pub fn spawn_blocking<F, R>(future: F, stack: ProcStack) -> RecoverableHandle<R>
33where
34    F: Future<Output = R> + Send + 'static,
35    R: Send + 'static,
36{
37    let (task, handle) = LightProc::recoverable(future, schedule, stack);
38    task.schedule();
39    handle
40}
41
/// Runner handed to the `DynamicPoolManager`; executes queued blocking tasks.
struct BlockingRunner {
    // We keep a handle to the tokio runtime here to make sure
    // it will never be dropped while the DynamicPoolManager is alive,
    // in case we need to spin up some threads.
    #[cfg(feature = "tokio-runtime")]
    runtime_handle: tokio::runtime::Handle,
}

impl DynamicRunner for BlockingRunner {
    /// Loop run by the pool's permanent (static) threads.
    ///
    /// Drains the global queue until it has been empty for
    /// `THREAD_RECV_TIMEOUT`, then parks for at most `park_timeout`
    /// and tries again. Never returns.
    fn run_static(&self, park_timeout: Duration) -> ! {
        loop {
            while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) {
                trace!("static thread: running task");
                self.run(task);
            }

            trace!("static: empty queue, parking with timeout");
            thread::park_timeout(park_timeout);
        }
    }
    /// Loop run by dynamically spawned threads.
    ///
    /// Same drain loop as `run_static`, but parking is delegated to the
    /// `parker` callback supplied by the pool manager. Never returns.
    fn run_dynamic(&self, parker: &dyn Fn()) -> ! {
        loop {
            while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) {
                trace!("dynamic thread: running task");
                self.run(task);
            }
            trace!(
                "dynamic thread: parking - {:?}",
                std::thread::current().id()
            );
            parker();
        }
    }
    /// Loop run by one-shot (standalone) threads: drains the queue, then
    /// returns — ending the thread — once the queue has stayed empty for
    /// `THREAD_RECV_TIMEOUT`.
    fn run_standalone(&self) {
        while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) {
            self.run(task);
        }
        trace!("standalone thread: quitting.");
    }
}

impl BlockingRunner {
    /// Runs a single task: on the tokio blocking pool when the
    /// `tokio-runtime` feature is enabled, otherwise inline on the
    /// current thread.
    fn run(&self, task: LightProc) {
        #[cfg(feature = "tokio-runtime")]
        {
            // Hand the task off to tokio's dedicated blocking-thread pool.
            self.runtime_handle.spawn_blocking(|| task.run());
        }
        #[cfg(not(feature = "tokio-runtime"))]
        {
            task.run();
        }
    }
}

/// Pool interface between the scheduler and thread pool
struct Pool {
    // Producer side: `schedule` pushes incoming tasks here.
    sender: Sender<LightProc>,
    // Consumer side: worker threads pull tasks from here
    // (see the `DynamicRunner` impl for `BlockingRunner`).
    receiver: Receiver<LightProc>,
}

102static DYNAMIC_POOL_MANAGER: OnceCell<DynamicPoolManager> = OnceCell::new();
103
104static POOL: Lazy<Pool> = Lazy::new(|| {
105    #[cfg(feature = "tokio-runtime")]
106    {
107        let runner = Arc::new(BlockingRunner {
108            // We use current() here instead of try_current()
109            // because we want bastion to crash as soon as possible
110            // if there is no available runtime.
111            runtime_handle: tokio::runtime::Handle::current(),
112        });
113
114        DYNAMIC_POOL_MANAGER
115            .set(DynamicPoolManager::new(*low_watermark() as usize, runner))
116            .expect("couldn't create dynamic pool manager");
117    }
118    #[cfg(not(feature = "tokio-runtime"))]
119    {
120        let runner = Arc::new(BlockingRunner {});
121
122        DYNAMIC_POOL_MANAGER
123            .set(DynamicPoolManager::new(*low_watermark() as usize, runner))
124            .expect("couldn't create dynamic pool manager");
125    }
126
127    DYNAMIC_POOL_MANAGER
128        .get()
129        .expect("couldn't get static pool manager")
130        .initialize();
131
132    let (sender, receiver) = unbounded();
133    Pool { sender, receiver }
134});
135
136/// Enqueues work, attempting to send to the thread pool in a
137/// nonblocking way and spinning up needed amount of threads
138/// based on the previous statistics without relying on
139/// if there is not a thread ready to accept the work or not.
140fn schedule(t: LightProc) {
141    if let Err(err) = POOL.sender.try_send(t) {
142        // We were not able to send to the channel without
143        // blocking.
144        POOL.sender.send(err.into_inner()).unwrap();
145    }
146
147    // Add up for every incoming scheduled task
148    DYNAMIC_POOL_MANAGER.get().unwrap().increment_frequency();
149}
150
151///
152/// Low watermark value, defines the bare minimum of the pool.
153/// Spawns initial thread set.
154/// Can be configurable with env var `BASTION_BLOCKING_THREADS` at runtime.
155#[inline]
156fn low_watermark() -> &'static u64 {
157    lazy_static! {
158        static ref LOW_WATERMARK: u64 = {
159            env::var_os("BASTION_BLOCKING_THREADS")
160                .map(|x| x.to_str().unwrap().parse::<u64>().unwrap())
161                .unwrap_or(DEFAULT_LOW_WATERMARK)
162        };
163    }
164
165    &*LOW_WATERMARK
166}