1use crate::block::{Block, BlockIter};
2use crate::blockhandle::BlockHandle;
3use crate::cache;
4use crate::error::Result;
5use crate::filter_block::FilterBlockReader;
6use crate::options::Options;
7use crate::table_block;
8use crate::table_builder::{self, Footer};
9use crate::types::{current_key_val, RandomAccess, SSIterator};
10
11use std::cmp::Ordering;
12use std::fs;
13use std::path;
14use std::sync::Arc;
15
16use integer_encoding::FixedIntWriter;
17
18fn read_footer(f: &dyn RandomAccess, size: usize) -> Result<Footer> {
20 let mut buf = vec![0; table_builder::FULL_FOOTER_LENGTH];
21 f.read_at(size - table_builder::FULL_FOOTER_LENGTH, &mut buf)?;
22 Ok(Footer::decode(&buf))
23}
24
25#[derive(Clone)]
27pub struct Table {
28 file: Arc<Box<dyn RandomAccess>>,
29 cache_id: cache::CacheID,
30
31 opt: Options,
32
33 footer: Footer,
34 index_block: Block,
35 filters: Option<FilterBlockReader>,
36}
37
38impl Table {
39 pub fn new_from_file(opt: Options, path: &path::Path) -> Result<Table> {
41 let f = fs::OpenOptions::new().read(true).open(path)?;
42 let size = f.metadata()?.len() as usize;
43 Table::new(opt, Box::new(f), size)
44 }
45
46 pub fn new(opt: Options, file: Box<dyn RandomAccess>, size: usize) -> Result<Table> {
48 let footer = read_footer(file.as_ref(), size)?;
49 let index_block = table_block::read_table_block(opt.clone(), file.as_ref(), &footer.index)?;
50 let metaindex_block =
51 table_block::read_table_block(opt.clone(), file.as_ref(), &footer.meta_index)?;
52
53 let filter_block_reader = Table::read_filter_block(&metaindex_block, file.as_ref(), &opt)?;
54 let cache_id = {
55 let mut block_cache = opt.block_cache.write()?;
56 block_cache.new_cache_id()
57 };
58
59 Ok(Table {
60 file: Arc::new(file),
61 cache_id: cache_id,
62 opt: opt,
63 footer: footer,
64 filters: filter_block_reader,
65 index_block: index_block,
66 })
67 }
68
69 fn read_filter_block(
70 metaix: &Block,
71 file: &dyn RandomAccess,
72 options: &Options,
73 ) -> Result<Option<FilterBlockReader>> {
74 let filter_name = format!("filter.{}", options.filter_policy.name())
76 .as_bytes()
77 .to_vec();
78
79 let mut metaindexiter = metaix.iter();
80 metaindexiter.seek(&filter_name);
81
82 if let Some((_key, val)) = current_key_val(&metaindexiter) {
83 let filter_block_location = BlockHandle::decode(&val).0;
84 if filter_block_location.size() > 0 {
85 return Ok(Some(table_block::read_filter_block(
86 file,
87 &filter_block_location,
88 options.filter_policy.clone(),
89 )?));
90 }
91 }
92 Ok(None)
93 }
94
95 fn block_cache_handle(&self, block_off: usize) -> cache::CacheKey {
98 let mut dst = [0; 2 * 8];
99 (&mut dst[..8])
100 .write_fixedint(self.cache_id)
101 .expect("error writing to vec");
102 (&mut dst[8..])
103 .write_fixedint(block_off as u64)
104 .expect("error writing to vec");
105 dst
106 }
107
108 fn read_block(&self, location: &BlockHandle) -> Result<Block> {
111 let cachekey = self.block_cache_handle(location.offset());
112 let mut block_cache = self.opt.block_cache.write()?;
113 if let Some(block) = block_cache.get(&cachekey) {
114 return Ok(block.clone());
115 }
116
117 let b =
119 table_block::read_table_block(self.opt.clone(), self.file.as_ref().as_ref(), location)?;
120
121 block_cache.insert(&cachekey, b.clone());
123
124 Ok(b)
125 }
126
127 pub fn approx_offset_of(&self, key: &[u8]) -> usize {
129 let mut iter = self.index_block.iter();
130
131 iter.seek(key);
132
133 if let Some((_, val)) = current_key_val(&iter) {
134 let location = BlockHandle::decode(&val).0;
135 return location.offset();
136 }
137
138 return self.footer.meta_index.offset();
139 }
140
141 pub fn iter(&self) -> TableIterator {
144 let iter = TableIterator {
145 current_block: None,
146 current_block_off: 0,
147 index_block: self.index_block.iter(),
148 table: self.clone(),
149 };
150 iter
151 }
152
153 pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
157 let mut index_iter = self.index_block.iter();
158 index_iter.seek(key);
159
160 let handle;
161 if let Some((last_in_block, h)) = current_key_val(&index_iter) {
162 if self.opt.cmp.cmp(key, &last_in_block) == Ordering::Less {
163 handle = BlockHandle::decode(&h).0;
164 } else {
165 return Ok(None);
166 }
167 } else {
168 return Ok(None);
169 }
170
171 if let Some(ref filters) = self.filters {
175 if !filters.key_may_match(handle.offset(), key) {
176 return Ok(None);
177 }
178 }
179
180 let tb = self.read_block(&handle)?;
182 let mut iter = tb.iter();
183
184 iter.seek(key);
186 if let Some((k, v)) = current_key_val(&iter) {
187 if self.opt.cmp.cmp(&k, key) == Ordering::Equal {
188 return Ok(Some(v));
189 }
190 }
191 Ok(None)
192 }
193}
194
195pub struct TableIterator {
198 table: Table,
205 current_block: Option<BlockIter>,
206 current_block_off: usize,
207 index_block: BlockIter,
208}
209
210impl TableIterator {
211 fn skip_to_next_entry(&mut self) -> Result<bool> {
216 if let Some((_key, val)) = self.index_block.next() {
217 self.load_block(&val).map(|_| true)
218 } else {
219 Ok(false)
220 }
221 }
222
223 fn load_block(&mut self, handle: &[u8]) -> Result<()> {
225 let (new_block_handle, _) = BlockHandle::decode(handle);
226 let block = self.table.read_block(&new_block_handle)?;
227
228 self.current_block = Some(block.iter());
229 self.current_block_off = new_block_handle.offset();
230
231 Ok(())
232 }
233}
234
235impl SSIterator for TableIterator {
236 fn advance(&mut self) -> bool {
237 if self.current_block.is_none() {
239 match self.skip_to_next_entry() {
240 Ok(true) => return self.advance(),
241 Ok(false) => {
242 self.reset();
243 return false;
244 }
245 Err(_) => return self.advance(),
247 }
248 }
249
250 if let Some(ref mut cb) = self.current_block {
252 if cb.advance() {
253 return true;
254 }
255 }
256
257 self.current_block = None;
259 match self.skip_to_next_entry() {
260 Ok(true) => self.advance(),
261 Ok(false) => {
262 self.reset();
263 false
264 }
265 Err(_) => self.advance(),
267 }
268 }
269
270 fn seek(&mut self, to: &[u8]) {
273 self.index_block.seek(to);
276
277 if let Some((past_block, handle)) = current_key_val(&self.index_block) {
279 if self.table.opt.cmp.cmp(to, &past_block) <= Ordering::Equal {
280 if let Ok(()) = self.load_block(&handle) {
282 self.current_block.as_mut().unwrap().seek(to);
284 return;
285 }
286 }
287 }
288 self.reset();
290 }
291
292 fn prev(&mut self) -> bool {
293 if let Some(ref mut cb) = self.current_block {
295 if cb.prev() {
296 return true;
297 }
298 }
299
300 if self.index_block.prev() {
302 if let Some((_, handle)) = current_key_val(&self.index_block) {
303 if self.load_block(&handle).is_ok() {
304 self.current_block.as_mut().unwrap().seek_to_last();
305 self.current_block.as_ref().unwrap().valid()
306 } else {
307 self.reset();
308 false
309 }
310 } else {
311 false
312 }
313 } else {
314 false
315 }
316 }
317
318 fn reset(&mut self) {
319 self.index_block.reset();
320 self.current_block = None;
321 }
322
323 fn valid(&self) -> bool {
326 self.current_block.is_some() && (self.current_block.as_ref().unwrap().valid())
327 }
328
329 fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool {
330 if let Some(ref cb) = self.current_block {
331 cb.current(key, val)
332 } else {
333 false
334 }
335 }
336
337 fn current_key(&self) -> Option<&[u8]> {
338 if let Some(ref cb) = self.current_block {
339 cb.current_key()
340 } else {
341 None
342 }
343 }
344}
345
346#[cfg(test)]
347mod tests {
348 use crate::options::CompressionType;
349 use crate::table_builder::TableBuilder;
350 use crate::test_util::{test_iterator_properties, SSIteratorIter};
351 use crate::types::{current_key_val, SSIterator};
352
353 use super::*;
354
355 const LOCK_POISONED: &str = "Lock poisoned";
356
357 fn build_data() -> Vec<(&'static str, &'static str)> {
358 vec![
359 ("abc", "def"),
361 ("abd", "dee"),
362 ("bcd", "asa"),
363 ("bsr", "a00"),
365 ("xyz", "xxx"),
366 ("xzz", "yyy"),
367 ("zzz", "111"),
369 ]
370 }
371
372 fn build_table(data: Vec<(&'static str, &'static str)>) -> (Vec<u8>, usize) {
375 let mut d = Vec::with_capacity(512);
376 let mut opt = Options::default();
377 opt.block_restart_interval = 2;
378 opt.block_size = 32;
379 opt.compression_type = CompressionType::CompressionSnappy;
380
381 {
382 let mut b = TableBuilder::new(opt, &mut d);
384
385 for &(k, v) in data.iter() {
386 b.add(k.as_bytes(), v.as_bytes()).unwrap();
387 }
388
389 b.finish().unwrap();
390 }
391
392 let size = d.len();
393 (d, size)
394 }
395
396 fn wrap_buffer(src: Vec<u8>) -> Box<dyn RandomAccess> {
397 Box::new(src)
398 }
399
400 #[test]
401 fn test_table_approximate_offset() {
402 let (src, size) = build_table(build_data());
403 let mut opt = Options::default();
404 opt.block_size = 32;
405 let table = Table::new(opt, wrap_buffer(src), size).unwrap();
406 let mut iter = table.iter();
407
408 let expected_offsets = vec![0, 0, 0, 44, 44, 44, 89];
409 let mut i = 0;
410 for (k, _) in SSIteratorIter::wrap(&mut iter) {
411 assert_eq!(expected_offsets[i], table.approx_offset_of(&k));
412 i += 1;
413 }
414
415 assert_eq!(137, table.approx_offset_of("{aa".as_bytes()));
417 }
418
419 #[test]
420 fn test_table_block_cache_use() {
421 let (src, size) = build_table(build_data());
422 let mut opt = Options::default();
423 opt.block_size = 32;
424
425 let table = Table::new(opt.clone(), wrap_buffer(src), size).unwrap();
426 let mut iter = table.iter();
427
428 assert_eq!(opt.block_cache.read().expect(LOCK_POISONED).count(), 0);
430
431 iter.next();
432 assert_eq!(opt.block_cache.read().expect(LOCK_POISONED).count(), 1);
433
434 iter.next();
436 iter.next();
437 iter.next();
438 iter.next();
439 assert_eq!(opt.block_cache.read().expect(LOCK_POISONED).count(), 2);
440 }
441
442 #[test]
443 fn test_table_block_tiny_cache() {
444 let (src, size) = build_table(build_data());
445 let mut opt = Options::default().with_cache_capacity(1);
447 opt.block_size = 32;
448
449 let table = Table::new(opt.clone(), wrap_buffer(src), size).unwrap();
450 let mut iter = table.iter();
451
452 assert_eq!(opt.block_cache.read().expect(LOCK_POISONED).count(), 0);
454
455 iter.next();
457 assert_eq!(opt.block_cache.read().expect(LOCK_POISONED).count(), 1);
458 iter.next();
459 assert_eq!(opt.block_cache.read().expect(LOCK_POISONED).count(), 1);
460 }
461
462 #[test]
463 fn test_table_iterator_fwd_bwd() {
464 let (src, size) = build_table(build_data());
465 let data = build_data();
466
467 let table = Table::new(Options::default(), wrap_buffer(src), size).unwrap();
468 let mut iter = table.iter();
469 let mut i = 0;
470
471 while let Some((k, v)) = iter.next() {
472 assert_eq!(
473 (data[i].0.as_bytes(), data[i].1.as_bytes()),
474 (k.as_ref(), v.as_ref())
475 );
476 i += 1;
477 }
478
479 assert_eq!(i, data.len());
480 assert!(!iter.valid());
481
482 while let Some((key, _)) = iter.next() {
484 if key.as_slice() == b"zzz" {
485 break;
486 }
487 }
488
489 assert!(iter.valid());
490 let mut j = 0;
492
493 while iter.prev() {
494 if let Some((k, v)) = current_key_val(&iter) {
495 j += 1;
496 assert_eq!(
497 (
498 data[data.len() - 1 - j].0.as_bytes(),
499 data[data.len() - 1 - j].1.as_bytes()
500 ),
501 (k.as_ref(), v.as_ref())
502 );
503 } else {
504 break;
505 }
506 }
507
508 assert_eq!(j, 6);
511 }
512
513 #[test]
514 fn test_table_iterator_filter() {
515 let (src, size) = build_table(build_data());
516
517 let table = Table::new(Options::default(), wrap_buffer(src), size).unwrap();
518 assert!(table.filters.is_some());
519 let filter_reader = table.filters.clone().unwrap();
520 let mut iter = table.iter();
521
522 loop {
523 if let Some((k, _)) = iter.next() {
524 assert!(filter_reader.key_may_match(iter.current_block_off, &k));
525 assert!(!filter_reader.key_may_match(iter.current_block_off, b"somerandomkey"));
526 } else {
527 break;
528 }
529 }
530 }
531
532 #[test]
533 fn test_table_iterator_state_behavior() {
534 let (src, size) = build_table(build_data());
535
536 let table = Table::new(Options::default(), wrap_buffer(src), size).unwrap();
537 let mut iter = table.iter();
538
539 assert!(!iter.valid());
543 assert!(current_key_val(&iter).is_none());
544 assert!(!iter.prev());
545
546 assert!(iter.advance());
547 let first = current_key_val(&iter);
548 assert!(iter.valid());
549 assert!(current_key_val(&iter).is_some());
550
551 assert!(iter.advance());
552 assert!(iter.prev());
553 assert!(iter.valid());
554
555 iter.reset();
556 assert!(!iter.valid());
557 assert!(current_key_val(&iter).is_none());
558 assert_eq!(first, iter.next());
559 }
560
561 #[test]
562 fn test_table_iterator_behavior_standard() {
563 let mut data = build_data();
564 data.truncate(4);
565 let (src, size) = build_table(data);
566 let table = Table::new(Options::default(), wrap_buffer(src), size).unwrap();
567 test_iterator_properties(table.iter());
568 }
569
570 #[test]
571 fn test_table_iterator_values() {
572 let (src, size) = build_table(build_data());
573 let data = build_data();
574
575 let table = Table::new(Options::default(), wrap_buffer(src), size).unwrap();
576 let mut iter = table.iter();
577 let mut i = 0;
578
579 iter.next();
580 iter.next();
581
582 loop {
585 iter.prev();
586
587 if let Some((k, v)) = current_key_val(&iter) {
588 assert_eq!(
589 (data[i].0.as_bytes(), data[i].1.as_bytes()),
590 (k.as_ref(), v.as_ref())
591 );
592 } else {
593 break;
594 }
595
596 i += 1;
597 if iter.next().is_none() || iter.next().is_none() {
598 break;
599 }
600 }
601
602 assert_eq!(i, 6);
604 }
605
606 #[test]
607 fn test_table_iterator_seek() {
608 let (src, size) = build_table(build_data());
609
610 let table = Table::new(Options::default(), wrap_buffer(src), size).unwrap();
611 let mut iter = table.iter();
612
613 iter.seek(b"bcd");
614 assert!(iter.valid());
615 assert_eq!(
616 current_key_val(&iter),
617 Some((b"bcd".to_vec(), b"asa".to_vec()))
618 );
619 iter.seek(b"abc");
620 assert!(iter.valid());
621 assert_eq!(
622 current_key_val(&iter),
623 Some((b"abc".to_vec(), b"def".to_vec()))
624 );
625
626 iter.seek("{{{".as_bytes());
628 assert!(!iter.valid());
629 iter.seek(b"bbb");
630 assert!(iter.valid());
631 }
632
633 #[test]
634 fn test_table_get() {
635 let (src, size) = build_table(build_data());
636
637 let table = Table::new(Options::default(), wrap_buffer(src), size).unwrap();
638 let table2 = table.clone();
639
640 let mut _iter = table.iter();
641 for (k, v) in SSIteratorIter::wrap(&mut _iter) {
643 let r = table2.get(&k);
644 assert_eq!(Ok(Some(v)), r);
645 }
646
647 assert_eq!(
648 table.opt.block_cache.read().expect(LOCK_POISONED).count(),
649 3
650 );
651
652 assert!(table.get(b"aaa").unwrap().is_none());
654 assert!(table.get(b"aaaa").unwrap().is_none());
655 assert!(table.get(b"aa").unwrap().is_none());
656 assert!(table.get(b"abcd").unwrap().is_none());
657 assert!(table.get(b"abb").unwrap().is_none());
658 assert!(table.get(b"xyy").unwrap().is_none());
659 assert!(table.get(b"zzy").unwrap().is_none());
660 assert!(table.get(b"zz1").unwrap().is_none());
661 assert!(table.get("zz{".as_bytes()).unwrap().is_none());
662 }
663
664 #[test]
665 fn test_table_reader_checksum() {
666 let (mut src, size) = build_table(build_data());
667
668 src[10] += 1;
669
670 let table = Table::new(Options::default(), wrap_buffer(src), size).unwrap();
671
672 assert!(table.filters.is_some());
673 assert_eq!(table.filters.as_ref().unwrap().num(), 1);
674
675 {
676 let mut _iter = table.iter();
677 let iter = SSIteratorIter::wrap(&mut _iter);
678 assert_eq!(iter.count(), 4);
680 }
681
682 {
683 let mut _iter = table.iter();
684 let iter = SSIteratorIter::wrap(&mut _iter);
685
686 for (k, _) in iter {
687 if k == build_data()[5].0.as_bytes() {
688 return;
689 }
690 }
691
692 panic!("Should have hit 5th record in table!");
693 }
694 }
695}