bastion_executor/
blocking.rs1use crate::thread_manager::{DynamicPoolManager, DynamicRunner};
10use crossbeam_channel::{unbounded, Receiver, Sender};
11use lazy_static::lazy_static;
12use lightproc::lightproc::LightProc;
13use lightproc::proc_stack::ProcStack;
14use lightproc::recoverable_handle::RecoverableHandle;
15use once_cell::sync::{Lazy, OnceCell};
16use std::future::Future;
17use std::iter::Iterator;
18use std::sync::Arc;
19use std::time::Duration;
20use std::{env, thread};
21use tracing::trace;
22
/// Value returned by `low_watermark()` when the `BASTION_BLOCKING_THREADS`
/// environment variable is not set.
const DEFAULT_LOW_WATERMARK: u64 = 2;

/// Upper bound on how long a pool thread waits on the task queue in a single
/// `recv_timeout` call before it treats the queue as empty.
const THREAD_RECV_TIMEOUT: Duration = Duration::from_millis(100);
28
29pub fn spawn_blocking<F, R>(future: F, stack: ProcStack) -> RecoverableHandle<R>
33where
34 F: Future<Output = R> + Send + 'static,
35 R: Send + 'static,
36{
37 let (task, handle) = LightProc::recoverable(future, schedule, stack);
38 task.schedule();
39 handle
40}
41
/// Runner that pulls tasks off the blocking pool's queue and executes them.
///
/// An instance is created when `POOL` is initialised and is handed to the
/// `DynamicPoolManager` there.
struct BlockingRunner {
    /// Handle of the tokio runtime captured with `Handle::current()` when
    /// `POOL` is initialised. Only present with the `tokio-runtime` feature.
    #[cfg(feature = "tokio-runtime")]
    runtime_handle: tokio::runtime::Handle,
}
49
50impl DynamicRunner for BlockingRunner {
51 fn run_static(&self, park_timeout: Duration) -> ! {
52 loop {
53 while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) {
54 trace!("static thread: running task");
55 self.run(task);
56 }
57
58 trace!("static: empty queue, parking with timeout");
59 thread::park_timeout(park_timeout);
60 }
61 }
62 fn run_dynamic(&self, parker: &dyn Fn()) -> ! {
63 loop {
64 while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) {
65 trace!("dynamic thread: running task");
66 self.run(task);
67 }
68 trace!(
69 "dynamic thread: parking - {:?}",
70 std::thread::current().id()
71 );
72 parker();
73 }
74 }
75 fn run_standalone(&self) {
76 while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) {
77 self.run(task);
78 }
79 trace!("standalone thread: quitting.");
80 }
81}
82
83impl BlockingRunner {
84 fn run(&self, task: LightProc) {
85 #[cfg(feature = "tokio-runtime")]
86 {
87 self.runtime_handle.spawn_blocking(|| task.run());
88 }
89 #[cfg(not(feature = "tokio-runtime"))]
90 {
91 task.run();
92 }
93 }
94}
95
96struct Pool {
98 sender: Sender<LightProc>,
99 receiver: Receiver<LightProc>,
100}
101
102static DYNAMIC_POOL_MANAGER: OnceCell<DynamicPoolManager> = OnceCell::new();
103
104static POOL: Lazy<Pool> = Lazy::new(|| {
105 #[cfg(feature = "tokio-runtime")]
106 {
107 let runner = Arc::new(BlockingRunner {
108 runtime_handle: tokio::runtime::Handle::current(),
112 });
113
114 DYNAMIC_POOL_MANAGER
115 .set(DynamicPoolManager::new(*low_watermark() as usize, runner))
116 .expect("couldn't create dynamic pool manager");
117 }
118 #[cfg(not(feature = "tokio-runtime"))]
119 {
120 let runner = Arc::new(BlockingRunner {});
121
122 DYNAMIC_POOL_MANAGER
123 .set(DynamicPoolManager::new(*low_watermark() as usize, runner))
124 .expect("couldn't create dynamic pool manager");
125 }
126
127 DYNAMIC_POOL_MANAGER
128 .get()
129 .expect("couldn't get static pool manager")
130 .initialize();
131
132 let (sender, receiver) = unbounded();
133 Pool { sender, receiver }
134});
135
136fn schedule(t: LightProc) {
141 if let Err(err) = POOL.sender.try_send(t) {
142 POOL.sender.send(err.into_inner()).unwrap();
145 }
146
147 DYNAMIC_POOL_MANAGER.get().unwrap().increment_frequency();
149}
150
151#[inline]
156fn low_watermark() -> &'static u64 {
157 lazy_static! {
158 static ref LOW_WATERMARK: u64 = {
159 env::var_os("BASTION_BLOCKING_THREADS")
160 .map(|x| x.to_str().unwrap().parse::<u64>().unwrap())
161 .unwrap_or(DEFAULT_LOW_WATERMARK)
162 };
163 }
164
165 &*LOW_WATERMARK
166}