use crate::metadata::Metadata;
use dashmap::DashMap;
use io_uring::IoUring;
use io_uring::cqueue::Entry as CEntry;
use io_uring::opcode;
use io_uring::squeue::Entry as SEntry;
use io_uring::types;
use std::cmp::min;
use std::collections::VecDeque;
use std::ffi::CString;
use std::io;
use std::mem::MaybeUninit;
use std::os::fd::AsRawFd;
use std::os::fd::FromRawFd;
use std::os::fd::IntoRawFd;
use std::os::fd::OwnedFd;
use std::os::fd::RawFd;
use std::os::unix::ffi::OsStrExt;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::thread;
use tokio::sync::oneshot;
pub const URING_LEN_MAX: u64 = 2 * 1024 * 1024 * 1024 - 4096;
const MAX_REGISTERED_FILES: u32 = 4096;
fn path_to_cstring(path: &Path) -> io::Result<CString> {
CString::new(path.as_os_str().as_bytes()).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidInput,
format!("path contains null byte at position {}", e.nul_position()),
)
})
}
pub unsafe trait IoBuf: Send + 'static {
fn as_ptr(&self) -> *const u8;
fn len(&self) -> usize;
fn is_empty(&self) -> bool {
self.len() == 0
}
}
pub unsafe trait IoBufMut: Send + 'static {
fn as_mut_ptr(&mut self) -> *mut u8;
fn capacity(&self) -> usize;
}
unsafe impl IoBuf for Vec<u8> {
fn as_ptr(&self) -> *const u8 {
Vec::as_ptr(self)
}
fn len(&self) -> usize {
Vec::len(self)
}
}
unsafe impl IoBufMut for Vec<u8> {
fn as_mut_ptr(&mut self) -> *mut u8 {
Vec::as_mut_ptr(self)
}
fn capacity(&self) -> usize {
Vec::capacity(self)
}
}
unsafe impl IoBuf for Box<[u8]> {
fn as_ptr(&self) -> *const u8 {
<[u8]>::as_ptr(self)
}
fn len(&self) -> usize {
<[u8]>::len(self)
}
}
unsafe impl IoBufMut for Box<[u8]> {
fn as_mut_ptr(&mut self) -> *mut u8 {
<[u8]>::as_mut_ptr(self)
}
fn capacity(&self) -> usize {
<[u8]>::len(self)
}
}
#[derive(Clone, Copy)]
#[doc(hidden)]
pub enum Target {
Fd(RawFd),
Fixed { index: u32, raw_fd: RawFd },
}
pub trait UringTarget {
#[doc(hidden)]
fn as_target(&self, uring_identity: &Arc<()>) -> Target;
}
impl<T: AsRawFd> UringTarget for T {
fn as_target(&self, _uring_identity: &Arc<()>) -> Target {
Target::Fd(self.as_raw_fd())
}
}
pub struct RegisteredFile {
index: u32,
raw_fd: RawFd,
uring_identity: Arc<()>,
}
impl UringTarget for RegisteredFile {
fn as_target(&self, uring_identity: &Arc<()>) -> Target {
assert!(
Arc::ptr_eq(&self.uring_identity, uring_identity),
"RegisteredFile used with wrong Uring instance"
);
Target::Fixed {
index: self.index,
raw_fd: self.raw_fd,
}
}
}
struct ReadRequest {
target: Target,
buf_ptr: *mut u8,
buf_len: u32,
offset: u64,
}
unsafe impl Send for ReadRequest {}
unsafe impl Sync for ReadRequest {}
struct WriteRequest {
target: Target,
buf_ptr: *const u8,
buf_len: u32,
offset: u64,
}
unsafe impl Send for WriteRequest {}
unsafe impl Sync for WriteRequest {}
struct SyncRequest {
target: Target,
datasync: bool,
}
struct StatxRequest {
target: Target,
statx_buf: Box<MaybeUninit<libc::statx>>,
}
struct FallocateRequest {
target: Target,
offset: u64,
len: u64,
mode: i32,
}
struct FadviseRequest {
target: Target,
offset: u64,
len: i64,
advice: i32,
}
struct FtruncateRequest {
target: Target,
len: u64,
}
struct OpenAtRequest {
dir_fd: RawFd,
path: CString,
flags: i32,
mode: u32,
}
struct StatxPathRequest {
dir_fd: RawFd,
path: CString,
flags: i32,
statx_buf: Box<MaybeUninit<libc::statx>>,
}
struct CloseRequest {
fd: RawFd,
}
struct RenameAtRequest {
old_dir_fd: RawFd,
old_path: CString,
new_dir_fd: RawFd,
new_path: CString,
flags: u32,
}
struct UnlinkAtRequest {
dir_fd: RawFd,
path: CString,
flags: i32,
}
struct MkdirAtRequest {
dir_fd: RawFd,
path: CString,
mode: u32,
}
struct SymlinkAtRequest {
new_dir_fd: RawFd,
target: CString,
link_path: CString,
}
struct LinkAtRequest {
old_dir_fd: RawFd,
old_path: CString,
new_dir_fd: RawFd,
new_path: CString,
flags: i32,
}
pub struct ReadResult<B> {
pub buf: B,
pub bytes_read: u32,
}
pub struct WriteResult<B> {
pub buf: B,
pub bytes_written: u32,
}
enum Message {
Read {
req: ReadRequest,
res: oneshot::Sender<io::Result<u32>>,
},
Write {
req: WriteRequest,
res: oneshot::Sender<io::Result<u32>>,
},
Sync {
req: SyncRequest,
res: oneshot::Sender<io::Result<()>>,
},
Statx {
req: StatxRequest,
res: oneshot::Sender<io::Result<Metadata>>,
},
Fallocate {
req: FallocateRequest,
res: oneshot::Sender<io::Result<()>>,
},
Fadvise {
req: FadviseRequest,
res: oneshot::Sender<io::Result<()>>,
},
Ftruncate {
req: FtruncateRequest,
res: oneshot::Sender<io::Result<()>>,
},
OpenAt {
req: OpenAtRequest,
res: oneshot::Sender<io::Result<OwnedFd>>,
},
StatxPath {
req: StatxPathRequest,
res: oneshot::Sender<io::Result<Metadata>>,
},
Close {
req: CloseRequest,
res: oneshot::Sender<io::Result<()>>,
},
RenameAt {
req: RenameAtRequest,
res: oneshot::Sender<io::Result<()>>,
},
UnlinkAt {
req: UnlinkAtRequest,
res: oneshot::Sender<io::Result<()>>,
},
MkdirAt {
req: MkdirAtRequest,
res: oneshot::Sender<io::Result<()>>,
},
SymlinkAt {
req: SymlinkAtRequest,
res: oneshot::Sender<io::Result<()>>,
},
LinkAt {
req: LinkAtRequest,
res: oneshot::Sender<io::Result<()>>,
},
}
pub const DEFAULT_RING_SIZE: u32 = 16384;
#[derive(Clone, Debug)]
pub struct UringCfg {
pub ring_size: u32,
pub coop_taskrun: bool,
pub defer_taskrun: bool,
pub iopoll: bool,
pub sqpoll: Option<u32>,
}
impl Default for UringCfg {
fn default() -> Self {
Self {
ring_size: DEFAULT_RING_SIZE,
coop_taskrun: false,
defer_taskrun: false,
iopoll: false,
sqpoll: None,
}
}
}
#[derive(Clone)]
pub struct Uring {
sender: crossbeam_channel::Sender<Message>,
ring: Arc<IoUring<SEntry, CEntry>>,
next_file_slot: Arc<AtomicU32>,
identity: Arc<()>,
}
macro_rules! build_op {
($target:expr, | $fd:ident | $op:expr) => {
match $target {
Target::Fd(raw) => {
let $fd = types::Fd(raw);
$op
}
Target::Fixed { index, .. } => {
let $fd = types::Fixed(index);
$op
}
}
};
}
macro_rules! build_op_fd_only {
($target:expr, | $fd:ident | $op:expr) => {
match $target {
Target::Fd(raw) => {
let $fd = types::Fd(raw);
$op
}
Target::Fixed { raw_fd, .. } => {
let $fd = types::Fd(raw_fd);
$op
}
}
};
}
fn handle_completion(msg: Message, result: i32) {
let result: io::Result<i32> = if result < 0 {
Err(io::Error::from_raw_os_error(-result))
} else {
Ok(result)
};
match msg {
Message::Read { res, .. } => {
let _ = res.send(result.map(|n| n as u32));
}
Message::Write { res, .. } => {
let _ = res.send(result.map(|n| n as u32));
}
Message::Sync { res, .. } => {
let _ = res.send(result.map(|_| ()));
}
Message::Statx { req, res } => {
let outcome = result.map(|_| {
let statx = unsafe { (*req.statx_buf).assume_init() };
Metadata(statx)
});
let _ = res.send(outcome);
}
Message::Fallocate { res, .. } => {
let _ = res.send(result.map(|_| ()));
}
Message::Fadvise { res, .. } => {
let _ = res.send(result.map(|_| ()));
}
Message::Ftruncate { res, .. } => {
let _ = res.send(result.map(|_| ()));
}
Message::OpenAt { res, .. } => {
let outcome = result.map(|fd| {
unsafe { OwnedFd::from_raw_fd(fd) }
});
let _ = res.send(outcome);
}
Message::StatxPath { req, res } => {
let outcome = result.map(|_| {
let statx = unsafe { (*req.statx_buf).assume_init() };
Metadata(statx)
});
let _ = res.send(outcome);
}
Message::Close { res, .. } => {
let _ = res.send(result.map(|_| ()));
}
Message::RenameAt { res, .. } => {
let _ = res.send(result.map(|_| ()));
}
Message::UnlinkAt { res, .. } => {
let _ = res.send(result.map(|_| ()));
}
Message::MkdirAt { res, .. } => {
let _ = res.send(result.map(|_| ()));
}
Message::SymlinkAt { res, .. } => {
let _ = res.send(result.map(|_| ()));
}
Message::LinkAt { res, .. } => {
let _ = res.send(result.map(|_| ()));
}
}
}
impl Uring {
pub fn new(cfg: UringCfg) -> io::Result<Self> {
let (sender, receiver) = crossbeam_channel::unbounded::<Message>();
let pending: Arc<DashMap<u64, Message>> = Default::default();
let ring = {
let mut builder = IoUring::<SEntry, CEntry>::builder();
builder.setup_clamp();
if cfg.coop_taskrun {
builder.setup_coop_taskrun();
};
if cfg.defer_taskrun {
builder.setup_defer_taskrun();
};
if cfg.iopoll {
builder.setup_iopoll();
}
if let Some(sqpoll) = cfg.sqpoll {
builder.setup_sqpoll(sqpoll);
};
builder.build(cfg.ring_size)?
};
let _ = ring.submitter().register_files_sparse(MAX_REGISTERED_FILES);
let ring = Arc::new(ring);
thread::spawn({
let pending = pending.clone();
let ring = ring.clone();
let mut msgbuf = VecDeque::new();
move || {
let mut submission = unsafe { ring.submission_shared() };
let mut next_id = 0u64;
while let Ok(init_msg) = receiver.recv() {
msgbuf.push_back(init_msg);
while let Ok(msg) = receiver.try_recv() {
msgbuf.push_back(msg);
}
while let Some(msg) = msgbuf.pop_front() {
let id = next_id;
next_id = next_id.wrapping_add(1);
let submission_entry = match &msg {
Message::Read { req, .. } => {
build_op!(req.target, |fd| opcode::Read::new(
fd,
req.buf_ptr,
req.buf_len
)
.offset(req.offset)
.build()
.user_data(id))
}
Message::Write { req, .. } => {
build_op!(req.target, |fd| opcode::Write::new(
fd,
req.buf_ptr,
req.buf_len
)
.offset(req.offset)
.build()
.user_data(id))
}
Message::Sync { req, .. } => {
build_op!(req.target, |fd| {
let mut fsync = opcode::Fsync::new(fd);
if req.datasync {
fsync = fsync.flags(types::FsyncFlags::DATASYNC);
}
fsync.build().user_data(id)
})
}
Message::Statx { req, .. } => {
const STATX_BASIC_STATS: u32 = 0x000007ff; const AT_EMPTY_PATH: i32 = 0x1000; static EMPTY_PATH: &std::ffi::CStr = c"";
let statx_ptr = req.statx_buf.as_ptr() as *mut types::statx;
build_op_fd_only!(req.target, |fd| opcode::Statx::new(
fd,
EMPTY_PATH.as_ptr(),
statx_ptr
)
.flags(AT_EMPTY_PATH)
.mask(STATX_BASIC_STATS)
.build()
.user_data(id))
}
Message::Fallocate { req, .. } => {
build_op!(req.target, |fd| opcode::Fallocate::new(fd, req.len)
.offset(req.offset)
.mode(req.mode)
.build()
.user_data(id))
}
Message::Fadvise { req, .. } => {
build_op!(req.target, |fd| opcode::Fadvise::new(
fd, req.len, req.advice
)
.offset(req.offset)
.build()
.user_data(id))
}
Message::Ftruncate { req, .. } => {
build_op!(req.target, |fd| opcode::Ftruncate::new(fd, req.len)
.build()
.user_data(id))
}
Message::OpenAt { req, .. } => {
opcode::OpenAt::new(types::Fd(req.dir_fd), req.path.as_ptr())
.flags(req.flags)
.mode(req.mode)
.build()
.user_data(id)
}
Message::StatxPath { req, .. } => {
const STATX_BASIC_STATS: u32 = 0x000007ff;
let statx_ptr = req.statx_buf.as_ptr() as *mut types::statx;
opcode::Statx::new(types::Fd(req.dir_fd), req.path.as_ptr(), statx_ptr)
.flags(req.flags)
.mask(STATX_BASIC_STATS)
.build()
.user_data(id)
}
Message::Close { req, .. } => {
opcode::Close::new(types::Fd(req.fd)).build().user_data(id)
}
Message::RenameAt { req, .. } => opcode::RenameAt::new(
types::Fd(req.old_dir_fd),
req.old_path.as_ptr(),
types::Fd(req.new_dir_fd),
req.new_path.as_ptr(),
)
.flags(req.flags)
.build()
.user_data(id),
Message::UnlinkAt { req, .. } => {
opcode::UnlinkAt::new(types::Fd(req.dir_fd), req.path.as_ptr())
.flags(req.flags)
.build()
.user_data(id)
}
Message::MkdirAt { req, .. } => {
opcode::MkDirAt::new(types::Fd(req.dir_fd), req.path.as_ptr())
.mode(req.mode)
.build()
.user_data(id)
}
Message::SymlinkAt { req, .. } => opcode::SymlinkAt::new(
types::Fd(req.new_dir_fd),
req.target.as_ptr(),
req.link_path.as_ptr(),
)
.build()
.user_data(id),
Message::LinkAt { req, .. } => opcode::LinkAt::new(
types::Fd(req.old_dir_fd),
req.old_path.as_ptr(),
types::Fd(req.new_dir_fd),
req.new_path.as_ptr(),
)
.flags(req.flags)
.build()
.user_data(id),
};
pending.insert(id, msg);
if submission.is_full() {
submission.sync();
ring.submit_and_wait(1).unwrap();
}
unsafe {
submission.push(&submission_entry).unwrap();
};
}
submission.sync();
ring.submit().unwrap();
}
}
});
thread::spawn({
let pending = pending.clone();
let ring = ring.clone();
move || {
let mut completion = unsafe { ring.completion_shared() };
loop {
let Some(e) = completion.next() else {
ring.submit_and_wait(1).unwrap();
completion.sync();
continue;
};
let id = e.user_data();
let (_, req) = pending
.remove(&id)
.expect("completion for unknown request id");
handle_completion(req, e.result());
}
}
});
Ok(Self {
sender,
ring,
next_file_slot: Arc::new(AtomicU32::new(0)),
identity: Arc::new(()),
})
}
pub fn register(&self, file: &impl AsRawFd) -> io::Result<RegisteredFile> {
let raw_fd = file.as_raw_fd();
let slot = self.next_file_slot.fetch_add(1, Ordering::SeqCst);
if slot >= MAX_REGISTERED_FILES {
return Err(io::Error::new(
io::ErrorKind::Other,
"maximum registered files exceeded",
));
}
self
.ring
.submitter()
.register_files_update(slot, &[raw_fd])?;
Ok(RegisteredFile {
index: slot,
raw_fd,
uring_identity: self.identity.clone(),
})
}
fn send(&self, msg: Message) {
self.sender.send(msg).expect("uring submission thread dead");
}
pub async fn read_into_at<B: IoBufMut>(
&self,
file: &impl UringTarget,
offset: impl TryInto<u64>,
mut buf: B,
) -> io::Result<ReadResult<B>> {
let offset: u64 = offset
.try_into()
.map_err(|_| io::Error::other("offset exceeds u64::MAX"))?;
let target = file.as_target(&self.identity);
let ptr = buf.as_mut_ptr();
let cap = buf.capacity();
let (tx, rx) = oneshot::channel();
self.send(Message::Read {
req: ReadRequest {
target,
buf_ptr: ptr,
buf_len: cap.try_into().unwrap(),
offset,
},
res: tx,
});
let bytes_read = rx.await.expect("uring completion channel dropped")?;
Ok(ReadResult { buf, bytes_read })
}
pub async fn read_at(
&self,
file: &impl UringTarget,
offset: impl TryInto<u64>,
len: impl TryInto<u32>,
) -> io::Result<ReadResult<Vec<u8>>> {
let len: u32 = len
.try_into()
.map_err(|_| io::Error::other("len exceeds u32::MAX"))?;
let buf = vec![0u8; len as usize];
self.read_into_at(file, offset, buf).await
}
pub async fn write_at<B: IoBuf>(
&self,
file: &impl UringTarget,
offset: impl TryInto<u64>,
buf: B,
) -> io::Result<WriteResult<B>> {
let offset: u64 = offset
.try_into()
.map_err(|_| io::Error::other("offset exceeds u64::MAX"))?;
let target = file.as_target(&self.identity);
let ptr = buf.as_ptr();
let len = buf.len();
let (tx, rx) = oneshot::channel();
self.send(Message::Write {
req: WriteRequest {
target,
buf_ptr: ptr,
buf_len: len.try_into().unwrap(),
offset,
},
res: tx,
});
let bytes_written = rx.await.expect("uring completion channel dropped")?;
Ok(WriteResult { buf, bytes_written })
}
pub async fn read_exact_at(
&self,
file: &impl UringTarget,
offset: impl TryInto<u64>,
len: impl TryInto<u32>,
) -> io::Result<Vec<u8>> {
let len: u32 = len
.try_into()
.map_err(|_| io::Error::other("len exceeds u32::MAX"))?;
let result = self.read_at(file, offset, len).await?;
if result.bytes_read < len {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!("expected {} bytes, got {}", len, result.bytes_read),
));
}
Ok(result.buf)
}
pub async fn write_all_at(
&self,
file: &impl UringTarget,
offset: impl TryInto<u64>,
data: &[u8],
) -> io::Result<()> {
let mut offset: u64 = offset
.try_into()
.map_err(|_| io::Error::other("offset exceeds u64::MAX"))?;
let mut written = 0usize;
while written < data.len() {
let remaining = data.len() - written;
let chunk_size = min(remaining, URING_LEN_MAX as usize);
let chunk = data[written..written + chunk_size].to_vec();
let result = self.write_at(file, offset, chunk).await?;
if result.bytes_written == 0 {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write any bytes",
));
}
written += result.bytes_written as usize;
offset += result.bytes_written as u64;
}
Ok(())
}
pub async fn sync(&self, file: &impl UringTarget) -> io::Result<()> {
let target = file.as_target(&self.identity);
let (tx, rx) = oneshot::channel();
self.send(Message::Sync {
req: SyncRequest {
target,
datasync: false,
},
res: tx,
});
rx.await.expect("uring completion channel dropped")
}
pub async fn datasync(&self, file: &impl UringTarget) -> io::Result<()> {
let target = file.as_target(&self.identity);
let (tx, rx) = oneshot::channel();
self.send(Message::Sync {
req: SyncRequest {
target,
datasync: true,
},
res: tx,
});
rx.await.expect("uring completion channel dropped")
}
pub async fn statx(&self, file: &impl UringTarget) -> io::Result<Metadata> {
let target = file.as_target(&self.identity);
let statx_buf = Box::new(MaybeUninit::<libc::statx>::uninit());
let (tx, rx) = oneshot::channel();
self.send(Message::Statx {
req: StatxRequest { target, statx_buf },
res: tx,
});
rx.await.expect("uring completion channel dropped")
}
pub async fn fallocate(
&self,
file: &impl UringTarget,
offset: impl TryInto<u64>,
len: impl TryInto<u64>,
mode: i32,
) -> io::Result<()> {
let offset: u64 = offset
.try_into()
.map_err(|_| io::Error::other("offset exceeds u64::MAX"))?;
let len: u64 = len
.try_into()
.map_err(|_| io::Error::other("len exceeds u64::MAX"))?;
let target = file.as_target(&self.identity);
let (tx, rx) = oneshot::channel();
self.send(Message::Fallocate {
req: FallocateRequest {
target,
offset,
len,
mode,
},
res: tx,
});
rx.await.expect("uring completion channel dropped")
}
pub async fn fadvise(
&self,
file: &impl UringTarget,
offset: impl TryInto<u64>,
len: impl TryInto<u64>,
advice: i32,
) -> io::Result<()> {
let offset: u64 = offset
.try_into()
.map_err(|_| io::Error::other("offset exceeds u64::MAX"))?;
let len: u64 = len
.try_into()
.map_err(|_| io::Error::other("len exceeds u64::MAX"))?;
let len: i64 = len
.try_into()
.map_err(|_| io::Error::other("len exceeds i64::MAX"))?;
let target = file.as_target(&self.identity);
let (tx, rx) = oneshot::channel();
self.send(Message::Fadvise {
req: FadviseRequest {
target,
offset,
len,
advice,
},
res: tx,
});
rx.await.expect("uring completion channel dropped")
}
pub async fn ftruncate(&self, file: &impl UringTarget, len: impl TryInto<u64>) -> io::Result<()> {
let len: u64 = len
.try_into()
.map_err(|_| io::Error::other("len exceeds u64::MAX"))?;
let target = file.as_target(&self.identity);
let (tx, rx) = oneshot::channel();
self.send(Message::Ftruncate {
req: FtruncateRequest { target, len },
res: tx,
});
rx.await.expect("uring completion channel dropped")
}
pub async fn open(&self, path: impl AsRef<Path>, flags: i32, mode: u32) -> io::Result<OwnedFd> {
self.open_at(libc::AT_FDCWD, path, flags, mode).await
}
pub async fn open_at(
&self,
dir_fd: RawFd,
path: impl AsRef<Path>,
flags: i32,
mode: u32,
) -> io::Result<OwnedFd> {
let path = path_to_cstring(path.as_ref())?;
let (tx, rx) = oneshot::channel();
self.send(Message::OpenAt {
req: OpenAtRequest {
dir_fd,
path,
flags,
mode,
},
res: tx,
});
rx.await.expect("uring completion channel dropped")
}
pub async fn statx_path(&self, path: impl AsRef<Path>) -> io::Result<Metadata> {
self.statx_at(libc::AT_FDCWD, path, 0).await
}
pub async fn statx_at(
&self,
dir_fd: RawFd,
path: impl AsRef<Path>,
flags: i32,
) -> io::Result<Metadata> {
let path = path_to_cstring(path.as_ref())?;
let statx_buf = Box::new(MaybeUninit::<libc::statx>::uninit());
let (tx, rx) = oneshot::channel();
self.send(Message::StatxPath {
req: StatxPathRequest {
dir_fd,
path,
flags,
statx_buf,
},
res: tx,
});
rx.await.expect("uring completion channel dropped")
}
pub async fn close(&self, fd: impl IntoRawFd) -> io::Result<()> {
let raw_fd = fd.into_raw_fd();
let (tx, rx) = oneshot::channel();
self.send(Message::Close {
req: CloseRequest { fd: raw_fd },
res: tx,
});
rx.await.expect("uring completion channel dropped")
}
pub async fn rename(
&self,
old_path: impl AsRef<Path>,
new_path: impl AsRef<Path>,
) -> io::Result<()> {
self
.rename_at(libc::AT_FDCWD, old_path, libc::AT_FDCWD, new_path, 0)
.await
}
pub async fn rename_at(
&self,
old_dir_fd: RawFd,
old_path: impl AsRef<Path>,
new_dir_fd: RawFd,
new_path: impl AsRef<Path>,
flags: u32,
) -> io::Result<()> {
let old_path = path_to_cstring(old_path.as_ref())?;
let new_path = path_to_cstring(new_path.as_ref())?;
let (tx, rx) = oneshot::channel();
self.send(Message::RenameAt {
req: RenameAtRequest {
old_dir_fd,
old_path,
new_dir_fd,
new_path,
flags,
},
res: tx,
});
rx.await.expect("uring completion channel dropped")
}
pub async fn unlink(&self, path: impl AsRef<Path>) -> io::Result<()> {
self.unlink_at(libc::AT_FDCWD, path, 0).await
}
pub async fn rmdir(&self, path: impl AsRef<Path>) -> io::Result<()> {
self
.unlink_at(libc::AT_FDCWD, path, libc::AT_REMOVEDIR)
.await
}
pub async fn unlink_at(
&self,
dir_fd: RawFd,
path: impl AsRef<Path>,
flags: i32,
) -> io::Result<()> {
let path = path_to_cstring(path.as_ref())?;
let (tx, rx) = oneshot::channel();
self.send(Message::UnlinkAt {
req: UnlinkAtRequest {
dir_fd,
path,
flags,
},
res: tx,
});
rx.await.expect("uring completion channel dropped")
}
pub async fn mkdir(&self, path: impl AsRef<Path>, mode: u32) -> io::Result<()> {
self.mkdir_at(libc::AT_FDCWD, path, mode).await
}
pub async fn mkdir_at(&self, dir_fd: RawFd, path: impl AsRef<Path>, mode: u32) -> io::Result<()> {
let path = path_to_cstring(path.as_ref())?;
let (tx, rx) = oneshot::channel();
self.send(Message::MkdirAt {
req: MkdirAtRequest { dir_fd, path, mode },
res: tx,
});
rx.await.expect("uring completion channel dropped")
}
pub async fn symlink(
&self,
target: impl AsRef<Path>,
link_path: impl AsRef<Path>,
) -> io::Result<()> {
self.symlink_at(target, libc::AT_FDCWD, link_path).await
}
pub async fn symlink_at(
&self,
target: impl AsRef<Path>,
new_dir_fd: RawFd,
link_path: impl AsRef<Path>,
) -> io::Result<()> {
let target = path_to_cstring(target.as_ref())?;
let link_path = path_to_cstring(link_path.as_ref())?;
let (tx, rx) = oneshot::channel();
self.send(Message::SymlinkAt {
req: SymlinkAtRequest {
new_dir_fd,
target,
link_path,
},
res: tx,
});
rx.await.expect("uring completion channel dropped")
}
pub async fn hard_link(
&self,
original: impl AsRef<Path>,
link: impl AsRef<Path>,
) -> io::Result<()> {
self
.hard_link_at(libc::AT_FDCWD, original, libc::AT_FDCWD, link, 0)
.await
}
pub async fn hard_link_at(
&self,
old_dir_fd: RawFd,
original: impl AsRef<Path>,
new_dir_fd: RawFd,
link: impl AsRef<Path>,
flags: i32,
) -> io::Result<()> {
let old_path = path_to_cstring(original.as_ref())?;
let new_path = path_to_cstring(link.as_ref())?;
let (tx, rx) = oneshot::channel();
self.send(Message::LinkAt {
req: LinkAtRequest {
old_dir_fd,
old_path,
new_dir_fd,
new_path,
flags,
},
res: tx,
});
rx.await.expect("uring completion channel dropped")
}
}