use csv::Reader;
use csv::{ByteRecord, ByteRecordsIntoIter};
use encoding::{DecoderTrap, EncodingRef, all::UTF_8};
use std::clone::Clone;
use std::collections::VecDeque;
use std::fs::File;
use std::path::{Path, PathBuf};
use crate::{
Row, Headers, RowStream,
error::{self, Error, RowResult},
};
/// Decodes a raw CSV byte record into a `Row` of strings using the given
/// source `encoding`.
///
/// Undecodable byte sequences are substituted with the Unicode replacement
/// character (`DecoderTrap::Replace`), so decoding never fails on malformed
/// input.
fn decode(data: ByteRecord, encoding: EncodingRef) -> Row {
    // Pre-size the row from the source record's byte length and field count
    // to avoid reallocation while pushing fields.
    let mut row = Row::with_capacity(data.as_slice().len(), data.len());
    for item in data.iter() {
        let field = encoding
            .decode(item, DecoderTrap::Replace)
            .expect("decoding with DecoderTrap::Replace cannot fail");
        row.push_field(&field);
    }
    row
}
/// A CSV reader paired with the path it was opened from, so that rows and
/// errors can be attributed back to their source file.
pub struct ReaderSource {
    reader: Reader<File>,
    path: PathBuf,
}
impl ReaderSource {
    /// Wraps an already-open CSV `reader`, recording `path` as its origin.
    pub fn from_reader<P: AsRef<Path>>(reader: Reader<File>, path: P) -> ReaderSource {
        ReaderSource {
            reader,
            path: path.as_ref().to_path_buf(),
        }
    }

    /// Opens the CSV file at `path`.
    ///
    /// # Errors
    ///
    /// Returns `Error::Csv` (carrying the stringified csv error) when the
    /// file cannot be opened.
    pub fn from_path<P: AsRef<Path>>(path: P) -> Result<ReaderSource, Error> {
        csv::Reader::from_path(&path)
            .map(|reader| ReaderSource::from_reader(reader, path))
            .map_err(|err| Error::Csv(err.to_string()))
    }

    /// Returns a clone of this source's header record.
    ///
    /// # Panics
    ///
    /// Panics if the headers cannot be read (e.g. an I/O error on first
    /// access to the file).
    fn headers(&mut self) -> ByteRecord {
        self.reader
            .byte_headers()
            .expect("failed to read CSV headers")
            .clone()
    }
}
impl From<Reader<File>> for ReaderSource {
fn from(reader: Reader<File>) -> ReaderSource {
ReaderSource {
reader,
path: "From<Reader<File>>".into(),
}
}
}
/// Builder for an `InputStream`: collects the CSV sources to chain, the
/// text encoding to decode them with, and an optional extra column that
/// will carry each row's source file path.
pub struct InputStreamBuilder {
    readers: Vec<ReaderSource>,
    encoding: EncodingRef,
    source_col: Option<String>,
}
impl InputStreamBuilder {
pub fn from_readers<I>(readers: I) -> InputStreamBuilder
where
I: IntoIterator<Item = ReaderSource>,
{
InputStreamBuilder {
readers: readers.into_iter().collect(),
encoding: UTF_8,
source_col: None,
}
}
pub fn from_paths<I, P>(paths: I) -> error::Result<InputStreamBuilder>
where
I: IntoIterator<Item = P>,
P: AsRef<Path>,
{
let paths: Vec<_> = paths.into_iter().collect();
let mut readers = Vec::with_capacity(paths.len());
for path in paths {
readers.push(ReaderSource::from_path(path)?);
}
Ok(InputStreamBuilder {
readers,
encoding: UTF_8,
source_col: None,
})
}
pub fn with_encoding(self, encoding: EncodingRef) -> Self {
InputStreamBuilder {
readers: self.readers,
encoding,
source_col: self.source_col,
}
}
pub fn with_source_col(self, colname: &str) -> Self {
InputStreamBuilder {
readers: self.readers,
encoding: self.encoding,
source_col: Some(colname.into()),
}
}
pub fn build(self) -> error::Result<InputStream> {
let mut iter = self.readers.into_iter();
let mut input_stream = if let Some(first) = iter.next() {
InputStream::new(first, self.encoding, self.source_col)?
} else {
return Err(Error::NoSources);
};
for item in iter {
input_stream.add_source(item);
}
Ok(input_stream)
}
}
/// A stream of rows chained from one or more CSV sources that must all
/// share the same headers.
pub struct InputStream {
    // Sources not yet started; consumed in FIFO order after the current one.
    readers: VecDeque<ReaderSource>,
    // Record iterator over the source currently being read.
    current_records: ByteRecordsIntoIter<File>,
    // Path of the current source, used for error reporting and the source column.
    current_path: PathBuf,
    // Encoding used to decode every record's bytes.
    encoding: EncodingRef,
    // Headers of the first source (plus the source column, if configured);
    // every subsequent source must match them.
    headers: Headers,
    // Name of the optional extra column holding each row's file path.
    source_col: Option<String>,
}
impl InputStream {
    /// Creates a stream positioned on `reader`, reading its headers up
    /// front and appending the source column name when one is configured.
    ///
    /// Fails if the headers of `reader` cannot be read.
    fn new(mut reader: ReaderSource, encoding: EncodingRef, source_col: Option<String>) -> error::Result<InputStream> {
        let mut raw_headers = reader.reader.byte_headers()?.clone();
        if let Some(name) = &source_col {
            raw_headers.push_field(name.as_bytes());
        }
        let headers = Headers::from(decode(raw_headers, encoding));
        Ok(InputStream {
            readers: VecDeque::new(),
            current_records: reader.reader.into_byte_records(),
            current_path: reader.path,
            encoding,
            headers,
            source_col,
        })
    }

