//! remozipsy 0.2.0
//!
//! Remote Zip Sync — sync a remote zip archive to the local filesystem.
//! See the crate documentation for usage details.
use core::{
    fmt::Debug,
    sync::atomic::{AtomicUsize, Ordering},
};
use std::{collections::VecDeque, sync::Arc};

use bytes::BytesMut;
use tokio::{sync::mpsc, task::JoinSet};

use super::{
    super::{FileSystem, RemoteZip},
    compare::{Batch, build_compared},
    download_unzip_delete::{DownloadResult, download_batch},
};
use crate::{
    model::{Config, Error, FileInfo, Progress, ProgressDetails},
    proto::sync::{
        compare::Compared,
        download_unzip_delete::{UnzipResult, unzip_file},
        remote::ProcessedRemoteFileInfo,
    },
};

/// Mutable state carried across successive calls to [`process`]: pending work
/// queues, in-flight task sets, progress counters, and the channel linking the
/// download stage to the unzip stage.
#[derive(Debug)]
struct ProcessData<R: RemoteZip, F: FileSystem> {
    // Remote source and local destination handles (cloned into spawned tasks).
    remote: R,
    file_system: F,
    config: Config,
    // Batches still waiting to be downloaded.
    needs_download: VecDeque<Batch>,
    // Local files still waiting to be deleted.
    needs_deletion: VecDeque<FileInfo>,
    // In-flight tasks for each pipeline stage.
    download_set: JoinSet<DownloadResult<R, F>>,
    unzip_set: JoinSet<UnzipResult<R, F>>,
    deletion_set: JoinSet<Result<(), F::Error>>,
    // Cumulative progress reported back to the caller on every step.
    download_progress: ProgressDetails,
    unzip_progress: ProgressDetails,
    delete_progress: ProgressDetails,
    // Downloaded (bytes, file info) pairs flow from download tasks (tx) to the
    // unzip scheduler (rx) in `process`.
    downloaded_tx: mpsc::UnboundedSender<(BytesMut, ProcessedRemoteFileInfo)>,
    downloaded_rx: mpsc::UnboundedReceiver<(BytesMut, ProcessedRemoteFileInfo)>,
    // Shared counters incremented by tasks and swapped to 0 each step to
    // compute per-step deltas (bytes downloaded/unzipped, files deleted).
    download_buf: Arc<AtomicUsize>,
    unzip_buf: Arc<AtomicUsize>,
    delete_buf: Arc<AtomicUsize>,
}

/// State machine driving one sync run: evaluation → processing → finished.
#[expect(private_interfaces, clippy::large_enum_variant)]
#[derive(Debug)]
pub(super) enum State<R: RemoteZip, F: FileSystem> {
    /// Initial state: remote/local comparison has not run yet.
    ToBeEvaluated(R, F, Config),
    /// Actively downloading, unzipping, and deleting; see [`ProcessData`].
    Processing(ProcessData<R, F>),
    /// Terminal state: the run completed (successfully or with an error).
    Finished,
}

impl<R, F> State<R, F>
where
    R: RemoteZip + Clone + Send + 'static,
    F: FileSystem + Clone + Send + 'static,
{
    /// Advances the state machine by one step.
    ///
    /// Returns the next progress report together with the follow-up state, or
    /// `None` once the machine has reached [`State::Finished`]. Any error from
    /// the current step is surfaced as [`Progress::Errored`] and transitions
    /// the machine to `Finished`.
    pub(super) async fn progress(self) -> Option<(Progress<R::Error, F::Error>, Self)> {
        let step = match self {
            Self::ToBeEvaluated(remote, fs, config) => evaluate(remote, fs, config).await,
            Self::Processing(data) => process(data).await,
            Self::Finished => return None,
        };
        step.unwrap_or_else(|e| Some((Progress::Errored(e), Self::Finished)))
    }
}

