1use crate::block::{Block, BlockIter};
2use crate::blockhandle::BlockHandle;
3use crate::cache::{self, Cache};
4use crate::cmp::InternalKeyCmp;
5use crate::env::RandomAccess;
6use crate::error::{self, err, Result};
7use crate::filter;
8use crate::filter_block::FilterBlockReader;
9use crate::key_types::InternalKey;
10use crate::options::Options;
11use crate::table_block;
12use crate::table_builder::{self, Footer};
13use crate::types::{current_key_val, LdbIterator, Shared};
14
15use std::cmp::Ordering;
16use std::rc::Rc;
17
18use bytes::Bytes;
19use integer_encoding::FixedIntWriter;
20
21fn read_footer(f: &dyn RandomAccess, size: usize) -> Result<Footer> {
23 let mut buf = vec![0; table_builder::FULL_FOOTER_LENGTH];
24 f.read_at(size - table_builder::FULL_FOOTER_LENGTH, &mut buf)?;
25 match Footer::decode(&buf) {
26 Some(ok) => Ok(ok),
27 None => err(
28 error::StatusCode::Corruption,
29 &format!("Couldn't decode damaged footer {:?}", &buf),
30 ),
31 }
32}
33
/// A read-only, sorted, immutable key/value table backed by a file.
/// Cloning is cheap: clones share the file handle and the block cache.
#[derive(Clone)]
pub struct Table {
    // Underlying random-access file; Rc-shared among clones of this table.
    file: Rc<Box<dyn RandomAccess>>,
    file_size: usize,
    // Namespaces this table's blocks inside the shared block cache.
    cache_id: cache::CacheID,

    opt: Options,
    block_cache: Shared<Cache<Block>>,

