use core::pin::Pin;
use core::ptr;
use core::sync::atomic::AtomicU32;
use core::sync::atomic::Ordering::*;
use core::task::{Context, Poll};
use core::{mem, ptr::NonNull};
pub mod ffi;
mod socket;
use pin_project::pin_project;
pub use socket::{SocketAddrStorage, socket_addr_to_dual_stack};
use alloc::boxed::Box;
use alloc::{sync::Arc, vec::Vec};
use derive_more::Deref;
use crate::Guard;
use crate::linux::sys::{self, Errno, SockAddr, SyscallError};
use crate::sync::lock::{Lock, LockKey, Mutex};
use crate::sync::{PinnedWaiter, Signal, WaitersExt};
use crate::sync::{Waiter, Waiters, q::Queue, ring::Ring, slab::Slab};
/// Raw file descriptor newtype; dereferences to the wrapped `i32`.
///
/// `#[repr(transparent)]` pins the layout to that of `i32`. The file
/// registration path in this module passes the buffer of a `Vec<Fd>` straight
/// to the kernel as an array of descriptors, which is only sound when the
/// layout is guaranteed.
#[derive(Deref, Clone, Copy)]
#[repr(transparent)]
pub struct Fd(i32);
/// Result alias used throughout this module; the error type is [`IoUringError`].
pub type Result<T> = core::result::Result<T, IoUringError>;
/// User-space view of the submission queue (SQ).
///
/// All raw pointers are derived in `IoUring::with_capacity` from the mapped
/// SQ ring plus the offsets reported in `params.sq_off`.
pub struct Submission {
    /// Base address of the SQ ring mapping.
    ring: NonNull<u8>,
    /// Length in bytes that was passed to `mmap` for the SQ ring.
    ring_size: usize,
    /// Base address of the separately mapped SQE array.
    sqes: NonNull<ffi::Sqe>,
    /// Number of SQ entries reported back in `params.sq_entries`.
    sqe_count: u32,
    /// Ring head index, located at `sq_off.head`.
    head: *const AtomicU32,
    /// Ring tail index, located at `sq_off.tail`; advanced by `sq_advance`.
    tail: *mut AtomicU32,
    /// Index mask, read once from `sq_off.ring_mask` at setup.
    mask: u32,
    /// Entry count, read once from `sq_off.ring_entries` at setup.
    entries: u32,
    /// Ring flags word, located at `sq_off.flags`.
    flags: *const AtomicU32,
    /// Dropped counter, located at `sq_off.dropped`.
    dropped: *const AtomicU32,
    /// Indirection array, located at `sq_off.array`.
    array: NonNull<u32>,
    /// Initialised to 0 at setup.
    cached_head: AtomicU32,
    /// SQEs published via `sq_advance` that `io_uring_enter` has not yet
    /// reported as submitted.
    pending: AtomicU32,
    /// Waiters enqueued when `io_uring_enter` fails with `EAGAIN`.
    submitters: Waiters,
}
/// User-space view of the completion queue (CQ).
///
/// All raw pointers are derived in `IoUring::with_capacity` from the CQ ring
/// mapping plus the offsets reported in `params.cq_off`.
pub struct Completions {
    /// Base address of the CQ ring mapping.
    ring: NonNull<u8>,
    /// Byte length recorded for the CQ ring mapping.
    ring_size: usize,
    /// First CQE, located at `cq_off.cqes`.
    cqes: NonNull<ffi::Cqe>,
    /// Ring head index, located at `cq_off.head`; advanced by `cq_advance`.
    head: *mut AtomicU32,
    /// Ring tail index, located at `cq_off.tail`.
    tail: *const AtomicU32,
    /// Index mask, read once from `cq_off.ring_mask` at setup.
    mask: u32,
    /// Entry count, read once from `cq_off.ring_entries` at setup.
    entries: u32,
    /// Overflow counter, located at `cq_off.overflow`.
    overflow: *const AtomicU32,
    /// Ring flags word, located at `cq_off.flags`.
    flags: *const AtomicU32,
    /// Initialised to 0 at setup.
    cached_tail: AtomicU32,
    /// Tasks parked in `CqeFuture`; notified after completions are reaped.
    waiters: Waiters,
}
/// Shared state behind an [`IoUring`] handle.
pub struct Inner {
    /// Descriptor returned by `io_uring_setup`.
    ring: Fd,
    /// Submission queue view.
    sq: Submission,
    /// Completion queue view.
    cq: Completions,
    /// Bookkeeping for operations; the slab index doubles as the SQE `user_data`.
    ops: Mutex<Slab<Op>>,
    /// Created with a capacity of 1024 at setup.
    submits: Ring<Submit>,
    /// Completions that have been reaped from the CQ but not yet handed out.
    completions: Queue<Complete>,
}
/// Index of an [`Op`] inside the operation slab.
pub type SlabIndex = usize;
/// Bookkeeping record for one operation stored in the slab.
///
/// `buffer`, `len` and `offset` are generic slots: the constructors on
/// [`IoUring`] reuse them per operation type (for example `fsync` stores its
/// flags in `len`, `openat` packs flags and mode into `offset`).
pub struct Op {
    /// Signalled and woken when the matching completion is reaped.
    waiter: Waiter,
    /// Kind of operation.
    op: Type,
    /// Target descriptor (or directory descriptor for the `*at` operations).
    fd: Fd,
    /// Operation-specific pointer; dangling when the operation has none.
    buffer: NonNull<u8>,
    /// Operation-specific length or small integer argument.
    len: usize,
    /// Operation-specific offset or packed argument.
    offset: u64,
    /// Raw completion result once reaped.
    result: Option<i32>,
    /// Always initialised to `None` by the constructors in this file.
    link_next: Option<usize>,
}

// The record carries a raw `NonNull<u8>`; the author asserts that sharing it
// across threads is sound.
// NOTE(review): nothing in this file proves the pointed-to memory is safe to
// touch from another thread — confirm the invariant.
unsafe impl Send for Op {}
unsafe impl Sync for Op {}
/// A queued submission: the slab slot it belongs to plus the SQE fields.
pub struct Submit {
    /// Slab slot of the owning [`Op`].
    idx: SlabIndex,
    /// Fields to copy into the kernel SQE.
    sqe: Sqe,
}
/// Compact, crate-side description of a submission queue entry.
///
/// This is distinct from `ffi::Sqe`, which is the entry type stored in the
/// mapped SQE array.
pub struct Sqe {
    /// Target descriptor.
    fd: Fd,
    /// Operation code.
    opcode: u8,
    /// Per-entry flags.
    flags: u8,
    /// Offset argument.
    off: u64,
    /// Address argument.
    addr: u64,
    /// Length argument.
    len: u32,
}
/// Scatter/gather segment: a base pointer and a length, laid out `repr(C)`.
///
/// The conversions below capture only the pointer and length; the borrow is
/// not tracked afterwards, so the caller must keep the memory alive and
/// unmoved for as long as the `IoVec` is in use.
#[repr(C)]
pub struct IoVec {
    /// Start of the segment.
    pub base: *mut u8,
    /// Length of the segment in bytes.
    pub len: usize,
}

// The raw pointer makes `IoVec` `!Send` by default; the author opts back in.
unsafe impl Send for IoVec {}

impl From<&mut Vec<u8>> for IoVec {
    /// Describes the initialised part of the vector (`len`, not capacity).
    fn from(value: &mut Vec<u8>) -> Self {
        Self::from(value.as_mut_slice())
    }
}

impl From<&mut [u8]> for IoVec {
    /// Describes the whole slice.
    fn from(value: &mut [u8]) -> Self {
        let len = value.len();
        let base = value.as_mut_ptr();
        IoVec { base, len }
    }
}
/// Message header passed by pointer to the `sendmsg` / `recvmsg` operations.
///
/// Laid out `repr(C)`; presumably mirrors the C `struct msghdr` — verify
/// against the target ABI.
#[repr(C)]
pub struct MsgHdr {
    /// Optional peer address.
    pub name: *mut core::ffi::c_void,
    /// Size of the buffer behind `name`.
    pub namelen: u32,
    /// Scatter/gather array.
    pub iov: *mut IoVec,
    /// Number of elements in `iov`.
    pub iovlen: usize,
    /// Ancillary data buffer.
    pub control: *mut core::ffi::c_void,
    /// Size of the buffer behind `control`.
    pub controllen: usize,
    /// Message flags.
    pub flags: i32,
}

// Raw pointers make the header `!Send`/`!Sync` by default; the author opts
// back in. Callers remain responsible for the validity of every pointer.
unsafe impl Send for MsgHdr {}
unsafe impl Sync for MsgHdr {}
/// A reaped completion, copied out of the CQ ring.
struct Complete {
    /// Slab slot taken from the CQE `user_data`.
    idx: SlabIndex,
    /// Raw CQE result.
    result: i32,
    /// Raw CQE flags.
    flags: u32,
}
/// Crate-internal tag identifying what kind of operation an [`Op`] record is.
///
/// These discriminants are local to this module; the opcode written into an
/// SQE comes from `ffi::OpCode`, not from this enum.
#[repr(u8)]
#[derive(Clone, Copy)]
enum Type {
    // File I/O.
    Read = 0,
    Write = 1,
    Fsync = 2,

    // Sockets.
    Accept = 3,
    Connect = 4,
    Socket = 5,
    Close = 6,
    SendMsg = 7,
    RecvMsg = 8,

    // Provided buffers.
    ProvideBuffers = 9,
    RemoveBuffers = 10,

    // Path-based operations.
    Openat = 11,
    Statx = 12,
    Mkdirat = 13,
    Unlinkat = 14,
    Renameat = 15,
}
/// A group of operations, identified by slab slot, tied to one ring.
pub struct Batch<'a> {
    /// Ring the operations belong to.
    ring: &'a IoUring,
    /// Slab slots of the batched operations.
    ops: Vec<SlabIndex>,
}
use thiserror::Error;
/// Errors reported by this module.
///
/// Most variants are produced by the `From<SyscallError>` conversion; an
/// errno with no dedicated variant is carried verbatim in
/// [`IoUringError::System`].
#[derive(Error, Debug)]
pub enum IoUringError {
    // --- setup -----------------------------------------------------------
    #[error("Invalid number of entries specified")]
    InvalidEntries,

    #[error("Too many entries requested")]
    TooManyEntries,

    #[error("Permission denied")]
    PermissionDenied,

    #[error("Out of memory")]
    OutOfMemory,

    #[error("io_uring not supported on this kernel")]
    UnsupportedOperation,

    // --- ring state ------------------------------------------------------
    #[error("Ring has been shut down")]
    RingShutdown,

    #[error("Submission queue full")]
    SubmissionQueueFull,

    #[error("Completion queue overflow")]
    CompletionQueueOverflow,

    // --- per-operation ---------------------------------------------------
    #[error("Invalid file descriptor")]
    InvalidDescriptor,

    #[error("Operation canceled")]
    OperationCanceled,

    #[error("Buffer too small")]
    BufferTooSmall,

    #[error("Invalid memory address")]
    InvalidAddress,

    #[error("Memory mapping failed")]
    MmapFailed,

    /// Any other errno; the payload is the raw error number.
    #[error("Unknown system error: {0}")]
    System(i32),
}
impl From<SyscallError> for IoUringError {
    /// Translates a syscall failure into the closest [`IoUringError`]
    /// variant, falling back to [`IoUringError::System`] with the raw errno.
    fn from(err: SyscallError) -> Self {
        match err.errno() {
            Errno::Inval => Self::InvalidEntries,
            Errno::MFile | Errno::NFile => Self::TooManyEntries,
            Errno::Perm | Errno::Acces => Self::PermissionDenied,
            Errno::NoMem => Self::OutOfMemory,
            Errno::NoSys => Self::UnsupportedOperation,
            Errno::Busy => Self::SubmissionQueueFull,
            Errno::BadF => Self::InvalidDescriptor,
            Errno::Fault => Self::InvalidAddress,
            Errno::Shutdown => Self::RingShutdown,
            Errno::NoBufs => Self::BufferTooSmall,
            _ => Self::System(err.raw()),
        }
    }
}
/// Handle to an io_uring instance; the state lives in a shared [`Inner`].
pub struct IoUring(Arc<Inner>);

