// rusty_leveldb/table_reader.rs

1use crate::block::{Block, BlockIter};
2use crate::blockhandle::BlockHandle;
3use crate::cache::{self, Cache};
4use crate::cmp::InternalKeyCmp;
5use crate::env::RandomAccess;
6use crate::error::{self, err, Result};
7use crate::filter;
8use crate::filter_block::FilterBlockReader;
9use crate::key_types::InternalKey;
10use crate::options::Options;
11use crate::table_block;
12use crate::table_builder::{self, Footer};
13use crate::types::{current_key_val, LdbIterator, Shared};
14
15use std::cmp::Ordering;
16use std::rc::Rc;
17
18use bytes::Bytes;
19use integer_encoding::FixedIntWriter;
20
21/// Reads the table footer.
22fn read_footer(f: &dyn RandomAccess, size: usize) -> Result<Footer> {
23    let mut buf = vec![0; table_builder::FULL_FOOTER_LENGTH];
24    f.read_at(size - table_builder::FULL_FOOTER_LENGTH, &mut buf)?;
25    match Footer::decode(&buf) {
26        Some(ok) => Ok(ok),
27        None => err(
28            error::StatusCode::Corruption,
29            &format!("Couldn't decode damaged footer {:?}", &buf),
30        ),
31    }
32}
33
/// `Table` reads a single immutable, sorted on-disk table.
///
/// Cloning a `Table` is cheap: the file handle and the block cache are shared
/// via reference-counted pointers (see the "cheap copy (Rc)" note in
/// `read_block`).
#[derive(Clone)]
pub struct Table {
    // Underlying random-access file, shared among clones and iterators.
    file: Rc<Box<dyn RandomAccess>>,
    file_size: usize,
    // Namespaces this table's blocks inside the shared block cache.
    cache_id: cache::CacheID,

    opt: Options,
    block_cache: Shared<Cache<Block>>,

    footer: Footer,
    // Index block mapping separator keys to data-block handles; kept resident.
    indexblock: Block,
    // Filter (e.g. bloom) reader, present only if the table has a non-empty
    // filter block.
    filters: Option<FilterBlockReader>,
}
47
impl Table {
    /// Creates a new table reader operating on unformatted keys (i.e., UserKey).
    ///
    /// Reads the footer, then the index and metaindex blocks, and (if present)
    /// the filter block. `size` must be the total file size in bytes.
    fn new_raw(
        opt: Options,
        block_cache: Shared<Cache<Block>>,
        file: Rc<Box<dyn RandomAccess>>,
        size: usize,
    ) -> Result<Table> {
        // The footer at the end of the file locates the index and metaindex blocks.
        let footer = read_footer(file.as_ref().as_ref(), size)?;
        let indexblock =
            table_block::read_table_block(opt.clone(), file.as_ref().as_ref(), &footer.index)?;
        let metaindexblock =
            table_block::read_table_block(opt.clone(), file.as_ref().as_ref(), &footer.meta_index)?;

        // The metaindex block tells us whether (and where) a filter block exists.
        let filter_block_reader =
            Table::read_filter_block(&metaindexblock, file.as_ref().as_ref(), &opt)?;
        // Fresh cache id so this table's blocks don't collide with other tables
        // sharing the same block cache.
        let cache_id = block_cache.borrow_mut().new_cache_id();

        Ok(Table {
            file,
            file_size: size,
            cache_id,
            opt,
            block_cache,
            footer,
            filters: filter_block_reader,
            indexblock,
        })
    }

