rust_rocksdb/
db_iterator.rs

1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{
16    db::{DBAccess, DB},
17    ffi, Error, ReadOptions, WriteBatch,
18};
19use libc::{c_char, c_uchar, size_t};
20use std::{marker::PhantomData, slice};
21
22/// A type alias to keep compatibility. See [`DBRawIteratorWithThreadMode`] for details
23pub type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, DB>;
24
/// An iterator over a database or column family, with specifiable
/// ranges and direction.
///
/// This iterator is different from the standard [`DBIteratorWithThreadMode`] as it aims to
/// replicate the underlying iterator API within RocksDB itself. This should
/// give access to more performance and flexibility but departs from the
/// widely recognized Rust idioms.
///
/// ```
/// use rust_rocksdb::{DB, Options};
///
/// let tempdir = tempfile::Builder::new()
///     .prefix("_path_for_rocksdb_storage4")
///     .tempdir()
///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage4.");
/// let path = tempdir.path();
/// {
///     let db = DB::open_default(path).unwrap();
///     let mut iter = db.raw_iterator();
///
///     // Forwards iteration
///     iter.seek_to_first();
///     while iter.valid() {
///         println!("Saw {:?} {:?}", iter.key(), iter.value());
///         iter.next();
///     }
///
///     // Reverse iteration
///     iter.seek_to_last();
///     while iter.valid() {
///         println!("Saw {:?} {:?}", iter.key(), iter.value());
///         iter.prev();
///     }
///
///     // Seeking
///     iter.seek(b"my key");
///     while iter.valid() {
///         println!("Saw {:?} {:?}", iter.key(), iter.value());
///         iter.next();
///     }
///
///     // Reverse iteration from key
///     // Note, use seek_for_prev when reversing because if this key doesn't exist,
///     // this will make the iterator start from the previous key rather than the next.
///     iter.seek_for_prev(b"my key");
///     while iter.valid() {
///         println!("Saw {:?} {:?}", iter.key(), iter.value());
///         iter.prev();
///     }
/// }
/// let _ = DB::destroy(&Options::default(), path);
/// ```
pub struct DBRawIteratorWithThreadMode<'a, D: DBAccess> {
    /// Owned handle to the C iterator; released in this type's `Drop` impl.
    ///
    /// `Drop::drop` runs before any field is dropped, so the C iterator is
    /// destroyed while `_readopts` below is still alive.
    inner: std::ptr::NonNull<ffi::rocksdb_iterator_t>,

    /// When iterate_lower_bound or iterate_upper_bound are set, the inner
    /// C iterator keeps pointers to those bounds inside `_readopts`.
    /// Storing this makes sure the bounds are always alive when the
    /// iterator is being used.
    ///
    /// And yes, we need to store the entire ReadOptions structure since C++
    /// ReadOptions keep reference to C rocksdb_readoptions_t wrapper which
    /// point to vectors we own.  See issue #660.
    _readopts: ReadOptions,

    /// Ties the iterator to a `&'a D` borrow of the database without storing it.
    db: PhantomData<&'a D>,
}
92
93impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
94    pub(crate) fn new(db: &D, readopts: ReadOptions) -> Self {
95        let inner = unsafe { db.create_iterator(&readopts) };
96        Self::from_inner(inner, readopts)
97    }
98
99    pub(crate) fn new_cf(
100        db: &'a D,
101        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
102        readopts: ReadOptions,
103    ) -> Self {
104        let inner = unsafe { db.create_iterator_cf(cf_handle, &readopts) };
105        Self::from_inner(inner, readopts)
106    }
107
108    fn from_inner(inner: *mut ffi::rocksdb_iterator_t, readopts: ReadOptions) -> Self {
109        // This unwrap will never fail since rocksdb_create_iterator and
110        // rocksdb_create_iterator_cf functions always return non-null. They
111        // use new and deference the result so any nulls would end up with SIGSEGV
112        // there and we would have a bigger issue.
113        let inner = std::ptr::NonNull::new(inner).unwrap();
114        Self {
115            inner,
116            _readopts: readopts,
117            db: PhantomData,
118        }
119    }
120
121    /// Returns `true` if the iterator is valid. An iterator is invalidated when
122    /// it reaches the end of its defined range, or when it encounters an error.
123    ///
124    /// To check whether the iterator encountered an error after `valid` has
125    /// returned `false`, use the [`status`](DBRawIteratorWithThreadMode::status) method. `status` will never
126    /// return an error when `valid` is `true`.
127    pub fn valid(&self) -> bool {
128        unsafe { ffi::rocksdb_iter_valid(self.inner.as_ptr()) != 0 }
129    }
130
131    /// Returns an error `Result` if the iterator has encountered an error
132    /// during operation. When an error is encountered, the iterator is
133    /// invalidated and [`valid`](DBRawIteratorWithThreadMode::valid) will return `false` when called.
134    ///
135    /// Performing a seek will discard the current status.
136    pub fn status(&self) -> Result<(), Error> {
137        unsafe {
138            ffi_try!(ffi::rocksdb_iter_get_error(self.inner.as_ptr()));
139        }
140        Ok(())
141    }
142
143    /// Seeks to the first key in the database.
144    ///
145    /// # Examples
146    ///
147    /// ```rust
148    /// use rust_rocksdb::{DB, Options};
149    ///
150    /// let tempdir = tempfile::Builder::new()
151    ///     .prefix("_path_for_rocksdb_storage5")
152    ///     .tempdir()
153    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage5.");
154    /// let path = tempdir.path();
155    /// {
156    ///     let db = DB::open_default(path).unwrap();
157    ///     let mut iter = db.raw_iterator();
158    ///
159    ///     // Iterate all keys from the start in lexicographic order
160    ///     iter.seek_to_first();
161    ///
162    ///     while iter.valid() {
163    ///         println!("{:?} {:?}", iter.key(), iter.value());
164    ///         iter.next();
165    ///     }
166    ///
167    ///     // Read just the first key
168    ///     iter.seek_to_first();
169    ///
170    ///     if iter.valid() {
171    ///         println!("{:?} {:?}", iter.key(), iter.value());
172    ///     } else {
173    ///         // There are no keys in the database
174    ///     }
175    /// }
176    /// let _ = DB::destroy(&Options::default(), path);
177    /// ```
178    pub fn seek_to_first(&mut self) {
179        unsafe {
180            ffi::rocksdb_iter_seek_to_first(self.inner.as_ptr());
181        }
182    }
183
184    /// Seeks to the last key in the database.
185    ///
186    /// # Examples
187    ///
188    /// ```rust
189    /// use rust_rocksdb::{DB, Options};
190    ///
191    /// let tempdir = tempfile::Builder::new()
192    ///     .prefix("_path_for_rocksdb_storage6")
193    ///     .tempdir()
194    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage6.");
195    /// let path = tempdir.path();
196    /// {
197    ///     let db = DB::open_default(path).unwrap();
198    ///     let mut iter = db.raw_iterator();
199    ///
200    ///     // Iterate all keys from the end in reverse lexicographic order
201    ///     iter.seek_to_last();
202    ///
203    ///     while iter.valid() {
204    ///         println!("{:?} {:?}", iter.key(), iter.value());
205    ///         iter.prev();
206    ///     }
207    ///
208    ///     // Read just the last key
209    ///     iter.seek_to_last();
210    ///
211    ///     if iter.valid() {
212    ///         println!("{:?} {:?}", iter.key(), iter.value());
213    ///     } else {
214    ///         // There are no keys in the database
215    ///     }
216    /// }
217    /// let _ = DB::destroy(&Options::default(), path);
218    /// ```
219    pub fn seek_to_last(&mut self) {
220        unsafe {
221            ffi::rocksdb_iter_seek_to_last(self.inner.as_ptr());
222        }
223    }
224
225    /// Seeks to the specified key or the first key that lexicographically follows it.
226    ///
227    /// This method will attempt to seek to the specified key. If that key does not exist, it will
228    /// find and seek to the key that lexicographically follows it instead.
229    ///
230    /// # Examples
231    ///
232    /// ```rust
233    /// use rust_rocksdb::{DB, Options};
234    ///
235    /// let tempdir = tempfile::Builder::new()
236    ///     .prefix("_path_for_rocksdb_storage7")
237    ///     .tempdir()
238    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage7.");
239    /// let path = tempdir.path();
240    /// {
241    ///     let db = DB::open_default(path).unwrap();
242    ///     let mut iter = db.raw_iterator();
243    ///
244    ///     // Read the first key that starts with 'a'
245    ///     iter.seek(b"a");
246    ///
247    ///     if iter.valid() {
248    ///         println!("{:?} {:?}", iter.key(), iter.value());
249    ///     } else {
250    ///         // There are no keys in the database
251    ///     }
252    /// }
253    /// let _ = DB::destroy(&Options::default(), path);
254    /// ```
255    pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
256        let key = key.as_ref();
257
258        unsafe {
259            ffi::rocksdb_iter_seek(
260                self.inner.as_ptr(),
261                key.as_ptr() as *const c_char,
262                key.len() as size_t,
263            );
264        }
265    }
266
267    /// Seeks to the specified key, or the first key that lexicographically precedes it.
268    ///
269    /// Like ``.seek()`` this method will attempt to seek to the specified key.
270    /// The difference with ``.seek()`` is that if the specified key do not exist, this method will
271    /// seek to key that lexicographically precedes it instead.
272    ///
273    /// # Examples
274    ///
275    /// ```rust
276    /// use rust_rocksdb::{DB, Options};
277    ///
278    /// let tempdir = tempfile::Builder::new()
279    ///     .prefix("_path_for_rocksdb_storage8")
280    ///     .tempdir()
281    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage8.");
282    /// let path = tempdir.path();
283    /// {
284    ///     let db = DB::open_default(path).unwrap();
285    ///     let mut iter = db.raw_iterator();
286    ///
287    ///     // Read the last key that starts with 'a'
288    ///     iter.seek_for_prev(b"b");
289    ///
290    ///     if iter.valid() {
291    ///         println!("{:?} {:?}", iter.key(), iter.value());
292    ///     } else {
293    ///         // There are no keys in the database
294    ///     }
295    /// }
296    /// let _ = DB::destroy(&Options::default(), path);
297    /// ```
298    pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
299        let key = key.as_ref();
300
301        unsafe {
302            ffi::rocksdb_iter_seek_for_prev(
303                self.inner.as_ptr(),
304                key.as_ptr() as *const c_char,
305                key.len() as size_t,
306            );
307        }
308    }
309
310    /// Seeks to the next key.
311    pub fn next(&mut self) {
312        if self.valid() {
313            unsafe {
314                ffi::rocksdb_iter_next(self.inner.as_ptr());
315            }
316        }
317    }
318
319    /// Seeks to the previous key.
320    pub fn prev(&mut self) {
321        if self.valid() {
322            unsafe {
323                ffi::rocksdb_iter_prev(self.inner.as_ptr());
324            }
325        }
326    }
327
328    /// Returns a slice of the current key.
329    pub fn key(&self) -> Option<&[u8]> {
330        if self.valid() {
331            Some(self.key_impl())
332        } else {
333            None
334        }
335    }
336
337    /// Returns a slice of the current value.
338    pub fn value(&self) -> Option<&[u8]> {
339        if self.valid() {
340            Some(self.value_impl())
341        } else {
342            None
343        }
344    }
345
346    /// Returns pair with slice of the current key and current value.
347    pub fn item(&self) -> Option<(&[u8], &[u8])> {
348        if self.valid() {
349            Some((self.key_impl(), self.value_impl()))
350        } else {
351            None
352        }
353    }
354
355    /// Returns a slice of the current key; assumes the iterator is valid.
356    fn key_impl(&self) -> &[u8] {
357        // Safety Note: This is safe as all methods that may invalidate the buffer returned
358        // take `&mut self`, so borrow checker will prevent use of buffer after seek.
359        unsafe {
360            let mut key_len: size_t = 0;
361            let key_len_ptr: *mut size_t = &mut key_len;
362            let key_ptr = ffi::rocksdb_iter_key(self.inner.as_ptr(), key_len_ptr);
363            slice::from_raw_parts(key_ptr as *const c_uchar, key_len)
364        }
365    }
366
367    /// Returns a slice of the current value; assumes the iterator is valid.
368    fn value_impl(&self) -> &[u8] {
369        // Safety Note: This is safe as all methods that may invalidate the buffer returned
370        // take `&mut self`, so borrow checker will prevent use of buffer after seek.
371        unsafe {
372            let mut val_len: size_t = 0;
373            let val_len_ptr: *mut size_t = &mut val_len;
374            let val_ptr = ffi::rocksdb_iter_value(self.inner.as_ptr(), val_len_ptr);
375            slice::from_raw_parts(val_ptr as *const c_uchar, val_len)
376        }
377    }
378}
379
380impl<D: DBAccess> Drop for DBRawIteratorWithThreadMode<'_, D> {
381    fn drop(&mut self) {
382        unsafe {
383            ffi::rocksdb_iter_destroy(self.inner.as_ptr());
384        }
385    }
386}
387
388unsafe impl<D: DBAccess> Send for DBRawIteratorWithThreadMode<'_, D> {}
389unsafe impl<D: DBAccess> Sync for DBRawIteratorWithThreadMode<'_, D> {}
390
391/// A type alias to keep compatibility. See [`DBIteratorWithThreadMode`] for details
392pub type DBIterator<'a> = DBIteratorWithThreadMode<'a, DB>;
393
/// An iterator over a database or column family, with specifiable
/// ranges and direction.
///
/// Each item is an owned `(key, value)` pair ([`KVBytes`]). If the underlying
/// raw iterator stops because of an error, that error is yielded once as
/// `Err` and the iterator then ends.
///
/// ```
/// use rust_rocksdb::{DB, Direction, IteratorMode, Options};
///
/// let tempdir = tempfile::Builder::new()
///     .prefix("_path_for_rocksdb_storage2")
///     .tempdir()
///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage2.");
/// let path = tempdir.path();
/// {
///     let db = DB::open_default(path).unwrap();
///     let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward
///     for item in iter {
///         let (key, value) = item.unwrap();
///         println!("Saw {:?} {:?}", key, value);
///     }
///     iter = db.iterator(IteratorMode::End);  // Always iterates backward
///     for item in iter {
///         let (key, value) = item.unwrap();
///         println!("Saw {:?} {:?}", key, value);
///     }
///     iter = db.iterator(IteratorMode::From(b"my key", Direction::Forward)); // From a key in Direction::{forward,reverse}
///     for item in iter {
///         let (key, value) = item.unwrap();
///         println!("Saw {:?} {:?}", key, value);
///     }
///
///     // You can seek with an existing Iterator instance, too
///     iter = db.iterator(IteratorMode::Start);
///     iter.set_mode(IteratorMode::From(b"another key", Direction::Reverse));
///     for item in iter {
///         let (key, value) = item.unwrap();
///         println!("Saw {:?} {:?}", key, value);
///     }
/// }
/// let _ = DB::destroy(&Options::default(), path);
/// ```
pub struct DBIteratorWithThreadMode<'a, D: DBAccess> {
    // The raw iterator that performs the actual seeks and reads.
    raw: DBRawIteratorWithThreadMode<'a, D>,
    // Which way `Iterator::next` moves `raw` after yielding an item.
    direction: Direction,
    // Set once `raw` is no longer valid, so later `next` calls return `None`.
    done: bool,
}
438
/// The direction in which a [`DBIteratorWithThreadMode`] walks the key space.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Direction {
    /// Towards larger keys (the raw iterator's `next`).
    Forward,
    /// Towards smaller keys (the raw iterator's `prev`).
    Reverse,
}