// `Inner` holds raw pointers into the mapped rings, so it is not
// automatically `Send`/`Sync`; the author asserts cross-thread use is sound.
// NOTE(review): `get_sqe`/`sq_advance` and `peek_cqe`/`cq_advance` do
// unsynchronised load-then-store sequences — confirm callers serialise them.
unsafe impl Send for Inner {}
unsafe impl Sync for Inner {}
impl IoUring {
/// Creates a ring with `capacity` submission entries and maps its queues.
///
/// Calls `io_uring_setup`, then maps the SQ ring, the SQE array and the CQ
/// ring, and derives every ring pointer from the offsets the kernel wrote
/// back into `params`.
///
/// # Errors
///
/// Any failure of `io_uring_setup` or `mmap` is converted through
/// `From<SyscallError>`.
///
/// NOTE(review): if a later `mmap` fails, the ring descriptor and the
/// earlier mappings are not released — no close/munmap helper is visible
/// from this file.
pub fn with_capacity(capacity: u32) -> Result<Self> {
    let mut params = ffi::Params {
        sq_entries: capacity,
        // Saturate instead of overflowing for very large requests; the
        // kernel reports the real value back in `params`.
        cq_entries: capacity.saturating_mul(2),
        ..unsafe { mem::zeroed() }
    };
    let fd = unsafe {
        sys::io_uring_setup(capacity, &mut params as *mut _ as *mut _)
            .map_err(IoUringError::from)?
    };
    let ring = Fd(fd as i32);

    // SQ ring: header fields followed by the u32 index array.
    let sq_ring_size =
        params.sq_off.array as usize + params.sq_entries as usize * mem::size_of::<u32>();
    let sq_ring = unsafe {
        sys::mmap(
            ptr::null_mut(),
            sq_ring_size,
            ffi::Prot::Read | ffi::Prot::Write,
            ffi::Map::Shared | ffi::Map::Populate,
            *ring,
            ffi::MmapOffset::SqRing.as_u64() as i64,
        )
        .map_err(IoUringError::from)?
    };

    // SQE array lives in its own mapping.
    let sqe_size = params.sq_entries as usize * mem::size_of::<ffi::Sqe>();
    let sqes = unsafe {
        sys::mmap(
            ptr::null_mut(),
            sqe_size,
            ffi::Prot::Read | ffi::Prot::Write,
            ffi::Map::Shared | ffi::Map::Populate,
            *ring,
            ffi::MmapOffset::Sqes.as_u64() as i64,
        )
        .map_err(IoUringError::from)?
    };

    // CQ ring: header fields followed by the CQE array, which starts at
    // `cq_off.cqes`. The mapping must use the CQ length and the CQ offset;
    // mapping the SQE region here would make every `cq_off` pointer below
    // point into the wrong memory.
    // NOTE(review): assumes `ffi::MmapOffset` has a `CqRing` variant next to
    // `SqRing` and `Sqes`.
    let cq_ring_size =
        params.cq_off.cqes as usize + params.cq_entries as usize * mem::size_of::<ffi::Cqe>();
    let cq_ring = unsafe {
        sys::mmap(
            ptr::null_mut(),
            cq_ring_size,
            ffi::Prot::Read | ffi::Prot::Write,
            ffi::Map::Shared | ffi::Map::Populate,
            *ring,
            ffi::MmapOffset::CqRing.as_u64() as i64,
        )
        .map_err(IoUringError::from)?
    };

    let sq = unsafe {
        let base = sq_ring as *mut u8;
        Submission {
            ring: NonNull::new_unchecked(sq_ring as *mut u8),
            ring_size: sq_ring_size,
            sqes: NonNull::new_unchecked(sqes as *mut ffi::Sqe),
            sqe_count: params.sq_entries,
            head: base.add(params.sq_off.head as usize) as *const AtomicU32,
            tail: base.add(params.sq_off.tail as usize) as *mut AtomicU32,
            mask: *((base.add(params.sq_off.ring_mask as usize)) as *const u32),
            entries: *((base.add(params.sq_off.ring_entries as usize)) as *const u32),
            flags: base.add(params.sq_off.flags as usize) as *const AtomicU32,
            dropped: base.add(params.sq_off.dropped as usize) as *const AtomicU32,
            array: NonNull::new_unchecked(base.add(params.sq_off.array as usize) as *mut u32),
            cached_head: AtomicU32::new(0),
            pending: AtomicU32::new(0),
            submitters: Waiters::new(),
        }
    };
    let cq = unsafe {
        let base = cq_ring as *mut u8;
        Completions {
            ring: NonNull::new_unchecked(cq_ring as *mut u8),
            ring_size: cq_ring_size,
            cqes: NonNull::new_unchecked(base.add(params.cq_off.cqes as usize) as *mut ffi::Cqe),
            head: base.add(params.cq_off.head as usize) as *mut AtomicU32,
            tail: base.add(params.cq_off.tail as usize) as *const AtomicU32,
            mask: *((base.add(params.cq_off.ring_mask as usize)) as *const u32),
            entries: *((base.add(params.cq_off.ring_entries as usize)) as *const u32),
            overflow: base.add(params.cq_off.overflow as usize) as *const AtomicU32,
            flags: base.add(params.cq_off.flags as usize) as *const AtomicU32,
            cached_tail: AtomicU32::new(0),
            waiters: Waiters::new(),
        }
    };

    let inner = Arc::new(Inner {
        ring,
        sq,
        cq,
        ops: Mutex::new(Slab::new()),
        submits: Ring::new(1024),
        completions: Queue::new(),
    });
    Ok(IoUring(inner))
}
/// Submits all pending SQEs without waiting for completions.
///
/// Equivalent to `submit_and_wait(None)`; resolves to the number of entries
/// the kernel reported as submitted.
pub async fn submit(&self) -> Result<u32> {
    let no_wait: Option<u32> = None;
    self.submit_and_wait(no_wait).await
}
/// Submits pending SQEs and, when `wait_for` is `Some(n)`, asks the kernel
/// to wait for `n` completions as part of the same `io_uring_enter` call.
pub async fn submit_and_wait(&self, wait_for: Option<u32>) -> Result<u32> {
    let future = SubmitFuture {
        ring: self,
        wait_for,
        waiter: None,
    };
    future.await
}
/// Returns the SQE at the current tail together with its ring slot, or
/// `None` when taking one more entry would exceed the ring's entry count.
///
/// The tail is not advanced here; callers publish the entry with
/// `sq_advance` after filling it in.
fn get_sqe(&self) -> Option<(*mut ffi::Sqe, u32)> {
    let sq = &self.0.sq;
    let (head, tail) = unsafe { ((*sq.head).load(Acquire), (*sq.tail).load(Relaxed)) };

    // Occupancy if one more entry were pushed; wrapping arithmetic keeps
    // this correct across index wrap-around.
    let occupied_after_push = tail.wrapping_add(1).wrapping_sub(head);
    if occupied_after_push > sq.entries {
        return None;
    }

    let slot = tail & sq.mask;
    let sqe = unsafe { sq.sqes.as_ptr().add(slot as usize) };
    Some((sqe, slot))
}
/// Waits for the next completion and returns it.
pub async fn wait_cqe(&self) -> Result<Complete> {
    let next = CqeFuture {
        ring: self,
        waiter: None,
    };
    next.await
}
/// Waits for `count` completions, one after another, and returns them in
/// arrival order. Stops at the first error.
pub async fn wait_cqes(&self, count: u32) -> Result<Vec<Complete>> {
    let mut gathered = Vec::with_capacity(count as usize);
    let mut remaining = count;
    while remaining > 0 {
        let complete = self.wait_cqe().await?;
        gathered.push(complete);
        remaining -= 1;
    }
    Ok(gathered)
}
/// Returns a pointer to the CQE at the current head, or `None` when head
/// and tail are equal (nothing to consume). Does not advance the head.
fn peek_cqe(&self) -> Option<*const ffi::Cqe> {
    let cq = &self.0.cq;
    let (head, tail) = unsafe { ((*cq.head).load(Relaxed), (*cq.tail).load(Acquire)) };
    if head == tail {
        None
    } else {
        let slot = (head & cq.mask) as usize;
        Some(unsafe { cq.cqes.as_ptr().add(slot) } as *const ffi::Cqe)
    }
}
/// Marks `count` CQEs as consumed by moving the CQ head forward.
///
/// This is a plain load followed by a `Release` store, not an atomic
/// read-modify-write.
fn cq_advance(&self, count: u32) {
    let head = unsafe { &*self.0.cq.head };
    let consumed_up_to = head.load(Relaxed).wrapping_add(count);
    head.store(consumed_up_to, Release);
}
/// Drains every CQE currently visible in the ring.
///
/// For each entry whose `user_data` names a live slab slot, the result is
/// stored on the op, its waiter is signalled and woken, and a [`Complete`]
/// is queued. The CQ head advances by one per entry whether or not a slot
/// was found. CQ waiters are notified once the ring is empty.
async fn reap_completions(&self) {
    let inner = &*self.0;
    while let Some(cqe_ptr) = self.peek_cqe() {
        unsafe {
            let entry = &*cqe_ptr;
            let slot = entry.user_data as SlabIndex;
            // The slab lock is released at the end of this block, before
            // the head is advanced.
            let mut ops = inner.ops.lock(Waiter::default()).await;
            if let Some(op) = ops.get_mut(slot) {
                op.result = Some(entry.res);
                op.waiter.signal();
                op.waiter.wake_by_ref();
                inner.completions.enqueue(Complete {
                    idx: slot,
                    result: entry.res,
                    flags: entry.flags,
                });
            }
        }
        self.cq_advance(1);
    }
    inner.cq.waiters.notify_all();
}
/// Number of SQEs published but not yet reported as submitted.
pub fn sq_ready(&self) -> u32 {
    let pending = &self.0.sq.pending;
    pending.load(Relaxed)
}
/// Publishes the SQE at the current tail: moves the SQ tail forward by one
/// (`Release` store) and bumps the pending counter.
fn sq_advance(&self) {
    let sq = &self.0.sq;
    let tail = unsafe { &*sq.tail };
    let published = tail.load(Relaxed).wrapping_add(1);
    tail.store(published, Release);
    sq.pending.fetch_add(1, Relaxed);
}
/// Registers a read of `buf.len()` bytes from `fd` at `offset`.
///
/// Awaiting this function only records the operation in the slab; the
/// returned [`Read`] future performs the submission and must itself be
/// awaited to obtain the byte count.
pub async fn read<'a>(&'a self, fd: Fd, buf: &'a mut [u8], offset: u64) -> Read<'a> {
    let record = Op {
        waiter: Waiter::default(),
        op: Type::Read,
        fd,
        buffer: NonNull::new(buf.as_mut_ptr()).unwrap(),
        len: buf.len(),
        offset,
        result: None,
        link_next: None,
    };
    let slot = self.0.ops.lock(Waiter::default()).await.insert(record);
    Read {
        ring: self,
        idx: Some(slot),
        submitted: false,
        lock_future: None,
        guard: None,
    }
}
/// Registers a write of `buf` to `fd` at `offset`.
///
/// Awaiting this function only records the operation in the slab; the
/// returned [`Write`] future performs the submission and must itself be
/// awaited to obtain the byte count.
pub async fn write<'a>(&'a self, fd: Fd, buf: &'a [u8], offset: u64) -> Write<'a> {
    let record = Op {
        waiter: Waiter::default(),
        op: Type::Write,
        fd,
        // Stored as `*mut` only because the slot type is shared with reads.
        buffer: NonNull::new(buf.as_ptr().cast_mut()).unwrap(),
        len: buf.len(),
        offset,
        result: None,
        link_next: None,
    };
    let slot = self.0.ops.lock(Waiter::default()).await.insert(record);
    Write {
        ring: self,
        idx: Some(slot),
        submitted: false,
        lock_future: None,
        guard: None,
    }
}
/// Registers `fds` with the ring. The descriptors are copied into the
/// future so the slice need not outlive the call.
pub async fn register_files(&self, fds: &[Fd]) -> Result<()> {
    let owned: Vec<Fd> = fds.to_vec();
    let registration = RegisterFilesFuture {
        ring: self,
        fds: owned,
        started: false,
    };
    registration.await
}
/// Removes the ring's registered file set.
pub async fn unregister_files(&self) -> Result<()> {
    let unregistration = UnregisterFilesFuture {
        ring: self,
        started: false,
    };
    unregistration.await
}
/// Registers `buffers` with the ring.
///
/// NOTE(review): the vectors are owned by the internal future and are
/// dropped when it completes — confirm that the registration is not
/// expected to keep referring to this memory afterwards.
pub async fn register_buffers(&self, buffers: Vec<Vec<u8>>) -> Result<()> {
    let registration = RegisterBuffersFuture {
        ring: self,
        buffers,
        started: false,
    };
    registration.await
}
/// Registers an fsync of `fd`.
///
/// `flags` is carried in the op record's `len` slot. Awaiting this function
/// only records the operation; await the returned [`Fsync`] for the result.
pub async fn fsync<'a>(&'a self, fd: Fd, flags: u32) -> Fsync<'a> {
    let record = Op {
        waiter: Waiter::default(),
        op: Type::Fsync,
        fd,
        buffer: NonNull::dangling(),
        len: flags as usize,
        offset: 0,
        result: None,
        link_next: None,
    };
    let slot = self.0.ops.lock(Waiter::default()).await.insert(record);
    Fsync {
        ring: self,
        idx: Some(slot),
        submitted: false,
        lock_future: None,
        guard: None,
    }
}
/// Registers an accept on the listening socket `fd`.
///
/// The returned [`Accept`] future owns a zeroed 128-byte buffer for the
/// peer address. Awaiting this function only records the operation.
pub async fn accept<'a>(&'a self, fd: Fd) -> Accept<'a> {
    let record = Op {
        waiter: Waiter::default(),
        op: Type::Accept,
        fd,
        buffer: NonNull::dangling(),
        len: 0,
        offset: 0,
        result: None,
        link_next: None,
    };
    let slot = self.0.ops.lock(Waiter::default()).await.insert(record);
    Accept {
        ring: self,
        idx: Some(slot),
        submitted: false,
        lock_future: None,
        guard: None,
        addr_buf: [0u8; 128],
    }
}
/// Registers a connect of socket `fd` to `addr`.
///
/// The address is moved into the returned [`Connect`] future; the op record
/// stores `size_of::<SocketAddrStorage>()` in its `len` slot. Awaiting this
/// function only records the operation.
pub async fn connect<'a>(&'a self, fd: Fd, addr: SocketAddrStorage) -> Connect<'a> {
    let record = Op {
        waiter: Waiter::default(),
        op: Type::Connect,
        fd,
        buffer: NonNull::dangling(),
        len: mem::size_of::<SocketAddrStorage>(),
        offset: 0,
        result: None,
        link_next: None,
    };
    let slot = self.0.ops.lock(Waiter::default()).await.insert(record);
    Connect {
        ring: self,
        idx: Some(slot),
        submitted: false,
        lock_future: None,
        guard: None,
        addr,
    }
}
/// Registers creation of a socket.
///
/// `domain` is carried in the op record's `len` slot and `sock_type` in its
/// `offset` slot; `protocol` is kept on the returned [`Socket`] future. The
/// record's `fd` is a placeholder `Fd(0)`.
pub async fn socket<'a>(&'a self, domain: i32, sock_type: i32, protocol: i32) -> Socket<'a> {
    let record = Op {
        waiter: Waiter::default(),
        op: Type::Socket,
        fd: Fd(0),
        buffer: NonNull::dangling(),
        len: domain as usize,
        offset: sock_type as u64,
        result: None,
        link_next: None,
    };
    let slot = self.0.ops.lock(Waiter::default()).await.insert(record);
    Socket {
        ring: self,
        idx: Some(slot),
        submitted: false,
        lock_future: None,
        guard: None,
        protocol,
    }
}
/// Registers a close of `fd`. Awaiting this function only records the
/// operation; await the returned [`Close`] for the result.
pub async fn close<'a>(&'a self, fd: Fd) -> Close<'a> {
    let record = Op {
        waiter: Waiter::default(),
        op: Type::Close,
        fd,
        buffer: NonNull::dangling(),
        len: 0,
        offset: 0,
        result: None,
        link_next: None,
    };
    let slot = self.0.ops.lock(Waiter::default()).await.insert(record);
    Close {
        ring: self,
        idx: Some(slot),
        submitted: false,
        lock_future: None,
        guard: None,
    }
}
/// Registers a `recvmsg` on `fd`; the op record keeps a pointer to `msg`.
///
/// Awaiting this function only records the operation; await the returned
/// [`RecvMsg`] for the result.
pub async fn recvmsg<'a>(&'a self, fd: Fd, msg: &'a mut MsgHdr) -> RecvMsg<'a> {
    // A reference is never null, so the header pointer can be built directly.
    let header = NonNull::from(msg).cast::<u8>();
    let record = Op {
        waiter: Waiter::default(),
        op: Type::RecvMsg,
        fd,
        buffer: header,
        len: 0,
        offset: 0,
        result: None,
        link_next: None,
    };
    let slot = self.0.ops.lock(Waiter::default()).await.insert(record);
    RecvMsg {
        ring: self,
        idx: Some(slot),
        submitted: false,
        lock_future: None,
        guard: None,
    }
}
/// Registers a `sendmsg` on `fd`; the op record keeps a pointer to `msg`
/// and carries `flags` in its `len` slot.
///
/// Awaiting this function only records the operation; await the returned
/// [`SendMsg`] for the result.
pub async fn sendmsg<'a>(&'a self, fd: Fd, msg: &'a MsgHdr, flags: i32) -> SendMsg<'a> {
    // A reference is never null, so the header pointer can be built directly.
    let header = NonNull::from(msg).cast::<u8>();
    let record = Op {
        waiter: Waiter::default(),
        op: Type::SendMsg,
        fd,
        buffer: header,
        len: flags as usize,
        offset: 0,
        result: None,
        link_next: None,
    };
    let slot = self.0.ops.lock(Waiter::default()).await.insert(record);
    SendMsg {
        ring: self,
        idx: Some(slot),
        submitted: false,
        lock_future: None,
        guard: None,
    }
}
/// Reports whether bit 0 of the SQ ring flags word is set.
///
/// Presumably this is the kernel's "SQ poll thread needs wakeup" bit —
/// confirm against the `ffi` definitions.
pub async fn sq_needs_wakeup(&self) -> bool {
    const BIT0: u32 = 1 << 0;
    let word = unsafe { (*self.0.sq.flags).load(Acquire) };
    (word & BIT0) != 0
}
/// Returns the ring's file descriptor as a plain `i32`.
pub fn as_raw_fd(&self) -> i32 {
    self.0.ring.0
}
/// Registers an `openat` of `path` relative to `dfd`.
///
/// The op record's `offset` slot packs `flags` into the upper 32 bits and
/// `mode` into the lower 32 bits; `len` holds `path.len()`.
///
/// NOTE(review): `path` is a byte slice with no terminator requirement
/// enforced here — confirm callers pass NUL-terminated bytes if the
/// submission side expects a C string.
pub async fn openat<'a>(
    &'a self,
    dfd: i32,
    path: &'a [u8],
    flags: i32,
    mode: u32,
) -> Openat<'a> {
    let packed = ((flags as u64) << 32) | (mode as u64);
    let record = Op {
        waiter: Waiter::default(),
        op: Type::Openat,
        fd: Fd(dfd),
        buffer: NonNull::new(path.as_ptr().cast_mut()).unwrap(),
        len: path.len(),
        offset: packed,
        result: None,
        link_next: None,
    };
    let slot = self.0.ops.lock(Waiter::default()).await.insert(record);
    Openat {
        ring: self,
        idx: Some(slot),
        submitted: false,
        lock_future: None,
        guard: None,
    }
}
/// Registers a `statx` of `path` relative to `dfd`, writing into `statx_buf`.
///
/// The op record's `offset` slot packs `flags` into the upper 32 bits and
/// `mask` into the lower 32 bits; the output buffer reference is kept on
/// the returned [`StatxOp`] future.
pub async fn statx<'a>(
    &'a self,
    dfd: i32,
    path: &'a [u8],
    flags: i32,
    mask: u32,
    statx_buf: &'a mut Statx,
) -> StatxOp<'a> {
    let packed = ((flags as u64) << 32) | (mask as u64);
    let record = Op {
        waiter: Waiter::default(),
        op: Type::Statx,
        fd: Fd(dfd),
        buffer: NonNull::new(path.as_ptr().cast_mut()).unwrap(),
        len: path.len(),
        offset: packed,
        result: None,
        link_next: None,
    };
    let slot = self.0.ops.lock(Waiter::default()).await.insert(record);
    StatxOp {
        ring: self,
        idx: Some(slot),
        submitted: false,
        lock_future: None,
        guard: None,
        statx_buf,
    }
}
/// Registers a `mkdirat` of `path` relative to `dfd`; `mode` is carried in
/// the op record's `offset` slot.
pub async fn mkdirat<'a>(&'a self, dfd: i32, path: &'a [u8], mode: u32) -> Mkdirat<'a> {
    let record = Op {
        waiter: Waiter::default(),
        op: Type::Mkdirat,
        fd: Fd(dfd),
        buffer: NonNull::new(path.as_ptr().cast_mut()).unwrap(),
        len: path.len(),
        offset: u64::from(mode),
        result: None,
        link_next: None,
    };
    let slot = self.0.ops.lock(Waiter::default()).await.insert(record);
    Mkdirat {
        ring: self,
        idx: Some(slot),
        submitted: false,
        lock_future: None,
        guard: None,
    }
}
/// Registers an `unlinkat` of `path` relative to `dfd`; `flags` is carried
/// (sign-extended) in the op record's `offset` slot.
pub async fn unlinkat<'a>(&'a self, dfd: i32, path: &'a [u8], flags: i32) -> Unlinkat<'a> {
    let record = Op {
        waiter: Waiter::default(),
        op: Type::Unlinkat,
        fd: Fd(dfd),
        buffer: NonNull::new(path.as_ptr().cast_mut()).unwrap(),
        len: path.len(),
        offset: flags as u64,
        result: None,
        link_next: None,
    };
    let slot = self.0.ops.lock(Waiter::default()).await.insert(record);
    Unlinkat {
        ring: self,
        idx: Some(slot),
        submitted: false,
        lock_future: None,
        guard: None,
    }
}
/// Registers a `renameat` from `oldpath` (relative to `olddfd`) to
/// `newpath` (relative to `newdfd`).
///
/// The old path and `olddfd` go into the op record, `newdfd` is carried
/// (sign-extended) in its `offset` slot, and `newpath` is kept on the
/// returned [`Renameat`] future.
pub async fn renameat<'a>(
    &'a self,
    olddfd: i32,
    oldpath: &'a [u8],
    newdfd: i32,
    newpath: &'a [u8],
) -> Renameat<'a> {
    let record = Op {
        waiter: Waiter::default(),
        op: Type::Renameat,
        fd: Fd(olddfd),
        buffer: NonNull::new(oldpath.as_ptr().cast_mut()).unwrap(),
        len: oldpath.len(),
        offset: newdfd as u64,
        result: None,
        link_next: None,
    };
    let slot = self.0.ops.lock(Waiter::default()).await.insert(record);
    Renameat {
        ring: self,
        idx: Some(slot),
        submitted: false,
        lock_future: None,
        guard: None,
        newpath,
    }
}
}
struct SubmitFuture<'a> {
ring: &'a IoUring,
wait_for: Option<u32>,
waiter: Option<Waiter>,
}
impl<'a> Future for SubmitFuture<'a> {
type Output = Result<u32>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { wait_for, .. } = &*self;
let Inner {
ring,
sq,
cq,
ops,
submits,
completions,
} = &*self.ring.0;
let to_submit = sq.pending.load(Relaxed);
if to_submit == 0 && wait_for.is_none() {
return Poll::Ready(Ok(0));
}
let flags = if wait_for.is_some() {
ffi::Enter::GetEvents.as_u32()
} else {
0
};
let ret = unsafe {
sys::io_uring_enter(
**ring,
to_submit,
wait_for.unwrap_or_default(),
flags,
ptr::null(),
0,
)
};
match ret {
Ok(submitted) => {
sq.pending.fetch_sub(submitted, Relaxed);
Poll::Ready(Ok(submitted))
}
Err(e) if e.errno() == Errno::Again => {
let mut waiter = Waiter::default();
waiter.assign_waker(cx.waker().clone());
unsafe { sq.submitters.enqueue(waiter) };
Poll::Pending
}
Err(e) => Poll::Ready(Err(e.into())),
}
}
}
/// Future behind `wait_cqe`: resolves to the next queued completion.
struct CqeFuture<'a> {
    /// Ring to take completions from.
    ring: &'a IoUring,
    /// Never assigned by `poll`, so the "already parked" check below is
    /// always true.
    /// NOTE(review): as a result a fresh waiter is queued on every pending
    /// poll; storing the parked waiter here would avoid that but needs to
    /// know how `Waiters` hands out ownership.
    waiter: Option<Pin<Box<Waiter>>>,
}

