blackjack/dataframe/io.rs

//!
//! This module contains the IO operations for reading and writing `DataFrame`s.
//!

use std::ffi::OsStr;
use std::path::Path;

use crate::prelude::*;

/// DataFrame reading struct
///
/// ## Example
///
/// ```
/// use blackjack::prelude::*;
///
/// let path = format!("{}/tests/data/basic_csv.csv", env!("CARGO_MANIFEST_DIR"));
/// let df = Reader::new(&path).delimiter(b',').read().unwrap();
///
/// let col1: &Series<f32> = df.get_column("col1").unwrap();
/// assert_eq!(col1.sum() as i32, 15);
///
/// ```
#[derive(Clone)]
pub struct Reader {
    path: String,
    delimiter: u8,
    terminator: csv::Terminator,
    quote: u8,
    has_headers: bool,
    header: Option<Vec<String>>,
}

/// DataFrame writing struct
///
/// ## Example
/// ```
/// use blackjack::prelude::*;
///
/// let mut df = DataFrame::new();
///
/// df.add_column(Series::arange(0, 10));
/// df.add_column(Series::arange(0, 10));
///
/// let result = Writer::new(&"/tmp/test.csv.gz").delimiter(b',').write(df).is_ok(); // Gzip compression inferred.
/// assert_eq!(result, true);
/// ```
#[derive(Clone)]
pub struct Writer {
    path: String,
    delimiter: u8,
    terminator: csv::Terminator,
    quote: u8,
    has_headers: bool,
}

impl Reader {
    /// Create a new instance of `Reader` with default CSV params
    pub fn new<S: AsRef<OsStr> + ToString>(path: &S) -> Self {
        Reader {
            path: path.to_string(),
            delimiter: b',',
            terminator: csv::Terminator::CRLF,
            quote: b'"',
            has_headers: true,
            header: None,
        }
    }

    /// Set the header; must be set if `has_headers` is false, and is ignored if it is true.
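    ///
    /// ## Example
    ///
    /// A minimal sketch (not run); the path `data/no_header.csv` is hypothetical
    /// and only illustrates pairing `has_headers(false)` with `headers(..)`:
    ///
    /// ```no_run
    /// use blackjack::prelude::*;
    ///
    /// // Supply column names ourselves since the file has no header row.
    /// let df = Reader::new(&"data/no_header.csv")
    ///     .has_headers(false)
    ///     .headers(vec!["col1".to_string(), "col2".to_string()])
    ///     .read()
    ///     .unwrap();
    /// ```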
    pub fn headers(self, header: Vec<String>) -> Self {
        let mut rdr = self;
        rdr.header = Some(header);
        rdr
    }

    /// Whether to expect headers in the file or not.
    pub fn has_headers(self, yes: bool) -> Self {
        let mut rdr = self;
        rdr.has_headers = yes;
        rdr
    }

    /// Set the CSV quote character, default is `b'"'`
    pub fn quote(self, quote: u8) -> Self {
        let mut rdr = self;
        rdr.quote = quote;
        rdr
    }

    /// Set the CSV delimiter, default is `b','` (comma delimited)
    pub fn delimiter(self, delimiter: u8) -> Self {
        let mut rdr = self;
        rdr.delimiter = delimiter;
        rdr
    }

    /// Set the CSV line terminator, default treats any of `\r`, `\n` or `\r\n` as a line terminator
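    ///
    /// ## Example
    ///
    /// A minimal sketch (not run); the path `data/unix_lines.csv` is hypothetical
    /// and shows a custom dialect (semicolon delimited, `\n`-terminated):
    ///
    /// ```no_run
    /// use blackjack::prelude::*;
    ///
    /// let df = Reader::new(&"data/unix_lines.csv")
    ///     .delimiter(b';')
    ///     .terminator(b'\n')
    ///     .read()
    ///     .unwrap();
    /// ```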
    pub fn terminator(self, terminator: u8) -> Self {
        let mut rdr = self;
        rdr.terminator = csv::Terminator::Any(terminator);
        rdr
    }

    /// Read a CSV file into a [`DataFrame`] where each column represents a `Series`.
    /// Supports automatic decompression of gzipped files whose path ends with `.gz`.
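    ///
    /// ## Example
    ///
    /// A minimal sketch (not run); the path `data/example.csv.gz` is hypothetical,
    /// and decompression is inferred from the `.gz` extension:
    ///
    /// ```no_run
    /// use blackjack::prelude::*;
    ///
    /// let df = Reader::new(&"data/example.csv.gz").read().unwrap();
    /// ```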
    pub fn read(&self) -> Result<DataFrame<i32>, BlackJackError> {
        use flate2::read::GzDecoder;
        use std::fs::File;
        use std::io::prelude::*;

        let p = Path::new(&self.path);
        let file_reader: Box<dyn Read> = if self.path.to_lowercase().ends_with(".gz") {
            // Return a Gzip reader
            Box::new(GzDecoder::new(File::open(p)?))
        } else {
            // Return plain file reader
            Box::new(File::open(p)?)
        };

        let mut reader = csv::ReaderBuilder::new()
            .quote(self.quote)
            .has_headers(self.has_headers)
            .delimiter(self.delimiter)
            .terminator(self.terminator)
            .from_reader(file_reader);

        let headers: Vec<String> = if self.has_headers {
            reader
                .headers()?
                .clone()
                .into_iter()
                .map(|v| v.to_string())
                .collect()
        } else {
            match &self.header {
                Some(header) => header.to_owned(),
                None => {
                    return Err(BlackJackError::ValueError(
                        "Reader specifies the file does not have headers, \
                         but no headers were supplied with Reader::headers()"
                            .to_owned(),
                    ));
                }
            }
        };

        // Containers for storing column data
        let mut vecs: Vec<Vec<String>> = (0..headers.len()).map(|_| Vec::new()).collect();

        for record in reader.records() {
            match record {
                Ok(rec) => {
                    for (field, container) in rec.iter().zip(&mut vecs) {
                        container.push(field.into());
                    }
                }

                // TODO: Process for dealing with invalid records.
                Err(err) => println!("Unable to read record: '{}'", err),
            }
        }

        let mut df = DataFrame::new();

        // Map headers to the vectors containing their fields and into
        // Series structs, parsing each field as the most specific dtype
        // that succeeds (i32, then f32, then String).
        // TODO: Parallelize this operation; parse && serialize columns in parallel, then add them.
        for (header, vec) in headers.into_iter().zip(vecs) {
            let mut series = Series::from_vec(vec);
            series.set_name(&header);
            if let Ok(ser) = series.astype::<i32>() {
                df.add_column(ser).unwrap();
            } else if let Ok(ser) = series.astype::<f32>() {
                df.add_column(ser).unwrap();
            } else {
                df.add_column(series).unwrap();
            }
        }
        Ok(df)
    }
}

impl Writer {
    /// Create a new instance of `Writer` with default CSV params
    pub fn new<S: AsRef<OsStr> + ToString>(path: &S) -> Self {
        Writer {
            path: path.to_string(),
            delimiter: b',',
            terminator: csv::Terminator::CRLF,
            quote: b'"',
            has_headers: true,
        }
    }

    /// Whether or not to write the DataFrame's headers to the output file.
    pub fn has_headers(self, yes: bool) -> Self {
        let mut wtr = self;
        wtr.has_headers = yes;
        wtr
    }

    /// Set the CSV quote character, default is `b'"'`
    pub fn quote(self, quote: u8) -> Self {
        let mut wtr = self;
        wtr.quote = quote;
        wtr
    }

    /// Set the CSV delimiter, default is `b','` (comma delimited)
    pub fn delimiter(self, delimiter: u8) -> Self {
        let mut wtr = self;
        wtr.delimiter = delimiter;
        wtr
    }

    /// Set the CSV line terminator, default treats any of `\r`, `\n` or `\r\n` as a line terminator
    pub fn terminator(self, terminator: u8) -> Self {
        let mut wtr = self;
        wtr.terminator = csv::Terminator::Any(terminator);
        wtr
    }

    /// Write a [`DataFrame`] to CSV. Consumes the `DataFrame`, and thus will not
    /// double memory usage while writing.
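    ///
    /// ## Example
    ///
    /// A minimal sketch (not run); the output path `/tmp/no_headers.csv` is only
    /// illustrative, and headers are suppressed via `has_headers(false)`:
    ///
    /// ```no_run
    /// use blackjack::prelude::*;
    ///
    /// let mut df = DataFrame::new();
    /// df.add_column(Series::arange(0, 10));
    ///
    /// Writer::new(&"/tmp/no_headers.csv")
    ///     .has_headers(false)
    ///     .write(df)
    ///     .unwrap();
    /// ```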
    pub fn write<I: PartialEq + PartialOrd + BlackJackData>(
        &self,
        df: DataFrame<I>,
    ) -> Result<(), BlackJackError> {
        use flate2::write::GzEncoder;
        use flate2::Compression;
        use std::fs::File;
        use std::io::prelude::*;

        let p = Path::new(&self.path);

        let file_writer: Box<dyn Write> = if self.path.to_lowercase().ends_with(".gz") {
            // Return a Gzip writer
            Box::new(GzEncoder::new(File::create(p)?, Compression::default()))
        } else {
            // Return plain file writer
            Box::new(File::create(p)?)
        };

        let mut writer = csv::WriterBuilder::new()
            .delimiter(self.delimiter)
            .has_headers(self.has_headers)
            .quote(self.quote)
            .terminator(self.terminator)
            .from_writer(file_writer);

        let header = df.columns().map(|v| v.to_string()).collect::<Vec<String>>();

        // Convert all series into string vecs
        let mut data = vec![];
        for col_name in df.data.keys() {
            let series_container = df.get_column_infer(col_name.as_str()).unwrap();
            let string_vec = series_container.into_string_vec();
            data.push(string_vec);
        }

        // User might not want to write out headers
        if self.has_headers {
            // Write out the header record
            writer.write_record(header.as_slice())?;
        }

        // Write rows by taking the i-th field from every column.
        // Guard against an empty DataFrame so indexing `data[0]` cannot panic.
        // TODO: Probably a better way to do this?
        let n_rows = data.first().map_or(0, |col| col.len());
        for row_idx in 0..n_rows {
            let mut row = vec![];
            for column_idx in 0..data.len() {
                row.push(&data[column_idx][row_idx]);
            }
            writer.write_record(row.as_slice())?;
        }

        Ok(())
    }
}