remozipsy 0.0.1

Zip-implementation-independent structs and helpers.
Documentation
use core::{
    fmt::Debug,
    sync::atomic::{AtomicUsize, Ordering},
    time::Duration,
};
use std::{
    collections::VecDeque,
    sync::{Arc, Mutex},
};

use bytes::BytesMut;
use download::Batch;
use tokio::task::JoinHandle;
use zip_core::structs::CompressionMethod;

mod compare;
mod download;
mod handle_processor;
mod remote;

use super::{FileSystem, RemoteZip};
use crate::{
    model::{Config, Error, LocalFileInfo, ProgressDetails, RemoteFileInfo},
    proto::sync::{
        download::{DownloadResult, UnzipResult, download_batch, unzip_file},
        handle_processor::process_handles,
    },
};

/// Status report returned by [`State::progress`] after every step of the
/// synchronization state machine, so the caller can present it to a user.
#[derive(Debug)]
pub enum Progress<RE: Debug, FE: Debug> {
    /// Batches are being downloaded and their files unzipped concurrently.
    DownloadExtracting {
        /// Download progress; its total is the number of bytes that need to be
        /// downloaded (`needs_download_bytes` from the comparison step).
        download: ProgressDetails,
        /// Unzip progress; its total is `needs_persistent_bytes` from the
        /// comparison step (presumably the uncompressed size written to disk).
        unzip:    ProgressDetails,
    },
    /// Obsolete local files are being removed. Progress is counted in files,
    /// not bytes.
    Deleting(ProgressDetails),
    /// All work finished without error; the next step yields `None`.
    Successful,
    /// A step failed; the state machine has moved to `Finished`.
    Errored(Error<RE, FE>),
}

/// The synchronization state machine. Start it with [`State::new`] and drive it
/// by repeatedly calling [`State::progress`].
#[derive(Debug)]
pub enum State<R: RemoteZip, F: FileSystem> {
    /// Initial state: the remote and local file lists have not been compared yet.
    ToBeEvaluated(R, F, Config),
    /// Downloading batches and unzipping their files.
    /// Fields are positional and must stay in this order.
    Download(
        /// remote archive access
        R,
        /// local file system
        F,
        /// sync configuration (e.g. `max_parallel_downloads`)
        Config,
        /// batches that have not been scheduled for download yet
        VecDeque<Batch>,
        /// local files to delete once downloading and unzipping are done
        Vec<LocalFileInfo>,
        /// download progress
        ProgressDetails,
        /// unzip progress
        ProgressDetails,
        /// in-flight download tasks
        Vec<JoinHandle<DownloadResult<R, F>>>,
        /// in-flight unzip tasks; shared with the download tasks, which push
        /// new unzip tasks into it
        // The nested handle type is inherently complex; an alias would only move it.
        #[allow(clippy::type_complexity)] Arc<Mutex<Vec<JoinHandle<UnzipResult<R, F>>>>>,
        /// results of finished unzip tasks, checked once everything has completed
        Vec<UnzipResult<R, F>>,
        /// download counter, drained into the download progress on every step
        Arc<AtomicUsize>,
        /// unzip counter, drained into the unzip progress on every step
        Arc<AtomicUsize>,
    ),
    /// Deleting obsolete local files one at a time.
    DeleteFiles(F, Vec<LocalFileInfo>, ProgressDetails),
    /// Terminal state; `progress` returns `None`.
    Finished,
}

impl<R, F> State<R, F>
where
    R: RemoteZip + Clone + Send + 'static,
    F: FileSystem + Clone + Send + 'static,
{
    /// Creates the state machine in its initial `ToBeEvaluated` state.
    /// No work is done until [`State::progress`] is called.
    pub fn new(remote: R, file_system: F, config: Config) -> Self {
        State::ToBeEvaluated(remote, file_system, config)
    }

    /// Advances the state machine by one small step and reports the progress.
    ///
    /// Returns `None` once the machine is `Finished`. If a step fails, the error
    /// is reported as [`Progress::Errored`] and the machine moves to `Finished`.
    ///
    /// Usage: call it in a loop and present status to user.
    pub async fn progress(self) -> Option<(Progress<R::Error, F::Error>, Self)> {
        let step = match self {
            State::ToBeEvaluated(remote, file_system, config) => evaluate(remote, file_system, config).await,
            State::Download(
                remote,
                file_system,
                config,
                batches,
                pending_deletion,
                download_progress,
                unzip_progress,
                download_tasks,
                unzip_tasks,
                unzip_outcomes,
                download_counter,
                unzip_counter,
            ) => {
                download(
                    remote,
                    file_system,
                    config,
                    batches,
                    pending_deletion,
                    (download_progress, unzip_progress),
                    (download_tasks, unzip_tasks),
                    unzip_outcomes,
                    (download_counter, unzip_counter),
                )
                .await
            },
            State::DeleteFiles(file_system, pending_deletion, deletion_progress) => {
                delete(file_system, pending_deletion, deletion_progress).await
            },
            State::Finished => Ok(None),
        };
        // Any error ends the machine: report it once, then stay in `Finished`.
        step.unwrap_or_else(|err| Some((Progress::Errored(err), State::Finished)))
    }
}

