ceph_async/completion.rs
1// Copyright 2021 John Spray All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ffi::c_void;
16use std::pin::Pin;
17use std::sync::Mutex;
18use std::task::{Context, Poll, Waker};
19
20use crate::ceph::IoCtx;
21use crate::error::RadosResult;
22use crate::rados::{
23 rados_aio_cancel, rados_aio_create_completion, rados_aio_get_return_value,
24 rados_aio_is_complete, rados_aio_release, rados_aio_wait_for_complete_and_cb,
25 rados_completion_t,
26};
27
28pub struct Completion<'a> {
29 inner: rados_completion_t,
30
31 // Box to provide a stable address for completion_complete callback
32 // Mutex for memory fencing when writing from poll() and reading from completion_complete()
33 waker: Box<std::sync::Mutex<Option<std::task::Waker>>>,
34
35 // A reference to the IOCtx is required to issue a cancel on
36 // the operation if we are dropped before ready. This needs
37 // to be a Rust reference rather than a raw rados_ioctx_t because otherwise
38 // there would be nothing to stop the rados_ioctx_t being invalidated
39 // during the lifetime of this Completion.
40 // (AioCompletionImpl does hold a reference to IoCtxImpl for writes, but
41 // not for reads.)
42 ioctx: &'a IoCtx,
43}
44
// SAFETY (review): `rados_completion_t` is a raw pointer, so `Completion` is
// not `Send` automatically; this impl asserts that moving it to another thread
// is fine. Presumably librados completion handles may be used from any thread
// -- TODO confirm against the librados documentation. Note that the impl is
// unconditional, so it also does not require `IoCtx: Sync` for the `&IoCtx`
// field.
unsafe impl Send for Completion<'_> {}
46
47#[no_mangle]
48pub extern "C" fn completion_complete(_cb: rados_completion_t, arg: *mut c_void) -> () {
49 let waker = unsafe {
50 let p = arg as *mut Mutex<Option<Waker>>;
51 p.as_mut().unwrap()
52 };
53
54 let waker = waker.lock().unwrap().take();
55 match waker {
56 Some(w) => w.wake(),
57 None => {}
58 }
59}
60
61impl Drop for Completion<'_> {
62 fn drop(&mut self) {
63 // Ensure that after dropping the Completion, the AIO callback
64 // will not be called on our dropped waker Box. Only necessary
65 // if we got as far as successfully starting an operation using
66 // the completion.
67 let am_complete = unsafe { rados_aio_is_complete(self.inner) } != 0;
68 if !am_complete {
69 unsafe {
70 let cancel_r = rados_aio_cancel(self.ioctx.ioctx, self.inner);
71
72 // It is unsound to proceed if the Objecter op is still in flight
73 assert!(cancel_r == 0 || cancel_r == -libc::ENOENT);
74 }
75 }
76
77 unsafe {
78 // Even if is_complete was true, librados might not be done with
79 // our callback: wait til it is.
80 assert_eq!(rados_aio_wait_for_complete_and_cb(self.inner), 0);
81 }
82
83 unsafe {
84 rados_aio_release(self.inner);
85 }
86 }
87}
88
89impl std::future::Future for Completion<'_> {
90 type Output = crate::error::RadosResult<u32>;
91
92 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
93 // Hold lock across the check of am_complete and subsequent waker registration
94 // to avoid deadlock if callback is invoked in between.
95 let mut waker_locked = self.waker.lock().unwrap();
96
97 let am_complete = unsafe { rados_aio_is_complete(self.inner) } != 0;
98
99 if am_complete {
100 // Unlock Waker so that completion callback can complete if racing with us.
101 drop(waker_locked);
102
103 // Ensure librados is finished with our callback ('complete' is true
104 // before it calls that)
105 unsafe {
106 let r = rados_aio_wait_for_complete_and_cb(self.inner);
107 assert_eq!(r, 0);
108 }
109
110 let r = unsafe { rados_aio_get_return_value(self.inner) };
111 let result = if r < 0 { Err(r.into()) } else { Ok(r) };
112 std::task::Poll::Ready(result.map(|e| e as u32))
113 } else {
114 // Register a waker
115 *waker_locked = Some(cx.waker().clone());
116
117 std::task::Poll::Pending
118 }
119 }
120}
121
122/// Completions are only created via this wrapper, in order to ensure
123/// that the Completion struct is only constructed around 'armed' rados_completion_t
124/// instances (i.e. those that have been used to start an I/O).
125pub fn with_completion<F>(ioctx: &IoCtx, f: F) -> RadosResult<Completion<'_>>
126where
127 F: FnOnce(rados_completion_t) -> libc::c_int,
128{
129 let mut waker = Box::new(Mutex::new(None));
130
131 let completion = unsafe {
132 let mut completion: rados_completion_t = std::ptr::null_mut();
133 let p: *mut Mutex<Option<Waker>> = &mut *waker;
134 let p = p as *mut c_void;
135
136 let r = rados_aio_create_completion(p, Some(completion_complete), None, &mut completion);
137 if r != 0 {
138 panic!("Error {} allocating RADOS completion: out of memory?", r);
139 }
140 assert!(!completion.is_null());
141
142 completion
143 };
144
145 let ret_code = f(completion);
146
147 if ret_code < 0 {
148 // On error dispatching I/O, drop the unused rados_completion_t
149 unsafe {
150 rados_aio_release(completion);
151 drop(completion)
152 }
153 Err(ret_code.into())
154 } else {
155 // Pass the rados_completion_t into a Future-implementing wrapper and await it.
156 Ok(Completion {
157 ioctx,
158 inner: completion,
159 waker,
160 })
161 }
162}