1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
61use std::collections::HashMap;
62use std::io::{Cursor, Read, Seek, SeekFrom, Write};
63
/// Eight-byte signature of the table format (the ASCII bytes `TDBSSTab`).
pub const TABLE_MAGIC: [u8; 8] = *b"TDBSSTab";

/// Format version stamped into headers created by this module.
pub const FORMAT_VERSION: u32 = 1;

/// Encoded header size in bytes:
/// magic (8) + version (4) + flags (4) + section count (4) + footer offset (8) + checksum (4).
pub const HEADER_SIZE: usize = 8 + 4 + 4 + 4 + 8 + 4;

/// Encoded size of one section-table entry in bytes:
/// type (4) + offset (8) + size (8) + checksum (4).
pub const SECTION_ENTRY_SIZE: usize = 4 + 8 + 8 + 4;
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq)]
78pub struct TableMagic([u8; 8]);
79
80impl TableMagic {
81 pub fn new() -> Self {
82 Self(TABLE_MAGIC)
83 }
84
85 pub fn as_bytes(&self) -> &[u8; 8] {
86 &self.0
87 }
88
89 pub fn is_valid(&self) -> bool {
90 self.0 == TABLE_MAGIC
91 }
92}
93
94impl Default for TableMagic {
95 fn default() -> Self {
96 Self::new()
97 }
98}
99
/// Kind of a section, identified by the `u32` discriminant stored in a
/// section-table entry.
///
/// `Reserved` is never produced by [`TryFrom<u32>`]; `Section::decode`
/// substitutes it for any value the conversion rejects.
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SectionType {
    DataBlocks = 0,
    Filter = 1,
    Index = 2,
    Metadata = 3,
    RangeTombstones = 4,
    CompressionDict = 5,
    Reserved = 0xFFFF_FFFF,
}

impl TryFrom<u32> for SectionType {
    type Error = ();

