1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
//! A pooled writer and compressor.
//!
//! # Overview
//!
//! A [`Pool`] is created, started, and writers exchanged for [`PooledWriter`]s all at once in [`Pool::new`].
//! When creating a pool via [`Pool::new`] two types must be specified: the `W` Writer type and the `C` compressor type.
//! The `W` may be elided since it's an input. The `C` must be specified as something that implements [`Compressor`].
//! See `examples/ex1.rs`.
//!
//! The [`Pool`] consists of two threadpools, one for compressing and one for writing. All concurrency
//! is managed via message passing over channels.
//!
//! Every time the internal buffer of a [`PooledWriter`] reaches capacity (defined by [`Compressor::BLOCK_SIZE`])
//! it sends two messages:
//! 1. It sends a message over the corresponding writer's channel to the writer pool, enqueueing
//!    a one-shot receiver channel in that writer's queue that will receive the compressed bytes
//!    once the compressor is done. This is done to maintain the output order.
//! 2. It sends a message to the compressor pool that contains a buffer of bytes to compress
//!    as well as the sender side of the one-shot channel to send the compressed bytes on.
//!
//! The writer threadpool contains a `Vec` of receivers, one for each writer. It loops over
//! this `Vec`, checking to see if the receiver has any messages. If it does, a lock is
//! acquired and that writer's receiver is drained, writing to the underlying writer that was exchanged
//! for the [`PooledWriter`].
//!
//! The compressor threadpool consists of a single receiver that is continually polled for new
//! messages. The messages are processed, the bytes compressed, and then the compressed bytes are
//! sent over the one-shot channel to the corresponding receiver, which is a place-holder receiver
//! in the writer's queue.
//!
//! Shutdown of the entire pool is managed via a sentinel value that is checked in the writer loop.
//! If a shutdown has been requested, a cascade of channel drops will cleanly disconnect all senders
//! and receivers and any further calls to [`PooledWriter`]s will result in an error.
//!
//! # Example
//!
//! ```rust
//! use std::{
//!     error::Error,
//!     fs::File,
//!     io::{BufWriter, Write},
//!     path::Path,
//! };
//!
//! use pooled_writer::{Compressor, Pool, bgzf::BgzfCompressor};
//!
//! type DynError = Box<dyn Error + 'static>;
//!
//! fn create_writer<P: AsRef<Path>>(name: P) -> Result<BufWriter<File>, DynError> {
//!     Ok(BufWriter::new(File::create(name)?))
//! }
//!
//! fn main() -> Result<(), DynError> {
//!     let writers = vec![
//!         create_writer("./test1.txt.gz")?,
//!         create_writer("./test2.txt.gz")?,
//!         create_writer("./test3.txt.gz")?,
//!     ];
//!     let (mut pool, mut pooled_writers) = Pool::new::<_, BgzfCompressor>(4, 4, 4, writers)?;
//!
//!     writeln!(&mut pooled_writers[1], "This is writer2")?;
//!     writeln!(&mut pooled_writers[0], "This is writer1")?;
//!     writeln!(&mut pooled_writers[2], "This is writer3")?;
//!     pooled_writers.into_iter().try_for_each(|w| w.close())?;
//!     pool.stop_pool()?;
//!
//!     Ok(())
//! }
//! ```
#![forbid(unsafe_code)]
#![allow(
    unused,
    clippy::missing_panics_doc,
    clippy::missing_errors_doc,
    clippy::must_use_candidate,
    clippy::module_name_repetitions
)]

#[cfg(feature = "bgzf_compressor")]
pub mod bgzf;

use std::{
    error::Error,
    io::{self, Read, Write},
    sync::Arc,
    thread::JoinHandle,
};

use bytes::{Bytes, BytesMut};
use flume::{bounded, unbounded, Receiver, Sender};
use parking_lot::{lock_api::RawMutex, Mutex};
use thiserror::Error;

/// 128 KB default buffer size, same as pigz.
// NOTE(review): within this file this constant is only referenced by the property
// tests; the `bgzf` module may use it as well (it is `pub(crate)`) — confirm.
pub(crate) const BUFSIZE: usize = 128 * 1024;

