rust_rocksdb/
db_iterator.rs

1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{
16    Error, ReadOptions, WriteBatch,
17    db::{DB, DBAccess},
18    ffi,
19};
20use libc::{c_char, c_uchar, size_t};
21use std::mem::ManuallyDrop;
22use std::{marker::PhantomData, slice};
23
/// A type alias kept for backward compatibility: a [`DBRawIteratorWithThreadMode`]
/// over a plain [`DB`]. See [`DBRawIteratorWithThreadMode`] for details.
pub type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, DB>;
26
/// A low-level iterator over a database or column family, created by [`DB::raw_iterator`]
/// and other `raw_iterator_*` methods.
///
/// This iterator replicates RocksDB's API. It should provide better
/// performance and more features than [`DBIteratorWithThreadMode`], which is a standard
/// Rust [`std::iter::Iterator`].
///
/// Keys and values are not copied: `key`, `value` and `item` return slices that
/// borrow from the iterator. They can only be used until the iterator is next
/// repositioned, because every repositioning method takes `&mut self`.
///
/// ```
/// use rust_rocksdb::{DB, Options};
///
/// let tempdir = tempfile::Builder::new()
///     .prefix("_path_for_rocksdb_storage4")
///     .tempdir()
///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage4.");
/// let path = tempdir.path();
/// {
///     let db = DB::open_default(path).unwrap();
///     let mut iter = db.raw_iterator();
///
///     // Forwards iteration
///     iter.seek_to_first();
///     while iter.valid() {
///         println!("Saw {:?} {:?}", iter.key(), iter.value());
///         iter.next();
///     }
///
///     // Reverse iteration
///     iter.seek_to_last();
///     while iter.valid() {
///         println!("Saw {:?} {:?}", iter.key(), iter.value());
///         iter.prev();
///     }
///
///     // Seeking
///     iter.seek(b"my key");
///     while iter.valid() {
///         println!("Saw {:?} {:?}", iter.key(), iter.value());
///         iter.next();
///     }
///
///     // Reverse iteration from key
///     // Note: use seek_for_prev when iterating in reverse. If the key doesn't exist,
///     // it positions the iterator on the previous key rather than the next one.
///     iter.seek_for_prev(b"my key");
///     while iter.valid() {
///         println!("Saw {:?} {:?}", iter.key(), iter.value());
///         iter.prev();
///     }
/// }
/// let _ = DB::destroy(&Options::default(), path);
/// ```
pub struct DBRawIteratorWithThreadMode<'a, D: DBAccess> {
    /// Handle to the C iterator. It is destroyed in `Drop`, unless ownership is
    /// handed out through `into_inner`.
    inner: std::ptr::NonNull<ffi::rocksdb_iterator_t>,

    /// The read options this iterator was created with.
    ///
    /// When `iterate_lower_bound` or `iterate_upper_bound` are set, the inner
    /// C iterator keeps pointers to the bound buffers stored inside `readopts`.
    /// Storing the options here keeps those bounds alive while the iterator is
    /// in use.
    ///
    /// And yes, we need to store the entire `ReadOptions` structure. The C++
    /// `ReadOptions` keeps a reference to the C `rocksdb_readoptions_t` wrapper,
    /// which points to vectors we own. See issue #660.
    readopts: ReadOptions,

    /// Ties the iterator to the lifetime `'a` without storing a reference.
    db: PhantomData<&'a D>,
}
93
94impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
95    pub(crate) fn new(db: &D, readopts: ReadOptions) -> Self {
96        let inner = unsafe { db.create_iterator(&readopts) };
97        Self::from_inner(inner, readopts)
98    }
99
100    pub(crate) fn new_cf(
101        db: &'a D,
102        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
103        readopts: ReadOptions,
104    ) -> Self {
105        let inner = unsafe { db.create_iterator_cf(cf_handle, &readopts) };
106        Self::from_inner(inner, readopts)
107    }
108
109    pub(crate) fn from_inner(inner: *mut ffi::rocksdb_iterator_t, readopts: ReadOptions) -> Self {
110        // This unwrap will never fail since rocksdb_create_iterator and
111        // rocksdb_create_iterator_cf functions always return non-null. They
112        // use new and deference the result so any nulls would end up with SIGSEGV
113        // there and we would have a bigger issue.
114        let inner = std::ptr::NonNull::new(inner).unwrap();
115        Self {
116            inner,
117            readopts,
118            db: PhantomData,
119        }
120    }
121
122    pub(crate) fn into_inner(self) -> (std::ptr::NonNull<ffi::rocksdb_iterator_t>, ReadOptions) {
123        let value = ManuallyDrop::new(self);
124        // SAFETY: value won't be used beyond this point
125        let inner = unsafe { std::ptr::read(&raw const value.inner) };
126        let readopts = unsafe { std::ptr::read(&raw const value.readopts) };
127
128        (inner, readopts)
129    }
130
131    /// Returns `true` if the iterator is valid. An iterator is invalidated when
132    /// it reaches the end of its defined range, or when it encounters an error.
133    ///
134    /// To check whether the iterator encountered an error after `valid` has
135    /// returned `false`, use the [`status`](DBRawIteratorWithThreadMode::status) method. `status` will never
136    /// return an error when `valid` is `true`.
137    pub fn valid(&self) -> bool {
138        unsafe { ffi::rocksdb_iter_valid(self.inner.as_ptr()) != 0 }
139    }
140
141    /// Returns an error `Result` if the iterator has encountered an error
142    /// during operation. When an error is encountered, the iterator is
143    /// invalidated and [`valid`](DBRawIteratorWithThreadMode::valid) will return `false` when called.
144    ///
145    /// Performing a seek will discard the current status.
146    pub fn status(&self) -> Result<(), Error> {
147        unsafe {
148            ffi_try!(ffi::rocksdb_iter_get_error(self.inner.as_ptr()));
149        }
150        Ok(())
151    }
152
153    /// Seeks to the first key in the database.
154    ///
155    /// # Examples
156    ///
157    /// ```rust
158    /// use rust_rocksdb::{DB, Options};
159    ///
160    /// let tempdir = tempfile::Builder::new()
161    ///     .prefix("_path_for_rocksdb_storage5")
162    ///     .tempdir()
163    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage5.");
164    /// let path = tempdir.path();
165    /// {
166    ///     let db = DB::open_default(path).unwrap();
167    ///     let mut iter = db.raw_iterator();
168    ///
169    ///     // Iterate all keys from the start in lexicographic order
170    ///     iter.seek_to_first();
171    ///
172    ///     while iter.valid() {
173    ///         println!("{:?} {:?}", iter.key(), iter.value());
174    ///         iter.next();
175    ///     }
176    ///
177    ///     // Read just the first key
178    ///     iter.seek_to_first();
179    ///
180    ///     if iter.valid() {
181    ///         println!("{:?} {:?}", iter.key(), iter.value());
182    ///     } else {
183    ///         // There are no keys in the database
184    ///     }
185    /// }
186    /// let _ = DB::destroy(&Options::default(), path);
187    /// ```
188    pub fn seek_to_first(&mut self) {
189        unsafe {
190            ffi::rocksdb_iter_seek_to_first(self.inner.as_ptr());
191        }
192    }
193
194    /// Seeks to the last key in the database.
195    ///
196    /// # Examples
197    ///
198    /// ```rust
199    /// use rust_rocksdb::{DB, Options};
200    ///
201    /// let tempdir = tempfile::Builder::new()
202    ///     .prefix("_path_for_rocksdb_storage6")
203    ///     .tempdir()
204    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage6.");
205    /// let path = tempdir.path();
206    /// {
207    ///     let db = DB::open_default(path).unwrap();
208    ///     let mut iter = db.raw_iterator();
209    ///
210    ///     // Iterate all keys from the end in reverse lexicographic order
211    ///     iter.seek_to_last();
212    ///
213    ///     while iter.valid() {
214    ///         println!("{:?} {:?}", iter.key(), iter.value());
215    ///         iter.prev();
216    ///     }
217    ///
218    ///     // Read just the last key
219    ///     iter.seek_to_last();
220    ///
221    ///     if iter.valid() {
222    ///         println!("{:?} {:?}", iter.key(), iter.value());
223    ///     } else {
224    ///         // There are no keys in the database
225    ///     }
226    /// }
227    /// let _ = DB::destroy(&Options::default(), path);
228    /// ```
229    pub fn seek_to_last(&mut self) {
230        unsafe {
231            ffi::rocksdb_iter_seek_to_last(self.inner.as_ptr());
232        }
233    }
234
235    /// Seeks to the specified key or the first key that lexicographically follows it.
236    ///
237    /// This method will attempt to seek to the specified key. If that key does not exist, it will
238    /// find and seek to the key that lexicographically follows it instead.
239    ///
240    /// # Examples
241    ///
242    /// ```rust
243    /// use rust_rocksdb::{DB, Options};
244    ///
245    /// let tempdir = tempfile::Builder::new()
246    ///     .prefix("_path_for_rocksdb_storage7")
247    ///     .tempdir()
248    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage7.");
249    /// let path = tempdir.path();
250    /// {
251    ///     let db = DB::open_default(path).unwrap();
252    ///     let mut iter = db.raw_iterator();
253    ///
254    ///     // Read the first key that starts with 'a'
255    ///     iter.seek(b"a");
256    ///
257    ///     if iter.valid() {
258    ///         println!("{:?} {:?}", iter.key(), iter.value());
259    ///     } else {
260    ///         // There are no keys in the database
261    ///     }
262    /// }
263    /// let _ = DB::destroy(&Options::default(), path);
264    /// ```
265    pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
266        let key = key.as_ref();
267
268        unsafe {
269            ffi::rocksdb_iter_seek(
270                self.inner.as_ptr(),
271                key.as_ptr() as *const c_char,
272                key.len() as size_t,
273            );
274        }
275    }
276
277    /// Seeks to the specified key, or the first key that lexicographically precedes it.
278    ///
279    /// Like ``.seek()`` this method will attempt to seek to the specified key.
280    /// The difference with ``.seek()`` is that if the specified key do not exist, this method will
281    /// seek to key that lexicographically precedes it instead.
282    ///
283    /// # Examples
284    ///
285    /// ```rust
286    /// use rust_rocksdb::{DB, Options};
287    ///
288    /// let tempdir = tempfile::Builder::new()
289    ///     .prefix("_path_for_rocksdb_storage8")
290    ///     .tempdir()
291    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage8.");
292    /// let path = tempdir.path();
293    /// {
294    ///     let db = DB::open_default(path).unwrap();
295    ///     let mut iter = db.raw_iterator();
296    ///
297    ///     // Read the last key that starts with 'a'
298    ///     iter.seek_for_prev(b"b");
299    ///
300    ///     if iter.valid() {
301    ///         println!("{:?} {:?}", iter.key(), iter.value());
302    ///     } else {
303    ///         // There are no keys in the database
304    ///     }
305    /// }
306    /// let _ = DB::destroy(&Options::default(), path);
307    /// ```
308    pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
309        let key = key.as_ref();
310
311        unsafe {
312            ffi::rocksdb_iter_seek_for_prev(
313                self.inner.as_ptr(),
314                key.as_ptr() as *const c_char,
315                key.len() as size_t,
316            );
317        }
318    }
319
320    /// Seeks to the next key.
321    pub fn next(&mut self) {
322        if self.valid() {
323            unsafe {
324                ffi::rocksdb_iter_next(self.inner.as_ptr());
325            }
326        }
327    }
328
329    /// Seeks to the previous key.
330    pub fn prev(&mut self) {
331        if self.valid() {
332            unsafe {
333                ffi::rocksdb_iter_prev(self.inner.as_ptr());
334            }
335        }
336    }
337
338    /// Returns a slice of the current key.
339    pub fn key(&self) -> Option<&[u8]> {
340        if self.valid() {
341            Some(self.key_impl())
342        } else {
343            None
344        }
345    }
346
347    /// Returns a slice of the current value.
348    pub fn value(&self) -> Option<&[u8]> {
349        if self.valid() {
350            Some(self.value_impl())
351        } else {
352            None
353        }
354    }
355
356    /// Returns pair with slice of the current key and current value.
357    pub fn item(&self) -> Option<(&[u8], &[u8])> {
358        if self.valid() {
359            Some((self.key_impl(), self.value_impl()))
360        } else {
361            None
362        }
363    }
364
365    /// Returns a slice of the current key; assumes the iterator is valid.
366    fn key_impl(&self) -> &[u8] {
367        // Safety Note: This is safe as all methods that may invalidate the buffer returned
368        // take `&mut self`, so borrow checker will prevent use of buffer after seek.
369        unsafe {
370            let mut key_len: size_t = 0;
371            let key_len_ptr: *mut size_t = &raw mut key_len;
372            let key_ptr = ffi::rocksdb_iter_key(self.inner.as_ptr(), key_len_ptr);
373            slice::from_raw_parts(key_ptr as *const c_uchar, key_len)
374        }
375    }
376
377    /// Returns a slice of the current value; assumes the iterator is valid.
378    fn value_impl(&self) -> &[u8] {
379        // Safety Note: This is safe as all methods that may invalidate the buffer returned
380        // take `&mut self`, so borrow checker will prevent use of buffer after seek.
381        unsafe {
382            let mut val_len: size_t = 0;
383            let val_len_ptr: *mut size_t = &raw mut val_len;
384            let val_ptr = ffi::rocksdb_iter_value(self.inner.as_ptr(), val_len_ptr);
385            slice::from_raw_parts(val_ptr as *const c_uchar, val_len)
386        }
387    }
388}
389
390impl<D: DBAccess> Drop for DBRawIteratorWithThreadMode<'_, D> {
391    fn drop(&mut self) {
392        unsafe {
393            ffi::rocksdb_iter_destroy(self.inner.as_ptr());
394        }
395    }
396}
397
// Rationale, as far as this file shows:
// - Every method that repositions, hands out, or destroys the C iterator takes
//   `&mut self` or `self`.
// - The `&self` methods only call `rocksdb_iter_valid`, `rocksdb_iter_get_error`,
//   `rocksdb_iter_key` and `rocksdb_iter_value`.
// NOTE(review): `Sync` also relies on RocksDB permitting those read-only calls to
// run concurrently on one iterator. Both impls apply to every `D: DBAccess`,
// whether or not `D` itself is `Send`/`Sync`. Confirm both assumptions.
unsafe impl<D: DBAccess> Send for DBRawIteratorWithThreadMode<'_, D> {}
unsafe impl<D: DBAccess> Sync for DBRawIteratorWithThreadMode<'_, D> {}
400
/// A type alias kept for backward compatibility: a [`DBIteratorWithThreadMode`]
/// over a plain [`DB`]. See [`DBIteratorWithThreadMode`] for details.
pub type DBIterator<'a> = DBIteratorWithThreadMode<'a, DB>;
403
/// A standard Rust [`Iterator`] over a database or column family.
///
/// Each item is an owned copy of a key/value pair ([`KVBytes`]), or an error if
/// the underlying RocksDB iterator failed.
///
/// As an alternative, [`DBRawIteratorWithThreadMode`] is a low level wrapper around
/// RocksDB's API, which can provide better performance and more features.
///
/// ```
/// use rust_rocksdb::{DB, Direction, IteratorMode, Options};
///
/// let tempdir = tempfile::Builder::new()
///     .prefix("_path_for_rocksdb_storage2")
///     .tempdir()
///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage2.");
/// let path = tempdir.path();
/// {
///     let db = DB::open_default(path).unwrap();
///     let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward
///     for item in iter {
///         let (key, value) = item.unwrap();
///         println!("Saw {:?} {:?}", key, value);
///     }
///     iter = db.iterator(IteratorMode::End);  // Always iterates backward
///     for item in iter {
///         let (key, value) = item.unwrap();
///         println!("Saw {:?} {:?}", key, value);
///     }
///     iter = db.iterator(IteratorMode::From(b"my key", Direction::Forward)); // From a key in Direction::{forward,reverse}
///     for item in iter {
///         let (key, value) = item.unwrap();
///         println!("Saw {:?} {:?}", key, value);
///     }
///
///     // You can seek with an existing Iterator instance, too
///     iter = db.iterator(IteratorMode::Start);
///     iter.set_mode(IteratorMode::From(b"another key", Direction::Reverse));
///     for item in iter {
///         let (key, value) = item.unwrap();
///         println!("Saw {:?} {:?}", key, value);
///     }
/// }
/// let _ = DB::destroy(&Options::default(), path);
/// ```
pub struct DBIteratorWithThreadMode<'a, D: DBAccess> {
    /// The raw iterator that entries are copied out of.
    raw: DBRawIteratorWithThreadMode<'a, D>,
    /// Set by `set_mode`. `Forward` steps with `raw.next()` and `Reverse` steps
    /// with `raw.prev()`.
    direction: Direction,
    /// Set once the raw iterator stops being valid, so later calls to `next`
    /// return `None`. Cleared by `set_mode`.
    done: bool,
}
450
/// The direction in which a [`DBIteratorWithThreadMode`] moves after each item.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Direction {
    /// Step to the next key after each item.
    Forward,
    /// Step to the previous key after each item.
    Reverse,
}

