use std::{ffi::OsStr, fmt, str::FromStr};
use async_trait::async_trait;
use crate::common::*;
mod csv_converter;
mod jsonl_converter;
/// The serialization format of a data stream, identified by its file extension.
///
/// Only [`DataFormat::Csv`] and [`DataFormat::JsonLines`] have converters;
/// any other extension is carried along as [`DataFormat::Unsupported`].
#[derive(Clone, Debug, Default, Eq, PartialEq)]
#[non_exhaustive]
pub enum DataFormat {
    /// CSV data (`*.csv`). This is the default format.
    #[default]
    Csv,
    /// JSON Lines data (`*.jsonl`).
    JsonLines,
    /// Any other format, holding the unrecognized extension text.
    Unsupported(String),
}
impl DataFormat {
    /// Classify a file extension (given without a leading `.`, e.g. `"csv"`)
    /// as a [`DataFormat`].
    ///
    /// Matching is ASCII case-insensitive, so `"CSV"` and `"csv"` both map to
    /// [`DataFormat::Csv`]. Non-UTF-8 input is converted lossily. Unrecognized
    /// extensions are kept, lowercased, in [`DataFormat::Unsupported`].
    pub(crate) fn from_extension(ext: &OsStr) -> Self {
        let ext = ext.to_string_lossy().to_ascii_lowercase();
        match ext.as_str() {
            "csv" => Self::Csv,
            "jsonl" => Self::JsonLines,
            _ => Self::Unsupported(ext),
        }
    }

    /// Return the converter that translates between this format and CSV.
    ///
    /// # Errors
    ///
    /// Returns an error for [`DataFormat::Unsupported`], which has no converter.
    fn converter(&self) -> Result<Box<dyn DataFormatConverter>> {
        match self {
            Self::Csv => Ok(Box::new(csv_converter::CsvConverter)),
            Self::JsonLines => Ok(Box::new(jsonl_converter::JsonLinesConverter)),
            // Matched explicitly rather than with a catch-all, so that adding a
            // variant produces a compile error here instead of a runtime error.
            Self::Unsupported(ext) => {
                Err(format_err!("cannot convert between `*.{}` and CSV", ext))
            }
        }
    }
}
impl FromStr for DataFormat {
type Err = Error;
fn from_str(s: &str) -> Result<Self> {
Ok(Self::from_extension(OsStr::new(s)))
}
}
impl fmt::Display for DataFormat {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Csv => write!(f, "csv"),
Self::JsonLines => write!(f, "jsonl"),
Self::Unsupported(s) => write!(f, "{}", s),
}
}
}
/// The `#[default]` variant of [`DataFormat`] must be CSV.
#[test]
fn data_format_default_is_csv() {
    let expected = DataFormat::Csv;
    assert_eq!(DataFormat::default(), expected);
}
/// A named stream of raw data encoded in a particular [`DataFormat`].
pub(crate) struct DataStream {
    /// Stream name. It is copied to and from the `name` of a `CsvStream` on
    /// conversion, and passed to the converter as the table name when
    /// inferring a schema.
    pub(crate) name: String,
    /// Format of the bytes in `data`; used to select the converter.
    pub(crate) format: DataFormat,
    /// The raw data as a boxed stream of `BytesMut` chunks (see the project's
    /// `BoxStream` alias for the exact item type).
    pub(crate) data: BoxStream<BytesMut>,
}
impl DataStream {
    /// Ask this stream's format converter to infer a [`Schema`] from the data.
    ///
    /// Consumes the stream. The converter may return `Ok(None)`; the trait's
    /// default implementation always does.
    ///
    /// # Errors
    ///
    /// Fails if the format has no converter, or if the converter fails.
    pub(crate) async fn schema(self, ctx: &Context) -> Result<Option<Schema>> {
        let Self { name, format, data } = self;
        let converter = format.converter()?;
        converter.schema(ctx, &name, data).await
    }

    /// Convert this stream into a [`CsvStream`] with the same name, using
    /// `schema` to guide the conversion.
    ///
    /// # Errors
    ///
    /// Fails if the format has no converter, or if the conversion fails.
    pub(crate) async fn into_csv_stream(
        self,
        ctx: &Context,
        schema: &Schema,
    ) -> Result<CsvStream> {
        let Self { name, format, data } = self;
        let converter = format.converter()?;
        let csv_data = converter.data_format_to_csv(ctx, schema, data).await?;
        Ok(CsvStream {
            name,
            data: csv_data,
        })
    }

    /// Build a [`DataStream`] in `format` from a CSV stream, keeping the CSV
    /// stream's name and using `schema` to guide the conversion.
    ///
    /// # Errors
    ///
    /// Fails if `format` has no converter, or if the conversion fails.
    pub(crate) async fn from_csv_stream(
        ctx: &Context,
        format: DataFormat,
        schema: &Schema,
        stream: CsvStream,
    ) -> Result<Self> {
        let converter = format.converter()?;
        let converted = converter
            .csv_to_data_format(ctx, schema, stream.data)
            .await?;
        Ok(Self {
            name: stream.name,
            format,
            data: converted,
        })
    }
}
/// Translates data between one [`DataFormat`] and CSV.
///
/// Implementations are handed out boxed, as `Box<dyn DataFormatConverter>`,
/// by `DataFormat::converter`.
// Plain private visibility: `pub(self)` meant exactly the same thing and is
// flagged by `clippy::needless_pub_self`. Child modules can still implement it.
#[async_trait]
trait DataFormatConverter: Send + Sync {
    /// Infer a schema for the table `_table_name` from `_data`, if this format
    /// supports schema inference.
    ///
    /// The default implementation ignores its arguments and returns `Ok(None)`.
    async fn schema(
        &self,
        _ctx: &Context,
        _table_name: &str,
        _data: BoxStream<BytesMut>,
    ) -> Result<Option<Schema>> {
        Ok(None)
    }

    /// Convert `data`, encoded in this converter's format, into CSV data,
    /// using `schema` to guide the conversion.
    async fn data_format_to_csv(
        &self,
        ctx: &Context,
        schema: &Schema,
        data: BoxStream<BytesMut>,
    ) -> Result<BoxStream<BytesMut>>;

    /// Convert CSV `data` into this converter's format, using `schema` to
    /// guide the conversion.
    async fn csv_to_data_format(
        &self,
        ctx: &Context,
        schema: &Schema,
        data: BoxStream<BytesMut>,
    ) -> Result<BoxStream<BytesMut>>;
}