rocksdb_fileformat/
footer.rs

1// Copyright 2024 YaleDB Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use byteorder::{ByteOrder, LittleEndian};
5
6use crate::block_handle::BlockHandle;
7use crate::error::{Error, Result};
8use crate::types::{
9    ChecksumType, LEGACY_FOOTER_SIZE, LEGACY_MAGIC_NUMBER, ROCKSDB_FOOTER_SIZE,
10    ROCKSDB_MAGIC_NUMBER, checksum_modifier_for_context,
11};
12use std::io::{Cursor, Read, Seek, SeekFrom};
13
/// In-memory form of an SST file footer.
///
/// Values are produced by [`Footer::read_from`] / [`Footer::decode_from_bytes`]
/// and serialised again by [`Footer::encode_to_bytes`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Footer {
    /// Checksum algorithm recorded in the footer. Legacy footers carry no such
    /// field and are decoded as `ChecksumType::CRC32c`.
    pub checksum_type: ChecksumType,
    /// Location of the metaindex block.
    pub metaindex_handle: BlockHandle,
    /// Location of the index block. Footers with format version 6 or higher do
    /// not store it; the decoder fills in `BlockHandle::new(0, 0)` for them.
    pub index_handle: BlockHandle,
    /// Format version stored in front of the magic number. Legacy footers have
    /// no version field and are decoded as version 0.
    pub format_version: u32,
    /// Base context checksum used as entropy source for footer checksum calculation.
    /// Only present in format version 6 and higher. This value is combined with the
    /// footer's file offset to create a unique checksum modifier, preventing block
    /// reuse attacks by ensuring checksums are position-dependent.
    pub base_context_checksum: Option<u32>,
}
26
/// Cursor that consumes a byte slice from its end towards its start.
///
/// The footer's fixed-width fields are laid out so that the magic number comes
/// last, so reading backwards visits them in a convenient order.
struct ReverseCursor<'a> {
    /// Buffer being read.
    data: &'a [u8],
    /// Index one past the last unread byte. Starts at `data.len()` and moves
    /// towards zero by the width of every successful read.
    pos: usize,
}
31
32impl<'a> ReverseCursor<'a> {
33    pub fn new(data: &'a [u8]) -> Self {
34        Self {
35            data,
36            pos: data.len(),
37        }
38    }
39
40    pub fn read_u64(&mut self) -> Result<u64> {
41        if self.pos < 8 {
42            return Err(Error::DataCorruption(
43                "Unable to read data from cursor, because it's end".to_string(),
44            ));
45        }
46
47        self.pos -= 8;
48
49        Ok(LittleEndian::read_u64(&self.data[self.pos..self.pos + 8]))
50    }
51
52    pub fn read_i32(&mut self) -> Result<i32> {
53        if self.pos < 4 {
54            return Err(Error::DataCorruption(
55                "Unable to read data from cursor, because it's end".to_string(),
56            ));
57        }
58
59        self.pos -= 4;
60
61        Ok(LittleEndian::read_i32(&self.data[self.pos..self.pos + 4]))
62    }
63
64    pub fn read_u32(&mut self) -> Result<u32> {
65        if self.pos < 4 {
66            return Err(Error::DataCorruption(
67                "Unable to read data from cursor, because it's end".to_string(),
68            ));
69        }
70
71        self.pos -= 4;
72
73        Ok(LittleEndian::read_u32(&self.data[self.pos..self.pos + 4]))
74    }
75
76    pub fn read_u8(&mut self) -> Result<u8> {
77        if self.pos < 1 {
78            return Err(Error::DataCorruption(
79                "Unable to read data from cursor, because it's end".to_string(),
80            ));
81        }
82
83        self.pos -= 1;
84
85        Ok(self.data[self.pos])
86    }
87
88    pub fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
89        if self.pos < buf.len() {
90            return Err(Error::DataCorruption(
91                "Unable to read data from cursor, because it's end".to_string(),
92            ));
93        }
94
95        self.pos -= buf.len();
96        buf.copy_from_slice(&self.data[self.pos..self.pos + buf.len()]);
97
98        Ok(())
99    }
100}
101
102impl Footer {
103    pub fn read_from<R: Read + Seek>(reader: &mut R) -> Result<Self> {
104        let file_size = reader.seek(SeekFrom::End(0))?;
105
106        // Minimum file size is 48 bytes (legacy footer)
107        if file_size < LEGACY_FOOTER_SIZE as u64 {
108            return Err(Error::FileTooSmall);
109        }
110
111        // First, check for the new magic number at position -8
112        reader.seek(SeekFrom::End(-8))?;
113        let mut magic_bytes = [0u8; 8];
114        reader.read_exact(&mut magic_bytes)?;
115        let magic = u64::from_le_bytes(magic_bytes);
116
117        if magic == ROCKSDB_MAGIC_NUMBER {
118            // New format - read format version to determine footer size
119            reader.seek(SeekFrom::End(-12))?;
120            let mut version_bytes = [0u8; 4];
121            reader.read_exact(&mut version_bytes)?;
122
123            // Read the full footer
124            if file_size < ROCKSDB_FOOTER_SIZE as u64 {
125                return Err(Error::FileTooSmall);
126            }
127            reader.seek(SeekFrom::End(-(ROCKSDB_FOOTER_SIZE as i64)))?;
128            let mut footer_data = vec![0u8; ROCKSDB_FOOTER_SIZE];
129            reader.read_exact(&mut footer_data)?;
130
131            let input_offset = file_size - (ROCKSDB_FOOTER_SIZE as u64);
132            Self::decode_from_bytes(&footer_data, input_offset)
133        } else {
134            // Check for legacy magic number at position -48
135            reader.seek(SeekFrom::End(-(LEGACY_FOOTER_SIZE as i64)))?;
136            let mut legacy_magic_bytes = [0u8; 8];
137            reader.read_exact(&mut legacy_magic_bytes)?;
138            let legacy_magic = u64::from_le_bytes(legacy_magic_bytes);
139
140            if legacy_magic == LEGACY_MAGIC_NUMBER {
141                // Legacy format (v0) - 48-byte footer
142                reader.seek(SeekFrom::End(-(LEGACY_FOOTER_SIZE as i64)))?;
143                let mut footer_data = vec![0u8; LEGACY_FOOTER_SIZE];
144                reader.read_exact(&mut footer_data)?;
145                let input_offset = file_size - (LEGACY_FOOTER_SIZE as u64);
146                Self::decode_from_bytes(&footer_data, input_offset)
147            } else {
148                Err(Error::InvalidMagicNumber(magic))
149            }
150        }
151    }
152
153    pub fn decode_from_bytes(data: &[u8], input_offset: u64) -> Result<Self> {
154        // Check for magic number at the end
155        if data.len() < 12 {
156            return Err(Error::InvalidFooterSize(data.len()));
157        }
158
159        // +---------------------------------------------------------------+
160        // | checksum (1B) | part2 (40B) | format_version (4B) | magic (8B)|
161        // +---------------------------------------------------------------+
162        let mut cursor = ReverseCursor::new(&data);
163        let magic = cursor.read_u64()?;
164
165        // Handle legacy format (v0) first
166        if magic == LEGACY_MAGIC_NUMBER {
167            if data.len() != LEGACY_FOOTER_SIZE {
168                return Err(Error::InvalidFooterSize(data.len()));
169            }
170
171            // Legacy format: varint handles directly in the 40 bytes before magic
172            let mut cursor = Cursor::new(&data);
173            let metaindex_handle = BlockHandle::decode_from(&mut cursor)?;
174            let index_handle = BlockHandle::decode_from(&mut cursor)?;
175
176            return Ok(Footer {
177                checksum_type: ChecksumType::CRC32c, // Legacy assumes CRC32c
178                metaindex_handle,
179                index_handle,
180                format_version: 0,
181                base_context_checksum: None,
182            });
183        }
184
185        if magic != ROCKSDB_MAGIC_NUMBER {
186            return Err(Error::InvalidMagicNumber(magic));
187        }
188
189        let format_version = cursor.read_u32()?;
190        if format_version >= 6 {
191            // second part!
192            // 8 + 16 = 24 bytes padded, reserved
193            {
194                // 16 bytes of unchecked reserved padding
195                let mut skip_bytes = [0u8; 16];
196                cursor.read_exact(&mut skip_bytes).map_err(|err| {
197                    Error::DataCorruption(format!(
198                        "Unable to read 16 bytes for reserved padding: {:?}",
199                        err
200                    ))
201                })?;
202
203                // 8 bytes of checked reserved padding (expected to be zero unless using a
204                // future feature).
205                let reserved = cursor.read_u64().map_err(|err| {
206                    Error::DataCorruption(format!("Unable to read reserved 8 bytes: {:?}", err))
207                })?;
208                if reserved != 0 {
209                    return Err(Error::Unsupported(format!(
210                        "File uses a future feature not supported in this version: {}",
211                        reserved
212                    )));
213                }
214            }
215
216            // TODO: Fix me
217            let adjustment = 5;
218            let footer_offset = input_offset - adjustment;
219
220            let metaindex_size = cursor.read_i32()? as u64;
221            let metaindex_handle = BlockHandle::new(footer_offset - metaindex_size, metaindex_size);
222
223            // Index handle is null for v6+
224            let index_handle = BlockHandle::new(0, 0);
225
226            let base_context_checksum = cursor.read_i32().map_err(|err| {
227                Error::DataCorruption(format!("Unable to read base context checksum: {:?}", err))
228            })? as u32;
229
230            let stored_checksum = cursor.read_i32().map_err(|err| {
231                Error::DataCorruption(format!("Unable to read stored checksum: {:?}", err))
232            })? as u32;
233
234            {
235                let mut magic_bytes = [0u8; 4];
236                cursor.read_exact(&mut magic_bytes).map_err(|err| {
237                    Error::DataCorruption(format!("Unable to read footer magic bytes: {:?}", err))
238                })?;
239
240                // Check for extended magiс
241                if magic_bytes != [0x3e, 0x00, 0x7a, 0x00] {
242                    return Err(Error::DataCorruption(format!(
243                        "Invalid extended magic, actual: {:?}",
244                        magic_bytes
245                    )));
246                }
247            }
248
249            let checksum_type = ChecksumType::try_from(cursor.read_u8()?)?;
250
251            // Perform checksum verification
252            let mut footer_copy = data.to_vec();
253            // Zero out the checksum field (bytes 5-8 from the start)
254            footer_copy[5..9].fill(0);
255
256            let computed_checksum = checksum_type.calculate(&footer_copy);
257            let modified_checksum = computed_checksum.wrapping_add(checksum_modifier_for_context(
258                base_context_checksum,
259                input_offset,
260            ));
261
262            if modified_checksum != stored_checksum {
263                return Err(Error::DataCorruption(format!(
264                    "Footer checksum mismatch at offset {}: expected {:#x}, computed {:#x}",
265                    input_offset, stored_checksum, modified_checksum
266                )));
267            }
268
269            Ok(Footer {
270                checksum_type,
271                metaindex_handle,
272                index_handle,
273                format_version,
274                base_context_checksum: Some(base_context_checksum),
275            })
276        } else {
277            let version_start = data.len() - 12;
278
279            // Format v1-v5
280            // Some v5 files don't have checksum type byte (legacy-style)
281            // Check if first byte looks like a varint (doesn't have high bit set)
282            let (checksum_type, phase2_data) = if data[0] <= 0x7F && format_version >= 1 {
283                // Might have checksum type
284                match ChecksumType::try_from(data[0]) {
285                    Ok(ct) => (ct, &data[1..version_start]),
286                    Err(_) => (ChecksumType::CRC32c, &data[..version_start]),
287                }
288            } else {
289                // No checksum type byte
290                (ChecksumType::CRC32c, &data[..version_start])
291            };
292
293            // Parse block handles
294            let mut padded_cursor: Cursor<&[u8]> = Cursor::new(phase2_data);
295            let metaindex_handle = BlockHandle::decode_from(&mut padded_cursor)?;
296            let index_handle = BlockHandle::decode_from(&mut padded_cursor)?;
297
298            Ok(Footer {
299                checksum_type,
300                metaindex_handle,
301                index_handle,
302                format_version,
303                base_context_checksum: None,
304            })
305        }
306    }
307
308    pub fn encode_to_bytes(&self, offset: u64) -> Result<Vec<u8>> {
309        if self.format_version >= 6 {
310            // Reverse order, see ReverseCuros
311            let mut data = Vec::with_capacity(53);
312
313            // 1. checksum type (1 byte) - first byte, read last by ReverseCursor
314            data.push(self.checksum_type as u8);
315            // 2. extended magic bytes (4 bytes)
316            data.extend(&[0x3e, 0x00, 0x7a, 0x00]);
317
318            // 3. footer checksum (4 bytes as i32), initially zero
319            data.extend(&[0u8; 4]);
320
321            // 4. base context checksum (4 bytes as i32)
322            let base_context_checksum = self.base_context_checksum.unwrap_or(0);
323            data.extend(&(base_context_checksum as i32).to_le_bytes());
324            // 5. metaindex size (4 bytes as i32)
325            data.extend(&(self.metaindex_handle.size as i32).to_le_bytes());
326
327            // 6. checked reserved (8 bytes, must be zero)
328            data.extend(&[0u8; 8]);
329            // 7. unchecked reserved padding (16 bytes)
330            data.extend(&[0u8; 16]);
331
332            data.extend(&self.format_version.to_le_bytes());
333            data.extend(&ROCKSDB_MAGIC_NUMBER.to_le_bytes());
334
335            // Calculate checksum with the provided offset
336            let computed_checksum = self.checksum_type.calculate(&data);
337            let modified_checksum = computed_checksum
338                .wrapping_add(checksum_modifier_for_context(base_context_checksum, offset));
339
340            // Write the checksum to bytes 5-8 (where the checksum field is)
341            data[5..9].copy_from_slice(&(modified_checksum as i32).to_le_bytes());
342
343            Ok(data)
344        } else {
345            // v1-v5 format (49 bytes)
346            let mut data = Vec::with_capacity(ROCKSDB_FOOTER_SIZE);
347
348            // Write checksum type first for v1+
349            data.push(self.checksum_type as u8);
350
351            // Write block handles
352            self.metaindex_handle.encode_to(&mut data)?;
353            self.index_handle.encode_to(&mut data)?;
354
355            let used_bytes = data.len();
356
357            // Format: checksum_type(1) + block_handles + padding + format_version(4) + magic(8)
358            let padding_size = ROCKSDB_FOOTER_SIZE - used_bytes - 12; // 4 bytes for format version + 8 for magic
359            data.extend(vec![0u8; padding_size]);
360            data.extend(&self.format_version.to_le_bytes());
361            data.extend(&ROCKSDB_MAGIC_NUMBER.to_le_bytes());
362
363            assert_eq!(data.len(), ROCKSDB_FOOTER_SIZE);
364            Ok(data)
365        }
366    }
367}
368
#[cfg(test)]
mod tests {
    use super::*;

