fusio 0.6.0

Fusio provides lean, minimal-cost abstractions and extensible Read / Write traits over multiple storage backends, on multiple poll-based / completion-based async runtimes.
Documentation
use core::any::Any;
use std::{cmp, pin::Pin, sync::Arc};

use fusio_core::{DynWrite, Write};

use super::{MaybeSendFuture, MaybeSendStream};
use crate::{
    durability::FileCommit,
    error::Error,
    fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
    path::Path,
    DynRead, IoBuf, IoBufMut, MaybeSend, MaybeSync, Read,
};

/// Marker trait for a file that can be used as a trait object
/// (`Box<dyn DynFile>`): it bundles the dynamic read, write and commit traits.
///
/// The `'static + Any` supertraits restrict it to owned, non-borrowing types.
/// NOTE(review): `Any` is presumably there to allow downcasting — confirm.
pub trait DynFile: DynRead + DynWrite + DynFileCommit + 'static + Any {}

// Blanket impl: any type that already satisfies every supertrait bound is a
// `DynFile`, so the trait never needs to be implemented by hand.
impl<F> DynFile for F where F: DynRead + DynWrite + DynFileCommit + 'static + Any {}

/// Lets a boxed [`DynFile`] stand in wherever the generic [`Read`] trait is
/// required, by forwarding each call to the object's [`DynRead`] methods.
impl<'read> Read for Box<dyn DynFile + 'read> {
    /// Reads into `buf` starting at offset `pos` through
    /// [`DynRead::read_exact_at`], then hands the caller's buffer back.
    async fn read_exact_at<B: IoBufMut>(&mut self, buf: B, pos: u64) -> (Result<(), Error>, B) {
        // SAFETY: NOTE(review) — confirm against the `IoBufMut` contract. The
        // whole range (`..`) of `buf` is erased into a slice here, and that
        // slice is turned back into a `B` exactly once below.
        let erased = unsafe { buf.slice_mut_unchecked(..) };
        let (outcome, erased) = DynRead::read_exact_at(self.as_mut(), erased, pos).await;
        // SAFETY: `erased` is the slice handed back by the read that was given
        // the slice created from a `B` above.
        let restored = unsafe { B::recover_from_slice_mut(erased) };
        (outcome, restored)
    }

    /// Forwards to [`DynRead::read_to_end_at`], returning its result and the
    /// buffer unchanged.
    async fn read_to_end_at(&mut self, buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
        DynRead::read_to_end_at(self.as_mut(), buf, pos).await
    }

    /// Forwards to [`DynRead::size`] on the boxed file.
    async fn size(&self) -> Result<u64, Error> {
        let pending = DynRead::size(self.as_ref());
        pending.await
    }
}

/// Lets a boxed [`DynFile`] stand in wherever the generic [`Write`] trait is
/// required, by forwarding each call to the object's [`DynWrite`] methods.
impl<'write> Write for Box<dyn DynFile + 'write> {
    /// Writes all of `buf` through [`DynWrite::write_all`], then hands the
    /// caller's buffer back.
    async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
        // SAFETY: NOTE(review) — confirm against the `IoBuf` contract. The
        // whole range (`..`) of `buf` is erased into a slice here, and that
        // slice is turned back into a `B` exactly once below.
        let erased = unsafe { buf.slice_unchecked(..) };
        let (outcome, erased) = DynWrite::write_all(self.as_mut(), erased).await;
        // SAFETY: `erased` is the slice handed back by the write that was
        // given the slice created from a `B` above.
        let restored = unsafe { B::recover_from_slice(erased) };
        (outcome, restored)
    }

    /// Forwards to [`DynWrite::flush`] on the boxed file.
    async fn flush(&mut self) -> Result<(), Error> {
        let pending = DynWrite::flush(self.as_mut());
        pending.await
    }

    /// Forwards to [`DynWrite::close`] on the boxed file.
    async fn close(&mut self) -> Result<(), Error> {
        let pending = DynWrite::close(self.as_mut());
        pending.await
    }
}

