//! Provides types for working with FDB ranges.
use bytes::Bytes;
use std::convert::TryInto;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::error::{FdbError, FdbResult};
use crate::future::{FdbFuture, FdbFutureKeyValueArray, FdbStreamKeyValue};
use crate::transaction::{FdbTransaction, ReadTransaction};
use crate::tuple::key_util;
use crate::{Key, KeySelector, KeyValue};
#[cfg(feature = "fdb-7_1")]
use crate::Mapper;
#[cfg(feature = "fdb-7_1")]
use crate::future::FdbStreamMappedKeyValue;
pub use crate::option::StreamingMode;
/// [`Range`] describes an exact range of keyspace, specified by a
/// begin and end key.
///
/// As with all FDB APIs, begin is inclusive, and end exclusive.
#[derive(Clone, Debug, PartialEq)]
pub struct Range {
/// Inclusive lower bound of the range.
begin: Key,
/// Exclusive upper bound of the range.
end: Key,
}
impl Range {
/// Construct a new [`Range`] with an inclusive begin key an
/// exclusive end key.
pub fn new(begin: impl Into<Key>, end: impl Into<Key>) -> Range {
Range {
begin: begin.into(),
end: end.into(),
}
}
/// Return a [`Range`] that describes all possible keys that are
/// prefixed with the specified key.
///
/// # Panic
///
/// Panics if the supplied [`Key`] is empty or contains only
/// `0xFF` bytes.
pub fn starts_with(prefix_key: impl Into<Key>) -> Range {
let prefix_key = prefix_key.into();
Range::new(
prefix_key.clone(),
key_util::strinc(prefix_key).unwrap_or_else(|err| {
panic!("Error occurred during `bytes_util::strinc`: {:?}", err)
}),
)
}
/// Gets a reference to the begin [`Key`] of the [`Range`].
pub fn begin_key_ref(&self) -> &Key {
&self.begin
}
/// Gets a reference to the end [`Key`] of the [`Range`].
pub fn end_key_ref(&self) -> &Key {
&self.end
}
/// Extract begin [`Key`] from the [`Range`].
pub fn into_begin_key(self) -> Key {
self.begin
}
/// Extract end [`Key`] from the [`Range`].
pub fn into_end_key(self) -> Key {
self.end
}
/// Extract begin and end [`Key`] from the [`Range`].
pub fn into_parts(self) -> (Key, Key) {
let Range { begin, end } = self;
(begin, end)
}
/// Gets an ordered range of keys and values from the database.
///
/// The returned [`FdbStreamKeyValue`] implements [`Stream`] trait
/// that yields a [`KeyValue`] item.
///
/// [`Stream`]: futures::Stream
pub fn into_stream<T>(self, rt: &T, options: RangeOptions) -> FdbStreamKeyValue
where
T: ReadTransaction,
{
let (begin_key, end_key) = self.into_parts();
let begin_key_selector = KeySelector::first_greater_or_equal(begin_key);
let end_key_selector = KeySelector::first_greater_or_equal(end_key);
rt.get_range(begin_key_selector, end_key_selector, options)
}
#[cfg(feature = "fdb-7_1")]
/// Gets an ordered range of mapped keys and values from the
/// database.
///
/// The returned [`FdbStreamMappedKeyValue`] implements [`Stream`] trait
/// that yields a [`MappedKeyValue`] item.
///
/// [`Stream`]: futures::Stream
/// [`MappedKeyValue`]: crate::MappedKeyValue
pub fn into_mapped_stream<T>(
self,
rt: &T,
mapper: Mapper,
options: RangeOptions,
) -> FdbStreamMappedKeyValue
where
T: ReadTransaction,
{
let (begin_key, end_key) = self.into_parts();
let begin_key_selector = KeySelector::first_greater_or_equal(begin_key);
let end_key_selector = KeySelector::first_greater_or_equal(end_key);
rt.get_mapped_range(begin_key_selector, end_key_selector, mapper, options)
}
}
/// [`RangeOptions`] specify how a database range operation is carried out.
///
/// There are three parameters for which accessor methods are provided.
///
/// 1. Limit restricts the number of key-value pairs returned as part
/// of a range read. A value of zero indicates no limit.
///
/// 2. Mode sets the [streaming mode] of the range read, allowing
/// the database to balance latency and bandwidth for this read.
///
/// 3. Reverse indicates whether the read should be performed in
/// lexicographic order (when false) or reverse lexicographic order
/// (when true).
///
/// When reverse is true and limit is non-zero, the last limit
/// key-value pairs in the range are returned. Reading ranges in
/// reverse is supported natively by the database and should have
/// minimal extra cost.
///
/// To create a value of [`RangeOptions`] type, use
/// [`Default::default`] method. The default value represents - no
/// limit, [iterator streaming mode] and lexicographic order.
///
/// [streaming mode]: StreamingMode
/// [iterator streaming mode]: StreamingMode::Iterator
#[derive(Clone, Debug)]
pub struct RangeOptions {
/// Maximum number of key-value pairs to return; zero means no limit.
pub(crate) limit: i32,
/// Streaming mode used for the range read.
pub(crate) mode: StreamingMode,
/// `true` to read in reverse lexicographic order.
pub(crate) reverse: bool,
}
impl RangeOptions {
/// Set limit
pub fn set_limit(&mut self, limit: i32) {
self.limit = limit;
}
/// Get limit
pub fn get_limit(&self) -> i32 {
self.limit
}
/// Set streaming mode
pub fn set_mode(&mut self, mode: StreamingMode) {
self.mode = mode;
}
/// Get streaming mode
pub fn get_mode(&self) -> StreamingMode {
self.mode
}
/// Set the read order (lexicographic or non-lexicographic)
pub fn set_reverse(&mut self, reverse: bool) {
self.reverse = reverse;
}
/// Get the read order (lexicographic or non-lexicographic)
pub fn get_reverse(&self) -> bool {
self.reverse
}
pub(crate) fn new(limit: i32, mode: StreamingMode, reverse: bool) -> RangeOptions {
RangeOptions {
limit,
mode,
reverse,
}
}
}
impl Default for RangeOptions {
    /// No limit (`0`), [`StreamingMode::Iterator`] and lexicographic
    /// (non-reverse) order.
    fn default() -> RangeOptions {
        Self::new(0, StreamingMode::Iterator, false)
    }
}
// Java API refers to this type as `RangeResult` and Go API has
// something similar with `futureKeyValueArray` and `[]KeyValue`. Go
// API `RangeResult` is similar to Java API `RangeQuery`. Be careful
// and don't confuse Java API `RangeResult` with Go API `RangeResult`.
//
// One batch of key-value pairs produced by a range-read future.
#[derive(Debug)]
pub(crate) struct KeyValueArray {
// Key-value pairs of this batch.
kvs: Vec<KeyValue>,
// Position of the next element to hand out; starts at `0`.
index: i32,
// Number of key-value pairs in this batch, as passed to `new`.
count: i32,
// Whether more key-value pairs may be fetched after this batch.
more: bool,
}
impl KeyValueArray {
pub(crate) fn new(kvs: Vec<KeyValue>, count: i32, more: bool) -> KeyValueArray {
let index = 0;
KeyValueArray {
kvs,
index,
count,
more,
}
}
}
// Payload-free tag for the current state of `RangeResultStateMachine`.
// The payload for the same state is kept separately in
// `RangeResultStateMachineData`.
#[derive(Debug)]
enum RangeResultStateMachineState {
// A range-read future is being polled.
Fetching,
// A batch has been fetched and its elements are being handed out.
KeyValueArrayAvailable,
// A fetch failed; polling yields the stored error.
Error,
// Nothing more to yield; polling yields `None`.
Done,
}
// Payload associated with each `RangeResultStateMachineState`. The
// state machine panics if the payload variant does not match the
// current state.
#[derive(Debug)]
enum RangeResultStateMachineData {
// The future for the batch currently being fetched.
Fetching {
fdb_future_key_value_array: FdbFutureKeyValueArray,
},
// The fetched batch together with the iteration cursor (`index`),
// the batch size (`count`) and whether another batch should be
// requested once this one is exhausted (`more`).
KeyValueArrayAvailable {
kvs: Vec<KeyValue>,
index: i32,
count: i32,
more: bool,
},
// The error produced by a failed fetch.
Error {
fdb_error: FdbError,
},
// Terminal state, no payload.
Done,
}
// Events that drive transitions of `RangeResultStateMachine`.
//
// The variant names match with the sismic events.
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum RangeResultStateMachineEvent {
// A fetch completed with a non-empty batch.
FetchOk {
kvs: Vec<KeyValue>,
index: i32,
count: i32,
more: bool,
},
// The current batch is exhausted and a new fetch has been issued.
FetchNextBatch {
fdb_future_key_value_array: FdbFutureKeyValueArray,
},
// A fetch completed with an error.
FetchError {
fdb_error: FdbError,
},
// There is nothing more to fetch or yield.
FetchDone,
}
// A state machine that returns the key-value pairs from the database
// satisfying the range specified in a range read.
//
// See `sismic/range_result_state_machine.yaml` for the design of the
// state machine.
#[derive(Debug)]
pub(crate) struct RangeResultStateMachine {
// Transaction whose C API pointer is used to issue follow-up range
// reads.
transaction: FdbTransaction,
// Forwarded unchanged to every follow-up range read.
snapshot: bool,
// Streaming mode forwarded to every follow-up range read.
mode: StreamingMode,
// Read direction; also decides which key selector is advanced after
// a batch (`end_sel` when `true`, `begin_sel` when `false`).
reverse: bool,
// This is *only* used in case of `StreamingMode::Iterator`. In
// other cases, we set it to `None`.
iteration: Option<i32>,
// When `limit` is `None`, it means that the C API is allowed to
// choose how many key values it can return. If `limit` is
// `Some(x)` then that is the *maximum* allowed KVs, but it can
// return less. Therefore in subsequent calls to `get_range`, we
// reduce the limit.
//
// *Note* When `StreamingMode::Exact` is used, `limit` *must* be
// specified. However, we don't check for this as binding tester
// checks for `2210` errors.
limit: Option<i32>,
// Key selectors for the part of the range that still has to be
// fetched; updated after each batch that reports `more`.
begin_sel: KeySelector,
end_sel: KeySelector,
// Current state tag and its matching payload. These two must always
// be updated together.
range_result_state_machine_state: RangeResultStateMachineState,
range_result_state_machine_data: RangeResultStateMachineData,
}
impl RangeResultStateMachine {
// We need to have these parameters in order to construct a value
// of `RangeResultStateMachine` type. This is an internal API and
// the meaning of the parameters is documented on the struct fields.
//
// The machine starts in the `Fetching` state, holding
// `fdb_future_key_value_array` as the future for the first batch.
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
transaction: FdbTransaction,
begin_sel: KeySelector,
end_sel: KeySelector,
mode: StreamingMode,
iteration: Option<i32>,
reverse: bool,
limit: Option<i32>,
snapshot: bool,
fdb_future_key_value_array: FdbFutureKeyValueArray,
) -> RangeResultStateMachine {
RangeResultStateMachine {
transaction,
snapshot,
mode,
reverse,
iteration,
limit,
begin_sel,
end_sel,
range_result_state_machine_state: RangeResultStateMachineState::Fetching,
range_result_state_machine_data: RangeResultStateMachineData::Fetching {
fdb_future_key_value_array,
},
}
}
// Drives the state machine until it can either yield an item or has
// to wait for the in-flight future.
//
// Returns:
// - `Poll::Pending` when the future being polled is not ready yet.
// - `Poll::Ready(Some(Ok(kv)))` for each key-value of the current
//   batch.
// - `Poll::Ready(Some(Err(e)))` when a fetch failed.
// - `Poll::Ready(None)` once there is nothing more to yield.
//
// Panics if the state tag and the state payload are out of sync.
pub(crate) fn poll_next(
mut self: Pin<&mut RangeResultStateMachine>,
cx: &mut Context<'_>,
) -> Poll<Option<FdbResult<KeyValue>>> {
// Each iteration handles the current state. Branches that only
// perform a transition fall through to the next iteration; branches
// that have something to report `return`.
loop {
match self.range_result_state_machine_state {
RangeResultStateMachineState::Fetching => {
if let RangeResultStateMachineData::Fetching {
ref mut fdb_future_key_value_array,
} = self.range_result_state_machine_data
{
match Pin::new(fdb_future_key_value_array).poll(cx) {
Poll::Ready(res) => match res {
Ok(key_value_array) => {
let KeyValueArray {
kvs,
index,
count,
more,
} = key_value_array;
if count == 0 {
// In case count is zero, we are done.
self.step_once_with_event(
RangeResultStateMachineEvent::FetchDone,
);
} else {
self.step_once_with_event(
RangeResultStateMachineEvent::FetchOk {
kvs,
index,
count,
more,
},
);
}
}
Err(fdb_error) => {
self.step_once_with_event(
RangeResultStateMachineEvent::FetchError { fdb_error },
);
}
},
Poll::Pending => return Poll::Pending,
}
} else {
panic!("invalid range_result_state_machine_data");
}
}
RangeResultStateMachineState::KeyValueArrayAvailable => {
if let RangeResultStateMachineData::KeyValueArrayAvailable {
ref kvs,
ref mut index,
count,
more,
} = self.range_result_state_machine_data
{
// Unlike in Python, where the `index ==
// count` check is done when returning the
// last element, in our case the last element
// gets returned and in the next call to
// `poll_next`, we do our check.
if *index == count {
// Should we get more?
if more {
// A remaining limit of exactly zero means the
// caller's limit has been used up, so stop even
// though the database reported `more`.
if let Some(0) = self.limit {
self.step_once_with_event(
RangeResultStateMachineEvent::FetchDone,
);
} else {
// `limit` is either `None` or
// non-zero.
// iteration, limit, begin_sel and
// end_sel have already been updated
// in the transition action.
//
// A `None` limit is passed on as `0`.
let options = match self.limit {
Some(limit) => RangeOptions {
limit,
mode: self.mode,
reverse: self.reverse,
},
None => RangeOptions {
limit: 0,
mode: self.mode,
reverse: self.reverse,
},
};
// A `None` iteration is passed on as `0`.
let fdb_future_key_value_array = fdb_transaction_get_range(
self.transaction.get_c_api_ptr(),
self.begin_sel.clone(),
self.end_sel.clone(),
options,
self.iteration.unwrap_or(0),
self.snapshot,
);
self.step_once_with_event(
RangeResultStateMachineEvent::FetchNextBatch {
fdb_future_key_value_array,
},
);
}
} else {
self.step_once_with_event(RangeResultStateMachineEvent::FetchDone);
}
} else {
// We need to remove elements from the
// beginning. If we used `Vec::remove`
// that would keep shifting elements to
// the left. Instead of modifying `kvs`,
// we just clone the element that we need.
//
// Safety: `index` starts with `0` (set in
// `KeyValueArray::new`) and is
// incremented till it reaches
// `count`.
let result = kvs[TryInto::<usize>::try_into(*index).unwrap()].clone();
*index += 1;
return Poll::Ready(Some(Ok(result)));
}
} else {
panic!("invalid range_result_state_machine_data");
}
}
RangeResultStateMachineState::Error => {
// No transition happens here, so the machine stays in the
// `Error` state and every later poll yields the same error
// again.
//
// NOTE(review): binding `fdb_error` by value from behind
// `self` suggests `FdbError` is `Copy` - confirm.
if let RangeResultStateMachineData::Error { fdb_error } =
self.range_result_state_machine_data
{
return Poll::Ready(Some(Err(fdb_error)));
} else {
panic!("invalid range_result_state_machine_data");
}
}
RangeResultStateMachineState::Done => return Poll::Ready(None),
}
}
}
// Applies `event` to the current state: stores the payload for the
// new state and then switches the state tag. Valid transitions:
//
//   Fetching               --FetchOk-->        KeyValueArrayAvailable
//   Fetching               --FetchDone-->      Done
//   Fetching               --FetchError-->     Error
//   KeyValueArrayAvailable --FetchNextBatch--> Fetching
//   KeyValueArrayAvailable --FetchDone-->      Done
//
// Any other (state, event) combination panics with "Invalid event!".
fn step_once_with_event(&mut self, event: RangeResultStateMachineEvent) {
self.range_result_state_machine_state = match self.range_result_state_machine_state {
RangeResultStateMachineState::Fetching => match event {
RangeResultStateMachineEvent::FetchOk {
kvs,
index,
count,
more,
} => {
// transition action
// Once we are done with `kvs` we'll need to
// fetch the next batch if `more` is `true`. Do
// the required setup for creating the next
// `FdbFutureKeyValueArray` in case it is
// needed. This would be used by `FetchNextBatch`
// event.
if more {
// This assumes that we have mode to be
// `StreamingMode::Iterator`.
if let Some(iteration) = self.iteration.as_mut() {
*iteration += 1;
}
// The batch just received counts against the
// remaining limit.
if let Some(limit) = self.limit.as_mut() {
*limit -= count;
}
// Safety: We only generate the `FetchOk` event
// when count > 0, otherwise we go to `FetchDone`.
let last_index = TryInto::<usize>::try_into(count - 1).unwrap();
// Continue from the last key of this batch: a
// reverse read moves `end_sel` to that key, a
// forward read moves `begin_sel` just past it.
// NOTE(review): presumably relies on the end
// selector being exclusive so the last key is not
// returned twice - confirm.
if self.reverse {
self.end_sel = KeySelector::first_greater_or_equal(
kvs[last_index].get_key_ref().clone(),
);
} else {
self.begin_sel = KeySelector::first_greater_than(
kvs[last_index].get_key_ref().clone(),
);
}
}
self.range_result_state_machine_data =
RangeResultStateMachineData::KeyValueArrayAvailable {
kvs,
index,
count,
more,
};
RangeResultStateMachineState::KeyValueArrayAvailable
}
RangeResultStateMachineEvent::FetchDone => {
self.range_result_state_machine_data = RangeResultStateMachineData::Done;
RangeResultStateMachineState::Done
}
RangeResultStateMachineEvent::FetchError { fdb_error } => {
self.range_result_state_machine_data =
RangeResultStateMachineData::Error { fdb_error };
RangeResultStateMachineState::Error
}
_ => panic!("Invalid event!"),
},
RangeResultStateMachineState::KeyValueArrayAvailable => match event {
RangeResultStateMachineEvent::FetchNextBatch {
fdb_future_key_value_array,
} => {
self.range_result_state_machine_data = RangeResultStateMachineData::Fetching {
fdb_future_key_value_array,
};
RangeResultStateMachineState::Fetching
}
RangeResultStateMachineEvent::FetchDone => {
self.range_result_state_machine_data = RangeResultStateMachineData::Done;
RangeResultStateMachineState::Done
}
_ => panic!("Invalid event!"),
},
// `Error` and `Done` are terminal: no event is accepted.
RangeResultStateMachineState::Error | RangeResultStateMachineState::Done => {
panic!("Invalid event!");
}
}
}
}
// Issues a range read through the C API and wraps the returned
// future.
//
// Both key selectors are deconstructed into their key bytes,
// `or_equal` flag and offset. `limit`, streaming mode and `reverse`
// are taken from `options`; `iteration` and `snapshot` are passed
// through as given.
//
// Panics if the length of either key does not fit the integer type
// expected by the C API.
pub(crate) fn fdb_transaction_get_range(
    transaction: *mut fdb_sys::FDBTransaction,
    begin_key: KeySelector,
    end_key: KeySelector,
    options: RangeOptions,
    iteration: i32,
    snapshot: bool,
) -> FdbFutureKeyValueArray {
    let (begin_raw, begin_inclusive, begin_offset) = begin_key.deconstruct();
    let begin_bytes = Bytes::from(begin_raw);
    let begin_ptr = begin_bytes.as_ref().as_ptr();
    let begin_len = begin_bytes.as_ref().len().try_into().unwrap();

    let (end_raw, end_inclusive, end_offset) = end_key.deconstruct();
    let end_bytes = Bytes::from(end_raw);
    let end_ptr = end_bytes.as_ref().as_ptr();
    let end_len = end_bytes.as_ref().len().try_into().unwrap();

    // This is similar to Java, where calls to `tr.getRange_internal`
    // sets the `target_bytes` to `0`.
    let target_bytes = 0;

    // SAFETY: `begin_ptr` and `end_ptr` point into `begin_bytes` and
    // `end_bytes`, which stay alive until this function returns, i.e.
    // for the whole duration of the call. Validity of `transaction`
    // is the caller's responsibility.
    // NOTE(review): assumes the C API does not keep the key pointers
    // after the call returns - confirm.
    FdbFuture::new(unsafe {
        fdb_sys::fdb_transaction_get_range(
            transaction,
            begin_ptr,
            begin_len,
            i32::from(begin_inclusive),
            begin_offset,
            end_ptr,
            end_len,
            i32::from(end_inclusive),
            end_offset,
            options.get_limit(),
            target_bytes,
            options.get_mode().code(),
            iteration,
            i32::from(snapshot),
            i32::from(options.get_reverse()),
        )
    })
}
#[cfg(test)]
mod tests {
use impls::impls;
use super::RangeOptions;
// Asserts that `RangeOptions` implements `Default`, which is the
// documented way of creating a value of that type.
#[test]
fn impls() {
#[rustfmt::skip]
assert!(impls!(
RangeOptions:
Default));
}
}