impl<'a> Future for CqeFuture<'a> {
    type Output = Result<Complete>;

    /// Returns a queued completion if one exists; otherwise reaps the CQ
    /// ring once, parks on the CQ waiters list and flushes pending SQEs.
    ///
    /// `reap_completions` and `submit` are `async fn`s: calling them only
    /// builds a future. They are therefore pinned on the stack and polled
    /// once here — without that, neither call does any work.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let ring = self.ring;

        if let Some(complete) = unsafe { ring.0.completions.dequeue() } {
            return Poll::Ready(Ok(complete));
        }

        {
            // One poll is enough when the slab lock is uncontended; if it is
            // contended the reap is abandoned and retried on the next poll.
            let reap = core::pin::pin!(ring.reap_completions());
            let _ = reap.poll(cx);
        }
        if let Some(complete) = unsafe { ring.0.completions.dequeue() } {
            return Poll::Ready(Ok(complete));
        }

        if self.waiter.is_none() {
            let mut waiter = Waiter::default();
            waiter.assign_waker(cx.waker().clone());
            unsafe { ring.0.cq.waiters.enqueue(waiter) };
        }

        if ring.sq_ready() > 0 {
            // Best effort, as before: a submission error is ignored here and
            // will surface on the caller's own `submit`.
            let flush = core::pin::pin!(ring.submit());
            let _ = flush.poll(cx);
        }
        Poll::Pending
    }
}
/// In-flight read; resolves to the number of bytes read.
pub struct Read<'a> {
    /// Ring the operation runs on.
    ring: &'a IoUring,
    /// Slab slot of the op record; `None` once the result has been taken.
    idx: Option<SlabIndex>,
    /// Whether the SQE has been published.
    submitted: bool,
    /// Pending acquisition of the slab lock, kept across polls.
    lock_future: Option<Pin<Box<dyn Future<Output = Guard<'a, Mutex<Slab<Op>>>> + 'a + Send>>>,
    /// Slab lock guard while held.
    guard: Option<Guard<'a, Mutex<Slab<Op>>>>,
}
impl<'a> Future for Read<'a> {
    type Output = Result<usize>;

    /// Publishes the read SQE on the first successful poll, then resolves
    /// once the op record has been signalled with a result.
    ///
    /// Resolves to `Err(IoUringError::System(errno))` for a negative result
    /// and to `Err(IoUringError::InvalidDescriptor)` when polled after the
    /// result has already been taken.
    ///
    /// This only places the SQE in the ring; something must still call
    /// `submit` for the kernel to see it.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let ring = self.ring;
        let Some(idx) = self.idx else {
            return Poll::Ready(Err(IoUringError::InvalidDescriptor));
        };

        // Take the slab lock once for the whole poll.
        let mut guard = match self.guard.take() {
            Some(guard) => guard,
            None => {
                if self.lock_future.is_none() {
                    self.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
                }
                match Pin::new(self.lock_future.as_mut().unwrap()).poll(cx) {
                    Poll::Ready(guard) => {
                        self.lock_future = None;
                        guard
                    }
                    Poll::Pending => return Poll::Pending,
                }
            }
        };

        if !self.submitted {
            let Some((sqe, slot)) = ring.get_sqe() else {
                // SQ is full. Nothing else knows this task is waiting for a
                // free entry, so request another poll instead of parking
                // without a wake-up.
                drop(guard);
                cx.waker().wake_by_ref();
                return Poll::Pending;
            };
            let op = guard.get(idx).unwrap();
            unsafe {
                (*sqe).opcode = ffi::OpCode::Read as u8;
                (*sqe).flags = 0;
                (*sqe).ioprio = 0;
                (*sqe).fd = op.fd.0;
                (*sqe).off = op.offset;
                (*sqe).addr = op.buffer.as_ptr() as u64;
                // Clamp rather than truncate: a short read is valid, a
                // wrapped-to-zero length would look like end of file.
                (*sqe).len = op.len.min(u32::MAX as usize) as u32;
                (*sqe).rw_flags = 0;
                (*sqe).user_data = idx as u64;
                (*sqe).buf_index = 0;
                (*sqe).personality = 0;
                (*sqe).splice_fd_in = 0;
                (*sqe).__pad2 = [0; 2];
                *ring.0.sq.array.as_ptr().add(slot as usize) = slot;
            }
            ring.sq_advance();
            self.submitted = true;
            guard
                .get_mut(idx)
                .unwrap()
                .waiter
                .assign_waker(cx.waker().clone());
        }

        let op = guard.get(idx).unwrap();
        if !op.waiter.signaled() {
            return Poll::Pending;
        }
        let Some(result) = op.result else {
            return Poll::Pending;
        };

