Crate shard_csv

A library for sharding delimited data from input streams according to a key selector.

This crate lets you stream data through a ShardedWriter. Each row is associated with exactly one shard key and written to the corresponding output file. Multiple input files can be streamed through the same ShardedWriter, provided they share the same schema.

Current Limitations

  • Input and output formats are limited to delimited formats (e.g., CSV, TSV), which are handled through the csv crate.

Using csv

Because shard-csv intimately deals with CSV data, it pulls in the csv crate and re-exports it for callers as shard_csv::csv:

let mut csv_reader = shard_csv::csv::ReaderBuilder::new()
   .from_path("input.csv") // placeholder path; substitute your own input file
   .expect("Failed to create reader from file");

Several functions make use of the csv::Reader type, but you can write data without it provided each row is converted to a StringRecord:

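use shard_csv::csv::StringRecord;

let data = vec![
    ["john", "smith", "123 main st"],
    ["jane", "doe", "999 anywhere"],
];
// Convert each fixed-size array into a StringRecord via its slice.
let it = data.iter().map(|r| StringRecord::from(&r[..]));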


A ShardedWriter must know about the header row. This can be accomplished by one of three functions, for example new_from_csv_reader, which takes the header from an existing reader:

let mut shard_writer = ShardedWriterBuilder::new_from_csv_reader(&mut csv_reader)
   .expect("Failed to create writer builder");

A writer must also know how to shard its data. You provide a closure that, for each record in the file, returns the key of the shard to which the row belongs as a String. For example, if the first column (index 0) is the identifier, you can use its value and fall back to "unknown" when the column is missing:

let mut shard_writer = ShardedWriterBuilder::new_from_csv_reader(&mut csv_reader)
   .expect("Failed to create writer builder")
   .with_key_selector(|rec| rec.get(0).unwrap_or("unknown").to_owned());
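
To see what a key selector does independently of the crate, the following minimal sketch groups in-memory rows by the same rule (first column, falling back to "unknown"). It uses only the standard library; select_key and group_by_shard are hypothetical names chosen for illustration and are not part of shard_csv.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the key selector closure: use the first
/// column as the shard key, falling back to "unknown" for an empty row.
fn select_key(row: &[&str]) -> String {
    row.first().copied().unwrap_or("unknown").to_owned()
}

/// Group rows by shard key; every row lands in exactly one group.
fn group_by_shard<'a>(rows: &[Vec<&'a str>]) -> BTreeMap<String, Vec<Vec<&'a str>>> {
    let mut shards: BTreeMap<String, Vec<Vec<&'a str>>> = BTreeMap::new();
    for row in rows {
        shards.entry(select_key(row)).or_default().push(row.clone());
    }
    shards
}
```

Because the selector returns a single String per row, a row can never belong to two shards, which matches the one-key-per-row behavior described above.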

Finally, specify how output shard files are named by passing a closure to .with_output_shard_naming. This closure is provided with the shard key and the zero-based sequence number of the shard. That is, the first file written for any given shard will have a sequence of zero; the second file will have a sequence of one, and so on.

Sequence numbers are generated based on FileSplitting, which is specified by .with_output_splitting.

let mut shard_writer = ShardedWriterBuilder::new_from_csv_reader(&mut csv_reader)
   .expect("Failed to create writer builder")
   .with_key_selector(|rec| rec.get(0).unwrap_or("unknown").to_owned())
   .with_output_shard_naming(|shard, seq| format!("{}-{}.csv", shard, seq));
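
As a rough illustration of the names such a closure produces, the sketch below applies the same format string to a run of sequence numbers. It is standalone standard-library code; shard_file_name and first_file_names are hypothetical helpers, and the usize type for the sequence number is an assumption rather than the crate's documented signature.

```rust
/// Same shape as the naming closure: shard key plus zero-based sequence
/// number in, file name out. (`usize` for `seq` is an assumption.)
fn shard_file_name(shard: &str, seq: usize) -> String {
    format!("{}-{}.csv", shard, seq)
}

/// Hypothetical helper: names of the first `count` files of one shard.
fn first_file_names(shard: &str, count: usize) -> Vec<String> {
    (0..count).map(|seq| shard_file_name(shard, seq)).collect()
}
```

Including the sequence number in the name keeps successive files of the same shard from overwriting each other once output splitting is enabled.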

At this point, you can pass iterators of records (likely just the csv_reader itself) to the writer:

shard_writer.process_csv(&mut csv_reader).ok();

Additional options

Output Splitting

By default, all rows for a given shard are written to the same file. To split a shard's output across multiple files, configure the writer with .with_output_splitting:

shard_writer = shard_writer.with_output_splitting(FileSplitting::SplitAfterRows(100));
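
The arithmetic behind row-based splitting can be sketched as follows. This assumes that a new file is started after every N data rows of a shard and that header rows do not count toward the limit; neither detail is confirmed here, and both function names are hypothetical.

```rust
/// Zero-based sequence number of the file that receives the row at
/// `row_index` (also zero-based) within one shard, assuming a new file
/// is started after every `rows_per_file` rows.
fn sequence_for_row(row_index: usize, rows_per_file: usize) -> usize {
    assert!(rows_per_file > 0, "rows_per_file must be positive");
    row_index / rows_per_file
}

/// Number of files a shard with `total_rows` rows is split into
/// (ceiling division).
fn file_count(total_rows: usize, rows_per_file: usize) -> usize {
    assert!(rows_per_file > 0, "rows_per_file must be positive");
    (total_rows + rows_per_file - 1) / rows_per_file
}
```

Under these assumptions, a shard of 250 rows split after 100 rows yields three files with sequence numbers 0, 1, and 2, the last holding the remaining 50 rows.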

File completion notification

You can be notified when a shard file is complete. This happens either because the configured number of rows or bytes was reached and the writer is splitting to a new file, or because the writer itself is being dropped and is closing its open file handles. The callback receives the path of the completed file and the associated shard key.

shard_writer = shard_writer.on_file_completion(|path, key| {
    println!("Output file '{}' for key '{}' is complete", path.display(), key);
});
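
The two trigger points (rollover and drop) can be modeled without any I/O. The sketch below is a hypothetical, simplified stand-in and not the crate's implementation: RollingShard only tracks file names and row counts, and it invokes the callback when a file fills up and again from its Drop implementation for a partially filled file.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical rolling output for one shard. It tracks only names and
/// row counts (no I/O) to show when the completion callback fires.
struct RollingShard<F: FnMut(&Path, &str)> {
    key: String,
    rows_per_file: usize,
    rows_in_current: usize,
    seq: usize,
    on_complete: F,
}

impl<F: FnMut(&Path, &str)> RollingShard<F> {
    fn new(key: &str, rows_per_file: usize, on_complete: F) -> Self {
        RollingShard {
            key: key.to_owned(),
            rows_per_file,
            rows_in_current: 0,
            seq: 0,
            on_complete,
        }
    }

    fn current_path(&self) -> PathBuf {
        PathBuf::from(format!("{}-{}.csv", self.key, self.seq))
    }

    /// Record one row; when the current file is full, report it and roll over.
    fn write_row(&mut self) {
        self.rows_in_current += 1;
        if self.rows_in_current == self.rows_per_file {
            let path = self.current_path();
            (self.on_complete)(&path, &self.key);
            self.seq += 1;
            self.rows_in_current = 0;
        }
    }
}

impl<F: FnMut(&Path, &str)> Drop for RollingShard<F> {
    /// On drop, report the partially filled file that is still open.
    fn drop(&mut self) {
        if self.rows_in_current > 0 {
            let path = self.current_path();
            (self.on_complete)(&path, &self.key);
        }
    }
}
```

Tying the final notification to Drop means the last file is reported even when the caller never finishes the writer explicitly, which is the behavior the paragraph above describes.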

Alternate file creation

By default, when a new shard file is created, a BufWriter<File> is created automatically. If you want to create your own file (eg, with a GZip stream writer), override this with .on_create_file, which returns a Box<dyn Write> on success:

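shard_writer = shard_writer.on_create_file(|path| {
    let f = std::fs::File::create(path)?;
    // Compress the output with gzip and buffer writes in front of the encoder.
    let gz = flate2::write::GzEncoder::new(f, flate2::Compression::fast());
    let buf = std::io::BufWriter::new(gz);
    Ok(Box::new(buf))
});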
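
The reason a boxed writer is returned is that the caller can then write rows without knowing the concrete writer type. A minimal standard-library sketch of that pattern follows; create_plain_file and write_lines are hypothetical names, and the exact closure signature expected by on_create_file is assumed rather than taken from the crate.

```rust
use std::fs::File;
use std::io::{self, BufWriter, Write};
use std::path::Path;

/// Hypothetical factory with the shape described above: given the shard
/// file path, return a boxed writer or an I/O error.
fn create_plain_file(path: &Path) -> io::Result<Box<dyn Write>> {
    let f = File::create(path)?;
    // Any `Write` implementation can be boxed here, e.g. a compressing encoder.
    Ok(Box::new(BufWriter::new(f)))
}

/// Write lines through whatever writer the factory produced.
fn write_lines(path: &Path, lines: &[&str]) -> io::Result<()> {
    let mut out = create_plain_file(path)?;
    for line in lines {
        writeln!(out, "{}", line)?;
    }
    // Flush explicitly so buffered data reaches the file before returning.
    out.flush()
}
```

Swapping the body of the factory for a compressing writer changes the on-disk format without touching write_lines, which is the flexibility the override is meant to provide.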


pub use csv;



Defines how output files will be split