use core::{
fmt::Debug,
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
use std::{
collections::VecDeque,
sync::{Arc, Mutex},
};
use bytes::BytesMut;
use download::Batch;
use tokio::task::JoinHandle;
use zip_core::structs::CompressionMethod;
mod compare;
mod download;
mod handle_processor;
mod remote;
use super::{FileSystem, RemoteZip};
use crate::{
model::{Config, Error, LocalFileInfo, ProgressDetails, RemoteFileInfo},
proto::sync::{
download::{DownloadResult, UnzipResult, download_batch, unzip_file},
handle_processor::process_handles,
},
};
/// Progress report yielded alongside the next [`State`] by [`State::progress`].
///
/// `RE` and `FE` are the two error type parameters of [`Error`];
/// [`State::progress`] instantiates them with `R::Error` and `F::Error`.
#[derive(Debug)]
pub enum Progress<RE: Debug, FE: Debug> {
/// Reported while the state machine is in (or entering) the download phase.
/// Carries a snapshot of both progress trackers.
DownloadExtracting {
// Snapshot of the download progress tracker.
download: ProgressDetails,
// Snapshot of the unzip progress tracker.
unzip: ProgressDetails,
},
/// Reported while local files are being deleted.
Deleting(ProgressDetails),
/// Reported once, when there is nothing left to delete; the state machine
/// is `Finished` afterwards.
Successful,
/// Reported once, when any step returned an error; the state machine is
/// `Finished` afterwards.
Errored(Error<RE, FE>),
}
/// State machine for one sync run.
///
/// Create it with [`State::new`] and advance it by calling
/// [`State::progress`] repeatedly until that returns `None`.
#[derive(Debug)]
pub enum State<R: RemoteZip, F: FileSystem> {
/// Initial state: nothing has been compared yet.
/// Holds the remote, the local file system and the configuration.
ToBeEvaluated(R, F, Config),
/// Download/unzip phase. The fields are positional; see the notes on each.
Download(
// Remote that batches are downloaded from (cloned into each download task).
R,
// Local file system (cloned into each unzip task).
F,
// Configuration; `max_parallel_downloads` caps the running download tasks.
Config,
// Batches that have not been started yet.
VecDeque<Batch>,
// Local files to delete once downloading and unzipping are done.
Vec<LocalFileInfo>,
// Download progress tracker.
ProgressDetails,
// Unzip progress tracker.
ProgressDetails,
// Handles of the download tasks that are currently spawned.
Vec<JoinHandle<DownloadResult<R, F>>>,
// Handles of spawned unzip tasks; shared because the download tasks push
// new unzip handles into it.
#[allow(clippy::type_complexity)] Arc<Mutex<Vec<JoinHandle<UnzipResult<R, F>>>>>,
// Results of unzip tasks that already finished; checked for errors at the
// end of the phase.
Vec<UnzipResult<R, F>>,
// Download counter: handed to the download tasks, drained into the download
// progress tracker on every step (presumably counts bytes - TODO confirm).
Arc<AtomicUsize>,
// Unzip counter: handed to the unzip tasks, drained into the unzip progress
// tracker on every step (presumably counts bytes - TODO confirm).
Arc<AtomicUsize>,
),
/// Deletion phase: file system, files still to delete, deletion progress.
DeleteFiles(F, Vec<LocalFileInfo>, ProgressDetails),
/// Terminal state; [`State::progress`] returns `None` from here.
Finished,
}
impl<R, F> State<R, F>
where
    R: RemoteZip + Clone + Send + 'static,
    F: FileSystem + Clone + Send + 'static,
{
    /// Creates the initial state for a sync run; nothing is executed until
    /// [`State::progress`] is called.
    pub fn new(remote: R, file_system: F, config: Config) -> Self {
        Self::ToBeEvaluated(remote, file_system, config)
    }

    /// Runs one step of the state machine.
    ///
    /// Returns the progress report for this step together with the state to
    /// call `progress` on next, or `None` once the machine is `Finished`.
    /// An error in any step is reported as [`Progress::Errored`] and moves the
    /// machine to `Finished`.
    pub async fn progress(self) -> Option<(Progress<R::Error, F::Error>, Self)> {
        let step = match self {
            Self::ToBeEvaluated(remote, file_system, config) => evaluate(remote, file_system, config).await,
            Self::Download(
                remote,
                file_system,
                config,
                pending_batches,
                pending_deletions,
                download_progress,
                unzip_progress,
                download_tasks,
                unzip_tasks,
                unzip_outcomes,
                download_counter,
                unzip_counter,
            ) => {
                download(
                    remote,
                    file_system,
                    config,
                    pending_batches,
                    pending_deletions,
                    (download_progress, unzip_progress),
                    (download_tasks, unzip_tasks),
                    unzip_outcomes,
                    (download_counter, unzip_counter),
                )
                .await
            },
            Self::DeleteFiles(file_system, pending_deletions, delete_progress) => {
                delete(file_system, pending_deletions, delete_progress).await
            },
            Self::Finished => Ok(None),
        };
        // A failed step is surfaced to the caller as a final `Errored` report.
        step.unwrap_or_else(|err| Some((Progress::Errored(err), Self::Finished)))
    }
}
async fn evaluate<R, F>(
remote: R,
file_system: F,
config: Config,
) -> Result<Option<(Progress<R::Error, F::Error>, State<R, F>)>, Error<R::Error, F::Error>>
where
R: RemoteZip,
F: FileSystem,
{
let (rfiles_r, file_infos_r) =
futures_lite::future::zip(remote::rfile_infos::<R, F>(&remote, &config), file_system.all_files()).await;
let (rfiles, file_infos) = (rfiles_r?, file_infos_r.map_err(Error::FileSystem)?);
tracing::debug!("local files to be checked: {}", file_infos.len());
let compared = compare::build_compared(rfiles, file_infos);
tracing::info!(
"Need to download {} bytes in {} batches and delete {} files",
compared.needs_download_bytes,
compared.needs_download_batches.len(),
compared.needs_deletion.len(),
);
if compared.needs_download_batches.iter().any(|batch| {
batch.iter().any(|rfi| {
!matches!(
CompressionMethod::try_from(rfi.compression_method),
Ok(CompressionMethod::Deflated) | Ok(CompressionMethod::Stored)
)
})
}) {
return Err(Error::UnsupportedCompressionMethod);
}
let progress_download = ProgressDetails::new(compared.needs_download_bytes);
let progress_unzip = ProgressDetails::new(compared.needs_persistent_bytes);
Ok(Some((
Progress::DownloadExtracting {
download: progress_download.clone(),
unzip: progress_unzip.clone(),
},
State::Download(
remote,
file_system,
config,
compared.needs_download_batches,
compared.needs_deletion,
progress_download,
progress_unzip,
Vec::new(),
Arc::new(Mutex::new(Vec::new())),
Vec::new(),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicUsize::new(0)),
),
)))
}
#[expect(clippy::type_complexity)]
#[expect(clippy::too_many_arguments)]
async fn download<R, F>(
remote: R,
file_system: F,
config: Config,
mut needs_download: VecDeque<Batch>,
need_deletion: Vec<LocalFileInfo>,
(mut progress_download, mut progress_unzip): (ProgressDetails, ProgressDetails),
(mut download_handles, mut unzip_handles): (
Vec<JoinHandle<DownloadResult<R, F>>>,
Arc<Mutex<Vec<JoinHandle<UnzipResult<R, F>>>>>,
),
mut unzip_results: Vec<UnzipResult<R, F>>,
(dc, zc): (Arc<AtomicUsize>, Arc<AtomicUsize>),
) -> Result<Option<(Progress<R::Error, F::Error>, State<R, F>)>, Error<R::Error, F::Error>>
where
R: RemoteZip + Clone + Send + 'static,
F: FileSystem + Clone + Send + 'static,
{
if needs_download.is_empty() && download_handles.is_empty() {
if let Some(unzip_handles) = Arc::get_mut(&mut unzip_handles) {
let unzip_finished = unzip_handles.get_mut().unwrap().is_empty();
if unzip_finished {
tracing::debug!("checking zips for errors");
for result in unzip_results {
result?;
}
tracing::info!("Download complete. Unzip complete. Deleting entries now");
let progress = ProgressDetails::new(
need_deletion.len() as u64,
);
return Ok(Some((
Progress::Deleting(progress.clone()),
State::DeleteFiles(file_system, need_deletion, progress),
)));
}
} else {
tracing::debug!("Download complete. waiting for Unzip to complete.");
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
let (finished_download_handles, finished_unzip_handles) =
process_handles::<R, F>(&mut download_handles, &unzip_handles);
if !finished_download_handles.is_empty() {
tracing::trace!(cnt = finished_download_handles.len(), "downloads finished");
}
for finished_handle in finished_download_handles.into_iter() {
finished_handle.await.map_err(|_| Error::JoinError)??; }
if let Some(finished_unzip_handles) = finished_unzip_handles {
unzip_results.reserve(finished_unzip_handles.len());
for finished_handle in finished_unzip_handles.into_iter() {
unzip_results.push(finished_handle.await.map_err(|_| Error::JoinError)?);
}
}
let unzip_handles2 = unzip_handles.clone();
let file_system2 = file_system.clone();
let zc2 = zc.clone();
let spawn_unzip = move |bytes: BytesMut, rfile: RemoteFileInfo| {
let zc2 = zc2.clone();
let file_system2 = file_system2.clone();
let name = &rfile.file_name;
tracing::trace!(?name, "triggering unzip");
let new_task = tokio::spawn(unzip_file::<R, F>(bytes, rfile, file_system2, zc2));
let mut unzip_handles2 = unzip_handles2.lock().unwrap(); unzip_handles2.push(new_task);
};
while download_handles.len() < config.max_parallel_downloads {
if let Some(next_batch) = needs_download.pop_front() {
let files_in_batch = next_batch.len();
tracing::trace!(?files_in_batch, "triggering download of next batch");
let remote = remote.clone();
download_handles.push(tokio::spawn(download_batch::<R, F, _>(
next_batch,
remote,
spawn_unzip.clone(),
dc.clone(),
)));
} else {
break;
}
}
let download_count = dc.swap(0, Ordering::SeqCst);
let unzip_count = zc.swap(0, Ordering::SeqCst);
progress_download.add_chunk(download_count as u64); progress_unzip.add_chunk(unzip_count as u64);
if download_count > 0 || unzip_count > 0 {
let d_len = download_handles.len();
let u_len = unzip_handles.try_lock().map(|l| l.len()).unwrap_or_default();
tracing::trace!(?download_count, ?unzip_count, ?d_len, ?u_len, "status changed");
}
Ok(Some((
Progress::DownloadExtracting {
download: progress_download.clone(),
unzip: progress_unzip.clone(),
},
State::Download(
remote,
file_system,
config,
needs_download,
need_deletion,
progress_download,
progress_unzip,
download_handles,
unzip_handles,
unzip_results,
dc,
zc,
),
)))
}
/// Runs one step of the deletion phase: deletes a single local file.
///
/// Files are taken from the back of `needs_deletion`. After each successful
/// deletion `progress` is advanced by one (its total is the number of files to
/// delete), so the reported `Progress::Deleting` actually moves forward.
/// When no file is left, reports `Progress::Successful` and finishes.
///
/// # Errors
///
/// Returns `Error::FileSystem` if the file system fails to delete the file.
async fn delete<R, F>(
    file_system: F,
    mut needs_deletion: Vec<LocalFileInfo>,
    mut progress: ProgressDetails,
) -> Result<Option<(Progress<R::Error, F::Error>, State<R, F>)>, Error<R::Error, F::Error>>
where
    R: RemoteZip,
    F: FileSystem + Clone,
{
    let Some(file) = needs_deletion.pop() else {
        return Ok(Some((Progress::Successful, State::Finished)));
    };
    file_system
        .delete_file(&file.local_unix_path)
        .await
        .map_err(Error::FileSystem)?;
    // One more file is gone; without this the tracker would stay at zero.
    progress.add_chunk(1);
    Ok(Some((
        Progress::Deleting(progress.clone()),
        State::DeleteFiles(file_system, needs_deletion, progress),
    )))
}