haizhi_rocksdb/
db_iterator.rs

// Copyright 2020 Tyler Neely
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use crate::{
16    db::{DBAccess, DB},
17    ffi, Error, ReadOptions, WriteBatch,
18};
19use libc::{c_char, c_uchar, size_t};
20use std::{marker::PhantomData, slice};
21
22/// A type alias to keep compatibility. See [`DBRawIteratorWithThreadMode`] for details
23pub type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, DB>;
24
25/// An iterator over a database or column family, with specifiable
26/// ranges and direction.
27///
28/// This iterator is different to the standard ``DBIteratorWithThreadMode`` as it aims Into
29/// replicate the underlying iterator API within RocksDB itself. This should
30/// give access to more performance and flexibility but departs from the
31/// widely recognised Rust idioms.
32///
33/// ```
34/// use rocksdb::{DB, Options};
35///
36/// let path = "_path_for_rocksdb_storage4";
37/// {
38///     let db = DB::open_default(path).unwrap();
39///     let mut iter = db.raw_iterator();
40///
41///     // Forwards iteration
42///     iter.seek_to_first();
43///     while iter.valid() {
44///         println!("Saw {:?} {:?}", iter.key(), iter.value());
45///         iter.next();
46///     }
47///
48///     // Reverse iteration
49///     iter.seek_to_last();
50///     while iter.valid() {
51///         println!("Saw {:?} {:?}", iter.key(), iter.value());
52///         iter.prev();
53///     }
54///
55///     // Seeking
56///     iter.seek(b"my key");
57///     while iter.valid() {
58///         println!("Saw {:?} {:?}", iter.key(), iter.value());
59///         iter.next();
60///     }
61///
62///     // Reverse iteration from key
63///     // Note, use seek_for_prev when reversing because if this key doesn't exist,
64///     // this will make the iterator start from the previous key rather than the next.
65///     iter.seek_for_prev(b"my key");
66///     while iter.valid() {
67///         println!("Saw {:?} {:?}", iter.key(), iter.value());
68///         iter.prev();
69///     }
70/// }
71/// let _ = DB::destroy(&Options::default(), path);
72/// ```
73pub struct DBRawIteratorWithThreadMode<'a, D: DBAccess> {
74    inner: std::ptr::NonNull<ffi::rocksdb_iterator_t>,
75
76    /// When iterate_lower_bound or iterate_upper_bound are set, the inner
77    /// C iterator keeps a pointer to the upper bound inside `_readopts`.
78    /// Storing this makes sure the upper bound is always alive when the
79    /// iterator is being used.
80    ///
81    /// And yes, we need to store the entire ReadOptions structure since C++
82    /// ReadOptions keep reference to C rocksdb_readoptions_t wrapper which
83    /// point to vectors we own.  See issue #660.
84    _readopts: ReadOptions,
85
86    db: PhantomData<&'a D>,
87}
88
89impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
90    pub(crate) fn new(db: &D, readopts: ReadOptions) -> Self {
91        let inner = unsafe { db.create_iterator(&readopts) };
92        Self::from_inner(inner, readopts)
93    }
94
95    pub(crate) fn new_cf(
96        db: &'a D,
97        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
98        readopts: ReadOptions,
99    ) -> Self {
100        let inner = unsafe { db.create_iterator_cf(cf_handle, &readopts) };
101        Self::from_inner(inner, readopts)
102    }
103
104    fn from_inner(inner: *mut ffi::rocksdb_iterator_t, readopts: ReadOptions) -> Self {
105        // This unwrap will never fail since rocksdb_create_iterator and
106        // rocksdb_create_iterator_cf functions always return non-null. They
107        // use new and deference the result so any nulls would end up with SIGSEGV
108        // there and we would have a bigger issue.
109        let inner = std::ptr::NonNull::new(inner).unwrap();
110        Self {
111            inner,
112            _readopts: readopts,
113            db: PhantomData,
114        }
115    }
116
117    /// Returns `true` if the iterator is valid. An iterator is invalidated when
118    /// it reaches the end of its defined range, or when it encounters an error.
119    ///
120    /// To check whether the iterator encountered an error after `valid` has
121    /// returned `false`, use the [`status`](DBRawIteratorWithThreadMode::status) method. `status` will never
122    /// return an error when `valid` is `true`.
123    pub fn valid(&self) -> bool {
124        unsafe { ffi::rocksdb_iter_valid(self.inner.as_ptr()) != 0 }
125    }
126
127    /// Returns an error `Result` if the iterator has encountered an error
128    /// during operation. When an error is encountered, the iterator is
129    /// invalidated and [`valid`](DBRawIteratorWithThreadMode::valid) will return `false` when called.
130    ///
131    /// Performing a seek will discard the current status.
132    pub fn status(&self) -> Result<(), Error> {
133        unsafe {
134            ffi_try!(ffi::rocksdb_iter_get_error(self.inner.as_ptr()));
135        }
136        Ok(())
137    }
138
139    /// Seeks to the first key in the database.
140    ///
141    /// # Examples
142    ///
143    /// ```rust
144    /// use rocksdb::{DB, Options};
145    ///
146    /// let path = "_path_for_rocksdb_storage5";
147    /// {
148    ///     let db = DB::open_default(path).unwrap();
149    ///     let mut iter = db.raw_iterator();
150    ///
151    ///     // Iterate all keys from the start in lexicographic order
152    ///     iter.seek_to_first();
153    ///
154    ///     while iter.valid() {
155    ///         println!("{:?} {:?}", iter.key(), iter.value());
156    ///         iter.next();
157    ///     }
158    ///
159    ///     // Read just the first key
160    ///     iter.seek_to_first();
161    ///
162    ///     if iter.valid() {
163    ///         println!("{:?} {:?}", iter.key(), iter.value());
164    ///     } else {
165    ///         // There are no keys in the database
166    ///     }
167    /// }
168    /// let _ = DB::destroy(&Options::default(), path);
169    /// ```
170    pub fn seek_to_first(&mut self) {
171        unsafe {
172            ffi::rocksdb_iter_seek_to_first(self.inner.as_ptr());
173        }
174    }
175
176    pub fn refresh(&mut self) {
177        unsafe {
178            ffi::rocksdb_iter_refresh(self.inner.as_ptr());
179        }
180    }
181
182    /// Seeks to the last key in the database.
183    ///
184    /// # Examples
185    ///
186    /// ```rust
187    /// use rocksdb::{DB, Options};
188    ///
189    /// let path = "_path_for_rocksdb_storage6";
190    /// {
191    ///     let db = DB::open_default(path).unwrap();
192    ///     let mut iter = db.raw_iterator();
193    ///
194    ///     // Iterate all keys from the end in reverse lexicographic order
195    ///     iter.seek_to_last();
196    ///
197    ///     while iter.valid() {
198    ///         println!("{:?} {:?}", iter.key(), iter.value());
199    ///         iter.prev();
200    ///     }
201    ///
202    ///     // Read just the last key
203    ///     iter.seek_to_last();
204    ///
205    ///     if iter.valid() {
206    ///         println!("{:?} {:?}", iter.key(), iter.value());
207    ///     } else {
208    ///         // There are no keys in the database
209    ///     }
210    /// }
211    /// let _ = DB::destroy(&Options::default(), path);
212    /// ```
213    pub fn seek_to_last(&mut self) {
214        unsafe {
215            ffi::rocksdb_iter_seek_to_last(self.inner.as_ptr());
216        }
217    }
218
219    /// Seeks to the specified key or the first key that lexicographically follows it.
220    ///
221    /// This method will attempt to seek to the specified key. If that key does not exist, it will
222    /// find and seek to the key that lexicographically follows it instead.
223    ///
224    /// # Examples
225    ///
226    /// ```rust
227    /// use rocksdb::{DB, Options};
228    ///
229    /// let path = "_path_for_rocksdb_storage7";
230    /// {
231    ///     let db = DB::open_default(path).unwrap();
232    ///     let mut iter = db.raw_iterator();
233    ///
234    ///     // Read the first key that starts with 'a'
235    ///     iter.seek(b"a");
236    ///
237    ///     if iter.valid() {
238    ///         println!("{:?} {:?}", iter.key(), iter.value());
239    ///     } else {
240    ///         // There are no keys in the database
241    ///     }
242    /// }
243    /// let _ = DB::destroy(&Options::default(), path);
244    /// ```
245    pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
246        let key = key.as_ref();
247
248        unsafe {
249            ffi::rocksdb_iter_seek(
250                self.inner.as_ptr(),
251                key.as_ptr() as *const c_char,
252                key.len() as size_t,
253            );
254        }
255    }
256
257    /// Seeks to the specified key, or the first key that lexicographically precedes it.
258    ///
259    /// Like ``.seek()`` this method will attempt to seek to the specified key.
260    /// The difference with ``.seek()`` is that if the specified key do not exist, this method will
261    /// seek to key that lexicographically precedes it instead.
262    ///
263    /// # Examples
264    ///
265    /// ```rust
266    /// use rocksdb::{DB, Options};
267    ///
268    /// let path = "_path_for_rocksdb_storage8";
269    /// {
270    ///     let db = DB::open_default(path).unwrap();
271    ///     let mut iter = db.raw_iterator();
272    ///
273    ///     // Read the last key that starts with 'a'
274    ///     iter.seek_for_prev(b"b");
275    ///
276    ///     if iter.valid() {
277    ///         println!("{:?} {:?}", iter.key(), iter.value());
278    ///     } else {
279    ///         // There are no keys in the database
280    ///     }
281    /// }
282    /// let _ = DB::destroy(&Options::default(), path);
283    /// ```
284    pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
285        let key = key.as_ref();
286
287        unsafe {
288            ffi::rocksdb_iter_seek_for_prev(
289                self.inner.as_ptr(),
290                key.as_ptr() as *const c_char,
291                key.len() as size_t,
292            );
293        }
294    }
295
296    /// Seeks to the next key.
297    pub fn next(&mut self) {
298        unsafe {
299            ffi::rocksdb_iter_next(self.inner.as_ptr());
300        }
301    }
302
303    /// Seeks to the previous key.
304    pub fn prev(&mut self) {
305        unsafe {
306            ffi::rocksdb_iter_prev(self.inner.as_ptr());
307        }
308    }
309
310    /// Returns a slice of the current key.
311    pub fn key(&self) -> Option<&[u8]> {
312        if self.valid() {
313            Some(self.key_impl())
314        } else {
315            None
316        }
317    }
318
319    /// Returns a slice of the current value.
320    pub fn value(&self) -> Option<&[u8]> {
321        if self.valid() {
322            Some(self.value_impl())
323        } else {
324            None
325        }
326    }
327
328    /// Returns pair with slice of the current key and current value.
329    pub fn item(&self) -> Option<(&[u8], &[u8])> {
330        if self.valid() {
331            Some((self.key_impl(), self.value_impl()))
332        } else {
333            None
334        }
335    }
336
337    /// Returns a slice of the current key; assumes the iterator is valid.
338    fn key_impl(&self) -> &[u8] {
339        // Safety Note: This is safe as all methods that may invalidate the buffer returned
340        // take `&mut self`, so borrow checker will prevent use of buffer after seek.
341        unsafe {
342            let mut key_len: size_t = 0;
343            let key_len_ptr: *mut size_t = &mut key_len;
344            let key_ptr = ffi::rocksdb_iter_key(self.inner.as_ptr(), key_len_ptr);
345            slice::from_raw_parts(key_ptr as *const c_uchar, key_len)
346        }
347    }
348
349    /// Returns a slice of the current value; assumes the iterator is valid.
350    fn value_impl(&self) -> &[u8] {
351        // Safety Note: This is safe as all methods that may invalidate the buffer returned
352        // take `&mut self`, so borrow checker will prevent use of buffer after seek.
353        unsafe {
354            let mut val_len: size_t = 0;
355            let val_len_ptr: *mut size_t = &mut val_len;
356            let val_ptr = ffi::rocksdb_iter_value(self.inner.as_ptr(), val_len_ptr);
357            slice::from_raw_parts(val_ptr as *const c_uchar, val_len)
358        }
359    }
360}
361
362impl<'a, D: DBAccess> Drop for DBRawIteratorWithThreadMode<'a, D> {
363    fn drop(&mut self) {
364        unsafe {
365            ffi::rocksdb_iter_destroy(self.inner.as_ptr());
366        }
367    }
368}
369
370unsafe impl<'a, D: DBAccess> Send for DBRawIteratorWithThreadMode<'a, D> {}
371unsafe impl<'a, D: DBAccess> Sync for DBRawIteratorWithThreadMode<'a, D> {}
372
373/// A type alias to keep compatibility. See [`DBIteratorWithThreadMode`] for details
374pub type DBIterator<'a> = DBIteratorWithThreadMode<'a, DB>;
375
376/// An iterator over a database or column family, with specifiable
377/// ranges and direction.
378///
379/// ```
380/// use rocksdb::{DB, Direction, IteratorMode, Options};
381///
382/// let path = "_path_for_rocksdb_storage2";
383/// {
384///     let db = DB::open_default(path).unwrap();
385///     let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward
386///     for item in iter {
387///         let (key, value) = item.unwrap();
388///         println!("Saw {:?} {:?}", key, value);
389///     }
390///     iter = db.iterator(IteratorMode::End);  // Always iterates backward
391///     for item in iter {
392///         let (key, value) = item.unwrap();
393///         println!("Saw {:?} {:?}", key, value);
394///     }
395///     iter = db.iterator(IteratorMode::From(b"my key", Direction::Forward)); // From a key in Direction::{forward,reverse}
396///     for item in iter {
397///         let (key, value) = item.unwrap();
398///         println!("Saw {:?} {:?}", key, value);
399///     }
400///
401///     // You can seek with an existing Iterator instance, too
402///     iter = db.iterator(IteratorMode::Start);
403///     iter.set_mode(IteratorMode::From(b"another key", Direction::Reverse));
404///     for item in iter {
405///         let (key, value) = item.unwrap();
406///         println!("Saw {:?} {:?}", key, value);
407///     }
408/// }
409/// let _ = DB::destroy(&Options::default(), path);
410/// ```
411pub struct DBIteratorWithThreadMode<'a, D: DBAccess> {
412    raw: DBRawIteratorWithThreadMode<'a, D>,
413    direction: Direction,
414    done: bool,
415}
416
/// The direction in which a [`DBIteratorWithThreadMode`] advances after each item.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Direction {
    /// Advance in key order (the raw iterator's `next`).
    Forward,
    /// Advance in reverse key order (the raw iterator's `prev`).
    Reverse,
}

