hive_asar/
writer.rs

1use crate::header::{Directory, Entry, FileMetadata, FilePosition, Integrity};
2use crate::{cfg_fs, cfg_integrity, cfg_stream, split_path};
3use std::io::SeekFrom;
4use tokio::io::{
5  self, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, Take,
6};
7
8cfg_fs! {
9  use std::future::Future;
10  use std::path::Path;
11  use std::pin::Pin;
12  use tokio::fs::{read_dir, symlink_metadata, File as TokioFile};
13}
14
15cfg_integrity! {
16  use crate::BLOCK_SIZE;
17  use crate::header::{Algorithm, Hash};
18  use sha2::digest::Digest;
19  use sha2::Sha256;
20}
21
22cfg_stream! {
23  use bytes::Bytes;
24  use futures_core::Stream;
25  use futures_util::future::ok;
26  use futures_util::stream::{iter, once};
27  use futures_util::StreamExt;
28  use tokio_util::io::ReaderStream;
29}
30
/// Asar archive writer.
#[derive(Debug)]
pub struct Writer<F: AsyncRead + Unpin> {
  // In-memory directory tree; serialized to JSON as the archive header.
  header: Directory,
  // Running total of declared file sizes — the offset the next file's
  // bytes will occupy in the data section.
  file_offset: u64,
  // Pending file contents, in insertion order, each capped at its
  // declared size via `Take`.
  files: Vec<Take<F>>,
}
38
39impl<F: AsyncRead + Unpin> Writer<F> {
40  /// Creates a new, empty archive writer.
41  pub fn new() -> Self {
42    Default::default()
43  }
44
45  fn add_folder_recursively(&mut self, segments: Vec<&str>) -> &mut Directory {
46    let mut dir = &mut self.header;
47    for seg in segments {
48      let entry = (dir.files)
49        .entry(seg.into())
50        .or_insert_with(|| Entry::Directory(Default::default()));
51      dir = match entry {
52        Entry::Directory(dir) => dir,
53        _ => unreachable!(),
54      }
55    }
56    dir
57  }
58
59  /// Add an entry to the archive.
60  ///
61  /// The entry's parent directories are created recursively if they do not
62  /// exist.
63  ///
64  /// `size` should correspond with `content`. If `size` is smaller, exactly
65  /// `size` bytes will be written. If `size` is bigger, the
66  /// [`Writer::write`] method will fail. For convenience, you may want
67  /// to use `add_sized`.
68  ///
69  /// # Panic
70  ///
71  /// The method panics if normalised `path` contains no filename, or if the
72  /// path is already occupied by a previously inserted file.
73  pub fn add(&mut self, path: &str, content: F, size: u64) {
74    self.add_with_options(path, content, size, false, None)
75  }
76
77  fn add_with_options(
78    &mut self,
79    path: &str,
80    content: F,
81    size: u64,
82    executable: bool,
83    integrity: Option<Integrity>,
84  ) {
85    let mut segments = split_path(path);
86    let filename = segments
87      .pop()
88      .expect("normalised path contains no filename");
89    let file_entry = FileMetadata {
90      pos: FilePosition::Offset(self.file_offset),
91      size,
92      executable,
93      integrity,
94    };
95    let result = self
96      .add_folder_recursively(segments)
97      .files
98      .insert(filename.into(), Entry::File(file_entry));
99    assert!(result.is_none());
100    self.file_offset += size;
101    self.files.push(content.take(size))
102  }
103
104  /// Adds an empty folder recursively to the archive.
105  pub fn add_empty_folder(&mut self, path: &str) {
106    self.add_folder_recursively(split_path(path));
107  }
108
109  /// Finishes the archive and writes the content into `dest`.
110  pub async fn write(self, dest: &mut (impl AsyncWrite + Unpin)) -> io::Result<()> {
111    let header_bytes = serde_json::to_vec(&self.header)?;
112    let header_len = header_bytes.len() as u32;
113    let padding = match header_len % 4 {
114      0 => 0,
115      r => 4 - r,
116    };
117
118    dest.write_u32_le(4).await?;
119    dest.write_u32_le(header_len + padding + 8).await?;
120    dest.write_u32_le(header_len + padding + 4).await?;
121    dest.write_u32_le(header_len).await?;
122
123    dest.write_all(&header_bytes).await?;
124    dest.write_all(&vec![0; padding as _]).await?;
125
126    for mut file in self.files {
127      io::copy(&mut file, dest).await?;
128    }
129
130    Ok(())
131  }
132
133  cfg_stream! {
134    pub fn into_stream(self) -> io::Result<impl Stream<Item = io::Result<Bytes>>> {
135      let mut header_bytes = serde_json::to_vec(&self.header)?;
136      let header_len = header_bytes.len() as u32;
137      let padding = match header_len % 4 {
138        0 => 0,
139        r => 4 - r,
140      };
141      for _ in 0..padding {
142        header_bytes.extend(Some(0))
143      }
144
145      let mut header_meta = Vec::with_capacity((16 + header_len + padding) as _);
146      for i in [
147        4u32,
148        header_len + padding + 8,
149        header_len + padding + 4,
150        header_len,
151      ] {
152        header_meta.extend(i.to_le_bytes());
153      }
154
155      let stream = once(ok(header_meta.into()))
156        .chain(once(ok(header_bytes.into())))
157        .chain(iter(self.files.into_iter().map(ReaderStream::new)).flatten());
158      Ok(stream)
159    }
160  }
161}
162
163impl<F: AsyncRead + AsyncSeek + Unpin> Writer<F> {
164  /// Add an entry to the archive.
165  ///
166  /// Similar to [`Writer::add`], but it uses [`AsyncSeekExt::seek`] to
167  /// determine the size of the content.
168  ///
169  /// For more information see [`Writer::add`].
170  pub async fn add_sized(&mut self, path: &str, mut content: F) -> io::Result<()> {
171    let size = content.seek(SeekFrom::End(0)).await? - content.stream_position().await?;
172    self.add(path, content, size);
173    Ok(())
174  }
175
176  cfg_integrity! {
177    pub async fn add_sized_with_integrity(&mut self, path: &str, mut content: F) -> io::Result<()> {
178      let mut global_state = Sha256::new();
179      let mut block = Vec::with_capacity(BLOCK_SIZE as _);
180      let mut blocks = Vec::new();
181      let mut size = 0;
182      loop {
183        let read_size = (&mut content)
184          .take(BLOCK_SIZE as _)
185          .read_to_end(&mut block)
186          .await?;
187        if read_size == 0 {
188          break;
189        }
190        size += read_size;
191        blocks.push(Hash(Sha256::digest(&block).to_vec()));
192        global_state.update(&block);
193        block.clear();
194      }
195      let integrity = Integrity {
196        algorithm: Algorithm::SHA256,
197        hash: Hash(global_state.finalize().to_vec()),
198        block_size: BLOCK_SIZE,
199        blocks,
200      };
201      content.rewind().await?;
202      self.add_with_options(path, content, size as _, false, Some(integrity));
203      Ok(())
204    }
205  }
206}
207
208impl<F: AsyncRead + Unpin> Default for Writer<F> {
209  fn default() -> Self {
210    Self {
211      header: Default::default(),
212      file_offset: 0,
213      files: Vec::new(),
214    }
215  }
216}
217
218cfg_fs! {
219  /// Pack a directory to asar archive.
220  pub async fn pack_dir(
221    path: impl AsRef<Path>,
222    dest: &mut (impl AsyncWrite + Unpin),
223  ) -> io::Result<()> {
224    pack_dir_into_writer(path)
225      .await?
226      .write(dest)
227      .await
228  }
229
230  cfg_stream! {
231    pub async fn pack_dir_into_stream(
232      path: impl AsRef<Path>,
233    ) -> io::Result<impl Stream<Item = io::Result<Bytes>>> {
234      pack_dir_into_writer(path)
235        .await?
236        .into_stream()
237    }
238  }
239
240  pub async fn pack_dir_into_writer(
241    path: impl AsRef<Path>,
242  ) -> io::Result<Writer<TokioFile>> {
243    let path = path.as_ref().canonicalize()?;
244    let mut writer = Writer::<TokioFile>::new();
245    add_dir_files(&mut writer, &path, &path).await?;
246    Ok(writer)
247  }
248
249  fn add_dir_files<'a>(
250    writer: &'a mut Writer<TokioFile>,
251    path: &'a Path,
252    original_path: &'a Path,
253  ) -> Pin<Box<dyn Future<Output = io::Result<()>> + 'a>> {
254    Box::pin(async move {
255      if symlink_metadata(path).await?.is_dir() {
256        let mut rd = read_dir(path).await?;
257        while let Some(entry) = rd.next_entry().await? {
258          let file_type = entry.file_type().await?;
259          if file_type.is_dir() {
260            add_dir_files(writer, &entry.path(), original_path).await?;
261          } else if file_type.is_symlink() {
262            // do nothing
263          } else {
264            let absolute_path = entry.path();
265            let file = TokioFile::open(&absolute_path).await?;
266            let relative_path = absolute_path
267              .strip_prefix(original_path)
268              .unwrap()
269              .to_str()
270              .unwrap();
271            #[cfg(not(feature = "integrity"))]
272            writer.add_sized(relative_path, file).await?;
273            #[cfg(feature = "integrity")]
274            writer.add_sized_with_integrity(relative_path, file).await?;
275          }
276        }
277      }
278      Ok(())
279    })
280  }
281}