rust_rocksdb/db_iterator.rs
1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{
16 db::{DBAccess, DB},
17 ffi, Error, ReadOptions, WriteBatch,
18};
19use libc::{c_char, c_uchar, size_t};
20use std::{marker::PhantomData, slice};
21
22/// A type alias to keep compatibility. See [`DBRawIteratorWithThreadMode`] for details
23pub type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, DB>;
24
25/// An iterator over a database or column family, with specifiable
26/// ranges and direction.
27///
28/// This iterator is different to the standard ``DBIteratorWithThreadMode`` as it aims Into
29/// replicate the underlying iterator API within RocksDB itself. This should
30/// give access to more performance and flexibility but departs from the
31/// widely recognized Rust idioms.
32///
33/// ```
34/// use rust_rocksdb::{DB, Options};
35///
36/// let tempdir = tempfile::Builder::new()
37/// .prefix("_path_for_rocksdb_storage4")
38/// .tempdir()
39/// .expect("Failed to create temporary path for the _path_for_rocksdb_storage4.");
40/// let path = tempdir.path();
41/// {
42/// let db = DB::open_default(path).unwrap();
43/// let mut iter = db.raw_iterator();
44///
45/// // Forwards iteration
46/// iter.seek_to_first();
47/// while iter.valid() {
48/// println!("Saw {:?} {:?}", iter.key(), iter.value());
49/// iter.next();
50/// }
51///
52/// // Reverse iteration
53/// iter.seek_to_last();
54/// while iter.valid() {
55/// println!("Saw {:?} {:?}", iter.key(), iter.value());
56/// iter.prev();
57/// }
58///
59/// // Seeking
60/// iter.seek(b"my key");
61/// while iter.valid() {
62/// println!("Saw {:?} {:?}", iter.key(), iter.value());
63/// iter.next();
64/// }
65///
66/// // Reverse iteration from key
67/// // Note, use seek_for_prev when reversing because if this key doesn't exist,
68/// // this will make the iterator start from the previous key rather than the next.
69/// iter.seek_for_prev(b"my key");
70/// while iter.valid() {
71/// println!("Saw {:?} {:?}", iter.key(), iter.value());
72/// iter.prev();
73/// }
74/// }
75/// let _ = DB::destroy(&Options::default(), path);
76/// ```
77pub struct DBRawIteratorWithThreadMode<'a, D: DBAccess> {
78 inner: std::ptr::NonNull<ffi::rocksdb_iterator_t>,
79
80 /// When iterate_lower_bound or iterate_upper_bound are set, the inner
81 /// C iterator keeps a pointer to the upper bound inside `_readopts`.
82 /// Storing this makes sure the upper bound is always alive when the
83 /// iterator is being used.
84 ///
85 /// And yes, we need to store the entire ReadOptions structure since C++
86 /// ReadOptions keep reference to C rocksdb_readoptions_t wrapper which
87 /// point to vectors we own. See issue #660.
88 _readopts: ReadOptions,
89
90 db: PhantomData<&'a D>,
91}
92
93impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
94 pub(crate) fn new(db: &D, readopts: ReadOptions) -> Self {
95 let inner = unsafe { db.create_iterator(&readopts) };
96 Self::from_inner(inner, readopts)
97 }
98
99 pub(crate) fn new_cf(
100 db: &'a D,
101 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
102 readopts: ReadOptions,
103 ) -> Self {
104 let inner = unsafe { db.create_iterator_cf(cf_handle, &readopts) };
105 Self::from_inner(inner, readopts)
106 }
107
108 fn from_inner(inner: *mut ffi::rocksdb_iterator_t, readopts: ReadOptions) -> Self {
109 // This unwrap will never fail since rocksdb_create_iterator and
110 // rocksdb_create_iterator_cf functions always return non-null. They
111 // use new and deference the result so any nulls would end up with SIGSEGV
112 // there and we would have a bigger issue.
113 let inner = std::ptr::NonNull::new(inner).unwrap();
114 Self {
115 inner,
116 _readopts: readopts,
117 db: PhantomData,
118 }
119 }
120
121 /// Returns `true` if the iterator is valid. An iterator is invalidated when
122 /// it reaches the end of its defined range, or when it encounters an error.
123 ///
124 /// To check whether the iterator encountered an error after `valid` has
125 /// returned `false`, use the [`status`](DBRawIteratorWithThreadMode::status) method. `status` will never
126 /// return an error when `valid` is `true`.
127 pub fn valid(&self) -> bool {
128 unsafe { ffi::rocksdb_iter_valid(self.inner.as_ptr()) != 0 }
129 }
130
131 /// Returns an error `Result` if the iterator has encountered an error
132 /// during operation. When an error is encountered, the iterator is
133 /// invalidated and [`valid`](DBRawIteratorWithThreadMode::valid) will return `false` when called.
134 ///
135 /// Performing a seek will discard the current status.
136 pub fn status(&self) -> Result<(), Error> {
137 unsafe {
138 ffi_try!(ffi::rocksdb_iter_get_error(self.inner.as_ptr()));
139 }
140 Ok(())
141 }
142
143 /// Seeks to the first key in the database.
144 ///
145 /// # Examples
146 ///
147 /// ```rust
148 /// use rust_rocksdb::{DB, Options};
149 ///
150 /// let tempdir = tempfile::Builder::new()
151 /// .prefix("_path_for_rocksdb_storage5")
152 /// .tempdir()
153 /// .expect("Failed to create temporary path for the _path_for_rocksdb_storage5.");
154 /// let path = tempdir.path();
155 /// {
156 /// let db = DB::open_default(path).unwrap();
157 /// let mut iter = db.raw_iterator();
158 ///
159 /// // Iterate all keys from the start in lexicographic order
160 /// iter.seek_to_first();
161 ///
162 /// while iter.valid() {
163 /// println!("{:?} {:?}", iter.key(), iter.value());
164 /// iter.next();
165 /// }
166 ///
167 /// // Read just the first key
168 /// iter.seek_to_first();
169 ///
170 /// if iter.valid() {
171 /// println!("{:?} {:?}", iter.key(), iter.value());
172 /// } else {
173 /// // There are no keys in the database
174 /// }
175 /// }
176 /// let _ = DB::destroy(&Options::default(), path);
177 /// ```
178 pub fn seek_to_first(&mut self) {
179 unsafe {
180 ffi::rocksdb_iter_seek_to_first(self.inner.as_ptr());
181 }
182 }
183
184 /// Seeks to the last key in the database.
185 ///
186 /// # Examples
187 ///
188 /// ```rust
189 /// use rust_rocksdb::{DB, Options};
190 ///
191 /// let tempdir = tempfile::Builder::new()
192 /// .prefix("_path_for_rocksdb_storage6")
193 /// .tempdir()
194 /// .expect("Failed to create temporary path for the _path_for_rocksdb_storage6.");
195 /// let path = tempdir.path();
196 /// {
197 /// let db = DB::open_default(path).unwrap();
198 /// let mut iter = db.raw_iterator();
199 ///
200 /// // Iterate all keys from the end in reverse lexicographic order
201 /// iter.seek_to_last();
202 ///
203 /// while iter.valid() {
204 /// println!("{:?} {:?}", iter.key(), iter.value());
205 /// iter.prev();
206 /// }
207 ///
208 /// // Read just the last key
209 /// iter.seek_to_last();
210 ///
211 /// if iter.valid() {
212 /// println!("{:?} {:?}", iter.key(), iter.value());
213 /// } else {
214 /// // There are no keys in the database
215 /// }
216 /// }
217 /// let _ = DB::destroy(&Options::default(), path);
218 /// ```
219 pub fn seek_to_last(&mut self) {
220 unsafe {
221 ffi::rocksdb_iter_seek_to_last(self.inner.as_ptr());
222 }
223 }
224
225 /// Seeks to the specified key or the first key that lexicographically follows it.
226 ///
227 /// This method will attempt to seek to the specified key. If that key does not exist, it will
228 /// find and seek to the key that lexicographically follows it instead.
229 ///
230 /// # Examples
231 ///
232 /// ```rust
233 /// use rust_rocksdb::{DB, Options};
234 ///
235 /// let tempdir = tempfile::Builder::new()
236 /// .prefix("_path_for_rocksdb_storage7")
237 /// .tempdir()
238 /// .expect("Failed to create temporary path for the _path_for_rocksdb_storage7.");
239 /// let path = tempdir.path();
240 /// {
241 /// let db = DB::open_default(path).unwrap();
242 /// let mut iter = db.raw_iterator();
243 ///
244 /// // Read the first key that starts with 'a'
245 /// iter.seek(b"a");
246 ///
247 /// if iter.valid() {
248 /// println!("{:?} {:?}", iter.key(), iter.value());
249 /// } else {
250 /// // There are no keys in the database
251 /// }
252 /// }
253 /// let _ = DB::destroy(&Options::default(), path);
254 /// ```
255 pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
256 let key = key.as_ref();
257
258 unsafe {
259 ffi::rocksdb_iter_seek(
260 self.inner.as_ptr(),
261 key.as_ptr() as *const c_char,
262 key.len() as size_t,
263 );
264 }
265 }
266
267 /// Seeks to the specified key, or the first key that lexicographically precedes it.
268 ///
269 /// Like ``.seek()`` this method will attempt to seek to the specified key.
270 /// The difference with ``.seek()`` is that if the specified key do not exist, this method will
271 /// seek to key that lexicographically precedes it instead.
272 ///
273 /// # Examples
274 ///
275 /// ```rust
276 /// use rust_rocksdb::{DB, Options};
277 ///
278 /// let tempdir = tempfile::Builder::new()
279 /// .prefix("_path_for_rocksdb_storage8")
280 /// .tempdir()
281 /// .expect("Failed to create temporary path for the _path_for_rocksdb_storage8.");
282 /// let path = tempdir.path();
283 /// {
284 /// let db = DB::open_default(path).unwrap();
285 /// let mut iter = db.raw_iterator();
286 ///
287 /// // Read the last key that starts with 'a'
288 /// iter.seek_for_prev(b"b");
289 ///
290 /// if iter.valid() {
291 /// println!("{:?} {:?}", iter.key(), iter.value());
292 /// } else {
293 /// // There are no keys in the database
294 /// }
295 /// }
296 /// let _ = DB::destroy(&Options::default(), path);
297 /// ```
298 pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
299 let key = key.as_ref();
300
301 unsafe {
302 ffi::rocksdb_iter_seek_for_prev(
303 self.inner.as_ptr(),
304 key.as_ptr() as *const c_char,
305 key.len() as size_t,
306 );
307 }
308 }
309
310 /// Seeks to the next key.
311 pub fn next(&mut self) {
312 if self.valid() {
313 unsafe {
314 ffi::rocksdb_iter_next(self.inner.as_ptr());
315 }
316 }
317 }
318
319 /// Seeks to the previous key.
320 pub fn prev(&mut self) {
321 if self.valid() {
322 unsafe {
323 ffi::rocksdb_iter_prev(self.inner.as_ptr());
324 }
325 }
326 }
327
328 /// Returns a slice of the current key.
329 pub fn key(&self) -> Option<&[u8]> {
330 if self.valid() {
331 Some(self.key_impl())
332 } else {
333 None
334 }
335 }
336
337 /// Returns a slice of the current value.
338 pub fn value(&self) -> Option<&[u8]> {
339 if self.valid() {
340 Some(self.value_impl())
341 } else {
342 None
343 }
344 }
345
346 /// Returns pair with slice of the current key and current value.
347 pub fn item(&self) -> Option<(&[u8], &[u8])> {
348 if self.valid() {
349 Some((self.key_impl(), self.value_impl()))
350 } else {
351 None
352 }
353 }
354
355 /// Returns a slice of the current key; assumes the iterator is valid.
356 fn key_impl(&self) -> &[u8] {
357 // Safety Note: This is safe as all methods that may invalidate the buffer returned
358 // take `&mut self`, so borrow checker will prevent use of buffer after seek.
359 unsafe {
360 let mut key_len: size_t = 0;
361 let key_len_ptr: *mut size_t = &mut key_len;
362 let key_ptr = ffi::rocksdb_iter_key(self.inner.as_ptr(), key_len_ptr);
363 slice::from_raw_parts(key_ptr as *const c_uchar, key_len)
364 }
365 }
366
367 /// Returns a slice of the current value; assumes the iterator is valid.
368 fn value_impl(&self) -> &[u8] {
369 // Safety Note: This is safe as all methods that may invalidate the buffer returned
370 // take `&mut self`, so borrow checker will prevent use of buffer after seek.
371 unsafe {
372 let mut val_len: size_t = 0;
373 let val_len_ptr: *mut size_t = &mut val_len;
374 let val_ptr = ffi::rocksdb_iter_value(self.inner.as_ptr(), val_len_ptr);
375 slice::from_raw_parts(val_ptr as *const c_uchar, val_len)
376 }
377 }
378}
379
380impl<D: DBAccess> Drop for DBRawIteratorWithThreadMode<'_, D> {
381 fn drop(&mut self) {
382 unsafe {
383 ffi::rocksdb_iter_destroy(self.inner.as_ptr());
384 }
385 }
386}
387
388unsafe impl<D: DBAccess> Send for DBRawIteratorWithThreadMode<'_, D> {}
389unsafe impl<D: DBAccess> Sync for DBRawIteratorWithThreadMode<'_, D> {}
390
/// A type alias to keep compatibility. See [`DBIteratorWithThreadMode`] for details.
pub type DBIterator<'a> = DBIteratorWithThreadMode<'a, DB>;

