fdb/range.rs
//! Provides types for working with FDB ranges.
2
3use bytes::Bytes;
4
5use std::convert::TryInto;
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use crate::error::{FdbError, FdbResult};
11use crate::future::{FdbFuture, FdbFutureKeyValueArray, FdbStreamKeyValue};
12use crate::transaction::{FdbTransaction, ReadTransaction};
13use crate::tuple::key_util;
14use crate::{Key, KeySelector, KeyValue};
15
16#[cfg(feature = "fdb-7_1")]
17use crate::Mapper;
18
19#[cfg(feature = "fdb-7_1")]
20use crate::future::FdbStreamMappedKeyValue;
21
22pub use crate::option::StreamingMode;
23
/// [`Range`] describes an exact range of keyspace, specified by a
/// begin and end key.
///
/// As with all FDB APIs, begin is inclusive, and end exclusive.
#[derive(Clone, Debug, PartialEq)]
pub struct Range {
    // Inclusive lower bound of the range.
    begin: Key,
    // Exclusive upper bound of the range.
    end: Key,
}
33
34impl Range {
35 /// Construct a new [`Range`] with an inclusive begin key an
36 /// exclusive end key.
37 pub fn new(begin: impl Into<Key>, end: impl Into<Key>) -> Range {
38 Range {
39 begin: begin.into(),
40 end: end.into(),
41 }
42 }
43
44 /// Return a [`Range`] that describes all possible keys that are
45 /// prefixed with the specified key.
46 ///
47 /// # Panic
48 ///
49 /// Panics if the supplied [`Key`] is empty or contains only
50 /// `0xFF` bytes.
51 pub fn starts_with(prefix_key: impl Into<Key>) -> Range {
52 let prefix_key = prefix_key.into();
53 Range::new(
54 prefix_key.clone(),
55 key_util::strinc(prefix_key).unwrap_or_else(|err| {
56 panic!("Error occurred during `bytes_util::strinc`: {:?}", err)
57 }),
58 )
59 }
60
61 /// Gets a reference to the begin [`Key`] of the [`Range`].
62 pub fn begin_key_ref(&self) -> &Key {
63 &self.begin
64 }
65
66 /// Gets a reference to the end [`Key`] of the [`Range`].
67 pub fn end_key_ref(&self) -> &Key {
68 &self.end
69 }
70
71 /// Extract begin [`Key`] from the [`Range`].
72 pub fn into_begin_key(self) -> Key {
73 self.begin
74 }
75
76 /// Extract end [`Key`] from the [`Range`].
77 pub fn into_end_key(self) -> Key {
78 self.end
79 }
80
81 /// Extract begin and end [`Key`] from the [`Range`].
82 pub fn into_parts(self) -> (Key, Key) {
83 let Range { begin, end } = self;
84 (begin, end)
85 }
86
87 /// Gets an ordered range of keys and values from the database.
88 ///
89 /// The returned [`FdbStreamKeyValue`] implements [`Stream`] trait
90 /// that yields a [`KeyValue`] item.
91 ///
92 /// [`Stream`]: futures::Stream
93 pub fn into_stream<T>(self, rt: &T, options: RangeOptions) -> FdbStreamKeyValue
94 where
95 T: ReadTransaction,
96 {
97 let (begin_key, end_key) = self.into_parts();
98
99 let begin_key_selector = KeySelector::first_greater_or_equal(begin_key);
100 let end_key_selector = KeySelector::first_greater_or_equal(end_key);
101
102 rt.get_range(begin_key_selector, end_key_selector, options)
103 }
104
105 #[cfg(feature = "fdb-7_1")]
106 /// Gets an ordered range of mapped keys and values from the
107 /// database.
108 ///
109 /// The returned [`FdbStreamMappedKeyValue`] implements [`Stream`] trait
110 /// that yields a [`MappedKeyValue`] item.
111 ///
112 /// [`Stream`]: futures::Stream
113 /// [`MappedKeyValue`]: crate::MappedKeyValue
114 pub fn into_mapped_stream<T>(
115 self,
116 rt: &T,
117 mapper: Mapper,
118 options: RangeOptions,
119 ) -> FdbStreamMappedKeyValue
120 where
121 T: ReadTransaction,
122 {
123 let (begin_key, end_key) = self.into_parts();
124
125 let begin_key_selector = KeySelector::first_greater_or_equal(begin_key);
126 let end_key_selector = KeySelector::first_greater_or_equal(end_key);
127
128 rt.get_mapped_range(begin_key_selector, end_key_selector, mapper, options)
129 }
130}
131
/// [`RangeOptions`] specify how a database range operation is carried out.
///
/// There are three parameters for which accessor methods are provided.
///
/// 1. Limit restricts the number of key-value pairs returned as part
///    of a range read. A value of zero indicates no limit.
///
/// 2. Mode sets the [streaming mode] of the range read, allowing the
///    database to balance latency and bandwidth for this read.
///
/// 3. Reverse indicates whether the read should be performed in
///    lexicographic order (when false) or reverse lexicographic
///    order (when true).
///
///    When reverse is true and limit is non-zero, the last limit
///    key-value pairs in the range are returned. Reading ranges in
///    reverse is supported natively by the database and should have
///    minimal extra cost.
///
/// To create a value of [`RangeOptions`] type, use the
/// [`Default::default`] method. The default value represents no
/// limit, [iterator streaming mode] and lexicographic order.
///
/// [streaming mode]: StreamingMode
/// [iterator streaming mode]: StreamingMode::Iterator
#[derive(Clone, Debug)]
pub struct RangeOptions {
    // Maximum number of key-value pairs to return; zero means no limit.
    pub(crate) limit: i32,
    // Streaming mode used for the range read.
    pub(crate) mode: StreamingMode,
    // `true` for reverse lexicographic order, `false` for lexicographic.
    pub(crate) reverse: bool,
}
163
164impl RangeOptions {
165 /// Set limit
166 pub fn set_limit(&mut self, limit: i32) {
167 self.limit = limit;
168 }
169
170 /// Get limit
171 pub fn get_limit(&self) -> i32 {
172 self.limit
173 }
174
175 /// Set streaming mode
176 pub fn set_mode(&mut self, mode: StreamingMode) {
177 self.mode = mode;
178 }
179
180 /// Get streaming mode
181 pub fn get_mode(&self) -> StreamingMode {
182 self.mode
183 }
184
185 /// Set the read order (lexicographic or non-lexicographic)
186 pub fn set_reverse(&mut self, reverse: bool) {
187 self.reverse = reverse;
188 }
189
190 /// Get the read order (lexicographic or non-lexicographic)
191 pub fn get_reverse(&self) -> bool {
192 self.reverse
193 }
194
195 pub(crate) fn new(limit: i32, mode: StreamingMode, reverse: bool) -> RangeOptions {
196 RangeOptions {
197 limit,
198 mode,
199 reverse,
200 }
201 }
202}
203
204impl Default for RangeOptions {
205 fn default() -> RangeOptions {
206 RangeOptions {
207 limit: 0,
208 mode: StreamingMode::Iterator,
209 reverse: false,
210 }
211 }
212}
213
// Java API refers to this type as `RangeResult` and Go API has
// something similar with `futureKeyValueArray` and `[]KeyValue`. Go
// API `RangeResult` is similar to Java API `RangeQuery`. Be careful
// and don't confuse Java API `RangeResult` with Go API `RangeResult`.
//
// One batch of key-value pairs produced by a range read.
#[derive(Debug)]
pub(crate) struct KeyValueArray {
    // The key-value pairs of this batch.
    kvs: Vec<KeyValue>,
    // Position of the next pair to hand out; starts at `0` (see
    // `KeyValueArray::new`).
    index: i32,
    // Number of key-value pairs in this batch.
    count: i32,
    // Whether a further batch should be fetched once this one has
    // been consumed.
    more: bool,
}
225
226impl KeyValueArray {
227 pub(crate) fn new(kvs: Vec<KeyValue>, count: i32, more: bool) -> KeyValueArray {
228 let index = 0;
229 KeyValueArray {
230 kvs,
231 index,
232 count,
233 more,
234 }
235 }
236}
237
// The states of `RangeResultStateMachine`. Each state has a matching
// `RangeResultStateMachineData` variant carrying its payload.
#[derive(Debug)]
enum RangeResultStateMachineState {
    // A range-read future is outstanding and gets polled.
    Fetching,
    // A batch has been fetched and its key-value pairs are being
    // handed out one at a time.
    KeyValueArrayAvailable,
    // The fetch failed; polling yields the stored error.
    Error,
    // The range read is finished; polling yields `None`.
    Done,
}
245
// Payload associated with each `RangeResultStateMachineState`. The
// state machine expects the data variant to always correspond to the
// current state and panics otherwise.
#[derive(Debug)]
enum RangeResultStateMachineData {
    // The outstanding future of the current range read.
    Fetching {
        fdb_future_key_value_array: FdbFutureKeyValueArray,
    },
    // The current batch: its pairs, the position of the next pair to
    // return, the number of pairs, and whether another batch should
    // be requested once this one is exhausted.
    KeyValueArrayAvailable {
        kvs: Vec<KeyValue>,
        index: i32,
        count: i32,
        more: bool,
    },
    // The error the fetch resolved with.
    Error {
        fdb_error: FdbError,
    },
    // Nothing left to return.
    Done,
}
262
// Events that drive the transitions of `RangeResultStateMachine`.
//
// The variant names match the sismic events (see
// `sismic/range_result_state_machine.yaml`), hence the clippy
// exemption below.
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum RangeResultStateMachineEvent {
    // The pending future resolved with a non-empty batch.
    FetchOk {
        kvs: Vec<KeyValue>,
        index: i32,
        count: i32,
        more: bool,
    },
    // The current batch is exhausted and a follow-up range read has
    // been issued.
    FetchNextBatch {
        fdb_future_key_value_array: FdbFutureKeyValueArray,
    },
    // The pending future resolved with an error.
    FetchError {
        fdb_error: FdbError,
    },
    // There is nothing more to return.
    FetchDone,
}
281
// A state machine that returns the key-value pairs from the database
// satisfying the range specified in a range read.
//
// See `sismic/range_result_state_machine.yaml` for the design of the
// state machine.
#[derive(Debug)]
pub(crate) struct RangeResultStateMachine {
    // Transaction whose C API pointer is used to issue follow-up
    // range reads.
    transaction: FdbTransaction,
    // Forwarded unchanged to every follow-up range read.
    snapshot: bool,
    // Streaming mode used for every follow-up range read.
    mode: StreamingMode,
    // Read order. It also decides which key selector is advanced
    // after a batch: `end_sel` when `true`, `begin_sel` when `false`.
    reverse: bool,

