use std::time::Instant;
use bytes::Bytes;
use sha2::{Digest, Sha256};
use tracing::debug;
use crate::compression;
use crate::config::CompressionType;
use crate::error::Result;
use crate::manifest::{BackupRecord, SegmentMetadata};
/// Leading magic bytes that open every segment.
pub const SEGMENT_MAGIC: &[u8; 4] = b"RBAK";
/// Trailing magic bytes that close every segment (the leading magic reversed).
pub const SEGMENT_MAGIC_END: &[u8; 4] = b"KABR";
/// Segment format version, written at byte offset 4 of the header.
pub const SEGMENT_VERSION: u8 = 1;
/// Header length in bytes: magic (4) + version (1) + compression tag (1) + reserved (2)
/// + record count (8) + first timestamp (8) + last timestamp (8).
pub const HEADER_SIZE: usize = 4 + 1 + 1 + 2 + 8 + 8 + 8;
/// Footer length in bytes: CRC-32 (4) + end magic (4).
pub const FOOTER_SIZE: usize = 4 + 4;
/// A fully encoded segment together with the metadata that describes it.
pub struct FinalizedSegment {
    /// Complete segment bytes: header, compressed record payload, and footer.
    pub data: Bytes,
    /// Description of `data`: storage key, sequence, record/byte counts,
    /// timestamp range, and hex SHA-256 checksum.
    pub metadata: SegmentMetadata,
}
/// Accumulates length-prefixed JSON records in memory until they are finalized
/// into a single segment.
pub struct SegmentWriter {
    /// Sequence number assigned to the segment currently being built.
    sequence: u64,
    /// Concatenated record frames, each a little-endian `u32` length followed by JSON bytes.
    buffer: Vec<u8>,
    /// Number of records appended since the last reset.
    record_count: u64,
    /// Bytes appended to `buffer` since the last reset, length prefixes included.
    uncompressed_bytes: u64,
    /// `backed_up_at` of the first record in the current segment, if any.
    first_timestamp: Option<i64>,
    /// `backed_up_at` of the most recently added record, if any.
    last_timestamp: Option<i64>,
    /// When the current segment was started (writer creation or last reset).
    segment_start_time: Instant,
}
impl SegmentWriter {
pub fn new(start_sequence: u64) -> Self {
Self {
buffer: Vec::with_capacity(1024 * 1024), record_count: 0,
first_timestamp: None,
last_timestamp: None,
segment_start_time: Instant::now(),
uncompressed_bytes: 0,
sequence: start_sequence,
}
}
pub fn add_record(&mut self, record: &BackupRecord) -> Result<()> {
let json = serde_json::to_vec(record)?;
let len = json.len() as u32;
self.buffer.extend_from_slice(&len.to_le_bytes());
self.buffer.extend_from_slice(&json);
self.record_count += 1;
self.uncompressed_bytes += 4 + json.len() as u64;
let ts = record.backed_up_at;
if self.first_timestamp.is_none() {
self.first_timestamp = Some(ts);
}
self.last_timestamp = Some(ts);
Ok(())
}
pub fn should_rotate(&self, max_bytes: u64, max_interval_ms: u64) -> bool {
if self.buffer.is_empty() {
return false;
}
if self.uncompressed_bytes >= max_bytes {
return true;
}
if self.segment_start_time.elapsed().as_millis() as u64 >= max_interval_ms {
return true;
}
false
}
pub fn has_records(&self) -> bool {
self.record_count > 0
}
pub fn record_count(&self) -> u64 {
self.record_count
}
pub fn sequence(&self) -> u64 {
self.sequence
}
pub fn finalize(
&mut self,
compression_type: CompressionType,
compression_level: i32,
storage_key: String,
) -> Result<FinalizedSegment> {
let compressed =
compression::compress_with_level(&self.buffer, compression_type, compression_level)?;
let mut output = Vec::with_capacity(HEADER_SIZE + compressed.len() + FOOTER_SIZE);
output.extend_from_slice(SEGMENT_MAGIC);
output.push(SEGMENT_VERSION);
output.push(compression_type_to_u8(compression_type));
output.extend_from_slice(&[0u8; 2]); output.extend_from_slice(&self.record_count.to_le_bytes());
output.extend_from_slice(&self.first_timestamp.unwrap_or(0).to_le_bytes());
output.extend_from_slice(&self.last_timestamp.unwrap_or(0).to_le_bytes());
output.extend_from_slice(&compressed);
let crc = crc32fast::hash(&output);
output.extend_from_slice(&crc.to_le_bytes());
output.extend_from_slice(SEGMENT_MAGIC_END);
let mut hasher = Sha256::new();
hasher.update(&output);
let checksum = format!("{:x}", hasher.finalize());
let metadata = SegmentMetadata {
key: storage_key,
sequence: self.sequence,
record_count: self.record_count,
size_bytes: output.len() as u64,
uncompressed_bytes: self.uncompressed_bytes,
first_timestamp: self.first_timestamp,
last_timestamp: self.last_timestamp,
checksum,
};
debug!(
"Finalized segment {}: {} records, {} bytes compressed (from {} uncompressed)",
self.sequence,
self.record_count,
output.len(),
self.uncompressed_bytes
);
let result = FinalizedSegment {
data: Bytes::from(output),
metadata,
};
self.reset();
Ok(result)
}
fn reset(&mut self) {
self.buffer.clear();
self.record_count = 0;
self.first_timestamp = None;
self.last_timestamp = None;
self.segment_start_time = Instant::now();
self.uncompressed_bytes = 0;
self.sequence += 1;
}
}
/// Encodes a [`CompressionType`] as the one-byte tag stored at offset 5 of the
/// segment header.
///
/// The mapping is part of the on-disk format and must stay the inverse of
/// [`u8_to_compression_type`].
fn compression_type_to_u8(ct: CompressionType) -> u8 {
    match ct {
        CompressionType::Lz4 => 2u8,
        CompressionType::Zstd => 1u8,
        CompressionType::None => 0u8,
    }
}
pub fn u8_to_compression_type(v: u8) -> Result<CompressionType> {
match v {
0 => Ok(CompressionType::None),
1 => Ok(CompressionType::Zstd),
2 => Ok(CompressionType::Lz4),
_ => Err(crate::Error::Compression(format!(
"Unknown compression type: {}",
v
))),
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::manifest::{BackupProperties, BackupRecord};

    /// Builds a deterministic record whose `backed_up_at` is `1_700_000_000_000 + index`.
    fn make_test_record(index: u64) -> BackupRecord {
        BackupRecord {
            body: Some(format!("message body {}", index).into_bytes()),
            properties: BackupProperties::default(),
            headers: vec![],
            exchange: "test-exchange".to_string(),
            routing_key: "test.key".to_string(),
            delivery_tag: index,
            redelivered: false,
            backed_up_at: 1700000000000 + index as i64,
            source_queue: "test-queue".to_string(),
            source_vhost: "/".to_string(),
        }
    }

    /// Adds records `0..count` to `writer` and finalizes them as a zstd (level 3) segment.
    fn finalize_zstd(writer: &mut SegmentWriter, count: u64) -> FinalizedSegment {
        for i in 0..count {
            writer.add_record(&make_test_record(i)).unwrap();
        }
        writer
            .finalize(CompressionType::Zstd, 3, "test/segment-0001.zst".to_string())
            .unwrap()
    }

    #[test]
    fn test_segment_writer_add_record() {
        let mut writer = SegmentWriter::new(1);
        assert!(!writer.has_records());
        writer.add_record(&make_test_record(1)).unwrap();
        assert!(writer.has_records());
        assert_eq!(writer.record_count(), 1);
    }

    #[test]
    fn test_segment_writer_finalize() {
        let mut writer = SegmentWriter::new(1);
        for i in 0..10 {
            writer.add_record(&make_test_record(i)).unwrap();
        }
        let segment = writer
            .finalize(
                CompressionType::Zstd,
                3,
                "test/segment-0001.zst".to_string(),
            )
            .unwrap();
        assert_eq!(&segment.data[0..4], SEGMENT_MAGIC);
        assert_eq!(segment.data[4], SEGMENT_VERSION);
        assert_eq!(segment.data[5], 1);
        let len = segment.data.len();
        assert_eq!(&segment.data[len - 4..], SEGMENT_MAGIC_END);
        assert_eq!(segment.metadata.record_count, 10);
        assert_eq!(segment.metadata.sequence, 1);
        assert_eq!(segment.metadata.key, "test/segment-0001.zst");
        assert!(segment.metadata.size_bytes > 0);
        assert!(segment.metadata.uncompressed_bytes > 0);
        assert!(!segment.metadata.checksum.is_empty());
        assert!(!writer.has_records());
        assert_eq!(writer.sequence(), 2);
    }

    #[test]
    fn test_segment_header_fields() {
        let mut writer = SegmentWriter::new(1);
        let segment = finalize_zstd(&mut writer, 10);
        let data = &segment.data[..];
        // Reserved bytes are zero.
        assert_eq!(&data[6..8], &[0u8, 0]);
        assert_eq!(&data[8..16], &10u64.to_le_bytes());
        assert_eq!(&data[16..24], &1_700_000_000_000i64.to_le_bytes());
        assert_eq!(&data[24..32], &1_700_000_000_009i64.to_le_bytes());
        assert_eq!(segment.metadata.first_timestamp, Some(1_700_000_000_000));
        assert_eq!(segment.metadata.last_timestamp, Some(1_700_000_000_009));
    }

    #[test]
    fn test_segment_footer_crc_and_checksum() {
        use sha2::{Digest, Sha256};

        let mut writer = SegmentWriter::new(1);
        let segment = finalize_zstd(&mut writer, 3);
        let data = &segment.data[..];
        let len = data.len();
        assert_eq!(segment.metadata.size_bytes, len as u64);

        // CRC covers everything before the footer.
        let crc = crc32fast::hash(&data[..len - FOOTER_SIZE]);
        assert_eq!(&data[len - FOOTER_SIZE..len - 4], &crc.to_le_bytes());

        // Metadata checksum is the hex SHA-256 of the whole segment.
        let mut hasher = Sha256::new();
        hasher.update(data);
        assert_eq!(segment.metadata.checksum, format!("{:x}", hasher.finalize()));
    }

    #[test]
    fn test_uncompressed_bytes_counts_length_prefix() {
        let record = make_test_record(4);
        let json_len = serde_json::to_vec(&record).unwrap().len() as u64;
        let mut writer = SegmentWriter::new(1);
        writer.add_record(&record).unwrap();
        let segment = writer
            .finalize(CompressionType::None, 0, "test/segment-0001".to_string())
            .unwrap();
        assert_eq!(segment.metadata.uncompressed_bytes, 4 + json_len);
    }

    #[test]
    fn test_segment_writer_should_rotate_by_size() {
        let mut writer = SegmentWriter::new(1);
        writer.add_record(&make_test_record(1)).unwrap();
        assert!(writer.should_rotate(1, u64::MAX));
        assert!(!writer.should_rotate(u64::MAX, u64::MAX));
    }

    #[test]
    fn test_segment_writer_should_rotate_empty_never() {
        let writer = SegmentWriter::new(1);
        assert!(!writer.should_rotate(0, 0));
    }

    #[test]
    fn test_segment_writer_should_rotate_by_interval() {
        let mut writer = SegmentWriter::new(1);
        writer.add_record(&make_test_record(1)).unwrap();
        // A zero interval has always elapsed once the buffer is non-empty.
        assert!(writer.should_rotate(u64::MAX, 0));
    }

    #[test]
    fn test_segment_writer_resets_between_segments() {
        let mut writer = SegmentWriter::new(5);
        writer.add_record(&make_test_record(1)).unwrap();
        writer
            .finalize(CompressionType::None, 0, "test/a".to_string())
            .unwrap();
        writer.add_record(&make_test_record(7)).unwrap();
        let second = writer
            .finalize(CompressionType::None, 0, "test/b".to_string())
            .unwrap();
        assert_eq!(second.metadata.sequence, 6);
        assert_eq!(second.metadata.record_count, 1);
        assert_eq!(second.metadata.first_timestamp, Some(1_700_000_000_007));
        assert_eq!(second.metadata.last_timestamp, Some(1_700_000_000_007));
        assert_eq!(writer.sequence(), 7);
    }

    #[test]
    fn test_segment_writer_no_compression() {
        let mut writer = SegmentWriter::new(1);
        writer.add_record(&make_test_record(1)).unwrap();
        let segment = writer
            .finalize(CompressionType::None, 0, "test/segment-0001".to_string())
            .unwrap();
        assert_eq!(segment.data[5], 0);
        assert!(segment.data.len() > HEADER_SIZE + FOOTER_SIZE);
    }
}