1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
64use std::collections::HashMap;
65use std::io::{Cursor, Read, Seek, SeekFrom, Write};
66
/// Magic bytes identifying the table format: the ASCII text `TDBSSTab`.
///
/// `Header::encode` writes them first and `Footer::encode` writes them last.
pub const TABLE_MAGIC: [u8; 8] = *b"TDBSSTab";

/// Format version stamped into headers built by `Header::new`.
///
/// `Header::is_valid` accepts any version up to and including this one.
pub const FORMAT_VERSION: u32 = 1;

/// Encoded header size in bytes:
/// magic (8) + version (4) + flags (4) + section count (4) + footer offset (8) + checksum (4).
pub const HEADER_SIZE: usize = 8 + 4 + 4 + 4 + 8 + 4;

/// Encoded size in bytes of one section-table entry:
/// type (4) + offset (8) + size (8) + checksum (4).
pub const SECTION_ENTRY_SIZE: usize = 4 + 8 + 8 + 4;
78
79#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub struct TableMagic([u8; 8]);
82
83impl TableMagic {
84 pub fn new() -> Self {
85 Self(TABLE_MAGIC)
86 }
87
88 pub fn as_bytes(&self) -> &[u8; 8] {
89 &self.0
90 }
91
92 pub fn is_valid(&self) -> bool {
93 self.0 == TABLE_MAGIC
94 }
95}
96
97impl Default for TableMagic {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
/// Kind of a section listed in the footer's section table.
///
/// The discriminant is the `u32` tag that `Section::encode` writes to disk.
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SectionType {
    /// Tag `0`.
    DataBlocks = 0,
    /// Tag `1`.
    Filter = 1,
    /// Tag `2`.
    Index = 2,
    /// Tag `3`.
    Metadata = 3,
    /// Tag `4`.
    RangeTombstones = 4,
    /// Tag `5`.
    CompressionDict = 5,
    /// Tag `0xFFFF_FFFF`. Never produced by `TryFrom<u32>`; `Section::decode`
    /// substitutes it for any tag it does not recognise.
    Reserved = 0xFFFF_FFFF,
}

impl TryFrom<u32> for SectionType {
    type Error = ();

    /// Maps an on-disk tag to its section type.
    ///
    /// Only tags `0..=5` convert; every other value, including the
    /// `Reserved` discriminant, yields `Err(())`.
    fn try_from(value: u32) -> Result<Self, Self::Error> {
        // `Reserved` is deliberately absent so its tag keeps failing conversion.
        const DECODABLE: [SectionType; 6] = [
            SectionType::DataBlocks,
            SectionType::Filter,
            SectionType::Index,
            SectionType::Metadata,
            SectionType::RangeTombstones,
            SectionType::CompressionDict,
        ];

        DECODABLE
            .iter()
            .copied()
            .find(|kind| *kind as u32 == value)
            .ok_or(())
    }
}
138
139#[derive(Debug, Clone)]
141pub struct Section {
142 pub section_type: SectionType,
144 pub offset: u64,
146 pub size: u64,
148 pub checksum: u32,
150}
151
152impl Section {
153 pub fn new(section_type: SectionType, offset: u64, size: u64, checksum: u32) -> Self {
154 Self {
155 section_type,
156 offset,
157 size,
158 checksum,
159 }
160 }
161
162 pub fn encode<W: Write>(&self, writer: &mut W) -> std::io::Result<()> {
164 writer.write_u32::<LittleEndian>(self.section_type as u32)?;
165 writer.write_u64::<LittleEndian>(self.offset)?;
166 writer.write_u64::<LittleEndian>(self.size)?;
167 writer.write_u32::<LittleEndian>(self.checksum)?;
168 Ok(())
169 }
170
171 pub fn decode<R: Read>(reader: &mut R) -> std::io::Result<Self> {
173 let type_val = reader.read_u32::<LittleEndian>()?;
174 let section_type = SectionType::try_from(type_val).unwrap_or(SectionType::Reserved);
175 let offset = reader.read_u64::<LittleEndian>()?;
176 let size = reader.read_u64::<LittleEndian>()?;
177 let checksum = reader.read_u32::<LittleEndian>()?;
178
179 Ok(Self {
180 section_type,
181 offset,
182 size,
183 checksum,
184 })
185 }
186}
187
188#[derive(Debug, Clone)]
190pub struct Header {
191 pub magic: TableMagic,
193 pub version: u32,
195 pub flags: u32,
197 pub num_sections: u32,
199 pub footer_offset: u64,
201 pub checksum: u32,
203}
204
205impl Header {
206 pub fn new(num_sections: u32, footer_offset: u64) -> Self {
207 let mut header = Self {
208 magic: TableMagic::new(),
209 version: FORMAT_VERSION,
210 flags: 0,
211 num_sections,
212 footer_offset,
213 checksum: 0,
214 };
215 header.checksum = header.compute_checksum();
216 header
217 }
218
219 pub fn encode(&self) -> [u8; HEADER_SIZE] {
221 let mut buf = [0u8; HEADER_SIZE];
222 let mut cursor = Cursor::new(&mut buf[..]);
223
224 cursor.write_all(self.magic.as_bytes()).unwrap();
225 cursor.write_u32::<LittleEndian>(self.version).unwrap();
226 cursor.write_u32::<LittleEndian>(self.flags).unwrap();
227 cursor.write_u32::<LittleEndian>(self.num_sections).unwrap();
228 cursor.write_u64::<LittleEndian>(self.footer_offset).unwrap();
229 cursor.write_u32::<LittleEndian>(self.checksum).unwrap();
230
231 buf
232 }
233
234 pub fn decode(data: &[u8]) -> Option<Self> {
236 if data.len() < HEADER_SIZE {
237 return None;
238 }
239
240 let mut cursor = Cursor::new(data);
241
242 let mut magic_bytes = [0u8; 8];
243 cursor.read_exact(&mut magic_bytes).ok()?;
244 let magic = TableMagic(magic_bytes);
245
246 let version = cursor.read_u32::<LittleEndian>().ok()?;
247 let flags = cursor.read_u32::<LittleEndian>().ok()?;
248 let num_sections = cursor.read_u32::<LittleEndian>().ok()?;
249 let footer_offset = cursor.read_u64::<LittleEndian>().ok()?;
250 let checksum = cursor.read_u32::<LittleEndian>().ok()?;
251
252 let header = Self {
253 magic,
254 version,
255 flags,
256 num_sections,
257 footer_offset,
258 checksum,
259 };
260
261 if header.compute_checksum() != checksum {
263 return None;
264 }
265
266 Some(header)
267 }
268
269 fn compute_checksum(&self) -> u32 {
271 let mut hasher = crc32fast::Hasher::new();
272 hasher.update(self.magic.as_bytes());
273 hasher.update(&self.version.to_le_bytes());
274 hasher.update(&self.flags.to_le_bytes());
275 hasher.update(&self.num_sections.to_le_bytes());
276 hasher.update(&self.footer_offset.to_le_bytes());
277 hasher.finalize()
278 }
279
280 pub fn is_valid(&self) -> bool {
282 self.magic.is_valid() &&
283 self.version <= FORMAT_VERSION &&
284 self.compute_checksum() == self.checksum
285 }
286}
287
288#[derive(Debug, Clone)]
290pub struct Footer {
291 pub sections: Vec<Section>,
293 pub checksum: u32,
295 pub magic: TableMagic,
297}
298
299impl Footer {
300 pub fn new(sections: Vec<Section>) -> Self {
301 let mut footer = Self {
302 sections,
303 checksum: 0,
304 magic: TableMagic::new(),
305 };
306 footer.checksum = footer.compute_checksum();
307 footer
308 }
309
310 pub fn encode(&self) -> Vec<u8> {
312 let size = self.sections.len() * SECTION_ENTRY_SIZE + 4 + 8;
313 let mut buf = Vec::with_capacity(size);
314
315 for section in &self.sections {
316 section.encode(&mut buf).unwrap();
317 }
318
319 buf.write_u32::<LittleEndian>(self.checksum).unwrap();
320 buf.extend_from_slice(self.magic.as_bytes());
321
322 buf
323 }
324
325 pub fn decode(data: &[u8], num_sections: u32) -> Option<Self> {
327 let expected_size = num_sections as usize * SECTION_ENTRY_SIZE + 4 + 8;
328 if data.len() < expected_size {
329 return None;
330 }
331
332 let mut cursor = Cursor::new(data);
333
334 let mut sections = Vec::with_capacity(num_sections as usize);
335 for _ in 0..num_sections {
336 sections.push(Section::decode(&mut cursor).ok()?);
337 }
338
339 let checksum = cursor.read_u32::<LittleEndian>().ok()?;
340
341 let mut magic_bytes = [0u8; 8];
342 cursor.read_exact(&mut magic_bytes).ok()?;
343 let magic = TableMagic(magic_bytes);
344
345 let footer = Self {
346 sections,
347 checksum,
348 magic,
349 };
350
351 if footer.compute_checksum() != checksum {
353 return None;
354 }
355
356 Some(footer)
357 }
358
359 fn compute_checksum(&self) -> u32 {
361 let mut hasher = crc32fast::Hasher::new();
362 for section in &self.sections {
363 hasher.update(&(section.section_type as u32).to_le_bytes());
364 hasher.update(§ion.offset.to_le_bytes());
365 hasher.update(§ion.size.to_le_bytes());
366 hasher.update(§ion.checksum.to_le_bytes());
367 }
368 hasher.finalize()
369 }
370
371 pub fn get_section(&self, section_type: SectionType) -> Option<&Section> {
373 self.sections.iter().find(|s| s.section_type == section_type)
374 }
375
376 pub fn has_section(&self, section_type: SectionType) -> bool {
378 self.get_section(section_type).is_some()
379 }
380}
381
382pub struct SSTableFormat {
384 pub header: Header,
385 pub footer: Footer,
386}
387
388impl SSTableFormat {
389 pub fn new(sections: Vec<Section>) -> Self {
391 let footer_offset = sections.iter().map(|s| s.offset + s.size).max().unwrap_or(HEADER_SIZE as u64);
392
393 Self {
394 header: Header::new(sections.len() as u32, footer_offset),
395 footer: Footer::new(sections),
396 }
397 }
398
399 pub fn read<R: Read + Seek>(reader: &mut R) -> std::io::Result<Self> {
401 let mut header_buf = [0u8; HEADER_SIZE];
403 reader.read_exact(&mut header_buf)?;
404
405 let header = Header::decode(&header_buf)
406 .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid header"))?;
407
408 if !header.is_valid() {
409 return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid header"));
410 }
411
412 reader.seek(SeekFrom::Start(header.footer_offset))?;
414
415 let footer_size = header.num_sections as usize * SECTION_ENTRY_SIZE + 4 + 8;
417 let mut footer_buf = vec![0u8; footer_size];
418 reader.read_exact(&mut footer_buf)?;
419
420 let footer = Footer::decode(&footer_buf, header.num_sections)
421 .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid footer"))?;
422
423 Ok(Self { header, footer })
424 }
425
426 pub fn write<W: Write + Seek>(&self, writer: &mut W) -> std::io::Result<()> {
428 writer.seek(SeekFrom::Start(0))?;
430 writer.write_all(&self.header.encode())?;
431
432 writer.seek(SeekFrom::Start(self.header.footer_offset))?;
434 writer.write_all(&self.footer.encode())?;
435
436 Ok(())
437 }
438
439 pub fn get_section(&self, section_type: SectionType) -> Option<&Section> {
441 self.footer.get_section(section_type)
442 }
443
444 pub fn validate_section<R: Read + Seek>(
446 &self,
447 reader: &mut R,
448 section: &Section,
449 ) -> std::io::Result<bool> {
450 reader.seek(SeekFrom::Start(section.offset))?;
451
452 let mut data = vec![0u8; section.size as usize];
453 reader.read_exact(&mut data)?;
454
455 let computed_checksum = crc32fast::hash(&data);
456 Ok(computed_checksum == section.checksum)
457 }
458
459 pub fn validate_all_sections<R: Read + Seek>(&self, reader: &mut R) -> std::io::Result<bool> {
463 for section in &self.footer.sections {
464 if !self.validate_section(reader, section)? {
465 return Ok(false);
466 }
467 }
468 Ok(true)
469 }
470}
471
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// A freshly built magic carries the format's marker bytes.
    #[test]
    fn test_table_magic() {
        let marker = TableMagic::new();
        assert!(marker.is_valid());
        assert_eq!(marker.as_bytes(), &TABLE_MAGIC);
    }

    /// Encoding then decoding a header preserves its fields and validity.
    #[test]
    fn test_header_roundtrip() {
        let bytes = Header::new(3, 1024).encode();

        let decoded = Header::decode(&bytes).unwrap();
        assert_eq!(decoded.version, FORMAT_VERSION);
        assert_eq!(decoded.num_sections, 3);
        assert_eq!(decoded.footer_offset, 1024);
        assert!(decoded.is_valid());
    }

    /// Encoding then decoding a section entry preserves all four fields.
    #[test]
    fn test_section_roundtrip() {
        let mut bytes = Vec::new();
        Section::new(SectionType::DataBlocks, 100, 500, 12345)
            .encode(&mut bytes)
            .unwrap();

        let mut input = Cursor::new(&bytes);
        let decoded = Section::decode(&mut input).unwrap();
        assert_eq!(decoded.section_type, SectionType::DataBlocks);
        assert_eq!(decoded.offset, 100);
        assert_eq!(decoded.size, 500);
        assert_eq!(decoded.checksum, 12345);
    }

    /// A three-section footer survives an encode/decode round trip.
    #[test]
    fn test_footer_roundtrip() {
        let footer = Footer::new(vec![
            Section::new(SectionType::DataBlocks, 32, 1000, 111),
            Section::new(SectionType::Filter, 1032, 200, 222),
            Section::new(SectionType::Index, 1232, 100, 333),
        ]);
        let bytes = footer.encode();

        let decoded = Footer::decode(&bytes, 3).unwrap();
        assert_eq!(decoded.sections.len(), 3);
        assert!(decoded.magic.is_valid());
    }

    /// Writing header and footer into a buffer and reading them back yields
    /// the same section table.
    #[test]
    fn test_format_roundtrip() {
        let format = SSTableFormat::new(vec![
            Section::new(SectionType::DataBlocks, 32, 1000, 111),
            Section::new(SectionType::Index, 1032, 100, 222),
        ]);

        let mut storage = vec![0u8; 2048];
        {
            let mut sink = Cursor::new(&mut storage[..]);
            format.write(&mut sink).unwrap();
        }

        let mut source = Cursor::new(&storage[..]);
        let read_format = SSTableFormat::read(&mut source).unwrap();

        assert_eq!(read_format.header.num_sections, 2);
        assert!(read_format.get_section(SectionType::DataBlocks).is_some());
        assert!(read_format.get_section(SectionType::Index).is_some());
        assert!(read_format.get_section(SectionType::Filter).is_none());
    }
}