rusty_leveldb/
table_builder.rs

1use crate::block::BlockContents;
2use crate::block_builder::BlockBuilder;
3use crate::blockhandle::BlockHandle;
4use crate::cmp::InternalKeyCmp;
5use crate::compressor::{self, Compressor, CompressorId};
6use crate::crc;
7use crate::error::Result;
8use crate::filter::{InternalFilterPolicy, NoFilterPolicy};
9use crate::filter_block::FilterBlockBuilder;
10use crate::key_types::InternalKey;
11use crate::log::mask_crc;
12use crate::options::Options;
13
14use std::cmp::Ordering;
15use std::io::Write;
16use std::rc::Rc;
17
18use integer_encoding::FixedIntWriter;
19
20pub const FOOTER_LENGTH: usize = 40;
21pub const FULL_FOOTER_LENGTH: usize = FOOTER_LENGTH + 8;
22pub const MAGIC_FOOTER_NUMBER: u64 = 0xdb4775248b80fb57;
23pub const MAGIC_FOOTER_ENCODED: [u8; 8] = [0x57, 0xfb, 0x80, 0x8b, 0x24, 0x75, 0x47, 0xdb];
24
25pub const TABLE_BLOCK_COMPRESS_LEN: usize = 1;
26pub const TABLE_BLOCK_CKSUM_LEN: usize = 4;
27
/// Footer is a helper for encoding/decoding a table footer.
///
/// It only stores the two block handles; the zero padding and the magic number that make up
/// the rest of the on-disk footer are produced by `encode()`.
#[derive(Debug, Clone)]
pub struct Footer {
    /// Handle of the metaindex block.
    pub meta_index: BlockHandle,
    /// Handle of the index block.
    pub index: BlockHandle,
}
34
/// A table footer contains a pointer to the metaindex block, another pointer to the index block,
/// and a magic number. Its encoded layout is:
/// [ { METAINDEX blockhandle, INDEX blockhandle, PADDING bytes } = 40 bytes,
/// MAGIC_FOOTER_ENCODED = 8 bytes ]
39impl Footer {
40    pub fn new(metaix: BlockHandle, index: BlockHandle) -> Footer {
41        Footer {
42            meta_index: metaix,
43            index,
44        }
45    }
46
47    pub fn decode(from: &[u8]) -> Option<Footer> {
48        assert!(from.len() >= FULL_FOOTER_LENGTH);
49        assert_eq!(&from[FOOTER_LENGTH..], &MAGIC_FOOTER_ENCODED);
50        let (meta, metalen) = BlockHandle::decode(&from[0..])?;
51        let (ix, _) = BlockHandle::decode(&from[metalen..])?;
52
53        Some(Footer {
54            meta_index: meta,
55            index: ix,
56        })
57    }
58
59    pub fn encode(&self, to: &mut [u8]) {
60        assert!(to.len() >= FOOTER_LENGTH + 8);
61
62        let s1 = self.meta_index.encode_to(to);
63        let s2 = self.index.encode_to(&mut to[s1..]);
64
65        (s1 + s2..FOOTER_LENGTH).for_each(|i| {
66            to[i] = 0;
67        });
68        (FOOTER_LENGTH..FULL_FOOTER_LENGTH).for_each(|i| {
69            to[i] = MAGIC_FOOTER_ENCODED[i - FOOTER_LENGTH];
70        });
71    }
72}
73
74/// A table consists of DATA BLOCKs, META BLOCKs, a METAINDEX BLOCK, an INDEX BLOCK and a FOOTER.
75///
76/// DATA BLOCKs, META BLOCKs, INDEX BLOCK and METAINDEX BLOCK are built using the code in
77/// the `block` module.
78///
79/// The FOOTER consists of a BlockHandle that points to the metaindex block, another pointing to
80/// the index block, padding to fill up to 40 B and at the end the 8B magic number
81/// 0xdb4775248b80fb57.
pub struct TableBuilder<Dst: Write> {
    /// Options used by this builder (comparator, block size, compressor, filter policy).
    opt: Options,
    /// Destination that all blocks and the footer are written to.
    dst: Dst,

    /// Number of bytes accounted as written to `dst` so far; also the offset recorded in the
    /// handle of the next block to be written.
    offset: usize,
    /// Number of entries added via `add()`.
    num_entries: usize,
    /// Last key of the most recently written data block; empty until a data block was written.
    prev_block_last_key: Vec<u8>,

