rust_rocksdb/
db_iterator.rs

// Copyright 2020 Tyler Neely
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use crate::{
16    db::{DBAccess, DB},
17    ffi, Error, ReadOptions, WriteBatch,
18};
19use libc::{c_char, c_uchar, size_t};
20use std::mem::ManuallyDrop;
21use std::{marker::PhantomData, slice};
22
/// Alias of [`DBRawIteratorWithThreadMode`] specialized to [`DB`], kept for
/// backward compatibility. See [`DBRawIteratorWithThreadMode`] for details.
pub type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, DB>;
25
26/// A low-level iterator over a database or column family, created by [`DB::raw_iterator`]
27/// and other `raw_iterator_*` methods.
28///
29/// This iterator replicates RocksDB's API. It should provide better
30/// performance and more features than [`DBIteratorWithThreadMode`], which is a standard
31/// Rust [`std::iter::Iterator`].
32///
33/// ```
34/// use rust_rocksdb::{DB, Options};
35///
36/// let tempdir = tempfile::Builder::new()
37///     .prefix("_path_for_rocksdb_storage4")
38///     .tempdir()
39///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage4.");
40/// let path = tempdir.path();
41/// {
42///     let db = DB::open_default(path).unwrap();
43///     let mut iter = db.raw_iterator();
44///
45///     // Forwards iteration
46///     iter.seek_to_first();
47///     while iter.valid() {
48///         println!("Saw {:?} {:?}", iter.key(), iter.value());
49///         iter.next();
50///     }
51///
52///     // Reverse iteration
53///     iter.seek_to_last();
54///     while iter.valid() {
55///         println!("Saw {:?} {:?}", iter.key(), iter.value());
56///         iter.prev();
57///     }
58///
59///     // Seeking
60///     iter.seek(b"my key");
61///     while iter.valid() {
62///         println!("Saw {:?} {:?}", iter.key(), iter.value());
63///         iter.next();
64///     }
65///
66///     // Reverse iteration from key
67///     // Note, use seek_for_prev when reversing because if this key doesn't exist,
68///     // this will make the iterator start from the previous key rather than the next.
69///     iter.seek_for_prev(b"my key");
70///     while iter.valid() {
71///         println!("Saw {:?} {:?}", iter.key(), iter.value());
72///         iter.prev();
73///     }
74/// }
75/// let _ = DB::destroy(&Options::default(), path);
76/// ```
77pub struct DBRawIteratorWithThreadMode<'a, D: DBAccess> {
78    inner: std::ptr::NonNull<ffi::rocksdb_iterator_t>,
79
80    /// When iterate_lower_bound or iterate_upper_bound are set, the inner
81    /// C iterator keeps a pointer to the upper bound inside `_readopts`.
82    /// Storing this makes sure the upper bound is always alive when the
83    /// iterator is being used.
84    ///
85    /// And yes, we need to store the entire ReadOptions structure since C++
86    /// ReadOptions keep reference to C rocksdb_readoptions_t wrapper which
87    /// point to vectors we own.  See issue #660.
88    readopts: ReadOptions,
89
90    db: PhantomData<&'a D>,
91}
92
93impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
94    pub(crate) fn new(db: &D, readopts: ReadOptions) -> Self {
95        let inner = unsafe { db.create_iterator(&readopts) };
96        Self::from_inner(inner, readopts)
97    }
98
99    pub(crate) fn new_cf(
100        db: &'a D,
101        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
102        readopts: ReadOptions,
103    ) -> Self {
104        let inner = unsafe { db.create_iterator_cf(cf_handle, &readopts) };
105        Self::from_inner(inner, readopts)
106    }
107
108    pub(crate) fn from_inner(inner: *mut ffi::rocksdb_iterator_t, readopts: ReadOptions) -> Self {
109        // This unwrap will never fail since rocksdb_create_iterator and
110        // rocksdb_create_iterator_cf functions always return non-null. They
111        // use new and deference the result so any nulls would end up with SIGSEGV
112        // there and we would have a bigger issue.
113        let inner = std::ptr::NonNull::new(inner).unwrap();
114        Self {
115            inner,
116            readopts,
117            db: PhantomData,
118        }
119    }
120
121    pub(crate) fn into_inner(self) -> (std::ptr::NonNull<ffi::rocksdb_iterator_t>, ReadOptions) {
122        let value = ManuallyDrop::new(self);
123        // SAFETY: value won't be used beyond this point
124        let inner = unsafe { std::ptr::read(&value.inner) };
125        let readopts = unsafe { std::ptr::read(&value.readopts) };
126
127        (inner, readopts)
128    }
129
130    /// Returns `true` if the iterator is valid. An iterator is invalidated when
131    /// it reaches the end of its defined range, or when it encounters an error.
132    ///
133    /// To check whether the iterator encountered an error after `valid` has
134    /// returned `false`, use the [`status`](DBRawIteratorWithThreadMode::status) method. `status` will never
135    /// return an error when `valid` is `true`.
136    pub fn valid(&self) -> bool {
137        unsafe { ffi::rocksdb_iter_valid(self.inner.as_ptr()) != 0 }
138    }
139
140    /// Returns an error `Result` if the iterator has encountered an error
141    /// during operation. When an error is encountered, the iterator is
142    /// invalidated and [`valid`](DBRawIteratorWithThreadMode::valid) will return `false` when called.
143    ///
144    /// Performing a seek will discard the current status.
145    pub fn status(&self) -> Result<(), Error> {
146        unsafe {
147            ffi_try!(ffi::rocksdb_iter_get_error(self.inner.as_ptr()));
148        }
149        Ok(())
150    }
151
152    /// Seeks to the first key in the database.
153    ///
154    /// # Examples
155    ///
156    /// ```rust
157    /// use rust_rocksdb::{DB, Options};
158    ///
159    /// let tempdir = tempfile::Builder::new()
160    ///     .prefix("_path_for_rocksdb_storage5")
161    ///     .tempdir()
162    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage5.");
163    /// let path = tempdir.path();
164    /// {
165    ///     let db = DB::open_default(path).unwrap();
166    ///     let mut iter = db.raw_iterator();
167    ///
168    ///     // Iterate all keys from the start in lexicographic order
169    ///     iter.seek_to_first();
170    ///
171    ///     while iter.valid() {
172    ///         println!("{:?} {:?}", iter.key(), iter.value());
173    ///         iter.next();
174    ///     }
175    ///
176    ///     // Read just the first key
177    ///     iter.seek_to_first();
178    ///
179    ///     if iter.valid() {
180    ///         println!("{:?} {:?}", iter.key(), iter.value());
181    ///     } else {
182    ///         // There are no keys in the database
183    ///     }
184    /// }
185    /// let _ = DB::destroy(&Options::default(), path);
186    /// ```
187    pub fn seek_to_first(&mut self) {
188        unsafe {
189            ffi::rocksdb_iter_seek_to_first(self.inner.as_ptr());
190        }
191    }
192
193    /// Seeks to the last key in the database.
194    ///
195    /// # Examples
196    ///
197    /// ```rust
198    /// use rust_rocksdb::{DB, Options};
199    ///
200    /// let tempdir = tempfile::Builder::new()
201    ///     .prefix("_path_for_rocksdb_storage6")
202    ///     .tempdir()
203    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage6.");
204    /// let path = tempdir.path();
205    /// {
206    ///     let db = DB::open_default(path).unwrap();
207    ///     let mut iter = db.raw_iterator();
208    ///
209    ///     // Iterate all keys from the end in reverse lexicographic order
210    ///     iter.seek_to_last();
211    ///
212    ///     while iter.valid() {
213    ///         println!("{:?} {:?}", iter.key(), iter.value());
214    ///         iter.prev();
215    ///     }
216    ///
217    ///     // Read just the last key
218    ///     iter.seek_to_last();
219    ///
220    ///     if iter.valid() {
221    ///         println!("{:?} {:?}", iter.key(), iter.value());
222    ///     } else {
223    ///         // There are no keys in the database
224    ///     }
225    /// }
226    /// let _ = DB::destroy(&Options::default(), path);
227    /// ```
228    pub fn seek_to_last(&mut self) {
229        unsafe {
230            ffi::rocksdb_iter_seek_to_last(self.inner.as_ptr());
231        }
232    }
233
234    /// Seeks to the specified key or the first key that lexicographically follows it.
235    ///
236    /// This method will attempt to seek to the specified key. If that key does not exist, it will
237    /// find and seek to the key that lexicographically follows it instead.
238    ///
239    /// # Examples
240    ///
241    /// ```rust
242    /// use rust_rocksdb::{DB, Options};
243    ///
244    /// let tempdir = tempfile::Builder::new()
245    ///     .prefix("_path_for_rocksdb_storage7")
246    ///     .tempdir()
247    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage7.");
248    /// let path = tempdir.path();
249    /// {
250    ///     let db = DB::open_default(path).unwrap();
251    ///     let mut iter = db.raw_iterator();
252    ///
253    ///     // Read the first key that starts with 'a'
254    ///     iter.seek(b"a");
255    ///
256    ///     if iter.valid() {
257    ///         println!("{:?} {:?}", iter.key(), iter.value());
258    ///     } else {
259    ///         // There are no keys in the database
260    ///     }
261    /// }
262    /// let _ = DB::destroy(&Options::default(), path);
263    /// ```
264    pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
265        let key = key.as_ref();
266
267        unsafe {
268            ffi::rocksdb_iter_seek(
269                self.inner.as_ptr(),
270                key.as_ptr() as *const c_char,
271                key.len() as size_t,
272            );
273        }
274    }
275
276    /// Seeks to the specified key, or the first key that lexicographically precedes it.
277    ///
278    /// Like ``.seek()`` this method will attempt to seek to the specified key.
279    /// The difference with ``.seek()`` is that if the specified key do not exist, this method will
280    /// seek to key that lexicographically precedes it instead.
281    ///
282    /// # Examples
283    ///
284    /// ```rust
285    /// use rust_rocksdb::{DB, Options};
286    ///
287    /// let tempdir = tempfile::Builder::new()
288    ///     .prefix("_path_for_rocksdb_storage8")
289    ///     .tempdir()
290    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage8.");
291    /// let path = tempdir.path();
292    /// {
293    ///     let db = DB::open_default(path).unwrap();
294    ///     let mut iter = db.raw_iterator();
295    ///
296    ///     // Read the last key that starts with 'a'
297    ///     iter.seek_for_prev(b"b");
298    ///
299    ///     if iter.valid() {
300    ///         println!("{:?} {:?}", iter.key(), iter.value());
301    ///     } else {
302    ///         // There are no keys in the database
303    ///     }
304    /// }
305    /// let _ = DB::destroy(&Options::default(), path);
306    /// ```
307    pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
308        let key = key.as_ref();
309
310        unsafe {
311            ffi::rocksdb_iter_seek_for_prev(
312                self.inner.as_ptr(),
313                key.as_ptr() as *const c_char,
314                key.len() as size_t,
315            );
316        }
317    }
318
319    /// Seeks to the next key.
320    pub fn next(&mut self) {
321        if self.valid() {
322            unsafe {
323                ffi::rocksdb_iter_next(self.inner.as_ptr());
324            }
325        }
326    }
327
328    /// Seeks to the previous key.
329    pub fn prev(&mut self) {
330        if self.valid() {
331            unsafe {
332                ffi::rocksdb_iter_prev(self.inner.as_ptr());
333            }
334        }
335    }
336
337    /// Returns a slice of the current key.
338    pub fn key(&self) -> Option<&[u8]> {
339        if self.valid() {
340            Some(self.key_impl())
341        } else {
342            None
343        }
344    }
345
346    /// Returns a slice of the current value.
347    pub fn value(&self) -> Option<&[u8]> {
348        if self.valid() {
349            Some(self.value_impl())
350        } else {
351            None
352        }
353    }
354
355    /// Returns pair with slice of the current key and current value.
356    pub fn item(&self) -> Option<(&[u8], &[u8])> {
357        if self.valid() {
358            Some((self.key_impl(), self.value_impl()))
359        } else {
360            None
361        }
362    }
363
364    /// Returns a slice of the current key; assumes the iterator is valid.
365    fn key_impl(&self) -> &[u8] {
366        // Safety Note: This is safe as all methods that may invalidate the buffer returned
367        // take `&mut self`, so borrow checker will prevent use of buffer after seek.
368        unsafe {
369            let mut key_len: size_t = 0;
370            let key_len_ptr: *mut size_t = &mut key_len;
371            let key_ptr = ffi::rocksdb_iter_key(self.inner.as_ptr(), key_len_ptr);
372            slice::from_raw_parts(key_ptr as *const c_uchar, key_len)
373        }
374    }
375
376    /// Returns a slice of the current value; assumes the iterator is valid.
377    fn value_impl(&self) -> &[u8] {
378        // Safety Note: This is safe as all methods that may invalidate the buffer returned
379        // take `&mut self`, so borrow checker will prevent use of buffer after seek.
380        unsafe {
381            let mut val_len: size_t = 0;
382            let val_len_ptr: *mut size_t = &mut val_len;
383            let val_ptr = ffi::rocksdb_iter_value(self.inner.as_ptr(), val_len_ptr);
384            slice::from_raw_parts(val_ptr as *const c_uchar, val_len)
385        }
386    }
387}
388
389impl<D: DBAccess> Drop for DBRawIteratorWithThreadMode<'_, D> {
390    fn drop(&mut self) {
391        unsafe {
392            ffi::rocksdb_iter_destroy(self.inner.as_ptr());
393        }
394    }
395}
396
// SAFETY (NOTE(review)): these impls override the auto traits that the
// `PhantomData<&'a D>` field would otherwise derive from `D`. The iterator
// holds only the RocksDB iterator handle plus the `ReadOptions` it depends on.
// Its `&self` methods (`valid`, `status`, `key`, `value`, `item`) only call
// `rocksdb_iter_valid`, `rocksdb_iter_get_error`, `rocksdb_iter_key` and
// `rocksdb_iter_value`. Every method that repositions the iterator takes
// `&mut self`. `Sync` additionally relies on RocksDB allowing concurrent calls
// to const `Iterator` methods; confirm this against RocksDB's thread-safety
// documentation when upgrading RocksDB.
unsafe impl<D: DBAccess> Send for DBRawIteratorWithThreadMode<'_, D> {}
unsafe impl<D: DBAccess> Sync for DBRawIteratorWithThreadMode<'_, D> {}
399
/// Alias of [`DBIteratorWithThreadMode`] specialized to [`DB`], kept for
/// backward compatibility. See [`DBIteratorWithThreadMode`] for details.
pub type DBIterator<'a> = DBIteratorWithThreadMode<'a, DB>;
402
/// A standard Rust [`Iterator`] over a database or column family.
///
/// Each item is an owned `(key, value)` pair ([`KVBytes`]) wrapped in a
/// `Result`. If the underlying iterator stops because of an error, that error
/// is yielded once and iteration then ends.
///
/// As an alternative, [`DBRawIteratorWithThreadMode`] is a low level wrapper around
/// RocksDB's API, which can provide better performance and more features.
///
/// ```
/// use rust_rocksdb::{DB, Direction, IteratorMode, Options};
///
/// let tempdir = tempfile::Builder::new()
///     .prefix("_path_for_rocksdb_storage2")
///     .tempdir()
///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage2.");
/// let path = tempdir.path();
/// {
///     let db = DB::open_default(path).unwrap();
///     let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward
///     for item in iter {
///         let (key, value) = item.unwrap();
///         println!("Saw {:?} {:?}", key, value);
///     }
///     iter = db.iterator(IteratorMode::End);  // Always iterates backward
///     for item in iter {
///         let (key, value) = item.unwrap();
///         println!("Saw {:?} {:?}", key, value);
///     }
///     iter = db.iterator(IteratorMode::From(b"my key", Direction::Forward)); // From a key in Direction::{forward,reverse}
///     for item in iter {
///         let (key, value) = item.unwrap();
///         println!("Saw {:?} {:?}", key, value);
///     }
///
///     // You can seek with an existing Iterator instance, too
///     iter = db.iterator(IteratorMode::Start);
///     iter.set_mode(IteratorMode::From(b"another key", Direction::Reverse));
///     for item in iter {
///         let (key, value) = item.unwrap();
///         println!("Saw {:?} {:?}", key, value);
///     }
/// }
/// let _ = DB::destroy(&Options::default(), path);
/// ```
pub struct DBIteratorWithThreadMode<'a, D: DBAccess> {
    /// The raw iterator that performs the actual seeks and reads.
    raw: DBRawIteratorWithThreadMode<'a, D>,
    /// Whether `next` steps with `raw.next()` (forward) or `raw.prev()` (reverse).
    direction: Direction,
    /// Set once `next` has seen the end of iteration (or an error), so that
    /// later calls return `None`; cleared again by `set_mode`.
    done: bool,
}
449
/// The direction in which a [`DBIteratorWithThreadMode`] walks the keyspace.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Direction {
    /// Iterate forwards, stepping with the raw iterator's `next()`.
    Forward,
    /// Iterate backwards, stepping with the raw iterator's `prev()`.
    Reverse,
}