    /// Looks up the filter-block handle in the metaindex block and, if it is
    /// present and non-empty, reads the filter block from `file`.
    /// Returns Ok(None) when the table has no usable filter block.
    fn read_filter_block(
        metaix: &Block,
        file: &dyn RandomAccess,
        options: &Options,
    ) -> Result<Option<FilterBlockReader>> {
        // Open filter block for reading
        // The metaindex entry is keyed by "filter.<policy name>".
        let filter_name = format!("filter.{}", options.filter_policy.name())
            .as_bytes()
            .to_vec();

        let mut metaindexiter = metaix.iter();
        metaindexiter.seek(&filter_name);

        // NOTE(review): the key found by seek() is not compared against
        // filter_name; this assumes the seek landed on the filter entry if one
        // exists -- verify against BlockIter::seek semantics.
        if let Some((_key, val)) = current_key_val(&metaindexiter) {
            // The value is an encoded BlockHandle pointing at the filter block.
            let fbl = BlockHandle::decode(&val);
            let filter_block_location = match fbl {
                None => {
                    return err(
                        error::StatusCode::Corruption,
                        &format!("Couldn't decode corrupt blockhandle {:?}", &val),
                    )
                }
                Some(ok) => ok.0,
            };
            if filter_block_location.size() > 0 {
                return Ok(Some(table_block::read_filter_block(
                    file,
                    &filter_block_location,
                    options.filter_policy.clone(),
                )?));
            }
        }
        // No filter block: get() will fall back to reading data blocks directly.
        Ok(None)
    }

    /// Creates a new table reader operating on internal keys (i.e., InternalKey). This means that
    /// a different comparator (internal_key_cmp) and a different filter policy
    /// (InternalFilterPolicy) are used.
    pub fn new(
        mut opt: Options,
        block_cache: Shared<Cache<Block>>,
        file: Rc<Box<dyn RandomAccess>>,
        size: usize,
    ) -> Result<Table> {
        // Wrap the user-supplied comparator and filter policy so they operate
        // on the InternalKey encoding instead of raw user keys.
        opt.cmp = Rc::new(Box::new(InternalKeyCmp(opt.cmp.clone())));
        opt.filter_policy = Rc::new(Box::new(filter::InternalFilterPolicy::new(
            opt.filter_policy,
        )));
        Table::new_raw(opt, block_cache, file, size)
    }

    /// block_cache_handle creates a CacheKey for a block with a given offset to be used in the
    /// block cache.
    ///
    /// Key layout: 8 bytes fixed-int cache id followed by 8 bytes fixed-int
    /// block offset, making keys unique across tables sharing one cache.
    fn block_cache_handle(&self, block_off: usize) -> cache::CacheKey {
        let mut dst = [0; 2 * 8];
        (&mut dst[..8])
            .write_fixedint(self.cache_id)
            .expect("error writing to vec")
        (&mut dst[8..])
            .write_fixedint(block_off as u64)
            .expect("error writing to vec");
        dst
    }

    /// Read a block from the current table at `location`, and cache it in the options' block
    /// cache.
    fn read_block(&self, location: &BlockHandle) -> Result<Block> {
        let cachekey = self.block_cache_handle(location.offset());
        // Cache hit: return the cached block without touching the file.
        if let Some(block) = self.block_cache.borrow_mut().get(&cachekey) {
            return Ok(block.clone());
        }

        // Two times as_ref(): First time to get a ref from Rc<>, then one from Box<>.
        let b =
            table_block::read_table_block(self.opt.clone(), self.file.as_ref().as_ref(), location)?;

        // insert a cheap copy (Rc).
        self.block_cache.borrow_mut().insert(&cachekey, b.clone());

        Ok(b)
    }

    /// Returns the offset of the block that contains `key`.
    pub fn approx_offset_of(&self, key: &[u8]) -> usize {
        let mut iter = self.indexblock.iter();

        iter.seek(key);

        if let Some((_, val)) = current_key_val(&iter) {
            let location = BlockHandle::decode(&val).unwrap().0;
            return location.offset();
        }

        // Key is past the last data block: return the metaindex offset, i.e.
        // (approximately) the end of the data area.
        self.footer.meta_index.offset()
    }

    /// Iterators read from the file; thus only one iterator can be borrowed (mutably) per scope
    pub fn iter(&self) -> TableIterator {
        TableIterator {
            current_block: None,
            current_block_off: 0,
            index_block: self.indexblock.iter(),
            table: self.clone(),
        }
    }

