pub mod durability;
#[cfg(feature = "dyn")]
pub mod dynamic;
pub mod error;
#[cfg(feature = "executor")]
pub mod executor;
#[cfg(feature = "fs")]
pub mod fs;
pub mod impls;
pub mod path;
pub use durability::{DirSync, DurabilityLevel, FileCommit, FileSync};
#[cfg(all(feature = "dyn", feature = "fs"))]
pub use dynamic::fs::DynFs;
#[cfg(feature = "monoio")]
pub use executor::monoio::MonoioExecutor;
#[cfg(all(feature = "executor-web", target_arch = "wasm32"))]
pub use executor::web::WebExecutor;
#[cfg(feature = "fs")]
pub use fs::{CasCondition, Fs, FsCas, OpenOptions};
pub use fusio_core::{
error::{BoxedError, Error},
IoBuf, IoBufMut, MaybeSend, MaybeSync, Read, Write,
};
#[cfg(feature = "dyn")]
pub use fusio_core::{DynRead, DynWrite};
pub use impls::*;
#[cfg(test)]
mod tests {
use fusio_core::{error::Error, IoBuf, IoBufMut};
use super::{Read, Write};
/// Test-only [`Write`] adapter: forwards every call to the wrapped writer and
/// tallies the bytes of each `write_all` that succeeded.
#[allow(unused)]
struct CountWrite<W> {
    /// Sum of `bytes_init()` over every buffer whose write returned `Ok`.
    cnt: usize,
    /// Writer that performs the actual I/O.
    w: W,
}

impl<W> CountWrite<W> {
    /// Wraps `w` with the byte counter starting at zero.
    #[allow(unused)]
    fn new(w: W) -> Self {
        Self { cnt: 0, w }
    }
}

impl<W> Write for CountWrite<W>
where
    W: Write,
{
    /// Delegates to the inner writer; the counter only advances when the
    /// inner write reports success. The buffer is always handed back.
    async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
        let (result, buf) = self.w.write_all(buf).await;
        if result.is_ok() {
            self.cnt += buf.bytes_init();
        }
        (result, buf)
    }

    /// Delegates to the inner writer unchanged.
    async fn flush(&mut self) -> Result<(), Error> {
        // The inner result already has the required type; no conversion needed.
        self.w.flush().await
    }

    /// Delegates to the inner writer unchanged.
    async fn close(&mut self) -> Result<(), Error> {
        self.w.close().await
    }
}
/// Test-only [`Read`] adapter: forwards every call to the wrapped reader and
/// tallies the bytes returned by each successful read.
#[allow(unused)]
struct CountRead<R> {
    /// Sum of `bytes_init()` over every buffer returned by a successful read.
    cnt: usize,
    /// Reader that performs the actual I/O.
    r: R,
}

impl<R> CountRead<R> {
    /// Wraps `r` with the byte counter starting at zero.
    #[allow(unused)]
    fn new(r: R) -> Self {
        Self { cnt: 0, r }
    }
}

