cassandra_cpp/cassandra/
future.rs

1use crate::cassandra::custom_payload::CustomPayloadResponse;
2use crate::cassandra::error::*;
3use crate::cassandra::prepared::PreparedStatement;
4use crate::cassandra::result::CassResult;
5use crate::cassandra::util::{Protected, ProtectedWithSession};
6
7use crate::cassandra_sys::cass_future_custom_payload_item;
8use crate::cassandra_sys::cass_future_custom_payload_item_count;
9use crate::cassandra_sys::cass_future_error_code;
10
11use crate::cassandra_sys::cass_future_free;
12
13use crate::cassandra_sys::cass_future_get_prepared;
14use crate::cassandra_sys::cass_future_get_result;
15use crate::cassandra_sys::cass_future_ready;
16use crate::cassandra_sys::cass_future_set_callback;
17
18use crate::cassandra_sys::cass_true;
19use crate::cassandra_sys::CassFuture as _Future;
20use crate::cassandra_sys::CASS_OK;
21use crate::Session;
22
23use parking_lot::Mutex;
24
25use std::future::Future;
26use std::marker::PhantomData;
27use std::mem;
28use std::pin::Pin;
29use std::slice;
30use std::str;
31use std::sync::Arc;
32use std::task::{Context, Poll, Waker};
33
34/// A future representing the result of a Cassandra driver operation.
35///
36/// On success, returns a result of type `T`. On failure, returns a Cassandra error.
37///
38/// When constructing this take care to supply the correct type argument, since it will
39/// be used to control how the result is extracted from the underlying Cassandra
40/// driver future (see `Completable`).
41#[must_use]
42#[derive(Debug)]
43pub struct CassFuture<T> {
44    /// The underlying Cassandra driver future object.
45    inner: *mut _Future,
46
47    /// The current state of this future.
48    state: Arc<FutureTarget>,
49
50    /// The session the future is being executed upon.
51    session: Option<Session>,
52
53    /// Treat as if it contains a T.
54    phantom: PhantomData<T>,
55}
56
57// The underlying C type has no thread-local state, and explicitly supports access
58// from multiple threads: https://datastax.github.io/cpp-driver/topics/#thread-safety
59//
60// But it can be used to move a value of type `T`, so `T` needs to be `Send` if the
61// future is.
62//
63// The same is not true for `Sync`, because a future doesn't give multiple threads
64// concurrent access to the value (you can only poll a future once).
65unsafe impl<T> Sync for CassFuture<T> {}
66unsafe impl<T> Send for CassFuture<T> where T: Send {}
67impl<T> Unpin for CassFuture<T> {}
68
69impl<T> CassFuture<T> {
70    /// Wrap a Cassandra driver future to make it a proper Rust future.
71    ///
72    /// When invoking this take care to supply the correct type argument, since it will
73    /// be used to control how the result is extracted from the underlying Cassandra
74    /// driver future (see `Completable`).
75    pub(crate) fn build(session: Session, inner: *mut _Future) -> Self {
76        CassFuture {
77            inner,
78            session: Some(session),
79            state: Arc::new(FutureTarget {
80                inner: Mutex::new(FutureState::Created),
81            }),
82            phantom: PhantomData,
83        }
84    }
85
86    fn take_session(&mut self) -> Session {
87        self.session.take().expect(
88            "invariant: could not take session from CassFuture that already has had session taken.",
89        )
90    }
91}
92
93impl<T> Drop for CassFuture<T> {
94    /// Drop this CassFuture.
95    ///
96    /// This also drops its reference to the FutureTarget, but if
97    /// we're waiting to be called back the FutureState::Awaiting holds another reference to
98    /// the target, which keeps it alive until the callback fires.
99    fn drop(&mut self) {
100        unsafe { cass_future_free(self.inner) };
101    }
102}
103
104/// A type is Completable if it can be returned from a Cassandra driver future.
105/// You should only use this if you reasonably expect that a particular future will
106/// have such a result; for `CassFuture`s we ensure this by construction.
107pub trait Completable
108where
109    Self: Sized,
110{
111    /// Extract the result from the future, if present.
112    unsafe fn get(session: Session, inner: *mut _Future) -> Option<Self>;
113}
114
115/// Futures that complete with no value, or report an error.
116impl Completable for () {
117    unsafe fn get(_session: Session, _inner: *mut _Future) -> Option<Self> {
118        Some(())
119    }
120}
121
122/// Futures that completes with just the session.
123impl Completable for Session {
124    unsafe fn get(session: Session, _inner: *mut _Future) -> Option<Self> {
125        Some(session)
126    }
127}
128
129/// The mainline case - a CassResult.
130impl Completable for CassResult {
131    unsafe fn get(_session: Session, inner: *mut _Future) -> Option<Self> {
132        cass_future_get_result(inner)
133            .as_ref()
134            .map(|r| CassResult::build(r as *const _))
135    }
136}
137
138/// Futures that complete with a prepared statement.
139impl Completable for PreparedStatement {
140    unsafe fn get(session: Session, inner: *mut _Future) -> Option<Self> {
141        cass_future_get_prepared(inner)
142            .as_ref()
143            .map(|r| PreparedStatement::build(r as *const _, session))
144    }
145}
146
147/// For each custom payload defined in the future, convert it into a
148/// name and value pair, then insert it into the CustomPayloadResponse.
149///
150unsafe fn payloads_from_future(future: *mut _Future) -> Result<CustomPayloadResponse> {
151    let cp_count = cass_future_custom_payload_item_count(future);
152    (0..cp_count)
153        .map(|index| {
154            let mut name = std::ptr::null();
155            let mut name_length = 0;
156            let mut value = std::ptr::null();
157            let mut value_size = 0;
158
159            cass_future_custom_payload_item(
160                future,
161                index,
162                &mut name,
163                &mut name_length,
164                &mut value,
165                &mut value_size,
166            )
167            .to_result((name, name_length, value, value_size))
168            .and_then(|(name, name_length, value, value_size)| {
169                let name_slice = slice::from_raw_parts(name as *const u8, name_length);
170                str::from_utf8(name_slice)
171                    .map_err(|err| err.into())
172                    .map(|name| {
173                        (
174                            name.to_string(),
175                            slice::from_raw_parts(value, value_size).to_vec(),
176                        )
177                    })
178            })
179        })
180        .collect::<Result<CustomPayloadResponse>>()
181}
182
183/// Futures that complete with a normal result and a custom payload response.
184impl Completable for (CassResult, CustomPayloadResponse) {
185    unsafe fn get(_session: Session, inner: *mut _Future) -> Option<Self> {
186        payloads_from_future(inner).ok().and_then(|payloads| {
187            cass_future_get_result(inner)
188                .as_ref()
189                .map(|r| (CassResult::build(r as *const _), payloads))
190        })
191    }
192}
193
194impl<T: Completable> CassFuture<T> {
195    /// Synchronously executes the CassFuture, blocking until it
196    /// completes.
197    pub fn wait(mut self) -> Result<T> {
198        unsafe { get_completion(self.take_session(), self.inner) }
199    }
200}
201
202/// A Cassandra future is a normal Rust future.
203impl<T: Completable> Future for CassFuture<T> {
204    type Output = Result<T>;
205
206    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
207        let mut install_callback = false;
208        let ret = {
209            // Perform the following computations under the lock, and then release it.
210            //
211            // We must take care to avoid deadlock. The lock hierarchy is: take the Rust lock
212            // (self.state.inner) first, then take the C++ lock (internal to the C++
213            // implementation of futures).
214            //
215            // Poll is always called by the Rust event loop, never from within C++ code or
216            // from notify_task. `self.ready()` and `self.get_completion()` take an internal
217            // mutex within C++ code, but they never call back to Rust and so cannot violate
218            // the lock hierarchy. However `self.set_callback()` calls into Rust code
219            // (`notify_task`) if the future is already complete, so we must avoid holding the
220            // Rust lock while calling it. We achieve this by using a boolean flag to request
221            // the callback be set outside the lock region.
222            let mut lock = self.state.as_ref().inner.lock();
223            match *lock {
224                ref mut state @ FutureState::Created => {
225                    // No task yet - schedule a callback. But as an optimization, if it's ready
226                    // already, complete now without scheduling a callback.
227                    if unsafe { cass_future_ready(self.inner) } == cass_true {
228                        // Future is ready; wrap success in `Ok(Ready)` or report failure as `Err`.
229                        Poll::Ready(self.inner)
230                    } else {
231                        // Future is not ready; park this task and arrange to be called back when
232                        // it is.
233                        *state = FutureState::Awaiting {
234                            waker: cx.waker().clone(),
235                            keep_alive: self.state.clone(),
236                        };
237                        install_callback = true;
238                        Poll::Pending
239                    }
240                }
241                FutureState::Awaiting { ref mut waker, .. } => {
242                    // Callback already scheduled; don't set it again (C doesn't support it anyway),
243                    // but be sure to swizzle the new task into place. No need to check for
244                    // readiness here; we have to wait for the callback anyway so we might as well
245                    // do all the work in one place.
246                    if !waker.will_wake(cx.waker()) {
247                        *waker = cx.waker().clone();
248                    }
249                    Poll::Pending
250                }
251                FutureState::Ready => {
252                    // Future has been marked ready by callback. Safe to return now.
253                    Poll::Ready(self.inner)
254                }
255            }
256        };
257
258        if install_callback {
259            // Install the callback. If callback cannot be sent, report immediate `Err`.
260            let data = (self.state.as_ref() as *const FutureTarget) as *mut ::std::os::raw::c_void;
261            unsafe { cass_future_set_callback(self.inner, Some(notify_task), data) }
262                .to_result(())?;
263        }
264
265        match ret {
266            Poll::Pending => Poll::Pending,
267            Poll::Ready(inner) => {
268                Poll::Ready(unsafe { get_completion(self.take_session(), inner) })
269            }
270        }
271    }
272}
273
274/// Extract success or failure from a future.
275///
276/// This function will block if the future is not yet ready. In order to ensure that this
277/// function will not block, you can check if the future is ready prior to calling this using
278/// `cass_future_ready`. If the future is ready, this function will not block, otherwise
279/// it will block on waiting for the future to become ready.
280unsafe fn get_completion<T: Completable>(session: Session, inner: *mut _Future) -> Result<T> {
281    // Wrap success in `Ok(Ready)` or report failure as `Err`.
282    // This will block if the future is not yet ready.
283    let rc = cass_future_error_code(inner);
284    match rc {
285        CASS_OK => match Completable::get(session, inner) {
286            None => Err(CassErrorCode::LIB_NULL_VALUE.to_error()),
287            Some(v) => Ok(v),
288        },
289        _ => Err(get_cass_future_error(rc, inner)),
290    }
291}
292
293/// The target of a C++ Cassandra driver callback.
294///
295/// The C++ Cassandra driver calls the callback in the following two scenarios only:
296///
297/// * When the future is completed, if a callback is set.
298/// * When a callback is set, if the future is completed.
299///
300/// Given a future can only be completed once, and a callback can only be set once, enforced
301/// by internal locking, it is clear that we should expect at most one callback to occur.
302///
303/// The important thing to ensure is that Rust has not freed the target of that callback when
304/// it occurs. The simplest way to achieve that is only to free the callback target once the
305/// callback has occurred. Since the callback target is owned by the future, and the future is
306/// typically freed after completion, that means we must only complete after we receive the
307/// callback.
308///
309/// The FutureTarget is held by a CassFuture, and (in the FutureState::Awaiting state) by
310/// a C++ Cassandra driver callback. The latter pointer is represented by one inside that state,
311/// so that it is not freed early.
312#[derive(Debug)]
313struct FutureTarget {
314    inner: Mutex<FutureState>,
315}
316
317/// The state of a Cassandra future.
318///
319/// This is an FSM.
320#[derive(Debug)]
321enum FutureState {
322    /// Initial state: the future has been created but no callback has yet been installed.
323    Created,
324
325    /// The future has been created and a callback has been installed, invoking this task.
326    /// `keep_alive` is an Arc to the enclosing target (i.e., a cycle) which stands for the
327    /// pointer held by the C++ Cassandra driver future as the callback target. This prevents
328    /// it from being freed early.
329    Awaiting {
330        waker: Waker,
331        keep_alive: Arc<FutureTarget>,
332    },
333
334    /// The future has called back to indicate completion.
335    Ready,
336}
337
338/// Callback which wakes the task waiting on this future.
339/// Called by the C++ driver when the future is ready,
340/// with a pointer to the `CassFuture`.
341unsafe extern "C" fn notify_task(_c_future: *mut _Future, data: *mut ::std::os::raw::c_void) {
342    let future_target: &FutureTarget = &*(data as *const FutureTarget);
343    // The future is now ready, so transition to the appropriate state.
344    let state = {
345        let mut lock = future_target.inner.lock();
346        mem::replace(&mut *lock, FutureState::Ready)
347    };
348    if let FutureState::Awaiting { ref waker, .. } = state {
349        waker.wake_by_ref();
350    } else {
351        // This can never happen.
352        panic!("Callback invoked before callback set");
353    }
354}