1use std::fs::File;
27use std::io::{BufWriter, Seek, SeekFrom, Write};
28use std::path::Path;
29
30use super::block::{BlockBuilder, BlockHandle, BlockType, DEFAULT_RESTART_INTERVAL};
31use super::filter::{BloomFilterPolicy, FilterBuilder, FilterPolicy};
32use super::format::{Footer, Header, Section, SectionType, SSTableFormat, HEADER_SIZE};
33
/// Default data-block size threshold, in bytes (4 KiB), used by
/// `SSTableBuilderOptions::default`.
pub const DEFAULT_BLOCK_SIZE: usize = 4096;

/// Default filter density, in bits per key, used by `SSTableBuilderOptions::default`
/// when constructing its Bloom filter policy.
pub const DEFAULT_FILTER_BITS_PER_KEY: f64 = 10.0;
/// Configuration for [`SSTableBuilder`].
#[derive(Debug, Clone)]
pub struct SSTableBuilderOptions {
    /// Data-block size threshold in bytes: after an `add`, the current block is
    /// flushed once its estimated size reaches this value.
    pub block_size: usize,
    /// Restart interval handed to every data-block `BlockBuilder`.
    pub restart_interval: usize,
    /// Requested block compression. A block that the codec does not shrink is stored
    /// uncompressed; `BlockType::Snappy` currently always falls back to uncompressed.
    pub compression: BlockType,
    /// Policy used to build the key filter, or `None` for no filter.
    /// NOTE(review): the builder only creates a filter once
    /// `SSTableBuilder::set_estimated_keys` has been called.
    pub filter_policy: Option<Box<dyn FilterPolicy>>,
    /// When `true`, data blocks are built with `BlockBuilder::with_hash_index`
    /// instead of `BlockBuilder::new`.
    pub use_block_hash_index: bool,
    /// NOTE(review): not read by `SSTableBuilder` in this file; the index is always
    /// written as a single block regardless of this flag.
    pub use_two_level_index: bool,
}
57impl Default for SSTableBuilderOptions {
58 fn default() -> Self {
59 Self {
60 block_size: DEFAULT_BLOCK_SIZE,
61 restart_interval: DEFAULT_RESTART_INTERVAL,
62 compression: BlockType::Uncompressed,
63 filter_policy: Some(Box::new(BloomFilterPolicy::with_bits_per_key(
64 DEFAULT_FILTER_BITS_PER_KEY,
65 ))),
66 use_block_hash_index: true,
67 use_two_level_index: false,
68 }
69 }
70}
71
72impl Clone for Box<dyn FilterPolicy> {
74 fn clone(&self) -> Self {
75 Box::new(BloomFilterPolicy::with_bits_per_key(self.bits_per_key()))
78 }
79}
80
/// Index record for one flushed data block.
#[derive(Debug, Clone)]
struct IndexEntry {
    /// Largest key stored in the block (the last one added, since keys are sorted);
    /// used as the block's key in the index.
    largest_key: Vec<u8>,
    /// Location of the block within the file.
    handle: BlockHandle,
}
90pub struct SSTableBuilder {
92 options: SSTableBuilderOptions,
94 file: BufWriter<File>,
96 data_block: BlockBuilder,
98 index_entries: Vec<IndexEntry>,
100 filter_builder: Option<Box<dyn FilterBuilder>>,
102 offset: u64,
104 num_entries: u64,
106 smallest_key: Option<Vec<u8>>,
108 largest_key: Option<Vec<u8>>,
110 last_key: Option<Vec<u8>>,
112 pending_index_entry: bool,
114 pending_largest_key: Vec<u8>,
116 data_section_start: u64,
118 estimated_keys: usize,
120}
121
122impl SSTableBuilder {
123 pub fn new<P: AsRef<Path>>(path: P, options: SSTableBuilderOptions) -> std::io::Result<Self> {
125 let file = File::create(path)?;
126 let mut writer = BufWriter::new(file);
127
128 writer.seek(SeekFrom::Start(HEADER_SIZE as u64))?;
130
131 let data_block = if options.use_block_hash_index {
132 BlockBuilder::with_hash_index(options.restart_interval)
133 } else {
134 BlockBuilder::new(options.restart_interval)
135 };
136
137 Ok(Self {
138 options,
139 file: writer,
140 data_block,
141 index_entries: Vec::new(),
142 filter_builder: None,
143 offset: HEADER_SIZE as u64,
144 num_entries: 0,
145 smallest_key: None,
146 largest_key: None,
147 last_key: None,
148 pending_index_entry: false,
149 pending_largest_key: Vec::new(),
150 data_section_start: HEADER_SIZE as u64,
151 estimated_keys: 0,
152 })
153 }
154
155 pub fn set_estimated_keys(&mut self, count: usize) {
157 self.estimated_keys = count;
158
159 if let Some(ref policy) = self.options.filter_policy {
161 self.filter_builder = Some(policy.create_builder(count));
162 }
163 }
164
165 pub fn add(&mut self, key: &[u8], value: &[u8]) -> std::io::Result<()> {
169 if let Some(ref last) = self.last_key {
171 debug_assert!(
172 key > last.as_slice(),
173 "Keys must be added in sorted order"
174 );
175 }
176
177 if self.pending_index_entry {
179 self.add_index_entry(&self.pending_largest_key.clone())?;
180 self.pending_index_entry = false;
181 }
182
183 if let Some(ref mut builder) = self.filter_builder {
185 builder.add_key(key);
186 }
187
188 self.data_block.add(key, value);
190
191 if self.smallest_key.is_none() {
193 self.smallest_key = Some(key.to_vec());
194 }
195 self.largest_key = Some(key.to_vec());
196 self.last_key = Some(key.to_vec());
197 self.num_entries += 1;
198
199 if self.data_block.estimated_size() >= self.options.block_size {
201 self.flush_data_block()?;
202 }
203
204 Ok(())
205 }
206
207 fn flush_data_block(&mut self) -> std::io::Result<()> {
209 if self.data_block.is_empty() {
210 return Ok(());
211 }
212
213 let block_data = self.data_block.finish();
215 let block_size = block_data.len();
216
217 let (compressed_data, block_type) = self.maybe_compress(&block_data);
219
220 let block_offset = self.offset;
222 self.file.write_all(&compressed_data)?;
223
224 self.file.write_all(&[block_type as u8])?;
226 let checksum = crc32fast::hash(&compressed_data);
227 self.file.write_all(&checksum.to_le_bytes())?;
228
229 let total_size = compressed_data.len() + 1 + 4; self.offset += total_size as u64;
232
233 if let Some(ref key) = self.largest_key {
235 self.pending_largest_key = key.clone();
236 }
237 self.pending_index_entry = true;
238
239 let handle = BlockHandle::new(block_offset, block_size as u64);
241 self.index_entries.push(IndexEntry {
242 largest_key: self.pending_largest_key.clone(),
243 handle,
244 });
245
246 self.data_block.reset();
248
249 Ok(())
250 }
251
252 fn maybe_compress(&self, data: &[u8]) -> (Vec<u8>, BlockType) {
254 match self.options.compression {
255 BlockType::Uncompressed => (data.to_vec(), BlockType::Uncompressed),
256 BlockType::Lz4 => {
257 match lz4_flex::compress_prepend_size(data) {
259 compressed if compressed.len() < data.len() => (compressed, BlockType::Lz4),
260 _ => (data.to_vec(), BlockType::Uncompressed),
261 }
262 }
263 BlockType::Zstd => {
264 match zstd::encode_all(data, 3) {
266 Ok(compressed) if compressed.len() < data.len() => (compressed, BlockType::Zstd),
267 _ => (data.to_vec(), BlockType::Uncompressed),
268 }
269 }
270 BlockType::Snappy => {
271 (data.to_vec(), BlockType::Uncompressed)
273 }
274 }
275 }
276
277 fn add_index_entry(&mut self, largest_key: &[u8]) -> std::io::Result<()> {
279 Ok(())
282 }
283
284 pub fn finish(mut self) -> std::io::Result<SSTableBuilderResult> {
286 self.flush_data_block()?;
288
289 let data_section_end = self.offset;
290 let data_section_size = data_section_end - self.data_section_start;
291 let data_checksum = 0u32; let mut sections = vec![Section::new(
294 SectionType::DataBlocks,
295 self.data_section_start,
296 data_section_size,
297 data_checksum,
298 )];
299
300 if let Some(mut builder) = self.filter_builder.take() {
302 let filter_data = builder.finish();
303 let filter_offset = self.offset;
304 let filter_size = filter_data.len() as u64;
305 let filter_checksum = crc32fast::hash(&filter_data);
306
307 self.file.write_all(&filter_data)?;
308 self.offset += filter_size;
309
310 sections.push(Section::new(
311 SectionType::Filter,
312 filter_offset,
313 filter_size,
314 filter_checksum,
315 ));
316 }
317
318 let index_offset = self.offset;
320 let index_data = self.build_index()?;
321 let index_size = index_data.len() as u64;
322 let index_checksum = crc32fast::hash(&index_data);
323
324 self.file.write_all(&index_data)?;
325 self.offset += index_size;
326
327 sections.push(Section::new(
328 SectionType::Index,
329 index_offset,
330 index_size,
331 index_checksum,
332 ));
333
334 let footer_offset = self.offset;
336 let footer = Footer::new(sections.clone());
337 let footer_data = footer.encode();
338 self.file.write_all(&footer_data)?;
339
340 let header = Header::new(sections.len() as u32, footer_offset);
342 self.file.seek(SeekFrom::Start(0))?;
343 self.file.write_all(&header.encode())?;
344
345 self.file.flush()?;
347
348 Ok(SSTableBuilderResult {
349 file_size: footer_offset + footer_data.len() as u64,
350 num_entries: self.num_entries,
351 num_data_blocks: self.index_entries.len(),
352 smallest_key: self.smallest_key,
353 largest_key: self.largest_key,
354 })
355 }
356
357 fn build_index(&self) -> std::io::Result<Vec<u8>> {
359 let mut builder = BlockBuilder::new(1); for entry in &self.index_entries {
362 let handle_encoded = entry.handle.encode();
363 builder.add(&entry.largest_key, &handle_encoded);
364 }
365
366 Ok(builder.finish())
367 }
368
369 pub fn num_entries(&self) -> u64 {
371 self.num_entries
372 }
373
374 pub fn file_size(&self) -> u64 {
376 self.offset
377 }
378}
379
380#[derive(Debug)]
382pub struct SSTableBuilderResult {
383 pub file_size: u64,
385 pub num_entries: u64,
387 pub num_data_blocks: usize,
389 pub smallest_key: Option<Vec<u8>>,
391 pub largest_key: Option<Vec<u8>>,
393}
394
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Feeds every `(key, value)` pair to `builder` in order, panicking on the first error.
    fn add_pairs<I>(builder: &mut SSTableBuilder, pairs: I)
    where
        I: IntoIterator<Item = (String, String)>,
    {
        for (key, value) in pairs {
            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
        }
    }

    #[test]
    fn test_builder_basic() {
        let dir = tempdir().unwrap();
        let options = SSTableBuilderOptions {
            block_size: 256,
            filter_policy: None,
            ..Default::default()
        };
        let mut builder = SSTableBuilder::new(dir.path().join("test.sst"), options).unwrap();

        add_pairs(
            &mut builder,
            (0..100).map(|i| (format!("key{:05}", i), format!("value{:05}", i))),
        );
        let result = builder.finish().unwrap();

        assert_eq!(result.num_entries, 100);
        assert!(result.num_data_blocks > 0);
        assert!(result.file_size > 0);
    }

    #[test]
    fn test_builder_with_filter() {
        let dir = tempdir().unwrap();
        let sst_path = dir.path().join("test_filter.sst");
        let mut builder =
            SSTableBuilder::new(&sst_path, SSTableBuilderOptions::default()).unwrap();
        builder.set_estimated_keys(1000);

        add_pairs(
            &mut builder,
            (0..1000).map(|i| (format!("key{:06}", i), format!("value{:06}", i))),
        );
        let result = builder.finish().unwrap();

        assert_eq!(result.num_entries, 1000);
        assert!(result.file_size > 0);
    }
}
449}