
A pooled writer and compressor.

Overview

Pool::new creates the pool, starts it, and exchanges the supplied writers for PooledWriters in a single call. Pool::new takes two type parameters: W, the writer type, and C, the compressor type. W can be left for the compiler to infer (written as _) because it is determined by the writers passed in. C must be named explicitly and must implement Compressor. See examples/ex1.rs.

The Pool consists of two threadpools, one for compressing and one for writing. All concurrency is managed via message passing over channels.

Every time the internal buffer of a PooledWriter reaches capacity (defined by Compressor::BLOCK_SIZE) it sends two messages:

  1. It sends a message over the corresponding writer's channel to the writer pool, enqueueing a one-shot receiver channel in that writer's queue. The receiver will yield the compressed bytes once the compressor is done. Enqueueing the receiver up front preserves the output order.
  2. It sends a message to the compressor pool that contains a buffer of bytes to compress as well as the sender side of the one-shot channel to send the compressed bytes on.
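The two-message protocol above can be sketched with nothing but std::sync::mpsc channels. This is a minimal illustration under stated assumptions, not the crate's implementation: fake_compress, CompressJob, and run_protocol are hypothetical names, a plain mpsc channel stands in for the one-shot channel, and one thread per job is spawned only so that jobs can finish out of order.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// A block of bytes plus the sender half of its one-shot reply channel.
type CompressJob = (Vec<u8>, Sender<Vec<u8>>);

/// Hypothetical stand-in for real compression: uppercases ASCII bytes.
fn fake_compress(block: &[u8]) -> Vec<u8> {
    block.to_ascii_uppercase()
}

/// Pushes `blocks` through the two-message protocol and returns the bytes in
/// the order the writer side emits them.
fn run_protocol(blocks: Vec<Vec<u8>>) -> Vec<u8> {
    // Writer queue: carries one-shot receivers in submission order.
    let (writer_tx, writer_rx) = channel::<Receiver<Vec<u8>>>();
    // Compressor queue: carries the blocks to compress.
    let (comp_tx, comp_rx) = channel::<CompressJob>();

    // Compressor side: each job runs on its own thread, so completion order
    // is not guaranteed to match submission order.
    let compressor = thread::spawn(move || {
        let mut handles = Vec::new();
        for (block, reply) in comp_rx {
            handles.push(thread::spawn(move || {
                // Ignore the error if the writer side has already gone away.
                let _ = reply.send(fake_compress(&block));
            }));
        }
        for handle in handles {
            handle.join().unwrap();
        }
    });

    // Writer side: waits on each placeholder receiver in queue order, which
    // is what keeps the output ordered.
    let writer = thread::spawn(move || {
        let mut out = Vec::new();
        for placeholder in writer_rx {
            out.extend(placeholder.recv().expect("compressor dropped the sender"));
        }
        out
    });

    for block in blocks {
        let (reply_tx, reply_rx) = channel();
        writer_tx.send(reply_rx).unwrap(); // message 1: reserve a slot in the output
        comp_tx.send((block, reply_tx)).unwrap(); // message 2: request compression
    }
    // Dropping the senders lets both loops above run to completion.
    drop(writer_tx);
    drop(comp_tx);
    compressor.join().unwrap();
    writer.join().unwrap()
}
```

Calling run_protocol with the blocks "ab", "cd", and "ef" yields "ABCDEF" regardless of which compression job finishes first, because the writer only ever waits on the receiver at the head of its queue.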

The writer threadpool contains a Vec of receivers, one for each writer. It loops over this Vec, checking whether each receiver has any messages. If one does, a lock is acquired and that writer's receiver is drained, with the bytes written to the underlying writer that was exchanged for the PooledWriter.
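One pass of such a writer loop might look like the sketch below. It is an illustration only: poll_writers and OneShot are hypothetical names, std::sync::mpsc stands in for whatever channel type the crate uses, and a Mutex around each writer is assumed to be the lock the text refers to.

```rust
use std::io::Write;
use std::sync::mpsc::{Receiver, TryRecvError};
use std::sync::Mutex;

/// Placeholder receiver that eventually yields one compressed block.
type OneShot = Receiver<Vec<u8>>;

/// Makes one pass over every writer's queue and returns how many blocks were
/// written. `queues[i]` feeds `writers[i]`.
fn poll_writers<W: Write>(
    queues: &[Receiver<OneShot>],
    writers: &[Mutex<W>],
) -> std::io::Result<usize> {
    let mut written = 0;
    for (queue, writer) in queues.iter().zip(writers) {
        // Non-blocking check: is anything queued for this writer?
        let first = match queue.try_recv() {
            Ok(one_shot) => one_shot,
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => continue,
        };
        // Take the lock only once there is work, then drain the queue.
        let mut writer = writer.lock().unwrap();
        let mut next = Some(first);
        while let Some(one_shot) = next {
            // Blocks until the compressor has sent this block's bytes.
            if let Ok(bytes) = one_shot.recv() {
                writer.write_all(&bytes)?;
                written += 1;
            }
            next = queue.try_recv().ok();
        }
    }
    Ok(written)
}
```

A real pool would call something like this repeatedly from several threads; the per-writer lock is what would keep two threads from interleaving blocks destined for the same output.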

The compressor threadpool consists of a single receiver that is continually polled for new messages. For each message, the bytes are compressed and then sent over the one-shot channel to the corresponding receiver, which is the placeholder receiver sitting in one of the writer queues.
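A rough sketch of worker threads sharing one job receiver follows. The assumptions are worth stating: std's mpsc receiver is single-consumer, so it is wrapped in Arc<Mutex<..>> here purely for illustration, and the crate itself may use a different channel type. spawn_compressors, Job, and toy_compress are hypothetical names, and toy_compress merely reverses the bytes.

```rust
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// A block to compress plus the sender half of its one-shot reply channel.
type Job = (Vec<u8>, Sender<Vec<u8>>);

/// Hypothetical stand-in for a real compressor: reverses the bytes.
fn toy_compress(block: &[u8]) -> Vec<u8> {
    block.iter().rev().copied().collect()
}

/// Spawns `n` workers that all pull jobs from the same receiver.
fn spawn_compressors(n: usize, jobs: Receiver<Job>) -> Vec<JoinHandle<()>> {
    let jobs = Arc::new(Mutex::new(jobs));
    (0..n)
        .map(|_| {
            let jobs = Arc::clone(&jobs);
            thread::spawn(move || loop {
                // The lock guard is a temporary, so it is released as soon as
                // a job (or a disconnect error) has been received.
                let job = jobs.lock().unwrap().recv();
                match job {
                    Ok((block, reply)) => {
                        // The placeholder receiver may already be gone.
                        let _ = reply.send(toy_compress(&block));
                    }
                    // Every sender was dropped: no more work will arrive.
                    Err(_) => break,
                }
            })
        })
        .collect()
}
```

Each worker replies on the job's own channel, so results reach the right placeholder no matter which worker handled the block.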

Shutdown of the entire pool is managed via a sentinel value that is checked in the writer loop. Once a shutdown has been requested, a cascade of channel drops cleanly disconnects all senders and receivers, and any further calls to PooledWriters result in an error.
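The effect of the drop cascade can be seen in miniature with a single channel. In this sketch, which makes no claim about the crate's internals, an AtomicBool plays the sentinel and shutdown_demo is a hypothetical name. Unlike a real pool, the loop does not drain pending messages before exiting.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Returns whether a send succeeded (before shutdown, after shutdown).
fn shutdown_demo() -> (bool, bool) {
    let (tx, rx) = channel::<Vec<u8>>();
    let shutdown = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&shutdown);

    let writer = thread::spawn(move || {
        loop {
            // The sentinel is checked on every iteration of the loop.
            if flag.load(Ordering::Acquire) {
                break;
            }
            match rx.recv_timeout(Duration::from_millis(5)) {
                Ok(_bytes) => { /* a real loop would write the bytes here */ }
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }
        // `rx` is dropped when the thread exits, disconnecting every sender.
    });

    // The receiver is alive, so this send is accepted.
    let before = tx.send(b"block".to_vec()).is_ok();

    shutdown.store(true, Ordering::Release);
    writer.join().unwrap();

    // The receiver is gone, so sending now reports an error.
    let after = tx.send(b"late".to_vec()).is_ok();
    (before, after)
}
```

shutdown_demo returns (true, false): the first send is accepted while the loop is alive, and the second fails because the receiving end was dropped when the loop exited.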

Example

use std::{
    error::Error,
    fs::File,
    io::{BufWriter, Write},
    path::Path,
};

use pooled_writer::{Compressor, Pool, bgzf::BgzfCompressor};

type DynError = Box<dyn Error + 'static>;

fn create_writer<P: AsRef<Path>>(name: P) -> Result<BufWriter<File>, DynError> {
    Ok(BufWriter::new(File::create(name)?))
}

fn main() -> Result<(), DynError> {
    let writers = vec![
        create_writer("./test1.txt.gz")?,
        create_writer("./test2.txt.gz")?,
        create_writer("./test3.txt.gz")?,
    ];
    let (mut pool, mut pooled_writers) = Pool::new::<_, BgzfCompressor>(4, 4, 4, writers)?;

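    // Each PooledWriter implements Write, so the writers can be used in any order.
    writeln!(&mut pooled_writers[1], "This is writer2")?;
    writeln!(&mut pooled_writers[0], "This is writer1")?;
    writeln!(&mut pooled_writers[2], "This is writer3")?;
    // Close every PooledWriter first, then stop the pool.
    pooled_writers.into_iter().try_for_each(|w| w.close())?;
    pool.stop_pool()?;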

    Ok(())
}

Modules

Structs

A Pool orchestrates two different threadpools, a compressor pool and a writer pool.

A PooledWriter is created by exchanging a writer with a Pool.

Enums

Represents errors that may be generated by any Pool related functionality.

Traits

A Compressor is used in the compressor pool to compress bytes.