use crate::cassandra::custom_payload::CustomPayloadResponse;
use crate::cassandra::error::*;
use crate::cassandra::prepared::PreparedStatement;
use crate::cassandra::result::CassResult;
use crate::cassandra::util::{Protected, ProtectedWithSession};
use crate::cassandra_sys::cass_future_custom_payload_item;
use crate::cassandra_sys::cass_future_custom_payload_item_count;
use crate::cassandra_sys::cass_future_error_code;
use crate::cassandra_sys::cass_future_free;
use crate::cassandra_sys::cass_future_get_prepared;
use crate::cassandra_sys::cass_future_get_result;
use crate::cassandra_sys::cass_future_ready;
use crate::cassandra_sys::cass_future_set_callback;
use crate::cassandra_sys::cass_true;
use crate::cassandra_sys::CassFuture as _Future;
use crate::cassandra_sys::CASS_OK;
use crate::Session;
use parking_lot::Mutex;
use std::future::Future;
use std::marker::PhantomData;
use std::mem;
use std::pin::Pin;
use std::slice;
use std::str;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
/// A future over a raw driver future pointer that resolves to a `Result<T>`.
///
/// The value can either be polled through its `Future` implementation or consumed
/// synchronously with `wait`.
#[must_use]
#[derive(Debug)]
pub struct CassFuture<T> {
/// Raw driver future; handed to `cass_future_free` when this value is dropped.
inner: *mut _Future,
/// State shared with the completion callback (`notify_task`).
state: Arc<FutureTarget>,
/// Session passed to `Completable::get` on completion; `None` once it has been taken.
session: Option<Session>,
/// Records the output type `T` without storing a value of that type.
phantom: PhantomData<T>,
}
// NOTE(review): these two impls assert that the raw driver future pointer may be shared
// between and moved across threads. Nothing in this file proves that; confirm against the
// driver's thread-safety guarantees.
unsafe impl<T> Sync for CassFuture<T> {}
// Sending the future also sends the `T` it will eventually produce, hence the `T: Send` bound.
unsafe impl<T> Send for CassFuture<T> where T: Send {}
// The completion callback is given a pointer into the `Arc<FutureTarget>` allocation rather
// than to the `CassFuture` itself, so `poll` does not depend on this value staying in place.
impl<T> Unpin for CassFuture<T> {}
impl<T> CassFuture<T> {
    /// Wraps the raw driver future `inner` together with the `session` it was created from.
    ///
    /// The returned future starts in the `Created` state, i.e. with no completion callback
    /// registered yet.
    pub(crate) fn build(session: Session, inner: *mut _Future) -> Self {
        let state = Arc::new(FutureTarget {
            inner: Mutex::new(FutureState::Created),
        });
        Self {
            inner,
            state,
            session: Some(session),
            phantom: PhantomData,
        }
    }

