1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323
//! A thread pool for running blocking functions asynchronously. //! //! Blocking thread pool consists of four elements: //! * Frequency Detector //! * Trend Estimator //! * Predictive Upscaler //! * Time-based Downscaler //! //! ## Frequency Detector //! Detects how many tasks are submitted from scheduler to thread pool in a given time frame. //! Pool manager thread does this sampling every 200 milliseconds. //! This value is going to be used for trend estimation phase. //! //! ## Trend Estimator //! Hold up to the given number of frequencies to create an estimation. //! Trend estimator holds 10 frequencies at a time. //! This value is stored as constant in [FREQUENCY_QUEUE_SIZE](constant.FREQUENCY_QUEUE_SIZE.html). //! Estimation algorithm and prediction uses Exponentially Weighted Moving Average algorithm. //! //! This algorithm is adapted from [A Novel Predictive and Self–Adaptive Dynamic Thread Pool Management](https://doi.org/10.1109/ISPA.2011.61) //! and altered to: //! * use instead of heavy calculation of trend, utilize thread redundancy which is the sum of the differences between the predicted and observed value. //! * use instead of linear trend estimation, it uses exponential trend estimation where formula is: //! ```text //! LOW_WATERMARK * (predicted - observed) + LOW_WATERMARK //! ``` //! *NOTE:* If this algorithm wants to be tweaked increasing [LOW_WATERMARK](constant.LOW_WATERMARK.html) will automatically adapt the additional dynamic thread spawn count //! * operate without watermarking by timestamps (in paper which is used to measure algorithms own performance during the execution) //! * operate extensive subsampling. Extensive subsampling congests the pool manager thread. //! * operate without keeping track of idle time of threads or job out queue like TEMA and FOPS implementations. //! //! ## Predictive Upscaler //! Upscaler has three cases (also can be seen in paper): //! * The rate slightly increases and there are many idle threads. //! * The number of worker threads tends to be reduced since the workload of the system is descending. //! * The system has no request or stalled. (Our case here is when the current tasks block further tasks from being processed – throughput hogs) //! //! For the first two EMA calculation and exponential trend estimation gives good performance. //! For the last case, upscaler selects upscaling amount by amount of tasks mapped when throughput hogs happen. //! //! **example scenario:** Let's say we have 10_000 tasks where every one of them is blocking for 1 second. Scheduler will map plenty of tasks but will got rejected. //! This makes estimation calculation nearly 0 for both entering and exiting parts. When this happens and we still see tasks mapped from scheduler. //! We start to slowly increase threads by amount of frequency linearly. High increase of this value either make us hit to the thread threshold on //! some OS or make congestion on the other thread utilizations of the program, because of context switch. //! //! Throughput hogs determined by a combination of job in / job out frequency and current scheduler task assignment frequency. //! Threshold of EMA difference is eluded by machine epsilon for floating point arithmetic errors. //! //! ## Time-based Downscaler //! When threads becomes idle, they will not shut down immediately. //! Instead, they wait a random amount between 1 and 11 seconds //! to even out the load. use std::collections::VecDeque; use std::sync::atomic::{AtomicU64, Ordering}; use std::thread; use std::time::Duration; use crossbeam_channel::{bounded, Receiver, Sender}; use lazy_static::lazy_static; use crate::utils; use lightproc::lightproc::LightProc; use lightproc::proc_stack::ProcStack; use lightproc::recoverable_handle::RecoverableHandle; use std::future::Future; use std::io::ErrorKind; use std::iter::Iterator; use std::sync::Mutex; /// Low watermark value, defines the bare minimum of the pool. /// Spawns initial thread set. const LOW_WATERMARK: u64 = 2; /// Pool managers interval time (milliseconds). /// This is the actual interval which makes adaptation calculation. const MANAGER_POLL_INTERVAL: u64 = 200; /// Frequency histogram's sliding window size. /// Defines how many frequencies will be considered for adaptation. const FREQUENCY_QUEUE_SIZE: usize = 10; /// Exponential moving average smoothing coefficient for limited window. /// Smoothing factor is estimated with: 2 / (N + 1) where N is sample size. const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64); /// Pool task frequency variable. /// Holds scheduled tasks onto the thread pool for the calculation time window. static FREQUENCY: AtomicU64 = AtomicU64::new(0); /// Possible max threads (without OS contract). static MAX_THREADS: AtomicU64 = AtomicU64::new(10_000); /// Pool interface between the scheduler and thread pool struct Pool { sender: Sender<LightProc>, receiver: Receiver<LightProc>, } lazy_static! { /// Blocking pool with static starting thread count. static ref POOL: Pool = { for _ in 0..LOW_WATERMARK { thread::Builder::new() .name("bastion-blocking-driver".to_string()) .spawn(|| { for task in &POOL.receiver { task.run(); } }) .expect("cannot start a thread driving blocking tasks"); } // Pool manager to check frequency of task rates // and take action by scaling the pool accordingly. thread::Builder::new() .name("bastion-pool-manager".to_string()) .spawn(|| { let poll_interval = Duration::from_millis(MANAGER_POLL_INTERVAL); loop { scale_pool(); thread::sleep(poll_interval); } }) .expect("thread pool manager cannot be started"); // We want to use an unbuffered channel here to help // us drive our dynamic control. In effect, the // kernel's scheduler becomes the queue, reducing // the number of buffers that work must flow through // before being acted on by a core. This helps keep // latency snappy in the overall async system by // reducing bufferbloat. let (sender, receiver) = bounded(0); Pool { sender, receiver } }; /// Sliding window for pool task frequency calculation static ref FREQ_QUEUE: Mutex<VecDeque<u64>> = { Mutex::new(VecDeque::with_capacity(FREQUENCY_QUEUE_SIZE.saturating_add(1))) }; /// Dynamic pool thread count variable static ref POOL_SIZE: Mutex<u64> = Mutex::new(LOW_WATERMARK); } /// Exponentially Weighted Moving Average calculation /// /// This allows us to find the EMA value. /// This value represents the trend of tasks mapped onto the thread pool. /// Calculation is following: /// ```text /// +--------+-----------------+----------------------------------+ /// | Symbol | Identifier | Explanation | /// +--------+-----------------+----------------------------------+ /// | α | EMA_COEFFICIENT | smoothing factor between 0 and 1 | /// | Yt | freq | frequency sample at time t | /// | St | acc | EMA at time t | /// +--------+-----------------+----------------------------------+ /// ``` /// Under these definitions formula is following: /// ```text /// EMA = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 ... ] + St /// ``` /// # Arguments /// /// * `freq_queue` - Sliding window of frequency samples #[inline] fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 { freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| { acc + ((*freq as f64) * ((1_f64 - EMA_COEFFICIENT).powf(i as f64) as f64)) }) * EMA_COEFFICIENT as f64 } /// Adaptive pool scaling function /// /// This allows to spawn new threads to make room for incoming task pressure. /// Works in the background detached from the pool system and scales up the pool based /// on the request rate. /// /// It uses frequency based calculation to define work. Utilizing average processing rate. fn scale_pool() { // Fetch current frequency, it does matter that operations are ordered in this approach. let current_frequency = FREQUENCY.swap(0, Ordering::SeqCst); let mut freq_queue = FREQ_QUEUE.lock().unwrap(); // Make it safe to start for calculations by adding initial frequency scale if freq_queue.len() == 0 { freq_queue.push_back(0); } // Calculate message rate for the given time window let frequency = (current_frequency as f64 / MANAGER_POLL_INTERVAL as f64) as u64; // Calculates current time window's EMA value (including last sample) let prev_ema_frequency = calculate_ema(&freq_queue); // Add seen frequency data to the frequency histogram. freq_queue.push_back(frequency); if freq_queue.len() == FREQUENCY_QUEUE_SIZE.saturating_add(1) { freq_queue.pop_front(); } // Calculates current time window's EMA value (including last sample) let curr_ema_frequency = calculate_ema(&freq_queue); // Adapts the thread count of pool // // Sliding window of frequencies visited by the pool manager. // Pool manager creates EMA value for previous window and current window. // Compare them to determine scaling amount based on the trends. // If current EMA value is bigger, we will scale up. if curr_ema_frequency > prev_ema_frequency { // "Scale by" amount can be seen as "how much load is coming". // "Scale" amount is "how many threads we should spawn". let scale_by: f64 = curr_ema_frequency - prev_ema_frequency; let scale = num_cpus::get() .min(((LOW_WATERMARK as f64 * scale_by) + LOW_WATERMARK as f64) as usize); // It is time to scale the pool! (0..scale).for_each(|_| { create_blocking_thread(); }); } else if (curr_ema_frequency - prev_ema_frequency).abs() < std::f64::EPSILON && current_frequency != 0 { // Throughput is low. Allocate more threads to unblock flow. // If we fall to this case, scheduler is congested by longhauling tasks. // For unblock the flow we should add up some threads to the pool, but not that many to // stagger the program's operation. (0..LOW_WATERMARK).for_each(|_| { create_blocking_thread(); }); } } /// Creates blocking thread to receive tasks /// Dynamic threads will terminate themselves if they don't /// receive any work after between one and ten seconds. fn create_blocking_thread() { // Check that thread is spawnable. // If it hits to the OS limits don't spawn it. { let pool_size = *POOL_SIZE.lock().unwrap(); if pool_size >= MAX_THREADS.load(Ordering::SeqCst) { MAX_THREADS.store(10_000, Ordering::SeqCst); return; } } // We want to avoid having all threads terminate at // exactly the same time, causing thundering herd // effects. We want to stagger their destruction over // 10 seconds or so to make the costs fade into // background noise. // // Generate a simple random number of milliseconds let rand_sleep_ms = 1000_u64 .checked_add(u64::from(utils::random(10_000))) .expect("shouldn't overflow"); let _ = thread::Builder::new() .name("bastion-blocking-driver-dynamic".to_string()) .spawn(move || { let wait_limit = Duration::from_millis(rand_sleep_ms); // Adjust the pool size counter before and after spawn *POOL_SIZE.lock().unwrap() += 1; while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) { task.run(); } *POOL_SIZE.lock().unwrap() -= 1; }) .map_err(|err| { match err.kind() { ErrorKind::WouldBlock => { // Maximum allowed threads per process is varying from system to system. // Also, some systems have it(like macOS), and some don't(Linux). // This case expected not to happen. // But when happened this shouldn't throw a panic. let guarded_count = POOL_SIZE .lock() .unwrap() .checked_sub(1) .expect("shouldn't underflow"); MAX_THREADS.store(guarded_count, Ordering::SeqCst); } _ => eprintln!( "cannot start a dynamic thread driving blocking tasks: {}", err ), } }); } /// Enqueues work, attempting to send to the thread pool in a /// nonblocking way and spinning up needed amount of threads /// based on the previous statistics without relying on /// if there is not a thread ready to accept the work or not. fn schedule(t: LightProc) { // Add up for every incoming scheduled task FREQUENCY.fetch_add(1, Ordering::Acquire); if let Err(err) = POOL.sender.try_send(t) { // We were not able to send to the channel without // blocking. POOL.sender.send(err.into_inner()).unwrap(); } } /// Spawns a blocking task. /// /// The task will be spawned onto a thread pool specifically dedicated to blocking tasks. pub fn spawn_blocking<F, R>(future: F, stack: ProcStack) -> RecoverableHandle<R> where F: Future<Output = R> + Send + 'static, R: Send + 'static, { let (task, handle) = LightProc::recoverable(future, schedule, stack); task.schedule(); handle }