    footer: Footer,
    // Index block: maps separator keys to data block handles.
    indexblock: Block,
    // Filter block reader (e.g. bloom filter), if the table has one.
    filters: Option<FilterBlockReader>,
}
47
48impl Table {
49 fn new_raw(
51 opt: Options,
52 block_cache: Shared<Cache<Block>>,
53 file: Rc<Box<dyn RandomAccess>>,
54 size: usize,
55 ) -> Result<Table> {
56 let footer = read_footer(file.as_ref().as_ref(), size)?;
57 let indexblock =
58 table_block::read_table_block(opt.clone(), file.as_ref().as_ref(), &footer.index)?;
59 let metaindexblock =
60 table_block::read_table_block(opt.clone(), file.as_ref().as_ref(), &footer.meta_index)?;
61
62 let filter_block_reader =
63 Table::read_filter_block(&metaindexblock, file.as_ref().as_ref(), &opt)?;
64 let cache_id = block_cache.borrow_mut().new_cache_id();
65
66 Ok(Table {
67 file,
68 file_size: size,
69 cache_id,
70 opt,
71 block_cache,
72 footer,
73 filters: filter_block_reader,
74 indexblock,
75 })
76 }
77
78 fn read_filter_block(
79 metaix: &Block,
80 file: &dyn RandomAccess,
81 options: &Options,
82 ) -> Result<Option<FilterBlockReader>> {
83 let filter_name = format!("filter.{}", options.filter_policy.name())
85 .as_bytes()
86 .to_vec();
87
88 let mut metaindexiter = metaix.iter();
89 metaindexiter.seek(&filter_name);
90
91 if let Some((_key, val)) = current_key_val(&metaindexiter) {
92 let fbl = BlockHandle::decode(&val);
93 let filter_block_location = match fbl {
94 None => {
95 return err(
96 error::StatusCode::Corruption,
97 &format!("Couldn't decode corrupt blockhandle {:?}", &val),
98 )
99 }
100 Some(ok) => ok.0,
101 };
102 if filter_block_location.size() > 0 {
103 return Ok(Some(table_block::read_filter_block(
104 file,
105 &filter_block_location,
106 options.filter_policy.clone(),
107 )?));
108 }
109 }
110 Ok(None)
111 }
112
113 pub fn new(
117 mut opt: Options,
118 block_cache: Shared<Cache<Block>>,
119 file: Rc<Box<dyn RandomAccess>>,
120 size: usize,
121 ) -> Result<Table> {
122 opt.cmp = Rc::new(Box::new(InternalKeyCmp(opt.cmp.clone())));
123 opt.filter_policy = Rc::new(Box::new(filter::InternalFilterPolicy::new(
124 opt.filter_policy,
125 )));
126 Table::new_raw(opt, block_cache, file, size)
127 }
128
129 fn block_cache_handle(&self, block_off: usize) -> cache::CacheKey {
132 let mut dst = [0; 2 * 8];
133 (&mut dst[..8])
134 .write_fixedint(self.cache_id)
135 .expect("error writing to vec");
136 (&mut dst[8..])
137 .write_fixedint(block_off as u64)
138 .expect("error writing to vec");
139 dst
140 }
141
142 fn read_block(&self, location: &BlockHandle) -> Result<Block> {
145 let cachekey = self.block_cache_handle(location.offset());
146 if let Some(block) = self.block_cache.borrow_mut().get(&cachekey) {
147 return Ok(block.clone());
148 }
149
150 let b =
152 table_block::read_table_block(self.opt.clone(), self.file.as_ref().as_ref(), location)?;
153
154 self.block_cache.borrow_mut().insert(&cachekey, b.clone());
156
157 Ok(b)
158 }
159
160 pub fn approx_offset_of(&self, key: &[u8]) -> usize {
162 let mut iter = self.indexblock.iter();
163
164 iter.seek(key);
165
166 if let Some((_, val)) = current_key_val(&iter) {
167 let location = BlockHandle::decode(&val).unwrap().0;
168 return location.offset();
169 }
170
171 self.footer.meta_index.offset()
172 }
173
174 pub fn iter(&self) -> TableIterator {
176 TableIterator {
177 current_block: None,
178 current_block_off: 0,
179 index_block: self.indexblock.iter(),
180 table: self.clone(),
181 }
182 }
183
184 pub fn get(&self, key: InternalKey<'_>) -> Result<Option<(Bytes, Bytes)>> {
196 let mut index_iter = self.indexblock.iter();
197 index_iter.seek(key);
198
199 let handle;
200 if let Some((last_in_block, h)) = current_key_val(&index_iter) {
201 if self.opt.cmp.cmp(key, &last_in_block) == Ordering::Less {
202 handle = BlockHandle::decode(&h).unwrap().0;
203 } else {
204 return Ok(None);
205 }
206 } else {
207 return Ok(None);
208 }
209
210 if let Some(ref filters) = self.filters {
214 if !filters.key_may_match(handle.offset(), key) {
215 return Ok(None);
216 }
217 }
218
219 let tb = self.read_block(&handle)?;
221 let mut iter = tb.iter();
222
223 iter.seek(key);
225 if let Some((k, v)) = current_key_val(&iter) {
226 if self.opt.cmp.cmp(&k, key) >= Ordering::Equal {
227 return Ok(Some((k.into(), v.into())));
228 }
229 }
230 Ok(None)
231 }
232}
233
/// Iterates over all entries of a `Table` in key order, driven by a two-level
/// scheme: the index block iterator selects data blocks, and `current_block`
/// iterates within the currently loaded data block.
pub struct TableIterator {
    table: Table,
    // Iterator over the currently loaded data block; None before the first
    // advance/seek and after exhaustion or reset.
    current_block: Option<BlockIter>,
    // File offset of the current data block (used e.g. for filter lookups).
    current_block_off: usize,
    index_block: BlockIter,
}
248
249impl TableIterator {
250 fn skip_to_next_entry(&mut self) -> Result<bool> {
255 if let Some((_key, val)) = self.index_block.next() {
256 self.load_block(&val).map(|_| true)
257 } else {
258 Ok(false)
259 }
260 }
261
262 fn load_block(&mut self, handle: &[u8]) -> Result<()> {
264 let (new_block_handle, _) = match BlockHandle::decode(handle) {
265 None => {
266 return err(
267 error::StatusCode::Corruption,
268 "Couldn't decode corrupt block handle",
269 )
270 }
271 Some(ok) => ok,
272 };
273 let block = self.table.read_block(&new_block_handle)?;
274
275 self.current_block = Some(block.iter());
276 self.current_block_off = new_block_handle.offset();
277
278 Ok(())
279 }
280}
281
impl LdbIterator for TableIterator {
    // Moves to the next entry, crossing data-block boundaries as needed.
    // On a block read error, the erroring block is skipped by recursing into
    // advance() again (corruption-tolerant forward iteration).
    // NOTE(review): the two near-identical match arms exist because the borrow
    // checker rejects a unified version; keep the statement order as-is.
    fn advance(&mut self) -> bool {
        // No current block: try to load the first/next one from the index.
        if self.current_block.is_none() {
            match self.skip_to_next_entry() {
                Ok(true) => return self.advance(),
                Ok(false) => {
                    self.reset();
                    return false;
                }
                // Corrupt block: skip it and try the next index entry.
                Err(_) => return self.advance(),
            }
        }

        // Advance within the current block if possible.
        if let Some(ref mut cb) = self.current_block {
            if cb.advance() {
                return true;
            }
        }

        // Current block exhausted: drop it and move to the next block.
        self.current_block = None;
        match self.skip_to_next_entry() {
            Ok(true) => self.advance(),
            Ok(false) => {
                self.reset();
                false
            }
            // Corrupt block: skip it and try the next index entry.
            Err(_) => self.advance(),
        }
    }