    /// Moves the stored session out of the future, leaving `None` behind.
    ///
    /// # Panics
    ///
    /// Panics if the session has already been taken, which happens when the future is
    /// completed more than once.
    fn take_session(&mut self) -> Session {
        let taken = self.session.take();
        taken.expect(
            "invariant: could not take session from CassFuture that already has had session taken.",
        )
    }
}
impl<T> Drop for CassFuture<T> {
    /// Returns the wrapped driver future to the driver by calling `cass_future_free`.
    fn drop(&mut self) {
        let raw = self.inner;
        // SAFETY: NOTE(review) relies on `inner` being a live driver future that this value
        // owns and that has not been freed elsewhere; confirm at the `build` call sites.
        unsafe {
            cass_future_free(raw);
        }
    }
}
/// A type that can be produced from a driver future once that future has completed
/// successfully.
pub trait Completable: Sized {
    /// Extracts the completed value from `inner`, or returns `None` if no value is available.
    ///
    /// `session` is the session the future was created from; implementations may keep it,
    /// return it, or discard it.
    ///
    /// # Safety
    ///
    /// Implementations pass `inner` straight to driver functions, so the caller must supply
    /// a pointer the driver accepts. `get_completion` only calls this after the driver has
    /// reported `CASS_OK` for the future.
    unsafe fn get(session: Session, inner: *mut _Future) -> Option<Self>;
}
/// Completion for futures that carry no value: always succeeds with `()`.
impl Completable for () {
// Neither the session nor the driver future is consulted.
unsafe fn get(_session: Session, _inner: *mut _Future) -> Option<Self> {
Some(())
}
}
/// Completion for futures whose result is the session itself.
impl Completable for Session {
// Hands back the session the future was built with; the driver future is not consulted.
unsafe fn get(session: Session, _inner: *mut _Future) -> Option<Self> {
Some(session)
}
}
/// Completion for futures that produce a query result.
impl Completable for CassResult {
    /// Fetches the result pointer with `cass_future_get_result` and wraps it in a
    /// `CassResult`; a null pointer yields `None`.
    unsafe fn get(_session: Session, inner: *mut _Future) -> Option<Self> {
        let raw = cass_future_get_result(inner);
        if raw.is_null() {
            None
        } else {
            Some(CassResult::build(raw))
        }
    }
}
/// Completion for futures that produce a prepared statement.
impl Completable for PreparedStatement {
    /// Fetches the prepared pointer with `cass_future_get_prepared` and wraps it, together
    /// with `session`, in a `PreparedStatement`; a null pointer yields `None`.
    unsafe fn get(session: Session, inner: *mut _Future) -> Option<Self> {
        let raw = cass_future_get_prepared(inner);
        if raw.is_null() {
            None
        } else {
            Some(PreparedStatement::build(raw, session))
        }
    }
}
unsafe fn payloads_from_future(future: *mut _Future) -> Result<CustomPayloadResponse> {
let cp_count = cass_future_custom_payload_item_count(future);
(0..cp_count)
.map(|index| {
let mut name = std::ptr::null();
let mut name_length = 0;
let mut value = std::ptr::null();
let mut value_size = 0;
cass_future_custom_payload_item(
future,
index,
&mut name,
&mut name_length,
&mut value,
&mut value_size,
)
.to_result((name, name_length, value, value_size))
.and_then(|(name, name_length, value, value_size)| {
let name_slice = slice::from_raw_parts(name as *const u8, name_length);
str::from_utf8(name_slice)
.map_err(|err| err.into())
.map(|name| {
(
name.to_string(),
slice::from_raw_parts(value, value_size).to_vec(),
)
})
})
})
.collect::<Result<CustomPayloadResponse>>()
}
/// Completion for futures that produce a query result plus its custom payloads.
impl Completable for (CassResult, CustomPayloadResponse) {
    /// Reads the payloads first and then the result.
    ///
    /// Returns `None` if the payloads cannot be read (the underlying error is discarded)
    /// or if `cass_future_get_result` returns a null pointer. The result is only requested
    /// from the driver once the payloads have been read successfully.
    unsafe fn get(_session: Session, inner: *mut _Future) -> Option<Self> {
        let payloads = payloads_from_future(inner).ok()?;
        let raw = cass_future_get_result(inner);
        if raw.is_null() {
            None
        } else {
            Some((CassResult::build(raw), payloads))
        }
    }
}
impl<T: Completable> CassFuture<T> {
    /// Consumes the future and returns its outcome synchronously.
    ///
    /// NOTE(review): presumably blocks until the driver future resolves, since the outcome
    /// is read through `cass_future_error_code`; confirm against the driver documentation.
    ///
    /// # Errors
    ///
    /// Returns the driver's error if the future failed, or `LIB_NULL_VALUE` if it succeeded
    /// but no value could be extracted.
    pub fn wait(mut self) -> Result<T> {
        let session = self.take_session();
        let raw = self.inner;
        unsafe { get_completion(session, raw) }
    }
}
impl<T: Completable> Future for CassFuture<T> {
type Output = Result<T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut install_callback = false;
let ret = {
let mut lock = self.state.as_ref().inner.lock();
match *lock {
ref mut state @ FutureState::Created => {
if unsafe { cass_future_ready(self.inner) } == cass_true {
Poll::Ready(self.inner)
} else {
*state = FutureState::Awaiting {
waker: cx.waker().clone(),
keep_alive: self.state.clone(),
};
install_callback = true;
Poll::Pending
}
}
FutureState::Awaiting { ref mut waker, .. } => {
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();
}
Poll::Pending
}
FutureState::Ready => {
Poll::Ready(self.inner)
}
}
};
if install_callback {
let data = (self.state.as_ref() as *const FutureTarget) as *mut ::std::os::raw::c_void;
unsafe { cass_future_set_callback(self.inner, Some(notify_task), data) }
.to_result(())?;
}
match ret {
Poll::Pending => Poll::Pending,
Poll::Ready(inner) => {
Poll::Ready(unsafe { get_completion(self.take_session(), inner) })
}
}
}
}
/// Converts a driver future into the `Result` it represents.
///
/// Reads the future's error code; on `CASS_OK` the value is extracted through
/// `Completable::get`, otherwise the driver error is built with `get_cass_future_error`.
///
/// # Errors
///
/// Returns the driver's error for a failed future, or `LIB_NULL_VALUE` when the future
/// succeeded but `Completable::get` produced no value.
///
/// # Safety
///
/// `inner` is passed directly to the driver, so it must be a pointer the driver accepts.
unsafe fn get_completion<T: Completable>(session: Session, inner: *mut _Future) -> Result<T> {
    let rc = cass_future_error_code(inner);
    match rc {
        CASS_OK => {
            T::get(session, inner).ok_or_else(|| CassErrorCode::LIB_NULL_VALUE.to_error())
        }
        _ => Err(get_cass_future_error(rc, inner)),
    }
}
/// Shared state for one `CassFuture`; the driver callback receives a raw pointer to this value.
#[derive(Debug)]
struct FutureTarget {
/// Current lifecycle state, behind a mutex because both `poll` and `notify_task` update it.
inner: Mutex<FutureState>,
}
/// Lifecycle of a `CassFuture` as tracked by `poll` and `notify_task`.
#[derive(Debug)]
enum FutureState {
/// Initial state: `poll` has not yet registered a callback with the driver.
Created,
/// `poll` has parked the task and registers (or has registered) the driver callback.
Awaiting {
/// Waker invoked by `notify_task` when the callback fires.
waker: Waker,
/// Extra strong reference to the owning `FutureTarget`, so the raw pointer handed to the
/// callback stays valid even if the `CassFuture` is dropped before the callback runs.
keep_alive: Arc<FutureTarget>,
},
/// Set by `notify_task` once the callback has run.
Ready,
}
/// Driver completion callback: marks the shared state `Ready` and wakes the parked task.
///
/// `data` is the `FutureTarget` pointer that `poll` registered alongside this callback.
///
/// # Panics
///
/// Panics if the state was not `Awaiting` when the callback fired.
/// NOTE(review): this panic is raised inside an `extern "C"` function; confirm that the
/// resulting behaviour (an abort on current toolchains) is acceptable.
///
/// # Safety
///
/// `data` must point to a live `FutureTarget`; `poll` ensures this through the
/// `keep_alive` reference stored in the `Awaiting` state.
unsafe extern "C" fn notify_task(_c_future: *mut _Future, data: *mut ::std::os::raw::c_void) {
    let target: &FutureTarget = &*(data as *const FutureTarget);
    // Swap in `Ready` under the lock and release the guard before inspecting the old state.
    // The old state may hold the last strong reference to `target`, so it must not be
    // dropped while the guard is alive; it is dropped at the end of this function.
    let previous = {
        let mut guard = target.inner.lock();
        mem::replace(&mut *guard, FutureState::Ready)
    };
    match previous {
        FutureState::Awaiting { ref waker, .. } => waker.wake_by_ref(),
        FutureState::Created | FutureState::Ready => {
            panic!("Callback invoked before callback set")
        }
    }
}