async_mtzip/zip/job.rs
1use async_compression::Level;
2use std::fs::{Metadata};
3use std::path::PathBuf;
4
5use cfg_if::cfg_if;
6use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader};
7use tokio::sync::{mpsc, oneshot};
8
9use super::{extra_field::ExtraFields, file::ZipFile};
10use crate::zip::crc32::Crc32Reader;
11use crate::{zip::file::ZipFileHeader, CompressionType};
12use crate::zip::level::CompressionLevel;
13
14pub enum ZipJobOrigin {
15 Filesystem {
16 path: PathBuf,
17 compression_level: CompressionLevel,
18 compression_type: CompressionType,
19 },
20 Directory {
21 extra_fields: ExtraFields,
22 external_attributes: u16,
23 },
24}
25
26impl<'p> std::fmt::Debug for ZipJobOrigin {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 match self {
29 Self::Filesystem {
30 path,
31 compression_level,
32 compression_type,
33 } => f
34 .debug_struct("Filesystem")
35 .field("path", path)
36 .field("compression_level", compression_level)
37 .field("compression_type", compression_type)
38 .finish(),
39 // Self::RawData {
40 // data,
41 // compression_level,
42 // compression_type,
43 // extra_fields,
44 // external_attributes,
45 // } => f
46 // .debug_struct("RawData")
47 // .field("data", data)
48 // .field("compression_level", compression_level)
49 // .field("compression_type", compression_type)
50 // .field("extra_fields", extra_fields)
51 // .field("external_attributes", external_attributes)
52 // .finish(),
53 Self::Directory {
54 extra_fields,
55 external_attributes,
56 } => f
57 .debug_struct("Directory")
58 .field("extra_fields", extra_fields)
59 .field("external_attributes", external_attributes)
60 .finish(),
61 // Self::Reader {
62 // reader: _,
63 // compression_level,
64 // compression_type,
65 // extra_fields,
66 // external_attributes,
67 // } => f
68 // .debug_struct("Reader")
69 // .field("compression_level", compression_level)
70 // .field("compression_type", compression_type)
71 // .field("extra_fields", extra_fields)
72 // .field("external_attributes", external_attributes)
73 // .finish_non_exhaustive(),
74 }
75 }
76}
77
78#[derive(Debug)]
79pub struct ZipJob {
80 pub data_origin: ZipJobOrigin,
81 pub archive_path: String,
82}
83
84type Responder = oneshot::Sender<ZipJob>;
85pub enum ZipCommand {
86 Get { resp: Responder },
87}
88
89impl ZipJob {
90 #[inline]
91 #[allow(unused)]
92 const fn convert_attrs(attrs: u32) -> u16 {
93 (attrs & 0xFFFF) as u16
94 }
95
96 #[inline]
97 pub(crate) fn attributes_from_fs(metadata: &Metadata) -> u16 {
98 cfg_if! {
99 if #[cfg(target_os = "windows")] {
100 use std::os::windows::fs::MetadataExt;
101 Self::convert_attrs(metadata.file_attributes())
102 } else if #[cfg(target_os = "linux")] {
103 use std::os::linux::fs::MetadataExt;
104 Self::convert_attrs(metadata.st_mode())
105 } else if #[cfg(target_os = "unix")] {
106 use std::os::unix::fs::MetadataExt;
107 Self::convert_attrs(metadata.permissions().mode())
108 } else {
109 if metadata.is_dir() {
110 0o040755
111 } else {
112 0o100644
113 }
114 }
115 }
116 }
117
118 async fn generate_file<T: AsyncRead + Unpin>(
119 source: T,
120 uncompressed_size: Option<u32>,
121 archive_path: String,
122 attributes: u16,
123 compression_level: CompressionLevel,
124 compression_type: CompressionType,
125 extra_fields: ExtraFields,
126 process: Option<tokio::sync::mpsc::Sender<u64>>,
127 ) -> std::io::Result<ZipFile> {
128 let mut crc32_reader = Crc32Reader::new(source);
129 let uncompressed_size = uncompressed_size.unwrap_or(0) as usize;
130 let mut data = match compression_type {
131 CompressionType::Deflate => {
132 let mut encoder = async_compression::tokio::write::DeflateEncoder::with_quality(
133 Vec::with_capacity(uncompressed_size),
134 Level::Precise(compression_level.get() as i32),
135 );
136 let mut buf_reader = BufReader::with_capacity(1024 * 1024, &mut crc32_reader);
137 loop {
138 let bytes = buf_reader.fill_buf().await?;
139 if bytes.is_empty() {
140 break;
141 }
142 let len = bytes.len();
143 encoder.write_all(bytes).await?;
144 buf_reader.consume(len);
145 if let Some(process) = &process {
146 process
147 .send(len as u64)
148 .await
149 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
150 }
151 }
152 encoder.shutdown().await?;
153 let bytes = encoder.into_inner();
154 bytes
155 }
156 CompressionType::Stored => Vec::new(),
157 };
158 debug_assert!(uncompressed_size <= u32::MAX as usize);
159 let uncompressed_size = uncompressed_size as u32;
160 data.shrink_to_fit();
161 let crc = crc32_reader.sum();
162 Ok(ZipFile {
163 header: ZipFileHeader {
164 compression_type: CompressionType::Deflate,
165 crc,
166 uncompressed_size,
167 filename: archive_path,
168 external_file_attributes: (attributes as u32) << 16,
169 extra_fields,
170 },
171 data,
172 })
173 }
174
175 // fn gen_file<R: Read>(
176 // source: R,
177 // uncompressed_size: Option<u32>,
178 // archive_path: String,
179 // attributes: u16,
180 // compression_level: CompressionLevel,
181 // compression_type: CompressionType,
182 // extra_fields: ExtraFields,
183 // // zip_listener: Arc<Mutex<P>>,
184 // ) -> std::io::Result<ZipFile> {
185 // println!("3");
186 // let mut crc_reader = CrcReader::new(source);
187 // let mut data = Vec::with_capacity(uncompressed_size.unwrap_or(0) as usize);
188 // let uncompressed_size = match compression_type {
189 // CompressionType::Deflate => {
190 // let mut encoder = DeflateEncoder::new(&mut crc_reader, compression_level.into());
191 //
192 // let buffer = &mut [0u8; 1024 * 1024 * 4];
193 // let mut zip_bytes = 0;
194 // loop {
195 // let read = encoder.read(buffer)?;
196 // if read == 0 {
197 // break;
198 // }
199 // zip_bytes += read as u64;
200 // data.extend_from_slice(&buffer[..read]);
201 // // let listener = zip_listener.lock().unwrap();
202 // // listener(zip_bytes, 0);
203 // }
204 // encoder.total_in() as usize
205 // // encoder.read_to_end(&mut data)?;
206 // // encoder.total_in() as usize
207 // }
208 // CompressionType::Stored => {
209 // let buffer = &mut [0u8; 1024 * 1024 * 4];
210 // let mut zip_bytes = 0;
211 // loop {
212 // let read = crc_reader.read(buffer)?;
213 // if read == 0 {
214 // break;
215 // }
216 // zip_bytes += read as u64;
217 // data.extend_from_slice(&buffer[..read]);
218 // // let listener = zip_listener.lock().unwrap();
219 // // listener(zip_bytes, 0);
220 // }
221 // zip_bytes as usize
222 // // crc_reader.read_to_end(&mut data)?
223 // }
224 // };
225 // debug_assert!(uncompressed_size <= u32::MAX as usize);
226 // let uncompressed_size = uncompressed_size as u32;
227 // data.shrink_to_fit();
228 // let crc = crc_reader.crc().sum();
229 // Ok(ZipFile {
230 // header: ZipFileHeader {
231 // compression_type: CompressionType::Deflate,
232 // crc,
233 // uncompressed_size,
234 // filename: archive_path,
235 // external_file_attributes: (attributes as u32) << 16,
236 // extra_fields,
237 // },
238 // data,
239 // })
240 // }
241
242 pub async fn into_file(self, process: Option<mpsc::Sender<u64>>) -> std::io::Result<ZipFile> {
243 match self.data_origin {
244 ZipJobOrigin::Directory {
245 extra_fields,
246 external_attributes,
247 } => Ok(ZipFile::directory(
248 self.archive_path,
249 extra_fields,
250 external_attributes,
251 )),
252 ZipJobOrigin::Filesystem {
253 path,
254 compression_level,
255 compression_type,
256 } => {
257 let file = tokio::fs::File::open(&path).await?;
258 let file_metadata = tokio::fs::metadata(path).await?;
259 let uncompressed_size = file_metadata.len();
260 debug_assert!(uncompressed_size <= u32::MAX.into());
261 let uncompressed_size = uncompressed_size as u32;
262 let external_file_attributes = Self::attributes_from_fs(&file_metadata);
263 let extra_fields = ExtraFields::new_from_fs(&file_metadata);
264 Self::generate_file(
265 file,
266 Some(uncompressed_size),
267 self.archive_path,
268 external_file_attributes,
269 compression_level,
270 compression_type,
271 extra_fields,
272 process,
273 )
274 .await
275 } // ZipJobOrigin::RawData {
276 // data,
277 // compression_level,
278 // compression_type,
279 // extra_fields,
280 // external_attributes,
281 // } => {
282 // let uncompressed_size = data.len();
283 // debug_assert!(uncompressed_size <= u32::MAX as usize);
284 // let uncompressed_size = uncompressed_size as u32;
285 // Self::gen_file_with_tokio(
286 // data.as_ref(),
287 // Some(uncompressed_size),
288 // self.archive_path,
289 // external_attributes,
290 // compression_level,
291 // compression_type,
292 // extra_fields,
293 // zip_listener,
294 // )
295 // }
296 // ZipJobOrigin::Reader {
297 // reader,
298 // compression_level,
299 // compression_type,
300 // extra_fields,
301 // external_attributes,
302 // } => Self::gen_file(
303 // reader,
304 // None,
305 // self.archive_path,
306 // external_attributes,
307 // compression_level,
308 // compression_type,
309 // extra_fields,
310 // zip_listener,
311 // ),
312 }
313 }
314
315 // pub fn into_file<P: Fn(u64, u64)>(
316 // self,
317 // zip_listener: Arc<Mutex<P>>,
318 // ) -> std::io::Result<ZipFile> {
319 // match self.data_origin {
320 // ZipJobOrigin::Directory {
321 // extra_fields,
322 // external_attributes,
323 // } => Ok(ZipFile::directory(
324 // self.archive_path,
325 // extra_fields,
326 // external_attributes,
327 // )),
328 // ZipJobOrigin::Filesystem {
329 // path,
330 // compression_level,
331 // compression_type,
332 // } => {
333 // let file = File::open(path).unwrap();
334 // let file_metadata = file.metadata().unwrap();
335 // let uncompressed_size = file_metadata.len();
336 // debug_assert!(uncompressed_size <= u32::MAX.into());
337 // let uncompressed_size = uncompressed_size as u32;
338 // let external_file_attributes = Self::attributes_from_fs(&file_metadata);
339 // let extra_fields = ExtraFields::new_from_fs(&file_metadata);
340 // Self::gen_file(
341 // file,
342 // Some(uncompressed_size),
343 // self.archive_path,
344 // external_file_attributes,
345 // compression_level,
346 // compression_type,
347 // extra_fields,
348 // // zip_listener,
349 // )
350 // } // ZipJobOrigin::RawData {
351 // // data,
352 // // compression_level,
353 // // compression_type,
354 // // extra_fields,
355 // // external_attributes,
356 // // } => {
357 // // let uncompressed_size = data.len();
358 // // debug_assert!(uncompressed_size <= u32::MAX as usize);
359 // // let uncompressed_size = uncompressed_size as u32;
360 // // Self::gen_file(
361 // // data.as_ref(),
362 // // Some(uncompressed_size),
363 // // self.archive_path,
364 // // external_attributes,
365 // // compression_level,
366 // // compression_type,
367 // // extra_fields,
368 // // zip_listener,
369 // // )
370 // // }
371 // // ZipJobOrigin::Reader {
372 // // reader,
373 // // compression_level,
374 // // compression_type,
375 // // extra_fields,
376 // // external_attributes,
377 // // } => Self::gen_file(
378 // // reader,
379 // // None,
380 // // self.archive_path,
381 // // external_attributes,
382 // // compression_level,
383 // // compression_type,
384 // // extra_fields,
385 // // zip_listener,
386 // // ),
387 // }
388 // }
389}