    // This is *only* used in case of `StreamingMode::Iterator`. In
    // other cases, we set it to `None`.
    iteration: Option<i32>,

    // When `limit` is `None`, it means that the C API is allowed to
    // choose how many key values it can return. If `limit` is
    // `Some(x)` then that is the *maximum* allowed KVs, but it can
    // return less. Therefore in subsequent calls to `get_range`, we
    // reduce the limit.
    //
    // *Note* When `StreamingMode::Exact` is used, `limit` *must* be
    // specified. However, we don't check for this as binding tester
    // checks for `2210` errors.
    limit: Option<i32>,
    // Key selectors bounding the part of the range that still has to
    // be read. One of them is moved past the last returned key after
    // every batch that reports `more`.
    begin_sel: KeySelector,
    end_sel: KeySelector,

    // Current state and its payload. The two are always updated
    // together in `step_once_with_event`.
    range_result_state_machine_state: RangeResultStateMachineState,
    range_result_state_machine_data: RangeResultStateMachineData,
}
314
impl RangeResultStateMachine {
    // We need to have these parameters in order to construct a value
    // of `RangeResultStateMachine` type. This is an internal API and
    // the meaning of the parameters is documented on the fields of
    // `RangeResultStateMachine`.
    //
    // The machine starts in the `Fetching` state, with
    // `fdb_future_key_value_array` as the future of the first range
    // read.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        transaction: FdbTransaction,
        begin_sel: KeySelector,
        end_sel: KeySelector,
        mode: StreamingMode,
        iteration: Option<i32>,
        reverse: bool,
        limit: Option<i32>,
        snapshot: bool,
        fdb_future_key_value_array: FdbFutureKeyValueArray,
    ) -> RangeResultStateMachine {
        RangeResultStateMachine {
            transaction,
            snapshot,
            mode,
            reverse,
            iteration,
            limit,
            begin_sel,
            end_sel,
            range_result_state_machine_state: RangeResultStateMachineState::Fetching,
            range_result_state_machine_data: RangeResultStateMachineData::Fetching {
                fdb_future_key_value_array,
            },
        }
    }

    // Drives the state machine until it can produce a result.
    //
    // Returns:
    // - `Poll::Pending` while the current range-read future is not
    //   ready,
    // - `Poll::Ready(Some(Ok(kv)))` for the next key-value pair,
    // - `Poll::Ready(Some(Err(e)))` when the fetch failed,
    // - `Poll::Ready(None)` when the range read is finished.
    //
    // Panics if the stored data variant does not correspond to the
    // current state.
    pub(crate) fn poll_next(
        mut self: Pin<&mut RangeResultStateMachine>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<FdbResult<KeyValue>>> {
        // Every iteration either returns or performs exactly one
        // state transition and then re-examines the new state.
        loop {
            match self.range_result_state_machine_state {
                RangeResultStateMachineState::Fetching => {
                    if let RangeResultStateMachineData::Fetching {
                        ref mut fdb_future_key_value_array,
                    } = self.range_result_state_machine_data
                    {
                        match Pin::new(fdb_future_key_value_array).poll(cx) {
                            Poll::Ready(res) => match res {
                                Ok(key_value_array) => {
                                    let KeyValueArray {
                                        kvs,
                                        index,
                                        count,
                                        more,
                                    } = key_value_array;
                                    if count == 0 {
                                        // In case count is zero, we are done.
                                        self.step_once_with_event(
                                            RangeResultStateMachineEvent::FetchDone,
                                        );
                                    } else {
                                        self.step_once_with_event(
                                            RangeResultStateMachineEvent::FetchOk {
                                                kvs,
                                                index,
                                                count,
                                                more,
                                            },
                                        );
                                    }
                                }
                                Err(fdb_error) => {
                                    self.step_once_with_event(
                                        RangeResultStateMachineEvent::FetchError { fdb_error },
                                    );
                                }
                            },
                            Poll::Pending => return Poll::Pending,
                        }
                    } else {
                        panic!("invalid range_result_state_machine_data");
                    }
                }
                RangeResultStateMachineState::KeyValueArrayAvailable => {
                    if let RangeResultStateMachineData::KeyValueArrayAvailable {
                        ref kvs,
                        ref mut index,
                        count,
                        more,
                    } = self.range_result_state_machine_data
                    {
                        // Unlike in Python, where the `index ==
                        // count` check is done when returning the
                        // last element, in our case the last element
                        // gets returned and in the next call to
                        // `poll_next`, we do our check.
                        if *index == count {
                            // Should we get more?
                            if more {
                                // A remaining limit of zero means the
                                // requested number of pairs has
                                // already been returned.
                                if let Some(0) = self.limit {
                                    self.step_once_with_event(
                                        RangeResultStateMachineEvent::FetchDone,
                                    );
                                } else {
                                    // `limit` is either `None` or
                                    // non-zero.

                                    // iteration, limit, begin_sel and
                                    // end_sel have already been updated
                                    // in the transition action.
                                    let options = match self.limit {
                                        Some(limit) => RangeOptions {
                                            limit,
                                            mode: self.mode,
                                            reverse: self.reverse,
                                        },
                                        // No limit is expressed as `0`
                                        // in `RangeOptions`.
                                        None => RangeOptions {
                                            limit: 0,
                                            mode: self.mode,
                                            reverse: self.reverse,
                                        },
                                    };

                                    // Issue the follow-up range read
                                    // for the remaining part of the
                                    // range.
                                    let fdb_future_key_value_array = fdb_transaction_get_range(
                                        self.transaction.get_c_api_ptr(),
                                        self.begin_sel.clone(),
                                        self.end_sel.clone(),
                                        options,
                                        self.iteration.unwrap_or(0),
                                        self.snapshot,
                                    );

                                    self.step_once_with_event(
                                        RangeResultStateMachineEvent::FetchNextBatch {
                                            fdb_future_key_value_array,
                                        },
                                    );
                                }
                            } else {
                                self.step_once_with_event(RangeResultStateMachineEvent::FetchDone);
                            }
                        } else {
                            // We need to remove elements from the
                            // beginning. If we used `Vec::remove`
                            // that would keep shifting elements to
                            // the left. Instead of modifying `kvs`,
                            // we just clone the element that we need.
                            //
                            // Safety: `index` starts with `0` (set in
                            // `KeyValueArray::new`) and is
                            // incremented till it reaches
                            // `count`.
                            let result = kvs[TryInto::<usize>::try_into(*index).unwrap()].clone();
                            *index += 1;

                            return Poll::Ready(Some(Ok(result)));
                        }
                    } else {
                        panic!("invalid range_result_state_machine_data");
                    }
                }
                RangeResultStateMachineState::Error => {
                    // No transition happens here, so every poll in
                    // this state yields the stored error again.
                    if let RangeResultStateMachineData::Error { fdb_error } =
                        self.range_result_state_machine_data
                    {
                        return Poll::Ready(Some(Err(fdb_error)));
                    } else {
                        panic!("invalid range_result_state_machine_data");
                    }
                }
                RangeResultStateMachineState::Done => return Poll::Ready(None),
            }
        }
    }

    // Performs a single transition: consumes `event`, runs the
    // transition action (if any) and stores the new state together
    // with its data.
    //
    // Panics if `event` is not valid for the current state. `Error`
    // and `Done` accept no events at all.
    fn step_once_with_event(&mut self, event: RangeResultStateMachineEvent) {
        self.range_result_state_machine_state = match self.range_result_state_machine_state {
            RangeResultStateMachineState::Fetching => match event {
                RangeResultStateMachineEvent::FetchOk {
                    kvs,
                    index,
                    count,
                    more,
                } => {
                    // transition action

                    // Once we are done with `kvs` we'll need to
                    // fetch the next batch if `more` is `true`. Do
                    // the required setup for creating the next
                    // `FdbFutureKeyValueArray` in case it is
                    // needed. This would be used by `FetchNextBatch`
                    // event.

                    if more {
                        // This assumes that we have mode to be
                        // `StreamingMode::Iterator`.
                        if let Some(iteration) = self.iteration.as_mut() {
                            *iteration += 1;
                        }

                        // The pairs of this batch count against the
                        // remaining limit.
                        if let Some(limit) = self.limit.as_mut() {
                            *limit -= count;
                        }

                        // Safety: We only generate the `FetchOk` event
                        // when count > 0, otherwise we go to `FetchDone`.
                        let last_index = TryInto::<usize>::try_into(count - 1).unwrap();

                        // Move the relevant bound to the last key of
                        // this batch so that the next read continues
                        // from there. The end of a range is
                        // exclusive, so in reverse the last key
                        // itself becomes the new end.
                        if self.reverse {
                            self.end_sel = KeySelector::first_greater_or_equal(
                                kvs[last_index].get_key_ref().clone(),
                            );
                        } else {
                            self.begin_sel = KeySelector::first_greater_than(
                                kvs[last_index].get_key_ref().clone(),
                            );
                        }
                    }

                    self.range_result_state_machine_data =
                        RangeResultStateMachineData::KeyValueArrayAvailable {
                            kvs,
                            index,
                            count,
                            more,
                        };
                    RangeResultStateMachineState::KeyValueArrayAvailable
                }
                RangeResultStateMachineEvent::FetchDone => {
                    self.range_result_state_machine_data = RangeResultStateMachineData::Done;
                    RangeResultStateMachineState::Done
                }
                RangeResultStateMachineEvent::FetchError { fdb_error } => {
                    self.range_result_state_machine_data =
                        RangeResultStateMachineData::Error { fdb_error };
                    RangeResultStateMachineState::Error
                }
                _ => panic!("Invalid event!"),
            },
            RangeResultStateMachineState::KeyValueArrayAvailable => match event {
                RangeResultStateMachineEvent::FetchNextBatch {
                    fdb_future_key_value_array,
                } => {
                    self.range_result_state_machine_data = RangeResultStateMachineData::Fetching {
                        fdb_future_key_value_array,
                    };
                    RangeResultStateMachineState::Fetching
                }
                RangeResultStateMachineEvent::FetchDone => {
                    self.range_result_state_machine_data = RangeResultStateMachineData::Done;
                    RangeResultStateMachineState::Done
                }
                _ => panic!("Invalid event!"),
            },
            // Terminal states: no event is expected once they are
            // reached.
            RangeResultStateMachineState::Error | RangeResultStateMachineState::Done => {
                panic!("Invalid event!");
            }
        }
    }
}
572
573pub(crate) fn fdb_transaction_get_range(
574 transaction: *mut fdb_sys::FDBTransaction,
575 begin_key: KeySelector,
576 end_key: KeySelector,
577 options: RangeOptions,
578 iteration: i32,
579 snapshot: bool,
580) -> FdbFutureKeyValueArray {
581 let (key, begin_or_equal, begin_offset) = begin_key.deconstruct();
582 let bk = Bytes::from(key);
583 let begin_key_name = bk.as_ref().as_ptr();
584 let begin_key_name_length = bk.as_ref().len().try_into().unwrap();
585 let begin_or_equal = if begin_or_equal { 1 } else { 0 };
586
587 let (key, end_or_equal, end_offset) = end_key.deconstruct();
588 let ek = Bytes::from(key);
589 let end_key_name = ek.as_ref().as_ptr();
590 let end_key_name_length = ek.as_ref().len().try_into().unwrap();
591 let end_or_equal = if end_or_equal { 1 } else { 0 };
592
593 // This is similar to Java, where calls to `tr.getRange_internal`
594 // sets the `target_bytes` to `0`.
595 let target_bytes = 0;
596
597 let limit = options.get_limit();
598 let mode = options.get_mode().code();
599 let reverse = if options.get_reverse() { 1 } else { 0 };
600
601 let s = if snapshot { 1 } else { 0 };
602
603 FdbFuture::new(unsafe {
604 fdb_sys::fdb_transaction_get_range(
605 transaction,
606 begin_key_name,
607 begin_key_name_length,
608 begin_or_equal,
609 begin_offset,
610 end_key_name,
611 end_key_name_length,
612 end_or_equal,
613 end_offset,
614 limit,
615 target_bytes,
616 mode,
617 iteration,
618 s,
619 reverse,
620 )
621 })
622}
623
#[cfg(test)]
mod tests {
    use impls::impls;

    use super::RangeOptions;

    // Asserts, using the `impls!` macro, that `RangeOptions`
    // implements `Default`, which is the documented way of creating
    // a `RangeOptions` value.
    #[test]
    fn impls() {
        #[rustfmt::skip]
        assert!(impls!(
            RangeOptions:
            Default));
    }
}
637}