rust_rocksdb/db_iterator.rs
1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{
16 Error, ReadOptions, WriteBatch,
17 db::{DB, DBAccess},
18 ffi,
19};
20use libc::{c_char, c_uchar, size_t};
21use std::mem::ManuallyDrop;
22use std::{marker::PhantomData, slice};
23
24/// A type alias to keep compatibility. See [`DBRawIteratorWithThreadMode`] for details
25pub type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, DB>;
26
27/// A low-level iterator over a database or column family, created by [`DB::raw_iterator`]
28/// and other `raw_iterator_*` methods.
29///
30/// This iterator replicates RocksDB's API. It should provide better
31/// performance and more features than [`DBIteratorWithThreadMode`], which is a standard
32/// Rust [`std::iter::Iterator`].
33///
34/// ```
35/// use rust_rocksdb::{DB, Options};
36///
37/// let tempdir = tempfile::Builder::new()
38/// .prefix("_path_for_rocksdb_storage4")
39/// .tempdir()
40/// .expect("Failed to create temporary path for the _path_for_rocksdb_storage4.");
41/// let path = tempdir.path();
42/// {
43/// let db = DB::open_default(path).unwrap();
44/// let mut iter = db.raw_iterator();
45///
46/// // Forwards iteration
47/// iter.seek_to_first();
48/// while iter.valid() {
49/// println!("Saw {:?} {:?}", iter.key(), iter.value());
50/// iter.next();
51/// }
52///
53/// // Reverse iteration
54/// iter.seek_to_last();
55/// while iter.valid() {
56/// println!("Saw {:?} {:?}", iter.key(), iter.value());
57/// iter.prev();
58/// }
59///
60/// // Seeking
61/// iter.seek(b"my key");
62/// while iter.valid() {
63/// println!("Saw {:?} {:?}", iter.key(), iter.value());
64/// iter.next();
65/// }
66///
67/// // Reverse iteration from key
68/// // Note, use seek_for_prev when reversing because if this key doesn't exist,
69/// // this will make the iterator start from the previous key rather than the next.
70/// iter.seek_for_prev(b"my key");
71/// while iter.valid() {
72/// println!("Saw {:?} {:?}", iter.key(), iter.value());
73/// iter.prev();
74/// }
75/// }
76/// let _ = DB::destroy(&Options::default(), path);
77/// ```
78pub struct DBRawIteratorWithThreadMode<'a, D: DBAccess> {
79 inner: std::ptr::NonNull<ffi::rocksdb_iterator_t>,
80
81 /// When iterate_lower_bound or iterate_upper_bound are set, the inner
82 /// C iterator keeps a pointer to the upper bound inside `_readopts`.
83 /// Storing this makes sure the upper bound is always alive when the
84 /// iterator is being used.
85 ///
86 /// And yes, we need to store the entire ReadOptions structure since C++
87 /// ReadOptions keep reference to C rocksdb_readoptions_t wrapper which
88 /// point to vectors we own. See issue #660.
89 readopts: ReadOptions,
90
91 db: PhantomData<&'a D>,
92}
93
94impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
95 pub(crate) fn new(db: &D, readopts: ReadOptions) -> Self {
96 let inner = unsafe { db.create_iterator(&readopts) };
97 Self::from_inner(inner, readopts)
98 }
99
100 pub(crate) fn new_cf(
101 db: &'a D,
102 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
103 readopts: ReadOptions,
104 ) -> Self {
105 let inner = unsafe { db.create_iterator_cf(cf_handle, &readopts) };
106 Self::from_inner(inner, readopts)
107 }
108
109 pub(crate) fn from_inner(inner: *mut ffi::rocksdb_iterator_t, readopts: ReadOptions) -> Self {
110 // This unwrap will never fail since rocksdb_create_iterator and
111 // rocksdb_create_iterator_cf functions always return non-null. They
112 // use new and deference the result so any nulls would end up with SIGSEGV
113 // there and we would have a bigger issue.
114 let inner = std::ptr::NonNull::new(inner).unwrap();
115 Self {
116 inner,
117 readopts,
118 db: PhantomData,
119 }
120 }
121
122 pub(crate) fn into_inner(self) -> (std::ptr::NonNull<ffi::rocksdb_iterator_t>, ReadOptions) {
123 let value = ManuallyDrop::new(self);
124 // SAFETY: value won't be used beyond this point
125 let inner = unsafe { std::ptr::read(&raw const value.inner) };
126 let readopts = unsafe { std::ptr::read(&raw const value.readopts) };
127
128 (inner, readopts)
129 }
130
131 /// Returns `true` if the iterator is valid. An iterator is invalidated when
132 /// it reaches the end of its defined range, or when it encounters an error.
133 ///
134 /// To check whether the iterator encountered an error after `valid` has
135 /// returned `false`, use the [`status`](DBRawIteratorWithThreadMode::status) method. `status` will never
136 /// return an error when `valid` is `true`.
137 pub fn valid(&self) -> bool {
138 unsafe { ffi::rocksdb_iter_valid(self.inner.as_ptr()) != 0 }
139 }
140
141 /// Returns an error `Result` if the iterator has encountered an error
142 /// during operation. When an error is encountered, the iterator is
143 /// invalidated and [`valid`](DBRawIteratorWithThreadMode::valid) will return `false` when called.
144 ///
145 /// Performing a seek will discard the current status.
146 pub fn status(&self) -> Result<(), Error> {
147 unsafe {
148 ffi_try!(ffi::rocksdb_iter_get_error(self.inner.as_ptr()));
149 }
150 Ok(())
151 }
152
153 /// Seeks to the first key in the database.
154 ///
155 /// # Examples
156 ///
157 /// ```rust
158 /// use rust_rocksdb::{DB, Options};
159 ///
160 /// let tempdir = tempfile::Builder::new()
161 /// .prefix("_path_for_rocksdb_storage5")
162 /// .tempdir()
163 /// .expect("Failed to create temporary path for the _path_for_rocksdb_storage5.");
164 /// let path = tempdir.path();
165 /// {
166 /// let db = DB::open_default(path).unwrap();
167 /// let mut iter = db.raw_iterator();
168 ///
169 /// // Iterate all keys from the start in lexicographic order
170 /// iter.seek_to_first();
171 ///
172 /// while iter.valid() {
173 /// println!("{:?} {:?}", iter.key(), iter.value());
174 /// iter.next();
175 /// }
176 ///
177 /// // Read just the first key
178 /// iter.seek_to_first();
179 ///
180 /// if iter.valid() {
181 /// println!("{:?} {:?}", iter.key(), iter.value());
182 /// } else {
183 /// // There are no keys in the database
184 /// }
185 /// }
186 /// let _ = DB::destroy(&Options::default(), path);
187 /// ```
188 pub fn seek_to_first(&mut self) {
189 unsafe {
190 ffi::rocksdb_iter_seek_to_first(self.inner.as_ptr());
191 }
192 }
193
194 /// Seeks to the last key in the database.
195 ///
196 /// # Examples
197 ///
198 /// ```rust
199 /// use rust_rocksdb::{DB, Options};
200 ///
201 /// let tempdir = tempfile::Builder::new()
202 /// .prefix("_path_for_rocksdb_storage6")
203 /// .tempdir()
204 /// .expect("Failed to create temporary path for the _path_for_rocksdb_storage6.");
205 /// let path = tempdir.path();
206 /// {
207 /// let db = DB::open_default(path).unwrap();
208 /// let mut iter = db.raw_iterator();
209 ///
210 /// // Iterate all keys from the end in reverse lexicographic order
211 /// iter.seek_to_last();
212 ///
213 /// while iter.valid() {
214 /// println!("{:?} {:?}", iter.key(), iter.value());
215 /// iter.prev();
216 /// }
217 ///
218 /// // Read just the last key
219 /// iter.seek_to_last();
220 ///
221 /// if iter.valid() {
222 /// println!("{:?} {:?}", iter.key(), iter.value());
223 /// } else {
224 /// // There are no keys in the database
225 /// }
226 /// }
227 /// let _ = DB::destroy(&Options::default(), path);
228 /// ```
229 pub fn seek_to_last(&mut self) {
230 unsafe {
231 ffi::rocksdb_iter_seek_to_last(self.inner.as_ptr());
232 }
233 }
234
235 /// Seeks to the specified key or the first key that lexicographically follows it.
236 ///
237 /// This method will attempt to seek to the specified key. If that key does not exist, it will
238 /// find and seek to the key that lexicographically follows it instead.
239 ///
240 /// # Examples
241 ///
242 /// ```rust
243 /// use rust_rocksdb::{DB, Options};
244 ///
245 /// let tempdir = tempfile::Builder::new()
246 /// .prefix("_path_for_rocksdb_storage7")
247 /// .tempdir()
248 /// .expect("Failed to create temporary path for the _path_for_rocksdb_storage7.");
249 /// let path = tempdir.path();
250 /// {
251 /// let db = DB::open_default(path).unwrap();
252 /// let mut iter = db.raw_iterator();
253 ///
254 /// // Read the first key that starts with 'a'
255 /// iter.seek(b"a");
256 ///
257 /// if iter.valid() {
258 /// println!("{:?} {:?}", iter.key(), iter.value());
259 /// } else {
260 /// // There are no keys in the database
261 /// }
262 /// }
263 /// let _ = DB::destroy(&Options::default(), path);
264 /// ```
265 pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
266 let key = key.as_ref();
267
268 unsafe {
269 ffi::rocksdb_iter_seek(
270 self.inner.as_ptr(),
271 key.as_ptr() as *const c_char,
272 key.len() as size_t,
273 );
274 }
275 }
276
277 /// Seeks to the specified key, or the first key that lexicographically precedes it.
278 ///
279 /// Like ``.seek()`` this method will attempt to seek to the specified key.
280 /// The difference with ``.seek()`` is that if the specified key do not exist, this method will
281 /// seek to key that lexicographically precedes it instead.
282 ///
283 /// # Examples
284 ///
285 /// ```rust
286 /// use rust_rocksdb::{DB, Options};
287 ///
288 /// let tempdir = tempfile::Builder::new()
289 /// .prefix("_path_for_rocksdb_storage8")
290 /// .tempdir()
291 /// .expect("Failed to create temporary path for the _path_for_rocksdb_storage8.");
292 /// let path = tempdir.path();
293 /// {
294 /// let db = DB::open_default(path).unwrap();
295 /// let mut iter = db.raw_iterator();
296 ///
297 /// // Read the last key that starts with 'a'
298 /// iter.seek_for_prev(b"b");
299 ///
300 /// if iter.valid() {
301 /// println!("{:?} {:?}", iter.key(), iter.value());
302 /// } else {
303 /// // There are no keys in the database
304 /// }
305 /// }
306 /// let _ = DB::destroy(&Options::default(), path);
307 /// ```
308 pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
309 let key = key.as_ref();
310
311 unsafe {
312 ffi::rocksdb_iter_seek_for_prev(
313 self.inner.as_ptr(),
314 key.as_ptr() as *const c_char,
315 key.len() as size_t,
316 );
317 }
318 }
319
320 /// Seeks to the next key.
321 pub fn next(&mut self) {
322 if self.valid() {
323 unsafe {
324 ffi::rocksdb_iter_next(self.inner.as_ptr());
325 }
326 }
327 }
328
329 /// Seeks to the previous key.
330 pub fn prev(&mut self) {
331 if self.valid() {
332 unsafe {
333 ffi::rocksdb_iter_prev(self.inner.as_ptr());
334 }
335 }
336 }
337
338 /// Returns a slice of the current key.
339 pub fn key(&self) -> Option<&[u8]> {
340 if self.valid() {
341 // SAFETY: We just checked that the iterator is valid.
342 Some(unsafe { self.key_impl() })
343 } else {
344 None
345 }
346 }
347
348 /// Returns a slice of the current value.
349 pub fn value(&self) -> Option<&[u8]> {
350 if self.valid() {
351 // SAFETY: We just checked that the iterator is valid.
352 Some(unsafe { self.value_impl() })
353 } else {
354 None
355 }
356 }
357
358 /// Returns pair with slice of the current key and current value.
359 pub fn item(&self) -> Option<(&[u8], &[u8])> {
360 if self.valid() {
361 // SAFETY: We just checked that the iterator is valid.
362 Some(unsafe { (self.key_impl(), self.value_impl()) })
363 } else {
364 None
365 }
366 }
367
368 /// Returns a slice of the current key.
369 ///
370 /// # Safety
371 ///
372 /// The iterator must be valid (i.e., `valid()` returns true). Calling this
373 /// method when the iterator is invalid is undefined behavior, as RocksDB
374 /// may return an invalid pointer.
375 ///
376 /// Uses `rocksdb_iter_key_slice` which returns a `rocksdb_slice_t` by value,
377 /// avoiding the overhead of output parameters compared to `rocksdb_iter_key`.
378 unsafe fn key_impl(&self) -> &[u8] {
379 unsafe {
380 let slice = ffi::rocksdb_iter_key_slice(self.inner.as_ptr());
381 slice::from_raw_parts(slice.data as *const c_uchar, slice.size)
382 }
383 }
384
385 /// Returns a slice of the current value.
386 ///
387 /// # Safety
388 ///
389 /// The iterator must be valid (i.e., `valid()` returns true). Calling this
390 /// method when the iterator is invalid is undefined behavior, as RocksDB
391 /// may return an invalid pointer.
392 ///
393 /// Uses `rocksdb_iter_value_slice` which returns a `rocksdb_slice_t` by value,
394 /// avoiding the overhead of output parameters compared to `rocksdb_iter_value`.
395 unsafe fn value_impl(&self) -> &[u8] {
396 unsafe {
397 let slice = ffi::rocksdb_iter_value_slice(self.inner.as_ptr());
398 slice::from_raw_parts(slice.data as *const c_uchar, slice.size)
399 }
400 }
401}
402
403impl<D: DBAccess> Drop for DBRawIteratorWithThreadMode<'_, D> {
404 fn drop(&mut self) {
405 unsafe {
406 ffi::rocksdb_iter_destroy(self.inner.as_ptr());
407 }
408 }
409}
410
411unsafe impl<D: DBAccess> Send for DBRawIteratorWithThreadMode<'_, D> {}
412unsafe impl<D: DBAccess> Sync for DBRawIteratorWithThreadMode<'_, D> {}
413
414/// A type alias to keep compatibility. See [`DBIteratorWithThreadMode`] for details
415pub type DBIterator<'a> = DBIteratorWithThreadMode<'a, DB>;
416
417/// A standard Rust [`Iterator`] over a database or column family.
418///
419/// As an alternative, [`DBRawIteratorWithThreadMode`] is a low level wrapper around
420/// RocksDB's API, which can provide better performance and more features.
421///
422/// ```
423/// use rust_rocksdb::{DB, Direction, IteratorMode, Options};
424///
425/// let tempdir = tempfile::Builder::new()
426/// .prefix("_path_for_rocksdb_storage2")
427/// .tempdir()
428/// .expect("Failed to create temporary path for the _path_for_rocksdb_storage2.");
429/// let path = tempdir.path();
430/// {
431/// let db = DB::open_default(path).unwrap();
432/// let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward
433/// for item in iter {
434/// let (key, value) = item.unwrap();
435/// println!("Saw {:?} {:?}", key, value);
436/// }
437/// iter = db.iterator(IteratorMode::End); // Always iterates backward
438/// for item in iter {
439/// let (key, value) = item.unwrap();
440/// println!("Saw {:?} {:?}", key, value);
441/// }
442/// iter = db.iterator(IteratorMode::From(b"my key", Direction::Forward)); // From a key in Direction::{forward,reverse}
443/// for item in iter {
444/// let (key, value) = item.unwrap();
445/// println!("Saw {:?} {:?}", key, value);
446/// }
447///
448/// // You can seek with an existing Iterator instance, too
449/// iter = db.iterator(IteratorMode::Start);
450/// iter.set_mode(IteratorMode::From(b"another key", Direction::Reverse));
451/// for item in iter {
452/// let (key, value) = item.unwrap();
453/// println!("Saw {:?} {:?}", key, value);
454/// }
455/// }
456/// let _ = DB::destroy(&Options::default(), path);
457/// ```
458pub struct DBIteratorWithThreadMode<'a, D: DBAccess> {
459 raw: DBRawIteratorWithThreadMode<'a, D>,
460 direction: Direction,
461 done: bool,
462}
463
/// The direction in which a [`DBIteratorWithThreadMode`] walks over keys.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Direction {
    /// Visit keys in ascending order.
    Forward,
    /// Visit keys in descending order.
    Reverse,
}

