/*! A thread pool optimised for bursts of activity.

Consider the following use-case: A single thread produces work which must then be performed by a
pool of worker threads. The work is produced infrequently, but in bursts. Under normal operation,
therefore, the threads in the pool sleep until some event requires many of them to be woken at
once. Those threads perform some work before going back to sleep again.

Most thread pools schedule work based on thread readiness. This usually means that work is pushed
onto a single queue shared by all the workers, and the workers take work from the queue when
they're ready to accept it. This works very well in general, but not in our use-case: since many
threads become runnable at the same time, they immediately contend to read from the queue.

Instead, we use a round-robin scheduling strategy, in which the work-producing thread sends work to
specific workers. This eliminates contention (since each worker has its own queue). The trade-off
is that it performs badly when utilisation is high and the workloads are uneven.
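
The dispatch logic amounts to little more than an incrementing index taken modulo the number of
workers. A minimal sketch of the idea:

```
// Round-robin dispatch, sketched with worker indices in place of real workers.
let n_workers = 4;
let mut next_worker = 0;
for job in 0..10 {
    println!("job {} -> worker {}", job, next_worker);
    next_worker = (next_worker + 1) % n_workers;
}
```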

# Usage

Normally a thread pool will spawn a fixed number of worker threads; once the threads are running,
you send closures to the pool which are then executed by one of the workers. `BurstPool`'s API
works a bit differently.

A `BurstPool` is parameterised over the kind of data which will be sent to the workers. (This data
could be boxed closures, if you want to mimic the usual thread-pool API; see the second example
below.) The user provides work by calling `BurstPool::send()`, and one of the workers is chosen to
receive it. Each worker has its own function which it uses to handle work sent to it. This
function is provided when the thread is spawned.

```
use burst_pool::BurstPool;

let mut pool = BurstPool::new();

// Spawn a worker in the pool
pool.spawn(|x| println!("Received {}!", x));

// Send some work to the worker
pool.send(36).unwrap();
```
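
To mimic a conventional closure-based thread pool, the payload type can be a boxed closure. A
minimal sketch, in which the `Job` alias is ours and not part of the crate:

```
use burst_pool::BurstPool;

// Hypothetical payload type: each unit of work is a boxed closure.
type Job = Box<dyn FnMut() + Send>;

let mut pool: BurstPool<Job> = BurstPool::new();
pool.spawn(|mut job: Job| job());
pool.send(Box::new(|| println!("Hello from a worker!"))).unwrap();
```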

# Performance

I'm using the following benchmark:

1. Create a pool with 10 workers.
2. Send 10 pieces of work to the pool.
3. When a thread receives some work it reads the clock, records its latency, and goes back to
   sleep.

For comparison, the benchmark was replicated using some of the other thread pools on crates.io
(which are not optimised for this use-case), and I also benchmarked the time taken to suddenly
unpark 10 parked threads.

<img src="https://raw.githubusercontent.com/asayers/burst-pool/master/histogram.png" style="margin: 0 auto; display: block;" />

crate               | mean latency | 20 %ⁱˡᵉ | 40 %ⁱˡᵉ  | 60 %ⁱˡᵉ  | 80 %ⁱˡᵉ
--------------------|--------------|---------|----------|----------|-----------
[burst_pool]        | 8.3 μs       | <3.4 μs | <5.1 μs  | <7.6 μs  | <7.6 μs
[threadpool]        | 17.4 μs      | <7.6 μs | <11.4 μs | <17.1 μs | <17.1 μs
[scoped_threadpool] | 18.7 μs      | <7.6 μs | <11.4 μs | <17.1 μs | <17.1 μs
(unpark only)       | 7.7 μs       | <3.4 μs | <5.1 μs  | <7.6 μs  | <7.6 μs

[burst_pool]: https://docs.rs/burst_pool/
[threadpool]: https://docs.rs/threadpool/
[scoped_threadpool]: https://docs.rs/scoped_threadpool/

- The profile of `BurstPool` shows that it doesn't add much latency over just calling `unpark()`.
  Almost all of the time goes to the Linux scheduler.
- The mean latency is heavily skewed by outliers lying above the 80th percentile. You can expect
  latencies better than 7.6 μs most of the time, with the occasional long wait.
- Getting results as good as these depends heavily on setting `max_cstate = 0`, which makes a
  huge difference to thread wake-up times.

You can run the benchmarks yourself with `cargo bench`. If your results are significantly worse
than those above, your kernel might be powering down CPU cores too eagerly. If you care about
latency more than battery life, consider setting `max_cstate = 0`.
*/

#[cfg(test)] extern crate env_logger;
#[macro_use] extern crate log;

use std::thread::{self,JoinHandle};
use std::sync::mpsc;

/// A thread pool optimised for bursts of activity.
///
/// A `BurstPool` maintains one unbounded [channel] for each thread. Because these channels have
/// only one producer, the more efficient spsc implementation will be used.
///
/// [channel]: https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html
///
/// Threads spawned in the pool are parked (they do *not* busy-wait) until there is work to be
/// done. When `BurstPool::send()` is called, a thread is chosen, the payload is pushed onto its
/// queue, and it is unparked. Threads are selected round-robin, so uneven processing time is not
/// handled well.
///
/// When the `BurstPool` is dropped, all threads will be signalled to exit. Dropping will then
/// block until the threads are all dead.
///
/// If a thread panics it will go undetected until we attempt to send some work to it. At that
/// point, the thread will be removed from the pool, a warning will be issued, and the work will
/// be sent to the next thread in the ring. Dead threads are not replaced.
///
/// ## Example
///
/// ```
/// use burst_pool::BurstPool;
///
/// let mut pool = BurstPool::new();
///
/// for thread_id in 0..3 {
///     pool.spawn(move |x| {
///         println!("Thread {} received {}", thread_id, x);
///     });
/// }
///
/// for x in 0..5 {
///     pool.send(x).unwrap();
/// }
/// ```
///
/// This will print something like:
///
/// ```none
/// Thread 0 received 0
/// Thread 0 received 3
/// Thread 1 received 1
/// Thread 1 received 4
/// Thread 2 received 2
/// ```
///
pub struct BurstPool<T> {
    threads: Vec<(JoinHandle<()>, mpsc::Sender<T>)>,
    next_thread: usize,
}

impl<T> BurstPool<T> where T: Send {
    /// Create a new empty pool.
    pub fn new() -> BurstPool<T> {
        BurstPool {
            threads: Vec::new(),
            next_thread: 0,
        }
    }

    /// Spawn a thread in the pool.
    ///
    /// The spawned thread blocks until `pool.send()` is called, at which point it may be woken up.
    /// When woken, it runs the given closure with the value it received, before going back to
    /// sleep.
    ///
    /// Since you typically don't know which thread a `send()` call will wake up, all workers in
    /// a pool should do pretty similar things with the work they receive. Also, bear in mind that
    /// the `BurstPool`'s scheduling strategy doesn't cope well with uneven processing times,
    /// particularly when one thread is systematically slower than the others.
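    ///
    /// A minimal example, assuming a single worker:
    ///
    /// ```
    /// use burst_pool::BurstPool;
    ///
    /// let mut pool = BurstPool::new();
    /// pool.spawn(|x: u32| println!("Worker received {}", x));
    /// pool.send(7).unwrap();
    /// ```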
    pub fn spawn<F>(&mut self, mut consume: F)
            where F: FnMut(T) + Send + 'static, T: 'static {
        let (tx,rx) = mpsc::channel();
        let handle = thread::spawn(move || {
            loop {
                use self::mpsc::TryRecvError::*;
                match rx.try_recv() {
                    Ok(x) => consume(x),
                    Err(Empty) => thread::park(),  // May unblock spuriously.
                    Err(Disconnected) => break,    // Kill the thread
                }
            }
        });
        self.threads.push((handle, tx));
    }

    /// Send a value to be processed by one of the threads in the pool.
    ///
    /// If there are no threads available to perform the work, an error is returned containing the
    /// unperformed work.
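    ///
    /// A sketch of recovering the unperformed work when the pool is empty:
    ///
    /// ```
    /// use burst_pool::{BurstPool, BurstError};
    ///
    /// let mut pool: BurstPool<u32> = BurstPool::new();
    /// match pool.send(42) {
    ///     Err(BurstError::NoThreads(x)) => assert_eq!(x, 42),
    ///     _ => unreachable!(),
    /// }
    /// ```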
    pub fn send(&mut self, x: T) -> Result<(), BurstError<T>> {
        if self.threads.is_empty() { return Err(BurstError::NoThreads(x)); }
        self.next_thread %= self.threads.len();
        let ret = self.threads[self.next_thread].1.send(x);
        match ret {
            Ok(()) => {
                // Data sent successfully: unpark and advance next_thread.
                self.threads[self.next_thread].0.thread().unpark();
                self.next_thread += 1;
                Ok(())
            },
            Err(mpsc::SendError(x)) => {
                // The thread we tried to send to is dead: remove it from the list.
                // FIXME: We don't seem to reliably take this branch when a worker panics. In such
                // cases we silently drop work!
                let (handle, _) = self.threads.remove(self.next_thread);
                match handle.join() {
                    Ok(()) => unreachable!(),
                    Err(_) => error!("Worker thread panicked! Removing from pool and retrying..."),
                }
                self.send(x)
            },
        }
    }
}

impl<T> Drop for BurstPool<T> {
    /// Signal all the threads in the pool to shut down, and wait for them to do so.
    fn drop(&mut self) {
        for (handle, tx) in self.threads.drain(..) {
            ::std::mem::drop(tx);
            handle.thread().unpark();
            handle.join().unwrap_or_else(|_| error!("Worker thread panicked! Never mind..."));
        }
    }
}

#[derive(Debug)]
pub enum BurstError<T> {
    /// There were no threads in the pool to perform the work. The unperformed work is returned.
    NoThreads(T),
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_panic_1() {
        // env_logger::init().unwrap();
        let mut pool = BurstPool::new();
        pool.spawn(|_| panic!("whoa!"));
        pool.spawn(|x| println!("got {}", x));
        println!("still ok");
        pool.send(1).unwrap();
        println!("still ok");
    }

    #[test]
    fn test_panic_2() {
        // env_logger::init().unwrap();
        let mut pool = BurstPool::new();
        pool.spawn(|_| panic!("whoa!"));
        println!("still ok");
        pool.send(1).expect("Sending failed");
        println!("still ok");
    }
}