rusty_leveldb/
db_iter.rs

1use crate::cmp::Cmp;
2use crate::key_types::{parse_internal_key, truncate_to_userkey, LookupKey, ValueType};
3use crate::merging_iter::MergingIter;
4use crate::snapshot::Snapshot;
5use crate::types::{Direction, LdbIterator, Shared};
6use crate::version_set::VersionSet;
7
8use bytes::Bytes;
9use std::cmp::Ordering;
10use std::mem;
11use std::rc::Rc;
12
13const READ_BYTES_PERIOD: isize = 1048576;
14
15/// DBIterator is an iterator over the contents of a database.
16pub struct DBIterator {
17    // A user comparator.
18    cmp: Rc<Box<dyn Cmp>>,
19    vset: Shared<VersionSet>,
20    iter: MergingIter,
21    // By holding onto a snapshot, we make sure that the iterator iterates over the state at the
22    // point of its creation.
23    ss: Snapshot,
24    dir: Direction,
25    byte_count: isize,
26
27    valid: bool,
28    // temporarily stored user key.
29    savedkey: Vec<u8>,
30    // buffer for reading internal keys
31    keybuf: Vec<u8>,
32    savedval: Vec<u8>,
33    valbuf: Vec<u8>,
34}
35
36impl DBIterator {
37    pub fn new(
38        cmp: Rc<Box<dyn Cmp>>,
39        vset: Shared<VersionSet>,
40        iter: MergingIter,
41        ss: Snapshot,
42    ) -> DBIterator {
43        DBIterator {
44            cmp,
45            vset,
46            iter,
47            ss,
48            dir: Direction::Forward,
49            byte_count: random_period(),
50
51            valid: false,
52            savedkey: vec![],
53            keybuf: vec![],
54            savedval: vec![],
55            valbuf: vec![],
56        }
57    }
58
59    /// record_read_sample records a read sample using the current contents of self.keybuf, which
60    /// should be an InternalKey.
61    fn record_read_sample(&mut self, len: usize) {
62        self.byte_count -= len as isize;
63        if self.byte_count < 0 {
64            let v = self.vset.borrow().current();
65            v.borrow_mut().record_read_sample(&self.keybuf);
66            while self.byte_count < 0 {
67                self.byte_count += random_period();
68            }
69        }
70    }
71
72    /// find_next_user_entry skips to the next user entry after the one saved in self.savedkey.
73    fn find_next_user_entry(&mut self, mut skipping: bool) -> bool {
74        assert!(self.iter.valid());
75        assert!(self.dir == Direction::Forward);
76
77        while self.iter.valid() {
78            if let Some((key_bytes, val_bytes)) = self.iter.current() {
79                self.keybuf = key_bytes.to_vec();
80                self.savedval = val_bytes.to_vec();
81                let len = self.keybuf.len() + self.savedval.len();
82                self.record_read_sample(len);
83                let (typ, seq, ukey) = parse_internal_key(&self.keybuf);
84
85                // Skip keys with a sequence number after our snapshot.
86                if seq <= self.ss.sequence() {
87                    if typ == ValueType::TypeDeletion {
88                        // Mark current (deleted) key to be skipped.
89                        self.savedkey.clear();
90                        self.savedkey.extend_from_slice(ukey);
91                        skipping = true;
92                    } else if typ == ValueType::TypeValue {
93                        if skipping && self.cmp.cmp(ukey, &self.savedkey) <= Ordering::Equal {
94                            // Entry hidden, because it's smaller than the key to be skipped.
95                        } else {
96                            self.valid = true;
97                            self.savedkey.clear();
98                            return true;
99                        }
100                    }
101                }
102            }
103            self.iter.advance();
104        }
105        self.savedkey.clear();
106        self.valid = false;
107        false
108    }
109
110    /// find_prev_user_entry, on a backwards-moving iterator, stores the newest non-deleted version
111    /// of the entry with the key == self.savedkey that is in the current snapshot, into
112    /// savedkey/savedval.
113    fn find_prev_user_entry(&mut self) -> bool {
114        assert!(self.dir == Direction::Reverse);
115        let mut value_type = ValueType::TypeDeletion;
116
117        // The iterator should be already set to the previous entry if this is a direction change
118        // (i.e. first prev() call after advance()). savedkey is set to the key of that entry.
119        //
120        // We read the current entry, ignore it for comparison (because the initial value_type is
121        // Deletion), assign it to savedkey and savedval and go back another step (at the end of
122        // the loop).
123        //
124        // We repeat this until we hit the first entry with a different user key (possibly going
125        // through newer versions of the same key, because the newest entry is first in order),
126        // then break. The key and value of the latest entry for the desired key have been stored
127        // in the previous iteration to savedkey and savedval.
128        while self.iter.valid() {
129            if let Some((key_bytes, val_bytes)) = self.iter.current() {
130                self.keybuf = key_bytes.to_vec();
131                self.valbuf = val_bytes.to_vec();
132                let len = self.keybuf.len() + self.valbuf.len();
133                self.record_read_sample(len);
134                let (typ, seq, ukey) = parse_internal_key(&self.keybuf);
135
136                if seq > 0 && seq <= self.ss.sequence() {
137                    if value_type != ValueType::TypeDeletion
138                        && self.cmp.cmp(ukey, &self.savedkey) == Ordering::Less
139                    {
140                        // We found a non-deleted entry for a previous key (in the previous iteration)
141                        break;
142                    }
143                    value_type = typ;
144                    if value_type == ValueType::TypeDeletion {
145                        self.savedkey.clear();
146                        self.savedval.clear();
147                    } else {
148                        self.savedkey.clear();
149                        self.savedkey.extend_from_slice(ukey);
150
151                        mem::swap(&mut self.savedval, &mut self.valbuf);
152                    }
153                }
154            }
155            self.iter.prev();
156        }
157
158        if value_type == ValueType::TypeDeletion {
159            self.valid = false;
160            self.savedkey.clear();
161            self.savedval.clear();
162            self.dir = Direction::Forward;
163        } else {
164            self.valid = true;
165        }
166        true
167    }
168}
169
170impl LdbIterator for DBIterator {
171    fn advance(&mut self) -> bool {
172        if !self.valid() {
173            self.seek_to_first();
174            return self.valid();
175        }
176
177        if self.dir == Direction::Reverse {
178            self.dir = Direction::Forward;
179            if !self.iter.valid() {
180                self.iter.seek_to_first();
181            } else {
182                self.iter.advance();
183            }
184            if !self.iter.valid() {
185                self.valid = false;
186                self.savedkey.clear();
187                return false;
188            }
189        } else {
190            // Save current user key.
191            if let Some((key_bytes, val_bytes)) = self.iter.current() {
192                self.savedkey = key_bytes.to_vec();
193                self.savedval = val_bytes.to_vec();
194                truncate_to_userkey(&mut self.savedkey);
195            } else {
196                panic!("Iterator should be valid here");
197            }
198        }
199        self.find_next_user_entry(
200            // skipping=
201            true,
202        )
203    }
204    fn current(&self) -> Option<(Bytes, Bytes)> {
205        if !self.valid() {
206            return None;
207        }
208        // If direction is forward, savedkey and savedval are not used.
209        if self.dir == Direction::Forward {
210            if let Some((key_bytes, val_bytes)) = self.iter.current() {
211                let mut key = key_bytes.to_vec();
212                truncate_to_userkey(&mut key);
213                Some((Bytes::from(key), val_bytes))
214            } else {
215                None
216            }
217        } else {
218            Some((
219                Bytes::copy_from_slice(&self.savedkey),
220                Bytes::copy_from_slice(&self.savedval),
221            ))
222        }
223    }
224    fn prev(&mut self) -> bool {
225        if !self.valid() {
226            return false;
227        }
228
229        if self.dir == Direction::Forward {
230            // scan backwards until we hit a different key; then use the normal scanning procedure:
231            // find_prev_user_entry() wants savedkey to be the key of the entry that is supposed to
232            // be left in savedkey/savedval, which is why we have to go to the previous entry before
233            // calling it.
234            if let Some((key_bytes, val_bytes)) = self.iter.current() {
235                self.savedkey = key_bytes.to_vec();
236                self.savedval = val_bytes.to_vec();
237                truncate_to_userkey(&mut self.savedkey);
238            }
239            loop {
240                self.iter.prev();
241                if !self.iter.valid() {
242                    self.valid = false;
243                    self.savedkey.clear();
244                    self.savedval.clear();
245                    return false;
246                }
247                // Scan until we hit the next-smaller key.
248                if let Some((key_bytes, _val_bytes)) = self.iter.current() {
249                    self.keybuf = key_bytes.to_vec();
250                    truncate_to_userkey(&mut self.keybuf);
251                }
252                if self.cmp.cmp(&self.keybuf, &self.savedkey) == Ordering::Less {
253                    break;
254                }
255            }
256            self.dir = Direction::Reverse;
257        }
258        self.find_prev_user_entry()
259    }
260    fn valid(&self) -> bool {
261        self.valid
262    }
263    fn seek(&mut self, to: &[u8]) {
264        self.dir = Direction::Forward;
265        self.savedkey.clear();
266        self.savedval.clear();
267        self.savedkey
268            .extend_from_slice(LookupKey::new(to, self.ss.sequence()).internal_key());
269        self.iter.seek(&self.savedkey);
270        if self.iter.valid() {
271            self.find_next_user_entry(
272                // skipping=
273                false,
274            );
275        } else {
276            self.valid = false;
277        }
278    }
279    fn seek_to_first(&mut self) {
280        self.dir = Direction::Forward;
281        self.savedval.clear();
282        self.iter.seek_to_first();
283        if self.iter.valid() {
284            self.find_next_user_entry(
285                // skipping=
286                false,
287            );
288        } else {
289            self.valid = false;
290        }
291    }
292    fn reset(&mut self) {
293        self.iter.reset();
294        self.valid = false;
295        self.savedkey.clear();
296        self.savedval.clear();
297        self.keybuf.clear();
298    }
299}
300
301fn random_period() -> isize {
302    rand::random::<isize>() % (2 * READ_BYTES_PERIOD)
303}
304
305#[cfg(test)]
306mod tests {
307    use super::*;
308    use crate::db_impl::testutil::*;
309    use crate::db_impl::DB;
310    use crate::options;
311    use crate::test_util::LdbIteratorIter;
312    use crate::types::{current_key_val, Direction};
313
314    use std::collections::HashMap;
315    use std::collections::HashSet;
316    use std::iter::FromIterator;
317
318    #[test]
319    fn db_iter_basic_test() {
320        let mut db = build_db().0;
321        let mut iter = db.new_iter().unwrap();
322
323        // keys and values come from make_version(); they are each the latest entry.
324        let keys: &[&[u8]] = &[
325            b"aaa", b"aab", b"aax", b"aba", b"bab", b"bba", b"cab", b"cba",
326        ];
327        let vals: &[&[u8]] = &[
328            b"val1", b"val2", b"val2", b"val3", b"val4", b"val5", b"val2", b"val3",
329        ];
330
331        let mut found = vec![];
332        for (k, v) in keys.iter().zip(vals.iter()) {
333            assert!(iter.advance());
334            assert_eq!((k.to_vec(), v.to_vec()), current_key_val(&iter).unwrap());
335            let entry = db.get(k).expect("key returned by iterator is in database");
336            assert_eq!(v.to_vec(), entry);
337            found.push((k.to_vec(), v.to_vec()));
338        }
339
340        assert_eq!(found.len(), keys.len());
341    }
342
343    #[test]
344    fn db_iter_reset() {
345        let mut db = build_db().0;
346        let mut iter = db.new_iter().unwrap();
347
348        assert!(iter.advance());
349        assert!(iter.valid());
350        iter.reset();
351        assert!(!iter.valid());
352        assert!(iter.advance());
353        assert!(iter.valid());
354    }
355
356    #[test]
357    fn db_iter_test_fwd_backwd() {
358        let mut db = build_db().0;
359        let mut iter = db.new_iter().unwrap();
360
361        // keys and values come from make_version(); they are each the latest entry.
362        let keys: &[&[u8]] = &[
363            b"aaa", b"aab", b"aax", b"aba", b"bab", b"bba", b"cab", b"cba",
364        ];
365        let vals: &[&[u8]] = &[
366            b"val1", b"val2", b"val2", b"val3", b"val4", b"val5", b"val2", b"val3",
367        ];
368
369        // This specifies the direction that the iterator should move to. Based on this, an index
370        // into keys/vals is incremented/decremented so that we get a nice test checking iterator
371        // move correctness.
372        let dirs: &[Direction] = &[
373            Direction::Forward,
374            Direction::Forward,
375            Direction::Forward,
376            Direction::Reverse,
377            Direction::Reverse,
378            Direction::Reverse,
379            Direction::Forward,
380            Direction::Forward,
381            Direction::Reverse,
382            Direction::Forward,
383            Direction::Forward,
384            Direction::Forward,
385            Direction::Forward,
386        ];
387        let mut i = 0;
388        iter.advance();
389        for d in dirs {
390            assert_eq!(
391                (keys[i].to_vec(), vals[i].to_vec()),
392                current_key_val(&iter).unwrap()
393            );
394            match *d {
395                Direction::Forward => {
396                    assert!(iter.advance());
397                    i += 1;
398                }
399                Direction::Reverse => {
400                    assert!(iter.prev());
401                    i -= 1;
402                }
403            }
404        }
405    }
406
407    #[test]
408    fn db_iter_test_seek() {
409        let mut db = build_db().0;
410        let mut iter = db.new_iter().unwrap();
411
412        // gca is the deleted entry.
413        let keys: &[&[u8]] = &[b"aab", b"aaa", b"cab", b"eaa", b"aaa", b"iba", b"fba"];
414        let vals: &[&[u8]] = &[
415            b"val2", b"val1", b"val2", b"val1", b"val1", b"val2", b"val3",
416        ];
417
418        for (k, v) in keys.iter().zip(vals.iter()) {
419            eprintln!("{:?}", String::from_utf8(k.to_vec()).unwrap());
420            iter.seek(k);
421            assert_eq!((k.to_vec(), v.to_vec()), current_key_val(&iter).unwrap());
422        }
423
424        // seek past last.
425        iter.seek(b"xxx");
426        assert!(!iter.valid());
427        iter.seek(b"aab");
428        assert!(iter.valid());
429
430        // Seek skips over deleted entry.
431        iter.seek(b"gca");
432        assert!(iter.valid());
433        assert_eq!(
434            (b"gda".to_vec(), b"val5".to_vec()),
435            current_key_val(&iter).unwrap()
436        );
437    }
438
439    #[test]
440    fn db_iter_deleted_entry_not_returned() {
441        let mut db = build_db().0;
442        let mut iter = db.new_iter().unwrap();
443        let must_not_appear = b"gca";
444
445        for (k, _) in LdbIteratorIter::wrap(&mut iter) {
446            assert!(k.as_slice() != must_not_appear);
447        }
448    }
449
450    #[test]
451    fn db_iter_deleted_entry_not_returned_memtable() {
452        let mut db = build_db().0;
453
454        db.put(b"xyz", b"123").unwrap();
455        db.delete(b"xyz").unwrap();
456
457        let mut iter = db.new_iter().unwrap();
458        let must_not_appear = b"xyz";
459
460        for (k, _) in LdbIteratorIter::wrap(&mut iter) {
461            assert!(k.as_slice() != must_not_appear);
462        }
463    }
464
465    #[test]
466    fn db_iter_repeated_open_close() {
467        let opt;
468        {
469            let (mut db, opt_) = build_db();
470            opt = opt_;
471
472            db.put(b"xx1", b"111").unwrap();
473            db.put(b"xx2", b"112").unwrap();
474            db.put(b"xx3", b"113").unwrap();
475            db.put(b"xx4", b"114").unwrap();
476            db.delete(b"xx2").unwrap();
477        }
478
479        {
480            let mut db = DB::open("db", opt.clone()).unwrap();
481            db.put(b"xx4", b"222").unwrap();
482        }
483
484        {
485            let mut db = DB::open("db", opt).unwrap();
486
487            let ss = db.get_snapshot();
488            // xx5 should not be visible.
489            db.put(b"xx5", b"223").unwrap();
490
491            let expected: HashMap<Vec<u8>, Vec<u8>> = HashMap::from_iter(vec![
492                (b"xx1".to_vec(), b"111".to_vec()),
493                (b"xx4".to_vec(), b"222".to_vec()),
494                (b"aaa".to_vec(), b"val1".to_vec()),
495                (b"cab".to_vec(), b"val2".to_vec()),
496            ]);
497            let non_existing: HashSet<Vec<u8>> =
498                HashSet::from_iter(vec![b"gca".to_vec(), b"xx2".to_vec(), b"xx5".to_vec()]);
499
500            let mut iter = db.new_iter_at(ss.clone()).unwrap();
501            for (k, v) in LdbIteratorIter::wrap(&mut iter) {
502                if let Some(ev) = expected.get(&k) {
503                    assert_eq!(ev, &v);
504                }
505                assert!(!non_existing.contains(&k));
506            }
507        }
508    }
509
510    #[test]
511    fn db_iter_allow_empty_key() {
512        let opt = options::for_test();
513        let mut db = DB::open("db", opt).unwrap();
514        assert!(db.new_iter().unwrap().next().is_none());
515        db.put(&[], &[]).unwrap();
516        assert!(db.new_iter().unwrap().next().is_some());
517    }
518}