libmedusa_zip/
zip.rs

1/*
2 * Description: ???
3 *
4 * Copyright (C) 2023 Danny McClanahan <dmcC2@hypnicjerk.ai>
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0 (see LICENSE).
8 */
9
10//! ???
11
12use crate::{destination::OutputWrapper, EntryName, FileSource, MedusaNameFormatError};
13
14use cfg_if::cfg_if;
15use displaydoc::Display;
16use futures::stream::StreamExt;
17use once_cell::sync::Lazy;
18use parking_lot::Mutex;
19use rayon::prelude::*;
20use static_init;
21use tempfile;
22use thiserror::Error;
23use time::{error::ComponentRange, OffsetDateTime, UtcOffset};
24use tokio::{
25  fs, io,
26  sync::{mpsc, oneshot},
27  task,
28};
29use tokio_stream::wrappers::ReceiverStream;
30use zip::{
31  self,
32  result::{DateTimeRangeError, ZipError},
33  write::FileOptions as ZipLibraryFileOptions,
34  CompressionMethod as ZipCompressionMethod, DateTime as ZipDateTime, ZipArchive, ZipWriter,
35  ZIP64_BYTES_THR,
36};
37
38#[cfg(unix)]
39use std::os::unix::fs::PermissionsExt;
40use std::{
41  cmp,
42  io::{Seek, Write},
43  mem, num, ops,
44  path::{Path, PathBuf},
45  sync::Arc,
46};
47
48/// All types of errors from the parallel zip process.
49#[derive(Debug, Display, Error)]
50pub enum MedusaZipError {
51  /// i/o error: {0}
52  Io(#[from] io::Error),
53  /// zip error: {0}
54  Zip(#[from] ZipError),
55  /// join error: {0}
56  Join(#[from] task::JoinError),
57  /// error reconciling input sources: {0}
58  InputConsistency(#[from] InputConsistencyError),
59  /// error reading input file: {0}
60  InputRead(#[from] MedusaInputReadError),
61  /// error processing zip file entry options: {0}
62  ProcessZipOptions(#[from] InitializeZipOptionsError),
63  /// error receiving from a oneshot channel: {0}
64  OneshotRecv(#[from] oneshot::error::RecvError),
65  /// error sending intermediate archiev: {0}
66  Send(#[from] mpsc::error::SendError<ZipArchive<tempfile::SpooledTempFile>>),
67}
68
/// Set zip entry options using only information that is fixed for the whole
/// run (as opposed to varying per file; see
/// [`InitializeZipOptionsForSpecificFile`]).
pub trait DefaultInitializeZipOptions {
  /// Return `options` with this configurer's settings applied.
  #[must_use]
  fn set_zip_options_static(&self, options: ZipLibraryFileOptions) -> ZipLibraryFileOptions;
}
73
/// Errors from configuring per-file zip entry options (e.g. from file
/// metadata). Variant doc comments are `displaydoc` format strings.
#[derive(Debug, Display, Error)]
pub enum InitializeZipOptionsError {
  /// i/o error: {0}
  Io(#[from] io::Error),
  /// date/time was out of range of a valid zip date: {0}
  InvalidDateTime(#[from] DateTimeRangeError),
  /// date/time was out of range for a valid date at all: {0}
  InvalidOffsetDateTime(#[from] ComponentRange),
}
83
/// Set zip entry options using the source file's filesystem metadata (mtime,
/// permissions, size), which varies per file.
pub trait InitializeZipOptionsForSpecificFile {
  /// Return `options` with settings derived from `metadata` applied, or an
  /// error if the metadata could not be interpreted (e.g. invalid mtime).
  #[must_use]
  fn set_zip_options_for_file(
    &self,
    options: ZipLibraryFileOptions,
    metadata: &std::fs::Metadata,
  ) -> Result<ZipLibraryFileOptions, InitializeZipOptionsError>;
}
92
93const MINIMUM_ZIP_TIME: ZipDateTime = ZipDateTime::zero();
94
/* The `time` crate is extremely touchy about only ever extracting the local
 * UTC offset within a single-threaded environment, which means it cannot be
 * called anywhere reachable from the main function if we use #[tokio::main].
 * static_init instead runs it at program initialization time. */
/// The process's local UTC offset, captured once at program startup.
#[static_init::dynamic]
static LOCAL_UTC_OFFSET: UtcOffset =
  UtcOffset::current_local_offset().expect("failed to capture local UTC offset");
102
103/* We could use the dynamic initialization order to avoid fetching the local
104 * offset twice, but that requires an unsafe block, which we'd prefer to
105 * avoid in this crate. */
106#[static_init::dynamic]
107static CURRENT_LOCAL_TIME: OffsetDateTime =
108  OffsetDateTime::now_local().expect("failed to capture local UTC offset");
109
/// The startup time [`CURRENT_LOCAL_TIME`] converted into a zip timestamp,
/// used by [`ModifiedTimeBehavior::CurrentTime`].
static CURRENT_ZIP_TIME: Lazy<ZipDateTime> = Lazy::new(|| {
  (*CURRENT_LOCAL_TIME)
    .try_into()
    .expect("failed to convert local time into zip time at startup")
});
115
/* FIXME: establish one canonical place (probably the CLI help?) where the
 * definition of these repeated enum cases are specified. */
/// How to choose the last-modified timestamp written for each zip entry.
#[derive(Copy, Clone, Default, Debug)]
pub enum ModifiedTimeBehavior {
  /// Clamp every entry's mtime to the zero zip time for reproducible output.
  #[default]
  Reproducible,
  /// Stamp every entry with the time the process started.
  CurrentTime,
  /// Copy each source file's filesystem mtime; in contexts with no per-file
  /// metadata available this falls back to [`Self::CurrentTime`].
  PreserveSourceTime,
  /// Use the given timestamp for every entry.
  Explicit(ZipDateTime),
}
126
127impl DefaultInitializeZipOptions for ModifiedTimeBehavior {
128  #[must_use]
129  fn set_zip_options_static(&self, options: ZipLibraryFileOptions) -> ZipLibraryFileOptions {
130    match self {
131      Self::Reproducible => options.last_modified_time(MINIMUM_ZIP_TIME),
132      Self::CurrentTime => options.last_modified_time(*CURRENT_ZIP_TIME),
133      Self::PreserveSourceTime => Self::CurrentTime.set_zip_options_static(options),
134      Self::Explicit(timestamp) => options.last_modified_time(*timestamp),
135    }
136  }
137}
138
impl InitializeZipOptionsForSpecificFile for ModifiedTimeBehavior {
  /// Resolve this behavior into a concrete last-modified time for the file
  /// described by `metadata`, and set it on `options`.
  #[must_use]
  fn set_zip_options_for_file(
    &self,
    options: ZipLibraryFileOptions,
    metadata: &std::fs::Metadata,
  ) -> Result<ZipLibraryFileOptions, InitializeZipOptionsError> {
    match self {
      Self::Reproducible => Ok(options.last_modified_time(MINIMUM_ZIP_TIME)),
      Self::CurrentTime => Ok(options.last_modified_time(*CURRENT_ZIP_TIME)),
      Self::PreserveSourceTime => {
        /* NB: this is not blocking, but will Err on platforms without this available
         * (the docs don't specify which platforms:
         * https://doc.rust-lang.org/nightly/std/fs/struct.Metadata.html#method.modified). */
        let modified_time = metadata.modified()?;
        /* Shift the source mtime into the local UTC offset before converting,
         * since zip timestamps carry no zone information. */
        let modified_time: ZipDateTime = OffsetDateTime::from(modified_time)
          .to_offset(*LOCAL_UTC_OFFSET)
          .try_into()?;
        Ok(options.last_modified_time(modified_time))
      },
      Self::Explicit(timestamp) => Ok(options.last_modified_time(*timestamp)),
    }
  }
}
163
/// Copies the source file's unix permission mode bits into the zip entry;
/// a no-op on non-unix platforms.
struct PreservePermsBehavior;

impl InitializeZipOptionsForSpecificFile for PreservePermsBehavior {
  #[must_use]
  fn set_zip_options_for_file(
    &self,
    options: ZipLibraryFileOptions,
    metadata: &std::fs::Metadata,
  ) -> Result<ZipLibraryFileOptions, InitializeZipOptionsError> {
    let permissions = metadata.permissions();
    cfg_if! {
      if #[cfg(unix)] {
        /* The full st_mode bits from the filesystem (e.g. 0o755). */
        let permissions_mode: u32 = permissions.mode();
        Ok(options.unix_permissions(permissions_mode))
      } else {
        /* For non-unix, just don't bother trying to provide the same bits. */
        Ok(options)
      }
    }
  }
}
185
/* TODO: make this configurable! */
/// Files at or below this many bytes are stored uncompressed, since
/// compression overhead tends to dominate for tiny inputs.
const SMALL_FILE_FOR_NO_COMPRESSION_MAX_SIZE: usize = 1_000;

/// Disables compression entirely for sufficiently small files.
struct SmallFileBehavior;

impl InitializeZipOptionsForSpecificFile for SmallFileBehavior {
  #[must_use]
  fn set_zip_options_for_file(
    &self,
    options: ZipLibraryFileOptions,
    metadata: &std::fs::Metadata,
  ) -> Result<ZipLibraryFileOptions, InitializeZipOptionsError> {
    /* The try_into() widens the usize threshold to u64 to match
     * metadata.len(); it cannot fail on supported platforms. */
    if metadata.len() <= SMALL_FILE_FOR_NO_COMPRESSION_MAX_SIZE.try_into().unwrap() {
      Ok(
        options
          .compression_method(ZipCompressionMethod::Stored)
          .compression_level(None),
      )
    } else {
      Ok(options)
    }
  }
}
209
/// Enables the zip64 `large_file` flag for entries whose size exceeds the
/// zip crate's legacy-format threshold.
struct LargeFileBehavior;

impl InitializeZipOptionsForSpecificFile for LargeFileBehavior {
  #[must_use]
  fn set_zip_options_for_file(
    &self,
    options: ZipLibraryFileOptions,
    metadata: &std::fs::Metadata,
  ) -> Result<ZipLibraryFileOptions, InitializeZipOptionsError> {
    /* ZIP64_BYTES_THR is the cutoff above which the zip64 format is required. */
    Ok(options.large_file(metadata.len() > ZIP64_BYTES_THR))
  }
}
222
/// User-facing choice of zip entry compression method. The variant doc
/// comments below are `displaydoc` format strings — do not edit casually.
#[derive(Copy, Clone, Default, Debug, Display)]
pub enum CompressionMethod {
  /// uncompressed
  Stored,
  /// deflate-compressed
  #[default]
  Deflated,
}
231
/// A *validated* compression configuration: either no compression, or
/// deflate with an optional level already checked against
/// [`CompressionStrategy::DEFLATE_RANGE`].
#[derive(Copy, Clone, Debug)]
pub enum CompressionStrategy {
  Stored,
  Deflated(Option<u8>),
}

impl Default for CompressionStrategy {
  /* 6 is the conventional default deflate level. */
  fn default() -> Self { Self::Deflated(Some(6)) }
}
241
/// Errors validating a user-provided (method, level) compression pair.
/// Variant doc comments are `displaydoc` format strings.
#[derive(Debug, Display, Error)]
pub enum ParseCompressionOptionsError {
  /// "stored" (uncompressed) does not accept a compression level (was: {0})
  CompressionLevelWithStored(i8),
  /// compression level {1} was invalid for method {0} which accepts {2:?}
  InvalidCompressionLevel(CompressionMethod, i8, ops::RangeInclusive<i8>),
  /// error converting from int (this should never happen!): {0}
  TryFromInt(#[from] num::TryFromIntError),
}
251
252impl CompressionStrategy {
253  const DEFLATE_RANGE: ops::RangeInclusive<i8> = ops::RangeInclusive::new(0, 9);
254
255  pub fn from_method_and_level(
256    method: CompressionMethod,
257    level: Option<i8>,
258  ) -> Result<Self, ParseCompressionOptionsError> {
259    match method {
260      CompressionMethod::Stored => match level {
261        None => Ok(Self::Stored),
262        Some(level) => Err(ParseCompressionOptionsError::CompressionLevelWithStored(
263          level,
264        )),
265      },
266      CompressionMethod::Deflated => match level {
267        None => Ok(Self::Deflated(None)),
268        Some(level) => {
269          if Self::DEFLATE_RANGE.contains(&level) {
270            Ok(Self::Deflated(Some(level.try_into()?)))
271          } else {
272            Err(ParseCompressionOptionsError::InvalidCompressionLevel(
273              method,
274              level,
275              Self::DEFLATE_RANGE,
276            ))
277          }
278        },
279      },
280    }
281  }
282}
283
284impl DefaultInitializeZipOptions for CompressionStrategy {
285  #[must_use]
286  fn set_zip_options_static(&self, options: ZipLibraryFileOptions) -> ZipLibraryFileOptions {
287    let (method, level): (ZipCompressionMethod, Option<i8>) = match self {
288      Self::Stored => (ZipCompressionMethod::Stored, None),
289      Self::Deflated(level) => (
290        ZipCompressionMethod::Deflated,
291        level.map(|l| {
292          l.try_into()
293            .expect("these values have already been checked")
294        }),
295      ),
296    };
297    options
298      .compression_method(method)
299      .compression_level(level.map(|l| {
300        l.try_into()
301          .expect("these values have already been checked")
302      }))
303  }
304}
305
306
/// Top-level knobs for how entries are written into the output zip.
#[derive(Copy, Clone, Default, Debug)]
pub struct ZipOutputOptions {
  /// How each entry's last-modified timestamp is chosen.
  pub mtime_behavior: ModifiedTimeBehavior,
  /// Which compression method/level to apply to entries.
  pub compression_options: CompressionStrategy,
}
312
313
/// Optional path prefixes applied to every entry before it is written.
#[derive(Clone, Default, Debug)]
pub struct EntryModifications {
  /// This prefixes a directory path to every entry without creating any of its
  /// parent directories.
  ///
  /// These prefixes always come before any prefixes introduced by
  /// [`Self::own_prefix`].
  ///
  /// `--silent-external-prefix .deps` => `[.deps/a, .deps/b, ...]`
  /* FIXME: make these both EntryName (also, parse EntryName at clap validation time)! */
  pub silent_external_prefix: Option<String>,
  /// This prefixes a directory path to every entry, but this *will* create
  /// parent directory entries in the output file.
  ///
  /// `--own-prefix .deps` => `[.deps/, .deps/a, .deps/b, ...]`
  /* FIXME: explain how these work when stacked together! */
  pub own_prefix: Option<String>,
}
332
/// Errors found while reconciling the full set of input sources (sorting,
/// name validation, duplicate detection). Variant doc comments are
/// `displaydoc` format strings.
#[derive(Debug, Display, Error)]
pub enum InputConsistencyError {
  /// name {0} was duplicated for source paths {1:?} and {2:?}
  DuplicateName(EntryName, PathBuf, PathBuf),
  /// error in name formatting: {0}
  NameFormat(#[from] MedusaNameFormatError),
}
340
/// A single entry to create in the output: either a real source file to copy
/// in, or a directory entry to synthesize.
#[derive(Clone, Debug)]
pub enum ZipEntrySpecification {
  File(FileSource),
  Directory(EntryName),
}

/// The full, ordered sequence of entries to write, with each directory entry
/// appearing before the files it contains.
struct EntrySpecificationList(pub Vec<ZipEntrySpecification>);
348
/// Yield each directory prefix of `current_directory_components` that was not
/// already covered by `previous_directory_components`.
///
/// When iterating over sorted entry names, this produces exactly the new
/// intermediate directories that must be registered in the output before the
/// current entry can be written, shortest prefix first. If the current
/// components are all shared with the previous entry, the iterator is empty.
pub fn calculate_new_rightmost_components<'a, T>(
  previous_directory_components: &[T],
  current_directory_components: &'a [T],
) -> impl Iterator<Item=&'a [T]>+'a
where
  T: Eq,
{
  /* Length of the longest shared prefix between the previous and current
   * entries (idiomatic zip/take_while/count instead of a manual index loop). */
  let shared_components: usize = previous_directory_components
    .iter()
    .zip(current_directory_components.iter())
    .take_while(|(prev, cur)| prev == cur)
    .count();
  /* Introduce one new directory per unshared rightmost component: each yielded
   * slice is the full prefix ending at that component. */
  (shared_components..current_directory_components.len())
    .map(move |final_component_index| &current_directory_components[..=final_component_index])
}
378
impl EntrySpecificationList {
  /// Sort `specs` in place (in parallel, via rayon) and verify that no two
  /// sources map to the same entry name.
  ///
  /// Returns [`InputConsistencyError::DuplicateName`] naming the colliding
  /// entry and both source paths if a duplicate is found.
  fn sort_and_deduplicate(specs: &mut Vec<FileSource>) -> Result<(), InputConsistencyError> {
    /* Sort the resulting files so we can expect them to (mostly) be an inorder
     * directory traversal. Note that directories with names less than top-level
     * files will be sorted above those top-level files. */
    specs.par_sort_unstable();

    /* Check for duplicate names. */
    {
      /* Seed the scan with an empty name/path so the first real entry never
       * compares equal; duplicates are necessarily adjacent after sorting. */
      let i = EntryName::empty();
      let p = PathBuf::from("");
      let mut prev_name: &EntryName = &i;
      let mut prev_path: &Path = &p;
      for FileSource { source, name } in specs.iter() {
        if name == prev_name {
          return Err(InputConsistencyError::DuplicateName(
            name.clone(),
            prev_path.to_path_buf(),
            source.clone(),
          ));
        }
        prev_name = name;
        prev_path = source;
      }
    }

    Ok(())
  }

  /// Expand sorted file specs into the full entry list, synthesizing
  /// directory entries (including any prefix directories requested via
  /// `modifications`) before the files they contain.
  pub fn from_file_specs(
    mut specs: Vec<FileSource>,
    modifications: EntryModifications,
  ) -> Result<Self, InputConsistencyError> {
    Self::sort_and_deduplicate(&mut specs)?;

    let mut ret: Vec<ZipEntrySpecification> = Vec::new();

    let cached_prefix: EntryName = {
      /* TODO: make EntryName work more cleanly for directories and files! */
      let EntryModifications {
        silent_external_prefix,
        own_prefix,
      } = modifications;
      /* Validate each prefix and split it into its path components. */
      let silent_external_prefix: Vec<String> = silent_external_prefix
        .map(EntryName::validate)
        .transpose()?
        .map(|name| {
          name
            .all_components()
            .map(|s| s.to_string())
            .collect::<Vec<_>>()
        })
        .unwrap_or_default();
      let own_prefix: Vec<String> = own_prefix
        .map(EntryName::validate)
        .transpose()?
        .map(|name| {
          name
            .all_components()
            .map(|s| s.to_string())
            .collect::<Vec<_>>()
        })
        .unwrap_or_default();

      /* Only own_prefix components get explicit directory entries; the silent
       * external prefix is prepended without creating any. */
      let mut cur_prefix: Vec<String> = silent_external_prefix;
      for component in own_prefix.into_iter() {
        cur_prefix.push(component);
        let cur_intermediate_directory: String = cur_prefix.join("/");
        let intermediate_dir = EntryName::validate(cur_intermediate_directory)
          .expect("constructed virtual directory should be fine");
        ret.push(ZipEntrySpecification::Directory(intermediate_dir));
      }
      /* The combined prefix is prepended to every entry name below. */
      if cur_prefix.is_empty() {
        EntryName::empty()
      } else {
        EntryName::validate(cur_prefix.join("/")).unwrap()
      }
    };

    let mut previous_directory_components: Vec<&str> = Vec::new();

    /* NB: .iter_mut() is used here to enable the use of &str references in
     * previous_directory_components! */
    for FileSource { source, name } in specs.iter_mut() {
      /* Split into directory components so we can add directory entries before any
       * files from that directory. */
      let current_directory_components: Vec<&str> = name.parent_components().collect();

      for new_rightmost_components in calculate_new_rightmost_components(
        &previous_directory_components,
        &current_directory_components,
      ) {
        let cur_intermediate_directory: String = new_rightmost_components.join("/");
        let mut intermediate_dir = EntryName::validate(cur_intermediate_directory)
          .expect("constructed virtual directory should be fine");
        intermediate_dir.add_prefix(&cached_prefix);
        ret.push(ZipEntrySpecification::Directory(intermediate_dir));
      }
      /* Set the "previous" dir components to the components of the current entry. */
      previous_directory_components = current_directory_components;

      /* Finally we can just write the actual file now! */
      let mut name = name.clone();
      name.add_prefix(&cached_prefix);
      ret.push(ZipEntrySpecification::File(FileSource {
        source: mem::take(source),
        name,
      }));
    }

    Ok(Self(ret))
  }
}
492
/// Errors encountered while reading a single input entry's contents into an
/// intermediate archive. Variant doc comments are `displaydoc` format strings.
#[derive(Debug, Display, Error)]
pub enum MedusaInputReadError {
  /// Source file {0:?} from crawl could not be accessed: {1}.
  SourceNotFound(PathBuf, #[source] io::Error),
  /// error creating in-memory immediate file: {0}
  Zip(#[from] ZipError),
  /// error joining: {0}
  Join(#[from] task::JoinError),
  /// failed to send intermediate entry: {0:?}
  Send(#[from] mpsc::error::SendError<IntermediateSingleEntry>),
  /// failed to parse zip output options: {0}
  InitZipOptions(#[from] InitializeZipOptionsError),
}
506
/// One entry whose contents have begun being read: directories need no
/// contents, while a file is piped into a single-entry spooled temporary zip
/// whose completion is signalled over the wrapped oneshot receiver.
#[derive(Debug)]
pub enum IntermediateSingleEntry {
  Directory(EntryName),
  File(oneshot::Receiver<Result<ZipArchive<tempfile::SpooledTempFile>, MedusaInputReadError>>),
}

/// Size in bytes below which a per-file temporary zip stays in memory before
/// spilling to disk.
const PER_FILE_SPOOL_THRESHOLD: usize = 3_000;
514
impl IntermediateSingleEntry {
  /// Begin reading `entry`'s contents, returning without waiting for the read
  /// to finish.
  ///
  /// Directories need no contents and are returned as-is. For files, a
  /// detached background task copies the source into a single-entry spooled
  /// temporary zip; the returned [`Self::File`] holds the oneshot receiver
  /// that eventually yields the finished archive (or a read error).
  pub async fn open_handle(
    entry: ZipEntrySpecification,
    mut zip_options: zip::write::FileOptions,
    options_initializers: Arc<ZipOptionsInitializers>,
  ) -> Result<Self, MedusaInputReadError> {
    match entry {
      /* If it's a directory, we don't need any more info. */
      ZipEntrySpecification::Directory(name) => Ok(Self::Directory(name)),
      /* If it's a file, we need to extract its contents. */
      ZipEntrySpecification::File(FileSource { name, source }) => {
        /* Get the file handle */
        let handle = fs::OpenOptions::new()
          .read(true)
          .open(&source)
          .await
          .map_err(|e| MedusaInputReadError::SourceNotFound(source.clone(), e))?;
        /* Get the filesystem metadata for this file. */
        let metadata = handle
          .metadata()
          .await
          .map_err(|e| MedusaInputReadError::SourceNotFound(source.clone(), e))?;
        /* Configure the zip options for this file, such as compression, given the
         * metadata. */
        zip_options = options_initializers.set_zip_options_for_file(zip_options, &metadata)?;

        /* Create the spooled temporary zip file. */
        let mut zip_output: ZipWriter<tempfile::SpooledTempFile> = task::spawn_blocking(|| {
          let temp_file = tempfile::spooled_tempfile(PER_FILE_SPOOL_THRESHOLD);
          let zip_wrapper = ZipWriter::new(temp_file);
          Ok::<_, MedusaInputReadError>(zip_wrapper)
        })
        .await??;

        /* We can send a oneshot::Receiver over an mpsc::bounded() channel in order
         * to force our receiving send of this the mpsc::bounded() to await
         * until the oneshot::Receiver is complete. */
        let (tx, rx) =
          oneshot::channel::<Result<ZipArchive<tempfile::SpooledTempFile>, MedusaInputReadError>>();

        /* Convert to a blocking std::fs::File for use inside spawn_blocking. */
        let mut handle = handle.into_std().await;
        task::spawn(async move {
          let completed_single_zip: Result<
            ZipArchive<tempfile::SpooledTempFile>,
            MedusaInputReadError,
          > = task::spawn_blocking(move || {
            /* In parallel, we will be writing this input file out to a spooled temporary
             * zip containing just this one entry. */
            zip_output.start_file(name.into_string(), zip_options)?;
            std::io::copy(&mut handle, &mut zip_output)
              .map_err(|e| MedusaInputReadError::SourceNotFound(source.clone(), e))?;
            let temp_zip = zip_output.finish_into_readable()?;
            Ok::<ZipArchive<_>, MedusaInputReadError>(temp_zip)
          })
          .await
          .expect("joining should not fail");
          tx.send(completed_single_zip)
            .expect("rx should always be open");
        });
        /* NB: not awaiting this spawned task! */

        Ok(Self::File(rx))
      },
    }
  }
}
581
/// How much concurrency to employ when producing the output zip.
#[derive(Copy, Clone, Default, Debug)]
pub enum Parallelism {
  /// Read source files and copy them to the output zip in order.
  Synchronous,
  /// Parallelize creation by splitting up the input into chunks.
  #[default]
  ParallelMerge,
}
590
/// A fully-specified zip creation job: the inputs, how entries should be
/// written, and how parallel to be about it.
#[derive(Clone)]
pub struct MedusaZip {
  pub input_files: Vec<FileSource>,
  pub zip_options: ZipOutputOptions,
  pub modifications: EntryModifications,
  pub parallelism: Parallelism,
}

/* TODO: make these configurable!!! */
/// Number of entries per intermediate archive in ParallelMerge mode.
const INTERMEDIATE_CHUNK_SIZE: usize = 2000;
/// Maximum number of completed intermediate archives buffered while awaiting
/// the final merge.
const MAX_PARALLEL_INTERMEDIATES: usize = 12;
/// Queue depth for in-flight per-entry handles while building one
/// intermediate archive.
const PER_INTERMEDIATE_FILE_IO_QUEUE_LENGTH: usize = 20;
/// Size in bytes below which an intermediate archive stays in memory before
/// spilling to disk.
const INTERMEDIATE_OUTPUT_SPOOL_THRESHOLD: usize = 20_000;
604
/// An ordered pipeline of per-file option initializers, applied first to last.
pub struct ZipOptionsInitializers {
  pub initializers: Vec<Box<dyn InitializeZipOptionsForSpecificFile+Send+Sync>>,
}
608
609impl ZipOptionsInitializers {
610  pub fn set_zip_options_for_file(
611    &self,
612    mut options: zip::write::FileOptions,
613    metadata: &std::fs::Metadata,
614  ) -> Result<zip::write::FileOptions, InitializeZipOptionsError> {
615    let Self { initializers } = self;
616    for initializer in initializers.iter() {
617      options = initializer.set_zip_options_for_file(options, metadata)?;
618    }
619    Ok(options)
620  }
621}
622
impl MedusaZip {
  /// Zip one chunk of `entries` into a spooled temporary archive.
  ///
  /// Entry contents are read concurrently (bounded by the per-intermediate
  /// queue length), but are merged into the intermediate archive strictly in
  /// input order.
  async fn zip_intermediate(
    entries: &[ZipEntrySpecification],
    zip_options: zip::write::FileOptions,
    options_initializers: Arc<ZipOptionsInitializers>,
  ) -> Result<ZipArchive<tempfile::SpooledTempFile>, MedusaZipError> {
    /* (1) Create unnamed filesystem-backed temp file handle. */
    let intermediate_output = task::spawn_blocking(|| {
      let temp_file = tempfile::spooled_tempfile(INTERMEDIATE_OUTPUT_SPOOL_THRESHOLD);
      let zip_wrapper = ZipWriter::new(temp_file);
      Ok::<_, MedusaZipError>(zip_wrapper)
    })
    .await??;

    /* (2) Map to individual file handles and/or in-memory "immediate" zip files. */
    let (handle_tx, handle_rx) =
      mpsc::channel::<IntermediateSingleEntry>(PER_INTERMEDIATE_FILE_IO_QUEUE_LENGTH);
    let entries = entries.to_vec();
    /* Producer task: open each entry (which kicks off its background read) and
     * push its handle into the bounded queue, preserving input order. */
    let handle_stream_task = task::spawn(async move {
      for entry in entries.into_iter() {
        let handle =
          IntermediateSingleEntry::open_handle(entry, zip_options, options_initializers.clone())
            .await?;
        handle_tx.send(handle).await?;
      }
      Ok::<(), MedusaInputReadError>(())
    });
    let mut handle_jobs = ReceiverStream::new(handle_rx);

    /* (3) Add file entries, in order. */
    let intermediate_output = Arc::new(Mutex::new(intermediate_output));
    while let Some(intermediate_entry) = handle_jobs.next().await {
      let intermediate_output = intermediate_output.clone();
      match intermediate_entry {
        IntermediateSingleEntry::Directory(name) => {
          task::spawn_blocking(move || {
            let mut intermediate_output = intermediate_output.lock();
            intermediate_output.add_directory(name.into_string(), zip_options)?;
            Ok::<(), ZipError>(())
          })
          .await??;
        },
        IntermediateSingleEntry::File(tmp_merge_archive) => {
          /* Await the oneshot receiver: this waits for the background copy of
           * this entry to complete before merging it in. */
          let tmp_merge_archive: ZipArchive<tempfile::SpooledTempFile> =
            tmp_merge_archive.await??;
          task::spawn_blocking(move || {
            let mut intermediate_output = intermediate_output.lock();
            intermediate_output.merge_archive(tmp_merge_archive)?;
            Ok::<(), ZipError>(())
          })
          .await??;
        },
      }
    }
    /* Propagate any error from the producer task. */
    handle_stream_task.await??;

    /* (4) Convert the intermediate write archive into a file-backed read
     * archive. */
    let temp_for_read = task::spawn_blocking(move || {
      let mut zip_wrapper = Arc::into_inner(intermediate_output)
        .expect("no other references should exist to intermediate_output")
        .into_inner();
      let temp_file = zip_wrapper.finish_into_readable()?;
      Ok::<_, ZipError>(temp_file)
    })
    .await??;

    Ok(temp_for_read)
  }

  /// The standard per-file option initializers, applied in this order:
  /// mtime, unix permissions, small-file store, zip64 large-file flag.
  fn options_initializers(mtime_behavior: ModifiedTimeBehavior) -> ZipOptionsInitializers {
    ZipOptionsInitializers {
      initializers: vec![
        Box::new(mtime_behavior),
        Box::new(PreservePermsBehavior),
        Box::new(SmallFileBehavior),
        Box::new(LargeFileBehavior),
      ],
    }
  }

  /// Zip `entries` by chunking them into intermediate archives built in
  /// parallel, then merging those archives into `output_zip` in chunk order.
  async fn zip_parallel<Output>(
    entries: Vec<ZipEntrySpecification>,
    output_zip: OutputWrapper<ZipWriter<Output>>,
    zip_options: zip::write::FileOptions,
    mtime_behavior: ModifiedTimeBehavior,
  ) -> Result<(), MedusaZipError>
  where
    Output: Write+Seek+Send+'static,
  {
    let options_initializers = Arc::new(Self::options_initializers(mtime_behavior));

    let (intermediate_tx, intermediate_rx) =
      mpsc::channel::<ZipArchive<tempfile::SpooledTempFile>>(MAX_PARALLEL_INTERMEDIATES);
    let mut handle_intermediates = ReceiverStream::new(intermediate_rx);

    /* (1) Split into however many subtasks (which may just be one) to do
     * "normally". */
    let intermediate_stream_task = task::spawn(async move {
      for entry_chunk in entries.chunks(INTERMEDIATE_CHUNK_SIZE) {
        let archive =
          Self::zip_intermediate(entry_chunk, zip_options, options_initializers.clone()).await?;
        intermediate_tx.send(archive).await?;
      }
      Ok::<(), MedusaZipError>(())
    });

    /* (2) Merge each completed intermediate archive into the final output, in
     * the order the chunks were produced. */
    while let Some(intermediate_archive) = handle_intermediates.next().await {
      let output_zip = output_zip.clone();
      task::spawn_blocking(move || {
        output_zip.lease().merge_archive(intermediate_archive)?;
        Ok::<(), MedusaZipError>(())
      })
      .await??;
    }
    intermediate_stream_task.await??;

    Ok(())
  }

  /// Zip `entries` one at a time, copying each entry straight into
  /// `output_zip` with no intermediate archives.
  async fn zip_synchronous<Output>(
    entries: Vec<ZipEntrySpecification>,
    output_zip: OutputWrapper<ZipWriter<Output>>,
    zip_options: zip::write::FileOptions,
    mtime_behavior: ModifiedTimeBehavior,
  ) -> Result<(), MedusaZipError>
  where
    Output: Write+Seek+Send+'static,
  {
    let options_initializers = Self::options_initializers(mtime_behavior);
    for entry in entries.into_iter() {
      let output_zip = output_zip.clone();
      match entry {
        ZipEntrySpecification::Directory(name) => {
          task::spawn_blocking(move || {
            let mut output_zip = output_zip.lease();
            output_zip.add_directory(name.into_string(), zip_options)?;
            Ok::<(), ZipError>(())
          })
          .await??;
        },
        ZipEntrySpecification::File(FileSource { name, source }) => {
          let f = fs::OpenOptions::new()
            .read(true)
            .open(&source)
            .await
            .map_err(|e| MedusaInputReadError::SourceNotFound(source, e))?;
          let metadata = f.metadata().await?;
          /* Apply the per-file option initializers (compression, perms, etc.). */
          let zip_options =
            options_initializers.set_zip_options_for_file(zip_options, &metadata)?;
          /* Convert to a blocking std::fs::File for use inside spawn_blocking. */
          let mut f = f.into_std().await;
          task::spawn_blocking(move || {
            let mut output_zip = output_zip.lease();
            output_zip.start_file(name.into_string(), zip_options)?;
            std::io::copy(&mut f, &mut *output_zip)?;
            Ok::<(), MedusaZipError>(())
          })
          .await??;
        },
      }
    }

    Ok(())
  }

  /// Produce the configured output zip from `self.input_files`.
  ///
  /// Expands the inputs into an ordered entry list, computes the static zip
  /// options, then delegates to the selected [`Parallelism`] strategy.
  /// Returns the output wrapper so the caller can finish the archive.
  pub async fn zip<Output>(
    self,
    output_zip: OutputWrapper<ZipWriter<Output>>,
  ) -> Result<OutputWrapper<ZipWriter<Output>>, MedusaZipError>
  where
    Output: Write+Seek+Send+'static,
  {
    let Self {
      input_files,
      zip_options: ZipOutputOptions {
        mtime_behavior,
        compression_options,
      },
      modifications,
      parallelism,
    } = self;

    /* Sorting/deduplication is CPU-bound, so run it off the async runtime. */
    let EntrySpecificationList(entries) = task::spawn_blocking(move || {
      EntrySpecificationList::from_file_specs(input_files, modifications)
    })
    .await??;

    /* Apply the run-wide (static) option configurers once up front. */
    let static_options_initializers: Vec<Box<dyn DefaultInitializeZipOptions+Send+Sync>> =
      vec![Box::new(mtime_behavior), Box::new(compression_options)];
    let mut zip_options = ZipLibraryFileOptions::default();
    for initializer in static_options_initializers.into_iter() {
      zip_options = initializer.set_zip_options_static(zip_options);
    }

    match parallelism {
      Parallelism::Synchronous => {
        Self::zip_synchronous(entries, output_zip.clone(), zip_options, mtime_behavior).await?;
      },
      Parallelism::ParallelMerge => {
        Self::zip_parallel(entries, output_zip.clone(), zip_options, mtime_behavior).await?;
      },
    }

    Ok(output_zip)
  }
}