    // Positions the iterator at the first entry with key >= `to`, or resets it
    // (invalid state) when `to` is past the end of the table.
    fn seek(&mut self, to: &[u8]) {
        // The index key of a block is >= the last key contained in that block,
        // so the index seek finds the candidate block.
        self.index_block.seek(to);

        if let Some((past_block, handle)) = current_key_val(&self.index_block) {
            if self.table.opt.cmp.cmp(to, &past_block) <= Ordering::Equal {
                // Candidate block found; seek within it.
                if let Ok(()) = self.load_block(&handle) {
                    self.current_block.as_mut().unwrap().seek(to);
                    return;
                }
            }
        }
        // Past the end of the table (or block load failed): become invalid.
        self.reset();
    }

    // Steps back one entry, moving to the previous block's last entry when the
    // current block's beginning has been passed. Returns false at the front.
    fn prev(&mut self) -> bool {
        // Easy case: simply go back inside the current block.
        if let Some(ref mut cb) = self.current_block {
            if cb.prev() {
                return true;
            }
        }

        // Harder case: step the index back and load that block's last entry.
        if self.index_block.prev() {
            if let Some((_, handle)) = current_key_val(&self.index_block) {
                if self.load_block(&handle).is_ok() {
                    self.current_block.as_mut().unwrap().seek_to_last();
                    self.current_block.as_ref().unwrap().valid()
                } else {
                    self.reset();
                    false
                }
            } else {
                false
            }
        } else {
            false
        }
    }

    fn reset(&mut self) {
        self.index_block.reset();
        self.current_block = None;
    }

    // Valid iff a data block is loaded and its inner iterator is valid.
    fn valid(&self) -> bool {
        self.current_block.is_some() && (self.current_block.as_ref().unwrap().valid())
    }

    fn current(&self) -> Option<(Bytes, Bytes)> {
        if let Some(ref cb) = self.current_block {
            cb.current()
        } else {
            None
        }
    }
}
384
#[cfg(test)]
mod tests {
    use std::cell::RefCell;

    use crate::compressor::CompressorId;
    use crate::filter::BloomPolicy;
    use crate::key_types::LookupKey;
    use crate::table_builder::TableBuilder;
    use crate::test_util::{test_iterator_properties, LdbIteratorIter};
    use crate::types::{current_key_val, share, LdbIterator};
    use crate::{block, compressor, options};

    use super::*;

