use csv::Error as InternalCsvError;
use educe::Educe;
use futures::{pin_mut, stream, AsyncReadExt, FutureExt, StreamExt};
use serde::{Deserialize, Serialize};
use serde_closure::*;
use std::{
error, fmt::{self, Display}, io::Cursor, marker::PhantomData
};
use amadeus_core::{
file::{File, Page, Partition}, into_par_stream::IntoDistributedStream, par_stream::DistributedStream, util::{DistParStream, ResultExpandIter}, Source
};
use super::{SerdeData, SerdeDeserializeGroup};
#[derive(Educe)]
#[educe(Clone, Debug)]
pub struct Csv<File, Row>
where
File: amadeus_core::file::File,
Row: SerdeData,
{
partitions: Vec<File::Partition>,
marker: PhantomData<fn() -> Row>,
}
impl<F, Row> Csv<F, Row>
where
    F: File,
    Row: SerdeData,
{
    /// Creates a CSV source over every partition of `file`.
    ///
    /// # Errors
    ///
    /// Returns `CsvError::File` if listing the file's partitions fails.
    pub async fn new(file: F) -> Result<Self, <Self as Source>::Error> {
        let partitions = file.partitions().await.map_err(CsvError::File)?;
        Ok(Self {
            partitions,
            marker: PhantomData,
        })
    }
}
/// Streams every CSV record of every page of every partition as a `Row`.
impl<F, Row> Source for Csv<F, Row>
where
F: File,
Row: SerdeData,
{
/// Each item is one deserialized CSV record.
type Item = Row;
// Errors from the file (listing partitions), from a partition (listing
// pages), from a page, or from the `csv` crate (reading/parsing a page).
#[allow(clippy::type_complexity)]
type Error = CsvError<
<F as File>::Error,
<<F as File>::Partition as Partition>::Error,
<<<F as File>::Partition as Partition>::Page as Page>::Error,
>;
// Normal builds use opaque `impl Trait` associated types; the `cfg(doc)`
// variants substitute nameable wrapper types, presumably because rustdoc
// cannot document the opaque ones — TODO confirm.
#[cfg(not(doc))]
type ParStream =
impl amadeus_core::par_stream::ParallelStream<Item = Result<Self::Item, Self::Error>>;
#[cfg(doc)]
type ParStream =
DistParStream<amadeus_core::util::ImplDistributedStream<Result<Self::Item, Self::Error>>>;
#[cfg(not(doc))]
type DistStream = impl DistributedStream<Item = Result<Self::Item, Self::Error>>;
#[cfg(doc)]
type DistStream = amadeus_core::util::ImplDistributedStream<Result<Self::Item, Self::Error>>;
/// Builds the parallel stream by wrapping `dist_stream` in `DistParStream`.
fn par_stream(self) -> Self::ParStream {
DistParStream::new(self.dist_stream())
}
/// Builds a distributed stream of rows: partitions are the unit of
/// distribution, and within a partition the pages are read one after another.
// `ret` is re-bound under `cfg(doc)` below, so in normal builds the
// `let ret = ...; ret` shape trips clippy's `let_and_return`.
#[allow(clippy::let_and_return)]
fn dist_stream(self) -> Self::DistStream {
let ret = self
.partitions
.into_dist_stream()
.flat_map(FnMut!(|partition: F::Partition| async move {
// List this partition's pages; a failure becomes CsvError::Partition
// and is returned from this async block.
Ok(stream::iter(
partition
.pages()
.await
.map_err(CsvError::Partition)?
.into_iter(),
)
.flat_map(|page| {
async move {
// Read the whole page into memory before parsing.
// NOTE(review): reserves 10 MiB up front for every page regardless
// of its actual size.
let mut buf = Vec::with_capacity(10 * 1024 * 1024);
let reader = Page::reader(page);
pin_mut!(reader);
// Read failures are wrapped in `csv::Error`, then converted by `?`
// (via `From<csv::Error>`) into `CsvError::Csv` — not `CsvError::Page`.
let _ = reader
.read_to_end(&mut buf)
.await
.map_err(InternalCsvError::from)?;
// Each page is parsed by its own fresh csv reader, so records are
// presumably expected not to straddle page boundaries — TODO confirm.
Ok(stream::iter(
csv::ReaderBuilder::new()
// The first record of each page is treated as data, not a header row.
.has_headers(false)
.from_reader(Cursor::new(buf))
.into_deserialize()
// `.0` unwraps the `SerdeDeserializeGroup` newtype to get the `Row`.
.map(|x: Result<SerdeDeserializeGroup<Row>, InternalCsvError>| {
Ok(x?.0)
}),
))
}
// `ResultExpandIter` appears to turn `Result<iterator, E>` into a
// stream of the iterator's items, or of the single `Err` — confirm.
.map(ResultExpandIter::new)
.flatten_stream()
})
// Flatten the nested results; a per-record `csv::Error` becomes `CsvError::Csv`.
.map(|row: Result<Result<Row, InternalCsvError>, Self::Error>| Ok(row??)))
}
// Same expansion for the per-partition result (e.g. a failed page listing).
.map(ResultExpandIter::new)
.flatten_stream()
.map(|row: Result<Result<Row, Self::Error>, Self::Error>| Ok(row??))));
// Wrap in the nameable type used for the `cfg(doc)` associated type.
#[cfg(doc)]
let ret = amadeus_core::util::ImplDistributedStream::new(ret);
ret
}
}
mod csverror {
use serde::{Deserializer, Serializer};
pub fn serialize<T, S>(_t: &T, _serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
unimplemented!()
}
pub fn deserialize<'de, T, D>(_deserializer: D) -> Result<T, D::Error>
where
D: Deserializer<'de>,
{
unimplemented!()
}
}
/// Error produced by the `Csv` source, tagged by the stage that failed.
///
/// Variant order and names are part of the serialized representation; do not
/// reorder or rename them.
#[derive(Serialize, Deserialize, Debug)]
pub enum CsvError<A, B, C> {
/// Error from the underlying file, e.g. when listing its partitions in `Csv::new`.
File(A),
/// Error from a partition, e.g. when listing its pages while streaming.
Partition(B),
/// Page-level error.
/// NOTE(review): not constructed anywhere in the visible source; page read
/// failures are reported through the `Csv` variant instead.
Page(C),
/// Error from the `csv` crate (reading or parsing a page). It is serialized
/// through the `csverror` adapter module.
Csv(#[serde(with = "csverror")] InternalCsvError),
}
impl<A, B, C> Clone for CsvError<A, B, C>
where
A: Clone,
B: Clone,
C: Clone,
{
fn clone(&self) -> Self {
match self {
Self::File(err) => Self::File(err.clone()),
Self::Partition(err) => Self::Partition(err.clone()),
Self::Page(err) => Self::Page(err.clone()),
Self::Csv(err) => Self::Csv(serde::ser::Error::custom(err)),
}
}
}
/// Two errors are equal when they are the same variant with equal payloads.
///
/// `csv::Error` has no `PartialEq`, so `Csv` payloads are compared by their
/// rendered messages.
impl<A, B, C> PartialEq for CsvError<A, B, C>
where
    A: PartialEq,
    B: PartialEq,
    C: PartialEq,
{
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::File(lhs), Self::File(rhs)) => lhs == rhs,
            (Self::Partition(lhs), Self::Partition(rhs)) => lhs == rhs,
            (Self::Page(lhs), Self::Page(rhs)) => lhs == rhs,
            (Self::Csv(lhs), Self::Csv(rhs)) => lhs.to_string() == rhs.to_string(),
            // Mismatched variants are never equal. The arms are listed per
            // variant so that adding a variant forces this match to be revisited.
            (Self::File(_), _)
            | (Self::Partition(_), _)
            | (Self::Page(_), _)
            | (Self::Csv(_), _) => false,
        }
    }
}
/// `CsvError` is a transparent wrapper. Its `Display` prints the wrapped
/// error's message, and `source` continues with the wrapped error's own cause
/// chain.
impl<A, B, C> error::Error for CsvError<A, B, C>
where
    A: error::Error,
    B: error::Error,
    C: error::Error,
{
    /// Returns the wrapped error's `source`, not the wrapped error itself.
    ///
    /// `Display` already prints the wrapped message, so returning the wrapped
    /// error here would make error-chain reporters print it twice. The default
    /// `None` would instead hide the underlying cause entirely.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match self {
            Self::File(err) => error::Error::source(err),
            Self::Partition(err) => error::Error::source(err),
            Self::Page(err) => error::Error::source(err),
            Self::Csv(err) => error::Error::source(err),
        }
    }
}
/// Formats as the wrapped error's message, with no prefix naming the stage
/// that failed. Formatter options are passed through unchanged.
impl<A, B, C> Display for CsvError<A, B, C>
where
    A: Display,
    B: Display,
    C: Display,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let inner: &dyn Display = match self {
            Self::File(e) => e,
            Self::Partition(e) => e,
            Self::Page(e) => e,
            Self::Csv(e) => e,
        };
        Display::fmt(inner, f)
    }
}
impl<A, B, C> From<InternalCsvError> for CsvError<A, B, C> {
fn from(err: InternalCsvError) -> Self {
Self::Csv(err)
}
}