        guard.remove(idx);
        drop(guard);
        self.idx = None;
        if result < 0 {
            Poll::Ready(Err(IoUringError::System(-result)))
        } else {
            Poll::Ready(Ok(result as usize))
        }
    }
}
impl<'a> Drop for Read<'a> {
    /// Frees the slab slot of a read that was never submitted.
    ///
    /// A submitted but unfinished read keeps its slot: the completion will
    /// still arrive carrying that index.
    fn drop(&mut self) {
        let Some(idx) = self.idx else { return };
        if self.submitted {
            return;
        }
        // The lock is async, so removal is handed to a spawned local task.
        let inner = self.ring.0.clone();
        crate::runtime::spawn_local(async move {
            inner.ops.lock(Waiter::default()).await.remove(idx);
        });
    }
}
/// In-flight write; resolves to the number of bytes written.
pub struct Write<'a> {
    /// Ring the operation runs on.
    ring: &'a IoUring,
    /// Slab slot of the op record; `None` once the result has been taken.
    idx: Option<SlabIndex>,
    /// Whether the SQE has been published.
    submitted: bool,
    /// Pending acquisition of the slab lock, kept across polls.
    lock_future: Option<Pin<Box<dyn Future<Output = Guard<'a, Mutex<Slab<Op>>>> + 'a + Send>>>,
    /// Slab lock guard while held.
    guard: Option<Guard<'a, Mutex<Slab<Op>>>>,
}
impl<'a> Future for Write<'a> {
    type Output = Result<usize>;

    /// Publishes the write SQE on the first successful poll, then resolves
    /// once the op record has been signalled with a result.
    ///
    /// Resolves to `Err(IoUringError::System(errno))` for a negative result
    /// and to `Err(IoUringError::InvalidDescriptor)` when polled after the
    /// result has already been taken.
    ///
    /// This only places the SQE in the ring; something must still call
    /// `submit` for the kernel to see it.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let ring = self.ring;
        let Some(idx) = self.idx else {
            return Poll::Ready(Err(IoUringError::InvalidDescriptor));
        };

        // Take the slab lock once for the whole poll.
        let mut guard = match self.guard.take() {
            Some(guard) => guard,
            None => {
                if self.lock_future.is_none() {
                    self.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
                }
                match Pin::new(self.lock_future.as_mut().unwrap()).poll(cx) {
                    Poll::Ready(guard) => {
                        self.lock_future = None;
                        guard
                    }
                    Poll::Pending => return Poll::Pending,
                }
            }
        };

        if !self.submitted {
            let Some((sqe, slot)) = ring.get_sqe() else {
                // SQ is full. Nothing else knows this task is waiting for a
                // free entry, so request another poll instead of parking
                // without a wake-up.
                drop(guard);
                cx.waker().wake_by_ref();
                return Poll::Pending;
            };
            let op = guard.get(idx).unwrap();
            unsafe {
                (*sqe).opcode = ffi::OpCode::Write as u8;
                (*sqe).flags = 0;
                (*sqe).ioprio = 0;
                (*sqe).fd = op.fd.0;
                (*sqe).off = op.offset;
                (*sqe).addr = op.buffer.as_ptr() as u64;
                // Clamp rather than truncate: a short write is valid, a
                // wrapped-to-zero length would silently write nothing.
                (*sqe).len = op.len.min(u32::MAX as usize) as u32;
                (*sqe).rw_flags = 0;
                (*sqe).user_data = idx as u64;
                (*sqe).buf_index = 0;
                (*sqe).personality = 0;
                (*sqe).splice_fd_in = 0;
                (*sqe).__pad2 = [0; 2];
                *ring.0.sq.array.as_ptr().add(slot as usize) = slot;
            }
            ring.sq_advance();
            self.submitted = true;
            guard
                .get_mut(idx)
                .unwrap()
                .waiter
                .assign_waker(cx.waker().clone());
        }

        let op = guard.get(idx).unwrap();
        if !op.waiter.signaled() {
            return Poll::Pending;
        }
        let Some(result) = op.result else {
            return Poll::Pending;
        };

        guard.remove(idx);
        drop(guard);
        self.idx = None;
        if result < 0 {
            Poll::Ready(Err(IoUringError::System(-result)))
        } else {
            Poll::Ready(Ok(result as usize))
        }
    }
}
impl<'a> Drop for Write<'a> {
    /// Frees the slab slot of a write that was never submitted; a submitted
    /// write keeps its slot because its completion will still arrive.
    fn drop(&mut self) {
        if let (Some(idx), false) = (self.idx, self.submitted) {
            // The lock is async, so removal is handed to a spawned local task.
            let inner = self.ring.0.clone();
            crate::runtime::spawn_local(async move {
                inner.ops.lock(Waiter::default()).await.remove(idx);
            });
        }
    }
}
/// Future behind `register_files`.
struct RegisterFilesFuture<'a> {
    /// Ring to register on.
    ring: &'a IoUring,
    /// Descriptors to register; owned so the pointer stays valid across polls.
    fds: Vec<Fd>,
    /// Set once the syscall has been attempted at least once.
    started: bool,
}

impl<'a> Future for RegisterFilesFuture<'a> {
    type Output = Result<()>;

    /// Issues `io_uring_register` for the file set.
    ///
    /// On `EAGAIN` the task is re-woken and the syscall is retried on the
    /// next poll. Previously a single `EAGAIN` left the future waking itself
    /// forever without ever retrying, so it could never complete.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let ret = unsafe {
            sys::io_uring_register(
                self.ring.as_raw_fd(),
                ffi::Register::Files as u32,
                self.fds.as_ptr() as *const _,
                self.fds.len() as u32,
            )
        };
        self.started = true;
        match ret {
            Ok(_) => Poll::Ready(Ok(())),
            Err(e) if e.errno() == Errno::Again => {
                // Yield and try again.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Err(e) => Poll::Ready(Err(e.into())),
        }
    }
}
/// Future behind `unregister_files`.
struct UnregisterFilesFuture<'a> {
    /// Ring to unregister from.
    ring: &'a IoUring,
    /// Set once the syscall has been attempted at least once.
    started: bool,
}

impl<'a> Future for UnregisterFilesFuture<'a> {
    type Output = Result<()>;

    /// Issues `io_uring_register` to drop the registered file set.
    ///
    /// On `EAGAIN` the task is re-woken and the syscall is retried on the
    /// next poll. Previously a single `EAGAIN` left the future waking itself
    /// forever without ever retrying, so it could never complete.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let ret = unsafe {
            sys::io_uring_register(
                self.ring.as_raw_fd(),
                ffi::Register::UnregisterFiles as u32,
                ptr::null(),
                0,
            )
        };
        self.started = true;
        match ret {
            Ok(_) => Poll::Ready(Ok(())),
            Err(e) if e.errno() == Errno::Again => {
                // Yield and try again.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Err(e) => Poll::Ready(Err(e.into())),
        }
    }
}
/// Future behind `register_buffers`.
struct RegisterBuffersFuture<'a> {
    /// Ring to register on.
    ring: &'a IoUring,
    /// Buffers to register; owned by the future.
    buffers: Vec<Vec<u8>>,
    /// Set once the syscall has been attempted at least once.
    started: bool,
}

impl<'a> Future for RegisterBuffersFuture<'a> {
    type Output = Result<()>;

    /// Builds an `IoVec` per buffer and issues `io_uring_register`.
    ///
    /// On `EAGAIN` the task is re-woken and the syscall is retried on the
    /// next poll. Previously a single `EAGAIN` left the future waking itself
    /// forever without ever retrying, so it could never complete.
    ///
    /// NOTE(review): `buffers` is dropped together with this future —
    /// confirm the registration does not need the memory afterwards.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Rebuilt on every attempt; the vector only has to outlive the call.
        let iovecs: Vec<IoVec> = self.buffers.iter_mut().map(IoVec::from).collect();
        let ret = unsafe {
            sys::io_uring_register(
                self.ring.as_raw_fd(),
                ffi::Register::Buffers as u32,
                iovecs.as_ptr() as *const _,
                iovecs.len() as u32,
            )
        };
        self.started = true;
        match ret {
            Ok(_) => Poll::Ready(Ok(())),
            Err(e) if e.errno() == Errno::Again => {
                // Yield and try again.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Err(e) => Poll::Ready(Err(e.into())),
        }
    }
}
/// In-flight fsync; resolves to `()` on success.
pub struct Fsync<'a> {
    /// Ring the operation runs on.
    ring: &'a IoUring,
    /// Slab slot of the op record; `None` once the result has been taken.
    idx: Option<SlabIndex>,
    /// Whether the SQE has been published.
    submitted: bool,
    /// Pending acquisition of the slab lock, kept across polls.
    lock_future: Option<Pin<Box<dyn Future<Output = Guard<'a, Mutex<Slab<Op>>>> + 'a + Send>>>,
    /// Slab lock guard while held.
    guard: Option<Guard<'a, Mutex<Slab<Op>>>>,
}
impl<'a> Future for Fsync<'a> {
    type Output = Result<()>;

    /// Publishes the fsync SQE on the first successful poll, then resolves
    /// once the op record has been signalled with a result.
    ///
    /// Resolves to `Err(IoUringError::System(errno))` for a negative result
    /// and to `Err(IoUringError::InvalidDescriptor)` when polled after the
    /// result has already been taken.
    ///
    /// This only places the SQE in the ring; something must still call
    /// `submit` for the kernel to see it.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let ring = self.ring;
        let Some(idx) = self.idx else {
            return Poll::Ready(Err(IoUringError::InvalidDescriptor));
        };

        // Take the slab lock once for the whole poll.
        let mut guard = match self.guard.take() {
            Some(guard) => guard,
            None => {
                if self.lock_future.is_none() {
                    self.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
                }
                match Pin::new(self.lock_future.as_mut().unwrap()).poll(cx) {
                    Poll::Ready(guard) => {
                        self.lock_future = None;
                        guard
                    }
                    Poll::Pending => return Poll::Pending,
                }
            }
        };

        if !self.submitted {
            let Some((sqe, slot)) = ring.get_sqe() else {
                // SQ is full. Nothing else knows this task is waiting for a
                // free entry, so request another poll instead of parking
                // without a wake-up.
                drop(guard);
                cx.waker().wake_by_ref();
                return Poll::Pending;
            };
            let op = guard.get(idx).unwrap();
            unsafe {
                (*sqe).opcode = ffi::OpCode::Fsync as u8;
                (*sqe).flags = 0;
                (*sqe).ioprio = 0;
                (*sqe).fd = op.fd.0;
                (*sqe).off = 0;
                (*sqe).addr = 0;
                // For fsync the kernel treats `len` as the byte range to
                // sync and reads the fsync flags from the flags union, so
                // the caller's flags (stored in `op.len` by `fsync`) go
                // there and `len` stays 0 to cover the whole file.
                // NOTE(review): assumes `rw_flags` is this crate's name for
                // the union that also holds the kernel's `fsync_flags`.
                (*sqe).len = 0;
                (*sqe).rw_flags = op.len as _;
                (*sqe).user_data = idx as u64;
                (*sqe).buf_index = 0;
                (*sqe).personality = 0;
                (*sqe).splice_fd_in = 0;
                (*sqe).__pad2 = [0; 2];
                *ring.0.sq.array.as_ptr().add(slot as usize) = slot;
            }
            ring.sq_advance();
            self.submitted = true;
            guard
                .get_mut(idx)
                .unwrap()
                .waiter
                .assign_waker(cx.waker().clone());
        }

        let op = guard.get(idx).unwrap();
        if !op.waiter.signaled() {
            return Poll::Pending;
        }
        let Some(result) = op.result else {
            return Poll::Pending;
        };

        guard.remove(idx);
        drop(guard);
        self.idx = None;
        if result < 0 {
            Poll::Ready(Err(IoUringError::System(-result)))
        } else {
            Poll::Ready(Ok(()))
        }
    }
}
impl<'a> Drop for Fsync<'a> {
    /// Frees the slab slot of an fsync that was never submitted; a submitted
    /// one keeps its slot because its completion will still arrive.
    fn drop(&mut self) {
        let never_submitted = !self.submitted;
        match self.idx {
            Some(idx) if never_submitted => {
                // The lock is async, so removal runs in a spawned local task.
                let inner = self.ring.0.clone();
                crate::runtime::spawn_local(async move {
                    inner.ops.lock(Waiter::default()).await.remove(idx);
                });
            }
            _ => {}
        }
    }
}
/// In-flight accept; resolves to the new descriptor and the peer address.
pub struct Accept<'a> {
    /// Ring the operation runs on.
    ring: &'a IoUring,
    /// Slab slot of the op record; `None` once the result has been taken.
    idx: Option<SlabIndex>,
    /// Whether the SQE has been published.
    submitted: bool,
    /// Pending acquisition of the slab lock, kept across polls.
    lock_future: Option<Pin<Box<dyn Future<Output = Guard<'a, Mutex<Slab<Op>>>> + 'a + Send>>>,
    /// Slab lock guard while held.
    guard: Option<Guard<'a, Mutex<Slab<Op>>>>,
    /// Storage whose address is written into the SQE for the peer address.
    addr_buf: [u8; 128],
}
impl<'a> Future for Accept<'a> {
type Output = Result<(Fd, SocketAddrStorage)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if !self.submitted {
if let Some(idx) = self.idx {
if self.guard.is_none() {
if self.lock_future.is_none() {
self.lock_future = Some(Box::pin(self.ring.0.ops.lock(Waiter::default())));
}
match Pin::new(self.lock_future.as_mut().unwrap()).poll(cx) {
Poll::Ready(guard) => {
self.guard = Some(guard);
self.lock_future = None;
}
Poll::Pending => return Poll::Pending,
}
}
let guard = self.guard.as_ref().unwrap();
if let Some((sqe, array_idx)) = self.ring.get_sqe() {
let op = guard.get(idx).unwrap();
unsafe {
(*sqe).opcode = ffi::OpCode::Accept as u8;
(*sqe).flags = 0;
(*sqe).ioprio = 0;
(*sqe).fd = op.fd.0;
(*sqe).off = 0;
(*sqe).addr = self.addr_buf.as_mut_ptr() as u64;
(*sqe).len = self.addr_buf.len() as u32;
(*sqe).rw_flags = 0;
(*sqe).user_data = idx as u64;
(*sqe).buf_index = 0;
(*sqe).personality = 0;
(*sqe).splice_fd_in = 0;
(*sqe).__pad2 = [0; 2];
*self.ring.0.sq.array.as_ptr().add(array_idx as usize) = array_idx;
}
self.ring.sq_advance();
self.submitted = true;
self.guard
.as_mut()
.unwrap()
.get_mut(idx)
.unwrap()
.waiter
.assign_waker(cx.waker().clone());
self.guard = None;
} else {
self.guard = None;
return Poll::Pending;
}
}
}
if let Some(idx) = self.idx {
if self.guard.is_none() {
if self.lock_future.is_none() {
self.lock_future = Some(Box::pin(self.ring.0.ops.lock(Waiter::default())));
}
match Pin::new(self.lock_future.as_mut().unwrap()).poll(cx) {
Poll::Ready(guard) => {
self.guard = Some(guard);
self.lock_future = None;
}
Poll::Pending => return Poll::Pending,
}
}
let guard = self.guard.as_mut().unwrap();
let op = guard.get(idx).unwrap();
if op.waiter.signaled() {
if let Some(result) = op.result {
guard.remove(idx);
self.idx = None;
self.guard = None;
if result < 0 {
Poll::Ready(Err(IoUringError::System(-result)))
} else {
let new_fd = Fd(result);
let addr = unsafe {
ptr::read(self.addr_buf.as_ptr() as *const SocketAddrStorage)
};
Poll::Ready(Ok((new_fd, addr)))
}
} else {
self.guard = None;
Poll::Pending
}
} else {
self.guard = None;
Poll::Pending
}
} else {
Poll::Ready(Err(IoUringError::InvalidDescriptor))
}
}
}
impl<'a> Drop for Accept<'a> {
    /// Frees the slab slot of an accept that was never submitted; a
    /// submitted one keeps its slot because its completion will still arrive.
    fn drop(&mut self) {
        let Some(idx) = self.idx else { return };
        if self.submitted {
            return;
        }
        // The lock is async, so removal is handed to a spawned local task.
        let inner = self.ring.0.clone();
        crate::runtime::spawn_local(async move {
            inner.ops.lock(Waiter::default()).await.remove(idx);
        });
    }
}
/// In-flight connect; resolves to `()` on success.
pub struct Connect<'a> {
    /// Ring the operation runs on.
    ring: &'a IoUring,
    /// Slab slot of the op record; `None` once the result has been taken.
    idx: Option<SlabIndex>,
    /// Whether the SQE has been published.
    submitted: bool,
    /// Pending acquisition of the slab lock, kept across polls.
    lock_future: Option<Pin<Box<dyn Future<Output = Guard<'a, Mutex<Slab<Op>>>> + 'a + Send>>>,
    /// Slab lock guard while held.
    guard: Option<Guard<'a, Mutex<Slab<Op>>>>,
    /// Destination address; its location is written into the SQE.
    addr: SocketAddrStorage,
}
impl<'a> Future for Connect<'a> {
    type Output = Result<()>;