    // Fixed, sorted key/value fixture used by most tests below.
    fn build_data() -> Vec<(&'static str, &'static str)> {
        vec![
            ("abc", "def"),
            ("abd", "dee"),
            ("bcd", "asa"),
            ("bsr", "a00"),
            ("xyz", "xxx"),
            ("xzz", "yyy"),
            ("zzz", "111"),
        ]
    }

    // Builds a raw-key table from `data`. The tiny block size (32) forces the
    // data to spread over several blocks, exercising block-boundary logic.
    fn build_table(data: Vec<(&'static str, &'static str)>) -> (Vec<u8>, usize) {
        let mut d = Vec::with_capacity(512);
        let mut opt = options::for_test();
        opt.block_restart_interval = 2;
        opt.block_size = 32;
        opt.compressor = compressor::SnappyCompressor::ID;

        {
            // Scope ensures the builder releases its borrow of `d`.
            let mut b = TableBuilder::new_raw(opt, &mut d);

            for &(k, v) in data.iter() {
                b.add(k.as_bytes(), v.as_bytes()).unwrap();
            }

            b.finish().unwrap();
        }

        let size = d.len();
        (d, size)
    }

    // Builds a table whose keys are internal keys (user key + sequence number),
    // as produced by LookupKey; to be read via Table::new.
    fn build_internal_table() -> (Vec<u8>, usize) {
        let mut d = Vec::with_capacity(512);
        let mut opt = options::for_test();
        opt.block_restart_interval = 1;
        opt.block_size = 32;
        opt.filter_policy = Rc::new(Box::new(BloomPolicy::new(4)));

        // Sequence numbers start at 2 (i is incremented before use).
        let mut i = 1_u64;
        let data: Vec<(Vec<u8>, &'static str)> = build_data()
            .into_iter()
            .map(|(k, v)| {
                i += 1;
                (LookupKey::new(k.as_bytes(), i).internal_key().to_vec(), v)
            })
            .collect();

        {
            let mut b = TableBuilder::new(opt, &mut d);

            for (k, v) in data.iter() {
                b.add(k.as_slice(), v.as_bytes()).unwrap();
            }

            b.finish().unwrap();
        }

        let size = d.len();

        (d, size)
    }

    // Wraps an in-memory buffer as a RandomAccess "file".
    fn wrap_buffer(src: Vec<u8>) -> Rc<Box<dyn RandomAccess>> {
        Rc::new(Box::new(src))
    }

    #[test]
    fn test_table_approximate_offset() {
        let (src, size) = build_table(build_data());
        let mut opt = options::for_test();
        let bc = share(Cache::new(128));
        opt.block_size = 32;
        let table = Table::new_raw(opt, bc, wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();

        // Keys in the same data block share the block's starting offset.
        let expected_offsets = [0, 0, 0, 44, 44, 44, 89];
        for (i, (k, _)) in LdbIteratorIter::wrap(&mut iter).enumerate() {
            assert_eq!(expected_offsets[i], table.approx_offset_of(&k));
        }

        // "{" sorts after all ASCII letters, so this key is past the end and
        // maps to the meta index offset.
        assert_eq!(137, table.approx_offset_of("{aa".as_bytes()));
    }

    #[test]
    fn test_table_block_cache_use() {
        let (src, size) = build_table(build_data());
        let bc = share(Cache::new(128));
        let mut opt = options::for_test();
        opt.block_size = 32;

        {
            let table = Table::new_raw(opt.clone(), bc.clone(), wrap_buffer(src), size).unwrap();
            let mut iter = table.iter();

            // Blocks are inserted into the cache lazily, as iteration reaches them.
            assert_eq!(bc.borrow().count(), 0);
            iter.next();
            assert_eq!(bc.borrow().count(), 1);
            iter.next();
            iter.next();
            iter.next();
            iter.next();
            assert_eq!(bc.borrow().count(), 2);
        }

        println!(
            "weak = {}, strong = {}",
            std::rc::Rc::<RefCell<cache::Cache<block::Block>>>::weak_count(&bc),
            std::rc::Rc::<RefCell<cache::Cache<block::Block>>>::strong_count(&bc)
        );
    }

    #[test]
    fn test_table_iterator_fwd_bwd() {
        let (src, size) = build_table(build_data());
        let data = build_data();
        let bc = share(Cache::new(128));

        let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();
        let mut i = 0;

        // Forward pass over the whole table.
        while let Some((k, v)) = iter.next() {
            assert_eq!(
                (data[i].0.as_bytes(), data[i].1.as_bytes()),
                (k.as_ref(), v.as_ref())
            );
            i += 1;
        }

        assert_eq!(i, data.len());
        assert!(!iter.valid());

        // Re-position at the last entry ("zzz") by iterating again.
        while let Some((key, _)) = iter.next() {
            if key.as_slice() == b"zzz" {
                break;
            }
        }

        assert!(iter.valid());
        let mut j = 0;

        // Backward pass from the last entry.
        while iter.prev() {
            if let Some((k, v)) = current_key_val(&iter) {
                j += 1;
                assert_eq!(
                    (
                        data[data.len() - 1 - j].0.as_bytes(),
                        data[data.len() - 1 - j].1.as_bytes()
                    ),
                    (k.as_ref(), v.as_ref())
                );
            } else {
                break;
            }
        }

        assert_eq!(j, 6);
    }

    #[test]
    fn test_table_iterator_filter() {
        let (src, size) = build_table(build_data());
        let bc = share(Cache::new(128));

        let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap();
        assert!(table.filters.is_some());
        let filter_reader = table.filters.clone().unwrap();
        let mut iter = table.iter();
        // Every present key must match its block's filter; a key never inserted
        // must not match.
        while let Some((k, _)) = iter.next() {
            assert!(filter_reader.key_may_match(iter.current_block_off, &k));
            assert!(!filter_reader.key_may_match(iter.current_block_off, b"somerandomkey"));
        }
    }

    #[test]
    fn test_table_iterator_state_behavior() {
        let (src, size) = build_table(build_data());
        let bc = share(Cache::new(128));

        let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();

        // A fresh iterator is invalid until the first advance.
        assert!(!iter.valid());
        assert!(current_key_val(&iter).is_none());
        assert!(!iter.prev());

        assert!(iter.advance());
        let first = current_key_val(&iter);
        assert!(iter.valid());
        assert!(current_key_val(&iter).is_some());

        assert!(iter.advance());
        assert!(iter.prev());
        assert!(iter.valid());

        // reset() returns the iterator to the pre-first-advance state.
        iter.reset();
        assert!(!iter.valid());
        assert!(current_key_val(&iter).is_none());
        assert_eq!(first, iter.next());
    }

    #[test]
    fn test_table_iterator_behavior_standard() {
        let mut data = build_data();
        data.truncate(4);
        let (src, size) = build_table(data);
        let bc = share(Cache::new(128));
        let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap();
        // Generic LdbIterator contract checks from test_util.
        test_iterator_properties(table.iter());
    }

    #[test]
    fn test_table_iterator_values() {
        let (src, size) = build_table(build_data());
        let data = build_data();
        let bc = share(Cache::new(128));

        let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();
        let mut i = 0;

        iter.next();
        iter.next();

        // Zig-zag pattern: one step back, two steps forward — net one forward.
        loop {
            iter.prev();

            if let Some((k, v)) = current_key_val(&iter) {
                assert_eq!(
                    (data[i].0.as_bytes(), data[i].1.as_bytes()),
                    (k.as_ref(), v.as_ref())
                );
            } else {
                break;
            }

            i += 1;
            if iter.next().is_none() || iter.next().is_none() {
                break;
            }
        }

        assert_eq!(i, 6);
    }

    #[test]
    fn test_table_iterator_seek() {
        let (src, size) = build_table(build_data());
        let bc = share(Cache::new(128));

        let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap();
        let mut iter = table.iter();

        iter.seek(b"bcd");
        assert!(iter.valid());
        assert_eq!(
            current_key_val(&iter),
            Some((b"bcd".to_vec(), b"asa".to_vec()))
        );
        iter.seek(b"abc");
        assert!(iter.valid());
        assert_eq!(
            current_key_val(&iter),
            Some((b"abc".to_vec(), b"def".to_vec()))
        );

        // Seeking past the end invalidates; a subsequent in-range seek recovers.
        iter.seek("{{{".as_bytes());
        assert!(!iter.valid());
        iter.seek(b"bbb");
        assert!(iter.valid());
    }

    #[test]
    fn test_table_get() {
        let (src, size) = build_table(build_data());
        let bc = share(Cache::new(128));

        let table =
            Table::new_raw(options::for_test(), bc.clone(), wrap_buffer(src), size).unwrap();
        let table2 = table.clone();

        // Every key reachable via iteration must also be found via get().
        let mut _iter = table.iter();
        for (k, v) in LdbIteratorIter::wrap(&mut _iter) {
            let r = table2.get(&k);
            assert_eq!(Ok(Some((k.into(), v.into()))), r);
        }

        assert_eq!(bc.borrow().count(), 3);

        // Absent keys — including neighbors of present keys — must return None.
        assert!(table.get(b"aaa").unwrap().is_none());
        assert!(table.get(b"aaaa").unwrap().is_none());
        assert!(table.get(b"aa").unwrap().is_none());
        assert!(table.get(b"abcd").unwrap().is_none());
        assert!(table.get(b"abb").unwrap().is_none());
        assert!(table.get(b"zzy").unwrap().is_none());
        assert!(table.get(b"zz1").unwrap().is_none());
        assert!(table.get("zz{".as_bytes()).unwrap().is_none());
    }

    #[test]
    fn test_table_internal_keys() {
        use crate::key_types::LookupKey;

        let (src, size) = build_internal_table();

        let bc = share(Cache::new(128));
        let table = Table::new(options::for_test(), bc, wrap_buffer(src), size).unwrap();
        let filter_reader = table.filters.clone().unwrap();

        // Internal keys are user key (3 bytes here) + 8-byte tag.
        let mut _iter = table.iter();
        for (ref k, ref v) in LdbIteratorIter::wrap(&mut _iter) {
            assert_eq!(k.len(), 3 + 8);
            let (result_k, result_v) = table.get(k).unwrap().unwrap();
            assert_eq!(
                (k.to_vec(), v.to_vec()),
                (result_k.to_vec(), result_v.to_vec())
            );
        }

        // A lookup with a higher sequence number still finds the older entry.
        assert!(table
            .get(LookupKey::new(b"abc", 1000).internal_key())
            .unwrap()
            .is_some());

        let mut iter = table.iter();

        while let Some((k, _)) = iter.next() {
            let lk = LookupKey::new(&k, 123);
            let userkey = lk.user_key();

            // The internal filter policy indexes by user key.
            assert!(filter_reader.key_may_match(iter.current_block_off, userkey));
            assert!(!filter_reader.key_may_match(iter.current_block_off, b"somerandomkey"));
        }
    }

    #[test]
    fn test_table_reader_checksum() {
        let bc = share(Cache::new(128));
        let (mut src, size) = build_table(build_data());

        // Corrupt one byte inside the first data block; its checksum must fail.
        src[10] += 1;

        let table = Table::new_raw(options::for_test(), bc, wrap_buffer(src), size).unwrap();

        assert!(table.filters.is_some());
        assert_eq!(table.filters.as_ref().unwrap().num(), 1);

        {
            // The corrupted block is skipped, so only 4 of 7 entries remain.
            let mut _iter = table.iter();
            let iter = LdbIteratorIter::wrap(&mut _iter);
            assert_eq!(iter.count(), 4);
        }

        {
            let mut _iter = table.iter();
            let iter = LdbIteratorIter::wrap(&mut _iter);

            // Iteration must still reach entries past the corrupted block.
            for (k, _) in iter {
                if k == build_data()[5].0.as_bytes() {
                    return;
                }
            }

            panic!("Should have hit 5th record in table!");
        }
    }
}