/// An owned key/value pair, as yielded by [`DBIteratorWithThreadMode`].
pub type KVBytes = (Box<[u8]>, Box<[u8]>);

/// Where a [`DBIteratorWithThreadMode`] starts and which way it then moves.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum IteratorMode<'a> {
    /// Start at the first key and iterate forward.
    Start,
    /// Start at the last key and iterate backward.
    End,
    /// Start at the given key and iterate in the given direction.
    ///
    /// If the key does not exist, a forward iterator starts at the key that follows
    /// it, and a reverse iterator starts at the key that precedes it.
    From(&'a [u8], Direction),
}
465
466impl<'a, D: DBAccess> DBIteratorWithThreadMode<'a, D> {
467    pub(crate) fn new(db: &D, readopts: ReadOptions, mode: IteratorMode) -> Self {
468        Self::from_raw(DBRawIteratorWithThreadMode::new(db, readopts), mode)
469    }
470
471    pub(crate) fn new_cf(
472        db: &'a D,
473        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
474        readopts: ReadOptions,
475        mode: IteratorMode,
476    ) -> Self {
477        Self::from_raw(
478            DBRawIteratorWithThreadMode::new_cf(db, cf_handle, readopts),
479            mode,
480        )
481    }
482
483    fn from_raw(raw: DBRawIteratorWithThreadMode<'a, D>, mode: IteratorMode) -> Self {
484        let mut rv = DBIteratorWithThreadMode {
485            raw,
486            direction: Direction::Forward, // blown away by set_mode()
487            done: false,
488        };
489        rv.set_mode(mode);
490        rv
491    }
492
493    pub fn set_mode(&mut self, mode: IteratorMode) {
494        self.done = false;
495        self.direction = match mode {
496            IteratorMode::Start => {
497                self.raw.seek_to_first();
498                Direction::Forward
499            }
500            IteratorMode::End => {
501                self.raw.seek_to_last();
502                Direction::Reverse
503            }
504            IteratorMode::From(key, Direction::Forward) => {
505                self.raw.seek(key);
506                Direction::Forward
507            }
508            IteratorMode::From(key, Direction::Reverse) => {
509                self.raw.seek_for_prev(key);
510                Direction::Reverse
511            }
512        };
513    }
514}
515
516impl<D: DBAccess> Iterator for DBIteratorWithThreadMode<'_, D> {
517    type Item = Result<KVBytes, Error>;
518
519    fn next(&mut self) -> Option<Result<KVBytes, Error>> {
520        if self.done {
521            None
522        } else if let Some((key, value)) = self.raw.item() {
523            let item = (Box::from(key), Box::from(value));
524            match self.direction {
525                Direction::Forward => self.raw.next(),
526                Direction::Reverse => self.raw.prev(),
527            }
528            Some(Ok(item))
529        } else {
530            self.done = true;
531            self.raw.status().err().map(Result::Err)
532        }
533    }
534}
535
// Fused: `next` sets `done` as soon as the raw iterator stops being valid, after
// yielding at most one trailing error. Every later call short-circuits on that
// flag. Only an explicit `set_mode` call, which clears `done`, restarts iteration.
impl<D: DBAccess> std::iter::FusedIterator for DBIteratorWithThreadMode<'_, D> {}
537
538impl<'a, D: DBAccess> Into<DBRawIteratorWithThreadMode<'a, D>> for DBIteratorWithThreadMode<'a, D> {
539    fn into(self) -> DBRawIteratorWithThreadMode<'a, D> {
540        self.raw
541    }
542}
543
/// Iterates the batches of writes since a given sequence number.
///
/// `DBWALIterator` is returned by `DB::get_updates_since()` and will return the
/// batches of write operations that have occurred since a given sequence number
/// (see `DB::latest_sequence_number()`). Batches whose sequence number is less
/// than or equal to that starting number are skipped. This iterator cannot be
/// constructed by the application.
///
/// Each item is a `Result` wrapping a tuple of (`u64`, `WriteBatch`). The first
/// value is the sequence number of the associated write batch.
pub struct DBWALIterator {
    /// Raw handle to the C WAL iterator; destroyed in `Drop`.
    pub(crate) inner: *mut ffi::rocksdb_wal_iterator_t,
    /// The "since" sequence number. `next()` skips batches whose sequence
    /// number is `<=` this value.
    pub(crate) start_seq_number: u64,
}
558
559impl DBWALIterator {
560    /// Returns `true` if the iterator is valid. An iterator is invalidated when
561    /// it reaches the end of its defined range, or when it encounters an error.
562    ///
563    /// To check whether the iterator encountered an error after `valid` has
564    /// returned `false`, use the [`status`](DBWALIterator::status) method.
565    /// `status` will never return an error when `valid` is `true`.
566    pub fn valid(&self) -> bool {
567        unsafe { ffi::rocksdb_wal_iter_valid(self.inner) != 0 }
568    }
569
570    /// Returns an error `Result` if the iterator has encountered an error
571    /// during operation. When an error is encountered, the iterator is
572    /// invalidated and [`valid`](DBWALIterator::valid) will return `false` when
573    /// called.
574    pub fn status(&self) -> Result<(), Error> {
575        unsafe {
576            ffi_try!(ffi::rocksdb_wal_iter_status(self.inner));
577        }
578        Ok(())
579    }
580}
581
582impl Iterator for DBWALIterator {
583    type Item = Result<(u64, WriteBatch), Error>;
584
585    fn next(&mut self) -> Option<Self::Item> {
586        if !self.valid() {
587            return None;
588        }
589
590        let mut seq: u64 = 0;
591        let mut batch = WriteBatch {
592            inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &raw mut seq) },
593        };
594
595        // if the initial sequence number is what was requested we skip it to
596        // only provide changes *after* it
597        while seq <= self.start_seq_number {
598            unsafe {
599                ffi::rocksdb_wal_iter_next(self.inner);
600            }
601
602            if !self.valid() {
603                return None;
604            }
605
606            // this drops which in turn frees the skipped batch
607            batch = WriteBatch {
608                inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &raw mut seq) },
609            };
610        }
611
612        if !self.valid() {
613            return self.status().err().map(Result::Err);
614        }
615
616        // Seek to the next write batch.
617        // Note that WriteBatches live independently of the WAL iterator so this is safe to do
618        unsafe {
619            ffi::rocksdb_wal_iter_next(self.inner);
620        }
621
622        Some(Ok((seq, batch)))
623    }
624}
625
626impl Drop for DBWALIterator {
627    fn drop(&mut self) {
628        unsafe {
629            ffi::rocksdb_wal_iter_destroy(self.inner);
630        }
631    }
632}