Skip to main content

rust_rocksdb/
db_iterator.rs

1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{
16    Error, ReadOptions, WriteBatch,
17    db::{DB, DBAccess},
18    ffi,
19};
20use libc::{c_char, c_uchar, size_t};
21use std::mem::ManuallyDrop;
22use std::{marker::PhantomData, slice};
23
24/// A type alias to keep compatibility. See [`DBRawIteratorWithThreadMode`] for details
25pub type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, DB>;
26
27/// A low-level iterator over a database or column family, created by [`DB::raw_iterator`]
28/// and other `raw_iterator_*` methods.
29///
30/// This iterator replicates RocksDB's API. It should provide better
31/// performance and more features than [`DBIteratorWithThreadMode`], which is a standard
32/// Rust [`std::iter::Iterator`].
33///
34/// ```
35/// use rust_rocksdb::{DB, Options};
36///
37/// let tempdir = tempfile::Builder::new()
38///     .prefix("_path_for_rocksdb_storage4")
39///     .tempdir()
40///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage4.");
41/// let path = tempdir.path();
42/// {
43///     let db = DB::open_default(path).unwrap();
44///     let mut iter = db.raw_iterator();
45///
46///     // Forwards iteration
47///     iter.seek_to_first();
48///     while iter.valid() {
49///         println!("Saw {:?} {:?}", iter.key(), iter.value());
50///         iter.next();
51///     }
52///
53///     // Reverse iteration
54///     iter.seek_to_last();
55///     while iter.valid() {
56///         println!("Saw {:?} {:?}", iter.key(), iter.value());
57///         iter.prev();
58///     }
59///
60///     // Seeking
61///     iter.seek(b"my key");
62///     while iter.valid() {
63///         println!("Saw {:?} {:?}", iter.key(), iter.value());
64///         iter.next();
65///     }
66///
67///     // Reverse iteration from key
68///     // Note, use seek_for_prev when reversing because if this key doesn't exist,
69///     // this will make the iterator start from the previous key rather than the next.
70///     iter.seek_for_prev(b"my key");
71///     while iter.valid() {
72///         println!("Saw {:?} {:?}", iter.key(), iter.value());
73///         iter.prev();
74///     }
75/// }
76/// let _ = DB::destroy(&Options::default(), path);
77/// ```
78pub struct DBRawIteratorWithThreadMode<'a, D: DBAccess> {
79    inner: std::ptr::NonNull<ffi::rocksdb_iterator_t>,
80
81    /// When iterate_lower_bound or iterate_upper_bound are set, the inner
82    /// C iterator keeps a pointer to the upper bound inside `_readopts`.
83    /// Storing this makes sure the upper bound is always alive when the
84    /// iterator is being used.
85    ///
86    /// And yes, we need to store the entire ReadOptions structure since C++
87    /// ReadOptions keep reference to C rocksdb_readoptions_t wrapper which
88    /// point to vectors we own.  See issue #660.
89    readopts: ReadOptions,
90
91    db: PhantomData<&'a D>,
92}
93
94impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
95    pub(crate) fn new(db: &D, readopts: ReadOptions) -> Self {
96        let inner = unsafe { db.create_iterator(&readopts) };
97        Self::from_inner(inner, readopts)
98    }
99
100    pub(crate) fn new_cf(
101        db: &'a D,
102        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
103        readopts: ReadOptions,
104    ) -> Self {
105        let inner = unsafe { db.create_iterator_cf(cf_handle, &readopts) };
106        Self::from_inner(inner, readopts)
107    }
108
109    pub(crate) fn from_inner(inner: *mut ffi::rocksdb_iterator_t, readopts: ReadOptions) -> Self {
110        // This unwrap will never fail since rocksdb_create_iterator and
111        // rocksdb_create_iterator_cf functions always return non-null. They
112        // use new and deference the result so any nulls would end up with SIGSEGV
113        // there and we would have a bigger issue.
114        let inner = std::ptr::NonNull::new(inner).unwrap();
115        Self {
116            inner,
117            readopts,
118            db: PhantomData,
119        }
120    }
121
122    pub(crate) fn into_inner(self) -> (std::ptr::NonNull<ffi::rocksdb_iterator_t>, ReadOptions) {
123        let value = ManuallyDrop::new(self);
124        // SAFETY: value won't be used beyond this point
125        let inner = unsafe { std::ptr::read(&raw const value.inner) };
126        let readopts = unsafe { std::ptr::read(&raw const value.readopts) };
127
128        (inner, readopts)
129    }
130
131    /// Returns `true` if the iterator is valid. An iterator is invalidated when
132    /// it reaches the end of its defined range, or when it encounters an error.
133    ///
134    /// To check whether the iterator encountered an error after `valid` has
135    /// returned `false`, use the [`status`](DBRawIteratorWithThreadMode::status) method. `status` will never
136    /// return an error when `valid` is `true`.
137    pub fn valid(&self) -> bool {
138        unsafe { ffi::rocksdb_iter_valid(self.inner.as_ptr()) != 0 }
139    }
140
141    /// Returns an error `Result` if the iterator has encountered an error
142    /// during operation. When an error is encountered, the iterator is
143    /// invalidated and [`valid`](DBRawIteratorWithThreadMode::valid) will return `false` when called.
144    ///
145    /// Performing a seek will discard the current status.
146    pub fn status(&self) -> Result<(), Error> {
147        unsafe {
148            ffi_try!(ffi::rocksdb_iter_get_error(self.inner.as_ptr()));
149        }
150        Ok(())
151    }
152
153    /// Seeks to the first key in the database.
154    ///
155    /// # Examples
156    ///
157    /// ```rust
158    /// use rust_rocksdb::{DB, Options};
159    ///
160    /// let tempdir = tempfile::Builder::new()
161    ///     .prefix("_path_for_rocksdb_storage5")
162    ///     .tempdir()
163    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage5.");
164    /// let path = tempdir.path();
165    /// {
166    ///     let db = DB::open_default(path).unwrap();
167    ///     let mut iter = db.raw_iterator();
168    ///
169    ///     // Iterate all keys from the start in lexicographic order
170    ///     iter.seek_to_first();
171    ///
172    ///     while iter.valid() {
173    ///         println!("{:?} {:?}", iter.key(), iter.value());
174    ///         iter.next();
175    ///     }
176    ///
177    ///     // Read just the first key
178    ///     iter.seek_to_first();
179    ///
180    ///     if iter.valid() {
181    ///         println!("{:?} {:?}", iter.key(), iter.value());
182    ///     } else {
183    ///         // There are no keys in the database
184    ///     }
185    /// }
186    /// let _ = DB::destroy(&Options::default(), path);
187    /// ```
188    pub fn seek_to_first(&mut self) {
189        unsafe {
190            ffi::rocksdb_iter_seek_to_first(self.inner.as_ptr());
191        }
192    }
193
194    /// Seeks to the last key in the database.
195    ///
196    /// # Examples
197    ///
198    /// ```rust
199    /// use rust_rocksdb::{DB, Options};
200    ///
201    /// let tempdir = tempfile::Builder::new()
202    ///     .prefix("_path_for_rocksdb_storage6")
203    ///     .tempdir()
204    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage6.");
205    /// let path = tempdir.path();
206    /// {
207    ///     let db = DB::open_default(path).unwrap();
208    ///     let mut iter = db.raw_iterator();
209    ///
210    ///     // Iterate all keys from the end in reverse lexicographic order
211    ///     iter.seek_to_last();
212    ///
213    ///     while iter.valid() {
214    ///         println!("{:?} {:?}", iter.key(), iter.value());
215    ///         iter.prev();
216    ///     }
217    ///
218    ///     // Read just the last key
219    ///     iter.seek_to_last();
220    ///
221    ///     if iter.valid() {
222    ///         println!("{:?} {:?}", iter.key(), iter.value());
223    ///     } else {
224    ///         // There are no keys in the database
225    ///     }
226    /// }
227    /// let _ = DB::destroy(&Options::default(), path);
228    /// ```
229    pub fn seek_to_last(&mut self) {
230        unsafe {
231            ffi::rocksdb_iter_seek_to_last(self.inner.as_ptr());
232        }
233    }
234
235    /// Seeks to the specified key or the first key that lexicographically follows it.
236    ///
237    /// This method will attempt to seek to the specified key. If that key does not exist, it will
238    /// find and seek to the key that lexicographically follows it instead.
239    ///
240    /// # Examples
241    ///
242    /// ```rust
243    /// use rust_rocksdb::{DB, Options};
244    ///
245    /// let tempdir = tempfile::Builder::new()
246    ///     .prefix("_path_for_rocksdb_storage7")
247    ///     .tempdir()
248    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage7.");
249    /// let path = tempdir.path();
250    /// {
251    ///     let db = DB::open_default(path).unwrap();
252    ///     let mut iter = db.raw_iterator();
253    ///
254    ///     // Read the first key that starts with 'a'
255    ///     iter.seek(b"a");
256    ///
257    ///     if iter.valid() {
258    ///         println!("{:?} {:?}", iter.key(), iter.value());
259    ///     } else {
260    ///         // There are no keys in the database
261    ///     }
262    /// }
263    /// let _ = DB::destroy(&Options::default(), path);
264    /// ```
265    pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
266        let key = key.as_ref();
267
268        unsafe {
269            ffi::rocksdb_iter_seek(
270                self.inner.as_ptr(),
271                key.as_ptr() as *const c_char,
272                key.len() as size_t,
273            );
274        }
275    }
276
277    /// Seeks to the specified key, or the first key that lexicographically precedes it.
278    ///
279    /// Like ``.seek()`` this method will attempt to seek to the specified key.
280    /// The difference with ``.seek()`` is that if the specified key do not exist, this method will
281    /// seek to key that lexicographically precedes it instead.
282    ///
283    /// # Examples
284    ///
285    /// ```rust
286    /// use rust_rocksdb::{DB, Options};
287    ///
288    /// let tempdir = tempfile::Builder::new()
289    ///     .prefix("_path_for_rocksdb_storage8")
290    ///     .tempdir()
291    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage8.");
292    /// let path = tempdir.path();
293    /// {
294    ///     let db = DB::open_default(path).unwrap();
295    ///     let mut iter = db.raw_iterator();
296    ///
297    ///     // Read the last key that starts with 'a'
298    ///     iter.seek_for_prev(b"b");
299    ///
300    ///     if iter.valid() {
301    ///         println!("{:?} {:?}", iter.key(), iter.value());
302    ///     } else {
303    ///         // There are no keys in the database
304    ///     }
305    /// }
306    /// let _ = DB::destroy(&Options::default(), path);
307    /// ```
308    pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
309        let key = key.as_ref();
310
311        unsafe {
312            ffi::rocksdb_iter_seek_for_prev(
313                self.inner.as_ptr(),
314                key.as_ptr() as *const c_char,
315                key.len() as size_t,
316            );
317        }
318    }
319
320    /// Seeks to the next key.
321    pub fn next(&mut self) {
322        if self.valid() {
323            unsafe {
324                ffi::rocksdb_iter_next(self.inner.as_ptr());
325            }
326        }
327    }
328
329    /// Seeks to the previous key.
330    pub fn prev(&mut self) {
331        if self.valid() {
332            unsafe {
333                ffi::rocksdb_iter_prev(self.inner.as_ptr());
334            }
335        }
336    }
337
338    /// Returns a slice of the current key.
339    pub fn key(&self) -> Option<&[u8]> {
340        if self.valid() {
341            // SAFETY: We just checked that the iterator is valid.
342            Some(unsafe { self.key_impl() })
343        } else {
344            None
345        }
346    }
347
348    /// Returns a slice of the current value.
349    pub fn value(&self) -> Option<&[u8]> {
350        if self.valid() {
351            // SAFETY: We just checked that the iterator is valid.
352            Some(unsafe { self.value_impl() })
353        } else {
354            None
355        }
356    }
357
358    /// Returns pair with slice of the current key and current value.
359    pub fn item(&self) -> Option<(&[u8], &[u8])> {
360        if self.valid() {
361            // SAFETY: We just checked that the iterator is valid.
362            Some(unsafe { (self.key_impl(), self.value_impl()) })
363        } else {
364            None
365        }
366    }
367
368    /// Returns a slice of the current key.
369    ///
370    /// # Safety
371    ///
372    /// The iterator must be valid (i.e., `valid()` returns true). Calling this
373    /// method when the iterator is invalid is undefined behavior, as RocksDB
374    /// may return an invalid pointer.
375    ///
376    /// Uses `rocksdb_iter_key_slice` which returns a `rocksdb_slice_t` by value,
377    /// avoiding the overhead of output parameters compared to `rocksdb_iter_key`.
378    unsafe fn key_impl(&self) -> &[u8] {
379        unsafe {
380            let slice = ffi::rocksdb_iter_key_slice(self.inner.as_ptr());
381            slice::from_raw_parts(slice.data as *const c_uchar, slice.size)
382        }
383    }
384
385    /// Returns a slice of the current value.
386    ///
387    /// # Safety
388    ///
389    /// The iterator must be valid (i.e., `valid()` returns true). Calling this
390    /// method when the iterator is invalid is undefined behavior, as RocksDB
391    /// may return an invalid pointer.
392    ///
393    /// Uses `rocksdb_iter_value_slice` which returns a `rocksdb_slice_t` by value,
394    /// avoiding the overhead of output parameters compared to `rocksdb_iter_value`.
395    unsafe fn value_impl(&self) -> &[u8] {
396        unsafe {
397            let slice = ffi::rocksdb_iter_value_slice(self.inner.as_ptr());
398            slice::from_raw_parts(slice.data as *const c_uchar, slice.size)
399        }
400    }
401}
402
403impl<D: DBAccess> Drop for DBRawIteratorWithThreadMode<'_, D> {
404    fn drop(&mut self) {
405        unsafe {
406            ffi::rocksdb_iter_destroy(self.inner.as_ptr());
407        }
408    }
409}
410
411unsafe impl<D: DBAccess> Send for DBRawIteratorWithThreadMode<'_, D> {}
412unsafe impl<D: DBAccess> Sync for DBRawIteratorWithThreadMode<'_, D> {}
413
414/// A type alias to keep compatibility. See [`DBIteratorWithThreadMode`] for details
415pub type DBIterator<'a> = DBIteratorWithThreadMode<'a, DB>;
416
417/// A standard Rust [`Iterator`] over a database or column family.
418///
419/// As an alternative, [`DBRawIteratorWithThreadMode`] is a low level wrapper around
420/// RocksDB's API, which can provide better performance and more features.
421///
422/// ```
423/// use rust_rocksdb::{DB, Direction, IteratorMode, Options};
424///
425/// let tempdir = tempfile::Builder::new()
426///     .prefix("_path_for_rocksdb_storage2")
427///     .tempdir()
428///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage2.");
429/// let path = tempdir.path();
430/// {
431///     let db = DB::open_default(path).unwrap();
432///     let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward
433///     for item in iter {
434///         let (key, value) = item.unwrap();
435///         println!("Saw {:?} {:?}", key, value);
436///     }
437///     iter = db.iterator(IteratorMode::End);  // Always iterates backward
438///     for item in iter {
439///         let (key, value) = item.unwrap();
440///         println!("Saw {:?} {:?}", key, value);
441///     }
442///     iter = db.iterator(IteratorMode::From(b"my key", Direction::Forward)); // From a key in Direction::{forward,reverse}
443///     for item in iter {
444///         let (key, value) = item.unwrap();
445///         println!("Saw {:?} {:?}", key, value);
446///     }
447///
448///     // You can seek with an existing Iterator instance, too
449///     iter = db.iterator(IteratorMode::Start);
450///     iter.set_mode(IteratorMode::From(b"another key", Direction::Reverse));
451///     for item in iter {
452///         let (key, value) = item.unwrap();
453///         println!("Saw {:?} {:?}", key, value);
454///     }
455/// }
456/// let _ = DB::destroy(&Options::default(), path);
457/// ```
458pub struct DBIteratorWithThreadMode<'a, D: DBAccess> {
459    raw: DBRawIteratorWithThreadMode<'a, D>,
460    direction: Direction,
461    done: bool,
462}
463
/// The direction in which a [`DBIteratorWithThreadMode`] walks through keys.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Direction {
    /// Iterate towards later keys.
    Forward,
    /// Iterate towards earlier keys.
    Reverse,
}