/// An owned `(key, value)` pair, as yielded by [`DBIteratorWithThreadMode`].
pub type KVBytes = (Box<[u8]>, Box<[u8]>);

/// Where a [`DBIteratorWithThreadMode`] starts and which way it then moves.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum IteratorMode<'a> {
    /// Start at the first key and move forward.
    Start,
    /// Start at the last key and move backward.
    End,
    /// Start at the given key and move in the given direction.
    ///
    /// With [`Direction::Forward`] the iterator seeks to the first key at or after
    /// the given key; with [`Direction::Reverse`] it seeks to the last key at or
    /// before it.
    From(&'a [u8], Direction),
}
431
432impl<'a, D: DBAccess> DBIteratorWithThreadMode<'a, D> {
433    pub(crate) fn new(db: &D, readopts: ReadOptions, mode: IteratorMode) -> Self {
434        Self::from_raw(DBRawIteratorWithThreadMode::new(db, readopts), mode)
435    }
436
437    pub(crate) fn new_cf(
438        db: &'a D,
439        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
440        readopts: ReadOptions,
441        mode: IteratorMode,
442    ) -> Self {
443        Self::from_raw(
444            DBRawIteratorWithThreadMode::new_cf(db, cf_handle, readopts),
445            mode,
446        )
447    }
448
449    fn from_raw(raw: DBRawIteratorWithThreadMode<'a, D>, mode: IteratorMode) -> Self {
450        let mut rv = DBIteratorWithThreadMode {
451            raw,
452            direction: Direction::Forward, // blown away by set_mode()
453            done: false,
454        };
455        rv.set_mode(mode);
456        rv
457    }
458
459    pub fn set_mode(&mut self, mode: IteratorMode) {
460        self.done = false;
461        self.direction = match mode {
462            IteratorMode::Start => {
463                self.raw.seek_to_first();
464                Direction::Forward
465            }
466            IteratorMode::End => {
467                self.raw.seek_to_last();
468                Direction::Reverse
469            }
470            IteratorMode::From(key, Direction::Forward) => {
471                self.raw.seek(key);
472                Direction::Forward
473            }
474            IteratorMode::From(key, Direction::Reverse) => {
475                self.raw.seek_for_prev(key);
476                Direction::Reverse
477            }
478        };
479    }
480}
481
482impl<'a, D: DBAccess> Iterator for DBIteratorWithThreadMode<'a, D> {
483    type Item = Result<KVBytes, Error>;
484
485    fn next(&mut self) -> Option<Result<KVBytes, Error>> {
486        if self.done {
487            None
488        } else if let Some((key, value)) = self.raw.item() {
489            let item = (Box::from(key), Box::from(value));
490            match self.direction {
491                Direction::Forward => self.raw.next(),
492                Direction::Reverse => self.raw.prev(),
493            }
494            Some(Ok(item))
495        } else {
496            self.done = true;
497            self.raw.status().err().map(Result::Err)
498        }
499    }
500}
501
502impl<'a, D: DBAccess> std::iter::FusedIterator for DBIteratorWithThreadMode<'a, D> {}
503
504impl<'a, D: DBAccess> Into<DBRawIteratorWithThreadMode<'a, D>> for DBIteratorWithThreadMode<'a, D> {
505    fn into(self) -> DBRawIteratorWithThreadMode<'a, D> {
506        self.raw
507    }
508}
509
510/// Iterates the batches of writes since a given sequence number.
511///
512/// `DBWALIterator` is returned by `DB::get_updates_since()` and will return the
513/// batches of write operations that have occurred since a given sequence number
514/// (see `DB::latest_sequence_number()`). This iterator cannot be constructed by
515/// the application.
516///
517/// The iterator item type is a tuple of (`u64`, `WriteBatch`) where the first
518/// value is the sequence number of the associated write batch.
519///
520pub struct DBWALIterator {
521    pub(crate) inner: *mut ffi::rocksdb_wal_iterator_t,
522    pub(crate) start_seq_number: u64,
523}
524
525impl DBWALIterator {
526    /// Returns `true` if the iterator is valid. An iterator is invalidated when
527    /// it reaches the end of its defined range, or when it encounters an error.
528    ///
529    /// To check whether the iterator encountered an error after `valid` has
530    /// returned `false`, use the [`status`](DBWALIterator::status) method.
531    /// `status` will never return an error when `valid` is `true`.
532    pub fn valid(&self) -> bool {
533        unsafe { ffi::rocksdb_wal_iter_valid(self.inner) != 0 }
534    }
535
536    /// Returns an error `Result` if the iterator has encountered an error
537    /// during operation. When an error is encountered, the iterator is
538    /// invalidated and [`valid`](DBWALIterator::valid) will return `false` when
539    /// called.
540    pub fn status(&self) -> Result<(), Error> {
541        unsafe {
542            ffi_try!(ffi::rocksdb_wal_iter_status(self.inner));
543        }
544        Ok(())
545    }
546}
547
548impl Iterator for DBWALIterator {
549    type Item = Result<(u64, WriteBatch), Error>;
550
551    fn next(&mut self) -> Option<Self::Item> {
552        if !self.valid() {
553            return None;
554        }
555
556        let mut seq: u64 = 0;
557        let mut batch = WriteBatch {
558            inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
559        };
560
561        // if the initial sequence number is what was requested we skip it to
562        // only provide changes *after* it
563        if seq == self.start_seq_number {
564            unsafe {
565                ffi::rocksdb_wal_iter_next(self.inner);
566            }
567
568            if !self.valid() {
569                return None;
570            }
571
572            // this drops which in turn frees the skipped batch
573            batch = WriteBatch {
574                inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
575            };
576        }
577
578        if !self.valid() {
579            return self.status().err().map(Result::Err);
580        }
581
582        // Seek to the next write batch.
583        // Note that WriteBatches live independently of the WAL iterator so this is safe to do
584        unsafe {
585            ffi::rocksdb_wal_iter_next(self.inner);
586        }
587
588        Some(Ok((seq, batch)))
589    }
590}
591
592impl Drop for DBWALIterator {
593    fn drop(&mut self) {
594        unsafe {
595            ffi::rocksdb_wal_iter_destroy(self.inner);
596        }
597    }
598}