rust_rocksdb/db_iterator.rs
1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{
16 db::{DBAccess, DB},
17 ffi, Error, ReadOptions, WriteBatch,
18};
19use libc::{c_char, c_uchar, size_t};
20use std::mem::ManuallyDrop;
21use std::{marker::PhantomData, slice};
22
/// A type alias to keep compatibility. See [`DBRawIteratorWithThreadMode`] for details.
///
/// This is [`DBRawIteratorWithThreadMode`] with the database type fixed to [`DB`].
pub type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, DB>;
25
26/// A low-level iterator over a database or column family, created by [`DB::raw_iterator`]
27/// and other `raw_iterator_*` methods.
28///
29/// This iterator replicates RocksDB's API. It should provide better
30/// performance and more features than [`DBIteratorWithThreadMode`], which is a standard
31/// Rust [`std::iter::Iterator`].
32///
33/// ```
34/// use rust_rocksdb::{DB, Options};
35///
36/// let tempdir = tempfile::Builder::new()
37/// .prefix("_path_for_rocksdb_storage4")
38/// .tempdir()
39/// .expect("Failed to create temporary path for the _path_for_rocksdb_storage4.");
40/// let path = tempdir.path();
41/// {
42/// let db = DB::open_default(path).unwrap();
43/// let mut iter = db.raw_iterator();
44///
45/// // Forwards iteration
46/// iter.seek_to_first();
47/// while iter.valid() {
48/// println!("Saw {:?} {:?}", iter.key(), iter.value());
49/// iter.next();
50/// }
51///
52/// // Reverse iteration
53/// iter.seek_to_last();
54/// while iter.valid() {
55/// println!("Saw {:?} {:?}", iter.key(), iter.value());
56/// iter.prev();
57/// }
58///
59/// // Seeking
60/// iter.seek(b"my key");
61/// while iter.valid() {
62/// println!("Saw {:?} {:?}", iter.key(), iter.value());
63/// iter.next();
64/// }
65///
66/// // Reverse iteration from key
67/// // Note, use seek_for_prev when reversing because if this key doesn't exist,
68/// // this will make the iterator start from the previous key rather than the next.
69/// iter.seek_for_prev(b"my key");
70/// while iter.valid() {
71/// println!("Saw {:?} {:?}", iter.key(), iter.value());
72/// iter.prev();
73/// }
74/// }
75/// let _ = DB::destroy(&Options::default(), path);
76/// ```
77pub struct DBRawIteratorWithThreadMode<'a, D: DBAccess> {
78 inner: std::ptr::NonNull<ffi::rocksdb_iterator_t>,
79
80 /// When iterate_lower_bound or iterate_upper_bound are set, the inner
81 /// C iterator keeps a pointer to the upper bound inside `_readopts`.
82 /// Storing this makes sure the upper bound is always alive when the
83 /// iterator is being used.
84 ///
85 /// And yes, we need to store the entire ReadOptions structure since C++
86 /// ReadOptions keep reference to C rocksdb_readoptions_t wrapper which
87 /// point to vectors we own. See issue #660.
88 readopts: ReadOptions,
89
90 db: PhantomData<&'a D>,
91}
92
93impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
94 pub(crate) fn new(db: &D, readopts: ReadOptions) -> Self {
95 let inner = unsafe { db.create_iterator(&readopts) };
96 Self::from_inner(inner, readopts)
97 }
98
99 pub(crate) fn new_cf(
100 db: &'a D,
101 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
102 readopts: ReadOptions,
103 ) -> Self {
104 let inner = unsafe { db.create_iterator_cf(cf_handle, &readopts) };
105 Self::from_inner(inner, readopts)
106 }
107
108 pub(crate) fn from_inner(inner: *mut ffi::rocksdb_iterator_t, readopts: ReadOptions) -> Self {
109 // This unwrap will never fail since rocksdb_create_iterator and
110 // rocksdb_create_iterator_cf functions always return non-null. They
111 // use new and deference the result so any nulls would end up with SIGSEGV
112 // there and we would have a bigger issue.
113 let inner = std::ptr::NonNull::new(inner).unwrap();
114 Self {
115 inner,
116 readopts,
117 db: PhantomData,
118 }
119 }
120
121 pub(crate) fn into_inner(self) -> (std::ptr::NonNull<ffi::rocksdb_iterator_t>, ReadOptions) {
122 let value = ManuallyDrop::new(self);
123 // SAFETY: value won't be used beyond this point
124 let inner = unsafe { std::ptr::read(&value.inner) };
125 let readopts = unsafe { std::ptr::read(&value.readopts) };
126
127 (inner, readopts)
128 }
129
130 /// Returns `true` if the iterator is valid. An iterator is invalidated when
131 /// it reaches the end of its defined range, or when it encounters an error.
132 ///
133 /// To check whether the iterator encountered an error after `valid` has
134 /// returned `false`, use the [`status`](DBRawIteratorWithThreadMode::status) method. `status` will never
135 /// return an error when `valid` is `true`.
136 pub fn valid(&self) -> bool {
137 unsafe { ffi::rocksdb_iter_valid(self.inner.as_ptr()) != 0 }
138 }
139
140 /// Returns an error `Result` if the iterator has encountered an error
141 /// during operation. When an error is encountered, the iterator is
142 /// invalidated and [`valid`](DBRawIteratorWithThreadMode::valid) will return `false` when called.
143 ///
144 /// Performing a seek will discard the current status.
145 pub fn status(&self) -> Result<(), Error> {
146 unsafe {
147 ffi_try!(ffi::rocksdb_iter_get_error(self.inner.as_ptr()));
148 }
149 Ok(())
150 }
151
152 /// Seeks to the first key in the database.
153 ///
154 /// # Examples
155 ///
156 /// ```rust
157 /// use rust_rocksdb::{DB, Options};
158 ///
159 /// let tempdir = tempfile::Builder::new()
160 /// .prefix("_path_for_rocksdb_storage5")
161 /// .tempdir()
162 /// .expect("Failed to create temporary path for the _path_for_rocksdb_storage5.");
163 /// let path = tempdir.path();
164 /// {
165 /// let db = DB::open_default(path).unwrap();
166 /// let mut iter = db.raw_iterator();
167 ///
168 /// // Iterate all keys from the start in lexicographic order
169 /// iter.seek_to_first();
170 ///
171 /// while iter.valid() {
172 /// println!("{:?} {:?}", iter.key(), iter.value());
173 /// iter.next();
174 /// }
175 ///
176 /// // Read just the first key
177 /// iter.seek_to_first();
178 ///
179 /// if iter.valid() {
180 /// println!("{:?} {:?}", iter.key(), iter.value());
181 /// } else {
182 /// // There are no keys in the database
183 /// }
184 /// }
185 /// let _ = DB::destroy(&Options::default(), path);
186 /// ```
187 pub fn seek_to_first(&mut self) {
188 unsafe {
189 ffi::rocksdb_iter_seek_to_first(self.inner.as_ptr());
190 }
191 }
192
193 /// Seeks to the last key in the database.
194 ///
195 /// # Examples
196 ///
197 /// ```rust
198 /// use rust_rocksdb::{DB, Options};
199 ///
200 /// let tempdir = tempfile::Builder::new()
201 /// .prefix("_path_for_rocksdb_storage6")
202 /// .tempdir()
203 /// .expect("Failed to create temporary path for the _path_for_rocksdb_storage6.");
204 /// let path = tempdir.path();
205 /// {
206 /// let db = DB::open_default(path).unwrap();
207 /// let mut iter = db.raw_iterator();
208 ///
209 /// // Iterate all keys from the end in reverse lexicographic order
210 /// iter.seek_to_last();
211 ///
212 /// while iter.valid() {
213 /// println!("{:?} {:?}", iter.key(), iter.value());
214 /// iter.prev();
215 /// }
216 ///
217 /// // Read just the last key
218 /// iter.seek_to_last();
219 ///
220 /// if iter.valid() {
221 /// println!("{:?} {:?}", iter.key(), iter.value());
222 /// } else {
223 /// // There are no keys in the database
224 /// }
225 /// }
226 /// let _ = DB::destroy(&Options::default(), path);
227 /// ```
228 pub fn seek_to_last(&mut self) {
229 unsafe {
230 ffi::rocksdb_iter_seek_to_last(self.inner.as_ptr());
231 }
232 }
233
234 /// Seeks to the specified key or the first key that lexicographically follows it.
235 ///
236 /// This method will attempt to seek to the specified key. If that key does not exist, it will
237 /// find and seek to the key that lexicographically follows it instead.
238 ///
239 /// # Examples
240 ///
241 /// ```rust
242 /// use rust_rocksdb::{DB, Options};
243 ///
244 /// let tempdir = tempfile::Builder::new()
245 /// .prefix("_path_for_rocksdb_storage7")
246 /// .tempdir()
247 /// .expect("Failed to create temporary path for the _path_for_rocksdb_storage7.");
248 /// let path = tempdir.path();
249 /// {
250 /// let db = DB::open_default(path).unwrap();
251 /// let mut iter = db.raw_iterator();
252 ///
253 /// // Read the first key that starts with 'a'
254 /// iter.seek(b"a");
255 ///
256 /// if iter.valid() {
257 /// println!("{:?} {:?}", iter.key(), iter.value());
258 /// } else {
259 /// // There are no keys in the database
260 /// }
261 /// }
262 /// let _ = DB::destroy(&Options::default(), path);
263 /// ```
264 pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
265 let key = key.as_ref();
266
267 unsafe {
268 ffi::rocksdb_iter_seek(
269 self.inner.as_ptr(),
270 key.as_ptr() as *const c_char,
271 key.len() as size_t,
272 );
273 }
274 }
275
276 /// Seeks to the specified key, or the first key that lexicographically precedes it.
277 ///
278 /// Like ``.seek()`` this method will attempt to seek to the specified key.
279 /// The difference with ``.seek()`` is that if the specified key do not exist, this method will
280 /// seek to key that lexicographically precedes it instead.
281 ///
282 /// # Examples
283 ///
284 /// ```rust
285 /// use rust_rocksdb::{DB, Options};
286 ///
287 /// let tempdir = tempfile::Builder::new()
288 /// .prefix("_path_for_rocksdb_storage8")
289 /// .tempdir()
290 /// .expect("Failed to create temporary path for the _path_for_rocksdb_storage8.");
291 /// let path = tempdir.path();
292 /// {
293 /// let db = DB::open_default(path).unwrap();
294 /// let mut iter = db.raw_iterator();
295 ///
296 /// // Read the last key that starts with 'a'
297 /// iter.seek_for_prev(b"b");
298 ///
299 /// if iter.valid() {
300 /// println!("{:?} {:?}", iter.key(), iter.value());
301 /// } else {
302 /// // There are no keys in the database
303 /// }
304 /// }
305 /// let _ = DB::destroy(&Options::default(), path);
306 /// ```
307 pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
308 let key = key.as_ref();
309
310 unsafe {
311 ffi::rocksdb_iter_seek_for_prev(
312 self.inner.as_ptr(),
313 key.as_ptr() as *const c_char,
314 key.len() as size_t,
315 );
316 }
317 }
318
319 /// Seeks to the next key.
320 pub fn next(&mut self) {
321 if self.valid() {
322 unsafe {
323 ffi::rocksdb_iter_next(self.inner.as_ptr());
324 }
325 }
326 }
327
328 /// Seeks to the previous key.
329 pub fn prev(&mut self) {
330 if self.valid() {
331 unsafe {
332 ffi::rocksdb_iter_prev(self.inner.as_ptr());
333 }
334 }
335 }
336
337 /// Returns a slice of the current key.
338 pub fn key(&self) -> Option<&[u8]> {
339 if self.valid() {
340 Some(self.key_impl())
341 } else {
342 None
343 }
344 }
345
346 /// Returns a slice of the current value.
347 pub fn value(&self) -> Option<&[u8]> {
348 if self.valid() {
349 Some(self.value_impl())
350 } else {
351 None
352 }
353 }
354
355 /// Returns pair with slice of the current key and current value.
356 pub fn item(&self) -> Option<(&[u8], &[u8])> {
357 if self.valid() {
358 Some((self.key_impl(), self.value_impl()))
359 } else {
360 None
361 }
362 }
363
364 /// Returns a slice of the current key; assumes the iterator is valid.
365 fn key_impl(&self) -> &[u8] {
366 // Safety Note: This is safe as all methods that may invalidate the buffer returned
367 // take `&mut self`, so borrow checker will prevent use of buffer after seek.
368 unsafe {
369 let mut key_len: size_t = 0;
370 let key_len_ptr: *mut size_t = &mut key_len;
371 let key_ptr = ffi::rocksdb_iter_key(self.inner.as_ptr(), key_len_ptr);
372 slice::from_raw_parts(key_ptr as *const c_uchar, key_len)
373 }
374 }
375
376 /// Returns a slice of the current value; assumes the iterator is valid.
377 fn value_impl(&self) -> &[u8] {
378 // Safety Note: This is safe as all methods that may invalidate the buffer returned
379 // take `&mut self`, so borrow checker will prevent use of buffer after seek.
380 unsafe {
381 let mut val_len: size_t = 0;
382 let val_len_ptr: *mut size_t = &mut val_len;
383 let val_ptr = ffi::rocksdb_iter_value(self.inner.as_ptr(), val_len_ptr);
384 slice::from_raw_parts(val_ptr as *const c_uchar, val_len)
385 }
386 }
387}
388
389impl<D: DBAccess> Drop for DBRawIteratorWithThreadMode<'_, D> {
390 fn drop(&mut self) {
391 unsafe {
392 ffi::rocksdb_iter_destroy(self.inner.as_ptr());
393 }
394 }
395}
396
397unsafe impl<D: DBAccess> Send for DBRawIteratorWithThreadMode<'_, D> {}
398unsafe impl<D: DBAccess> Sync for DBRawIteratorWithThreadMode<'_, D> {}
399
400/// A type alias to keep compatibility. See [`DBIteratorWithThreadMode`] for details
401pub type DBIterator<'a> = DBIteratorWithThreadMode<'a, DB>;
402
403/// A standard Rust [`Iterator`] over a database or column family.
404///
405/// As an alternative, [`DBRawIteratorWithThreadMode`] is a low level wrapper around
406/// RocksDB's API, which can provide better performance and more features.
407///
408/// ```
409/// use rust_rocksdb::{DB, Direction, IteratorMode, Options};
410///
411/// let tempdir = tempfile::Builder::new()
412/// .prefix("_path_for_rocksdb_storage2")
413/// .tempdir()
414/// .expect("Failed to create temporary path for the _path_for_rocksdb_storage2.");
415/// let path = tempdir.path();
416/// {
417/// let db = DB::open_default(path).unwrap();
418/// let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward
419/// for item in iter {
420/// let (key, value) = item.unwrap();
421/// println!("Saw {:?} {:?}", key, value);
422/// }
423/// iter = db.iterator(IteratorMode::End); // Always iterates backward
424/// for item in iter {
425/// let (key, value) = item.unwrap();
426/// println!("Saw {:?} {:?}", key, value);
427/// }
428/// iter = db.iterator(IteratorMode::From(b"my key", Direction::Forward)); // From a key in Direction::{forward,reverse}
429/// for item in iter {
430/// let (key, value) = item.unwrap();
431/// println!("Saw {:?} {:?}", key, value);
432/// }
433///
434/// // You can seek with an existing Iterator instance, too
435/// iter = db.iterator(IteratorMode::Start);
436/// iter.set_mode(IteratorMode::From(b"another key", Direction::Reverse));
437/// for item in iter {
438/// let (key, value) = item.unwrap();
439/// println!("Saw {:?} {:?}", key, value);
440/// }
441/// }
442/// let _ = DB::destroy(&Options::default(), path);
443/// ```
444pub struct DBIteratorWithThreadMode<'a, D: DBAccess> {
445 raw: DBRawIteratorWithThreadMode<'a, D>,
446 direction: Direction,
447 done: bool,
448}
449
450#[derive(Copy, Clone)]
451pub enum Direction {
452 Forward,
453 Reverse,
454}
455
456pub type KVBytes = (Box<[u8]>, Box<[u8]>);
457
458#[derive(Copy, Clone)]
459pub enum IteratorMode<'a> {
460 Start,
461 End,
462 From(&'a [u8], Direction),
463}
464
465impl<'a, D: DBAccess> DBIteratorWithThreadMode<'a, D> {
466 pub(crate) fn new(db: &D, readopts: ReadOptions, mode: IteratorMode) -> Self {
467 Self::from_raw(DBRawIteratorWithThreadMode::new(db, readopts), mode)
468 }
469
470 pub(crate) fn new_cf(
471 db: &'a D,
472 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
473 readopts: ReadOptions,
474 mode: IteratorMode,
475 ) -> Self {
476 Self::from_raw(
477 DBRawIteratorWithThreadMode::new_cf(db, cf_handle, readopts),
478 mode,
479 )
480 }
481
482 fn from_raw(raw: DBRawIteratorWithThreadMode<'a, D>, mode: IteratorMode) -> Self {
483 let mut rv = DBIteratorWithThreadMode {
484 raw,
485 direction: Direction::Forward, // blown away by set_mode()
486 done: false,
487 };
488 rv.set_mode(mode);
489 rv
490 }
491
492 pub fn set_mode(&mut self, mode: IteratorMode) {
493 self.done = false;
494 self.direction = match mode {
495 IteratorMode::Start => {
496 self.raw.seek_to_first();
497 Direction::Forward
498 }
499 IteratorMode::End => {
500 self.raw.seek_to_last();
501 Direction::Reverse
502 }
503 IteratorMode::From(key, Direction::Forward) => {
504 self.raw.seek(key);
505 Direction::Forward
506 }
507 IteratorMode::From(key, Direction::Reverse) => {
508 self.raw.seek_for_prev(key);
509 Direction::Reverse
510 }
511 };
512 }
513}
514
515impl<D: DBAccess> Iterator for DBIteratorWithThreadMode<'_, D> {
516 type Item = Result<KVBytes, Error>;
517
518 fn next(&mut self) -> Option<Result<KVBytes, Error>> {
519 if self.done {
520 None
521 } else if let Some((key, value)) = self.raw.item() {
522 let item = (Box::from(key), Box::from(value));
523 match self.direction {
524 Direction::Forward => self.raw.next(),
525 Direction::Reverse => self.raw.prev(),
526 }
527 Some(Ok(item))
528 } else {
529 self.done = true;
530 self.raw.status().err().map(Result::Err)
531 }
532 }
533}
534
535impl<D: DBAccess> std::iter::FusedIterator for DBIteratorWithThreadMode<'_, D> {}
536
537impl<'a, D: DBAccess> Into<DBRawIteratorWithThreadMode<'a, D>> for DBIteratorWithThreadMode<'a, D> {
538 fn into(self) -> DBRawIteratorWithThreadMode<'a, D> {
539 self.raw
540 }
541}
542
543/// Iterates the batches of writes since a given sequence number.
544///
545/// `DBWALIterator` is returned by `DB::get_updates_since()` and will return the
546/// batches of write operations that have occurred since a given sequence number
547/// (see `DB::latest_sequence_number()`). This iterator cannot be constructed by
548/// the application.
549///
550/// The iterator item type is a tuple of (`u64`, `WriteBatch`) where the first
551/// value is the sequence number of the associated write batch.
552///
553pub struct DBWALIterator {
554 pub(crate) inner: *mut ffi::rocksdb_wal_iterator_t,
555 pub(crate) start_seq_number: u64,
556}
557
558impl DBWALIterator {
559 /// Returns `true` if the iterator is valid. An iterator is invalidated when
560 /// it reaches the end of its defined range, or when it encounters an error.
561 ///
562 /// To check whether the iterator encountered an error after `valid` has
563 /// returned `false`, use the [`status`](DBWALIterator::status) method.
564 /// `status` will never return an error when `valid` is `true`.
565 pub fn valid(&self) -> bool {
566 unsafe { ffi::rocksdb_wal_iter_valid(self.inner) != 0 }
567 }
568
569 /// Returns an error `Result` if the iterator has encountered an error
570 /// during operation. When an error is encountered, the iterator is
571 /// invalidated and [`valid`](DBWALIterator::valid) will return `false` when
572 /// called.
573 pub fn status(&self) -> Result<(), Error> {
574 unsafe {
575 ffi_try!(ffi::rocksdb_wal_iter_status(self.inner));
576 }
577 Ok(())
578 }
579}
580
581impl Iterator for DBWALIterator {
582 type Item = Result<(u64, WriteBatch), Error>;
583
584 fn next(&mut self) -> Option<Self::Item> {
585 if !self.valid() {
586 return None;
587 }
588
589 let mut seq: u64 = 0;
590 let mut batch = WriteBatch {
591 inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
592 };
593
594 // if the initial sequence number is what was requested we skip it to
595 // only provide changes *after* it
596 while seq <= self.start_seq_number {
597 unsafe {
598 ffi::rocksdb_wal_iter_next(self.inner);
599 }
600
601 if !self.valid() {
602 return None;
603 }
604
605 // this drops which in turn frees the skipped batch
606 batch = WriteBatch {
607 inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
608 };
609 }
610
611 if !self.valid() {
612 return self.status().err().map(Result::Err);
613 }
614
615 // Seek to the next write batch.
616 // Note that WriteBatches live independently of the WAL iterator so this is safe to do
617 unsafe {
618 ffi::rocksdb_wal_iter_next(self.inner);
619 }
620
621 Some(Ok((seq, batch)))
622 }
623}
624
625impl Drop for DBWALIterator {
626 fn drop(&mut self) {
627 unsafe {
628 ffi::rocksdb_wal_iter_destroy(self.inner);
629 }
630 }
631}