sorer/
dataframe.rs

1//! This module defines functions to parse a `SoR` file into a columnar
2//! format as a `Vec<Column>`.
3
4use crate::parsers::parse_line_with_schema;
5use crate::schema::DataType;
6use deepsize::DeepSizeOf;
7use serde::{Deserialize, Serialize};
8use std::convert::{From, TryFrom};
9use std::fmt;
10use std::fs::File;
11use std::io::{BufRead, BufReader, Seek, SeekFrom, Split};
12use std::thread;
13
14/// Represents a column of parsed data from a `SoR` file.
15#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, DeepSizeOf)]
16pub enum Column {
17    /// A Column consisting of optional `i64`s.
18    Int(Vec<Option<i64>>),
19    /// A Column consisting of optional `bool`s.
20    Bool(Vec<Option<bool>>),
21    /// A Column consisting of optional `f64`s.
22    Float(Vec<Option<f64>>),
23    /// A Column consisting of optional `String`s.
24    String(Vec<Option<String>>),
25}
26
27impl Column {
28    pub fn len(&self) -> usize {
29        match &self {
30            &Column::Bool(col) => col.len(),
31            &Column::Int(col) => col.len(),
32            &Column::Float(col) => col.len(),
33            &Column::String(col) => col.len(),
34        }
35    }
36}
37
38/// An enumeration of the possible `SoR` data types, that also contains the
39/// data itself.
40#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, DeepSizeOf)]
41pub enum Data {
42    /// A `String` cell.
43    String(String),
44    /// A `i64` cell.
45    Int(i64),
46    /// A `f64` cell.
47    Float(f64),
48    /// A `bool` cell.
49    Bool(bool),
50    /// A missing value.
51    Null,
52}
53
54impl Data {
55    /// Get the data assuming its a String
56    pub fn unwrap_string(&self) -> String {
57        match self {
58            Data::String(s) => s.clone(),
59            _ => panic!("unwrap error"),
60        }
61    }
62
63    /// Get the data assuming its an int
64    pub fn unwrap_int(&self) -> i64 {
65        match self {
66            Data::Int(n) => *n,
67            _ => panic!("unwrap error"),
68        }
69    }
70
71    /// Get the data assuming its a float
72    pub fn unwrap_float(&self) -> f64 {
73        match self {
74            Data::Float(n) => *n,
75            _ => panic!("unwrap error"),
76        }
77    }
78
79    /// Get the data assuming its a bool
80    pub fn unwrap_bool(&self) -> bool {
81        match self {
82            Data::Bool(n) => *n,
83            _ => panic!("unwrap error"),
84        }
85    }
86}
87
88/// Generate a `Vec<Column>` matching the given schema.
89fn init_columnar(schema: &[DataType]) -> Vec<Column> {
90    let mut result = Vec::with_capacity(schema.len() + 1);
91    for t in schema {
92        match t {
93            DataType::Bool => result.push(Column::Bool(Vec::new())),
94            DataType::Int => result.push(Column::Int(Vec::new())),
95            DataType::Float => result.push(Column::Float(Vec::new())),
96            DataType::String => result.push(Column::String(Vec::new())),
97        }
98    }
99    result
100}
101
102// TODO: this has a bug if num_threads is == 1. See tests/lib.rs
103// `is_missing_idx` and `print_col_idx`
104// TODO: use crossbeam for scoped thread spawning and change from_file to
105// take `schema: &[DataType]`
106
107/// Reads `len` number of bytes from a given file starting at the `from` byte
108/// offset an according to the given `schema`.
109///
110/// This is the top level function for using `SoRer` and the one you should be
111///  using unless you are trying to extend `SoRer`. There are many intricate
112/// facets to using `SoRer` so you *must* RTFM [here](../index.html)
113pub fn from_file(
114    file_path: &str,
115    schema: Vec<DataType>,
116    from: usize,
117    len: usize,
118    num_threads: usize,
119) -> Vec<Column> {
120    // the total number of bytes to read
121    let num_chars = if len == std::usize::MAX {
122        (std::fs::metadata(file_path).unwrap().len() - from as u64) as f64
123    } else {
124        len as f64
125    };
126    // each thread will parse this many characters +- some number
127    let step = (num_chars / num_threads as f64).ceil() as usize;
128
129    // setup the work array with the from / len for each thread
130    // each element in the work array is a tuple of (starting index, number of byte for this thread)
131    let f: File = File::open(file_path).unwrap();
132    let mut reader = BufReader::new(f);
133    let mut work: Vec<(usize, usize)> = Vec::with_capacity(num_threads + 1);
134
135    // add the first one separately since we want to access the previous thread's
136    // work when in the loop. Since the work of the first thread will call
137    // `read_file(schema, 0, step)` it will not throw away the first line
138    // since from is 0 and will throw away the last line since step > 0
139    work.push((from, step));
140
141    let mut so_far = from;
142    let mut buffer = Vec::new();
143
144    // This loop finds the byte offset for the start of a line
145    // by adding the length of the last line that a previous thread would've
146    // thrown away. The work gets added to the following thread so that
147    // each thread starts at a full line and reads only until the end of a line
148    for i in 1..num_threads {
149        so_far += step;
150        // advance the reader to this threads starting index then
151        // find the next newline character
152        reader.seek(SeekFrom::Start(so_far as u64)).unwrap();
153        reader.read_until(b'\n', &mut buffer).unwrap();
154        work.push((so_far, step));
155
156        // Since the previous thread throws away the last line, add the length
157        // of the last line of prev thread to the work of this thread so that
158        // we read all lines.
159        work.get_mut(i - 1).unwrap().1 += buffer.len() as usize + 1;
160        buffer.clear();
161    }
162
163    // initialize the threads with their own BufReader
164    let mut threads = Vec::new();
165    for w in work {
166        let new_schema = schema.clone();
167        let f: File = File::open(file_path.clone()).unwrap();
168        let mut r = BufReader::new(f);
169        // spawn the thread and give it a closure which calls `from_file`
170        // to parse the data into columnar format.
171        threads.push(thread::spawn(move || {
172            read_chunk(new_schema, &mut r, w.0, w.1)
173        }));
174    }
175
176    // initialize the resulting columnar data frame
177    let mut parsed_data: Vec<Column> = init_columnar(&schema);
178    // let all the threads finish then combine the parsed data into the
179    // columnar data frame
180    for t in threads {
181        let mut x: Vec<Column> = t.join().unwrap();
182        let iter = parsed_data.iter_mut().zip(x.iter_mut());
183        for (complete, partial) in iter {
184            match (complete, partial) {
185                (Column::Bool(c1), Column::Bool(c2)) => c1.append(c2),
186                (Column::Int(c1), Column::Int(c2)) => c1.append(c2),
187                (Column::Float(c1), Column::Float(c2)) => c1.append(c2),
188                (Column::String(c1), Column::String(c2)) => c1.append(c2),
189                _ => panic!("Unexpected result from thread"),
190            }
191        }
192    }
193
194    parsed_data
195}
196
197/// Get the (i,j) element from the DataFrame
198pub fn get(d: &[Column], col_idx: usize, row_idx: usize) -> Data {
199    match &d[col_idx] {
200        Column::Bool(b) => {
201            if let Some(val) = &b[row_idx] {
202                Data::Bool(*val)
203            } else {
204                Data::Null
205            }
206        }
207        Column::Int(b) => {
208            if let Some(val) = &b[row_idx] {
209                Data::Int(*val)
210            } else {
211                Data::Null
212            }
213        }
214        Column::Float(b) => {
215            if let Some(val) = &b[row_idx] {
216                Data::Float(*val)
217            } else {
218                Data::Null
219            }
220        }
221        Column::String(b) => {
222            if let Some(val) = &b[row_idx] {
223                Data::String(val.clone())
224            } else {
225                Data::Null
226            }
227        }
228    }
229}
230
231/// A helper function to help with multi-threading in the top level `from_file`
232/// function. Does the heavy lifting of actually calling
233/// [parser functions](::crate::parsers). Parsers a chunk of the given `reader`
234/// up to `len` bytes starting at the `from` byte offset.
235fn read_chunk<T>(
236    schema: Vec<DataType>,
237    reader: &mut T,
238    from: usize,
239    len: usize,
240) -> Vec<Column>
241where
242    T: BufRead + Seek,
243{
244    reader.seek(SeekFrom::Start(from as u64)).unwrap();
245    let mut buffer = Vec::new();
246
247    let mut so_far = if from != 0 {
248        // throw away the first line
249        let l1_len = reader.read_until(b'\n', &mut buffer).unwrap();
250        buffer.clear();
251        l1_len
252    } else {
253        0
254    };
255
256    let mut parsed_data = init_columnar(&schema);
257
258    loop {
259        let line_len = reader.read_until(b'\n', &mut buffer).unwrap();
260        so_far += line_len;
261        if line_len == 0 || so_far >= len {
262            break;
263        }
264
265        // parse line with schema and place into the columnar vec here
266        match parse_line_with_schema(&buffer[..], &schema) {
267            None => {
268                buffer.clear();
269                continue;
270            }
271            Some(data) => {
272                let iter = data.iter().zip(parsed_data.iter_mut());
273                for (d, col) in iter {
274                    match (d, col) {
275                        (Data::Bool(b), Column::Bool(c)) => c.push(Some(*b)),
276                        (Data::Int(i), Column::Int(c)) => c.push(Some(*i)),
277                        (Data::Float(f), Column::Float(c)) => c.push(Some(*f)),
278                        (Data::String(s), Column::String(c)) => {
279                            c.push(Some(s.clone()))
280                        }
281                        (Data::Null, Column::Bool(c)) => c.push(None),
282                        (Data::Null, Column::Int(c)) => c.push(None),
283                        (Data::Null, Column::Float(c)) => c.push(None),
284                        (Data::Null, Column::String(c)) => c.push(None),
285                        _ => panic!("Parser Failed"),
286                    }
287                }
288            }
289        }
290        buffer.clear();
291    }
292    parsed_data
293}
294
295/// Used for chunking `SoR` files.
296pub struct SorTerator {
297    buf_reader: Split<BufReader<File>>,
298    chunk_size: usize,
299    schema: Vec<DataType>,
300    empty_col: Column,
301}
302
303/// A chunking iterator that can chunk `SoR` files into `Vec<Column>`s where
304/// all columns have the same length (number of rows) based an argument in
305/// the constructor. The last element returned by `next` may have less than
306/// `chunk_size` number of rows and it is up to the caller to verify the
307/// length if needed.
308impl SorTerator {
309    /// Creates a new [`SorTerator`](::crate::dataframe::SorTerator)
310    pub fn new(
311        file_name: &str,
312        schema: Vec<DataType>,
313        chunk_size: usize,
314    ) -> Self {
315        SorTerator {
316            buf_reader: BufReader::new(File::open(file_name).unwrap())
317                .split(b'\n'),
318            empty_col: Column::Bool(Vec::new()),
319            chunk_size,
320            schema,
321        }
322    }
323}
324
325/// Implementation for an `Iterator` that chunks a `SoR` file
326impl Iterator for SorTerator {
327    type Item = Vec<Column>;
328
329    /// Advances this iterator until `self.chunk_size` rows have been parsed,
330    /// returning `Some(Vec<Column>)` of the parsed rows when done, or `None`
331    /// or the file has been completely parsed. The last element returned by
332    /// `next` may have less than `chunk_size` number of rows and it is up to
333    /// the caller to verify the length if needed.
334    fn next(&mut self) -> Option<Self::Item> {
335        let mut parsed_data = init_columnar(&self.schema);
336        while let Some(Ok(line)) = self.buf_reader.next() {
337            match parse_line_with_schema(&line, &self.schema) {
338                None => continue,
339                Some(data) => {
340                    let iter = data.iter().zip(parsed_data.iter_mut());
341                    for (d, col) in iter {
342                        match (d, col) {
343                            (Data::Bool(b), Column::Bool(c)) => {
344                                c.push(Some(*b))
345                            }
346                            (Data::Int(i), Column::Int(c)) => c.push(Some(*i)),
347                            (Data::Float(f), Column::Float(c)) => {
348                                c.push(Some(*f))
349                            }
350                            (Data::String(s), Column::String(c)) => {
351                                c.push(Some(s.clone()))
352                            }
353                            (Data::Null, Column::Bool(c)) => c.push(None),
354                            (Data::Null, Column::Int(c)) => c.push(None),
355                            (Data::Null, Column::Float(c)) => c.push(None),
356                            (Data::Null, Column::String(c)) => c.push(None),
357                            _ => panic!("Parser Failed"),
358                        }
359                    }
360                }
361            }
362            if let Some(column) = parsed_data.get(0) {
363                if column.len() == self.chunk_size {
364                    return Some(parsed_data);
365                }
366            }
367        }
368        if parsed_data.get(0).unwrap_or(&self.empty_col).len() > 0 {
369            Some(parsed_data)
370        } else {
371            None
372        }
373    }
374}
375
376impl From<Vec<Option<bool>>> for Column {
377    fn from(v: Vec<Option<bool>>) -> Column {
378        Column::Bool(v)
379    }
380}
381
382impl From<Vec<Option<i64>>> for Column {
383    fn from(v: Vec<Option<i64>>) -> Column {
384        Column::Int(v)
385    }
386}
387
388impl From<Vec<Option<f64>>> for Column {
389    fn from(v: Vec<Option<f64>>) -> Column {
390        Column::Float(v)
391    }
392}
393
394impl From<Vec<Option<String>>> for Column {
395    fn from(v: Vec<Option<String>>) -> Column {
396        Column::String(v)
397    }
398}
399
400impl TryFrom<Column> for Vec<Option<bool>> {
401    type Error = &'static str;
402
403    fn try_from(c: Column) -> Result<Self, Self::Error> {
404        match c {
405            Column::Bool(col) => Ok(col),
406            _ => Err("The given column was not of type bool"),
407        }
408    }
409}
410
411impl TryFrom<Column> for Vec<Option<i64>> {
412    type Error = &'static str;
413
414    fn try_from(c: Column) -> Result<Self, Self::Error> {
415        match c {
416            Column::Int(col) => Ok(col),
417            _ => Err("The given column was not of type int"),
418        }
419    }
420}
421
422impl TryFrom<Column> for Vec<Option<f64>> {
423    type Error = &'static str;
424
425    fn try_from(c: Column) -> Result<Self, Self::Error> {
426        match c {
427            Column::Float(col) => Ok(col),
428            _ => Err("The given column was not of type float"),
429        }
430    }
431}
432
433impl TryFrom<Column> for Vec<Option<String>> {
434    type Error = &'static str;
435
436    fn try_from(c: Column) -> Result<Self, Self::Error> {
437        match c {
438            Column::String(col) => Ok(col),
439            _ => Err("The given column was not of type String"),
440        }
441    }
442}
443
444/// Print the `Data` of a `Data` cell.
445/// The number for `Int`s and `float`s.
446/// 0 for `false`.
447/// 1 for `true`.
448/// A double quote delimited `String`.
449/// and "Missing Value" for missing data.
450impl fmt::Display for Data {
451    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
452        match self {
453            Data::String(s) => write!(f, "\"{}\"", s),
454            Data::Int(n) => write!(f, "{}", n),
455            Data::Float(fl) => write!(f, "{}", fl),
456            Data::Bool(true) => write!(f, "1"),
457            Data::Bool(false) => write!(f, "0"),
458            Data::Null => write!(f, "Missing Value"),
459        }
460    }
461}
462
463#[cfg(test)]
464mod tests {
465
466    use super::*;
467    use std::io::Cursor;
468
469    #[test]
470    fn test_read_file() {
471        let schema = vec![DataType::String, DataType::Bool];
472
473        let expected_col1 = Column::String(vec![
474            Some("1".to_string()),
475            Some("a".to_string()),
476            Some("1.2".to_string()),
477        ]);
478        let expected_col2 = Column::Bool(vec![Some(true), Some(false), None]);
479        let expected = vec![expected_col1, expected_col2];
480
481        // Simple case : first nd last line are not discarded
482        let mut input = Cursor::new(b"<1><1>\n<a><0>\n<1.2><>");
483        let parsed1: Vec<Column> =
484            read_chunk(schema.clone(), &mut input, 0, 26);
485        assert_eq!(parsed1, expected.clone());
486
487        // last line is discarded
488        let mut larger_input = Cursor::new(b"<1><1>\n<a><0>\n<1.2><>\n<no><1>");
489        let parsed2: Vec<Column> =
490            read_chunk(schema.clone(), &mut larger_input, 0, 27);
491        assert_eq!(parsed2, expected.clone());
492
493        // first line is discarded
494        let mut input_skipped_l1 =
495            Cursor::new(b"<b><1>\n<1><1>\n<a><0>\n<1.2><>");
496        let parsed3: Vec<Column> =
497            read_chunk(schema.clone(), &mut input_skipped_l1, 3, 26);
498        assert_eq!(parsed3, expected.clone());
499
500        // Invalid line is discarded
501        // Note since parsed lines with schema is correctly tested we do not
502        // need to test every possible way a line can be invalid here
503        let mut input_with_invalid =
504            Cursor::new(b"<1><1>\n<a><0>\n<c><1.2>\n<1.2><>");
505        let parsed4: Vec<Column> =
506            read_chunk(schema.clone(), &mut input_with_invalid, 0, 32);
507        assert_eq!(parsed4, expected.clone());
508    }
509
510    #[test]
511    fn test_sor_terator() {
512        let schema = vec![
513            DataType::Bool,
514            DataType::Int,
515            DataType::Float,
516            DataType::String,
517        ];
518        let mut sor_terator =
519            SorTerator::new("tests/sor_terator.sor", schema, 10);
520        let mut chunk = sor_terator.next();
521        assert_eq!(chunk.unwrap().get(0).unwrap().len(), 10);
522        chunk = sor_terator.next();
523        assert_eq!(chunk.unwrap().get(0).unwrap().len(), 5);
524        chunk = sor_terator.next();
525        assert!(chunk.is_none());
526    }
527}