rabbitmq-backup-core 0.1.0

Core engine for RabbitMQ backup and restore operations
Documentation
//! Segment writer for batching and compressing backup records.

use std::time::Instant;

use bytes::Bytes;
use sha2::{Digest, Sha256};
use tracing::debug;

use crate::compression;
use crate::config::CompressionType;
use crate::error::Result;
use crate::manifest::{BackupRecord, SegmentMetadata};

/// Magic bytes identifying the start of a segment file.
///
/// These are the first four bytes of the segment header.
pub const SEGMENT_MAGIC: &[u8; 4] = b"RBAK";

/// Magic bytes identifying the end of a segment file.
///
/// These are the last four bytes of the segment footer (the start magic
/// spelled backwards).
pub const SEGMENT_MAGIC_END: &[u8; 4] = b"KABR";

/// Current segment format version, stored in the header byte right after the magic.
pub const SEGMENT_VERSION: u8 = 1;

/// Header size in bytes.
///
/// Layout, in order:
/// - 4 bytes: magic
/// - 1 byte: version
/// - 1 byte: compression
/// - 2 bytes: reserved
/// - 8 bytes: record_count
/// - 8 bytes: first_ts
/// - 8 bytes: last_ts
pub const HEADER_SIZE: usize = 32;

/// Footer size in bytes (4 CRC32 + 4 magic end).
pub const FOOTER_SIZE: usize = 8;

/// A finalized segment ready for storage.
///
/// Pairs the serialized segment bytes with the manifest entry describing them.
pub struct FinalizedSegment {
    /// Complete segment bytes (header + compressed payload + footer).
    pub data: Bytes,
    /// Metadata about the segment for the manifest.
    pub metadata: SegmentMetadata,
}

/// Writes backup records into compressed segments with size/time thresholds.
///
/// Records are accumulated uncompressed in memory; compression and framing
/// happen only when the segment is finalized.
pub struct SegmentWriter {
    /// Accumulates serialized records (uncompressed).
    buffer: Vec<u8>,
    /// Number of records added since the last reset.
    record_count: u64,
    /// `backed_up_at` of the first record added to the current segment, if any.
    first_timestamp: Option<i64>,
    /// `backed_up_at` of the most recently added record, if any.
    last_timestamp: Option<i64>,
    /// When the current segment was started; drives time-based rotation.
    segment_start_time: Instant,
    /// Running total of bytes appended to `buffer` (length prefixes included).
    uncompressed_bytes: u64,
    /// Current segment sequence number.
    sequence: u64,
}

impl SegmentWriter {
    /// Create a new segment writer starting at the given sequence number.
    pub fn new(start_sequence: u64) -> Self {
        Self {
            buffer: Vec::with_capacity(1024 * 1024), // 1MB initial capacity
            record_count: 0,
            first_timestamp: None,
            last_timestamp: None,
            segment_start_time: Instant::now(),
            uncompressed_bytes: 0,
            sequence: start_sequence,
        }
    }

    /// Add a record to the current segment buffer.
    pub fn add_record(&mut self, record: &BackupRecord) -> Result<()> {
        let json = serde_json::to_vec(record)?;
        let len = json.len() as u32;

        // Write length-prefixed JSON
        self.buffer.extend_from_slice(&len.to_le_bytes());
        self.buffer.extend_from_slice(&json);

        self.record_count += 1;
        self.uncompressed_bytes += 4 + json.len() as u64;

        let ts = record.backed_up_at;
        if self.first_timestamp.is_none() {
            self.first_timestamp = Some(ts);
        }
        self.last_timestamp = Some(ts);

        Ok(())
    }

    /// Check if the segment should be rotated based on size or time thresholds.
    pub fn should_rotate(&self, max_bytes: u64, max_interval_ms: u64) -> bool {
        if self.buffer.is_empty() {
            return false;
        }
        if self.uncompressed_bytes >= max_bytes {
            return true;
        }
        if self.segment_start_time.elapsed().as_millis() as u64 >= max_interval_ms {
            return true;
        }
        false
    }

    /// Whether the buffer has any records.
    pub fn has_records(&self) -> bool {
        self.record_count > 0
    }

    /// Current number of records in the buffer.
    pub fn record_count(&self) -> u64 {
        self.record_count
    }

    /// Current sequence number.
    pub fn sequence(&self) -> u64 {
        self.sequence
    }

    /// Finalize the current segment: compress, build header/footer, return bytes + metadata.
    pub fn finalize(
        &mut self,
        compression_type: CompressionType,
        compression_level: i32,
        storage_key: String,
    ) -> Result<FinalizedSegment> {
        // Compress the record payload
        let compressed =
            compression::compress_with_level(&self.buffer, compression_type, compression_level)?;

        // Build the complete segment: header + compressed + footer
        let mut output = Vec::with_capacity(HEADER_SIZE + compressed.len() + FOOTER_SIZE);

        // Header (32 bytes)
        output.extend_from_slice(SEGMENT_MAGIC);
        output.push(SEGMENT_VERSION);
        output.push(compression_type_to_u8(compression_type));
        output.extend_from_slice(&[0u8; 2]); // reserved
        output.extend_from_slice(&self.record_count.to_le_bytes());
        output.extend_from_slice(&self.first_timestamp.unwrap_or(0).to_le_bytes());
        output.extend_from_slice(&self.last_timestamp.unwrap_or(0).to_le_bytes());

        // Compressed payload
        output.extend_from_slice(&compressed);

        // CRC32 of header + compressed payload
        let crc = crc32fast::hash(&output);
        output.extend_from_slice(&crc.to_le_bytes());
        output.extend_from_slice(SEGMENT_MAGIC_END);

        // SHA-256 checksum for manifest
        let mut hasher = Sha256::new();
        hasher.update(&output);
        let checksum = format!("{:x}", hasher.finalize());

        let metadata = SegmentMetadata {
            key: storage_key,
            sequence: self.sequence,
            record_count: self.record_count,
            size_bytes: output.len() as u64,
            uncompressed_bytes: self.uncompressed_bytes,
            first_timestamp: self.first_timestamp,
            last_timestamp: self.last_timestamp,
            checksum,
        };

        debug!(
            "Finalized segment {}: {} records, {} bytes compressed (from {} uncompressed)",
            self.sequence,
            self.record_count,
            output.len(),
            self.uncompressed_bytes
        );

        let result = FinalizedSegment {
            data: Bytes::from(output),
            metadata,
        };

        // Reset for next segment
        self.reset();

        Ok(result)
    }

