1pub mod aws;
3mod list;
4pub use list::ZipEntries;
5
6use std::sync::Arc;
7
8use anyhow::Error;
9use anyhow::{ensure, Context, Result};
10use async_zip::{Compression as AsyncCompression, ZipEntryBuilder};
11use aws_sdk_s3::output::GetObjectOutput;
12use clap::ValueEnum;
13use cobalt_async::checksum::CRC32Sink;
14use cobalt_async::counter::ByteLimit;
15use cobalt_aws::s3::S3Object;
16use cobalt_aws::s3::{AsyncMultipartUpload, AsyncPutObject};
17use futures::future;
18use futures::lock::Mutex;
19use futures::prelude::*;
20use futures::stream;
21use serde::{Deserialize, Serialize};
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
23use tokio_stream::wrappers::LinesStream;
24use tokio_util::compat::FuturesAsyncWriteCompatExt;
25use typed_builder::TypedBuilder;
26
27#[derive(Debug, Clone, ValueEnum, Copy, PartialEq, Eq)]
30pub enum Compression {
31 Stored,
33 Deflate,
35 Bzip,
37 Lzma,
39 Zstd,
41 Xz,
43}
44
45impl From<Compression> for AsyncCompression {
47 fn from(c: Compression) -> Self {
48 match c {
49 Compression::Stored => AsyncCompression::Stored,
50 Compression::Deflate => AsyncCompression::Deflate,
51 Compression::Bzip => AsyncCompression::Bz,
52 Compression::Lzma => AsyncCompression::Lzma,
53 Compression::Zstd => AsyncCompression::Zstd,
54 Compression::Xz => AsyncCompression::Xz,
55 }
56 }
57}
58
/// Largest number of entries written to a single archive.
///
/// The end-of-central-directory record stores the entry count in 16 bits and the
/// ZIP specification reserves `0xFFFF` there to mean "the real count is in a ZIP64
/// record". ZIP64 is not produced, so the sentinel itself must never be written.
const MAX_FILES_IN_ZIP: u16 = u16::MAX - 1;
/// Largest size, in bytes, of a single object added to the archive.
///
/// `0xFFFFFFFF` in a 32-bit size field is the ZIP64 sentinel: readers go looking
/// for a ZIP64 extra field that is never written. The largest usable size is
/// therefore one byte less.
const MAX_FILE_IN_ZIP_SIZE_BYTES: u32 = u32::MAX - 1;
/// Upper bound on the total number of bytes written to the archive object
/// (enforced with `ByteLimit` in `create_zip`).
///
/// As long as the whole archive is no larger than this, every 32-bit offset stored
/// in it (including the central-directory offset, which precedes the 22-byte
/// end record) stays below the `0xFFFFFFFF` ZIP64 sentinel.
const MAX_ZIP_FILE_SIZE_BYTES: u32 = u32::MAX;
65
66#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
69pub struct ManifestEntry {
70 pub object: S3Object,
72 pub filename_in_zip: String,
74 pub crc32: u32,
76}
77
78impl ManifestEntry {
79 pub fn new(object: &S3Object, crc32: u32, filename_in_zip: &str) -> Self {
81 ManifestEntry {
82 object: object.clone(),
83 crc32,
84 filename_in_zip: filename_in_zip.to_owned(),
85 }
86 }
87}
88
89pub struct ManifestFileUpload<'a> {
94 buffer: cobalt_aws::s3::AsyncPutObject<'a>,
95}
96
97impl<'a> ManifestFileUpload<'a> {
98 pub fn new(client: &'a aws_sdk_s3::Client, dst: &S3Object) -> ManifestFileUpload<'a> {
101 let manifest_upload = cobalt_aws::s3::AsyncPutObject::new(client, &dst.bucket, &dst.key);
102 ManifestFileUpload {
103 buffer: manifest_upload,
104 }
105 }
106
107 pub async fn write_manifest_entry(&mut self, entry: &ManifestEntry) -> Result<()> {
109 let manifest_entry = serde_json::to_string(&entry)? + "\n";
110 self.buffer
111 .write_all(manifest_entry.as_bytes())
112 .await
113 .map_err(Error::from)
114 }
115
116 pub async fn upload_object(&mut self) -> Result<()> {
119 self.buffer.close().await.map_err(Error::from)
120 }
121}
122
/// Strategy used to hand an object's bytes to the zip writer.
enum ZipWrite {
    /// Copy the body chunk by chunk through `write_entry_stream`.
    Stream,
    /// Collect the whole body in memory, then call `write_entry_whole`.
    Whole,
}
131
132#[derive(Debug, TypedBuilder)]
136pub struct Archiver<'a> {
137 #[builder(default)]
138 prefix_strip: Option<&'a str>,
139 compression: Compression,
140 #[builder(default=5 * bytesize::MIB as usize)]
141 part_size: usize,
142 #[builder(default = 2)]
143 src_fetch_buffer: usize,
144 #[builder(default = false)]
145 data_descriptors: bool,
146}
147
148impl<'a> Archiver<'a> {
149 fn entry_write_type(&self) -> ZipWrite {
155 if self.data_descriptors {
156 ZipWrite::Stream
157 } else {
158 ZipWrite::Whole
159 }
160 }
161
162 pub async fn create_zip<I>(
168 &self,
169 client: &aws_sdk_s3::Client,
170 srcs: I,
171 output_location: &S3Object,
172 manifest_object: Option<&S3Object>,
173 ) -> Result<()>
174 where
175 I: IntoIterator<Item = Result<S3Object>>,
176 {
177 ensure!(
179 self.prefix_strip.filter(|s| s.starts_with('/')).is_none(),
180 "prefix_strip must not start with `/`"
181 );
182 ensure!(
184 self.prefix_strip.is_none() || self.prefix_strip.filter(|s| s.ends_with('/')).is_some(),
185 "prefix_strip must end with `/`"
186 );
187
188 let upload =
190 AsyncMultipartUpload::new(client, output_location, self.part_size, None).await?;
191
192 let mut byte_limit =
193 ByteLimit::new_from_inner(upload, MAX_ZIP_FILE_SIZE_BYTES.into()).compat_write();
194 let zip = async_zip::write::ZipFileWriter::new(&mut byte_limit);
195 let zip = Arc::new(Mutex::new(zip));
196
197 let zip_stream = stream::iter(srcs.into_iter().zip(1_u64..))
198 .map(|(src, src_index)| src.map(|s| (s, src_index)))
199 .and_then(
200 |(src, src_index)| match src_index <= MAX_FILES_IN_ZIP.into() {
201 false => future::err(Error::msg("ZIP64 is not supported: Too many zip entries.")),
202 true => future::ok(src),
203 },
204 ).and_then(|src|{
205 future::ok((src
206 .key
207 .trim_start_matches(self.prefix_strip.unwrap_or_default())
208 .to_owned(), src))
209 })
210 .and_then(|(entry_path, src)| {
211 match entry_path.is_empty() {
212 true => future::err(Error::msg(format!(
213 "{} with out prefix {:?} is an invalid entry ",
214 src.key, self.prefix_strip
215 ))),
216 false => future::ok((src, entry_path)),
217 }
218 })
219 .map_ok(move |(src, entry_path)| {
220 client
221 .get_object()
222 .bucket(&src.bucket)
223 .key(&src.key)
224 .send()
225 .map_ok(|r| (r, src, entry_path))
226 .map_err(anyhow::Error::from)
227 })
228 .try_buffered(self.src_fetch_buffer) .and_then(|(response, src, entry_path)|{
230 match response.content_length() > MAX_FILE_IN_ZIP_SIZE_BYTES.into() {
231 true => future::err(Error::msg(format!(
232 "ZIP64 is not supported: Max file size is {MAX_FILE_IN_ZIP_SIZE_BYTES}, {src:?} is {} bytes", response.content_length)
233 )),
234 false => future::ok((response, src, entry_path))
235 }
236 })
237 .and_then(|(response, src, entry_path)| {
238 let zip = zip.clone();
239 async move {
240 self.process_entry(
241 &mut *zip.lock().await,
242 response,
243 &entry_path,
244 self.entry_write_type()
245 ).map_ok(|crc32| ManifestEntry::new(&src, crc32, &entry_path))
246 .await
247 }
248 });
249
250 match manifest_object {
252 Some(object) => {
253 zip_stream
254 .try_fold(
255 ManifestFileUpload::new(client, object),
256 |mut manifest_upload, entry| async move {
257 manifest_upload.write_manifest_entry(&entry).await?;
258 Ok(manifest_upload)
259 },
260 )
261 .await?
262 .upload_object()
263 .await?
264 }
265 None => zip_stream.map_ok(|_| ()).try_collect().await?,
266 };
267
268 let zip = Arc::try_unwrap(zip)
269 .map_err(|_| anyhow::Error::msg("Failed to unwrap ZipFileWriter"))?;
270 let zip = zip.into_inner();
271 zip.close().await?;
272 byte_limit.shutdown().await?;
274 Ok(())
275 }
276
277 async fn process_entry<T: tokio::io::AsyncWrite + Unpin>(
278 &self,
279 zip: &mut async_zip::write::ZipFileWriter<T>,
280 mut response: GetObjectOutput,
281 entry_path: &str,
282 write_type: ZipWrite,
283 ) -> Result<u32> {
284 let opts = ZipEntryBuilder::new(entry_path.to_owned(), self.compression.into());
285
286 let mut crc_sink = CRC32Sink::default();
288
289 match write_type {
290 ZipWrite::Stream => {
291 let mut entry_writer = zip.write_entry_stream(opts).await?;
292 while let Some(bytes) = response.body.next().await {
293 let bytes = bytes?;
294 entry_writer.write_all(&bytes).await?;
295 crc_sink.send(bytes).await?;
296 }
297 entry_writer.close().await?;
299 }
300 ZipWrite::Whole => {
301 let bytes = response.body.collect().await?.into_bytes();
302 zip.write_entry_whole(opts, &bytes).await?;
303 crc_sink.send(bytes).await?;
304 }
305 }
306 crc_sink.close().await?;
307 let crc = crc_sink
308 .value()
309 .context("Expected CRC Sink to have value")?;
310 Ok(crc)
311 }
312}
313
314pub async fn validate_manifest_file(
321 client: &aws_sdk_s3::Client,
322 manifest_file: &S3Object,
323 fetch_concurrency: usize,
324 validate_concurrency: usize,
325) -> Result<()> {
326 let manifest_lines = client
327 .get_object()
328 .bucket(&manifest_file.bucket)
329 .key(&manifest_file.key)
330 .send()
331 .map_ok(|r| r.body.into_async_read())
332 .map_ok(|l| BufReader::with_capacity(64 * bytesize::KB as usize, l))
333 .map_ok(|b| b.lines())
334 .await?;
335
336 LinesStream::new(manifest_lines)
337 .map_err(anyhow::Error::from)
338 .and_then(|l| {
339 future::ready(serde_json::from_str::<ManifestEntry>(&l).map_err(anyhow::Error::from))
340 })
341 .map_ok(move |entry| {
342 client
343 .get_object()
344 .bucket(&entry.object.bucket)
345 .key(&entry.object.key)
346 .send()
347 .map_err(anyhow::Error::from)
348 .map_ok(move |r| (r, entry))
349 })
350 .try_buffered(fetch_concurrency)
351 .map_ok(|(r, entry)| async move {
352 let mut tokio_sink = FuturesAsyncWriteCompatExt::compat_write(CRC32Sink::default());
353 let mut buf =
354 BufReader::with_capacity(64 * bytesize::KB as usize, r.body.into_async_read());
355 tokio::io::copy(&mut buf, &mut tokio_sink).await?;
356 tokio_sink.shutdown().await?;
357 let sink_crc32 = tokio_sink
358 .into_inner()
359 .value()
360 .context("Expected a crc32 to be calculated")?;
361 ensure!(
362 entry.crc32 == sink_crc32,
363 "CRC for Entry {:?} in manifest file {:?} was {:?} does not match {sink_crc32}",
364 entry.object,
365 manifest_file,
366 entry.crc32
367 );
368 Ok(())
369 })
370 .try_buffered(validate_concurrency)
371 .try_collect()
372 .await
373}
374
375pub async fn validate_zip_entry_bytes(
379 client: &aws_sdk_s3::Client,
380 manifest_file: &S3Object,
381 zip_file: &S3Object,
382) -> Result<()> {
383 let manifest_request = client
384 .get_object()
385 .bucket(&manifest_file.bucket)
386 .key(&manifest_file.key)
387 .send()
388 .map_ok(|r| r.body.into_async_read())
389 .map_ok(|r| BufReader::with_capacity(64 * bytesize::KB as usize, r))
390 .map_ok(|b| b.lines());
391
392 let zip_request = client
393 .get_object()
394 .bucket(&zip_file.bucket)
395 .key(&zip_file.key)
396 .send()
397 .map_ok(|r| r.body.into_async_read())
398 .map_ok(|r| BufReader::with_capacity(64 * bytesize::KB as usize, r));
399
400 let (manifest_lines, zip_response) = futures::join!(manifest_request, zip_request);
401 let mut manifest_lines = manifest_lines?;
402
403 let mut zip_reader = async_zip::read::stream::ZipFileReader::new(zip_response?);
404
405 while !zip_reader.finished() {
406 if let Some(reader) = zip_reader.entry_reader().await? {
407 let entry_name = reader.entry().filename().to_owned();
408 let mut sink = FuturesAsyncWriteCompatExt::compat_write(CRC32Sink::default());
409 let crc_copy = std::panic::AssertUnwindSafe(
411 reader.copy_to_end_crc(&mut sink, 64 * bytesize::KB as usize),
412 )
413 .catch_unwind()
414 .map_err(|_| anyhow::Error::msg("Failed to "));
415
416 let (crc_copy, manifest_line) = futures::join!(crc_copy, manifest_lines.next_line());
417 crc_copy??;
418 sink.shutdown().await?;
419
420 let manifest_entry = manifest_line?
421 .context("Manifest has too few entries")
422 .and_then(|l| {
423 serde_json::from_str::<ManifestEntry>(&l).map_err(anyhow::Error::from)
424 })?;
425 validate_manifest_entry(
426 &manifest_entry,
427 &entry_name,
428 sink.into_inner()
429 .value()
430 .context("Expected a crc32 value")?,
431 )?;
432 }
433 }
434 ensure!(
435 manifest_lines.next_line().await?.is_none(),
436 "Manifest has more entries that the zip."
437 );
438
439 Ok(())
440}
441
442pub async fn unarchive_all(
446 client: &aws_sdk_s3::Client,
447 zip_file: &S3Object,
448 dst: &S3Object,
449 chunk_size: usize,
450) -> Result<()> {
451 ensure!(dst.key.ends_with('/'), "destination key must end with `/`");
452
453 let zip_response = client
454 .get_object()
455 .bucket(&zip_file.bucket)
456 .key(&zip_file.key)
457 .send()
458 .map_ok(|r| r.body.into_async_read())
459 .map_ok(|r| BufReader::with_capacity(64 * bytesize::KB as usize, r))
460 .await;
461
462 let mut zip_reader = async_zip::read::stream::ZipFileReader::new(zip_response?);
463 while !zip_reader.finished() {
464 if let Some(reader) = zip_reader.entry_reader().await? {
465 let entry = reader.entry();
466 let entry_name = entry.filename().to_owned();
467 let entry_size = entry.uncompressed_size();
468 let dst_key = dst.key.to_owned() + &entry_name;
469 if entry_size > chunk_size.try_into()? {
470 let dst_file = S3Object::new(&dst.bucket, dst_key);
471 let writer = AsyncMultipartUpload::new(client, &dst_file, chunk_size, None).await?;
472 let mut tokio_sink = FuturesAsyncWriteCompatExt::compat_write(writer);
473 reader
474 .copy_to_end_crc(&mut tokio_sink, 64 * bytesize::KIB as usize)
475 .await?;
476 tokio_sink.shutdown().await?;
477 } else {
478 let writer = AsyncPutObject::new(client, &dst.bucket, &dst_key);
479 let mut tokio_sink = FuturesAsyncWriteCompatExt::compat_write(writer);
480 reader
481 .copy_to_end_crc(&mut tokio_sink, 64 * bytesize::KIB as usize)
482 .await?;
483 tokio_sink.shutdown().await?;
484 }
485 }
486 }
487
488 Ok(())
489}
490
491fn validate_manifest_entry(
494 manifest_entry: &ManifestEntry,
495 filename: &str,
496 crc32: u32,
497) -> Result<()> {
498 ensure!(
499 manifest_entry.filename_in_zip == filename,
500 format!(
501 "Validation manifest entry filename {manifest_entry:?} did not match zip {filename}",
502 )
503 );
504 ensure!(
505 manifest_entry.crc32 == crc32,
506 format!("Validation error manifest entry {manifest_entry:?} crc32 did not match {crc32}",)
507 );
508 Ok(())
509}
510
511pub async fn validate_zip_central_dir(
515 client: &aws_sdk_s3::Client,
516 manifest_file: &S3Object,
517 zip_file: &S3Object,
518) -> Result<()> {
519 let manifest_request = client
520 .get_object()
521 .bucket(&manifest_file.bucket)
522 .key(&manifest_file.key)
523 .send()
524 .map_ok(|r| r.body.into_async_read())
525 .map_ok(|l| BufReader::with_capacity(64 * bytesize::KB as usize, l))
526 .map_ok(|b| b.lines());
527
528 let zip_request = aws::S3ObjectSeekableRead::new(client, zip_file, None);
529
530 let (manifest_lines, zip_response) = futures::join!(manifest_request, zip_request);
531 let mut manifest_lines = manifest_lines?;
532 let zip_response = zip_response?;
533
534 let zip_reader = async_zip::read::seek::ZipFileReader::new(zip_response).await?;
535
536 for entry in zip_reader.entries() {
537 let manifest_entry = manifest_lines
538 .next_line()
539 .await?
540 .context("Manifest has too few entries")
541 .and_then(|l| serde_json::from_str::<ManifestEntry>(&l).map_err(anyhow::Error::from))?;
542 validate_manifest_entry(&manifest_entry, entry.filename(), entry.crc32())?
543 }
544 ensure!(
545 manifest_lines.next_line().await?.is_none(),
546 "Manifest has more entries that the zip."
547 );
548 Ok(())
549}
550
#[cfg(test)]
mod tests {
    use super::*;

