use std::mem;
use std::ptr;
use std::sync::Arc;
use ceph_rust::rados::{self, rados_completion_t};
use futures::{Async, Future, Poll};
use futures::task::AtomicTask;
use libc;
use errors::{self, Error, Result};
/// Outcome of a finished asynchronous librados operation.
///
/// Produced as the `Item` of the [`Completion`] future.
#[derive(Debug)]
pub struct Return<T> {
    /// The payload that was handed to `Completion::new`, returned to the
    /// caller once the operation no longer holds a reference to it.
    pub data: T,
    /// The operation's return value as reported by
    /// `rados_aio_get_return_value`, after `errors::librados_res` accepted it.
    pub value: u32,
}
/// State handed to librados as the opaque callback argument.
///
/// `Completion::new` boxes one of these, turns it into a raw pointer and
/// passes it to `rados_aio_create_completion`; `callback` turns the pointer
/// back into a `Box` and consumes it.
struct CompletionInfo<T> {
/// Task handle shared with the `Completion`; `callback` calls `notify()` on it.
task: Arc<AtomicTask>,
/// Second reference to the operation's payload. `callback` drops it, which
/// is what lets `Completion::poll` succeed with `Arc::try_unwrap`.
data: Arc<T>,
}
/// Completion callback registered with librados by `Completion::new`.
///
/// Takes back ownership of the boxed `CompletionInfo<T>` behind `info_ptr`,
/// releases its reference to the payload and then notifies the task
/// registered by `Completion::poll`.
extern "C" fn callback<T>(_handle: rados_completion_t, info_ptr: *mut libc::c_void) {
    // SAFETY: `info_ptr` is the pointer `Completion::new` obtained from
    // `Box::into_raw` on a `CompletionInfo<T>` with this same `T`.
    // NOTE(review): soundness assumes librados invokes this callback at most
    // once per completion -- confirm against the librados documentation.
    let info = *unsafe { Box::from_raw(info_ptr as *mut CompletionInfo<T>) };
    // The payload reference must be gone *before* the task is woken, otherwise
    // the woken `poll` could still see two owners and report `NotReady`.
    mem::drop(info.data);
    info.task.notify();
}
/// Future that resolves once an asynchronous librados operation has completed.
///
/// The payload of type `T` is shared with the librados callback through an
/// `Arc`; the future becomes ready when it holds the only remaining reference.
#[derive(Debug)]
pub struct Completion<T> {
/// Task slot that `poll` registers the current task in and that the
/// callback notifies.
task: Arc<AtomicTask>,
/// This future's reference to the payload; `None` once `poll` has moved
/// the payload out.
data: Option<Arc<T>>,
/// Raw librados completion handle, released in `Drop`.
handle: rados_completion_t,
}
impl<T> Completion<T> {
pub fn new<F>(data: T, init: F) -> Result<Completion<T>>
where
F: FnOnce(rados_completion_t) -> Result<()>,
{
let mut completion_handle = ptr::null_mut();
let task = Arc::new(AtomicTask::new());
let data = Arc::new(data);
let info_ptr = Box::into_raw(Box::new(CompletionInfo {
task: task.clone(),
data: data.clone(),
}));
let callback_ptr = callback::<T> as extern "C" fn(*mut libc::c_void, *mut libc::c_void);
errors::librados(unsafe {
rados::rados_aio_create_completion(
info_ptr as *mut libc::c_void,
Some(callback_ptr),
None,
&mut completion_handle,
)
})?;
match init(completion_handle) {
Ok(()) => {
Ok(Completion {
task,
data: Some(data),
handle: completion_handle,
})
}
Err(error) => {
unsafe {
rados::rados_aio_release(completion_handle);
}
Err(error)
}
}
}
}
impl<T> Future for Completion<T> {
    type Item = Return<T>;
    type Error = Error;

    /// Checks whether the librados operation has completed.
    ///
    /// Completion is detected by this future holding the only remaining
    /// reference to the payload: the completion callback owns the other
    /// reference and drops it before notifying the task.
    ///
    /// Returns `Async::NotReady` while the callback still holds its reference,
    /// `Async::Ready(Return { .. })` once the operation completed, or the
    /// error that `errors::librados_res` produces for the operation's return
    /// value.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already resolved.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // Register first, so a callback that fires between the check below and
        // our return still wakes the task.
        self.task.register();

        let shared = self
            .data
            .take()
            .expect("`Completion` polled after it already resolved");

        match Arc::try_unwrap(shared) {
            Ok(data) => {
                // Only read the return value once the callback has run. Reading
                // it before the check could pair a stale, pre-completion value
                // with a `Ready` result if the operation finished in between.
                let value = errors::librados_res(unsafe {
                    rados::rados_aio_get_return_value(self.handle)
                })?;
                Ok(Async::Ready(Return { value, data }))
            }
            Err(shared) => {
                // The callback has not dropped its reference yet; keep ours for
                // the next poll.
                self.data = Some(shared);
                Ok(Async::NotReady)
            }
        }
    }
}
impl<T> Drop for Completion<T> {
    /// Gives up this future's reference to the librados completion handle.
    ///
    /// The payload is not freed here while the callback's `CompletionInfo`
    /// still holds its own `Arc` reference to it.
    fn drop(&mut self) {
        let handle = self.handle;
        // SAFETY: `handle` was obtained from `rados_aio_create_completion` in
        // `Completion::new` and is released only here for a constructed value.
        unsafe {
            rados::rados_aio_release(handle);
        }
    }
}
// SAFETY: the raw `handle` pointer is what prevents an automatic `Send` impl.
// The payload is only ever moved out by value in `poll` or dropped, never
// handed out by shared reference in the code visible here, so `T: Send` is the
// bound required.
// NOTE(review): assumes librados completion handles may be used and released
// from any thread -- confirm against the librados documentation.
unsafe impl<T> Send for Completion<T> where T: Send {}