1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
use std::{ffi::OsStr, fmt, str::FromStr};
use async_trait::async_trait;
use crate::common::*;
mod csv_converter;
mod jsonl_converter;
/// Describes how the bytes in a stream of data are encoded.
///
/// TODO: We might add a `StreamFormat` that handles "wrapper" formats like
/// `gzip` or `bzip2`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum DataFormat {
    /// Comma-separated values. This is the default format.
    #[default]
    Csv,
    /// Newline-delimited JSON: one JSON value per line. See
    /// [JSON Lines](https://jsonlines.org/).
    JsonLines,
    /// A data format we don't support, identified by its file extension
    /// without any leading "." character.
    Unsupported(String),
}
impl DataFormat {
    /// Map a file extension (without the leading ".") to a `DataFormat`.
    ///
    /// Matching is ASCII case-insensitive, so `CSV` and `csv` both map to
    /// [`DataFormat::Csv`]. This never fails: any extension we don't
    /// recognize becomes [`DataFormat::Unsupported`], holding the lowercased
    /// extension.
    pub(crate) fn from_extension(ext: &OsStr) -> Self {
        // `to_string_lossy` replaces any non-UTF-8 bytes with the Unicode
        // replacement character U+FFFD. Such an extension can never match a
        // supported format and will end up as `DataFormat::Unsupported`, so
        // it doesn't matter that we lose the original non-UTF-8 bytes here.
        let ext = ext.to_string_lossy().to_ascii_lowercase();
        match ext.as_str() {
            "csv" => Self::Csv,
            "jsonl" => Self::JsonLines,
            _ => Self::Unsupported(ext),
        }
    }

    /// Look up the [`DataFormatConverter`] for this data format.
    ///
    /// # Errors
    ///
    /// Returns an error for [`DataFormat::Unsupported`], because we have no
    /// converter between that format and CSV.
    fn converter(&self) -> Result<Box<dyn DataFormatConverter>> {
        // Every variant is listed explicitly (no `_` arm) so that adding a
        // new format forces this lookup to be updated.
        match self {
            DataFormat::Csv => Ok(Box::new(csv_converter::CsvConverter)),
            DataFormat::JsonLines => Ok(Box::new(jsonl_converter::JsonLinesConverter)),
            unsupported @ DataFormat::Unsupported(_) => Err(format_err!(
                "cannot convert between `*.{}` and CSV",
                unsupported
            )),
        }
    }
}
impl FromStr for DataFormat {
    type Err = Error;

    /// Parse a data format name such as `csv` or `jsonl`.
    ///
    /// The name is interpreted exactly like a file extension (see
    /// `DataFormat::from_extension`), except that a single leading "." is
    /// accepted and ignored, so `.csv` parses the same as `csv`. Unknown names
    /// become [`DataFormat::Unsupported`]; this never returns an error.
    fn from_str(s: &str) -> Result<Self> {
        // `Unsupported` is documented as holding the extension without a
        // leading ".", so strip one here rather than storing e.g. ".xml".
        let ext = s.strip_prefix('.').unwrap_or(s);
        Ok(Self::from_extension(OsStr::new(ext)))
    }
}
impl fmt::Display for DataFormat {
    /// Render the format as its file extension, with no leading ".".
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let extension: &str = match self {
            Self::Csv => "csv",
            Self::JsonLines => "jsonl",
            Self::Unsupported(ext) => ext,
        };
        f.write_str(extension)
    }
}
/// Check that the `#[default]` variant of `DataFormat` is `Csv`.
#[test]
fn data_format_default_is_csv() {
    let default_format = DataFormat::default();
    assert_eq!(default_format, DataFormat::Csv);
}
/// A named async stream of data in some [`DataFormat`].
///
/// This plays the same role as a [`CsvStream`], except that its contents may
/// be in any supported format rather than only CSV.
pub(crate) struct DataStream {
    /// A name identifying this stream.
    pub(crate) name: String,
    /// How the bytes in `data` are encoded.
    pub(crate) format: DataFormat,
    /// The stream's contents, delivered as `BytesMut` buffers.
    pub(crate) data: BoxStream<BytesMut>,
}
impl DataStream {
    /// Consume this stream and try to infer a [`Schema`] from its data.
    ///
    /// May return `Ok(None)`; that is what converters which don't implement
    /// schema inference return. Fails if this stream's format has no
    /// converter.
    pub(crate) async fn schema(self, ctx: &Context) -> Result<Option<Schema>> {
        let Self { name, format, data } = self;
        let converter = format.converter()?;
        converter.schema(ctx, &name, data).await
    }

    /// Consume this stream and re-encode its data as CSV, keeping its name.
    ///
    /// This is very cheap when the stream is already in CSV format.
    pub(crate) async fn into_csv_stream(
        self,
        ctx: &Context,
        schema: &Schema,
    ) -> Result<CsvStream> {
        let Self { name, format, data } = self;
        let converter = format.converter()?;
        let csv_data = converter.data_format_to_csv(ctx, schema, data).await?;
        Ok(CsvStream {
            name,
            data: csv_data,
        })
    }

    /// Build a `DataStream` in `format` from a CSV stream, keeping its name.
    ///
    /// This is very cheap when the requested `format` is itself CSV.
    pub(crate) async fn from_csv_stream(
        ctx: &Context,
        format: DataFormat,
        schema: &Schema,
        stream: CsvStream,
    ) -> Result<Self> {
        let CsvStream {
            name,
            data: csv_data,
        } = stream;
        let converter = format.converter()?;
        let data = converter.csv_to_data_format(ctx, schema, csv_data).await?;
        Ok(Self { name, format, data })
    }
}
/// Converts data between one particular [`DataFormat`] and CSV.
#[async_trait]
trait DataFormatConverter: Send + Sync {
    /// Try to infer a [`Schema`] from `data`.
    ///
    /// The default implementation does not inspect anything and always
    /// returns `Ok(None)`.
    async fn schema(
        &self,
        _ctx: &Context,
        _table_name: &str,
        _data: BoxStream<BytesMut>,
    ) -> Result<Option<Schema>> {
        Ok(None)
    }

    /// Convert `data`, encoded in this converter's format, into CSV.
    async fn data_format_to_csv(
        &self,
        ctx: &Context,
        schema: &Schema,
        data: BoxStream<BytesMut>,
    ) -> Result<BoxStream<BytesMut>>;

    /// Convert `data`, encoded as CSV, into this converter's format.
    async fn csv_to_data_format(
        &self,
        ctx: &Context,
        schema: &Schema,
        data: BoxStream<BytesMut>,
    ) -> Result<BoxStream<BytesMut>>;
}