//! remozipsy 0.0.1
//!
//! Zip-implementation-independent structs and helpers.
//! Documentation
use bytes::{Buf, BufMut, BytesMut};
//use flate2::read::DeflateDecoder;
use std::{
    collections::VecDeque,
    convert::TryFrom,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    time::Instant,
};
//use tokio::io::AsyncWriteExt;
use zip_core::{
    Signature,
    raw::{
        LocalFileHeader, LocalFileHeaderFixed,
        parse::{Parse, ParseExtend},
    },
    structs::CompressionMethod,
};

use crate::model::{Error, RemoteFileInfo};

use super::{FileSystem, RemoteZip};

/// Result of [`download_batch`]: `Ok(())` on success, otherwise an [`Error`]
/// carrying either the remote's or the file system's error type.
#[expect(type_alias_bounds)]
pub(super) type DownloadResult<R: RemoteZip, F: FileSystem> =
    Result<(), Error<<R as RemoteZip>::Error, <F as FileSystem>::Error>>;
/// Result of [`unzip_file`]. NOTE(review): the visible implementation only
/// ever returns `Ok(None)` — confirm where the `Some((u32, String))` case is
/// supposed to be produced.
#[expect(type_alias_bounds)]
pub(super) type UnzipResult<R: RemoteZip, F: FileSystem> =
    Result<Option<(u32, String)>, Error<<R as RemoteZip>::Error, <F as FileSystem>::Error>>;
/// Files to be fetched in one ranged request, ordered by offset in the zip.
pub(super) type Batch = VecDeque<RemoteFileInfo>;

/// Downloads one batch of files with a single ranged request against `remote`.
///
/// A batch consists of a list of files, separated by junk data, e.g.
/// [file][junk][file][junk][file][junk]
/// We MUST NOT have junk data at the beginning of a Batch: the request range
/// starts exactly at the first file's `start_offset`, so the stream must open
/// with a local file header.
///
/// For every fully received file, `f` is invoked with the (still compressed)
/// payload and its [`RemoteFileInfo`]. `bytes_downloaded` is incremented by
/// the size of every network chunk as it arrives.
pub(super) async fn download_batch<R, F, FN>(
    mut batch: Batch,
    remote: R,
    f: FN,
    bytes_downloaded: Arc<AtomicUsize>,
) -> DownloadResult<R, F>
where
    R: RemoteZip,
    F: FileSystem,
    FN: Fn(BytesMut, RemoteFileInfo),
{
    if batch.is_empty() {
        return Ok(());
    }

    // Overall byte range covered by this batch; both ends are inclusive.
    let (start_location, end_location) = {
        let Some(first) = batch.front() else { return Ok(()) };
        let Some(last) = batch.back() else { return Ok(()) };
        (first.start_offset as usize, (last.end_offset_inclusive as usize))
    };

    let before = Instant::now();
    let range = start_location..=end_location;
    let stream = remote.fetch_bytes_stream(range.clone()).await.map_err(Error::Remote)?;
    let elapsed = before.elapsed();
    let batchsize = batch.len();
    tracing::trace!(?elapsed, ?range, ?batchsize, "fetched batch metadata from zip");

    use futures_lite::StreamExt;
    let mut stream = Box::pin(stream);

    // Accumulates not-yet-parsed bytes. NOTE(review): the range is inclusive,
    // so the full payload is `end_location - start_location + 1` bytes; this
    // capacity hint is one byte short (allocation hint only, not a bug).
    let mut storage = BytesMut::with_capacity(end_location - start_location);
    // Cannot panic: emptiness was checked at the top of the function.
    let mut next_rfile = batch.pop_front().unwrap();

    // We use a state machine here, because sometimes we have enough bytes to
    // parse the LocalFileHeaderFixed, but don't know if it's enough for the
    // whole file.
    #[derive(Debug)]
    enum State {
        // Waiting for the next local file header to become parseable.
        None,
        // Fixed-size header parsed; variable-length part still pending.
        LocalHeaderFixed(LocalFileHeaderFixed),
        // Full header parsed; the usize is the total header size in bytes.
        LocalHeader(LocalFileHeader, usize),
        // Skip this many junk bytes before the next file's header.
        Discard(usize),
    }

    let mut state = State::None;

    while let Some(chunk) = stream.try_next().await.map_err(Error::Remote)? {
        bytes_downloaded.fetch_add(chunk.len(), Ordering::SeqCst);
        storage.put(chunk);

        // Drain as many state transitions as the buffered bytes allow; each
        // arm either advances the state or breaks to await more data.
        'moredata: loop {
            match state {
                State::Discard(to_be_discarded) => {
                    // Drop the junk between two files once fully buffered.
                    if storage.len() >= to_be_discarded {
                        let _ = storage.split_to(to_be_discarded);
                        state = State::None;
                    } else {
                        break 'moredata;
                    }
                },
                State::None => {
                    // NOTE(review): assumes `from_buf` consumes no bytes from
                    // `storage` on failure, so retrying after the next chunk
                    // is safe — confirm against zip_core's Parse contract.
                    if let Ok(header) = LocalFileHeaderFixed::from_buf(&mut storage) {
                        if !header.is_valid_signature() {
                            return Err(Error::InvalidLocalHeaderSignature {
                                file_name: next_rfile.file_name,
                            });
                        };
                        state = State::LocalHeaderFixed(header);
                    } else {
                        break 'moredata;
                    }
                },
                State::LocalHeaderFixed(header) => {
                    let storage_before = storage.len();
                    match LocalFileHeader::from_buf_fixed(&mut storage, header) {
                        Ok(header) => {
                            // Total header size = fixed part + however many
                            // variable-length bytes the parse just consumed.
                            state = State::LocalHeader(
                                header,
                                LocalFileHeaderFixed::SIZE_IN_BYTES + storage_before.saturating_sub(storage.len()),
                            );
                        },
                        Err((_, header)) => {
                            // Not enough data yet: the fixed header is handed
                            // back so we can retry once more bytes arrive.
                            state = State::LocalHeaderFixed(header);
                            break 'moredata;
                        },
                    }
                },
                State::LocalHeader(header, bytes_read) => {
                    // now that we finally found the header, verify size
                    if storage.len() >= next_rfile.compressed_size as usize {
                        // Whole compressed payload buffered — hand it off.
                        let data = storage.split_to(next_rfile.compressed_size as usize);
                        let bytes_read = bytes_read + next_rfile.compressed_size as usize;
                        let current_start_offset = next_rfile.start_offset as usize;

                        f(data, next_rfile);

                        if let Some(next) = batch.pop_front() {
                            next_rfile = next;
                        } else {
                            // Last file of the batch delivered — done.
                            return Ok(());
                        }

                        // mark junk for cleanup (we cannot drop it here, because we don't know if the
                        // FULL junk is fetched already)
                        let to_be_discarded = (next_rfile.start_offset as usize)
                            .checked_sub(current_start_offset + bytes_read)
                            .ok_or(Error::OverlappingBytesForDifferentFiles)?;
                        if to_be_discarded == 0 {
                            state = State::None;
                        } else {
                            state = State::Discard(to_be_discarded);
                        }
                    } else {
                        // Payload incomplete: keep the parsed header and wait.
                        state = State::LocalHeader(header, bytes_read);
                        break 'moredata;
                    }
                },
            }
        }
    }

    // when we reach this, the `next_rfile` hasn't been processed. so we got a bug
    Err(Error::InsufficientDownloadRange {
        file_name: next_rfile.file_name,
        storage_size: storage.len() as u64,
        expected_compressed_size: next_rfile.compressed_size as u64,
    })
}

