use std::collections::HashMap;
use std::fmt::Debug;
use std::path::PathBuf;
use std::str::FromStr;
use csv_sniffer::metadata::Metadata;
use csv_sniffer::Sniffer;
use cons::*;
use error::*;
use field::FieldIdent;
use field::Value;
use fieldlist::{FieldDesignator, FieldPayloadCons, FieldSchema, SchemaCons};
use frame::SimpleFrameFields;
use label::{TypedValue, Valued};
use source::decode::decode;
use source::file::{FileLocator, LocalFileReader, Uri};
use store::{AssocFrameLookup, AssocStorage, DataStore, IntoView, PushFrontFromValueIter};
#[derive(Debug, Clone)]
pub struct CsvSource {
src: FileLocator,
metadata: Metadata,
}
impl CsvSource {
pub fn new<L: Into<FileLocator>>(loc: L) -> Result<CsvSource> {
let loc = loc.into();
let mut file_reader = LocalFileReader::new(&loc)?;
let metadata = Sniffer::new().sniff_reader(&mut file_reader)?;
Ok(CsvSource { src: loc, metadata })
}
pub fn metadata(&self) -> &Metadata {
&self.metadata
}
}
pub type CsvSrcSchemaCons<Label, DType, Tail> = FieldPayloadCons<Label, DType, usize, Tail>;
pub trait IntoCsvSrcSchema {
type CsvSrcSchema;
fn into_csv_src_schema(
self,
headers: &HashMap<String, usize>,
num_fields: usize,
) -> Result<Self::CsvSrcSchema>;
}
impl IntoCsvSrcSchema for Nil {
type CsvSrcSchema = Nil;
fn into_csv_src_schema(
self,
_headers: &HashMap<String, usize>,
_num_fields: usize,
) -> Result<Nil> {
Ok(Nil)
}
}
impl<Label, DType, Tail> IntoCsvSrcSchema for SchemaCons<Label, DType, Tail>
where
Tail: IntoCsvSrcSchema,
{
type CsvSrcSchema = CsvSrcSchemaCons<Label, DType, Tail::CsvSrcSchema>;
fn into_csv_src_schema(
self,
headers: &HashMap<String, usize>,
num_fields: usize,
) -> Result<CsvSrcSchemaCons<Label, DType, Tail::CsvSrcSchema>> {
let idx = match *self.head.value_ref() {
FieldDesignator::Expr(ref s) => *headers
.get(s)
.ok_or(AgnesError::FieldNotFound(FieldIdent::Name(s.to_string())))?,
FieldDesignator::Idx(idx) => {
if idx >= num_fields {
return Err(AgnesError::IndexError {
index: idx,
len: num_fields,
});
};
idx
}
};
Ok(Cons {
head: TypedValue::from(idx).into(),
tail: self.tail.into_csv_src_schema(headers, num_fields)?,
})
}
}
pub trait BuildDStore {
type OutputFields: AssocStorage;
fn build(&mut self, src: &CsvSource) -> Result<DataStore<Self::OutputFields>>;
}
impl BuildDStore for Nil {
type OutputFields = Nil;
fn build(&mut self, _src: &CsvSource) -> Result<DataStore<Nil>> {
Ok(DataStore::<Nil>::empty())
}
}
impl<Label, DType, Tail> BuildDStore for CsvSrcSchemaCons<Label, DType, Tail>
where
Tail: BuildDStore,
DataStore<<Tail as BuildDStore>::OutputFields>: PushFrontFromValueIter<Label, DType>,
Tail::OutputFields: PushBack<FieldSchema<Label, DType>>,
<Tail::OutputFields as PushBack<FieldSchema<Label, DType>>>::Output: AssocStorage,
Label: Debug,
DType: FromStr + Debug + Default + Clone,
ParseError: From<<DType as FromStr>::Err>,
{
type OutputFields = <DataStore<<Tail as BuildDStore>::OutputFields> as PushFrontFromValueIter<
Label,
DType,
>>::OutputFields;
fn build(&mut self, src: &CsvSource) -> Result<DataStore<Self::OutputFields>> {
let file_reader = LocalFileReader::new(&src.src)?;
let mut csv_reader = src.metadata.dialect.open_reader(file_reader)?;
let ds = self.tail.build(src)?;
let values: Vec<Value<DType>> = csv_reader
.byte_records()
.map(|row| {
let record = row?;
let value = decode(record.get(*self.head.value_ref().value_ref()).ok_or_else(
|| AgnesError::FieldNotFound(FieldIdent::Name(stringify![Field].to_string())),
)?)?;
Ok(value)
})
.map(|sresult| {
sresult.and_then(|s| {
let trimmed = s.trim();
if trimmed.is_empty() {
Ok(Value::Na)
} else {
trimmed
.parse::<DType>()
.map(|value| Value::Exists(value))
.map_err(|e| AgnesError::Parse(e.into()))
}
})
})
.collect::<Result<_>>()?;
let ds = ds.push_front_from_value_iter::<Label, DType, _, _>(values);
Ok(ds)
}
}
#[derive(Debug)]
pub struct CsvReader<CsvSchema> {
src: CsvSource,
csv_src_schema: CsvSchema,
}
impl<CsvSrcSchema> CsvReader<CsvSrcSchema>
where
CsvSrcSchema: Debug,
{
pub fn new<Schema>(src: &CsvSource, schema: Schema) -> Result<CsvReader<Schema::CsvSrcSchema>>
where
Schema: IntoCsvSrcSchema<CsvSrcSchema = CsvSrcSchema>,
{
let file_reader = LocalFileReader::new(&src.src)?;
let mut csv_reader = src.metadata.dialect.open_reader(file_reader)?;
debug_assert_eq!(src.metadata.num_fields, src.metadata.types.len());
let headers = if src.metadata.dialect.header.has_header_row {
let headers = csv_reader.headers()?;
if headers.len() != src.metadata.num_fields {
return Err(AgnesError::CsvDialect(
"header row does not match sniffed number of fields in CSV file".into(),
));
}
headers
.iter()
.enumerate()
.map(|(i, s)| (s.to_string(), i))
.collect::<HashMap<_, _>>()
} else {
HashMap::new()
};
let csv_src_schema = schema.into_csv_src_schema(&headers, src.metadata.num_fields)?;
Ok(CsvReader {
src: src.clone(),
csv_src_schema,
})
}
pub fn read(&mut self) -> Result<DataStore<CsvSrcSchema::OutputFields>>
where
CsvSrcSchema: BuildDStore,
{
self.csv_src_schema.build(&self.src)
}
}
pub fn load_csv<L: Into<FileLocator>, Schema>(
loc: L,
schema: Schema,
) -> Result<<DataStore<<Schema::CsvSrcSchema as BuildDStore>::OutputFields> as IntoView>::Output>
where
Schema: IntoCsvSrcSchema,
Schema::CsvSrcSchema: BuildDStore + Debug,
<Schema::CsvSrcSchema as BuildDStore>::OutputFields: AssocFrameLookup + SimpleFrameFields,
{
let source = CsvSource::new(loc)?;
let mut csv_reader = CsvReader::new(&source, schema)?;
Ok(csv_reader.read()?.into_view())
}
pub fn load_csv_from_uri<Schema>(
uri: &str,
schema: Schema,
) -> Result<<DataStore<<Schema::CsvSrcSchema as BuildDStore>::OutputFields> as IntoView>::Output>
where
Schema: IntoCsvSrcSchema,
Schema::CsvSrcSchema: BuildDStore + Debug,
<Schema::CsvSrcSchema as BuildDStore>::OutputFields: AssocFrameLookup + SimpleFrameFields,
{
load_csv(Uri::from_uri(uri.parse::<hyper::Uri>()?)?, schema)
}
pub fn load_csv_from_path<P, Schema>(
path: P,
schema: Schema,
) -> Result<<DataStore<<Schema::CsvSrcSchema as BuildDStore>::OutputFields> as IntoView>::Output>
where
P: Into<PathBuf>,
Schema: IntoCsvSrcSchema,
Schema::CsvSrcSchema: BuildDStore + Debug,
<Schema::CsvSrcSchema as BuildDStore>::OutputFields: AssocFrameLookup + SimpleFrameFields,
{
load_csv(path.into(), schema)
}