use crate::error::{DataFusionError, Result};
#[cfg(feature = "compression")]
use async_compression::tokio::bufread::{
BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder,
GzipDecoder as AsyncGzDecoder, GzipEncoder as AsyncGzEncoder,
XzDecoder as AsyncXzDecoder, XzEncoder as AsyncXzEncoder,
ZstdDecoder as AsyncZstdDecoer, ZstdEncoder as AsyncZstdEncoder,
};
use crate::parsers::CompressionTypeVariant;
#[cfg(feature = "compression")]
use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder, ZstdEncoder};
use bytes::Bytes;
#[cfg(feature = "compression")]
use bzip2::read::MultiBzDecoder;
#[cfg(feature = "compression")]
use flate2::read::MultiGzDecoder;
use core::fmt;
use futures::stream::BoxStream;
use futures::StreamExt;
#[cfg(feature = "compression")]
use futures::TryStreamExt;
use std::fmt::Display;
use std::str::FromStr;
use tokio::io::AsyncWrite;
#[cfg(feature = "compression")]
use tokio_util::io::{ReaderStream, StreamReader};
#[cfg(feature = "compression")]
use xz2::read::XzDecoder;
#[cfg(feature = "compression")]
use zstd::Decoder as ZstdDecoder;
use CompressionTypeVariant::*;
/// Default file-name extension for Arrow files, including the leading dot.
pub const DEFAULT_ARROW_EXTENSION: &str = ".arrow";
/// Default file-name extension for Avro files, including the leading dot.
pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
/// Default file-name extension for CSV files, including the leading dot.
pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
/// Default file-name extension for JSON files, including the leading dot.
pub const DEFAULT_JSON_EXTENSION: &str = ".json";
/// Default file-name extension for Parquet files, including the leading dot.
pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
/// Implemented by types that have an associated file-name extension.
///
/// In this file it is implemented for [`FileCompressionType`] and
/// [`FileType`].
pub trait GetExt {
/// Returns the extension as an owned string.
///
/// The implementations in this file return the extension with its leading
/// dot (for example `".gz"` or `".csv"`), or an empty string when there is
/// no extension.
fn get_ext(&self) -> String;
}
/// The compression applied to a file, stored as a [`CompressionTypeVariant`].
///
/// The field is private; values are obtained through the associated constants
/// (for example `FileCompressionType::GZIP`), through
/// `From<CompressionTypeVariant>`, or by parsing a string with `FromStr`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FileCompressionType {
// The codec this value stands for.
variant: CompressionTypeVariant,
}
impl GetExt for FileCompressionType {
    /// Returns the file-name suffix used for this compression type
    /// (`".gz"`, `".bz2"`, `".xz"`, `".zst"`), or an empty string when the
    /// data is uncompressed.
    fn get_ext(&self) -> String {
        let suffix: &str = match self.variant {
            UNCOMPRESSED => "",
            GZIP => ".gz",
            BZIP2 => ".bz2",
            XZ => ".xz",
            ZSTD => ".zst",
        };
        String::from(suffix)
    }
}
impl From<CompressionTypeVariant> for FileCompressionType {
fn from(t: CompressionTypeVariant) -> Self {
Self { variant: t }
}
}
impl FromStr for FileCompressionType {
    type Err = DataFusionError;

    /// Parses a compression name by delegating to
    /// `CompressionTypeVariant::from_str`.
    ///
    /// # Errors
    ///
    /// Whatever error the delegate reports is discarded and replaced by a
    /// [`DataFusionError::NotImplemented`] whose message echoes the input.
    fn from_str(s: &str) -> Result<Self> {
        match CompressionTypeVariant::from_str(s) {
            Ok(variant) => Ok(Self { variant }),
            Err(_) => Err(DataFusionError::NotImplemented(format!(
                "Unknown FileCompressionType: {s}"
            ))),
        }
    }
}
/// Associated constants for every supported codec, plus adapters that wrap
/// byte streams, async writers and blocking readers with the matching
/// encoder or decoder.
///
/// Every adapter has the same shape: with the `compression` cargo feature
/// enabled each codec gets its own arm; without it, all compressed variants
/// fall into a single arm that returns the error built by `_not_impl_err!`.
/// `UNCOMPRESSED` always passes the input through unchanged.
impl FileCompressionType {
    /// Gzip-compressed data.
    pub const GZIP: Self = Self { variant: GZIP };

