1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
use crate::{shard, Error, FileSplitting};
use csv::StringRecord;
use std::{
collections::{hash_map::Entry, HashMap},
io::{BufWriter, Write},
path::Path,
rc::Rc,
};
/// First stage of configuring a [`ShardedWriter`].
///
/// Create one with [`ShardedWriterBuilder::new_without_header`],
/// [`ShardedWriterBuilder::new_with_header`] or [`ShardedWriterBuilder::new_from_csv_reader`],
/// then supply the shard key function with [`ShardedWriterBuilder::with_key_selector`].
#[derive(Debug, Clone)]
pub struct ShardedWriterBuilder {
    /// Header record carried through to the finished writer; `None` when the data has no
    /// header row.
    header: Option<StringRecord>,
}
impl ShardedWriterBuilder {
    /// Begins configuring a sharded writer for input that has no header row.
    pub fn new_without_header() -> Self {
        Self { header: None }
    }

    /// Begins configuring a sharded writer whose data uses `header` as its header record.
    pub fn new_with_header<T>(header: T) -> Self
    where
        T: Into<StringRecord>,
    {
        let header_record: StringRecord = header.into();
        Self {
            header: Some(header_record),
        }
    }

    /// Begins configuring a sharded writer that mirrors the header setup of `csv`.
    ///
    /// If `csv` is configured to expect headers, the record returned by
    /// [`csv::Reader::headers`] is copied into the builder; otherwise the builder is created
    /// without a header.
    ///
    /// # Errors
    ///
    /// Returns an error if [`csv::Reader::headers`] fails.
    pub fn new_from_csv_reader<T>(csv: &mut csv::Reader<T>) -> Result<Self, Error>
    where
        T: std::io::Read,
    {
        if !csv.has_headers() {
            return Ok(Self::new_without_header());
        }
        let header_row = csv.headers()?.clone();
        Ok(Self::new_with_header(header_row))
    }

