1use std::fs::File;
27use std::io::{BufWriter, Seek, SeekFrom, Write};
28use std::path::Path;
29
30use super::block::{BlockBuilder, BlockHandle, BlockType, DEFAULT_RESTART_INTERVAL};
31use super::filter::{BloomFilterPolicy, FilterBuilder, FilterPolicy};
32use super::format::{Footer, HEADER_SIZE, Header, Section, SectionType};
33
/// Default data-block size threshold in bytes (4 KiB).
///
/// Used as `block_size` by `SSTableBuilderOptions::default()`.
pub const DEFAULT_BLOCK_SIZE: usize = 4 << 10;
36
/// Bits per key handed to the Bloom filter policy that
/// `SSTableBuilderOptions::default()` installs.
pub const DEFAULT_FILTER_BITS_PER_KEY: f64 = 10.0;
39
40#[derive(Debug, Clone)]
42pub struct SSTableBuilderOptions {
43 pub block_size: usize,
45 pub restart_interval: usize,
47 pub compression: BlockType,
49 pub filter_policy: Option<Box<dyn FilterPolicy>>,
51 pub use_block_hash_index: bool,
53 pub use_two_level_index: bool,
55}
56
57impl Default for SSTableBuilderOptions {
58 fn default() -> Self {
59 Self {
60 block_size: DEFAULT_BLOCK_SIZE,
61 restart_interval: DEFAULT_RESTART_INTERVAL,
62 compression: BlockType::Uncompressed,
63 filter_policy: Some(Box::new(BloomFilterPolicy::with_bits_per_key(
64 DEFAULT_FILTER_BITS_PER_KEY,
65 ))),
66 use_block_hash_index: true,
67 use_two_level_index: false,
68 }
69 }
70}
71
72impl Clone for Box<dyn FilterPolicy> {
74 fn clone(&self) -> Self {
75 Box::new(BloomFilterPolicy::with_bits_per_key(self.bits_per_key()))
78 }
79}
80
81#[derive(Debug, Clone)]
83struct IndexEntry {
84 largest_key: Vec<u8>,
86 handle: BlockHandle,
88}
89
90pub struct SSTableBuilder {
92 options: SSTableBuilderOptions,
94 file: BufWriter<File>,
96 data_block: BlockBuilder,
98 index_entries: Vec<IndexEntry>,
100 filter_builder: Option<Box<dyn FilterBuilder>>,
102 offset: u64,
104 num_entries: u64,
106 smallest_key: Option<Vec<u8>>,
108 largest_key: Option<Vec<u8>>,
110 last_key: Option<Vec<u8>>,
112 pending_index_entry: bool,
114 pending_largest_key: Vec<u8>,
116 data_section_start: u64,
118 estimated_keys: usize,
120}
121
122impl SSTableBuilder {
123 pub fn new<P: AsRef<Path>>(path: P, options: SSTableBuilderOptions) -> std::io::Result<Self> {
125 let file = File::create(path)?;
126 let mut writer = BufWriter::new(file);
127
128 writer.seek(SeekFrom::Start(HEADER_SIZE as u64))?;
130
131 let data_block = if options.use_block_hash_index {
132 BlockBuilder::with_hash_index(options.restart_interval)
133 } else {
134 BlockBuilder::new(options.restart_interval)
135 };
136
137 Ok(Self {
138 options,
139 file: writer,
140 data_block,
141 index_entries: Vec::new(),
142 filter_builder: None,
143 offset: HEADER_SIZE as u64,
144 num_entries: 0,
145 smallest_key: None,
146 largest_key: None,
147 last_key: None,
148 pending_index_entry: false,
149 pending_largest_key: Vec::new(),
150 data_section_start: HEADER_SIZE as u64,
151 estimated_keys: 0,
152 })
153 }
154
155 pub fn set_estimated_keys(&mut self, count: usize) {
157 self.estimated_keys = count;
158
159 if let Some(ref policy) = self.options.filter_policy {
161 self.filter_builder = Some(policy.create_builder(count));
162 }
163 }
164
165 pub fn add(&mut self, key: &[u8], value: &[u8]) -> std::io::Result<()> {
169 if let Some(ref last) = self.last_key {
171 debug_assert!(key > last.as_slice(), "Keys must be added in sorted order");
172 }
173
174 if self.pending_index_entry {
176 self.add_index_entry(&self.pending_largest_key.clone())?;
177 self.pending_index_entry = false;
178 }
179
180 if let Some(ref mut builder) = self.filter_builder {
182 builder.add_key(key);
183 }
184
185 self.data_block.add(key, value);
187
188 if self.smallest_key.is_none() {
190 self.smallest_key = Some(key.to_vec());
191 }
192 self.largest_key = Some(key.to_vec());
193 self.last_key = Some(key.to_vec());
194 self.num_entries += 1;
195
196 if self.data_block.estimated_size() >= self.options.block_size {
198 self.flush_data_block()?;
199 }
200
201 Ok(())
202 }
203
204 fn flush_data_block(&mut self) -> std::io::Result<()> {
206 if self.data_block.is_empty() {
207 return Ok(());
208 }
209
210 let block_data = self.data_block.finish();
212 let block_size = block_data.len();
213
214 let (compressed_data, block_type) = self.maybe_compress(&block_data);
216
217 let block_offset = self.offset;
219 self.file.write_all(&compressed_data)?;
220
221 self.file.write_all(&[block_type as u8])?;
223 let checksum = crc32fast::hash(&compressed_data);
224 self.file.write_all(&checksum.to_le_bytes())?;
225
226 let total_size = compressed_data.len() + 1 + 4; self.offset += total_size as u64;
229
230 if let Some(ref key) = self.largest_key {
232 self.pending_largest_key = key.clone();
233 }
234 self.pending_index_entry = true;
235
236 let handle = BlockHandle::new(block_offset, block_size as u64);
238 self.index_entries.push(IndexEntry {
239 largest_key: self.pending_largest_key.clone(),
240 handle,
241 });
242
243 self.data_block.reset();
245
246 Ok(())
247 }
248
249 fn maybe_compress(&self, data: &[u8]) -> (Vec<u8>, BlockType) {
251 match self.options.compression {
252 BlockType::Uncompressed => (data.to_vec(), BlockType::Uncompressed),
253 BlockType::Lz4 => {
254 match lz4_flex::compress_prepend_size(data) {
256 compressed if compressed.len() < data.len() => (compressed, BlockType::Lz4),
257 _ => (data.to_vec(), BlockType::Uncompressed),
258 }
259 }
260 BlockType::Zstd => {
261 match zstd::encode_all(data, 3) {
263 Ok(compressed) if compressed.len() < data.len() => {
264 (compressed, BlockType::Zstd)
265 }
266 _ => (data.to_vec(), BlockType::Uncompressed),
267 }
268 }
269 BlockType::Snappy => {
270 (data.to_vec(), BlockType::Uncompressed)
272 }
273 }
274 }
275
276 fn add_index_entry(&mut self, _largest_key: &[u8]) -> std::io::Result<()> {
278 Ok(())
281 }
282
283 pub fn finish(mut self) -> std::io::Result<SSTableBuilderResult> {
285 self.flush_data_block()?;
287
288 let data_section_end = self.offset;
289 let data_section_size = data_section_end - self.data_section_start;
290 let data_checksum = 0u32; let mut sections = vec![Section::new(
293 SectionType::DataBlocks,
294 self.data_section_start,
295 data_section_size,
296 data_checksum,
297 )];
298
299 if let Some(mut builder) = self.filter_builder.take() {
301 let filter_data = builder.finish();
302 let filter_offset = self.offset;
303 let filter_size = filter_data.len() as u64;
304 let filter_checksum = crc32fast::hash(&filter_data);
305
306 self.file.write_all(&filter_data)?;
307 self.offset += filter_size;
308
309 sections.push(Section::new(
310 SectionType::Filter,
311 filter_offset,
312 filter_size,
313 filter_checksum,
314 ));
315 }
316
317 let index_offset = self.offset;
319 let index_data = self.build_index()?;
320 let index_size = index_data.len() as u64;
321 let index_checksum = crc32fast::hash(&index_data);
322
323 self.file.write_all(&index_data)?;
324 self.offset += index_size;
325
326 sections.push(Section::new(
327 SectionType::Index,
328 index_offset,
329 index_size,
330 index_checksum,
331 ));
332
333 let footer_offset = self.offset;
335 let footer = Footer::new(sections.clone());
336 let footer_data = footer.encode();
337 self.file.write_all(&footer_data)?;
338
339 let header = Header::new(sections.len() as u32, footer_offset);
341 self.file.seek(SeekFrom::Start(0))?;
342 self.file.write_all(&header.encode())?;
343
344 self.file.flush()?;
346
347 Ok(SSTableBuilderResult {
348 file_size: footer_offset + footer_data.len() as u64,
349 num_entries: self.num_entries,
350 num_data_blocks: self.index_entries.len(),
351 smallest_key: self.smallest_key,
352 largest_key: self.largest_key,
353 })
354 }
355
356 fn build_index(&self) -> std::io::Result<Vec<u8>> {
358 let mut builder = BlockBuilder::new(1); for entry in &self.index_entries {
361 let handle_encoded = entry.handle.encode();
362 builder.add(&entry.largest_key, &handle_encoded);
363 }
364
365 Ok(builder.finish())
366 }
367
368 pub fn num_entries(&self) -> u64 {
370 self.num_entries
371 }
372
373 pub fn file_size(&self) -> u64 {
375 self.offset
376 }
377}
378
/// Summary returned by `SSTableBuilder::finish`.
#[derive(Debug)]
pub struct SSTableBuilderResult {
    /// Footer offset plus the encoded footer length.
    pub file_size: u64,
    /// Number of key/value pairs that were added.
    pub num_entries: u64,
    /// Number of data blocks recorded in the index.
    pub num_data_blocks: usize,
    /// First key added, or `None` if the table is empty.
    pub smallest_key: Option<Vec<u8>>,
    /// Last key added, or `None` if the table is empty.
    pub largest_key: Option<Vec<u8>>,
}
393
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds a small table without a filter, using tiny blocks so that
    /// several data blocks get flushed.
    #[test]
    fn test_builder_basic() {
        let dir = tempdir().unwrap();
        let options = SSTableBuilderOptions {
            filter_policy: None,
            block_size: 256,
            ..Default::default()
        };
        let mut builder = SSTableBuilder::new(dir.path().join("test.sst"), options).unwrap();

        for i in 0..100 {
            builder
                .add(
                    format!("key{:05}", i).as_bytes(),
                    format!("value{:05}", i).as_bytes(),
                )
                .unwrap();
        }

        let summary = builder.finish().unwrap();

        assert_eq!(summary.num_entries, 100);
        assert!(summary.num_data_blocks > 0);
        assert!(summary.file_size > 0);
    }

    /// Builds a table with the default options and a filter sized for the
    /// number of keys inserted.
    #[test]
    fn test_builder_with_filter() {
        let dir = tempdir().unwrap();
        let target = dir.path().join("test_filter.sst");

        let mut builder = SSTableBuilder::new(&target, SSTableBuilderOptions::default()).unwrap();
        builder.set_estimated_keys(1000);

        for i in 0..1000 {
            builder
                .add(
                    format!("key{:06}", i).as_bytes(),
                    format!("value{:06}", i).as_bytes(),
                )
                .unwrap();
        }

        let summary = builder.finish().unwrap();

        assert_eq!(summary.num_entries, 1000);
        assert!(summary.file_size > 0);
    }
}
448}