impl<R> Read for CountRead<R>
where
    R: Read,
{
    /// Delegates to the inner reader; on success the counter grows by the
    /// returned buffer's `bytes_init()`. Errors pass through untouched.
    async fn read_exact_at<B: IoBufMut>(&mut self, buf: B, pos: u64) -> (Result<(), Error>, B) {
        let (outcome, filled) = self.r.read_exact_at(buf, pos).await;
        if outcome.is_ok() {
            self.cnt += filled.bytes_init();
        }
        (outcome, filled)
    }

    /// Delegates to the inner reader; on success the counter grows by the
    /// returned vector's `bytes_init()`. Errors pass through untouched.
    async fn read_to_end_at(&mut self, buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
        let (outcome, filled) = self.r.read_to_end_at(buf, pos).await;
        if outcome.is_ok() {
            self.cnt += filled.bytes_init();
        }
        (outcome, filled)
    }

    /// Reports whatever size the inner reader reports.
    async fn size(&self) -> Result<u64, Error> {
        self.r.size().await
    }
}
/// Round-trip helper shared by the per-runtime file tests.
///
/// Writes the four bytes `[2, 0, 2, 4]` through a [`CountWrite`] around
/// `write`, closes the writer, then reads them back through a [`CountRead`]
/// around `read`: once from offset 0 (expects all four bytes) and once from
/// offset 2 (expects the trailing two).
///
/// Panics if the write, the close, either read, or any assertion fails.
#[allow(unused)]
async fn write_and_read<W, R>(write: W, read: R)
where
    W: Write,
    R: Read,
{
    let mut writer = CountWrite::new(write);

    // With the `completion-based` feature an owned `Vec` is passed, otherwise
    // a borrowed slice. NOTE(review): presumably completion-based backends
    // need owned buffers; confirm against the `IoBuf` impls.
    #[cfg(feature = "completion-based")]
    let (write_result, _) = writer.write_all(vec![2, 0, 2, 4]).await;
    #[cfg(not(feature = "completion-based"))]
    let (write_result, _) = writer.write_all(&[2, 0, 2, 4][..]).await;
    // Surface a failed write here instead of as a confusing read mismatch.
    write_result.unwrap();

    writer.close().await.unwrap();

    let mut reader = CountRead::new(read);
    {
        let (result, buf) = reader.read_to_end_at(vec![], 0).await;
        result.unwrap();
        assert_eq!(buf.bytes_init(), 4);
        assert_eq!(buf.as_slice(), &[2, 0, 2, 4]);
    }
    {
        let (result, buf) = reader.read_to_end_at(vec![], 2).await;
        result.unwrap();
        assert_eq!(buf.bytes_init(), 2);
        assert_eq!(buf.as_slice(), &[2, 4]);
    }
}
#[allow(unused)]
#[cfg(not(target_arch = "wasm32"))]
async fn test_local_fs_read_write<S>(fs: S) -> Result<(), Error>
where
S: crate::fs::Fs,
{
use std::collections::HashSet;
use fusio_core::error::Error;
use futures_util::StreamExt;
use tempfile::TempDir;
use crate::{fs::OpenOptions, path::Path, DynFs};
let tmp_dir = TempDir::new()?;
let work_dir_path = tmp_dir.path().join("work");
let work_file_path = work_dir_path.join("test.file");
S::create_dir_all(
&Path::from_absolute_path(&work_dir_path).map_err(|err| Error::Path(Box::new(err)))?,
)
.await?;
assert!(work_dir_path.exists());
assert!(fs
.open_options(
&Path::from_absolute_path(&work_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
OpenOptions::default()
)
.await
.is_err());
{
let _ = fs
.open_options(
&Path::from_absolute_path(&work_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
OpenOptions::default().create(true).write(true),
)
.await?;
assert!(work_file_path.exists());
}
{
let mut file = fs
.open_options(
&Path::from_absolute_path(&work_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
OpenOptions::default().write(true),
)
.await?;
file.write_all("Hello! fusio".as_bytes()).await.0?;
let mut file = fs
.open_options(
&Path::from_absolute_path(&work_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
OpenOptions::default().write(true),
)
.await?;
file.write_all("Hello! world".as_bytes()).await.0?;
file.flush().await.unwrap();
file.close().await.unwrap();
let mut file = fs
.open_options(
&Path::from_absolute_path(&work_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
OpenOptions::default().read(true),
)
.await?;
let (result, buf) = file.read_exact_at(vec![0u8; 12], 0).await;
result.unwrap();
assert_eq!(buf.as_slice(), b"Hello! fusio");
let (result, buf) = file.read_exact_at(vec![0u8; 12], 12).await;
result.unwrap();
assert_eq!(buf.as_slice(), b"Hello! world");
}
Ok(())
}
#[cfg(not(target_arch = "wasm32"))]
#[allow(unused)]
async fn test_local_fs_copy_link<F: crate::fs::Fs>(src_fs: F) -> Result<(), Error> {
use std::collections::HashSet;
use futures_util::StreamExt;
use tempfile::TempDir;
use crate::{fs::OpenOptions, path::Path, DynFs};
let tmp_dir = TempDir::new()?;
let work_dir_path = tmp_dir.path().join("work_dir");
let src_file_path = work_dir_path.join("src_test.file");
let dst_file_path = work_dir_path.join("dst_test.file");
F::create_dir_all(
&Path::from_absolute_path(&work_dir_path).map_err(|err| Error::Path(Box::new(err)))?,
)
.await?;
let _ = src_fs
.open_options(
&Path::from_absolute_path(&src_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
OpenOptions::default().create(true),
)
.await?;
let _ = src_fs
.open_options(
&Path::from_absolute_path(&dst_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
OpenOptions::default().create(true),
)
.await?;
{
let mut src_file = src_fs
.open_options(
&Path::from_absolute_path(&src_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
OpenOptions::default().write(true),
)
.await?;
src_file.write_all("Hello! fusio".as_bytes()).await.0?;
src_file.close().await?;
src_fs
.copy(
&Path::from_absolute_path(&src_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
&Path::from_absolute_path(&dst_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
)
.await?;
let mut src_file = src_fs
.open_options(
&Path::from_absolute_path(&src_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
OpenOptions::default().write(true).read(true),
)
.await?;
src_file.write_all("Hello! world".as_bytes()).await.0?;
src_file.flush().await?;
src_file.close().await?;
let mut src_file = src_fs
.open_options(
&Path::from_absolute_path(&src_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
OpenOptions::default().write(true).read(true),
)
.await?;
let (result, buf) = src_file.read_exact_at(vec![0u8; 12], 0).await;
result.unwrap();
assert_eq!(buf.as_slice(), b"Hello! fusio");
let (result, buf) = src_file.read_exact_at(vec![0u8; 12], 12).await;
result.unwrap();
assert_eq!(buf.as_slice(), b"Hello! world");
let mut dst_file = src_fs
.open_options(
&Path::from_absolute_path(&dst_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
OpenOptions::default().read(true),
)
.await?;
let (result, buf) = dst_file.read_to_end_at(vec![], 0).await;
result.unwrap();
assert_eq!(buf.as_slice(), b"Hello! fusio");
}
src_fs
.remove(
&Path::from_absolute_path(&dst_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
)
.await?;
{
let mut src_file = src_fs
.open_options(
&Path::from_absolute_path(&src_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
OpenOptions::default().write(true),
)
.await?;
src_file.write_all("Hello! fusio".as_bytes()).await.0?;
src_file.close().await?;
src_fs
.link(
&Path::from_absolute_path(&src_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
&Path::from_absolute_path(&dst_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
)
.await?;
let mut src_file = src_fs
.open_options(
&Path::from_absolute_path(&src_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
OpenOptions::default().write(true).read(true),
)
.await?;
src_file.write_all("Hello! world".as_bytes()).await.0?;
src_file.flush().await?;
src_file.close().await?;
let mut src_file = src_fs
.open_options(
&Path::from_absolute_path(&src_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
OpenOptions::default().write(true).read(true),
)
.await?;
let (result, buf) = src_file.read_exact_at(vec![0u8; 12], 0).await;
result.unwrap();
assert_eq!(buf.as_slice(), b"Hello! fusio");
let (result, buf) = src_file.read_exact_at(vec![0u8; 12], 12).await;
result.unwrap();
assert_eq!(buf.as_slice(), b"Hello! world");
let mut dst_file = src_fs
.open_options(
&Path::from_absolute_path(&dst_file_path)
.map_err(|err| Error::Path(Box::new(err)))?,
OpenOptions::default().read(true),
)
.await?;
let (result, buf) = dst_file.read_exact_at(vec![0u8; 12], 0).await;
result.unwrap();
assert_eq!(buf.as_slice(), b"Hello! fusio");
let (result, buf) = src_file.read_exact_at(vec![0u8; 12], 12).await;
result.unwrap();
assert_eq!(buf.as_slice(), b"Hello! world");
}
Ok(())
}
/// Runs the shared write/read round trip against `TokioFile`, using two
/// handles (original and `try_clone`) to the same anonymous temp file.
#[cfg(all(feature = "tokio", not(target_arch = "wasm32")))]
#[tokio::test(flavor = "multi_thread")]
async fn test_tokio() {
    use crate::disk::tokio::TokioFile;

    let reader_std = tempfile::tempfile().unwrap();
    let writer_std = reader_std.try_clone().unwrap();

    let reader = TokioFile::new(tokio::fs::File::from_std(reader_std));
    let writer = TokioFile::new(tokio::fs::File::from_std(writer_std));

    write_and_read(writer, reader).await;
}
/// Runs both local-filesystem scenarios (read/write, then copy/link) against
/// `TokioFs`; either scenario returning an error fails the test.
#[cfg(all(feature = "tokio", not(target_arch = "wasm32")))]
#[tokio::test(flavor = "multi_thread")]
async fn test_tokio_fs() {
    let read_write = test_local_fs_read_write(crate::disk::TokioFs).await;
    read_write.unwrap();

    let copy_link = test_local_fs_copy_link(crate::disk::TokioFs).await;
    copy_link.unwrap();
}
/// Runs both local-filesystem scenarios (read/write, then copy/link) against
/// `TokioUringFs` inside a `tokio_uring::start` runtime.
#[cfg(all(feature = "tokio-uring", target_os = "linux"))]
#[test]
fn test_tokio_uring_fs() {
    use crate::disk::tokio_uring::fs::TokioUringFs;

    let scenarios = async {
        let read_write = test_local_fs_read_write(TokioUringFs).await;
        read_write.unwrap();

        let copy_link = test_local_fs_copy_link(TokioUringFs).await;
        copy_link.unwrap();
    };
    tokio_uring::start(scenarios)
}
/// Runs both local-filesystem scenarios (read/write, then copy/link) against
/// `MonoIoFs`; either scenario returning an error fails the test.
#[cfg(all(feature = "monoio", not(target_arch = "wasm32")))]
#[monoio::test]
async fn test_monoio_fs() {
    let read_write = test_local_fs_read_write(crate::disk::monoio::fs::MonoIoFs).await;
    read_write.unwrap();

    let copy_link = test_local_fs_copy_link(crate::disk::monoio::fs::MonoIoFs).await;
    copy_link.unwrap();
}
/// Checks `read_exact_at` on a `TokioFile`: an in-range read returns the
/// requested bytes, and a read extending past the 12 written bytes fails.
/// When the failure is an `Error::Io`, its kind must be `UnexpectedEof`.
#[cfg(all(feature = "tokio", not(target_arch = "wasm32")))]
#[tokio::test(flavor = "multi_thread")]
async fn test_read_exact() {
    use crate::disk::tokio::TokioFile;

    let backing = tempfile::tempfile().unwrap();
    let mut file = TokioFile::new(tokio::fs::File::from_std(backing));

    file.write_all(&b"hello, world"[..]).await.0.unwrap();

    let (outcome, head) = file.read_exact_at(vec![0u8; 5], 0).await;
    outcome.unwrap();
    assert_eq!(head.as_slice(), b"hello");

    // 8 bytes starting at offset 5 would end at offset 13, one byte past the
    // 12 bytes written above.
    let (outcome, _) = file.read_exact_at(vec![0u8; 8], 5).await;
    assert!(outcome.is_err());
    if let Err(Error::Io(io_err)) = outcome {
        assert_eq!(io_err.kind(), std::io::ErrorKind::UnexpectedEof);
    }
}
/// Runs the shared write/read round trip against `MonoioFile`, using two
/// handles (original and `try_clone`) to the same anonymous temp file.
#[cfg(all(feature = "monoio", not(target_arch = "wasm32")))]
#[monoio::test]
async fn test_monoio() {
    use crate::disk::monoio::MonoioFile;

    let reader_std = tempfile::tempfile().unwrap();
    let writer_std = reader_std.try_clone().unwrap();

    // The writer is converted first, then the reader, matching argument order.
    let writer = MonoioFile::from(monoio::fs::File::from_std(writer_std).unwrap());
    let reader = MonoioFile::from(monoio::fs::File::from_std(reader_std).unwrap());

    write_and_read(writer, reader).await;
}
/// Runs the shared write/read round trip against `TokioUringFile` inside a
/// `tokio_uring::start` runtime, using two handles (original and `try_clone`)
/// to the same anonymous temp file.
#[cfg(all(feature = "tokio-uring", target_os = "linux"))]
#[test]
fn test_tokio_uring() {
    use crate::disk::tokio_uring::TokioUringFile;

    let round_trip = async {
        let reader_std = tempfile::tempfile().unwrap();
        let writer_std = reader_std.try_clone().unwrap();

        let writer = TokioUringFile::from(tokio_uring::fs::File::from_std(writer_std));
        let reader = TokioUringFile::from(tokio_uring::fs::File::from_std(reader_std));

        write_and_read(writer, reader).await;
    };
    tokio_uring::start(round_trip);
}
}