    /// Publishes the connect SQE on the first successful poll, then resolves
    /// once the op record has been signalled with a result.
    ///
    /// Resolves to `Err(IoUringError::System(errno))` for a negative result
    /// and to `Err(IoUringError::InvalidDescriptor)` when polled after the
    /// result has already been taken.
    ///
    /// NOTE(review): the SQE stores the address of the `addr` field of this
    /// future. The future is not pinned against moves, so it must not be
    /// moved between submission and completion.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let ring = self.ring;
        let Some(idx) = self.idx else {
            return Poll::Ready(Err(IoUringError::InvalidDescriptor));
        };

        // Take the slab lock once for the whole poll.
        let mut guard = match self.guard.take() {
            Some(guard) => guard,
            None => {
                if self.lock_future.is_none() {
                    self.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
                }
                match Pin::new(self.lock_future.as_mut().unwrap()).poll(cx) {
                    Poll::Ready(guard) => {
                        self.lock_future = None;
                        guard
                    }
                    Poll::Pending => return Poll::Pending,
                }
            }
        };

        if !self.submitted {
            let Some((sqe, slot)) = ring.get_sqe() else {
                // SQ is full. Nothing else knows this task is waiting for a
                // free entry, so request another poll instead of parking
                // without a wake-up.
                drop(guard);
                cx.waker().wake_by_ref();
                return Poll::Pending;
            };
            let op = guard.get(idx).unwrap();
            let addr_ptr = &self.addr as *const _ as u64;
            unsafe {
                (*sqe).opcode = ffi::OpCode::Connect as u8;
                (*sqe).flags = 0;
                (*sqe).ioprio = 0;
                (*sqe).fd = op.fd.0;
                // For connect the kernel reads the address length from the
                // `off` field and rejects a non-zero `len`. `op.len` holds
                // `size_of::<SocketAddrStorage>()`, stored by `connect`.
                (*sqe).off = op.len as u64;
                (*sqe).addr = addr_ptr;
                (*sqe).len = 0;
                (*sqe).rw_flags = 0;
                (*sqe).user_data = idx as u64;
                (*sqe).buf_index = 0;
                (*sqe).personality = 0;
                (*sqe).splice_fd_in = 0;
                (*sqe).__pad2 = [0; 2];
                *ring.0.sq.array.as_ptr().add(slot as usize) = slot;
            }
            ring.sq_advance();
            self.submitted = true;
            guard
                .get_mut(idx)
                .unwrap()
                .waiter
                .assign_waker(cx.waker().clone());
        }

        let op = guard.get(idx).unwrap();
        if !op.waiter.signaled() {
            return Poll::Pending;
        }
        let Some(result) = op.result else {
            return Poll::Pending;
        };

        guard.remove(idx);
        drop(guard);
        self.idx = None;
        if result < 0 {
            Poll::Ready(Err(IoUringError::System(-result)))
        } else {
            Poll::Ready(Ok(()))
        }
    }
}
impl<'a> Drop for Connect<'a> {
    /// Frees the slab slot of a connect that was never submitted; a
    /// submitted one keeps its slot because its completion will still arrive.
    fn drop(&mut self) {
        if let (Some(idx), false) = (self.idx, self.submitted) {
            // The lock is async, so removal is handed to a spawned local task.
            let inner = self.ring.0.clone();
            crate::runtime::spawn_local(async move {
                inner.ops.lock(Waiter::default()).await.remove(idx);
            });
        }
    }
}
/// Future for an `ffi::OpCode::Socket` request; resolves to the new [`Fd`].
pub struct Socket<'a> {
/// Ring the request is queued on.
ring: &'a IoUring,
/// Slot of this request in the ring's `ops` slab; cleared once the result is taken.
idx: Option<SlabIndex>,
/// Set after the SQE has been written and the submission queue advanced.
submitted: bool,
/// In-flight acquisition of the `ops` lock, kept across polls while it is pending.
lock_future: Option<Pin<Box<dyn Future<Output = Guard<'a, Mutex<Slab<Op>>>> + 'a + Send>>>,
/// Held `ops` lock; `poll` releases it again before returning.
guard: Option<Guard<'a, Mutex<Slab<Op>>>>,
/// Protocol value written into the SQE by `poll`.
protocol: i32,
}
impl<'a> Future for Socket<'a> {
    type Output = Result<Fd>;

    /// Drives the socket-creation request: first queues an SQE, then waits
    /// for the completion recorded in the ring's op slab.
    ///
    /// Resolves to `Ok(Fd(result))` for a non-negative result,
    /// `Err(IoUringError::System(-result))` for a negative one, and
    /// `Err(IoUringError::InvalidDescriptor)` when the future holds no slab index.
    ///
    /// # Panics
    ///
    /// Panics if the slab no longer contains the entry for `idx`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        let ring = this.ring;
        let idx = match this.idx {
            Some(slot) => slot,
            None => return Poll::Ready(Err(IoUringError::InvalidDescriptor)),
        };

        // Phase 1: queue the request (skipped once it has been submitted).
        if !this.submitted {
            if this.guard.is_none() {
                if this.lock_future.is_none() {
                    this.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
                }
                let held = match this.lock_future.as_mut().unwrap().as_mut().poll(cx) {
                    Poll::Ready(held) => held,
                    Poll::Pending => return Poll::Pending,
                };
                this.guard = Some(held);
                this.lock_future = None;
            }
            let (sqe, array_idx) = match ring.get_sqe() {
                Some(entry) => entry,
                None => {
                    // NOTE(review): no waker is registered on this path; confirm
                    // that something re-polls this task once an SQE frees up.
                    this.guard = None;
                    return Poll::Pending;
                }
            };
            let op = this.guard.as_ref().unwrap().get(idx).unwrap();
            // Presumably `op.len` carries the domain and `op.offset` the socket
            // type — verify against the constructor.
            let (domain, sock_type) = (op.len as i32, op.offset);
            let protocol = this.protocol as u32;
            // SAFETY: relies on `get_sqe` handing out a valid SQE pointer and an
            // in-bounds index-array slot — TODO confirm against `get_sqe`.
            unsafe {
                (*sqe).opcode = ffi::OpCode::Socket as u8;
                (*sqe).flags = 0;
                (*sqe).ioprio = 0;
                (*sqe).fd = domain;
                (*sqe).off = sock_type;
                // IORING_OP_SOCKET reads the protocol from `len` and rejects a
                // non-zero `addr` with EINVAL, mirroring `io_uring_prep_socket`.
                (*sqe).addr = 0;
                (*sqe).len = protocol;
                (*sqe).rw_flags = 0;
                (*sqe).user_data = idx as u64;
                (*sqe).buf_index = 0;
                (*sqe).personality = 0;
                (*sqe).splice_fd_in = 0;
                (*sqe).__pad2 = [0; 2];
                *ring.0.sq.array.as_ptr().add(array_idx as usize) = array_idx;
            }
            ring.sq_advance();
            this.submitted = true;
            // Remember who to wake when the completion is recorded.
            this.guard
                .as_mut()
                .unwrap()
                .get_mut(idx)
                .unwrap()
                .waiter
                .assign_waker(cx.waker().clone());
            // Release the lock; phase 2 re-acquires it.
            this.guard = None;
        }

        // Phase 2: look for the completion.
        if this.guard.is_none() {
            if this.lock_future.is_none() {
                this.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
            }
            let held = match this.lock_future.as_mut().unwrap().as_mut().poll(cx) {
                Poll::Ready(held) => held,
                Poll::Pending => return Poll::Pending,
            };
            this.guard = Some(held);
            this.lock_future = None;
        }
        let guard = this.guard.as_mut().unwrap();
        let op = guard.get(idx).unwrap();
        // A result only counts once the waiter has been signaled.
        let outcome = if op.waiter.signaled() { op.result } else { None };
        match outcome {
            Some(result) => {
                guard.remove(idx);
                this.idx = None;
                this.guard = None;
                Poll::Ready(if result < 0 {
                    Err(IoUringError::System(-result))
                } else {
                    Ok(Fd(result))
                })
            }
            None => {
                this.guard = None;
                Poll::Pending
            }
        }
    }
}
impl<'a> Drop for Socket<'a> {
    /// Reclaims the slab slot of a request that was never queued.
    ///
    /// Once `submitted` is set the slot is left untouched.
    fn drop(&mut self) {
        if self.submitted {
            return;
        }
        if let Some(slot) = self.idx {
            let inner = self.ring.0.clone();
            // Removal needs the async `ops` lock, so it is handed to a spawned task.
            crate::runtime::spawn_local(async move {
                inner.ops.lock(Waiter::default()).await.remove(slot);
            });
        }
    }
}
/// Future for an `ffi::OpCode::Close` request; resolves to `()` on success.
pub struct Close<'a> {
/// Ring the request is queued on.
ring: &'a IoUring,
/// Slot of this request in the ring's `ops` slab; cleared once the result is taken.
idx: Option<SlabIndex>,
/// Set after the SQE has been written and the submission queue advanced.
submitted: bool,
/// In-flight acquisition of the `ops` lock, kept across polls while it is pending.
lock_future: Option<Pin<Box<dyn Future<Output = Guard<'a, Mutex<Slab<Op>>>> + 'a + Send>>>,
/// Held `ops` lock; `poll` releases it again before returning.
guard: Option<Guard<'a, Mutex<Slab<Op>>>>,
}
impl<'a> Future for Close<'a> {
    type Output = Result<()>;

    /// Drives the close request: first queues an SQE, then waits for the
    /// completion recorded in the ring's op slab.
    ///
    /// Resolves to `Ok(())` for a non-negative result,
    /// `Err(IoUringError::System(-result))` for a negative one, and
    /// `Err(IoUringError::InvalidDescriptor)` when the future holds no slab index.
    ///
    /// # Panics
    ///
    /// Panics if the slab no longer contains the entry for `idx`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        let ring = this.ring;
        let idx = match this.idx {
            Some(slot) => slot,
            None => return Poll::Ready(Err(IoUringError::InvalidDescriptor)),
        };

        // Phase 1: queue the request (skipped once it has been submitted).
        if !this.submitted {
            if this.guard.is_none() {
                if this.lock_future.is_none() {
                    this.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
                }
                let held = match this.lock_future.as_mut().unwrap().as_mut().poll(cx) {
                    Poll::Ready(held) => held,
                    Poll::Pending => return Poll::Pending,
                };
                this.guard = Some(held);
                this.lock_future = None;
            }
            let (sqe, array_idx) = match ring.get_sqe() {
                Some(entry) => entry,
                None => {
                    // NOTE(review): no waker is registered on this path; confirm
                    // that something re-polls this task once an SQE frees up.
                    this.guard = None;
                    return Poll::Pending;
                }
            };
            let fd = this.guard.as_ref().unwrap().get(idx).unwrap().fd.0;
            // SAFETY: relies on `get_sqe` handing out a valid SQE pointer and an
            // in-bounds index-array slot — TODO confirm against `get_sqe`.
            unsafe {
                (*sqe).opcode = ffi::OpCode::Close as u8;
                (*sqe).fd = fd;
                (*sqe).user_data = idx as u64;
                // Everything else is zeroed for this opcode.
                (*sqe).flags = 0;
                (*sqe).ioprio = 0;
                (*sqe).off = 0;
                (*sqe).addr = 0;
                (*sqe).len = 0;
                (*sqe).rw_flags = 0;
                (*sqe).buf_index = 0;
                (*sqe).personality = 0;
                (*sqe).splice_fd_in = 0;
                (*sqe).__pad2 = [0; 2];
                *ring.0.sq.array.as_ptr().add(array_idx as usize) = array_idx;
            }
            ring.sq_advance();
            this.submitted = true;
            // Remember who to wake when the completion is recorded.
            this.guard
                .as_mut()
                .unwrap()
                .get_mut(idx)
                .unwrap()
                .waiter
                .assign_waker(cx.waker().clone());
            // Release the lock; phase 2 re-acquires it.
            this.guard = None;
        }

