use bytes::Bytes;
use std::convert::TryInto;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::error::{FdbError, FdbResult};
use crate::future::{FdbFuture, FdbFutureKeyValueArray, FdbStreamKeyValue};
use crate::transaction::{FdbTransaction, ReadTransaction};
use crate::tuple::key_util;
use crate::{Key, KeySelector, KeyValue};
#[cfg(feature = "fdb-7_1")]
use crate::Mapper;
#[cfg(feature = "fdb-7_1")]
use crate::future::FdbStreamMappedKeyValue;
pub use crate::option::StreamingMode;
#[derive(Clone, Debug, PartialEq)]
pub struct Range {
begin: Key,
end: Key,
}
impl Range {
pub fn new(begin: impl Into<Key>, end: impl Into<Key>) -> Range {
Range {
begin: begin.into(),
end: end.into(),
}
}
pub fn starts_with(prefix_key: impl Into<Key>) -> Range {
let prefix_key = prefix_key.into();
Range::new(
prefix_key.clone(),
key_util::strinc(prefix_key).unwrap_or_else(|err| {
panic!("Error occurred during `bytes_util::strinc`: {:?}", err)
}),
)
}
pub fn begin_key_ref(&self) -> &Key {
&self.begin
}
pub fn end_key_ref(&self) -> &Key {
&self.end
}
pub fn into_begin_key(self) -> Key {
self.begin
}
pub fn into_end_key(self) -> Key {
self.end
}
pub fn into_parts(self) -> (Key, Key) {
let Range { begin, end } = self;
(begin, end)
}
pub fn into_stream<T>(self, rt: &T, options: RangeOptions) -> FdbStreamKeyValue
where
T: ReadTransaction,
{
let (begin_key, end_key) = self.into_parts();
let begin_key_selector = KeySelector::first_greater_or_equal(begin_key);
let end_key_selector = KeySelector::first_greater_or_equal(end_key);
rt.get_range(begin_key_selector, end_key_selector, options)
}
#[cfg(feature = "fdb-7_1")]
pub fn into_mapped_stream<T>(
self,
rt: &T,
mapper: Mapper,
options: RangeOptions,
) -> FdbStreamMappedKeyValue
where
T: ReadTransaction,
{
let (begin_key, end_key) = self.into_parts();
let begin_key_selector = KeySelector::first_greater_or_equal(begin_key);
let end_key_selector = KeySelector::first_greater_or_equal(end_key);
rt.get_mapped_range(begin_key_selector, end_key_selector, mapper, options)
}
}
#[derive(Clone, Debug)]
pub struct RangeOptions {
pub(crate) limit: i32,
pub(crate) mode: StreamingMode,
pub(crate) reverse: bool,
}
impl RangeOptions {
pub fn set_limit(&mut self, limit: i32) {
self.limit = limit;
}
pub fn get_limit(&self) -> i32 {
self.limit
}
pub fn set_mode(&mut self, mode: StreamingMode) {
self.mode = mode;
}
pub fn get_mode(&self) -> StreamingMode {
self.mode
}
pub fn set_reverse(&mut self, reverse: bool) {
self.reverse = reverse;
}
pub fn get_reverse(&self) -> bool {
self.reverse
}
pub(crate) fn new(limit: i32, mode: StreamingMode, reverse: bool) -> RangeOptions {
RangeOptions {
limit,
mode,
reverse,
}
}
}
impl Default for RangeOptions {
fn default() -> RangeOptions {
RangeOptions {
limit: 0,
mode: StreamingMode::Iterator,
reverse: false,
}
}
}
#[derive(Debug)]
pub(crate) struct KeyValueArray {
kvs: Vec<KeyValue>,
index: i32,
count: i32,
more: bool,
}
impl KeyValueArray {
pub(crate) fn new(kvs: Vec<KeyValue>, count: i32, more: bool) -> KeyValueArray {
let index = 0;
KeyValueArray {
kvs,
index,
count,
more,
}
}
}
#[derive(Debug)]
enum RangeResultStateMachineState {
Fetching,
KeyValueArrayAvailable,
Error,
Done,
}
#[derive(Debug)]
enum RangeResultStateMachineData {
Fetching {
fdb_future_key_value_array: FdbFutureKeyValueArray,
},
KeyValueArrayAvailable {
kvs: Vec<KeyValue>,
index: i32,
count: i32,
more: bool,
},
Error {
fdb_error: FdbError,
},
Done,
}
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum RangeResultStateMachineEvent {
FetchOk {
kvs: Vec<KeyValue>,
index: i32,
count: i32,
more: bool,
},
FetchNextBatch {
fdb_future_key_value_array: FdbFutureKeyValueArray,
},
FetchError {
fdb_error: FdbError,
},
FetchDone,
}
#[derive(Debug)]
pub(crate) struct RangeResultStateMachine {
transaction: FdbTransaction,
snapshot: bool,
mode: StreamingMode,
reverse: bool,
iteration: Option<i32>,
limit: Option<i32>,
begin_sel: KeySelector,
end_sel: KeySelector,
range_result_state_machine_state: RangeResultStateMachineState,
range_result_state_machine_data: RangeResultStateMachineData,
}
impl RangeResultStateMachine {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
transaction: FdbTransaction,
begin_sel: KeySelector,
end_sel: KeySelector,
mode: StreamingMode,
iteration: Option<i32>,
reverse: bool,
limit: Option<i32>,
snapshot: bool,
fdb_future_key_value_array: FdbFutureKeyValueArray,
) -> RangeResultStateMachine {
RangeResultStateMachine {
transaction,
snapshot,
mode,
reverse,
iteration,
limit,
begin_sel,
end_sel,
range_result_state_machine_state: RangeResultStateMachineState::Fetching,
range_result_state_machine_data: RangeResultStateMachineData::Fetching {
fdb_future_key_value_array,
},
}
}
pub(crate) fn poll_next(
mut self: Pin<&mut RangeResultStateMachine>,
cx: &mut Context<'_>,
) -> Poll<Option<FdbResult<KeyValue>>> {
loop {
match self.range_result_state_machine_state {
RangeResultStateMachineState::Fetching => {
if let RangeResultStateMachineData::Fetching {
ref mut fdb_future_key_value_array,
} = self.range_result_state_machine_data
{
match Pin::new(fdb_future_key_value_array).poll(cx) {
Poll::Ready(res) => match res {
Ok(key_value_array) => {
let KeyValueArray {
kvs,
index,
count,
more,
} = key_value_array;
if count == 0 {
self.step_once_with_event(
RangeResultStateMachineEvent::FetchDone,
);
} else {
self.step_once_with_event(
RangeResultStateMachineEvent::FetchOk {
kvs,
index,
count,
more,
},
);
}
}
Err(fdb_error) => {
self.step_once_with_event(
RangeResultStateMachineEvent::FetchError { fdb_error },
);
}
},
Poll::Pending => return Poll::Pending,
}
} else {
panic!("invalid range_result_state_machine_data");
}
}
RangeResultStateMachineState::KeyValueArrayAvailable => {
if let RangeResultStateMachineData::KeyValueArrayAvailable {
ref kvs,
ref mut index,
count,
more,
} = self.range_result_state_machine_data
{
if *index == count {
if more {
if let Some(0) = self.limit {
self.step_once_with_event(
RangeResultStateMachineEvent::FetchDone,
);
} else {
let options = match self.limit {
Some(limit) => RangeOptions {
limit,
mode: self.mode,
reverse: self.reverse,
},
None => RangeOptions {
limit: 0,
mode: self.mode,
reverse: self.reverse,
},
};
let fdb_future_key_value_array = fdb_transaction_get_range(
self.transaction.get_c_api_ptr(),
self.begin_sel.clone(),
self.end_sel.clone(),
options,
self.iteration.unwrap_or(0),
self.snapshot,
);
self.step_once_with_event(
RangeResultStateMachineEvent::FetchNextBatch {
fdb_future_key_value_array,
},
);
}
} else {
self.step_once_with_event(RangeResultStateMachineEvent::FetchDone);
}
} else {
let result = kvs[TryInto::<usize>::try_into(*index).unwrap()].clone();
*index += 1;
return Poll::Ready(Some(Ok(result)));
}
} else {
panic!("invalid range_result_state_machine_data");
}
}
RangeResultStateMachineState::Error => {
if let RangeResultStateMachineData::Error { fdb_error } =
self.range_result_state_machine_data
{
return Poll::Ready(Some(Err(fdb_error)));
} else {
panic!("invalid range_result_state_machine_data");
}
}
RangeResultStateMachineState::Done => return Poll::Ready(None),
}
}
}
fn step_once_with_event(&mut self, event: RangeResultStateMachineEvent) {
self.range_result_state_machine_state = match self.range_result_state_machine_state {
RangeResultStateMachineState::Fetching => match event {
RangeResultStateMachineEvent::FetchOk {
kvs,
index,
count,
more,
} => {
if more {
if let Some(iteration) = self.iteration.as_mut() {
*iteration += 1;
}
if let Some(limit) = self.limit.as_mut() {
*limit -= count;
}
let last_index = TryInto::<usize>::try_into(count - 1).unwrap();
if self.reverse {
self.end_sel = KeySelector::first_greater_or_equal(
kvs[last_index].get_key_ref().clone(),
);
} else {
self.begin_sel = KeySelector::first_greater_than(
kvs[last_index].get_key_ref().clone(),
);
}
}
self.range_result_state_machine_data =
RangeResultStateMachineData::KeyValueArrayAvailable {
kvs,
index,
count,
more,
};
RangeResultStateMachineState::KeyValueArrayAvailable
}
RangeResultStateMachineEvent::FetchDone => {
self.range_result_state_machine_data = RangeResultStateMachineData::Done;
RangeResultStateMachineState::Done
}
RangeResultStateMachineEvent::FetchError { fdb_error } => {
self.range_result_state_machine_data =
RangeResultStateMachineData::Error { fdb_error };
RangeResultStateMachineState::Error
}
_ => panic!("Invalid event!"),
},
RangeResultStateMachineState::KeyValueArrayAvailable => match event {
RangeResultStateMachineEvent::FetchNextBatch {
fdb_future_key_value_array,
} => {
self.range_result_state_machine_data = RangeResultStateMachineData::Fetching {
fdb_future_key_value_array,
};
RangeResultStateMachineState::Fetching
}
RangeResultStateMachineEvent::FetchDone => {
self.range_result_state_machine_data = RangeResultStateMachineData::Done;
RangeResultStateMachineState::Done
}
_ => panic!("Invalid event!"),
},
RangeResultStateMachineState::Error | RangeResultStateMachineState::Done => {
panic!("Invalid event!");
}
}
}
}
pub(crate) fn fdb_transaction_get_range(
transaction: *mut fdb_sys::FDBTransaction,
begin_key: KeySelector,
end_key: KeySelector,
options: RangeOptions,
iteration: i32,
snapshot: bool,
) -> FdbFutureKeyValueArray {
let (key, begin_or_equal, begin_offset) = begin_key.deconstruct();
let bk = Bytes::from(key);
let begin_key_name = bk.as_ref().as_ptr();
let begin_key_name_length = bk.as_ref().len().try_into().unwrap();
let begin_or_equal = if begin_or_equal { 1 } else { 0 };
let (key, end_or_equal, end_offset) = end_key.deconstruct();
let ek = Bytes::from(key);
let end_key_name = ek.as_ref().as_ptr();
let end_key_name_length = ek.as_ref().len().try_into().unwrap();
let end_or_equal = if end_or_equal { 1 } else { 0 };
let target_bytes = 0;
let limit = options.get_limit();
let mode = options.get_mode().code();
let reverse = if options.get_reverse() { 1 } else { 0 };
let s = if snapshot { 1 } else { 0 };
FdbFuture::new(unsafe {
fdb_sys::fdb_transaction_get_range(
transaction,
begin_key_name,
begin_key_name_length,
begin_or_equal,
begin_offset,
end_key_name,
end_key_name_length,
end_or_equal,
end_offset,
limit,
target_bytes,
mode,
iteration,
s,
reverse,
)
})
}
#[cfg(test)]
mod tests {
use impls::impls;
use super::RangeOptions;
#[test]
fn impls() {
#[rustfmt::skip]
assert!(impls!(
RangeOptions:
Default));
}
}