1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333
use crate::cassandra::consistency::Consistency;
use crate::cassandra::custom_payload::{CustomPayload, CustomPayloadResponse};
use crate::cassandra::error::*;
use crate::cassandra::prepared::PreparedStatement;
use crate::cassandra::result::CassResult;
use crate::cassandra::util::Protected;
use crate::cassandra::write_type::WriteType;
use crate::cassandra_sys::cass_future_custom_payload_item;
use crate::cassandra_sys::cass_future_custom_payload_item_count;
use crate::cassandra_sys::cass_future_error_code;
use crate::cassandra_sys::cass_future_error_message;
use crate::cassandra_sys::cass_future_free;
use crate::cassandra_sys::cass_future_get_error_result;
use crate::cassandra_sys::cass_future_get_prepared;
use crate::cassandra_sys::cass_future_get_result;
use crate::cassandra_sys::cass_future_ready;
use crate::cassandra_sys::cass_future_set_callback;
use crate::cassandra_sys::CassError_;
use crate::cassandra_sys::CassFuture as _Future;
use crate::cassandra_sys::CASS_OK;
use crate::cassandra_sys::{cass_false, cass_true};
use parking_lot::Mutex;
use std::collections::HashMap;
use std::future::Future;
use std::marker::PhantomData;
use std::mem;
use std::pin::Pin;
use std::slice;
use std::str;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
/// A future representing the result of a Cassandra driver operation.
///
/// On success, returns a result of type `T`. On failure, returns a Cassandra error.
///
/// When constructing this take care to supply the correct type argument, since it will
/// be used to control how the result is extracted from the underlying Cassandra
/// driver future (see `Completable`).
#[must_use]
#[derive(Debug)]
pub struct CassFuture<T> {
/// The underlying Cassandra driver future object.
inner: *mut _Future,
/// The current state of this future.
state: Arc<FutureTarget>,
/// Treat as if it contains a T.
phantom: PhantomData<T>,
}
// The underlying C type has no thread-local state, and explicitly supports access
// from multiple threads: https://datastax.github.io/cpp-driver/topics/#thread-safety
//
// But it can be used to move a value of type `T`, so `T` needs to be `Send` if the
// future is.
//
// The same is not true for `Sync`, because a future doesn't give multiple threads
// concurrent access to the value (you can only poll a future once).
unsafe impl<T> Sync for CassFuture<T> {}
unsafe impl<T> Send for CassFuture<T> where T: Send {}
impl<T> CassFuture<T> {
/// Wrap a Cassandra driver future to make it a proper Rust future.
///
/// When invoking this take care to supply the correct type argument, since it will
/// be used to control how the result is extracted from the underlying Cassandra
/// driver future (see `Completable`).
pub(crate) fn build(inner: *mut _Future) -> Self {
CassFuture {
inner,
state: Arc::new(FutureTarget {
inner: Mutex::new(FutureState::Created),
}),
phantom: PhantomData,
}
}
}
impl<T> Drop for CassFuture<T> {
/// Drop this CassFuture.
///
/// This also drops its reference to the FutureTarget, but if
/// we're waiting to be called back the FutureState::Awaiting holds another reference to
/// the target, which keeps it alive until the callback fires.
fn drop(&mut self) {
unsafe { cass_future_free(self.inner) };
}
}
/// A type is Completable if it can be returned from a Cassandra driver future.
/// You should only use this if you reasonably expect that a particular future will
/// have such a result; for `CassFuture`s we ensure this by construction.
pub trait Completable
where
Self: Sized,
{
/// Extract the result from the future, if present.
unsafe fn get(inner: *mut _Future) -> Option<Self>;
}
/// Futures that complete with no value, or report an error.
impl Completable for () {
unsafe fn get(_inner: *mut _Future) -> Option<Self> {
Some(())
}
}
/// The mainline case - a CassResult.
impl Completable for CassResult {
unsafe fn get(inner: *mut _Future) -> Option<Self> {
cass_future_get_result(inner)
.as_ref()
.map(|r| CassResult::build(r as *const _))
}
}
/// Futures that complete with a prepared statement.
impl Completable for PreparedStatement {
unsafe fn get(inner: *mut _Future) -> Option<Self> {
cass_future_get_prepared(inner)
.as_ref()
.map(|r| PreparedStatement::build(r as *const _))
}
}
/// For each custom payload defined in the future, convert it into a
/// name and value pair, then insert it into the CustomPayloadResponse.
///
unsafe fn payloads_from_future(future: *mut _Future) -> Result<CustomPayloadResponse> {
let cp_count = cass_future_custom_payload_item_count(future);
(0..cp_count)
.into_iter()
.map(|index| {
let mut name = mem::zeroed();
let mut name_length = mem::zeroed();
let mut value = mem::zeroed();
let mut value_size = mem::zeroed();
cass_future_custom_payload_item(
future,
index,
&mut name,
&mut name_length,
&mut value,
&mut value_size,
)
.to_result((name, name_length, value, value_size))
.and_then(|(name, name_length, value, value_size)| {
let name_slice = slice::from_raw_parts(name as *const u8, name_length);
str::from_utf8(name_slice)
.map_err(|err| err.into())
.map(|name| {
(
name.to_string(),
slice::from_raw_parts(value, value_size).to_vec(),
)
})
})
})
.collect::<Result<CustomPayloadResponse>>()
}
/// Futures that complete with a normal result and a custom payload response.
impl Completable for (CassResult, CustomPayloadResponse) {
unsafe fn get(inner: *mut _Future) -> Option<Self> {
payloads_from_future(inner).ok().and_then(|payloads| {
cass_future_get_result(inner)
.as_ref()
.map(|r| (CassResult::build(r as *const _), payloads))
})
}
}
/// A Cassandra future is a normal Rust future.
impl<T: Completable> Future for CassFuture<T> {
type Output = Result<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut install_callback = false;
let ret = {
// Perform the following computations under the lock, and then release it.
//
// We must take care to avoid deadlock. The lock hierarchy is: take the Rust lock
// (self.state.inner) first, then take the C++ lock (internal to the C++
// implementation of futures).
//
// Poll is always called by the Rust event loop, never from within C++ code or
// from notify_task. `self.ready()` and `self.get_completion()` take an internal
// mutex within C++ code, but they never call back to Rust and so cannot violate
// the lock hierarchy. However `self.set_callback()` calls into Rust code
// (`notify_task`) if the future is already complete, so we must avoid holding the
// Rust lock while calling it. We achieve this by using a boolean flag to request
// the callback be set outside the lock region.
let mut lock = self.state.as_ref().inner.lock();
match *lock {
ref mut state @ FutureState::Created => {
// No task yet - schedule a callback. But as an optimization, if it's ready
// already, complete now without scheduling a callback.
if unsafe { cass_future_ready(self.inner) } == cass_true {
// Future is ready; wrap success in `Ok(Ready)` or report failure as `Err`.
Poll::Ready(unsafe { get_completion(self.inner) })
} else {
// Future is not ready; park this task and arrange to be called back when
// it is.
*state = FutureState::Awaiting {
waker: cx.waker().clone(),
keep_alive: self.state.clone(),
};
install_callback = true;
Poll::Pending
}
}
FutureState::Awaiting { ref mut waker, .. } => {
// Callback already scheduled; don't set it again (C doesn't support it anyway),
// but be sure to swizzle the new task into place. No need to check for
// readiness here; we have to wait for the callback anyway so we might as well
// do all the work in one place.
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();
}
Poll::Pending
}
FutureState::Ready => {
// Future has been marked ready by callback. Safe to return now.
Poll::Ready(unsafe { get_completion(self.inner) })
}
}
};
if install_callback {
// Install the callback. If callback cannot be sent, report immediate `Err`.
let data = (self.state.as_ref() as *const FutureTarget) as *mut ::std::os::raw::c_void;
unsafe { cass_future_set_callback(self.inner, Some(notify_task), data) }
.to_result(())?;
}
ret
}
}
impl<T: Completable> CassFuture<T> {
/// Synchronously executes the CassFuture, blocking until it
/// completes.
pub fn wait(self) -> Result<T> {
unsafe { get_completion(self.inner) }
}
}
/// Extract success or failure from a future.
///
/// This function will block if the future is not yet ready. In order to ensure that this
/// function will not block, you can check if the future is ready prior to calling this using
/// `cass_future_ready`. If the future is ready, this function will not block, otherwise
/// it will block on waiting for the future to become ready.
unsafe fn get_completion<T: Completable>(inner: *mut _Future) -> Result<T> {
// Wrap success in `Ok(Ready)` or report failure as `Err`.
// This will block if the future is not yet ready.
let rc = cass_future_error_code(inner);
match rc {
CASS_OK => match Completable::get(inner) {
None => Err(CassErrorCode::LIB_NULL_VALUE.to_error()),
Some(v) => Ok(v),
},
_ => Err(get_cass_future_error(rc, inner)),
}
}
/// The target of a C++ Cassandra driver callback.
///
/// The C++ Cassandra driver calls the callback in the following two scenarios only:
///
/// * When the future is completed, if a callback is set.
/// * When a callback is set, if the future is completed.
///
/// Given a future can only be completed once, and a callback can only be set once, enforced
/// by internal locking, it is clear that we should expect at most one callback to occur.
///
/// The important thing to ensure is that Rust has not freed the target of that callback when
/// it occurs. The simplest way to achieve that is only to free the callback target once the
/// callback has occurred. Since the callback target is owned by the future, and the future is
/// typically freed after completion, that means we must only complete after we receive the
/// callback.
///
/// The FutureTarget is held by a CassFuture, and (in the FutureState::Awaiting state) by
/// a C++ Cassandra driver callback. The latter pointer is represented by one inside that state,
/// so that it is not freed early.
#[derive(Debug)]
struct FutureTarget {
inner: Mutex<FutureState>,
}
/// The state of a Cassandra future.
///
/// This is an FSM.
#[derive(Debug)]
enum FutureState {
/// Initial state: the future has been created but no callback has yet been installed.
Created,
/// The future has been created and a callback has been installed, invoking this task.
/// `keep_alive` is an Arc to the enclosing target (i.e., a cycle) which stands for the
/// pointer held by the C++ Cassandra driver future as the callback target. This prevents
/// it from being freed early.
Awaiting {
waker: Waker,
keep_alive: Arc<FutureTarget>,
},
/// The future has called back to indicate completion.
Ready,
}
/// Callback which wakes the task waiting on this future.
/// Called by the C++ driver when the future is ready,
/// with a pointer to the `CassFuture`.
unsafe extern "C" fn notify_task(_c_future: *mut _Future, data: *mut ::std::os::raw::c_void) {
let future_target: &FutureTarget = &*(data as *const FutureTarget);
// The future is now ready, so transition to the appropriate state.
let state = {
let mut lock = future_target.inner.lock();
mem::replace(&mut *lock, FutureState::Ready)
};
if let FutureState::Awaiting { ref waker, .. } = state {
waker.wake_by_ref();
} else {
// This can never happen.
panic!("Callback invoked before callback set");
}
}