        // Phase 2: look for the completion.
        if this.guard.is_none() {
            if this.lock_future.is_none() {
                this.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
            }
            let held = match this.lock_future.as_mut().unwrap().as_mut().poll(cx) {
                Poll::Ready(held) => held,
                Poll::Pending => return Poll::Pending,
            };
            this.guard = Some(held);
            this.lock_future = None;
        }
        let guard = this.guard.as_mut().unwrap();
        let op = guard.get(idx).unwrap();
        // A result only counts once the waiter has been signaled.
        let outcome = if op.waiter.signaled() { op.result } else { None };
        match outcome {
            Some(result) => {
                guard.remove(idx);
                this.idx = None;
                this.guard = None;
                Poll::Ready(if result < 0 {
                    Err(IoUringError::System(-result))
                } else {
                    Ok(())
                })
            }
            None => {
                this.guard = None;
                Poll::Pending
            }
        }
    }
}
impl<'a> Drop for Close<'a> {
    /// Reclaims the slab slot of a request that was never queued.
    ///
    /// Once `submitted` is set the slot is left untouched.
    fn drop(&mut self) {
        if self.submitted {
            return;
        }
        if let Some(slot) = self.idx {
            let inner = self.ring.0.clone();
            // Removal needs the async `ops` lock, so it is handed to a spawned task.
            crate::runtime::spawn_local(async move {
                inner.ops.lock(Waiter::default()).await.remove(slot);
            });
        }
    }
}
/// Future for an `ffi::OpCode::Sendmsg` request; resolves to the non-negative result as `usize`.
pub struct SendMsg<'a> {
/// Ring the request is queued on.
ring: &'a IoUring,
/// Slot of this request in the ring's `ops` slab; cleared once the result is taken.
idx: Option<SlabIndex>,
/// Set after the SQE has been written and the submission queue advanced.
submitted: bool,
/// In-flight acquisition of the `ops` lock, kept across polls while it is pending.
lock_future: Option<Pin<Box<dyn Future<Output = Guard<'a, Mutex<Slab<Op>>>> + 'a + Send>>>,
/// Held `ops` lock; `poll` releases it again before returning.
guard: Option<Guard<'a, Mutex<Slab<Op>>>>,
}
impl<'a> Future for SendMsg<'a> {
    type Output = Result<usize>;

    /// Drives the sendmsg request: first queues an SQE, then waits for the
    /// completion recorded in the ring's op slab.
    ///
    /// Resolves to `Ok(result as usize)` for a non-negative result,
    /// `Err(IoUringError::System(-result))` for a negative one, and
    /// `Err(IoUringError::InvalidDescriptor)` when the future holds no slab index.
    ///
    /// # Panics
    ///
    /// Panics if the slab no longer contains the entry for `idx`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        let ring = this.ring;
        let idx = match this.idx {
            Some(slot) => slot,
            None => return Poll::Ready(Err(IoUringError::InvalidDescriptor)),
        };

        // Phase 1: queue the request (skipped once it has been submitted).
        if !this.submitted {
            if this.guard.is_none() {
                if this.lock_future.is_none() {
                    this.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
                }
                let held = match this.lock_future.as_mut().unwrap().as_mut().poll(cx) {
                    Poll::Ready(held) => held,
                    Poll::Pending => return Poll::Pending,
                };
                this.guard = Some(held);
                this.lock_future = None;
            }
            let (sqe, array_idx) = match ring.get_sqe() {
                Some(entry) => entry,
                None => {
                    // NOTE(review): no waker is registered on this path; confirm
                    // that something re-polls this task once an SQE frees up.
                    this.guard = None;
                    return Poll::Pending;
                }
            };
            let op = this.guard.as_ref().unwrap().get(idx).unwrap();
            // NOTE(review): `op.len` lands in `rw_flags` while `len` stays 0;
            // presumably `op.len` holds the message flags here — confirm
            // against the constructor and `io_uring_prep_sendmsg`.
            let (fd, msg_addr, rw_flags) = (op.fd.0, op.buffer.as_ptr() as u64, op.len as u32);
            // SAFETY: relies on `get_sqe` handing out a valid SQE pointer and an
            // in-bounds index-array slot — TODO confirm against `get_sqe`.
            unsafe {
                (*sqe).opcode = ffi::OpCode::Sendmsg as u8;
                (*sqe).fd = fd;
                (*sqe).addr = msg_addr;
                (*sqe).rw_flags = rw_flags;
                (*sqe).user_data = idx as u64;
                // Remaining fields are zeroed.
                (*sqe).flags = 0;
                (*sqe).ioprio = 0;
                (*sqe).off = 0;
                (*sqe).len = 0;
                (*sqe).buf_index = 0;
                (*sqe).personality = 0;
                (*sqe).splice_fd_in = 0;
                (*sqe).__pad2 = [0; 2];
                *ring.0.sq.array.as_ptr().add(array_idx as usize) = array_idx;
            }
            ring.sq_advance();
            this.submitted = true;
            // Remember who to wake when the completion is recorded.
            this.guard
                .as_mut()
                .unwrap()
                .get_mut(idx)
                .unwrap()
                .waiter
                .assign_waker(cx.waker().clone());
            // Release the lock; phase 2 re-acquires it.
            this.guard = None;
        }

        // Phase 2: look for the completion.
        if this.guard.is_none() {
            if this.lock_future.is_none() {
                this.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
            }
            let held = match this.lock_future.as_mut().unwrap().as_mut().poll(cx) {
                Poll::Ready(held) => held,
                Poll::Pending => return Poll::Pending,
            };
            this.guard = Some(held);
            this.lock_future = None;
        }
        let guard = this.guard.as_mut().unwrap();
        let op = guard.get(idx).unwrap();
        // A result only counts once the waiter has been signaled.
        let outcome = if op.waiter.signaled() { op.result } else { None };
        match outcome {
            Some(result) => {
                guard.remove(idx);
                this.idx = None;
                this.guard = None;
                Poll::Ready(if result < 0 {
                    Err(IoUringError::System(-result))
                } else {
                    Ok(result as usize)
                })
            }
            None => {
                this.guard = None;
                Poll::Pending
            }
        }
    }
}
impl<'a> Drop for SendMsg<'a> {
    /// Reclaims the slab slot of a request that was never queued.
    ///
    /// Once `submitted` is set the slot is left untouched.
    fn drop(&mut self) {
        if self.submitted {
            return;
        }
        if let Some(slot) = self.idx {
            let inner = self.ring.0.clone();
            // Removal needs the async `ops` lock, so it is handed to a spawned task.
            crate::runtime::spawn_local(async move {
                inner.ops.lock(Waiter::default()).await.remove(slot);
            });
        }
    }
}
/// Future for an `ffi::OpCode::Recvmsg` request; resolves to the non-negative result as `usize`.
pub struct RecvMsg<'a> {
/// Ring the request is queued on.
ring: &'a IoUring,
/// Slot of this request in the ring's `ops` slab; cleared once the result is taken.
idx: Option<SlabIndex>,
/// Set after the SQE has been written and the submission queue advanced.
submitted: bool,
/// In-flight acquisition of the `ops` lock, kept across polls while it is pending.
lock_future: Option<Pin<Box<dyn Future<Output = Guard<'a, Mutex<Slab<Op>>>> + 'a + Send>>>,
/// Held `ops` lock; `poll` releases it again before returning.
guard: Option<Guard<'a, Mutex<Slab<Op>>>>,
}
impl<'a> Future for RecvMsg<'a> {
    type Output = Result<usize>;

    /// Drives the recvmsg request: first queues an SQE, then waits for the
    /// completion recorded in the ring's op slab.
    ///
    /// Resolves to `Ok(result as usize)` for a non-negative result,
    /// `Err(IoUringError::System(-result))` for a negative one, and
    /// `Err(IoUringError::InvalidDescriptor)` when the future holds no slab index.
    ///
    /// # Panics
    ///
    /// Panics if the slab no longer contains the entry for `idx`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        let ring = this.ring;
        let idx = match this.idx {
            Some(slot) => slot,
            None => return Poll::Ready(Err(IoUringError::InvalidDescriptor)),
        };

        // Phase 1: queue the request (skipped once it has been submitted).
        if !this.submitted {
            if this.guard.is_none() {
                if this.lock_future.is_none() {
                    this.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
                }
                let held = match this.lock_future.as_mut().unwrap().as_mut().poll(cx) {
                    Poll::Ready(held) => held,
                    Poll::Pending => return Poll::Pending,
                };
                this.guard = Some(held);
                this.lock_future = None;
            }
            let (sqe, array_idx) = match ring.get_sqe() {
                Some(entry) => entry,
                None => {
                    // NOTE(review): no waker is registered on this path; confirm
                    // that something re-polls this task once an SQE frees up.
                    this.guard = None;
                    return Poll::Pending;
                }
            };
            let op = this.guard.as_ref().unwrap().get(idx).unwrap();
            let (fd, msg_addr) = (op.fd.0, op.buffer.as_ptr() as u64);
            // SAFETY: relies on `get_sqe` handing out a valid SQE pointer and an
            // in-bounds index-array slot — TODO confirm against `get_sqe`.
            unsafe {
                (*sqe).opcode = ffi::OpCode::Recvmsg as u8;
                (*sqe).fd = fd;
                (*sqe).addr = msg_addr;
                (*sqe).user_data = idx as u64;
                // Remaining fields are zeroed.
                (*sqe).flags = 0;
                (*sqe).ioprio = 0;
                (*sqe).off = 0;
                (*sqe).len = 0;
                (*sqe).rw_flags = 0;
                (*sqe).buf_index = 0;
                (*sqe).personality = 0;
                (*sqe).splice_fd_in = 0;
                (*sqe).__pad2 = [0; 2];
                *ring.0.sq.array.as_ptr().add(array_idx as usize) = array_idx;
            }
            ring.sq_advance();
            this.submitted = true;
            // Remember who to wake when the completion is recorded.
            this.guard
                .as_mut()
                .unwrap()
                .get_mut(idx)
                .unwrap()
                .waiter
                .assign_waker(cx.waker().clone());
            // Release the lock; phase 2 re-acquires it.
            this.guard = None;
        }

        // Phase 2: look for the completion.
        if this.guard.is_none() {
            if this.lock_future.is_none() {
                this.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
            }
            let held = match this.lock_future.as_mut().unwrap().as_mut().poll(cx) {
                Poll::Ready(held) => held,
                Poll::Pending => return Poll::Pending,
            };
            this.guard = Some(held);
            this.lock_future = None;
        }
        let guard = this.guard.as_mut().unwrap();
        let op = guard.get(idx).unwrap();
        // A result only counts once the waiter has been signaled.
        let outcome = if op.waiter.signaled() { op.result } else { None };
        match outcome {
            Some(result) => {
                guard.remove(idx);
                this.idx = None;
                this.guard = None;
                Poll::Ready(if result < 0 {
                    Err(IoUringError::System(-result))
                } else {
                    Ok(result as usize)
                })
            }
            None => {
                this.guard = None;
                Poll::Pending
            }
        }
    }
}
impl<'a> Drop for RecvMsg<'a> {
    /// Reclaims the slab slot of a request that was never queued.
    ///
    /// Once `submitted` is set the slot is left untouched.
    fn drop(&mut self) {
        if self.submitted {
            return;
        }
        if let Some(slot) = self.idx {
            let inner = self.ring.0.clone();
            // Removal needs the async `ops` lock, so it is handed to a spawned task.
            crate::runtime::spawn_local(async move {
                inner.ops.lock(Waiter::default()).await.remove(slot);
            });
        }
    }
}
/// C-layout record that receives the result of a statx request.
///
/// The fields add up to 256 bytes; the field names follow the Linux
/// `struct statx` (presumably the kernel writes into it directly — verify
/// against the caller).
#[repr(C)]
pub struct Statx {
/// Mask of the fields that were filled in (see the `STATX_*` constants).
pub stx_mask: u32,
pub stx_blksize: u32,
pub stx_attributes: u64,
pub stx_nlink: u32,
pub stx_uid: u32,
pub stx_gid: u32,
pub stx_mode: u16,
// Padding that keeps `stx_ino` 8-byte aligned.
__spare0: [u16; 1],
pub stx_ino: u64,
pub stx_size: u64,
pub stx_blocks: u64,
pub stx_attributes_mask: u64,
pub stx_atime: StatxTimestamp,
pub stx_btime: StatxTimestamp,
pub stx_ctime: StatxTimestamp,
pub stx_mtime: StatxTimestamp,
pub stx_rdev_major: u32,
pub stx_rdev_minor: u32,
pub stx_dev_major: u32,
pub stx_dev_minor: u32,
// Reserved tail space; never read by this file's visible code.
__spare2: [u64; 14],
}
/// C-layout timestamp used by the time fields of [`Statx`].
#[repr(C)]
#[derive(Clone, Copy)]
pub struct StatxTimestamp {
/// Whole seconds.
pub tv_sec: i64,
/// Nanosecond part.
pub tv_nsec: u32,
// Padding that rounds the struct up to 16 bytes.
__reserved: u32,
}
// Directory-fd sentinel and `AT_*` flags for the `*at` family of operations.
// NOTE(review): all numeric values below appear to mirror the Linux UAPI
// headers (x86-64 numbering for the `O_*` flags) — confirm for other targets.
pub const AT_FDCWD: i32 = -100;
pub const AT_REMOVEDIR: i32 = 0x200;
pub const AT_SYMLINK_NOFOLLOW: i32 = 0x100;
pub const AT_EMPTY_PATH: i32 = 0x1000;
pub const AT_STATX_SYNC_AS_STAT: i32 = 0x0000;
// Open access modes followed by creation/status flags.
pub const O_RDONLY: i32 = 0;
pub const O_WRONLY: i32 = 1;
pub const O_RDWR: i32 = 2;
pub const O_CREAT: i32 = 0x40;
pub const O_EXCL: i32 = 0x80;
pub const O_TRUNC: i32 = 0x200;
pub const O_APPEND: i32 = 0x400;
pub const O_NONBLOCK: i32 = 0x800;
pub const O_DIRECTORY: i32 = 0x10000;
pub const O_CLOEXEC: i32 = 0x80000;
// Permission bits (octal): user, group, other × read, write, execute.
pub const S_IRUSR: u32 = 0o400;
pub const S_IWUSR: u32 = 0o200;
pub const S_IXUSR: u32 = 0o100;
pub const S_IRGRP: u32 = 0o040;
pub const S_IWGRP: u32 = 0o020;
pub const S_IXGRP: u32 = 0o010;
pub const S_IROTH: u32 = 0o004;
pub const S_IWOTH: u32 = 0o002;
pub const S_IXOTH: u32 = 0o001;
// Field-selection bits for `Statx::stx_mask`.
pub const STATX_TYPE: u32 = 0x0001;
pub const STATX_MODE: u32 = 0x0002;
pub const STATX_NLINK: u32 = 0x0004;
pub const STATX_UID: u32 = 0x0008;
pub const STATX_GID: u32 = 0x0010;
pub const STATX_ATIME: u32 = 0x0020;
pub const STATX_MTIME: u32 = 0x0040;
pub const STATX_CTIME: u32 = 0x0080;
pub const STATX_INO: u32 = 0x0100;
pub const STATX_SIZE: u32 = 0x0200;
pub const STATX_BLOCKS: u32 = 0x0400;
// Union of all eleven bits above (0x0001 through 0x0400).
pub const STATX_BASIC_STATS: u32 = 0x07ff;
/// Future for an `ffi::OpCode::Openat` request; resolves to the opened [`Fd`].
pub struct Openat<'a> {
/// Ring the request is queued on.
ring: &'a IoUring,
/// Slot of this request in the ring's `ops` slab; cleared once the result is taken.
idx: Option<SlabIndex>,
/// Set after the SQE has been written and the submission queue advanced.
submitted: bool,
/// In-flight acquisition of the `ops` lock, kept across polls while it is pending.
lock_future: Option<Pin<Box<dyn Future<Output = Guard<'a, Mutex<Slab<Op>>>> + 'a + Send>>>,
/// Held `ops` lock; `poll` releases it again before returning.
guard: Option<Guard<'a, Mutex<Slab<Op>>>>,
}
impl<'a> Future for Openat<'a> {
    type Output = Result<Fd>;

