Crate pooled_writer
source · [−]Expand description
A pooled writer and compressor.
Overview
A Pool is created, started, and writers exchanged for PooledWriters all at once in Pool::new.
When creating a pool via Pool::new two types must be specified: the W Writer type and the C compressor type.
The W may be elided since it’s an input. The C must be specified as something that implements Compressor.
See examples/ex1.rs.
The Pool consists of two threadpools, one for compressing and one for writing. All concurrency
is managed via message passing over channels.
Every time the internal buffer of a PooledWriter reaches capacity (defined by Compressor::BLOCK_SIZE)
it sends two messages:
- It sends a message over the corresponding writers channel to the writer pool, enqueueing a one-shot receiver channel in the writers queue that will receive the compressed bytes once the compressor is done. This is done to maintain the output order.
- It sends a message to the compressor pool that contains a buffer of bytes to compress as well as the sender side of the one-shot channel to send the compressed bytes on.
The writer threadpool contains a Vec of receivers, one for each writer. It loops over
this Vec, checking to see if the receiver has any messages. If it does, a lock is
acquired and that writer’s receiver is drained, writing to the underlying writer that was exchanged
for the PooledWriter.
The compressor threadpool consists of a single receiver that is continually polled for new messages. The messages are processed, the bytes compressed, and then the compressed bytes are sent over the one-shot channel to the corresponding receiver, which is a place-holder receivers in the writer queues.
Shutdown of the entire pool is managed via a sentinel value that is checked in the writer loop.
If a shutdown has been requested a cascade of channel drops will cleanly disconnect all senders
and receivers and any further calls to PooledWriters will result in an error.
Example
use std::{
error::Error,
fs::File,
io::{BufWriter, Write},
path::Path,
};
use pooled_writer::{Compressor, Pool, bgzf::BgzfCompressor};
type DynError = Box<dyn Error + 'static>;
fn create_writer<P: AsRef<Path>>(name: P) -> Result<BufWriter<File>, DynError> {
Ok(BufWriter::new(File::create(name)?))
}
fn main() -> Result<(), DynError> {
let writers = vec![
create_writer("./test1.txt.gz")?,
create_writer("./test2.txt.gz")?,
create_writer("./test3.txt.gz")?,
];
let (mut pool, mut pooled_writers) = Pool::new::<_, BgzfCompressor>(4, 4, 4, writers)?;
writeln!(&mut pooled_writers[1], "This is writer2")?;
writeln!(&mut pooled_writers[0], "This is writer1")?;
writeln!(&mut pooled_writers[2], "This is writer3")?;
pooled_writers.into_iter().try_for_each(|w| w.close())?;
pool.stop_pool()?;
Ok(())
}Modules
Structs
A PooledWriter is created by exchanging a writer with a Pool.
Enums
Represents errors that may be generated by any Pool related functionality.
Traits
A Compressor is used in the compressor pool to compress bytes.