Skip to main content

sochdb_storage/sstable/
format.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Forward-Compatible SSTable Container Format
19//!
20//! This module defines the on-disk format for SSTables with support for
21//! future extensibility and safe mmap operations.
22//!
23//! ## Design Goals
24//!
25//! 1. **Forward compatibility**: New sections can be added without breaking readers
26//! 2. **Backward compatibility**: Old readers skip unknown sections
27//! 3. **Safe mmap**: All data validated before memory mapping
28//! 4. **Efficient access**: Direct offset-based section lookup
29//!
30//! ## File Format
31//!
32//! ```text
33//! ┌─────────────────────────────────────────────────────────────────────────┐
34//! │                         SSTable File Layout                              │
35//! ├─────────────────────────────────────────────────────────────────────────┤
36//! │ Header (32 bytes):                                                       │
37//! │   Magic (8 bytes): "TDBSSTab"                                           │
38//! │   Version (4 bytes): Format version                                      │
39//! │   Flags (4 bytes): Feature flags                                         │
40//! │   Num Sections (4 bytes): Number of sections                            │
41//! │   Footer Offset (8 bytes): Offset to footer                              │
42//! │   Header Checksum (4 bytes): CRC32 of header                            │
43//! ├─────────────────────────────────────────────────────────────────────────┤
44//! │ Section 0: Data Blocks                                                   │
45//! │   [Block 0][Block 1]...[Block N]                                        │
46//! ├─────────────────────────────────────────────────────────────────────────┤
47//! │ Section 1: Filter (optional)                                            │
48//! │   [Filter Data]                                                          │
49//! ├─────────────────────────────────────────────────────────────────────────┤
50//! │ Section 2: Index                                                         │
51//! │   [Index Block]                                                          │
52//! ├─────────────────────────────────────────────────────────────────────────┤
53//! │ Section 3: Metadata (optional)                                           │
54//! │   [Properties, Stats, etc.]                                              │
55//! ├─────────────────────────────────────────────────────────────────────────┤
56//! │ Footer (variable):                                                       │
57//! │   Section Directory: [Type, Offset, Size, Checksum] × N                 │
58//! │   Footer Checksum (4 bytes)                                              │
59//! │   Magic (8 bytes): "TDBSSTab"                                           │
60//! └─────────────────────────────────────────────────────────────────────────┘
61//! ```
62
63use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
64use std::io::{Cursor, Read, Seek, SeekFrom, Write};
65
/// Magic bytes identifying an SSTable file (ASCII `TDBSSTab`).
pub const TABLE_MAGIC: [u8; 8] = *b"TDBSSTab";

/// Format version stamped into newly built headers.
pub const FORMAT_VERSION: u32 = 1;

/// Encoded header length in bytes:
/// magic (8) + version (4) + flags (4) + section count (4) + footer offset (8) + checksum (4).
pub const HEADER_SIZE: usize = 8 + 4 + 4 + 4 + 8 + 4;

/// Encoded length in bytes of one footer directory entry:
/// type (4) + offset (8) + size (8) + checksum (4).
pub const SECTION_ENTRY_SIZE: usize = 4 + 8 + 8 + 4;
77
78/// Table magic newtype for type safety
79#[derive(Debug, Clone, Copy, PartialEq, Eq)]
80pub struct TableMagic([u8; 8]);
81
82impl TableMagic {
83    pub fn new() -> Self {
84        Self(TABLE_MAGIC)
85    }
86
87    pub fn as_bytes(&self) -> &[u8; 8] {
88        &self.0
89    }
90
91    pub fn is_valid(&self) -> bool {
92        self.0 == TABLE_MAGIC
93    }
94}
95
96impl Default for TableMagic {
97    fn default() -> Self {
98        Self::new()
99    }
100}
101
/// Kinds of section that can be listed in an SSTable's section directory.
///
/// Each variant's discriminant is the `u32` tag written by `Section::encode`.
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SectionType {
    /// Data blocks containing key-value pairs
    DataBlocks = 0,
    /// Bloom/Ribbon/Xor filter for the table
    Filter = 1,
    /// Index block for data block lookup
    Index = 2,
    /// Metadata (properties, stats, etc.)
    Metadata = 3,
    /// Range tombstones
    RangeTombstones = 4,
    /// Compression dictionary
    CompressionDict = 5,
    /// Reserved for future use
    Reserved = 0xFFFFFFFF,
}

impl TryFrom<u32> for SectionType {
    type Error = ();

