sstable/
table_reader.rs

1use crate::block::{Block, BlockIter};
2use crate::blockhandle::BlockHandle;
3use crate::cache;
4use crate::error::Result;
5use crate::filter_block::FilterBlockReader;
6use crate::options::Options;
7use crate::table_block;
8use crate::table_builder::{self, Footer};
9use crate::types::{current_key_val, RandomAccess, SSIterator};
10
11use std::cmp::Ordering;
12use std::fs;
13use std::path;
14use std::sync::Arc;
15
16use integer_encoding::FixedIntWriter;
17
18/// Reads the table footer.
19fn read_footer(f: &dyn RandomAccess, size: usize) -> Result<Footer> {
20    let mut buf = vec![0; table_builder::FULL_FOOTER_LENGTH];
21    f.read_at(size - table_builder::FULL_FOOTER_LENGTH, &mut buf)?;
22    Ok(Footer::decode(&buf))
23}
24
25/// `Table` is used for accessing SSTables.
26#[derive(Clone)]
27pub struct Table {
28    file: Arc<Box<dyn RandomAccess>>,
29    cache_id: cache::CacheID,
30
31    opt: Options,
32
33    footer: Footer,
34    index_block: Block,
35    filters: Option<FilterBlockReader>,
36}
37
38impl Table {
39    /// Creates a new table reader from a file at `path`.
40    pub fn new_from_file(opt: Options, path: &path::Path) -> Result<Table> {
41        let f = fs::OpenOptions::new().read(true).open(path)?;
42        let size = f.metadata()?.len() as usize;
43        Table::new(opt, Box::new(f), size)
44    }
45
46    /// Creates a new table reader.
47    pub fn new(opt: Options, file: Box<dyn RandomAccess>, size: usize) -> Result<Table> {
48        let footer = read_footer(file.as_ref(), size)?;
49        let index_block = table_block::read_table_block(opt.clone(), file.as_ref(), &footer.index)?;
50        let metaindex_block =
51            table_block::read_table_block(opt.clone(), file.as_ref(), &footer.meta_index)?;
52
53        let filter_block_reader = Table::read_filter_block(&metaindex_block, file.as_ref(), &opt)?;
54        let cache_id = {
55            let mut block_cache = opt.block_cache.write()?;
56            block_cache.new_cache_id()
57        };
58
59        Ok(Table {
60            file: Arc::new(file),
61            cache_id: cache_id,
62            opt: opt,
63            footer: footer,
64            filters: filter_block_reader,
65            index_block: index_block,
66        })
67    }
68
69    fn read_filter_block(
70        metaix: &Block,
71        file: &dyn RandomAccess,
72        options: &Options,
73    ) -> Result<Option<FilterBlockReader>> {
74        // Open filter block for reading
75        let filter_name = format!("filter.{}", options.filter_policy.name())
76            .as_bytes()
77            .to_vec();
78
79        let mut metaindexiter = metaix.iter();
80        metaindexiter.seek(&filter_name);
81
82        if let Some((_key, val)) = current_key_val(&metaindexiter) {
83            let filter_block_location = BlockHandle::decode(&val).0;
84            if filter_block_location.size() > 0 {
85                return Ok(Some(table_block::read_filter_block(
86                    file,
87                    &filter_block_location,
88                    options.filter_policy.clone(),
89                )?));
90            }
91        }
92        Ok(None)
93    }
94
95    /// block_cache_handle creates a CacheKey for a block with a given offset to be used in the
96    /// block cache.
97    fn block_cache_handle(&self, block_off: usize) -> cache::CacheKey {
98        let mut dst = [0; 2 * 8];
99        (&mut dst[..8])
100            .write_fixedint(self.cache_id)
101            .expect("error writing to vec");
102        (&mut dst[8..])
103            .write_fixedint(block_off as u64)
104            .expect("error writing to vec");
105        dst
106    }
107
108    /// Read a block from the current table at `location`, and cache it in the options' block
109    /// cache.
110    fn read_block(&self, location: &BlockHandle) -> Result<Block> {
111        let cachekey = self.block_cache_handle(location.offset());
112        let mut block_cache = self.opt.block_cache.write()?;
113        if let Some(block) = block_cache.get(&cachekey) {
114            return Ok(block.clone());
115        }
116
117        // Two times as_ref(): First time to get a ref from Rc<>, then one from Box<>.
118        let b =
119            table_block::read_table_block(self.opt.clone(), self.file.as_ref().as_ref(), location)?;
120
121        // insert a cheap copy (Arc).
122        block_cache.insert(&cachekey, b.clone());
123
124        Ok(b)
125    }
126
127    /// Returns the offset of the block that contains `key`.
128    pub fn approx_offset_of(&self, key: &[u8]) -> usize {
129        let mut iter = self.index_block.iter();
130
131        iter.seek(key);
132
133        if let Some((_, val)) = current_key_val(&iter) {
134            let location = BlockHandle::decode(&val).0;
135            return location.offset();
136        }
137
138        return self.footer.meta_index.offset();
139    }
140
141    /// Returns an iterator over an SSTable. Iterators hold internal references to the table, so
142    /// make sure to let them expire when not needed anymore.
143    pub fn iter(&self) -> TableIterator {
144        let iter = TableIterator {
145            current_block: None,
146            current_block_off: 0,
147            index_block: self.index_block.iter(),
148            table: self.clone(),
149        };
150        iter
151    }
152
153    /// Retrieve an entry for a key from the table. This function uses the attached filters, so
154    /// is better suited if you frequently look for non-existing values (as it will detect the
155    /// non-existence of an entry in a block without having to load the block).
156    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
157        let mut index_iter = self.index_block.iter();
158        index_iter.seek(key);
159
160        let handle;
161        if let Some((last_in_block, h)) = current_key_val(&index_iter) {
162            if self.opt.cmp.cmp(key, &last_in_block) == Ordering::Less {
163                handle = BlockHandle::decode(&h).0;
164            } else {
165                return Ok(None);
166            }
167        } else {
168            return Ok(None);
169        }
170
171        // found correct block.
172
173        // Check bloom (or whatever) filter
174        if let Some(ref filters) = self.filters {
175            if !filters.key_may_match(handle.offset(), key) {
176                return Ok(None);
177            }
178        }
179
180        // Read block (potentially from cache)
181        let tb = self.read_block(&handle)?;
182        let mut iter = tb.iter();
183
184        // Go to entry and check if it's the wanted entry.
185        iter.seek(key);
186        if let Some((k, v)) = current_key_val(&iter) {
187            if self.opt.cmp.cmp(&k, key) == Ordering::Equal {
188                return Ok(Some(v));
189            }
190        }
191        Ok(None)
192    }
193}
194
195/// This iterator is a "TwoLevelIterator"; it uses an index block in order to get an offset hint
196/// into the data blocks.
197pub struct TableIterator {
198    // A TableIterator is independent of its table (on the syntax level -- it doesn't know its
199    // Table's lifetime). This is mainly required by the dynamic iterators used everywhere, where a
200    // lifetime makes things like returning an iterator from a function neigh-impossible.
201    //
202    // Instead, reference-counted pointers and locks inside the Table ensure that all
203    // TableIterators still share a table.
204    table: Table,
205    current_block: Option<BlockIter>,
206    current_block_off: usize,
207    index_block: BlockIter,
208}
209
210impl TableIterator {
211    // Skips to the entry referenced by the next entry in the index block.
212    // This is called once a block has run out of entries.
213    // Err means corruption or I/O error; Ok(true) means a new block was loaded; Ok(false) means
214    // tht there's no more entries.
215    fn skip_to_next_entry(&mut self) -> Result<bool> {
216        if let Some((_key, val)) = self.index_block.next() {
217            self.load_block(&val).map(|_| true)
218        } else {
219            Ok(false)
220        }
221    }
222
223    // Load the block at `handle` into `self.current_block`
224    fn load_block(&mut self, handle: &[u8]) -> Result<()> {
225        let (new_block_handle, _) = BlockHandle::decode(handle);
226        let block = self.table.read_block(&new_block_handle)?;
227
228        self.current_block = Some(block.iter());
229        self.current_block_off = new_block_handle.offset();
230
231        Ok(())
232    }
233}
234
235impl SSIterator for TableIterator {
236    fn advance(&mut self) -> bool {
237        // Uninitialized case.
238        if self.current_block.is_none() {
239            match self.skip_to_next_entry() {
240                Ok(true) => return self.advance(),
241                Ok(false) => {
242                    self.reset();
243                    return false;
244                }
245                // try next block from index, this might be corruption
246                Err(_) => return self.advance(),
247            }
248        }
249
250        // Initialized case -- does the current block have more entries?
251        if let Some(ref mut cb) = self.current_block {
252            if cb.advance() {
253                return true;
254            }
255        }
256
257        // If the current block is exhausted, try loading the next block.
258        self.current_block = None;
259        match self.skip_to_next_entry() {
260            Ok(true) => self.advance(),
261            Ok(false) => {
262                self.reset();
263                false
264            }
265            // try next block, this might be corruption
266            Err(_) => self.advance(),
267        }
268    }
269
270    // A call to valid() after seeking is necessary to ensure that the seek worked (e.g., no error
271    // while reading from disk)
272    fn seek(&mut self, to: &[u8]) {
273        // first seek in index block, rewind by one entry (so we get the next smaller index entry),
274        // then set current_block and seek there
275        self.index_block.seek(to);
276
277        // It's possible that this is a seek past-last; reset in that case.
278        if let Some((past_block, handle)) = current_key_val(&self.index_block) {
279            if self.table.opt.cmp.cmp(to, &past_block) <= Ordering::Equal {
280                // ok, found right block: continue
281                if let Ok(()) = self.load_block(&handle) {
282                    // current_block is always set if load_block() returned Ok.
283                    self.current_block.as_mut().unwrap().seek(to);
284                    return;
285                }
286            }
287        }
288        // Reached in case of failure.
289        self.reset();
290    }
291
292    fn prev(&mut self) -> bool {
293        // happy path: current block contains previous entry
294        if let Some(ref mut cb) = self.current_block {
295            if cb.prev() {
296                return true;
297            }
298        }
299
300        // Go back one block and look for the last entry in the previous block
301        if self.index_block.prev() {
302            if let Some((_, handle)) = current_key_val(&self.index_block) {
303                if self.load_block(&handle).is_ok() {
304                    self.current_block.as_mut().unwrap().seek_to_last();
305                    self.current_block.as_ref().unwrap().valid()
306                } else {
307                    self.reset();
308                    false
309                }
310            } else {
311                false
312            }
313        } else {
314            false
315        }
316    }
317
318    fn reset(&mut self) {
319        self.index_block.reset();
320        self.current_block = None;
321    }
322
323    // This iterator is special in that it's valid even before the first call to advance(). It
324    // behaves correctly, though.
325    fn valid(&self) -> bool {
326        self.current_block.is_some() && (self.current_block.as_ref().unwrap().valid())
327    }
328
329    fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool {
330        if let Some(ref cb) = self.current_block {
331            cb.current(key, val)
332        } else {
333            false
334        }
335    }
336
337    fn current_key(&self) -> Option<&[u8]> {
338        if let Some(ref cb) = self.current_block {
339            cb.current_key()
340        } else {
341            None
342        }
343    }
344}
345
#[cfg(test)]
mod tests {
    use crate::options::CompressionType;
    use crate::table_builder::TableBuilder;
    use crate::test_util::{test_iterator_properties, SSIteratorIter};
    use crate::types::{current_key_val, SSIterator};

