datafusion_datasource/
file_compression_type.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! File Compression type abstraction
19
20use std::str::FromStr;
21
22use datafusion_common::error::{DataFusionError, Result};
23
24use datafusion_common::parsers::CompressionTypeVariant::{self, *};
25use datafusion_common::GetExt;
26
27#[cfg(feature = "compression")]
28use async_compression::tokio::bufread::{
29    BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder,
30    GzipDecoder as AsyncGzDecoder, GzipEncoder as AsyncGzEncoder,
31    XzDecoder as AsyncXzDecoder, XzEncoder as AsyncXzEncoder,
32    ZstdDecoder as AsyncZstdDecoer, ZstdEncoder as AsyncZstdEncoder,
33};
34
35#[cfg(feature = "compression")]
36use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder, ZstdEncoder};
37use bytes::Bytes;
38#[cfg(feature = "compression")]
39use bzip2::read::MultiBzDecoder;
40#[cfg(feature = "compression")]
41use flate2::read::MultiGzDecoder;
42use futures::stream::BoxStream;
43use futures::StreamExt;
44#[cfg(feature = "compression")]
45use futures::TryStreamExt;
46use object_store::buffered::BufWriter;
47use tokio::io::AsyncWrite;
48#[cfg(feature = "compression")]
49use tokio_util::io::{ReaderStream, StreamReader};
50#[cfg(feature = "compression")]
51use xz2::read::XzDecoder;
52#[cfg(feature = "compression")]
53use zstd::Decoder as ZstdDecoder;
54
55/// Readable file compression type
56#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub struct FileCompressionType {
58    variant: CompressionTypeVariant,
59}
60
61impl GetExt for FileCompressionType {
62    fn get_ext(&self) -> String {
63        match self.variant {
64            GZIP => ".gz".to_owned(),
65            BZIP2 => ".bz2".to_owned(),
66            XZ => ".xz".to_owned(),
67            ZSTD => ".zst".to_owned(),
68            UNCOMPRESSED => "".to_owned(),
69        }
70    }
71}
72
73impl From<CompressionTypeVariant> for FileCompressionType {
74    fn from(t: CompressionTypeVariant) -> Self {
75        Self { variant: t }
76    }
77}
78
79impl From<FileCompressionType> for CompressionTypeVariant {
80    fn from(t: FileCompressionType) -> Self {
81        t.variant
82    }
83}
84
85impl FromStr for FileCompressionType {
86    type Err = DataFusionError;
87
88    fn from_str(s: &str) -> Result<Self> {
89        let variant = CompressionTypeVariant::from_str(s).map_err(|_| {
90            DataFusionError::NotImplemented(format!("Unknown FileCompressionType: {s}"))
91        })?;
92        Ok(Self { variant })
93    }
94}
95
96/// `FileCompressionType` implementation
97impl FileCompressionType {
98    /// Gzip-ed file
99    pub const GZIP: Self = Self { variant: GZIP };
100
101    /// Bzip2-ed file
102    pub const BZIP2: Self = Self { variant: BZIP2 };
103
104    /// Xz-ed file (liblzma)
105    pub const XZ: Self = Self { variant: XZ };
106
107    /// Zstd-ed file
108    pub const ZSTD: Self = Self { variant: ZSTD };
109
110    /// Uncompressed file
111    pub const UNCOMPRESSED: Self = Self {
112        variant: UNCOMPRESSED,
113    };
114
115    /// Read only access to self.variant
116    pub fn get_variant(&self) -> &CompressionTypeVariant {
117        &self.variant
118    }
119
120    /// The file is compressed or not
121    pub const fn is_compressed(&self) -> bool {
122        self.variant.is_compressed()
123    }
124
125    /// Given a `Stream`, create a `Stream` which data are compressed with `FileCompressionType`.
126    pub fn convert_to_compress_stream<'a>(
127        &self,
128        s: BoxStream<'a, Result<Bytes>>,
129    ) -> Result<BoxStream<'a, Result<Bytes>>> {
130        Ok(match self.variant {
131            #[cfg(feature = "compression")]
132            GZIP => ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
133                .map_err(DataFusionError::from)
134                .boxed(),
135            #[cfg(feature = "compression")]
136            BZIP2 => ReaderStream::new(AsyncBzEncoder::new(StreamReader::new(s)))
137                .map_err(DataFusionError::from)
138                .boxed(),
139            #[cfg(feature = "compression")]
140            XZ => ReaderStream::new(AsyncXzEncoder::new(StreamReader::new(s)))
141                .map_err(DataFusionError::from)
142                .boxed(),
143            #[cfg(feature = "compression")]
144            ZSTD => ReaderStream::new(AsyncZstdEncoder::new(StreamReader::new(s)))
145                .map_err(DataFusionError::from)
146                .boxed(),
147            #[cfg(not(feature = "compression"))]
148            GZIP | BZIP2 | XZ | ZSTD => {
149                return Err(DataFusionError::NotImplemented(
150                    "Compression feature is not enabled".to_owned(),
151                ))
152            }
153            UNCOMPRESSED => s.boxed(),
154        })
155    }
156
157    /// Wrap the given `BufWriter` so that it performs compressed writes
158    /// according to this `FileCompressionType`.
159    pub fn convert_async_writer(
160        &self,
161        w: BufWriter,
162    ) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
163        Ok(match self.variant {
164            #[cfg(feature = "compression")]
165            GZIP => Box::new(GzipEncoder::new(w)),
166            #[cfg(feature = "compression")]
167            BZIP2 => Box::new(BzEncoder::new(w)),
168            #[cfg(feature = "compression")]
169            XZ => Box::new(XzEncoder::new(w)),
170            #[cfg(feature = "compression")]
171            ZSTD => Box::new(ZstdEncoder::new(w)),
172            #[cfg(not(feature = "compression"))]
173            GZIP | BZIP2 | XZ | ZSTD => {
174                return Err(DataFusionError::NotImplemented(
175                    "Compression feature is not enabled".to_owned(),
176                ))
177            }
178            UNCOMPRESSED => Box::new(w),
179        })
180    }
181
182    /// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`.
183    pub fn convert_stream<'a>(
184        &self,
185        s: BoxStream<'a, Result<Bytes>>,
186    ) -> Result<BoxStream<'a, Result<Bytes>>> {
187        Ok(match self.variant {
188            #[cfg(feature = "compression")]
189            GZIP => {
190                let mut decoder = AsyncGzDecoder::new(StreamReader::new(s));
191                decoder.multiple_members(true);
192
193                ReaderStream::new(decoder)
194                    .map_err(DataFusionError::from)
195                    .boxed()
196            }
197            #[cfg(feature = "compression")]
198            BZIP2 => ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
199                .map_err(DataFusionError::from)
200                .boxed(),
201            #[cfg(feature = "compression")]
202            XZ => ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
203                .map_err(DataFusionError::from)
204                .boxed(),
205            #[cfg(feature = "compression")]
206            ZSTD => ReaderStream::new(AsyncZstdDecoer::new(StreamReader::new(s)))
207                .map_err(DataFusionError::from)
208                .boxed(),
209            #[cfg(not(feature = "compression"))]
210            GZIP | BZIP2 | XZ | ZSTD => {
211                return Err(DataFusionError::NotImplemented(
212                    "Compression feature is not enabled".to_owned(),
213                ))
214            }
215            UNCOMPRESSED => s.boxed(),
216        })
217    }
218
219    /// Given a `Read`, create a `Read` which data are decompressed with `FileCompressionType`.
220    pub fn convert_read<T: std::io::Read + Send + 'static>(
221        &self,
222        r: T,
223    ) -> Result<Box<dyn std::io::Read + Send>> {
224        Ok(match self.variant {
225            #[cfg(feature = "compression")]
226            GZIP => Box::new(MultiGzDecoder::new(r)),
227            #[cfg(feature = "compression")]
228            BZIP2 => Box::new(MultiBzDecoder::new(r)),
229            #[cfg(feature = "compression")]
230            XZ => Box::new(XzDecoder::new_multi_decoder(r)),
231            #[cfg(feature = "compression")]
232            ZSTD => match ZstdDecoder::new(r) {
233                Ok(decoder) => Box::new(decoder),
234                Err(e) => return Err(DataFusionError::External(Box::new(e))),
235            },
236            #[cfg(not(feature = "compression"))]
237            GZIP | BZIP2 | XZ | ZSTD => {
238                return Err(DataFusionError::NotImplemented(
239                    "Compression feature is not enabled".to_owned(),
240                ))
241            }
242            UNCOMPRESSED => Box::new(r),
243        })
244    }
245}
246
247/// Trait for extending the functionality of the `FileType` enum.
248pub trait FileTypeExt {
249    /// Given a `FileCompressionType`, return the `FileType`'s extension with compression suffix
250    fn get_ext_with_compression(&self, c: FileCompressionType) -> Result<String>;
251}
252
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use super::FileCompressionType;
    use datafusion_common::error::DataFusionError;

    /// Every accepted spelling parses to the expected compression type, and an
    /// unrecognised spelling is reported as `NotImplemented`.
    #[test]
    fn from_str() {
        for (ext, compression_type) in [
            ("gz", FileCompressionType::GZIP),
            ("GZ", FileCompressionType::GZIP),
            ("gzip", FileCompressionType::GZIP),
            ("GZIP", FileCompressionType::GZIP),
            ("xz", FileCompressionType::XZ),
            ("XZ", FileCompressionType::XZ),
            ("bz2", FileCompressionType::BZIP2),
            ("BZ2", FileCompressionType::BZIP2),
            ("bzip2", FileCompressionType::BZIP2),
            ("BZIP2", FileCompressionType::BZIP2),
            ("zst", FileCompressionType::ZSTD),
            ("ZST", FileCompressionType::ZSTD),
            ("zstd", FileCompressionType::ZSTD),
            ("ZSTD", FileCompressionType::ZSTD),
            ("", FileCompressionType::UNCOMPRESSED),
        ] {
            assert_eq!(
                FileCompressionType::from_str(ext).unwrap(),
                compression_type,
                "unexpected compression type parsed from {ext:?}"
            );
        }

        assert!(matches!(
            FileCompressionType::from_str("Unknown"),
            Err(DataFusionError::NotImplemented(_))
        ));
    }

    /// A BGZF file (several concatenated gzip members) is decoded in full.
    ///
    /// Gated on the `compression` feature: without it `convert_stream` returns
    /// `NotImplemented` for GZIP and the test could never pass.
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn test_bgzip_stream_decoding() -> Result<(), DataFusionError> {
        // Imported here so a build without the feature has no unused imports.
        use bytes::Bytes;
        use futures::StreamExt;

        // As described in https://samtools.github.io/hts-specs/SAMv1.pdf ("The BGZF compression format")

        // Ignore rust formatting so the byte array is easier to read
        #[rustfmt::skip]
        let data = [
            // Block header
            0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
            0x02, 0x00,
            // Block 0, literal: 42
            0x1e, 0x00, 0x33, 0x31, 0xe2, 0x02, 0x00, 0x31, 0x29, 0x86, 0xd1, 0x03, 0x00, 0x00, 0x00,
            // Block header
            0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
            0x02, 0x00,
            // Block 1, literal: 42
            0x1e, 0x00, 0x33, 0x31, 0xe2, 0x02, 0x00, 0x31, 0x29, 0x86, 0xd1, 0x03, 0x00, 0x00, 0x00,
            // EOF
            0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
            0x02, 0x00, 0x1b, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        ];

        // Create a byte stream
        let stream = futures::stream::iter(vec![Ok::<Bytes, DataFusionError>(
            Bytes::from(data.to_vec()),
        )]);
        let converted_stream =
            FileCompressionType::GZIP.convert_stream(stream.boxed())?;

        // The decoder may split its output across several chunks, so join all
        // of them rather than looking at the first one only. Decode errors are
        // propagated as a test failure instead of a panic.
        let chunks = converted_stream.collect::<Vec<_>>().await;
        let mut decoded = Vec::new();
        for chunk in chunks {
            decoded.extend_from_slice(&chunk?);
        }

        let string_value = String::from_utf8_lossy(&decoded);

        assert_eq!(string_value, "42\n42\n");

        Ok(())
    }
}
334}