1use crate::cmp::Cmp;
2use crate::key_types::{parse_internal_key, truncate_to_userkey, LookupKey, ValueType};
3use crate::merging_iter::MergingIter;
4use crate::snapshot::Snapshot;
5use crate::types::{Direction, LdbIterator, Shared};
6use crate::version_set::VersionSet;
7
8use bytes::Bytes;
9use std::cmp::Ordering;
10use std::mem;
11use std::rc::Rc;
12
// Scale of the read-sampling interval, in bytes (1 MiB). `random_period()` draws values whose
// magnitude is bounded by twice this constant.
const READ_BYTES_PERIOD: isize = 1048576;
14
/// Iterator state for walking the entries yielded by a `MergingIter`, filtered by a `Snapshot`.
///
/// The keys read from `iter` are parsed with `parse_internal_key`; the keys handed out by
/// `current()` have been passed through `truncate_to_userkey`.
pub struct DBIterator {
    // Comparator applied to user keys (the key part returned by `parse_internal_key`).
    cmp: Rc<Box<dyn Cmp>>,
    // Version set; its current version is handed read samples by `record_read_sample`.
    vset: Shared<VersionSet>,
    // Underlying iterator that produces the raw entries.
    iter: MergingIter,
    // Entries whose sequence number is greater than `ss.sequence()` are ignored.
    ss: Snapshot,
    // Direction the iterator moved in last.
    dir: Direction,
    // Bytes left to read before the next read sample is recorded.
    byte_count: isize,

