haizhi_rocksdb/db_iterator.rs
1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{
16 db::{DBAccess, DB},
17 ffi, Error, ReadOptions, WriteBatch,
18};
19use libc::{c_char, c_uchar, size_t};
20use std::{marker::PhantomData, slice};
21
22/// A type alias to keep compatibility. See [`DBRawIteratorWithThreadMode`] for details
23pub type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, DB>;
24
25/// An iterator over a database or column family, with specifiable
26/// ranges and direction.
27///
28/// This iterator is different to the standard ``DBIteratorWithThreadMode`` as it aims Into
29/// replicate the underlying iterator API within RocksDB itself. This should
30/// give access to more performance and flexibility but departs from the
31/// widely recognised Rust idioms.
32///
33/// ```
34/// use rocksdb::{DB, Options};
35///
36/// let path = "_path_for_rocksdb_storage4";
37/// {
38/// let db = DB::open_default(path).unwrap();
39/// let mut iter = db.raw_iterator();
40///
41/// // Forwards iteration
42/// iter.seek_to_first();
43/// while iter.valid() {
44/// println!("Saw {:?} {:?}", iter.key(), iter.value());
45/// iter.next();
46/// }
47///
48/// // Reverse iteration
49/// iter.seek_to_last();
50/// while iter.valid() {
51/// println!("Saw {:?} {:?}", iter.key(), iter.value());
52/// iter.prev();
53/// }
54///
55/// // Seeking
56/// iter.seek(b"my key");
57/// while iter.valid() {
58/// println!("Saw {:?} {:?}", iter.key(), iter.value());
59/// iter.next();
60/// }
61///
62/// // Reverse iteration from key
63/// // Note, use seek_for_prev when reversing because if this key doesn't exist,
64/// // this will make the iterator start from the previous key rather than the next.
65/// iter.seek_for_prev(b"my key");
66/// while iter.valid() {
67/// println!("Saw {:?} {:?}", iter.key(), iter.value());
68/// iter.prev();
69/// }
70/// }
71/// let _ = DB::destroy(&Options::default(), path);
72/// ```
73pub struct DBRawIteratorWithThreadMode<'a, D: DBAccess> {
74 inner: std::ptr::NonNull<ffi::rocksdb_iterator_t>,
75
76 /// When iterate_lower_bound or iterate_upper_bound are set, the inner
77 /// C iterator keeps a pointer to the upper bound inside `_readopts`.
78 /// Storing this makes sure the upper bound is always alive when the
79 /// iterator is being used.
80 ///
81 /// And yes, we need to store the entire ReadOptions structure since C++
82 /// ReadOptions keep reference to C rocksdb_readoptions_t wrapper which
83 /// point to vectors we own. See issue #660.
84 _readopts: ReadOptions,
85
86 db: PhantomData<&'a D>,
87}
88
89impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
90 pub(crate) fn new(db: &D, readopts: ReadOptions) -> Self {
91 let inner = unsafe { db.create_iterator(&readopts) };
92 Self::from_inner(inner, readopts)
93 }
94
95 pub(crate) fn new_cf(
96 db: &'a D,
97 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
98 readopts: ReadOptions,
99 ) -> Self {
100 let inner = unsafe { db.create_iterator_cf(cf_handle, &readopts) };
101 Self::from_inner(inner, readopts)
102 }
103
104 fn from_inner(inner: *mut ffi::rocksdb_iterator_t, readopts: ReadOptions) -> Self {
105 // This unwrap will never fail since rocksdb_create_iterator and
106 // rocksdb_create_iterator_cf functions always return non-null. They
107 // use new and deference the result so any nulls would end up with SIGSEGV
108 // there and we would have a bigger issue.
109 let inner = std::ptr::NonNull::new(inner).unwrap();
110 Self {
111 inner,
112 _readopts: readopts,
113 db: PhantomData,
114 }
115 }
116
117 /// Returns `true` if the iterator is valid. An iterator is invalidated when
118 /// it reaches the end of its defined range, or when it encounters an error.
119 ///
120 /// To check whether the iterator encountered an error after `valid` has
121 /// returned `false`, use the [`status`](DBRawIteratorWithThreadMode::status) method. `status` will never
122 /// return an error when `valid` is `true`.
123 pub fn valid(&self) -> bool {
124 unsafe { ffi::rocksdb_iter_valid(self.inner.as_ptr()) != 0 }
125 }
126
127 /// Returns an error `Result` if the iterator has encountered an error
128 /// during operation. When an error is encountered, the iterator is
129 /// invalidated and [`valid`](DBRawIteratorWithThreadMode::valid) will return `false` when called.
130 ///
131 /// Performing a seek will discard the current status.
132 pub fn status(&self) -> Result<(), Error> {
133 unsafe {
134 ffi_try!(ffi::rocksdb_iter_get_error(self.inner.as_ptr()));
135 }
136 Ok(())
137 }
138
139 /// Seeks to the first key in the database.
140 ///
141 /// # Examples
142 ///
143 /// ```rust
144 /// use rocksdb::{DB, Options};
145 ///
146 /// let path = "_path_for_rocksdb_storage5";
147 /// {
148 /// let db = DB::open_default(path).unwrap();
149 /// let mut iter = db.raw_iterator();
150 ///
151 /// // Iterate all keys from the start in lexicographic order
152 /// iter.seek_to_first();
153 ///
154 /// while iter.valid() {
155 /// println!("{:?} {:?}", iter.key(), iter.value());
156 /// iter.next();
157 /// }
158 ///
159 /// // Read just the first key
160 /// iter.seek_to_first();
161 ///
162 /// if iter.valid() {
163 /// println!("{:?} {:?}", iter.key(), iter.value());
164 /// } else {
165 /// // There are no keys in the database
166 /// }
167 /// }
168 /// let _ = DB::destroy(&Options::default(), path);
169 /// ```
170 pub fn seek_to_first(&mut self) {
171 unsafe {
172 ffi::rocksdb_iter_seek_to_first(self.inner.as_ptr());
173 }
174 }
175
176 pub fn refresh(&mut self) {
177 unsafe {
178 ffi::rocksdb_iter_refresh(self.inner.as_ptr());
179 }
180 }
181
182 /// Seeks to the last key in the database.
183 ///
184 /// # Examples
185 ///
186 /// ```rust
187 /// use rocksdb::{DB, Options};
188 ///
189 /// let path = "_path_for_rocksdb_storage6";
190 /// {
191 /// let db = DB::open_default(path).unwrap();
192 /// let mut iter = db.raw_iterator();
193 ///
194 /// // Iterate all keys from the end in reverse lexicographic order
195 /// iter.seek_to_last();
196 ///
197 /// while iter.valid() {
198 /// println!("{:?} {:?}", iter.key(), iter.value());
199 /// iter.prev();
200 /// }
201 ///
202 /// // Read just the last key
203 /// iter.seek_to_last();
204 ///
205 /// if iter.valid() {
206 /// println!("{:?} {:?}", iter.key(), iter.value());
207 /// } else {
208 /// // There are no keys in the database
209 /// }
210 /// }
211 /// let _ = DB::destroy(&Options::default(), path);
212 /// ```
213 pub fn seek_to_last(&mut self) {
214 unsafe {
215 ffi::rocksdb_iter_seek_to_last(self.inner.as_ptr());
216 }
217 }
218
219 /// Seeks to the specified key or the first key that lexicographically follows it.
220 ///
221 /// This method will attempt to seek to the specified key. If that key does not exist, it will
222 /// find and seek to the key that lexicographically follows it instead.
223 ///
224 /// # Examples
225 ///
226 /// ```rust
227 /// use rocksdb::{DB, Options};
228 ///
229 /// let path = "_path_for_rocksdb_storage7";
230 /// {
231 /// let db = DB::open_default(path).unwrap();
232 /// let mut iter = db.raw_iterator();
233 ///
234 /// // Read the first key that starts with 'a'
235 /// iter.seek(b"a");
236 ///
237 /// if iter.valid() {
238 /// println!("{:?} {:?}", iter.key(), iter.value());
239 /// } else {
240 /// // There are no keys in the database
241 /// }
242 /// }
243 /// let _ = DB::destroy(&Options::default(), path);
244 /// ```
245 pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
246 let key = key.as_ref();
247
248 unsafe {
249 ffi::rocksdb_iter_seek(
250 self.inner.as_ptr(),
251 key.as_ptr() as *const c_char,
252 key.len() as size_t,
253 );
254 }
255 }
256
257 /// Seeks to the specified key, or the first key that lexicographically precedes it.
258 ///
259 /// Like ``.seek()`` this method will attempt to seek to the specified key.
260 /// The difference with ``.seek()`` is that if the specified key do not exist, this method will
261 /// seek to key that lexicographically precedes it instead.
262 ///
263 /// # Examples
264 ///
265 /// ```rust
266 /// use rocksdb::{DB, Options};
267 ///
268 /// let path = "_path_for_rocksdb_storage8";
269 /// {
270 /// let db = DB::open_default(path).unwrap();
271 /// let mut iter = db.raw_iterator();
272 ///
273 /// // Read the last key that starts with 'a'
274 /// iter.seek_for_prev(b"b");
275 ///
276 /// if iter.valid() {
277 /// println!("{:?} {:?}", iter.key(), iter.value());
278 /// } else {
279 /// // There are no keys in the database
280 /// }
281 /// }
282 /// let _ = DB::destroy(&Options::default(), path);
283 /// ```
284 pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
285 let key = key.as_ref();
286
287 unsafe {
288 ffi::rocksdb_iter_seek_for_prev(
289 self.inner.as_ptr(),
290 key.as_ptr() as *const c_char,
291 key.len() as size_t,
292 );
293 }
294 }
295
296 /// Seeks to the next key.
297 pub fn next(&mut self) {
298 unsafe {
299 ffi::rocksdb_iter_next(self.inner.as_ptr());
300 }
301 }
302
303 /// Seeks to the previous key.
304 pub fn prev(&mut self) {
305 unsafe {
306 ffi::rocksdb_iter_prev(self.inner.as_ptr());
307 }
308 }
309
310 /// Returns a slice of the current key.
311 pub fn key(&self) -> Option<&[u8]> {
312 if self.valid() {
313 Some(self.key_impl())
314 } else {
315 None
316 }
317 }
318
319 /// Returns a slice of the current value.
320 pub fn value(&self) -> Option<&[u8]> {
321 if self.valid() {
322 Some(self.value_impl())
323 } else {
324 None
325 }
326 }
327
328 /// Returns pair with slice of the current key and current value.
329 pub fn item(&self) -> Option<(&[u8], &[u8])> {
330 if self.valid() {
331 Some((self.key_impl(), self.value_impl()))
332 } else {
333 None
334 }
335 }
336
337 /// Returns a slice of the current key; assumes the iterator is valid.
338 fn key_impl(&self) -> &[u8] {
339 // Safety Note: This is safe as all methods that may invalidate the buffer returned
340 // take `&mut self`, so borrow checker will prevent use of buffer after seek.
341 unsafe {
342 let mut key_len: size_t = 0;
343 let key_len_ptr: *mut size_t = &mut key_len;
344 let key_ptr = ffi::rocksdb_iter_key(self.inner.as_ptr(), key_len_ptr);
345 slice::from_raw_parts(key_ptr as *const c_uchar, key_len)
346 }
347 }
348
349 /// Returns a slice of the current value; assumes the iterator is valid.
350 fn value_impl(&self) -> &[u8] {
351 // Safety Note: This is safe as all methods that may invalidate the buffer returned
352 // take `&mut self`, so borrow checker will prevent use of buffer after seek.
353 unsafe {
354 let mut val_len: size_t = 0;
355 let val_len_ptr: *mut size_t = &mut val_len;
356 let val_ptr = ffi::rocksdb_iter_value(self.inner.as_ptr(), val_len_ptr);
357 slice::from_raw_parts(val_ptr as *const c_uchar, val_len)
358 }
359 }
360}
361
362impl<'a, D: DBAccess> Drop for DBRawIteratorWithThreadMode<'a, D> {
363 fn drop(&mut self) {
364 unsafe {
365 ffi::rocksdb_iter_destroy(self.inner.as_ptr());
366 }
367 }
368}
369
370unsafe impl<'a, D: DBAccess> Send for DBRawIteratorWithThreadMode<'a, D> {}
371unsafe impl<'a, D: DBAccess> Sync for DBRawIteratorWithThreadMode<'a, D> {}
372
373/// A type alias to keep compatibility. See [`DBIteratorWithThreadMode`] for details
374pub type DBIterator<'a> = DBIteratorWithThreadMode<'a, DB>;
375
376/// An iterator over a database or column family, with specifiable
377/// ranges and direction.
378///
379/// ```
380/// use rocksdb::{DB, Direction, IteratorMode, Options};
381///
382/// let path = "_path_for_rocksdb_storage2";
383/// {
384/// let db = DB::open_default(path).unwrap();
385/// let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward
386/// for item in iter {
387/// let (key, value) = item.unwrap();
388/// println!("Saw {:?} {:?}", key, value);
389/// }
390/// iter = db.iterator(IteratorMode::End); // Always iterates backward
391/// for item in iter {
392/// let (key, value) = item.unwrap();
393/// println!("Saw {:?} {:?}", key, value);
394/// }
395/// iter = db.iterator(IteratorMode::From(b"my key", Direction::Forward)); // From a key in Direction::{forward,reverse}
396/// for item in iter {
397/// let (key, value) = item.unwrap();
398/// println!("Saw {:?} {:?}", key, value);
399/// }
400///
401/// // You can seek with an existing Iterator instance, too
402/// iter = db.iterator(IteratorMode::Start);
403/// iter.set_mode(IteratorMode::From(b"another key", Direction::Reverse));
404/// for item in iter {
405/// let (key, value) = item.unwrap();
406/// println!("Saw {:?} {:?}", key, value);
407/// }
408/// }
409/// let _ = DB::destroy(&Options::default(), path);
410/// ```
411pub struct DBIteratorWithThreadMode<'a, D: DBAccess> {
412 raw: DBRawIteratorWithThreadMode<'a, D>,
413 direction: Direction,
414 done: bool,
415}
416
/// Direction in which an iterator steps after each item it yields.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Direction {
    /// Step to the next key.
    Forward,
    /// Step to the previous key.
    Reverse,
}

