libmedusa_zip/
merge.rs

/*
 * Description: Merge the contents of existing zip files into a single output zip.
 *
 * Copyright (C) 2023 Danny McClanahan <dmcC2@hypnicjerk.ai>
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (see LICENSE).
 */
9
//! Merge the entries of existing zip files into a single output zip.
11
12use crate::{
13  destination::OutputWrapper,
14  zip::{calculate_new_rightmost_components, DefaultInitializeZipOptions, ModifiedTimeBehavior},
15  EntryName,
16};
17
18use displaydoc::Display;
19use futures::stream::StreamExt;
20use thiserror::Error;
21use tokio::{io, sync::mpsc, task};
22use tokio_stream::wrappers::ReceiverStream;
23use zip::{
24  read::ZipArchive,
25  result::ZipError,
26  write::{FileOptions as ZipLibraryFileOptions, ZipWriter},
27};
28
29use std::{
30  io::{Seek, Write},
31  path::PathBuf,
32};
33
34#[derive(Debug, Display, Error)]
35pub enum MedusaMergeError {
36  /// internal zip impl error: {0}
37  Zip(#[from] ZipError),
38  /// i/o error: {0}
39  Io(#[from] io::Error),
40  /// error joining threads: {0}
41  Join(#[from] task::JoinError),
42  /// error sending value: {0}
43  Send(#[from] mpsc::error::SendError<IntermediateMergeEntry>),
44}
45
46#[derive(Debug, Clone)]
47pub struct MergeGroup {
48  pub prefix: Option<EntryName>,
49  pub sources: Vec<PathBuf>,
50}
51
52#[derive(Default, Debug, Clone)]
53pub struct MedusaMerge {
54  pub groups: Vec<MergeGroup>,
55}
56
57pub enum IntermediateMergeEntry {
58  AddDirectory(EntryName),
59  MergeZip(ZipArchive<std::fs::File>),
60}
61
/// Capacity of the bounded channel that carries [`IntermediateMergeEntry`]
/// values inside [`MedusaMerge::merge`].
const PARALLEL_MERGE_ENTRIES: usize = 10;
63
64impl MedusaMerge {
65  pub async fn merge<Output>(
66    self,
67    mtime_behavior: ModifiedTimeBehavior,
68    output_zip: OutputWrapper<ZipWriter<Output>>,
69  ) -> Result<OutputWrapper<ZipWriter<Output>>, MedusaMergeError>
70  where
71    Output: Write+Seek+Send+'static,
72  {
73    let Self { groups } = self;
74    let zip_options = mtime_behavior.set_zip_options_static(ZipLibraryFileOptions::default());
75
76    /* This shouldn't really need to be bounded at all, since the task is
77     * entirely synchronous. */
78    let (handle_tx, handle_rx) = mpsc::channel::<IntermediateMergeEntry>(PARALLEL_MERGE_ENTRIES);
79    let mut handle_jobs = ReceiverStream::new(handle_rx);
80    let handle_stream_task = task::spawn(async move {
81      let mut previous_directory_components: Vec<String> = Vec::new();
82      for MergeGroup { prefix, sources } in groups.into_iter() {
83        let current_directory_components: Vec<String> = prefix
84          .map(|p| {
85            p.all_components()
86              .map(|s| s.to_string())
87              .collect::<Vec<_>>()
88          })
89          .unwrap_or_default();
90        for new_rightmost_components in calculate_new_rightmost_components(
91          &previous_directory_components,
92          &current_directory_components,
93        ) {
94          let cur_intermediate_directory: String = new_rightmost_components.join("/");
95          let intermediate_dir = EntryName::validate(cur_intermediate_directory)
96            .expect("constructed virtual directory should be fine");
97          handle_tx
98            .send(IntermediateMergeEntry::AddDirectory(intermediate_dir))
99            .await?;
100        }
101        previous_directory_components = current_directory_components;
102
103        for src in sources.into_iter() {
104          let archive = task::spawn_blocking(move || {
105            let handle = std::fs::OpenOptions::new().read(true).open(src)?;
106            ZipArchive::new(handle)
107          })
108          .await??;
109          handle_tx
110            .send(IntermediateMergeEntry::MergeZip(archive))
111            .await?;
112        }
113      }
114      Ok::<(), MedusaMergeError>(())
115    });
116
117    while let Some(intermediate_entry) = handle_jobs.next().await {
118      let output_zip = output_zip.clone();
119      match intermediate_entry {
120        IntermediateMergeEntry::AddDirectory(name) => {
121          task::spawn_blocking(move || {
122            let mut output_zip = output_zip.lease();
123            output_zip.add_directory(name.into_string(), zip_options)?;
124            Ok::<(), ZipError>(())
125          })
126          .await??;
127        },
128        IntermediateMergeEntry::MergeZip(source_archive) => {
129          task::spawn_blocking(move || {
130            let mut output_zip = output_zip.lease();
131            output_zip.merge_archive(source_archive)?;
132            Ok::<(), ZipError>(())
133          })
134          .await??;
135        },
136      }
137    }
138    handle_stream_task.await??;
139
140    Ok(output_zip)
141  }
142}