/// checks if an update is necessary
async fn evaluate<R, F>(
    remote: R,
    file_system: F,
    config: Config,
) -> Result<Option<(Progress<R::Error, F::Error>, State<R, F>)>, Error<R::Error, F::Error>>
where
    R: RemoteZip,
    F: FileSystem,
{
    // evaluate local and remote simultaneously
    let (rfiles_r, file_infos_r) =
        futures_lite::future::zip(remote::rfile_infos::<R, F>(&remote, &config), file_system.all_files()).await;

    let (rfiles, file_infos) = (rfiles_r?, file_infos_r.map_err(Error::FileSystem)?);

    tracing::debug!("local files to be checked: {}", file_infos.len());

    let compared = compare::build_compared(rfiles, file_infos);

    tracing::info!(
        "Need to download {} bytes in {} batches and delete {} files",
        compared.needs_download_bytes,
        compared.needs_download_batches.len(),
        compared.needs_deletion.len(),
    );

    if compared.needs_download_batches.iter().any(|batch| {
        batch.iter().any(|rfi| {
            !matches!(
                CompressionMethod::try_from(rfi.compression_method),
                Ok(CompressionMethod::Deflated) | Ok(CompressionMethod::Stored)
            )
        })
    }) {
        return Err(Error::UnsupportedCompressionMethod);
    }

    let progress_download = ProgressDetails::new(compared.needs_download_bytes);
    let progress_unzip = ProgressDetails::new(compared.needs_persistent_bytes);
    Ok(Some((
        Progress::DownloadExtracting {
            download: progress_download.clone(),
            unzip:    progress_unzip.clone(),
        },
        State::Download(
            remote,
            file_system,
            config,
            compared.needs_download_batches,
            compared.needs_deletion,
            progress_download,
            progress_unzip,
            Vec::new(),
            Arc::new(Mutex::new(Vec::new())),
            Vec::new(),
            Arc::new(AtomicUsize::new(0)),
            Arc::new(AtomicUsize::new(0)),
        ),
    )))
}

