Skip to main content

sochdb_storage/sstable/
builder.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
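//! SSTable Builder
//!
//! This module provides a builder for creating SSTable files with:
//! - Configurable block size and restart interval
//! - Optional bloom/ribbon/xor filters (selected via a `FilterPolicy`)
//! - A two-level index option for large tables (note: the builder in this
//!   file currently always writes a single-level index)
//! - Optional LZ4 or Zstd block compression (blocks that do not shrink are
//!   stored uncompressed)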
25
26use std::fs::File;
27use std::io::{BufWriter, Seek, SeekFrom, Write};
28use std::path::Path;
29
30use super::block::{BlockBuilder, BlockHandle, BlockType, DEFAULT_RESTART_INTERVAL};
31use super::filter::{BloomFilterPolicy, FilterBuilder, FilterPolicy};
32use super::format::{Footer, Header, Section, SectionType, SSTableFormat, HEADER_SIZE};
33
/// Uncompressed size, in bytes, at which a data block is flushed to disk.
///
/// 4 KiB lines up with the block size of typical filesystems.
pub const DEFAULT_BLOCK_SIZE: usize = 4096;

/// Bits of filter space per key used by the default Bloom filter policy.
pub const DEFAULT_FILTER_BITS_PER_KEY: f64 = 10.0;
39
40/// Builder options
41#[derive(Debug, Clone)]
42pub struct SSTableBuilderOptions {
43    /// Target block size in bytes
44    pub block_size: usize,
45    /// Restart interval for prefix compression
46    pub restart_interval: usize,
47    /// Block compression type
48    pub compression: BlockType,
49    /// Filter policy (None = no filter)
50    pub filter_policy: Option<Box<dyn FilterPolicy>>,
51    /// Use hash index for blocks
52    pub use_block_hash_index: bool,
53    /// Enable two-level index for large tables
54    pub use_two_level_index: bool,
55}
56
57impl Default for SSTableBuilderOptions {
58    fn default() -> Self {
59        Self {
60            block_size: DEFAULT_BLOCK_SIZE,
61            restart_interval: DEFAULT_RESTART_INTERVAL,
62            compression: BlockType::Uncompressed,
63            filter_policy: Some(Box::new(BloomFilterPolicy::with_bits_per_key(
64                DEFAULT_FILTER_BITS_PER_KEY,
65            ))),
66            use_block_hash_index: true,
67            use_two_level_index: false,
68        }
69    }
70}
71
72// Implement Clone for FilterPolicy
73impl Clone for Box<dyn FilterPolicy> {
74    fn clone(&self) -> Self {
75        // Create a new policy with same configuration
76        // This is a simplified approach - could use a proper clone trait
77        Box::new(BloomFilterPolicy::with_bits_per_key(self.bits_per_key()))
78    }
79}
80
81/// Index entry pointing to a data block
82#[derive(Debug, Clone)]
83struct IndexEntry {
84    /// Largest key in the block (separator key)
85    largest_key: Vec<u8>,
86    /// Block handle (offset, size)
87    handle: BlockHandle,
88}
89
90/// SSTable builder
91pub struct SSTableBuilder {
92    /// Options
93    options: SSTableBuilderOptions,
94    /// Output file
95    file: BufWriter<File>,
96    /// Current data block builder
97    data_block: BlockBuilder,
98    /// Index entries
99    index_entries: Vec<IndexEntry>,
100    /// Filter builder (if enabled)
101    filter_builder: Option<Box<dyn FilterBuilder>>,
102    /// Current file offset
103    offset: u64,
104    /// Number of entries
105    num_entries: u64,
106    /// Smallest key
107    smallest_key: Option<Vec<u8>>,
108    /// Largest key
109    largest_key: Option<Vec<u8>>,
110    /// Last key added (for sorting check)
111    last_key: Option<Vec<u8>>,
112    /// Pending index entry for current block
113    pending_index_entry: bool,
114    /// Last block's largest key
115    pending_largest_key: Vec<u8>,
116    /// Data blocks section start
117    data_section_start: u64,
118    /// Estimated keys for filter sizing
119    estimated_keys: usize,
120}
121
122impl SSTableBuilder {
123    /// Create a new SSTable builder
124    pub fn new<P: AsRef<Path>>(path: P, options: SSTableBuilderOptions) -> std::io::Result<Self> {
125        let file = File::create(path)?;
126        let mut writer = BufWriter::new(file);
127        
128        // Reserve space for header
129        writer.seek(SeekFrom::Start(HEADER_SIZE as u64))?;
130        
131        let data_block = if options.use_block_hash_index {
132            BlockBuilder::with_hash_index(options.restart_interval)
133        } else {
134            BlockBuilder::new(options.restart_interval)
135        };
136        
137        Ok(Self {
138            options,
139            file: writer,
140            data_block,
141            index_entries: Vec::new(),
142            filter_builder: None,
143            offset: HEADER_SIZE as u64,
144            num_entries: 0,
145            smallest_key: None,
146            largest_key: None,
147            last_key: None,
148            pending_index_entry: false,
149            pending_largest_key: Vec::new(),
150            data_section_start: HEADER_SIZE as u64,
151            estimated_keys: 0,
152        })
153    }
154
155    /// Set estimated number of keys (for filter sizing)
156    pub fn set_estimated_keys(&mut self, count: usize) {
157        self.estimated_keys = count;
158        
159        // Initialize filter builder with proper size
160        if let Some(ref policy) = self.options.filter_policy {
161            self.filter_builder = Some(policy.create_builder(count));
162        }
163    }
164
165    /// Add a key-value pair
166    ///
167    /// Keys must be added in sorted order.
168    pub fn add(&mut self, key: &[u8], value: &[u8]) -> std::io::Result<()> {
169        // Verify sorted order
170        if let Some(ref last) = self.last_key {
171            debug_assert!(
172                key > last.as_slice(),
173                "Keys must be added in sorted order"
174            );
175        }
176
177        // Handle pending index entry from previous block
178        if self.pending_index_entry {
179            self.add_index_entry(&self.pending_largest_key.clone())?;
180            self.pending_index_entry = false;
181        }
182
183        // Add to filter if enabled
184        if let Some(ref mut builder) = self.filter_builder {
185            builder.add_key(key);
186        }
187
188        // Add to data block
189        self.data_block.add(key, value);
190
191        // Track keys
192        if self.smallest_key.is_none() {
193            self.smallest_key = Some(key.to_vec());
194        }
195        self.largest_key = Some(key.to_vec());
196        self.last_key = Some(key.to_vec());
197        self.num_entries += 1;
198
199        // Flush block if it's large enough
200        if self.data_block.estimated_size() >= self.options.block_size {
201            self.flush_data_block()?;
202        }
203
204        Ok(())
205    }
206
207    /// Flush current data block to file
208    fn flush_data_block(&mut self) -> std::io::Result<()> {
209        if self.data_block.is_empty() {
210            return Ok(());
211        }
212
213        // Get block contents
214        let block_data = self.data_block.finish();
215        let block_size = block_data.len();
216
217        // Compress if enabled
218        let (compressed_data, block_type) = self.maybe_compress(&block_data);
219
220        // Write block
221        let block_offset = self.offset;
222        self.file.write_all(&compressed_data)?;
223        
224        // Write block trailer (type + checksum)
225        self.file.write_all(&[block_type as u8])?;
226        let checksum = crc32fast::hash(&compressed_data);
227        self.file.write_all(&checksum.to_le_bytes())?;
228
229        // Update offset
230        let total_size = compressed_data.len() + 1 + 4; // data + type + checksum
231        self.offset += total_size as u64;
232
233        // Record pending index entry
234        if let Some(ref key) = self.largest_key {
235            self.pending_largest_key = key.clone();
236        }
237        self.pending_index_entry = true;
238
239        // Create index entry
240        let handle = BlockHandle::new(block_offset, block_size as u64);
241        self.index_entries.push(IndexEntry {
242            largest_key: self.pending_largest_key.clone(),
243            handle,
244        });
245
246        // Reset block builder
247        self.data_block.reset();
248
249        Ok(())
250    }
251
252    /// Maybe compress block data
253    fn maybe_compress(&self, data: &[u8]) -> (Vec<u8>, BlockType) {
254        match self.options.compression {
255            BlockType::Uncompressed => (data.to_vec(), BlockType::Uncompressed),
256            BlockType::Lz4 => {
257                // Use LZ4 compression
258                match lz4_flex::compress_prepend_size(data) {
259                    compressed if compressed.len() < data.len() => (compressed, BlockType::Lz4),
260                    _ => (data.to_vec(), BlockType::Uncompressed),
261                }
262            }
263            BlockType::Zstd => {
264                // Use Zstd compression
265                match zstd::encode_all(data, 3) {
266                    Ok(compressed) if compressed.len() < data.len() => (compressed, BlockType::Zstd),
267                    _ => (data.to_vec(), BlockType::Uncompressed),
268                }
269            }
270            BlockType::Snappy => {
271                // Snappy not implemented - fall back to uncompressed
272                (data.to_vec(), BlockType::Uncompressed)
273            }
274        }
275    }
276
277    /// Add index entry
278    fn add_index_entry(&mut self, largest_key: &[u8]) -> std::io::Result<()> {
279        // Index entries are already added in flush_data_block
280        // This is for any additional processing
281        Ok(())
282    }
283
284    /// Finish building the SSTable
285    pub fn finish(mut self) -> std::io::Result<SSTableBuilderResult> {
286        // Flush any remaining data
287        self.flush_data_block()?;
288
289        let data_section_end = self.offset;
290        let data_section_size = data_section_end - self.data_section_start;
291        let data_checksum = 0u32; // Would compute from all blocks
292
293        let mut sections = vec![Section::new(
294            SectionType::DataBlocks,
295            self.data_section_start,
296            data_section_size,
297            data_checksum,
298        )];
299
300        // Write filter section
301        if let Some(mut builder) = self.filter_builder.take() {
302            let filter_data = builder.finish();
303            let filter_offset = self.offset;
304            let filter_size = filter_data.len() as u64;
305            let filter_checksum = crc32fast::hash(&filter_data);
306
307            self.file.write_all(&filter_data)?;
308            self.offset += filter_size;
309
310            sections.push(Section::new(
311                SectionType::Filter,
312                filter_offset,
313                filter_size,
314                filter_checksum,
315            ));
316        }
317
318        // Write index section
319        let index_offset = self.offset;
320        let index_data = self.build_index()?;
321        let index_size = index_data.len() as u64;
322        let index_checksum = crc32fast::hash(&index_data);
323
324        self.file.write_all(&index_data)?;
325        self.offset += index_size;
326
327        sections.push(Section::new(
328            SectionType::Index,
329            index_offset,
330            index_size,
331            index_checksum,
332        ));
333
334        // Write footer
335        let footer_offset = self.offset;
336        let footer = Footer::new(sections.clone());
337        let footer_data = footer.encode();
338        self.file.write_all(&footer_data)?;
339
340        // Write header at start
341        let header = Header::new(sections.len() as u32, footer_offset);
342        self.file.seek(SeekFrom::Start(0))?;
343        self.file.write_all(&header.encode())?;
344
345        // Flush and sync
346        self.file.flush()?;
347
348        Ok(SSTableBuilderResult {
349            file_size: footer_offset + footer_data.len() as u64,
350            num_entries: self.num_entries,
351            num_data_blocks: self.index_entries.len(),
352            smallest_key: self.smallest_key,
353            largest_key: self.largest_key,
354        })
355    }
356
357    /// Build index block
358    fn build_index(&self) -> std::io::Result<Vec<u8>> {
359        let mut builder = BlockBuilder::new(1); // No prefix compression for index
360
361        for entry in &self.index_entries {
362            let handle_encoded = entry.handle.encode();
363            builder.add(&entry.largest_key, &handle_encoded);
364        }
365
366        Ok(builder.finish())
367    }
368
369    /// Get number of entries added
370    pub fn num_entries(&self) -> u64 {
371        self.num_entries
372    }
373
374    /// Get current file size
375    pub fn file_size(&self) -> u64 {
376        self.offset
377    }
378}
379
/// Summary returned by [`SSTableBuilder::finish`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SSTableBuilderResult {
    /// Total size of the finished file in bytes, header through footer.
    pub file_size: u64,
    /// Number of key/value pairs written.
    pub num_entries: u64,
    /// Number of data blocks written (one index entry each).
    pub num_data_blocks: usize,
    /// First key in the table; `None` if no keys were added.
    pub smallest_key: Option<Vec<u8>>,
    /// Last key in the table; `None` if no keys were added.
    pub largest_key: Option<Vec<u8>>,
}
394
395// =============================================================================
396// Tests
397// =============================================================================
398
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Add `count` pairs `key<i>` / `value<i>` to `builder`, with `i`
    /// zero-padded to `width` digits, in ascending order.
    fn add_sequential(builder: &mut SSTableBuilder, count: usize, width: usize) {
        for i in 0..count {
            let key = format!("key{:0width$}", i, width = width);
            let value = format!("value{:0width$}", i, width = width);
            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
        }
    }

    #[test]
    fn test_builder_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        // Tiny blocks so 100 entries span several of them; filtering disabled.
        let options = SSTableBuilderOptions {
            block_size: 256,
            filter_policy: None,
            ..Default::default()
        };
        let mut builder = SSTableBuilder::new(&path, options).unwrap();
        add_sequential(&mut builder, 100, 5);

        let result = builder.finish().unwrap();
        assert_eq!(result.num_entries, 100);
        assert!(result.num_data_blocks > 0);
        assert!(result.file_size > 0);
    }

    #[test]
    fn test_builder_with_filter() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test_filter.sst");

        // Default options carry a Bloom filter policy; size it for 1000 keys.
        let mut builder = SSTableBuilder::new(&path, SSTableBuilderOptions::default()).unwrap();
        builder.set_estimated_keys(1000);
        add_sequential(&mut builder, 1000, 6);

        let result = builder.finish().unwrap();
        assert_eq!(result.num_entries, 1000);
        assert!(result.file_size > 0);
    }
}