    use super::*;

    const LOCK_POISONED: &str = "Lock poisoned";

    /// Sorted key/value pairs used by most tests. With the builder settings in `build_table`
    /// they end up in three data blocks, as marked below.
    fn build_data() -> Vec<(&'static str, &'static str)> {
        vec![
            // block 1
            ("abc", "def"),
            ("abd", "dee"),
            ("bcd", "asa"),
            // block 2
            ("bsr", "a00"),
            ("xyz", "xxx"),
            ("xzz", "yyy"),
            // block 3
            ("zzz", "111"),
        ]
    }

    /// Builds an in-memory table containing `data` as raw keys (no internal-key format), using
    /// tiny blocks and Snappy compression.
    ///
    /// Returns the table bytes together with their length. This is a convenience for callers:
    /// a call like `f(v, v.len())` does not compile, because `v` is moved before `v.len()` runs.
    fn build_table(data: Vec<(&'static str, &'static str)>) -> (Vec<u8>, usize) {
        let mut d = Vec::with_capacity(512);
        let mut opt = Options::default();
        opt.block_restart_interval = 2;
        opt.block_size = 32;
        opt.compression_type = CompressionType::CompressionSnappy;

        {
            // Uses the standard comparator in opt.
            let mut b = TableBuilder::new(opt, &mut d);

            for &(k, v) in data.iter() {
                b.add(k.as_bytes(), v.as_bytes()).unwrap();
            }

            b.finish().unwrap();
        }

        let size = d.len();
        (d, size)
    }