/// An owned `(key, value)` pair as yielded by [`DBIteratorWithThreadMode`].
pub type KVBytes = (Box<[u8]>, Box<[u8]>);

/// Where a [`DBIteratorWithThreadMode`] starts and which way it moves.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum IteratorMode<'a> {
    /// Start at the first key and iterate forward.
    Start,
    /// Start at the last key and iterate in reverse.
    End,
    /// Start at the given key and iterate in the given direction.
    ///
    /// With [`Direction::Forward`] the iterator is positioned at the key or the
    /// first key after it; with [`Direction::Reverse`] at the key or the last
    /// key before it.
    From(&'a [u8], Direction),
}
453
454impl<'a, D: DBAccess> DBIteratorWithThreadMode<'a, D> {
455    pub(crate) fn new(db: &D, readopts: ReadOptions, mode: IteratorMode) -> Self {
456        Self::from_raw(DBRawIteratorWithThreadMode::new(db, readopts), mode)
457    }
458
459    pub(crate) fn new_cf(
460        db: &'a D,
461        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
462        readopts: ReadOptions,
463        mode: IteratorMode,
464    ) -> Self {
465        Self::from_raw(
466            DBRawIteratorWithThreadMode::new_cf(db, cf_handle, readopts),
467            mode,
468        )
469    }
470
471    fn from_raw(raw: DBRawIteratorWithThreadMode<'a, D>, mode: IteratorMode) -> Self {
472        let mut rv = DBIteratorWithThreadMode {
473            raw,
474            direction: Direction::Forward, // blown away by set_mode()
475            done: false,
476        };
477        rv.set_mode(mode);
478        rv
479    }
480
481    pub fn set_mode(&mut self, mode: IteratorMode) {
482        self.done = false;
483        self.direction = match mode {
484            IteratorMode::Start => {
485                self.raw.seek_to_first();
486                Direction::Forward
487            }
488            IteratorMode::End => {
489                self.raw.seek_to_last();
490                Direction::Reverse
491            }
492            IteratorMode::From(key, Direction::Forward) => {
493                self.raw.seek(key);
494                Direction::Forward
495            }
496            IteratorMode::From(key, Direction::Reverse) => {
497                self.raw.seek_for_prev(key);
498                Direction::Reverse
499            }
500        };
501    }
502}
503
504impl<D: DBAccess> Iterator for DBIteratorWithThreadMode<'_, D> {
505    type Item = Result<KVBytes, Error>;
506
507    fn next(&mut self) -> Option<Result<KVBytes, Error>> {
508        if self.done {
509            None
510        } else if let Some((key, value)) = self.raw.item() {
511            let item = (Box::from(key), Box::from(value));
512            match self.direction {
513                Direction::Forward => self.raw.next(),
514                Direction::Reverse => self.raw.prev(),
515            }
516            Some(Ok(item))
517        } else {
518            self.done = true;
519            self.raw.status().err().map(Result::Err)
520        }
521    }
522}
523
524impl<D: DBAccess> std::iter::FusedIterator for DBIteratorWithThreadMode<'_, D> {}
525
526impl<'a, D: DBAccess> Into<DBRawIteratorWithThreadMode<'a, D>> for DBIteratorWithThreadMode<'a, D> {
527    fn into(self) -> DBRawIteratorWithThreadMode<'a, D> {
528        self.raw
529    }
530}
531
/// Iterates the batches of writes since a given sequence number.
///
/// `DBWALIterator` is returned by `DB::get_updates_since()` and will return the
/// batches of write operations that have occurred since a given sequence number
/// (see `DB::latest_sequence_number()`). Batches whose sequence number is less
/// than or equal to the requested one are skipped. This iterator cannot be
/// constructed by the application.
///
/// The iterator item type is a tuple of (`u64`, `WriteBatch`) where the first
/// value is the sequence number of the associated write batch.
pub struct DBWALIterator {
    // Raw handle to the C WAL iterator; released in this type's `Drop` impl.
    pub(crate) inner: *mut ffi::rocksdb_wal_iterator_t,
    // Batches with a sequence number `<=` this value are skipped by `next`.
    pub(crate) start_seq_number: u64,
}
546
547impl DBWALIterator {
548    /// Returns `true` if the iterator is valid. An iterator is invalidated when
549    /// it reaches the end of its defined range, or when it encounters an error.
550    ///
551    /// To check whether the iterator encountered an error after `valid` has
552    /// returned `false`, use the [`status`](DBWALIterator::status) method.
553    /// `status` will never return an error when `valid` is `true`.
554    pub fn valid(&self) -> bool {
555        unsafe { ffi::rocksdb_wal_iter_valid(self.inner) != 0 }
556    }
557
558    /// Returns an error `Result` if the iterator has encountered an error
559    /// during operation. When an error is encountered, the iterator is
560    /// invalidated and [`valid`](DBWALIterator::valid) will return `false` when
561    /// called.
562    pub fn status(&self) -> Result<(), Error> {
563        unsafe {
564            ffi_try!(ffi::rocksdb_wal_iter_status(self.inner));
565        }
566        Ok(())
567    }
568}
569
impl Iterator for DBWALIterator {
    type Item = Result<(u64, WriteBatch), Error>;

