fdb/
range.rs

//! Provides types for working with FDB ranges.
2
3use bytes::Bytes;
4
5use std::convert::TryInto;
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use crate::error::{FdbError, FdbResult};
11use crate::future::{FdbFuture, FdbFutureKeyValueArray, FdbStreamKeyValue};
12use crate::transaction::{FdbTransaction, ReadTransaction};
13use crate::tuple::key_util;
14use crate::{Key, KeySelector, KeyValue};
15
16#[cfg(feature = "fdb-7_1")]
17use crate::Mapper;
18
19#[cfg(feature = "fdb-7_1")]
20use crate::future::FdbStreamMappedKeyValue;
21
22pub use crate::option::StreamingMode;
23
24/// [`Range`] describes an exact range of keyspace, specified by a
25/// begin and end key.
26///
27/// As with all FDB APIs, begin is inclusive, and end exclusive.
28#[derive(Clone, Debug, PartialEq)]
29pub struct Range {
30    begin: Key,
31    end: Key,
32}
33
34impl Range {
35    /// Construct a new [`Range`] with an inclusive begin key an
36    /// exclusive end key.
37    pub fn new(begin: impl Into<Key>, end: impl Into<Key>) -> Range {
38        Range {
39            begin: begin.into(),
40            end: end.into(),
41        }
42    }
43
44    /// Return a [`Range`] that describes all possible keys that are
45    /// prefixed with the specified key.
46    ///
47    /// # Panic
48    ///
49    /// Panics if the supplied [`Key`] is empty or contains only
50    /// `0xFF` bytes.
51    pub fn starts_with(prefix_key: impl Into<Key>) -> Range {
52        let prefix_key = prefix_key.into();
53        Range::new(
54            prefix_key.clone(),
55            key_util::strinc(prefix_key).unwrap_or_else(|err| {
56                panic!("Error occurred during `bytes_util::strinc`: {:?}", err)
57            }),
58        )
59    }
60
61    /// Gets a reference to the begin [`Key`] of the [`Range`].
62    pub fn begin_key_ref(&self) -> &Key {
63        &self.begin
64    }
65
66    /// Gets a reference to the end [`Key`] of the [`Range`].
67    pub fn end_key_ref(&self) -> &Key {
68        &self.end
69    }
70
71    /// Extract begin [`Key`] from the [`Range`].
72    pub fn into_begin_key(self) -> Key {
73        self.begin
74    }
75
76    /// Extract end [`Key`] from the [`Range`].
77    pub fn into_end_key(self) -> Key {
78        self.end
79    }
80
81    /// Extract begin and end [`Key`] from the [`Range`].
82    pub fn into_parts(self) -> (Key, Key) {
83        let Range { begin, end } = self;
84        (begin, end)
85    }
86
87    /// Gets an ordered range of keys and values from the database.
88    ///
89    /// The returned [`FdbStreamKeyValue`] implements [`Stream`] trait
90    /// that yields a [`KeyValue`] item.
91    ///
92    /// [`Stream`]: futures::Stream
93    pub fn into_stream<T>(self, rt: &T, options: RangeOptions) -> FdbStreamKeyValue
94    where
95        T: ReadTransaction,
96    {
97        let (begin_key, end_key) = self.into_parts();
98
99        let begin_key_selector = KeySelector::first_greater_or_equal(begin_key);
100        let end_key_selector = KeySelector::first_greater_or_equal(end_key);
101
102        rt.get_range(begin_key_selector, end_key_selector, options)
103    }
104
105    #[cfg(feature = "fdb-7_1")]
106    /// Gets an ordered range of mapped keys and values from the
107    /// database.
108    ///
109    /// The returned [`FdbStreamMappedKeyValue`] implements [`Stream`] trait
110    /// that yields a [`MappedKeyValue`] item.
111    ///
112    /// [`Stream`]: futures::Stream
113    /// [`MappedKeyValue`]: crate::MappedKeyValue
114    pub fn into_mapped_stream<T>(
115        self,
116        rt: &T,
117        mapper: Mapper,
118        options: RangeOptions,
119    ) -> FdbStreamMappedKeyValue
120    where
121        T: ReadTransaction,
122    {
123        let (begin_key, end_key) = self.into_parts();
124
125        let begin_key_selector = KeySelector::first_greater_or_equal(begin_key);
126        let end_key_selector = KeySelector::first_greater_or_equal(end_key);
127
128        rt.get_mapped_range(begin_key_selector, end_key_selector, mapper, options)
129    }
130}
131
132/// [`RangeOptions`] specify how a database range operation is carried out.
133///
134/// There are three parameters for which accessors methods are provided.
135///
136/// 1. Limit restricts the number of key-value pairs returned as part
137///    of a range read. A value of zero indicates no limit.
138///
139/// 2. Mode sets the [streaming mode] of the range read, allowing
140///    database to balance latency and bandwidth for this read.
141///
142/// 3. Reverse indicates that the read should be performed
143///    lexicographic order (when false) or reverse lexicographic (when
144///    true).
145///
146///    When reverse is true and limit is non-zero, last limit
147///    key-value pairs in the range are returned. Ranges in reverse is
148///    supported natively by the database should have minimal extra
149///    cost.
150///
151/// To create a value of [`RangeOptions`] type, use
152/// [`Default::default`] method. The default value represents - no
153/// limit, [iterator streaming mode] and lexicographic order.
154///
155/// [streaming mode]: StreamingMode
156/// [iterator streaming mode]: StreamingMode::Iterator
157#[derive(Clone, Debug)]
158pub struct RangeOptions {
159    pub(crate) limit: i32,
160    pub(crate) mode: StreamingMode,
161    pub(crate) reverse: bool,
162}
163
164impl RangeOptions {
165    /// Set limit
166    pub fn set_limit(&mut self, limit: i32) {
167        self.limit = limit;
168    }
169
170    /// Get limit
171    pub fn get_limit(&self) -> i32 {
172        self.limit
173    }
174
175    /// Set streaming mode
176    pub fn set_mode(&mut self, mode: StreamingMode) {
177        self.mode = mode;
178    }
179
180    /// Get streaming mode
181    pub fn get_mode(&self) -> StreamingMode {
182        self.mode
183    }
184
185    /// Set the read order (lexicographic or non-lexicographic)
186    pub fn set_reverse(&mut self, reverse: bool) {
187        self.reverse = reverse;
188    }
189
190    /// Get the read order (lexicographic or non-lexicographic)
191    pub fn get_reverse(&self) -> bool {
192        self.reverse
193    }
194
195    pub(crate) fn new(limit: i32, mode: StreamingMode, reverse: bool) -> RangeOptions {
196        RangeOptions {
197            limit,
198            mode,
199            reverse,
200        }
201    }
202}
203
204impl Default for RangeOptions {
205    fn default() -> RangeOptions {
206        RangeOptions {
207            limit: 0,
208            mode: StreamingMode::Iterator,
209            reverse: false,
210        }
211    }
212}
213
214// Java API refers to this type `RangeResult` and Go API has something
215// simliar with `futureKeyValueArray` and `[]KeyValue`. Go API
216// `RangeResult` is similar to Java API `RangeQuery`. Be careful and
217// don't confuse Java API `RangeResult` with Go API `RangeResult`.
218#[derive(Debug)]
219pub(crate) struct KeyValueArray {
220    kvs: Vec<KeyValue>,
221    index: i32,
222    count: i32,
223    more: bool,
224}
225
226impl KeyValueArray {
227    pub(crate) fn new(kvs: Vec<KeyValue>, count: i32, more: bool) -> KeyValueArray {
228        let index = 0;
229        KeyValueArray {
230            kvs,
231            index,
232            count,
233            more,
234        }
235    }
236}
237
// The state that `RangeResultStateMachine` is currently in. Each
// state has a variant of the same name in
// `RangeResultStateMachineData`, which carries the payload for that
// state.
#[derive(Debug)]
enum RangeResultStateMachineState {
    // A range read future is outstanding and gets polled.
    Fetching,
    // A batch of key-values has arrived and is being handed out one
    // element at a time.
    KeyValueArrayAvailable,
    // The range read future resolved to an error.
    Error,
    // There is nothing left to return.
    Done,
}
245
// Payload associated with each `RangeResultStateMachineState`. The
// variant stored in the state machine is expected to always match
// its current state; `poll_next` panics when it does not.
#[derive(Debug)]
enum RangeResultStateMachineData {
    // The future for the batch that is currently being fetched.
    Fetching {
        fdb_future_key_value_array: FdbFutureKeyValueArray,
    },
    // The batch being handed out. `index` is the position of the
    // next element to return, `count` is the position at which the
    // batch is exhausted, and `more` decides whether another batch
    // gets fetched afterwards.
    KeyValueArrayAvailable {
        kvs: Vec<KeyValue>,
        index: i32,
        count: i32,
        more: bool,
    },
    // The error that the range read future resolved to.
    Error {
        fdb_error: FdbError,
    },
    Done,
}
262
// Events that drive the transitions of `RangeResultStateMachine`.
//
// The variant names match the sismic events (see
// `sismic/range_result_state_machine.yaml`), which is why they all
// share the `Fetch` prefix that clippy would otherwise complain
// about.
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum RangeResultStateMachineEvent {
    // The outstanding future resolved to a non-empty batch.
    FetchOk {
        kvs: Vec<KeyValue>,
        index: i32,
        count: i32,
        more: bool,
    },
    // The current batch is exhausted and the future for the next
    // batch has been created.
    FetchNextBatch {
        fdb_future_key_value_array: FdbFutureKeyValueArray,
    },
    // The outstanding future resolved to an error.
    FetchError {
        fdb_error: FdbError,
    },
    // There are no more key-values to return.
    FetchDone,
}
281
282// A state machine that returns the key-value pairs from the database
283// satisfying the range specified in a range read.
284//
285// See `sismic/range_result_state_machine.yaml` for the design of the
286// state machine.
287#[derive(Debug)]
288pub(crate) struct RangeResultStateMachine {
289    transaction: FdbTransaction,
290    snapshot: bool,
291    mode: StreamingMode,
292    reverse: bool,
293
294    // This is *only* used in case of `StreamingMode::Iterator`. In
295    // other cases, we set it to `None`.
296    iteration: Option<i32>,
297
298    // When `limit` is `None`, it means that the C API is allowed to
299    // choose how many key values it can return. If `limit` is
300    // `Some(x)` then that is the *maximum* allowed KVs, but it can
301    // return less. Therefore in subsequent calls to `get_range`, we
302    // reduce the limit.
303    //
304    // *Note* When `StreamingMode::Exact` is used, `limit` *must* be
305    // specified. However, we don't check for this as binding tester
306    // checks for `2210` errors.
307    limit: Option<i32>,
308    begin_sel: KeySelector,
309    end_sel: KeySelector,
310
311    range_result_state_machine_state: RangeResultStateMachineState,
312    range_result_state_machine_data: RangeResultStateMachineData,
313}
314
315impl RangeResultStateMachine {
316    // We need to have these parameters in order to construct a value
317    // of `RangeResultStateMachine` type. This is an internal API and
318    // the meaning of the parameters documented above.
319    #[allow(clippy::too_many_arguments)]
320    pub(crate) fn new(
321        transaction: FdbTransaction,
322        begin_sel: KeySelector,
323        end_sel: KeySelector,
324        mode: StreamingMode,
325        iteration: Option<i32>,
326        reverse: bool,
327        limit: Option<i32>,
328        snapshot: bool,
329        fdb_future_key_value_array: FdbFutureKeyValueArray,
330    ) -> RangeResultStateMachine {
331        RangeResultStateMachine {
332            transaction,
333            snapshot,
334            mode,
335            reverse,
336            iteration,
337            limit,
338            begin_sel,
339            end_sel,
340            range_result_state_machine_state: RangeResultStateMachineState::Fetching,
341            range_result_state_machine_data: RangeResultStateMachineData::Fetching {
342                fdb_future_key_value_array,
343            },
344        }
345    }
346
347    pub(crate) fn poll_next(
348        mut self: Pin<&mut RangeResultStateMachine>,
349        cx: &mut Context<'_>,
350    ) -> Poll<Option<FdbResult<KeyValue>>> {
351        loop {
352            match self.range_result_state_machine_state {
353                RangeResultStateMachineState::Fetching => {
354                    if let RangeResultStateMachineData::Fetching {
355                        ref mut fdb_future_key_value_array,
356                    } = self.range_result_state_machine_data
357                    {
358                        match Pin::new(fdb_future_key_value_array).poll(cx) {
359                            Poll::Ready(res) => match res {
360                                Ok(key_value_array) => {
361                                    let KeyValueArray {
362                                        kvs,
363                                        index,
364                                        count,
365                                        more,
366                                    } = key_value_array;
367                                    if count == 0 {
368                                        // In case count is zero, we are done.
369                                        self.step_once_with_event(
370                                            RangeResultStateMachineEvent::FetchDone,
371                                        );
372                                    } else {
373                                        self.step_once_with_event(
374                                            RangeResultStateMachineEvent::FetchOk {
375                                                kvs,
376                                                index,
377                                                count,
378                                                more,
379                                            },
380                                        );
381                                    }
382                                }
383                                Err(fdb_error) => {
384                                    self.step_once_with_event(
385                                        RangeResultStateMachineEvent::FetchError { fdb_error },
386                                    );
387                                }
388                            },
389                            Poll::Pending => return Poll::Pending,
390                        }
391                    } else {
392                        panic!("invalid range_result_state_machine_data");
393                    }
394                }
395                RangeResultStateMachineState::KeyValueArrayAvailable => {
396                    if let RangeResultStateMachineData::KeyValueArrayAvailable {
397                        ref kvs,
398                        ref mut index,
399                        count,
400                        more,
401                    } = self.range_result_state_machine_data
402                    {
403                        // Unlike in Python, where the `index ==
404                        // count` check is done when returning the
405                        // last element, in our case the last element
406                        // gets returned and in the next call to
407                        // `poll_next`, we do our check.
408                        if *index == count {
409                            // Should we get more?
410                            if more {
411                                if let Some(0) = self.limit {
412                                    self.step_once_with_event(
413                                        RangeResultStateMachineEvent::FetchDone,
414                                    );
415                                } else {
416                                    // `limit` is either `None` or
417                                    // non-zero.
418
419                                    // iteration, limit, begin_sel and
420                                    // end_sel have already been updated
421                                    // in the transition action.
422                                    let options = match self.limit {
423                                        Some(limit) => RangeOptions {
424                                            limit,
425                                            mode: self.mode,
426                                            reverse: self.reverse,
427                                        },
428                                        None => RangeOptions {
429                                            limit: 0,
430                                            mode: self.mode,
431                                            reverse: self.reverse,
432                                        },
433                                    };
434
435                                    let fdb_future_key_value_array = fdb_transaction_get_range(
436                                        self.transaction.get_c_api_ptr(),
437                                        self.begin_sel.clone(),
438                                        self.end_sel.clone(),
439                                        options,
440                                        self.iteration.unwrap_or(0),
441                                        self.snapshot,
442                                    );
443
444                                    self.step_once_with_event(
445                                        RangeResultStateMachineEvent::FetchNextBatch {
446                                            fdb_future_key_value_array,
447                                        },
448                                    );
449                                }
450                            } else {
451                                self.step_once_with_event(RangeResultStateMachineEvent::FetchDone);
452                            }
453                        } else {
454                            // We need to remove elements from the
455                            // beginning. If we used `Vec::remove`
456                            // that would keep shifting elements to
457                            // the left. Instead of modifying `kvs`,
458                            // we just clone the element that we need.
459                            //
460                            // Safety: `index` starts with `0` (set in
461                            //          `KeyValueArray::new`) and is
462                            //          incremented till it reaches
463                            //          `count`.
464                            let result = kvs[TryInto::<usize>::try_into(*index).unwrap()].clone();
465                            *index += 1;
466
467                            return Poll::Ready(Some(Ok(result)));
468                        }
469                    } else {
470                        panic!("invalid range_result_state_machine_data");
471                    }
472                }
473                RangeResultStateMachineState::Error => {
474                    if let RangeResultStateMachineData::Error { fdb_error } =
475                        self.range_result_state_machine_data
476                    {
477                        return Poll::Ready(Some(Err(fdb_error)));
478                    } else {
479                        panic!("invalid range_result_state_machine_data");
480                    }
481                }
482                RangeResultStateMachineState::Done => return Poll::Ready(None),
483            }
484        }
485    }
486
487    fn step_once_with_event(&mut self, event: RangeResultStateMachineEvent) {
488        self.range_result_state_machine_state = match self.range_result_state_machine_state {
489            RangeResultStateMachineState::Fetching => match event {
490                RangeResultStateMachineEvent::FetchOk {
491                    kvs,
492                    index,
493                    count,
494                    more,
495                } => {
496                    // tansition action
497
498                    // Once we are done with `kvs` we'll we need to
499                    // fetch the next batch if `more` is `true`. Do
500                    // the required setup for creating the next
501                    // `FdbFutureKeyValueArray` in case it is
502                    // needed. This would be used by `FetchNextBatch`
503                    // event.
504
505                    if more {
506                        // This assumes that we have mode to be
507                        // `StreamingMode::Iterator`.
508                        if let Some(iteration) = self.iteration.as_mut() {
509                            *iteration += 1;
510                        }
511
512                        if let Some(limit) = self.limit.as_mut() {
513                            *limit -= count;
514                        }
515
516                        // Safety: We only generate the `FetchOk` event
517                        // when count > 0, otherwise we go to `FetchDone`.
518                        let last_index = TryInto::<usize>::try_into(count - 1).unwrap();
519
520                        if self.reverse {
521                            self.end_sel = KeySelector::first_greater_or_equal(
522                                kvs[last_index].get_key_ref().clone(),
523                            );
524                        } else {
525                            self.begin_sel = KeySelector::first_greater_than(
526                                kvs[last_index].get_key_ref().clone(),
527                            );
528                        }
529                    }
530
531                    self.range_result_state_machine_data =
532                        RangeResultStateMachineData::KeyValueArrayAvailable {
533                            kvs,
534                            index,
535                            count,
536                            more,
537                        };
538                    RangeResultStateMachineState::KeyValueArrayAvailable
539                }
540                RangeResultStateMachineEvent::FetchDone => {
541                    self.range_result_state_machine_data = RangeResultStateMachineData::Done;
542                    RangeResultStateMachineState::Done
543                }
544                RangeResultStateMachineEvent::FetchError { fdb_error } => {
545                    self.range_result_state_machine_data =
546                        RangeResultStateMachineData::Error { fdb_error };
547                    RangeResultStateMachineState::Error
548                }
549                _ => panic!("Invalid event!"),
550            },
551            RangeResultStateMachineState::KeyValueArrayAvailable => match event {
552                RangeResultStateMachineEvent::FetchNextBatch {
553                    fdb_future_key_value_array,
554                } => {
555                    self.range_result_state_machine_data = RangeResultStateMachineData::Fetching {
556                        fdb_future_key_value_array,
557                    };
558                    RangeResultStateMachineState::Fetching
559                }
560                RangeResultStateMachineEvent::FetchDone => {
561                    self.range_result_state_machine_data = RangeResultStateMachineData::Done;
562                    RangeResultStateMachineState::Done
563                }
564                _ => panic!("Invalid event!"),
565            },
566            RangeResultStateMachineState::Error | RangeResultStateMachineState::Done => {
567                panic!("Invalid event!");
568            }
569        }
570    }
571}
572
573pub(crate) fn fdb_transaction_get_range(
574    transaction: *mut fdb_sys::FDBTransaction,
575    begin_key: KeySelector,
576    end_key: KeySelector,
577    options: RangeOptions,
578    iteration: i32,
579    snapshot: bool,
580) -> FdbFutureKeyValueArray {
581    let (key, begin_or_equal, begin_offset) = begin_key.deconstruct();
582    let bk = Bytes::from(key);
583    let begin_key_name = bk.as_ref().as_ptr();
584    let begin_key_name_length = bk.as_ref().len().try_into().unwrap();
585    let begin_or_equal = if begin_or_equal { 1 } else { 0 };
586
587    let (key, end_or_equal, end_offset) = end_key.deconstruct();
588    let ek = Bytes::from(key);
589    let end_key_name = ek.as_ref().as_ptr();
590    let end_key_name_length = ek.as_ref().len().try_into().unwrap();
591    let end_or_equal = if end_or_equal { 1 } else { 0 };
592
593    // This is similar to Java, where calls to `tr.getRange_internal`
594    // sets the `target_bytes` to `0`.
595    let target_bytes = 0;
596
597    let limit = options.get_limit();
598    let mode = options.get_mode().code();
599    let reverse = if options.get_reverse() { 1 } else { 0 };
600
601    let s = if snapshot { 1 } else { 0 };
602
603    FdbFuture::new(unsafe {
604        fdb_sys::fdb_transaction_get_range(
605            transaction,
606            begin_key_name,
607            begin_key_name_length,
608            begin_or_equal,
609            begin_offset,
610            end_key_name,
611            end_key_name_length,
612            end_or_equal,
613            end_offset,
614            limit,
615            target_bytes,
616            mode,
617            iteration,
618            s,
619            reverse,
620        )
621    })
622}
623
#[cfg(test)]
mod tests {
    use super::RangeOptions;

    use impls::impls;

    // `Default::default` is the documented way of creating a
    // `RangeOptions` value, so make sure the trait stays implemented.
    #[test]
    fn impls() {
        assert!(impls!(RangeOptions: Default));
    }
}