1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
// Copyright 2021 John Spray All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ffi::c_void;
use std::pin::Pin;
use std::sync::Mutex;
use std::task::{Context, Poll, Waker};
use crate::ceph::IoCtx;
use crate::error::RadosResult;
use crate::rados::{
rados_aio_cancel, rados_aio_create_completion2, rados_aio_get_return_value,
rados_aio_is_complete, rados_aio_release, rados_aio_wait_for_complete_and_cb,
rados_completion_t,
};
pub struct Completion<'a> {
inner: rados_completion_t,
// Box to provide a stable address for completion_complete callback
// Mutex for memory fencing when writing from poll() and reading from completion_complete()
waker: Box<std::sync::Mutex<Option<std::task::Waker>>>,
// A reference to the IOCtx is required to issue a cancel on
// the operation if we are dropped before ready. This needs
// to be a Rust reference rather than a raw rados_ioctx_t because otherwise
// there would be nothing to stop the rados_ioctx_t being invalidated
// during the lifetime of this Completion.
// (AioCompletionImpl does hold a reference to IoCtxImpl for writes, but
// not for reads.)
ioctx: &'a IoCtx,
}
unsafe impl Send for Completion<'_> {}
#[no_mangle]
pub extern "C" fn completion_complete(_cb: rados_completion_t, arg: *mut c_void) -> () {
let waker = unsafe {
let p = arg as *mut Mutex<Option<Waker>>;
p.as_mut().unwrap()
};
let waker = waker.lock().unwrap().take();
match waker {
Some(w) => w.wake(),
None => {}
}
}
impl Drop for Completion<'_> {
fn drop(&mut self) {
// Ensure that after dropping the Completion, the AIO callback
// will not be called on our dropped waker Box. Only necessary
// if we got as far as successfully starting an operation using
// the completion.
let am_complete = unsafe { rados_aio_is_complete(self.inner) } != 0;
if !am_complete {
unsafe {
let cancel_r = rados_aio_cancel(self.ioctx.ioctx, self.inner);
// It is unsound to proceed if the Objecter op is still in flight
assert!(cancel_r == 0 || cancel_r == -libc::ENOENT);
}
}
unsafe {
// Even if is_complete was true, librados might not be done with
// our callback: wait til it is.
assert_eq!(rados_aio_wait_for_complete_and_cb(self.inner), 0);
}
unsafe {
rados_aio_release(self.inner);
}
}
}
impl std::future::Future for Completion<'_> {
type Output = crate::error::RadosResult<u32>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Hold lock across the check of am_complete and subsequent waker registration
// to avoid deadlock if callback is invoked in between.
let mut waker_locked = self.waker.lock().unwrap();
let am_complete = unsafe { rados_aio_is_complete(self.inner) } != 0;
if am_complete {
// Unlock Waker so that completion callback can complete if racing with us.
drop(waker_locked);
// Ensure librados is finished with our callback ('complete' is true
// before it calls that)
unsafe {
let r = rados_aio_wait_for_complete_and_cb(self.inner);
assert_eq!(r, 0);
}
let r = unsafe { rados_aio_get_return_value(self.inner) };
let result = if r < 0 { Err(r.into()) } else { Ok(r) };
std::task::Poll::Ready(result.map(|e| e as u32))
} else {
// Register a waker
*waker_locked = Some(cx.waker().clone());
std::task::Poll::Pending
}
}
}
/// Completions are only created via this wrapper, in order to ensure
/// that the Completion struct is only constructed around 'armed' rados_completion_t
/// instances (i.e. those that have been used to start an I/O).
pub fn with_completion<F>(ioctx: &IoCtx, f: F) -> RadosResult<Completion<'_>>
where
F: FnOnce(rados_completion_t) -> libc::c_int,
{
let mut waker = Box::new(Mutex::new(None));
let completion = unsafe {
let mut completion: rados_completion_t = std::ptr::null_mut();
let p: *mut Mutex<Option<Waker>> = &mut *waker;
let p = p as *mut c_void;
let r = rados_aio_create_completion2(p, Some(completion_complete), &mut completion);
if r != 0 {
panic!("Error {} allocating RADOS completion: out of memory?", r);
}
assert!(!completion.is_null());
completion
};
let ret_code = f(completion);
if ret_code < 0 {
// On error dispatching I/O, drop the unused rados_completion_t
unsafe {
rados_aio_release(completion);
drop(completion)
}
Err(ret_code.into())
} else {
// Pass the rados_completion_t into a Future-implementing wrapper and await it.
Ok(Completion {
ioctx,
inner: completion,
waker,
})
}
}