1use std::fs::File;
24use std::io::{BufWriter, Seek, SeekFrom, Write};
25use std::path::Path;
26
27use super::block::{BlockBuilder, BlockHandle, BlockType, DEFAULT_RESTART_INTERVAL};
28use super::filter::{BloomFilterPolicy, FilterBuilder, FilterPolicy};
29use super::format::{Footer, Header, Section, SectionType, SSTableFormat, HEADER_SIZE};
30
/// Default target size, in bytes, of an uncompressed data block (4 KiB).
pub const DEFAULT_BLOCK_SIZE: usize = 1 << 12;

/// Default number of filter bits allotted per key for the default Bloom filter policy.
pub const DEFAULT_FILTER_BITS_PER_KEY: f64 = 10.0;
36
37#[derive(Debug, Clone)]
39pub struct SSTableBuilderOptions {
40 pub block_size: usize,
42 pub restart_interval: usize,
44 pub compression: BlockType,
46 pub filter_policy: Option<Box<dyn FilterPolicy>>,
48 pub use_block_hash_index: bool,
50 pub use_two_level_index: bool,
52}
53
54impl Default for SSTableBuilderOptions {
55 fn default() -> Self {
56 Self {
57 block_size: DEFAULT_BLOCK_SIZE,
58 restart_interval: DEFAULT_RESTART_INTERVAL,
59 compression: BlockType::Uncompressed,
60 filter_policy: Some(Box::new(BloomFilterPolicy::with_bits_per_key(
61 DEFAULT_FILTER_BITS_PER_KEY,
62 ))),
63 use_block_hash_index: true,
64 use_two_level_index: false,
65 }
66 }
67}
68
69impl Clone for Box<dyn FilterPolicy> {
71 fn clone(&self) -> Self {
72 Box::new(BloomFilterPolicy::with_bits_per_key(self.bits_per_key()))
75 }
76}
77
78#[derive(Debug, Clone)]
80struct IndexEntry {
81 largest_key: Vec<u8>,
83 handle: BlockHandle,
85}
86
87pub struct SSTableBuilder {
89 options: SSTableBuilderOptions,
91 file: BufWriter<File>,
93 data_block: BlockBuilder,
95 index_entries: Vec<IndexEntry>,
97 filter_builder: Option<Box<dyn FilterBuilder>>,
99 offset: u64,
101 num_entries: u64,
103 smallest_key: Option<Vec<u8>>,
105 largest_key: Option<Vec<u8>>,
107 last_key: Option<Vec<u8>>,
109 pending_index_entry: bool,
111 pending_largest_key: Vec<u8>,
113 data_section_start: u64,
115 estimated_keys: usize,
117}
118
119impl SSTableBuilder {
120 pub fn new<P: AsRef<Path>>(path: P, options: SSTableBuilderOptions) -> std::io::Result<Self> {
122 let file = File::create(path)?;
123 let mut writer = BufWriter::new(file);
124
125 writer.seek(SeekFrom::Start(HEADER_SIZE as u64))?;
127
128 let data_block = if options.use_block_hash_index {
129 BlockBuilder::with_hash_index(options.restart_interval)
130 } else {
131 BlockBuilder::new(options.restart_interval)
132 };
133
134 Ok(Self {
135 options,
136 file: writer,
137 data_block,
138 index_entries: Vec::new(),
139 filter_builder: None,
140 offset: HEADER_SIZE as u64,
141 num_entries: 0,
142 smallest_key: None,
143 largest_key: None,
144 last_key: None,
145 pending_index_entry: false,
146 pending_largest_key: Vec::new(),
147 data_section_start: HEADER_SIZE as u64,
148 estimated_keys: 0,
149 })
150 }
151
152 pub fn set_estimated_keys(&mut self, count: usize) {
154 self.estimated_keys = count;
155
156 if let Some(ref policy) = self.options.filter_policy {
158 self.filter_builder = Some(policy.create_builder(count));
159 }
160 }
161
162 pub fn add(&mut self, key: &[u8], value: &[u8]) -> std::io::Result<()> {
166 if let Some(ref last) = self.last_key {
168 debug_assert!(
169 key > last.as_slice(),
170 "Keys must be added in sorted order"
171 );
172 }
173
174 if self.pending_index_entry {
176 self.add_index_entry(&self.pending_largest_key.clone())?;
177 self.pending_index_entry = false;
178 }
179
180 if let Some(ref mut builder) = self.filter_builder {
182 builder.add_key(key);
183 }
184
185 self.data_block.add(key, value);
187
188 if self.smallest_key.is_none() {
190 self.smallest_key = Some(key.to_vec());
191 }
192 self.largest_key = Some(key.to_vec());
193 self.last_key = Some(key.to_vec());
194 self.num_entries += 1;
195
196 if self.data_block.estimated_size() >= self.options.block_size {
198 self.flush_data_block()?;
199 }
200
201 Ok(())
202 }
203
204 fn flush_data_block(&mut self) -> std::io::Result<()> {
206 if self.data_block.is_empty() {
207 return Ok(());
208 }
209
210 let block_data = self.data_block.finish();
212 let block_size = block_data.len();
213
214 let (compressed_data, block_type) = self.maybe_compress(&block_data);
216
217 let block_offset = self.offset;
219 self.file.write_all(&compressed_data)?;
220
221 self.file.write_all(&[block_type as u8])?;
223 let checksum = crc32fast::hash(&compressed_data);
224 self.file.write_all(&checksum.to_le_bytes())?;
225
226 let total_size = compressed_data.len() + 1 + 4; self.offset += total_size as u64;
229
230 if let Some(ref key) = self.largest_key {
232 self.pending_largest_key = key.clone();
233 }
234 self.pending_index_entry = true;
235
236 let handle = BlockHandle::new(block_offset, block_size as u64);
238 self.index_entries.push(IndexEntry {
239 largest_key: self.pending_largest_key.clone(),
240 handle,
241 });
242
243 self.data_block.reset();
245
246 Ok(())
247 }
248
249 fn maybe_compress(&self, data: &[u8]) -> (Vec<u8>, BlockType) {
251 match self.options.compression {
252 BlockType::Uncompressed => (data.to_vec(), BlockType::Uncompressed),
253 BlockType::Lz4 => {
254 match lz4_flex::compress_prepend_size(data) {
256 compressed if compressed.len() < data.len() => (compressed, BlockType::Lz4),
257 _ => (data.to_vec(), BlockType::Uncompressed),
258 }
259 }
260 BlockType::Zstd => {
261 match zstd::encode_all(data, 3) {
263 Ok(compressed) if compressed.len() < data.len() => (compressed, BlockType::Zstd),
264 _ => (data.to_vec(), BlockType::Uncompressed),
265 }
266 }
267 BlockType::Snappy => {
268 (data.to_vec(), BlockType::Uncompressed)
270 }
271 }
272 }
273
274 fn add_index_entry(&mut self, largest_key: &[u8]) -> std::io::Result<()> {
276 Ok(())
279 }
280
281 pub fn finish(mut self) -> std::io::Result<SSTableBuilderResult> {
283 self.flush_data_block()?;
285
286 let data_section_end = self.offset;
287 let data_section_size = data_section_end - self.data_section_start;
288 let data_checksum = 0u32; let mut sections = vec![Section::new(
291 SectionType::DataBlocks,
292 self.data_section_start,
293 data_section_size,
294 data_checksum,
295 )];
296
297 if let Some(mut builder) = self.filter_builder.take() {
299 let filter_data = builder.finish();
300 let filter_offset = self.offset;
301 let filter_size = filter_data.len() as u64;
302 let filter_checksum = crc32fast::hash(&filter_data);
303
304 self.file.write_all(&filter_data)?;
305 self.offset += filter_size;
306
307 sections.push(Section::new(
308 SectionType::Filter,
309 filter_offset,
310 filter_size,
311 filter_checksum,
312 ));
313 }
314
315 let index_offset = self.offset;
317 let index_data = self.build_index()?;
318 let index_size = index_data.len() as u64;
319 let index_checksum = crc32fast::hash(&index_data);
320
321 self.file.write_all(&index_data)?;
322 self.offset += index_size;
323
324 sections.push(Section::new(
325 SectionType::Index,
326 index_offset,
327 index_size,
328 index_checksum,
329 ));
330
331 let footer_offset = self.offset;
333 let footer = Footer::new(sections.clone());
334 let footer_data = footer.encode();
335 self.file.write_all(&footer_data)?;
336
337 let header = Header::new(sections.len() as u32, footer_offset);
339 self.file.seek(SeekFrom::Start(0))?;
340 self.file.write_all(&header.encode())?;
341
342 self.file.flush()?;
344
345 Ok(SSTableBuilderResult {
346 file_size: footer_offset + footer_data.len() as u64,
347 num_entries: self.num_entries,
348 num_data_blocks: self.index_entries.len(),
349 smallest_key: self.smallest_key,
350 largest_key: self.largest_key,
351 })
352 }
353
354 fn build_index(&self) -> std::io::Result<Vec<u8>> {
356 let mut builder = BlockBuilder::new(1); for entry in &self.index_entries {
359 let handle_encoded = entry.handle.encode();
360 builder.add(&entry.largest_key, &handle_encoded);
361 }
362
363 Ok(builder.finish())
364 }
365
366 pub fn num_entries(&self) -> u64 {
368 self.num_entries
369 }
370
371 pub fn file_size(&self) -> u64 {
373 self.offset
374 }
375}
376
/// Summary of a finished table, returned by `SSTableBuilder::finish`.
#[derive(Debug)]
pub struct SSTableBuilderResult {
    /// Total bytes in the file, from the header through the end of the footer.
    pub file_size: u64,
    /// Number of key/value pairs written.
    pub num_entries: u64,
    /// Number of data blocks written (one index entry each).
    pub num_data_blocks: usize,
    /// First key written, or `None` for an empty table.
    pub smallest_key: Option<Vec<u8>>,
    /// Last key written, or `None` for an empty table.
    pub largest_key: Option<Vec<u8>>,
}
391
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Length of the file at `path` as reported by the filesystem.
    fn on_disk_len(path: &std::path::Path) -> u64 {
        std::fs::metadata(path).unwrap().len()
    }

    #[test]
    fn test_builder_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        let options = SSTableBuilderOptions {
            block_size: 256,
            filter_policy: None,
            ..Default::default()
        };

        let mut builder = SSTableBuilder::new(&path, options).unwrap();

        for i in 0..100 {
            let key = format!("key{:05}", i);
            let value = format!("value{:05}", i);
            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
        }

        let result = builder.finish().unwrap();

        assert_eq!(result.num_entries, 100);
        // ~1.8 KB of raw keys/values cannot fit in one 256-byte block.
        assert!(result.num_data_blocks > 1);
        assert_eq!(result.smallest_key.as_deref(), Some(&b"key00000"[..]));
        assert_eq!(result.largest_key.as_deref(), Some(&b"key00099"[..]));
        // The reported size must match what actually landed on disk.
        assert_eq!(on_disk_len(&path), result.file_size);
    }

    #[test]
    fn test_builder_with_filter() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test_filter.sst");

        let mut builder = SSTableBuilder::new(&path, SSTableBuilderOptions::default()).unwrap();
        builder.set_estimated_keys(1000);

        for i in 0..1000 {
            let key = format!("key{:06}", i);
            let value = format!("value{:06}", i);
            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
        }

        let result = builder.finish().unwrap();

        assert_eq!(result.num_entries, 1000);
        assert!(result.file_size > 0);
        assert_eq!(on_disk_len(&path), result.file_size);
    }

    #[test]
    fn test_builder_with_compression() {
        let dir = tempdir().unwrap();

        for (name, compression) in [("lz4.sst", BlockType::Lz4), ("zstd.sst", BlockType::Zstd)] {
            let path = dir.path().join(name);
            let options = SSTableBuilderOptions {
                block_size: 256,
                compression,
                filter_policy: None,
                ..Default::default()
            };

            let mut builder = SSTableBuilder::new(&path, options).unwrap();
            for i in 0..200 {
                let key = format!("key{:05}", i);
                let value = format!("value{:05}", i);
                builder.add(key.as_bytes(), value.as_bytes()).unwrap();
            }

            let result = builder.finish().unwrap();

            assert_eq!(result.num_entries, 200);
            assert!(result.num_data_blocks > 1);
            assert_eq!(on_disk_len(&path), result.file_size);
        }
    }
}