    /// Reset the writer for the next segment.
    fn reset(&mut self) {
        self.buffer.clear();
        self.record_count = 0;
        self.first_timestamp = None;
        self.last_timestamp = None;
        self.segment_start_time = Instant::now();
        self.uncompressed_bytes = 0;
        self.sequence += 1;
    }
}

fn compression_type_to_u8(ct: CompressionType) -> u8 {
    match ct {
        CompressionType::None => 0,
        CompressionType::Zstd => 1,
        CompressionType::Lz4 => 2,
    }
}

pub fn u8_to_compression_type(v: u8) -> Result<CompressionType> {
    match v {
        0 => Ok(CompressionType::None),
        1 => Ok(CompressionType::Zstd),
        2 => Ok(CompressionType::Lz4),
        _ => Err(crate::Error::Compression(format!(
            "Unknown compression type: {}",
            v
        ))),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::manifest::{BackupProperties, BackupRecord};

    /// Build a deterministic record whose body, delivery tag and timestamp
    /// are all derived from `index`.
    fn make_test_record(index: u64) -> BackupRecord {
        let payload = format!("message body {}", index).into_bytes();
        let timestamp = 1700000000000 + index as i64;

        BackupRecord {
            body: Some(payload),
            properties: BackupProperties::default(),
            headers: vec![],
            exchange: String::from("test-exchange"),
            routing_key: String::from("test.key"),
            delivery_tag: index,
            redelivered: false,
            backed_up_at: timestamp,
            source_queue: String::from("test-queue"),
            source_vhost: String::from("/"),
        }
    }

    /// A fresh writer is empty; adding one record is reflected in the counters.
    #[test]
    fn test_segment_writer_add_record() {
        let mut seg_writer = SegmentWriter::new(1);
        assert!(!seg_writer.has_records());

        let record = make_test_record(1);
        seg_writer.add_record(&record).unwrap();

        assert!(seg_writer.has_records());
        assert_eq!(seg_writer.record_count(), 1);
    }

    /// Finalizing produces a correctly framed segment, fills in the metadata
    /// and resets the writer with the next sequence number.
    #[test]
    fn test_segment_writer_finalize() {
        let mut seg_writer = SegmentWriter::new(1);
        (0..10).for_each(|idx| seg_writer.add_record(&make_test_record(idx)).unwrap());

        let key = "test/segment-0001.zst".to_string();
        let finalized = seg_writer
            .finalize(CompressionType::Zstd, 3, key)
            .unwrap();
        let bytes = &finalized.data;
        let meta = &finalized.metadata;

        // Header: magic, version, compression tag (1 = zstd)
        assert!(bytes.starts_with(SEGMENT_MAGIC));
        assert_eq!(bytes[4], SEGMENT_VERSION);
        assert_eq!(bytes[5], 1);

        // Footer: segment ends with the end magic
        assert!(bytes.ends_with(SEGMENT_MAGIC_END));

        // Metadata
        assert_eq!(meta.record_count, 10);
        assert_eq!(meta.sequence, 1);
        assert_eq!(meta.key, "test/segment-0001.zst");
        assert!(meta.size_bytes > 0);
        assert!(meta.uncompressed_bytes > 0);
        assert!(!meta.checksum.is_empty());

        // The writer starts over on the next sequence number
        assert!(!seg_writer.has_records());
        assert_eq!(seg_writer.sequence(), 2);
    }

    /// Size-based rotation triggers only once the byte threshold is reached.
    #[test]
    fn test_segment_writer_should_rotate_by_size() {
        let mut seg_writer = SegmentWriter::new(1);
        seg_writer.add_record(&make_test_record(1)).unwrap();

        let rotates_at_tiny_limit = seg_writer.should_rotate(1, u64::MAX);
        let rotates_at_huge_limit = seg_writer.should_rotate(u64::MAX, u64::MAX);

        assert!(rotates_at_tiny_limit);
        assert!(!rotates_at_huge_limit);
    }

    /// With compression disabled the header tag is 0 and the payload is non-empty.
    #[test]
    fn test_segment_writer_no_compression() {
        let mut seg_writer = SegmentWriter::new(1);
        seg_writer.add_record(&make_test_record(1)).unwrap();

        let key = "test/segment-0001".to_string();
        let finalized = seg_writer
            .finalize(CompressionType::None, 0, key)
            .unwrap();

        assert_eq!(finalized.data[5], 0);
        assert!(finalized.data.len() > HEADER_SIZE + FOOTER_SIZE);
    }
}