pub(super) async fn unzip_file<R, F>(
    mut compressed: BytesMut,
    rfile: RemoteFileInfo,
    file_system: F,
    bytes_unzipped: Arc<AtomicUsize>,
) -> UnzipResult<R, F>
where
    R: RemoteZip,
    F: FileSystem + Clone + Send + 'static,
{
    let file_name = rfile.file_name.clone();
    if compressed.len() != rfile.compressed_size as usize {
        return Err(Error::WrongBytesLength {
            file_name,
            bytes_cnt: compressed.len() as u64,
            expected_compressed_size: rfile.compressed_size as u64,
        });
    }

    let file_system2 = file_system.clone();
    let file_name = rfile.file_name.clone();
    let prepared = tokio::spawn(Box::pin(
        async move { file_system2.prepare_store_file(&file_name).await },
    ));

    let file_data = match CompressionMethod::try_from(rfile.compression_method) {
        Ok(CompressionMethod::Deflated) => {
            #[cfg(feature = "deflate")]
            {
                use flate2::read::DeflateDecoder;
                use std::io::Read;

                let mut deflate_reader = DeflateDecoder::new(compressed.reader());
                let mut decompressed = Vec::with_capacity(rfile.compressed_size as usize);
                deflate_reader
                    .read_to_end(&mut decompressed)
                    .map_err(|_| Error::CompressionError)?;
                bytes::Bytes::copy_from_slice(&decompressed)
            }
            #[cfg(not(feature = "deflate"))]
            return Err(Error::UnsupportedCompressionMethod);
        },
        Ok(CompressionMethod::Stored) => compressed.copy_to_bytes(rfile.compressed_size as usize),
        // should not happen at this point
        _ => {
            return Err(Error::UnsupportedCompressionMethod);
        },
    };

    if crc32fast::hash(&file_data) != rfile.crc32 {
        return Err(Error::InvalidHash);
    }

    let prepared = prepared
        .await
        .map_err(|_| Error::JoinError)?
        .map_err(Error::FileSystem)?;

    let data_len = file_data.len();
    file_system
        .store_file(prepared, file_data)
        .await
        .map_err(Error::FileSystem)?;

    bytes_unzipped.fetch_add(data_len, Ordering::SeqCst);

    Ok(None)
}