cobalt_s3_archiver/
lib.rs

1//! Allows ZIP archives to be created in S3 from files stored in S3.
2pub mod aws;
3mod list;
4pub use list::ZipEntries;
5
6use std::sync::Arc;
7
8use anyhow::Error;
9use anyhow::{ensure, Context, Result};
10use async_zip::{Compression as AsyncCompression, ZipEntryBuilder};
11use aws_sdk_s3::output::GetObjectOutput;
12use clap::ValueEnum;
13use cobalt_async::checksum::CRC32Sink;
14use cobalt_async::counter::ByteLimit;
15use cobalt_aws::s3::S3Object;
16use cobalt_aws::s3::{AsyncMultipartUpload, AsyncPutObject};
17use futures::future;
18use futures::lock::Mutex;
19use futures::prelude::*;
20use futures::stream;
21use serde::{Deserialize, Serialize};
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
23use tokio_stream::wrappers::LinesStream;
24use tokio_util::compat::FuturesAsyncWriteCompatExt;
25use typed_builder::TypedBuilder;
26
/// The compression algorithms that are available to be used
/// to compress entries in the ZIP archive.
///
/// Each variant maps one-to-one onto a variant of [async_zip::Compression]
/// through the [From] implementation for that type.
#[derive(Debug, Clone, ValueEnum, Copy, PartialEq, Eq)]
pub enum Compression {
    /// No compression.
    Stored,
    /// Use Deflate compression.
    Deflate,
    /// Use Bzip2 compression.
    Bzip,
    /// Use LZMA compression.
    Lzma,
    /// Use ZSTD compression.
    Zstd,
    /// Use XZ compression.
    Xz,
}
44
45/// Conversion from [Compression] into [async_zip::Compression]
46impl From<Compression> for AsyncCompression {
47    fn from(c: Compression) -> Self {
48        match c {
49            Compression::Stored => AsyncCompression::Stored,
50            Compression::Deflate => AsyncCompression::Deflate,
51            Compression::Bzip => AsyncCompression::Bz,
52            Compression::Lzma => AsyncCompression::Lzma,
53            Compression::Zstd => AsyncCompression::Zstd,
54            Compression::Xz => AsyncCompression::Xz,
55        }
56    }
57}
58
/// Maximum number of files allowed in a non ZIP64 ZIP file.
const MAX_FILES_IN_ZIP: u16 = u16::MAX;
/// Maximum size in bytes of a single file allowed in a non ZIP64 ZIP file.
const MAX_FILE_IN_ZIP_SIZE_BYTES: u32 = u32::MAX;
/// Maximum total size in bytes of a non ZIP64 ZIP file.
const MAX_ZIP_FILE_SIZE_BYTES: u32 = u32::MAX;
65
/// A single entry in the manifest file which describes
/// a single entry in the output ZIP archive.
///
/// [ManifestFileUpload::write_manifest_entry] serializes each entry as one
/// line of JSON, and the validation functions in this module parse it back
/// from such a line.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ManifestEntry {
    /// S3 Object of the source object for this entry.
    pub object: S3Object,
    /// The file name of the entry in the ZIP archive.
    pub filename_in_zip: String,
    /// CRC32 of the source file.
    pub crc32: u32,
}
77
78impl ManifestEntry {
79    /// Create a new [ManifestEntry] using the provided values.
80    pub fn new(object: &S3Object, crc32: u32, filename_in_zip: &str) -> Self {
81        ManifestEntry {
82            object: object.clone(),
83            crc32,
84            filename_in_zip: filename_in_zip.to_owned(),
85        }
86    }
87}
88
/// Serializes a sequence of [ManifestEntry] instances into S3 as a JSONL file.
///
/// This will buffer all [ManifestEntry] records in memory and write to S3 when
/// [ManifestFileUpload::upload_object] is called.
pub struct ManifestFileUpload<'a> {
    // Writer that receives the serialized manifest lines; it is closed by
    // `upload_object`.
    buffer: cobalt_aws::s3::AsyncPutObject<'a>,
}
96
97impl<'a> ManifestFileUpload<'a> {
98    /// Create a new [ManifestFileUpload] instance, which will write the data into
99    /// the `dst` [S3Object] when [ManifestFileUpload::upload_object] is called.
100    pub fn new(client: &'a aws_sdk_s3::Client, dst: &S3Object) -> ManifestFileUpload<'a> {
101        let manifest_upload = cobalt_aws::s3::AsyncPutObject::new(client, &dst.bucket, &dst.key);
102        ManifestFileUpload {
103            buffer: manifest_upload,
104        }
105    }
106
107    /// Adds the `entry` into the buffer
108    pub async fn write_manifest_entry(&mut self, entry: &ManifestEntry) -> Result<()> {
109        let manifest_entry = serde_json::to_string(&entry)? + "\n";
110        self.buffer
111            .write_all(manifest_entry.as_bytes())
112            .await
113            .map_err(Error::from)
114    }
115
116    /// Writes the bytes into S3.  Calling this method twice will cause
117    /// an [Err] to be returned.
118    pub async fn upload_object(&mut self) -> Result<()> {
119        self.buffer.close().await.map_err(Error::from)
120    }
121}
122
/// Type of write to do. By reading the whole file into memory
/// ([ZipWrite::Whole]) a `Data descriptor` record is not needed.
enum ZipWrite {
    /// Stream the file into the Zip
    Stream,
    /// Read the entire file before writing into the Zip
    Whole,
}
131
/// An [Archiver] allows the creation of a ZIP archive
/// in S3 allowing control of what and how the data
/// is written into S3.
///
/// Instances are created through the builder derived with [TypedBuilder];
/// `compression` is the only field without a default value.
#[derive(Debug, TypedBuilder)]
pub struct Archiver<'a> {
    // Prefix removed from the start of each source key to form the entry name
    // in the ZIP. `create_zip` rejects values that start with `/` or that do
    // not end with `/`.
    #[builder(default)]
    prefix_strip: Option<&'a str>,
    // Compression applied to every entry written into the archive.
    compression: Compression,
    // Part size handed to the multipart upload of the output ZIP.
    #[builder(default=5 * bytesize::MIB as usize)]
    part_size: usize,
    // Number of source object requests that are buffered ahead of the writer.
    #[builder(default = 2)]
    src_fetch_buffer: usize,
    // When true entries are streamed into the ZIP (`ZipWrite::Stream`),
    // otherwise each source is read completely first (`ZipWrite::Whole`).
    #[builder(default = false)]
    data_descriptors: bool,
}
147
148impl<'a> Archiver<'a> {
149    /// Returns the [ZipWrite] type based on
150    /// if the [Archiver] was created with
151    /// `data_descriptors` set to true of false.
152    /// `data_descriptors` set to true will return
153    /// [ZipWrite::Stream] else [ZipWrite::Whole].
154    fn entry_write_type(&self) -> ZipWrite {
155        if self.data_descriptors {
156            ZipWrite::Stream
157        } else {
158            ZipWrite::Whole
159        }
160    }
161
162    /// Creates a ZIP archive in S3 at the `output_location`
163    /// using the `client` from the `srcs` [S3Object]s.
164    /// Optionally a `manifest` object is created in S3 which
165    /// contains details of the files which have been added
166    /// into the archive.
167    pub async fn create_zip<I>(
168        &self,
169        client: &aws_sdk_s3::Client,
170        srcs: I,
171        output_location: &S3Object,
172        manifest_object: Option<&S3Object>,
173    ) -> Result<()>
174    where
175        I: IntoIterator<Item = Result<S3Object>>,
176    {
177        //S3 keys can start with "/" but this is a little confusing
178        ensure!(
179            self.prefix_strip.filter(|s| s.starts_with('/')).is_none(),
180            "prefix_strip must not start with `/`"
181        );
182        //Fail early to ensure that the dir structure in the Zip does not have '/' at the root.
183        ensure!(
184            self.prefix_strip.is_none() || self.prefix_strip.filter(|s| s.ends_with('/')).is_some(),
185            "prefix_strip must end with `/`"
186        );
187
188        //Create the upload and the zip writer.
189        let upload =
190            AsyncMultipartUpload::new(client, output_location, self.part_size, None).await?;
191
192        let mut byte_limit =
193            ByteLimit::new_from_inner(upload, MAX_ZIP_FILE_SIZE_BYTES.into()).compat_write();
194        let zip = async_zip::write::ZipFileWriter::new(&mut byte_limit);
195        let zip = Arc::new(Mutex::new(zip));
196
197        let zip_stream = stream::iter(srcs.into_iter().zip(1_u64..))
198        .map(|(src, src_index)| src.map(|s| (s, src_index)))
199        .and_then(
200            |(src, src_index)| match src_index <= MAX_FILES_IN_ZIP.into() {
201                false => future::err(Error::msg("ZIP64 is not supported: Too many zip entries.")),
202                true => future::ok(src),
203            },
204        ).and_then(|src|{
205         future::ok((src
206                .key
207                .trim_start_matches(self.prefix_strip.unwrap_or_default())
208                .to_owned(), src))
209        })
210        .and_then(|(entry_path, src)| {
211           match entry_path.is_empty() {
212                true => future::err(Error::msg(format!(
213                    "{} with out prefix {:?} is an invalid entry ",
214                    src.key, self.prefix_strip
215                ))),
216                false => future::ok((src, entry_path)),
217            }
218        })
219        .map_ok(move |(src, entry_path)| {
220            client
221                .get_object()
222                .bucket(&src.bucket)
223                .key(&src.key)
224                .send()
225                .map_ok(|r| (r, src, entry_path))
226                .map_err(anyhow::Error::from)
227        })
228        .try_buffered(self.src_fetch_buffer) //This prefetches the S3 src objects
229        .and_then(|(response, src, entry_path)|{
230            match response.content_length() > MAX_FILE_IN_ZIP_SIZE_BYTES.into() {
231                true => future::err(Error::msg(format!(
232                            "ZIP64 is not supported: Max file size is {MAX_FILE_IN_ZIP_SIZE_BYTES}, {src:?} is {} bytes", response.content_length)
233                        )),
234                false => future::ok((response, src, entry_path))
235            }
236        })
237        .and_then(|(response, src, entry_path)| {
238            let zip = zip.clone();
239            async move {
240                self.process_entry(
241                    &mut *zip.lock().await,
242                    response,
243                    &entry_path,
244                    self.entry_write_type()
245                ).map_ok(|crc32| ManifestEntry::new(&src, crc32, &entry_path))
246                .await
247            }
248        });
249
250        // If manifests are needed fold a upload over the stream
251        match manifest_object {
252            Some(object) => {
253                zip_stream
254                    .try_fold(
255                        ManifestFileUpload::new(client, object),
256                        |mut manifest_upload, entry| async move {
257                            manifest_upload.write_manifest_entry(&entry).await?;
258                            Ok(manifest_upload)
259                        },
260                    )
261                    .await?
262                    .upload_object()
263                    .await?
264            }
265            None => zip_stream.map_ok(|_| ()).try_collect().await?,
266        };
267
268        let zip = Arc::try_unwrap(zip)
269            .map_err(|_| anyhow::Error::msg("Failed to unwrap ZipFileWriter"))?;
270        let zip = zip.into_inner();
271        zip.close().await?;
272        //The zip writer does not close the multipart upload
273        byte_limit.shutdown().await?;
274        Ok(())
275    }
276
277    async fn process_entry<T: tokio::io::AsyncWrite + Unpin>(
278        &self,
279        zip: &mut async_zip::write::ZipFileWriter<T>,
280        mut response: GetObjectOutput,
281        entry_path: &str,
282        write_type: ZipWrite,
283    ) -> Result<u32> {
284        let opts = ZipEntryBuilder::new(entry_path.to_owned(), self.compression.into());
285
286        //crc is needed for validation
287        let mut crc_sink = CRC32Sink::default();
288
289        match write_type {
290            ZipWrite::Stream => {
291                let mut entry_writer = zip.write_entry_stream(opts).await?;
292                while let Some(bytes) = response.body.next().await {
293                    let bytes = bytes?;
294                    entry_writer.write_all(&bytes).await?;
295                    crc_sink.send(bytes).await?;
296                }
297                // If this is not done the Zip file produced silently corrupts
298                entry_writer.close().await?;
299            }
300            ZipWrite::Whole => {
301                let bytes = response.body.collect().await?.into_bytes();
302                zip.write_entry_whole(opts, &bytes).await?;
303                crc_sink.send(bytes).await?;
304            }
305        }
306        crc_sink.close().await?;
307        let crc = crc_sink
308            .value()
309            .context("Expected CRC Sink to have value")?;
310        Ok(crc)
311    }
312}
313
314/// Validates that all the src files from [ManifestEntry] records in the
315/// manifest file exist, and that their CRC32 values match the value in the manifest.
316/// The `fetch_concurrency` in the number of source files that will
317/// be fetched concurrently from S3.
318/// The `validate_concurrency` is the number of source files that will
319/// be validated concurrently.
320pub async fn validate_manifest_file(
321    client: &aws_sdk_s3::Client,
322    manifest_file: &S3Object,
323    fetch_concurrency: usize,
324    validate_concurrency: usize,
325) -> Result<()> {
326    let manifest_lines = client
327        .get_object()
328        .bucket(&manifest_file.bucket)
329        .key(&manifest_file.key)
330        .send()
331        .map_ok(|r| r.body.into_async_read())
332        .map_ok(|l| BufReader::with_capacity(64 * bytesize::KB as usize, l))
333        .map_ok(|b| b.lines())
334        .await?;
335
336    LinesStream::new(manifest_lines)
337        .map_err(anyhow::Error::from)
338        .and_then(|l| {
339            future::ready(serde_json::from_str::<ManifestEntry>(&l).map_err(anyhow::Error::from))
340        })
341        .map_ok(move |entry| {
342            client
343                .get_object()
344                .bucket(&entry.object.bucket)
345                .key(&entry.object.key)
346                .send()
347                .map_err(anyhow::Error::from)
348                .map_ok(move |r| (r, entry))
349        })
350        .try_buffered(fetch_concurrency)
351        .map_ok(|(r, entry)| async move {
352            let mut tokio_sink = FuturesAsyncWriteCompatExt::compat_write(CRC32Sink::default());
353            let mut buf =
354                BufReader::with_capacity(64 * bytesize::KB as usize, r.body.into_async_read());
355            tokio::io::copy(&mut buf, &mut tokio_sink).await?;
356            tokio_sink.shutdown().await?;
357            let sink_crc32 = tokio_sink
358                .into_inner()
359                .value()
360                .context("Expected a crc32 to be calculated")?;
361            ensure!(
362                entry.crc32 == sink_crc32,
363                "CRC for Entry {:?} in manifest file {:?} was {:?} does not match {sink_crc32}",
364                entry.object,
365                manifest_file,
366                entry.crc32
367            );
368            Ok(())
369        })
370        .try_buffered(validate_concurrency)
371        .try_collect()
372        .await
373}
374
375/// Validate each ZIP entry by reading each [ManifestEntry] from the `manifest` file
376/// and ensuring that the CRC32 of bytes in the ZIP archive match those recorded in the
377/// manifest file.
378pub async fn validate_zip_entry_bytes(
379    client: &aws_sdk_s3::Client,
380    manifest_file: &S3Object,
381    zip_file: &S3Object,
382) -> Result<()> {
383    let manifest_request = client
384        .get_object()
385        .bucket(&manifest_file.bucket)
386        .key(&manifest_file.key)
387        .send()
388        .map_ok(|r| r.body.into_async_read())
389        .map_ok(|r| BufReader::with_capacity(64 * bytesize::KB as usize, r))
390        .map_ok(|b| b.lines());
391
392    let zip_request = client
393        .get_object()
394        .bucket(&zip_file.bucket)
395        .key(&zip_file.key)
396        .send()
397        .map_ok(|r| r.body.into_async_read())
398        .map_ok(|r| BufReader::with_capacity(64 * bytesize::KB as usize, r));
399
400    let (manifest_lines, zip_response) = futures::join!(manifest_request, zip_request);
401    let mut manifest_lines = manifest_lines?;
402
403    let mut zip_reader = async_zip::read::stream::ZipFileReader::new(zip_response?);
404
405    while !zip_reader.finished() {
406        if let Some(reader) = zip_reader.entry_reader().await? {
407            let entry_name = reader.entry().filename().to_owned();
408            let mut sink = FuturesAsyncWriteCompatExt::compat_write(CRC32Sink::default());
409            //Using the stream reader panics with Stored items
410            let crc_copy = std::panic::AssertUnwindSafe(
411                reader.copy_to_end_crc(&mut sink, 64 * bytesize::KB as usize),
412            )
413            .catch_unwind()
414            .map_err(|_| anyhow::Error::msg("Failed to "));
415
416            let (crc_copy, manifest_line) = futures::join!(crc_copy, manifest_lines.next_line());
417            crc_copy??;
418            sink.shutdown().await?;
419
420            let manifest_entry = manifest_line?
421                .context("Manifest has too few entries")
422                .and_then(|l| {
423                    serde_json::from_str::<ManifestEntry>(&l).map_err(anyhow::Error::from)
424                })?;
425            validate_manifest_entry(
426                &manifest_entry,
427                &entry_name,
428                sink.into_inner()
429                    .value()
430                    .context("Expected a crc32 value")?,
431            )?;
432        }
433    }
434    ensure!(
435        manifest_lines.next_line().await?.is_none(),
436        "Manifest has more entries that the zip."
437    );
438
439    Ok(())
440}
441
442/// Extract all files from the ZIP at the `zip_file` location into the destination prefix.
443/// If any of the files in the ZIP are larger than `chunk_size` then is is uploaded to S3
444/// as a multipart upload.
445pub async fn unarchive_all(
446    client: &aws_sdk_s3::Client,
447    zip_file: &S3Object,
448    dst: &S3Object,
449    chunk_size: usize,
450) -> Result<()> {
451    ensure!(dst.key.ends_with('/'), "destination key must end with `/`");
452
453    let zip_response = client
454        .get_object()
455        .bucket(&zip_file.bucket)
456        .key(&zip_file.key)
457        .send()
458        .map_ok(|r| r.body.into_async_read())
459        .map_ok(|r| BufReader::with_capacity(64 * bytesize::KB as usize, r))
460        .await;
461
462    let mut zip_reader = async_zip::read::stream::ZipFileReader::new(zip_response?);
463    while !zip_reader.finished() {
464        if let Some(reader) = zip_reader.entry_reader().await? {
465            let entry = reader.entry();
466            let entry_name = entry.filename().to_owned();
467            let entry_size = entry.uncompressed_size();
468            let dst_key = dst.key.to_owned() + &entry_name;
469            if entry_size > chunk_size.try_into()? {
470                let dst_file = S3Object::new(&dst.bucket, dst_key);
471                let writer = AsyncMultipartUpload::new(client, &dst_file, chunk_size, None).await?;
472                let mut tokio_sink = FuturesAsyncWriteCompatExt::compat_write(writer);
473                reader
474                    .copy_to_end_crc(&mut tokio_sink, 64 * bytesize::KIB as usize)
475                    .await?;
476                tokio_sink.shutdown().await?;
477            } else {
478                let writer = AsyncPutObject::new(client, &dst.bucket, &dst_key);
479                let mut tokio_sink = FuturesAsyncWriteCompatExt::compat_write(writer);
480                reader
481                    .copy_to_end_crc(&mut tokio_sink, 64 * bytesize::KIB as usize)
482                    .await?;
483                tokio_sink.shutdown().await?;
484            }
485        }
486    }
487
488    Ok(())
489}
490
491/// Validate that the `filename` and `crc32`
492/// match the `manifest_entry`.
493fn validate_manifest_entry(
494    manifest_entry: &ManifestEntry,
495    filename: &str,
496    crc32: u32,
497) -> Result<()> {
498    ensure!(
499        manifest_entry.filename_in_zip == filename,
500        format!(
501            "Validation manifest entry filename {manifest_entry:?} did not match zip {filename}",
502        )
503    );
504    ensure!(
505        manifest_entry.crc32 == crc32,
506        format!("Validation error manifest entry {manifest_entry:?} crc32 did not match {crc32}",)
507    );
508    Ok(())
509}
510
511/// Validate that the CRC32 value and filenames in each [ManifestEntry]
512/// in the `manifest_file` matches those in the central directory
513/// of the `zip_file`.
514pub async fn validate_zip_central_dir(
515    client: &aws_sdk_s3::Client,
516    manifest_file: &S3Object,
517    zip_file: &S3Object,
518) -> Result<()> {
519    let manifest_request = client
520        .get_object()
521        .bucket(&manifest_file.bucket)
522        .key(&manifest_file.key)
523        .send()
524        .map_ok(|r| r.body.into_async_read())
525        .map_ok(|l| BufReader::with_capacity(64 * bytesize::KB as usize, l))
526        .map_ok(|b| b.lines());
527
528    let zip_request = aws::S3ObjectSeekableRead::new(client, zip_file, None);
529
530    let (manifest_lines, zip_response) = futures::join!(manifest_request, zip_request);
531    let mut manifest_lines = manifest_lines?;
532    let zip_response = zip_response?;
533
534    let zip_reader = async_zip::read::seek::ZipFileReader::new(zip_response).await?;
535
536    for entry in zip_reader.entries() {
537        let manifest_entry = manifest_lines
538            .next_line()
539            .await?
540            .context("Manifest has too few entries")
541            .and_then(|l| serde_json::from_str::<ManifestEntry>(&l).map_err(anyhow::Error::from))?;
542        validate_manifest_entry(&manifest_entry, entry.filename(), entry.crc32())?
543    }
544    ensure!(
545        manifest_lines.next_line().await?.is_none(),
546        "Manifest has more entries that the zip."
547    );
548    Ok(())
549}
550
#[cfg(test)]
mod tests {
    use super::*;

    /// An `s3://bucket/key` URL converts into an [S3Object] with that bucket and key.
    #[test]
    fn test_s3_tryfrom() {
        let bucket = "test-bucket";
        let key = "test-key";
        let url: url::Url = format!("s3://{bucket}/{key}")
            .parse()
            .expect("Expected successful URL parsing");
        let obj: S3Object = url.try_into().expect("Expected successful URL conversion");
        assert_eq!(bucket, obj.bucket);
        assert_eq!(key, obj.key);
    }

    /// An S3 URL without a key is rejected.
    #[test]
    fn test_s3_tryfrom_no_path() {
        let url: url::Url = "s3://test-bucket"
            .parse()
            .expect("Expected successful URL parsing");
        let result: Result<S3Object> = url.try_into();
        assert!(result.is_err())
    }

    /// A URL with a scheme other than `s3` is rejected.
    #[test]
    fn test_s3_tryfrom_file_url() {
        let url: url::Url = "file://path/to/file"
            .parse()
            .expect("Expected successful URL parsing");
        let result: Result<S3Object> = url.try_into();
        assert!(result.is_err())
    }
}