/// Lets a boxed [`DynFile`] be used through the generic [`FileCommit`] trait.
impl FileCommit for Box<dyn DynFile + '_> {
    /// Forwards to [`DynFileCommit::commit`] on the boxed file and awaits it.
    async fn commit(&mut self) -> Result<(), Error> {
        let pending = DynFileCommit::commit(self.as_mut());
        pending.await
    }
}

/// Dyn-compatible (object safe) version of [`FileCommit`] for dynamic files.
/// All types implementing [`FileCommit`] automatically implement this trait.
///
/// # Safety
/// Implementors must guarantee that the returned future upholds the same
/// semantics as [`FileCommit::commit`] on the underlying value and that it
/// does not outlive the provided `&mut self`. The dynamic dispatch machinery
/// assumes the future has exclusive access to the file handle for its
/// lifetime; violating this can lead to aliasing bugs or use-after-free.
pub unsafe trait DynFileCommit: MaybeSend {
    /// Boxed, type-erased form of [`FileCommit::commit`].
    ///
    /// The returned future borrows `self` mutably for its whole lifetime
    /// (`'_`) and resolves to the commit result.
    fn commit(&mut self) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + '_>>;
}

// SAFETY: the boxed future does nothing but await `FileCommit::commit` on the
// same value, and it captures the `&mut self` borrow, so its `'_` bound keeps
// it from outliving that borrow.
unsafe impl<T: FileCommit> DynFileCommit for T {
    /// Wraps [`FileCommit::commit`] in a pinned, boxed future so it can be
    /// called through a trait object.
    fn commit(&mut self) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + '_>> {
        // The async block keeps the underlying `commit` call lazy: it only
        // happens once the boxed future is first polled.
        let pending = async move { FileCommit::commit(self).await };
        Box::pin(pending)
    }
}

/// Dyn-compatible (object safe) counterpart of [`Fs`].
///
/// Every async operation returns a pinned, boxed future (and `list` a boxed
/// stream) so that a file system can be used as `dyn DynFs`, e.g. behind an
/// `Arc`. Any `F: Fs` whose file type implements [`FileCommit`] receives this
/// trait through the blanket impl in this module.
///
/// The `'path: 's` bound on each method lets the returned future, which lives
/// for `'s`, hold on to the path reference(s) as well as `&self`.
pub trait DynFs: MaybeSend + MaybeSync {
    /// Returns the [`FileSystemTag`] of this file system.
    fn file_system(&self) -> FileSystemTag;

    /// Opens `path` with `OpenOptions::default()`.
    ///
    /// Default implementation: delegates to [`DynFs::open_options`].
    fn open<'s, 'path: 's>(
        &'s self,
        path: &'path Path,
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<Box<dyn DynFile>, Error>> + 's>> {
        self.open_options(path, OpenOptions::default())
    }

    /// Opens `path` with the given `options`, resolving to a boxed
    /// [`DynFile`] on success.
    fn open_options<'s, 'path: 's>(
        &'s self,
        path: &'path Path,
        options: OpenOptions,
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<Box<dyn DynFile>, Error>> + 's>>;

    /// Boxed counterpart of `Fs::create_dir_all` for `path`.
    fn create_dir_all<'s, 'path: 's>(
        &'s self,
        path: &'path Path,
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>>;

    /// Boxed counterpart of `Fs::list`: the future resolves to a pinned,
    /// boxed stream yielding `Result<FileMeta, Error>` items for `path`.
    fn list<'s, 'path: 's>(
        &'s self,
        path: &'path Path,
    ) -> Pin<
        Box<
            dyn MaybeSendFuture<
                    Output = Result<
                        Pin<Box<dyn MaybeSendStream<Item = Result<FileMeta, Error>> + 's>>,
                        Error,
                    >,
                > + 's,
        >,
    >;

    /// Boxed counterpart of `Fs::remove` for `path`.
    fn remove<'s, 'path: 's>(
        &'s self,
        path: &'path Path,
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>>;

    /// Boxed counterpart of `Fs::copy` from `from` to `to`, both within this
    /// file system.
    fn copy<'s, 'path: 's>(
        &'s self,
        from: &'path Path,
        to: &'path Path,
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>>;

    /// Boxed counterpart of `Fs::link` from `from` to `to`, both within this
    /// file system.
    fn link<'s, 'path: 's>(
        &'s self,
        from: &'path Path,
        to: &'path Path,
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>>;
}

