use std::ffi::c_void;
use std::pin::Pin;
use std::sync::Mutex;
use std::task::{Context, Poll, Waker};
use crate::ceph::IoCtx;
use crate::error::RadosResult;
use crate::rados::{
rados_aio_cancel, rados_aio_create_completion, rados_aio_get_return_value,
rados_aio_is_complete, rados_aio_release, rados_aio_wait_for_complete_and_cb,
rados_completion_t,
};
pub struct Completion<'a> {
inner: rados_completion_t,
waker: Box<std::sync::Mutex<Option<std::task::Waker>>>,
ioctx: &'a IoCtx,
}
unsafe impl Send for Completion<'_> {}
#[no_mangle]
pub extern "C" fn completion_complete(_cb: rados_completion_t, arg: *mut c_void) -> () {
let waker = unsafe {
let p = arg as *mut Mutex<Option<Waker>>;
p.as_mut().unwrap()
};
let waker = waker.lock().unwrap().take();
match waker {
Some(w) => w.wake(),
None => {}
}
}
impl Drop for Completion<'_> {
fn drop(&mut self) {
let am_complete = unsafe { rados_aio_is_complete(self.inner) } != 0;
if !am_complete {
unsafe {
let cancel_r = rados_aio_cancel(self.ioctx.ioctx, self.inner);
assert!(cancel_r == 0 || cancel_r == -libc::ENOENT);
}
}
unsafe {
assert_eq!(rados_aio_wait_for_complete_and_cb(self.inner), 0);
}
unsafe {
rados_aio_release(self.inner);
}
}
}
impl std::future::Future for Completion<'_> {
type Output = crate::error::RadosResult<u32>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut waker_locked = self.waker.lock().unwrap();
let am_complete = unsafe { rados_aio_is_complete(self.inner) } != 0;
if am_complete {
drop(waker_locked);
unsafe {
let r = rados_aio_wait_for_complete_and_cb(self.inner);
assert_eq!(r, 0);
}
let r = unsafe { rados_aio_get_return_value(self.inner) };
let result = if r < 0 { Err(r.into()) } else { Ok(r) };
std::task::Poll::Ready(result.map(|e| e as u32))
} else {
*waker_locked = Some(cx.waker().clone());
std::task::Poll::Pending
}
}
}
pub fn with_completion<F>(ioctx: &IoCtx, f: F) -> RadosResult<Completion<'_>>
where
F: FnOnce(rados_completion_t) -> libc::c_int,
{
let mut waker = Box::new(Mutex::new(None));
let completion = unsafe {
let mut completion: rados_completion_t = std::ptr::null_mut();
let p: *mut Mutex<Option<Waker>> = &mut *waker;
let p = p as *mut c_void;
let r = rados_aio_create_completion(p, Some(completion_complete), None, &mut completion);
if r != 0 {
panic!("Error {} allocating RADOS completion: out of memory?", r);
}
assert!(!completion.is_null());
completion
};
let ret_code = f(completion);
if ret_code < 0 {
unsafe {
rados_aio_release(completion);
drop(completion)
}
Err(ret_code.into())
} else {
Ok(Completion {
ioctx,
inner: completion,
waker,
})
}
}