1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
64use std::io::{Cursor, Read, Seek, SeekFrom, Write};
65
/// Signature bytes that identify a table file: the ASCII string `TDBSSTab`.
pub const TABLE_MAGIC: [u8; 8] = *b"TDBSSTab";

/// Format version written into newly created headers.
pub const FORMAT_VERSION: u32 = 1;

/// Encoded header size in bytes:
/// magic (8) + version (4) + flags (4) + section count (4) + footer offset (8) + checksum (4).
pub const HEADER_SIZE: usize = 8 + 4 + 4 + 4 + 8 + 4;

/// Encoded size in bytes of one section entry:
/// type (4) + offset (8) + size (8) + checksum (4).
pub const SECTION_ENTRY_SIZE: usize = 4 + 8 + 8 + 4;
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq)]
80pub struct TableMagic([u8; 8]);
81
82impl TableMagic {
83 pub fn new() -> Self {
84 Self(TABLE_MAGIC)
85 }
86
87 pub fn as_bytes(&self) -> &[u8; 8] {
88 &self.0
89 }
90
91 pub fn is_valid(&self) -> bool {
92 self.0 == TABLE_MAGIC
93 }
94}
95
96impl Default for TableMagic {
97 fn default() -> Self {
98 Self::new()
99 }
100}
101
/// Kind of payload a section holds.
///
/// The discriminants are the values written to disk by `Section::encode`
/// (as a little-endian `u32`), so they must not be renumbered.
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SectionType {
    DataBlocks = 0,
    Filter = 1,
    Index = 2,
    Metadata = 3,
    RangeTombstones = 4,
    CompressionDict = 5,
    /// Sentinel value; `Section::decode` also substitutes it for type values
    /// it does not recognise.
    Reserved = 0xFFFF_FFFF,
}

impl TryFrom<u32> for SectionType {
    type Error = ();

