csvsc/flush/
target.rs

1use std::path::{Path, PathBuf};
2use std::fs::{self, File};
3use std::collections::HashMap;
4use std::io;
5
6use csv::{Writer, WriterBuilder};
7
8use crate::{Headers, Row, error};
9
10/// Helper for building a target for flushing data into.
11pub struct Target { }
12
13impl Target {
14    /// Write output to a file whose path is specified by the given column.
15    ///
16    /// This gives the oportunity to create a stream that writes to multiple
17    /// files depending on some criteria. It doesn't write the value of the
18    /// specified column by default, it is just used to decide the path to the
19    /// file(s) for writing but that behaviour is configurable.
20    pub fn from_column(name: &str) -> ColumnTarget {
21        ColumnTarget::new(name.into())
22    }
23
24    /// Write output to the specified path in the filesystem.
25    pub fn path(path: &str) -> impl TargetManager {
26        PathTarget::new(path.into())
27    }
28
29    /// Write output to stdout
30    pub fn stdout() -> impl TargetManager {
31        StdoutTarget::new()
32    }
33
34    /// Write output to stderr
35    pub fn stderr() -> impl TargetManager {
36        StderrTarget::new()
37    }
38}
39
40pub trait TargetManager {
41    fn write_row(&mut self, headers: &Headers, row: &Row) -> error::Result<()>;
42}
43
44pub struct ColumnTarget {
45    colname: String,
46    write_target: bool,
47    targets: HashMap<PathBuf, Writer<File>>,
48}
49
50impl ColumnTarget {
51    fn new(name: String) -> ColumnTarget {
52        ColumnTarget {
53            colname: name,
54            targets: HashMap::new(),
55            write_target: false,
56        }
57    }
58
59    pub fn write_target(self) -> ColumnTarget {
60        ColumnTarget {
61            write_target: true,
62            ..self
63        }
64    }
65}
66
67impl TargetManager for ColumnTarget {
68    fn write_row(&mut self, headers: &Headers, row: &Row) -> error::Result<()> {
69        // can unwrap because we checked the existence of the field
70        // while building the Flush
71        let target_path = PathBuf::from(
72            headers
73                .get_field(row, &self.colname)
74                .ok_or_else(|| error::Error::ColumnNotFound(self.colname.clone()))?
75        );
76
77        let colname = self.colname.clone();
78
79        let writer = if self.targets.contains_key(&target_path) {
80            self.targets.get_mut(&target_path).unwrap()
81        } else {
82            if let Some(dirname) = Path::new(&target_path).parent() {
83                fs::create_dir_all(dirname)?;
84            }
85
86            let mut writer = Writer::from_path(&target_path)?;
87
88            writer.write_record(&if self.write_target {
89                headers.as_row().clone()
90            } else {
91                Row::from(
92                    headers
93                        .as_row()
94                        .iter()
95                        .filter(|col| *col != self.colname)
96                        .collect::<Vec<_>>()
97                )
98            })?;
99
100            self.targets.insert(target_path.to_path_buf(), writer);
101
102            self.targets.get_mut(&target_path).unwrap()
103        };
104
105        writer.write_record(&if self.write_target {
106            row.clone()
107        } else {
108            Row::from(
109                headers
110                    .iter()
111                    .zip(row)
112                    .filter(|(header, _val)| *header != colname)
113                    .map(|(_header, val)| val)
114                    .collect::<Vec<_>>()
115            )
116        })?;
117
118        Ok(())
119    }
120}
121
122struct PathTarget {
123    path: PathBuf,
124    writer: Option<Writer<File>>,
125}
126
127impl PathTarget {
128    fn new(path: PathBuf) -> PathTarget {
129        PathTarget {
130            path,
131            writer: None,
132        }
133    }
134}
135
136impl TargetManager for PathTarget {
137    fn write_row(&mut self, headers: &Headers, row: &Row) -> error::Result<()> {
138        if self.writer.is_none() {
139            if let Some(parent) = self.path.parent() {
140                fs::create_dir_all(parent)?;
141            }
142
143            let mut writer = Writer::from_path(&self.path)?;
144
145            writer.write_record(headers.as_row())?;
146
147            self.writer = Some(writer);
148        }
149
150        let writer = self.writer.as_mut().unwrap();
151
152        writer.write_record(row)?;
153
154        Ok(())
155    }
156}
157
158struct StdoutTarget {
159    writer: Option<Writer<io::Stdout>>,
160}
161
162impl StdoutTarget {
163    fn new() -> StdoutTarget {
164        StdoutTarget {
165            writer: None,
166        }
167    }
168}
169
170impl TargetManager for StdoutTarget {
171    fn write_row(&mut self, headers: &Headers, row: &Row) -> error::Result<()> {
172        if self.writer.is_none() {
173            let mut writer = WriterBuilder::new().from_writer(io::stdout());
174
175            writer.write_record(headers.as_row())?;
176
177            self.writer = Some(writer);
178        }
179
180        let writer = self.writer.as_mut().unwrap();
181
182        writer.write_record(row)?;
183
184        Ok(())
185    }
186}
187
188struct StderrTarget {
189    writer: Option<Writer<io::Stderr>>,
190}
191
192impl StderrTarget {
193    fn new() -> StderrTarget {
194        StderrTarget {
195            writer: None,
196        }
197    }
198}
199
200impl TargetManager for StderrTarget {
201    fn write_row(&mut self, headers: &Headers, row: &Row) -> error::Result<()> {
202        if self.writer.is_none() {
203            let mut writer = WriterBuilder::new().from_writer(io::stderr());
204
205            writer.write_record(headers.as_row())?;
206
207            self.writer = Some(writer);
208        }
209
210        let writer = self.writer.as_mut().unwrap();
211
212        writer.write_record(row)?;
213
214        Ok(())
215    }
216}
217
#[cfg(test)]
mod tests {
    use std::fs;

    use super::{ColumnTarget, TargetManager};

    use crate::{Headers, Row};

    /// Sends the single row `["1", path]` with headers `["col", "_target"]`
    /// through `target`, then returns what ended up in the file at `path`.
    fn write_single_row(mut target: ColumnTarget, path: &str) -> String {
        let headers: Headers = Row::from(vec!["col", "_target"]).into();
        let row = Row::from(vec!["1", path]);

        target.write_row(&headers, &row).unwrap();

        // Force closing the file before reading it back
        drop(target);

        fs::read_to_string(path).unwrap()
    }

    #[test]
    fn test_column_target() {
        let path = "data/col_target_test.csv";

        let contents = write_single_row(ColumnTarget::new("_target".into()), path);

        assert_eq!(contents, "col\n1\n");
    }

    #[test]
    fn test_column_target_write_target() {
        let path = "data/col_target_test_with_target.csv";

        let contents = write_single_row(
            ColumnTarget::new("_target".into()).write_target(),
            path,
        );

        assert_eq!(contents, format!("col,_target\n1,{}\n", path));
    }
}