    /// Queues another source to be read once the current one is exhausted.
    fn add_source(&mut self, item: ReaderSource) {
        self.readers.push_back(item);
    }
}
impl IntoIterator for InputStream {
type Item = RowResult;
type IntoIter = IntoIter;
fn into_iter(self) -> Self::IntoIter {
IntoIter {
readers: self.readers,
headers: self.headers,
source_col: self.source_col,
current_records: self.current_records,
current_path: self.current_path,
encoding: self.encoding,
}
}
}
impl RowStream for InputStream {
    /// Headers shared by every source in the stream (including the source
    /// column, when configured).
    fn headers(&self) -> &Headers {
        &self.headers
    }
}
/// Owning row iterator over an `InputStream`'s chained CSV sources.
pub struct IntoIter {
    // Record iterator over the source currently being consumed.
    current_records: ByteRecordsIntoIter<File>,
    // Name of the optional column that carries each row's file path.
    source_col: Option<String>,
    // Encoding used to decode record bytes into strings.
    encoding: EncodingRef,
    // Path of the current source, for the source column and error reporting.
    current_path: PathBuf,
    // Expected headers; every subsequent source must match them.
    headers: Headers,
    // Sources not yet started.
    readers: VecDeque<ReaderSource>,
}
impl Iterator for IntoIter {
type Item = RowResult;
fn next(&mut self) -> Option<Self::Item> {
match self.current_records.next() {
Some(Ok(reg)) => {
let mut str_reg = decode(reg, self.encoding);
if self.source_col.is_some() {
str_reg.push_field(&self.current_path.to_string_lossy());
}
if str_reg.len() != self.headers.len() {
return Some(Err(Error::InconsistentSizeOfRows(
self.current_path.clone(),
)));
}
Some(Ok(str_reg))
}
Some(Err(e)) => Some(Err(Error::Csv(format!("{:?}", e)))),
None => match self.readers.pop_front() {
Some(mut rs) => {
let mut new_headers = decode(rs.headers(), self.encoding);
if let Some(col) = self.source_col.as_ref() {
new_headers.push_field(col);
}
if new_headers != self.headers {
return Some(Err(Error::InconsistentHeaders));
}
self.current_records = rs.reader.into_byte_records();
self.current_path = rs.path;
self.next()
}
None => None,
},
}
}
}
#[cfg(test)]
mod tests {
    use super::{InputStreamBuilder, ReaderSource, Row, RowStream};
    use crate::error::Error;
    use encoding::all::WINDOWS_1252;