    /// Maps a raw tag to its section type.
    ///
    /// Only the tags `0..=5` are accepted. Every other value, including the
    /// `Reserved` tag `0xFFFFFFFF`, yields `Err(())`.
    fn try_from(value: u32) -> Result<Self, Self::Error> {
        // Variants that may be produced from a raw tag; `Reserved` is
        // deliberately absent so that its tag is rejected like any other
        // unknown value.
        const DECODABLE: [SectionType; 6] = [
            SectionType::DataBlocks,
            SectionType::Filter,
            SectionType::Index,
            SectionType::Metadata,
            SectionType::RangeTombstones,
            SectionType::CompressionDict,
        ];

        DECODABLE
            .iter()
            .copied()
            .find(|kind| *kind as u32 == value)
            .ok_or(())
    }
}
137
138/// A section in the SSTable
139#[derive(Debug, Clone)]
140pub struct Section {
141    /// Section type
142    pub section_type: SectionType,
143    /// Offset in file
144    pub offset: u64,
145    /// Size in bytes
146    pub size: u64,
147    /// CRC32 checksum of section data
148    pub checksum: u32,
149}
150
151impl Section {
152    pub fn new(section_type: SectionType, offset: u64, size: u64, checksum: u32) -> Self {
153        Self {
154            section_type,
155            offset,
156            size,
157            checksum,
158        }
159    }
160
161    /// Encode to bytes
162    pub fn encode<W: Write>(&self, writer: &mut W) -> std::io::Result<()> {
163        writer.write_u32::<LittleEndian>(self.section_type as u32)?;
164        writer.write_u64::<LittleEndian>(self.offset)?;
165        writer.write_u64::<LittleEndian>(self.size)?;
166        writer.write_u32::<LittleEndian>(self.checksum)?;
167        Ok(())
168    }
169
170    /// Decode from bytes
171    pub fn decode<R: Read>(reader: &mut R) -> std::io::Result<Self> {
172        let type_val = reader.read_u32::<LittleEndian>()?;
173        let section_type = SectionType::try_from(type_val).unwrap_or(SectionType::Reserved);
174        let offset = reader.read_u64::<LittleEndian>()?;
175        let size = reader.read_u64::<LittleEndian>()?;
176        let checksum = reader.read_u32::<LittleEndian>()?;
177
178        Ok(Self {
179            section_type,
180            offset,
181            size,
182            checksum,
183        })
184    }
185}
186
187/// SSTable file header
188#[derive(Debug, Clone)]
189pub struct Header {
190    /// Magic number
191    pub magic: TableMagic,
192    /// Format version
193    pub version: u32,
194    /// Feature flags
195    pub flags: u32,
196    /// Number of sections
197    pub num_sections: u32,
198    /// Offset to footer
199    pub footer_offset: u64,
200    /// Header checksum
201    pub checksum: u32,
202}
203
204impl Header {
205    pub fn new(num_sections: u32, footer_offset: u64) -> Self {
206        let mut header = Self {
207            magic: TableMagic::new(),
208            version: FORMAT_VERSION,
209            flags: 0,
210            num_sections,
211            footer_offset,
212            checksum: 0,
213        };
214        header.checksum = header.compute_checksum();
215        header
216    }
217
218    /// Encode header to bytes
219    pub fn encode(&self) -> [u8; HEADER_SIZE] {
220        let mut buf = [0u8; HEADER_SIZE];
221        let mut cursor = Cursor::new(&mut buf[..]);
222
223        cursor.write_all(self.magic.as_bytes()).unwrap();
224        cursor.write_u32::<LittleEndian>(self.version).unwrap();
225        cursor.write_u32::<LittleEndian>(self.flags).unwrap();
226        cursor.write_u32::<LittleEndian>(self.num_sections).unwrap();
227        cursor
228            .write_u64::<LittleEndian>(self.footer_offset)
229            .unwrap();
230        cursor.write_u32::<LittleEndian>(self.checksum).unwrap();
231
232        buf
233    }
234
235    /// Decode header from bytes
236    pub fn decode(data: &[u8]) -> Option<Self> {
237        if data.len() < HEADER_SIZE {
238            return None;
239        }
240
241        let mut cursor = Cursor::new(data);
242
243        let mut magic_bytes = [0u8; 8];
244        cursor.read_exact(&mut magic_bytes).ok()?;
245        let magic = TableMagic(magic_bytes);
246
247        let version = cursor.read_u32::<LittleEndian>().ok()?;
248        let flags = cursor.read_u32::<LittleEndian>().ok()?;
249        let num_sections = cursor.read_u32::<LittleEndian>().ok()?;
250        let footer_offset = cursor.read_u64::<LittleEndian>().ok()?;
251        let checksum = cursor.read_u32::<LittleEndian>().ok()?;
252
253        let header = Self {
254            magic,
255            version,
256            flags,
257            num_sections,
258            footer_offset,
259            checksum,
260        };
261
262        // Verify checksum
263        if header.compute_checksum() != checksum {
264            return None;
265        }
266
267        Some(header)
268    }
269
270    /// Compute checksum of header (excluding checksum field)
271    fn compute_checksum(&self) -> u32 {
272        let mut hasher = crc32fast::Hasher::new();
273        hasher.update(self.magic.as_bytes());
274        hasher.update(&self.version.to_le_bytes());
275        hasher.update(&self.flags.to_le_bytes());
276        hasher.update(&self.num_sections.to_le_bytes());
277        hasher.update(&self.footer_offset.to_le_bytes());
278        hasher.finalize()
279    }
280
281    /// Validate header
282    pub fn is_valid(&self) -> bool {
283        self.magic.is_valid()
284            && self.version <= FORMAT_VERSION
285            && self.compute_checksum() == self.checksum
286    }
287}
288
289/// SSTable footer
290#[derive(Debug, Clone)]
291pub struct Footer {
292    /// Section directory
293    pub sections: Vec<Section>,
294    /// Footer checksum
295    pub checksum: u32,
296    /// Magic number (repeated for validation)
297    pub magic: TableMagic,
298}
299
300impl Footer {
301    pub fn new(sections: Vec<Section>) -> Self {
302        let mut footer = Self {
303            sections,
304            checksum: 0,
305            magic: TableMagic::new(),
306        };
307        footer.checksum = footer.compute_checksum();
308        footer
309    }
310
311    /// Encode footer to bytes
312    pub fn encode(&self) -> Vec<u8> {
313        let size = self.sections.len() * SECTION_ENTRY_SIZE + 4 + 8;
314        let mut buf = Vec::with_capacity(size);
315
316        for section in &self.sections {
317            section.encode(&mut buf).unwrap();
318        }
319
320        buf.write_u32::<LittleEndian>(self.checksum).unwrap();
321        buf.extend_from_slice(self.magic.as_bytes());
322
323        buf
324    }
325
326    /// Decode footer from bytes
327    pub fn decode(data: &[u8], num_sections: u32) -> Option<Self> {
328        let expected_size = num_sections as usize * SECTION_ENTRY_SIZE + 4 + 8;
329        if data.len() < expected_size {
330            return None;
331        }
332
333        let mut cursor = Cursor::new(data);
334
335        let mut sections = Vec::with_capacity(num_sections as usize);
336        for _ in 0..num_sections {
337            sections.push(Section::decode(&mut cursor).ok()?);
338        }
339
340        let checksum = cursor.read_u32::<LittleEndian>().ok()?;
341
342        let mut magic_bytes = [0u8; 8];
343        cursor.read_exact(&mut magic_bytes).ok()?;
344        let magic = TableMagic(magic_bytes);
345
346        let footer = Self {
347            sections,
348            checksum,
349            magic,
350        };
351
352        // Verify checksum
353        if footer.compute_checksum() != checksum {
354            return None;
355        }
356
357        Some(footer)
358    }
359
360    /// Compute checksum of footer (excluding checksum and magic)
361    fn compute_checksum(&self) -> u32 {
362        let mut hasher = crc32fast::Hasher::new();
363        for section in &self.sections {
364            hasher.update(&(section.section_type as u32).to_le_bytes());
365            hasher.update(&section.offset.to_le_bytes());
366            hasher.update(&section.size.to_le_bytes());
367            hasher.update(&section.checksum.to_le_bytes());
368        }
369        hasher.finalize()
370    }
371
372    /// Get section by type
373    pub fn get_section(&self, section_type: SectionType) -> Option<&Section> {
374        self.sections
375            .iter()
376            .find(|s| s.section_type == section_type)
377    }
378
379    /// Check if section exists
380    pub fn has_section(&self, section_type: SectionType) -> bool {
381        self.get_section(section_type).is_some()
382    }
383}
384
385/// SSTable format reader/writer
386pub struct SSTableFormat {
387    pub header: Header,
388    pub footer: Footer,
389}
390
391impl SSTableFormat {
392    /// Create a new format with given sections
393    pub fn new(sections: Vec<Section>) -> Self {
394        let footer_offset = sections
395            .iter()
396            .map(|s| s.offset + s.size)
397            .max()
398            .unwrap_or(HEADER_SIZE as u64);
399
400        Self {
401            header: Header::new(sections.len() as u32, footer_offset),
402            footer: Footer::new(sections),
403        }
404    }
405
406    /// Read format from file
407    pub fn read<R: Read + Seek>(reader: &mut R) -> std::io::Result<Self> {
408        // Read header
409        let mut header_buf = [0u8; HEADER_SIZE];
410        reader.read_exact(&mut header_buf)?;
411
412        let header = Header::decode(&header_buf).ok_or_else(|| {
413            std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid header")
414        })?;
415
416        if !header.is_valid() {
417            return Err(std::io::Error::new(
418                std::io::ErrorKind::InvalidData,
419                "Invalid header",
420            ));
421        }
422
423        // Seek to footer
424        reader.seek(SeekFrom::Start(header.footer_offset))?;
425
426        // Read footer
427        let footer_size = header.num_sections as usize * SECTION_ENTRY_SIZE + 4 + 8;
428        let mut footer_buf = vec![0u8; footer_size];
429        reader.read_exact(&mut footer_buf)?;
430
431        let footer = Footer::decode(&footer_buf, header.num_sections).ok_or_else(|| {
432            std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid footer")
433        })?;
434
435        Ok(Self { header, footer })
436    }
437
438    /// Write format to file (header and footer only)
439    pub fn write<W: Write + Seek>(&self, writer: &mut W) -> std::io::Result<()> {
440        // Write header at start
441        writer.seek(SeekFrom::Start(0))?;
442        writer.write_all(&self.header.encode())?;
443
444        // Write footer at footer_offset
445        writer.seek(SeekFrom::Start(self.header.footer_offset))?;
446        writer.write_all(&self.footer.encode())?;
447
448        Ok(())
449    }
450
451    /// Get section by type
452    pub fn get_section(&self, section_type: SectionType) -> Option<&Section> {
453        self.footer.get_section(section_type)
454    }
455
456    /// Validate section data against checksum
457    pub fn validate_section<R: Read + Seek>(
458        &self,
459        reader: &mut R,
460        section: &Section,
461    ) -> std::io::Result<bool> {
462        reader.seek(SeekFrom::Start(section.offset))?;
463
464        let mut data = vec![0u8; section.size as usize];
465        reader.read_exact(&mut data)?;
466
467        let computed_checksum = crc32fast::hash(&data);
468        Ok(computed_checksum == section.checksum)
469    }
470
471    /// Pre-validate all sections before mmap
472    ///
473    /// This establishes the safety invariant that all mapped pages are valid.
474    pub fn validate_all_sections<R: Read + Seek>(&self, reader: &mut R) -> std::io::Result<bool> {
475        for section in &self.footer.sections {
476            if !self.validate_section(reader, section)? {
477                return Ok(false);
478            }
479        }
480        Ok(true)
481    }
482}
483
484// =============================================================================
485// Tests
486// =============================================================================
487
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// A freshly built magic exposes the canonical bytes and is valid.
    #[test]
    fn test_table_magic() {
        let fresh = TableMagic::new();

        assert!(fresh.is_valid());
        assert_eq!(fresh.as_bytes(), &TABLE_MAGIC);
    }

    /// Encoding then decoding a header preserves its fields and validity.
    #[test]
    fn test_header_roundtrip() {
        let bytes = Header::new(3, 1024).encode();
        let restored = Header::decode(&bytes).unwrap();

        assert_eq!(restored.version, FORMAT_VERSION);
        assert_eq!(restored.num_sections, 3);
        assert_eq!(restored.footer_offset, 1024);
        assert!(restored.is_valid());
    }

    /// Encoding then decoding a directory entry preserves all four fields.
    #[test]
    fn test_section_roundtrip() {
        let mut wire = Vec::new();
        Section::new(SectionType::DataBlocks, 100, 500, 12345)
            .encode(&mut wire)
            .unwrap();

        let restored = Section::decode(&mut Cursor::new(&wire)).unwrap();

        assert_eq!(restored.section_type, SectionType::DataBlocks);
        assert_eq!(restored.offset, 100);
        assert_eq!(restored.size, 500);
        assert_eq!(restored.checksum, 12345);
    }

    /// A three-entry footer survives an encode/decode cycle.
    #[test]
    fn test_footer_roundtrip() {
        let directory = vec![
            Section::new(SectionType::DataBlocks, 32, 1000, 111),
            Section::new(SectionType::Filter, 1032, 200, 222),
            Section::new(SectionType::Index, 1232, 100, 333),
        ];
        let wire = Footer::new(directory).encode();

        let restored = Footer::decode(&wire, 3).unwrap();

        assert_eq!(restored.sections.len(), 3);
        assert!(restored.magic.is_valid());
    }

    /// Header and footer written into a buffer can be read back, and section
    /// lookup reports exactly the types that were written.
    #[test]
    fn test_format_roundtrip() {
        let written = SSTableFormat::new(vec![
            Section::new(SectionType::DataBlocks, 32, 1000, 111),
            Section::new(SectionType::Index, 1032, 100, 222),
        ]);

        let mut file = Cursor::new(vec![0u8; 2048]);
        written.write(&mut file).unwrap();

        file.set_position(0);
        let restored = SSTableFormat::read(&mut file).unwrap();

        assert_eq!(restored.header.num_sections, 2);
        assert!(restored.get_section(SectionType::DataBlocks).is_some());
        assert!(restored.get_section(SectionType::Index).is_some());
        assert!(restored.get_section(SectionType::Filter).is_none());
    }
}
563}