/// Checks if an update is necessary.
async fn evaluate<R, F>(
    remote: R,
    mut file_system: F,
    config: Config,
) -> Result<Option<(Progress<R::Error, F::Error>, State<R, F>)>, Error<R::Error, F::Error>>
where
    R: RemoteZip + Clone + Send + 'static,
    F: FileSystem + Clone + Send + 'static,
{
    // evaluate local and remote simultaneously
    let (rfiles_r, file_infos_r) = futures_lite::future::zip(
        super::remote::rfile_infos::<R, F>(&remote, &config),
        file_system.all_files(),
    )
    .await;

    let (rfiles, file_infos) = (rfiles_r?, file_infos_r.map_err(Error::FileSystem)?);

    tracing::debug!("local files to be checked: {}", file_infos.len());

    let compared = build_compared(rfiles, file_infos, &config);

    tracing::info!(
        "Need to download {} bytes in {} batches and delete {} files",
        compared.needs_download_bytes,
        compared.needs_download_batches.len(),
        compared.needs_deletion.len(),
    );

    // quick exit if there's nothing to do
    if compared.needs_download_batches.is_empty() && compared.needs_deletion.is_empty() {
        return Ok(Some((Progress::Successful, State::Finished)));
    }

    let data = ProcessData::new(remote, file_system, config, compared);

    Ok(Some((
        Progress::Incomplete {
            download: data.download_progress.clone(),
            unzip:    data.unzip_progress.clone(),
            delete:   data.delete_progress.clone(),
        },
        State::Processing(data),
    )))
}