    /// Returns the next write batch whose sequence number is strictly greater
    /// than `start_seq_number`, paired with that sequence number, and advances
    /// the WAL iterator past it.
    ///
    /// Returns `None` as soon as the underlying WAL iterator is not valid.
    ///
    /// NOTE(review): the `!self.valid()` branch after the skip loop appears to
    /// be unreachable. Validity was last checked (and found `true`) either at
    /// the top of this method or inside the loop, and the only call in between
    /// is `rocksdb_wal_iter_get_batch`, which presumably does not change
    /// validity — confirm. As a result, an error reported by `status()` is never
    /// surfaced here, and an errored iterator simply ends with `None`. Surfacing
    /// the error properly would need a "done" flag so it is not returned again
    /// on every later call.
    fn next(&mut self) -> Option<Self::Item> {
        // Exhausted (or errored) iterator: nothing more to yield.
        if !self.valid() {
            return None;
        }

        let mut seq: u64 = 0;
        // Wrap the batch at the current position; `seq` receives its sequence
        // number. Dropping the wrapper frees the batch.
        let mut batch = WriteBatch {
            inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
        };

        // if the initial sequence number is what was requested we skip it to
        // only provide changes *after* it
        while seq <= self.start_seq_number {
            unsafe {
                ffi::rocksdb_wal_iter_next(self.inner);
            }

            if !self.valid() {
                return None;
            }

            // this drops which in turn frees the skipped batch
            batch = WriteBatch {
                inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
            };
        }

        // See the NOTE(review) above: this branch appears unreachable in practice.
        if !self.valid() {
            return self.status().err().map(Result::Err);
        }

        // Seek to the next write batch.
        // Note that WriteBatches live independently of the WAL iterator so this is safe to do
        unsafe {
            ffi::rocksdb_wal_iter_next(self.inner);
        }

        Some(Ok((seq, batch)))
    }
}
613
614impl Drop for DBWALIterator {
615    fn drop(&mut self) {
616        unsafe {
617            ffi::rocksdb_wal_iter_destroy(self.inner);
618        }
619    }
620}