rabbitmq-backup-core 0.1.0

Core engine for RabbitMQ backup and restore operations
Documentation
//! Segment reader for decompressing and deserializing backup records.

use crate::compression;
use crate::error::{Error, Result};
use crate::manifest::BackupRecord;
use crate::segment::writer::{
    u8_to_compression_type, FOOTER_SIZE, HEADER_SIZE, SEGMENT_MAGIC, SEGMENT_MAGIC_END,
    SEGMENT_VERSION,
};

/// Parsed segment header, as decoded by `SegmentReader::read_header`.
#[derive(Debug, Clone, PartialEq)]
pub struct SegmentHeader {
    /// Segment format version (byte 4); only `SEGMENT_VERSION` is accepted when parsing.
    pub version: u8,
    /// Codec used for the payload between header and footer (decoded from byte 5).
    pub compression: crate::config::CompressionType,
    /// Number of records the header claims the payload contains (bytes 8..16, LE).
    pub record_count: u64,
    /// First timestamp stored in the header (bytes 16..24, LE).
    /// NOTE(review): presumably the `backed_up_at` of the first record — confirm against the writer.
    pub first_timestamp: i64,
    /// Last timestamp stored in the header (bytes 24..32, LE).
    /// NOTE(review): presumably the `backed_up_at` of the last record — confirm against the writer.
    pub last_timestamp: i64,
}

/// Reads and decompresses segments from storage.
///
/// A segment is laid out as a fixed-size header (`HEADER_SIZE` bytes), a
/// compressed payload, and a fixed-size footer (`FOOTER_SIZE` bytes). The
/// footer starts with a little-endian CRC32 of every byte before the footer
/// and ends with the `SEGMENT_MAGIC_END` bytes.
pub struct SegmentReader;

impl SegmentReader {
    /// Parse the header from segment bytes.
    ///
    /// Fields decoded here: bytes `0..4` magic, byte `4` format version,
    /// byte `5` compression type, bytes `8..16` record count (`u64` LE), and
    /// bytes `16..24` / `24..32` first/last timestamps (`i64` LE). Bytes `6..8`
    /// are not read. This does **not** check the CRC or end magic; use
    /// [`SegmentReader::verify_integrity`] for that.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Serialization`] if `data` is shorter than a header plus
    /// a footer, does not start with `SEGMENT_MAGIC`, or carries a version other
    /// than `SEGMENT_VERSION`. Also propagates the error from
    /// `u8_to_compression_type` for an unrecognised compression byte.
    pub fn read_header(data: &[u8]) -> Result<SegmentHeader> {
        // NOTE(review): the fixed offsets below assume HEADER_SIZE >= 32.
        if data.len() < HEADER_SIZE + FOOTER_SIZE {
            return Err(Error::Serialization(
                "Segment too small to contain header and footer".to_string(),
            ));
        }

        if &data[0..4] != SEGMENT_MAGIC {
            return Err(Error::Serialization(
                "Invalid segment magic bytes".to_string(),
            ));
        }

        let version = data[4];
        if version != SEGMENT_VERSION {
            return Err(Error::Serialization(format!(
                "Unsupported segment version: {}",
                version
            )));
        }

        let compression = u8_to_compression_type(data[5])?;
        // Each range is exactly 8 bytes long, so the array conversions cannot fail.
        let record_count =
            u64::from_le_bytes(data[8..16].try_into().expect("range is exactly 8 bytes"));
        let first_timestamp =
            i64::from_le_bytes(data[16..24].try_into().expect("range is exactly 8 bytes"));
        let last_timestamp =
            i64::from_le_bytes(data[24..32].try_into().expect("range is exactly 8 bytes"));

        Ok(SegmentHeader {
            version,
            compression,
            record_count,
            first_timestamp,
            last_timestamp,
        })
    }

    /// Verify the CRC32 and end magic of a segment.
    ///
    /// The last 4 bytes must equal `SEGMENT_MAGIC_END`. The first 4 footer bytes
    /// must hold the little-endian CRC32 of everything before the footer, which
    /// covers both the header and the payload.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Serialization`] if `data` is too small, the end magic
    /// does not match, or the stored CRC differs from the computed one.
    pub fn verify_integrity(data: &[u8]) -> Result<()> {
        if data.len() < HEADER_SIZE + FOOTER_SIZE {
            return Err(Error::Serialization("Segment too small".to_string()));
        }

        let footer_start = data.len() - FOOTER_SIZE;

        if &data[data.len() - 4..] != SEGMENT_MAGIC_END {
            return Err(Error::Serialization(
                "Invalid segment end magic bytes".to_string(),
            ));
        }

        let expected_crc = u32::from_le_bytes(
            data[footer_start..footer_start + 4]
                .try_into()
                .expect("range is exactly 4 bytes"),
        );
        let actual_crc = crc32fast::hash(&data[..footer_start]);

        if expected_crc != actual_crc {
            return Err(Error::Serialization(format!(
                "CRC32 mismatch: expected {}, got {}",
                expected_crc, actual_crc
            )));
        }

        Ok(())
    }