/// An owned `(key, value)` pair of byte buffers.
pub type KVBytes = (Box<[u8]>, Box<[u8]>);

/// Where an iterator starts and, implicitly, which way it runs.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum IteratorMode<'a> {
    /// Start at the first key and iterate forward.
    Start,
    /// Start at the last key and iterate in reverse.
    End,
    /// Seek relative to the given key and iterate in the given direction.
    ///
    /// With [`Direction::Forward`] iteration starts at the key or the next
    /// key after it; with [`Direction::Reverse`] it starts at the key or the
    /// closest key before it.
    From(&'a [u8], Direction),
}
431
432impl<'a, D: DBAccess> DBIteratorWithThreadMode<'a, D> {
433 pub(crate) fn new(db: &D, readopts: ReadOptions, mode: IteratorMode) -> Self {
434 Self::from_raw(DBRawIteratorWithThreadMode::new(db, readopts), mode)
435 }
436
437 pub(crate) fn new_cf(
438 db: &'a D,
439 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
440 readopts: ReadOptions,
441 mode: IteratorMode,
442 ) -> Self {
443 Self::from_raw(
444 DBRawIteratorWithThreadMode::new_cf(db, cf_handle, readopts),
445 mode,
446 )
447 }
448
449 fn from_raw(raw: DBRawIteratorWithThreadMode<'a, D>, mode: IteratorMode) -> Self {
450 let mut rv = DBIteratorWithThreadMode {
451 raw,
452 direction: Direction::Forward, // blown away by set_mode()
453 done: false,
454 };
455 rv.set_mode(mode);
456 rv
457 }
458
459 pub fn set_mode(&mut self, mode: IteratorMode) {
460 self.done = false;
461 self.direction = match mode {
462 IteratorMode::Start => {
463 self.raw.seek_to_first();
464 Direction::Forward
465 }
466 IteratorMode::End => {
467 self.raw.seek_to_last();
468 Direction::Reverse
469 }
470 IteratorMode::From(key, Direction::Forward) => {
471 self.raw.seek(key);
472 Direction::Forward
473 }
474 IteratorMode::From(key, Direction::Reverse) => {
475 self.raw.seek_for_prev(key);
476 Direction::Reverse
477 }
478 };
479 }
480}
481
482impl<'a, D: DBAccess> Iterator for DBIteratorWithThreadMode<'a, D> {
483 type Item = Result<KVBytes, Error>;
484
485 fn next(&mut self) -> Option<Result<KVBytes, Error>> {
486 if self.done {
487 None
488 } else if let Some((key, value)) = self.raw.item() {
489 let item = (Box::from(key), Box::from(value));
490 match self.direction {
491 Direction::Forward => self.raw.next(),
492 Direction::Reverse => self.raw.prev(),
493 }
494 Some(Ok(item))
495 } else {
496 self.done = true;
497 self.raw.status().err().map(Result::Err)
498 }
499 }
500}
501
502impl<'a, D: DBAccess> std::iter::FusedIterator for DBIteratorWithThreadMode<'a, D> {}
503
504impl<'a, D: DBAccess> Into<DBRawIteratorWithThreadMode<'a, D>> for DBIteratorWithThreadMode<'a, D> {
505 fn into(self) -> DBRawIteratorWithThreadMode<'a, D> {
506 self.raw
507 }
508}
509
510/// Iterates the batches of writes since a given sequence number.
511///
512/// `DBWALIterator` is returned by `DB::get_updates_since()` and will return the
513/// batches of write operations that have occurred since a given sequence number
514/// (see `DB::latest_sequence_number()`). This iterator cannot be constructed by
515/// the application.
516///
517/// The iterator item type is a tuple of (`u64`, `WriteBatch`) where the first
518/// value is the sequence number of the associated write batch.
519///
520pub struct DBWALIterator {
521 pub(crate) inner: *mut ffi::rocksdb_wal_iterator_t,
522 pub(crate) start_seq_number: u64,
523}
524
525impl DBWALIterator {
526 /// Returns `true` if the iterator is valid. An iterator is invalidated when
527 /// it reaches the end of its defined range, or when it encounters an error.
528 ///
529 /// To check whether the iterator encountered an error after `valid` has
530 /// returned `false`, use the [`status`](DBWALIterator::status) method.
531 /// `status` will never return an error when `valid` is `true`.
532 pub fn valid(&self) -> bool {
533 unsafe { ffi::rocksdb_wal_iter_valid(self.inner) != 0 }
534 }
535
536 /// Returns an error `Result` if the iterator has encountered an error
537 /// during operation. When an error is encountered, the iterator is
538 /// invalidated and [`valid`](DBWALIterator::valid) will return `false` when
539 /// called.
540 pub fn status(&self) -> Result<(), Error> {
541 unsafe {
542 ffi_try!(ffi::rocksdb_wal_iter_status(self.inner));
543 }
544 Ok(())
545 }
546}
547
548impl Iterator for DBWALIterator {
549 type Item = Result<(u64, WriteBatch), Error>;
550
551 fn next(&mut self) -> Option<Self::Item> {
552 if !self.valid() {
553 return None;
554 }
555
556 let mut seq: u64 = 0;
557 let mut batch = WriteBatch {
558 inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
559 };
560
561 // if the initial sequence number is what was requested we skip it to
562 // only provide changes *after* it
563 if seq == self.start_seq_number {
564 unsafe {
565 ffi::rocksdb_wal_iter_next(self.inner);
566 }
567
568 if !self.valid() {
569 return None;
570 }
571
572 // this drops which in turn frees the skipped batch
573 batch = WriteBatch {
574 inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
575 };
576 }
577
578 if !self.valid() {
579 return self.status().err().map(Result::Err);
580 }
581
582 // Seek to the next write batch.
583 // Note that WriteBatches live independently of the WAL iterator so this is safe to do
584 unsafe {
585 ffi::rocksdb_wal_iter_next(self.inner);
586 }
587
588 Some(Ok((seq, batch)))
589 }
590}
591
592impl Drop for DBWALIterator {
593 fn drop(&mut self) {
594 unsafe {
595 ffi::rocksdb_wal_iter_destroy(self.inner);
596 }
597 }
598}