bastion-executor 0.4.0

Cache affine NUMA-aware executor for Rust
//! A thread manager to predict how many threads should be spawned to handle the upcoming load.
//!
//! The thread manager consists of three elements:
//! * Frequency Detector
//! * Trend Estimator
//! * Predictive Upscaler
//!
//! ## Frequency Detector
//! Detects how many tasks are submitted from the scheduler to the thread pool in a given time frame.
//! The pool manager thread performs this sampling every 90 milliseconds.
//! This value is then used in the trend estimation phase.
//!
//! ## Trend Estimator
//! Holds up to a given number of frequency samples to create an estimation.
//! The trend estimator holds 10 frequencies at a time.
//! This value is stored as the constant [FREQUENCY_QUEUE_SIZE](constant.FREQUENCY_QUEUE_SIZE.html).
//! Both estimation and prediction use the Exponentially Weighted Moving Average (EWMA) algorithm.
//!
//! This algorithm is adapted from [A Novel Predictive and Self–Adaptive Dynamic Thread Pool Management](https://doi.org/10.1109/ISPA.2011.61)
//! and altered to:
//! * instead of a heavy trend calculation, use thread redundancy, which is the sum of the differences between the predicted and observed values.
//! * instead of linear trend estimation, use exponential trend estimation, where the formula is (see the worked example after this list):
//! ```text
//! LOW_WATERMARK * (predicted - observed) + LOW_WATERMARK
//! ```
//! *NOTE:* To tweak this algorithm, increase [LOW_WATERMARK](constant.LOW_WATERMARK.html); the additional dynamic thread spawn count will adapt automatically.
//! * operate without timestamp-based watermarking (used in the paper to measure the algorithm's own performance during execution).
//! * operate without extensive subsampling, since extensive subsampling congests the pool manager thread.
//! * operate without keeping track of the idle time of threads or the job out queue, unlike the TEMA and FOPS implementations.
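//!
//! For example, with the default low watermark of 2: if `predicted - observed` is 3,
//! the estimation yields `2 * 3 + 2 = 8` additional dynamic threads (capped at the
//! number of available cores inside `scale_pool`).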
//!
//! ## Predictive Upscaler
//! The upscaler has three cases (also described in the paper):
//! * The rate slightly increases and there are many idle threads.
//! * The number of worker threads tends to be reduced since the workload of the system is descending.
//! * The system has no requests or is stalled. (Our case here is when the current tasks block further tasks from being processed – throughput hogs.)
//!
//! For the first two cases, the EMA calculation and exponential trend estimation give good performance.
//! For the last case, the upscaler selects the upscaling amount based on the number of tasks mapped when throughput hogs occur.
//!
//! **Example scenario:** Let's say we have 10_000 tasks, each of which blocks for 1 second. The scheduler will map plenty of tasks, but they will get rejected.
//! This makes the estimation nearly 0 for both the entering and exiting parts. When this happens and we still see tasks mapped from the scheduler,
//! we start to slowly increase the thread count, linearly with the observed frequency. Increasing this value too aggressively would either hit the thread limit on
//! some OSes or congest the program's other threads because of context switching.
//!
//! Throughput hogs are determined by a combination of the job in / job out frequency and the current scheduler task assignment frequency.
//! EMA differences smaller than machine epsilon are treated as zero, to absorb floating point arithmetic errors.

use crate::{load_balancer, placement};
use core::fmt;
use crossbeam_queue::ArrayQueue;
use fmt::{Debug, Formatter};
use lazy_static::lazy_static;
use lever::prelude::TTas;
use placement::CoreId;
use std::collections::VecDeque;
use std::time::Duration;
use std::{
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc, Mutex,
    },
    thread::{self, Thread},
};
use tracing::{debug, trace};

/// The default thread park timeout before checking for new tasks.
const THREAD_PARK_TIMEOUT: Duration = Duration::from_millis(1);

/// Frequency histogram's sliding window size.
/// Defines how many frequencies will be considered for adaptation.
const FREQUENCY_QUEUE_SIZE: usize = 10;

/// If the low watermark isn't configured, this is the default scaler value.
/// This value is used in the scaler's heuristics.
const DEFAULT_LOW_WATERMARK: u64 = 2;

/// Pool scaler interval time (milliseconds).
/// This is the interval at which the adaptation calculation is performed.
const SCALER_POLL_INTERVAL: u64 = 90;

/// Exponential moving average smoothing coefficient for limited window.
/// The smoothing factor is estimated as 2 / (N + 1), where N is the sample size.
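/// With `FREQUENCY_QUEUE_SIZE` = 10, this evaluates to 2 / 11 ≈ 0.18.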
const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64);

lazy_static! {
    static ref ROUND_ROBIN_PIN: Mutex<CoreId> = Mutex::new(CoreId { id: 0 });
}

/// The `DynamicRunner` is piloted by `DynamicPoolManager`.
/// Upon request it needs to be able to provide runner routines for:
/// * Static threads.
/// * Dynamic threads.
/// * Standalone threads.
///
/// Your implementation of `DynamicRunner`
/// will allow you to define what tasks must be accomplished.
///
/// Run static threads:
///
/// `run_static` should never return; it should park for `park_timeout` instead.
///
/// Run dynamic threads:
///
/// `run_dynamic` should never return, and should call `parker()` when it has no more tasks to process.
/// The thread will be unparked automatically by the `DynamicPoolManager` if need be.
///
/// Run standalone threads:
///
/// `run_standalone` should return once it has no more tasks to process.
/// The `DynamicPoolManager` will spawn other standalone threads if need be.
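///
/// # Example
///
/// A minimal sketch of an implementation, assuming a hypothetical global
/// `TASK_QUEUE` of boxed task closures (not part of this crate) that the runner
/// drains:
///
/// ```ignore
/// struct SimpleRunner;
///
/// impl DynamicRunner for SimpleRunner {
///     fn run_static(&self, park_timeout: Duration) -> ! {
///         loop {
///             // Drain the hypothetical queue, then park for the timeout.
///             while let Some(task) = TASK_QUEUE.pop() {
///                 task();
///             }
///             std::thread::park_timeout(park_timeout);
///         }
///     }
///
///     fn run_dynamic(&self, parker: &dyn Fn()) -> ! {
///         loop {
///             while let Some(task) = TASK_QUEUE.pop() {
///                 task();
///             }
///             // Hand control to the pool manager, which parks this thread and
///             // unparks it again when more capacity is needed.
///             parker();
///         }
///     }
///
///     fn run_standalone(&self) {
///         // Process what is available, then return so the thread can wind down.
///         while let Some(task) = TASK_QUEUE.pop() {
///             task();
///         }
///     }
/// }
/// ```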
pub trait DynamicRunner {
    fn run_static(&self, park_timeout: Duration) -> !;
    fn run_dynamic(&self, parker: &dyn Fn()) -> !;
    fn run_standalone(&self);
}

/// The `DynamicPoolManager` is responsible for
/// growing and shrinking a pool according to EMA rules.
///
/// It needs to be passed a structure that implements `DynamicRunner`,
/// which is responsible for the work the spawned threads actually run.
///
/// The `DynamicPoolManager` keeps track of the number of threads
/// required to process the load correctly,
/// and depending on the current state it will:
/// - Spawn a lot of threads (we're predicting a load spike, and we need to prepare for it)
/// - Spawn few threads (there's a constant load, and throughput is low because the current resources are busy)
/// - Do nothing (the load is shrinking, threads will automatically stop once they're done).
///
/// Kinds of threads:
///
/// ## Static threads:
/// Defined in the constructor, they will always be available. They park for `THREAD_PARK_TIMEOUT` on idle.
///
/// ## Dynamic threads:
/// Created during `DynamicPoolManager` initialization, they will park on idle.
/// The `DynamicPoolManager` grows the number of Dynamic threads
/// so the total number of Static threads + Dynamic threads
/// is the number of available cores on the machine. (`num_cpus::get()`)
///
/// ## Standalone threads:
/// They are created when there aren't enough static and dynamic threads to process the expected load.
/// They will be destroyed on idle.
///
/// ## Spawn order:
/// In order to handle a growing load, the pool manager will ask to:
/// - Use Static threads
/// - Unpark Dynamic threads
/// - Spawn Standalone threads
///
/// The pool manager is not responsible for the tasks performed by the threads; that is handled by the `DynamicRunner`.
///
/// If you use tracing, you can have a look at the `trace!` logs generated by the structure.
///
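/// # Example
///
/// A rough wiring sketch, assuming `MyRunner` is some user-defined type implementing
/// `DynamicRunner + Send + Sync` (it is not provided by this crate). Since `initialize`
/// takes `&'static self`, the manager typically lives in a `lazy_static`:
///
/// ```ignore
/// lazy_static! {
///     static ref POOL_MANAGER: DynamicPoolManager =
///         DynamicPoolManager::new(2, Arc::new(MyRunner));
/// }
///
/// // Spawn the static threads, the dynamic threads and the pool manager thread.
/// POOL_MANAGER.initialize();
///
/// // The scheduler reports incoming work so the pool can adapt.
/// POOL_MANAGER.increment_frequency();
/// ```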
pub struct DynamicPoolManager {
    static_threads: usize,
    dynamic_threads: usize,
    parked_threads: ArrayQueue<Thread>,
    runner: Arc<dyn DynamicRunner + Send + Sync>,
    last_frequency: AtomicU64,
    frequencies: TTas<VecDeque<u64>>,
}

impl Debug for DynamicPoolManager {
    fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
        fmt.debug_struct("DynamicPoolManager")
            .field("static_threads", &self.static_threads)
            .field("dynamic_threads", &self.dynamic_threads)
            .field("parked_threads", &self.parked_threads.len())
            .field("parked_threads", &self.parked_threads.len())
            .field("last_frequency", &self.last_frequency)
            .field("frequencies", &self.frequencies.try_lock())
            .finish()
    }
}

impl DynamicPoolManager {
    pub fn new(static_threads: usize, runner: Arc<dyn DynamicRunner + Send + Sync>) -> Self {
        let dynamic_threads = 1.max(num_cpus::get().saturating_sub(static_threads));
        Self {
            static_threads,
            dynamic_threads,
            parked_threads: ArrayQueue::new(dynamic_threads),
            runner,
            last_frequency: AtomicU64::new(0),
            frequencies: TTas::new(VecDeque::with_capacity(
                FREQUENCY_QUEUE_SIZE.saturating_add(1),
            )),
        }
    }

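    /// Records one task submission. The pool manager samples and resets this
    /// counter every `SCALER_POLL_INTERVAL` milliseconds to derive the task
    /// arrival frequency.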
    pub fn increment_frequency(&self) {
        self.last_frequency.fetch_add(1, Ordering::Acquire);
    }

    /// Initializes the dynamic pool that will be scaled.
    pub fn initialize(&'static self) {
        // Static thread manager that will always be available
        trace!("setting up the static thread manager");
        (0..self.static_threads).for_each(|_| {
            let clone = Arc::clone(&self.runner);
            thread::Builder::new()
                .name("bastion-driver-static".to_string())
                .spawn(move || {
                    Self::affinity_pinner();
                    clone.run_static(THREAD_PARK_TIMEOUT);
                })
                .expect("couldn't spawn static thread");
        });

        // Dynamic thread manager that will allow us to unpark threads
        // according to the needs.
        trace!("setting up the dynamic thread manager");
        (0..self.dynamic_threads).for_each(|_| {
            let clone = Arc::clone(&self.runner);
            thread::Builder::new()
                .name("bastion-driver-dynamic".to_string())
                .spawn(move || {
                    Self::affinity_pinner();
                    let parker = || self.park_thread();
                    clone.run_dynamic(&parker);
                })
                .expect("cannot start dynamic thread");
        });

        // Pool manager to check frequency of task rates
        // and take action by scaling the pool accordingly.
        thread::Builder::new()
            .name("bastion-pool-manager".to_string())
            .spawn(move || {
                let poll_interval = Duration::from_millis(SCALER_POLL_INTERVAL);
                trace!("setting up the pool manager");
                loop {
                    self.scale_pool();
                    thread::park_timeout(poll_interval);
                }
            })
            .expect("thread pool manager cannot be started");
    }

    /// `provision_threads` takes the number of threads that need to be made available.
    /// It will try to unpark threads from the dynamic pool, and spawn more threads if need be.
    pub fn provision_threads(&'static self, n: usize) {
        for i in 0..n {
            if !self.unpark_thread() {
                let new_threads = n - i;
                trace!(
                    "no more threads to unpark, spawning {} new threads",
                    new_threads
                );
                return self.spawn_threads(new_threads);
            }
        }
    }

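    /// Spawns `n` standalone threads running `run_standalone`; they exit on their own
    /// once they run out of tasks to process.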
    fn spawn_threads(&'static self, n: usize) {
        (0..n).for_each(|_| {
            let clone = Arc::clone(&self.runner);
            thread::Builder::new()
                .name("bastion-blocking-driver-standalone".to_string())
                .spawn(move || {
                    Self::affinity_pinner();
                    clone.run_standalone();
                })
                .unwrap();
        })
    }

    /// Parks the current thread until `unpark_thread` unparks it.
    pub fn park_thread(&self) {
        let _ = self
            .parked_threads
            .push(std::thread::current())
            .map(|_| {
                trace!("parking thread {:?}", std::thread::current().id());
                std::thread::park();
            })
            .map_err(|e| {
                debug!(
                    "couldn't park thread {:?} - {}",
                    std::thread::current().id(),
                    e
                );
            });
    }

    /// Pops a thread from the `parked_threads` queue and unparks it.
    /// Returns `true` on success.
    fn unpark_thread(&self) -> bool {
        if self.parked_threads.is_empty() {
            trace!("no parked threads");
            false
        } else {
            trace!("parked_threads: len is {}", self.parked_threads.len());
            self.parked_threads
                .pop()
                .map(|thread| {
                    debug!("Executor: unpark_thread: unparking {:?}", thread.id());
                    thread.unpark();
                })
                .map_err(|e| {
                    debug!("Executor: unpark_thread: couldn't unpark thread - {}", e);
                })
                .is_ok()
        }
    }

    /// Affinity pinner for the blocking pool.
    /// Pinning isn't enabled on single-core systems.
    #[inline]
    fn affinity_pinner() {
        if 1 != *load_balancer::core_count() {
            let mut core = ROUND_ROBIN_PIN.lock().unwrap();
            placement::set_for_current(*core);
            core.id = (core.id + 1) % *load_balancer::core_count();
        }
    }

    /// Exponentially Weighted Moving Average calculation
    ///
    /// This allows us to find the EMA value.
    /// This value represents the trend of tasks mapped onto the thread pool.
    /// The calculation is as follows:
    /// ```text
    /// +--------+-----------------+----------------------------------+
    /// | Symbol |   Identifier    |           Explanation            |
    /// +--------+-----------------+----------------------------------+
    /// | α      | EMA_COEFFICIENT | smoothing factor between 0 and 1 |
    /// | Yt     | freq            | frequency sample at time t       |
    /// | St     | acc             | EMA at time t                    |
    /// +--------+-----------------+----------------------------------+
    /// ```
    /// Under these definitions, the formula is:
    /// ```text
    /// St = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 + ... ]
    /// ```
    /// # Arguments
    ///
    /// * `freq_queue` - Sliding window of frequency samples
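    ///
    /// # Example
    ///
    /// A worked instance of the fold, assuming a sample queue of `[0, 4, 6]`
    /// (iterated front to back) and the default `EMA_COEFFICIENT` α = 2 / 11:
    /// ```text
    /// EMA = α * (0 * (1 - α)^0 + 4 * (1 - α)^1 + 6 * (1 - α)^2)
    ///     = α * (0 + 3.2727 + 4.0165)
    ///     ≈ 1.33
    /// ```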
    #[inline]
    fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
        freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| {
            acc + (*freq as f64) * (1_f64 - EMA_COEFFICIENT).powf(i as f64)
        }) * EMA_COEFFICIENT
    }

    /// Adaptive pool scaling function
    ///
    /// This allows the pool to spawn new threads to make room for incoming task pressure.
    /// It works in the background, detached from the pool system, and scales up the pool based
    /// on the request rate.
    ///
    /// It uses a frequency-based calculation to define work, utilizing the average processing rate.
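    ///
    /// For example, with the defaults: if the current EMA exceeds the previous EMA by 1.5,
    /// the pool provisions `min(num_cpus, (2.0 * 1.5 + 2.0) as usize) = min(num_cpus, 5)`
    /// threads; if the EMAs are equal (within machine epsilon) but tasks were still
    /// submitted, it provisions `DEFAULT_LOW_WATERMARK` (2) threads to unblock the flow.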
    fn scale_pool(&'static self) {
        // Fetch the current frequency; in this approach it matters that the operations are ordered.
        let current_frequency = self.last_frequency.swap(0, Ordering::SeqCst);
        let mut freq_queue = self.frequencies.lock();

        // Make it safe to start the calculations by seeding an initial frequency sample.
        if freq_queue.is_empty() {
            freq_queue.push_back(0);
        }

        // Calculate message rate for the given time window
        let frequency = (current_frequency as f64 / SCALER_POLL_INTERVAL as f64) as u64;

        // Calculate the previous time window's EMA value (before adding the new sample)
        let prev_ema_frequency = Self::calculate_ema(&freq_queue);

        // Add seen frequency data to the frequency histogram.
        freq_queue.push_back(frequency);
        if freq_queue.len() == FREQUENCY_QUEUE_SIZE.saturating_add(1) {
            freq_queue.pop_front();
        }

        // Calculate the current time window's EMA value (including the last sample)
        let curr_ema_frequency = Self::calculate_ema(&freq_queue);

        // Adapt the thread count of the pool.
        //
        // The pool manager keeps a sliding window of the frequencies it has visited,
        // computes the EMA value for the previous window and the current window,
        // and compares them to determine the scaling amount based on the trend.
        // If the current EMA value is bigger, we scale up.
        if curr_ema_frequency > prev_ema_frequency {
            // "Scale by" amount can be seen as "how much load is coming".
            // "Scale" amount is "how many threads we should spawn".
            let scale_by: f64 = curr_ema_frequency - prev_ema_frequency;
            let scale = num_cpus::get().min(
                ((DEFAULT_LOW_WATERMARK as f64 * scale_by) + DEFAULT_LOW_WATERMARK as f64) as usize,
            );
            trace!("unparking {} threads", scale);

            // It is time to scale the pool!
            self.provision_threads(scale);
        } else if (curr_ema_frequency - prev_ema_frequency).abs() < std::f64::EPSILON
            && current_frequency != 0
        {
            // Throughput is low. Allocate more threads to unblock the flow.
            // If we fall into this case, the scheduler is congested by long-running tasks.
            // To unblock the flow we should add some threads to the pool, but not so many
            // that they stall the program's operation.
            trace!("unparking {} threads", DEFAULT_LOW_WATERMARK);
            self.provision_threads(DEFAULT_LOW_WATERMARK as usize);
        }
    }
}