    /// Drives the openat request: first queues an SQE, then waits for the
    /// completion recorded in the ring's op slab.
    ///
    /// Resolves to `Ok(Fd(result))` for a non-negative result,
    /// `Err(IoUringError::System(-result))` for a negative one, and
    /// `Err(IoUringError::InvalidDescriptor)` when the future holds no slab index.
    ///
    /// # Panics
    ///
    /// Panics if the slab no longer contains the entry for `idx`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        let ring = this.ring;
        let idx = match this.idx {
            Some(slot) => slot,
            None => return Poll::Ready(Err(IoUringError::InvalidDescriptor)),
        };

        // Phase 1: queue the request (skipped once it has been submitted).
        if !this.submitted {
            if this.guard.is_none() {
                if this.lock_future.is_none() {
                    this.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
                }
                let held = match this.lock_future.as_mut().unwrap().as_mut().poll(cx) {
                    Poll::Ready(held) => held,
                    Poll::Pending => return Poll::Pending,
                };
                this.guard = Some(held);
                this.lock_future = None;
            }
            let (sqe, array_idx) = match ring.get_sqe() {
                Some(entry) => entry,
                None => {
                    // NOTE(review): no waker is registered on this path; confirm
                    // that something re-polls this task once an SQE frees up.
                    this.guard = None;
                    return Poll::Pending;
                }
            };
            let op = this.guard.as_ref().unwrap().get(idx).unwrap();
            let (fd, off) = (op.fd.0, op.offset);
            let (addr, len) = (op.buffer.as_ptr() as u64, op.len as u32);
            // NOTE(review): `rw_flags` is always 0 here, whereas liburing's
            // `io_uring_prep_openat` stores the open flags in that slot and
            // the mode in `len` — confirm how flags are meant to be passed.
            // SAFETY: relies on `get_sqe` handing out a valid SQE pointer and an
            // in-bounds index-array slot — TODO confirm against `get_sqe`.
            unsafe {
                (*sqe).opcode = ffi::OpCode::Openat as u8;
                (*sqe).fd = fd;
                (*sqe).off = off;
                (*sqe).addr = addr;
                (*sqe).len = len;
                (*sqe).user_data = idx as u64;
                // Remaining fields are zeroed.
                (*sqe).flags = 0;
                (*sqe).ioprio = 0;
                (*sqe).rw_flags = 0;
                (*sqe).buf_index = 0;
                (*sqe).personality = 0;
                (*sqe).splice_fd_in = 0;
                (*sqe).__pad2 = [0; 2];
                *ring.0.sq.array.as_ptr().add(array_idx as usize) = array_idx;
            }
            ring.sq_advance();
            this.submitted = true;
            // Remember who to wake when the completion is recorded.
            this.guard
                .as_mut()
                .unwrap()
                .get_mut(idx)
                .unwrap()
                .waiter
                .assign_waker(cx.waker().clone());
            // Release the lock; phase 2 re-acquires it.
            this.guard = None;
        }

        // Phase 2: look for the completion.
        if this.guard.is_none() {
            if this.lock_future.is_none() {
                this.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
            }
            let held = match this.lock_future.as_mut().unwrap().as_mut().poll(cx) {
                Poll::Ready(held) => held,
                Poll::Pending => return Poll::Pending,
            };
            this.guard = Some(held);
            this.lock_future = None;
        }
        let guard = this.guard.as_mut().unwrap();
        let op = guard.get(idx).unwrap();
        // A result only counts once the waiter has been signaled.
        let outcome = if op.waiter.signaled() { op.result } else { None };
        match outcome {
            Some(result) => {
                guard.remove(idx);
                this.idx = None;
                this.guard = None;
                Poll::Ready(if result < 0 {
                    Err(IoUringError::System(-result))
                } else {
                    Ok(Fd(result))
                })
            }
            None => {
                this.guard = None;
                Poll::Pending
            }
        }
    }
}
impl<'a> Drop for Openat<'a> {
    /// Reclaims the slab slot of a request that was never queued.
    ///
    /// Once `submitted` is set the slot is left untouched.
    fn drop(&mut self) {
        if self.submitted {
            return;
        }
        if let Some(slot) = self.idx {
            let inner = self.ring.0.clone();
            // Removal needs the async `ops` lock, so it is handed to a spawned task.
            crate::runtime::spawn_local(async move {
                inner.ops.lock(Waiter::default()).await.remove(slot);
            });
        }
    }
}
/// Future for an `ffi::OpCode::Statx` request; resolves to `()` on success.
pub struct StatxOp<'a> {
/// Ring the request is queued on.
ring: &'a IoUring,
/// Slot of this request in the ring's `ops` slab; cleared once the result is taken.
idx: Option<SlabIndex>,
/// Set after the SQE has been written and the submission queue advanced.
submitted: bool,
/// In-flight acquisition of the `ops` lock, kept across polls while it is pending.
lock_future: Option<Pin<Box<dyn Future<Output = Guard<'a, Mutex<Slab<Op>>>> + 'a + Send>>>,
/// Held `ops` lock; `poll` releases it again before returning.
guard: Option<Guard<'a, Mutex<Slab<Op>>>>,
/// Destination record whose address `poll` writes into the SQE.
statx_buf: &'a mut Statx,
}
impl<'a> Future for StatxOp<'a> {
    type Output = Result<()>;

    /// Drives the statx request: first queues an SQE, then waits for the
    /// completion recorded in the ring's op slab.
    ///
    /// Resolves to `Ok(())` for a non-negative result,
    /// `Err(IoUringError::System(-result))` for a negative one, and
    /// `Err(IoUringError::InvalidDescriptor)` when the future holds no slab index.
    ///
    /// # Panics
    ///
    /// Panics if the slab no longer contains the entry for `idx`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        let ring = this.ring;
        let idx = match this.idx {
            Some(slot) => slot,
            None => return Poll::Ready(Err(IoUringError::InvalidDescriptor)),
        };

        // Phase 1: queue the request (skipped once it has been submitted).
        if !this.submitted {
            if this.guard.is_none() {
                if this.lock_future.is_none() {
                    this.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
                }
                let held = match this.lock_future.as_mut().unwrap().as_mut().poll(cx) {
                    Poll::Ready(held) => held,
                    Poll::Pending => return Poll::Pending,
                };
                this.guard = Some(held);
                this.lock_future = None;
            }
            let (sqe, array_idx) = match ring.get_sqe() {
                Some(entry) => entry,
                None => {
                    // NOTE(review): no waker is registered on this path; confirm
                    // that something re-polls this task once an SQE frees up.
                    this.guard = None;
                    return Poll::Pending;
                }
            };
            let op = this.guard.as_ref().unwrap().get(idx).unwrap();
            let (fd, off) = (op.fd.0, op.offset);
            let (addr, len) = (op.buffer.as_ptr() as u64, op.len as u32);
            let statx_ptr = &mut *this.statx_buf as *mut _ as u64;
            // NOTE(review): liburing's `io_uring_prep_statx` passes the result
            // buffer in `off` and the flags in `rw_flags`; here the buffer goes
            // to `__pad2[0]` and `rw_flags` is 0 — confirm what `op.offset`
            // holds and whether the kernel actually sees the buffer.
            // SAFETY: relies on `get_sqe` handing out a valid SQE pointer and an
            // in-bounds index-array slot — TODO confirm against `get_sqe`.
            unsafe {
                (*sqe).opcode = ffi::OpCode::Statx as u8;
                (*sqe).flags = 0;
                (*sqe).ioprio = 0;
                (*sqe).fd = fd;
                (*sqe).off = off;
                (*sqe).addr = addr;
                (*sqe).len = len;
                (*sqe).rw_flags = 0;
                (*sqe).user_data = idx as u64;
                (*sqe).buf_index = 0;
                (*sqe).personality = 0;
                (*sqe).splice_fd_in = 0;
                // Write both words: SQE slots are reused, so the second word
                // must not keep whatever an earlier request left behind.
                (*sqe).__pad2 = [statx_ptr, 0];
                *ring.0.sq.array.as_ptr().add(array_idx as usize) = array_idx;
            }
            ring.sq_advance();
            this.submitted = true;
            // Remember who to wake when the completion is recorded.
            this.guard
                .as_mut()
                .unwrap()
                .get_mut(idx)
                .unwrap()
                .waiter
                .assign_waker(cx.waker().clone());
            // Release the lock; phase 2 re-acquires it.
            this.guard = None;
        }

        // Phase 2: look for the completion.
        if this.guard.is_none() {
            if this.lock_future.is_none() {
                this.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
            }
            let held = match this.lock_future.as_mut().unwrap().as_mut().poll(cx) {
                Poll::Ready(held) => held,
                Poll::Pending => return Poll::Pending,
            };
            this.guard = Some(held);
            this.lock_future = None;
        }
        let guard = this.guard.as_mut().unwrap();
        let op = guard.get(idx).unwrap();
        // A result only counts once the waiter has been signaled.
        let outcome = if op.waiter.signaled() { op.result } else { None };
        match outcome {
            Some(result) => {
                guard.remove(idx);
                this.idx = None;
                this.guard = None;
                Poll::Ready(if result < 0 {
                    Err(IoUringError::System(-result))
                } else {
                    Ok(())
                })
            }
            None => {
                this.guard = None;
                Poll::Pending
            }
        }
    }
}
impl<'a> Drop for StatxOp<'a> {
    /// Reclaims the slab slot of a request that was never queued.
    ///
    /// Once `submitted` is set the slot is left untouched.
    fn drop(&mut self) {
        if self.submitted {
            return;
        }
        if let Some(slot) = self.idx {
            let inner = self.ring.0.clone();
            // Removal needs the async `ops` lock, so it is handed to a spawned task.
            crate::runtime::spawn_local(async move {
                inner.ops.lock(Waiter::default()).await.remove(slot);
            });
        }
    }
}
/// Future for an `ffi::OpCode::Mkdirat` request; resolves to `()` on success.
pub struct Mkdirat<'a> {
/// Ring the request is queued on.
ring: &'a IoUring,
/// Slot of this request in the ring's `ops` slab; cleared once the result is taken.
idx: Option<SlabIndex>,
/// Set after the SQE has been written and the submission queue advanced.
submitted: bool,
/// In-flight acquisition of the `ops` lock, kept across polls while it is pending.
lock_future: Option<Pin<Box<dyn Future<Output = Guard<'a, Mutex<Slab<Op>>>> + 'a + Send>>>,
/// Held `ops` lock; `poll` releases it again before returning.
guard: Option<Guard<'a, Mutex<Slab<Op>>>>,
}
impl<'a> Future for Mkdirat<'a> {
    type Output = Result<()>;

    /// Drives the mkdirat request: first queues an SQE, then waits for the
    /// completion recorded in the ring's op slab.
    ///
    /// Resolves to `Ok(())` for a non-negative result,
    /// `Err(IoUringError::System(-result))` for a negative one, and
    /// `Err(IoUringError::InvalidDescriptor)` when the future holds no slab index.
    ///
    /// # Panics
    ///
    /// Panics if the slab no longer contains the entry for `idx`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        let ring = this.ring;
        let idx = match this.idx {
            Some(slot) => slot,
            None => return Poll::Ready(Err(IoUringError::InvalidDescriptor)),
        };

