use bytes::{Buf, BufMut, BytesMut};
use std::{
collections::VecDeque,
convert::TryFrom,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
time::Instant,
};
use zip_core::{
Signature,
raw::{
LocalFileHeader, LocalFileHeaderFixed,
parse::{Parse, ParseExtend},
},
structs::CompressionMethod,
};
use crate::{
model::{Error, FileInfo},
proto::remote_file_info::RemoteFileInfo,
};
use super::{FileSystem, RemoteZip};
/// Outcome of downloading one batch: `Ok(())` on success, otherwise an
/// [`Error`] parameterized over the remote's and the file system's error types.
#[expect(type_alias_bounds)]
pub(super) type DownloadResult<R: RemoteZip, F: FileSystem> =
Result<(), Error<<R as RemoteZip>::Error, <F as FileSystem>::Error>>;
/// Outcome of unzipping a single file. `unzip_file` in this module always
/// returns `Ok(None)`; the `Some((u32, String))` payload is presumably
/// produced by another implementation of this signature — TODO confirm.
#[expect(type_alias_bounds)]
pub(super) type UnzipResult<R: RemoteZip, F: FileSystem> =
Result<Option<(u32, String)>, Error<<R as RemoteZip>::Error, <F as FileSystem>::Error>>;
/// An ordered run of remote files fetched with a single contiguous ranged request.
pub(super) type Batch = VecDeque<RemoteFileInfo>;
pub(super) async fn download_batch<R, F, FN>(
mut batch: Batch,
remote: R,
f: FN,
bytes_downloaded: Arc<AtomicUsize>,
) -> DownloadResult<R, F>
where
R: RemoteZip,
F: FileSystem,
FN: Fn(BytesMut, RemoteFileInfo),
{
if batch.is_empty() {
return Ok(());
}
let (start_location, end_location) = {
let Some(first) = batch.front() else { return Ok(()) };
let Some(last) = batch.back() else { return Ok(()) };
(first.start_offset as usize, (last.end_offset_inclusive as usize))
};
let before = Instant::now();
let range = start_location..=end_location;
let stream = remote.fetch_bytes_stream(range.clone()).await.map_err(Error::Remote)?;
let elapsed = before.elapsed();
let batchsize = batch.len();
tracing::trace!(?elapsed, ?range, ?batchsize, "fetched batch metadata from zip");
use futures_lite::StreamExt;
let mut stream = Box::pin(stream);
let mut storage = BytesMut::with_capacity(end_location - start_location);
let mut next_rfile = batch.pop_front().unwrap();
#[derive(Debug)]
enum State {
None,
LocalHeaderFixed(LocalFileHeaderFixed),
LocalHeader(LocalFileHeader, usize),
Discard(usize),
}
let mut state = State::None;
while let Some(chunk) = stream.try_next().await.map_err(Error::Remote)? {
bytes_downloaded.fetch_add(chunk.len(), Ordering::SeqCst);
storage.put(chunk);
'moredata: loop {
match state {
State::Discard(to_be_discarded) => {
if storage.len() >= to_be_discarded {
let _ = storage.split_to(to_be_discarded);
state = State::None;
} else {
break 'moredata;
}
},
State::None => {
if let Ok(header) = LocalFileHeaderFixed::from_buf(&mut storage) {
if !header.is_valid_signature() {
return Err(Error::InvalidLocalHeaderSignature {
file_name: next_rfile.file_name,
});
};
state = State::LocalHeaderFixed(header);
} else {
break 'moredata;
}
},
State::LocalHeaderFixed(header) => {
let storage_before = storage.len();
match LocalFileHeader::from_buf_fixed(&mut storage, header) {
Ok(header) => {
state = State::LocalHeader(
header,
LocalFileHeaderFixed::SIZE_IN_BYTES + storage_before.saturating_sub(storage.len()),
);
},
Err((_, header)) => {
state = State::LocalHeaderFixed(header);
break 'moredata;
},
}
},
State::LocalHeader(header, bytes_read) => {
if storage.len() >= next_rfile.compressed_size as usize {
let data = storage.split_to(next_rfile.compressed_size as usize);
let bytes_read = bytes_read + next_rfile.compressed_size as usize;
let current_start_offset = next_rfile.start_offset as usize;
f(data, next_rfile);
if let Some(next) = batch.pop_front() {
next_rfile = next;
} else {
return Ok(());
}
let to_be_discarded = (next_rfile.start_offset as usize)
.checked_sub(current_start_offset + bytes_read)
.ok_or(Error::OverlappingBytesForDifferentFiles)?;
if to_be_discarded == 0 {
state = State::None;
} else {
state = State::Discard(to_be_discarded);
}
} else {
state = State::LocalHeader(header, bytes_read);
break 'moredata;
}
},
}
}
}
Err(Error::InsufficientDownloadRange {
file_name: next_rfile.file_name,
storage_size: storage.len() as u64,
expected_compressed_size: next_rfile.compressed_size as u64,
})
}
/// Decompresses `compressed` according to `rfile.compression_method`,
/// verifies its CRC-32, and stores the result via `file_system`.
///
/// The destination preparation (`prepare_store_file`) is spawned on a
/// separate task so it overlaps with decompression. On success,
/// `bytes_unzipped` is incremented by the uncompressed length and `Ok(None)`
/// is returned.
///
/// # Errors
/// - `Error::WrongBytesLength` if `compressed` doesn't match the expected
///   compressed size.
/// - `Error::UnsupportedCompressionMethod` for anything other than `Stored`
///   (and `Deflated` when the `deflate` feature is enabled).
/// - `Error::CompressionError` if inflation fails.
/// - `Error::InvalidHash` on CRC-32 mismatch.
/// - `Error::JoinError` / `Error::FileSystem` for task-join or storage
///   failures.
pub(super) async fn unzip_file<R, F>(
    mut compressed: BytesMut,
    rfile: RemoteFileInfo,
    file_system: F,
    bytes_unzipped: Arc<AtomicUsize>,
) -> UnzipResult<R, F>
where
    R: RemoteZip,
    F: FileSystem + Clone + Send + 'static,
{
    // Clone the name once: it is moved into the error on the (diverging)
    // length-mismatch path and reused for `FileInfo` otherwise. The previous
    // code cloned it a second time unnecessarily.
    let file_name = rfile.file_name.clone();
    if compressed.len() != rfile.compressed_size as usize {
        return Err(Error::WrongBytesLength {
            file_name,
            bytes_cnt: compressed.len() as u64,
            expected_compressed_size: rfile.compressed_size as u64,
        });
    }
    let file_system2 = file_system.clone();
    let info = FileInfo {
        local_unix_path: file_name,
        crc32: rfile.crc32,
    };
    // Kick off destination preparation concurrently with decompression.
    // `tokio::spawn` boxes the future itself, so no extra `Box::pin` needed.
    let prepared = tokio::spawn(async move { file_system2.prepare_store_file(info).await });
    let file_data = match CompressionMethod::try_from(rfile.compression_method) {
        Ok(CompressionMethod::Deflated) => {
            #[cfg(feature = "deflate")]
            {
                use flate2::read::DeflateDecoder;
                use std::io::Read;
                let mut deflate_reader = DeflateDecoder::new(compressed.reader());
                // Capacity hint only; `read_to_end` grows the Vec as needed
                // when the uncompressed data is larger.
                let mut decompressed = Vec::with_capacity(rfile.compressed_size as usize);
                deflate_reader
                    .read_to_end(&mut decompressed)
                    .map_err(|_| Error::CompressionError)?;
                // `Bytes::from(Vec<u8>)` takes ownership without copying,
                // unlike `Bytes::copy_from_slice`, which reallocated and
                // copied the whole decompressed payload.
                bytes::Bytes::from(decompressed)
            }
            #[cfg(not(feature = "deflate"))]
            return Err(Error::UnsupportedCompressionMethod);
        },
        Ok(CompressionMethod::Stored) => compressed.copy_to_bytes(rfile.compressed_size as usize),
        _ => {
            return Err(Error::UnsupportedCompressionMethod);
        },
    };
    // Verify integrity before committing anything to the file system.
    if crc32fast::hash(&file_data) != rfile.crc32 {
        return Err(Error::InvalidHash);
    }
    let prepared = prepared
        .await
        .map_err(|_| Error::JoinError)?
        .map_err(Error::FileSystem)?;
    let data_len = file_data.len();
    file_system
        .store_file(prepared, file_data)
        .await
        .map_err(Error::FileSystem)?;
    bytes_unzipped.fetch_add(data_len, Ordering::SeqCst);
    Ok(None)
}