use bytes::Bytes;
use futures::task::AtomicWaker;
use futures::Stream;
use std::convert::TryInto;
use std::ffi;
use std::ffi::{CStr, CString};
use std::future::Future;
use std::marker::PhantomData;
use std::marker::Unpin;
use std::pin::Pin;
use std::ptr::{self, NonNull};
use std::slice;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::error::{check, FdbResult};
use crate::range::{
fdb_transaction_get_range, KeyValueArray, RangeOptions, RangeResultStateMachine, StreamingMode,
};
use crate::transaction::FdbTransaction;
use crate::{Key, KeySelector, KeyValue, Value};
#[cfg(feature = "fdb-7_1")]
use crate::{MappedKeyValue, Mapper};
#[cfg(feature = "fdb-7_1")]
use crate::mapped_range::{
fdb_transaction_get_mapped_range, MappedKeyValueArray, MappedRangeResultStateMachine,
};
#[cfg(feature = "fdb-7_1")]
use crate::range::Range;
#[derive(Debug)]
pub struct FdbFuture<T> {
c_ptr: Option<NonNull<fdb_sys::FDBFuture>>,
callback_set: bool,
waker: Option<Arc<AtomicWaker>>,
_marker: PhantomData<T>,
}
impl<T> FdbFuture<T> {
pub unsafe fn is_ready(&self) -> bool {
let fut_c_ptr = self.c_ptr.as_ref().unwrap().as_ptr();
fdb_sys::fdb_future_is_ready(fut_c_ptr) != 0
}
pub(crate) fn new(c_ptr: *mut fdb_sys::FDBFuture) -> FdbFuture<T> {
FdbFuture {
c_ptr: Some(NonNull::new(c_ptr).expect("c_ptr cannot be null")),
callback_set: false,
waker: Some(Arc::new(AtomicWaker::new())),
_marker: PhantomData,
}
}
}
unsafe impl<T> Send for FdbFuture<T> {}
unsafe impl<T> Sync for FdbFuture<T> {}
#[allow(non_snake_case)]
extern "C" fn FDBCallback(_f: *mut fdb_sys::FDBFuture, callback_parameter: *mut ffi::c_void) {
let arc_atomic_waker = unsafe { Arc::from_raw(callback_parameter as *const AtomicWaker) };
arc_atomic_waker.wake();
}
impl<T> Drop for FdbFuture<T> {
fn drop(&mut self) {
if let Some(c_ptr) = self.c_ptr.take() {
unsafe {
fdb_sys::fdb_future_destroy(c_ptr.as_ptr());
}
}
}
}
impl<T> Future for FdbFuture<T>
where
T: FdbFutureGet + Unpin,
{
type Output = FdbResult<T>;
fn poll(self: Pin<&mut FdbFuture<T>>, cx: &mut Context<'_>) -> Poll<FdbResult<T>> {
if self.waker.is_none() {
panic!("Poll called after Poll::Ready(...) was returned!");
}
let fut_c_ptr = self.c_ptr.as_ref().unwrap().as_ptr();
let fdb_fut_ref = self.get_mut();
if unsafe { fdb_sys::fdb_future_is_ready(fut_c_ptr) } != 0 {
fdb_fut_ref.waker = None;
Poll::Ready(unsafe { FdbFutureGet::get(fut_c_ptr) })
} else {
let arc_atomic_waker_ref = fdb_fut_ref.waker.as_ref().unwrap();
arc_atomic_waker_ref.register(cx.waker());
if unsafe { fdb_sys::fdb_future_is_ready(fut_c_ptr) } != 0 {
fdb_fut_ref.waker = None;
Poll::Ready(unsafe { FdbFutureGet::get(fut_c_ptr) })
} else if !fdb_fut_ref.callback_set {
let arc_atomic_waker_copy_ptr = Arc::into_raw(arc_atomic_waker_ref.clone());
match check(unsafe {
fdb_sys::fdb_future_set_callback(
fut_c_ptr,
Some(FDBCallback),
arc_atomic_waker_copy_ptr as *mut ffi::c_void,
)
}) {
Ok(_) => {
fdb_fut_ref.callback_set = true;
Poll::Pending
}
Err(err) => {
drop(unsafe { Arc::from_raw(arc_atomic_waker_copy_ptr) });
Poll::Ready(Err(err))
}
}
} else {
Poll::Pending
}
}
}
}
pub trait FdbFutureGet {
unsafe fn get(future: *mut fdb_sys::FDBFuture) -> FdbResult<Self>
where
Self: Sized;
}
pub type FdbFutureUnit = FdbFuture<()>;
impl FdbFutureGet for () {
unsafe fn get(future: *mut fdb_sys::FDBFuture) -> FdbResult<()> {
check(fdb_sys::fdb_future_get_error(future))
}
}
pub type FdbFutureI64 = FdbFuture<i64>;
impl FdbFutureGet for i64 {
unsafe fn get(future: *mut fdb_sys::FDBFuture) -> FdbResult<i64> {
let mut out = 0;
check(fdb_sys::fdb_future_get_int64(future, &mut out)).map(|_| out)
}
}
pub type FdbFutureKey = FdbFuture<Key>;
impl FdbFutureGet for Key {
unsafe fn get(future: *mut fdb_sys::FDBFuture) -> FdbResult<Key> {
let mut out_key = ptr::null();
let mut out_key_length = 0;
check(fdb_sys::fdb_future_get_key(
future,
&mut out_key,
&mut out_key_length,
))
.map(|_| {
Bytes::copy_from_slice(if out_key_length == 0 {
&b""[..]
} else {
slice::from_raw_parts(out_key, out_key_length.try_into().unwrap())
})
.into()
})
}
}
pub type FdbFutureMaybeValue = FdbFuture<Option<Value>>;
impl FdbFutureGet for Option<Value> {
unsafe fn get(future: *mut fdb_sys::FDBFuture) -> FdbResult<Option<Value>> {
let mut out_present = 0;
let mut out_value = ptr::null();
let mut out_value_length = 0;
check(fdb_sys::fdb_future_get_value(
future,
&mut out_present,
&mut out_value,
&mut out_value_length,
))
.map(|_| {
if out_present != 0 {
Some(
Bytes::copy_from_slice(if out_value_length == 0 {
&b""[..]
} else {
slice::from_raw_parts(out_value, out_value_length.try_into().unwrap())
})
.into(),
)
} else {
None
}
})
}
}
pub type FdbFutureCStringArray = FdbFuture<Vec<CString>>;
impl FdbFutureGet for Vec<CString> {
unsafe fn get(future: *mut fdb_sys::FDBFuture) -> FdbResult<Vec<CString>> {
let mut out_strings = ptr::null_mut();
let mut out_count = 0;
check(fdb_sys::fdb_future_get_string_array(
future,
&mut out_strings,
&mut out_count,
))
.map(|_| {
let mut cstring_list = Vec::with_capacity(out_count.try_into().unwrap());
(0..out_count).into_iter().for_each(|i| {
cstring_list.push(CString::from(CStr::from_ptr(
*out_strings.offset(i.try_into().unwrap()),
)));
});
cstring_list
})
}
}
#[cfg(feature = "fdb-7_1")]
pub type FdbFutureKeyArray = FdbFuture<Vec<Key>>;
#[cfg(feature = "fdb-7_1")]
impl FdbFutureGet for Vec<Key> {
unsafe fn get(future: *mut fdb_sys::FDBFuture) -> FdbResult<Vec<Key>> {
let mut out_key_array = ptr::null();
let mut out_count = 0;
check(fdb_sys::fdb_future_get_key_array(
future,
&mut out_key_array,
&mut out_count,
))
.map(|_| {
let mut ks = Vec::with_capacity(out_count.try_into().unwrap());
(0..out_count).into_iter().for_each(|i| {
let k = out_key_array.offset(i.try_into().unwrap());
let key = Bytes::copy_from_slice(slice::from_raw_parts(
(*k).key,
(*k).key_length.try_into().unwrap(),
))
.into();
ks.push(key);
});
ks
})
}
}
pub(crate) type FdbFutureKeyValueArray = FdbFuture<KeyValueArray>;
impl FdbFutureGet for KeyValueArray {
unsafe fn get(future: *mut fdb_sys::FDBFuture) -> FdbResult<KeyValueArray> {
let mut out_kv = ptr::null();
let mut out_count = 0;
let mut out_more = 0;
check(fdb_sys::fdb_future_get_keyvalue_array(
future,
&mut out_kv,
&mut out_count,
&mut out_more,
))
.map(|_| {
let mut kvs = Vec::with_capacity(out_count.try_into().unwrap());
(0..out_count).into_iter().for_each(|i| {
let kv = out_kv.offset(i.try_into().unwrap());
let key = Bytes::copy_from_slice(slice::from_raw_parts(
(*kv).key,
(*kv).key_length.try_into().unwrap(),
))
.into();
let value = Bytes::copy_from_slice(slice::from_raw_parts(
(*kv).value,
(*kv).value_length.try_into().unwrap(),
))
.into();
kvs.push(KeyValue::new(key, value));
});
KeyValueArray::new(kvs, out_count, out_more != 0)
})
}
}
#[cfg(feature = "fdb-7_1")]
pub(crate) type FdbFutureMappedKeyValueArray = FdbFuture<MappedKeyValueArray>;
#[cfg(feature = "fdb-7_1")]
impl FdbFutureGet for MappedKeyValueArray {
unsafe fn get(future: *mut fdb_sys::FDBFuture) -> FdbResult<MappedKeyValueArray> {
let mut out_mkv = ptr::null();
let mut out_count = 0;
let mut out_more = 0;
check(fdb_sys::fdb_future_get_mappedkeyvalue_array(
future,
&mut out_mkv,
&mut out_count,
&mut out_more,
))
.map(|_| {
let mut mkvs = Vec::with_capacity(out_count.try_into().unwrap());
(0..out_count).into_iter().for_each(|i| {
let mkv = out_mkv.offset(i.try_into().unwrap());
let key_value = {
let key = Bytes::copy_from_slice(slice::from_raw_parts(
(*mkv).key.key,
(*mkv).key.key_length.try_into().unwrap(),
))
.into();
let value = Bytes::copy_from_slice(slice::from_raw_parts(
(*mkv).value.key,
(*mkv).value.key_length.try_into().unwrap(),
))
.into();
KeyValue::new(key, value)
};
let range = {
let begin = Bytes::copy_from_slice(slice::from_raw_parts(
(*mkv).getRange.begin.key.key,
(*mkv).getRange.begin.key.key_length.try_into().unwrap(),
));
let end = Bytes::copy_from_slice(slice::from_raw_parts(
(*mkv).getRange.end.key.key,
(*mkv).getRange.end.key.key_length.try_into().unwrap(),
));
Range::new(begin, end)
};
let range_result_count = (*mkv).getRange.m_size;
let mut range_result = Vec::with_capacity(range_result_count.try_into().unwrap());
(0..range_result_count).into_iter().for_each(|j| {
let kv = (*mkv).getRange.data.offset(j.try_into().unwrap());
let key = Bytes::copy_from_slice(slice::from_raw_parts(
(*kv).key,
(*kv).key_length.try_into().unwrap(),
))
.into();
let value = Bytes::copy_from_slice(slice::from_raw_parts(
(*kv).value,
(*kv).value_length.try_into().unwrap(),
))
.into();
range_result.push(KeyValue::new(key, value));
});
mkvs.push(MappedKeyValue::new(key_value, range, range_result));
});
MappedKeyValueArray::new(mkvs, out_count, out_more != 0)
})
}
}
#[derive(Debug)]
pub struct FdbStreamKeyValue {
range_result_state_machine: RangeResultStateMachine,
}
impl FdbStreamKeyValue {
pub(crate) fn new(
transaction: FdbTransaction,
begin: KeySelector,
end: KeySelector,
options: RangeOptions,
snapshot: bool,
) -> FdbStreamKeyValue {
let limit = if options.get_limit() == 0 {
None
} else {
Some(options.get_limit())
};
let mode = options.get_mode();
let reverse = options.get_reverse();
let iteration = if options.get_mode() == StreamingMode::Iterator {
Some(1)
} else {
None
};
let fdb_future_key_value_array = fdb_transaction_get_range(
transaction.get_c_api_ptr(),
begin.clone(),
end.clone(),
RangeOptions::new(limit.unwrap_or(0), mode, reverse),
iteration.unwrap_or(0),
snapshot,
);
let range_result_state_machine = RangeResultStateMachine::new(
transaction,
begin,
end,
mode,
iteration,
reverse,
limit,
snapshot,
fdb_future_key_value_array,
);
FdbStreamKeyValue {
range_result_state_machine,
}
}
}
impl Stream for FdbStreamKeyValue {
type Item = FdbResult<KeyValue>;
fn poll_next(
mut self: Pin<&mut FdbStreamKeyValue>,
cx: &mut Context<'_>,
) -> Poll<Option<FdbResult<KeyValue>>> {
Pin::new(&mut self.range_result_state_machine).poll_next(cx)
}
}
#[cfg(feature = "fdb-7_1")]
#[derive(Debug)]
pub struct FdbStreamMappedKeyValue {
mapped_range_result_state_machine: MappedRangeResultStateMachine,
}
#[cfg(feature = "fdb-7_1")]
impl FdbStreamMappedKeyValue {
pub(crate) fn new(
transaction: FdbTransaction,
begin: KeySelector,
end: KeySelector,
mapper: Mapper,
options: RangeOptions,
snapshot: bool,
) -> FdbStreamMappedKeyValue {
let limit = if options.get_limit() == 0 {
None
} else {
Some(options.get_limit())
};
let mode = options.get_mode();
let reverse = options.get_reverse();
let iteration = if options.get_mode() == StreamingMode::Iterator {
Some(1)
} else {
None
};
let fdb_future_mapped_key_value_array = fdb_transaction_get_mapped_range(
transaction.get_c_api_ptr(),
begin.clone(),
end.clone(),
mapper.clone(),
RangeOptions::new(limit.unwrap_or(0), mode, reverse),
iteration.unwrap_or(0),
snapshot,
);
let mapped_range_result_state_machine = MappedRangeResultStateMachine::new(
transaction,
begin,
end,
mapper,
mode,
iteration,
reverse,
limit,
snapshot,
fdb_future_mapped_key_value_array,
);
FdbStreamMappedKeyValue {
mapped_range_result_state_machine,
}
}
}
#[cfg(feature = "fdb-7_1")]
impl Stream for FdbStreamMappedKeyValue {
type Item = FdbResult<MappedKeyValue>;
fn poll_next(
mut self: Pin<&mut FdbStreamMappedKeyValue>,
cx: &mut Context<'_>,
) -> Poll<Option<FdbResult<MappedKeyValue>>> {
Pin::new(&mut self.mapped_range_result_state_machine).poll_next(cx)
}
}
#[cfg(test)]
mod tests {
use futures::task::AtomicWaker;
use futures::Stream;
use impls::impls;
use std::future::Future;
use std::marker::PhantomData;
use std::ptr::NonNull;
use std::sync::Arc;
use super::{
FdbFutureCStringArray, FdbFutureI64, FdbFutureKey, FdbFutureKeyValueArray,
FdbFutureMaybeValue, FdbFutureUnit, FdbStreamKeyValue,
};
#[cfg(feature = "fdb-7_1")]
use super::{FdbFutureKeyArray, FdbFutureMappedKeyValueArray, FdbStreamMappedKeyValue};
#[test]
fn impls() {
#[rustfmt::skip]
assert!(impls!(
FdbFutureUnit:
Send &
Future &
!Clone &
!Copy));
#[rustfmt::skip]
assert!(impls!(
FdbFutureI64:
Send &
Future &
!Clone &
!Copy));
#[rustfmt::skip]
assert!(impls!(
FdbFutureKey:
Send &
Future &
!Clone &
!Copy));
#[rustfmt::skip]
assert!(impls!(
FdbFutureMaybeValue:
Send &
Future &
!Clone &
!Copy));
#[rustfmt::skip]
assert!(impls!(
FdbFutureCStringArray:
Send &
Future &
!Clone &
!Copy));
#[rustfmt::skip]
assert!(impls!(
FdbFutureKeyValueArray:
Send &
Future &
!Clone &
!Copy));
#[rustfmt::skip]
assert!(impls!(
FdbStreamKeyValue:
Send &
Stream &
!Clone &
!Copy));
#[cfg(feature = "fdb-7_1")]
#[rustfmt::skip]
assert!(impls!(
FdbFutureKeyArray:
Send &
Future &
!Clone &
!Copy));
#[cfg(feature = "fdb-7_1")]
#[rustfmt::skip]
assert!(impls!(
FdbFutureMappedKeyValueArray:
Send &
Future &
!Clone &
!Copy));
#[cfg(feature = "fdb-7_1")]
#[rustfmt::skip]
assert!(impls!(
FdbStreamMappedKeyValue:
Send &
Stream &
!Clone &
!Copy));
}
#[allow(dead_code)]
#[derive(Debug)]
struct DummyFdbFuture<T> {
c_ptr: Option<NonNull<fdb_sys::FDBFuture>>,
callback_set: bool,
waker: Option<Arc<AtomicWaker>>,
_marker: PhantomData<T>,
}
unsafe impl<T> Send for DummyFdbFuture<T> {}
#[test]
fn trait_bounds() {
fn trait_bounds_for_fdb_transaction<T>(_t: T)
where
T: Send + 'static,
{
}
let d = DummyFdbFuture::<()> {
c_ptr: Some(NonNull::dangling()),
callback_set: false,
waker: Some(Arc::new(AtomicWaker::new())),
_marker: PhantomData,
};
trait_bounds_for_fdb_transaction(d);
}
}