1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
use cassandra::error::*;
use cassandra::prepared::PreparedStatement;
use cassandra::result::CassResult;
use cassandra::util::Protected;
use cassandra::consistency::Consistency;
use cassandra::write_type::WriteType;
use cassandra_sys::CassError_;
use cassandra_sys::CASS_OK;
use cassandra_sys::CassFuture as _Future;
use cassandra_sys::cass_future_error_code;
use cassandra_sys::cass_future_error_message;
use cassandra_sys::cass_future_free;
use cassandra_sys::cass_future_get_error_result;
use cassandra_sys::cass_future_get_prepared;
use cassandra_sys::cass_future_get_result;
use cassandra_sys::cass_future_ready;
use cassandra_sys::cass_future_set_callback;
use cassandra_sys::{cass_true, cass_false};

use std::mem;
use std::slice;
use std::str;
use std::sync::{Arc, Mutex};
use std::marker::PhantomData;
use futures;

/// A future representing the result of a Cassandra driver operation.
///
/// On success, returns a result of type `T`. On failure, returns a Cassandra error.
///
/// When constructing this take care to supply the correct type argument, since it will
/// be used to control how the result is extracted from the underlying Cassandra
/// driver future (see `Completable`).
#[must_use]
#[derive(Debug)]
pub struct CassFuture<T> {
    /// The underlying Cassandra driver future object.
    inner: *mut _Future,

    /// The current state of this future.
    state: Arc<FutureTarget>,

    /// Treat as if it contains a T.
    phantom: PhantomData<T>,
}

// The underlying C type has no thread-local state, and explicitly supports access
// from multiple threads: https://datastax.github.io/cpp-driver/topics/#thread-safety
//
// But it can be used to move a value of type `T`, so `T` needs to be `Send` if the
// future is.
//
// The same is not true for `Sync`, because a future doesn't give multiple threads
// concurrent access to the value (you can only poll a future once).
unsafe impl<T> Sync for CassFuture<T> {}
unsafe impl<T> Send for CassFuture<T> where T: Send {}

impl<T> CassFuture<T> {
    /// Wrap a Cassandra driver future to make it a proper Rust future.
    ///
    /// When invoking this take care to supply the correct type argument, since it will
    /// be used to control how the result is extracted from the underlying Cassandra
    /// driver future (see `Completable`).
    pub(crate) fn build(inner: *mut _Future) -> Self {
        CassFuture {
            inner,
            state: Arc::new(FutureTarget { inner: Mutex::new(FutureState::Created) }),
            phantom: PhantomData,
        }
    }
}

impl<T> Drop for CassFuture<T> {
    /// Drop this CassFuture.
    ///
    /// This also drops its reference to the FutureTarget, but if
    /// we're waiting to be called back the FutureState::Awaiting holds another reference to
    /// the target, which keeps it alive until the callback fires.
    fn drop(&mut self) { unsafe { cass_future_free(self.inner) }; }
}

/// A type is Completable if it can be returned from a Cassandra driver future.
/// You should only use this if you reasonably expect that a particular future will
/// have such a result; for `CassFuture`s we ensure this by construction.
pub trait Completable where Self: Sized {
    /// Extract the result from the future, if present.
    unsafe fn get(inner: *mut _Future) -> Option<Self>;
}

/// Futures that complete with no value, or report an error.
impl Completable for () {
    unsafe fn get(_inner: *mut _Future) -> Option<Self> {
        Some(())
    }
}

/// The mainline case - a CassResult.
impl Completable for CassResult {
    unsafe fn get(inner: *mut _Future) -> Option<Self> {
        cass_future_get_result(inner).as_ref().map(|r| CassResult::build(r as *const _))
    }
}

/// Futures that complete with a prepared statement.
impl Completable for PreparedStatement {
    unsafe fn get(inner: *mut _Future) -> Option<Self> {
        cass_future_get_prepared(inner).as_ref().map(|r| PreparedStatement::build(r as *const _))
    }
}

/// A Cassandra future is a normal Rust future.
impl<T: Completable> futures::Future for CassFuture<T> {
    type Item = T;
    type Error = Error;

    fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> { unsafe {
        let mut install_callback = false;
        let ret = {
            // Perform the following computations under the lock, and then release it.
            //
            // We must take care to avoid deadlock. The lock hierarchy is: take the Rust lock
            // (self.state.inner) first, then take the C++ lock (internal to the C++
            // implementation of futures).
            //
            // Poll is always called by the Rust event loop, never from within C++ code or
            // from notify_task. `self.ready()` and `self.get_completion()` take an internal
            // mutex within C++ code, but they never call back to Rust and so cannot violate
            // the lock hierarchy. However `self.set_callback()` calls into Rust code
            // (`notify_task`) if the future is already complete, so we must avoid holding the
            // Rust lock while calling it. We achieve this by using a boolean flag to request
            // the callback be set outside the lock region.
            let mut lock = self.state.as_ref().inner.lock().expect("poll");
            match *lock {
                ref mut state @ FutureState::Created => {
                    // No task yet - schedule a callback. But as an optimization, if it's ready
                    // already, complete now without scheduling a callback.
                    if cass_future_ready(self.inner) == cass_true {
                        // Future is ready; wrap success in `Ok(Ready)` or report failure as `Err`.
                        get_completion(self.inner).map(futures::Async::Ready)
                    } else {
                        // Future is not ready; park this task and arrange to be called back when
                        // it is.
                        *state = FutureState::Awaiting {
                            task: futures::task::current(),
                            keep_alive: self.state.clone(),
                        };
                        install_callback = true;
                        Ok(futures::Async::NotReady)
                    }
                },
                FutureState::Awaiting { ref mut task, .. } => {
                    // Callback already scheduled; don't set it again (C doesn't support it anyway),
                    // but be sure to swizzle the new task into place. No need to check for
                    // readiness here; we have to wait for the callback anyway so we might as well
                    // do all the work in one place.
                    *task = futures::task::current();
                    Ok(futures::Async::NotReady)
                },
                FutureState::Ready => {
                    // Future has been marked ready by callback. Safe to return now.
                    get_completion(self.inner).map(futures::Async::Ready)
                }
            }
        };

        if install_callback {
            // Install the callback. If callback cannot be sent, report immediate `Err`.
            let data =
                (self.state.as_ref() as *const FutureTarget) as *mut ::std::os::raw::c_void;
            cass_future_set_callback(self.inner, Some(notify_task), data)
                .to_result(())
        } else {
            Ok(())
        }.and_then(move |_| ret)
    }}
}

/// Extract success or failure from a completed future.
unsafe fn get_completion<T: Completable>(inner: *mut _Future) -> Result<T> {
    // Future is ready; wrap success in `Ok(Ready)` or report failure as `Err`.
    let rc = cass_future_error_code(inner);
    match rc {
        CASS_OK => {
            match Completable::get(inner) {
                None => Err(CassErrorCode::LIB_NULL_VALUE.to_error()),
                Some(v) => Ok(v)
            }
        },
        _ => Err(get_cass_future_error(rc, inner)),
    }
}

/// The target of a C++ Cassandra driver callback.
///
/// The C++ Cassandra driver calls the callback in the following two scenarios only:
///
/// * When the future is completed, if a callback is set.
/// * When a callback is set, if the future is completed.
///
/// Given a future can only be completed once, and a callback can only be set once, enforced
/// by internal locking, it is clear that we should expect at most one callback to occur.
///
/// The important thing to ensure is that Rust has not freed the target of that callback when
/// it occurs. The simplest way to achieve that is only to free the callback target once the
/// callback has occurred. Since the callback target is owned by the future, and the future is
/// typically freed after completion, that means we must only complete after we receive the
/// callback.
///
/// The FutureTarget is held by a CassFuture, and (in the FutureState::Awaiting state) by
/// a C++ Cassandra driver callback. The latter pointer is represented by one inside that state,
/// so that it is not freed early.
#[derive(Debug)]
struct FutureTarget {
    inner: Mutex<FutureState>,
}

/// The state of a Cassandra future.
///
/// This is an FSM.
#[derive(Debug)]
enum FutureState {
    /// Initial state: the future has been created but no callback has yet been installed.
    Created,

    /// The future has been created and a callback has been installed, invoking this task.
    /// `keep_alive` is an Arc to the enclosing target (i.e., a cycle) which stands for the
    /// pointer held by the C++ Cassandra driver future as the callback target. This prevents
    /// it from being freed early.
    Awaiting { task: futures::task::Task, keep_alive: Arc<FutureTarget> },

    /// The future has called back to indicate completion.
    Ready,
}

/// Callback which wakes the task waiting on this future.
/// Called by the C++ driver when the future is ready,
/// with a pointer to the `CassFuture`.
unsafe extern "C" fn notify_task(_c_future: *mut _Future, data: *mut ::std::os::raw::c_void) {
    let future_target: &FutureTarget = &*(data as *const FutureTarget);
    // The future is now ready, so transition to the appropriate state.
    let mut lock = future_target.inner.lock().expect("notify_task");
    let state = mem::replace(&mut *lock, FutureState::Ready);
    if let FutureState::Awaiting { ref task, .. } = state {
        task.notify();
    } else {
        /// This can never happen.
        panic!("Callback invoked before callback set");
    }
}