use crate::parsers::parse_line_with_schema;
use crate::schema::DataType;
use deepsize::DeepSizeOf;
use serde::{Deserialize, Serialize};
use std::convert::{From, TryFrom};
use std::fmt;
use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom, Split};
use std::thread;
/// One column of parsed data, stored as a vector with one entry per row.
///
/// The variant fixes the type of every cell in the column. A cell is `None`
/// when the corresponding field was parsed as `Data::Null`.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, DeepSizeOf)]
pub enum Column {
/// A column of optional 64-bit signed integers.
Int(Vec<Option<i64>>),
/// A column of optional booleans.
Bool(Vec<Option<bool>>),
/// A column of optional 64-bit floats.
Float(Vec<Option<f64>>),
/// A column of optional owned strings.
String(Vec<Option<String>>),
}
impl Column {
    /// Returns the number of cells (rows) stored in this column.
    ///
    /// Missing cells (`None`) are counted as well, since they still occupy a row.
    pub fn len(&self) -> usize {
        match self {
            Column::Bool(col) => col.len(),
            Column::Int(col) => col.len(),
            Column::Float(col) => col.len(),
            Column::String(col) => col.len(),
        }
    }

    /// Returns `true` when the column holds no cells at all.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
/// A single cell value, as produced by the line parser and returned by `get`.
///
/// `Null` stands for a missing value; it is stored as `None` inside a `Column`.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, DeepSizeOf)]
pub enum Data {
/// An owned string value.
String(String),
/// A 64-bit signed integer value.
Int(i64),
/// A 64-bit floating point value.
Float(f64),
/// A boolean value.
Bool(bool),
/// A missing value.
Null,
}
impl Data {
    /// Returns a copy of the contained string.
    ///
    /// # Panics
    /// Panics with `"unwrap error"` if `self` is not `Data::String`.
    pub fn unwrap_string(&self) -> String {
        if let Data::String(text) = self {
            text.clone()
        } else {
            panic!("unwrap error")
        }
    }

    /// Returns the contained integer.
    ///
    /// # Panics
    /// Panics with `"unwrap error"` if `self` is not `Data::Int`.
    pub fn unwrap_int(&self) -> i64 {
        if let Data::Int(value) = self {
            *value
        } else {
            panic!("unwrap error")
        }
    }

    /// Returns the contained float.
    ///
    /// # Panics
    /// Panics with `"unwrap error"` if `self` is not `Data::Float`.
    pub fn unwrap_float(&self) -> f64 {
        if let Data::Float(value) = self {
            *value
        } else {
            panic!("unwrap error")
        }
    }

