1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
//! A simple adaptive threadpool that returns a oneshot future.

use std::{
    collections::VecDeque,
    thread,
    time::{Duration, Instant},
};

use parking_lot::{Condvar, Mutex};

use crate::{
    debug_delay, warn, AtomicBool, AtomicUsize, Lazy, OneShot, Relaxed, SeqCst,
};

// Upper bound on the pool size: `maybe_spawn_new_thread` refuses to create
// a worker once `TOTAL_THREAD_COUNT` has reached this value.
const MAX_THREADS: usize = 128;
// Target number of idle workers: no new worker is spawned while at least
// this many are standing by, and a worker leaves its loop once it observes
// that many standing by.
const MIN_THREADS: usize = 2;

// Number of workers currently parked in `Queue::recv_timeout`; incremented
// just before the wait and decremented right after it in `perform_work`.
static STANDBY_THREAD_COUNT: AtomicUsize = AtomicUsize::new(0);
// Number of worker threads the pool currently accounts for; compared
// against `MAX_THREADS` before spawning another one.
static TOTAL_THREAD_COUNT: AtomicUsize = AtomicUsize::new(0);

// A type-erased task: a boxed closure that is sendable across threads and
// is called at most once.
type Work = Box<dyn FnOnce() + Send + 'static>;

/// FIFO of pending tasks, paired with the condition variable used to wake
/// a worker that is blocked waiting for one.
struct Queue {
    cv: Condvar,
    mu: Mutex<VecDeque<Work>>,
}

impl Queue {
    /// Pops the oldest task, blocking for up to `duration` while the queue
    /// is empty.
    ///
    /// Returns `None` only when the deadline passed and the queue was still
    /// empty at that point.
    fn recv_timeout(&self, duration: Duration) -> Option<Work> {
        let mut pending = self.mu.lock();

        // The deadline is measured from the moment the lock was acquired.
        let deadline = Instant::now() + duration;

        loop {
            if let Some(work) = pending.pop_front() {
                return Some(work);
            }

            // Spurious or real wakeups loop around and re-check the queue;
            // a timeout takes one last look before giving up.
            if self.cv.wait_until(&mut pending, deadline).timed_out() {
                return pending.pop_front();
            }
        }
    }

    /// Pops the oldest task without waiting; `None` if the queue is empty.
    fn try_recv(&self) -> Option<Work> {
        self.mu.lock().pop_front()
    }

    /// Appends `work` to the back of the queue and wakes one waiting worker.
    fn send(&self, work: Work) {
        let mut pending = self.mu.lock();
        pending.push_back(work);
        // The notification is issued while the queue lock is still held.
        self.cv.notify_one();
        drop(pending);
    }
}

// The single work queue shared by `spawn` and every worker thread. It is
// built by `init_queue` — presumably on first access, per the crate's
// `Lazy` wrapper (confirm against its implementation).
static QUEUE: Lazy<Queue, fn() -> Queue> = Lazy::new(init_queue);

// Initializer for `QUEUE`: asks for a first worker thread, then builds an
// empty queue together with its condition variable.
fn init_queue() -> Queue {
    maybe_spawn_new_thread();

    let cv = Condvar::new();
    let mu = Mutex::new(VecDeque::new());
    Queue { cv, mu }
}

// Body of every worker thread.
//
// Each round the worker registers itself as standing by, waits up to one
// second for a task, unregisters, possibly asks for another worker to be
// spawned, runs the task it got (if any), and then drains whatever else is
// queued. The worker returns as soon as it observes, at the top of a round,
// that at least MIN_THREADS workers are already standing by.
fn perform_work() {
    let idle_timeout = Duration::from_secs(1);

    loop {
        if STANDBY_THREAD_COUNT.load(SeqCst) >= MIN_THREADS {
            break;
        }

        debug_delay();
        STANDBY_THREAD_COUNT.fetch_add(1, SeqCst);

        debug_delay();
        let first_task = QUEUE.recv_timeout(idle_timeout);

        debug_delay();
        // `fetch_sub` yields the count *before* this worker left standby.
        let standby_before_leaving = STANDBY_THREAD_COUNT.fetch_sub(1, SeqCst);
        if standby_before_leaving <= MIN_THREADS {
            maybe_spawn_new_thread();
        }

        if let Some(task) = first_task {
            task();
        }

        debug_delay();
        // Keep going without blocking for as long as tasks are queued.
        loop {
            match QUEUE.try_recv() {
                Some(task) => {
                    task();
                    debug_delay();
                }
                None => break,
            }
        }

        debug_delay();
    }
}

// Spawn one more "sled-io" worker thread, unless at least MIN_THREADS
// workers are already standing by or the pool already accounts for
// MAX_THREADS threads.
//
// The slot in TOTAL_THREAD_COUNT is claimed *before* the OS thread is
// created, so concurrent callers cannot all pass the MAX_THREADS check and
// overshoot the cap while freshly spawned threads have not yet started.
// The slot is given back if the spawn fails, and when the worker thread
// ends for any reason (including a panicking task).
//
// A worker stops once it observes at least MIN_THREADS workers standing by
// at the top of its loop in `perform_work`; it waits at most one second per
// round for new work.
fn maybe_spawn_new_thread() {
    // Returns the claimed slot when the worker thread ends, including when
    // a task panics and unwinds through `perform_work`.
    struct SlotGuard;

    impl Drop for SlotGuard {
        fn drop(&mut self) {
            TOTAL_THREAD_COUNT.fetch_sub(1, SeqCst);
        }
    }

    debug_delay();
    let standby_workers = STANDBY_THREAD_COUNT.load(SeqCst);
    if standby_workers >= MIN_THREADS {
        return;
    }

    debug_delay();
    // Claim a slot atomically; `total_workers` is the count before the claim.
    let total_workers = TOTAL_THREAD_COUNT.fetch_add(1, SeqCst);
    if total_workers >= MAX_THREADS {
        // Pool is full: undo the claim.
        TOTAL_THREAD_COUNT.fetch_sub(1, SeqCst);
        return;
    }

    let spawn_res =
        thread::Builder::new().name("sled-io".to_string()).spawn(|| {
            let _slot = SlotGuard;
            debug_delay();
            perform_work();
        });

    if let Err(e) = spawn_res {
        // The thread never ran, so its guard never existed: release the
        // slot here instead.
        TOTAL_THREAD_COUNT.fetch_sub(1, SeqCst);
        once!({
            warn!(
                "Failed to dynamically increase the threadpool size: {:?}. \
                 Currently have {} running IO threads",
                e, total_workers
            )
        });
    }
}

/// Run `work` on the threadpool.
///
/// The closure is queued for a worker thread, and the returned [`OneShot`]
/// is filled with the closure's return value once it has run.
pub fn spawn<F, R>(work: F) -> OneShot<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    let (promise_filler, promise) = OneShot::pair();

    // Wrap the caller's closure so that its output is routed into the
    // promise instead of being returned.
    let job = move || {
        let output = work();
        promise_filler.fill(output);
    };
    QUEUE.send(Box::new(job));

    // Grow the pool if too few workers are standing by to pick this up.
    maybe_spawn_new_thread();

    promise
}