    /// Gap that `Footer::decode_from_bytes` assumes between the end of the
    /// metaindex block and the footer offset for v6+ footers.
    const METAINDEX_END_GAP: u64 = 5;

    /// Footer in the v1-v5 layout with arbitrary but fixed block handles.
    fn sample_v5_footer() -> Footer {
        Footer {
            checksum_type: ChecksumType::CRC32c,
            metaindex_handle: BlockHandle::new(1000, 500),
            index_handle: BlockHandle::new(1500, 200),
            format_version: 5,
            base_context_checksum: None,
        }
    }

    /// Footer in the v6+ layout whose metaindex block ends exactly where the
    /// decoder expects it for a footer located at `input_offset`.
    ///
    /// `input_offset` must be large enough for the subtraction not to overflow.
    fn sample_v6_footer(
        checksum_type: ChecksumType,
        format_version: u32,
        input_offset: u64,
        metaindex_size: u64,
        base_context_checksum: Option<u32>,
    ) -> Footer {
        Footer {
            checksum_type,
            metaindex_handle: BlockHandle::new(
                input_offset - METAINDEX_END_GAP - metaindex_size,
                metaindex_size,
            ),
            index_handle: BlockHandle::new(0, 0), // Null for v6+
            format_version,
            base_context_checksum,
        }
    }

