async_mtzip/zip/
job.rs

1use async_compression::Level;
2use std::fs::{Metadata};
3use std::path::PathBuf;
4
5use cfg_if::cfg_if;
6use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader};
7use tokio::sync::{mpsc, oneshot};
8
9use super::{extra_field::ExtraFields, file::ZipFile};
10use crate::zip::crc32::Crc32Reader;
11use crate::{zip::file::ZipFileHeader, CompressionType};
12use crate::zip::level::CompressionLevel;
13
14pub enum ZipJobOrigin {
15    Filesystem {
16        path: PathBuf,
17        compression_level: CompressionLevel,
18        compression_type: CompressionType,
19    },
20    Directory {
21        extra_fields: ExtraFields,
22        external_attributes: u16,
23    },
24}
25
26impl<'p> std::fmt::Debug for ZipJobOrigin {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        match self {
29            Self::Filesystem {
30                path,
31                compression_level,
32                compression_type,
33            } => f
34                .debug_struct("Filesystem")
35                .field("path", path)
36                .field("compression_level", compression_level)
37                .field("compression_type", compression_type)
38                .finish(),
39            // Self::RawData {
40            //     data,
41            //     compression_level,
42            //     compression_type,
43            //     extra_fields,
44            //     external_attributes,
45            // } => f
46            //     .debug_struct("RawData")
47            //     .field("data", data)
48            //     .field("compression_level", compression_level)
49            //     .field("compression_type", compression_type)
50            //     .field("extra_fields", extra_fields)
51            //     .field("external_attributes", external_attributes)
52            //     .finish(),
53            Self::Directory {
54                extra_fields,
55                external_attributes,
56            } => f
57                .debug_struct("Directory")
58                .field("extra_fields", extra_fields)
59                .field("external_attributes", external_attributes)
60                .finish(),
61            // Self::Reader {
62            //     reader: _,
63            //     compression_level,
64            //     compression_type,
65            //     extra_fields,
66            //     external_attributes,
67            // } => f
68            //     .debug_struct("Reader")
69            //     .field("compression_level", compression_level)
70            //     .field("compression_type", compression_type)
71            //     .field("extra_fields", extra_fields)
72            //     .field("external_attributes", external_attributes)
73            //     .finish_non_exhaustive(),
74        }
75    }
76}
77
78#[derive(Debug)]
79pub struct ZipJob {
80    pub data_origin: ZipJobOrigin,
81    pub archive_path: String,
82}
83
84type Responder = oneshot::Sender<ZipJob>;
85pub enum ZipCommand {
86    Get { resp: Responder },
87}
88
89impl ZipJob {
90    #[inline]
91    #[allow(unused)]
92    const fn convert_attrs(attrs: u32) -> u16 {
93        (attrs & 0xFFFF) as u16
94    }
95
96    #[inline]
97    pub(crate) fn attributes_from_fs(metadata: &Metadata) -> u16 {
98        cfg_if! {
99            if #[cfg(target_os = "windows")] {
100                use std::os::windows::fs::MetadataExt;
101                Self::convert_attrs(metadata.file_attributes())
102            } else if #[cfg(target_os = "linux")] {
103                use std::os::linux::fs::MetadataExt;
104                Self::convert_attrs(metadata.st_mode())
105            } else if #[cfg(target_os = "unix")] {
106                use std::os::unix::fs::MetadataExt;
107                Self::convert_attrs(metadata.permissions().mode())
108            } else {
109                if metadata.is_dir() {
110                    0o040755
111                } else {
112                    0o100644
113                }
114            }
115        }
116    }
117
118    async fn generate_file<T: AsyncRead + Unpin>(
119        source: T,
120        uncompressed_size: Option<u32>,
121        archive_path: String,
122        attributes: u16,
123        compression_level: CompressionLevel,
124        compression_type: CompressionType,
125        extra_fields: ExtraFields,
126        process: Option<tokio::sync::mpsc::Sender<u64>>,
127    ) -> std::io::Result<ZipFile> {
128        let mut crc32_reader = Crc32Reader::new(source);
129        let uncompressed_size = uncompressed_size.unwrap_or(0) as usize;
130        let mut data = match compression_type {
131            CompressionType::Deflate => {
132                let mut encoder = async_compression::tokio::write::DeflateEncoder::with_quality(
133                    Vec::with_capacity(uncompressed_size),
134                    Level::Precise(compression_level.get() as i32),
135                );
136                let mut buf_reader = BufReader::with_capacity(1024 * 1024, &mut crc32_reader);
137                loop {
138                    let bytes = buf_reader.fill_buf().await?;
139                    if bytes.is_empty() {
140                        break;
141                    }
142                    let len = bytes.len();
143                    encoder.write_all(bytes).await?;
144                    buf_reader.consume(len);
145                    if let Some(process) = &process {
146                        process
147                            .send(len as u64)
148                            .await
149                            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
150                    }
151                }
152                encoder.shutdown().await?;
153                let bytes = encoder.into_inner();
154                bytes
155            }
156            CompressionType::Stored => Vec::new(),
157        };
158        debug_assert!(uncompressed_size <= u32::MAX as usize);
159        let uncompressed_size = uncompressed_size as u32;
160        data.shrink_to_fit();
161        let crc = crc32_reader.sum();
162        Ok(ZipFile {
163            header: ZipFileHeader {
164                compression_type: CompressionType::Deflate,
165                crc,
166                uncompressed_size,
167                filename: archive_path,
168                external_file_attributes: (attributes as u32) << 16,
169                extra_fields,
170            },
171            data,
172        })
173    }
174
175    // fn gen_file<R: Read>(
176    //     source: R,
177    //     uncompressed_size: Option<u32>,
178    //     archive_path: String,
179    //     attributes: u16,
180    //     compression_level: CompressionLevel,
181    //     compression_type: CompressionType,
182    //     extra_fields: ExtraFields,
183    //     // zip_listener: Arc<Mutex<P>>,
184    // ) -> std::io::Result<ZipFile> {
185    //     println!("3");
186    //     let mut crc_reader = CrcReader::new(source);
187    //     let mut data = Vec::with_capacity(uncompressed_size.unwrap_or(0) as usize);
188    //     let uncompressed_size = match compression_type {
189    //         CompressionType::Deflate => {
190    //             let mut encoder = DeflateEncoder::new(&mut crc_reader, compression_level.into());
191    //
192    //             let buffer = &mut [0u8; 1024 * 1024 * 4];
193    //             let mut zip_bytes = 0;
194    //             loop {
195    //                 let read = encoder.read(buffer)?;
196    //                 if read == 0 {
197    //                     break;
198    //                 }
199    //                 zip_bytes += read as u64;
200    //                 data.extend_from_slice(&buffer[..read]);
201    //                 // let listener = zip_listener.lock().unwrap();
202    //                 // listener(zip_bytes, 0);
203    //             }
204    //             encoder.total_in() as usize
205    //             // encoder.read_to_end(&mut data)?;
206    //             // encoder.total_in() as usize
207    //         }
208    //         CompressionType::Stored => {
209    //             let buffer = &mut [0u8; 1024 * 1024 * 4];
210    //             let mut zip_bytes = 0;
211    //             loop {
212    //                 let read = crc_reader.read(buffer)?;
213    //                 if read == 0 {
214    //                     break;
215    //                 }
216    //                 zip_bytes += read as u64;
217    //                 data.extend_from_slice(&buffer[..read]);
218    //                 // let listener = zip_listener.lock().unwrap();
219    //                 // listener(zip_bytes, 0);
220    //             }
221    //             zip_bytes as usize
222    //             // crc_reader.read_to_end(&mut data)?
223    //         }
224    //     };
225    //     debug_assert!(uncompressed_size <= u32::MAX as usize);
226    //     let uncompressed_size = uncompressed_size as u32;
227    //     data.shrink_to_fit();
228    //     let crc = crc_reader.crc().sum();
229    //     Ok(ZipFile {
230    //         header: ZipFileHeader {
231    //             compression_type: CompressionType::Deflate,
232    //             crc,
233    //             uncompressed_size,
234    //             filename: archive_path,
235    //             external_file_attributes: (attributes as u32) << 16,
236    //             extra_fields,
237    //         },
238    //         data,
239    //     })
240    // }
241
242    pub async fn into_file(self, process: Option<mpsc::Sender<u64>>) -> std::io::Result<ZipFile> {
243        match self.data_origin {
244            ZipJobOrigin::Directory {
245                extra_fields,
246                external_attributes,
247            } => Ok(ZipFile::directory(
248                self.archive_path,
249                extra_fields,
250                external_attributes,
251            )),
252            ZipJobOrigin::Filesystem {
253                path,
254                compression_level,
255                compression_type,
256            } => {
257                let file = tokio::fs::File::open(&path).await?;
258                let file_metadata = tokio::fs::metadata(path).await?;
259                let uncompressed_size = file_metadata.len();
260                debug_assert!(uncompressed_size <= u32::MAX.into());
261                let uncompressed_size = uncompressed_size as u32;
262                let external_file_attributes = Self::attributes_from_fs(&file_metadata);
263                let extra_fields = ExtraFields::new_from_fs(&file_metadata);
264                Self::generate_file(
265                    file,
266                    Some(uncompressed_size),
267                    self.archive_path,
268                    external_file_attributes,
269                    compression_level,
270                    compression_type,
271                    extra_fields,
272                    process,
273                )
274                .await
275            } // ZipJobOrigin::RawData {
276              //     data,
277              //     compression_level,
278              //     compression_type,
279              //     extra_fields,
280              //     external_attributes,
281              // } => {
282              //     let uncompressed_size = data.len();
283              //     debug_assert!(uncompressed_size <= u32::MAX as usize);
284              //     let uncompressed_size = uncompressed_size as u32;
285              //     Self::gen_file_with_tokio(
286              //         data.as_ref(),
287              //         Some(uncompressed_size),
288              //         self.archive_path,
289              //         external_attributes,
290              //         compression_level,
291              //         compression_type,
292              //         extra_fields,
293              //         zip_listener,
294              //     )
295              // }
296              // ZipJobOrigin::Reader {
297              //     reader,
298              //     compression_level,
299              //     compression_type,
300              //     extra_fields,
301              //     external_attributes,
302              // } => Self::gen_file(
303              //     reader,
304              //     None,
305              //     self.archive_path,
306              //     external_attributes,
307              //     compression_level,
308              //     compression_type,
309              //     extra_fields,
310              //     zip_listener,
311              // ),
312        }
313    }
314
315    // pub fn into_file<P: Fn(u64, u64)>(
316    //     self,
317    //     zip_listener: Arc<Mutex<P>>,
318    // ) -> std::io::Result<ZipFile> {
319    //     match self.data_origin {
320    //         ZipJobOrigin::Directory {
321    //             extra_fields,
322    //             external_attributes,
323    //         } => Ok(ZipFile::directory(
324    //             self.archive_path,
325    //             extra_fields,
326    //             external_attributes,
327    //         )),
328    //         ZipJobOrigin::Filesystem {
329    //             path,
330    //             compression_level,
331    //             compression_type,
332    //         } => {
333    //             let file = File::open(path).unwrap();
334    //             let file_metadata = file.metadata().unwrap();
335    //             let uncompressed_size = file_metadata.len();
336    //             debug_assert!(uncompressed_size <= u32::MAX.into());
337    //             let uncompressed_size = uncompressed_size as u32;
338    //             let external_file_attributes = Self::attributes_from_fs(&file_metadata);
339    //             let extra_fields = ExtraFields::new_from_fs(&file_metadata);
340    //             Self::gen_file(
341    //                 file,
342    //                 Some(uncompressed_size),
343    //                 self.archive_path,
344    //                 external_file_attributes,
345    //                 compression_level,
346    //                 compression_type,
347    //                 extra_fields,
348    //                 // zip_listener,
349    //             )
350    //         } // ZipJobOrigin::RawData {
351    //           //     data,
352    //           //     compression_level,
353    //           //     compression_type,
354    //           //     extra_fields,
355    //           //     external_attributes,
356    //           // } => {
357    //           //     let uncompressed_size = data.len();
358    //           //     debug_assert!(uncompressed_size <= u32::MAX as usize);
359    //           //     let uncompressed_size = uncompressed_size as u32;
360    //           //     Self::gen_file(
361    //           //         data.as_ref(),
362    //           //         Some(uncompressed_size),
363    //           //         self.archive_path,
364    //           //         external_attributes,
365    //           //         compression_level,
366    //           //         compression_type,
367    //           //         extra_fields,
368    //           //         zip_listener,
369    //           //     )
370    //           // }
371    //           // ZipJobOrigin::Reader {
372    //           //     reader,
373    //           //     compression_level,
374    //           //     compression_type,
375    //           //     extra_fields,
376    //           //     external_attributes,
377    //           // } => Self::gen_file(
378    //           //     reader,
379    //           //     None,
380    //           //     self.archive_path,
381    //           //     external_attributes,
382    //           //     compression_level,
383    //           //     compression_type,
384    //           //     extra_fields,
385    //           //     zip_listener,
386    //           // ),
387    //     }
388    // }
389}