/// Convenience type for functions that return [`PoolError`].
type PoolResult<T> = Result<T, PoolError>;

/// Represents errors that may be generated by any `Pool` related functionality.
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum PoolError {
    /// A channel send failed, typically because the receiving half was dropped
    /// (e.g. after the pool has shut down).
    #[error("Failed to send over channel")]
    ChannelSend,
    /// A channel receive failed because all senders were dropped.
    #[error(transparent)]
    ChannelReceive(#[from] flume::RecvError),

    // TODO: figure out how to better pass in a generic / dynamic error type to this.
    /// A [`Compressor`] implementation reported a failure. The underlying error is
    /// stringified because the compressor's error type is generic.
    #[error("Error compressing data: {0}")]
    CompressionError(String),
    /// An I/O error from one of the underlying writers.
    #[error(transparent)]
    Io(#[from] io::Error),
}

/// A [`PooledWriter`] is created by exchanging a writer with a [`Pool`].
///
/// The pooled writer will internally buffer writes, sending bytes to the [`Pool`]
/// after the internal buffer has been filled. Dropping a [`PooledWriter`] flushes
/// any remaining bytes and marks the stream finished (see the `Drop` impl).
///
/// Note that the `pool_tx` channel is shared by all pooled writers, whereas the `writer_tx`
/// is specific to the _underlying_ writer that this pooled writer encapsulates.
#[derive(Debug)]
pub struct PooledWriter {
    /// Channel to send messages containing bytes to compress to the compressors' pool.
    pool_tx: Sender<CompressorMessage>,
    /// Channel to send the receiving end of the one-shot channel that will be
    /// used to send the compressed bytes. This effectively "place holds" the
    /// position of the compressed bytes in the writers queue until the compressed bytes
    /// are ready.
    writer_tx: Sender<Receiver<WriterMessage>>,
    /// The internal buffer to gather bytes to send.
    buffer: BytesMut,
    /// The desired size of the internal buffer (set to the compressor's block size).
    buffer_size: usize,
}

impl PooledWriter {
    /// Zip together all writers and senders to create a set of [`PooledWriter`]s.
    ///
    /// The writers themselves are not stored here; they are only zipped with the
    /// channels so that each [`PooledWriter`] pairs with its writer's channel in
    /// matching order.
    ///
    /// # Arguments
    /// - `writers` - The conventional [`Write`] writers that are being exchanged.
    /// - `pool_tx` - The channel to send uncompressed bytes to the compressor pool.
    /// - `writer_txs` - The `Send` ends of the channels that transmit the `Receiver` ends of the one-shot
    ///                  channels, which will be consumed when the compressor sends the compressed bytes.
    fn from_writers<W, C>(
        writers: &[W],
        pool_tx: &Sender<CompressorMessage>,
        writer_txs: &[Sender<Receiver<WriterMessage>>],
    ) -> Vec<Self>
    where
        W: Write + Send + 'static,
        C: Compressor,
    {
        writers
            .iter()
            .zip(writer_txs.iter())
            .map(|(_w, writer_tx)| Self::new::<C>(pool_tx.clone(), writer_tx.clone()))
            .collect()
    }

    /// Create a new [`PooledWriter`] whose internal buffer capacity matches
    /// [`Compressor::BLOCK_SIZE`] for `C`.
    ///
    /// # Arguments
    /// - `pool_tx` - The channel to send uncompressed bytes to the compressor pool.
    /// - `writer_tx` - The `Send` end of the channel that transmits the `Receiver` end of the one-shot
    ///                 channel, which will be consumed when the compressor sends the compressed bytes.
    fn new<C>(
        pool_tx: Sender<CompressorMessage>,
        writer_tx: Sender<Receiver<WriterMessage>>,
    ) -> Self
    where
        C: Compressor,
    {
        Self {
            pool_tx,
            writer_tx,
            buffer: BytesMut::with_capacity(C::BLOCK_SIZE),
            buffer_size: C::BLOCK_SIZE,
        }
    }

    /// Test whether the internal buffer has reached capacity.
    #[inline]
    fn buffer_full(&self) -> bool {
        self.buffer.len() == self.buffer_size
    }

    /// Send all bytes in the current buffer to the compressor.
    ///
    /// If `is_last` is `true`, the message sent to the compressor will also have the
    /// `is_last` flag set and the compressor will finish the stream (e.g. the BGZF EOF).
    ///
    /// If `is_last` is `false`, a block is only sent when the buffer is full. If
    /// `is_last` is `true`, an incomplete block may be sent as the final block.
    fn flush_bytes(&mut self, is_last: bool) -> std::io::Result<()> {
        if is_last || self.buffer_full() {
            self.send_block(is_last)?;
        }
        Ok(())
    }

    /// Send a single block.
    ///
    /// The one-shot receiver is enqueued on this writer's channel *before* the
    /// uncompressed bytes are handed to the compressor pool so that compressed
    /// blocks are written out in the order they were produced.
    fn send_block(&mut self, is_last: bool) -> std::io::Result<()> {
        // Drain the entire buffer into an immutable `Bytes` handle to send.
        let bytes = self.buffer.split_to(self.buffer.len()).freeze();
        let (mut m, r) = CompressorMessage::new_parts(bytes);
        m.is_last = is_last;
        self.writer_tx
            .send(r)
            .map_err(|_e| io::Error::new(io::ErrorKind::Other, PoolError::ChannelSend))?;
        self.pool_tx
            .send(m)
            .map_err(|_e_| io::Error::new(io::ErrorKind::Other, PoolError::ChannelSend))
    }

    /// Flush any remaining bytes and consume self, triggering drops of the senders.
    ///
    /// NOTE(review): after `close` returns, `self` is dropped and the `Drop` impl calls
    /// `flush_bytes(true)` again, sending a second (empty) `is_last` block — confirm
    /// this is harmless for the compression format in use.
    pub fn close(mut self) -> std::io::Result<()> {
        self.flush_bytes(true)
    }
}

impl Drop for PooledWriter {
    /// Flush any remaining bytes (marking the stream finished) when the writer is dropped.
    ///
    /// Errors are deliberately ignored here, mirroring `std::io::BufWriter`: panicking
    /// inside `drop` while another panic is already unwinding aborts the process.
    /// Call [`PooledWriter::close`] explicitly to observe flush errors.
    fn drop(&mut self) {
        let _ = self.flush_bytes(true);
    }
}

impl Write for PooledWriter {
    /// Buffer all of `buf`, sending full blocks to the [`Pool`] as the internal
    /// buffer fills up.
    ///
    /// Always consumes the whole input slice, so the returned count equals `buf.len()`.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let mut bytes_added = 0;

        while bytes_added < buf.len() {
            // Copy as much as fits in the remaining buffer capacity.
            let bytes_to_append =
                std::cmp::min(buf.len() - bytes_added, self.buffer_size - self.buffer.len());

            self.buffer.extend_from_slice(&buf[bytes_added..bytes_added + bytes_to_append]);
            bytes_added += bytes_to_append;
            if self.buffer_full() {
                self.send_block(false)?;
            }
        }

        Ok(buf.len())
    }

    /// Send whatever is in the current buffer even if it is not a full block.
    ///
    /// Bug fix: this previously delegated to `flush_bytes(false)`, which only sends
    /// *full* blocks, so a partially filled buffer stayed buffered — violating the
    /// `Write::flush` contract that all buffered contents reach their destination.
    /// A short block is valid mid-stream for block compression formats, so send the
    /// buffer directly whenever it is non-empty.
    fn flush(&mut self) -> std::io::Result<()> {
        if self.buffer.is_empty() {
            Ok(())
        } else {
            self.send_block(false)
        }
    }
}

/// A [`Compressor`] is used in the compressor pool to compress bytes.
///
/// An implementation must be provided as a type to the [`Pool::new`] function so that the pool
/// knows what kind of compression to use.
///
/// See the module level example for more details.
pub trait Compressor: Sized + Send + 'static
where
    Self::CompressionLevel: Clone + Send + 'static,
    Self::Error: Error + Send + 'static,
{
    /// The error type returned by compression operations.
    type Error;
    /// An opaque, validated compression level, created via
    /// [`Compressor::new_compression_level`].
    type CompressionLevel;

    /// The `BLOCK_SIZE` is used to set the buffer size of the [`PooledWriter`]s and should match the max
    /// size allowed by the block compression format being used.
    // The default of 65280 matches BGZF's maximum uncompressed payload per block.
    const BLOCK_SIZE: usize = 65280;

    /// Create a new compressor with the given compression level.
    fn new(compression_level: Self::CompressionLevel) -> Self;

    /// Create an instance of the compression level.
    ///
    /// The validity of the compression level should be checked here.
    fn new_compression_level(compression_level: u8) -> Result<Self::CompressionLevel, Self::Error>;

    /// Compress a set of bytes into the `output` vec. If `is_last` is true, and depending on the
    /// block compression format, an EOF block may be appended as well.
    fn compress(
        &mut self,
        input: &[u8],
        output: &mut Vec<u8>,
        is_last: bool,
    ) -> Result<(), Self::Error>;
}

/// A message that is sent from a [`PooledWriter`] to the compressor threadpool within a [`Pool`].
#[derive(Debug)]
struct CompressorMessage {
    /// The bytes to compress.
    buffer: Bytes,
    /// Where the compressed bytes will be sent after compression. Despite the name,
    /// this is a regular unbounded flume channel used as a one-shot: exactly one
    /// message is ever sent on it (see `new_parts`).
    oneshot: Sender<WriterMessage>,
    /// A sentinel value to let the compressor know that the BGZF stream needs an EOF.
    is_last: bool,
}

impl CompressorMessage {
    /// Build a message (with `is_last` defaulting to `false`) together with the
    /// receiver on which the compressed result will eventually arrive.
    fn new_parts(buffer: Bytes) -> (Self, Receiver<WriterMessage>) {
        // An unbounded flume channel stands in for a true one-shot channel:
        // exactly one message will ever be sent on it.
        let (oneshot, receiver) = unbounded();
        (Self { buffer, oneshot, is_last: false }, receiver)
    }
}

/// The compressed bytes to be written to a file.
///
/// This is sent from the compressor threadpool to the writer queue in the writer threadpool
/// via the one-shot channel provided by the [`PooledWriter`].
#[derive(Debug)]
struct WriterMessage {
    /// A fully compressed block, written verbatim to the underlying writer.
    buffer: Vec<u8>,
}

/// A [`Pool`] orchestrates two different threadpools, a compressor pool and a writer pool.
///
/// The pool is suitable for scenarios where there are many more writers than threads, efficiently
/// managing resources for M writers to N threads.
///
/// Every field is an `Option` so that [`Pool::stop_pool`] can `take()` and drop the
/// senders (signalling shutdown) and join the management thread exactly once.
#[derive(Debug)]
pub struct Pool {
    /// The join handle for the thread that manages all pool resources and coordination.
    pool_handle: Option<JoinHandle<PoolResult<()>>>,
    /// The send end of the channel for communicating with the compressor pool.
    compressor_tx: Option<Sender<CompressorMessage>>,
    /// The send halves of the channels for the [`PooledWriter`]s to enqueue the one-shot channels.
    writers_txs: Option<Vec<Sender<Receiver<WriterMessage>>>>,
    /// Sentinel channel to tell the pool management thread to shutdown.
    shutdown_tx: Option<Sender<()>>,
}

impl Pool {
    /// Create a running pool along with an associated set of `pooled_writers`.
    ///
    /// # Arguments
    /// - `num_writer_threads` - The number of writer threads to use in the writer pool.
    /// - `num_compressor_threads` - The number of compressor threads to use in the compressor pool.
    /// - `compression_level` - The compression level to use for the [`Compressor`] pool.
    /// - `writers` - The writers to exchange for [`PooledWriter`]s.
    #[allow(clippy::type_complexity, clippy::similar_names)]
    pub fn new<W, C>(
        num_writer_threads: usize,
        num_compressor_threads: usize,
        compression_level: u8,
        writers: Vec<W>,
    ) -> PoolResult<(Self, Vec<PooledWriter>)>
    where
        W: Write + Send + 'static,
        C: Compressor,
    {
        let compression_level = C::new_compression_level(compression_level)
            .map_err(|e| PoolError::CompressionError(e.to_string()))?;
        // Create the channels that the writers will send bytes to be compressed
        let (compressor_tx, compressor_rx) = flume::bounded(num_compressor_threads * 2);
        // Create the channel to gracefully signal a shutdown of the pool
        let (shutdown_tx, shutdown_rx) = flume::unbounded();

        // Create the channels that will enqueue the promise of compressed bytes for the writers
        let (writers_txs, writers_rxs): (
            Vec<Sender<Receiver<WriterMessage>>>,
            Vec<Receiver<Receiver<WriterMessage>>>,
        ) = (0..writers.len()).map(|_| flume::bounded(num_writer_threads * 2)).unzip();

        // Wrap writers to be pooled writers.
        let pooled_writers =
            PooledWriter::from_writers::<W, C>(&writers, &compressor_tx, &writers_txs);

        // Start the pool manager thread and thread pools
        let handle = std::thread::spawn(move || {
            Self::pool_main::<W, C>(
                num_writer_threads,
                num_compressor_threads,
                compression_level,
                compressor_rx,
                writers_rxs,
                writers,
                shutdown_rx,
            )
        });

        let mut pool = Self {
            compressor_tx: Some(compressor_tx),
            writers_txs: Some(writers_txs),
            shutdown_tx: Some(shutdown_tx),
            pool_handle: Some(handle),
        };
        Ok((pool, pooled_writers))
    }

    /// The main "run" method for the pool that orchestrates all the pieces.
    ///
    /// The [`PooledWriter`]s are sending to the compressor, the compressor compresses them, then forwards the compressed bytes.
    /// The bytes are forwarded to a queue per writer and the writer threads are iterating over that queue pulling down
    /// all values in the queue at once and writing till the queue is empty.
    ///
    /// # Arguments
    /// - `num_writer_threads` - The number of writer threads to use in the writer pool.
    /// - `num_compressor_threads` - The number of compressor threads to use in the compressor pool.
    /// - `compression_level` - The compression level to use for the [`Compressor`] pool.
    /// - `rx_compressor` - The receiving end of the channel for communicating with the compressor pool.
    /// - `rxs_writers` - The receive halves of the channels for the [`PooledWriter`]s to enqueue the one-shot channels.
    /// - `writers` - The writers that were exchanged for [`PooledWriter`]s.
    /// - `shutdown_rx` - Sentinel channel to tell the pool management thread to shutdown.
    #[allow(clippy::unnecessary_wraps, clippy::needless_collect, clippy::needless_pass_by_value)]
    fn pool_main<W, C>(
        num_writer_threads: usize,
        num_compressor_threads: usize,
        compression_level: C::CompressionLevel,
        rx_compressor: Receiver<CompressorMessage>,
        rxs_writers: Vec<Receiver<Receiver<WriterMessage>>>, // must be pass by value to allow for easy sharing between threads
        writers: Vec<W>,
        shutdown_rx: Receiver<()>,
    ) -> PoolResult<()>
    where
        W: Write + Send + 'static,
        C: Compressor,
    {
        // Add locks to the writers
        let writers: Arc<Vec<_>> =
            Arc::new(writers.into_iter().map(|w| Arc::new(Mutex::new(w))).collect());

        // Writer threads
        // Compressor threads
        let compressor_handles: Vec<JoinHandle<PoolResult<()>>> = (0..num_compressor_threads)
            .map(|_i| {
                let rx_compressor = rx_compressor.clone();
                let mut compressor = C::new(compression_level.clone());
                std::thread::spawn(move || {
                    while let Ok(message) = rx_compressor.recv() {
                        // Compress the buffer in the message
                        let chunk = &message.buffer;
                        // Compress will correctly resize the compressed vec.
                        let mut compressed = Vec::new();
                        compressor
                            .compress(chunk, &mut compressed, message.is_last)
                            .map_err(|e| PoolError::CompressionError(e.to_string()))?;
                        message
                            .oneshot
                            .send(WriterMessage { buffer: compressed })
                            .map_err(|_e| PoolError::ChannelSend);
                    }
                    Ok(())
                })
            })
            // Collect is needed to force the evaluation of the closure and start the loops
            .collect();

        let writer_handles: Vec<JoinHandle<PoolResult<()>>> = (0..num_writer_threads)
            .map(|_i| {
                let rxs_writers = rxs_writers.clone();
                let shutdown_rx = shutdown_rx.clone();
                let writers = writers.clone();
                std::thread::spawn(move || {
                    let mut i = 0;
                    loop {
                        if i == rxs_writers.len() {
                            i = 0;
                        }

                        // The compiler is better about ignoring the bounds check in this scenario
                        if let Some(rx_writer) = rxs_writers.get(i) {
                            if !rx_writer.is_empty() {
                                let mut writer = writers[i].lock();
                                while let Ok(message) = rx_writer.try_recv() {
                                    let message = message.recv()?;
                                    writer.write_all(&message.buffer)?;
                                }
                            }
                        }

                        i += 1;

                        if shutdown_rx.is_disconnected()
                            && rxs_writers.iter().all(|w| w.is_disconnected() && w.is_empty())
                        {
                            // If all receivers are disconnected (the senders have been dropped, then we are done)
                            break;
                        }
                    }

                    Ok(())
                })
            })
            // Collect is needed to force the evaluation of the closure and start the loops
            .collect();

        // close writer handles
        writer_handles.into_iter().try_for_each(|handle| match handle.join() {
            Ok(result) => result,
            Err(e) => std::panic::resume_unwind(e),
        });
        // Flush each writer
        writers.iter().try_for_each(|w| w.lock().flush())?;
        // close compressor handles
        compressor_handles.into_iter().try_for_each(|handle| match handle.join() {
            Ok(result) => result,
            Err(e) => std::panic::resume_unwind(e),
        });
        Ok(())
    }

    /// Shutdown all pool resources and close all channels.
    ///
    /// Ideally the [`PooledWriter`]s should all have been flushed first, that is up to the user. Any
    /// further attempts to send to the [`Pool`] will return an error.
    pub fn stop_pool(&mut self) -> Result<(), PoolError> {
        let compressor_queue = self.compressor_tx.take().unwrap();
        while !compressor_queue.is_empty() {
            // Wait for compression to finish before dropping the sender
        }
        drop(compressor_queue);

        // Shutdown called to force writers to start checking their receivers for disconnection / empty
        drop(self.shutdown_tx.take());

        // Drop the copy of the writer senders that the pool holds
        // TODO: the pool probably doesn't need these anyways.
        self.writers_txs.take().into_iter().enumerate().for_each(|(i, w)| {
            // Wait for writing to finish
            drop(w);
        });
        // Wait on the pool thread to finish and pull any errors from it
        match self.pool_handle.take().unwrap().join() {
            Ok(result) => result,
            Err(e) => std::panic::resume_unwind(e),
        }
    }
}

impl Drop for Pool {
    /// Shut the pool down if [`Pool::stop_pool`] was never called.
    ///
    /// Panicking inside `drop` while another panic is already unwinding aborts the
    /// process, so shutdown errors are only unwrapped when not already panicking.
    fn drop(&mut self) {
        // Check if `stop_pool` has already been called. If it hasn't, call it.
        if self.compressor_tx.is_some() && self.pool_handle.is_some() && self.writers_txs.is_some()
        {
            if std::thread::panicking() {
                // Best-effort shutdown during unwinding; swallow errors to avoid abort.
                let _ = self.stop_pool();
            } else {
                self.stop_pool().unwrap();
            }
        }
    }
}

#[cfg(test)]
mod test {
    use std::{
        fs::File,
        io::{BufReader, BufWriter},
        path::{Path, PathBuf},
    };

    use crate::bgzf::BgzfCompressor;

    use super::*;
    use ::bgzf::Reader;
    use proptest::prelude::*;
    use tempfile::tempdir;

    /// Create a buffered file writer for the given path.
    fn create_output_writer<P: AsRef<Path>>(path: P) -> BufWriter<File> {
        BufWriter::new(File::create(path).unwrap())
    }

    /// Join `name` onto `dir` to build an output path.
    fn create_output_file_name(name: impl AsRef<Path>, dir: impl AsRef<Path>) -> PathBuf {
        dir.as_ref().join(name)
    }

    #[test]
    fn test_simple() {
        let dir = tempdir().unwrap();
        let output_names: Vec<PathBuf> = (0..20)
            .map(|i| create_output_file_name(format!("test.{}.txt.gz", i), &dir.path()))
            .collect();
        let output_writers = output_names.iter().map(create_output_writer).collect();

        let (mut pool, mut pooled_writers) =
            Pool::new::<_, BgzfCompressor>(1, 1, 2, output_writers).unwrap();

        for (i, writer) in pooled_writers.iter_mut().enumerate() {
            writer.write_all(format!("This is writer {}.", i).as_bytes()).unwrap();
        }
        pooled_writers.into_iter().try_for_each(|mut w| w.flush()).unwrap();
        // Previously the Result of stop_pool was silently discarded; unwrap so that
        // any pool error fails the test.
        pool.stop_pool().unwrap();

        for (i, path) in output_names.iter().enumerate() {
            let mut reader = Reader::new(BufReader::new(File::open(path).unwrap()));
            let mut actual = vec![];
            reader.read_to_end(&mut actual).unwrap();
            assert_eq!(actual, format!("This is writer {}.", i).as_bytes());
        }
    }

    proptest! {
        // This test takes around 20 minutes on a 32 core machine to run but is very comprehensive.
        // Run with `cargo test -- --ignored`
        #[ignore]
        #[test]
        fn test_complete(
            input_size in 1..=BUFSIZE * 4,
            buf_size in 1..=BUFSIZE,
            num_output_files in 1..2*num_cpus::get(),
            writer_threads in 1..=num_cpus::get(),
            comp_threads in 1..=num_cpus::get(),
            comp_level in 1..=12_u8,
            write_size in 1..=2*BUFSIZE,
        ) {
            let dir = tempdir().unwrap();
            let output_names: Vec<PathBuf> = (0..num_output_files)
                .map(|i| create_output_file_name(format!("test.{}.txt.gz", i), &dir.path()))
                .collect();
            let output_writers = output_names.iter().map(create_output_writer).collect();

            let (mut pool, mut pooled_writers) = Pool::new::<_, BgzfCompressor>(writer_threads, comp_threads, comp_level, output_writers).unwrap();

            let inputs: Vec<Vec<u8>> = (0..num_output_files).map(|_| {
                (0..input_size).map(|_| rand::random::<u8>()).collect()
            }).collect();

            let chunks = (input_size as f64 / write_size as f64).ceil() as usize;

            // write a chunk to each writer (could randomly select the writers?)
            for i in 0..chunks {
                for (j, writer) in pooled_writers.iter_mut().enumerate() {
                    let input = &inputs[j];
                    let bytes = &input[write_size * i..std::cmp::min(write_size * (i + 1), input.len())];
                    writer.write_all(bytes).unwrap();
                }
            }

            pooled_writers.into_iter().try_for_each(|mut w| w.flush()).unwrap();
            // Surface pool errors instead of silently discarding the Result.
            pool.stop_pool().unwrap();

            for (i, path) in output_names.iter().enumerate() {
                let mut reader = Reader::new(BufReader::new(File::open(path).unwrap()));
                let mut actual = vec![];
                reader.read_to_end(&mut actual).unwrap();
                assert_eq!(actual, inputs[i]);
            }

        }
    }
}