use crate::compression;
use crate::error::{Error, Result};
use crate::manifest::BackupRecord;
use crate::segment::writer::{
u8_to_compression_type, FOOTER_SIZE, HEADER_SIZE, SEGMENT_MAGIC, SEGMENT_MAGIC_END,
SEGMENT_VERSION,
};
/// Fixed-size header fields decoded from the start of a segment by
/// [`SegmentReader::read_header`].
#[derive(Debug, Clone, PartialEq)]
pub struct SegmentHeader {
    /// Segment format version (byte 4). `read_header` rejects anything other
    /// than `SEGMENT_VERSION`.
    pub version: u8,
    /// Codec applied to the payload stored between the header and the footer
    /// (decoded from byte 5).
    pub compression: crate::config::CompressionType,
    /// Number of records the segment declares (bytes 8..16, little-endian).
    /// `read_records` fails if the payload decodes to a different number.
    pub record_count: u64,
    /// Little-endian `i64` from bytes 16..24. Presumably the timestamp of the
    /// earliest record — NOTE(review): confirm meaning/units against `SegmentWriter`.
    pub first_timestamp: i64,
    /// Little-endian `i64` from bytes 24..32. Presumably the timestamp of the
    /// latest record — NOTE(review): confirm meaning/units against `SegmentWriter`.
    pub last_timestamp: i64,
}
pub struct SegmentReader;
impl SegmentReader {
pub fn read_header(data: &[u8]) -> Result<SegmentHeader> {
if data.len() < HEADER_SIZE + FOOTER_SIZE {
return Err(Error::Serialization(
"Segment too small to contain header and footer".to_string(),
));
}
if &data[0..4] != SEGMENT_MAGIC {
return Err(Error::Serialization(
"Invalid segment magic bytes".to_string(),
));
}
let version = data[4];
if version != SEGMENT_VERSION {
return Err(Error::Serialization(format!(
"Unsupported segment version: {}",
version
)));
}
let compression = u8_to_compression_type(data[5])?;
let record_count = u64::from_le_bytes(data[8..16].try_into().unwrap());
let first_timestamp = i64::from_le_bytes(data[16..24].try_into().unwrap());
let last_timestamp = i64::from_le_bytes(data[24..32].try_into().unwrap());
Ok(SegmentHeader {
version,
compression,
record_count,
first_timestamp,
last_timestamp,
})
}
pub fn verify_integrity(data: &[u8]) -> Result<()> {
if data.len() < HEADER_SIZE + FOOTER_SIZE {
return Err(Error::Serialization("Segment too small".to_string()));
}
let footer_start = data.len() - FOOTER_SIZE;
if &data[data.len() - 4..] != SEGMENT_MAGIC_END {
return Err(Error::Serialization(
"Invalid segment end magic bytes".to_string(),
));
}
let expected_crc =
u32::from_le_bytes(data[footer_start..footer_start + 4].try_into().unwrap());
let actual_crc = crc32fast::hash(&data[..footer_start]);
if expected_crc != actual_crc {
return Err(Error::Serialization(format!(
"CRC32 mismatch: expected {}, got {}",
expected_crc, actual_crc
)));
}
Ok(())
}
pub fn read_records(data: &[u8]) -> Result<Vec<BackupRecord>> {
Self::verify_integrity(data)?;
let header = Self::read_header(data)?;
let payload_start = HEADER_SIZE;
let payload_end = data.len() - FOOTER_SIZE;
let compressed = &data[payload_start..payload_end];
let decompressed = compression::decompress(compressed, header.compression)?;
let mut records = Vec::with_capacity(header.record_count as usize);
let mut offset = 0;
while offset < decompressed.len() {
if offset + 4 > decompressed.len() {
break;
}
let len =
u32::from_le_bytes(decompressed[offset..offset + 4].try_into().unwrap()) as usize;
offset += 4;
if offset + len > decompressed.len() {
return Err(Error::Serialization(format!(
"Record length {} exceeds available data at offset {}",
len, offset
)));
}
let record: BackupRecord = serde_json::from_slice(&decompressed[offset..offset + len])?;
records.push(record);
offset += len;
}
if records.len() as u64 != header.record_count {
return Err(Error::Serialization(format!(
"Expected {} records but found {}",
header.record_count,
records.len()
)));
}
Ok(records)
}
}
#[cfg(test)]
mod tests {
    // Round-trip and validation tests for `SegmentReader`, using `SegmentWriter`
    // to produce well-formed segments.
    use super::*;
    use crate::config::CompressionType;
    use crate::manifest::BackupProperties;
    use crate::segment::SegmentWriter;