// Blanket impl: every `Fs` whose file type supports `FileCommit` is usable as
// a `DynFs`. Each method forwards to the matching `Fs` method and boxes the
// resulting future (and, for `list`, the resulting stream).
impl<F> DynFs for F
where
    F: Fs,
    F::File: FileCommit,
{
    // Forwards to `Fs::file_system`.
    fn file_system(&self) -> FileSystemTag {
        Fs::file_system(self)
    }

    // Opens the file through `Fs::open_options` and erases its concrete type
    // into `Box<dyn DynFile>`.
    fn open_options<'s, 'path: 's>(
        &'s self,
        path: &'path Path,
        options: OpenOptions,
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<Box<dyn DynFile>, Error>> + 's>> {
        Box::pin(async move {
            let file = F::open_options(self, path, options).await?;
            Ok(Box::new(file) as Box<dyn DynFile>)
        })
    }

    // `Fs::create_dir_all` is called here without a receiver: only `path` is
    // passed, `self` is not used.
    fn create_dir_all<'s, 'path: 's>(
        &'s self,
        path: &'path Path,
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>> {
        Box::pin(F::create_dir_all(path))
    }

    // Awaits `Fs::list` and erases the concrete stream type into a pinned,
    // boxed `MaybeSendStream`.
    fn list<'s, 'path: 's>(
        &'s self,
        path: &'path Path,
    ) -> Pin<
        Box<
            dyn MaybeSendFuture<
                    Output = Result<
                        Pin<Box<dyn MaybeSendStream<Item = Result<FileMeta, Error>> + 's>>,
                        Error,
                    >,
                > + 's,
        >,
    > {
        Box::pin(async move {
            let stream = F::list(self, path).await?;
            // The cast performs the type erasure; no explicit lifetime bound
            // is written on the trait object here.
            Ok(Box::pin(stream)
                as Pin<
                    Box<dyn MaybeSendStream<Item = Result<FileMeta, Error>>>,
                >)
        })
    }

    // Boxes the future returned by `Fs::remove`.
    fn remove<'s, 'path: 's>(
        &'s self,
        path: &'path Path,
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>> {
        Box::pin(F::remove(self, path))
    }

    // Boxes the future returned by `Fs::copy`.
    fn copy<'s, 'path: 's>(
        &'s self,
        from: &'path Path,
        to: &'path Path,
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>> {
        Box::pin(F::copy(self, from, to))
    }

    // Boxes the future returned by `Fs::link`.
    fn link<'s, 'path: 's>(
        &'s self,
        from: &'path Path,
        to: &'path Path,
    ) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>> {
        Box::pin(F::link(self, from, to))
    }
}

/// Copies the file at `from` on `from_fs` to `to` on `to_fs`.
///
/// When both file systems report the same [`FileSystemTag`], the work is
/// delegated to [`DynFs::copy`] on `from_fs`. Otherwise the source is opened
/// for reading, its size is queried once, and its contents are streamed in
/// chunks of at most 4 KiB into a destination opened with
/// `create(true).write(true)`; the destination is closed after the last chunk.
/// An empty source therefore produces an empty (created and closed)
/// destination.
///
/// # Errors
/// Returns the first error reported while opening either file, querying the
/// source size, reading, writing, or closing the destination.
pub async fn copy(
    from_fs: &Arc<dyn DynFs>,
    from: &Path,
    to_fs: &Arc<dyn DynFs>,
    to: &Path,
) -> Result<(), Error> {
    // Upper bound on the number of bytes moved per read/write round trip.
    const CHUNK_SIZE: u64 = 4 * 1024;

    if from_fs.file_system() == to_fs.file_system() {
        return from_fs.copy(from, to).await;
    }

    let mut from_file = from_fs
        .open_options(from, OpenOptions::default().read(true))
        .await?;
    // Kept as `u64` so offsets are never truncated on 32-bit targets.
    let from_file_size = DynRead::size(&from_file).await?;

    let mut to_file = to_fs
        .open_options(to, OpenOptions::default().create(true).write(true))
        .await?;

    // The length is bounded by `CHUNK_SIZE`, so the cast cannot truncate.
    let mut buf = vec![0u8; cmp::min(from_file_size, CHUNK_SIZE) as usize];
    let mut read_pos = 0u64;

    // Loop until every byte is copied. Comparing against the full size (not
    // `size - 1`) avoids both the underflow on empty files and the dropped
    // trailing byte.
    while read_pos < from_file_size {
        let remaining = from_file_size - read_pos;
        // `read_exact_at` must fill the whole buffer, so shrink it for the
        // final partial chunk instead of requesting bytes past the end.
        if remaining < buf.len() as u64 {
            buf.truncate(remaining as usize);
        }
        // Non-zero whenever the loop runs, which guarantees forward progress.
        let chunk_len = buf.len() as u64;

        let (result, filled) = Read::read_exact_at(&mut from_file, buf, read_pos).await;
        result?;
        read_pos += chunk_len;

        let (result, written) = Write::write_all(&mut to_file, filled).await;
        result?;
        // Reuse the same allocation for the next chunk.
        buf = written;
    }
    DynWrite::close(&mut to_file).await?;

    Ok(())
}