/// An owned key/value pair, as yielded by [`DBIteratorWithThreadMode`].
pub type KVBytes = (Box<[u8]>, Box<[u8]>);

/// Where a [`DBIteratorWithThreadMode`] starts and which way it moves.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum IteratorMode<'a> {
    /// Start at the first key and iterate forwards.
    Start,
    /// Start at the last key and iterate backwards.
    End,
    /// Start at the given key and iterate in the given direction.
    ///
    /// With [`Direction::Forward`], iteration begins at the key or the first key
    /// after it. With [`Direction::Reverse`], it begins at the key or the last
    /// key before it.
    From(&'a [u8], Direction),
}
478
479impl<'a, D: DBAccess> DBIteratorWithThreadMode<'a, D> {
480    pub(crate) fn new(db: &D, readopts: ReadOptions, mode: IteratorMode) -> Self {
481        Self::from_raw(DBRawIteratorWithThreadMode::new(db, readopts), mode)
482    }
483
484    pub(crate) fn new_cf(
485        db: &'a D,
486        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
487        readopts: ReadOptions,
488        mode: IteratorMode,
489    ) -> Self {
490        Self::from_raw(
491            DBRawIteratorWithThreadMode::new_cf(db, cf_handle, readopts),
492            mode,
493        )
494    }
495
496    fn from_raw(raw: DBRawIteratorWithThreadMode<'a, D>, mode: IteratorMode) -> Self {
497        let mut rv = DBIteratorWithThreadMode {
498            raw,
499            direction: Direction::Forward, // blown away by set_mode()
500            done: false,
501        };
502        rv.set_mode(mode);
503        rv
504    }
505
506    pub fn set_mode(&mut self, mode: IteratorMode) {
507        self.done = false;
508        self.direction = match mode {
509            IteratorMode::Start => {
510                self.raw.seek_to_first();
511                Direction::Forward
512            }
513            IteratorMode::End => {
514                self.raw.seek_to_last();
515                Direction::Reverse
516            }
517            IteratorMode::From(key, Direction::Forward) => {
518                self.raw.seek(key);
519                Direction::Forward
520            }
521            IteratorMode::From(key, Direction::Reverse) => {
522                self.raw.seek_for_prev(key);
523                Direction::Reverse
524            }
525        };
526    }
527}
528
529impl<D: DBAccess> Iterator for DBIteratorWithThreadMode<'_, D> {
530    type Item = Result<KVBytes, Error>;
531
532    fn next(&mut self) -> Option<Result<KVBytes, Error>> {
533        if self.done {
534            None
535        } else if let Some((key, value)) = self.raw.item() {
536            let item = (Box::from(key), Box::from(value));
537            match self.direction {
538                Direction::Forward => self.raw.next(),
539                Direction::Reverse => self.raw.prev(),
540            }
541            Some(Ok(item))
542        } else {
543            self.done = true;
544            self.raw.status().err().map(Result::Err)
545        }
546    }
547}
548
549impl<D: DBAccess> std::iter::FusedIterator for DBIteratorWithThreadMode<'_, D> {}
550
551impl<'a, D: DBAccess> Into<DBRawIteratorWithThreadMode<'a, D>> for DBIteratorWithThreadMode<'a, D> {
552    fn into(self) -> DBRawIteratorWithThreadMode<'a, D> {
553        self.raw
554    }
555}
556
557/// Iterates the batches of writes since a given sequence number.
558///
559/// `DBWALIterator` is returned by `DB::get_updates_since()` and will return the
560/// batches of write operations that have occurred since a given sequence number
561/// (see `DB::latest_sequence_number()`). This iterator cannot be constructed by
562/// the application.
563///
564/// The iterator item type is a tuple of (`u64`, `WriteBatch`) where the first
565/// value is the sequence number of the associated write batch.
566///
567pub struct DBWALIterator {
568    pub(crate) inner: *mut ffi::rocksdb_wal_iterator_t,
569    pub(crate) start_seq_number: u64,
570}
571
572impl DBWALIterator {
573    /// Returns `true` if the iterator is valid. An iterator is invalidated when
574    /// it reaches the end of its defined range, or when it encounters an error.
575    ///
576    /// To check whether the iterator encountered an error after `valid` has
577    /// returned `false`, use the [`status`](DBWALIterator::status) method.
578    /// `status` will never return an error when `valid` is `true`.
579    pub fn valid(&self) -> bool {
580        unsafe { ffi::rocksdb_wal_iter_valid(self.inner) != 0 }
581    }
582
583    /// Returns an error `Result` if the iterator has encountered an error
584    /// during operation. When an error is encountered, the iterator is
585    /// invalidated and [`valid`](DBWALIterator::valid) will return `false` when
586    /// called.
587    pub fn status(&self) -> Result<(), Error> {
588        unsafe {
589            ffi_try!(ffi::rocksdb_wal_iter_status(self.inner));
590        }
591        Ok(())
592    }
593}
594
595impl Iterator for DBWALIterator {
596    type Item = Result<(u64, WriteBatch), Error>;
597
598    fn next(&mut self) -> Option<Self::Item> {
599        if !self.valid() {
600            return None;
601        }
602
603        let mut seq: u64 = 0;
604        let mut batch = WriteBatch {
605            inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &raw mut seq) },
606        };
607
608        // if the initial sequence number is what was requested we skip it to
609        // only provide changes *after* it
610        while seq <= self.start_seq_number {
611            unsafe {
612                ffi::rocksdb_wal_iter_next(self.inner);
613            }
614
615            if !self.valid() {
616                return None;
617            }
618
619            // this drops which in turn frees the skipped batch
620            batch = WriteBatch {
621                inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &raw mut seq) },
622            };
623        }
624
625        if !self.valid() {
626            return self.status().err().map(Result::Err);
627        }
628
629        // Seek to the next write batch.
630        // Note that WriteBatches live independently of the WAL iterator so this is safe to do
631        unsafe {
632            ffi::rocksdb_wal_iter_next(self.inner);
633        }
634
635        Some(Ok((seq, batch)))
636    }
637}
638
639impl Drop for DBWALIterator {
640    fn drop(&mut self) {
641        unsafe {
642            ffi::rocksdb_wal_iter_destroy(self.inner);
643        }
644    }
645}