/// An iterator over a database or column family, with specifiable
/// ranges and direction.
///
/// Each item is an owned key/value pair (or the error the underlying
/// iterator reported). Once it has returned `None` it keeps returning `None`
/// until it is repositioned with `set_mode`.
///
/// ```
/// use rust_rocksdb::{DB, Direction, IteratorMode, Options};
///
/// let tempdir = tempfile::Builder::new()
///     .prefix("_path_for_rocksdb_storage2")
///     .tempdir()
///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage2.");
/// let path = tempdir.path();
/// {
///     let db = DB::open_default(path).unwrap();
///     let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward
///     for item in iter {
///         let (key, value) = item.unwrap();
///         println!("Saw {:?} {:?}", key, value);
///     }
///     iter = db.iterator(IteratorMode::End);  // Always iterates backward
///     for item in iter {
///         let (key, value) = item.unwrap();
///         println!("Saw {:?} {:?}", key, value);
///     }
///     iter = db.iterator(IteratorMode::From(b"my key", Direction::Forward)); // From a key in Direction::{forward,reverse}
///     for item in iter {
///         let (key, value) = item.unwrap();
///         println!("Saw {:?} {:?}", key, value);
///     }
///
///     // You can seek with an existing Iterator instance, too
///     iter = db.iterator(IteratorMode::Start);
///     iter.set_mode(IteratorMode::From(b"another key", Direction::Reverse));
///     for item in iter {
///         let (key, value) = item.unwrap();
///         println!("Saw {:?} {:?}", key, value);
///     }
/// }
/// let _ = DB::destroy(&Options::default(), path);
/// ```
pub struct DBIteratorWithThreadMode<'a, D: DBAccess> {
    // Raw iterator that performs all seeks and steps.
    raw: DBRawIteratorWithThreadMode<'a, D>,
    // Which way `next` steps the raw iterator after yielding an item.
    direction: Direction,
    // Set once the raw iterator has become invalid; keeps the iterator fused.
    done: bool,
}
438
/// The direction in which a `DBIteratorWithThreadMode` steps after each item.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Direction {
    /// Step towards following keys (the raw iterator's `next`).
    Forward,
    /// Step towards preceding keys (the raw iterator's `prev`).
    Reverse,
}