    /// Supplies the function that decides which shard each record belongs to.
    ///
    /// The selector is invoked once per processed record; records mapping to the same key
    /// end up in the same shard.
    pub fn with_key_selector<FKey>(self, key_selector: FKey) -> ShardedWriterWithKey<FKey>
    where
        FKey: Fn(&StringRecord) -> String,
    {
        let Self { header } = self;
        ShardedWriterWithKey {
            key_selector,
            header,
        }
    }
}
/// Second stage of configuring a [`ShardedWriter`]: header and key selector are known.
///
/// Produced by [`ShardedWriterBuilder::with_key_selector`]; call
/// [`ShardedWriterWithKey::with_output_shard_naming`] to obtain the finished writer.
pub struct ShardedWriterWithKey<FKey> {
    /// Header record carried over from the builder; `None` when the data has no header row.
    header: Option<StringRecord>,
    /// Maps an input record to the key of the shard it belongs to.
    key_selector: FKey,
}
impl<FKey> ShardedWriterWithKey<FKey>
where
    FKey: Fn(&StringRecord) -> String,
{
    /// Supplies the function that names output shard files, completing configuration.
    ///
    /// `create_output_filename` receives the shard key (as produced by the selector given to
    /// [`ShardedWriterBuilder::with_key_selector`]) and a zero-based sequence number that
    /// counts how many files have been written for that shard so far.
    ///
    /// The returned writer starts with no output splitting, a `,` delimiter, no
    /// file-completion callback and the default buffered file writer; adjust these with the
    /// writer's `with_*` / `on_*` methods.
    pub fn with_output_shard_naming<FNameFile>(
        self,
        create_output_filename: FNameFile,
    ) -> ShardedWriter<FKey, FNameFile>
    where
        FNameFile: Fn(&str, usize) -> String,
    {
        let shared_naming = Rc::new(create_output_filename);
        ShardedWriter {
            output_splitting: FileSplitting::NoSplit,
            output_delimiter: b',',
            key_selector: self.key_selector,
            header_record: self.header,
            on_file_completion: None,
            create_output_filename: shared_naming,
            create_file_writer: default_create_file_writer,
            handles: HashMap::new(),
        }
    }
}
/// Routes CSV records into per-key output files ("shards").
///
/// Obtained from [`ShardedWriterBuilder`] via [`ShardedWriterWithKey::with_output_shard_naming`].
/// Records are fed in with the `process_*` methods; a shard is created the first time its key
/// is seen and is kept in `handles` for later records with the same key.
pub struct ShardedWriter<FKey, FNameFile>
where
    FNameFile: Fn(&str, usize) -> String,
{
    /// How the input file should be split
    output_splitting: FileSplitting,
    /// The field delimiter; default is ','
    output_delimiter: u8,
    /// A closure that accepts a CSV row and returns a String identifying which shard it belongs to.
    key_selector: FKey,
    /// An optional header record that will be written to every output file.
    header_record: Option<StringRecord>,
    /// A function that will be called when an intermediate file is completed
    on_file_completion: Option<fn(&Path, &str)>,
    /// Names output files from the shard key and a zero-based per-shard sequence number.
    /// Shared (via `Rc`) with every shard this writer creates.
    create_output_filename: Rc<FNameFile>,
    /// A function that creates a writer for a requested output file path
    create_file_writer: crate::shard::CreateFileWriter,
    /// A mapping of shard keys to the shards that output to files
    // NOTE(review): shards appear to complete files when dropped (see `on_file_completion`),
    // so this field's position, and therefore drop order, may matter — confirm before reordering.
    handles: HashMap<String, shard::Shard<FNameFile>>,
}
impl<FKey, FNameFile> std::fmt::Debug for ShardedWriter<FKey, FNameFile>
where
    FNameFile: Fn(&str, usize) -> String,
{
    /// Shows only the splitting mode and delimiter; closures, callbacks and open shards are
    /// not formatted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut debug = f.debug_struct("ShardedWriter");
        debug.field("output_splitting", &self.output_splitting);
        debug.field("delimiter", &self.output_delimiter);
        debug.finish()
    }
}
impl<FKey, FNameFile> ShardedWriter<FKey, FNameFile>
where
FKey: Fn(&StringRecord) -> String,
FNameFile: Fn(&str, usize) -> String,
{
/// Creates a new writer.
///
/// You must specify the directory into which the output will be written, a function
/// that extracts the shard key from a csv [StringRecord], and how output files will
/// be named. The file naming function accepts the shard key and a zero-based number
/// indicating how many files have been created for this shard.
///
/// This function can return an error if the output directory can't be created.
///
/// ```
/// let writer = ShardedWriter::new(
/// "./foo-sharded/",
/// |record| record.get(7).unwrap_or("_unknown").to_string(),
/// |shard, seq| format!("{}-file{}.csv", shard, seq)
/// )?;
/// ```
/// Specifies when sharded output files should be split.
pub fn with_output_splitting(mut self, output_splitting: FileSplitting) -> Self {
self.output_splitting = output_splitting;
self
}
/// Sets the field delimiter to be used for output files. Default is ','.
pub fn with_delimiter(mut self, delimiter: u8) -> Self {
self.output_delimiter = delimiter;
self
}
/// Sets an optional function that will be called when individual files are completed, either
/// because they have been split by the number of rows or bytes or because processing is
/// complete and the values are being dropped.
pub fn on_file_completion(mut self, f: fn(&Path, &str)) -> Self {
self.on_file_completion = Some(f);
self
}
/// Takes a closure that specifies how to create output files.
///
/// The closure provides the [Path] of the output file to be created. If you don't
/// provide your own way to create output files, the default implementation will simply create
/// a new [BufWriter] for the output file, which is the same as:
///
/// ```
/// my_sharded_writer.on_create_file(|path| Ok(BufWriter::new(File::create(path)?)));
/// ```
///
/// This function may be useful if, for example, you want to inject gzip compression into the
/// output writer.
pub fn on_create_file(mut self, f: fn(&Path) -> std::io::Result<Box<dyn Write>>) -> Self {
self.create_file_writer = f;
self
}
/// Processes the input `filename`, creating output files according to the specified key
/// selector.
///
/// This function will fail if the output directory or an output file can't be created or if a
/// row can't be written. It can also fail if it is called multiple times with files that have
/// different column counts.
///
/// On success, the number of records written is returned.
pub fn process_file(&mut self, filename: &str) -> Result<usize, Error> {
let mut reader = csv::ReaderBuilder::new()
.delimiter(self.output_delimiter)
.has_headers(self.header_record.is_some())
.from_path(filename)?;
let records = reader.records().filter_map(|r| r.ok());
self.process_iter(records)
}
/// Processes the input reader, creating output files as appropriate.
///
/// This function will fail if the output directory or an output file can't be created or if a
/// row can't be written. It can also fail if it is called multiple times with files that have
/// different column counts.
///
/// On success, the number of records written is returned.
pub fn process_csv<T: std::io::Read>(
&mut self,
csv_reader: &mut csv::Reader<T>,
) -> Result<usize, Error> {
let records = csv_reader.records().filter_map(|r| r.ok());
self.process_iter(records)
}
/// Processes an iterator of [std::io::Read], creating output files as appropriate.
pub fn process_reader(&mut self, reader: impl std::io::Read) -> Result<usize, Error> {
let mut reader = csv::ReaderBuilder::new()
.delimiter(self.output_delimiter)
.has_headers(self.header_record.is_some())
.from_reader(reader);
let records = reader.records().filter_map(|r| r.ok());
self.process_iter(records)
}
/// Iterates over every record, calculating the shard key for each, getting or creating the shard file,
/// and writing the record.
pub fn process_iter<T>(&mut self, records: T) -> Result<usize, Error>
where
T: IntoIterator<Item = StringRecord>,
{
let mut records_written = 0;
for record in records {
let key = (self.key_selector)(&record);
match self.handles.entry(key.clone()) {
Entry::Occupied(mut e) => {
e.get_mut().write_record(&record)?;
}
Entry::Vacant(e) => {
let header_record = self.header_record.clone();
let create_output_filename = self.create_output_filename.clone();
let mut shard = shard::Shard::new(
self.output_splitting,
key,
header_record,
self.create_file_writer,
create_output_filename,
self.on_file_completion,
);
shard.write_record(&record)?;
e.insert(shard);
}
};
records_written += 1;
}
Ok(records_written)
}
/// Checks if `key` has been seen in the processed data.
pub fn is_shard_key_seen(&self, key: &str) -> bool {
self.handles.contains_key(key)
}
/// Returns a vec of all keys that have been seen.
pub fn shard_keys_seen(&self) -> Vec<String> {
self.handles.keys().cloned().collect()
}
}
/// Default output-file factory: opens `path` with [`std::fs::File::create`] and buffers it in
/// a [`BufWriter`].
///
/// To change how files are opened (for example to add gzip compression), pass a different
/// function with this signature to [`ShardedWriter::on_create_file`].
///
/// # Errors
///
/// Propagates any error returned by [`std::fs::File::create`].
fn default_create_file_writer(path: &Path) -> std::io::Result<Box<dyn Write>> {
    std::fs::File::create(path).map(|file| -> Box<dyn Write> { Box::new(BufWriter::new(file)) })
}