    #[test]
    fn test_builder_from_paths() {
        // Two files chained in the order given.
        let mut rows = InputStreamBuilder::from_paths(vec![
            "test/assets/1.csv",
            "test/assets/2.csv",
        ])
        .unwrap()
        .build()
        .unwrap()
        .into_iter();
        for expected in vec![
            vec!["1", "3"],
            vec!["5", "2"],
            vec!["2", "2"],
            vec!["4", "3"],
        ] {
            assert_eq!(rows.next().unwrap().unwrap(), Row::from(expected));
        }
        assert!(rows.next().is_none());
    }

    #[test]
    fn test_builder_from_readers() {
        let sources = vec![
            ReaderSource::from_path("test/assets/1.csv").unwrap(),
            ReaderSource::from_path("test/assets/2.csv").unwrap(),
        ];
        let mut rows = InputStreamBuilder::from_readers(sources)
            .build()
            .unwrap()
            .into_iter();
        for expected in vec![
            vec!["1", "3"],
            vec!["5", "2"],
            vec!["2", "2"],
            vec!["4", "3"],
        ] {
            assert_eq!(rows.next().unwrap().unwrap(), Row::from(expected));
        }
        assert!(rows.next().is_none());
    }

    #[test]
    fn test_builder_with_encoding() {
        // The file is Windows-1252 encoded; decoding must transcode to UTF-8.
        let stream = InputStreamBuilder::from_paths(vec![
            "test/assets/windows1252/data.csv",
        ])
        .unwrap()
        .with_encoding(WINDOWS_1252)
        .build()
        .unwrap();
        assert_eq!(stream.headers(), &Row::from(vec!["name"]).into());
        let mut rows = stream.into_iter();
        assert_eq!(rows.next().unwrap().unwrap(), Row::from(vec!["árbol"]));
        assert!(rows.next().is_none());
    }

    #[test]
    fn test_builder_with_source_col() {
        let stream = InputStreamBuilder::from_paths(vec![
            "test/assets/1.csv",
            "test/assets/2.csv",
        ])
        .unwrap()
        .with_source_col("source")
        .build()
        .unwrap();
        // The extra column is appended to the headers…
        assert_eq!(stream.headers(), &Row::from(vec!["a", "b", "source"]).into());
        // …and every row carries the path of the file it came from.
        let mut rows = stream.into_iter();
        for expected in vec![
            vec!["1", "3", "test/assets/1.csv"],
            vec!["5", "2", "test/assets/1.csv"],
            vec!["2", "2", "test/assets/2.csv"],
            vec!["4", "3", "test/assets/2.csv"],
        ] {
            assert_eq!(rows.next().unwrap().unwrap(), Row::from(expected));
        }
        assert!(rows.next().is_none());
    }

    #[test]
    fn test_read_concatenated() {
        let stream = InputStreamBuilder::from_paths(
            &["test/assets/1.csv", "test/assets/2.csv"],
        )
        .unwrap()
        .build()
        .unwrap();
        assert_eq!(*stream.headers().as_row(), Row::from(vec!["a", "b"]));
        let mut rows = stream.into_iter();
        for expected in vec![
            vec!["1", "3"],
            vec!["5", "2"],
            vec!["2", "2"],
            vec!["4", "3"],
        ] {
            assert_eq!(rows.next().unwrap().unwrap(), Row::from(expected));
        }
    }

    #[test]
    fn detects_inconsistent_headers() {
        // 3.csv has different headers than 1.csv, so switching to the
        // second source must yield an error after the first file's rows.
        let rows = InputStreamBuilder::from_paths(
            &["test/assets/1.csv", "test/assets/3.csv"],
        )
        .unwrap()
        .build()
        .unwrap()
        .into_iter();
        match rows.skip(2).next() {
            Some(Err(Error::InconsistentHeaders)) => {}
            other => unreachable!("{:?}", other),
        }
    }
}