use crate::handle::Handle;
use crate::meta::FileMeta;
use crate::{Error, Result};
use std::path::{Path, PathBuf};
use std::sync::Arc;
impl Handle {
pub async fn write_async(self: Arc<Self>, path: impl AsRef<Path>, data: Vec<u8>) -> Result<()> {
super::require_runtime()?;
let path: PathBuf = path.as_ref().to_path_buf();
#[cfg(target_os = "linux")]
{
if self.active_method() == crate::Method::Direct
&& std::env::var_os("FSYS_DISABLE_NATIVE_ASYNC").is_none()
{
if let Some(ring) = self.async_io_uring() {
return write_async_native(&self, &ring, &path, &data).await;
}
}
}
tokio::task::spawn_blocking(move || self.write(&path, &data))
.await
.map_err(join_error_to_io)?
}
pub async fn write_at_async(
self: Arc<Self>,
path: impl AsRef<Path>,
offset: u64,
data: Vec<u8>,
) -> Result<()> {
super::require_runtime()?;
let path: PathBuf = path.as_ref().to_path_buf();
tokio::task::spawn_blocking(move || self.write_at(&path, offset, &data))
.await
.map_err(join_error_to_io)?
}
pub async fn write_copy_async(
self: Arc<Self>,
path: impl AsRef<Path>,
data: Vec<u8>,
) -> Result<()> {
super::require_runtime()?;
let path: PathBuf = path.as_ref().to_path_buf();
tokio::task::spawn_blocking(move || self.write_copy(&path, &data))
.await
.map_err(join_error_to_io)?
}
pub async fn append_async(
self: Arc<Self>,
path: impl AsRef<Path>,
data: Vec<u8>,
) -> Result<()> {
super::require_runtime()?;
let path: PathBuf = path.as_ref().to_path_buf();
tokio::task::spawn_blocking(move || self.append(&path, &data))
.await
.map_err(join_error_to_io)?
}
pub async fn read_async(self: Arc<Self>, path: impl AsRef<Path>) -> Result<Vec<u8>> {
super::require_runtime()?;
let path: PathBuf = path.as_ref().to_path_buf();
tokio::task::spawn_blocking(move || self.read(&path))
.await
.map_err(join_error_to_io)?
}
pub async fn read_at_async(
self: Arc<Self>,
path: impl AsRef<Path>,
offset: u64,
len: usize,
) -> Result<Vec<u8>> {
super::require_runtime()?;
let path: PathBuf = path.as_ref().to_path_buf();
tokio::task::spawn_blocking(move || self.read_at(&path, offset, len))
.await
.map_err(join_error_to_io)?
}
pub async fn delete_async(self: Arc<Self>, path: impl AsRef<Path>) -> Result<()> {
super::require_runtime()?;
let path: PathBuf = path.as_ref().to_path_buf();
tokio::task::spawn_blocking(move || self.delete(&path))
.await
.map_err(join_error_to_io)?
}
pub async fn truncate_async(
self: Arc<Self>,
path: impl AsRef<Path>,
new_size: u64,
) -> Result<()> {
super::require_runtime()?;
let path: PathBuf = path.as_ref().to_path_buf();
tokio::task::spawn_blocking(move || self.truncate(&path, new_size))
.await
.map_err(join_error_to_io)?
}
pub async fn rename_async(
self: Arc<Self>,
old: impl AsRef<Path>,
new: impl AsRef<Path>,
) -> Result<()> {
super::require_runtime()?;
let old: PathBuf = old.as_ref().to_path_buf();
let new: PathBuf = new.as_ref().to_path_buf();
tokio::task::spawn_blocking(move || self.rename(&old, &new))
.await
.map_err(join_error_to_io)?
}
pub async fn copy_async(
self: Arc<Self>,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
) -> Result<u64> {
super::require_runtime()?;
let src: PathBuf = src.as_ref().to_path_buf();
let dst: PathBuf = dst.as_ref().to_path_buf();
tokio::task::spawn_blocking(move || self.copy(&src, &dst))
.await
.map_err(join_error_to_io)?
}
pub async fn exists_async(self: Arc<Self>, path: impl AsRef<Path>) -> Result<bool> {
super::require_runtime()?;
let path: PathBuf = path.as_ref().to_path_buf();
tokio::task::spawn_blocking(move || self.exists(&path))
.await
.map_err(join_error_to_io)?
}
pub async fn size_async(self: Arc<Self>, path: impl AsRef<Path>) -> Result<u64> {
super::require_runtime()?;
let path: PathBuf = path.as_ref().to_path_buf();
tokio::task::spawn_blocking(move || self.size(&path))
.await
.map_err(join_error_to_io)?
}
pub async fn meta_async(self: Arc<Self>, path: impl AsRef<Path>) -> Result<FileMeta> {
super::require_runtime()?;
let path: PathBuf = path.as_ref().to_path_buf();
tokio::task::spawn_blocking(move || self.meta(&path))
.await
.map_err(join_error_to_io)?
}
}
fn join_error_to_io(e: tokio::task::JoinError) -> Error {
Error::Io(std::io::Error::other(format!(
"spawn_blocking task failed: {e}"
)))
}
#[cfg(target_os = "linux")]
async fn write_async_native(
handle: &Handle,
ring: &crate::async_io::completion_driver::AsyncIoUring,
path: &Path,
data: &[u8],
) -> Result<()> {
use crate::async_io::iouring_substrate::{fdatasync_native, write_at_native};
use std::os::fd::AsRawFd;
let resolved = handle.resolve_path(path)?;
let temp = Handle::gen_temp_path(&resolved);
let (file, direct_ok) =
crate::platform::open_write_new(&temp, handle.use_direct()).map_err(|e| {
Error::AtomicReplaceFailed {
step: "open_temp",
source: as_io_error(e),
}
})?;
if handle.use_direct() && !direct_ok {
handle.update_active_method(crate::Method::Data);
drop(file);
let _ = std::fs::remove_file(&temp);
return Err(Error::AtomicReplaceFailed {
step: "open_temp",
source: std::io::Error::other(
"O_DIRECT unsupported on this filesystem; active_method downgraded to Data — retry",
),
});
}
if data.is_empty() {
if let Err(e) = fdatasync_native(ring, file.as_raw_fd()).await {
drop(file);
let _ = std::fs::remove_file(&temp);
return Err(Error::AtomicReplaceFailed {
step: "fdatasync_native",
source: as_io_error(e),
});
}
drop(file);
if let Err(e) = crate::platform::atomic_rename(&temp, &resolved) {
let _ = std::fs::remove_file(&temp);
return Err(Error::AtomicReplaceFailed {
step: "rename",
source: as_io_error(e),
});
}
let _ = crate::platform::sync_parent_dir(&resolved);
return Ok(());
}
let sector_size = handle.sector_size() as usize;
let aligned_len = data.len().div_ceil(sector_size).saturating_mul(sector_size);
let mut buf = crate::platform::AlignedBuf::new(aligned_len, sector_size).map_err(|e| {
Error::AtomicReplaceFailed {
step: "alloc_aligned_buf",
source: as_io_error(e),
}
})?;
buf.as_mut_slice()[..data.len()].copy_from_slice(data);
let n = match write_at_native(ring, file.as_raw_fd(), buf.as_slice(), 0).await {
Ok(n) => n,
Err(e) => {
drop(file);
let _ = std::fs::remove_file(&temp);
return Err(Error::AtomicReplaceFailed {
step: "write_native",
source: as_io_error(e),
});
}
};
if n != aligned_len {
drop(file);
let _ = std::fs::remove_file(&temp);
return Err(Error::AtomicReplaceFailed {
step: "write_native_short",
source: std::io::Error::other("native io_uring write returned short count"),
});
}
if let Err(e) = fdatasync_native(ring, file.as_raw_fd()).await {
drop(file);
let _ = std::fs::remove_file(&temp);
return Err(Error::AtomicReplaceFailed {
step: "fdatasync_native",
source: as_io_error(e),
});
}
drop(file);
if let Err(e) = std::fs::OpenOptions::new()
.write(true)
.open(&temp)
.and_then(|f| f.set_len(data.len() as u64))
{
let _ = std::fs::remove_file(&temp);
return Err(Error::AtomicReplaceFailed {
step: "truncate",
source: e,
});
}
if let Err(e) = crate::platform::atomic_rename(&temp, &resolved) {
let _ = std::fs::remove_file(&temp);
return Err(Error::AtomicReplaceFailed {
step: "rename",
source: as_io_error(e),
});
}
let _ = crate::platform::sync_parent_dir(&resolved);
Ok(())
}
#[cfg(target_os = "linux")]
fn as_io_error(e: Error) -> std::io::Error {
match e {
Error::Io(io_err) => io_err,
other => std::io::Error::other(other.to_string()),
}
}