        // Phase 1: queue the request (skipped once it has been submitted).
        if !this.submitted {
            if this.guard.is_none() {
                if this.lock_future.is_none() {
                    this.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
                }
                let held = match this.lock_future.as_mut().unwrap().as_mut().poll(cx) {
                    Poll::Ready(held) => held,
                    Poll::Pending => return Poll::Pending,
                };
                this.guard = Some(held);
                this.lock_future = None;
            }
            let (sqe, array_idx) = match ring.get_sqe() {
                Some(entry) => entry,
                None => {
                    // NOTE(review): no waker is registered on this path; confirm
                    // that something re-polls this task once an SQE frees up.
                    this.guard = None;
                    return Poll::Pending;
                }
            };
            let op = this.guard.as_ref().unwrap().get(idx).unwrap();
            let (fd, off) = (op.fd.0, op.offset);
            let (addr, len) = (op.buffer.as_ptr() as u64, op.len as u32);
            // NOTE(review): liburing's `io_uring_prep_mkdirat` leaves `off` at 0
            // and puts the mode in `len` — confirm `op.offset` is 0 for this op.
            // SAFETY: relies on `get_sqe` handing out a valid SQE pointer and an
            // in-bounds index-array slot — TODO confirm against `get_sqe`.
            unsafe {
                (*sqe).opcode = ffi::OpCode::Mkdirat as u8;
                (*sqe).fd = fd;
                (*sqe).off = off;
                (*sqe).addr = addr;
                (*sqe).len = len;
                (*sqe).user_data = idx as u64;
                // Remaining fields are zeroed.
                (*sqe).flags = 0;
                (*sqe).ioprio = 0;
                (*sqe).rw_flags = 0;
                (*sqe).buf_index = 0;
                (*sqe).personality = 0;
                (*sqe).splice_fd_in = 0;
                (*sqe).__pad2 = [0; 2];
                *ring.0.sq.array.as_ptr().add(array_idx as usize) = array_idx;
            }
            ring.sq_advance();
            this.submitted = true;
            // Remember who to wake when the completion is recorded.
            this.guard
                .as_mut()
                .unwrap()
                .get_mut(idx)
                .unwrap()
                .waiter
                .assign_waker(cx.waker().clone());
            // Release the lock; phase 2 re-acquires it.
            this.guard = None;
        }

        // Phase 2: look for the completion.
        if this.guard.is_none() {
            if this.lock_future.is_none() {
                this.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
            }
            let held = match this.lock_future.as_mut().unwrap().as_mut().poll(cx) {
                Poll::Ready(held) => held,
                Poll::Pending => return Poll::Pending,
            };
            this.guard = Some(held);
            this.lock_future = None;
        }
        let guard = this.guard.as_mut().unwrap();
        let op = guard.get(idx).unwrap();
        // A result only counts once the waiter has been signaled.
        let outcome = if op.waiter.signaled() { op.result } else { None };
        match outcome {
            Some(result) => {
                guard.remove(idx);
                this.idx = None;
                this.guard = None;
                Poll::Ready(if result < 0 {
                    Err(IoUringError::System(-result))
                } else {
                    Ok(())
                })
            }
            None => {
                this.guard = None;
                Poll::Pending
            }
        }
    }
}
impl<'a> Drop for Mkdirat<'a> {
    /// Reclaims the slab slot of a request that was never queued.
    ///
    /// Once `submitted` is set the slot is left untouched.
    fn drop(&mut self) {
        if self.submitted {
            return;
        }
        if let Some(slot) = self.idx {
            let inner = self.ring.0.clone();
            // Removal needs the async `ops` lock, so it is handed to a spawned task.
            crate::runtime::spawn_local(async move {
                inner.ops.lock(Waiter::default()).await.remove(slot);
            });
        }
    }
}
/// Future for an `ffi::OpCode::Unlinkat` request; resolves to `()` on success.
pub struct Unlinkat<'a> {
/// Ring the request is queued on.
ring: &'a IoUring,
/// Slot of this request in the ring's `ops` slab; cleared once the result is taken.
idx: Option<SlabIndex>,
/// Set after the SQE has been written and the submission queue advanced.
submitted: bool,
/// In-flight acquisition of the `ops` lock, kept across polls while it is pending.
lock_future: Option<Pin<Box<dyn Future<Output = Guard<'a, Mutex<Slab<Op>>>> + 'a + Send>>>,
/// Held `ops` lock; `poll` releases it again before returning.
guard: Option<Guard<'a, Mutex<Slab<Op>>>>,
}
impl<'a> Future for Unlinkat<'a> {
    type Output = Result<()>;

    /// Drives the unlinkat request: first queues an SQE, then waits for the
    /// completion recorded in the ring's op slab.
    ///
    /// Resolves to `Ok(())` for a non-negative result,
    /// `Err(IoUringError::System(-result))` for a negative one, and
    /// `Err(IoUringError::InvalidDescriptor)` when the future holds no slab index.
    ///
    /// # Panics
    ///
    /// Panics if the slab no longer contains the entry for `idx`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        let ring = this.ring;
        let idx = match this.idx {
            Some(slot) => slot,
            None => return Poll::Ready(Err(IoUringError::InvalidDescriptor)),
        };

        // Phase 1: queue the request (skipped once it has been submitted).
        if !this.submitted {
            if this.guard.is_none() {
                if this.lock_future.is_none() {
                    this.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
                }
                let held = match this.lock_future.as_mut().unwrap().as_mut().poll(cx) {
                    Poll::Ready(held) => held,
                    Poll::Pending => return Poll::Pending,
                };
                this.guard = Some(held);
                this.lock_future = None;
            }
            let (sqe, array_idx) = match ring.get_sqe() {
                Some(entry) => entry,
                None => {
                    // NOTE(review): no waker is registered on this path; confirm
                    // that something re-polls this task once an SQE frees up.
                    this.guard = None;
                    return Poll::Pending;
                }
            };
            let op = this.guard.as_ref().unwrap().get(idx).unwrap();
            let (fd, off) = (op.fd.0, op.offset);
            let (addr, len) = (op.buffer.as_ptr() as u64, op.len as u32);
            // NOTE(review): liburing's `io_uring_prep_unlinkat` keeps `off` and
            // `len` at 0 and passes the flags via `rw_flags`, which is always 0
            // here — confirm how flags such as `AT_REMOVEDIR` are delivered.
            // SAFETY: relies on `get_sqe` handing out a valid SQE pointer and an
            // in-bounds index-array slot — TODO confirm against `get_sqe`.
            unsafe {
                (*sqe).opcode = ffi::OpCode::Unlinkat as u8;
                (*sqe).fd = fd;
                (*sqe).off = off;
                (*sqe).addr = addr;
                (*sqe).len = len;
                (*sqe).user_data = idx as u64;
                // Remaining fields are zeroed.
                (*sqe).flags = 0;
                (*sqe).ioprio = 0;
                (*sqe).rw_flags = 0;
                (*sqe).buf_index = 0;
                (*sqe).personality = 0;
                (*sqe).splice_fd_in = 0;
                (*sqe).__pad2 = [0; 2];
                *ring.0.sq.array.as_ptr().add(array_idx as usize) = array_idx;
            }
            ring.sq_advance();
            this.submitted = true;
            // Remember who to wake when the completion is recorded.
            this.guard
                .as_mut()
                .unwrap()
                .get_mut(idx)
                .unwrap()
                .waiter
                .assign_waker(cx.waker().clone());
            // Release the lock; phase 2 re-acquires it.
            this.guard = None;
        }

        // Phase 2: look for the completion.
        if this.guard.is_none() {
            if this.lock_future.is_none() {
                this.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
            }
            let held = match this.lock_future.as_mut().unwrap().as_mut().poll(cx) {
                Poll::Ready(held) => held,
                Poll::Pending => return Poll::Pending,
            };
            this.guard = Some(held);
            this.lock_future = None;
        }
        let guard = this.guard.as_mut().unwrap();
        let op = guard.get(idx).unwrap();
        // A result only counts once the waiter has been signaled.
        let outcome = if op.waiter.signaled() { op.result } else { None };
        match outcome {
            Some(result) => {
                guard.remove(idx);
                this.idx = None;
                this.guard = None;
                Poll::Ready(if result < 0 {
                    Err(IoUringError::System(-result))
                } else {
                    Ok(())
                })
            }
            None => {
                this.guard = None;
                Poll::Pending
            }
        }
    }
}
impl<'a> Drop for Unlinkat<'a> {
    /// Reclaims the slab slot of a request that was never queued.
    ///
    /// Once `submitted` is set the slot is left untouched.
    fn drop(&mut self) {
        if self.submitted {
            return;
        }
        if let Some(slot) = self.idx {
            let inner = self.ring.0.clone();
            // Removal needs the async `ops` lock, so it is handed to a spawned task.
            crate::runtime::spawn_local(async move {
                inner.ops.lock(Waiter::default()).await.remove(slot);
            });
        }
    }
}
/// Future for an `ffi::OpCode::Renameat` request; resolves to `()` on success.
pub struct Renameat<'a> {
/// Ring the request is queued on.
ring: &'a IoUring,
/// Slot of this request in the ring's `ops` slab; cleared once the result is taken.
idx: Option<SlabIndex>,
/// Set after the SQE has been written and the submission queue advanced.
submitted: bool,
/// In-flight acquisition of the `ops` lock, kept across polls while it is pending.
lock_future: Option<Pin<Box<dyn Future<Output = Guard<'a, Mutex<Slab<Op>>>> + 'a + Send>>>,
/// Held `ops` lock; `poll` releases it again before returning.
guard: Option<Guard<'a, Mutex<Slab<Op>>>>,
/// New path bytes; `poll` writes their address and length into the SQE.
/// NOTE(review): presumably must be NUL-terminated for the kernel — confirm with the caller.
newpath: &'a [u8],
}
impl<'a> Future for Renameat<'a> {
    type Output = Result<()>;

    /// Drives the renameat request: first queues an SQE, then waits for the
    /// completion recorded in the ring's op slab.
    ///
    /// Resolves to `Ok(())` for a non-negative result,
    /// `Err(IoUringError::System(-result))` for a negative one, and
    /// `Err(IoUringError::InvalidDescriptor)` when the future holds no slab index.
    ///
    /// # Panics
    ///
    /// Panics if the slab no longer contains the entry for `idx`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        let ring = this.ring;
        let idx = match this.idx {
            Some(slot) => slot,
            None => return Poll::Ready(Err(IoUringError::InvalidDescriptor)),
        };

        // Phase 1: queue the request (skipped once it has been submitted).
        if !this.submitted {
            if this.guard.is_none() {
                if this.lock_future.is_none() {
                    this.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
                }
                let held = match this.lock_future.as_mut().unwrap().as_mut().poll(cx) {
                    Poll::Ready(held) => held,
                    Poll::Pending => return Poll::Pending,
                };
                this.guard = Some(held);
                this.lock_future = None;
            }
            let (sqe, array_idx) = match ring.get_sqe() {
                Some(entry) => entry,
                None => {
                    // NOTE(review): no waker is registered on this path; confirm
                    // that something re-polls this task once an SQE frees up.
                    this.guard = None;
                    return Poll::Pending;
                }
            };
            let op = this.guard.as_ref().unwrap().get(idx).unwrap();
            let (fd, off) = (op.fd.0, op.offset);
            let (addr, len) = (op.buffer.as_ptr() as u64, op.len as u32);
            let newpath_ptr = this.newpath.as_ptr() as u64;
            let newpath_len = this.newpath.len() as u32;
            // NOTE(review): liburing's `io_uring_prep_renameat` passes the new
            // path pointer in `off`, the new dirfd in `len` and the rename flags
            // in `rw_flags`; here `rw_flags` receives the path length and the
            // pointer goes to `__pad2[0]` — confirm the intended ABI mapping.
            // SAFETY: relies on `get_sqe` handing out a valid SQE pointer and an
            // in-bounds index-array slot — TODO confirm against `get_sqe`.
            unsafe {
                (*sqe).opcode = ffi::OpCode::Renameat as u8;
                (*sqe).flags = 0;
                (*sqe).ioprio = 0;
                (*sqe).fd = fd;
                (*sqe).off = off;
                (*sqe).addr = addr;
                (*sqe).len = len;
                (*sqe).rw_flags = newpath_len;
                (*sqe).user_data = idx as u64;
                (*sqe).buf_index = 0;
                (*sqe).personality = 0;
                (*sqe).splice_fd_in = 0;
                // Write both words: SQE slots are reused, so the second word
                // must not keep whatever an earlier request left behind.
                (*sqe).__pad2 = [newpath_ptr, 0];
                *ring.0.sq.array.as_ptr().add(array_idx as usize) = array_idx;
            }
            ring.sq_advance();
            this.submitted = true;
            // Remember who to wake when the completion is recorded.
            this.guard
                .as_mut()
                .unwrap()
                .get_mut(idx)
                .unwrap()
                .waiter
                .assign_waker(cx.waker().clone());
            // Release the lock; phase 2 re-acquires it.
            this.guard = None;
        }

        // Phase 2: look for the completion.
        if this.guard.is_none() {
            if this.lock_future.is_none() {
                this.lock_future = Some(Box::pin(ring.0.ops.lock(Waiter::default())));
            }
            let held = match this.lock_future.as_mut().unwrap().as_mut().poll(cx) {
                Poll::Ready(held) => held,
                Poll::Pending => return Poll::Pending,
            };
            this.guard = Some(held);
            this.lock_future = None;
        }
        let guard = this.guard.as_mut().unwrap();
        let op = guard.get(idx).unwrap();
        // A result only counts once the waiter has been signaled.
        let outcome = if op.waiter.signaled() { op.result } else { None };
        match outcome {
            Some(result) => {
                guard.remove(idx);
                this.idx = None;
                this.guard = None;
                Poll::Ready(if result < 0 {
                    Err(IoUringError::System(-result))
                } else {
                    Ok(())
                })
            }
            None => {
                this.guard = None;
                Poll::Pending
            }
        }
    }
}
impl<'a> Drop for Renameat<'a> {
    /// Reclaims the slab slot of a request that was never queued.
    ///
    /// Once `submitted` is set the slot is left untouched.
    fn drop(&mut self) {
        if self.submitted {
            return;
        }
        if let Some(slot) = self.idx {
            let inner = self.ring.0.clone();
            // Removal needs the async `ops` lock, so it is handed to a spawned task.
            crate::runtime::spawn_local(async move {
                inner.ops.lock(Waiter::default()).await.remove(slot);
            });
        }
    }
}