    /// Retrieve next-biggest entry for key from table. This function uses the attached filters, so
    /// is better suited if you frequently look for non-existing values (as it will detect the
    /// non-existence of an entry in a block without having to load the block).
    ///
    /// The caller must check if the returned key, which is the raw key (not e.g. the user portion
    /// of an InternalKey) is acceptable (e.g. correct value type or sequence number), as it may
    /// not be an exact match for key.
    ///
    /// This is done this way because some key types, like internal keys, will not result in an
    /// exact match; it depends on other comparators than the one that the table reader knows
    /// whether a match is acceptable.
    pub fn get(&self, key: InternalKey<'_>) -> Result<Option<(Bytes, Bytes)>> {
        let mut index_iter = self.indexblock.iter();
        index_iter.seek(key);

        let handle;
        if let Some((last_in_block, h)) = current_key_val(&index_iter) {
            // Index keys are separators; key strictly below the separator means
            // the key can only live in this block. NOTE(review): the Equal case
            // returns None -- presumably separators are strictly greater than
            // the last key of the final block (see find_short_succ in the
            // builder); confirm against TableBuilder's separator generation.
            if self.opt.cmp.cmp(key, &last_in_block) == Ordering::Less {
                handle = BlockHandle::decode(&h).unwrap().0;
            } else {
                return Ok(None);
            }
        } else {
            return Ok(None);
        }

        // found correct block.

        // Check bloom (or whatever) filter
        if let Some(ref filters) = self.filters {
            if !filters.key_may_match(handle.offset(), key) {
                return Ok(None);
            }
        }

        // Read block (potentially from cache)
        let tb = self.read_block(&handle)?;
        let mut iter = tb.iter();

        // Go to entry and check if it's the wanted entry.
        iter.seek(key);
        if let Some((k, v)) = current_key_val(&iter) {
            // Accept k >= key (Ordering derives Ord: Less < Equal < Greater);
            // the caller decides whether a non-exact match is acceptable.
            if self.opt.cmp.cmp(&k, key) >= Ordering::Equal {
                return Ok(Some((k.into(), v.into())));
            }
        }
        Ok(None)
    }
}
233
/// This iterator is a "TwoLevelIterator"; it uses an index block in order to get an offset hint
/// into the data blocks.
pub struct TableIterator {
    // A TableIterator is independent of its table (on the syntax level -- it doesn't know its
    // Table's lifetime). This is mainly required by the dynamic iterators used everywhere, where a
    // lifetime makes things like returning an iterator from a function neigh-impossible.
    //
    // Instead, reference-counted pointers and locks inside the Table ensure that all
    // TableIterators still share a table.
    table: Table,
    // Iterator over the currently loaded data block; None before the first
    // advance()/seek() and after running off either end.
    current_block: Option<BlockIter>,
    // File offset of the currently loaded data block (used e.g. for filter
    // lookups in tests).
    current_block_off: usize,
    // Iterator over the table's index block; its position determines which
    // data block gets loaded next.
    index_block: BlockIter,
}
248
249impl TableIterator {
250    // Skips to the entry referenced by the next entry in the index block.
251    // This is called once a block has run out of entries.
252    // Err means corruption or I/O error; Ok(true) means a new block was loaded; Ok(false) means
253    // tht there's no more entries.
254    fn skip_to_next_entry(&mut self) -> Result<bool> {
255        if let Some((_key, val)) = self.index_block.next() {
256            self.load_block(&val).map(|_| true)
257        } else {
258            Ok(false)
259        }
260    }
261
262    // Load the block at `handle` into `self.current_block`
263    fn load_block(&mut self, handle: &[u8]) -> Result<()> {
264        let (new_block_handle, _) = match BlockHandle::decode(handle) {
265            None => {
266                return err(
267                    error::StatusCode::Corruption,
268                    "Couldn't decode corrupt block handle",
269                )
270            }
271            Some(ok) => ok,
272        };
273        let block = self.table.read_block(&new_block_handle)?;
274
275        self.current_block = Some(block.iter());
276        self.current_block_off = new_block_handle.offset();
277
278        Ok(())
279    }
280}
281
impl LdbIterator for TableIterator {
    // Moves to the next entry, loading the next data block when the current
    // one is exhausted. Returns false (and resets) once the table is done.
    fn advance(&mut self) -> bool {
        // Uninitialized case.
        if self.current_block.is_none() {
            match self.skip_to_next_entry() {
                Ok(true) => return self.advance(),
                Ok(false) => {
                    self.reset();
                    return false;
                }
                // try next block from index, this might be corruption
                // (the recursion terminates: each skip_to_next_entry() call
                // consumes one index entry, and the index is finite)
                Err(_) => return self.advance(),
            }
        }

        // Initialized case -- does the current block have more entries?
        if let Some(ref mut cb) = self.current_block {
            if cb.advance() {
                return true;
            }
        }

        // If the current block is exhausted, try loading the next block.
        self.current_block = None;
        match self.skip_to_next_entry() {
            Ok(true) => self.advance(),
            Ok(false) => {
                self.reset();
                false
            }
            // try next block, this might be corruption
            Err(_) => self.advance(),
        }
    }

