pooled-writer 0.4.0

Library for using N threads to write to M compressed files/writers.
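Beyond the built-in `bgzf::BgzfCompressor`, other block-compression formats can be plugged in by implementing the `Compressor` trait. Below is a minimal sketch using a hypothetical pass-through "compressor"; the trait is reproduced inline so the sketch compiles on its own, but in real code you would `use pooled_writer::Compressor` and pass your type as the `C` parameter of `PoolBuilder`:

```rust
use std::io;

// The `Compressor` trait, reproduced from pooled-writer so this sketch is
// self-contained; in real code, `use pooled_writer::Compressor` instead.
pub trait Compressor: Sized + Send + 'static
where
    Self::CompressionLevel: Clone + Send + 'static,
    Self::Error: std::error::Error + Send + 'static,
{
    type Error;
    type CompressionLevel;
    const BLOCK_SIZE: usize = 65280;

    fn new(compression_level: Self::CompressionLevel) -> Self;
    fn default_compression_level() -> Self::CompressionLevel;
    fn new_compression_level(level: u8) -> Result<Self::CompressionLevel, Self::Error>;
    fn compress(
        &mut self,
        input: &[u8],
        output: &mut Vec<u8>,
        is_last: bool,
    ) -> Result<(), Self::Error>;
}

/// A hypothetical compressor that copies bytes through unchanged.
pub struct PassthroughCompressor;

impl Compressor for PassthroughCompressor {
    type Error = io::Error;
    type CompressionLevel = u8;

    fn new(_level: u8) -> Self {
        PassthroughCompressor
    }

    fn default_compression_level() -> u8 {
        0
    }

    // Validate the level here; pass-through has no real levels, so accept 0-9.
    fn new_compression_level(level: u8) -> Result<u8, io::Error> {
        if level <= 9 {
            Ok(level)
        } else {
            Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid level"))
        }
    }

    // A real implementation would emit one compressed block per call and,
    // when `is_last` is true, append the format's EOF marker.
    fn compress(&mut self, input: &[u8], output: &mut Vec<u8>, _is_last: bool) -> Result<(), io::Error> {
        output.extend_from_slice(input);
        Ok(())
    }
}

fn main() {
    let level = PassthroughCompressor::new_compression_level(0).unwrap();
    let mut c = PassthroughCompressor::new(level);
    let mut out = Vec::new();
    c.compress(b"hello", &mut out, true).unwrap();
    assert_eq!(out, b"hello");
    println!("wrote {} output bytes", out.len()); // prints "wrote 5 output bytes"
}
```

Because `PooledWriter` buffers up to `C::BLOCK_SIZE` bytes before enqueueing a block, keeping `BLOCK_SIZE` at the format's maximum block size lets each compressor call produce one complete output block.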
//! A pooled writer and compressor.
//!
//! # Overview
//!
//! `pooled-writer` solves the problem of compressing and writing data to a set of writers using
//! multiple threads, where the number of writers and threads cannot easily be equal.  For example,
//! writing to hundreds of gzipped files using 16 threads, or writing to four gzipped files
//! using 32 threads.
//!
//! To accomplish this, a pool is configured and writers are exchanged for [`PooledWriter`]s
//! that can be used in place of the original writers.  This exchange is done via the
//! [`PoolBuilder`], which is the preferred way to configure and create a pool.  The [`Pool`] and
//! builder require two generic types: the writer type `W` and the compressor type `C`. `W` can
//! usually be elided, since calls to [`PoolBuilder::exchange`] allow it to be inferred. `C`
//! must be specified as a type that implements [`Compressor`].
//!
//! The [`Pool`] consists of a single thread pool that consumes work from both a compression queue
//! and a writing queue.  All concurrency is managed via message passing over channels.
//!
//! Every time the internal buffer of a [`PooledWriter`] reaches capacity (defined by
//! [`Compressor::BLOCK_SIZE`]) it sends two messages:
//! 1. It sends a message over the corresponding writer's channel to the writer pool, enqueueing
//!    a one-shot receiver channel in the writers queue that will receive the compressed bytes
//!    once the compressor is done. This is done to maintain the output order.
//! 2. It sends a message to the compressor pool that contains a buffer of bytes to compress
//!    as well as the sender side of the one-shot channel to send the compressed bytes on.
//!
//! The threads in the thread pool loop continuously until the pool is shut down, attempting
//! first to receive and compress one block, and then to receive and write one compressed block.
//! A third internal channel is used to manage the queue of writes to be performed so that the
//! individual per-writer channels (of which there may be many) are only polled if there is likely
//! to be data available for writing.  When data is available to be written, the appropriate
//! underlying writer is locked, and the data written.
//!
//! When all writing to [`PooledWriter`]s is complete, the writers should be `close()`'d or dropped,
//! and then the pool should be stopped using [`Pool::stop_pool`].  Writers that are not closed
//! may have buffered data that is never written!
//!
//! [`Pool::stop_pool`] will shut down channels in a safe order, ensuring that data submitted to the
//! pool is compressed and written before threads are stopped.  After initiating the pool shutdown,
//! any subsequent attempts to write to [`PooledWriter`]s will result in errors.  Likewise, any
//! calls to [`PooledWriter::close`] that cause data to be flushed into the compression queue will
//! raise errors.
//!
//! # Example
//!
//! ```rust
//! use std::{
//!     error::Error,
//!     fs::File,
//!     io::{BufWriter, Write},
//!     path::Path,
//! };
//!
//! use pooled_writer::{Compressor, PoolBuilder, Pool, bgzf::BgzfCompressor};
//!
//! type DynError = Box<dyn Error + 'static>;
//!
//! fn create_writer<P: AsRef<Path>>(name: P) -> Result<BufWriter<File>, DynError> {
//!     Ok(BufWriter::new(File::create(name)?))
//! }
//!
//! fn main() -> Result<(), DynError> {
//!     let writers = vec![
//!         create_writer("/tmp/test1.txt.gz")?,
//!         create_writer("/tmp/test2.txt.gz")?,
//!         create_writer("/tmp/test3.txt.gz")?,
//!     ];
//!
//!     let mut builder = PoolBuilder::<_, BgzfCompressor>::new()
//!         .threads(8)
//!         .compression_level(5)?;
//!
//!     let mut pooled_writers = writers.into_iter().map(|w| builder.exchange(w)).collect::<Vec<_>>();
//!     let mut pool = builder.build()?;
//!
//!     writeln!(&mut pooled_writers[1], "This is writer2")?;
//!     writeln!(&mut pooled_writers[0], "This is writer1")?;
//!     writeln!(&mut pooled_writers[2], "This is writer3")?;
//!     pooled_writers.into_iter().try_for_each(|w| w.close())?;
//!     pool.stop_pool()?;
//!
//!     Ok(())
//! }
//! ```
#![forbid(unsafe_code)]
#![allow(
    unused,
    clippy::missing_panics_doc,
    clippy::missing_errors_doc,
    clippy::must_use_candidate,
    clippy::module_name_repetitions
)]

#[cfg(feature = "bgzf_compressor")]
pub mod bgzf;

use std::time::Duration;
use std::{
    error::Error,
    io::{self, Read, Write},
    sync::Arc,
    thread::JoinHandle,
};

use bytes::{Bytes, BytesMut};
use flume::{self, Receiver, Sender, bounded};
use parking_lot::Mutex;
use thiserror::Error;

/// 128 KB default buffer size, same as pigz.
pub(crate) const BUFSIZE: usize = 128 * 1024;

/// Convenience type for functions that return [`PoolError`].
type PoolResult<T> = Result<T, PoolError>;

/// Represents errors that may be generated by any `Pool` related functionality.
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum PoolError {
    #[error("Failed to send over channel")]
    ChannelSend,
    #[error(transparent)]
    ChannelReceive(#[from] flume::RecvError),

    // TODO: figure out how to better pass a generic / dynamic error type into this.
    #[error("Error compressing data: {0}")]
    CompressionError(String),
    #[error(transparent)]
    Io(#[from] io::Error),
}

////////////////////////////////////////////////////////////////////////////////
// The PooledWriter and its impls
////////////////////////////////////////////////////////////////////////////////

/// A [`PooledWriter`] is created by exchanging a writer with a [`Pool`].
///
/// The pooled writer will internally buffer writes, sending bytes to the [`Pool`]
/// after the internal buffer has been filled.
///
/// Note that the `compressor_tx` channel is shared by all pooled writers, whereas the `writer_tx`
/// is specific to the _underlying_ writer that this pooled writer encapsulates.
#[derive(Debug)]
pub struct PooledWriter {
    /// The index/serial number of the pooled writer within the pool
    writer_index: usize,
    /// Channel to send messages containing bytes to compress to the compressors' pool.
    compressor_tx: Sender<CompressorMessage>,
    /// Channel to send the receiving end of the one-shot channel that will be
    /// used to send the compressed bytes. This effectively "place holds" the
    /// position of the compressed bytes in the writers queue until the compressed bytes
    /// are ready.
    writer_tx: Sender<oneshot::Receiver<WriterMessage>>,
    /// The internal buffer to gather bytes to send.
    buffer: BytesMut,
    /// The desired size of the internal buffer.
    buffer_size: usize,
}

impl PooledWriter {
    /// Create a new [`PooledWriter`] with an internal buffer capacity that matches [`Compressor::BLOCK_SIZE`].
    ///
    /// # Arguments
    /// - `index` - a usize representing that this is the nth pooled writer created within the pool
    /// - `compressor_tx` - The channel to send uncompressed bytes to the compressor pool.
    /// - `writer_tx` - The `Send` end of the channel that transmits the `Receiver` end of the one-shot
    ///                 channel, which will be consumed when the compressor sends the compressed bytes.
    fn new<C>(
        index: usize,
        compressor_tx: Sender<CompressorMessage>,
        writer_tx: Sender<oneshot::Receiver<WriterMessage>>,
    ) -> Self
    where
        C: Compressor,
    {
        Self {
            writer_index: index,
            compressor_tx,
            writer_tx,
            buffer: BytesMut::with_capacity(C::BLOCK_SIZE),
            buffer_size: C::BLOCK_SIZE,
        }
    }

    /// Test whether the internal buffer has reached capacity.
    #[inline]
    fn buffer_full(&self) -> bool {
        self.buffer.len() == self.buffer_size
    }

    /// Send all bytes in the current buffer to the compressor.
    ///
    /// If `is_last` is `true`, the message sent to the compressor will have its `is_last` flag set
    /// and the compressor will finish the compressed stream (e.g. by appending the BGZF EOF block).
    ///
    /// If `is_last` is `false`, only a full block will be sent.  If `is_last` is `true`, an
    /// incomplete block may be sent as the final block.
    fn flush_bytes(&mut self, is_last: bool) -> std::io::Result<()> {
        if is_last || self.buffer_full() {
            self.send_block(is_last)?;
        }
        Ok(())
    }

    /// Send a single block
    fn send_block(&mut self, is_last: bool) -> std::io::Result<()> {
        let bytes = self.buffer.split_to(self.buffer.len()).freeze();
        let (mut m, r) = CompressorMessage::new_parts(self.writer_index, bytes);
        m.is_last = is_last;
        self.writer_tx
            .send(r)
            .map_err(|_e| io::Error::new(io::ErrorKind::Other, PoolError::ChannelSend))?;
        self.compressor_tx
            .send(m)
            .map_err(|_e| io::Error::new(io::ErrorKind::Other, PoolError::ChannelSend))
    }

    /// Flush any remaining bytes and consume self, triggering drops of the senders.
    pub fn close(mut self) -> std::io::Result<()> {
        self.flush_bytes(true)
    }
}

impl Drop for PooledWriter {
    /// Drop the [`PooledWriter`], flushing any remaining bytes to the pool.
    ///
    /// Panics if the final bytes cannot be sent (e.g. if the pool has already been stopped);
    /// use [`PooledWriter::close`] to handle that error explicitly.
    fn drop(&mut self) {
        self.flush_bytes(true).unwrap();
    }
}

impl Write for PooledWriter {
    /// Buffer all bytes in `buf`, sending full blocks to the [`Pool`] as the internal buffer fills.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let mut bytes_added = 0;

        while bytes_added < buf.len() {
            let bytes_to_append =
                std::cmp::min(buf.len() - bytes_added, self.buffer_size - self.buffer.len());

            self.buffer.extend_from_slice(&buf[bytes_added..bytes_added + bytes_to_append]);
            bytes_added += bytes_to_append;
            if self.buffer_full() {
                self.send_block(false)?;
            }
        }

        Ok(buf.len())
    }

    /// Send the current buffer to the compressor only if it is full.  A partial final buffer is
    /// sent when the writer is `close()`'d or dropped, so `flush` alone does not guarantee all
    /// buffered bytes reach the underlying writer.
    fn flush(&mut self) -> std::io::Result<()> {
        self.flush_bytes(false)
    }
}

////////////////////////////////////////////////////////////////////////////////
// The Compressor trait
////////////////////////////////////////////////////////////////////////////////

/// A [`Compressor`] is used in the compressor pool to compress bytes.
///
/// An implementation must be provided as the `C` type parameter of the [`PoolBuilder`] so that
/// the pool knows what kind of compression to use.
///
/// See the module level example for more details.
pub trait Compressor: Sized + Send + 'static
where
    Self::CompressionLevel: Clone + Send + 'static,
    Self::Error: Error + Send + 'static,
{
    type Error;
    type CompressionLevel;

    /// The `BLOCK_SIZE` is used to set the buffer size of the [`PooledWriter`]s and should match the max
    /// size allowed by the block compression format being used.
    const BLOCK_SIZE: usize = 65280;

    /// Create a new compressor with the given compression level.
    fn new(compression_level: Self::CompressionLevel) -> Self;

    /// Returns the default compression level for the compressor.
    fn default_compression_level() -> Self::CompressionLevel;

    /// Create an instance of the compression level.
    ///
    /// The validity of the compression level should be checked here.
    fn new_compression_level(compression_level: u8) -> Result<Self::CompressionLevel, Self::Error>;

    /// Compress a set of bytes into the `output` vec. If `is_last` is true, and depending on the
    /// block compression format, an EOF block may be appended as well.
    fn compress(
        &mut self,
        input: &[u8],
        output: &mut Vec<u8>,
        is_last: bool,
    ) -> Result<(), Self::Error>;
}

////////////////////////////////////////////////////////////////////////////////
// The messages passed between threads
////////////////////////////////////////////////////////////////////////////////

/// A message that is sent from a [`PooledWriter`] to the compressor threadpool within a [`Pool`].
struct CompressorMessage {
    /// The index of the destination writer
    writer_index: usize,
    /// The bytes to compress.
    buffer: Bytes,
    /// Where the compressed bytes will be sent after compression.
    oneshot_tx: oneshot::Sender<WriterMessage>,
    /// A sentinel value to let the compressor know that the BGZF stream needs an EOF.
    is_last: bool,
}

impl CompressorMessage {
    fn new_parts(writer_index: usize, buffer: Bytes) -> (Self, oneshot::Receiver<WriterMessage>) {
        let (tx, rx) = oneshot::channel();
        let new = Self { writer_index, buffer, oneshot_tx: tx, is_last: false };
        (new, rx)
    }
}

/// The compressed bytes to be written to a file.
///
/// This is sent from the compressor threadpool to the writer queue in the writer threadpool
/// via the one-shot channel provided by the [`PooledWriter`].
#[derive(Debug)]
struct WriterMessage {
    buffer: Vec<u8>,
}

/// Internal enum used by worker threads to dispatch between compression and write work.
enum WorkItem {
    Compress(CompressorMessage),
    Write(usize),
}

////////////////////////////////////////////////////////////////////////////////
// The PoolBuilder struct and impls
////////////////////////////////////////////////////////////////////////////////

/// A struct to make building up a [`Pool`] simpler.  The builder should be constructed using
/// [`PoolBuilder::new`], which gives the user control over the sizes of the queues used for
/// compression and writing.  Note that a single compression queue is created,
/// plus one writer queue per writer exchanged.  A good starting point for these queue sizes is
/// two times the number of threads.
///
/// Once created, various functions can configure aspects of the pool.  It is best practice, though
/// not required, to configure the builder _before_ exchanging writers.  The exception is
/// `queue_size`, which may _not_ be set after any writers have been exchanged.  If not set
/// manually, `queue_size` defaults to the number of threads multiplied by
/// [`PoolBuilder::QUEUE_SIZE_THREAD_MULTIPLES`].
///
/// Once the builder is configured, writers may be exchanged for [`PooledWriter`]s using the
/// [`PoolBuilder::exchange`] function, which consumes the provided writer and returns a new
/// writer that can be used in its place.
///
/// After exchanging all writers, the pool may be created and started with [`PoolBuilder::build`],
/// which consumes the builder; no more writers may be exchanged after that point.
pub struct PoolBuilder<W, C>
where
    W: Write + Send + 'static,
    C: Compressor,
{
    writer_index: usize,
    compression_level: C::CompressionLevel,
    queue_size: Option<usize>,
    threads: usize,
    compressor_tx: Option<Sender<CompressorMessage>>,
    compressor_rx: Option<Receiver<CompressorMessage>>,
    writers: Vec<W>,
    writer_txs: Vec<Sender<oneshot::Receiver<WriterMessage>>>,
    writer_rxs: Vec<Receiver<oneshot::Receiver<WriterMessage>>>,
}

impl<W, C> PoolBuilder<W, C>
where
    W: Write + Send + 'static,
    C: Compressor,
{
    /// By default queue sizes will be set to threads * this constant.
    pub const QUEUE_SIZE_THREAD_MULTIPLES: usize = 50;

    /// The default number of threads that will be used if not otherwise configured
    pub const DEFAULT_THREADS: usize = 4;

    /// Creates a new PoolBuilder that can be used to configure and build a [`Pool`].
    pub fn new() -> Self {
        PoolBuilder {
            writer_index: 0,
            compression_level: C::default_compression_level(),
            queue_size: None,
            threads: Self::DEFAULT_THREADS,
            compressor_tx: None,
            compressor_rx: None,
            writers: vec![],
            writer_txs: vec![],
            writer_rxs: vec![],
        }
    }

    /// Sets the number of threads that will be used by the [`Pool`].
    ///
    /// Will panic if set to 0.
    pub fn threads(mut self, threads: usize) -> Self {
        assert!(threads > 0, "Must provide a number of threads greater than 0.");
        self.threads = threads;
        self
    }

    /// Sets the size of the queues used by the [`Pool`].  The same size is used for
    /// a) the queue of byte buffers to be compressed, b) the per-writer queues that receive
    /// compressed bytes, and c) a control queue to manage writing to the underlying writers.
    ///
    /// In the worst case scenario the pool can be holding both `queue_size` uncompressed blocks
    /// _and_ `queue_size` compressed blocks in memory when it cannot keep up with the incoming
    /// load of writes.
    ///
    /// Will panic if called _after_ writers have been exchanged, because the queues will already
    /// have been created.
    pub fn queue_size(mut self, queue_size: usize) -> Self {
        assert!(self.writers.is_empty(), "Cannot set queue_size after writers are exchanged.");
        self.queue_size = Some(queue_size);
        self
    }

    /// Sets the compression level that will be used by the [`Pool`].
    pub fn compression_level(mut self, level: u8) -> PoolResult<Self> {
        self.compression_level = C::new_compression_level(level)
            .map_err(|e| PoolError::CompressionError(e.to_string()))?;
        Ok(self)
    }

    /// If queues/channels are not yet set up, initialize them.
    fn ensure_queue_is_setup(&mut self) {
        if self.compressor_tx.is_none() && self.compressor_rx.is_none() {
            let queue_size =
                *self.queue_size.get_or_insert(self.threads * Self::QUEUE_SIZE_THREAD_MULTIPLES);
            let (tx, rx) = bounded(queue_size);
            self.compressor_tx = Some(tx);
            self.compressor_rx = Some(rx);
        }
    }

    /// Exchanges a writer for a [`PooledWriter`].
    pub fn exchange(&mut self, writer: W) -> PooledWriter {
        // Make sure queue/channel configuration is done
        self.ensure_queue_is_setup();

        let (tx, rx): (
            Sender<oneshot::Receiver<WriterMessage>>,
            Receiver<oneshot::Receiver<WriterMessage>>,
        ) = flume::bounded(self.queue_size.expect("Unreachable"));

        let p = PooledWriter::new::<C>(
            self.writer_index,
            self.compressor_tx.as_ref().expect("Unreachable").clone(),
            tx.clone(),
        );

        self.writer_index += 1;
        self.writers.push(writer);
        self.writer_txs.push(tx);
        self.writer_rxs.push(rx);
        p
    }

    /// Consumes the builder and generates the [`Pool`], ready for use.
    pub fn build(mut self) -> PoolResult<Pool> {
        // Make sure the queue/channel configuration is done - this could be necessary if
        // a pool is created with zero writers exchanged.
        self.ensure_queue_is_setup();

        // Create the channel to gracefully signal a shutdown of the pool
        let (shutdown_tx, shutdown_rx) = flume::unbounded();

        // Start the pool manager thread and thread pools
        let handle = std::thread::spawn(move || {
            Pool::pool_main::<W, C>(
                self.threads,
                self.compression_level,
                self.compressor_rx.expect("Unreachable."),
                self.writer_rxs,
                self.writers,
                shutdown_rx,
            )
        });

        let pool = Pool {
            compressor_tx: self.compressor_tx,
            shutdown_tx: Some(shutdown_tx),
            pool_handle: Some(handle),
        };

        Ok(pool)
    }
}

impl<W, C> Default for PoolBuilder<W, C>
where
    W: Write + Send + 'static,
    C: Compressor,
{
    fn default() -> Self {
        Self::new()
    }
}

////////////////////////////////////////////////////////////////////////////////
// The Pool struct and impls
////////////////////////////////////////////////////////////////////////////////

/// A [`Pool`] orchestrates two different threadpools, a compressor pool and a writer pool.
///
/// The pool is suitable for scenarios where there are many more writers than threads, efficiently
/// managing resources for M writers to N threads.
#[derive(Debug)]
pub struct Pool {
    /// The join handle for the thread that manages all pool resources and coordination.
    pool_handle: Option<JoinHandle<PoolResult<()>>>,
    /// The send end of the channel for communicating with the compressor pool.
    compressor_tx: Option<Sender<CompressorMessage>>,
    /// Sentinel channel to tell the pool management thread to shutdown.
    shutdown_tx: Option<Sender<()>>,
}

impl Pool {
    /// The main "run" method for the pool that orchestrates all the pieces.
    ///
    /// The [`PooledWriter`]s send uncompressed blocks to the compression queue; worker threads
    /// compress them and forward the compressed bytes over the per-writer one-shot channels.  A
    /// control queue records which writers have data ready; worker threads pull from it, lock the
    /// appropriate underlying writer, and write one compressed block at a time.
    ///
    /// # Arguments
    /// - `num_threads` - The number of threads to use.
    /// - `compression_level` - The compression level to use for the [`Compressor`] pool.
    /// - `compressor_rx` - The receiving end of the channel for communicating with the compressor pool.
    /// - `writer_rxs` - The receive halves of the channels on which the [`PooledWriter`]s enqueue the one-shot channels.
    /// - `writers` - The writers that were exchanged for [`PooledWriter`]s.
    /// - `shutdown_rx` - Sentinel channel to tell the pool management thread to shutdown.
    #[allow(clippy::unnecessary_wraps, clippy::needless_collect, clippy::needless_pass_by_value)]
    fn pool_main<W, C>(
        num_threads: usize,
        compression_level: C::CompressionLevel,
        compressor_rx: Receiver<CompressorMessage>,
        writer_rxs: Vec<Receiver<oneshot::Receiver<WriterMessage>>>, // must be pass by value to allow for easy sharing between threads
        writers: Vec<W>,
        shutdown_rx: Receiver<()>,
    ) -> PoolResult<()>
    where
        W: Write + Send + 'static,
        C: Compressor,
    {
        // Add locks to the writers
        let writers: Arc<Vec<_>> =
            Arc::new(writers.into_iter().map(|w| Arc::new(Mutex::new(w))).collect());

        // Generate one more channel for queuing up information about when a writer has data
        // available to be written
        let (write_available_tx, write_available_rx): (Sender<usize>, Receiver<usize>) =
            flume::unbounded();

        let thread_handles: Vec<JoinHandle<PoolResult<()>>> = (0..num_threads)
            .map(|thread_idx| {
                let compressor_rx = compressor_rx.clone();
                let mut compressor = C::new(compression_level.clone());
                let writer_rxs = writer_rxs.clone();
                let writers = writers.clone();
                let shutdown_rx = shutdown_rx.clone();
                let write_available_tx = write_available_tx.clone();
                let write_available_rx = write_available_rx.clone();
                let select_timeout = Duration::from_millis(100);

                std::thread::spawn(move || {
                    // Reuse a single compression buffer per thread to avoid
                    // re-allocating ~70KB on every block.
                    let mut compress_buf = Vec::new();

                    loop {
                        // Try non-blocking receives first (fast path under load).
                        // Then fall through to Selector which blocks until work
                        // arrives, avoiding the old sleep(25ms) polling delay.
                        let item = if let Ok(msg) = compressor_rx.try_recv() {
                            Some(WorkItem::Compress(msg))
                        } else if let Ok(idx) = write_available_rx.try_recv() {
                            Some(WorkItem::Write(idx))
                        } else {
                            flume::Selector::new()
                                .recv(&compressor_rx, |r| r.ok().map(WorkItem::Compress))
                                .recv(&write_available_rx, |r| r.ok().map(WorkItem::Write))
                                .wait_timeout(select_timeout)
                                .ok()
                                .flatten()
                        };

                        match item {
                            Some(WorkItem::Compress(message)) => {
                                let chunk = &message.buffer;
                                compress_buf.clear();
                                compressor
                                    .compress(chunk, &mut compress_buf, message.is_last)
                                    .map_err(|e| PoolError::CompressionError(e.to_string()))?;
                                message
                                    .oneshot_tx
                                    .send(WriterMessage { buffer: compress_buf.clone() })
                                    .map_err(|_e| PoolError::ChannelSend)?;
                                write_available_tx
                                    .send(message.writer_index)
                                    .map_err(|_e| PoolError::ChannelSend)?;
                            }
                            Some(WorkItem::Write(writer_index)) => {
                                let mut writer = writers[writer_index].lock();
                                let writer_rx = &writer_rxs[writer_index];
                                let one_shot_rx = writer_rx.recv()?;
                                let write_message =
                                    one_shot_rx.recv().map_err(|_| PoolError::ChannelSend)?;
                                writer.write_all(&write_message.buffer)?;
                            }
                            None => {
                                // Timeout or channel disconnect. Check if all work
                                // is drained and shutdown was requested.
                                if shutdown_rx.is_disconnected()
                                    && write_available_rx.is_empty()
                                    && compressor_rx.is_empty()
                                    && writer_rxs.iter().all(|w| w.is_empty())
                                {
                                    break;
                                }
                            }
                        }
                    }

                    Ok(())
                })
            })
            .collect();

        // Join the worker threads, propagating any errors or panics
        thread_handles.into_iter().try_for_each(|handle| match handle.join() {
            Ok(result) => result,
            Err(e) => std::panic::resume_unwind(e),
        })?;

        // Flush each writer
        writers.iter().try_for_each(|w| w.lock().flush())?;

        Ok(())
    }

    /// Shutdown all pool resources and close all channels.
    ///
    /// Ideally the [`PooledWriter`]s should all have been closed first; that is up to the user.  Any
    /// further attempts to send to the [`Pool`] will return an error.
    pub fn stop_pool(&mut self) -> Result<(), PoolError> {
        // Drop the compressor sender to disconnect the channel.  Buffered
        // messages are preserved by flume and will be drained by the worker
        // threads before they observe the disconnect and shut down.
        drop(self.compressor_tx.take().unwrap());

        // Shutdown called to force writers to start checking their receivers for disconnection / empty
        drop(self.shutdown_tx.take());

        // Wait on the pool thread to finish and pull any errors from it
        match self.pool_handle.take().unwrap().join() {
            Ok(result) => result,
            Err(e) => std::panic::resume_unwind(e),
        }
    }
}

impl Drop for Pool {
    fn drop(&mut self) {
        // Check if `stop_pool` has already been called. If it hasn't, call it.
        if self.compressor_tx.is_some() && self.pool_handle.is_some() {
            self.stop_pool().unwrap();
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
// Tests
////////////////////////////////////////////////////////////////////////////////

#[cfg(test)]
mod test {
    use std::{
        assert_eq, format,
        fs::File,
        io::{BufReader, BufWriter},
        path::{Path, PathBuf},
        vec,
    };

    use crate::bgzf::BgzfCompressor;

    use super::*;
    use ::bgzf::Reader;
    use proptest::prelude::*;
    use tempfile::tempdir;

    fn create_output_writer<P: AsRef<Path>>(path: P) -> BufWriter<File> {
        BufWriter::new(File::create(path).unwrap())
    }

    fn create_output_file_name(name: impl AsRef<Path>, dir: impl AsRef<Path>) -> PathBuf {
        let path = dir.as_ref().to_path_buf();
        path.join(name)
    }

    #[test]
    fn test_simple() {
        let dir = tempdir().unwrap();
        let output_names: Vec<PathBuf> = (0..20)
            .map(|i| create_output_file_name(format!("test.{}.txt.gz", i), dir.path()))
            .collect();

        let output_writers: Vec<BufWriter<File>> =
            output_names.iter().map(create_output_writer).collect();
        let mut builder =
            PoolBuilder::<_, BgzfCompressor>::new().threads(8).compression_level(2).unwrap();
        let mut pooled_writers: Vec<PooledWriter> =
            output_writers.into_iter().map(|w| builder.exchange(w)).collect();
        let mut pool = builder.build().unwrap();

        for (i, writer) in pooled_writers.iter_mut().enumerate() {
            writer.write_all(format!("This is writer {}.", i).as_bytes()).unwrap();
        }
        pooled_writers.into_iter().try_for_each(|mut w| w.flush()).unwrap();
        pool.stop_pool().unwrap();

        for (i, path) in output_names.iter().enumerate() {
            let mut reader = Reader::new(BufReader::new(File::open(path).unwrap()));
            let mut actual = vec![];
            reader.read_to_end(&mut actual).unwrap();
            assert_eq!(actual, format!("This is writer {}.", i).as_bytes());
        }
    }

    proptest! {
        // This test takes around 20 minutes on a 32 core machine to run but is very comprehensive.
        // Run with `cargo test -- --ignored`
        #[ignore]
        #[test]
        fn test_complete(
            input_size in 1..=BUFSIZE * 4,
            buf_size in 1..=BUFSIZE,
            num_output_files in 1..2*num_cpus::get(),
            threads in 1..=2+num_cpus::get(),
            comp_level in 1..=8_u8,
            write_size in 1..=2*BUFSIZE,
        ) {
            let dir = tempdir().unwrap();
            let output_names: Vec<PathBuf> = (0..num_output_files)
                .map(|i| create_output_file_name(format!("test.{}.txt.gz", i), dir.path()))
                .collect();
            let output_writers: Vec<_> = output_names.iter().map(create_output_writer).collect();

            let mut builder = PoolBuilder::<_, BgzfCompressor>::new()
                .threads(threads)
                .compression_level(comp_level)?;

            let mut pooled_writers: Vec<_> = output_writers.into_iter().map(|w| builder.exchange(w)).collect();
            let mut pool = builder.build()?;

            let inputs: Vec<Vec<u8>> = (0..num_output_files).map(|_| {
                (0..input_size).map(|_| rand::random::<u8>()).collect()
            }).collect();

            let chunks = (input_size as f64 / write_size as f64).ceil() as usize;

            // write a chunk to each writer (could randomly select the writers?)
            for i in 0..chunks {
                for (j, writer) in pooled_writers.iter_mut().enumerate() {
                    let input = &inputs[j];
                    let bytes = &input[write_size * i..std::cmp::min(write_size * (i + 1), input.len())];
                    writer.write_all(bytes).unwrap()
                }
            }

            pooled_writers.into_iter().try_for_each(|mut w| w.flush()).unwrap();
            pool.stop_pool().unwrap();

            for (i, path) in output_names.iter().enumerate() {
                let mut reader = Reader::new(BufReader::new(File::open(path).unwrap()));
                let mut actual = vec![];
                reader.read_to_end(&mut actual).unwrap();
                assert_eq!(actual, inputs[i]);
            }

        }
    }
}