/// Coordinates the update: download of new chunks, unzipping files and writing
/// them to disk, and deleting unneeded files.
///
/// One call performs exactly one scheduling step: (1) spawn new download /
/// unzip / deletion tasks up to the configured parallelism limits, (2) check
/// for completion, (3) join any finished tasks (surfacing their errors), and
/// (4) fold the shared byte/file counters into the progress report. The caller
/// is expected to invoke this repeatedly via [`State::progress`].
#[expect(private_interfaces)]
pub(super) async fn process<R, F>(
    data: ProcessData<R, F>,
) -> Result<Option<(Progress<R::Error, F::Error>, State<R, F>)>, Error<R::Error, F::Error>>
where
    R: RemoteZip + Clone + Send + 'static,
    F: FileSystem + Clone + Send + 'static,
{
    // Destructure so each piece can be moved/mutated independently and then
    // reassembled into the next `State::Processing` at the end.
    let ProcessData {
        remote,
        file_system,
        config,
        mut needs_download,
        mut needs_deletion,
        mut download_set,
        mut unzip_set,
        mut deletion_set,
        mut download_progress,
        mut unzip_progress,
        mut delete_progress,
        downloaded_tx,
        mut downloaded_rx,
        download_buf,
        unzip_buf,
        delete_buf,
    } = data;

    // triggering new downloads/unzips/deletions if capacity is there
    // Free download slots = limit minus tasks still in flight, capped by the
    // number of batches actually waiting.
    let parallel_downloads =
        (config.max_parallel_downloads.saturating_sub(download_set.len())).min(needs_download.len());
    if parallel_downloads > 0 {
        for batch in needs_download.drain(0..parallel_downloads) {
            let files_in_batch = batch.len();
            tracing::trace!(?files_in_batch, "triggering download of next batch");
            // Each download task pushes its (bytes, file info) results through
            // `downloaded_tx` and bumps `download_buf` for progress tracking.
            let future = download_batch::<R, F>(batch, remote.clone(), download_buf.clone(), downloaded_tx.clone());
            match &config.runtime_handle {
                Some(rt) => download_set.spawn_on(future, rt),
                None => download_set.spawn(future),
            };
        }
    }

    // Unzips and deletions share the `max_parallel_filesystem` budget; each
    // spawn grows `unzip_set`, so the condition shrinks toward the limit.
    while config
        .max_parallel_filesystem
        .saturating_sub(deletion_set.len())
        .saturating_sub(unzip_set.len())
        > 0
    {
        // Non-blocking receive: only unzip data that has already arrived.
        if let Ok((bytes, rfile)) = downloaded_rx.try_recv() {
            tracing::trace!(name = &rfile.file_name, "triggering unzip");
            let future = unzip_file::<R, F>(
                bytes,
                rfile,
                file_system.clone(),
                config.runtime_handle.clone(),
                unzip_buf.clone(),
            );
            match &config.runtime_handle {
                Some(rt) => unzip_set.spawn_on(future, rt),
                None => unzip_set.spawn(future),
            };
        } else {
            break;
        }
    }

    // Deletions take whatever filesystem budget the unzips just left over.
    let parallel_deletions = (config
        .max_parallel_filesystem
        .saturating_sub(deletion_set.len())
        .saturating_sub(unzip_set.len()))
    .min(needs_deletion.len());
    if parallel_deletions > 0 {
        let to_be_deleted: Vec<_> = needs_deletion.drain(0..parallel_deletions).collect();
        tracing::trace!(?to_be_deleted, "triggering deletion of some files");
        for file in to_be_deleted.into_iter() {
            let file_system = file_system.clone();
            let future = async move { file_system.delete_file(file).await };
            match &config.runtime_handle {
                Some(rt) => deletion_set.spawn_on(future, rt),
                None => deletion_set.spawn(future),
            };
        }
    }

    // check if we are done: nothing in flight anywhere. Finished-but-unjoined
    // tasks still count toward `len()`, and nothing was left in the queues
    // (or it would have been spawned above), so empty sets mean fully done.
    if download_set.is_empty() && unzip_set.is_empty() && deletion_set.is_empty() {
        tracing::debug!("Update complete.");
        return Ok(Some((Progress::Successful, State::Finished)));
    }

    // process finished futures, breaking (returning) on the first error
    // NOTE: we might wanna unzip to a tmp location in the future and then in a
    // second stage only moves the unzips and delete the files later on, so that we
    // are cancel-safe during this phase
    while let Some(result) = download_set.try_join_next() {
        // Outer `?`: task panicked/cancelled; inner `?`: the download failed.
        result.map_err(|_| Error::JoinError)??;
    }
    while let Some(result) = unzip_set.try_join_next() {
        // Discard the unzip task's success value, keep its error.
        result.map_err(|_| Error::JoinError)?.map(|_| ())?;
    }
    while let Some(result) = deletion_set.try_join_next() {
        result.map_err(|_| Error::JoinError)?.map_err(Error::FileSystem)?;
    }

    // we used AtomicUsize, should be available on most platforms
    // Swap-to-zero reads the per-step delta exactly once, even though the
    // worker tasks keep incrementing the shared counters concurrently.
    let (new_downloaded_bytes, new_unzipped_bytes, new_deleted_files) = (
        download_buf.swap(0, Ordering::SeqCst) as u64,
        unzip_buf.swap(0, Ordering::SeqCst) as u64,
        delete_buf.swap(0, Ordering::SeqCst) as u64,
    );
    download_progress.add_chunk(new_downloaded_bytes);
    unzip_progress.add_chunk(new_unzipped_bytes);
    delete_progress.add_chunk(new_deleted_files);

    // Report the updated progress and hand the state back for the next step.
    Ok(Some((
        Progress::Incomplete {
            download: download_progress.clone(),
            unzip:    unzip_progress.clone(),
            delete:   delete_progress.clone(),
        },
        State::Processing(ProcessData {
            remote,
            file_system,
            config,
            needs_download,
            needs_deletion,
            download_set,
            unzip_set,
            deletion_set,
            download_progress,
            unzip_progress,
            delete_progress,
            downloaded_tx,
            downloaded_rx,
            download_buf,
            unzip_buf,
            delete_buf,
        }),
    )))
}

impl<R: RemoteZip, F: FileSystem> ProcessData<R, F> {
    /// Assembles fresh processing state from the comparison outcome: progress
    /// trackers sized from `compared`, empty task sets, zeroed shared
    /// counters, and the channel that hands downloaded bytes to the unzip
    /// stage.
    fn new(remote: R, file_system: F, config: Config, compared: Compared) -> Self {
        let (downloaded_tx, downloaded_rx) = mpsc::unbounded_channel();
        Self {
            download_progress: ProgressDetails::new(compared.needs_download_bytes),
            unzip_progress: ProgressDetails::new(compared.needs_persistent_bytes),
            delete_progress: ProgressDetails::new(compared.needs_deletion.len() as u64),
            needs_download: compared.needs_download_batches,
            needs_deletion: compared.needs_deletion,
            remote,
            file_system,
            config,
            download_set: JoinSet::new(),
            unzip_set: JoinSet::new(),
            deletion_set: JoinSet::new(),
            downloaded_tx,
            downloaded_rx,
            download_buf: Arc::new(AtomicUsize::new(0)),
            unzip_buf: Arc::new(AtomicUsize::new(0)),
            delete_buf: Arc::new(AtomicUsize::new(0)),
        }
    }
}