1use crate::header::{Directory, Entry, FileMetadata, FilePosition, Integrity};
2use crate::{cfg_fs, cfg_integrity, cfg_stream, split_path};
3use std::io::SeekFrom;
4use tokio::io::{
5 self, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, Take,
6};
7
8cfg_fs! {
9 use std::future::Future;
10 use std::path::Path;
11 use std::pin::Pin;
12 use tokio::fs::{read_dir, symlink_metadata, File as TokioFile};
13}
14
15cfg_integrity! {
16 use crate::BLOCK_SIZE;
17 use crate::header::{Algorithm, Hash};
18 use sha2::digest::Digest;
19 use sha2::Sha256;
20}
21
22cfg_stream! {
23 use bytes::Bytes;
24 use futures_core::Stream;
25 use futures_util::future::ok;
26 use futures_util::stream::{iter, once};
27 use futures_util::StreamExt;
28 use tokio_util::io::ReaderStream;
29}
30
31#[derive(Debug)]
33pub struct Writer<F: AsyncRead + Unpin> {
34 header: Directory,
35 file_offset: u64,
36 files: Vec<Take<F>>,
37}
38
39impl<F: AsyncRead + Unpin> Writer<F> {
40 pub fn new() -> Self {
42 Default::default()
43 }
44
45 fn add_folder_recursively(&mut self, segments: Vec<&str>) -> &mut Directory {
46 let mut dir = &mut self.header;
47 for seg in segments {
48 let entry = (dir.files)
49 .entry(seg.into())
50 .or_insert_with(|| Entry::Directory(Default::default()));
51 dir = match entry {
52 Entry::Directory(dir) => dir,
53 _ => unreachable!(),
54 }
55 }
56 dir
57 }
58
59 pub fn add(&mut self, path: &str, content: F, size: u64) {
74 self.add_with_options(path, content, size, false, None)
75 }
76
77 fn add_with_options(
78 &mut self,
79 path: &str,
80 content: F,
81 size: u64,
82 executable: bool,
83 integrity: Option<Integrity>,
84 ) {
85 let mut segments = split_path(path);
86 let filename = segments
87 .pop()
88 .expect("normalised path contains no filename");
89 let file_entry = FileMetadata {
90 pos: FilePosition::Offset(self.file_offset),
91 size,
92 executable,
93 integrity,
94 };
95 let result = self
96 .add_folder_recursively(segments)
97 .files
98 .insert(filename.into(), Entry::File(file_entry));
99 assert!(result.is_none());
100 self.file_offset += size;
101 self.files.push(content.take(size))
102 }
103
104 pub fn add_empty_folder(&mut self, path: &str) {
106 self.add_folder_recursively(split_path(path));
107 }
108
109 pub async fn write(self, dest: &mut (impl AsyncWrite + Unpin)) -> io::Result<()> {
111 let header_bytes = serde_json::to_vec(&self.header)?;
112 let header_len = header_bytes.len() as u32;
113 let padding = match header_len % 4 {
114 0 => 0,
115 r => 4 - r,
116 };
117
118 dest.write_u32_le(4).await?;
119 dest.write_u32_le(header_len + padding + 8).await?;
120 dest.write_u32_le(header_len + padding + 4).await?;
121 dest.write_u32_le(header_len).await?;
122
123 dest.write_all(&header_bytes).await?;
124 dest.write_all(&vec![0; padding as _]).await?;
125
126 for mut file in self.files {
127 io::copy(&mut file, dest).await?;
128 }
129
130 Ok(())
131 }
132
133 cfg_stream! {
134 pub fn into_stream(self) -> io::Result<impl Stream<Item = io::Result<Bytes>>> {
135 let mut header_bytes = serde_json::to_vec(&self.header)?;
136 let header_len = header_bytes.len() as u32;
137 let padding = match header_len % 4 {
138 0 => 0,
139 r => 4 - r,
140 };
141 for _ in 0..padding {
142 header_bytes.extend(Some(0))
143 }
144
145 let mut header_meta = Vec::with_capacity((16 + header_len + padding) as _);
146 for i in [
147 4u32,
148 header_len + padding + 8,
149 header_len + padding + 4,
150 header_len,
151 ] {
152 header_meta.extend(i.to_le_bytes());
153 }
154
155 let stream = once(ok(header_meta.into()))
156 .chain(once(ok(header_bytes.into())))
157 .chain(iter(self.files.into_iter().map(ReaderStream::new)).flatten());
158 Ok(stream)
159 }
160 }
161}
162
163impl<F: AsyncRead + AsyncSeek + Unpin> Writer<F> {
164 pub async fn add_sized(&mut self, path: &str, mut content: F) -> io::Result<()> {
171 let size = content.seek(SeekFrom::End(0)).await? - content.stream_position().await?;
172 self.add(path, content, size);
173 Ok(())
174 }
175
176 cfg_integrity! {
177 pub async fn add_sized_with_integrity(&mut self, path: &str, mut content: F) -> io::Result<()> {
178 let mut global_state = Sha256::new();
179 let mut block = Vec::with_capacity(BLOCK_SIZE as _);
180 let mut blocks = Vec::new();
181 let mut size = 0;
182 loop {
183 let read_size = (&mut content)
184 .take(BLOCK_SIZE as _)
185 .read_to_end(&mut block)
186 .await?;
187 if read_size == 0 {
188 break;
189 }
190 size += read_size;
191 blocks.push(Hash(Sha256::digest(&block).to_vec()));
192 global_state.update(&block);
193 block.clear();
194 }
195 let integrity = Integrity {
196 algorithm: Algorithm::SHA256,
197 hash: Hash(global_state.finalize().to_vec()),
198 block_size: BLOCK_SIZE,
199 blocks,
200 };
201 content.rewind().await?;
202 self.add_with_options(path, content, size as _, false, Some(integrity));
203 Ok(())
204 }
205 }
206}
207
208impl<F: AsyncRead + Unpin> Default for Writer<F> {
209 fn default() -> Self {
210 Self {
211 header: Default::default(),
212 file_offset: 0,
213 files: Vec::new(),
214 }
215 }
216}
217
218cfg_fs! {
219 pub async fn pack_dir(
221 path: impl AsRef<Path>,
222 dest: &mut (impl AsyncWrite + Unpin),
223 ) -> io::Result<()> {
224 pack_dir_into_writer(path)
225 .await?
226 .write(dest)
227 .await
228 }
229
230 cfg_stream! {
231 pub async fn pack_dir_into_stream(
232 path: impl AsRef<Path>,
233 ) -> io::Result<impl Stream<Item = io::Result<Bytes>>> {
234 pack_dir_into_writer(path)
235 .await?
236 .into_stream()
237 }
238 }
239
240 pub async fn pack_dir_into_writer(
241 path: impl AsRef<Path>,
242 ) -> io::Result<Writer<TokioFile>> {
243 let path = path.as_ref().canonicalize()?;
244 let mut writer = Writer::<TokioFile>::new();
245 add_dir_files(&mut writer, &path, &path).await?;
246 Ok(writer)
247 }
248
249 fn add_dir_files<'a>(
250 writer: &'a mut Writer<TokioFile>,
251 path: &'a Path,
252 original_path: &'a Path,
253 ) -> Pin<Box<dyn Future<Output = io::Result<()>> + 'a>> {
254 Box::pin(async move {
255 if symlink_metadata(path).await?.is_dir() {
256 let mut rd = read_dir(path).await?;
257 while let Some(entry) = rd.next_entry().await? {
258 let file_type = entry.file_type().await?;
259 if file_type.is_dir() {
260 add_dir_files(writer, &entry.path(), original_path).await?;
261 } else if file_type.is_symlink() {
262 } else {
264 let absolute_path = entry.path();
265 let file = TokioFile::open(&absolute_path).await?;
266 let relative_path = absolute_path
267 .strip_prefix(original_path)
268 .unwrap()
269 .to_str()
270 .unwrap();
271 #[cfg(not(feature = "integrity"))]
272 writer.add_sized(relative_path, file).await?;
273 #[cfg(feature = "integrity")]
274 writer.add_sized_with_integrity(relative_path, file).await?;
275 }
276 }
277 }
278 Ok(())
279 })
280 }
281}