//! A library for sharding delimited data from input streams according to a key selector.
//!
//! This crate allows you to easily stream data through a [`ShardedWriter`]. Each row
//! becomes associated with exactly one shard key and is written out to a corresponding
//! output file. Multiple files can be streamed through the same `ShardedWriter`,
//! provided they share the same schema.
//!
//! # Current Limitations
//! * Input and output formats are limited to delimited formats (e.g., CSV, TSV), making
//!   use of the [`csv` crate](https://crates.io/crates/csv).
//!
//! # Using `csv`
//! Because `shard-csv` works directly with CSV data, it pulls in the `csv` crate and
//! re-exports it for callers as `shard_csv::csv`:
//!
//! ```no_run
//! let mut csv_reader = shard_csv::csv::ReaderBuilder::new()
//!    .delimiter(b',')
//!    .has_headers(true)
//!    .from_path("foo.csv")
//!    .expect("Failed to create reader from file");
//! ```
//!
//! Several functions make use of the `csv::Reader` type, but you can write data without
//! it, provided each row is converted to a `StringRecord`:
//!
//! ```
//! use shard_csv::csv::StringRecord;
//!
//! let data = vec![
//!     ["john", "smith", "123 main st"],
//!     ["jane", "doe", "999 anywhere"],
//! ];
//! // Convert each row (a slice of `&str`) into a `StringRecord`.
//! let it = data.iter().map(|r| StringRecord::from(&r[..]));
//! shard_writer.process_iter(it).ok();
//! ```
//!
//! # Usage
//! A `ShardedWriter` must know about the header row. This can be accomplished by one of
//! three functions:
//! * [`ShardedWriterBuilder::new_without_header`] -- No header exists on input, and none
//!   will be written on output.
//! * [`ShardedWriterBuilder::new_with_header`] -- The input data contain a header, which
//!   will be written to each output sharded file.
//! * [`ShardedWriterBuilder::new_from_csv_reader`] -- The presence or absence of a header
//!   is determined from a provided `csv::Reader` by way of its `.headers()` function.
//!
//! ```
//! let mut shard_writer = ShardedWriterBuilder::new_from_csv_reader(&mut csv_reader)
//!    .expect("Failed to create writer builder");
//! ```
//!
//! A writer must also know how to shard its data. You provide a closure that, for each
//! record in the file, identifies the shard to which the row belongs as a `String`. For
//! example, if the first column (index 0) is the identifier, you can get its value or
//! default to `"unknown"` when the column is missing:
//!
//! ```
//! let mut shard_writer = ShardedWriterBuilder::new_from_csv_reader(&mut csv_reader)
//!    .expect("Failed to create writer builder")
//!    .with_key_selector(|rec| rec.get(0).unwrap_or("unknown").to_owned());
//! ```
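//!
//! The key selector can be any closure that maps a record to a `String`. As a hypothetical
//! variation that is not part of this crate, you might spread identifiers across a fixed
//! number of buckets by hashing them. The sketch uses only `std`. The `bucket_key` helper
//! and the bucket count of 8 are illustrative assumptions. Also note that `DefaultHasher`
//! output is not guaranteed to be stable across Rust releases.
//!
//! ```rust
//! use std::collections::hash_map::DefaultHasher;
//! use std::hash::{Hash, Hasher};
//!
//! /// Hypothetical helper (not a shard_csv API): maps an identifier to one of
//! /// `buckets` shard keys named "bucket-0" .. "bucket-{buckets - 1}".
//! fn bucket_key(id: &str, buckets: u64) -> String {
//!     let mut hasher = DefaultHasher::new();
//!     id.hash(&mut hasher);
//!     // The same id always hashes to the same value within one build,
//!     // so every row for that id lands in the same bucket.
//!     format!("bucket-{}", hasher.finish() % buckets)
//! }
//!
//! fn main() {
//!     let key = bucket_key("john", 8);
//!     assert!(key.starts_with("bucket-"));
//!     assert_eq!(key, bucket_key("john", 8));
//!     // Plugged into the builder, this might look like (assumed usage):
//!     // .with_key_selector(|rec| bucket_key(rec.get(0).unwrap_or("unknown"), 8))
//! }
//! ```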
//!
//! Finally, specify how output shard files are named by passing a closure to
//! `.with_output_shard_naming`. This closure is provided with the shard key and the
//! zero-based sequence number of the shard file. That is, the first file written for any
//! given shard will have a sequence number of zero, the second file will have a sequence
//! number of one, and so on.
//!
//! Sequence numbers are generated based on the [`FileSplitting`] mode, which is specified
//! with `.with_output_splitting`.
//!
//! ```
//! let mut shard_writer = ShardedWriterBuilder::new_from_csv_reader(&mut csv_reader)
//!    .expect("Failed to create writer builder")
//!    .with_key_selector(|rec| rec.get(0).unwrap_or("unknown").to_owned())
//!    .with_output_shard_naming(|shard, seq| format!("{}-{}.csv", shard, seq));
//! ```
//!
//! At this point, you can pass iterators of records (likely just the `csv_reader` itself)
//! to the writer:
//!
//! ```
//! shard_writer.process_csv(&mut csv_reader).ok();
//! ```
//!
//! # Additional options
//! ## Output Splitting
//! By default, all rows for a given shard will be written to the same file. If you want
//! to split the output into multiple files, provide details with `with_output_splitting`:
//!
//! ```
//! shard_writer = shard_writer.with_output_splitting(FileSplitting::SplitAfterRows(100));
//! ```
//!
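//! To illustrate how splitting drives the sequence numbers passed to the naming closure,
//! here is a standalone simulation. It assumes, rather than takes from the crate's source,
//! that a shard rolls over to a new file once its current file holds the configured number
//! of rows. It reuses the `"{}-{}.csv"` naming pattern shown earlier. The
//! `simulate_file_names` function is hypothetical.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Hypothetical model: returns the output file each row would be written to,
//! /// given every row's shard key and a per-file row limit (assumed > 0).
//! fn simulate_file_names(keys: &[&str], rows_per_file: usize) -> Vec<String> {
//!     // Per shard key: (current sequence number, rows in the current file).
//!     let mut state: HashMap<&str, (usize, usize)> = HashMap::new();
//!     let mut names = Vec::with_capacity(keys.len());
//!     for &key in keys {
//!         let (seq, rows) = state.entry(key).or_insert((0, 0));
//!         if *rows == rows_per_file {
//!             // The current file is full: move on to the next sequence number.
//!             *seq += 1;
//!             *rows = 0;
//!         }
//!         *rows += 1;
//!         names.push(format!("{}-{}.csv", key, seq));
//!     }
//!     names
//! }
//!
//! fn main() {
//!     // Three rows for "a" with two rows per file: the third row opens "a-1.csv".
//!     let names = simulate_file_names(&["a", "a", "a", "b"], 2);
//!     assert_eq!(names, ["a-0.csv", "a-0.csv", "a-1.csv", "b-0.csv"]);
//! }
//! ```
//!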
//! ## File completion notification
//! When a shard file is done being written, either because the specified number of rows
//! or bytes has been reached and the writer is splitting to a new file, or because the
//! writer itself is being dropped and is closing its open file handles, you can be notified
//! of the file's completion. The completed file path and the associated shard key are
//! provided.
//!
//! ```
//! shard_writer = shard_writer.on_file_completion(|path, key| {
//!     println!("Output file '{}' for key '{}' is complete", path.display(), key);
//! });
//! ```
//!
//! ## Alternate file creation
//! By default, when a new shard file is created, a `BufWriter<File>` is created
//! automatically. If you want to create your own writer (e.g., a GZip stream writer),
//! override this by passing a closure to `.on_create_file` that returns a
//! `Box<dyn Write>` on success:
//!
//! ```
//! shard_writer = shard_writer.on_create_file(|path| {
//!     let f = std::fs::File::create(path)?;
//!     // Compress the output with gzip (requires the `flate2` crate).
//!     let gz = flate2::write::GzEncoder::new(f, flate2::Compression::fast());
//!     let buf = std::io::BufWriter::new(gz);
//!     Ok(Box::new(buf))
//! });
//! ```
mod shard;
mod sharded_writer;
pub use csv;
pub use sharded_writer::*;

/// Defines how output files will be split
#[derive(Clone, Copy, Debug)]
pub enum FileSplitting {
    /// Output files won't be split
    NoSplit,

    /// Output files will be split after at least some number of rows are written
    SplitAfterRows(usize),

    /// Output files will be split after at least some number of bytes are written
    SplitAfterBytes(usize),
}

/// Errors produced while sharding data
#[derive(Debug)]
pub enum Error {
    /// An error from the underlying `csv` reader or writer
    Csv(csv::Error),
    /// An I/O error, e.g. while creating or writing an output file
    IO(std::io::Error),
}

impl From<csv::Error> for Error {
    fn from(e: csv::Error) -> Self {
        Error::Csv(e)
    }
}

impl From<std::io::Error> for Error {
    fn from(e: std::io::Error) -> Self {
        Error::IO(e)
    }
}