use core::{
fmt::Debug,
sync::atomic::{AtomicUsize, Ordering},
};
use std::{collections::VecDeque, sync::Arc};
use bytes::BytesMut;
use tokio::{sync::mpsc, task::JoinSet};
use super::{
super::{FileSystem, RemoteZip},
compare::{Batch, build_compared},
download_unzip_delete::{DownloadResult, download_batch},
};
use crate::{
model::{Config, Error, FileInfo, Progress, ProgressDetails},
proto::sync::{
compare::Compared,
download_unzip_delete::{UnzipResult, unzip_file},
remote::ProcessedRemoteFileInfo,
},
};
/// Everything `process` needs to carry from one step of the synchronisation
/// to the next: the work queues, the sets of spawned tasks, the progress
/// reports and the shared counters.
#[derive(Debug)]
struct ProcessData<R: RemoteZip, F: FileSystem> {
    /// Remote handle; a clone is handed to every download task.
    remote: R,
    /// File system handle; a clone is handed to every unzip and deletion task.
    file_system: F,
    /// Supplies the parallelism limits and the optional runtime handle that
    /// tasks are spawned on.
    config: Config,
    /// Batches that have not been handed to a download task yet.
    needs_download: VecDeque<Batch>,
    /// Files that have not been handed to a deletion task yet.
    needs_deletion: VecDeque<FileInfo>,
    /// Download tasks that have been spawned and not joined yet.
    download_set: JoinSet<DownloadResult<R, F>>,
    /// Unzip tasks that have been spawned and not joined yet.
    unzip_set: JoinSet<UnzipResult<R, F>>,
    /// Deletion tasks that have been spawned and not joined yet.
    deletion_set: JoinSet<Result<(), F::Error>>,
    /// Download progress; created from `Compared::needs_download_bytes`.
    download_progress: ProgressDetails,
    /// Unzip progress; created from `Compared::needs_persistent_bytes`.
    unzip_progress: ProgressDetails,
    /// Deletion progress; created from the number of files to delete.
    delete_progress: ProgressDetails,
    /// Sender side of the download channel; a clone is handed to every
    /// download task.
    downloaded_tx: mpsc::UnboundedSender<(BytesMut, ProcessedRemoteFileInfo)>,
    /// Receiver side of the download channel; `process` takes payloads from
    /// it and spawns an unzip task for each one.
    downloaded_rx: mpsc::UnboundedReceiver<(BytesMut, ProcessedRemoteFileInfo)>,
    /// Counter shared with the download tasks; reset to zero and added to
    /// `download_progress` on every step.
    download_buf: Arc<AtomicUsize>,
    /// Counter shared with the unzip tasks; reset to zero and added to
    /// `unzip_progress` on every step.
    unzip_buf: Arc<AtomicUsize>,
    /// Counter that is reset to zero and added to `delete_progress` on every
    /// step.
    delete_buf: Arc<AtomicUsize>,
}
/// State machine behind the synchronisation; `State::progress` consumes the
/// current state and yields the next progress report together with the
/// follow-up state.
// `ProcessData` is private to this module although it appears in a
// `pub(super)` variant, and `Processing` carries far more data than the other
// variants, so both lints are expected to fire here.
#[expect(private_interfaces, clippy::large_enum_variant)]
#[derive(Debug)]
pub(super) enum State<R: RemoteZip, F: FileSystem> {
    /// Nothing has been compared yet; holds the inputs for `evaluate`.
    ToBeEvaluated(R, F, Config),
    /// Downloads, unzips and/or deletions are queued or running.
    Processing(ProcessData<R, F>),
    /// Terminal state; `progress` returns `None` from here.
    Finished,
}
impl<R, F> State<R, F>
where
R: RemoteZip + Clone + Send + 'static,
F: FileSystem + Clone + Send + 'static,
{
pub(super) async fn progress(self) -> Option<(Progress<R::Error, F::Error>, Self)> {
let res = match self {
State::ToBeEvaluated(r, f, config) => evaluate(r, f, config).await,
State::Processing(data) => process(data).await,
State::Finished => Ok(None),
};
match res {
Ok(ok) => ok,
Err(e) => Some((Progress::Errored(e), State::Finished)),
}
}
}
async fn evaluate<R, F>(
remote: R,
mut file_system: F,
config: Config,
) -> Result<Option<(Progress<R::Error, F::Error>, State<R, F>)>, Error<R::Error, F::Error>>
where
R: RemoteZip + Clone + Send + 'static,
F: FileSystem + Clone + Send + 'static,
{
let (rfiles_r, file_infos_r) = futures_lite::future::zip(
super::remote::rfile_infos::<R, F>(&remote, &config),
file_system.all_files(),
)
.await;
let (rfiles, file_infos) = (rfiles_r?, file_infos_r.map_err(Error::FileSystem)?);
tracing::debug!("local files to be checked: {}", file_infos.len());
let compared = build_compared(rfiles, file_infos, &config);
tracing::info!(
"Need to download {} bytes in {} batches and delete {} files",
compared.needs_download_bytes,
compared.needs_download_batches.len(),
compared.needs_deletion.len(),
);
if compared.needs_download_batches.is_empty() && compared.needs_deletion.is_empty() {
return Ok(Some((Progress::Successful, State::Finished)));
}
let data = ProcessData::new(remote, file_system, config, compared);
Ok(Some((
Progress::Incomplete {
download: data.download_progress.clone(),
unzip: data.unzip_progress.clone(),
delete: data.delete_progress.clone(),
},
State::Processing(data),
)))
}
#[expect(private_interfaces)]
pub(super) async fn process<R, F>(
data: ProcessData<R, F>,
) -> Result<Option<(Progress<R::Error, F::Error>, State<R, F>)>, Error<R::Error, F::Error>>
where
R: RemoteZip + Clone + Send + 'static,
F: FileSystem + Clone + Send + 'static,
{
let ProcessData {
remote,
file_system,
config,
mut needs_download,
mut needs_deletion,
mut download_set,
mut unzip_set,
mut deletion_set,
mut download_progress,
mut unzip_progress,
mut delete_progress,
downloaded_tx,
mut downloaded_rx,
download_buf,
unzip_buf,
delete_buf,
} = data;
let parallel_downloads =
(config.max_parallel_downloads.saturating_sub(download_set.len())).min(needs_download.len());
if parallel_downloads > 0 {
for batch in needs_download.drain(0..parallel_downloads) {
let files_in_batch = batch.len();
tracing::trace!(?files_in_batch, "triggering download of next batch");
let future = download_batch::<R, F>(batch, remote.clone(), download_buf.clone(), downloaded_tx.clone());
match &config.runtime_handle {
Some(rt) => download_set.spawn_on(future, rt),
None => download_set.spawn(future),
};
}
}
while config
.max_parallel_filesystem
.saturating_sub(deletion_set.len())
.saturating_sub(unzip_set.len())
> 0
{
if let Ok((bytes, rfile)) = downloaded_rx.try_recv() {
tracing::trace!(name = &rfile.file_name, "triggering unzip");
let future = unzip_file::<R, F>(
bytes,
rfile,
file_system.clone(),
config.runtime_handle.clone(),
unzip_buf.clone(),
);
match &config.runtime_handle {
Some(rt) => unzip_set.spawn_on(future, rt),
None => unzip_set.spawn(future),
};
} else {
break;
}
}
let parallel_deletions = (config
.max_parallel_filesystem
.saturating_sub(deletion_set.len())
.saturating_sub(unzip_set.len()))
.min(needs_deletion.len());
if parallel_deletions > 0 {
let to_be_deleted: Vec<_> = needs_deletion.drain(0..parallel_deletions).collect();
tracing::trace!(?to_be_deleted, "triggering deletion of some files");
for file in to_be_deleted.into_iter() {
let file_system = file_system.clone();
let future = async move { file_system.delete_file(file).await };
match &config.runtime_handle {
Some(rt) => deletion_set.spawn_on(future, rt),
None => deletion_set.spawn(future),
};
}
}
if download_set.is_empty() && unzip_set.is_empty() && deletion_set.is_empty() {
tracing::debug!("Update complete.");
return Ok(Some((Progress::Successful, State::Finished)));
}
while let Some(result) = download_set.try_join_next() {
result.map_err(|_| Error::JoinError)??;
}
while let Some(result) = unzip_set.try_join_next() {
result.map_err(|_| Error::JoinError)?.map(|_| ())?;
}
while let Some(result) = deletion_set.try_join_next() {
result.map_err(|_| Error::JoinError)?.map_err(Error::FileSystem)?;
}
let (new_downloaded_bytes, new_unzipped_bytes, new_deleted_files) = (
download_buf.swap(0, Ordering::SeqCst) as u64,
unzip_buf.swap(0, Ordering::SeqCst) as u64,
delete_buf.swap(0, Ordering::SeqCst) as u64,
);
download_progress.add_chunk(new_downloaded_bytes);
unzip_progress.add_chunk(new_unzipped_bytes);
delete_progress.add_chunk(new_deleted_files);
Ok(Some((
Progress::Incomplete {
download: download_progress.clone(),
unzip: unzip_progress.clone(),
delete: delete_progress.clone(),
},
State::Processing(ProcessData {
remote,
file_system,
config,
needs_download,
needs_deletion,
download_set,
unzip_set,
deletion_set,
download_progress,
unzip_progress,
delete_progress,
downloaded_tx,
downloaded_rx,
download_buf,
unzip_buf,
delete_buf,
}),
)))
}
impl<R: RemoteZip, F: FileSystem> ProcessData<R, F> {
    /// Builds the initial processing state from the outcome of the comparison.
    ///
    /// The progress totals come from `compared` (bytes to download, bytes to
    /// persist, number of files to delete); the task sets start empty, the
    /// shared counters start at zero and a fresh unbounded channel connects
    /// download tasks with the unzip stage.
    fn new(remote: R, file_system: F, config: Config, compared: Compared) -> Self {
        let download_progress = ProgressDetails::new(compared.needs_download_bytes);
        let unzip_progress = ProgressDetails::new(compared.needs_persistent_bytes);
        // Read the length before the queue is moved into the struct below.
        let delete_progress = ProgressDetails::new(compared.needs_deletion.len() as u64);

        let (downloaded_tx, downloaded_rx) = mpsc::unbounded_channel();
        let zeroed_counter = || Arc::new(AtomicUsize::new(0));

        Self {
            remote,
            file_system,
            config,
            needs_download: compared.needs_download_batches,
            needs_deletion: compared.needs_deletion,
            download_set: JoinSet::new(),
            unzip_set: JoinSet::new(),
            deletion_set: JoinSet::new(),
            download_progress,
            unzip_progress,
            delete_progress,
            downloaded_tx,
            downloaded_rx,
            download_buf: zeroed_counter(),
            unzip_buf: zeroed_counter(),
            delete_buf: zeroed_counter(),
        }
    }
}