// shard_csv/lib.rs
//! A library for sharding delimited data from input streams according to a key selector.
//!
//! This crate allows you to easily stream data through a [`ShardedWriter`]. Each row
//! becomes associated with exactly one shard key and written out to a corresponding
//! output file. Multiple files can be streamed through the same `ShardedWriter`
//! provided they share the same schema.
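//!
//! Conceptually, sharding is a group-by on a key. The sketch below models that idea in
//! memory with only the standard library. It is not how `ShardedWriter` works (the writer
//! streams rows out to files instead of buffering them), and `shard_rows` is a
//! hypothetical helper used only for illustration:
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! /// Hypothetical, in-memory illustration (not the crate's streaming implementation):
//! /// every row is placed in exactly one group, chosen by `key_selector`.
//! fn shard_rows<'a, F>(rows: &[Vec<&'a str>], key_selector: F) -> BTreeMap<String, Vec<Vec<&'a str>>>
//! where
//!     F: Fn(&[&'a str]) -> String,
//! {
//!     let mut shards: BTreeMap<String, Vec<Vec<&'a str>>> = BTreeMap::new();
//!     for row in rows {
//!         shards.entry(key_selector(row.as_slice())).or_default().push(row.clone());
//!     }
//!     shards
//! }
//!
//! fn main() {
//!     let rows = vec![
//!         vec!["john", "smith"],
//!         vec!["jane", "doe"],
//!         vec!["john", "doe"],
//!     ];
//!     // Shard on the first column.
//!     let shards = shard_rows(&rows, |r| r[0].to_owned());
//!     assert_eq!(shards["john"].len(), 2);
//!     assert_eq!(shards["jane"].len(), 1);
//! }
//! ```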
//!
//! # Current Limitations
//! * Input and output formats are limited to delimited formats (e.g., CSV, TSV), making
//!   use of the [`csv` crate](https://crates.io/crates/csv).
//!
//! # Using `csv`
//! Because `shard-csv` intimately deals with CSV data, it pulls in the `csv` crate and
//! re-exports it for callers as `shard_csv::csv`:
//!
//! ```no_run
//! let mut csv_reader = shard_csv::csv::ReaderBuilder::new()
//!     .delimiter(b',')
//!     .has_headers(true)
//!     .from_path("foo.csv")
//!     .expect("Failed to create reader from file");
//! ```
//!
//! Several functions make use of the `csv::Reader` type, but you can write data without
//! it provided each row is converted to a `StringRecord`:
//!
//! ```ignore
//! use shard_csv::csv::StringRecord;
//!
//! let data = vec![
//!     ["john", "smith", "123 main st"],
//!     ["jane", "doe", "999 anywhere"],
//! ];
//! // Convert each row into a `StringRecord`; `shard_writer` is built as described under "Usage".
//! let it = data.iter().map(|r| StringRecord::from(&r[..]));
//! shard_writer.process_iter(it).ok();
//! ```
//!
//! # Usage
//! A `ShardedWriter` must know about the header row. This can be accomplished with one of
//! three functions:
//! * [`ShardedWriterBuilder::new_without_header`] -- No header exists on input, and none
//!   will be written on output.
//! * [`ShardedWriterBuilder::new_with_header`] -- The input data contains a header, which
//!   will be written to each sharded output file.
//! * [`ShardedWriterBuilder::new_from_csv_reader`] -- The presence or absence of a header
//!   is determined from a provided `csv::Reader` by way of its `.headers()` method.
//!
//! ```ignore
//! use shard_csv::ShardedWriterBuilder;
//!
//! let mut shard_writer = ShardedWriterBuilder::new_from_csv_reader(&mut csv_reader)
//!     .expect("Failed to create writer builder");
//! ```
//!
//! A writer must also know how to shard its data. You provide a closure that, for each
//! record in the file, returns the key of the shard to which that row belongs, as a
//! `String`. For example, if the first column (index 0) is the identifier, you can take
//! its value and fall back to `"unknown"` when the column is missing:
//!
//! ```ignore
//! let mut shard_writer = ShardedWriterBuilder::new_from_csv_reader(&mut csv_reader)
//!     .expect("Failed to create writer builder")
//!     .with_key_selector(|rec| rec.get(0).unwrap_or("unknown").to_owned());
//! ```
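//!
//! The selector is an ordinary closure from a record to a `String`, so its fallback
//! behavior can be checked on its own. The sketch below substitutes a plain `&[&str]`
//! slice for `StringRecord` (an assumption made so the example needs only the standard
//! library), and `select_key` is a hypothetical name:
//!
//! ```rust
//! /// Stand-in for the key selector above. Assumption: a slice of fields replaces
//! /// `StringRecord` so this sketch has no dependencies.
//! fn select_key(rec: &[&str]) -> String {
//!     // `first()` returns `None` for an empty record, so we fall back to "unknown".
//!     rec.first().copied().unwrap_or("unknown").to_owned()
//! }
//!
//! fn main() {
//!     assert_eq!(select_key(&["john", "smith", "123 main st"]), "john");
//!     assert_eq!(select_key(&[]), "unknown");
//! }
//! ```
//!
//! With this fallback, every record lacking a first column lands in the same `unknown`
//! shard.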
//!
//! Finally, specify how output shard files are named by passing a closure to
//! `.with_output_shard_naming`. This closure is provided with the shard key and the
//! zero-based sequence number of the file within that shard: the first file written for
//! a given shard has sequence 0, the second has sequence 1, and so on.
//!
//! Sequence numbers advance when a shard's output is split according to
//! [`FileSplitting`], which is set with `.with_output_splitting`. Without splitting (the
//! default), each shard is written to a single file with sequence 0.
//!
//! ```ignore
//! let mut shard_writer = ShardedWriterBuilder::new_from_csv_reader(&mut csv_reader)
//!     .expect("Failed to create writer builder")
//!     .with_key_selector(|rec| rec.get(0).unwrap_or("unknown").to_owned())
//!     .with_output_shard_naming(|shard, seq| format!("{}-{}.csv", shard, seq));
//! ```
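//!
//! To see which names a naming closure produces, you can call it directly. This sketch
//! assumes the closure receives the shard key as `&str` and the sequence number as
//! `usize` (the exact parameter types are an assumption), and `file_names` is a
//! hypothetical helper:
//!
//! ```rust
//! /// Hypothetical helper: the names a shard would use for its first `count` files.
//! /// Assumed naming-closure shape: (shard key, sequence number) -> file name.
//! fn file_names<F>(shard: &str, count: usize, naming: F) -> Vec<String>
//! where
//!     F: Fn(&str, usize) -> String,
//! {
//!     (0..count).map(|seq| naming(shard, seq)).collect()
//! }
//!
//! fn main() {
//!     let names = file_names("john", 3, |shard, seq| format!("{}-{}.csv", shard, seq));
//!     assert_eq!(names, ["john-0.csv", "john-1.csv", "john-2.csv"]);
//! }
//! ```
//!
//! Including `seq` in the name matters once splitting is enabled: a closure that ignored
//! it would give every file of a shard the same name.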
//!
//! At this point, you can pass iterators of records to the writer; most often this is
//! just the `csv_reader` itself:
//!
//! ```ignore
//! shard_writer.process_csv(&mut csv_reader).ok();
//! ```
//!
//! # Additional options
//! ## Output Splitting
//! By default, all rows for a given shard will be written to the same file. If you want
//! to split the output into multiple files, configure this with `.with_output_splitting`:
//!
//! ```ignore
//! use shard_csv::FileSplitting;
//!
//! shard_writer = shard_writer.with_output_splitting(FileSplitting::SplitAfterRows(100));
//! ```
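//!
//! The [`FileSplitting`] variants split "after at least" a threshold. The sketch below
//! models one reading of that rule with only the standard library: a row is always
//! written to the current file, and once that file's row or byte count reaches the
//! threshold, the next row starts a new file. The local `Splitting` enum mirrors
//! `FileSplitting`, `file_sizes` is hypothetical, and the crate's own bookkeeping may
//! differ in detail:
//!
//! ```rust
//! // Local mirror of `FileSplitting`; the split rule below is an assumed reading of "at least".
//! #[derive(Clone, Copy)]
//! enum Splitting {
//!     NoSplit,
//!     SplitAfterRows(usize),
//!     SplitAfterBytes(usize),
//! }
//!
//! /// Returns how many rows land in each output file (index = sequence number),
//! /// given the byte length of every row written to one shard.
//! fn file_sizes(row_bytes: &[usize], splitting: Splitting) -> Vec<usize> {
//!     let mut files = Vec::new();
//!     let (mut rows, mut bytes) = (0usize, 0usize);
//!     for &len in row_bytes {
//!         // The row is written first, so a file can exceed a byte threshold.
//!         rows += 1;
//!         bytes += len;
//!         let full = match splitting {
//!             Splitting::NoSplit => false,
//!             Splitting::SplitAfterRows(n) => rows >= n,
//!             Splitting::SplitAfterBytes(n) => bytes >= n,
//!         };
//!         if full {
//!             // Close the current file; the next row opens sequence `files.len()`.
//!             files.push(rows);
//!             rows = 0;
//!             bytes = 0;
//!         }
//!     }
//!     if rows > 0 {
//!         files.push(rows); // the last, partially filled file
//!     }
//!     files
//! }
//!
//! fn main() {
//!     let rows = vec![10usize; 250]; // 250 rows of 10 bytes each
//!     assert_eq!(file_sizes(&rows, Splitting::NoSplit), vec![250]);
//!     assert_eq!(file_sizes(&rows, Splitting::SplitAfterRows(100)), vec![100, 100, 50]);
//!     // 9 rows (90 bytes) stay below 95, so each file takes 10 rows (100 bytes).
//!     assert_eq!(file_sizes(&rows, Splitting::SplitAfterBytes(95)).len(), 25);
//! }
//! ```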
//!
//! ## File completion notification
//! When a shard file is finished, either because the configured row or byte limit was
//! reached and the writer is moving on to a new file, or because the writer itself is
//! being dropped and is closing its open file handles, you can be notified of the file's
//! completion with `.on_file_completion`. The closure receives the completed file's path
//! and its associated shard key.
//!
//! ```ignore
//! shard_writer = shard_writer.on_file_completion(|path, key| {
//!     println!("Output file '{}' for key '{}' is complete", path.display(), key);
//! });
//! ```
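//!
//! Because the callback is a closure, it can capture state. One option for collecting
//! completed files so they can be handled after sharding finishes is to send each path
//! over a standard-library channel. This sketch assumes the callback receives a `&Path`
//! and a `&str` key, as the example above suggests; `completion_channel` is hypothetical,
//! and the direct calls stand in for the writer invoking the callback:
//!
//! ```rust
//! use std::path::{Path, PathBuf};
//! use std::sync::mpsc::{self, Receiver};
//!
//! /// Hypothetical helper: a completion callback plus the receiving end of its channel.
//! /// Assumed callback shape: (completed file path, shard key).
//! fn completion_channel() -> (impl Fn(&Path, &str), Receiver<(PathBuf, String)>) {
//!     let (tx, rx) = mpsc::channel();
//!     // The callback takes ownership of the sender and forwards each (path, key) pair.
//!     let callback = move |path: &Path, key: &str| {
//!         // A send error only means the receiver was dropped, so it is ignored here.
//!         let _ = tx.send((path.to_path_buf(), key.to_owned()));
//!     };
//!     (callback, rx)
//! }
//!
//! fn main() {
//!     let (on_complete, rx) = completion_channel();
//!     // Simulate the writer finishing two files.
//!     on_complete(Path::new("john-0.csv"), "john");
//!     on_complete(Path::new("jane-0.csv"), "jane");
//!     drop(on_complete); // dropping the callback drops the sender, ending `rx.iter()`
//!
//!     let done: Vec<(PathBuf, String)> = rx.iter().collect();
//!     assert_eq!(done.len(), 2);
//!     assert_eq!(done[0].1, "john");
//! }
//! ```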
//!
//! ## Alternate file creation
//! By default, when a new shard file is created, it is opened as a `BufWriter<File>`
//! automatically. If you want to create your own writer (e.g., a GZip stream from the
//! `flate2` crate), override this with `.on_create_file`, passing a closure that returns
//! a `Box<dyn Write>` on success:
//!
//! ```ignore
//! use std::io::BufWriter;
//!
//! shard_writer = shard_writer.on_create_file(|path| {
//!     let f = std::fs::File::create(path)?;
//!     let gz = flate2::write::GzEncoder::new(f, flate2::Compression::fast());
//!     let buf = BufWriter::new(gz);
//!     Ok(Box::new(buf))
//! });
//! ```
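//!
//! The default behavior, a `BufWriter<File>`, can be written as such a closure using only
//! the standard library. This sketch assumes the closure has the shape
//! `Fn(&Path) -> std::io::Result<Box<dyn Write>>`, inferred from the example above rather
//! than taken from the crate's signature; `create_plain` and the temporary file name are
//! hypothetical:
//!
//! ```rust
//! use std::fs::File;
//! use std::io::{self, BufWriter, Write};
//! use std::path::Path;
//!
//! /// Assumed shape of an `.on_create_file` closure: open (or truncate) `path` and
//! /// wrap it in a buffered writer, boxed as `dyn Write`.
//! fn create_plain(path: &Path) -> io::Result<Box<dyn Write>> {
//!     let f = File::create(path)?;
//!     Ok(Box::new(BufWriter::new(f)))
//! }
//!
//! fn main() -> io::Result<()> {
//!     let path = std::env::temp_dir().join("shard_csv_create_plain_example.csv");
//!     {
//!         let mut w = create_plain(&path)?;
//!         writeln!(w, "john,smith")?;
//!         w.flush()?; // flush explicitly so a write error is reported, not swallowed
//!     }
//!     assert_eq!(std::fs::read_to_string(&path)?, "john,smith\n");
//!     std::fs::remove_file(&path)
//! }
//! ```
//!
//! Flushing explicitly is deliberate: `BufWriter` flushes when dropped, but any error from
//! that final flush is silently discarded.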

mod shard;
mod sharded_writer;
pub use csv;
pub use sharded_writer::*;

/// Defines how output files will be split
#[derive(Clone, Copy, Debug)]
pub enum FileSplitting {
    /// Output files won't be split
    NoSplit,

    /// Output files will be split after at least some number of rows are written
    SplitAfterRows(usize),

    /// Output files will be split after at least some number of bytes are written
    SplitAfterBytes(usize),
}

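/// Errors produced by this crate, wrapping either a `csv` error or an I/O error.
#[derive(Debug)]
pub enum Error {
    /// An error reported by the `csv` crate.
    Csv(csv::Error),
    /// A standard I/O error.
    IO(std::io::Error),
}
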
impl From<csv::Error> for Error {
    fn from(e: csv::Error) -> Self {
        Error::Csv(e)
    }
}

impl From<std::io::Error> for Error {
    fn from(e: std::io::Error) -> Self {
        Error::IO(e)
    }
}