1use crate::block::BlockContents;
2use crate::block_builder::BlockBuilder;
3use crate::blockhandle::BlockHandle;
4use crate::cmp::InternalKeyCmp;
5use crate::compressor::{self, Compressor, CompressorId};
6use crate::crc;
7use crate::error::Result;
8use crate::filter::{InternalFilterPolicy, NoFilterPolicy};
9use crate::filter_block::FilterBlockBuilder;
10use crate::key_types::InternalKey;
11use crate::log::mask_crc;
12use crate::options::Options;
13
14use std::cmp::Ordering;
15use std::io::Write;
16use std::rc::Rc;
17
18use integer_encoding::FixedIntWriter;
19
20pub const FOOTER_LENGTH: usize = 40;
21pub const FULL_FOOTER_LENGTH: usize = FOOTER_LENGTH + 8;
22pub const MAGIC_FOOTER_NUMBER: u64 = 0xdb4775248b80fb57;
23pub const MAGIC_FOOTER_ENCODED: [u8; 8] = [0x57, 0xfb, 0x80, 0x8b, 0x24, 0x75, 0x47, 0xdb];
24
25pub const TABLE_BLOCK_COMPRESS_LEN: usize = 1;
26pub const TABLE_BLOCK_CKSUM_LEN: usize = 4;
27
28#[derive(Debug, Clone)]
30pub struct Footer {
31 pub meta_index: BlockHandle,
32 pub index: BlockHandle,
33}
34
35impl Footer {
40 pub fn new(metaix: BlockHandle, index: BlockHandle) -> Footer {
41 Footer {
42 meta_index: metaix,
43 index,
44 }
45 }
46
47 pub fn decode(from: &[u8]) -> Option<Footer> {
48 assert!(from.len() >= FULL_FOOTER_LENGTH);
49 assert_eq!(&from[FOOTER_LENGTH..], &MAGIC_FOOTER_ENCODED);
50 let (meta, metalen) = BlockHandle::decode(&from[0..])?;
51 let (ix, _) = BlockHandle::decode(&from[metalen..])?;
52
53 Some(Footer {
54 meta_index: meta,
55 index: ix,
56 })
57 }
58
59 pub fn encode(&self, to: &mut [u8]) {
60 assert!(to.len() >= FOOTER_LENGTH + 8);
61
62 let s1 = self.meta_index.encode_to(to);
63 let s2 = self.index.encode_to(&mut to[s1..]);
64
65 (s1 + s2..FOOTER_LENGTH).for_each(|i| {
66 to[i] = 0;
67 });
68 (FOOTER_LENGTH..FULL_FOOTER_LENGTH).for_each(|i| {
69 to[i] = MAGIC_FOOTER_ENCODED[i - FOOTER_LENGTH];
70 });
71 }
72}
73
74pub struct TableBuilder<Dst: Write> {
83 opt: Options,
84 dst: Dst,
85
86 offset: usize,
87 num_entries: usize,
88 prev_block_last_key: Vec<u8>,
89
90 data_block: Option<BlockBuilder>,
91 index_block: Option<BlockBuilder>,
92 filter_block: Option<FilterBlockBuilder>,
93}
94
95impl<Dst: Write> TableBuilder<Dst> {
96 pub fn new_no_filter(mut opt: Options, dst: Dst) -> TableBuilder<Dst> {
97 opt.filter_policy = Rc::new(Box::new(NoFilterPolicy::new()));
98 TableBuilder::new(opt, dst)
99 }
100}
101
102impl<Dst: Write> TableBuilder<Dst> {
105 pub fn new(mut opt: Options, dst: Dst) -> TableBuilder<Dst> {
109 opt.cmp = Rc::new(Box::new(InternalKeyCmp(opt.cmp.clone())));
110 opt.filter_policy = Rc::new(Box::new(InternalFilterPolicy::new(opt.filter_policy)));
111 TableBuilder::new_raw(opt, dst)
112 }
113
114 pub fn new_raw(opt: Options, dst: Dst) -> TableBuilder<Dst> {
116 TableBuilder {
117 opt: opt.clone(),
118 dst,
119 offset: 0,
120 prev_block_last_key: vec![],
121 num_entries: 0,
122 data_block: Some(BlockBuilder::new(opt.clone())),
123 filter_block: Some(FilterBlockBuilder::new(opt.filter_policy.clone())),
124 index_block: Some(BlockBuilder::new(opt)),
125 }
126 }
127
128 pub fn entries(&self) -> usize {
129 self.num_entries
130 }
131
132 pub fn size_estimate(&self) -> usize {
133 let mut size = 0;
134 if let Some(ref b) = self.data_block {
135 size += b.size_estimate();
136 }
137 if let Some(ref b) = self.index_block {
138 size += b.size_estimate();
139 }
140 if let Some(ref b) = self.filter_block {
141 size += b.size_estimate();
142 }
143 size + self.offset + FULL_FOOTER_LENGTH
144 }
145
146 pub fn add(&mut self, key: InternalKey<'_>, val: &[u8]) -> Result<()> {
148 assert!(self.data_block.is_some());
149
150 if !self.prev_block_last_key.is_empty() {
151 assert!(self.opt.cmp.cmp(&self.prev_block_last_key, key) == Ordering::Less);
152 }
153
154 if self.data_block.as_ref().unwrap().size_estimate() > self.opt.block_size {
155 self.write_data_block(key)?;
156 }
157
158 let dblock = &mut self.data_block.as_mut().unwrap();
159
160 if let Some(ref mut fblock) = self.filter_block {
161 fblock.add_key(key);
162 }
163
164 self.num_entries += 1;
165 dblock.add(key, val);
166 Ok(())
167 }
168
169 fn write_data_block(&mut self, next_key: InternalKey<'_>) -> Result<()> {
173 assert!(self.data_block.is_some());
174
175 let block = self.data_block.take().unwrap();
176 let sep = self.opt.cmp.find_shortest_sep(block.last_key(), next_key);
177 self.prev_block_last_key = Vec::from(block.last_key());
178 let contents = block.finish();
179
180 let compressor_list = self.opt.compressor_list.clone();
181 let compressor = compressor_list.get(self.opt.compressor)?;
182
183 let handle = self.write_block(contents, (self.opt.compressor, compressor))?;
184
185 let mut handle_enc = [0_u8; 16];
186 let enc_len = handle.encode_to(&mut handle_enc);
187
188 self.index_block
189 .as_mut()
190 .unwrap()
191 .add(&sep, &handle_enc[0..enc_len]);
192 self.data_block = Some(BlockBuilder::new(self.opt.clone()));
193
194 if let Some(ref mut fblock) = self.filter_block {
195 fblock.start_block(self.offset);
196 }
197
198 Ok(())
199 }
200
201 fn write_block(
203 &mut self,
204 block: BlockContents,
205 compressor_id_pair: (u8, &dyn Compressor),
206 ) -> Result<BlockHandle> {
207 let (ctype, compressor) = compressor_id_pair;
208 let data = compressor.encode(block.to_vec())?;
209
210 let mut digest = crc::digest();
211
212 digest.update(&data);
213 digest.update(&[ctype; TABLE_BLOCK_COMPRESS_LEN]);
214
215 self.dst.write_all(&data)?;
216 self.dst.write_all(&[ctype; TABLE_BLOCK_COMPRESS_LEN])?;
217 self.dst.write_fixedint(mask_crc(digest.finalize()))?;
218
219 let handle = BlockHandle::new(self.offset, data.len());
220 self.offset += data.len() + TABLE_BLOCK_COMPRESS_LEN + TABLE_BLOCK_CKSUM_LEN;
221
222 Ok(handle)
223 }
224
225 pub fn finish(mut self) -> Result<usize> {
226 assert!(self.data_block.is_some());
227
228 let compressor_list = self.opt.compressor_list.clone();
229 let compressor_id_pair = (
230 self.opt.compressor,
231 compressor_list.get(self.opt.compressor)?,
232 );
233
234 if self.data_block.as_ref().unwrap().entries() > 0 {
236 let key_past_last = self
238 .opt
239 .cmp
240 .find_short_succ(self.data_block.as_ref().unwrap().last_key());
241 self.write_data_block(&key_past_last)?;
242 }
243
244 let mut meta_ix_block = BlockBuilder::new(self.opt.clone());
246
247 if self.filter_block.is_some() {
248 let fblock = self.filter_block.take().unwrap();
250 let filter_key = format!("filter.{}", fblock.filter_name());
251 let fblock_data = fblock.finish();
252 let fblock_handle = self.write_block(
253 fblock_data.into(),
254 (compressor::NoneCompressor::ID, &compressor::NoneCompressor),
255 )?;
256
257 let mut handle_enc = [0_u8; 16];
258 let enc_len = fblock_handle.encode_to(&mut handle_enc);
259
260 meta_ix_block.add(filter_key.as_bytes(), &handle_enc[0..enc_len]);
261 }
262
263 let meta_ix = meta_ix_block.finish();
265 let meta_ix_handle = self.write_block(meta_ix, compressor_id_pair)?;
266
267 let index_cont = self.index_block.take().unwrap().finish();
269 let ix_handle = self.write_block(index_cont, compressor_id_pair)?;
270
271 let footer = Footer::new(meta_ix_handle, ix_handle);
273 let mut buf = [0; FULL_FOOTER_LENGTH];
274 footer.encode(&mut buf);
275
276 self.offset += self.dst.write(&buf[..])?;
277 self.dst.flush()?;
278 Ok(self.offset)
279 }
280}
281
282#[cfg(test)]
283mod tests {
284 use super::*;
285 use crate::blockhandle::BlockHandle;
286 use crate::options;
287
288 #[test]
289 fn test_footer() {
290 let f = Footer::new(BlockHandle::new(44, 4), BlockHandle::new(55, 5));
291 let mut buf = [0; 48];
292 f.encode(&mut buf[..]);
293
294 let f2 = Footer::decode(&buf).unwrap();
295 assert_eq!(f2.meta_index.offset(), 44);
296 assert_eq!(f2.meta_index.size(), 4);
297 assert_eq!(f2.index.offset(), 55);
298 assert_eq!(f2.index.size(), 5);
299 }
300
301 #[test]
302 fn test_table_builder() {
303 let mut d = Vec::with_capacity(512);
304 let mut opt = options::for_test();
305 opt.block_restart_interval = 3;
306 opt.compressor = compressor::SnappyCompressor::ID;
307 let mut b = TableBuilder::new_raw(opt, &mut d);
308
309 let data = [
310 ("abc", "def"),
311 ("abe", "dee"),
312 ("bcd", "asa"),
313 ("dcc", "a00"),
314 ];
315 let data2 = [
316 ("abd", "def"),
317 ("abf", "dee"),
318 ("ccd", "asa"),
319 ("dcd", "a00"),
320 ];
321
322 for i in 0..data.len() {
323 b.add(data[i].0.as_bytes(), data[i].1.as_bytes()).unwrap();
324 b.add(data2[i].0.as_bytes(), data2[i].1.as_bytes()).unwrap();
325 }
326
327 let estimate = b.size_estimate();
328
329 assert_eq!(143, estimate);
330 assert!(b.filter_block.is_some());
331
332 let actual = b.finish().unwrap();
333 assert_eq!(223, actual);
334 }
335
336 #[test]
337 #[should_panic]
338 fn test_bad_input() {
339 let mut d = Vec::with_capacity(512);
340 let mut opt = options::for_test();
341 opt.block_restart_interval = 3;
342 let mut b = TableBuilder::new_raw(opt, &mut d);
343
344 let data = [
346 ("abc", "def"),
347 ("abc", "dee"),
348 ("bcd", "asa"),
349 ("bsr", "a00"),
350 ];
351
352 for &(k, v) in data.iter() {
353 b.add(k.as_bytes(), v.as_bytes()).unwrap();
354 }
355 b.finish().unwrap();
356 }
357}