1use crate::{
13 destination::OutputWrapper,
14 zip::{calculate_new_rightmost_components, DefaultInitializeZipOptions, ModifiedTimeBehavior},
15 EntryName,
16};
17
18use displaydoc::Display;
19use futures::stream::StreamExt;
20use thiserror::Error;
21use tokio::{io, sync::mpsc, task};
22use tokio_stream::wrappers::ReceiverStream;
23use zip::{
24 read::ZipArchive,
25 result::ZipError,
26 write::{FileOptions as ZipLibraryFileOptions, ZipWriter},
27};
28
29use std::{
30 io::{Seek, Write},
31 path::PathBuf,
32};
33
34#[derive(Debug, Display, Error)]
35pub enum MedusaMergeError {
36 Zip(#[from] ZipError),
38 Io(#[from] io::Error),
40 Join(#[from] task::JoinError),
42 Send(#[from] mpsc::error::SendError<IntermediateMergeEntry>),
44}
45
46#[derive(Debug, Clone)]
47pub struct MergeGroup {
48 pub prefix: Option<EntryName>,
49 pub sources: Vec<PathBuf>,
50}
51
52#[derive(Default, Debug, Clone)]
53pub struct MedusaMerge {
54 pub groups: Vec<MergeGroup>,
55}
56
57pub enum IntermediateMergeEntry {
58 AddDirectory(EntryName),
59 MergeZip(ZipArchive<std::fs::File>),
60}
61
/// Capacity of the channel between the archive-opening task and the writer in
/// [`MedusaMerge::merge`]: at most this many opened-but-unwritten entries are buffered.
const PARALLEL_MERGE_ENTRIES: usize = 10;
63
64impl MedusaMerge {
65 pub async fn merge<Output>(
66 self,
67 mtime_behavior: ModifiedTimeBehavior,
68 output_zip: OutputWrapper<ZipWriter<Output>>,
69 ) -> Result<OutputWrapper<ZipWriter<Output>>, MedusaMergeError>
70 where
71 Output: Write+Seek+Send+'static,
72 {
73 let Self { groups } = self;
74 let zip_options = mtime_behavior.set_zip_options_static(ZipLibraryFileOptions::default());
75
76 let (handle_tx, handle_rx) = mpsc::channel::<IntermediateMergeEntry>(PARALLEL_MERGE_ENTRIES);
79 let mut handle_jobs = ReceiverStream::new(handle_rx);
80 let handle_stream_task = task::spawn(async move {
81 let mut previous_directory_components: Vec<String> = Vec::new();
82 for MergeGroup { prefix, sources } in groups.into_iter() {
83 let current_directory_components: Vec<String> = prefix
84 .map(|p| {
85 p.all_components()
86 .map(|s| s.to_string())
87 .collect::<Vec<_>>()
88 })
89 .unwrap_or_default();
90 for new_rightmost_components in calculate_new_rightmost_components(
91 &previous_directory_components,
92 ¤t_directory_components,
93 ) {
94 let cur_intermediate_directory: String = new_rightmost_components.join("/");
95 let intermediate_dir = EntryName::validate(cur_intermediate_directory)
96 .expect("constructed virtual directory should be fine");
97 handle_tx
98 .send(IntermediateMergeEntry::AddDirectory(intermediate_dir))
99 .await?;
100 }
101 previous_directory_components = current_directory_components;
102
103 for src in sources.into_iter() {
104 let archive = task::spawn_blocking(move || {
105 let handle = std::fs::OpenOptions::new().read(true).open(src)?;
106 ZipArchive::new(handle)
107 })
108 .await??;
109 handle_tx
110 .send(IntermediateMergeEntry::MergeZip(archive))
111 .await?;
112 }
113 }
114 Ok::<(), MedusaMergeError>(())
115 });
116
117 while let Some(intermediate_entry) = handle_jobs.next().await {
118 let output_zip = output_zip.clone();
119 match intermediate_entry {
120 IntermediateMergeEntry::AddDirectory(name) => {
121 task::spawn_blocking(move || {
122 let mut output_zip = output_zip.lease();
123 output_zip.add_directory(name.into_string(), zip_options)?;
124 Ok::<(), ZipError>(())
125 })
126 .await??;
127 },
128 IntermediateMergeEntry::MergeZip(source_archive) => {
129 task::spawn_blocking(move || {
130 let mut output_zip = output_zip.lease();
131 output_zip.merge_archive(source_archive)?;
132 Ok::<(), ZipError>(())
133 })
134 .await??;
135 },
136 }
137 }
138 handle_stream_task.await??;
139
140 Ok(output_zip)
141 }
142}