Skip to main content

sochdb_storage/sstable/
format.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Forward-Compatible SSTable Container Format
19//!
20//! This module defines the on-disk format for SSTables with support for
21//! future extensibility and safe mmap operations.
22//!
23//! ## Design Goals
24//!
25//! 1. **Forward compatibility**: New sections can be added without breaking readers
26//! 2. **Backward compatibility**: Old readers skip unknown sections
27//! 3. **Safe mmap**: All data validated before memory mapping
28//! 4. **Efficient access**: Direct offset-based section lookup
29//!
30//! ## File Format
31//!
32//! ```text
33//! ┌─────────────────────────────────────────────────────────────────────────┐
34//! │                         SSTable File Layout                              │
35//! ├─────────────────────────────────────────────────────────────────────────┤
36//! │ Header (32 bytes):                                                       │
37//! │   Magic (8 bytes): "TDBSSTab"                                           │
38//! │   Version (4 bytes): Format version                                      │
39//! │   Flags (4 bytes): Feature flags                                         │
40//! │   Num Sections (4 bytes): Number of sections                            │
41//! │   Footer Offset (8 bytes): Offset to footer                              │
42//! │   Header Checksum (4 bytes): CRC32 of header                            │
43//! ├─────────────────────────────────────────────────────────────────────────┤
44//! │ Section 0: Data Blocks                                                   │
45//! │   [Block 0][Block 1]...[Block N]                                        │
46//! ├─────────────────────────────────────────────────────────────────────────┤
47//! │ Section 1: Filter (optional)                                            │
48//! │   [Filter Data]                                                          │
49//! ├─────────────────────────────────────────────────────────────────────────┤
50//! │ Section 2: Index                                                         │
51//! │   [Index Block]                                                          │
52//! ├─────────────────────────────────────────────────────────────────────────┤
53//! │ Section 3: Metadata (optional)                                           │
54//! │   [Properties, Stats, etc.]                                              │
55//! ├─────────────────────────────────────────────────────────────────────────┤
56//! │ Footer (variable):                                                       │
57//! │   Section Directory: [Type, Offset, Size, Checksum] × N                 │
58//! │   Footer Checksum (4 bytes)                                              │
59//! │   Magic (8 bytes): "TDBSSTab"                                           │
60//! └─────────────────────────────────────────────────────────────────────────┘
61//! ```
62
63use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
64use std::collections::HashMap;
65use std::io::{Cursor, Read, Seek, SeekFrom, Write};
66
/// Magic bytes identifying an SSTable file: the ASCII string "TDBSSTab".
pub const TABLE_MAGIC: [u8; 8] = *b"TDBSSTab";

/// Format version written into newly created headers.
pub const FORMAT_VERSION: u32 = 1;

/// Encoded header length in bytes:
/// magic (8) + version (4) + flags (4) + section count (4) + footer offset (8) + checksum (4).
pub const HEADER_SIZE: usize = 8 + 4 + 4 + 4 + 8 + 4;