    /// Converts an on-disk type value back into a [`SectionType`].
    ///
    /// Every declared discriminant is accepted, including `0xFFFF_FFFF`
    /// (`Reserved`), so `SectionType::try_from(kind as u32)` returns `Ok(kind)`
    /// for every variant. Any other value yields `Err(())`.
    fn try_from(value: u32) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(SectionType::DataBlocks),
            1 => Ok(SectionType::Filter),
            2 => Ok(SectionType::Index),
            3 => Ok(SectionType::Metadata),
            4 => Ok(SectionType::RangeTombstones),
            5 => Ok(SectionType::CompressionDict),
            // Previously rejected, which made `Reserved` the only variant that
            // could be written but not parsed back.
            0xFFFF_FFFF => Ok(SectionType::Reserved),
            _ => Err(()),
        }
    }
}
137
138#[derive(Debug, Clone)]
140pub struct Section {
141 pub section_type: SectionType,
143 pub offset: u64,
145 pub size: u64,
147 pub checksum: u32,
149}
150
151impl Section {
152 pub fn new(section_type: SectionType, offset: u64, size: u64, checksum: u32) -> Self {
153 Self {
154 section_type,
155 offset,
156 size,
157 checksum,
158 }
159 }
160
161 pub fn encode<W: Write>(&self, writer: &mut W) -> std::io::Result<()> {
163 writer.write_u32::<LittleEndian>(self.section_type as u32)?;
164 writer.write_u64::<LittleEndian>(self.offset)?;
165 writer.write_u64::<LittleEndian>(self.size)?;
166 writer.write_u32::<LittleEndian>(self.checksum)?;
167 Ok(())
168 }
169
170 pub fn decode<R: Read>(reader: &mut R) -> std::io::Result<Self> {
172 let type_val = reader.read_u32::<LittleEndian>()?;
173 let section_type = SectionType::try_from(type_val).unwrap_or(SectionType::Reserved);
174 let offset = reader.read_u64::<LittleEndian>()?;
175 let size = reader.read_u64::<LittleEndian>()?;
176 let checksum = reader.read_u32::<LittleEndian>()?;
177
178 Ok(Self {
179 section_type,
180 offset,
181 size,
182 checksum,
183 })
184 }
185}
186
187#[derive(Debug, Clone)]
189pub struct Header {
190 pub magic: TableMagic,
192 pub version: u32,
194 pub flags: u32,
196 pub num_sections: u32,
198 pub footer_offset: u64,
200 pub checksum: u32,
202}
203
204impl Header {
205 pub fn new(num_sections: u32, footer_offset: u64) -> Self {
206 let mut header = Self {
207 magic: TableMagic::new(),
208 version: FORMAT_VERSION,
209 flags: 0,
210 num_sections,
211 footer_offset,
212 checksum: 0,
213 };
214 header.checksum = header.compute_checksum();
215 header
216 }
217
218 pub fn encode(&self) -> [u8; HEADER_SIZE] {
220 let mut buf = [0u8; HEADER_SIZE];
221 let mut cursor = Cursor::new(&mut buf[..]);
222
223 cursor.write_all(self.magic.as_bytes()).unwrap();
224 cursor.write_u32::<LittleEndian>(self.version).unwrap();
225 cursor.write_u32::<LittleEndian>(self.flags).unwrap();
226 cursor.write_u32::<LittleEndian>(self.num_sections).unwrap();
227 cursor
228 .write_u64::<LittleEndian>(self.footer_offset)
229 .unwrap();
230 cursor.write_u32::<LittleEndian>(self.checksum).unwrap();
231
232 buf
233 }
234
235 pub fn decode(data: &[u8]) -> Option<Self> {
237 if data.len() < HEADER_SIZE {
238 return None;
239 }
240
241 let mut cursor = Cursor::new(data);
242
243 let mut magic_bytes = [0u8; 8];
244 cursor.read_exact(&mut magic_bytes).ok()?;
245 let magic = TableMagic(magic_bytes);
246
247 let version = cursor.read_u32::<LittleEndian>().ok()?;
248 let flags = cursor.read_u32::<LittleEndian>().ok()?;
249 let num_sections = cursor.read_u32::<LittleEndian>().ok()?;
250 let footer_offset = cursor.read_u64::<LittleEndian>().ok()?;
251 let checksum = cursor.read_u32::<LittleEndian>().ok()?;
252
253 let header = Self {
254 magic,
255 version,
256 flags,
257 num_sections,
258 footer_offset,
259 checksum,
260 };
261
262 if header.compute_checksum() != checksum {
264 return None;
265 }
266
267 Some(header)
268 }
269
270 fn compute_checksum(&self) -> u32 {
272 let mut hasher = crc32fast::Hasher::new();
273 hasher.update(self.magic.as_bytes());
274 hasher.update(&self.version.to_le_bytes());
275 hasher.update(&self.flags.to_le_bytes());
276 hasher.update(&self.num_sections.to_le_bytes());
277 hasher.update(&self.footer_offset.to_le_bytes());
278 hasher.finalize()
279 }
280
281 pub fn is_valid(&self) -> bool {
283 self.magic.is_valid()
284 && self.version <= FORMAT_VERSION
285 && self.compute_checksum() == self.checksum
286 }
287}
288
289#[derive(Debug, Clone)]
291pub struct Footer {
292 pub sections: Vec<Section>,
294 pub checksum: u32,
296 pub magic: TableMagic,
298}
299
300impl Footer {
301 pub fn new(sections: Vec<Section>) -> Self {
302 let mut footer = Self {
303 sections,
304 checksum: 0,
305 magic: TableMagic::new(),
306 };
307 footer.checksum = footer.compute_checksum();
308 footer
309 }
310
311 pub fn encode(&self) -> Vec<u8> {
313 let size = self.sections.len() * SECTION_ENTRY_SIZE + 4 + 8;
314 let mut buf = Vec::with_capacity(size);
315
316 for section in &self.sections {
317 section.encode(&mut buf).unwrap();
318 }
319
320 buf.write_u32::<LittleEndian>(self.checksum).unwrap();
321 buf.extend_from_slice(self.magic.as_bytes());
322
323 buf
324 }
325
326 pub fn decode(data: &[u8], num_sections: u32) -> Option<Self> {
328 let expected_size = num_sections as usize * SECTION_ENTRY_SIZE + 4 + 8;
329 if data.len() < expected_size {
330 return None;
331 }
332
333 let mut cursor = Cursor::new(data);
334
335 let mut sections = Vec::with_capacity(num_sections as usize);
336 for _ in 0..num_sections {
337 sections.push(Section::decode(&mut cursor).ok()?);
338 }
339
340 let checksum = cursor.read_u32::<LittleEndian>().ok()?;
341
342 let mut magic_bytes = [0u8; 8];
343 cursor.read_exact(&mut magic_bytes).ok()?;
344 let magic = TableMagic(magic_bytes);
345
346 let footer = Self {
347 sections,
348 checksum,
349 magic,
350 };
351
352 if footer.compute_checksum() != checksum {
354 return None;
355 }
356
357 Some(footer)
358 }
359
360 fn compute_checksum(&self) -> u32 {
362 let mut hasher = crc32fast::Hasher::new();
363 for section in &self.sections {
364 hasher.update(&(section.section_type as u32).to_le_bytes());
365 hasher.update(§ion.offset.to_le_bytes());
366 hasher.update(§ion.size.to_le_bytes());
367 hasher.update(§ion.checksum.to_le_bytes());
368 }
369 hasher.finalize()
370 }
371
372 pub fn get_section(&self, section_type: SectionType) -> Option<&Section> {
374 self.sections
375 .iter()
376 .find(|s| s.section_type == section_type)
377 }
378
379 pub fn has_section(&self, section_type: SectionType) -> bool {
381 self.get_section(section_type).is_some()
382 }
383}
384
385pub struct SSTableFormat {
387 pub header: Header,
388 pub footer: Footer,
389}
390
391impl SSTableFormat {
392 pub fn new(sections: Vec<Section>) -> Self {
394 let footer_offset = sections
395 .iter()
396 .map(|s| s.offset + s.size)
397 .max()
398 .unwrap_or(HEADER_SIZE as u64);
399
400 Self {
401 header: Header::new(sections.len() as u32, footer_offset),
402 footer: Footer::new(sections),
403 }
404 }
405
406 pub fn read<R: Read + Seek>(reader: &mut R) -> std::io::Result<Self> {
408 let mut header_buf = [0u8; HEADER_SIZE];
410 reader.read_exact(&mut header_buf)?;
411
412 let header = Header::decode(&header_buf).ok_or_else(|| {
413 std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid header")
414 })?;
415
416 if !header.is_valid() {
417 return Err(std::io::Error::new(
418 std::io::ErrorKind::InvalidData,
419 "Invalid header",
420 ));
421 }
422
423 reader.seek(SeekFrom::Start(header.footer_offset))?;
425
426 let footer_size = header.num_sections as usize * SECTION_ENTRY_SIZE + 4 + 8;
428 let mut footer_buf = vec![0u8; footer_size];
429 reader.read_exact(&mut footer_buf)?;
430
431 let footer = Footer::decode(&footer_buf, header.num_sections).ok_or_else(|| {
432 std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid footer")
433 })?;
434
435 Ok(Self { header, footer })
436 }
437
438 pub fn write<W: Write + Seek>(&self, writer: &mut W) -> std::io::Result<()> {
440 writer.seek(SeekFrom::Start(0))?;
442 writer.write_all(&self.header.encode())?;
443
444 writer.seek(SeekFrom::Start(self.header.footer_offset))?;
446 writer.write_all(&self.footer.encode())?;
447
448 Ok(())
449 }
450
451 pub fn get_section(&self, section_type: SectionType) -> Option<&Section> {
453 self.footer.get_section(section_type)
454 }
455
456 pub fn validate_section<R: Read + Seek>(
458 &self,
459 reader: &mut R,
460 section: &Section,
461 ) -> std::io::Result<bool> {
462 reader.seek(SeekFrom::Start(section.offset))?;
463
464 let mut data = vec![0u8; section.size as usize];
465 reader.read_exact(&mut data)?;
466
467 let computed_checksum = crc32fast::hash(&data);
468 Ok(computed_checksum == section.checksum)
469 }
470
471 pub fn validate_all_sections<R: Read + Seek>(&self, reader: &mut R) -> std::io::Result<bool> {
475 for section in &self.footer.sections {
476 if !self.validate_section(reader, section)? {
477 return Ok(false);
478 }
479 }
480 Ok(true)
481 }
482}
483
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// A freshly built magic is valid and exposes the canonical bytes.
    #[test]
    fn test_table_magic() {
        let signature = TableMagic::new();
        assert!(signature.is_valid());
        assert_eq!(signature.as_bytes(), &TABLE_MAGIC);
    }

    /// Encoding then decoding a header preserves its fields and validity.
    #[test]
    fn test_header_roundtrip() {
        let bytes = Header::new(3, 1024).encode();

        let parsed = Header::decode(&bytes).unwrap();
        assert_eq!(parsed.version, FORMAT_VERSION);
        assert_eq!(parsed.num_sections, 3);
        assert_eq!(parsed.footer_offset, 1024);
        assert!(parsed.is_valid());
    }

    /// Encoding then decoding a single section entry preserves all four fields.
    #[test]
    fn test_section_roundtrip() {
        let original = Section::new(SectionType::DataBlocks, 100, 500, 12345);

        let mut wire = Vec::new();
        original.encode(&mut wire).unwrap();

        let parsed = Section::decode(&mut Cursor::new(&wire)).unwrap();
        assert_eq!(parsed.section_type, SectionType::DataBlocks);
        assert_eq!(parsed.offset, 100);
        assert_eq!(parsed.size, 500);
        assert_eq!(parsed.checksum, 12345);
    }

    /// A three-entry footer survives an encode/decode cycle with a valid magic.
    #[test]
    fn test_footer_roundtrip() {
        let wire = Footer::new(vec![
            Section::new(SectionType::DataBlocks, 32, 1000, 111),
            Section::new(SectionType::Filter, 1032, 200, 222),
            Section::new(SectionType::Index, 1232, 100, 333),
        ])
        .encode();

        let parsed = Footer::decode(&wire, 3).unwrap();
        assert_eq!(parsed.sections.len(), 3);
        assert!(parsed.magic.is_valid());
    }

    /// Header and footer written into a buffer can be read back and queried.
    #[test]
    fn test_format_roundtrip() {
        let table = SSTableFormat::new(vec![
            Section::new(SectionType::DataBlocks, 32, 1000, 111),
            Section::new(SectionType::Index, 1032, 100, 222),
        ]);

        let mut storage = vec![0u8; 2048];
        table.write(&mut Cursor::new(&mut storage[..])).unwrap();

        let reloaded = SSTableFormat::read(&mut Cursor::new(&storage[..])).unwrap();

        assert_eq!(reloaded.header.num_sections, 2);
        for present in [SectionType::DataBlocks, SectionType::Index].iter() {
            assert!(reloaded.get_section(*present).is_some());
        }
        assert!(reloaded.get_section(SectionType::Filter).is_none());
    }
}