    /// A manifest entry for `test-bucket/prefix/file.txt` stored as `file.txt`.
    fn sample_entry() -> ManifestEntry {
        let bucket = String::from("test-bucket");
        let object = S3Object::new(&bucket, String::from("prefix/file.txt"));
        ManifestEntry::new(&object, 0xDEAD_BEEF, "file.txt")
    }

    #[test]
    fn test_s3_tryfrom() {
        let bucket = "test-bucket";
        let key = "test-key";
        let url: url::Url = format!("s3://{bucket}/{key}")
            .parse()
            .expect("Expected successful URL parsing");
        let obj: S3Object = url.try_into().expect("Expected successful URL conversion");
        assert_eq!(bucket, obj.bucket);
        assert_eq!(key, obj.key);
    }

    #[test]
    fn test_s3_tryfrom_no_path() {
        let url: url::Url = "s3://test-bucket"
            .parse()
            .expect("Expected successful URL parsing");
        let result: Result<S3Object> = url.try_into();
        assert!(result.is_err())
    }

    #[test]
    fn test_s3_tryfrom_file_url() {
        let url: url::Url = "file://path/to/file"
            .parse()
            .expect("Expected successful URL parsing");
        let result: Result<S3Object> = url.try_into();
        assert!(result.is_err())
    }

    #[test]
    fn test_manifest_entry_json_round_trip() {
        let entry = sample_entry();
        let line = serde_json::to_string(&entry).expect("Expected manifest entry to serialize");
        let parsed: ManifestEntry =
            serde_json::from_str(&line).expect("Expected manifest entry to deserialize");
        assert_eq!(entry, parsed);
    }

    #[test]
    fn test_validate_manifest_entry() {
        let entry = sample_entry();
        assert!(validate_manifest_entry(&entry, "file.txt", 0xDEAD_BEEF).is_ok());
        assert!(validate_manifest_entry(&entry, "other.txt", 0xDEAD_BEEF).is_err());
        assert!(validate_manifest_entry(&entry, "file.txt", 0).is_err());
    }
}