1use crate::parsers::parse_line_with_schema;
5use crate::schema::DataType;
6use deepsize::DeepSizeOf;
7use serde::{Deserialize, Serialize};
8use std::convert::{From, TryFrom};
9use std::fmt;
10use std::fs::File;
11use std::io::{BufRead, BufReader, Seek, SeekFrom, Split};
12use std::thread;
13
/// One column of parsed SoR data, holding one entry per row.
///
/// An entry is `None` when the value for that row is missing. The variant is
/// chosen from the column's `DataType` by `init_columnar`.
// NOTE: serde's derived impls pass the variant index to the serializer, so
// compact formats may depend on this variant order; append new variants
// instead of reordering existing ones.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, DeepSizeOf)]
pub enum Column {
    /// A column of (possibly missing) integers.
    Int(Vec<Option<i64>>),
    /// A column of (possibly missing) booleans.
    Bool(Vec<Option<bool>>),
    /// A column of (possibly missing) floating-point numbers.
    Float(Vec<Option<f64>>),
    /// A column of (possibly missing) strings.
    String(Vec<Option<String>>),
}
26
27impl Column {
28 pub fn len(&self) -> usize {
29 match &self {
30 &Column::Bool(col) => col.len(),
31 &Column::Int(col) => col.len(),
32 &Column::Float(col) => col.len(),
33 &Column::String(col) => col.len(),
34 }
35 }
36}
37
/// A single value read from a SoR file.
// NOTE: serde's derived impls pass the variant index to the serializer, so
// compact formats may depend on this variant order; append new variants
// instead of reordering existing ones.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, DeepSizeOf)]
pub enum Data {
    /// A string value.
    String(String),
    /// An integer value.
    Int(i64),
    /// A floating-point value.
    Float(f64),
    /// A boolean value.
    Bool(bool),
    /// A missing value (what `get` returns for a `None` entry).
    Null,
}
53
54impl Data {
55 pub fn unwrap_string(&self) -> String {
57 match self {
58 Data::String(s) => s.clone(),
59 _ => panic!("unwrap error"),
60 }
61 }
62
63 pub fn unwrap_int(&self) -> i64 {
65 match self {
66 Data::Int(n) => *n,
67 _ => panic!("unwrap error"),
68 }
69 }
70
71 pub fn unwrap_float(&self) -> f64 {
73 match self {
74 Data::Float(n) => *n,
75 _ => panic!("unwrap error"),
76 }
77 }
78
79 pub fn unwrap_bool(&self) -> bool {
81 match self {
82 Data::Bool(n) => *n,
83 _ => panic!("unwrap error"),
84 }
85 }
86}
87
88fn init_columnar(schema: &[DataType]) -> Vec<Column> {
90 let mut result = Vec::with_capacity(schema.len() + 1);
91 for t in schema {
92 match t {
93 DataType::Bool => result.push(Column::Bool(Vec::new())),
94 DataType::Int => result.push(Column::Int(Vec::new())),
95 DataType::Float => result.push(Column::Float(Vec::new())),
96 DataType::String => result.push(Column::String(Vec::new())),
97 }
98 }
99 result
100}
101
102pub fn from_file(
114 file_path: &str,
115 schema: Vec<DataType>,
116 from: usize,
117 len: usize,
118 num_threads: usize,
119) -> Vec<Column> {
120 let num_chars = if len == std::usize::MAX {
122 (std::fs::metadata(file_path).unwrap().len() - from as u64) as f64
123 } else {
124 len as f64
125 };
126 let step = (num_chars / num_threads as f64).ceil() as usize;
128
129 let f: File = File::open(file_path).unwrap();
132 let mut reader = BufReader::new(f);
133 let mut work: Vec<(usize, usize)> = Vec::with_capacity(num_threads + 1);
134
135 work.push((from, step));
140
141 let mut so_far = from;
142 let mut buffer = Vec::new();
143
144 for i in 1..num_threads {
149 so_far += step;
150 reader.seek(SeekFrom::Start(so_far as u64)).unwrap();
153 reader.read_until(b'\n', &mut buffer).unwrap();
154 work.push((so_far, step));
155
156 work.get_mut(i - 1).unwrap().1 += buffer.len() as usize + 1;
160 buffer.clear();
161 }
162
163 let mut threads = Vec::new();
165 for w in work {
166 let new_schema = schema.clone();
167 let f: File = File::open(file_path.clone()).unwrap();
168 let mut r = BufReader::new(f);
169 threads.push(thread::spawn(move || {
172 read_chunk(new_schema, &mut r, w.0, w.1)
173 }));
174 }
175
176 let mut parsed_data: Vec<Column> = init_columnar(&schema);
178 for t in threads {
181 let mut x: Vec<Column> = t.join().unwrap();
182 let iter = parsed_data.iter_mut().zip(x.iter_mut());
183 for (complete, partial) in iter {
184 match (complete, partial) {
185 (Column::Bool(c1), Column::Bool(c2)) => c1.append(c2),
186 (Column::Int(c1), Column::Int(c2)) => c1.append(c2),
187 (Column::Float(c1), Column::Float(c2)) => c1.append(c2),
188 (Column::String(c1), Column::String(c2)) => c1.append(c2),
189 _ => panic!("Unexpected result from thread"),
190 }
191 }
192 }
193
194 parsed_data
195}
196
197pub fn get(d: &[Column], col_idx: usize, row_idx: usize) -> Data {
199 match &d[col_idx] {
200 Column::Bool(b) => {
201 if let Some(val) = &b[row_idx] {
202 Data::Bool(*val)
203 } else {
204 Data::Null
205 }
206 }
207 Column::Int(b) => {
208 if let Some(val) = &b[row_idx] {
209 Data::Int(*val)
210 } else {
211 Data::Null
212 }
213 }
214 Column::Float(b) => {
215 if let Some(val) = &b[row_idx] {
216 Data::Float(*val)
217 } else {
218 Data::Null
219 }
220 }
221 Column::String(b) => {
222 if let Some(val) = &b[row_idx] {
223 Data::String(val.clone())
224 } else {
225 Data::Null
226 }
227 }
228 }
229}
230
231fn read_chunk<T>(
236 schema: Vec<DataType>,
237 reader: &mut T,
238 from: usize,
239 len: usize,
240) -> Vec<Column>
241where
242 T: BufRead + Seek,
243{
244 reader.seek(SeekFrom::Start(from as u64)).unwrap();
245 let mut buffer = Vec::new();
246
247 let mut so_far = if from != 0 {
248 let l1_len = reader.read_until(b'\n', &mut buffer).unwrap();
250 buffer.clear();
251 l1_len
252 } else {
253 0
254 };
255
256 let mut parsed_data = init_columnar(&schema);
257
258 loop {
259 let line_len = reader.read_until(b'\n', &mut buffer).unwrap();
260 so_far += line_len;
261 if line_len == 0 || so_far >= len {
262 break;
263 }
264
265 match parse_line_with_schema(&buffer[..], &schema) {
267 None => {
268 buffer.clear();
269 continue;
270 }
271 Some(data) => {
272 let iter = data.iter().zip(parsed_data.iter_mut());
273 for (d, col) in iter {
274 match (d, col) {
275 (Data::Bool(b), Column::Bool(c)) => c.push(Some(*b)),
276 (Data::Int(i), Column::Int(c)) => c.push(Some(*i)),
277 (Data::Float(f), Column::Float(c)) => c.push(Some(*f)),
278 (Data::String(s), Column::String(c)) => {
279 c.push(Some(s.clone()))
280 }
281 (Data::Null, Column::Bool(c)) => c.push(None),
282 (Data::Null, Column::Int(c)) => c.push(None),
283 (Data::Null, Column::Float(c)) => c.push(None),
284 (Data::Null, Column::String(c)) => c.push(None),
285 _ => panic!("Parser Failed"),
286 }
287 }
288 }
289 }
290 buffer.clear();
291 }
292 parsed_data
293}
294
295pub struct SorTerator {
297 buf_reader: Split<BufReader<File>>,
298 chunk_size: usize,
299 schema: Vec<DataType>,
300 empty_col: Column,
301}
302
303impl SorTerator {
309 pub fn new(
311 file_name: &str,
312 schema: Vec<DataType>,
313 chunk_size: usize,
314 ) -> Self {
315 SorTerator {
316 buf_reader: BufReader::new(File::open(file_name).unwrap())
317 .split(b'\n'),
318 empty_col: Column::Bool(Vec::new()),
319 chunk_size,
320 schema,
321 }
322 }
323}
324
325impl Iterator for SorTerator {
327 type Item = Vec<Column>;
328
329 fn next(&mut self) -> Option<Self::Item> {
335 let mut parsed_data = init_columnar(&self.schema);
336 while let Some(Ok(line)) = self.buf_reader.next() {
337 match parse_line_with_schema(&line, &self.schema) {
338 None => continue,
339 Some(data) => {
340 let iter = data.iter().zip(parsed_data.iter_mut());
341 for (d, col) in iter {
342 match (d, col) {
343 (Data::Bool(b), Column::Bool(c)) => {
344 c.push(Some(*b))
345 }
346 (Data::Int(i), Column::Int(c)) => c.push(Some(*i)),
347 (Data::Float(f), Column::Float(c)) => {
348 c.push(Some(*f))
349 }
350 (Data::String(s), Column::String(c)) => {
351 c.push(Some(s.clone()))
352 }
353 (Data::Null, Column::Bool(c)) => c.push(None),
354 (Data::Null, Column::Int(c)) => c.push(None),
355 (Data::Null, Column::Float(c)) => c.push(None),
356 (Data::Null, Column::String(c)) => c.push(None),
357 _ => panic!("Parser Failed"),
358 }
359 }
360 }
361 }
362 if let Some(column) = parsed_data.get(0) {
363 if column.len() == self.chunk_size {
364 return Some(parsed_data);
365 }
366 }
367 }
368 if parsed_data.get(0).unwrap_or(&self.empty_col).len() > 0 {
369 Some(parsed_data)
370 } else {
371 None
372 }
373 }
374}
375
376impl From<Vec<Option<bool>>> for Column {
377 fn from(v: Vec<Option<bool>>) -> Column {
378 Column::Bool(v)
379 }
380}
381
382impl From<Vec<Option<i64>>> for Column {
383 fn from(v: Vec<Option<i64>>) -> Column {
384 Column::Int(v)
385 }
386}
387
388impl From<Vec<Option<f64>>> for Column {
389 fn from(v: Vec<Option<f64>>) -> Column {
390 Column::Float(v)
391 }
392}
393
394impl From<Vec<Option<String>>> for Column {
395 fn from(v: Vec<Option<String>>) -> Column {
396 Column::String(v)
397 }
398}
399
400impl TryFrom<Column> for Vec<Option<bool>> {
401 type Error = &'static str;
402
403 fn try_from(c: Column) -> Result<Self, Self::Error> {
404 match c {
405 Column::Bool(col) => Ok(col),
406 _ => Err("The given column was not of type bool"),
407 }
408 }
409}
410
411impl TryFrom<Column> for Vec<Option<i64>> {
412 type Error = &'static str;
413
414 fn try_from(c: Column) -> Result<Self, Self::Error> {
415 match c {
416 Column::Int(col) => Ok(col),
417 _ => Err("The given column was not of type int"),
418 }
419 }
420}
421
422impl TryFrom<Column> for Vec<Option<f64>> {
423 type Error = &'static str;
424
425 fn try_from(c: Column) -> Result<Self, Self::Error> {
426 match c {
427 Column::Float(col) => Ok(col),
428 _ => Err("The given column was not of type float"),
429 }
430 }
431}
432
433impl TryFrom<Column> for Vec<Option<String>> {
434 type Error = &'static str;
435
436 fn try_from(c: Column) -> Result<Self, Self::Error> {
437 match c {
438 Column::String(col) => Ok(col),
439 _ => Err("The given column was not of type String"),
440 }
441 }
442}
443
444impl fmt::Display for Data {
451 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
452 match self {
453 Data::String(s) => write!(f, "\"{}\"", s),
454 Data::Int(n) => write!(f, "{}", n),
455 Data::Float(fl) => write!(f, "{}", fl),
456 Data::Bool(true) => write!(f, "1"),
457 Data::Bool(false) => write!(f, "0"),
458 Data::Null => write!(f, "Missing Value"),
459 }
460 }
461}
462
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn test_read_file() {
        let schema = vec![DataType::String, DataType::Bool];
        let expected = vec![
            Column::String(vec![
                Some("1".to_string()),
                Some("a".to_string()),
                Some("1.2".to_string()),
            ]),
            Column::Bool(vec![Some(true), Some(false), None]),
        ];

        // (input, from, len): every case must yield the same three rows.
        let cases: [(&[u8], usize, usize); 4] = [
            // The whole input fits inside the window.
            (b"<1><1>\n<a><0>\n<1.2><>", 0, 26),
            // The trailing row ends past `len`, so it is not read.
            (b"<1><1>\n<a><0>\n<1.2><>\n<no><1>", 0, 27),
            // Starting mid-line skips the partial first row.
            (b"<b><1>\n<1><1>\n<a><0>\n<1.2><>", 3, 26),
            // The row "<c><1.2>" is rejected by the parser and skipped.
            (b"<1><1>\n<a><0>\n<c><1.2>\n<1.2><>", 0, 32),
        ];

        for (i, (input, from, len)) in cases.iter().enumerate() {
            let mut reader = Cursor::new(*input);
            let parsed = read_chunk(schema.clone(), &mut reader, *from, *len);
            assert_eq!(parsed, expected, "case {}", i);
        }
    }

    #[test]
    fn test_sor_terator() {
        let schema = vec![
            DataType::Bool,
            DataType::Int,
            DataType::Float,
            DataType::String,
        ];
        let mut chunks = SorTerator::new("tests/sor_terator.sor", schema, 10);

        let first = chunks.next().expect("expected a first chunk");
        assert_eq!(first[0].len(), 10);

        let second = chunks.next().expect("expected a second chunk");
        assert_eq!(second[0].len(), 5);

        assert!(chunks.next().is_none());
    }
}