async_zip2/base/write/
entry_stream.rs

1// Copyright (c) 2021 Harry [Majored] [hello@majored.pw]
2// MIT License (https://github.com/Majored/rs-async-zip/blob/main/LICENSE)
3
4use crate::base::write::compressed_writer::CompressedAsyncWriter;
5use crate::base::write::io::offset::AsyncOffsetWriter;
6use crate::base::write::CentralDirectoryEntry;
7use crate::base::write::ZipFileWriter;
8use crate::entry::ZipEntry;
9use crate::error::{Result, Zip64ErrorCase, ZipError};
10use crate::spec::extra_field::ExtraFieldAsBytes;
11use crate::spec::header::{
12    CentralDirectoryRecord, ExtraField, GeneralPurposeFlag, HeaderId, LocalFileHeader,
13    Zip64ExtendedInformationExtraField,
14};
15use crate::string::StringEncoding;
16
17use std::io::Error;
18use std::pin::Pin;
19use std::task::{Context, Poll};
20
21use crate::base::read::get_zip64_extra_field_mut;
22use crate::spec::consts::{NON_ZIP64_MAX_NUM_FILES, NON_ZIP64_MAX_SIZE};
23use crc32fast::Hasher;
24use futures_util::io::{AsyncWrite, AsyncWriteExt};
25
/// An entry writer which supports the streaming of data (ie. the writing of unknown size or data at runtime).
///
/// # Note
/// - This writer cannot be manually constructed; instead, use [`ZipFileWriter::write_entry_stream()`].
/// - [`EntryStreamWriter::close()`] must be called before a stream writer goes out of scope.
/// - Utilities for working with [`AsyncWrite`] values are provided by [`AsyncWriteExt`].
pub struct EntryStreamWriter<'b, W: AsyncWrite + Unpin> {
    /// Offset-counting writer layered over the compressed writer; all entry data passes through it and its
    /// offset is read back as the uncompressed size when the entry is closed.
    writer: AsyncOffsetWriter<CompressedAsyncWriter<'b, W>>,
    /// The owning [`ZipFileWriter`]'s central directory entries; the record for this entry is pushed on close.
    cd_entries: &'b mut Vec<CentralDirectoryEntry>,
    /// The entry being written; it is moved into the central directory entry on close.
    entry: ZipEntry,
    /// Running CRC32 hasher, updated with every chunk accepted by `poll_write`.
    hasher: Hasher,
    /// The local file header that was written when this writer was created.
    lfh: LocalFileHeader,
    /// Offset of the underlying writer sampled immediately before the local file header was written.
    lfh_offset: usize,
    /// Offset of the underlying writer sampled immediately after the local file header was written; used to
    /// derive the compressed size on close.
    data_offset: usize,
    /// Copy of the owning [`ZipFileWriter`]'s `force_no_zip64` setting taken at creation time.
    force_no_zip64: bool,
    /// To write back to the original writer if zip64 is required.
    is_zip64: &'b mut bool,
}
44
45impl<'b, W: AsyncWrite + Unpin> EntryStreamWriter<'b, W> {
46    pub(crate) async fn from_raw(
47        writer: &'b mut ZipFileWriter<W>,
48        mut entry: ZipEntry,
49    ) -> Result<EntryStreamWriter<'b, W>> {
50        let lfh_offset = writer.writer.offset();
51        let lfh = EntryStreamWriter::write_lfh(writer, &mut entry).await?;
52        let data_offset = writer.writer.offset();
53        let force_no_zip64 = writer.force_no_zip64;
54
55        let cd_entries = &mut writer.cd_entries;
56        let is_zip64 = &mut writer.is_zip64;
57        let writer = AsyncOffsetWriter::new(CompressedAsyncWriter::from_raw(&mut writer.writer, entry.compression()));
58
59        Ok(EntryStreamWriter {
60            writer,
61            cd_entries,
62            entry,
63            lfh,
64            lfh_offset,
65            data_offset,
66            hasher: Hasher::new(),
67            force_no_zip64,
68            is_zip64,
69        })
70    }
71
72    async fn write_lfh(writer: &'b mut ZipFileWriter<W>, entry: &mut ZipEntry) -> Result<LocalFileHeader> {
73        // Always emit a zip64 extended field, even if we don't need it, because we *might* need it.
74        // If we are forcing no zip, we will have to error later if the file is too large.
75        let (lfh_compressed, lfh_uncompressed) = if !writer.force_no_zip64 {
76            if !writer.is_zip64 {
77                writer.is_zip64 = true;
78            }
79            entry.extra_fields.push(ExtraField::Zip64ExtendedInformationExtraField(
80                Zip64ExtendedInformationExtraField {
81                    header_id: HeaderId::Zip64ExtendedInformationExtraField,
82                    data_size: 16,
83                    uncompressed_size: Some(entry.uncompressed_size),
84                    compressed_size: Some(entry.compressed_size),
85                    relative_header_offset: None,
86                    disk_start_number: None,
87                },
88            ));
89
90            (NON_ZIP64_MAX_SIZE, NON_ZIP64_MAX_SIZE)
91        } else {
92            if entry.compressed_size > NON_ZIP64_MAX_SIZE as u64 || entry.uncompressed_size > NON_ZIP64_MAX_SIZE as u64
93            {
94                return Err(ZipError::Zip64Needed(Zip64ErrorCase::LargeFile));
95            }
96
97            (entry.compressed_size as u32, entry.uncompressed_size as u32)
98        };
99
100        let lfh = LocalFileHeader {
101            compressed_size: lfh_compressed,
102            uncompressed_size: lfh_uncompressed,
103            compression: entry.compression().into(),
104            crc: entry.crc32,
105            extra_field_length: entry
106                .extra_fields()
107                .count_bytes()
108                .try_into()
109                .map_err(|_| ZipError::ExtraFieldTooLarge)?,
110            file_name_length: entry.filename().as_bytes().len().try_into().map_err(|_| ZipError::FileNameTooLarge)?,
111            mod_time: entry.last_modification_date().time,
112            mod_date: entry.last_modification_date().date,
113            version: crate::spec::version::as_needed_to_extract(entry),
114            flags: GeneralPurposeFlag {
115                data_descriptor: true,
116                encrypted: false,
117                filename_unicode: matches!(entry.filename().encoding(), StringEncoding::Utf8)
118                    && matches!(entry.comment().encoding(), StringEncoding::Utf8),
119            },
120        };
121
122        writer.writer.write_all(&crate::spec::consts::LFH_SIGNATURE.to_le_bytes()).await?;
123        writer.writer.write_all(&lfh.as_slice()).await?;
124        writer.writer.write_all(entry.filename().as_bytes()).await?;
125        writer.writer.write_all(&entry.extra_fields().as_bytes()).await?;
126
127        Ok(lfh)
128    }
129
130    /// Consumes this entry writer and completes all closing tasks.
131    ///
132    /// This includes:
133    /// - Finalising the CRC32 hash value for the written data.
134    /// - Calculating the compressed and uncompressed byte sizes.
135    /// - Constructing a central directory header.
136    /// - Pushing that central directory header to the [`ZipFileWriter`]'s store.
137    ///
138    /// Failure to call this function before going out of scope would result in a corrupted ZIP file.
139    pub async fn close(mut self) -> Result<()> {
140        self.writer.close().await?;
141
142        let crc = self.hasher.finalize();
143        let uncompressed_size = self.writer.offset() as u64;
144        let inner_writer = self.writer.into_inner().into_inner();
145        let compressed_size = (inner_writer.offset() - self.data_offset) as u64;
146
147        let (cdr_compressed_size, cdr_uncompressed_size) = if self.force_no_zip64 {
148            if uncompressed_size > NON_ZIP64_MAX_SIZE as u64 || compressed_size > NON_ZIP64_MAX_SIZE as u64 {
149                return Err(ZipError::Zip64Needed(Zip64ErrorCase::LargeFile));
150            }
151            (uncompressed_size as u32, compressed_size as u32)
152        } else {
153            // When streaming an entry, we are always using a zip64 field.
154            match get_zip64_extra_field_mut(&mut self.entry.extra_fields) {
155                // This case shouldn't be necessary but is included for completeness.
156                None => {
157                    self.entry.extra_fields.push(ExtraField::Zip64ExtendedInformationExtraField(
158                        Zip64ExtendedInformationExtraField {
159                            header_id: HeaderId::Zip64ExtendedInformationExtraField,
160                            data_size: 16,
161                            uncompressed_size: Some(uncompressed_size),
162                            compressed_size: Some(compressed_size),
163                            relative_header_offset: None,
164                            disk_start_number: None,
165                        },
166                    ));
167                    self.lfh.extra_field_length =
168                        self.entry.extra_fields().count_bytes().try_into().map_err(|_| ZipError::ExtraFieldTooLarge)?;
169                }
170                Some(zip64) => {
171                    zip64.uncompressed_size = Some(uncompressed_size);
172                    zip64.compressed_size = Some(compressed_size);
173                }
174            }
175
176            (NON_ZIP64_MAX_SIZE, NON_ZIP64_MAX_SIZE)
177        };
178
179        inner_writer.write_all(&crate::spec::consts::DATA_DESCRIPTOR_SIGNATURE.to_le_bytes()).await?;
180        inner_writer.write_all(&crc.to_le_bytes()).await?;
181        inner_writer.write_all(&cdr_compressed_size.to_le_bytes()).await?;
182        inner_writer.write_all(&cdr_uncompressed_size.to_le_bytes()).await?;
183
184        let cdh = CentralDirectoryRecord {
185            compressed_size: cdr_compressed_size,
186            uncompressed_size: cdr_uncompressed_size,
187            crc,
188            v_made_by: crate::spec::version::as_made_by(),
189            v_needed: self.lfh.version,
190            compression: self.lfh.compression,
191            extra_field_length: self.lfh.extra_field_length,
192            file_name_length: self.lfh.file_name_length,
193            file_comment_length: self
194                .entry
195                .comment()
196                .as_bytes()
197                .len()
198                .try_into()
199                .map_err(|_| ZipError::CommentTooLarge)?,
200            mod_time: self.lfh.mod_time,
201            mod_date: self.lfh.mod_date,
202            flags: self.lfh.flags,
203            disk_start: 0,
204            inter_attr: self.entry.internal_file_attribute(),
205            exter_attr: self.entry.external_file_attribute(),
206            lh_offset: self.lfh_offset as u32,
207        };
208
209        self.cd_entries.push(CentralDirectoryEntry { header: cdh, entry: self.entry });
210        // Ensure that we can fit this many files in this archive if forcing no zip64
211        if self.cd_entries.len() > NON_ZIP64_MAX_NUM_FILES as usize {
212            if self.force_no_zip64 {
213                return Err(ZipError::Zip64Needed(Zip64ErrorCase::TooManyFiles));
214            }
215            if !*self.is_zip64 {
216                *self.is_zip64 = true;
217            }
218        }
219
220        Ok(())
221    }
222}
223
224impl<'a, W: AsyncWrite + Unpin> AsyncWrite for EntryStreamWriter<'a, W> {
225    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<std::result::Result<usize, Error>> {
226        let poll = Pin::new(&mut self.writer).poll_write(cx, buf);
227
228        if let Poll::Ready(Ok(written)) = poll {
229            self.hasher.update(&buf[0..written]);
230        }
231
232        poll
233    }
234
235    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<std::result::Result<(), Error>> {
236        Pin::new(&mut self.writer).poll_flush(cx)
237    }
238
239    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<std::result::Result<(), Error>> {
240        Pin::new(&mut self.writer).poll_close(cx)
241    }
242}