#![doc = include_str!("../README.md")]
#![warn(missing_docs)]

use std::path::PathBuf;
use std::sync::Arc;
use std::{
    borrow::Cow,
    io::{Read, Seek, Write},
    num::NonZeroUsize,
    panic::{RefUnwindSafe, UnwindSafe},
    path::Path,
    sync::{mpsc, Mutex},
};

use level::CompressionLevel;
#[cfg(feature = "rayon")]
use rayon::prelude::*;
use tokio::io::{AsyncSeek, AsyncWrite};
use zip_archive_parts::{
    data::ZipData,
    extra_field::ExtraFields,
    file::ZipFile,
    job::{ZipJob, ZipJobOrigin},
};

pub mod level;
mod zip_archive_parts;

pub use zip_archive_parts::extra_field;

// TODO: tests, maybe examples

/// Compression type for the file. Directories always use [`Stored`](CompressionType::Stored).
/// Default is [`Deflate`](CompressionType::Deflate).
#[repr(u16)]
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum CompressionType {
    /// No compression at all, the data is stored as-is.
    ///
    /// This is used for directories because they have no data (no payload).
    Stored = 0,
    /// Deflate compression, the most common in ZIP files.
    #[default]
    Deflate = 8,
}

/// Structure that holds the current state of ZIP archive creation.
///
/// Entries are described by `ZipJob`s, which are queued in [`jobs_queue`](Self::jobs_queue) and
/// compressed when [`compress`](Self::compress) or one of the `write` methods is called. All
/// supplied data is owned by the jobs, so the struct has no lifetime parameters.
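///
/// # Example
///
/// A minimal sketch of the intended flow, not compiled here (`my_zip_crate` stands in for this
/// crate's actual name):
///
/// ```ignore
/// use std::sync::{Arc, Mutex};
///
/// let mut archive = my_zip_crate::ZipArchive::new();
///
/// // Queue a file from the filesystem; the returned job goes onto the public queue.
/// let job = archive.add_file_from_fs("input.txt".into(), "input.txt".to_string(), None, None);
/// archive.jobs_queue.push(job);
///
/// // Compress and write, reporting progress through a no-op listener.
/// let listener = Arc::new(Mutex::new(|_done: u64, _total: u64| {}));
/// let mut out = std::fs::File::create("archive.zip")?;
/// archive.write(&mut out, listener)?;
/// # Ok::<(), std::io::Error>(())
/// ```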
#[derive(Debug, Default)]
pub struct ZipArchive {
    /// Queue of pending jobs, drained when the archive is compressed or written.
    pub jobs_queue: Vec<ZipJob>,
    data: ZipData,
}

impl ZipArchive {
    fn push_job(&mut self, job: ZipJob) {
        self.jobs_queue.push(job);
    }

    fn push_file(&mut self, file: ZipFile) {
        self.data.files.push(file);
    }

    /// Create an empty [`ZipArchive`]
    #[inline]
    pub fn new() -> Self {
        Self::default()
    }

    /// Add a file from the filesystem.
    ///
    /// Returns a `ZipJob` that should be pushed onto [`jobs_queue`](Self::jobs_queue). The file
    /// is opened and read when [`compress`](Self::compress) is called.
    ///
    /// Default value for `compression_type` is [`Deflate`](CompressionType::Deflate).
    ///
    /// `compression_level` is ignored when [`CompressionType::Stored`] is used. Default value is
    /// [`CompressionLevel::best`].
    ///
    /// This method does not allow setting [`ExtraFields`] manually and instead uses the filesystem
    /// to obtain them.
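    ///
    /// # Example
    ///
    /// An illustrative sketch, not compiled here (`my_zip_crate` stands in for this crate's
    /// actual name):
    ///
    /// ```ignore
    /// let mut archive = my_zip_crate::ZipArchive::new();
    /// let job = archive.add_file_from_fs(
    ///     std::path::PathBuf::from("data/report.txt"),
    ///     "report.txt".to_string(),
    ///     None, // defaults to CompressionLevel::best()
    ///     None, // defaults to CompressionType::Deflate
    /// );
    /// archive.jobs_queue.push(job);
    /// ```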
    pub fn add_file_from_fs(
        &mut self,
        fs_path: PathBuf,
        archived_path: String,
        compression_level: Option<CompressionLevel>,
        compression_type: Option<CompressionType>,
    ) -> ZipJob {
        ZipJob {
            data_origin: ZipJobOrigin::Filesystem {
                path: fs_path,
                compression_level: compression_level.unwrap_or(CompressionLevel::best()),
                compression_type: compression_type.unwrap_or(CompressionType::Deflate),
            },
            archive_path: archived_path,
        }
    }

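    /// Add a file from the filesystem, pushing the job onto [`jobs_queue`](Self::jobs_queue).
    ///
    /// Async counterpart of [`add_file_from_fs`](Self::add_file_from_fs); defaults are the same:
    /// [`CompressionLevel::best`] and [`Deflate`](CompressionType::Deflate).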
    pub async fn add_file_from_fs_with_tokio(
        &mut self,
        fs_path: PathBuf,
        archived_path: String,
        compression_level: Option<CompressionLevel>,
        compression_type: Option<CompressionType>,
    ) {
        let job = ZipJob {
            data_origin: ZipJobOrigin::Filesystem {
                path: fs_path,
                compression_level: compression_level.unwrap_or(CompressionLevel::best()),
                compression_type: compression_type.unwrap_or(CompressionType::Deflate),
            },
            archive_path: archived_path,
        };
        self.push_job(job);
    }

    // /// Add file with data from memory.
    // ///
    // /// The data can be either borrowed or owned by the [`ZipArchive`] struct to avoid lifetime
    // /// hell.
    // ///
    // /// Default value for `compression_type` is [`Deflate`](CompressionType::Deflate).
    // ///
    // /// `compression_level` is ignored when [`CompressionType::Stored`] is used. Default value is
    // /// [`CompressionLevel::best`].
    // ///
    // /// `extra_fields` parameter allows setting extra attributes. Currently it supports NTFS and
    // /// UNIX filesystem attributes, see more in [`ExtraFields`] description.
    // pub fn add_file_from_memory(
    //     &mut self,
    //     data: impl Into<Cow<'d, [u8]>>,
    //     archived_path: String,
    //     compression_level: Option<CompressionLevel>,
    //     compression_type: Option<CompressionType>,
    //     file_attributes: Option<u16>,
    //     extra_fields: Option<ExtraFields>,
    // ) {
    //     let job = ZipJob {
    //         data_origin: ZipJobOrigin::RawData {
    //             data: data.into(),
    //             compression_level: compression_level.unwrap_or(CompressionLevel::best()),
    //             compression_type: compression_type.unwrap_or(CompressionType::Deflate),
    //             external_attributes: file_attributes.unwrap_or(ZipFile::default_file_attrs()),
    //             extra_fields: extra_fields.unwrap_or_default(),
    //         },
    //         archive_path: archived_path,
    //     };
    //     self.push_job(job);
    // }

    // /// Add a file with data from a reader.
    // ///
    // /// This method takes any type implementing [`Read`] and allows it to have borrowed data (`'r`)
    // ///
    // /// Default value for `compression_type` is [`Deflate`](CompressionType::Deflate).
    // ///
    // /// `compression_level` is ignored when [`CompressionType::Stored`] is used. Default value is
    // /// [`CompressionLevel::best`].
    // ///
    // /// `extra_fields` parameter allows setting extra attributes. Currently it supports NTFS and
    // /// UNIX filesystem attributes, see more in [`ExtraFields`] description.
    // pub fn add_file_from_reader<R: Read + Send + Sync + UnwindSafe + RefUnwindSafe + 'r>(
    //     &mut self,
    //     reader: R,
    //     archived_path: String,
    //     compression_level: Option<CompressionLevel>,
    //     compression_type: Option<CompressionType>,
    //     file_attributes: Option<u16>,
    //     extra_fields: Option<ExtraFields>,
    // ) {
    //     let job = ZipJob {
    //         data_origin: ZipJobOrigin::Reader {
    //             reader: Box::new(reader),
    //             compression_level: compression_level.unwrap_or(CompressionLevel::best()),
    //             compression_type: compression_type.unwrap_or(CompressionType::Deflate),
    //             external_attributes: file_attributes.unwrap_or(ZipFile::default_file_attrs()),
    //             extra_fields: extra_fields.unwrap_or_default(),
    //         },
    //         archive_path: archived_path,
    //     };
    //     self.push_job(job)
    // }

    /// Create a directory entry.
    ///
    /// All directories in the tree should be added. Returns a `ZipJob` that should be pushed onto
    /// [`jobs_queue`](Self::jobs_queue); no filesystem properties are associated with the entry.
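    ///
    /// # Example
    ///
    /// An illustrative sketch, not compiled here (`my_zip_crate` stands in for this crate's
    /// actual name):
    ///
    /// ```ignore
    /// use std::sync::{Arc, Mutex};
    ///
    /// let mut archive = my_zip_crate::ZipArchive::new();
    /// let listener = Arc::new(Mutex::new(|_done: u64, _total: u64| {}));
    /// let dir_job = archive.add_directory("assets".to_string(), None, listener);
    /// archive.jobs_queue.push(dir_job);
    /// ```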
    pub fn add_directory<P: Fn(u64, u64)>(
        &mut self,
        archived_path: String,
        attributes: Option<u16>,
        // The listener is currently unused here; the job is returned for the caller to queue.
        _zip_listener: Arc<Mutex<P>>,
    ) -> ZipJob {
        ZipJob {
            data_origin: ZipJobOrigin::Directory {
                extra_fields: ExtraFields::default(),
                external_attributes: attributes.unwrap_or(ZipFile::default_dir_attrs()),
            },
            archive_path: archived_path,
        }
    }

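    /// Create a directory entry for the tokio-based writers.
    ///
    /// Same as [`add_directory`](Self::add_directory), but without a progress listener. The
    /// returned `ZipJob` is not queued automatically.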
    pub fn add_directory_with_tokio(
        &mut self,
        archived_path: String,
        attributes: Option<u16>,
    ) -> ZipJob {
        ZipJob {
            data_origin: ZipJobOrigin::Directory {
                extra_fields: ExtraFields::default(),
                external_attributes: attributes.unwrap_or(ZipFile::default_dir_attrs()),
            },
            archive_path: archived_path,
        }
    }

    // /// Add a directory entry.
    // ///
    // /// All directories in the tree should be added. Use this method if you want to manually set
    // /// filesystem properties of the directory.
    // ///
    // /// `extra_fields` parameter allows setting extra attributes. Currently it supports NTFS and
    // /// UNIX filesystem attributes, see more in [`ExtraFields`] description.
    // pub fn add_directory_with_metadata(
    //     &mut self,
    //     archived_path: String,
    //     extra_fields: ExtraFields,
    //     attributes: Option<u16>,
    // ) {
    //     let job = ZipJob {
    //         data_origin: ZipJobOrigin::Directory {
    //             extra_fields,
    //             external_attributes: attributes.unwrap_or(ZipFile::default_dir_attrs()),
    //         },
    //         archive_path: archived_path,
    //     };
    //     let file = job.into_file().expect("No failing code path");
    //     self.push_file(file);
    // }

    // /// Add a directory entry.
    // ///
    // /// All directories in the tree should be added. This method will take the metadata from
    // /// filesystem and add it to the entry in the zip file.
    // pub fn add_directory_with_metadata_from_fs<P: AsRef<Path>>(
    //     &mut self,
    //     archived_path: String,
    //     fs_path: P,
    // ) -> std::io::Result<()> {
    //     let metadata = std::fs::metadata(fs_path)?;
    //     let job = ZipJob {
    //         data_origin: ZipJobOrigin::Directory {
    //             extra_fields: ExtraFields::new_from_fs(&metadata),
    //             external_attributes: ZipJob::attributes_from_fs(&metadata),
    //         },
    //         archive_path: archived_path,
    //     };
    //     let file = job.into_file().expect("No failing code path");
    //     self.push_file(file);
    //     Ok(())
    // }

    /// Compress contents. Called automatically on [`write`](Self::write) if files were added
    /// since the last compression. Automatically chooses the number of threads based on how many
    /// are available.
    #[inline]
    pub fn compress<P: Fn(u64, u64) + Send>(&mut self, zip_listener: Arc<Mutex<P>>) {
        self.compress_with_threads(Self::get_threads(), zip_listener);
    }

    /// Compress contents. Called automatically on
    /// [`write_with_threads`](Self::write_with_threads) if files were added since the last
    /// compression. Allows specifying the number of threads to use.
    ///
    /// Example of getting the number of threads that [`compress`](Self::compress) uses by
    /// default:
    ///
    /// ```
    /// use std::num::NonZeroUsize;
    ///
    /// let threads = std::thread::available_parallelism()
    ///     .map(NonZeroUsize::get)
    ///     .unwrap_or(1);
    /// // zipper.compress_with_threads(threads, listener);
    /// ```
    #[inline]
    pub fn compress_with_threads<P: Fn(u64, u64) + Send>(
        &mut self,
        threads: usize,
        zip_listener: Arc<Mutex<P>>,
    ) {
        if !self.jobs_queue.is_empty() {
            self.compress_with_consumer(
                threads,
                |zip_data, rx| zip_data.files.extend(rx),
                zip_listener,
            )
        }
    }

    /// Write compressed data to a writer (usually a file). Executes [`compress`](Self::compress)
    /// if files were added since the last [`compress`](Self::compress) call. Automatically uses
    /// as many threads as the CPU makes available.
    #[inline]
    pub fn write<W: Write + Seek, P: Fn(u64, u64) + Send>(
        &mut self,
        writer: &mut W,
        zip_listener: Arc<Mutex<P>>,
    ) -> std::io::Result<()> {
        self.write_with_threads(writer, Self::get_threads(), zip_listener)
    }

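    /// Write compressed data to an async writer using tokio tasks.
    ///
    /// Jobs are taken from the shared `jobs` list, compressed on spawned tasks, and streamed to
    /// `writer` as they finish. `process` is an optional channel for progress updates.
    ///
    /// An illustrative sketch, not compiled here (`my_zip_crate` stands in for this crate's
    /// actual name):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// let mut archive = my_zip_crate::ZipArchive::new();
    /// let jobs = Arc::new(tokio::sync::Mutex::new(vec![
    ///     archive.add_directory_with_tokio("assets".to_string(), None),
    /// ]));
    /// let mut file = tokio::fs::File::create("archive.zip").await?;
    /// archive.write_with_tokio(&mut file, jobs, None).await?;
    /// ```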
    #[inline]
    pub async fn write_with_tokio<W: AsyncWrite + AsyncSeek + Unpin>(
        &mut self,
        writer: &mut W,
        jobs: Arc<tokio::sync::Mutex<Vec<ZipJob>>>,
        process: Option<tokio::sync::mpsc::Sender<u64>>,
    ) -> std::io::Result<()> {
        let threads = Self::get_threads();
        let mut rx = {
            let (tx, rx) = tokio::sync::mpsc::channel::<ZipFile>(threads);
            let size = {
                let jobs = jobs.lock().await;
                jobs.len()
            };
            let max_threads = threads.min(size);
            for _ in 0..max_threads {
                let tx = tx.clone();
                let jobs_drain_ref = jobs.clone();
                let process = process.clone();
                tokio::spawn(async move {
                    loop {
                        let next_job = jobs_drain_ref.lock().await.pop();
                        if let Some(job) = next_job {
                            let process_new = process.clone();
                            let zip_file = job
                                .into_file_with_tokio(process_new)
                                .await.unwrap();
                            tx.send(zip_file).await.unwrap()
                        } else {
                            break;
                        }
                    }
                });
            }
            rx
        };
        self.data
            .write_with_tokio(writer, &mut rx)
            .await
    }

    /// Write compressed data to a writer (usually a file). Executes
    /// [`compress_with_threads`](Self::compress_with_threads) if files were added since the last
    /// [`compress`](Self::compress) call. Allows specifying the number of threads to use.
    ///
    /// Example of getting the number of threads that [`write`](Self::write) uses by default:
    ///
    /// ```
    /// use std::num::NonZeroUsize;
    ///
    /// let threads = std::thread::available_parallelism()
    ///     .map(NonZeroUsize::get)
    ///     .unwrap_or(1);
    /// // zipper.write_with_threads(&mut writer, threads, listener);
    /// ```
    #[inline]
    pub fn write_with_threads<W: Write + Seek, P: Fn(u64, u64) + Send>(
        &mut self,
        writer: &mut W,
        threads: usize,
        zip_listener: Arc<Mutex<P>>,
    ) -> std::io::Result<()> {
        if !self.jobs_queue.is_empty() {
            self.compress_with_consumer(
                threads,
                |zip_data, rx| zip_data.write(writer, rx),
                zip_listener,
            )
        } else {
            self.data.write(writer, std::iter::empty())
        }
    }

    /// Write compressed data to an async writer, like [`write_with_tokio`](Self::write_with_tokio),
    /// but with an explicit number of worker tasks.
    #[inline]
    pub async fn write_with_threads_with_tokio<W: AsyncWrite + AsyncSeek + Unpin>(
        &mut self,
        writer: &mut W,
        threads: usize,
        jobs: Arc<tokio::sync::Mutex<Vec<ZipJob>>>,
        tx: Option<tokio::sync::mpsc::Sender<u64>>,
    ) -> std::io::Result<()> {
        let size = {
            let jobs = jobs.lock().await;
            jobs.len()
        };
        if size > 0 {
            self.compress_with_consumer_with_tokio(jobs, threads, writer, tx)
                .await;
            Ok(())
        } else {
            // No pending jobs: write out the already-stored entries through an empty channel,
            // mirroring the synchronous path's `std::iter::empty()` case.
            let (done_tx, mut rx) = tokio::sync::mpsc::channel::<ZipFile>(1);
            drop(done_tx);
            self.data.write_with_tokio(writer, &mut rx).await
        }
    }

    /// Starts the compression jobs and passes the mpsc receiver to the consumer function, which
    /// either stores the data in [`ZipData`] ([`Self::compress_with_threads`]) or writes the zip
    /// data as soon as it's available ([`Self::write_with_threads`]).
    fn compress_with_consumer<F, T, P>(
        &mut self,
        threads: usize,
        consumer: F,
        zip_listener: Arc<Mutex<P>>,
    ) -> T
    where
        F: FnOnce(&mut ZipData, mpsc::Receiver<ZipFile>) -> T,
        P: Fn(u64, u64) + Send,
    {
        let jobs_drain = Mutex::new(self.jobs_queue.drain(..));
        // let listener = Arc::new(Mutex::new(zip_listener));
        let listener_ref = &zip_listener;
        let jobs_drain_ref = &jobs_drain;
        std::thread::scope(|s| {
            let rx = {
                let (tx, rx) = mpsc::channel();
                for _ in 0..threads {
                    let thread_tx = tx.clone();
                    s.spawn(move || loop {
                        let next_job = jobs_drain_ref.lock().unwrap().next_back();
                        if let Some(job) = next_job {
                            let zip_file = job.into_file(listener_ref.clone()).unwrap();
                            thread_tx.send(zip_file).unwrap();
                        } else {
                            break;
                        }
                    });
                }
                rx
            };
            consumer(&mut self.data, rx)
        })
    }

    async fn compress_with_consumer_with_tokio<W: AsyncWrite + AsyncSeek + Unpin>(
        &mut self,
        jobs: Arc<tokio::sync::Mutex<Vec<ZipJob>>>,
        threads: usize,
        writer: &mut W,
        process: Option<tokio::sync::mpsc::Sender<u64>>,
    ) {
        let mut rx = {
            let (tx, rx) = tokio::sync::mpsc::channel::<ZipFile>(2);
            let size = {
                let jobs = jobs.lock().await;
                jobs.len()
            };
            let max_threads = threads.min(size);
            for _ in 0..max_threads {
                let tx = tx.clone();
                let jobs_drain_ref = jobs.clone();
                let process = process.clone();
                tokio::spawn(async move {
                    loop {
                        let next_job = jobs_drain_ref.lock().await.pop();
                        if let Some(job) = next_job {
                            let process_new = process.clone();
                            let zip_file = job
                                .into_file_with_tokio(process_new)
                                .await
                                .expect("No failing code path");
                            tx.send(zip_file).await.unwrap();
                        } else {
                            break;
                        }
                    }
                });
            }
            rx
        };
        self.data
            .write_with_tokio(writer, &mut rx)
            .await
            .expect("No failing code path");
    }

    fn get_threads() -> usize {
        std::thread::available_parallelism()
            .map(NonZeroUsize::get)
            .unwrap_or(1)
    }
}

#[cfg(feature = "rayon")]
impl ZipArchive {
    /// Compress contents and use rayon for parallelism.
    ///
    /// Uses whatever thread pool this function is executed in.
    ///
    /// If you want to limit the amount of threads to be used, use
    /// [`rayon::ThreadPoolBuilder::num_threads`] and either set it as a global pool, or
    /// [`rayon::ThreadPool::install`] the call to this method in it.
    pub fn compress_with_rayon(&mut self) {
        if !self.jobs_queue.is_empty() {
            // Progress reporting is not wired into the rayon path, so pass a no-op listener to
            // `into_file`, matching the listener signature used by the threaded path above.
            let listener = Arc::new(Mutex::new(|_: u64, _: u64| {}));
            let files_par_iter = self
                .jobs_queue
                .par_drain(..)
                .map(|job| job.into_file(listener.clone()).unwrap());
            self.data.files.par_extend(files_par_iter)
        }
    }

    /// Write the contents to a writer.
    ///
    /// This method uses the same thread logic as [`Self::compress_with_rayon`]; refer to its
    /// documentation for details on how to control parallelism and thread allocation.
    pub fn write_with_rayon<W: Write + Seek + Send>(
        &mut self,
        writer: &mut W,
    ) -> std::io::Result<()> {
        if !self.jobs_queue.is_empty() {
            // As in `compress_with_rayon`, pass a no-op progress listener to `into_file`.
            let listener = Arc::new(Mutex::new(|_: u64, _: u64| {}));
            let files_par_iter = self
                .jobs_queue
                .par_drain(..)
                .map(|job| job.into_file(listener.clone()).unwrap());
            self.data.write_rayon(writer, files_par_iter)
        } else {
            self.data.write_rayon(writer, rayon::iter::empty())
        }
    }
}
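
// A minimal smoke-test sketch of the synchronous flow: queue a single directory entry, then
// compress and write it to an in-memory buffer with a no-op progress listener. The check that the
// output starts with the "PK" magic is an assumption based on the ZIP format itself, not on this
// crate's internals.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn writes_single_directory_entry() {
        let mut archive = ZipArchive::new();
        let listener = Arc::new(Mutex::new(|_done: u64, _total: u64| {}));

        let dir_job = archive.add_directory("dir".to_string(), None, listener.clone());
        archive.jobs_queue.push(dir_job);

        let mut buf = Cursor::new(Vec::new());
        archive
            .write(&mut buf, listener)
            .expect("writing to an in-memory buffer should succeed");

        // Every ZIP record signature starts with the two bytes "PK".
        let bytes = buf.into_inner();
        assert!(bytes.starts_with(b"PK"));
    }
}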