async_mtzip/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use std::path::PathBuf;
4use std::{
5    num::NonZeroUsize,
6};
7
8use tokio::io::{AsyncSeek, AsyncWrite};
9use tokio::sync::{mpsc, oneshot};
10mod zip;
11use zip::{
12    data::ZipData,
13    extra_field::ExtraFields,
14    file::ZipFile,
15    job::{ZipJob, ZipJobOrigin},
16};
17
18
19use crate::zip::job::ZipCommand;
20use crate::zip::level::CompressionLevel;
21pub use zip::extra_field;
22pub use zip::level;
23pub use zip::job;
24pub use zip::file::dirs;
25
26#[repr(u16)]
27#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
28pub enum CompressionType {
29    Stored = 0,
30    #[default]
31    Deflate = 8,
32}
33
34#[derive(Debug, Default)]
35pub struct ZipArchive {
36    pub jobs_queue: Vec<ZipJob>,
37    data: ZipData,
38}
39
40impl ZipArchive {
41    fn push_job(&mut self, job: ZipJob) {
42        self.jobs_queue.push(job);
43    }
44
45    #[inline]
46    pub fn new() -> Self {
47        Self::default()
48    }
49
50    pub fn add_file_from_fs(
51        &mut self,
52        fs_path: PathBuf,
53        archived_path: String,
54        compression_level: Option<CompressionLevel>,
55        compression_type: Option<CompressionType>,
56    ) -> ZipJob {
57        // let job = ZipJob {
58        //     data_origin: ZipJobOrigin::Filesystem {
59        //         path: fs_path.clone(),
60        //         compression_level: compression_level.unwrap_or(CompressionLevel::best()),
61        //         compression_type: compression_type.unwrap_or(CompressionType::Deflate),
62        //     },
63        //     archive_path: archived_path.clone(),
64        // };
65        // self.push_job(job);
66        ZipJob {
67            data_origin: ZipJobOrigin::Filesystem {
68                path: fs_path,
69                compression_level: compression_level.unwrap_or(CompressionLevel::best()),
70                compression_type: compression_type.unwrap_or(CompressionType::Deflate),
71            },
72            archive_path: archived_path,
73        }
74    }
75
76    pub async fn add_file(
77        &mut self,
78        fs_path: PathBuf,
79        archived_path: String,
80        compression_level: Option<CompressionLevel>,
81        compression_type: Option<CompressionType>,
82    ) {
83        let job = ZipJob {
84            data_origin: ZipJobOrigin::Filesystem {
85                path: fs_path,
86                compression_level: compression_level.unwrap_or(CompressionLevel::best()),
87                compression_type: compression_type.unwrap_or(CompressionType::Deflate),
88            },
89            archive_path: archived_path,
90        };
91        self.push_job(job);
92    }
93
94    pub fn add_directory(&mut self, archived_path: String, attributes: Option<u16>) -> ZipJob {
95        ZipJob {
96            data_origin: ZipJobOrigin::Directory {
97                extra_fields: ExtraFields::default(),
98                external_attributes: attributes.unwrap_or(ZipFile::default_dir_attrs()),
99            },
100            archive_path: archived_path,
101        }
102    }
103
104    #[inline]
105    pub async fn write<W: AsyncWrite + AsyncSeek + Unpin>(
106        &mut self,
107        writer: &mut W,
108        jobs_provider: mpsc::Sender<ZipCommand>,
109        process: Option<mpsc::Sender<u64>>,
110    ) -> std::io::Result<()> {
111        let threads = Self::get_threads();
112        let mut rx = {
113            let (tx, rx) = mpsc::channel::<ZipFile>(threads);
114            for _ in 0..Self::get_threads() {
115                let tx = tx.clone();
116                let process = process.clone();
117                let jobs_provider = jobs_provider.clone();
118                tokio::spawn(async move {
119                    loop {
120                        let (resp_tx, resp_rx) = oneshot::channel();
121                        let resp = jobs_provider.send(ZipCommand::Get { resp: resp_tx }).await;
122                        if resp.is_err() {
123                            break;
124                        }
125                        let job = resp_rx.await;
126                        let job = if job.is_err() {
127                            break;
128                        } else {
129                            job.unwrap()
130                        };
131                        let process_new = process.clone();
132                        let zip_file = job.into_file(process_new).await.unwrap();
133                        tx.send(zip_file).await.unwrap()
134                    }
135                });
136            }
137            rx
138        };
139        self.data.write_with_tokio(writer, &mut rx).await
140    }
141
142    fn get_threads() -> usize {
143        std::thread::available_parallelism()
144            .map(NonZeroUsize::get)
145            .unwrap_or(1)
146    }
147}