    // A call to valid() after seeking is necessary to ensure that the seek worked (e.g., no error
    // while reading from disk)
    fn seek(&mut self, to: &[u8]) {
        // first seek in index block, rewind by one entry (so we get the next smaller index entry),
        // then set current_block and seek there
        self.index_block.seek(to);

        // It's possible that this is a seek past-last; reset in that case.
        if let Some((past_block, handle)) = current_key_val(&self.index_block) {
            // to <= separator: the target, if present, lives in this block.
            if self.table.opt.cmp.cmp(to, &past_block) <= Ordering::Equal {
                // ok, found right block: continue
                if let Ok(()) = self.load_block(&handle) {
                    // current_block is always set if load_block() returned Ok.
                    self.current_block.as_mut().unwrap().seek(to);
                    return;
                }
            }
        }
        // Reached in case of failure.
        self.reset();
    }

    // Moves to the previous entry, falling back to the last entry of the
    // previous data block when the current block's beginning is reached.
    fn prev(&mut self) -> bool {
        // happy path: current block contains previous entry
        if let Some(ref mut cb) = self.current_block {
            if cb.prev() {
                return true;
            }
        }

        // Go back one block and look for the last entry in the previous block
        if self.index_block.prev() {
            if let Some((_, handle)) = current_key_val(&self.index_block) {
                if self.load_block(&handle).is_ok() {
                    self.current_block.as_mut().unwrap().seek_to_last();
                    self.current_block.as_ref().unwrap().valid()
                } else {
                    self.reset();
                    false
                }
            } else {
                false
            }
        } else {
            false
        }
    }

    // Returns the iterator to its pristine (pre-first-advance) state.
    fn reset(&mut self) {
        self.index_block.reset();
        self.current_block = None;
    }

    // This iterator is special in that it's valid even before the first call to advance(). It
    // behaves correctly, though.
    fn valid(&self) -> bool {
        self.current_block.is_some() && (self.current_block.as_ref().unwrap().valid())
    }

    // Returns the current entry, or None if the iterator is not positioned.
    fn current(&self) -> Option<(Bytes, Bytes)> {
        if let Some(ref cb) = self.current_block {
            cb.current()
        } else {
            None
        }
    }
}
384
#[cfg(test)]
mod tests {
    //! Tests for the table reader: iteration, seeking, get(), block caching,
    //! filters, internal keys, and checksum-based corruption handling.
    use std::cell::RefCell;

    use crate::compressor::CompressorId;
    use crate::filter::BloomPolicy;
    use crate::key_types::LookupKey;
    use crate::table_builder::TableBuilder;
    use crate::test_util::{test_iterator_properties, LdbIteratorIter};
    use crate::types::{current_key_val, share, LdbIterator};
    use crate::{block, compressor, options};

    use super::*;

    // Shared fixture: seven sorted key/value pairs which (with block_size = 32)
    // end up spread over three data blocks.
    fn build_data() -> Vec<(&'static str, &'static str)> {
        vec![
            // block 1
            ("abc", "def"),
            ("abd", "dee"),
            ("bcd", "asa"),
            // block 2
            ("bsr", "a00"),
            ("xyz", "xxx"),
            ("xzz", "yyy"),
            // block 3
            ("zzz", "111"),
        ]
    }

