1use std::str::FromStr;
21
22use datafusion_common::error::{DataFusionError, Result};
23
24use datafusion_common::parsers::CompressionTypeVariant::{self, *};
25use datafusion_common::GetExt;
26
27#[cfg(feature = "compression")]
28use async_compression::tokio::bufread::{
29 BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder,
30 GzipDecoder as AsyncGzDecoder, GzipEncoder as AsyncGzEncoder,
31 XzDecoder as AsyncXzDecoder, XzEncoder as AsyncXzEncoder,
32 ZstdDecoder as AsyncZstdDecoer, ZstdEncoder as AsyncZstdEncoder,
33};
34
35#[cfg(feature = "compression")]
36use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder, ZstdEncoder};
37use bytes::Bytes;
38#[cfg(feature = "compression")]
39use bzip2::read::MultiBzDecoder;
40#[cfg(feature = "compression")]
41use flate2::read::MultiGzDecoder;
42use futures::stream::BoxStream;
43use futures::StreamExt;
44#[cfg(feature = "compression")]
45use futures::TryStreamExt;
46use object_store::buffered::BufWriter;
47use tokio::io::AsyncWrite;
48#[cfg(feature = "compression")]
49use tokio_util::io::{ReaderStream, StreamReader};
50#[cfg(feature = "compression")]
51use xz2::read::XzDecoder;
52#[cfg(feature = "compression")]
53use zstd::Decoder as ZstdDecoder;
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub struct FileCompressionType {
58 variant: CompressionTypeVariant,
59}
60
61impl GetExt for FileCompressionType {
62 fn get_ext(&self) -> String {
63 match self.variant {
64 GZIP => ".gz".to_owned(),
65 BZIP2 => ".bz2".to_owned(),
66 XZ => ".xz".to_owned(),
67 ZSTD => ".zst".to_owned(),
68 UNCOMPRESSED => "".to_owned(),
69 }
70 }
71}
72
73impl From<CompressionTypeVariant> for FileCompressionType {
74 fn from(t: CompressionTypeVariant) -> Self {
75 Self { variant: t }
76 }
77}
78
79impl From<FileCompressionType> for CompressionTypeVariant {
80 fn from(t: FileCompressionType) -> Self {
81 t.variant
82 }
83}
84
85impl FromStr for FileCompressionType {
86 type Err = DataFusionError;
87
88 fn from_str(s: &str) -> Result<Self> {
89 let variant = CompressionTypeVariant::from_str(s).map_err(|_| {
90 DataFusionError::NotImplemented(format!("Unknown FileCompressionType: {s}"))
91 })?;
92 Ok(Self { variant })
93 }
94}
95
96impl FileCompressionType {
98 pub const GZIP: Self = Self { variant: GZIP };
100
101 pub const BZIP2: Self = Self { variant: BZIP2 };
103
104 pub const XZ: Self = Self { variant: XZ };
106
107 pub const ZSTD: Self = Self { variant: ZSTD };
109
110 pub const UNCOMPRESSED: Self = Self {
112 variant: UNCOMPRESSED,
113 };
114
115 pub fn get_variant(&self) -> &CompressionTypeVariant {
117 &self.variant
118 }
119
120 pub const fn is_compressed(&self) -> bool {
122 self.variant.is_compressed()
123 }
124
125 pub fn convert_to_compress_stream<'a>(
127 &self,
128 s: BoxStream<'a, Result<Bytes>>,
129 ) -> Result<BoxStream<'a, Result<Bytes>>> {
130 Ok(match self.variant {
131 #[cfg(feature = "compression")]
132 GZIP => ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
133 .map_err(DataFusionError::from)
134 .boxed(),
135 #[cfg(feature = "compression")]
136 BZIP2 => ReaderStream::new(AsyncBzEncoder::new(StreamReader::new(s)))
137 .map_err(DataFusionError::from)
138 .boxed(),
139 #[cfg(feature = "compression")]
140 XZ => ReaderStream::new(AsyncXzEncoder::new(StreamReader::new(s)))
141 .map_err(DataFusionError::from)
142 .boxed(),
143 #[cfg(feature = "compression")]
144 ZSTD => ReaderStream::new(AsyncZstdEncoder::new(StreamReader::new(s)))
145 .map_err(DataFusionError::from)
146 .boxed(),
147 #[cfg(not(feature = "compression"))]
148 GZIP | BZIP2 | XZ | ZSTD => {
149 return Err(DataFusionError::NotImplemented(
150 "Compression feature is not enabled".to_owned(),
151 ))
152 }
153 UNCOMPRESSED => s.boxed(),
154 })
155 }
156
157 pub fn convert_async_writer(
160 &self,
161 w: BufWriter,
162 ) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
163 Ok(match self.variant {
164 #[cfg(feature = "compression")]
165 GZIP => Box::new(GzipEncoder::new(w)),
166 #[cfg(feature = "compression")]
167 BZIP2 => Box::new(BzEncoder::new(w)),
168 #[cfg(feature = "compression")]
169 XZ => Box::new(XzEncoder::new(w)),
170 #[cfg(feature = "compression")]
171 ZSTD => Box::new(ZstdEncoder::new(w)),
172 #[cfg(not(feature = "compression"))]
173 GZIP | BZIP2 | XZ | ZSTD => {
174 return Err(DataFusionError::NotImplemented(
175 "Compression feature is not enabled".to_owned(),
176 ))
177 }
178 UNCOMPRESSED => Box::new(w),
179 })
180 }
181
182 pub fn convert_stream<'a>(
184 &self,
185 s: BoxStream<'a, Result<Bytes>>,
186 ) -> Result<BoxStream<'a, Result<Bytes>>> {
187 Ok(match self.variant {
188 #[cfg(feature = "compression")]
189 GZIP => {
190 let mut decoder = AsyncGzDecoder::new(StreamReader::new(s));
191 decoder.multiple_members(true);
192
193 ReaderStream::new(decoder)
194 .map_err(DataFusionError::from)
195 .boxed()
196 }
197 #[cfg(feature = "compression")]
198 BZIP2 => ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
199 .map_err(DataFusionError::from)
200 .boxed(),
201 #[cfg(feature = "compression")]
202 XZ => ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
203 .map_err(DataFusionError::from)
204 .boxed(),
205 #[cfg(feature = "compression")]
206 ZSTD => ReaderStream::new(AsyncZstdDecoer::new(StreamReader::new(s)))
207 .map_err(DataFusionError::from)
208 .boxed(),
209 #[cfg(not(feature = "compression"))]
210 GZIP | BZIP2 | XZ | ZSTD => {
211 return Err(DataFusionError::NotImplemented(
212 "Compression feature is not enabled".to_owned(),
213 ))
214 }
215 UNCOMPRESSED => s.boxed(),
216 })
217 }
218
219 pub fn convert_read<T: std::io::Read + Send + 'static>(
221 &self,
222 r: T,
223 ) -> Result<Box<dyn std::io::Read + Send>> {
224 Ok(match self.variant {
225 #[cfg(feature = "compression")]
226 GZIP => Box::new(MultiGzDecoder::new(r)),
227 #[cfg(feature = "compression")]
228 BZIP2 => Box::new(MultiBzDecoder::new(r)),
229 #[cfg(feature = "compression")]
230 XZ => Box::new(XzDecoder::new_multi_decoder(r)),
231 #[cfg(feature = "compression")]
232 ZSTD => match ZstdDecoder::new(r) {
233 Ok(decoder) => Box::new(decoder),
234 Err(e) => return Err(DataFusionError::External(Box::new(e))),
235 },
236 #[cfg(not(feature = "compression"))]
237 GZIP | BZIP2 | XZ | ZSTD => {
238 return Err(DataFusionError::NotImplemented(
239 "Compression feature is not enabled".to_owned(),
240 ))
241 }
242 UNCOMPRESSED => Box::new(r),
243 })
244 }
245}
246
247pub trait FileTypeExt {
249 fn get_ext_with_compression(&self, c: FileCompressionType) -> Result<String>;
251}
252
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use super::FileCompressionType;
    use datafusion_common::error::DataFusionError;

    /// Every accepted spelling, in lower and upper case, parses to the
    /// expected compression type; an unknown name is `NotImplemented`.
    #[test]
    fn from_str() {
        for (ext, compression_type) in [
            ("gz", FileCompressionType::GZIP),
            ("GZ", FileCompressionType::GZIP),
            ("gzip", FileCompressionType::GZIP),
            ("GZIP", FileCompressionType::GZIP),
            ("xz", FileCompressionType::XZ),
            ("XZ", FileCompressionType::XZ),
            ("bz2", FileCompressionType::BZIP2),
            ("BZ2", FileCompressionType::BZIP2),
            ("bzip2", FileCompressionType::BZIP2),
            ("BZIP2", FileCompressionType::BZIP2),
            ("zst", FileCompressionType::ZSTD),
            ("ZST", FileCompressionType::ZSTD),
            ("zstd", FileCompressionType::ZSTD),
            ("ZSTD", FileCompressionType::ZSTD),
            ("", FileCompressionType::UNCOMPRESSED),
        ] {
            assert_eq!(
                FileCompressionType::from_str(ext).unwrap(),
                compression_type
            );
        }

        assert!(matches!(
            FileCompressionType::from_str("Unknown"),
            Err(DataFusionError::NotImplemented(_))
        ));
    }

    /// A stream made of several concatenated gzip members must be decoded in
    /// full, not just up to the end of the first member.
    ///
    /// Gated on the `compression` feature: without it `convert_stream`
    /// returns `NotImplemented` for GZIP and this test could never pass.
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn test_bgzip_stream_decoding() -> Result<(), DataFusionError> {
        // Imported locally so they are not reported as unused when this test
        // is compiled out.
        use bytes::Bytes;
        use futures::StreamExt;

        // Three back-to-back gzip members (each starts with the 0x1f 0x8b
        // magic). Per the assertion below, together they decode to "42\n42\n".
        // Formatting is skipped so each member stays visually grouped.
        #[rustfmt::skip]
        let data = [
            // First member
            0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
            0x02, 0x00,
            0x1e, 0x00, 0x33, 0x31, 0xe2, 0x02, 0x00, 0x31, 0x29, 0x86, 0xd1, 0x03, 0x00, 0x00, 0x00,
            // Second member
            0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
            0x02, 0x00,
            0x1e, 0x00, 0x33, 0x31, 0xe2, 0x02, 0x00, 0x31, 0x29, 0x86, 0xd1, 0x03, 0x00, 0x00, 0x00,
            // Third (final) member
            0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
            0x02, 0x00, 0x1b, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        ];

        let stream = futures::stream::iter(vec![Ok::<Bytes, DataFusionError>(
            Bytes::from(data.to_vec()),
        )]);
        let mut converted_stream =
            FileCompressionType::GZIP.convert_stream(stream.boxed())?;

        // Gather every chunk: how the decoder splits its output into chunks
        // is not something this test should rely on.
        let mut decoded = Vec::new();
        while let Some(chunk) = converted_stream.next().await {
            decoded.extend_from_slice(&chunk?);
        }

        let string_value = String::from_utf8_lossy(&decoded);

        assert_eq!(string_value, "42\n42\n");

        Ok(())
    }
}