1use std::cmp;
2use std::io::{Cursor, Read, Write};
3
4use polars_buffer::Buffer;
5use polars_core::prelude::*;
6use polars_error::{feature_gated, to_compute_err};
7
8use crate::utils::file::{Writeable, WriteableTrait};
9use crate::utils::sync_on_close::SyncOnCloseType;
10
/// Compression formats that can be recognised from the first bytes of a buffer.
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub enum SupportedCompression {
    /// gzip stream, recognised by the leading bytes `1f 8b`.
    GZIP,
    /// zlib stream, recognised by `78` followed by one of `01`, `5e`, `9c` or `da`.
    ZLIB,
    /// Zstandard frame, recognised by the leading bytes `28 b5 2f fd`.
    ZSTD,
}

impl SupportedCompression {
    /// Sniffs the compression format from the header of `bytes`.
    ///
    /// At least four bytes are required; shorter input, or input whose header
    /// matches none of the known signatures, yields `None`.
    pub fn check(bytes: &[u8]) -> Option<Self> {
        // Every pattern below demands at least four elements, which doubles as
        // the minimum-length check.
        match bytes {
            [0x1f, 0x8b, _, _, ..] => Some(Self::GZIP),
            [0x78, 0x01 | 0x5e | 0x9c | 0xda, _, _, ..] => Some(Self::ZLIB),
            [0x28, 0xb5, 0x2f, 0xfd, ..] => Some(Self::ZSTD),
            _ => None,
        }
    }
}
40
/// Decompresses `bytes` into `out` when they start with a recognised gzip, zlib
/// or zstd header; otherwise returns `bytes` unchanged and leaves `out` empty.
///
/// The returned slice borrows `bytes` for uncompressed input and `out` for
/// compressed input. The whole decompressed payload is materialised in memory
/// at once, which is why this function is deprecated in favour of
/// [`CompressedReader`].
///
/// # Panics
/// Panics if `out` is not empty. Presumably also panics when the input is
/// compressed but the `decompress` feature is disabled (behaviour of
/// `feature_gated!`, not visible here — TODO confirm).
///
/// # Errors
/// Returns an error if decoding the compressed input fails.
// `out` must be a growable `Vec` because `read_to_end` appends to it, hence the
// `clippy::ptr_arg` allowance.
#[allow(clippy::ptr_arg)]
#[deprecated(note = "may cause OOM, use CompressedReader instead")]
pub fn maybe_decompress_bytes<'a>(bytes: &'a [u8], out: &'a mut Vec<u8>) -> PolarsResult<&'a [u8]> {
    assert!(out.is_empty());

    let Some(algo) = SupportedCompression::check(bytes) else {
        // No known header: hand the input back untouched.
        return Ok(bytes);
    };

    feature_gated!("decompress", {
        match algo {
            SupportedCompression::GZIP => {
                // `MultiGzDecoder` (rather than a single-member decoder), presumably
                // so that concatenated gzip members are all decoded.
                flate2::read::MultiGzDecoder::new(bytes)
                    .read_to_end(out)
                    .map_err(to_compute_err)?;
            },
            SupportedCompression::ZLIB => {
                flate2::read::ZlibDecoder::new(bytes)
                    .read_to_end(out)
                    .map_err(to_compute_err)?;
            },
            SupportedCompression::ZSTD => {
                // NOTE(review): unlike the deflate arms, io errors here are converted
                // via `From<io::Error>` instead of `to_compute_err`, so the resulting
                // error variant may differ — confirm this is intended.
                zstd::Decoder::with_buffer(bytes)?.read_to_end(out)?;
            },
        }

        Ok(out)
    })
}
72
/// Sequential reader over an in-memory buffer that decodes gzip, zlib or zstd
/// data while reading and passes any other data through unchanged.
///
/// [`CompressedReader::try_new`] picks the variant from the buffer's leading
/// bytes.
pub enum CompressedReader {
    /// Input without a recognised compression header.
    Uncompressed {
        /// The raw input.
        slice: Buffer<u8>,
        /// Number of bytes of `slice` already consumed.
        offset: usize,
    },
    /// gzip-compressed input.
    #[cfg(feature = "decompress")]
    Gzip(flate2::bufread::MultiGzDecoder<Cursor<Buffer<u8>>>),
    /// zlib-compressed input.
    #[cfg(feature = "decompress")]
    Zlib(flate2::bufread::ZlibDecoder<Cursor<Buffer<u8>>>),
    /// Zstandard-compressed input.
    #[cfg(feature = "decompress")]
    Zstd(zstd::Decoder<'static, Cursor<Buffer<u8>>>),
}
89
90impl CompressedReader {
91 pub fn try_new(slice: Buffer<u8>) -> PolarsResult<Self> {
92 let algo = SupportedCompression::check(&slice);
93
94 Ok(match algo {
95 None => CompressedReader::Uncompressed { slice, offset: 0 },
96 #[cfg(feature = "decompress")]
97 Some(SupportedCompression::GZIP) => {
98 CompressedReader::Gzip(flate2::bufread::MultiGzDecoder::new(Cursor::new(slice)))
99 },
100 #[cfg(feature = "decompress")]
101 Some(SupportedCompression::ZLIB) => {
102 CompressedReader::Zlib(flate2::bufread::ZlibDecoder::new(Cursor::new(slice)))
103 },
104 #[cfg(feature = "decompress")]
105 Some(SupportedCompression::ZSTD) => {
106 CompressedReader::Zstd(zstd::Decoder::with_buffer(Cursor::new(slice))?)
107 },
108 #[cfg(not(feature = "decompress"))]
109 _ => panic!("activate 'decompress' feature"),
110 })
111 }
112
113 pub fn is_compressed(&self) -> bool {
114 !matches!(&self, CompressedReader::Uncompressed { .. })
115 }
116
117 pub const fn initial_read_size() -> usize {
118 32 * 1024
122 }
123
124 pub const fn ideal_read_size() -> usize {
125 512 * 1024
132 }
133
134 pub fn total_len_estimate(&self) -> usize {
137 const ESTIMATED_DEFLATE_RATIO: usize = 3;
138 const ESTIMATED_ZSTD_RATIO: usize = 5;
139
140 match self {
141 CompressedReader::Uncompressed { slice, .. } => slice.len(),
142 #[cfg(feature = "decompress")]
143 CompressedReader::Gzip(reader) => {
144 reader.get_ref().get_ref().len() * ESTIMATED_DEFLATE_RATIO
145 },
146 #[cfg(feature = "decompress")]
147 CompressedReader::Zlib(reader) => {
148 reader.get_ref().get_ref().len() * ESTIMATED_DEFLATE_RATIO
149 },
150 #[cfg(feature = "decompress")]
151 CompressedReader::Zstd(reader) => {
152 reader.get_ref().get_ref().len() * ESTIMATED_ZSTD_RATIO
153 },
154 }
155 }
156
157 pub fn read_next_slice(
172 &mut self,
173 prev_leftover: &Buffer<u8>,
174 read_size: usize,
175 ) -> std::io::Result<(Buffer<u8>, usize)> {
176 let prev_len = prev_leftover.len();
180
181 let mut buf = Vec::new();
182 if self.is_compressed() {
183 let reserve_size = cmp::min(
184 prev_len.saturating_add(read_size),
185 self.total_len_estimate().saturating_mul(2),
186 );
187 buf.reserve_exact(reserve_size);
188 buf.extend_from_slice(prev_leftover);
189 }
190
191 let new_slice_from_read =
192 |bytes_read: usize, mut buf: Vec<u8>| -> std::io::Result<(Buffer<u8>, usize)> {
193 buf.truncate(prev_len + bytes_read);
194 Ok((Buffer::from_vec(buf), bytes_read))
195 };
196
197 match self {
198 CompressedReader::Uncompressed { slice, offset, .. } => {
199 let bytes_read = cmp::min(read_size, slice.len() - *offset);
200 let new_slice = slice
201 .clone()
202 .sliced(*offset - prev_len..*offset + bytes_read);
203 *offset += bytes_read;
204 Ok((new_slice, bytes_read))
205 },
206 #[cfg(feature = "decompress")]
207 CompressedReader::Gzip(decoder) => {
208 new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
209 },
210 #[cfg(feature = "decompress")]
211 CompressedReader::Zlib(decoder) => {
212 new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
213 },
214 #[cfg(feature = "decompress")]
215 CompressedReader::Zstd(decoder) => {
216 new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
217 },
218 }
219 }
220}
221
222impl Read for CompressedReader {
225 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
226 match self {
227 CompressedReader::Uncompressed { slice, offset, .. } => {
228 let bytes_read = cmp::min(buf.len(), slice.len() - *offset);
229 buf[..bytes_read].copy_from_slice(&slice[*offset..(*offset + bytes_read)]);
230 *offset += bytes_read;
231 Ok(bytes_read)
232 },
233 #[cfg(feature = "decompress")]
234 CompressedReader::Gzip(decoder) => decoder.read(buf),
235 #[cfg(feature = "decompress")]
236 CompressedReader::Zlib(decoder) => decoder.read(buf),
237 #[cfg(feature = "decompress")]
238 CompressedReader::Zstd(decoder) => decoder.read(buf),
239 }
240 }
241}
242
/// Writer that compresses everything written to it before passing it on to an
/// inner [`Writeable`].
///
/// Each variant keeps its encoder in an `Option`: closing the writer takes the
/// encoder out to finish the stream and leaves `None` behind. Both variants are
/// compiled only with the `decompress` feature.
pub enum CompressedWriter {
    /// gzip encoder.
    #[cfg(feature = "decompress")]
    Gzip(Option<flate2::write::GzEncoder<Writeable>>),
    /// Zstandard encoder.
    #[cfg(feature = "decompress")]
    Zstd(Option<zstd::Encoder<'static, Writeable>>),
}
250
251impl CompressedWriter {
252 pub fn gzip(writer: Writeable, level: Option<u32>) -> Self {
253 feature_gated!("decompress", {
254 Self::Gzip(Some(flate2::write::GzEncoder::new(
255 writer,
256 level.map(flate2::Compression::new).unwrap_or_default(),
257 )))
258 })
259 }
260
261 pub fn zstd(writer: Writeable, level: Option<u32>) -> std::io::Result<Self> {
262 feature_gated!("decompress", {
263 zstd::Encoder::new(writer, level.unwrap_or(3) as i32)
264 .map(Some)
265 .map(Self::Zstd)
266 })
267 }
268}
269
270impl Write for CompressedWriter {
271 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
272 feature_gated!("decompress", {
273 match self {
274 Self::Gzip(encoder) => encoder.as_mut().unwrap().write(buf),
275 Self::Zstd(encoder) => encoder.as_mut().unwrap().write(buf),
276 }
277 })
278 }
279
280 fn flush(&mut self) -> std::io::Result<()> {
281 feature_gated!("decompress", {
282 match self {
283 Self::Gzip(encoder) => encoder.as_mut().unwrap().flush(),
284 Self::Zstd(encoder) => encoder.as_mut().unwrap().flush(),
285 }
286 })
287 }
288}
289
290impl WriteableTrait for CompressedWriter {
291 fn close(&mut self) -> std::io::Result<()> {
292 feature_gated!("decompress", {
293 let writer = match self {
294 Self::Gzip(encoder) => encoder.take().unwrap().finish()?,
295 Self::Zstd(encoder) => encoder.take().unwrap().finish()?,
296 };
297
298 writer.close(SyncOnCloseType::All)
299 })
300 }
301
302 fn sync_all(&self) -> std::io::Result<()> {
303 feature_gated!("decompress", {
304 match self {
305 Self::Gzip(encoder) => encoder.as_ref().unwrap().get_ref().sync_all(),
306 Self::Zstd(encoder) => encoder.as_ref().unwrap().get_ref().sync_all(),
307 }
308 })
309 }
310
311 fn sync_data(&self) -> std::io::Result<()> {
312 feature_gated!("decompress", {
313 match self {
314 Self::Gzip(encoder) => encoder.as_ref().unwrap().get_ref().sync_data(),
315 Self::Zstd(encoder) => encoder.as_ref().unwrap().get_ref().sync_data(),
316 }
317 })
318 }
319}