/// coordinates the update: download of new chunks, unzipping files and writing
/// them to disk
#[expect(clippy::type_complexity)]
#[expect(clippy::too_many_arguments)]
async fn download<R, F>(
    remote: R,
    file_system: F,
    config: Config,
    mut needs_download: VecDeque<Batch>,
    need_deletion: Vec<LocalFileInfo>,
    (mut progress_download, mut progress_unzip): (ProgressDetails, ProgressDetails),
    (mut download_handles, mut unzip_handles): (
        Vec<JoinHandle<DownloadResult<R, F>>>,
        Arc<Mutex<Vec<JoinHandle<UnzipResult<R, F>>>>>,
    ),
    mut unzip_results: Vec<UnzipResult<R, F>>,
    (dc, zc): (Arc<AtomicUsize>, Arc<AtomicUsize>),
) -> Result<Option<(Progress<R::Error, F::Error>, State<R, F>)>, Error<R::Error, F::Error>>
where
    R: RemoteZip + Clone + Send + 'static,
    F: FileSystem + Clone + Send + 'static,
{
    if needs_download.is_empty() && download_handles.is_empty() {
        // when `get_mut` succeeds all unzips have been added, because there exist no
        // further references to this arc.
        if let Some(unzip_handles) = Arc::get_mut(&mut unzip_handles) {
            let unzip_finished = unzip_handles.get_mut().unwrap().is_empty();
            if unzip_finished {
                tracing::debug!("checking zips for errors");
                for result in unzip_results {
                    result?;
                }

                tracing::info!("Download complete. Unzip complete. Deleting entries now");
                let progress = ProgressDetails::new(
                    need_deletion.len() as u64, /* bytes make no sense in terms of deletion */
                );

                return Ok(Some((
                    Progress::Deleting(progress.clone()),
                    State::DeleteFiles(file_system, need_deletion, progress),
                )));
            }
        } else {
            tracing::debug!("Download complete. waiting for Unzip to complete.");
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }

    let (finished_download_handles, finished_unzip_handles) =
        process_handles::<R, F>(&mut download_handles, &unzip_handles);
    if !finished_download_handles.is_empty() {
        tracing::trace!(cnt = finished_download_handles.len(), "downloads finished");
    }
    for finished_handle in finished_download_handles.into_iter() {
        finished_handle.await.map_err(|_| Error::JoinError)??; //TODO: make sure download errors raised dont interupt ongoing unzips, to keep file changes "atomic"
    }

    if let Some(finished_unzip_handles) = finished_unzip_handles {
        unzip_results.reserve(finished_unzip_handles.len());
        for finished_handle in finished_unzip_handles.into_iter() {
            unzip_results.push(finished_handle.await.map_err(|_| Error::JoinError)?);
        }
    }

    // spawn new downloads if capacity is there
    let unzip_handles2 = unzip_handles.clone();
    let file_system2 = file_system.clone();
    let zc2 = zc.clone();
    let spawn_unzip = move |bytes: BytesMut, rfile: RemoteFileInfo| {
        let zc2 = zc2.clone();
        let file_system2 = file_system2.clone();
        let name = &rfile.file_name;
        tracing::trace!(?name, "triggering unzip");
        let new_task = tokio::spawn(unzip_file::<R, F>(bytes, rfile, file_system2, zc2));
        let mut unzip_handles2 = unzip_handles2.lock().unwrap(); //SYNC LOCK: carefull not to hold this over .await
        unzip_handles2.push(new_task);
    };

    while download_handles.len() < config.max_parallel_downloads {
        if let Some(next_batch) = needs_download.pop_front() {
            let files_in_batch = next_batch.len();
            tracing::trace!(?files_in_batch, "triggering download of next batch");
            let remote = remote.clone();
            download_handles.push(tokio::spawn(download_batch::<R, F, _>(
                next_batch,
                remote,
                spawn_unzip.clone(),
                dc.clone(),
            )));
        } else {
            break;
        }
    }

    let download_count = dc.swap(0, Ordering::SeqCst);
    let unzip_count = zc.swap(0, Ordering::SeqCst);
    progress_download.add_chunk(download_count as u64); //we used usize, so be available on most platforms
    progress_unzip.add_chunk(unzip_count as u64);

    if download_count > 0 || unzip_count > 0 {
        let d_len = download_handles.len();
        let u_len = unzip_handles.try_lock().map(|l| l.len()).unwrap_or_default();
        tracing::trace!(?download_count, ?unzip_count, ?d_len, ?u_len, "status changed");
    }

    Ok(Some((
        Progress::DownloadExtracting {
            download: progress_download.clone(),
            unzip:    progress_unzip.clone(),
        },
        State::Download(
            remote,
            file_system,
            config,
            needs_download,
            need_deletion,
            progress_download,
            progress_unzip,
            download_handles,
            unzip_handles,
            unzip_results,
            dc,
            zc,
        ),
    )))
}

/// Deletes one obsolete local file per call, after all files have been
/// downloaded. Reports [`Progress::Successful`] and moves to
/// [`State::Finished`] once nothing is left to delete.
///
/// # Errors
/// Returns [`Error::FileSystem`] if deleting a file fails.
async fn delete<R, F>(
    file_system: F,
    mut needs_deletion: Vec<LocalFileInfo>,
    progress: ProgressDetails,
) -> Result<Option<(Progress<R::Error, F::Error>, State<R, F>)>, Error<R::Error, F::Error>>
where
    R: RemoteZip,
    F: FileSystem + Clone,
{
    let Some(doomed) = needs_deletion.pop() else {
        return Ok(Some((Progress::Successful, State::Finished)));
    };

    file_system
        .delete_file(&doomed.local_unix_path)
        .await
        .map_err(Error::FileSystem)?;

    let report = Progress::Deleting(progress.clone());
    let next = State::DeleteFiles(file_system, needs_deletion, progress);
    Ok(Some((report, next)))
}