Skip to main content

sochdb_storage/sstable/
builder.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SSTable Builder
19//!
20//! This module provides a builder for creating SSTable files with:
21//! - Configurable block size
22//! - Optional bloom/ribbon/xor filters
23//! - Two-level index for large tables
24//! - Compression support
25
26use std::fs::File;
27use std::io::{BufWriter, Seek, SeekFrom, Write};
28use std::path::Path;
29
30use super::block::{BlockBuilder, BlockHandle, BlockType, DEFAULT_RESTART_INTERVAL};
31use super::filter::{BloomFilterPolicy, FilterBuilder, FilterPolicy};
32use super::format::{Footer, HEADER_SIZE, Header, Section, SectionType};
33
/// Default target size of a data block, in bytes (4 KiB).
///
/// Chosen to match the block size of typical filesystems.
pub const DEFAULT_BLOCK_SIZE: usize = 4 << 10;

/// Default number of filter bits allotted per key.
pub const DEFAULT_FILTER_BITS_PER_KEY: f64 = 10.0;
39
40/// Builder options
41#[derive(Debug, Clone)]
42pub struct SSTableBuilderOptions {
43    /// Target block size in bytes
44    pub block_size: usize,
45    /// Restart interval for prefix compression
46    pub restart_interval: usize,
47    /// Block compression type
48    pub compression: BlockType,
49    /// Filter policy (None = no filter)
50    pub filter_policy: Option<Box<dyn FilterPolicy>>,
51    /// Use hash index for blocks
52    pub use_block_hash_index: bool,
53    /// Enable two-level index for large tables
54    pub use_two_level_index: bool,
55}
56
57impl Default for SSTableBuilderOptions {
58    fn default() -> Self {
59        Self {
60            block_size: DEFAULT_BLOCK_SIZE,
61            restart_interval: DEFAULT_RESTART_INTERVAL,
62            compression: BlockType::Uncompressed,
63            filter_policy: Some(Box::new(BloomFilterPolicy::with_bits_per_key(
64                DEFAULT_FILTER_BITS_PER_KEY,
65            ))),
66            use_block_hash_index: true,
67            use_two_level_index: false,
68        }
69    }
70}
71
72// Implement Clone for FilterPolicy
73impl Clone for Box<dyn FilterPolicy> {
74    fn clone(&self) -> Self {
75        // Create a new policy with same configuration
76        // This is a simplified approach - could use a proper clone trait
77        Box::new(BloomFilterPolicy::with_bits_per_key(self.bits_per_key()))
78    }
79}
80
81/// Index entry pointing to a data block
82#[derive(Debug, Clone)]
83struct IndexEntry {
84    /// Largest key in the block (separator key)
85    largest_key: Vec<u8>,
86    /// Block handle (offset, size)
87    handle: BlockHandle,
88}
89
90/// SSTable builder
91pub struct SSTableBuilder {
92    /// Options
93    options: SSTableBuilderOptions,
94    /// Output file
95    file: BufWriter<File>,
96    /// Current data block builder
97    data_block: BlockBuilder,
98    /// Index entries
99    index_entries: Vec<IndexEntry>,
100    /// Filter builder (if enabled)
101    filter_builder: Option<Box<dyn FilterBuilder>>,
102    /// Current file offset
103    offset: u64,
104    /// Number of entries
105    num_entries: u64,
106    /// Smallest key
107    smallest_key: Option<Vec<u8>>,
108    /// Largest key
109    largest_key: Option<Vec<u8>>,
110    /// Last key added (for sorting check)
111    last_key: Option<Vec<u8>>,
112    /// Pending index entry for current block
113    pending_index_entry: bool,
114    /// Last block's largest key
115    pending_largest_key: Vec<u8>,
116    /// Data blocks section start
117    data_section_start: u64,
118    /// Estimated keys for filter sizing
119    estimated_keys: usize,
120}
121
122impl SSTableBuilder {
123    /// Create a new SSTable builder
124    pub fn new<P: AsRef<Path>>(path: P, options: SSTableBuilderOptions) -> std::io::Result<Self> {
125        let file = File::create(path)?;
126        let mut writer = BufWriter::new(file);
127
128        // Reserve space for header
129        writer.seek(SeekFrom::Start(HEADER_SIZE as u64))?;
130
131        let data_block = if options.use_block_hash_index {
132            BlockBuilder::with_hash_index(options.restart_interval)
133        } else {
134            BlockBuilder::new(options.restart_interval)
135        };
136
137        Ok(Self {
138            options,
139            file: writer,
140            data_block,
141            index_entries: Vec::new(),
142            filter_builder: None,
143            offset: HEADER_SIZE as u64,
144            num_entries: 0,
145            smallest_key: None,
146            largest_key: None,
147            last_key: None,
148            pending_index_entry: false,
149            pending_largest_key: Vec::new(),
150            data_section_start: HEADER_SIZE as u64,
151            estimated_keys: 0,
152        })
153    }
154
155    /// Set estimated number of keys (for filter sizing)
156    pub fn set_estimated_keys(&mut self, count: usize) {
157        self.estimated_keys = count;
158
159        // Initialize filter builder with proper size
160        if let Some(ref policy) = self.options.filter_policy {
161            self.filter_builder = Some(policy.create_builder(count));
162        }
163    }
164
165    /// Add a key-value pair
166    ///
167    /// Keys must be added in sorted order.
168    pub fn add(&mut self, key: &[u8], value: &[u8]) -> std::io::Result<()> {
169        // Verify sorted order
170        if let Some(ref last) = self.last_key {
171            debug_assert!(key > last.as_slice(), "Keys must be added in sorted order");
172        }
173
174        // Handle pending index entry from previous block
175        if self.pending_index_entry {
176            self.add_index_entry(&self.pending_largest_key.clone())?;
177            self.pending_index_entry = false;
178        }
179
180        // Add to filter if enabled
181        if let Some(ref mut builder) = self.filter_builder {
182            builder.add_key(key);
183        }
184
185        // Add to data block
186        self.data_block.add(key, value);
187
188        // Track keys
189        if self.smallest_key.is_none() {
190            self.smallest_key = Some(key.to_vec());
191        }
192        self.largest_key = Some(key.to_vec());
193        self.last_key = Some(key.to_vec());
194        self.num_entries += 1;
195
196        // Flush block if it's large enough
197        if self.data_block.estimated_size() >= self.options.block_size {
198            self.flush_data_block()?;
199        }
200
201        Ok(())
202    }
203
204    /// Flush current data block to file
205    fn flush_data_block(&mut self) -> std::io::Result<()> {
206        if self.data_block.is_empty() {
207            return Ok(());
208        }
209
210        // Get block contents
211        let block_data = self.data_block.finish();
212        let block_size = block_data.len();
213
214        // Compress if enabled
215        let (compressed_data, block_type) = self.maybe_compress(&block_data);
216
217        // Write block
218        let block_offset = self.offset;
219        self.file.write_all(&compressed_data)?;
220
221        // Write block trailer (type + checksum)
222        self.file.write_all(&[block_type as u8])?;
223        let checksum = crc32fast::hash(&compressed_data);
224        self.file.write_all(&checksum.to_le_bytes())?;
225
226        // Update offset
227        let total_size = compressed_data.len() + 1 + 4; // data + type + checksum
228        self.offset += total_size as u64;
229
230        // Record pending index entry
231        if let Some(ref key) = self.largest_key {
232            self.pending_largest_key = key.clone();
233        }
234        self.pending_index_entry = true;
235
236        // Create index entry
237        let handle = BlockHandle::new(block_offset, block_size as u64);
238        self.index_entries.push(IndexEntry {
239            largest_key: self.pending_largest_key.clone(),
240            handle,
241        });
242
243        // Reset block builder
244        self.data_block.reset();
245
246        Ok(())
247    }
248
249    /// Maybe compress block data
250    fn maybe_compress(&self, data: &[u8]) -> (Vec<u8>, BlockType) {
251        match self.options.compression {
252            BlockType::Uncompressed => (data.to_vec(), BlockType::Uncompressed),
253            BlockType::Lz4 => {
254                // Use LZ4 compression
255                match lz4_flex::compress_prepend_size(data) {
256                    compressed if compressed.len() < data.len() => (compressed, BlockType::Lz4),
257                    _ => (data.to_vec(), BlockType::Uncompressed),
258                }
259            }
260            BlockType::Zstd => {
261                // Use Zstd compression
262                match zstd::encode_all(data, 3) {
263                    Ok(compressed) if compressed.len() < data.len() => {
264                        (compressed, BlockType::Zstd)
265                    }
266                    _ => (data.to_vec(), BlockType::Uncompressed),
267                }
268            }
269            BlockType::Snappy => {
270                // Snappy not implemented - fall back to uncompressed
271                (data.to_vec(), BlockType::Uncompressed)
272            }
273        }
274    }
275
276    /// Add index entry
277    fn add_index_entry(&mut self, _largest_key: &[u8]) -> std::io::Result<()> {
278        // Index entries are already added in flush_data_block
279        // This is for any additional processing
280        Ok(())
281    }
282
283    /// Finish building the SSTable
284    pub fn finish(mut self) -> std::io::Result<SSTableBuilderResult> {
285        // Flush any remaining data
286        self.flush_data_block()?;
287
288        let data_section_end = self.offset;
289        let data_section_size = data_section_end - self.data_section_start;
290        let data_checksum = 0u32; // Would compute from all blocks
291
292        let mut sections = vec![Section::new(
293            SectionType::DataBlocks,
294            self.data_section_start,
295            data_section_size,
296            data_checksum,
297        )];
298
299        // Write filter section
300        if let Some(mut builder) = self.filter_builder.take() {
301            let filter_data = builder.finish();
302            let filter_offset = self.offset;
303            let filter_size = filter_data.len() as u64;
304            let filter_checksum = crc32fast::hash(&filter_data);
305
306            self.file.write_all(&filter_data)?;
307            self.offset += filter_size;
308
309            sections.push(Section::new(
310                SectionType::Filter,
311                filter_offset,
312                filter_size,
313                filter_checksum,
314            ));
315        }
316
317        // Write index section
318        let index_offset = self.offset;
319        let index_data = self.build_index()?;
320        let index_size = index_data.len() as u64;
321        let index_checksum = crc32fast::hash(&index_data);
322
323        self.file.write_all(&index_data)?;
324        self.offset += index_size;
325
326        sections.push(Section::new(
327            SectionType::Index,
328            index_offset,
329            index_size,
330            index_checksum,
331        ));
332
333        // Write footer
334        let footer_offset = self.offset;
335        let footer = Footer::new(sections.clone());
336        let footer_data = footer.encode();
337        self.file.write_all(&footer_data)?;
338
339        // Write header at start
340        let header = Header::new(sections.len() as u32, footer_offset);
341        self.file.seek(SeekFrom::Start(0))?;
342        self.file.write_all(&header.encode())?;
343
344        // Flush and sync
345        self.file.flush()?;
346
347        Ok(SSTableBuilderResult {
348            file_size: footer_offset + footer_data.len() as u64,
349            num_entries: self.num_entries,
350            num_data_blocks: self.index_entries.len(),
351            smallest_key: self.smallest_key,
352            largest_key: self.largest_key,
353        })
354    }
355
356    /// Build index block
357    fn build_index(&self) -> std::io::Result<Vec<u8>> {
358        let mut builder = BlockBuilder::new(1); // No prefix compression for index
359
360        for entry in &self.index_entries {
361            let handle_encoded = entry.handle.encode();
362            builder.add(&entry.largest_key, &handle_encoded);
363        }
364
365        Ok(builder.finish())
366    }
367
368    /// Get number of entries added
369    pub fn num_entries(&self) -> u64 {
370        self.num_entries
371    }
372
373    /// Get current file size
374    pub fn file_size(&self) -> u64 {
375        self.offset
376    }
377}
378
/// Summary returned by `SSTableBuilder::finish` describing the written table.
#[derive(Debug)]
pub struct SSTableBuilderResult {
    /// Total size of the finished file, in bytes.
    pub file_size: u64,
    /// Number of key-value pairs stored.
    pub num_entries: u64,
    /// Number of data blocks written.
    pub num_data_blocks: usize,
    /// First key added, or `None` for an empty table.
    pub smallest_key: Option<Vec<u8>>,
    /// Last key added, or `None` for an empty table.
    pub largest_key: Option<Vec<u8>>,
}
393
394// =============================================================================
395// Tests
396// =============================================================================
397
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds a filterless table with small blocks and checks the summary.
    #[test]
    fn test_builder_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");

        // Small blocks for testing
        let options = SSTableBuilderOptions {
            block_size: 256,
            filter_policy: None,
            ..Default::default()
        };
        let mut builder = SSTableBuilder::new(&path, options).unwrap();

        (0..100).for_each(|i| {
            let (key, value) = (format!("key{:05}", i), format!("value{:05}", i));
            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
        });

        let result = builder.finish().unwrap();

        assert_eq!(result.num_entries, 100);
        assert!(result.num_data_blocks > 0);
        assert!(result.file_size > 0);
    }

    /// Builds a table with the default options (filter enabled) after
    /// announcing the key count, and checks the summary.
    #[test]
    fn test_builder_with_filter() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test_filter.sst");

        let options = SSTableBuilderOptions::default();
        let mut builder = SSTableBuilder::new(&path, options).unwrap();
        builder.set_estimated_keys(1000);

        (0..1000).for_each(|i| {
            let (key, value) = (format!("key{:06}", i), format!("value{:06}", i));
            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
        });

        let result = builder.finish().unwrap();

        assert_eq!(result.num_entries, 1000);
        assert!(result.file_size > 0);
    }
}