// Tests for using files through `Box<dyn DynFile>`. Both tests are compiled
// only with the `tokio` feature enabled and `completion-based` disabled.
#[cfg(test)]
mod tests {
    // Wraps a `TokioFile` (backed by an anonymous temp file) in a
    // `Box<dyn DynFile>` and checks that a write through the box succeeds.
    #[cfg(all(feature = "tokio", not(feature = "completion-based")))]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dyn_fs() {
        use fusio_core::Write;
        use tempfile::tempfile;

        use crate::disk::tokio::TokioFile;

        let file = TokioFile::new(tokio::fs::File::from_std(tempfile().unwrap()));
        let mut dyn_file: Box<dyn super::DynFile> = Box::new(file);
        let buf = [24, 9, 24, 0];
        let (result, _) = dyn_file.write_all(&buf[..]).await;
        result.unwrap();
    }

    // Drives a `BufWriter` through `Box<dyn DynFile>` and checks what is
    // readable before and after flushing.
    #[cfg(all(feature = "tokio", not(feature = "completion-based")))]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dyn_buf_fs() {
        use fusio_core::Write;
        use tempfile::NamedTempFile;

        use crate::{
            disk::TokioFs,
            dynamic::DynFile,
            fs::{Fs, OpenOptions},
            impls::buffered::BufWriter,
            path::Path,
            Read,
        };

        let open_options = OpenOptions::default().create(true).write(true).read(true);
        let fs = TokioFs;
        let temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.into_temp_path();
        // Use filesystem-aware conversion to handle Windows paths correctly.
        let fusio_path = Path::from_filesystem_path(&path).unwrap();
        // NOTE(review): the second argument (5) is presumably the buffer
        // capacity in bytes — confirm against `BufWriter::new`.
        let mut dyn_file = Box::new(BufWriter::new(
            fs.open_options(&fusio_path, open_options).await.unwrap(),
            5,
        )) as Box<dyn DynFile>;

        // First write of 4 bytes: nothing is readable from the file yet
        // (presumably the bytes are still held in the writer's buffer).
        let buf = [24, 9, 24, 0];
        let (result, _) = dyn_file.write_all(&buf[..]).await;
        result.unwrap();
        let (_, buf) = dyn_file.read_to_end_at(vec![], 0).await;
        assert!(buf.is_empty());

        // Second write followed by an explicit flush: the first four bytes
        // can now be read back from offset 0.
        let buf = [34, 19, 34, 10];
        let (result, _) = dyn_file.write_all(&buf[..]).await;
        result.unwrap();
        dyn_file.flush().await.unwrap();
        let (_, buf) = dyn_file.read_exact_at(vec![0; 4], 0).await;
        assert_eq!(buf.as_slice(), &[24, 9, 24, 0]);

        // After flushing again, reading to the end returns both writes in order.
        dyn_file.flush().await.unwrap();
        let (_, buf) = dyn_file.read_to_end_at(vec![], 0).await;
        assert_eq!(buf.as_slice(), &[24, 9, 24, 0, 34, 19, 34, 10])
    }
}