sochdb_storage/sstable/
builder.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! SSTable Builder
16//!
17//! This module provides a builder for creating SSTable files with:
18//! - Configurable block size
19//! - Optional bloom/ribbon/xor filters
20//! - Two-level index for large tables
21//! - Compression support
22
23use std::fs::File;
24use std::io::{BufWriter, Seek, SeekFrom, Write};
25use std::path::Path;
26
27use super::block::{BlockBuilder, BlockHandle, BlockType, DEFAULT_RESTART_INTERVAL};
28use super::filter::{BloomFilterPolicy, FilterBuilder, FilterPolicy};
29use super::format::{Footer, Header, Section, SectionType, SSTableFormat, HEADER_SIZE};
30
/// Default target size of a data block, in bytes (4 KiB, chosen to match a
/// typical filesystem block).
pub const DEFAULT_BLOCK_SIZE: usize = 4096;

/// Default number of filter bits spent per key.
pub const DEFAULT_FILTER_BITS_PER_KEY: f64 = 10.0;
36
/// Options controlling how an [`SSTableBuilder`] lays out the table file.
///
/// `Default` yields 4 KiB uncompressed blocks with a Bloom filter policy,
/// block hash indexes enabled and the two-level index disabled.
#[derive(Debug, Clone)]
pub struct SSTableBuilderOptions {
    /// Target block size in bytes. A data block is flushed as soon as its
    /// estimated size reaches this value, so blocks may end up slightly larger.
    pub block_size: usize,
    /// Restart interval for prefix compression (passed to the data block builder)
    pub restart_interval: usize,
    /// Block compression type. A block that does not get smaller when
    /// compressed is stored uncompressed; `Snappy` currently always falls
    /// back to uncompressed.
    pub compression: BlockType,
    /// Filter policy (None = no filter)
    pub filter_policy: Option<Box<dyn FilterPolicy>>,
    /// Use hash index for blocks (selects `BlockBuilder::with_hash_index`
    /// for data blocks)
    pub use_block_hash_index: bool,
    /// Enable two-level index for large tables
    // NOTE(review): no builder code visible in this file reads this flag — confirm
    // whether the two-level index is implemented elsewhere.
    pub use_two_level_index: bool,
}
53
54impl Default for SSTableBuilderOptions {
55    fn default() -> Self {
56        Self {
57            block_size: DEFAULT_BLOCK_SIZE,
58            restart_interval: DEFAULT_RESTART_INTERVAL,
59            compression: BlockType::Uncompressed,
60            filter_policy: Some(Box::new(BloomFilterPolicy::with_bits_per_key(
61                DEFAULT_FILTER_BITS_PER_KEY,
62            ))),
63            use_block_hash_index: true,
64            use_two_level_index: false,
65        }
66    }
67}
68
69// Implement Clone for FilterPolicy
70impl Clone for Box<dyn FilterPolicy> {
71    fn clone(&self) -> Self {
72        // Create a new policy with same configuration
73        // This is a simplified approach - could use a proper clone trait
74        Box::new(BloomFilterPolicy::with_bits_per_key(self.bits_per_key()))
75    }
76}
77
/// Index entry pointing to a data block.
///
/// One entry is recorded for every data block flushed to the file; the
/// entries are later serialized into the index block as
/// `largest_key -> encoded handle` pairs.
#[derive(Debug, Clone)]
struct IndexEntry {
    /// Largest key in the block (separator key); used as the key of this
    /// entry in the index block
    largest_key: Vec<u8>,
    /// Block handle (offset, size) locating the data block in the file
    handle: BlockHandle,
}
86
/// SSTable builder.
///
/// Accumulates sorted key/value pairs into data blocks, writes each full
/// block to the output file, and on `finish` appends the optional filter
/// section, the index section, the footer and finally the header.
pub struct SSTableBuilder {
    /// Options
    options: SSTableBuilderOptions,
    /// Output file (buffered)
    file: BufWriter<File>,
    /// Current data block builder
    data_block: BlockBuilder,
    /// Index entries, one per data block already written
    index_entries: Vec<IndexEntry>,
    /// Filter builder (if enabled). Starts as `None`; it is only created by
    /// `set_estimated_keys` when a filter policy is configured.
    filter_builder: Option<Box<dyn FilterBuilder>>,
    /// Current file offset: starts at `HEADER_SIZE` and advances by the
    /// number of bytes written for each block or section
    offset: u64,
    /// Number of entries added so far
    num_entries: u64,
    /// Smallest key (the first key added)
    smallest_key: Option<Vec<u8>>,
    /// Largest key (the most recent key added)
    largest_key: Option<Vec<u8>>,
    /// Last key added (for sorting check, which is a debug-only assertion)
    last_key: Option<Vec<u8>>,
    /// Pending index entry for current block: set after a block is flushed,
    /// cleared by the next `add`
    pending_index_entry: bool,
    /// Last block's largest key
    pending_largest_key: Vec<u8>,
    /// Data blocks section start (always `HEADER_SIZE`)
    data_section_start: u64,
    /// Estimated keys for filter sizing, as given to `set_estimated_keys`
    estimated_keys: usize,
}
118
119impl SSTableBuilder {
120    /// Create a new SSTable builder
121    pub fn new<P: AsRef<Path>>(path: P, options: SSTableBuilderOptions) -> std::io::Result<Self> {
122        let file = File::create(path)?;
123        let mut writer = BufWriter::new(file);
124        
125        // Reserve space for header
126        writer.seek(SeekFrom::Start(HEADER_SIZE as u64))?;
127        
128        let data_block = if options.use_block_hash_index {
129            BlockBuilder::with_hash_index(options.restart_interval)
130        } else {
131            BlockBuilder::new(options.restart_interval)
132        };
133        
134        Ok(Self {
135            options,
136            file: writer,
137            data_block,
138            index_entries: Vec::new(),
139            filter_builder: None,
140            offset: HEADER_SIZE as u64,
141            num_entries: 0,
142            smallest_key: None,
143            largest_key: None,
144            last_key: None,
145            pending_index_entry: false,
146            pending_largest_key: Vec::new(),
147            data_section_start: HEADER_SIZE as u64,
148            estimated_keys: 0,
149        })
150    }
151
152    /// Set estimated number of keys (for filter sizing)
153    pub fn set_estimated_keys(&mut self, count: usize) {
154        self.estimated_keys = count;
155        
156        // Initialize filter builder with proper size
157        if let Some(ref policy) = self.options.filter_policy {
158            self.filter_builder = Some(policy.create_builder(count));
159        }
160    }
161
162    /// Add a key-value pair
163    ///
164    /// Keys must be added in sorted order.
165    pub fn add(&mut self, key: &[u8], value: &[u8]) -> std::io::Result<()> {
166        // Verify sorted order
167        if let Some(ref last) = self.last_key {
168            debug_assert!(
169                key > last.as_slice(),
170                "Keys must be added in sorted order"
171            );
172        }
173
174        // Handle pending index entry from previous block
175        if self.pending_index_entry {
176            self.add_index_entry(&self.pending_largest_key.clone())?;
177            self.pending_index_entry = false;
178        }
179
180        // Add to filter if enabled
181        if let Some(ref mut builder) = self.filter_builder {
182            builder.add_key(key);
183        }
184
185        // Add to data block
186        self.data_block.add(key, value);
187
188        // Track keys
189        if self.smallest_key.is_none() {
190            self.smallest_key = Some(key.to_vec());
191        }
192        self.largest_key = Some(key.to_vec());
193        self.last_key = Some(key.to_vec());
194        self.num_entries += 1;
195
196        // Flush block if it's large enough
197        if self.data_block.estimated_size() >= self.options.block_size {
198            self.flush_data_block()?;
199        }
200
201        Ok(())
202    }
203
204    /// Flush current data block to file
205    fn flush_data_block(&mut self) -> std::io::Result<()> {
206        if self.data_block.is_empty() {
207            return Ok(());
208        }
209
210        // Get block contents
211        let block_data = self.data_block.finish();
212        let block_size = block_data.len();
213
214        // Compress if enabled
215        let (compressed_data, block_type) = self.maybe_compress(&block_data);
216
217        // Write block
218        let block_offset = self.offset;
219        self.file.write_all(&compressed_data)?;
220        
221        // Write block trailer (type + checksum)
222        self.file.write_all(&[block_type as u8])?;
223        let checksum = crc32fast::hash(&compressed_data);
224        self.file.write_all(&checksum.to_le_bytes())?;
225
226        // Update offset
227        let total_size = compressed_data.len() + 1 + 4; // data + type + checksum
228        self.offset += total_size as u64;
229
230        // Record pending index entry
231        if let Some(ref key) = self.largest_key {
232            self.pending_largest_key = key.clone();
233        }
234        self.pending_index_entry = true;
235
236        // Create index entry
237        let handle = BlockHandle::new(block_offset, block_size as u64);
238        self.index_entries.push(IndexEntry {
239            largest_key: self.pending_largest_key.clone(),
240            handle,
241        });
242
243        // Reset block builder
244        self.data_block.reset();
245
246        Ok(())
247    }
248
249    /// Maybe compress block data
250    fn maybe_compress(&self, data: &[u8]) -> (Vec<u8>, BlockType) {
251        match self.options.compression {
252            BlockType::Uncompressed => (data.to_vec(), BlockType::Uncompressed),
253            BlockType::Lz4 => {
254                // Use LZ4 compression
255                match lz4_flex::compress_prepend_size(data) {
256                    compressed if compressed.len() < data.len() => (compressed, BlockType::Lz4),
257                    _ => (data.to_vec(), BlockType::Uncompressed),
258                }
259            }
260            BlockType::Zstd => {
261                // Use Zstd compression
262                match zstd::encode_all(data, 3) {
263                    Ok(compressed) if compressed.len() < data.len() => (compressed, BlockType::Zstd),
264                    _ => (data.to_vec(), BlockType::Uncompressed),
265                }
266            }
267            BlockType::Snappy => {
268                // Snappy not implemented - fall back to uncompressed
269                (data.to_vec(), BlockType::Uncompressed)
270            }
271        }
272    }
273
274    /// Add index entry
275    fn add_index_entry(&mut self, largest_key: &[u8]) -> std::io::Result<()> {
276        // Index entries are already added in flush_data_block
277        // This is for any additional processing
278        Ok(())
279    }
280
281    /// Finish building the SSTable
282    pub fn finish(mut self) -> std::io::Result<SSTableBuilderResult> {
283        // Flush any remaining data
284        self.flush_data_block()?;
285
286        let data_section_end = self.offset;
287        let data_section_size = data_section_end - self.data_section_start;
288        let data_checksum = 0u32; // Would compute from all blocks
289
290        let mut sections = vec![Section::new(
291            SectionType::DataBlocks,
292            self.data_section_start,
293            data_section_size,
294            data_checksum,
295        )];
296
297        // Write filter section
298        if let Some(mut builder) = self.filter_builder.take() {
299            let filter_data = builder.finish();
300            let filter_offset = self.offset;
301            let filter_size = filter_data.len() as u64;
302            let filter_checksum = crc32fast::hash(&filter_data);
303
304            self.file.write_all(&filter_data)?;
305            self.offset += filter_size;
306
307            sections.push(Section::new(
308                SectionType::Filter,
309                filter_offset,
310                filter_size,
311                filter_checksum,
312            ));
313        }
314
315        // Write index section
316        let index_offset = self.offset;
317        let index_data = self.build_index()?;
318        let index_size = index_data.len() as u64;
319        let index_checksum = crc32fast::hash(&index_data);
320
321        self.file.write_all(&index_data)?;
322        self.offset += index_size;
323
324        sections.push(Section::new(
325            SectionType::Index,
326            index_offset,
327            index_size,
328            index_checksum,
329        ));
330
331        // Write footer
332        let footer_offset = self.offset;
333        let footer = Footer::new(sections.clone());
334        let footer_data = footer.encode();
335        self.file.write_all(&footer_data)?;
336
337        // Write header at start
338        let header = Header::new(sections.len() as u32, footer_offset);
339        self.file.seek(SeekFrom::Start(0))?;
340        self.file.write_all(&header.encode())?;
341
342        // Flush and sync
343        self.file.flush()?;
344
345        Ok(SSTableBuilderResult {
346            file_size: footer_offset + footer_data.len() as u64,
347            num_entries: self.num_entries,
348            num_data_blocks: self.index_entries.len(),
349            smallest_key: self.smallest_key,
350            largest_key: self.largest_key,
351        })
352    }
353
354    /// Build index block
355    fn build_index(&self) -> std::io::Result<Vec<u8>> {
356        let mut builder = BlockBuilder::new(1); // No prefix compression for index
357
358        for entry in &self.index_entries {
359            let handle_encoded = entry.handle.encode();
360            builder.add(&entry.largest_key, &handle_encoded);
361        }
362
363        Ok(builder.finish())
364    }
365
366    /// Get number of entries added
367    pub fn num_entries(&self) -> u64 {
368        self.num_entries
369    }
370
371    /// Get current file size
372    pub fn file_size(&self) -> u64 {
373        self.offset
374    }
375}
376
/// Result of building an SSTable, returned by `SSTableBuilder::finish`.
#[derive(Debug)]
pub struct SSTableBuilderResult {
    /// Final file size in bytes (footer offset plus encoded footer length)
    pub file_size: u64,
    /// Number of entries added to the table
    pub num_entries: u64,
    /// Number of data blocks written
    pub num_data_blocks: usize,
    /// Smallest key (`None` if no entries were added)
    pub smallest_key: Option<Vec<u8>>,
    /// Largest key (`None` if no entries were added)
    pub largest_key: Option<Vec<u8>>,
}
391
392// =============================================================================
393// Tests
394// =============================================================================
395
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Small blocks and no filter: 100 entries must be counted and must
    /// produce at least one data block and a non-empty file.
    #[test]
    fn test_builder_basic() {
        let tmp = tempdir().unwrap();
        let sst_path = tmp.path().join("test.sst");

        let opts = SSTableBuilderOptions {
            filter_policy: None,
            block_size: 256, // Small blocks for testing
            ..Default::default()
        };
        let mut table = SSTableBuilder::new(&sst_path, opts).unwrap();

        for n in 0..100 {
            let k = format!("key{:05}", n);
            let v = format!("value{:05}", n);
            table.add(k.as_bytes(), v.as_bytes()).unwrap();
        }

        let summary = table.finish().unwrap();

        assert_eq!(summary.num_entries, 100);
        assert!(summary.num_data_blocks > 0);
        assert!(summary.file_size > 0);
    }

    /// Default options with the filter sized up front: 1000 entries must be
    /// counted and the file must be non-empty.
    #[test]
    fn test_builder_with_filter() {
        let tmp = tempdir().unwrap();
        let sst_path = tmp.path().join("test_filter.sst");

        let mut table =
            SSTableBuilder::new(&sst_path, SSTableBuilderOptions::default()).unwrap();
        table.set_estimated_keys(1000);

        for n in 0..1000 {
            let k = format!("key{:06}", n);
            let v = format!("value{:06}", n);
            table.add(k.as_bytes(), v.as_bytes()).unwrap();
        }

        let summary = table.finish().unwrap();

        assert_eq!(summary.num_entries, 1000);
        assert!(summary.file_size > 0);
    }
}