    /// Asserts field by field that `decoded` carries the same values as `original`.
    fn assert_same_footer(decoded: &Footer, original: &Footer) {
        assert_eq!(decoded.checksum_type, original.checksum_type);
        assert_eq!(
            decoded.metaindex_handle.size,
            original.metaindex_handle.size
        );
        assert_eq!(
            decoded.metaindex_handle.offset,
            original.metaindex_handle.offset
        );
        assert_eq!(decoded.index_handle.size, original.index_handle.size);
        assert_eq!(decoded.index_handle.offset, original.index_handle.offset);
        assert_eq!(decoded.format_version, original.format_version);
        assert_eq!(
            decoded.base_context_checksum,
            original.base_context_checksum
        );
    }

    #[test]
    fn test_footer_magic_number_validation() -> Result<()> {
        let footer_offset = 1500;
        let mut encoded = sample_v5_footer().encode_to_bytes(footer_offset)?;

        // Corrupt the last byte of the magic number.
        encoded[ROCKSDB_FOOTER_SIZE - 1] = 0xFF;

        let result = Footer::decode_from_bytes(&encoded, footer_offset);
        assert!(matches!(result, Err(Error::InvalidMagicNumber(_))));
        Ok(())
    }

    #[test]
    fn test_footer_size_validation() -> Result<()> {
        // Shorter than format version (4 bytes) + magic (8 bytes): rejected by
        // the length check before any field is read.
        let data = vec![0u8; 10];
        let result = Footer::decode_from_bytes(&data, 0);
        assert!(matches!(result, Err(Error::InvalidFooterSize(10))));
        Ok(())
    }