/// An owned `(key, value)` pair as yielded by [`DBIteratorWithThreadMode`].
pub type KVBytes = (Box<[u8]>, Box<[u8]>);

/// Where a [`DBIteratorWithThreadMode`] starts and which way it then moves.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IteratorMode<'a> {
    /// Start at the first key and iterate forward.
    Start,
    /// Start at the last key and iterate in reverse.
    End,
    /// Start at the given key and move in the given direction. If the key does
    /// not exist, a forward iterator starts at the key that follows it and a
    /// reverse iterator at the key that precedes it.
    From(&'a [u8], Direction),
}
478
479impl<'a, D: DBAccess> DBIteratorWithThreadMode<'a, D> {
480 pub(crate) fn new(db: &D, readopts: ReadOptions, mode: IteratorMode) -> Self {
481 Self::from_raw(DBRawIteratorWithThreadMode::new(db, readopts), mode)
482 }
483
484 pub(crate) fn new_cf(
485 db: &'a D,
486 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
487 readopts: ReadOptions,
488 mode: IteratorMode,
489 ) -> Self {
490 Self::from_raw(
491 DBRawIteratorWithThreadMode::new_cf(db, cf_handle, readopts),
492 mode,
493 )
494 }
495
496 fn from_raw(raw: DBRawIteratorWithThreadMode<'a, D>, mode: IteratorMode) -> Self {
497 let mut rv = DBIteratorWithThreadMode {
498 raw,
499 direction: Direction::Forward, // blown away by set_mode()
500 done: false,
501 };
502 rv.set_mode(mode);
503 rv
504 }
505
506 pub fn set_mode(&mut self, mode: IteratorMode) {
507 self.done = false;
508 self.direction = match mode {
509 IteratorMode::Start => {
510 self.raw.seek_to_first();
511 Direction::Forward
512 }
513 IteratorMode::End => {
514 self.raw.seek_to_last();
515 Direction::Reverse
516 }
517 IteratorMode::From(key, Direction::Forward) => {
518 self.raw.seek(key);
519 Direction::Forward
520 }
521 IteratorMode::From(key, Direction::Reverse) => {
522 self.raw.seek_for_prev(key);
523 Direction::Reverse
524 }
525 };
526 }
527}
528
529impl<D: DBAccess> Iterator for DBIteratorWithThreadMode<'_, D> {
530 type Item = Result<KVBytes, Error>;
531
532 fn next(&mut self) -> Option<Result<KVBytes, Error>> {
533 if self.done {
534 None
535 } else if let Some((key, value)) = self.raw.item() {
536 let item = (Box::from(key), Box::from(value));
537 match self.direction {
538 Direction::Forward => self.raw.next(),
539 Direction::Reverse => self.raw.prev(),
540 }
541 Some(Ok(item))
542 } else {
543 self.done = true;
544 self.raw.status().err().map(Result::Err)
545 }
546 }
547}
548
549impl<D: DBAccess> std::iter::FusedIterator for DBIteratorWithThreadMode<'_, D> {}
550
551impl<'a, D: DBAccess> Into<DBRawIteratorWithThreadMode<'a, D>> for DBIteratorWithThreadMode<'a, D> {
552 fn into(self) -> DBRawIteratorWithThreadMode<'a, D> {
553 self.raw
554 }
555}
556
557/// Iterates the batches of writes since a given sequence number.
558///
559/// `DBWALIterator` is returned by `DB::get_updates_since()` and will return the
560/// batches of write operations that have occurred since a given sequence number
561/// (see `DB::latest_sequence_number()`). This iterator cannot be constructed by
562/// the application.
563///
564/// The iterator item type is a tuple of (`u64`, `WriteBatch`) where the first
565/// value is the sequence number of the associated write batch.
566///
567pub struct DBWALIterator {
568 pub(crate) inner: *mut ffi::rocksdb_wal_iterator_t,
569 pub(crate) start_seq_number: u64,
570}
571
572impl DBWALIterator {
573 /// Returns `true` if the iterator is valid. An iterator is invalidated when
574 /// it reaches the end of its defined range, or when it encounters an error.
575 ///
576 /// To check whether the iterator encountered an error after `valid` has
577 /// returned `false`, use the [`status`](DBWALIterator::status) method.
578 /// `status` will never return an error when `valid` is `true`.
579 pub fn valid(&self) -> bool {
580 unsafe { ffi::rocksdb_wal_iter_valid(self.inner) != 0 }
581 }
582
583 /// Returns an error `Result` if the iterator has encountered an error
584 /// during operation. When an error is encountered, the iterator is
585 /// invalidated and [`valid`](DBWALIterator::valid) will return `false` when
586 /// called.
587 pub fn status(&self) -> Result<(), Error> {
588 unsafe {
589 ffi_try!(ffi::rocksdb_wal_iter_status(self.inner));
590 }
591 Ok(())
592 }
593}
594
595impl Iterator for DBWALIterator {
596 type Item = Result<(u64, WriteBatch), Error>;
597
598 fn next(&mut self) -> Option<Self::Item> {
599 if !self.valid() {
600 return None;
601 }
602
603 let mut seq: u64 = 0;
604 let mut batch = WriteBatch {
605 inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &raw mut seq) },
606 };
607
608 // if the initial sequence number is what was requested we skip it to
609 // only provide changes *after* it
610 while seq <= self.start_seq_number {
611 unsafe {
612 ffi::rocksdb_wal_iter_next(self.inner);
613 }
614
615 if !self.valid() {
616 return None;
617 }
618
619 // this drops which in turn frees the skipped batch
620 batch = WriteBatch {
621 inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &raw mut seq) },
622 };
623 }
624
625 if !self.valid() {
626 return self.status().err().map(Result::Err);
627 }
628
629 // Seek to the next write batch.
630 // Note that WriteBatches live independently of the WAL iterator so this is safe to do
631 unsafe {
632 ffi::rocksdb_wal_iter_next(self.inner);
633 }
634
635 Some(Ok((seq, batch)))
636 }
637}
638
639impl Drop for DBWALIterator {
640 fn drop(&mut self) {
641 unsafe {
642 ffi::rocksdb_wal_iter_destroy(self.inner);
643 }
644 }
645}