    /// Returns the contained boolean.
    ///
    /// # Panics
    /// Panics with `"unwrap error"` if `self` is not `Data::Bool`.
    pub fn unwrap_bool(&self) -> bool {
        if let Data::Bool(flag) = self {
            *flag
        } else {
            panic!("unwrap error")
        }
    }
}
/// Builds one empty `Column` per entry of `schema`, in schema order, with the
/// column variant chosen by the entry's `DataType`.
fn init_columnar(schema: &[DataType]) -> Vec<Column> {
    let mut columns = Vec::with_capacity(schema.len() + 1);
    columns.extend(schema.iter().map(|data_type| match data_type {
        DataType::Bool => Column::Bool(Vec::new()),
        DataType::Int => Column::Int(Vec::new()),
        DataType::Float => Column::Float(Vec::new()),
        DataType::String => Column::String(Vec::new()),
    }));
    columns
}
pub fn from_file(
file_path: &str,
schema: Vec<DataType>,
from: usize,
len: usize,
num_threads: usize,
) -> Vec<Column> {
let num_chars = if len == std::usize::MAX {
(std::fs::metadata(file_path).unwrap().len() - from as u64) as f64
} else {
len as f64
};
let step = (num_chars / num_threads as f64).ceil() as usize;
let f: File = File::open(file_path).unwrap();
let mut reader = BufReader::new(f);
let mut work: Vec<(usize, usize)> = Vec::with_capacity(num_threads + 1);
work.push((from, step));
let mut so_far = from;
let mut buffer = Vec::new();
for i in 1..num_threads {
so_far += step;
reader.seek(SeekFrom::Start(so_far as u64)).unwrap();
reader.read_until(b'\n', &mut buffer).unwrap();
work.push((so_far, step));
work.get_mut(i - 1).unwrap().1 += buffer.len() as usize + 1;
buffer.clear();
}
let mut threads = Vec::new();
for w in work {
let new_schema = schema.clone();
let f: File = File::open(file_path.clone()).unwrap();
let mut r = BufReader::new(f);
threads.push(thread::spawn(move || {
read_chunk(new_schema, &mut r, w.0, w.1)
}));
}
let mut parsed_data: Vec<Column> = init_columnar(&schema);
for t in threads {
let mut x: Vec<Column> = t.join().unwrap();
let iter = parsed_data.iter_mut().zip(x.iter_mut());
for (complete, partial) in iter {
match (complete, partial) {
(Column::Bool(c1), Column::Bool(c2)) => c1.append(c2),
(Column::Int(c1), Column::Int(c2)) => c1.append(c2),
(Column::Float(c1), Column::Float(c2)) => c1.append(c2),
(Column::String(c1), Column::String(c2)) => c1.append(c2),
_ => panic!("Unexpected result from thread"),
}
}
}
parsed_data
}
/// Returns the cell at row `row_idx` of column `col_idx` in `d` as a `Data`
/// value. A missing (`None`) cell is returned as `Data::Null`; string cells
/// are cloned.
///
/// # Panics
/// Panics if `col_idx` or `row_idx` is out of bounds.
pub fn get(d: &[Column], col_idx: usize, row_idx: usize) -> Data {
    match &d[col_idx] {
        Column::Bool(cells) => cells[row_idx].map_or(Data::Null, Data::Bool),
        Column::Int(cells) => cells[row_idx].map_or(Data::Null, Data::Int),
        Column::Float(cells) => cells[row_idx].map_or(Data::Null, Data::Float),
        Column::String(cells) => cells[row_idx]
            .as_ref()
            .map_or(Data::Null, |text| Data::String(text.clone())),
    }
}
/// Parses the lines of one chunk of `reader` into columns laid out per `schema`.
///
/// The reader is first positioned at byte offset `from`. When `from` is not
/// `0` the first line read there is discarded (it may be partial), but its
/// bytes still count towards `len`. Lines are then read one at a time; reading
/// stops at end of input or as soon as the bytes consumed since `from` reach
/// `len`, and the line that reaches the limit is not parsed. Lines for which
/// `parse_line_with_schema` returns `None` are skipped.
///
/// # Panics
/// Panics on I/O errors, or with `"Parser Failed"` if a parsed value does not
/// match the type of its column.
fn read_chunk<T>(
    schema: Vec<DataType>,
    reader: &mut T,
    from: usize,
    len: usize,
) -> Vec<Column>
where
    T: BufRead + Seek,
{
    reader.seek(SeekFrom::Start(from as u64)).unwrap();
    let mut line = Vec::new();
    // Bytes consumed since `from`, including a discarded leading line.
    let mut consumed = match from {
        0 => 0,
        _ => {
            let skipped = reader.read_until(b'\n', &mut line).unwrap();
            line.clear();
            skipped
        }
    };
    let mut columns = init_columnar(&schema);
    loop {
        let line_len = reader.read_until(b'\n', &mut line).unwrap();
        consumed += line_len;
        if line_len == 0 || consumed >= len {
            break;
        }
        if let Some(row) = parse_line_with_schema(&line[..], &schema) {
            for (column, cell) in columns.iter_mut().zip(row.iter()) {
                match (column, cell) {
                    (Column::Bool(cells), Data::Bool(value)) => cells.push(Some(*value)),
                    (Column::Int(cells), Data::Int(value)) => cells.push(Some(*value)),
                    (Column::Float(cells), Data::Float(value)) => cells.push(Some(*value)),
                    (Column::String(cells), Data::String(value)) => {
                        cells.push(Some(value.clone()))
                    }
                    (Column::Bool(cells), Data::Null) => cells.push(None),
                    (Column::Int(cells), Data::Null) => cells.push(None),
                    (Column::Float(cells), Data::Null) => cells.push(None),
                    (Column::String(cells), Data::Null) => cells.push(None),
                    _ => panic!("Parser Failed"),
                }
            }
        }
        line.clear();
    }
    columns
}
/// An iterator over a file that yields the parsed data in chunks of columns.
pub struct SorTerator {
// Iterator over the file's contents split on `b'\n'`.
buf_reader: Split<BufReader<File>>,
// Number of rows a chunk must reach before it is yielded.
chunk_size: usize,
// Schema used to parse every line and to build the empty columns of a chunk.
schema: Vec<DataType>,
// Empty placeholder column used by `next` when a chunk has no first column.
empty_col: Column,
}
impl SorTerator {
    /// Creates an iterator over the file `file_name` that parses lines with
    /// `schema` and yields them in chunks of `chunk_size` rows.
    ///
    /// # Panics
    /// Panics if the file cannot be opened.
    pub fn new(
        file_name: &str,
        schema: Vec<DataType>,
        chunk_size: usize,
    ) -> Self {
        let file = File::open(file_name).unwrap();
        let lines = BufReader::new(file).split(b'\n');
        let placeholder = Column::Bool(Vec::new());
        SorTerator {
            buf_reader: lines,
            chunk_size,
            schema,
            empty_col: placeholder,
        }
    }
}
impl Iterator for SorTerator {
    type Item = Vec<Column>;