/// An owned `(key, value)` pair as yielded by [`DBIteratorWithThreadMode`].
pub type KVBytes = (Box<[u8]>, Box<[u8]>);

/// Where a [`DBIteratorWithThreadMode`] starts and which way it moves.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum IteratorMode<'a> {
    /// Start at the first key and iterate forwards.
    Start,
    /// Start at the last key and iterate in reverse.
    End,
    /// Start at the given key and move in the given direction.
    ///
    /// With [`Direction::Forward`] the iterator seeks to the first key at or
    /// after it; with [`Direction::Reverse`] it seeks to the last key at or
    /// before it.
    From(&'a [u8], Direction),
}
464
465impl<'a, D: DBAccess> DBIteratorWithThreadMode<'a, D> {
466    pub(crate) fn new(db: &D, readopts: ReadOptions, mode: IteratorMode) -> Self {
467        Self::from_raw(DBRawIteratorWithThreadMode::new(db, readopts), mode)
468    }
469
470    pub(crate) fn new_cf(
471        db: &'a D,
472        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
473        readopts: ReadOptions,
474        mode: IteratorMode,
475    ) -> Self {
476        Self::from_raw(
477            DBRawIteratorWithThreadMode::new_cf(db, cf_handle, readopts),
478            mode,
479        )
480    }
481
482    fn from_raw(raw: DBRawIteratorWithThreadMode<'a, D>, mode: IteratorMode) -> Self {
483        let mut rv = DBIteratorWithThreadMode {
484            raw,
485            direction: Direction::Forward, // blown away by set_mode()
486            done: false,
487        };
488        rv.set_mode(mode);
489        rv
490    }
491
492    pub fn set_mode(&mut self, mode: IteratorMode) {
493        self.done = false;
494        self.direction = match mode {
495            IteratorMode::Start => {
496                self.raw.seek_to_first();
497                Direction::Forward
498            }
499            IteratorMode::End => {
500                self.raw.seek_to_last();
501                Direction::Reverse
502            }
503            IteratorMode::From(key, Direction::Forward) => {
504                self.raw.seek(key);
505                Direction::Forward
506            }
507            IteratorMode::From(key, Direction::Reverse) => {
508                self.raw.seek_for_prev(key);
509                Direction::Reverse
510            }
511        };
512    }
513}
514
515impl<D: DBAccess> Iterator for DBIteratorWithThreadMode<'_, D> {
516    type Item = Result<KVBytes, Error>;
517
518    fn next(&mut self) -> Option<Result<KVBytes, Error>> {
519        if self.done {
520            None
521        } else if let Some((key, value)) = self.raw.item() {
522            let item = (Box::from(key), Box::from(value));
523            match self.direction {
524                Direction::Forward => self.raw.next(),
525                Direction::Reverse => self.raw.prev(),
526            }
527            Some(Ok(item))
528        } else {
529            self.done = true;
530            self.raw.status().err().map(Result::Err)
531        }
532    }
533}
534
// Sound because `next` returns `None` for as long as `done` is set, and `next`
// sets `done` whenever it reaches the end (or an error). `done` is only cleared
// by `set_mode`, which takes `&mut self` outside the `Iterator` protocol.
impl<D: DBAccess> std::iter::FusedIterator for DBIteratorWithThreadMode<'_, D> {}
536
537impl<'a, D: DBAccess> Into<DBRawIteratorWithThreadMode<'a, D>> for DBIteratorWithThreadMode<'a, D> {
538    fn into(self) -> DBRawIteratorWithThreadMode<'a, D> {
539        self.raw
540    }
541}
542
/// Iterates the batches of writes made after a given sequence number.
///
/// `DBWALIterator` is returned by `DB::get_updates_since()` and will return the
/// batches of write operations that have occurred since a given sequence number
/// (see `DB::latest_sequence_number()`). This iterator cannot be constructed by
/// the application.
///
/// The iterator item type is a `Result` wrapping a tuple of (`u64`, `WriteBatch`),
/// where the `u64` is the sequence number of the associated write batch.
pub struct DBWALIterator {
    /// Owned handle to the RocksDB WAL iterator; it is destroyed in `Drop`.
    pub(crate) inner: *mut ffi::rocksdb_wal_iterator_t,
    /// `next` skips batches whose sequence number is less than or equal to
    /// this value, so only strictly later batches are yielded.
    pub(crate) start_seq_number: u64,
}
557
558impl DBWALIterator {
559    /// Returns `true` if the iterator is valid. An iterator is invalidated when
560    /// it reaches the end of its defined range, or when it encounters an error.
561    ///
562    /// To check whether the iterator encountered an error after `valid` has
563    /// returned `false`, use the [`status`](DBWALIterator::status) method.
564    /// `status` will never return an error when `valid` is `true`.
565    pub fn valid(&self) -> bool {
566        unsafe { ffi::rocksdb_wal_iter_valid(self.inner) != 0 }
567    }
568
569    /// Returns an error `Result` if the iterator has encountered an error
570    /// during operation. When an error is encountered, the iterator is
571    /// invalidated and [`valid`](DBWALIterator::valid) will return `false` when
572    /// called.
573    pub fn status(&self) -> Result<(), Error> {
574        unsafe {
575            ffi_try!(ffi::rocksdb_wal_iter_status(self.inner));
576        }
577        Ok(())
578    }
579}
580
581impl Iterator for DBWALIterator {
582    type Item = Result<(u64, WriteBatch), Error>;
583
584    fn next(&mut self) -> Option<Self::Item> {
585        if !self.valid() {
586            return None;
587        }
588
589        let mut seq: u64 = 0;
590        let mut batch = WriteBatch {
591            inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
592        };
593
594        // if the initial sequence number is what was requested we skip it to
595        // only provide changes *after* it
596        while seq <= self.start_seq_number {
597            unsafe {
598                ffi::rocksdb_wal_iter_next(self.inner);
599            }
600
601            if !self.valid() {
602                return None;
603            }
604
605            // this drops which in turn frees the skipped batch
606            batch = WriteBatch {
607                inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
608            };
609        }
610
611        if !self.valid() {
612            return self.status().err().map(Result::Err);
613        }
614
615        // Seek to the next write batch.
616        // Note that WriteBatches live independently of the WAL iterator so this is safe to do
617        unsafe {
618            ffi::rocksdb_wal_iter_next(self.inner);
619        }
620
621        Some(Ok((seq, batch)))
622    }
623}
624
625impl Drop for DBWALIterator {
626    fn drop(&mut self) {
627        unsafe {
628            ffi::rocksdb_wal_iter_destroy(self.inner);
629        }
630    }
631}