1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#[cfg(feature = "compression")]
mod inner {
use std::convert::Infallible;
use std::fmt::Display;
use std::fs::File;
use std::io::{
copy,
Seek,
SeekFrom,
Write,
};
use std::str::FromStr;
// `copy`, `Seek`, and `SeekFrom` are used to stage decompressed data in a temp file.
use polars::io::mmap::MmapBytesReader;
use tempfile::tempfile; // Added tempfile
/// Compression formats understood by the reader/writer helpers in this module.
///
/// The variant order is part of the type's layout (discriminants are assigned
/// in declaration order), so new variants should be appended at the end.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// No compression: data is passed through untouched.
    None,
    /// Gzip stream (handled through `flate2`).
    Gz,
    /// Zstandard stream (handled through `zstd`).
    Zstd,
    /// LZ4 stream (handled through `lz4`).
    Lz4,
    /// XZ stream (handled through `xz2`).
    Xz2,
    /// Bzip2 stream (handled through `bzip2`).
    Bzip2,
    /// Zip archive (handled through `zip`).
    Zip,
}
impl FromStr for Compression {
    type Err = Infallible;

    /// Parses a compression label into a `Compression` variant.
    ///
    /// Accepted labels are the strings produced by the `Display` impl
    /// (`"none"`, `"gz"`, `"zstd"`, `"lz4"`, `"xz2"`, `"bzip2"`, `"zip"`)
    /// plus `"gzip"`, the label reported by `Compression::name` for `Gz`.
    /// Matching is case-sensitive.
    ///
    /// # Panics
    ///
    /// Panics on any other label. The error type is `Infallible`, so an
    /// unknown label cannot be reported through the `Result`; the panic
    /// message names the rejected input to make the failure diagnosable.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let parsed = match s {
            "none" => Compression::None,
            "gz" | "gzip" => Compression::Gz,
            "zstd" => Compression::Zstd,
            "lz4" => Compression::Lz4,
            "xz2" => Compression::Xz2,
            "bzip2" => Compression::Bzip2,
            "zip" => Compression::Zip,
            other => panic!(
                "unsupported compression format {:?} \
                 (expected one of: none, gz, gzip, zstd, lz4, xz2, bzip2, zip)",
                other
            ),
        };
        Ok(parsed)
    }
}
impl Display for Compression {
    /// Writes the short label of the format (`"none"`, `"gz"`, `"zstd"`,
    /// `"lz4"`, `"xz2"`, `"bzip2"`, `"zip"`).
    ///
    /// These are the same labels the `FromStr` impl accepts. Note that `Gz`
    /// is rendered as `"gz"` here, whereas `Compression::name` reports
    /// `"gzip"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Static literals: no per-call `String` allocation is needed.
        let label = match self {
            Compression::None => "none",
            Compression::Gz => "gz",
            Compression::Zstd => "zstd",
            Compression::Lz4 => "lz4",
            Compression::Xz2 => "xz2",
            Compression::Bzip2 => "bzip2",
            Compression::Zip => "zip",
        };
        // `write_str` emits the label verbatim, matching the previous
        // `write!(f, "{}", ..)` which likewise ignored width/fill flags.
        f.write_str(label)
    }
}
impl Compression {
/// Returns the name of the compression algorithm.
///
/// The result is a static string literal, so it is not tied to the
/// lifetime of `self` and may be kept after the `Compression` value is
/// gone. Note that `Gz` is reported as `"gzip"` here, while the `Display`
/// impl renders it as `"gz"`.
pub fn name(&self) -> &'static str {
    match self {
        Compression::None => "none",
        Compression::Gz => "gzip",
        Compression::Zstd => "zstd",
        Compression::Lz4 => "lz4",
        Compression::Xz2 => "xz2",
        Compression::Bzip2 => "bzip2",
        Compression::Zip => "zip",
    }
}
/// Returns a `MmapBytesReader` over the decompressed contents of `handle`.
///
/// * `Compression::None` returns `handle` itself; no temporary file is
///   created.
/// * Every other format is fully decompressed into an anonymous temporary
///   file, which is rewound to its start and returned (presumably so the
///   result is a plain file that polars can memory-map; confirm against
///   the caller).
/// * `Compression::Zip` extracts only the first archive entry (index 0).
///   An archive with no entries yields an empty temporary file.
///
/// # Errors
///
/// Returns an error if the temporary file cannot be created, if a decoder
/// cannot be constructed from `handle`, or if reading/decompressing the
/// input or writing/rewinding the temporary file fails.
pub fn get_decoder(
    &self,
    handle: File,
) -> anyhow::Result<Box<dyn MmapBytesReader>> {
    // Drains `reader` into a fresh temp file and hands the file back
    // positioned at its start, ready to be read from the beginning.
    fn spool(
        mut reader: impl std::io::Read,
    ) -> anyhow::Result<Box<dyn MmapBytesReader>> {
        let mut temp_file = tempfile()?;
        copy(&mut reader, &mut temp_file)?;
        temp_file.seek(SeekFrom::Start(0))?;
        Ok(Box::new(temp_file))
    }

    match self {
        // Nothing to decode: return the caller's file untouched.
        Compression::None => Ok(Box::new(handle)),
        Compression::Gz => spool(flate2::read::GzDecoder::new(handle)),
        // `zstd::Decoder::new` and `lz4::Decoder::new` are fallible.
        Compression::Zstd => spool(zstd::Decoder::new(handle)?),
        Compression::Lz4 => spool(lz4::Decoder::new(handle)?),
        Compression::Xz2 => spool(xz2::read::XzDecoder::new(handle)),
        Compression::Bzip2 => spool(bzip2::read::BzDecoder::new(handle)),
        Compression::Zip => {
            let mut archive = zip::ZipArchive::new(handle)?;
            if archive.is_empty() {
                // Empty archive: deliberately return an empty temp file.
                spool(std::io::empty())
            } else {
                // Only the first entry of the archive is extracted.
                let first_entry = archive.by_index(0)?;
                spool(first_entry)
            }
        }
    }
}
/// Returns a `Write` that compresses everything written to it into `handle`.
///
/// `compression_level` is forwarded unchanged to the selected codec; it is
/// ignored for `Compression::None` and `Compression::Zip`. The valid range
/// is codec-specific and is not validated here.
///
/// The gzip, xz, bzip2 and zstd encoders are finalized when the returned
/// box is dropped. The zstd encoder is wrapped with `auto_finish()` for
/// exactly that reason: a bare `zstd::Encoder` must be finished explicitly,
/// which callers cannot do through a `Box<dyn Write>`.
///
/// # Errors
///
/// Returns an error if `compression_level` does not fit in an `i32`
/// (zstd only) or if the zstd/lz4 encoder cannot be constructed.
pub fn get_encoder<W: Write + Seek + 'static>(
    &self,
    handle: W,
    compression_level: u32,
) -> anyhow::Result<Box<dyn Write>> {
    let encoder: Box<dyn Write> = match self {
        Compression::None => Box::new(handle),
        Compression::Gz => Box::new(flate2::write::GzEncoder::new(
            handle,
            flate2::Compression::new(compression_level),
        )),
        Compression::Zstd => {
            // Checked conversion: an `as` cast would silently wrap huge
            // values into negative zstd levels.
            let level = <i32 as std::convert::TryFrom<u32>>::try_from(compression_level)?;
            // `auto_finish` writes the frame epilogue on drop; without it
            // the output would be left unterminated.
            Box::new(zstd::Encoder::new(handle, level)?.auto_finish())
        }
        Compression::Lz4 => {
            // NOTE(review): `lz4::Encoder` appears to require an explicit
            // `finish()` to write the end mark, which is unreachable through
            // `Box<dyn Write>` - confirm and wrap if needed.
            let lz4_encoder = lz4::EncoderBuilder::new()
                .level(compression_level)
                .build(handle)?;
            Box::new(lz4_encoder)
        }
        Compression::Xz2 => {
            Box::new(xz2::write::XzEncoder::new(handle, compression_level))
        }
        Compression::Bzip2 => Box::new(bzip2::write::BzEncoder::new(
            handle,
            bzip2::Compression::new(compression_level),
        )),
        // NOTE(review): no archive entry is started here; `ZipWriter`
        // presumably rejects writes until `start_file` is called - confirm
        // against the `zip` version in use.
        Compression::Zip => Box::new(zip::write::ZipWriter::new(handle)),
    };
    Ok(encoder)
}
}
}
#[cfg(feature = "compression")]
pub use inner::*;