    fn wrap_buffer(src: Vec<u8>) -> Box<dyn RandomAccess> {
        Box::new(src)
    }

    #[test]
    fn test_table_approximate_offset() {
        let (src, size) = build_table(build_data());
        let mut opt = Options::default();
        opt.block_size = 32;
        let table = Table::new(opt, wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();

        let expected_offsets = [0, 0, 0, 44, 44, 44, 89];
        for (i, (k, _)) in SSIteratorIter::wrap(&mut iter).enumerate() {
            assert_eq!(expected_offsets[i], table.approx_offset_of(&k));
        }

        // Key-past-last returns offset of metaindex block.
        assert_eq!(137, table.approx_offset_of("{aa".as_bytes()));
    }

    #[test]
    fn test_table_block_cache_use() {
        let (src, size) = build_table(build_data());
        let mut opt = Options::default();
        opt.block_size = 32;

        let table = Table::new(opt.clone(), wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();

        // index/metaindex blocks are not cached. That'd be a waste of memory.
        assert_eq!(opt.block_cache.read().expect(LOCK_POISONED).count(), 0);

        iter.next();
        assert_eq!(opt.block_cache.read().expect(LOCK_POISONED).count(), 1);

        // This may fail if block parameters or data change. In that case, adapt it.
        iter.next();
        iter.next();
        iter.next();
        iter.next();
        assert_eq!(opt.block_cache.read().expect(LOCK_POISONED).count(), 2);
    }

    #[test]
    fn test_table_block_tiny_cache() {
        let (src, size) = build_table(build_data());
        // Create a table whose block cache can hold only a single block.
        let mut opt = Options::default().with_cache_capacity(1);
        opt.block_size = 32;

        let table = Table::new(opt.clone(), wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();

        // index/metaindex blocks are not cached. That'd be a waste of memory.
        assert_eq!(opt.block_cache.read().expect(LOCK_POISONED).count(), 0);

        // We should have at most one item in the cache
        iter.next();
        assert_eq!(opt.block_cache.read().expect(LOCK_POISONED).count(), 1);
        iter.next();
        assert_eq!(opt.block_cache.read().expect(LOCK_POISONED).count(), 1);
    }

    #[test]
    fn test_table_iterator_fwd_bwd() {
        let (src, size) = build_table(build_data());
        let data = build_data();

        let table = Table::new(Options::default(), wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();
        let mut i = 0;

        while let Some((k, v)) = iter.next() {
            assert_eq!(
                (data[i].0.as_bytes(), data[i].1.as_bytes()),
                (k.as_ref(), v.as_ref())
            );
            i += 1;
        }

        assert_eq!(i, data.len());
        assert!(!iter.valid());

        // Go forward again, to last entry.
        while let Some((key, _)) = iter.next() {
            if key.as_slice() == b"zzz" {
                break;
            }
        }

        assert!(iter.valid());
        // backwards count
        let mut j = 0;

        while iter.prev() {
            if let Some((k, v)) = current_key_val(&iter) {
                j += 1;
                assert_eq!(
                    (
                        data[data.len() - 1 - j].0.as_bytes(),
                        data[data.len() - 1 - j].1.as_bytes()
                    ),
                    (k.as_ref(), v.as_ref())
                );
            } else {
                break;
            }
        }

        // expecting 7 - 1, because the last entry that the iterator stopped on is the last entry
        // in the table; that is, it needs to go back over 6 entries.
        assert_eq!(j, 6);
    }

    #[test]
    fn test_table_iterator_filter() {
        let (src, size) = build_table(build_data());

        let table = Table::new(Options::default(), wrap_buffer(src), size).unwrap();
        assert!(table.filters.is_some());
        let filter_reader = table.filters.clone().unwrap();
        let mut iter = table.iter();

        while let Some((k, _)) = iter.next() {
            assert!(filter_reader.key_may_match(iter.current_block_off, &k));
            assert!(!filter_reader.key_may_match(iter.current_block_off, b"somerandomkey"));
        }
    }

    #[test]
    fn test_table_iterator_state_behavior() {
        let (src, size) = build_table(build_data());

        let table = Table::new(Options::default(), wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();

        // behavior test

        // A fresh iterator has no block loaded yet, so it is not valid.
        assert!(!iter.valid());
        assert!(current_key_val(&iter).is_none());
        assert!(!iter.prev());

        assert!(iter.advance());
        let first = current_key_val(&iter);
        assert!(iter.valid());
        assert!(current_key_val(&iter).is_some());

        assert!(iter.advance());
        assert!(iter.prev());
        assert!(iter.valid());

        iter.reset();
        assert!(!iter.valid());
        assert!(current_key_val(&iter).is_none());
        assert_eq!(first, iter.next());
    }

    #[test]
    fn test_table_iterator_behavior_standard() {
        let mut data = build_data();
        data.truncate(4);
        let (src, size) = build_table(data);
        let table = Table::new(Options::default(), wrap_buffer(src), size).unwrap();
        test_iterator_properties(table.iter());
    }

    #[test]
    fn test_table_iterator_values() {
        let (src, size) = build_table(build_data());
        let data = build_data();

        let table = Table::new(Options::default(), wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();
        let mut i = 0;

        iter.next();
        iter.next();

        // Go back to previous entry, check, go forward two entries, repeat
        // Verifies that prev/next works well.
        loop {
            iter.prev();

            if let Some((k, v)) = current_key_val(&iter) {
                assert_eq!(
                    (data[i].0.as_bytes(), data[i].1.as_bytes()),
                    (k.as_ref(), v.as_ref())
                );
            } else {
                break;
            }

            i += 1;
            if iter.next().is_none() || iter.next().is_none() {
                break;
            }
        }

        // Skipping the last value because the second next() above will break the loop
        assert_eq!(i, 6);
    }

    #[test]
    fn test_table_iterator_seek() {
        let (src, size) = build_table(build_data());

        let table = Table::new(Options::default(), wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();

        iter.seek(b"bcd");
        assert!(iter.valid());
        assert_eq!(
            current_key_val(&iter),
            Some((b"bcd".to_vec(), b"asa".to_vec()))
        );
        iter.seek(b"abc");
        assert!(iter.valid());
        assert_eq!(
            current_key_val(&iter),
            Some((b"abc".to_vec(), b"def".to_vec()))
        );

        // Seek-past-last invalidates.
        iter.seek("{{{".as_bytes());
        assert!(!iter.valid());
        iter.seek(b"bbb");
        assert!(iter.valid());
    }

    #[test]
    fn test_table_get() {
        let (src, size) = build_table(build_data());

        let table = Table::new(Options::default(), wrap_buffer(src), size).unwrap();
        let table2 = table.clone();

        let mut iter = table.iter();
        // Test that all of the table's entries are reachable via get()
        for (k, v) in SSIteratorIter::wrap(&mut iter) {
            let r = table2.get(&k);
            assert_eq!(Ok(Some(v)), r);
        }

        assert_eq!(
            table.opt.block_cache.read().expect(LOCK_POISONED).count(),
            3
        );

        // test that filters work and don't return anything at all.
        assert!(table.get(b"aaa").unwrap().is_none());
        assert!(table.get(b"aaaa").unwrap().is_none());
        assert!(table.get(b"aa").unwrap().is_none());
        assert!(table.get(b"abcd").unwrap().is_none());
        assert!(table.get(b"abb").unwrap().is_none());
        assert!(table.get(b"xyy").unwrap().is_none());
        assert!(table.get(b"zzy").unwrap().is_none());
        assert!(table.get(b"zz1").unwrap().is_none());
        assert!(table.get("zz{".as_bytes()).unwrap().is_none());
    }

    #[test]
    fn test_table_reader_checksum() {
        let (mut src, size) = build_table(build_data());

        // Corrupt a byte inside the first data block so that its checksum no longer matches.
        src[10] += 1;

        let table = Table::new(Options::default(), wrap_buffer(src), size).unwrap();

        assert!(table.filters.is_some());
        assert_eq!(table.filters.as_ref().unwrap().num(), 1);

        {
            let mut iter = table.iter();
            // The corrupted first block (3 entries) is skipped; the remaining 4 entries survive.
            assert_eq!(SSIteratorIter::wrap(&mut iter).count(), 4);
        }

        let mut iter = table.iter();
        let wanted = build_data()[5].0;
        assert!(
            SSIteratorIter::wrap(&mut iter).any(|(k, _)| k == wanted.as_bytes()),
            "Should have hit 6th record in table!"
        );
    }
}