    /// Bzip2-compressed data.
    pub const BZIP2: Self = Self { variant: BZIP2 };

    /// Xz-compressed data.
    pub const XZ: Self = Self { variant: XZ };

    /// Zstandard-compressed data.
    pub const ZSTD: Self = Self { variant: ZSTD };

    /// Data that is not compressed.
    pub const UNCOMPRESSED: Self = Self {
        variant: UNCOMPRESSED,
    };

    /// Reports whether this value denotes a compressed format, as decided by
    /// `CompressionTypeVariant::is_compressed`.
    pub const fn is_compressed(&self) -> bool {
        self.variant.is_compressed()
    }

    /// Wraps a stream of plain bytes in a stream that yields the same data
    /// compressed with this codec.
    ///
    /// I/O errors raised by the encoder are converted with
    /// `DataFusionError::from`.
    ///
    /// # Errors
    ///
    /// Returns an error if a compressed variant is requested while the
    /// `compression` feature is disabled.
    pub fn convert_to_compress_stream(
        &self,
        s: BoxStream<'static, Result<Bytes>>,
    ) -> Result<BoxStream<'static, Result<Bytes>>> {
        Ok(match self.variant {
            #[cfg(feature = "compression")]
            GZIP => ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
                .map_err(DataFusionError::from)
                .boxed(),
            #[cfg(feature = "compression")]
            BZIP2 => ReaderStream::new(AsyncBzEncoder::new(StreamReader::new(s)))
                .map_err(DataFusionError::from)
                .boxed(),
            #[cfg(feature = "compression")]
            XZ => ReaderStream::new(AsyncXzEncoder::new(StreamReader::new(s)))
                .map_err(DataFusionError::from)
                .boxed(),
            #[cfg(feature = "compression")]
            ZSTD => ReaderStream::new(AsyncZstdEncoder::new(StreamReader::new(s)))
                .map_err(DataFusionError::from)
                .boxed(),
            #[cfg(not(feature = "compression"))]
            GZIP | BZIP2 | XZ | ZSTD => {
                return crate::error::_not_impl_err!("Compression feature is not enabled")
            }
            UNCOMPRESSED => s.boxed(),
        })
    }

    /// Wraps an async writer so that everything written through the returned
    /// writer is compressed with this codec before reaching `w`.
    ///
    /// # Errors
    ///
    /// Returns an error if a compressed variant is requested while the
    /// `compression` feature is disabled.
    pub fn convert_async_writer(
        &self,
        w: Box<dyn AsyncWrite + Send + Unpin>,
    ) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
        Ok(match self.variant {
            #[cfg(feature = "compression")]
            GZIP => Box::new(GzipEncoder::new(w)),
            #[cfg(feature = "compression")]
            BZIP2 => Box::new(BzEncoder::new(w)),
            #[cfg(feature = "compression")]
            XZ => Box::new(XzEncoder::new(w)),
            #[cfg(feature = "compression")]
            ZSTD => Box::new(ZstdEncoder::new(w)),
            #[cfg(not(feature = "compression"))]
            GZIP | BZIP2 | XZ | ZSTD => {
                return crate::error::_not_impl_err!("Compression feature is not enabled")
            }
            UNCOMPRESSED => w,
        })
    }

    /// Wraps a stream of compressed bytes in a stream that yields the
    /// decompressed data.
    ///
    /// I/O errors raised by the decoder are converted with
    /// `DataFusionError::from`.
    ///
    /// # Errors
    ///
    /// Returns an error if a compressed variant is requested while the
    /// `compression` feature is disabled.
    pub fn convert_stream(
        &self,
        s: BoxStream<'static, Result<Bytes>>,
    ) -> Result<BoxStream<'static, Result<Bytes>>> {
        Ok(match self.variant {
            #[cfg(feature = "compression")]
            GZIP => {
                // A gzip file may consist of several concatenated members.
                // The async decoder only continues past the first member when
                // asked to, so enable multi-member decoding to match the
                // blocking path in `convert_read`, which uses `MultiGzDecoder`.
                let mut decoder = AsyncGzDecoder::new(StreamReader::new(s));
                decoder.multiple_members(true);
                ReaderStream::new(decoder)
                    .map_err(DataFusionError::from)
                    .boxed()
            }
            #[cfg(feature = "compression")]
            BZIP2 => ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
                .map_err(DataFusionError::from)
                .boxed(),
            #[cfg(feature = "compression")]
            XZ => ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
                .map_err(DataFusionError::from)
                .boxed(),
            // `AsyncZstdDecoer` is the alias name declared in this file's imports.
            #[cfg(feature = "compression")]
            ZSTD => ReaderStream::new(AsyncZstdDecoer::new(StreamReader::new(s)))
                .map_err(DataFusionError::from)
                .boxed(),
            #[cfg(not(feature = "compression"))]
            GZIP | BZIP2 | XZ | ZSTD => {
                return crate::error::_not_impl_err!("Compression feature is not enabled")
            }
            UNCOMPRESSED => s.boxed(),
        })
    }

    /// Wraps a blocking reader of compressed bytes in a reader that yields
    /// the decompressed data.
    ///
    /// The gzip, bzip2 and xz arms use the multi-member / multi-stream
    /// decoders of their respective crates.
    ///
    /// # Errors
    ///
    /// Returns [`DataFusionError::External`] if the zstd decoder cannot be
    /// constructed, and an error if a compressed variant is requested while
    /// the `compression` feature is disabled.
    pub fn convert_read<T: std::io::Read + Send + 'static>(
        &self,
        r: T,
    ) -> Result<Box<dyn std::io::Read + Send>> {
        Ok(match self.variant {
            #[cfg(feature = "compression")]
            GZIP => Box::new(MultiGzDecoder::new(r)),
            #[cfg(feature = "compression")]
            BZIP2 => Box::new(MultiBzDecoder::new(r)),
            #[cfg(feature = "compression")]
            XZ => Box::new(XzDecoder::new_multi_decoder(r)),
            // Building the zstd decoder is the only fallible constructor here.
            #[cfg(feature = "compression")]
            ZSTD => Box::new(
                ZstdDecoder::new(r).map_err(|e| DataFusionError::External(Box::new(e)))?,
            ),
            #[cfg(not(feature = "compression"))]
            GZIP | BZIP2 | XZ | ZSTD => {
                return crate::error::_not_impl_err!("Compression feature is not enabled")
            }
            UNCOMPRESSED => Box::new(r),
        })
    }
}
/// The file formats this module knows about.
///
/// Each variant maps to a default extension through [`GetExt`] and to a
/// lower-case name through `Display`; `FromStr` parses the names
/// case-insensitively.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FileType {
/// Arrow file; default extension `.arrow`.
ARROW,
/// Avro file; default extension `.avro`.
AVRO,
/// Parquet file; default extension `.parquet`.
PARQUET,
/// CSV file; default extension `.csv`.
CSV,
/// JSON file; default extension `.json`. Also parsed from `"NDJSON"`.
JSON,
}
impl GetExt for FileType {
    /// Returns the default extension (leading dot included) for this file
    /// type, taken from the `DEFAULT_*_EXTENSION` constants.
    fn get_ext(&self) -> String {
        let extension: &str = match self {
            FileType::CSV => DEFAULT_CSV_EXTENSION,
            FileType::JSON => DEFAULT_JSON_EXTENSION,
            FileType::PARQUET => DEFAULT_PARQUET_EXTENSION,
            FileType::AVRO => DEFAULT_AVRO_EXTENSION,
            FileType::ARROW => DEFAULT_ARROW_EXTENSION,
        };
        String::from(extension)
    }
}
impl Display for FileType {
    /// Writes the lower-case name of the file type (`csv`, `json`,
    /// `parquet`, `avro` or `arrow`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            FileType::ARROW => "arrow",
            FileType::AVRO => "avro",
            FileType::PARQUET => "parquet",
            FileType::CSV => "csv",
            FileType::JSON => "json",
        };
        f.write_str(label)
    }
}
impl FromStr for FileType {
type Err = DataFusionError;
fn from_str(s: &str) -> Result<Self> {
let s = s.to_uppercase();
match s.as_str() {
"ARROW" => Ok(FileType::ARROW),
"AVRO" => Ok(FileType::AVRO),
"PARQUET" => Ok(FileType::PARQUET),
"CSV" => Ok(FileType::CSV),
"JSON" | "NDJSON" => Ok(FileType::JSON),
_ => crate::error::_not_impl_err!("Unknown FileType: {s}"),
}
}
}
impl FileType {
pub fn get_ext_with_compression(&self, c: FileCompressionType) -> Result<String> {
let ext = self.get_ext();
match self {
FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())),
FileType::PARQUET | FileType::AVRO | FileType::ARROW => match c.variant {
UNCOMPRESSED => Ok(ext),
_ => crate::error::_internal_err!(
"FileCompressionType can be specified for CSV/JSON FileType."
),
},
}
}
}
#[cfg(test)]
mod tests {
    use crate::error::DataFusionError;
    use crate::file_type::{FileCompressionType, FileType};
    use std::str::FromStr;