    /// Reads lines until the first column of the chunk holds `chunk_size` rows
    /// and returns that chunk. Lines rejected by `parse_line_with_schema` are
    /// skipped. When the input ends (or a line fails to read), the rows
    /// gathered so far are returned if there are any, otherwise `None`.
    ///
    /// # Panics
    /// Panics with `"Parser Failed"` if a parsed value does not match the type
    /// of its column.
    fn next(&mut self) -> Option<Self::Item> {
        let wanted_rows = self.chunk_size;
        let mut chunk = init_columnar(&self.schema);
        while let Some(Ok(raw_line)) = self.buf_reader.next() {
            let row = match parse_line_with_schema(&raw_line, &self.schema) {
                Some(row) => row,
                None => continue,
            };
            for (column, cell) in chunk.iter_mut().zip(row.iter()) {
                match (column, cell) {
                    (Column::Bool(cells), Data::Bool(value)) => cells.push(Some(*value)),
                    (Column::Int(cells), Data::Int(value)) => cells.push(Some(*value)),
                    (Column::Float(cells), Data::Float(value)) => cells.push(Some(*value)),
                    (Column::String(cells), Data::String(value)) => {
                        cells.push(Some(value.clone()))
                    }
                    (Column::Bool(cells), Data::Null) => cells.push(None),
                    (Column::Int(cells), Data::Null) => cells.push(None),
                    (Column::Float(cells), Data::Null) => cells.push(None),
                    (Column::String(cells), Data::Null) => cells.push(None),
                    _ => panic!("Parser Failed"),
                }
            }
            // The first column stands in for the row count of the whole chunk.
            let chunk_is_full = chunk
                .get(0)
                .map_or(false, |first| first.len() == wanted_rows);
            if chunk_is_full {
                return Some(chunk);
            }
        }
        // Input exhausted: hand out a final partial chunk only if it has rows.
        let leftover_rows = chunk.get(0).unwrap_or(&self.empty_col).len();
        if leftover_rows > 0 {
            Some(chunk)
        } else {
            None
        }
    }
}
/// Wraps a vector of optional booleans in a `Column::Bool`.
impl From<Vec<Option<bool>>> for Column {
    fn from(values: Vec<Option<bool>>) -> Self {
        Column::Bool(values)
    }
}
/// Wraps a vector of optional integers in a `Column::Int`.
impl From<Vec<Option<i64>>> for Column {
    fn from(values: Vec<Option<i64>>) -> Self {
        Column::Int(values)
    }
}
/// Wraps a vector of optional floats in a `Column::Float`.
impl From<Vec<Option<f64>>> for Column {
    fn from(values: Vec<Option<f64>>) -> Self {
        Column::Float(values)
    }
}
/// Wraps a vector of optional strings in a `Column::String`.
impl From<Vec<Option<String>>> for Column {
    fn from(values: Vec<Option<String>>) -> Self {
        Column::String(values)
    }
}
impl TryFrom<Column> for Vec<Option<bool>> {
type Error = &'static str;
fn try_from(c: Column) -> Result<Self, Self::Error> {
match c {
Column::Bool(col) => Ok(col),
_ => Err("The given column was not of type bool"),
}
}
}
impl TryFrom<Column> for Vec<Option<i64>> {
type Error = &'static str;
fn try_from(c: Column) -> Result<Self, Self::Error> {
match c {
Column::Int(col) => Ok(col),
_ => Err("The given column was not of type int"),
}
}
}
impl TryFrom<Column> for Vec<Option<f64>> {
type Error = &'static str;
fn try_from(c: Column) -> Result<Self, Self::Error> {
match c {
Column::Float(col) => Ok(col),
_ => Err("The given column was not of type float"),
}
}
}
impl TryFrom<Column> for Vec<Option<String>> {
type Error = &'static str;
fn try_from(c: Column) -> Result<Self, Self::Error> {
match c {
Column::String(col) => Ok(col),
_ => Err("The given column was not of type String"),
}
}
}
/// Textual form of a cell: strings are wrapped in double quotes, booleans are
/// written as `1` / `0`, and a missing value is written as `Missing Value`.
impl fmt::Display for Data {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Data::Null => f.write_str("Missing Value"),
            Data::Bool(flag) => f.write_str(if *flag { "1" } else { "0" }),
            Data::Int(number) => write!(f, "{}", number),
            Data::Float(number) => write!(f, "{}", number),
            Data::String(text) => write!(f, "\"{}\"", text),
        }
    }
}
// Unit tests for `read_chunk` (on in-memory cursors) and `SorTerator` (on a
// fixture file).
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
// Exercises `read_chunk` with a (String, Bool) schema; every case below is
// expected to produce the same three rows.
#[test]
fn test_read_file() {
let schema = vec![DataType::String, DataType::Bool];
let expected_col1 = Column::String(vec![
Some("1".to_string()),
Some("a".to_string()),
Some("1.2".to_string()),
]);
let expected_col2 = Column::Bool(vec![Some(true), Some(false), None]);
let expected = vec![expected_col1, expected_col2];
// `len` (26) is larger than the 21-byte input, so every line is read.
let mut input = Cursor::new(b"<1><1>\n<a><0>\n<1.2><>");
let parsed1: Vec<Column> =
read_chunk(schema.clone(), &mut input, 0, 26);
assert_eq!(parsed1, expected.clone());
// `len` (27) ends inside the fourth line, so that line is not parsed.
let mut larger_input = Cursor::new(b"<1><1>\n<a><0>\n<1.2><>\n<no><1>");
let parsed2: Vec<Column> =
read_chunk(schema.clone(), &mut larger_input, 0, 27);
assert_eq!(parsed2, expected.clone());
// `from` (3) points inside the first line, so the rest of it is discarded.
let mut input_skipped_l1 =
Cursor::new(b"<b><1>\n<1><1>\n<a><0>\n<1.2><>");
let parsed3: Vec<Column> =
read_chunk(schema.clone(), &mut input_skipped_l1, 3, 26);
assert_eq!(parsed3, expected.clone());
// The `<c><1.2>` line is expected to be rejected by the parser (presumably
// because `1.2` is not a valid Bool) and therefore skipped.
let mut input_with_invalid =
Cursor::new(b"<1><1>\n<a><0>\n<c><1.2>\n<1.2><>");
let parsed4: Vec<Column> =
read_chunk(schema.clone(), &mut input_with_invalid, 0, 32);
assert_eq!(parsed4, expected.clone());
}
// Iterates over the fixture `tests/sor_terator.sor` in chunks of 10 rows and
// expects a full chunk, a partial chunk of 5 rows, and then the end.
#[test]
fn test_sor_terator() {
let schema = vec![
DataType::Bool,
DataType::Int,
DataType::Float,
DataType::String,
];
let mut sor_terator =
SorTerator::new("tests/sor_terator.sor", schema, 10);
let mut chunk = sor_terator.next();
assert_eq!(chunk.unwrap().get(0).unwrap().len(), 10);
chunk = sor_terator.next();
assert_eq!(chunk.unwrap().get(0).unwrap().len(), 5);
chunk = sor_terator.next();
assert!(chunk.is_none());
}
}