    // Build a table containing raw keys (no format). It returns (vector, length) for convenience
    // reason, a call f(v, v.len()) doesn't work for borrowing reasons.
    fn build_table(data: Vec<(&'static str, &'static str)>) -> (Vec<u8>, usize) {
        let mut d = Vec::with_capacity(512);
        let mut opt = options::for_test();
        opt.block_restart_interval = 2;
        opt.block_size = 32;
        opt.compressor = compressor::SnappyCompressor::ID;

        {
            // Uses the standard comparator in opt.
            let mut b = TableBuilder::new_raw(opt, &mut d);

            for &(k, v) in data.iter() {
                b.add(k.as_bytes(), v.as_bytes()).unwrap();
            }

            b.finish().unwrap();
        }

        let size = d.len();
        (d, size)
    }

    // Build a table containing keys in InternalKey format.
    fn build_internal_table() -> (Vec<u8>, usize) {
        let mut d = Vec::with_capacity(512);
        let mut opt = options::for_test();
        opt.block_restart_interval = 1;
        opt.block_size = 32;
        opt.filter_policy = Rc::new(Box::new(BloomPolicy::new(4)));

        // Tag each fixture key with an increasing sequence number.
        let mut i = 1_u64;
        let data: Vec<(Vec<u8>, &'static str)> = build_data()
            .into_iter()
            .map(|(k, v)| {
                i += 1;
                (LookupKey::new(k.as_bytes(), i).internal_key().to_vec(), v)
            })
            .collect();

        {
            // Uses InternalKeyCmp
            let mut b = TableBuilder::new(opt, &mut d);

            for (k, v) in data.iter() {
                b.add(k.as_slice(), v.as_bytes()).unwrap();
            }

            b.finish().unwrap();
        }

        let size = d.len();

        (d, size)
    }

    // An in-memory Vec<u8> stands in for a file (it implements RandomAccess).
    fn wrap_buffer(src: Vec<u8>) -> Rc<Box<dyn RandomAccess>> {
        Rc::new(Box::new(src))
    }

    #[test]
    fn test_table_approximate_offset() {
        let (src, size) = build_table(build_data());
        let mut opt = options::for_test();
        let bc = share(Cache::new(128));
        opt.block_size = 32;
        let table = Table::new_raw(opt, bc, wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();

        // One expected offset per fixture entry; entries within the same data
        // block share the block's offset.
        let expected_offsets = [0, 0, 0, 44, 44, 44, 89];
        for (i, (k, _)) in LdbIteratorIter::wrap(&mut iter).enumerate() {
            assert_eq!(expected_offsets[i], table.approx_offset_of(&k));
        }

        // Key-past-last returns offset of metaindex block.
        assert_eq!(137, table.approx_offset_of("{aa".as_bytes()));
    }

    #[test]
    fn test_table_block_cache_use() {
        let (src, size) = build_table(build_data());
        let bc = share(Cache::new(128));
        let mut opt = options::for_test();
        opt.block_size = 32;

        {
            let table = Table::new_raw(opt.clone(), bc.clone(), wrap_buffer(src), size).unwrap();
            let mut iter = table.iter();

            // index/metaindex blocks are not cached. That'd be a waste of memory.
            assert_eq!(bc.borrow().count(), 0);
            iter.next();
            assert_eq!(bc.borrow().count(), 1);
            // This may fail if block parameters or data change. In that case, adapt it.
            iter.next();
            iter.next();
            iter.next();
            iter.next();
            assert_eq!(bc.borrow().count(), 2);
        }

        println!(
            "weak = {}, strong = {}",
            std::rc::Rc::<RefCell<cache::Cache<block::Block>>>::weak_count(&bc),
            std::rc::Rc::<RefCell<cache::Cache<block::Block>>>::strong_count(&bc)
        );
    }

    #[test]
    fn test_table_iterator_fwd_bwd() {
        let (src, size) = build_table(build_data());
        let data = build_data();
        let bc = share(Cache::new(128));

        let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();
        let mut i = 0;

        // Forward pass: every entry, in order.
        while let Some((k, v)) = iter.next() {
            assert_eq!(
                (data[i].0.as_bytes(), data[i].1.as_bytes()),
                (k.as_ref(), v.as_ref())
            );
            i += 1;
        }

        assert_eq!(i, data.len());
        assert!(!iter.valid());

        // Go forward again, to last entry.
        while let Some((key, _)) = iter.next() {
            if key.as_slice() == b"zzz" {
                break;
            }
        }

        assert!(iter.valid());
        // backwards count
        let mut j = 0;

        while iter.prev() {
            if let Some((k, v)) = current_key_val(&iter) {
                j += 1;
                assert_eq!(
                    (
                        data[data.len() - 1 - j].0.as_bytes(),
                        data[data.len() - 1 - j].1.as_bytes()
                    ),
                    (k.as_ref(), v.as_ref())
                );
            } else {
                break;
            }
        }

        // expecting 7 - 1, because the last entry that the iterator stopped on is the last entry
        // in the table; that is, it needs to go back over 6 entries.
        assert_eq!(j, 6);
    }

    #[test]
    fn test_table_iterator_filter() {
        let (src, size) = build_table(build_data());
        let bc = share(Cache::new(128));

        let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap();
        assert!(table.filters.is_some());
        let filter_reader = table.filters.clone().unwrap();
        let mut iter = table.iter();
        // Every stored key must pass its block's filter; an absent key must not.
        while let Some((k, _)) = iter.next() {
            assert!(filter_reader.key_may_match(iter.current_block_off, &k));
            assert!(!filter_reader.key_may_match(iter.current_block_off, b"somerandomkey"));
        }
    }

    #[test]
    fn test_table_iterator_state_behavior() {
        let (src, size) = build_table(build_data());
        let bc = share(Cache::new(128));

        let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();

        // behavior test

        // See comment on valid()
        assert!(!iter.valid());
        assert!(current_key_val(&iter).is_none());
        assert!(!iter.prev());

        assert!(iter.advance());
        let first = current_key_val(&iter);
        assert!(iter.valid());
        assert!(current_key_val(&iter).is_some());

        assert!(iter.advance());
        assert!(iter.prev());
        assert!(iter.valid());

        // reset() must return the iterator to its pristine state.
        iter.reset();
        assert!(!iter.valid());
        assert!(current_key_val(&iter).is_none());
        assert_eq!(first, iter.next());
    }

    #[test]
    fn test_table_iterator_behavior_standard() {
        let mut data = build_data();
        data.truncate(4);
        let (src, size) = build_table(data);
        let bc = share(Cache::new(128));
        let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap();
        test_iterator_properties(table.iter());
    }

    #[test]
    fn test_table_iterator_values() {
        let (src, size) = build_table(build_data());
        let data = build_data();
        let bc = share(Cache::new(128));

        let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();
        let mut i = 0;

        iter.next();
        iter.next();

        // Go back to previous entry, check, go forward two entries, repeat
        // Verifies that prev/next works well.
        loop {
            iter.prev();

            if let Some((k, v)) = current_key_val(&iter) {
                assert_eq!(
                    (data[i].0.as_bytes(), data[i].1.as_bytes()),
                    (k.as_ref(), v.as_ref())
                );
            } else {
                break;
            }

            i += 1;
            if iter.next().is_none() || iter.next().is_none() {
                break;
            }
        }

        // Skipping the last value because the second next() above will break the loop
        assert_eq!(i, 6);
    }

    #[test]
    fn test_table_iterator_seek() {
        let (src, size) = build_table(build_data());
        let bc = share(Cache::new(128));

        let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();

        iter.seek(b"bcd");
        assert!(iter.valid());
        assert_eq!(
            current_key_val(&iter),
            Some((b"bcd".to_vec(), b"asa".to_vec()))
        );
        // Seeking backwards (to a key before the current position) must work too.
        iter.seek(b"abc");
        assert!(iter.valid());
        assert_eq!(
            current_key_val(&iter),
            Some((b"abc".to_vec(), b"def".to_vec()))
        );

        // Seek-past-last invalidates.
        iter.seek("{{{".as_bytes());
        assert!(!iter.valid());
        iter.seek(b"bbb");
        assert!(iter.valid());
    }

    #[test]
    fn test_table_get() {
        let (src, size) = build_table(build_data());
        let bc = share(Cache::new(128));

        let table =
            Table::new_raw(options::for_test(), bc.clone(), wrap_buffer(src), size).unwrap();
        let table2 = table.clone();

        let mut _iter = table.iter();
        // Test that all of the table's entries are reachable via get()
        for (k, v) in LdbIteratorIter::wrap(&mut _iter) {
            let r = table2.get(&k);
            assert_eq!(Ok(Some((k.into(), v.into()))), r);
        }

        // All three data blocks must have passed through the cache.
        assert_eq!(bc.borrow().count(), 3);

        // test that filters work and don't return anything at all.
        assert!(table.get(b"aaa").unwrap().is_none());
        assert!(table.get(b"aaaa").unwrap().is_none());
        assert!(table.get(b"aa").unwrap().is_none());
        assert!(table.get(b"abcd").unwrap().is_none());
        assert!(table.get(b"abb").unwrap().is_none());
        assert!(table.get(b"zzy").unwrap().is_none());
        assert!(table.get(b"zz1").unwrap().is_none());
        assert!(table.get("zz{".as_bytes()).unwrap().is_none());
    }

    // This test verifies that the table and filters work with internal keys. This means:
    // The table contains keys in InternalKey format and it uses a filter wrapped by
    // InternalFilterPolicy.
    // All the other tests use raw keys that don't have any internal structure; this is fine in
    // general, but here we want to see that the other infrastructure works too.
    #[test]
    fn test_table_internal_keys() {
        use crate::key_types::LookupKey;

        let (src, size) = build_internal_table();

        let bc = share(Cache::new(128));
        let table = Table::new(options::for_test(), bc, wrap_buffer(src), size).unwrap();
        let filter_reader = table.filters.clone().unwrap();

        // Check that we're actually using internal keys
        let mut _iter = table.iter();
        for (ref k, ref v) in LdbIteratorIter::wrap(&mut _iter) {
            // 3 bytes of user key plus the 8-byte internal-key suffix.
            assert_eq!(k.len(), 3 + 8);
            let (result_k, result_v) = table.get(k).unwrap().unwrap();
            assert_eq!(
                (k.to_vec(), v.to_vec()),
                (result_k.to_vec(), result_v.to_vec())
            );
        }

        assert!(table
            .get(LookupKey::new(b"abc", 1000).internal_key())
            .unwrap()
            .is_some());

        let mut iter = table.iter();

        while let Some((k, _)) = iter.next() {
            let lk = LookupKey::new(&k, 123);
            let userkey = lk.user_key();

            assert!(filter_reader.key_may_match(iter.current_block_off, userkey));
            assert!(!filter_reader.key_may_match(iter.current_block_off, b"somerandomkey"));
        }
    }

    #[test]
    fn test_table_reader_checksum() {
        let bc = share(Cache::new(128));
        let (mut src, size) = build_table(build_data());

        // Flip a byte early in the file (inside the first data block, per the
        // "first block is skipped" assertion below) so its checksum mismatches.
        src[10] += 1;

        let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap();

        assert!(table.filters.is_some());
        assert_eq!(table.filters.as_ref().unwrap().num(), 1);

        {
            let mut _iter = table.iter();
            let iter = LdbIteratorIter::wrap(&mut _iter);
            // first block is skipped
            assert_eq!(iter.count(), 4);
        }

        {
            let mut _iter = table.iter();
            let iter = LdbIteratorIter::wrap(&mut _iter);

            for (k, _) in iter {
                if k == build_data()[5].0.as_bytes() {
                    return;
                }
            }

            panic!("Should have hit 5th record in table!");
        }
    }
}