Crate pooled_writer


A pooled writer and compressor.

Overview

pooled-writer solves the problem of compressing and writing data to a set of writers using multiple threads, where the number of writers and the number of threads cannot easily be made equal. Examples include writing to hundreds of gzipped files using 16 threads, or writing to four gzipped files using 32 threads.

To accomplish this, a pool is configured and writers are exchanged for PooledWriters that can be used in place of the original writers. This is done with the PoolBuilder, which is the preferred way to configure and create a pool. The Pool and the builder require two generic types: the W writer type and the C compressor type. W can usually be elided when calls to PoolBuilder::exchange allow the compiler to infer it. C must be specified as a type that implements Compressor.

The Pool consists of a single thread pool that consumes work from both a compression queue and a writing queue. All concurrency is managed via message passing over channels.

Every time the internal buffer of a PooledWriter reaches capacity (defined by Compressor::BLOCK_SIZE) it sends two messages:

  1. It sends a message over the corresponding writer’s channel to the writer pool, enqueueing a one-shot receiver channel in the writer’s queue that will receive the compressed bytes once the compressor is done. This is done to maintain the output order.
  2. It sends a message to the compressor pool that contains a buffer of bytes to compress as well as the sender side of the one-shot channel to send the compressed bytes on.
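The two-message scheme above can be sketched with the standard library alone. This is a minimal illustration of the ordering idea, not the crate's implementation: `fake_compress` and `compress_in_order` are hypothetical names, a bounded `sync_channel(1)` stands in for the one-shot channel, and upper-casing stands in for compression.

```rust
use std::sync::mpsc::{channel, sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread;

/// Stand-in for a real compressor (assumption): upper-cases the block.
fn fake_compress(block: &[u8]) -> Vec<u8> {
    block.to_ascii_uppercase()
}

/// Pushes `blocks` through a small worker pool and returns the results
/// concatenated in submission order, whichever worker finishes first.
fn compress_in_order(blocks: Vec<Vec<u8>>, workers: usize) -> Vec<u8> {
    assert!(workers > 0, "at least one worker is required");

    // Compression queue: raw bytes plus the sender half of a one-shot channel.
    let (work_tx, work_rx) = channel::<(Vec<u8>, SyncSender<Vec<u8>>)>();
    let work_rx = Arc::new(Mutex::new(work_rx));
    // Writer queue: receiver halves, enqueued in submission order.
    let (order_tx, order_rx) = channel::<Receiver<Vec<u8>>>();

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let work_rx = Arc::clone(&work_rx);
            thread::spawn(move || loop {
                // The lock is released at the end of this statement.
                let job = work_rx.lock().unwrap().recv();
                match job {
                    Ok((bytes, reply)) => reply.send(fake_compress(&bytes)).unwrap(),
                    Err(_) => break, // queue closed and drained
                }
            })
        })
        .collect();

    for block in blocks {
        let (reply_tx, reply_rx) = sync_channel::<Vec<u8>>(1);
        // Message 1: reserve this block's place in the output order.
        order_tx.send(reply_rx).unwrap();
        // Message 2: hand the bytes and the reply sender to the compressors.
        work_tx.send((block, reply_tx)).unwrap();
    }
    drop(work_tx);
    drop(order_tx);

    // The "writer" consumes receivers in submission order and blocks on each
    // one until its compressed bytes arrive.
    let mut out = Vec::new();
    for reply_rx in order_rx {
        out.extend(reply_rx.recv().unwrap());
    }
    for handle in handles {
        handle.join().unwrap();
    }
    out
}
```

Because the receiver halves are queued before any compression starts, the consumer never has to reorder results; it simply waits on the next receiver in line.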

The threads in the thread pool loop continuously until the pool is shut down. On each iteration a thread first attempts to receive and compress one block, and then attempts to receive and write one compressed block. A third internal channel is used to manage the queue of writes to be performed so that the individual per-writer channels (of which there may be many) are only polled if there is likely to be data available for writing. When data is available to be written, the appropriate underlying writer is locked, and the data is written.
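A simplified, single-threaded sketch of one such loop iteration follows. It is an illustration under stated assumptions rather than the crate's code: `WriterSlot`, `worker_pass`, and `run_single_worker` are hypothetical names, the one-shot channels are omitted, upper-casing stands in for compression, and a `Vec<u8>` stands in for each underlying writer. The `ready` channel plays the role of the third internal channel, carrying the index of a writer that has a block waiting.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical per-writer state: a queue of compressed blocks and a sink.
struct WriterSlot {
    blocks: Receiver<Vec<u8>>,
    sink: Vec<u8>,
}

/// One pass of a worker: compress at most one block, then write at most one.
/// Returns true if any work was done.
fn worker_pass(
    compress_rx: &Receiver<(usize, Vec<u8>)>,
    block_txs: &[Sender<Vec<u8>>],
    ready_tx: &Sender<usize>,
    ready_rx: &Receiver<usize>,
    slots: &mut [WriterSlot],
) -> bool {
    let mut did_work = false;

    // Step 1: try to receive and "compress" one block.
    if let Ok((idx, raw)) = compress_rx.try_recv() {
        let compressed = raw.to_ascii_uppercase();
        block_txs[idx].send(compressed).expect("writer slot dropped");
        // Announce that writer `idx` now has data waiting.
        ready_tx.send(idx).expect("ready queue dropped");
        did_work = true;
    }

    // Step 2: poll a per-writer channel only if the ready queue names it.
    if let Ok(idx) = ready_rx.try_recv() {
        if let Ok(block) = slots[idx].blocks.try_recv() {
            slots[idx].sink.extend_from_slice(&block);
            did_work = true;
        }
    }
    did_work
}

/// Drives a single worker until no work is left; returns each writer's output.
fn run_single_worker(jobs: Vec<(usize, Vec<u8>)>, n_writers: usize) -> Vec<Vec<u8>> {
    let (compress_tx, compress_rx) = channel::<(usize, Vec<u8>)>();
    let (ready_tx, ready_rx) = channel::<usize>();
    let mut block_txs = Vec::new();
    let mut slots = Vec::new();
    for _ in 0..n_writers {
        let (tx, rx) = channel::<Vec<u8>>();
        block_txs.push(tx);
        slots.push(WriterSlot { blocks: rx, sink: Vec::new() });
    }
    for job in jobs {
        compress_tx.send(job).unwrap();
    }
    drop(compress_tx);
    while worker_pass(&compress_rx, &block_txs, &ready_tx, &ready_rx, &mut slots) {}
    slots.into_iter().map(|slot| slot.sink).collect()
}
```

With many writers, the ready queue means a worker performs one cheap `try_recv` instead of scanning every per-writer channel on each pass.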

When all writing to PooledWriters is complete, the writers should be close()’d or drop()’d and then the pool should be stopped using Pool::stop_pool. Writers that are not closed may have data buffered that is never written!
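To see why an unclosed writer can lose data, consider a minimal block-buffering writer. This is a sketch under assumptions, not the PooledWriter implementation: `BlockBuffered` is a hypothetical type that forwards only full blocks to the inner writer and deliberately has no `Drop` implementation, so that the unflushed tail is visible.

```rust
use std::io::{self, Write};

/// Hypothetical writer that forwards data to `inner` only in full blocks.
struct BlockBuffered<W: Write> {
    inner: W,
    buf: Vec<u8>,
    block_size: usize,
}

impl<W: Write> BlockBuffered<W> {
    fn new(inner: W, block_size: usize) -> Self {
        assert!(block_size > 0, "block size must be non-zero");
        BlockBuffered { inner, buf: Vec::new(), block_size }
    }

    /// Flushes the partial final block and returns the inner writer.
    fn close(mut self) -> io::Result<W> {
        self.flush()?;
        Ok(self.inner)
    }
}

impl<W: Write> Write for BlockBuffered<W> {
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
        self.buf.extend_from_slice(data);
        // Emit every complete block; keep the remainder buffered.
        while self.buf.len() >= self.block_size {
            let rest = self.buf.split_off(self.block_size);
            self.inner.write_all(&self.buf)?;
            self.buf = rest;
        }
        Ok(data.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        if !self.buf.is_empty() {
            self.inner.write_all(&self.buf)?;
            self.buf.clear();
        }
        self.inner.flush()
    }
}
```

Writing six bytes with a block size of four sends only the first four bytes to the inner writer; the remaining two reach it only when `close` is called.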

Pool::stop_pool shuts down channels in a safe order, ensuring that data submitted to the pool is compressed and written before threads are stopped. After pool shutdown has been initiated, any subsequent attempts to write to PooledWriters will result in errors. Likewise, any calls to PooledWriter::close that cause data to be flushed into the compression queue will return errors.
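The drain-then-stop behaviour can be illustrated with a standard library channel, which keeps delivering queued messages after its sender is dropped. The sketch below is an assumption-laden miniature, not the crate's shutdown logic: `MiniPool`, `submit`, and `stop` are hypothetical names, and a single worker thread appends blocks to a `Vec<u8>` in place of compressing and writing.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical one-worker pool used only to illustrate shutdown ordering.
struct MiniPool {
    tx: Option<Sender<Vec<u8>>>,
    worker: Option<JoinHandle<Vec<u8>>>,
}

impl MiniPool {
    fn start() -> Self {
        let (tx, rx) = channel::<Vec<u8>>();
        let worker = thread::spawn(move || {
            let mut written = Vec::new();
            // Iteration ends only once the sender is dropped AND the queue is
            // empty, so blocks submitted before shutdown are never discarded.
            for block in rx {
                written.extend_from_slice(&block);
            }
            written
        });
        MiniPool { tx: Some(tx), worker: Some(worker) }
    }

    /// Queues a block; fails once shutdown has been initiated.
    fn submit(&self, block: &[u8]) -> Result<(), String> {
        match &self.tx {
            Some(tx) => tx.send(block.to_vec()).map_err(|e| e.to_string()),
            None => Err("pool has been stopped".to_string()),
        }
    }

    /// Closes the queue first, then waits for the worker to drain it.
    fn stop(&mut self) -> Vec<u8> {
        drop(self.tx.take());
        match self.worker.take() {
            Some(handle) => handle.join().expect("worker panicked"),
            None => Vec::new(),
        }
    }
}
```

Closing the sending side before joining the worker is what makes the order safe: the worker sees the end of the queue only after every earlier block has been processed.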

Example

use std::{
    error::Error,
    fs::File,
    io::{BufWriter, Write},
    path::Path,
};

use pooled_writer::{Compressor, PoolBuilder, Pool, bgzf::BgzfCompressor};

type DynError = Box<dyn Error + 'static>;

fn create_writer<P: AsRef<Path>>(name: P) -> Result<BufWriter<File>, DynError> {
    Ok(BufWriter::new(File::create(name)?))
}

fn main() -> Result<(), DynError> {
    let writers = vec![
        create_writer("/tmp/test1.txt.gz")?,
        create_writer("/tmp/test2.txt.gz")?,
        create_writer("/tmp/test3.txt.gz")?,
    ];

    let mut builder = PoolBuilder::<_, BgzfCompressor>::new()
        .threads(8)
        .compression_level(5)?;

    let mut pooled_writers = writers.into_iter().map(|w| builder.exchange(w)).collect::<Vec<_>>();
    let mut pool = builder.build()?;

    writeln!(&mut pooled_writers[1], "This is writer2")?;
    writeln!(&mut pooled_writers[0], "This is writer1")?;
    writeln!(&mut pooled_writers[2], "This is writer3")?;
    pooled_writers.into_iter().try_for_each(|w| w.close())?;
    pool.stop_pool()?;

    Ok(())
}

Modules

Structs

A Pool orchestrates two different threadpools, a compressor pool and a writer pool.
A struct to make building up a Pool simpler. The builder should be constructed using PoolBuilder::new, which provides the user control over the sizes of the queues used for compression and writing. It should be noted that a single compression queue is created, and one writer queue per writer exchanged. A good starting point for these queue sizes is two times the number of threads.
A PooledWriter is created by exchanging a writer with a Pool.

Enums

Represents errors that may be generated by any Pool related functionality.

Traits

A Compressor is used in the compressor pool to compress bytes.