    /// CSV and JSON accept every compression type and append its suffix;
    /// Avro, Parquet and Arrow accept only `UNCOMPRESSED` and reject the rest
    /// with an internal error.
    #[test]
    fn get_ext_with_compression() {
        for (file_type, compression, extension) in [
            (FileType::CSV, FileCompressionType::UNCOMPRESSED, ".csv"),
            (FileType::CSV, FileCompressionType::GZIP, ".csv.gz"),
            (FileType::CSV, FileCompressionType::XZ, ".csv.xz"),
            (FileType::CSV, FileCompressionType::BZIP2, ".csv.bz2"),
            (FileType::CSV, FileCompressionType::ZSTD, ".csv.zst"),
            (FileType::JSON, FileCompressionType::UNCOMPRESSED, ".json"),
            (FileType::JSON, FileCompressionType::GZIP, ".json.gz"),
            (FileType::JSON, FileCompressionType::XZ, ".json.xz"),
            (FileType::JSON, FileCompressionType::BZIP2, ".json.bz2"),
            (FileType::JSON, FileCompressionType::ZSTD, ".json.zst"),
        ] {
            assert_eq!(
                file_type.get_ext_with_compression(compression).unwrap(),
                extension
            );
        }

        // File types that do not take a compression suffix, Arrow included.
        for (file_type, extension) in [
            (FileType::AVRO, ".avro"),
            (FileType::PARQUET, ".parquet"),
            (FileType::ARROW, ".arrow"),
        ] {
            assert_eq!(
                file_type
                    .get_ext_with_compression(FileCompressionType::UNCOMPRESSED)
                    .unwrap(),
                extension
            );
            for compression in [
                FileCompressionType::GZIP,
                FileCompressionType::XZ,
                FileCompressionType::BZIP2,
                FileCompressionType::ZSTD,
            ] {
                assert!(matches!(
                    file_type.get_ext_with_compression(compression),
                    Err(DataFusionError::Internal(_))
                ));
            }
        }
    }

