use cassandra::error::*;
use cassandra::prepared::PreparedStatement;
use cassandra::result::CassResult;
use cassandra::util::Protected;
use cassandra::consistency::Consistency;
use cassandra::write_type::WriteType;
use cassandra_sys::CassError_;
use cassandra_sys::CASS_OK;
use cassandra_sys::CassFuture as _Future;
use cassandra_sys::cass_future_error_code;
use cassandra_sys::cass_future_error_message;
use cassandra_sys::cass_future_free;
use cassandra_sys::cass_future_get_error_result;
use cassandra_sys::cass_future_get_prepared;
use cassandra_sys::cass_future_get_result;
use cassandra_sys::cass_future_ready;
use cassandra_sys::cass_future_set_callback;
use cassandra_sys::{cass_true, cass_false};
use std::mem;
use std::slice;
use std::str;
use std::sync::{Arc, Mutex};
use std::marker::PhantomData;
use futures;
#[must_use]
#[derive(Debug)]
pub struct CassFuture<T> {
inner: *mut _Future,
state: Arc<FutureTarget>,
phantom: PhantomData<T>,
}
unsafe impl<T> Sync for CassFuture<T> {}
unsafe impl<T> Send for CassFuture<T> where T: Send {}
impl<T> CassFuture<T> {
pub(crate) fn build(inner: *mut _Future) -> Self {
CassFuture {
inner,
state: Arc::new(FutureTarget { inner: Mutex::new(FutureState::Created) }),
phantom: PhantomData,
}
}
}
impl<T> Drop for CassFuture<T> {
fn drop(&mut self) { unsafe { cass_future_free(self.inner) }; }
}
pub trait Completable where Self: Sized {
unsafe fn get(inner: *mut _Future) -> Option<Self>;
}
impl Completable for () {
unsafe fn get(_inner: *mut _Future) -> Option<Self> {
Some(())
}
}
impl Completable for CassResult {
unsafe fn get(inner: *mut _Future) -> Option<Self> {
cass_future_get_result(inner).as_ref().map(|r| CassResult::build(r as *const _))
}
}
impl Completable for PreparedStatement {
unsafe fn get(inner: *mut _Future) -> Option<Self> {
cass_future_get_prepared(inner).as_ref().map(|r| PreparedStatement::build(r as *const _))
}
}
impl<T: Completable> futures::Future for CassFuture<T> {
type Item = T;
type Error = Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> { unsafe {
let mut install_callback = false;
let ret = {
let mut lock = self.state.as_ref().inner.lock().expect("poll");
match *lock {
ref mut state @ FutureState::Created => {
if cass_future_ready(self.inner) == cass_true {
get_completion(self.inner).map(futures::Async::Ready)
} else {
*state = FutureState::Awaiting {
task: futures::task::current(),
keep_alive: self.state.clone(),
};
install_callback = true;
Ok(futures::Async::NotReady)
}
},
FutureState::Awaiting { ref mut task, .. } => {
*task = futures::task::current();
Ok(futures::Async::NotReady)
},
FutureState::Ready => {
get_completion(self.inner).map(futures::Async::Ready)
}
}
};
if install_callback {
let data =
(self.state.as_ref() as *const FutureTarget) as *mut ::std::os::raw::c_void;
cass_future_set_callback(self.inner, Some(notify_task), data)
.to_result(())
} else {
Ok(())
}.and_then(move |_| ret)
}}
}
unsafe fn get_completion<T: Completable>(inner: *mut _Future) -> Result<T> {
let rc = cass_future_error_code(inner);
match rc {
CASS_OK => {
match Completable::get(inner) {
None => Err(CassErrorCode::LIB_NULL_VALUE.to_error()),
Some(v) => Ok(v)
}
},
_ => Err(get_cass_future_error(rc, inner)),
}
}
#[derive(Debug)]
struct FutureTarget {
inner: Mutex<FutureState>,
}
#[derive(Debug)]
enum FutureState {
Created,
Awaiting { task: futures::task::Task, keep_alive: Arc<FutureTarget> },
Ready,
}
unsafe extern "C" fn notify_task(_c_future: *mut _Future, data: *mut ::std::os::raw::c_void) {
let future_target: &FutureTarget = &*(data as *const FutureTarget);
let state = {
let mut lock = future_target.inner.lock().expect("notify_task");
mem::replace(&mut *lock, FutureState::Ready)
};
if let FutureState::Awaiting { ref task, .. } = state {
task.notify();
} else {
panic!("Callback invoked before callback set");
}
}