    /// Data block currently being filled; replaced by a fresh builder after each flush.
    data_block: Option<BlockBuilder>,
    /// Index block; receives one (separator key, encoded block handle) entry per written data
    /// block and is consumed by `finish()`.
    index_block: Option<BlockBuilder>,
    /// Filter block builder fed with every added key; consumed by `finish()`.
    filter_block: Option<FilterBlockBuilder>,
}
94
95impl<Dst: Write> TableBuilder<Dst> {
96    pub fn new_no_filter(mut opt: Options, dst: Dst) -> TableBuilder<Dst> {
97        opt.filter_policy = Rc::new(Box::new(NoFilterPolicy::new()));
98        TableBuilder::new(opt, dst)
99    }
100}
101
102/// TableBuilder is used for building a new SSTable. It groups entries into blocks,
103/// calculating checksums and bloom filters.
104impl<Dst: Write> TableBuilder<Dst> {
105    /// Create a new table builder.
106    /// The comparator in opt will be wrapped in a InternalKeyCmp, and the filter policy
107    /// in an InternalFilterPolicy.
108    pub fn new(mut opt: Options, dst: Dst) -> TableBuilder<Dst> {
109        opt.cmp = Rc::new(Box::new(InternalKeyCmp(opt.cmp.clone())));
110        opt.filter_policy = Rc::new(Box::new(InternalFilterPolicy::new(opt.filter_policy)));
111        TableBuilder::new_raw(opt, dst)
112    }
113
114    /// Like new(), but doesn't wrap the comparator in an InternalKeyCmp (for testing)
115    pub fn new_raw(opt: Options, dst: Dst) -> TableBuilder<Dst> {
116        TableBuilder {
117            opt: opt.clone(),
118            dst,
119            offset: 0,
120            prev_block_last_key: vec![],
121            num_entries: 0,
122            data_block: Some(BlockBuilder::new(opt.clone())),
123            filter_block: Some(FilterBlockBuilder::new(opt.filter_policy.clone())),
124            index_block: Some(BlockBuilder::new(opt)),
125        }
126    }
127
    /// Returns the number of entries that have been added to this table so far.
    pub fn entries(&self) -> usize {
        self.num_entries
    }
131
132    pub fn size_estimate(&self) -> usize {
133        let mut size = 0;
134        if let Some(ref b) = self.data_block {
135            size += b.size_estimate();
136        }
137        if let Some(ref b) = self.index_block {
138            size += b.size_estimate();
139        }
140        if let Some(ref b) = self.filter_block {
141            size += b.size_estimate();
142        }
143        size + self.offset + FULL_FOOTER_LENGTH
144    }
145
146    /// Add a key to the table. The key as to be lexically greater or equal to the last one added.
147    pub fn add(&mut self, key: InternalKey<'_>, val: &[u8]) -> Result<()> {
148        assert!(self.data_block.is_some());
149
150        if !self.prev_block_last_key.is_empty() {
151            assert!(self.opt.cmp.cmp(&self.prev_block_last_key, key) == Ordering::Less);
152        }
153
154        if self.data_block.as_ref().unwrap().size_estimate() > self.opt.block_size {
155            self.write_data_block(key)?;
156        }
157
158        let dblock = &mut self.data_block.as_mut().unwrap();
159
160        if let Some(ref mut fblock) = self.filter_block {
161            fblock.add_key(key);
162        }
163
164        self.num_entries += 1;
165        dblock.add(key, val);
166        Ok(())
167    }
168
169    /// Writes an index entry for the current data_block where `next_key` is the first key of the
170    /// next block.
171    /// Calls write_block() for writing the block to disk.
172    fn write_data_block(&mut self, next_key: InternalKey<'_>) -> Result<()> {
173        assert!(self.data_block.is_some());
174
175        let block = self.data_block.take().unwrap();
176        let sep = self.opt.cmp.find_shortest_sep(block.last_key(), next_key);
177        self.prev_block_last_key = Vec::from(block.last_key());
178        let contents = block.finish();
179
180        let compressor_list = self.opt.compressor_list.clone();
181        let compressor = compressor_list.get(self.opt.compressor)?;
182
183        let handle = self.write_block(contents, (self.opt.compressor, compressor))?;
184
185        let mut handle_enc = [0_u8; 16];
186        let enc_len = handle.encode_to(&mut handle_enc);
187
188        self.index_block
189            .as_mut()
190            .unwrap()
191            .add(&sep, &handle_enc[0..enc_len]);
192        self.data_block = Some(BlockBuilder::new(self.opt.clone()));
193
194        if let Some(ref mut fblock) = self.filter_block {
195            fblock.start_block(self.offset);
196        }
197
198        Ok(())
199    }
200
201    /// Calculates the checksum, writes the block to disk and updates the offset.
202    fn write_block(
203        &mut self,
204        block: BlockContents,
205        compressor_id_pair: (u8, &dyn Compressor),
206    ) -> Result<BlockHandle> {
207        let (ctype, compressor) = compressor_id_pair;
208        let data = compressor.encode(block.to_vec())?;
209
210        let mut digest = crc::digest();
211
212        digest.update(&data);
213        digest.update(&[ctype; TABLE_BLOCK_COMPRESS_LEN]);
214
215        self.dst.write_all(&data)?;
216        self.dst.write_all(&[ctype; TABLE_BLOCK_COMPRESS_LEN])?;
217        self.dst.write_fixedint(mask_crc(digest.finalize()))?;
218
219        let handle = BlockHandle::new(self.offset, data.len());
220        self.offset += data.len() + TABLE_BLOCK_COMPRESS_LEN + TABLE_BLOCK_CKSUM_LEN;
221
222        Ok(handle)
223    }
224
225    pub fn finish(mut self) -> Result<usize> {
226        assert!(self.data_block.is_some());
227
228        let compressor_list = self.opt.compressor_list.clone();
229        let compressor_id_pair = (
230            self.opt.compressor,
231            compressor_list.get(self.opt.compressor)?,
232        );
233
234        // If there's a pending data block, write it
235        if self.data_block.as_ref().unwrap().entries() > 0 {
236            // Find a key reliably past the last key
237            let key_past_last = self
238                .opt
239                .cmp
240                .find_short_succ(self.data_block.as_ref().unwrap().last_key());
241            self.write_data_block(&key_past_last)?;
242        }
243
244        // Create metaindex block
245        let mut meta_ix_block = BlockBuilder::new(self.opt.clone());
246
247        if self.filter_block.is_some() {
248            // if there's a filter block, write the filter block and add it to the metaindex block.
249            let fblock = self.filter_block.take().unwrap();
250            let filter_key = format!("filter.{}", fblock.filter_name());
251            let fblock_data = fblock.finish();
252            let fblock_handle = self.write_block(
253                fblock_data.into(),
254                (compressor::NoneCompressor::ID, &compressor::NoneCompressor),
255            )?;
256
257            let mut handle_enc = [0_u8; 16];
258            let enc_len = fblock_handle.encode_to(&mut handle_enc);
259
260            meta_ix_block.add(filter_key.as_bytes(), &handle_enc[0..enc_len]);
261        }
262
263        // write metaindex block
264        let meta_ix = meta_ix_block.finish();
265        let meta_ix_handle = self.write_block(meta_ix, compressor_id_pair)?;
266
267        // write index block
268        let index_cont = self.index_block.take().unwrap().finish();
269        let ix_handle = self.write_block(index_cont, compressor_id_pair)?;
270
271        // write footer.
272        let footer = Footer::new(meta_ix_handle, ix_handle);
273        let mut buf = [0; FULL_FOOTER_LENGTH];
274        footer.encode(&mut buf);
275
276        self.offset += self.dst.write(&buf[..])?;
277        self.dst.flush()?;
278        Ok(self.offset)
279    }
280}
281
282#[cfg(test)]
283mod tests {
284    use super::*;
285    use crate::blockhandle::BlockHandle;
286    use crate::options;
287
288    #[test]
289    fn test_footer() {
290        let f = Footer::new(BlockHandle::new(44, 4), BlockHandle::new(55, 5));
291        let mut buf = [0; 48];
292        f.encode(&mut buf[..]);
293
294        let f2 = Footer::decode(&buf).unwrap();
295        assert_eq!(f2.meta_index.offset(), 44);
296        assert_eq!(f2.meta_index.size(), 4);
297        assert_eq!(f2.index.offset(), 55);
298        assert_eq!(f2.index.size(), 5);
299    }
300
301    #[test]
302    fn test_table_builder() {
303        let mut d = Vec::with_capacity(512);
304        let mut opt = options::for_test();
305        opt.block_restart_interval = 3;
306        opt.compressor = compressor::SnappyCompressor::ID;
307        let mut b = TableBuilder::new_raw(opt, &mut d);
308
309        let data = [
310            ("abc", "def"),
311            ("abe", "dee"),
312            ("bcd", "asa"),
313            ("dcc", "a00"),
314        ];
315        let data2 = [
316            ("abd", "def"),
317            ("abf", "dee"),
318            ("ccd", "asa"),
319            ("dcd", "a00"),
320        ];
321
322        for i in 0..data.len() {
323            b.add(data[i].0.as_bytes(), data[i].1.as_bytes()).unwrap();
324            b.add(data2[i].0.as_bytes(), data2[i].1.as_bytes()).unwrap();
325        }
326
327        let estimate = b.size_estimate();
328
329        assert_eq!(143, estimate);
330        assert!(b.filter_block.is_some());
331
332        let actual = b.finish().unwrap();
333        assert_eq!(223, actual);
334    }
335
336    #[test]
337    #[should_panic]
338    fn test_bad_input() {
339        let mut d = Vec::with_capacity(512);
340        let mut opt = options::for_test();
341        opt.block_restart_interval = 3;
342        let mut b = TableBuilder::new_raw(opt, &mut d);
343
344        // Test two equal consecutive keys
345        let data = [
346            ("abc", "def"),
347            ("abc", "dee"),
348            ("bcd", "asa"),
349            ("bsr", "a00"),
350        ];
351
352        for &(k, v) in data.iter() {
353            b.add(k.as_bytes(), v.as_bytes()).unwrap();
354        }
355        b.finish().unwrap();
356    }
357}