    // True while the iterator is positioned on an entry that `current()` can return.
    valid: bool,
    // Saved user key: the key to skip past while moving forward, or the key of the current
    // entry while moving in reverse.
    savedkey: Vec<u8>,
    // Most recent key read from `iter`.
    keybuf: Vec<u8>,
    // Most recent value read while moving forward, or the value of the current entry while
    // moving in reverse.
    savedval: Vec<u8>,
    // Scratch buffer for values read while scanning in reverse.
    valbuf: Vec<u8>,
}
35
36impl DBIterator {
37 pub fn new(
38 cmp: Rc<Box<dyn Cmp>>,
39 vset: Shared<VersionSet>,
40 iter: MergingIter,
41 ss: Snapshot,
42 ) -> DBIterator {
43 DBIterator {
44 cmp,
45 vset,
46 iter,
47 ss,
48 dir: Direction::Forward,
49 byte_count: random_period(),
50
51 valid: false,
52 savedkey: vec![],
53 keybuf: vec![],
54 savedval: vec![],
55 valbuf: vec![],
56 }
57 }
58
59 fn record_read_sample(&mut self, len: usize) {
62 self.byte_count -= len as isize;
63 if self.byte_count < 0 {
64 let v = self.vset.borrow().current();
65 v.borrow_mut().record_read_sample(&self.keybuf);
66 while self.byte_count < 0 {
67 self.byte_count += random_period();
68 }
69 }
70 }
71
72 fn find_next_user_entry(&mut self, mut skipping: bool) -> bool {
74 assert!(self.iter.valid());
75 assert!(self.dir == Direction::Forward);
76
77 while self.iter.valid() {
78 if let Some((key_bytes, val_bytes)) = self.iter.current() {
79 self.keybuf = key_bytes.to_vec();
80 self.savedval = val_bytes.to_vec();
81 let len = self.keybuf.len() + self.savedval.len();
82 self.record_read_sample(len);
83 let (typ, seq, ukey) = parse_internal_key(&self.keybuf);
84
85 if seq <= self.ss.sequence() {
87 if typ == ValueType::TypeDeletion {
88 self.savedkey.clear();
90 self.savedkey.extend_from_slice(ukey);
91 skipping = true;
92 } else if typ == ValueType::TypeValue {
93 if skipping && self.cmp.cmp(ukey, &self.savedkey) <= Ordering::Equal {
94 } else {
96 self.valid = true;
97 self.savedkey.clear();
98 return true;
99 }
100 }
101 }
102 }
103 self.iter.advance();
104 }
105 self.savedkey.clear();
106 self.valid = false;
107 false
108 }
109
110 fn find_prev_user_entry(&mut self) -> bool {
114 assert!(self.dir == Direction::Reverse);
115 let mut value_type = ValueType::TypeDeletion;
116
117 while self.iter.valid() {
129 if let Some((key_bytes, val_bytes)) = self.iter.current() {
130 self.keybuf = key_bytes.to_vec();
131 self.valbuf = val_bytes.to_vec();
132 let len = self.keybuf.len() + self.valbuf.len();
133 self.record_read_sample(len);
134 let (typ, seq, ukey) = parse_internal_key(&self.keybuf);
135
136 if seq > 0 && seq <= self.ss.sequence() {
137 if value_type != ValueType::TypeDeletion
138 && self.cmp.cmp(ukey, &self.savedkey) == Ordering::Less
139 {
140 break;
142 }
143 value_type = typ;
144 if value_type == ValueType::TypeDeletion {
145 self.savedkey.clear();
146 self.savedval.clear();
147 } else {
148 self.savedkey.clear();
149 self.savedkey.extend_from_slice(ukey);
150
151 mem::swap(&mut self.savedval, &mut self.valbuf);
152 }
153 }
154 }
155 self.iter.prev();
156 }
157
158 if value_type == ValueType::TypeDeletion {
159 self.valid = false;
160 self.savedkey.clear();
161 self.savedval.clear();
162 self.dir = Direction::Forward;
163 } else {
164 self.valid = true;
165 }
166 true
167 }
168}
169
170impl LdbIterator for DBIterator {
171 fn advance(&mut self) -> bool {
172 if !self.valid() {
173 self.seek_to_first();
174 return self.valid();
175 }
176
177 if self.dir == Direction::Reverse {
178 self.dir = Direction::Forward;
179 if !self.iter.valid() {
180 self.iter.seek_to_first();
181 } else {
182 self.iter.advance();
183 }
184 if !self.iter.valid() {
185 self.valid = false;
186 self.savedkey.clear();
187 return false;
188 }
189 } else {
190 if let Some((key_bytes, val_bytes)) = self.iter.current() {
192 self.savedkey = key_bytes.to_vec();
193 self.savedval = val_bytes.to_vec();
194 truncate_to_userkey(&mut self.savedkey);
195 } else {
196 panic!("Iterator should be valid here");
197 }
198 }
199 self.find_next_user_entry(
200 true,
202 )
203 }
204 fn current(&self) -> Option<(Bytes, Bytes)> {
205 if !self.valid() {
206 return None;
207 }
208 if self.dir == Direction::Forward {
210 if let Some((key_bytes, val_bytes)) = self.iter.current() {
211 let mut key = key_bytes.to_vec();
212 truncate_to_userkey(&mut key);
213 Some((Bytes::from(key), val_bytes))
214 } else {
215 None
216 }
217 } else {
218 Some((
219 Bytes::copy_from_slice(&self.savedkey),
220 Bytes::copy_from_slice(&self.savedval),
221 ))
222 }
223 }
224 fn prev(&mut self) -> bool {
225 if !self.valid() {
226 return false;
227 }
228
229 if self.dir == Direction::Forward {
230 if let Some((key_bytes, val_bytes)) = self.iter.current() {
235 self.savedkey = key_bytes.to_vec();
236 self.savedval = val_bytes.to_vec();
237 truncate_to_userkey(&mut self.savedkey);
238 }
239 loop {
240 self.iter.prev();
241 if !self.iter.valid() {
242 self.valid = false;
243 self.savedkey.clear();
244 self.savedval.clear();
245 return false;
246 }
247 if let Some((key_bytes, _val_bytes)) = self.iter.current() {
249 self.keybuf = key_bytes.to_vec();
250 truncate_to_userkey(&mut self.keybuf);
251 }
252 if self.cmp.cmp(&self.keybuf, &self.savedkey) == Ordering::Less {
253 break;
254 }
255 }
256 self.dir = Direction::Reverse;
257 }
258 self.find_prev_user_entry()
259 }
260 fn valid(&self) -> bool {
261 self.valid
262 }
263 fn seek(&mut self, to: &[u8]) {
264 self.dir = Direction::Forward;
265 self.savedkey.clear();
266 self.savedval.clear();
267 self.savedkey
268 .extend_from_slice(LookupKey::new(to, self.ss.sequence()).internal_key());
269 self.iter.seek(&self.savedkey);
270 if self.iter.valid() {
271 self.find_next_user_entry(
272 false,
274 );
275 } else {
276 self.valid = false;
277 }
278 }
279 fn seek_to_first(&mut self) {
280 self.dir = Direction::Forward;
281 self.savedval.clear();
282 self.iter.seek_to_first();
283 if self.iter.valid() {
284 self.find_next_user_entry(
285 false,
287 );
288 } else {
289 self.valid = false;
290 }
291 }
292 fn reset(&mut self) {
293 self.iter.reset();
294 self.valid = false;
295 self.savedkey.clear();
296 self.savedval.clear();
297 self.keybuf.clear();
298 }
299}
300
301fn random_period() -> isize {
302 rand::random::<isize>() % (2 * READ_BYTES_PERIOD)
303}
304
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db_impl::testutil::*;
    use crate::db_impl::DB;
    use crate::options;
    use crate::test_util::LdbIteratorIter;
    use crate::types::{current_key_val, Direction};

    use std::collections::{HashMap, HashSet};

    /// Walks forward over the first entries of the test database and cross-checks every entry
    /// against a point lookup.
    #[test]
    fn db_iter_basic_test() {
        let mut db = build_db().0;
        let mut it = db.new_iter().unwrap();

        let want_keys: &[&[u8]] = &[
            b"aaa", b"aab", b"aax", b"aba", b"bab", b"bba", b"cab", b"cba",
        ];
        let want_vals: &[&[u8]] = &[
            b"val1", b"val2", b"val2", b"val3", b"val4", b"val5", b"val2", b"val3",
        ];

        let mut visited = 0;
        for (&key, &val) in want_keys.iter().zip(want_vals) {
            assert!(it.advance());
            assert_eq!((key.to_vec(), val.to_vec()), current_key_val(&it).unwrap());
            let stored = db.get(key).expect("key returned by iterator is in database");
            assert_eq!(val.to_vec(), stored);
            visited += 1;
        }

        assert_eq!(visited, want_keys.len());
    }

    /// `reset()` invalidates the iterator; a following `advance()` makes it valid again.
    #[test]
    fn db_iter_reset() {
        let mut db = build_db().0;
        let mut it = db.new_iter().unwrap();

        assert!(it.advance());
        assert!(it.valid());

        it.reset();
        assert!(!it.valid());

        assert!(it.advance());
        assert!(it.valid());
    }

    /// Mixes forward and backward steps and checks the entry seen before every step.
    #[test]
    fn db_iter_test_fwd_backwd() {
        let mut db = build_db().0;
        let mut it = db.new_iter().unwrap();

        let want_keys: &[&[u8]] = &[
            b"aaa", b"aab", b"aax", b"aba", b"bab", b"bba", b"cab", b"cba",
        ];
        let want_vals: &[&[u8]] = &[
            b"val1", b"val2", b"val2", b"val3", b"val4", b"val5", b"val2", b"val3",
        ];

        let moves: &[Direction] = &[
            Direction::Forward,
            Direction::Forward,
            Direction::Forward,
            Direction::Reverse,
            Direction::Reverse,
            Direction::Reverse,
            Direction::Forward,
            Direction::Forward,
            Direction::Reverse,
            Direction::Forward,
            Direction::Forward,
            Direction::Forward,
            Direction::Forward,
        ];

        // Index of the entry the iterator is expected to be positioned on.
        let mut pos = 0;
        it.advance();
        for step in moves.iter() {
            let want = (want_keys[pos].to_vec(), want_vals[pos].to_vec());
            assert_eq!(want, current_key_val(&it).unwrap());

            match step {
                Direction::Forward => {
                    assert!(it.advance());
                    pos += 1;
                }
                Direction::Reverse => {
                    assert!(it.prev());
                    pos -= 1;
                }
            }
        }
    }

    /// Seeks to existing keys in arbitrary order, past the last key, and to a key that is not
    /// returned itself.
    #[test]
    fn db_iter_test_seek() {
        let mut db = build_db().0;
        let mut it = db.new_iter().unwrap();

        let targets: &[&[u8]] = &[b"aab", b"aaa", b"cab", b"eaa", b"aaa", b"iba", b"fba"];
        let want_vals: &[&[u8]] = &[
            b"val2", b"val1", b"val2", b"val1", b"val1", b"val2", b"val3",
        ];

        for (&target, &val) in targets.iter().zip(want_vals) {
            eprintln!("{:?}", String::from_utf8(target.to_vec()).unwrap());
            it.seek(target);
            assert_eq!(
                (target.to_vec(), val.to_vec()),
                current_key_val(&it).unwrap()
            );
        }

        // Seeking to "xxx" leaves the iterator invalid; a later seek recovers.
        it.seek(b"xxx");
        assert!(!it.valid());
        it.seek(b"aab");
        assert!(it.valid());

        // Seeking to "gca" lands on "gda".
        it.seek(b"gca");
        assert!(it.valid());
        let want = (b"gda".to_vec(), b"val5".to_vec());
        assert_eq!(want, current_key_val(&it).unwrap());
    }

    /// The key "gca" must never be produced by a full scan of the test database.
    #[test]
    fn db_iter_deleted_entry_not_returned() {
        let mut db = build_db().0;
        let mut it = db.new_iter().unwrap();
        let forbidden = b"gca";

        for (key, _) in LdbIteratorIter::wrap(&mut it) {
            assert!(key.as_slice() != forbidden);
        }
    }

    /// A key that is put and then deleted before the iterator is created must not show up.
    #[test]
    fn db_iter_deleted_entry_not_returned_memtable() {
        let mut db = build_db().0;

        db.put(b"xyz", b"123").unwrap();
        db.delete(b"xyz").unwrap();

        let mut it = db.new_iter().unwrap();
        let forbidden = b"xyz";

        for (key, _) in LdbIteratorIter::wrap(&mut it) {
            assert!(key.as_slice() != forbidden);
        }
    }

    /// Writes across several open/close cycles and then iterates at a snapshot taken before the
    /// last write.
    #[test]
    fn db_iter_repeated_open_close() {
        // First session: build the database and add a few extra keys, deleting one again.
        let opt = {
            let (mut db, opt) = build_db();

            db.put(b"xx1", b"111").unwrap();
            db.put(b"xx2", b"112").unwrap();
            db.put(b"xx3", b"113").unwrap();
            db.put(b"xx4", b"114").unwrap();
            db.delete(b"xx2").unwrap();

            opt
        };

        // Second session: overwrite one of the keys.
        {
            let mut db = DB::open("db", opt.clone()).unwrap();
            db.put(b"xx4", b"222").unwrap();
        }

        // Third session: snapshot, write once more, then iterate at the snapshot.
        {
            let mut db = DB::open("db", opt).unwrap();

            let ss = db.get_snapshot();
            db.put(b"xx5", b"223").unwrap();

            let expected: HashMap<Vec<u8>, Vec<u8>> = vec![
                (b"xx1".to_vec(), b"111".to_vec()),
                (b"xx4".to_vec(), b"222".to_vec()),
                (b"aaa".to_vec(), b"val1".to_vec()),
                (b"cab".to_vec(), b"val2".to_vec()),
            ]
            .into_iter()
            .collect();
            let non_existing: HashSet<Vec<u8>> =
                vec![b"gca".to_vec(), b"xx2".to_vec(), b"xx5".to_vec()]
                    .into_iter()
                    .collect();

            let mut it = db.new_iter_at(ss.clone()).unwrap();
            for (key, val) in LdbIteratorIter::wrap(&mut it) {
                if let Some(want) = expected.get(&key) {
                    assert_eq!(want, &val);
                }
                assert!(!non_existing.contains(&key));
            }
        }
    }

    /// An empty key with an empty value is a regular entry that the iterator returns.
    #[test]
    fn db_iter_allow_empty_key() {
        let mut db = DB::open("db", options::for_test()).unwrap();

        assert!(db.new_iter().unwrap().next().is_none());
        db.put(&[], &[]).unwrap();
        assert!(db.new_iter().unwrap().next().is_some());
    }
}