ceph_async/
completion.rs

1// Copyright 2021 John Spray All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ffi::c_void;
16use std::pin::Pin;
17use std::sync::Mutex;
18use std::task::{Context, Poll, Waker};
19
20use crate::ceph::IoCtx;
21use crate::error::RadosResult;
22use crate::rados::{
23    rados_aio_cancel, rados_aio_create_completion, rados_aio_get_return_value,
24    rados_aio_is_complete, rados_aio_release, rados_aio_wait_for_complete_and_cb,
25    rados_completion_t,
26};
27
28pub struct Completion<'a> {
29    inner: rados_completion_t,
30
31    // Box to provide a stable address for completion_complete callback
32    // Mutex for memory fencing when writing from poll() and reading from completion_complete()
33    waker: Box<std::sync::Mutex<Option<std::task::Waker>>>,
34
35    // A reference to the IOCtx is required to issue a cancel on
36    // the operation if we are dropped before ready.  This needs
37    // to be a Rust reference rather than a raw rados_ioctx_t because otherwise
38    // there would be nothing to stop the rados_ioctx_t being invalidated
39    // during the lifetime of this Completion.
40    // (AioCompletionImpl does hold a reference to IoCtxImpl for writes, but
41    //  not for reads.)
42    ioctx: &'a IoCtx,
43}
44
45unsafe impl Send for Completion<'_> {}
46
47#[no_mangle]
48pub extern "C" fn completion_complete(_cb: rados_completion_t, arg: *mut c_void) -> () {
49    let waker = unsafe {
50        let p = arg as *mut Mutex<Option<Waker>>;
51        p.as_mut().unwrap()
52    };
53
54    let waker = waker.lock().unwrap().take();
55    match waker {
56        Some(w) => w.wake(),
57        None => {}
58    }
59}
60
61impl Drop for Completion<'_> {
62    fn drop(&mut self) {
63        // Ensure that after dropping the Completion, the AIO callback
64        // will not be called on our dropped waker Box.  Only necessary
65        // if we got as far as successfully starting an operation using
66        // the completion.
67        let am_complete = unsafe { rados_aio_is_complete(self.inner) } != 0;
68        if !am_complete {
69            unsafe {
70                let cancel_r = rados_aio_cancel(self.ioctx.ioctx, self.inner);
71
72                // It is unsound to proceed if the Objecter op is still in flight
73                assert!(cancel_r == 0 || cancel_r == -libc::ENOENT);
74            }
75        }
76
77        unsafe {
78            // Even if is_complete was true, librados might not be done with
79            // our callback: wait til it is.
80            assert_eq!(rados_aio_wait_for_complete_and_cb(self.inner), 0);
81        }
82
83        unsafe {
84            rados_aio_release(self.inner);
85        }
86    }
87}
88
89impl std::future::Future for Completion<'_> {
90    type Output = crate::error::RadosResult<u32>;
91
92    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
93        // Hold lock across the check of am_complete and subsequent waker registration
94        // to avoid deadlock if callback is invoked in between.
95        let mut waker_locked = self.waker.lock().unwrap();
96
97        let am_complete = unsafe { rados_aio_is_complete(self.inner) } != 0;
98
99        if am_complete {
100            // Unlock Waker so that completion callback can complete if racing with us.
101            drop(waker_locked);
102
103            // Ensure librados is finished with our callback ('complete' is true
104            // before it calls that)
105            unsafe {
106                let r = rados_aio_wait_for_complete_and_cb(self.inner);
107                assert_eq!(r, 0);
108            }
109
110            let r = unsafe { rados_aio_get_return_value(self.inner) };
111            let result = if r < 0 { Err(r.into()) } else { Ok(r) };
112            std::task::Poll::Ready(result.map(|e| e as u32))
113        } else {
114            // Register a waker
115            *waker_locked = Some(cx.waker().clone());
116
117            std::task::Poll::Pending
118        }
119    }
120}
121
122/// Completions are only created via this wrapper, in order to ensure
123/// that the Completion struct is only constructed around 'armed' rados_completion_t
124/// instances (i.e. those that have been used to start an I/O).
125pub fn with_completion<F>(ioctx: &IoCtx, f: F) -> RadosResult<Completion<'_>>
126where
127    F: FnOnce(rados_completion_t) -> libc::c_int,
128{
129    let mut waker = Box::new(Mutex::new(None));
130
131    let completion = unsafe {
132        let mut completion: rados_completion_t = std::ptr::null_mut();
133        let p: *mut Mutex<Option<Waker>> = &mut *waker;
134        let p = p as *mut c_void;
135
136        let r = rados_aio_create_completion(p, Some(completion_complete), None, &mut completion);
137        if r != 0 {
138            panic!("Error {} allocating RADOS completion: out of memory?", r);
139        }
140        assert!(!completion.is_null());
141
142        completion
143    };
144
145    let ret_code = f(completion);
146
147    if ret_code < 0 {
148        // On error dispatching I/O, drop the unused rados_completion_t
149        unsafe {
150            rados_aio_release(completion);
151            drop(completion)
152        }
153        Err(ret_code.into())
154    } else {
155        // Pass the rados_completion_t into a Future-implementing wrapper and await it.
156        Ok(Completion {
157            ioctx,
158            inner: completion,
159            waker,
160        })
161    }
162}