cassandra_cpp/cassandra/future.rs
1use crate::cassandra::custom_payload::CustomPayloadResponse;
2use crate::cassandra::error::*;
3use crate::cassandra::prepared::PreparedStatement;
4use crate::cassandra::result::CassResult;
5use crate::cassandra::util::{Protected, ProtectedWithSession};
6
7use crate::cassandra_sys::cass_future_custom_payload_item;
8use crate::cassandra_sys::cass_future_custom_payload_item_count;
9use crate::cassandra_sys::cass_future_error_code;
10
11use crate::cassandra_sys::cass_future_free;
12
13use crate::cassandra_sys::cass_future_get_prepared;
14use crate::cassandra_sys::cass_future_get_result;
15use crate::cassandra_sys::cass_future_ready;
16use crate::cassandra_sys::cass_future_set_callback;
17
18use crate::cassandra_sys::cass_true;
19use crate::cassandra_sys::CassFuture as _Future;
20use crate::cassandra_sys::CASS_OK;
21use crate::Session;
22
23use parking_lot::Mutex;
24
25use std::future::Future;
26use std::marker::PhantomData;
27use std::mem;
28use std::pin::Pin;
29use std::slice;
30use std::str;
31use std::sync::Arc;
32use std::task::{Context, Poll, Waker};
33
34/// A future representing the result of a Cassandra driver operation.
35///
36/// On success, returns a result of type `T`. On failure, returns a Cassandra error.
37///
38/// When constructing this take care to supply the correct type argument, since it will
39/// be used to control how the result is extracted from the underlying Cassandra
40/// driver future (see `Completable`).
41#[must_use]
42#[derive(Debug)]
43pub struct CassFuture<T> {
44 /// The underlying Cassandra driver future object.
45 inner: *mut _Future,
46
47 /// The current state of this future.
48 state: Arc<FutureTarget>,
49
50 /// The session the future is being executed upon.
51 session: Option<Session>,
52
53 /// Treat as if it contains a T.
54 phantom: PhantomData<T>,
55}
56
57// The underlying C type has no thread-local state, and explicitly supports access
58// from multiple threads: https://datastax.github.io/cpp-driver/topics/#thread-safety
59//
60// But it can be used to move a value of type `T`, so `T` needs to be `Send` if the
61// future is.
62//
63// The same is not true for `Sync`, because a future doesn't give multiple threads
64// concurrent access to the value (you can only poll a future once).
65unsafe impl<T> Sync for CassFuture<T> {}
66unsafe impl<T> Send for CassFuture<T> where T: Send {}
67impl<T> Unpin for CassFuture<T> {}
68
69impl<T> CassFuture<T> {
70 /// Wrap a Cassandra driver future to make it a proper Rust future.
71 ///
72 /// When invoking this take care to supply the correct type argument, since it will
73 /// be used to control how the result is extracted from the underlying Cassandra
74 /// driver future (see `Completable`).
75 pub(crate) fn build(session: Session, inner: *mut _Future) -> Self {
76 CassFuture {
77 inner,
78 session: Some(session),
79 state: Arc::new(FutureTarget {
80 inner: Mutex::new(FutureState::Created),
81 }),
82 phantom: PhantomData,
83 }
84 }
85
86 fn take_session(&mut self) -> Session {
87 self.session.take().expect(
88 "invariant: could not take session from CassFuture that already has had session taken.",
89 )
90 }
91}
92
93impl<T> Drop for CassFuture<T> {
94 /// Drop this CassFuture.
95 ///
96 /// This also drops its reference to the FutureTarget, but if
97 /// we're waiting to be called back the FutureState::Awaiting holds another reference to
98 /// the target, which keeps it alive until the callback fires.
99 fn drop(&mut self) {
100 unsafe { cass_future_free(self.inner) };
101 }
102}
103
104/// A type is Completable if it can be returned from a Cassandra driver future.
105/// You should only use this if you reasonably expect that a particular future will
106/// have such a result; for `CassFuture`s we ensure this by construction.
107pub trait Completable
108where
109 Self: Sized,
110{
111 /// Extract the result from the future, if present.
112 unsafe fn get(session: Session, inner: *mut _Future) -> Option<Self>;
113}
114
115/// Futures that complete with no value, or report an error.
116impl Completable for () {
117 unsafe fn get(_session: Session, _inner: *mut _Future) -> Option<Self> {
118 Some(())
119 }
120}
121
122/// Futures that completes with just the session.
123impl Completable for Session {
124 unsafe fn get(session: Session, _inner: *mut _Future) -> Option<Self> {
125 Some(session)
126 }
127}
128
129/// The mainline case - a CassResult.
130impl Completable for CassResult {
131 unsafe fn get(_session: Session, inner: *mut _Future) -> Option<Self> {
132 cass_future_get_result(inner)
133 .as_ref()
134 .map(|r| CassResult::build(r as *const _))
135 }
136}
137
138/// Futures that complete with a prepared statement.
139impl Completable for PreparedStatement {
140 unsafe fn get(session: Session, inner: *mut _Future) -> Option<Self> {
141 cass_future_get_prepared(inner)
142 .as_ref()
143 .map(|r| PreparedStatement::build(r as *const _, session))
144 }
145}
146
147/// For each custom payload defined in the future, convert it into a
148/// name and value pair, then insert it into the CustomPayloadResponse.
149///
150unsafe fn payloads_from_future(future: *mut _Future) -> Result<CustomPayloadResponse> {
151 let cp_count = cass_future_custom_payload_item_count(future);
152 (0..cp_count)
153 .map(|index| {
154 let mut name = std::ptr::null();
155 let mut name_length = 0;
156 let mut value = std::ptr::null();
157 let mut value_size = 0;
158
159 cass_future_custom_payload_item(
160 future,
161 index,
162 &mut name,
163 &mut name_length,
164 &mut value,
165 &mut value_size,
166 )
167 .to_result((name, name_length, value, value_size))
168 .and_then(|(name, name_length, value, value_size)| {
169 let name_slice = slice::from_raw_parts(name as *const u8, name_length);
170 str::from_utf8(name_slice)
171 .map_err(|err| err.into())
172 .map(|name| {
173 (
174 name.to_string(),
175 slice::from_raw_parts(value, value_size).to_vec(),
176 )
177 })
178 })
179 })
180 .collect::<Result<CustomPayloadResponse>>()
181}
182
183/// Futures that complete with a normal result and a custom payload response.
184impl Completable for (CassResult, CustomPayloadResponse) {
185 unsafe fn get(_session: Session, inner: *mut _Future) -> Option<Self> {
186 payloads_from_future(inner).ok().and_then(|payloads| {
187 cass_future_get_result(inner)
188 .as_ref()
189 .map(|r| (CassResult::build(r as *const _), payloads))
190 })
191 }
192}
193
194impl<T: Completable> CassFuture<T> {
195 /// Synchronously executes the CassFuture, blocking until it
196 /// completes.
197 pub fn wait(mut self) -> Result<T> {
198 unsafe { get_completion(self.take_session(), self.inner) }
199 }
200}
201
202/// A Cassandra future is a normal Rust future.
203impl<T: Completable> Future for CassFuture<T> {
204 type Output = Result<T>;
205
206 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
207 let mut install_callback = false;
208 let ret = {
209 // Perform the following computations under the lock, and then release it.
210 //
211 // We must take care to avoid deadlock. The lock hierarchy is: take the Rust lock
212 // (self.state.inner) first, then take the C++ lock (internal to the C++
213 // implementation of futures).
214 //
215 // Poll is always called by the Rust event loop, never from within C++ code or
216 // from notify_task. `self.ready()` and `self.get_completion()` take an internal
217 // mutex within C++ code, but they never call back to Rust and so cannot violate
218 // the lock hierarchy. However `self.set_callback()` calls into Rust code
219 // (`notify_task`) if the future is already complete, so we must avoid holding the
220 // Rust lock while calling it. We achieve this by using a boolean flag to request
221 // the callback be set outside the lock region.
222 let mut lock = self.state.as_ref().inner.lock();
223 match *lock {
224 ref mut state @ FutureState::Created => {
225 // No task yet - schedule a callback. But as an optimization, if it's ready
226 // already, complete now without scheduling a callback.
227 if unsafe { cass_future_ready(self.inner) } == cass_true {
228 // Future is ready; wrap success in `Ok(Ready)` or report failure as `Err`.
229 Poll::Ready(self.inner)
230 } else {
231 // Future is not ready; park this task and arrange to be called back when
232 // it is.
233 *state = FutureState::Awaiting {
234 waker: cx.waker().clone(),
235 keep_alive: self.state.clone(),
236 };
237 install_callback = true;
238 Poll::Pending
239 }
240 }
241 FutureState::Awaiting { ref mut waker, .. } => {
242 // Callback already scheduled; don't set it again (C doesn't support it anyway),
243 // but be sure to swizzle the new task into place. No need to check for
244 // readiness here; we have to wait for the callback anyway so we might as well
245 // do all the work in one place.
246 if !waker.will_wake(cx.waker()) {
247 *waker = cx.waker().clone();
248 }
249 Poll::Pending
250 }
251 FutureState::Ready => {
252 // Future has been marked ready by callback. Safe to return now.
253 Poll::Ready(self.inner)
254 }
255 }
256 };
257
258 if install_callback {
259 // Install the callback. If callback cannot be sent, report immediate `Err`.
260 let data = (self.state.as_ref() as *const FutureTarget) as *mut ::std::os::raw::c_void;
261 unsafe { cass_future_set_callback(self.inner, Some(notify_task), data) }
262 .to_result(())?;
263 }
264
265 match ret {
266 Poll::Pending => Poll::Pending,
267 Poll::Ready(inner) => {
268 Poll::Ready(unsafe { get_completion(self.take_session(), inner) })
269 }
270 }
271 }
272}
273
274/// Extract success or failure from a future.
275///
276/// This function will block if the future is not yet ready. In order to ensure that this
277/// function will not block, you can check if the future is ready prior to calling this using
278/// `cass_future_ready`. If the future is ready, this function will not block, otherwise
279/// it will block on waiting for the future to become ready.
280unsafe fn get_completion<T: Completable>(session: Session, inner: *mut _Future) -> Result<T> {
281 // Wrap success in `Ok(Ready)` or report failure as `Err`.
282 // This will block if the future is not yet ready.
283 let rc = cass_future_error_code(inner);
284 match rc {
285 CASS_OK => match Completable::get(session, inner) {
286 None => Err(CassErrorCode::LIB_NULL_VALUE.to_error()),
287 Some(v) => Ok(v),
288 },
289 _ => Err(get_cass_future_error(rc, inner)),
290 }
291}
292
293/// The target of a C++ Cassandra driver callback.
294///
295/// The C++ Cassandra driver calls the callback in the following two scenarios only:
296///
297/// * When the future is completed, if a callback is set.
298/// * When a callback is set, if the future is completed.
299///
300/// Given a future can only be completed once, and a callback can only be set once, enforced
301/// by internal locking, it is clear that we should expect at most one callback to occur.
302///
303/// The important thing to ensure is that Rust has not freed the target of that callback when
304/// it occurs. The simplest way to achieve that is only to free the callback target once the
305/// callback has occurred. Since the callback target is owned by the future, and the future is
306/// typically freed after completion, that means we must only complete after we receive the
307/// callback.
308///
309/// The FutureTarget is held by a CassFuture, and (in the FutureState::Awaiting state) by
310/// a C++ Cassandra driver callback. The latter pointer is represented by one inside that state,
311/// so that it is not freed early.
312#[derive(Debug)]
313struct FutureTarget {
314 inner: Mutex<FutureState>,
315}
316
317/// The state of a Cassandra future.
318///
319/// This is an FSM.
320#[derive(Debug)]
321enum FutureState {
322 /// Initial state: the future has been created but no callback has yet been installed.
323 Created,
324
325 /// The future has been created and a callback has been installed, invoking this task.
326 /// `keep_alive` is an Arc to the enclosing target (i.e., a cycle) which stands for the
327 /// pointer held by the C++ Cassandra driver future as the callback target. This prevents
328 /// it from being freed early.
329 Awaiting {
330 waker: Waker,
331 keep_alive: Arc<FutureTarget>,
332 },
333
334 /// The future has called back to indicate completion.
335 Ready,
336}
337
338/// Callback which wakes the task waiting on this future.
339/// Called by the C++ driver when the future is ready,
340/// with a pointer to the `CassFuture`.
341unsafe extern "C" fn notify_task(_c_future: *mut _Future, data: *mut ::std::os::raw::c_void) {
342 let future_target: &FutureTarget = &*(data as *const FutureTarget);
343 // The future is now ready, so transition to the appropriate state.
344 let state = {
345 let mut lock = future_target.inner.lock();
346 mem::replace(&mut *lock, FutureState::Ready)
347 };
348 if let FutureState::Awaiting { ref waker, .. } = state {
349 waker.wake_by_ref();
350 } else {
351 // This can never happen.
352 panic!("Callback invoked before callback set");
353 }
354}