sochdb_storage/sstable/
format.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Forward-Compatible SSTable Container Format
16//!
17//! This module defines the on-disk format for SSTables with support for
18//! future extensibility and safe mmap operations.
19//!
20//! ## Design Goals
21//!
22//! 1. **Forward compatibility**: New sections can be added without breaking readers
23//! 2. **Backward compatibility**: Old readers skip unknown sections
24//! 3. **Safe mmap**: All data validated before memory mapping
25//! 4. **Efficient access**: Direct offset-based section lookup
26//!
27//! ## File Format
28//!
29//! ```text
30//! ┌─────────────────────────────────────────────────────────────────────────┐
31//! │                         SSTable File Layout                              │
32//! ├─────────────────────────────────────────────────────────────────────────┤
33//! │ Header (32 bytes):                                                       │
34//! │   Magic (8 bytes): "TDBSSTab"                                           │
35//! │   Version (4 bytes): Format version                                      │
36//! │   Flags (4 bytes): Feature flags                                         │
37//! │   Num Sections (4 bytes): Number of sections                            │
38//! │   Footer Offset (8 bytes): Offset to footer                              │
39//! │   Header Checksum (4 bytes): CRC32 of header                            │
40//! ├─────────────────────────────────────────────────────────────────────────┤
41//! │ Section 0: Data Blocks                                                   │
42//! │   [Block 0][Block 1]...[Block N]                                        │
43//! ├─────────────────────────────────────────────────────────────────────────┤
44//! │ Section 1: Filter (optional)                                            │
45//! │   [Filter Data]                                                          │
46//! ├─────────────────────────────────────────────────────────────────────────┤
47//! │ Section 2: Index                                                         │
48//! │   [Index Block]                                                          │
49//! ├─────────────────────────────────────────────────────────────────────────┤
50//! │ Section 3: Metadata (optional)                                           │
51//! │   [Properties, Stats, etc.]                                              │
52//! ├─────────────────────────────────────────────────────────────────────────┤
53//! │ Footer (variable):                                                       │
54//! │   Section Directory: [Type, Offset, Size, Checksum] × N                 │
55//! │   Footer Checksum (4 bytes)                                              │
56//! │   Magic (8 bytes): "TDBSSTab"                                           │
57//! └─────────────────────────────────────────────────────────────────────────┘
58//! ```
59
60use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
61use std::collections::HashMap;
62use std::io::{Cursor, Read, Seek, SeekFrom, Write};
63
/// Magic bytes identifying an SSTable file: the ASCII string "TDBSSTab".
pub const TABLE_MAGIC: [u8; 8] = *b"TDBSSTab";

/// Format version written by this module.
pub const FORMAT_VERSION: u32 = 1;

/// Encoded header size in bytes:
/// magic (8) + version (4) + flags (4) + section count (4) + footer offset (8) + checksum (4).
pub const HEADER_SIZE: usize = 8 + 4 + 4 + 4 + 8 + 4;

/// Encoded size in bytes of one footer directory entry:
/// type (4) + offset (8) + size (8) + checksum (4).
pub const SECTION_ENTRY_SIZE: usize = 4 + 8 + 8 + 4;
75
76/// Table magic newtype for type safety
77#[derive(Debug, Clone, Copy, PartialEq, Eq)]
78pub struct TableMagic([u8; 8]);
79
80impl TableMagic {
81    pub fn new() -> Self {
82        Self(TABLE_MAGIC)
83    }
84
85    pub fn as_bytes(&self) -> &[u8; 8] {
86        &self.0
87    }
88
89    pub fn is_valid(&self) -> bool {
90        self.0 == TABLE_MAGIC
91    }
92}
93
94impl Default for TableMagic {
95    fn default() -> Self {
96        Self::new()
97    }
98}
99
/// Kinds of section that can appear in an SSTable.
///
/// The discriminant is the `u32` tag stored in the footer's section directory.
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SectionType {
    /// Data blocks containing key-value pairs
    DataBlocks = 0,
    /// Bloom/Ribbon/Xor filter for the table
    Filter = 1,
    /// Index block for data block lookup
    Index = 2,
    /// Metadata (properties, stats, etc.)
    Metadata = 3,
    /// Range tombstones
    RangeTombstones = 4,
    /// Compression dictionary
    CompressionDict = 5,
    /// Reserved for future use
    Reserved = 0xFFFFFFFF,
}

impl TryFrom<u32> for SectionType {
    type Error = ();

