rad/async.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
//! Wrappers around `rados_completion_t`, providing a safe, futures-based API
//! for asynchronous RADOS operations.
use std::mem;
use std::ptr;
use std::sync::Arc;
use ceph_rust::rados::{self, rados_completion_t};
use futures::{Async, Future, Poll};
use futures::task::AtomicTask;
use libc;
use errors::{self, Error, Result};
/// The result of a `Completion`'s successful execution.
#[derive(Debug)]
pub struct Return<T> {
/// The data previously stored in the `Completion<T>`.
pub data: T,
/// The non-error return value of the RADOS completion.
pub value: u32,
}
/// The info struct passed into a RADOS callback, providing a trigger to potentially deallocate
/// associated data and also an `AtomicTask` object for notifying all registered tasks.
struct CompletionInfo<T> {
task: Arc<AtomicTask>,
data: Arc<T>,
}
/// The callback passed into librados, and called on future completion.
extern "C" fn callback<T>(_handle: rados_completion_t, info_ptr: *mut libc::c_void) {
let CompletionInfo { task, data } =
*unsafe { Box::from_raw(info_ptr as *mut CompletionInfo<T>) };
// Allow a poll to unwrap the contained data.
mem::drop(data);
// `AtomicTask` should be notified *after* data is produced. Data is produced, here, by
// reducing the strong reference count of the `data: Arc<T>` to `1`, and thus allowing a
// successful `.poll()` to `Arc::try_unwrap()` the data.
task.notify();
}
/// The type of a wrapped `rados_completion_t`, with associated allocated custom data and
/// `AtomicTask`. This is a bare-metal `RadosFuture`.
#[derive(Debug)]
pub struct Completion<T> {
task: Arc<AtomicTask>,
data: Option<Arc<T>>,
handle: rados_completion_t,
}
impl<T> Completion<T> {
/// Construct a new `Completion` from a piece of data and an initialization function. The
/// initialization function takes in a `rados_completion_t` and is intended to call a
/// `rados_aio_*` function on it, which will manipulate the completion's internal state and
/// return an error code, which can be reified to a `Result<()>` using `errors::librados`.
pub fn new<F>(data: T, init: F) -> Result<Completion<T>>
where
F: FnOnce(rados_completion_t) -> Result<()>,
{
let mut completion_handle = ptr::null_mut();
let task = Arc::new(AtomicTask::new());
let data = Arc::new(data);
let info_ptr = Box::into_raw(Box::new(CompletionInfo {
task: task.clone(),
data: data.clone(),
}));
let callback_ptr = callback::<T> as extern "C" fn(*mut libc::c_void, *mut libc::c_void);
// Kraken and later Ceph releases *make no distinction* between the "acked" and "complete"
// callbacks. As such, we are free to use strictly the "complete" callback.
errors::librados(unsafe {
rados::rados_aio_create_completion(
info_ptr as *mut libc::c_void,
Some(callback_ptr),
None,
&mut completion_handle,
)
})?;
match init(completion_handle) {
Ok(()) => {
Ok(Completion {
task,
data: Some(data),
handle: completion_handle,
})
}
Err(error) => {
unsafe {
rados::rados_aio_release(completion_handle);
}
Err(error)
}
}
}
}
impl<T> Future for Completion<T> {
type Item = Return<T>;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// `AtomicTask` should have `.register()` called before a consumer checks for produced data.
self.task.register();
let value =
errors::librados_res(unsafe { rados::rados_aio_get_return_value(self.handle) })?;
match Arc::try_unwrap(self.data.take().unwrap()) {
Ok(data) => Ok(Async::Ready(Return { value, data })),
Err(arc) => {
self.data = Some(arc);
Ok(Async::NotReady)
}
}
}
}
impl<T> Drop for Completion<T> {
fn drop(&mut self) {
unsafe {
rados::rados_aio_release(self.handle);
}
}
}
/// Conceptually, the `T` is only ever accessed from this `Completion`. Even when dropped, the
/// `T` is never accessed in a concurrent manner; and thus if `T` is `Send`, `Completion<T>`
/// is `Send`.
///
/// Another way to look at it is this: while `T` is inside an `Arc` at all, *it is only (if ever)
/// accessed by FFI code*, which has the responsibility of ensuring its accesses remain sane. As
/// such, we are free to simply mandate `T` is `Send`, as the responsibility of ensuring `Sync`
/// accesses to whatever buffer `T` is being used as falls to the foreign code.
unsafe impl<T: Send> Send for Completion<T> {}