    /// Parsing is case-insensitive for both file types and compression
    /// types, and unknown names yield `NotImplemented`.
    #[test]
    fn from_str() {
        for (ext, file_type) in [
            ("csv", FileType::CSV),
            ("CSV", FileType::CSV),
            ("json", FileType::JSON),
            ("JSON", FileType::JSON),
            // `NDJSON` is an alias for JSON.
            ("ndjson", FileType::JSON),
            ("NDJSON", FileType::JSON),
            ("avro", FileType::AVRO),
            ("AVRO", FileType::AVRO),
            ("parquet", FileType::PARQUET),
            ("PARQUET", FileType::PARQUET),
            ("arrow", FileType::ARROW),
            ("ARROW", FileType::ARROW),
        ] {
            assert_eq!(FileType::from_str(ext).unwrap(), file_type);
        }
        assert!(matches!(
            FileType::from_str("Unknown"),
            Err(DataFusionError::NotImplemented(_))
        ));

        for (ext, compression_type) in [
            ("gz", FileCompressionType::GZIP),
            ("GZ", FileCompressionType::GZIP),
            ("gzip", FileCompressionType::GZIP),
            ("GZIP", FileCompressionType::GZIP),
            ("xz", FileCompressionType::XZ),
            ("XZ", FileCompressionType::XZ),
            ("bz2", FileCompressionType::BZIP2),
            ("BZ2", FileCompressionType::BZIP2),
            ("bzip2", FileCompressionType::BZIP2),
            ("BZIP2", FileCompressionType::BZIP2),
            ("zst", FileCompressionType::ZSTD),
            ("ZST", FileCompressionType::ZSTD),
            ("zstd", FileCompressionType::ZSTD),
            ("ZSTD", FileCompressionType::ZSTD),
            ("", FileCompressionType::UNCOMPRESSED),
        ] {
            assert_eq!(
                FileCompressionType::from_str(ext).unwrap(),
                compression_type
            );
        }
        assert!(matches!(
            FileCompressionType::from_str("Unknown"),
            Err(DataFusionError::NotImplemented(_))
        ));
    }
}