    /// Maps an on-disk tag to its section type.
    ///
    /// Only the tags `0..=5` are recognised. Every other value, including the
    /// tag of [`SectionType::Reserved`], yields `Err(())`.
    fn try_from(value: u32) -> Result<Self, Self::Error> {
        // `Reserved` is deliberately absent: its tag is not accepted as input.
        const KNOWN: [SectionType; 6] = [
            SectionType::DataBlocks,
            SectionType::Filter,
            SectionType::Index,
            SectionType::Metadata,
            SectionType::RangeTombstones,
            SectionType::CompressionDict,
        ];

        KNOWN
            .iter()
            .copied()
            .find(|kind| *kind as u32 == value)
            .ok_or(())
    }
}
135
136/// A section in the SSTable
137#[derive(Debug, Clone)]
138pub struct Section {
139    /// Section type
140    pub section_type: SectionType,
141    /// Offset in file
142    pub offset: u64,
143    /// Size in bytes
144    pub size: u64,
145    /// CRC32 checksum of section data
146    pub checksum: u32,
147}
148
149impl Section {
150    pub fn new(section_type: SectionType, offset: u64, size: u64, checksum: u32) -> Self {
151        Self {
152            section_type,
153            offset,
154            size,
155            checksum,
156        }
157    }
158
159    /// Encode to bytes
160    pub fn encode<W: Write>(&self, writer: &mut W) -> std::io::Result<()> {
161        writer.write_u32::<LittleEndian>(self.section_type as u32)?;
162        writer.write_u64::<LittleEndian>(self.offset)?;
163        writer.write_u64::<LittleEndian>(self.size)?;
164        writer.write_u32::<LittleEndian>(self.checksum)?;
165        Ok(())
166    }
167
168    /// Decode from bytes
169    pub fn decode<R: Read>(reader: &mut R) -> std::io::Result<Self> {
170        let type_val = reader.read_u32::<LittleEndian>()?;
171        let section_type = SectionType::try_from(type_val).unwrap_or(SectionType::Reserved);
172        let offset = reader.read_u64::<LittleEndian>()?;
173        let size = reader.read_u64::<LittleEndian>()?;
174        let checksum = reader.read_u32::<LittleEndian>()?;
175        
176        Ok(Self {
177            section_type,
178            offset,
179            size,
180            checksum,
181        })
182    }
183}
184
185/// SSTable file header
186#[derive(Debug, Clone)]
187pub struct Header {
188    /// Magic number
189    pub magic: TableMagic,
190    /// Format version
191    pub version: u32,
192    /// Feature flags
193    pub flags: u32,
194    /// Number of sections
195    pub num_sections: u32,
196    /// Offset to footer
197    pub footer_offset: u64,
198    /// Header checksum
199    pub checksum: u32,
200}
201
202impl Header {
203    pub fn new(num_sections: u32, footer_offset: u64) -> Self {
204        let mut header = Self {
205            magic: TableMagic::new(),
206            version: FORMAT_VERSION,
207            flags: 0,
208            num_sections,
209            footer_offset,
210            checksum: 0,
211        };
212        header.checksum = header.compute_checksum();
213        header
214    }
215
216    /// Encode header to bytes
217    pub fn encode(&self) -> [u8; HEADER_SIZE] {
218        let mut buf = [0u8; HEADER_SIZE];
219        let mut cursor = Cursor::new(&mut buf[..]);
220        
221        cursor.write_all(self.magic.as_bytes()).unwrap();
222        cursor.write_u32::<LittleEndian>(self.version).unwrap();
223        cursor.write_u32::<LittleEndian>(self.flags).unwrap();
224        cursor.write_u32::<LittleEndian>(self.num_sections).unwrap();
225        cursor.write_u64::<LittleEndian>(self.footer_offset).unwrap();
226        cursor.write_u32::<LittleEndian>(self.checksum).unwrap();
227        
228        buf
229    }
230
231    /// Decode header from bytes
232    pub fn decode(data: &[u8]) -> Option<Self> {
233        if data.len() < HEADER_SIZE {
234            return None;
235        }
236        
237        let mut cursor = Cursor::new(data);
238        
239        let mut magic_bytes = [0u8; 8];
240        cursor.read_exact(&mut magic_bytes).ok()?;
241        let magic = TableMagic(magic_bytes);
242        
243        let version = cursor.read_u32::<LittleEndian>().ok()?;
244        let flags = cursor.read_u32::<LittleEndian>().ok()?;
245        let num_sections = cursor.read_u32::<LittleEndian>().ok()?;
246        let footer_offset = cursor.read_u64::<LittleEndian>().ok()?;
247        let checksum = cursor.read_u32::<LittleEndian>().ok()?;
248        
249        let header = Self {
250            magic,
251            version,
252            flags,
253            num_sections,
254            footer_offset,
255            checksum,
256        };
257        
258        // Verify checksum
259        if header.compute_checksum() != checksum {
260            return None;
261        }
262        
263        Some(header)
264    }
265
266    /// Compute checksum of header (excluding checksum field)
267    fn compute_checksum(&self) -> u32 {
268        let mut hasher = crc32fast::Hasher::new();
269        hasher.update(self.magic.as_bytes());
270        hasher.update(&self.version.to_le_bytes());
271        hasher.update(&self.flags.to_le_bytes());
272        hasher.update(&self.num_sections.to_le_bytes());
273        hasher.update(&self.footer_offset.to_le_bytes());
274        hasher.finalize()
275    }
276
277    /// Validate header
278    pub fn is_valid(&self) -> bool {
279        self.magic.is_valid() && 
280        self.version <= FORMAT_VERSION &&
281        self.compute_checksum() == self.checksum
282    }
283}
284
285/// SSTable footer
286#[derive(Debug, Clone)]
287pub struct Footer {
288    /// Section directory
289    pub sections: Vec<Section>,
290    /// Footer checksum
291    pub checksum: u32,
292    /// Magic number (repeated for validation)
293    pub magic: TableMagic,
294}
295
296impl Footer {
297    pub fn new(sections: Vec<Section>) -> Self {
298        let mut footer = Self {
299            sections,
300            checksum: 0,
301            magic: TableMagic::new(),
302        };
303        footer.checksum = footer.compute_checksum();
304        footer
305    }
306
307    /// Encode footer to bytes
308    pub fn encode(&self) -> Vec<u8> {
309        let size = self.sections.len() * SECTION_ENTRY_SIZE + 4 + 8;
310        let mut buf = Vec::with_capacity(size);
311        
312        for section in &self.sections {
313            section.encode(&mut buf).unwrap();
314        }
315        
316        buf.write_u32::<LittleEndian>(self.checksum).unwrap();
317        buf.extend_from_slice(self.magic.as_bytes());
318        
319        buf
320    }
321
322    /// Decode footer from bytes
323    pub fn decode(data: &[u8], num_sections: u32) -> Option<Self> {
324        let expected_size = num_sections as usize * SECTION_ENTRY_SIZE + 4 + 8;
325        if data.len() < expected_size {
326            return None;
327        }
328        
329        let mut cursor = Cursor::new(data);
330        
331        let mut sections = Vec::with_capacity(num_sections as usize);
332        for _ in 0..num_sections {
333            sections.push(Section::decode(&mut cursor).ok()?);
334        }
335        
336        let checksum = cursor.read_u32::<LittleEndian>().ok()?;
337        
338        let mut magic_bytes = [0u8; 8];
339        cursor.read_exact(&mut magic_bytes).ok()?;
340        let magic = TableMagic(magic_bytes);
341        
342        let footer = Self {
343            sections,
344            checksum,
345            magic,
346        };
347        
348        // Verify checksum
349        if footer.compute_checksum() != checksum {
350            return None;
351        }
352        
353        Some(footer)
354    }
355
356    /// Compute checksum of footer (excluding checksum and magic)
357    fn compute_checksum(&self) -> u32 {
358        let mut hasher = crc32fast::Hasher::new();
359        for section in &self.sections {
360            hasher.update(&(section.section_type as u32).to_le_bytes());
361            hasher.update(&section.offset.to_le_bytes());
362            hasher.update(&section.size.to_le_bytes());
363            hasher.update(&section.checksum.to_le_bytes());
364        }
365        hasher.finalize()
366    }
367
368    /// Get section by type
369    pub fn get_section(&self, section_type: SectionType) -> Option<&Section> {
370        self.sections.iter().find(|s| s.section_type == section_type)
371    }
372
373    /// Check if section exists
374    pub fn has_section(&self, section_type: SectionType) -> bool {
375        self.get_section(section_type).is_some()
376    }
377}
378
379/// SSTable format reader/writer
380pub struct SSTableFormat {
381    pub header: Header,
382    pub footer: Footer,
383}
384
385impl SSTableFormat {
386    /// Create a new format with given sections
387    pub fn new(sections: Vec<Section>) -> Self {
388        let footer_offset = sections.iter().map(|s| s.offset + s.size).max().unwrap_or(HEADER_SIZE as u64);
389        
390        Self {
391            header: Header::new(sections.len() as u32, footer_offset),
392            footer: Footer::new(sections),
393        }
394    }
395
396    /// Read format from file
397    pub fn read<R: Read + Seek>(reader: &mut R) -> std::io::Result<Self> {
398        // Read header
399        let mut header_buf = [0u8; HEADER_SIZE];
400        reader.read_exact(&mut header_buf)?;
401        
402        let header = Header::decode(&header_buf)
403            .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid header"))?;
404        
405        if !header.is_valid() {
406            return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid header"));
407        }
408        
409        // Seek to footer
410        reader.seek(SeekFrom::Start(header.footer_offset))?;
411        
412        // Read footer
413        let footer_size = header.num_sections as usize * SECTION_ENTRY_SIZE + 4 + 8;
414        let mut footer_buf = vec![0u8; footer_size];
415        reader.read_exact(&mut footer_buf)?;
416        
417        let footer = Footer::decode(&footer_buf, header.num_sections)
418            .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid footer"))?;
419        
420        Ok(Self { header, footer })
421    }
422
423    /// Write format to file (header and footer only)
424    pub fn write<W: Write + Seek>(&self, writer: &mut W) -> std::io::Result<()> {
425        // Write header at start
426        writer.seek(SeekFrom::Start(0))?;
427        writer.write_all(&self.header.encode())?;
428        
429        // Write footer at footer_offset
430        writer.seek(SeekFrom::Start(self.header.footer_offset))?;
431        writer.write_all(&self.footer.encode())?;
432        
433        Ok(())
434    }
435
436    /// Get section by type
437    pub fn get_section(&self, section_type: SectionType) -> Option<&Section> {
438        self.footer.get_section(section_type)
439    }
440
441    /// Validate section data against checksum
442    pub fn validate_section<R: Read + Seek>(
443        &self,
444        reader: &mut R,
445        section: &Section,
446    ) -> std::io::Result<bool> {
447        reader.seek(SeekFrom::Start(section.offset))?;
448        
449        let mut data = vec![0u8; section.size as usize];
450        reader.read_exact(&mut data)?;
451        
452        let computed_checksum = crc32fast::hash(&data);
453        Ok(computed_checksum == section.checksum)
454    }
455
456    /// Pre-validate all sections before mmap
457    ///
458    /// This establishes the safety invariant that all mapped pages are valid.
459    pub fn validate_all_sections<R: Read + Seek>(&self, reader: &mut R) -> std::io::Result<bool> {
460        for section in &self.footer.sections {
461            if !self.validate_section(reader, section)? {
462                return Ok(false);
463            }
464        }
465        Ok(true)
466    }
467}
468
469// =============================================================================
470// Tests
471// =============================================================================
472
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// A freshly built magic wraps the expected bytes and reports itself valid.
    #[test]
    fn test_table_magic() {
        let fresh = TableMagic::new();
        assert_eq!(fresh.as_bytes(), &TABLE_MAGIC);
        assert!(fresh.is_valid());
    }

    /// Encoding then decoding a header preserves its fields and validity.
    #[test]
    fn test_header_roundtrip() {
        let wire = Header::new(3, 1024).encode();

        let parsed = Header::decode(&wire).unwrap();
        assert_eq!(
            (parsed.version, parsed.num_sections, parsed.footer_offset),
            (FORMAT_VERSION, 3, 1024)
        );
        assert!(parsed.is_valid());
    }

    /// Encoding then decoding a directory entry preserves all four fields.
    #[test]
    fn test_section_roundtrip() {
        let mut wire = Vec::new();
        Section::new(SectionType::DataBlocks, 100, 500, 12345)
            .encode(&mut wire)
            .unwrap();

        let parsed = Section::decode(&mut Cursor::new(&wire)).unwrap();
        assert_eq!(
            (
                parsed.section_type,
                parsed.offset,
                parsed.size,
                parsed.checksum
            ),
            (SectionType::DataBlocks, 100, 500, 12345)
        );
    }

    /// A three-entry footer survives an encode/decode round trip.
    #[test]
    fn test_footer_roundtrip() {
        let directory: Vec<Section> = [
            (SectionType::DataBlocks, 32, 1000, 111),
            (SectionType::Filter, 1032, 200, 222),
            (SectionType::Index, 1232, 100, 333),
        ]
        .iter()
        .map(|&(kind, offset, size, checksum)| Section::new(kind, offset, size, checksum))
        .collect();

        let wire = Footer::new(directory).encode();

        let parsed = Footer::decode(&wire, 3).unwrap();
        assert_eq!(parsed.sections.len(), 3);
        assert!(parsed.magic.is_valid());
    }

    /// Header and footer written into a buffer can be read back, and section
    /// lookup reports exactly the sections that were written.
    #[test]
    fn test_format_roundtrip() {
        let written = SSTableFormat::new(vec![
            Section::new(SectionType::DataBlocks, 32, 1000, 111),
            Section::new(SectionType::Index, 1032, 100, 222),
        ]);

        let mut storage = vec![0u8; 2048];
        written.write(&mut Cursor::new(&mut storage[..])).unwrap();

        let loaded = SSTableFormat::read(&mut Cursor::new(&storage[..])).unwrap();

        assert_eq!(loaded.header.num_sections, 2);
        for present in [SectionType::DataBlocks, SectionType::Index].iter() {
            assert!(loaded.get_section(*present).is_some());
        }
        assert!(loaded.get_section(SectionType::Filter).is_none());
    }
}