/// An owned key/value pair yielded by `DBIteratorWithThreadMode`.
pub type KVBytes = (Box<[u8]>, Box<[u8]>);

/// Where a `DBIteratorWithThreadMode` starts and which way it moves.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum IteratorMode<'a> {
    /// Start at the first key and iterate forward.
    Start,
    /// Start at the last key and iterate in reverse.
    End,
    /// Start at the given key and iterate in the given direction.
    ///
    /// With [`Direction::Forward`] the iterator starts at the key or the first
    /// key after it (`seek`); with [`Direction::Reverse`] it starts at the key
    /// or the last key before it (`seek_for_prev`).
    From(&'a [u8], Direction),
}
453
454impl<'a, D: DBAccess> DBIteratorWithThreadMode<'a, D> {
455 pub(crate) fn new(db: &D, readopts: ReadOptions, mode: IteratorMode) -> Self {
456 Self::from_raw(DBRawIteratorWithThreadMode::new(db, readopts), mode)
457 }
458
459 pub(crate) fn new_cf(
460 db: &'a D,
461 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
462 readopts: ReadOptions,
463 mode: IteratorMode,
464 ) -> Self {
465 Self::from_raw(
466 DBRawIteratorWithThreadMode::new_cf(db, cf_handle, readopts),
467 mode,
468 )
469 }
470
471 fn from_raw(raw: DBRawIteratorWithThreadMode<'a, D>, mode: IteratorMode) -> Self {
472 let mut rv = DBIteratorWithThreadMode {
473 raw,
474 direction: Direction::Forward, // blown away by set_mode()
475 done: false,
476 };
477 rv.set_mode(mode);
478 rv
479 }
480
481 pub fn set_mode(&mut self, mode: IteratorMode) {
482 self.done = false;
483 self.direction = match mode {
484 IteratorMode::Start => {
485 self.raw.seek_to_first();
486 Direction::Forward
487 }
488 IteratorMode::End => {
489 self.raw.seek_to_last();
490 Direction::Reverse
491 }
492 IteratorMode::From(key, Direction::Forward) => {
493 self.raw.seek(key);
494 Direction::Forward
495 }
496 IteratorMode::From(key, Direction::Reverse) => {
497 self.raw.seek_for_prev(key);
498 Direction::Reverse
499 }
500 };
501 }
502}
503
504impl<D: DBAccess> Iterator for DBIteratorWithThreadMode<'_, D> {
505 type Item = Result<KVBytes, Error>;
506
507 fn next(&mut self) -> Option<Result<KVBytes, Error>> {
508 if self.done {
509 None
510 } else if let Some((key, value)) = self.raw.item() {
511 let item = (Box::from(key), Box::from(value));
512 match self.direction {
513 Direction::Forward => self.raw.next(),
514 Direction::Reverse => self.raw.prev(),
515 }
516 Some(Ok(item))
517 } else {
518 self.done = true;
519 self.raw.status().err().map(Result::Err)
520 }
521 }
522}
523
524impl<D: DBAccess> std::iter::FusedIterator for DBIteratorWithThreadMode<'_, D> {}
525
526impl<'a, D: DBAccess> Into<DBRawIteratorWithThreadMode<'a, D>> for DBIteratorWithThreadMode<'a, D> {
527 fn into(self) -> DBRawIteratorWithThreadMode<'a, D> {
528 self.raw
529 }
530}
531
/// Iterates the batches of writes since a given sequence number.
///
/// `DBWALIterator` is returned by `DB::get_updates_since()` and will return the
/// batches of write operations that have occurred since a given sequence number
/// (see `DB::latest_sequence_number()`). This iterator cannot be constructed by
/// the application.
///
/// The iterator item type is `Result<(u64, WriteBatch), Error>`, where the `u64`
/// is the sequence number of the associated write batch. Batches whose sequence
/// number is less than or equal to the requested one are skipped.
pub struct DBWALIterator {
    // Pointer to the C WAL iterator; released by this type's `Drop` impl.
    pub(crate) inner: *mut ffi::rocksdb_wal_iterator_t,
    // Requested starting sequence number; `next` skips batches at or below it.
    pub(crate) start_seq_number: u64,
}
546
547impl DBWALIterator {
548 /// Returns `true` if the iterator is valid. An iterator is invalidated when
549 /// it reaches the end of its defined range, or when it encounters an error.
550 ///
551 /// To check whether the iterator encountered an error after `valid` has
552 /// returned `false`, use the [`status`](DBWALIterator::status) method.
553 /// `status` will never return an error when `valid` is `true`.
554 pub fn valid(&self) -> bool {
555 unsafe { ffi::rocksdb_wal_iter_valid(self.inner) != 0 }
556 }
557
558 /// Returns an error `Result` if the iterator has encountered an error
559 /// during operation. When an error is encountered, the iterator is
560 /// invalidated and [`valid`](DBWALIterator::valid) will return `false` when
561 /// called.
562 pub fn status(&self) -> Result<(), Error> {
563 unsafe {
564 ffi_try!(ffi::rocksdb_wal_iter_status(self.inner));
565 }
566 Ok(())
567 }
568}
569
570impl Iterator for DBWALIterator {
571 type Item = Result<(u64, WriteBatch), Error>;
572
573 fn next(&mut self) -> Option<Self::Item> {
574 if !self.valid() {
575 return None;
576 }
577
578 let mut seq: u64 = 0;
579 let mut batch = WriteBatch {
580 inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
581 };
582
583 // if the initial sequence number is what was requested we skip it to
584 // only provide changes *after* it
585 while seq <= self.start_seq_number {
586 unsafe {
587 ffi::rocksdb_wal_iter_next(self.inner);
588 }
589
590 if !self.valid() {
591 return None;
592 }
593
594 // this drops which in turn frees the skipped batch
595 batch = WriteBatch {
596 inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
597 };
598 }
599
600 if !self.valid() {
601 return self.status().err().map(Result::Err);
602 }
603
604 // Seek to the next write batch.
605 // Note that WriteBatches live independently of the WAL iterator so this is safe to do
606 unsafe {
607 ffi::rocksdb_wal_iter_next(self.inner);
608 }
609
610 Some(Ok((seq, batch)))
611 }
612}
613
614impl Drop for DBWALIterator {
615 fn drop(&mut self) {
616 unsafe {
617 ffi::rocksdb_wal_iter_destroy(self.inner);
618 }
619 }
620}