//! A thread pool for running blocking functions asynchronously.
//!
//! The blocking thread pool consists of four elements:
//! * Frequency Detector
//! * Trend Estimator
//! * Predictive Upscaler
//! * Time-based Downscaler
//!
//! ## Frequency Detector
//! Detects how many tasks are submitted from the scheduler to the thread pool in a given time frame.
//! The pool manager thread performs this sampling every 200 milliseconds.
//! This value is used in the trend estimation phase.
//!
//! ## Trend Estimator
//! Holds up to a given number of frequency samples to create an estimation.
//! The trend estimator holds 10 frequencies at a time.
//! This value is stored as a constant in [FREQUENCY_QUEUE_SIZE](constant.FREQUENCY_QUEUE_SIZE.html).
//! Estimation and prediction use the Exponentially Weighted Moving Average (EWMA) algorithm.
//!
//! This algorithm is adapted from [A Novel Predictive and Self–Adaptive Dynamic Thread Pool Management](https://doi.org/10.1109/ISPA.2011.61)
//! and altered to:
//! * instead of the heavy trend calculation, utilize thread redundancy, which is the sum of the differences between the predicted and observed values.
//! * replace linear trend estimation with exponential trend estimation, where the formula is (a worked example follows this list):
//! ```text
//! LOW_WATERMARK * (predicted - observed) + LOW_WATERMARK
//! ```
//! *NOTE:* To tweak this algorithm, increase [LOW_WATERMARK](constant.LOW_WATERMARK.html); the additional dynamic thread spawn count adapts automatically.
//! * operate without timestamp-based watermarking (used in the paper to measure the algorithm's own performance during execution)
//! * operate without extensive subsampling, since extensive subsampling congests the pool manager thread.
//! * operate without keeping track of thread idle time or a job-out queue, unlike the TEMA and FOPS implementations.
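//!
//! As a worked illustration of the formula above (hypothetical numbers, with LOW_WATERMARK = 2):
//! ```text
//! predicted = 3.0, observed = 1.0
//! additional threads = 2 * (3.0 - 1.0) + 2 = 6
//! ```
//! In the implementation below this value is additionally capped at the number of logical CPUs.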
//!
//! ## Predictive Upscaler
//! The upscaler handles three cases (also described in the paper):
//! * The rate slightly increases and there are many idle threads.
//! * The number of worker threads tends to be reduced since the workload of the system is descending.
//! * The system has no requests or is stalled. (Our case here is when currently running tasks block further tasks from being processed – throughput hogs.)
//!
//! For the first two cases, EMA calculation and exponential trend estimation give good performance.
//! For the last case, the upscaler selects the scaling amount based on the number of tasks mapped when throughput hogs occur.
//!
//! **Example scenario:** Let's say we have 10_000 tasks, each of which blocks for 1 second. The scheduler will map plenty of tasks, but they will get rejected.
//! This drives the estimation calculation to nearly 0 for both the entering and exiting parts. When this happens and we still see tasks mapped from the scheduler,
//! we start to slowly increase the thread count linearly by the observed frequency. Increasing this value too aggressively either makes us hit the thread limit on
//! some operating systems or congests the program's other threads because of context switching.
//!
//! Throughput hogs are detected by a combination of job-in/job-out frequency and the current scheduler task assignment frequency.
//! The threshold on the EMA difference is machine epsilon, to absorb floating-point arithmetic errors.
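//!
//! A compact summary of the decision logic (as implemented in `scale_pool` below):
//! ```text
//! if curr_ema > prev_ema:
//!     spawn min(num_cpus, LOW_WATERMARK * (curr_ema - prev_ema) + LOW_WATERMARK) threads
//! else if |curr_ema - prev_ema| < machine_epsilon and current_frequency != 0:
//!     throughput hog detected -> spawn LOW_WATERMARK threads
//! ```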
//!
//! ## Time-based Downscaler
//! When threads become idle, they do not shut down immediately.
//! Instead, they wait a random amount of time between 1 and 11 seconds
//! to even out the load.
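//!
//! The wait duration mirrors the computation in `create_blocking_thread` below:
//! ```text
//! wait_ms = 1_000 + random(10_000)   // somewhere between 1 and 11 seconds
//! ```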

use std::collections::VecDeque;
use std::future::Future;
use std::io::ErrorKind;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

use crossbeam_channel::{bounded, Receiver, Sender};
use lazy_static::lazy_static;
use lightproc::lightproc::LightProc;
use lightproc::proc_stack::ProcStack;
use lightproc::recoverable_handle::RecoverableHandle;

use crate::utils;

/// Low watermark value; defines the bare minimum thread count of the pool.
/// The initial thread set is spawned with this count.
const LOW_WATERMARK: u64 = 2;

/// Pool manager's poll interval (in milliseconds).
/// This is the interval at which the adaptation calculation runs.
const MANAGER_POLL_INTERVAL: u64 = 200;

/// Frequency histogram's sliding window size.
/// Defines how many frequencies will be considered for adaptation.
const FREQUENCY_QUEUE_SIZE: usize = 10;

/// Exponential moving average smoothing coefficient for limited window.
/// Smoothing factor is estimated with: 2 / (N + 1) where N is sample size.
const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64);

/// Pool task frequency variable.
/// Counts the tasks scheduled onto the thread pool during the current calculation time window.
static FREQUENCY: AtomicU64 = AtomicU64::new(0);

/// Maximum possible number of threads (not an OS-enforced limit).
static MAX_THREADS: AtomicU64 = AtomicU64::new(10_000);

/// Pool interface between the scheduler and the thread pool.
struct Pool {
    sender: Sender<LightProc>,
    receiver: Receiver<LightProc>,
}

lazy_static! {
    /// Blocking pool with static starting thread count.
    static ref POOL: Pool = {
        for _ in 0..LOW_WATERMARK {
            thread::Builder::new()
                .name("bastion-blocking-driver".to_string())
                .spawn(|| {
                    for task in &POOL.receiver {
                        task.run();
                    }
                })
                .expect("cannot start a thread driving blocking tasks");
        }

        // Pool manager that checks the task arrival frequency
        // and takes action by scaling the pool accordingly.
        thread::Builder::new()
            .name("bastion-pool-manager".to_string())
            .spawn(|| {
                let poll_interval = Duration::from_millis(MANAGER_POLL_INTERVAL);
                loop {
                    scale_pool();
                    thread::sleep(poll_interval);
                }
            })
            .expect("thread pool manager cannot be started");

        // We want to use an unbuffered channel here to help
        // us drive our dynamic control. In effect, the
        // kernel's scheduler becomes the queue, reducing
        // the number of buffers that work must flow through
        // before being acted on by a core. This helps keep
        // latency snappy in the overall async system by
        // reducing bufferbloat.
        let (sender, receiver) = bounded(0);
        Pool { sender, receiver }
    };

    /// Sliding window for pool task frequency calculation
    static ref FREQ_QUEUE: Mutex<VecDeque<u64>> = {
        Mutex::new(VecDeque::with_capacity(FREQUENCY_QUEUE_SIZE.saturating_add(1)))
    };

    /// Dynamic pool thread count variable
    static ref POOL_SIZE: Mutex<u64> = Mutex::new(LOW_WATERMARK);
}

/// Exponentially Weighted Moving Average calculation
///
/// This computes the EMA value over the sliding window of frequency samples.
/// The value represents the trend of tasks mapped onto the thread pool.
/// The calculation is as follows:
/// ```text
/// +--------+-----------------+----------------------------------+
/// | Symbol |   Identifier    |           Explanation            |
/// +--------+-----------------+----------------------------------+
/// | α      | EMA_COEFFICIENT | smoothing factor between 0 and 1 |
/// | Yt     | freq            | frequency sample at time t       |
/// | St     | acc             | EMA at time t                    |
/// +--------+-----------------+----------------------------------+
/// ```
/// Under these definitions, the formula is:
/// ```text
/// EMA = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 + ... ]
/// ```
/// # Arguments
///
/// * `freq_queue` - Sliding window of frequency samples
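///
/// # Example
///
/// A worked illustration with hypothetical numbers
/// (α = EMA_COEFFICIENT = 2 / (10 + 1) ≈ 0.1818; samples are weighted by their position from the front of the queue):
/// ```text
/// freq_queue = [4, 2]
/// EMA ≈ 0.1818 * (4 * (1 - 0.1818)^0 + 2 * (1 - 0.1818)^1)
///     ≈ 0.1818 * (4.0 + 1.636)
///     ≈ 1.025
/// ```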
#[inline]
fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
    freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| {
        acc + (*freq as f64) * (1_f64 - EMA_COEFFICIENT).powf(i as f64)
    }) * EMA_COEFFICIENT
}

/// Adaptive pool scaling function
///
/// Spawns new threads to make room for incoming task pressure.
/// Works in the background, detached from the pool system, and scales up the pool based
/// on the request rate.
///
/// It uses a frequency-based calculation to define work, utilizing the average processing rate.
fn scale_pool() {
    // Fetch the current frequency; the ordering of operations matters in this approach.
    let current_frequency = FREQUENCY.swap(0, Ordering::SeqCst);
    let mut freq_queue = FREQ_QUEUE.lock().unwrap();

    // Seed the queue with an initial frequency so calculations can start safely.
    if freq_queue.is_empty() {
        freq_queue.push_back(0);
    }

    // Calculate the task arrival rate for the given time window.
    let frequency = (current_frequency as f64 / MANAGER_POLL_INTERVAL as f64) as u64;

    // Calculate the previous time window's EMA value (before adding the latest sample).
    let prev_ema_frequency = calculate_ema(&freq_queue);

    // Add seen frequency data to the frequency histogram.
    freq_queue.push_back(frequency);
    if freq_queue.len() == FREQUENCY_QUEUE_SIZE.saturating_add(1) {
        freq_queue.pop_front();
    }

    // Calculate the current time window's EMA value (including the latest sample).
    let curr_ema_frequency = calculate_ema(&freq_queue);

    // Adapt the pool's thread count.
    //
    // The pool manager keeps a sliding window of the frequencies it has observed
    // and computes an EMA value for the previous window and for the current window.
    // Comparing the two determines the scaling amount based on the trend.
    // If the current EMA value is bigger, we scale up.
    if curr_ema_frequency > prev_ema_frequency {
        // "Scale by" amount can be seen as "how much load is coming".
        // "Scale" amount is "how many threads we should spawn".
        let scale_by: f64 = curr_ema_frequency - prev_ema_frequency;
        let scale = num_cpus::get()
            .min(((LOW_WATERMARK as f64 * scale_by) + LOW_WATERMARK as f64) as usize);

        // It is time to scale the pool!
        (0..scale).for_each(|_| {
            create_blocking_thread();
        });
    } else if (curr_ema_frequency - prev_ema_frequency).abs() < std::f64::EPSILON
        && current_frequency != 0
    {
        // Throughput is low. Allocate more threads to unblock the flow.
        // If we fall into this case, the scheduler is congested by long-hauling tasks.
        // To unblock the flow we add a few threads to the pool, but not so many that
        // they stall the program's operation.
        (0..LOW_WATERMARK).for_each(|_| {
            create_blocking_thread();
        });
    }
}

/// Creates a blocking thread to receive tasks.
/// Dynamic threads will terminate themselves if they don't
/// receive any work within one to eleven seconds.
fn create_blocking_thread() {
    // Check that the thread is spawnable.
    // If we have hit the OS limit, don't spawn it.
    {
        let pool_size = *POOL_SIZE.lock().unwrap();
        if pool_size >= MAX_THREADS.load(Ordering::SeqCst) {
            // Hit the limit; reset MAX_THREADS to the default and skip spawning this time.
            MAX_THREADS.store(10_000, Ordering::SeqCst);
            return;
        }
    }
    // We want to avoid having all threads terminate at
    // exactly the same time, causing thundering herd
    // effects. We want to stagger their destruction over
    // 10 seconds or so to make the costs fade into
    // background noise.
    //
    // Generate a simple random number of milliseconds
    let rand_sleep_ms = 1000_u64
        .checked_add(u64::from(utils::random(10_000)))
        .expect("shouldn't overflow");

    let _ = thread::Builder::new()
        .name("bastion-blocking-driver-dynamic".to_string())
        .spawn(move || {
            let wait_limit = Duration::from_millis(rand_sleep_ms);

            // Adjust the pool size counter before and after the receive loop.
            *POOL_SIZE.lock().unwrap() += 1;
            while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
                task.run();
            }
            *POOL_SIZE.lock().unwrap() -= 1;
        })
        .map_err(|err| {
            match err.kind() {
                ErrorKind::WouldBlock => {
                    // The maximum number of threads allowed per process varies from system to system.
                    // Some systems enforce it (like macOS), and some don't (Linux).
                    // This case is not expected to happen,
                    // but when it does, it shouldn't cause a panic.
                    let guarded_count = POOL_SIZE
                        .lock()
                        .unwrap()
                        .checked_sub(1)
                        .expect("shouldn't underflow");
                    MAX_THREADS.store(guarded_count, Ordering::SeqCst);
                }
                _ => eprintln!(
                    "cannot start a dynamic thread driving blocking tasks: {}",
                    err
                ),
            }
        });
}

/// Enqueues work, attempting to send it to the thread pool in a
/// non-blocking way, and spins up the needed number of threads
/// based on previous statistics, without relying on whether
/// a thread is currently ready to accept the work or not.
fn schedule(t: LightProc) {
    // Count every incoming scheduled task.
    FREQUENCY.fetch_add(1, Ordering::Acquire);

    if let Err(err) = POOL.sender.try_send(t) {
        // We were not able to send to the channel without
        // blocking, so fall back to a blocking send.
        POOL.sender.send(err.into_inner()).unwrap();
    }
}

/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
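///
/// # Example
///
/// A minimal usage sketch, marked `ignore` because it assumes that `ProcStack`
/// implements `Default` and that these items are in scope; adjust to your setup.
///
/// ```ignore
/// let handle = spawn_blocking(
///     async {
///         // Simulate a blocking operation inside the async block.
///         std::thread::sleep(std::time::Duration::from_secs(1));
///         42
///     },
///     ProcStack::default(),
/// );
/// // `handle` can be awaited on an executor to obtain the result
/// // once the blocking task completes.
/// ```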
pub fn spawn_blocking<F, R>(future: F, stack: ProcStack) -> RecoverableHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (task, handle) = LightProc::recoverable(future, schedule, stack);
    task.schedule();
    handle
}