    #[test]
    fn test_footer_roundtrip_v5() -> Result<()> {
        let original = sample_v5_footer();
        let footer_offset = 1000;

        let encoded = original.encode_to_bytes(footer_offset)?;
        assert_eq!(encoded.len(), ROCKSDB_FOOTER_SIZE);

        let decoded = Footer::decode_from_bytes(&encoded, footer_offset)?;
        assert_same_footer(&decoded, &original);
        Ok(())
    }

    #[test]
    fn test_footer_v6_roundtrip() -> Result<()> {
        // For v6+ only the metaindex size is stored; the offset is derived from
        // the footer offset, so the fixture must be consistent with it.
        let input_offset = 100_000;
        let metaindex_size = 500;
        let original = sample_v6_footer(
            ChecksumType::CRC32c,
            6,
            input_offset,
            metaindex_size,
            Some(0x12345678),
        );

        let encoded = original.encode_to_bytes(input_offset)?;
        assert_eq!(encoded.len(), 53); // v6+ footer size

        let decoded = Footer::decode_from_bytes(&encoded, input_offset)?;
        assert_same_footer(&decoded, &original);
        assert_eq!(
            decoded.metaindex_handle.offset,
            input_offset - METAINDEX_END_GAP - metaindex_size
        );
        Ok(())
    }