    /// Builds a deterministic record whose body, delivery tag and timestamp
    /// depend on `index`.
    fn make_test_record(index: u64) -> BackupRecord {
        BackupRecord {
            body: Some(format!("test body {}", index).into_bytes()),
            properties: BackupProperties {
                content_type: Some("text/plain".to_string()),
                delivery_mode: Some(2),
                ..Default::default()
            },
            headers: vec![],
            exchange: "test-exchange".to_string(),
            routing_key: "test.routing.key".to_string(),
            delivery_tag: index,
            redelivered: false,
            backed_up_at: 1700000000000 + index as i64,
            source_queue: "test-queue".to_string(),
            source_vhost: "/".to_string(),
        }
    }

    /// Returns the raw bytes of a valid, uncompressed one-record segment.
    fn make_plain_segment_bytes() -> Vec<u8> {
        let mut writer = SegmentWriter::new(1);
        writer.add_record(&make_test_record(1)).unwrap();
        let segment = writer
            .finalize(CompressionType::None, 0, "test".to_string())
            .unwrap();
        segment.data.to_vec()
    }

    #[test]
    fn test_segment_roundtrip_zstd() {
        let mut writer = SegmentWriter::new(1);
        let records: Vec<BackupRecord> = (0..5).map(make_test_record).collect();
        for r in &records {
            writer.add_record(r).unwrap();
        }
        let segment = writer
            .finalize(CompressionType::Zstd, 3, "test.zst".to_string())
            .unwrap();
        let read_records = SegmentReader::read_records(&segment.data).unwrap();
        assert_eq!(read_records.len(), 5);
        for (original, read) in records.iter().zip(read_records.iter()) {
            assert_eq!(original.body, read.body);
            assert_eq!(original.exchange, read.exchange);
            assert_eq!(original.routing_key, read.routing_key);
            assert_eq!(original.delivery_tag, read.delivery_tag);
            assert_eq!(original.backed_up_at, read.backed_up_at);
            assert_eq!(
                original.properties.content_type,
                read.properties.content_type
            );
            assert_eq!(
                original.properties.delivery_mode,
                read.properties.delivery_mode
            );
        }
    }

    #[test]
    fn test_segment_roundtrip_no_compression() {
        let mut writer = SegmentWriter::new(1);
        writer.add_record(&make_test_record(1)).unwrap();
        let segment = writer
            .finalize(CompressionType::None, 0, "test".to_string())
            .unwrap();
        let records = SegmentReader::read_records(&segment.data).unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].delivery_tag, 1);
    }

    #[test]
    fn test_segment_header_parsing() {
        let mut writer = SegmentWriter::new(1);
        writer.add_record(&make_test_record(1)).unwrap();
        let segment = writer
            .finalize(CompressionType::Lz4, 0, "test.lz4".to_string())
            .unwrap();
        let header = SegmentReader::read_header(&segment.data).unwrap();
        assert_eq!(header.version, 1);
        assert_eq!(header.compression, CompressionType::Lz4);
        assert_eq!(header.record_count, 1);
    }

    #[test]
    fn test_segment_integrity_check() {
        let mut writer = SegmentWriter::new(1);
        writer.add_record(&make_test_record(1)).unwrap();
        let segment = writer
            .finalize(CompressionType::Zstd, 3, "test.zst".to_string())
            .unwrap();
        assert!(SegmentReader::verify_integrity(&segment.data).is_ok());
        let mut corrupted = segment.data.to_vec();
        corrupted[HEADER_SIZE + 1] ^= 0xFF;
        assert!(SegmentReader::verify_integrity(&corrupted).is_err());
    }

    #[test]
    fn test_segment_too_small_is_rejected() {
        assert!(SegmentReader::read_header(&[]).is_err());
        assert!(SegmentReader::verify_integrity(&[]).is_err());
        assert!(SegmentReader::read_records(&[]).is_err());
    }

    #[test]
    fn test_segment_bad_start_magic_is_rejected() {
        let mut corrupted = make_plain_segment_bytes();
        corrupted[0] ^= 0xFF;
        assert!(SegmentReader::read_header(&corrupted).is_err());
        assert!(SegmentReader::read_records(&corrupted).is_err());
    }

    #[test]
    fn test_segment_bad_end_magic_is_rejected() {
        let mut corrupted = make_plain_segment_bytes();
        let last = corrupted.len() - 1;
        corrupted[last] ^= 0xFF;
        assert!(SegmentReader::verify_integrity(&corrupted).is_err());
        assert!(SegmentReader::read_records(&corrupted).is_err());
    }

    #[test]
    fn test_segment_unsupported_version_is_rejected() {
        let mut corrupted = make_plain_segment_bytes();
        corrupted[4] = SEGMENT_VERSION.wrapping_add(1);
        assert!(SegmentReader::read_header(&corrupted).is_err());
    }
}