    /// Read all records from a segment.
    ///
    /// Verifies the footer (see [`SegmentReader::verify_integrity`]), parses the
    /// header, and decompresses the payload with the codec named in the header.
    /// It then decodes the payload as a sequence of JSON [`BackupRecord`]s, each
    /// prefixed by its byte length as a little-endian `u32`.
    ///
    /// # Errors
    ///
    /// Fails in any of these cases:
    /// - the integrity or header checks fail;
    /// - decompression fails;
    /// - a length prefix is truncated or points past the end of the payload;
    /// - a record is not valid JSON for [`BackupRecord`];
    /// - the number of decoded records differs from the header's `record_count`.
    pub fn read_records(data: &[u8]) -> Result<Vec<BackupRecord>> {
        /// Size in bytes of the per-record length prefix.
        const LEN_PREFIX: usize = 4;

        Self::verify_integrity(data)?;
        let header = Self::read_header(data)?;

        // The compressed payload sits between the fixed-size header and footer.
        let compressed = &data[HEADER_SIZE..data.len() - FOOTER_SIZE];
        let decompressed = compression::decompress(compressed, header.compression)?;

        // `record_count` comes from the segment bytes. The CRC detects accidental
        // corruption but is not a MAC, so the count must not drive an unbounded
        // allocation: e.g. `u64::MAX` would panic with "capacity overflow". Every
        // record occupies at least its length prefix, which bounds how many can
        // actually be present.
        let max_possible = decompressed.len() / LEN_PREFIX;
        let capacity = usize::try_from(header.record_count)
            .unwrap_or(usize::MAX)
            .min(max_possible);
        let mut records = Vec::with_capacity(capacity);
        let mut offset = 0;

        while offset < decompressed.len() {
            // Leftover bytes too short to hold a length prefix mean the payload
            // was cut off mid-frame; report it instead of silently dropping it.
            if decompressed.len() - offset < LEN_PREFIX {
                return Err(Error::Serialization(format!(
                    "Truncated record length prefix at offset {}",
                    offset
                )));
            }

            let len_bytes: [u8; LEN_PREFIX] = decompressed[offset..offset + LEN_PREFIX]
                .try_into()
                .expect("range is exactly LEN_PREFIX bytes");
            let len = u32::from_le_bytes(len_bytes) as usize;
            offset += LEN_PREFIX;

            // Compare against the remaining length rather than computing
            // `offset + len`, so the check itself cannot overflow.
            if len > decompressed.len() - offset {
                return Err(Error::Serialization(format!(
                    "Record length {} exceeds available data at offset {}",
                    len, offset
                )));
            }

            let record: BackupRecord = serde_json::from_slice(&decompressed[offset..offset + len])?;
            records.push(record);
            offset += len;
        }

        if records.len() as u64 != header.record_count {
            return Err(Error::Serialization(format!(
                "Expected {} records but found {}",
                header.record_count,
                records.len()
            )));
        }

        Ok(records)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::CompressionType;
    use crate::manifest::BackupProperties;
    use crate::segment::SegmentWriter;

    fn make_test_record(index: u64) -> BackupRecord {
        BackupRecord {
            body: Some(format!("test body {}", index).into_bytes()),
            properties: BackupProperties {
                content_type: Some("text/plain".to_string()),
                delivery_mode: Some(2),
                ..Default::default()
            },
            headers: vec![],
            exchange: "test-exchange".to_string(),
            routing_key: "test.routing.key".to_string(),
            delivery_tag: index,
            redelivered: false,
            backed_up_at: 1700000000000 + index as i64,
            source_queue: "test-queue".to_string(),
            source_vhost: "/".to_string(),
        }
    }

    /// Build an uncompressed, finalized segment holding one record and return its bytes.
    fn uncompressed_single_record_segment() -> Vec<u8> {
        let mut writer = SegmentWriter::new(1);
        writer.add_record(&make_test_record(1)).unwrap();
        let segment = writer
            .finalize(CompressionType::None, 0, "test".to_string())
            .unwrap();
        segment.data.to_vec()
    }

    /// Overwrite the header's little-endian `record_count` (bytes 8..16) and
    /// recompute the footer CRC, so the segment still passes `verify_integrity`.
    fn rewrite_record_count(data: &mut [u8], count: u64) {
        data[8..16].copy_from_slice(&count.to_le_bytes());
        let footer_start = data.len() - FOOTER_SIZE;
        let crc = crc32fast::hash(&data[..footer_start]);
        data[footer_start..footer_start + 4].copy_from_slice(&crc.to_le_bytes());
    }

    #[test]
    fn test_segment_roundtrip_zstd() {
        let mut writer = SegmentWriter::new(1);
        let records: Vec<BackupRecord> = (0..5).map(make_test_record).collect();
        for r in &records {
            writer.add_record(r).unwrap();
        }

        let segment = writer
            .finalize(CompressionType::Zstd, 3, "test.zst".to_string())
            .unwrap();

        let read_records = SegmentReader::read_records(&segment.data).unwrap();
        assert_eq!(read_records.len(), 5);

        for (original, read) in records.iter().zip(read_records.iter()) {
            assert_eq!(original.body, read.body);
            assert_eq!(original.exchange, read.exchange);
            assert_eq!(original.routing_key, read.routing_key);
            assert_eq!(original.delivery_tag, read.delivery_tag);
            assert_eq!(original.backed_up_at, read.backed_up_at);
            assert_eq!(
                original.properties.content_type,
                read.properties.content_type
            );
            assert_eq!(
                original.properties.delivery_mode,
                read.properties.delivery_mode
            );
        }
    }

    #[test]
    fn test_segment_roundtrip_no_compression() {
        let mut writer = SegmentWriter::new(1);
        writer.add_record(&make_test_record(1)).unwrap();

        let segment = writer
            .finalize(CompressionType::None, 0, "test".to_string())
            .unwrap();

        let records = SegmentReader::read_records(&segment.data).unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].delivery_tag, 1);
    }

    #[test]
    fn test_segment_header_parsing() {
        let mut writer = SegmentWriter::new(1);
        writer.add_record(&make_test_record(1)).unwrap();

        let segment = writer
            .finalize(CompressionType::Lz4, 0, "test.lz4".to_string())
            .unwrap();

        let header = SegmentReader::read_header(&segment.data).unwrap();
        assert_eq!(header.version, 1);
        assert_eq!(header.compression, CompressionType::Lz4);
        assert_eq!(header.record_count, 1);
    }

    #[test]
    fn test_segment_integrity_check() {
        let mut writer = SegmentWriter::new(1);
        writer.add_record(&make_test_record(1)).unwrap();

        let segment = writer
            .finalize(CompressionType::Zstd, 3, "test.zst".to_string())
            .unwrap();

        // Valid segment passes
        assert!(SegmentReader::verify_integrity(&segment.data).is_ok());

        // Corrupted segment fails
        let mut corrupted = segment.data.to_vec();
        corrupted[HEADER_SIZE + 1] ^= 0xFF; // flip a byte in the payload
        assert!(SegmentReader::verify_integrity(&corrupted).is_err());
    }

    #[test]
    fn test_segment_rejects_empty_input() {
        let empty: &[u8] = &[];
        assert!(SegmentReader::read_header(empty).is_err());
        assert!(SegmentReader::verify_integrity(empty).is_err());
        assert!(SegmentReader::read_records(empty).is_err());
    }

    #[test]
    fn test_segment_rejects_bad_magic() {
        let data = uncompressed_single_record_segment();

        // Corrupt the leading magic: header parsing must fail.
        let mut bad_start = data.clone();
        bad_start[0] ^= 0xFF;
        assert!(SegmentReader::read_header(&bad_start).is_err());

        // Corrupt the trailing magic: integrity verification must fail.
        let mut bad_end = data;
        let last = bad_end.len() - 1;
        bad_end[last] ^= 0xFF;
        assert!(SegmentReader::verify_integrity(&bad_end).is_err());
    }

    #[test]
    fn test_segment_record_count_mismatch() {
        let mut data = uncompressed_single_record_segment();
        rewrite_record_count(&mut data, 2);

        // The CRC was recomputed, so only the record-count check can catch this.
        assert!(SegmentReader::verify_integrity(&data).is_ok());
        assert!(SegmentReader::read_records(&data).is_err());
    }
}