    /// Maps a raw discriminant (0 through 5) to its variant.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` for every other value, including the discriminant
    /// of [`SectionType::Reserved`].
    fn try_from(value: u32) -> Result<Self, Self::Error> {
        // Every variant that may be decoded; `Reserved` is left out on purpose.
        const DECODABLE: [SectionType; 6] = [
            SectionType::DataBlocks,
            SectionType::Filter,
            SectionType::Index,
            SectionType::Metadata,
            SectionType::RangeTombstones,
            SectionType::CompressionDict,
        ];

        DECODABLE
            .iter()
            .copied()
            .find(|kind| *kind as u32 == value)
            .ok_or(())
    }
}
135
136#[derive(Debug, Clone)]
138pub struct Section {
139 pub section_type: SectionType,
141 pub offset: u64,
143 pub size: u64,
145 pub checksum: u32,
147}
148
149impl Section {
150 pub fn new(section_type: SectionType, offset: u64, size: u64, checksum: u32) -> Self {
151 Self {
152 section_type,
153 offset,
154 size,
155 checksum,
156 }
157 }
158
159 pub fn encode<W: Write>(&self, writer: &mut W) -> std::io::Result<()> {
161 writer.write_u32::<LittleEndian>(self.section_type as u32)?;
162 writer.write_u64::<LittleEndian>(self.offset)?;
163 writer.write_u64::<LittleEndian>(self.size)?;
164 writer.write_u32::<LittleEndian>(self.checksum)?;
165 Ok(())
166 }
167
168 pub fn decode<R: Read>(reader: &mut R) -> std::io::Result<Self> {
170 let type_val = reader.read_u32::<LittleEndian>()?;
171 let section_type = SectionType::try_from(type_val).unwrap_or(SectionType::Reserved);
172 let offset = reader.read_u64::<LittleEndian>()?;
173 let size = reader.read_u64::<LittleEndian>()?;
174 let checksum = reader.read_u32::<LittleEndian>()?;
175
176 Ok(Self {
177 section_type,
178 offset,
179 size,
180 checksum,
181 })
182 }
183}
184
185#[derive(Debug, Clone)]
187pub struct Header {
188 pub magic: TableMagic,
190 pub version: u32,
192 pub flags: u32,
194 pub num_sections: u32,
196 pub footer_offset: u64,
198 pub checksum: u32,
200}
201
202impl Header {
203 pub fn new(num_sections: u32, footer_offset: u64) -> Self {
204 let mut header = Self {
205 magic: TableMagic::new(),
206 version: FORMAT_VERSION,
207 flags: 0,
208 num_sections,
209 footer_offset,
210 checksum: 0,
211 };
212 header.checksum = header.compute_checksum();
213 header
214 }
215
216 pub fn encode(&self) -> [u8; HEADER_SIZE] {
218 let mut buf = [0u8; HEADER_SIZE];
219 let mut cursor = Cursor::new(&mut buf[..]);
220
221 cursor.write_all(self.magic.as_bytes()).unwrap();
222 cursor.write_u32::<LittleEndian>(self.version).unwrap();
223 cursor.write_u32::<LittleEndian>(self.flags).unwrap();
224 cursor.write_u32::<LittleEndian>(self.num_sections).unwrap();
225 cursor.write_u64::<LittleEndian>(self.footer_offset).unwrap();
226 cursor.write_u32::<LittleEndian>(self.checksum).unwrap();
227
228 buf
229 }
230
231 pub fn decode(data: &[u8]) -> Option<Self> {
233 if data.len() < HEADER_SIZE {
234 return None;
235 }
236
237 let mut cursor = Cursor::new(data);
238
239 let mut magic_bytes = [0u8; 8];
240 cursor.read_exact(&mut magic_bytes).ok()?;
241 let magic = TableMagic(magic_bytes);
242
243 let version = cursor.read_u32::<LittleEndian>().ok()?;
244 let flags = cursor.read_u32::<LittleEndian>().ok()?;
245 let num_sections = cursor.read_u32::<LittleEndian>().ok()?;
246 let footer_offset = cursor.read_u64::<LittleEndian>().ok()?;
247 let checksum = cursor.read_u32::<LittleEndian>().ok()?;
248
249 let header = Self {
250 magic,
251 version,
252 flags,
253 num_sections,
254 footer_offset,
255 checksum,
256 };
257
258 if header.compute_checksum() != checksum {
260 return None;
261 }
262
263 Some(header)
264 }
265
266 fn compute_checksum(&self) -> u32 {
268 let mut hasher = crc32fast::Hasher::new();
269 hasher.update(self.magic.as_bytes());
270 hasher.update(&self.version.to_le_bytes());
271 hasher.update(&self.flags.to_le_bytes());
272 hasher.update(&self.num_sections.to_le_bytes());
273 hasher.update(&self.footer_offset.to_le_bytes());
274 hasher.finalize()
275 }
276
277 pub fn is_valid(&self) -> bool {
279 self.magic.is_valid() &&
280 self.version <= FORMAT_VERSION &&
281 self.compute_checksum() == self.checksum
282 }
283}
284
285#[derive(Debug, Clone)]
287pub struct Footer {
288 pub sections: Vec<Section>,
290 pub checksum: u32,
292 pub magic: TableMagic,
294}
295
296impl Footer {
297 pub fn new(sections: Vec<Section>) -> Self {
298 let mut footer = Self {
299 sections,
300 checksum: 0,
301 magic: TableMagic::new(),
302 };
303 footer.checksum = footer.compute_checksum();
304 footer
305 }
306
307 pub fn encode(&self) -> Vec<u8> {
309 let size = self.sections.len() * SECTION_ENTRY_SIZE + 4 + 8;
310 let mut buf = Vec::with_capacity(size);
311
312 for section in &self.sections {
313 section.encode(&mut buf).unwrap();
314 }
315
316 buf.write_u32::<LittleEndian>(self.checksum).unwrap();
317 buf.extend_from_slice(self.magic.as_bytes());
318
319 buf
320 }
321
322 pub fn decode(data: &[u8], num_sections: u32) -> Option<Self> {
324 let expected_size = num_sections as usize * SECTION_ENTRY_SIZE + 4 + 8;
325 if data.len() < expected_size {
326 return None;
327 }
328
329 let mut cursor = Cursor::new(data);
330
331 let mut sections = Vec::with_capacity(num_sections as usize);
332 for _ in 0..num_sections {
333 sections.push(Section::decode(&mut cursor).ok()?);
334 }
335
336 let checksum = cursor.read_u32::<LittleEndian>().ok()?;
337
338 let mut magic_bytes = [0u8; 8];
339 cursor.read_exact(&mut magic_bytes).ok()?;
340 let magic = TableMagic(magic_bytes);
341
342 let footer = Self {
343 sections,
344 checksum,
345 magic,
346 };
347
348 if footer.compute_checksum() != checksum {
350 return None;
351 }
352
353 Some(footer)
354 }
355
356 fn compute_checksum(&self) -> u32 {
358 let mut hasher = crc32fast::Hasher::new();
359 for section in &self.sections {
360 hasher.update(&(section.section_type as u32).to_le_bytes());
361 hasher.update(§ion.offset.to_le_bytes());
362 hasher.update(§ion.size.to_le_bytes());
363 hasher.update(§ion.checksum.to_le_bytes());
364 }
365 hasher.finalize()
366 }
367
368 pub fn get_section(&self, section_type: SectionType) -> Option<&Section> {
370 self.sections.iter().find(|s| s.section_type == section_type)
371 }
372
373 pub fn has_section(&self, section_type: SectionType) -> bool {
375 self.get_section(section_type).is_some()
376 }
377}
378
379pub struct SSTableFormat {
381 pub header: Header,
382 pub footer: Footer,
383}
384
385impl SSTableFormat {
386 pub fn new(sections: Vec<Section>) -> Self {
388 let footer_offset = sections.iter().map(|s| s.offset + s.size).max().unwrap_or(HEADER_SIZE as u64);
389
390 Self {
391 header: Header::new(sections.len() as u32, footer_offset),
392 footer: Footer::new(sections),
393 }
394 }
395
396 pub fn read<R: Read + Seek>(reader: &mut R) -> std::io::Result<Self> {
398 let mut header_buf = [0u8; HEADER_SIZE];
400 reader.read_exact(&mut header_buf)?;
401
402 let header = Header::decode(&header_buf)
403 .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid header"))?;
404
405 if !header.is_valid() {
406 return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid header"));
407 }
408
409 reader.seek(SeekFrom::Start(header.footer_offset))?;
411
412 let footer_size = header.num_sections as usize * SECTION_ENTRY_SIZE + 4 + 8;
414 let mut footer_buf = vec![0u8; footer_size];
415 reader.read_exact(&mut footer_buf)?;
416
417 let footer = Footer::decode(&footer_buf, header.num_sections)
418 .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid footer"))?;
419
420 Ok(Self { header, footer })
421 }
422
423 pub fn write<W: Write + Seek>(&self, writer: &mut W) -> std::io::Result<()> {
425 writer.seek(SeekFrom::Start(0))?;
427 writer.write_all(&self.header.encode())?;
428
429 writer.seek(SeekFrom::Start(self.header.footer_offset))?;
431 writer.write_all(&self.footer.encode())?;
432
433 Ok(())
434 }
435
436 pub fn get_section(&self, section_type: SectionType) -> Option<&Section> {
438 self.footer.get_section(section_type)
439 }
440
441 pub fn validate_section<R: Read + Seek>(
443 &self,
444 reader: &mut R,
445 section: &Section,
446 ) -> std::io::Result<bool> {
447 reader.seek(SeekFrom::Start(section.offset))?;
448
449 let mut data = vec![0u8; section.size as usize];
450 reader.read_exact(&mut data)?;
451
452 let computed_checksum = crc32fast::hash(&data);
453 Ok(computed_checksum == section.checksum)
454 }
455
456 pub fn validate_all_sections<R: Read + Seek>(&self, reader: &mut R) -> std::io::Result<bool> {
460 for section in &self.footer.sections {
461 if !self.validate_section(reader, section)? {
462 return Ok(false);
463 }
464 }
465 Ok(true)
466 }
467}
468
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// A freshly built magic carries the expected signature bytes.
    #[test]
    fn test_table_magic() {
        let fresh = TableMagic::new();
        assert!(fresh.is_valid());
        assert_eq!(fresh.as_bytes(), &TABLE_MAGIC);
    }

    /// Encoding then decoding a header preserves its fields and validity.
    #[test]
    fn test_header_roundtrip() {
        let bytes = Header::new(3, 1024).encode();

        let parsed = Header::decode(&bytes).unwrap();
        assert_eq!(parsed.version, FORMAT_VERSION);
        assert_eq!(parsed.num_sections, 3);
        assert_eq!(parsed.footer_offset, 1024);
        assert!(parsed.is_valid());
    }

    /// Encoding then decoding a section entry preserves all four fields.
    #[test]
    fn test_section_roundtrip() {
        let original = Section::new(SectionType::DataBlocks, 100, 500, 12345);

        let mut bytes = Vec::new();
        original.encode(&mut bytes).unwrap();

        let parsed = Section::decode(&mut Cursor::new(&bytes)).unwrap();
        assert_eq!(parsed.section_type, SectionType::DataBlocks);
        assert_eq!(parsed.offset, 100);
        assert_eq!(parsed.size, 500);
        assert_eq!(parsed.checksum, 12345);
    }

    /// A three-section footer survives an encode/decode cycle.
    #[test]
    fn test_footer_roundtrip() {
        let entries = vec![
            Section::new(SectionType::DataBlocks, 32, 1000, 111),
            Section::new(SectionType::Filter, 1032, 200, 222),
            Section::new(SectionType::Index, 1232, 100, 333),
        ];

        let bytes = Footer::new(entries).encode();

        let parsed = Footer::decode(&bytes, 3).unwrap();
        assert_eq!(parsed.sections.len(), 3);
        assert!(parsed.magic.is_valid());
    }

    /// Writing a format into a buffer and reading it back yields the same
    /// section table.
    #[test]
    fn test_format_roundtrip() {
        let entries = vec![
            Section::new(SectionType::DataBlocks, 32, 1000, 111),
            Section::new(SectionType::Index, 1032, 100, 222),
        ];
        let written = SSTableFormat::new(entries);

        let mut backing = vec![0u8; 2048];
        written.write(&mut Cursor::new(&mut backing[..])).unwrap();

        let loaded = SSTableFormat::read(&mut Cursor::new(&backing[..])).unwrap();

        assert_eq!(loaded.header.num_sections, 2);
        assert!(loaded.get_section(SectionType::DataBlocks).is_some());
        assert!(loaded.get_section(SectionType::Index).is_some());
        assert!(loaded.get_section(SectionType::Filter).is_none());
    }
}