1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/*
 * Description: Merge existing zip archives into a single output zip file.
 *
 * Copyright (C) 2023 Danny McClanahan <dmcC2@hypnicjerk.ai>
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (see LICENSE).
 */

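//! Merge the contents of existing zip archives into a single output zip,
//! adding directory entries for each merge group's prefix along the way.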

use crate::{
  destination::OutputWrapper,
  zip::{calculate_new_rightmost_components, DefaultInitializeZipOptions, ModifiedTimeBehavior},
  EntryName,
};

use displaydoc::Display;
use futures::stream::StreamExt;
use thiserror::Error;
use tokio::{io, sync::mpsc, task};
use tokio_stream::wrappers::ReceiverStream;
use zip::{
  read::ZipArchive,
  result::ZipError,
  write::{FileOptions as ZipLibraryFileOptions, ZipWriter},
};

use std::{
  io::{Seek, Write},
  path::PathBuf,
};

// Errors that can be returned from `MedusaMerge::merge`.
//
// NB: `Display` is derived via `displaydoc`, which turns the `///` line on each
// variant into that variant's runtime message. Those lines are therefore
// program text, not free-form documentation; explanatory notes in this enum are
// kept in plain `//` comments so the messages stay unchanged.
#[derive(Debug, Display, Error)]
pub enum MedusaMergeError {
  // Wraps an error reported by the `zip` crate.
  /// internal zip impl error: {0}
  Zip(#[from] ZipError),
  // Wraps an i/o error.
  /// i/o error: {0}
  Io(#[from] io::Error),
  // Wraps a failure to join a spawned tokio task.
  /// error joining threads: {0}
  Join(#[from] task::JoinError),
  // Wraps a failed send on the channel carrying `IntermediateMergeEntry` values.
  /// error sending value: {0}
  Send(#[from] mpsc::error::SendError<IntermediateMergeEntry>),
}

/// A set of source zip files to merge, with an optional directory prefix.
#[derive(Debug, Clone)]
pub struct MergeGroup {
  /// Optional prefix for this group. When merging, its path components are used
  /// to decide which directory entries get added to the output before this
  /// group's sources are merged; `None` is treated as an empty component list.
  pub prefix: Option<EntryName>,
  /// Paths of the zip files to open and merge, processed in the order given.
  pub sources: Vec<PathBuf>,
}

/// A merge request: an ordered list of [`MergeGroup`]s to write into one output
/// zip.
#[derive(Default, Debug, Clone)]
pub struct MedusaMerge {
  /// Groups to merge, processed in the order given.
  pub groups: Vec<MergeGroup>,
}

/// A unit of work passed over a channel from the task that opens source
/// archives to the loop that writes into the output zip.
pub enum IntermediateMergeEntry {
  /// Add a directory entry with this name to the output zip.
  AddDirectory(EntryName),
  /// Merge this already-opened source archive into the output zip.
  MergeZip(ZipArchive<std::fs::File>),
}

/// Capacity of the bounded channel that carries [`IntermediateMergeEntry`]
/// values to the loop writing the output zip, i.e. how many entries may be
/// queued ahead of the writer.
const PARALLEL_MERGE_ENTRIES: usize = 10;

impl MedusaMerge {
  /// Write every group's directory entries and source archives into
  /// `output_zip`, then hand `output_zip` back.
  ///
  /// Work is split in two halves connected by a bounded channel:
  ///
  /// * A spawned task walks the groups in order. For each group it asks
  ///   `calculate_new_rightmost_components` which directory paths to add given
  ///   the previous group's prefix components and this group's (presumably only
  ///   the ones not already covered by the previous prefix -- confirm against
  ///   that helper), queues one `AddDirectory` entry per path, then opens each
  ///   source file read-only on a blocking thread and queues it as `MergeZip`.
  /// * The calling task drains the channel and applies each entry to the output
  ///   on a blocking thread, awaiting each write before taking the next entry,
  ///   so entries are written one at a time in the order they were queued.
  ///
  /// Directory entries are created with the default zip file options as
  /// adjusted by `mtime_behavior`.
  ///
  /// # Errors
  ///
  /// * [`MedusaMergeError::Zip`] if a source cannot be opened or parsed as a zip
  ///   archive, or if writing to the output fails.
  /// * [`MedusaMergeError::Join`] if a spawned task cannot be joined.
  /// * [`MedusaMergeError::Send`] if an entry cannot be queued because the
  ///   receiving side of the channel has gone away.
  ///
  /// # Panics
  ///
  /// Panics if a directory path built from a group's prefix components fails
  /// `EntryName::validate`.
  pub async fn merge<Output>(
    self,
    mtime_behavior: ModifiedTimeBehavior,
    output_zip: OutputWrapper<ZipWriter<Output>>,
  ) -> Result<OutputWrapper<ZipWriter<Output>>, MedusaMergeError>
  where
    Output: Write+Seek+Send+'static,
  {
    let Self {
      groups: merge_groups,
    } = self;
    let directory_options =
      mtime_behavior.set_zip_options_static(ZipLibraryFileOptions::default());

    /* The bound is not load-bearing: the consumer below handles one entry at a
     * time, so the capacity only limits how far ahead the producer may open
     * source archives. */
    let (entry_tx, entry_rx) = mpsc::channel::<IntermediateMergeEntry>(PARALLEL_MERGE_ENTRIES);
    let mut pending_entries = ReceiverStream::new(entry_rx);

    /* Producer: turn the groups into an ordered sequence of entries. */
    let producer = task::spawn(async move {
      let mut last_prefix: Vec<String> = Vec::new();
      for MergeGroup { prefix, sources } in merge_groups {
        /* A missing prefix is the same as an empty list of components. */
        let this_prefix: Vec<String> = prefix.map_or_else(Vec::new, |name| {
          name
            .all_components()
            .map(|component| component.to_string())
            .collect()
        });

        for dir_components in calculate_new_rightmost_components(&last_prefix, &this_prefix) {
          let joined: String = dir_components.join("/");
          let dir_name =
            EntryName::validate(joined).expect("constructed virtual directory should be fine");
          let message = IntermediateMergeEntry::AddDirectory(dir_name);
          entry_tx.send(message).await?;
        }
        last_prefix = this_prefix;

        for source_path in sources {
          /* Opening the file and parsing the zip are blocking i/o, so do both
           * off the async executor. */
          let opened = task::spawn_blocking(move || {
            let file = std::fs::File::open(source_path)?;
            ZipArchive::new(file)
          });
          let archive = opened.await??;
          entry_tx
            .send(IntermediateMergeEntry::MergeZip(archive))
            .await?;
        }
      }
      Ok::<(), MedusaMergeError>(())
    });

    /* Consumer: apply each queued entry to the output, strictly in order. */
    while let Some(entry) = pending_entries.next().await {
      let writer = output_zip.clone();
      let applied = task::spawn_blocking(move || {
        let mut leased = writer.lease();
        match entry {
          IntermediateMergeEntry::AddDirectory(name) => {
            leased.add_directory(name.into_string(), directory_options)?;
          },
          IntermediateMergeEntry::MergeZip(source_archive) => {
            leased.merge_archive(source_archive)?;
          },
        }
        Ok::<(), ZipError>(())
      });
      applied.await??;
    }

    /* The stream only ends once the producer has dropped its sender, so joining
     * here just surfaces any error the producer stopped on. */
    producer.await??;

    Ok(output_zip)
  }
}