/// Encoded length in bytes of one footer directory entry:
/// type (4) + offset (8) + size (8) + checksum (4).
pub const SECTION_ENTRY_SIZE: usize = 4 + 8 + 8 + 4;
78
79/// Table magic newtype for type safety
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub struct TableMagic([u8; 8]);
82
83impl TableMagic {
84    pub fn new() -> Self {
85        Self(TABLE_MAGIC)
86    }
87
88    pub fn as_bytes(&self) -> &[u8; 8] {
89        &self.0
90    }
91
92    pub fn is_valid(&self) -> bool {
93        self.0 == TABLE_MAGIC
94    }
95}
96
97impl Default for TableMagic {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
/// Kinds of section that can appear in an SSTable.
///
/// Each discriminant is the `u32` value written to the footer directory.
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SectionType {
    /// Key-value data blocks
    DataBlocks = 0,
    /// Table-level Bloom/Ribbon/Xor filter
    Filter = 1,
    /// Index used to locate data blocks
    Index = 2,
    /// Properties, statistics and similar metadata
    Metadata = 3,
    /// Range tombstones
    RangeTombstones = 4,
    /// Compression dictionary
    CompressionDict = 5,
    /// Reserved for future use
    Reserved = 0xFFFFFFFF,
}

impl TryFrom<u32> for SectionType {
    type Error = ();

    /// Converts a raw directory value into a section type.
    ///
    /// Only the values 0 through 5 convert successfully. Every other value,
    /// including the discriminant of [`SectionType::Reserved`], yields `Err(())`.
    fn try_from(value: u32) -> Result<Self, Self::Error> {
        // Variants that have a decodable on-disk value; `Reserved` is
        // intentionally absent so that its discriminant is rejected.
        const DECODABLE: [SectionType; 6] = [
            SectionType::DataBlocks,
            SectionType::Filter,
            SectionType::Index,
            SectionType::Metadata,
            SectionType::RangeTombstones,
            SectionType::CompressionDict,
        ];

        DECODABLE
            .iter()
            .copied()
            .find(|kind| *kind as u32 == value)
            .ok_or(())
    }
}
138
139/// A section in the SSTable
140#[derive(Debug, Clone)]
141pub struct Section {
142    /// Section type
143    pub section_type: SectionType,
144    /// Offset in file
145    pub offset: u64,
146    /// Size in bytes
147    pub size: u64,
148    /// CRC32 checksum of section data
149    pub checksum: u32,
150}
151
152impl Section {
153    pub fn new(section_type: SectionType, offset: u64, size: u64, checksum: u32) -> Self {
154        Self {
155            section_type,
156            offset,
157            size,
158            checksum,
159        }
160    }
161
162    /// Encode to bytes
163    pub fn encode<W: Write>(&self, writer: &mut W) -> std::io::Result<()> {
164        writer.write_u32::<LittleEndian>(self.section_type as u32)?;
165        writer.write_u64::<LittleEndian>(self.offset)?;
166        writer.write_u64::<LittleEndian>(self.size)?;
167        writer.write_u32::<LittleEndian>(self.checksum)?;
168        Ok(())
169    }
170
171    /// Decode from bytes
172    pub fn decode<R: Read>(reader: &mut R) -> std::io::Result<Self> {
173        let type_val = reader.read_u32::<LittleEndian>()?;
174        let section_type = SectionType::try_from(type_val).unwrap_or(SectionType::Reserved);
175        let offset = reader.read_u64::<LittleEndian>()?;
176        let size = reader.read_u64::<LittleEndian>()?;
177        let checksum = reader.read_u32::<LittleEndian>()?;
178        
179        Ok(Self {
180            section_type,
181            offset,
182            size,
183            checksum,
184        })
185    }
186}
187
188/// SSTable file header
189#[derive(Debug, Clone)]
190pub struct Header {
191    /// Magic number
192    pub magic: TableMagic,
193    /// Format version
194    pub version: u32,
195    /// Feature flags
196    pub flags: u32,
197    /// Number of sections
198    pub num_sections: u32,
199    /// Offset to footer
200    pub footer_offset: u64,
201    /// Header checksum
202    pub checksum: u32,
203}
204
205impl Header {
206    pub fn new(num_sections: u32, footer_offset: u64) -> Self {
207        let mut header = Self {
208            magic: TableMagic::new(),
209            version: FORMAT_VERSION,
210            flags: 0,
211            num_sections,
212            footer_offset,
213            checksum: 0,
214        };
215        header.checksum = header.compute_checksum();
216        header
217    }
218
219    /// Encode header to bytes
220    pub fn encode(&self) -> [u8; HEADER_SIZE] {
221        let mut buf = [0u8; HEADER_SIZE];
222        let mut cursor = Cursor::new(&mut buf[..]);
223        
224        cursor.write_all(self.magic.as_bytes()).unwrap();
225        cursor.write_u32::<LittleEndian>(self.version).unwrap();
226        cursor.write_u32::<LittleEndian>(self.flags).unwrap();
227        cursor.write_u32::<LittleEndian>(self.num_sections).unwrap();
228        cursor.write_u64::<LittleEndian>(self.footer_offset).unwrap();
229        cursor.write_u32::<LittleEndian>(self.checksum).unwrap();
230        
231        buf
232    }
233
234    /// Decode header from bytes
235    pub fn decode(data: &[u8]) -> Option<Self> {
236        if data.len() < HEADER_SIZE {
237            return None;
238        }
239        
240        let mut cursor = Cursor::new(data);
241        
242        let mut magic_bytes = [0u8; 8];
243        cursor.read_exact(&mut magic_bytes).ok()?;
244        let magic = TableMagic(magic_bytes);
245        
246        let version = cursor.read_u32::<LittleEndian>().ok()?;
247        let flags = cursor.read_u32::<LittleEndian>().ok()?;
248        let num_sections = cursor.read_u32::<LittleEndian>().ok()?;
249        let footer_offset = cursor.read_u64::<LittleEndian>().ok()?;
250        let checksum = cursor.read_u32::<LittleEndian>().ok()?;
251        
252        let header = Self {
253            magic,
254            version,
255            flags,
256            num_sections,
257            footer_offset,
258            checksum,
259        };
260        
261        // Verify checksum
262        if header.compute_checksum() != checksum {
263            return None;
264        }
265        
266        Some(header)
267    }
268
269    /// Compute checksum of header (excluding checksum field)
270    fn compute_checksum(&self) -> u32 {
271        let mut hasher = crc32fast::Hasher::new();
272        hasher.update(self.magic.as_bytes());
273        hasher.update(&self.version.to_le_bytes());
274        hasher.update(&self.flags.to_le_bytes());
275        hasher.update(&self.num_sections.to_le_bytes());
276        hasher.update(&self.footer_offset.to_le_bytes());
277        hasher.finalize()
278    }
279
280    /// Validate header
281    pub fn is_valid(&self) -> bool {
282        self.magic.is_valid() && 
283        self.version <= FORMAT_VERSION &&
284        self.compute_checksum() == self.checksum
285    }
286}
287
288/// SSTable footer
289#[derive(Debug, Clone)]
290pub struct Footer {
291    /// Section directory
292    pub sections: Vec<Section>,
293    /// Footer checksum
294    pub checksum: u32,
295    /// Magic number (repeated for validation)
296    pub magic: TableMagic,
297}
298
299impl Footer {
300    pub fn new(sections: Vec<Section>) -> Self {
301        let mut footer = Self {
302            sections,
303            checksum: 0,
304            magic: TableMagic::new(),
305        };
306        footer.checksum = footer.compute_checksum();
307        footer
308    }
309
310    /// Encode footer to bytes
311    pub fn encode(&self) -> Vec<u8> {
312        let size = self.sections.len() * SECTION_ENTRY_SIZE + 4 + 8;
313        let mut buf = Vec::with_capacity(size);
314        
315        for section in &self.sections {
316            section.encode(&mut buf).unwrap();
317        }
318        
319        buf.write_u32::<LittleEndian>(self.checksum).unwrap();
320        buf.extend_from_slice(self.magic.as_bytes());
321        
322        buf
323    }
324
325    /// Decode footer from bytes
326    pub fn decode(data: &[u8], num_sections: u32) -> Option<Self> {
327        let expected_size = num_sections as usize * SECTION_ENTRY_SIZE + 4 + 8;
328        if data.len() < expected_size {
329            return None;
330        }
331        
332        let mut cursor = Cursor::new(data);
333        
334        let mut sections = Vec::with_capacity(num_sections as usize);
335        for _ in 0..num_sections {
336            sections.push(Section::decode(&mut cursor).ok()?);
337        }
338        
339        let checksum = cursor.read_u32::<LittleEndian>().ok()?;
340        
341        let mut magic_bytes = [0u8; 8];
342        cursor.read_exact(&mut magic_bytes).ok()?;
343        let magic = TableMagic(magic_bytes);
344        
345        let footer = Self {
346            sections,
347            checksum,
348            magic,
349        };
350        
351        // Verify checksum
352        if footer.compute_checksum() != checksum {
353            return None;
354        }
355        
356        Some(footer)
357    }
358
359    /// Compute checksum of footer (excluding checksum and magic)
360    fn compute_checksum(&self) -> u32 {
361        let mut hasher = crc32fast::Hasher::new();
362        for section in &self.sections {
363            hasher.update(&(section.section_type as u32).to_le_bytes());
364            hasher.update(&section.offset.to_le_bytes());
365            hasher.update(&section.size.to_le_bytes());
366            hasher.update(&section.checksum.to_le_bytes());
367        }
368        hasher.finalize()
369    }
370
371    /// Get section by type
372    pub fn get_section(&self, section_type: SectionType) -> Option<&Section> {
373        self.sections.iter().find(|s| s.section_type == section_type)
374    }
375
376    /// Check if section exists
377    pub fn has_section(&self, section_type: SectionType) -> bool {
378        self.get_section(section_type).is_some()
379    }
380}
381
382/// SSTable format reader/writer
383pub struct SSTableFormat {
384    pub header: Header,
385    pub footer: Footer,
386}
387
388impl SSTableFormat {
389    /// Create a new format with given sections
390    pub fn new(sections: Vec<Section>) -> Self {
391        let footer_offset = sections.iter().map(|s| s.offset + s.size).max().unwrap_or(HEADER_SIZE as u64);
392        
393        Self {
394            header: Header::new(sections.len() as u32, footer_offset),
395            footer: Footer::new(sections),
396        }
397    }
398
399    /// Read format from file
400    pub fn read<R: Read + Seek>(reader: &mut R) -> std::io::Result<Self> {
401        // Read header
402        let mut header_buf = [0u8; HEADER_SIZE];
403        reader.read_exact(&mut header_buf)?;
404        
405        let header = Header::decode(&header_buf)
406            .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid header"))?;
407        
408        if !header.is_valid() {
409            return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid header"));
410        }
411        
412        // Seek to footer
413        reader.seek(SeekFrom::Start(header.footer_offset))?;
414        
415        // Read footer
416        let footer_size = header.num_sections as usize * SECTION_ENTRY_SIZE + 4 + 8;
417        let mut footer_buf = vec![0u8; footer_size];
418        reader.read_exact(&mut footer_buf)?;
419        
420        let footer = Footer::decode(&footer_buf, header.num_sections)
421            .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid footer"))?;
422        
423        Ok(Self { header, footer })
424    }
425
426    /// Write format to file (header and footer only)
427    pub fn write<W: Write + Seek>(&self, writer: &mut W) -> std::io::Result<()> {
428        // Write header at start
429        writer.seek(SeekFrom::Start(0))?;
430        writer.write_all(&self.header.encode())?;
431        
432        // Write footer at footer_offset
433        writer.seek(SeekFrom::Start(self.header.footer_offset))?;
434        writer.write_all(&self.footer.encode())?;
435        
436        Ok(())
437    }
438
439    /// Get section by type
440    pub fn get_section(&self, section_type: SectionType) -> Option<&Section> {
441        self.footer.get_section(section_type)
442    }
443
444    /// Validate section data against checksum
445    pub fn validate_section<R: Read + Seek>(
446        &self,
447        reader: &mut R,
448        section: &Section,
449    ) -> std::io::Result<bool> {
450        reader.seek(SeekFrom::Start(section.offset))?;
451        
452        let mut data = vec![0u8; section.size as usize];
453        reader.read_exact(&mut data)?;
454        
455        let computed_checksum = crc32fast::hash(&data);
456        Ok(computed_checksum == section.checksum)
457    }
458
459    /// Pre-validate all sections before mmap
460    ///
461    /// This establishes the safety invariant that all mapped pages are valid.
462    pub fn validate_all_sections<R: Read + Seek>(&self, reader: &mut R) -> std::io::Result<bool> {
463        for section in &self.footer.sections {
464            if !self.validate_section(reader, section)? {
465                return Ok(false);
466            }
467        }
468        Ok(true)
469    }
470}
471
472// =============================================================================
473// Tests
474// =============================================================================
475
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// A freshly built magic exposes the canonical bytes and is valid.
    #[test]
    fn test_table_magic() {
        let magic = TableMagic::new();
        assert_eq!(magic.as_bytes(), &TABLE_MAGIC);
        assert!(magic.is_valid());
    }

    /// Encoding then decoding a header preserves its fields.
    #[test]
    fn test_header_roundtrip() {
        let bytes = Header::new(3, 1024).encode();

        let header = Header::decode(&bytes).unwrap();
        assert!(header.is_valid());
        assert_eq!(header.version, FORMAT_VERSION);
        assert_eq!(
            (header.num_sections, header.footer_offset),
            (3, 1024)
        );
    }

    /// Encoding then decoding a directory entry preserves its fields.
    #[test]
    fn test_section_roundtrip() {
        let mut encoded = Vec::new();
        Section::new(SectionType::DataBlocks, 100, 500, 12345)
            .encode(&mut encoded)
            .unwrap();

        let section = Section::decode(&mut Cursor::new(&encoded)).unwrap();
        assert_eq!(section.section_type, SectionType::DataBlocks);
        assert_eq!(
            (section.offset, section.size, section.checksum),
            (100, 500, 12345)
        );
    }

    /// A three-entry footer survives an encode/decode cycle.
    #[test]
    fn test_footer_roundtrip() {
        let encoded = Footer::new(vec![
            Section::new(SectionType::DataBlocks, 32, 1000, 111),
            Section::new(SectionType::Filter, 1032, 200, 222),
            Section::new(SectionType::Index, 1232, 100, 333),
        ])
        .encode();

        let footer = Footer::decode(&encoded, 3).unwrap();
        assert!(footer.magic.is_valid());
        assert_eq!(footer.sections.len(), 3);
    }

    /// Header and footer written into a buffer can be read back, and section
    /// lookup reports exactly the section types that were written.
    #[test]
    fn test_format_roundtrip() {
        let written = SSTableFormat::new(vec![
            Section::new(SectionType::DataBlocks, 32, 1000, 111),
            Section::new(SectionType::Index, 1032, 100, 222),
        ]);

        let mut storage = vec![0u8; 2048];
        written.write(&mut Cursor::new(&mut storage[..])).unwrap();

        let loaded = SSTableFormat::read(&mut Cursor::new(&storage[..])).unwrap();
        assert_eq!(loaded.header.num_sections, 2);

        for present in [SectionType::DataBlocks, SectionType::Index] {
            assert!(loaded.get_section(present).is_some());
        }
        assert!(loaded.get_section(SectionType::Filter).is_none());
    }
}
551}