    #[test]
    fn test_footer_v6_with_different_checksum_types() -> Result<()> {
        let checksum_types = [
            ChecksumType::None,
            ChecksumType::CRC32c,
            ChecksumType::Hash,
            ChecksumType::Hash64,
            ChecksumType::XXH3,
        ];

        for checksum_type in checksum_types {
            let input_offset = 50_000;
            let footer = sample_v6_footer(checksum_type, 6, input_offset, 1024, Some(0xABCDEF12));

            let encoded = footer.encode_to_bytes(input_offset)?;
            assert_eq!(encoded.len(), 53);

            let decoded = Footer::decode_from_bytes(&encoded, input_offset)?;
            assert_eq!(decoded.checksum_type, checksum_type);
            assert_eq!(decoded.format_version, 6);
            assert_eq!(decoded.base_context_checksum, Some(0xABCDEF12));
        }
        Ok(())
    }

    #[test]
    fn test_footer_v7_roundtrip() -> Result<()> {
        let input_offset = 75_000;
        let original = sample_v6_footer(
            ChecksumType::XXH3,
            7,
            input_offset,
            2048,
            Some(0x87654321),
        );

        let encoded = original.encode_to_bytes(input_offset)?;
        assert_eq!(encoded.len(), 53); // v7 also uses 53 bytes

        let decoded = Footer::decode_from_bytes(&encoded, input_offset)?;
        assert_same_footer(&decoded, &original);
        Ok(())
    }

    #[test]
    fn test_footer_v6_no_base_context_checksum() -> Result<()> {
        // Encoding writes 0 for a missing base context checksum and decoding
        // always yields `Some(..)`, so `None` comes back as `Some(0)`.
        let input_offset = 25_000;
        let footer = sample_v6_footer(ChecksumType::CRC32c, 6, input_offset, 512, None);

        let encoded = footer.encode_to_bytes(input_offset)?;
        assert_eq!(encoded.len(), 53);

        let decoded = Footer::decode_from_bytes(&encoded, input_offset)?;
        assert_eq!(decoded.base_context_checksum, Some(0));
        assert_eq!(decoded.format_version, 6);
        Ok(())
    }

    #[test]
    fn test_footer_v6_encoding_with_offset() -> Result<()> {
        // The footer checksum depends on the footer's file offset, so the same
        // footer encoded for two offsets must differ.
        let footer = Footer {
            checksum_type: ChecksumType::CRC32c,
            metaindex_handle: BlockHandle::new(0, 256),
            index_handle: BlockHandle::new(0, 0),
            format_version: 6,
            base_context_checksum: Some(0x11223344),
        };

        let encoded_offset_0 = footer.encode_to_bytes(0)?;
        let encoded_offset_1000 = footer.encode_to_bytes(1000)?;

        assert_ne!(encoded_offset_0, encoded_offset_1000);
        assert_eq!(encoded_offset_0.len(), 53);
        assert_eq!(encoded_offset_1000.len(), 53);
        Ok(())
    }
}