use std::path::Path;
use bytes::{BufMut, BytesMut};
use crabka_remote_storage::{
PartitionDump, RemoteLogSegmentMetadata, RemotePartitionDeleteState, RlmmCacheDump,
TopicIdPartition,
};
use crate::error::{CodecError, SnapshotError};
use crate::serde::{MetadataEvent, Reader, read_uvarint, write_uvarint};
/// On-disk format version, written as the leading big-endian `u16` of every encoded
/// snapshot. Decoding rejects any other value as an unsupported version.
pub const SNAPSHOT_FORMAT_VERSION: u16 = 0;
/// File name used for the snapshot file.
/// NOTE(review): not referenced in this section; presumably joined onto a data
/// directory by callers — confirm.
pub const SNAPSHOT_FILE_NAME: &str = "snapshot";
/// Point-in-time persisted state: per-partition committed offsets plus a cache dump
/// (`RlmmCacheDump`) holding segment metadata and partition delete states.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Snapshot {
/// Committed offset per partition, indexed by partition number. When decoding, any
/// index not present in the encoded data is left at `-1`.
/// NOTE(review): presumably the consumed offsets of the metadata topic partitions — confirm.
pub committed_offsets: Vec<i64>,
/// Per-partition segment metadata and optional delete state.
pub dump: RlmmCacheDump,
}
impl Snapshot {
#[must_use]
pub fn encode(&self) -> Vec<u8> {
let mut buf = BytesMut::with_capacity(256);
buf.put_u16(SNAPSHOT_FORMAT_VERSION);
write_uvarint(self.committed_offsets.len() as u64, &mut buf);
for (p, &off) in self.committed_offsets.iter().enumerate() {
buf.put_i32(i32::try_from(p).expect("partition fits in i32"));
buf.put_i64(off);
}
let mut entries: Vec<bytes::Bytes> = Vec::new();
for p in &self.dump.partitions {
for seg in &p.segments {
entries.push(MetadataEvent::AddSegment(seg.clone()).encode());
}
if let Some(state) = p.delete_state {
entries.push(
MetadataEvent::PartitionDelete(
crabka_remote_storage::RemotePartitionDeleteMetadata {
topic_id_partition: p.topic_id_partition.clone(),
state,
event_timestamp_ms: 0,
broker_id: 0,
},
)
.encode(),
);
}
}
write_uvarint(entries.len() as u64, &mut buf);
for entry in entries {
write_uvarint(entry.len() as u64, &mut buf);
buf.put_slice(&entry);
}
buf.to_vec()
}
pub fn decode(bytes: &[u8]) -> Result<Self, SnapshotError> {
use std::collections::BTreeMap;
let mut r = Reader::new(bytes);
let version = read_u16(&mut r)?;
if version != SNAPSHOT_FORMAT_VERSION {
return Err(SnapshotError::UnsupportedVersion(version));
}
let n_offsets = usize::try_from(read_uvarint(&mut r)?)
.map_err(|_| CodecError::LengthOverflow(u64::MAX))?;
let mut committed_offsets = vec![-1i64; n_offsets];
for _ in 0..n_offsets {
let p = r.read_i32()?;
let off = r.read_i64()?;
let idx = usize::try_from(p).map_err(|_| CodecError::LengthOverflow(u64::MAX))?;
if idx >= committed_offsets.len() {
return Err(SnapshotError::Malformed(CodecError::LengthOverflow(
idx as u64,
)));
}
committed_offsets[idx] = off;
}
let n_entries = usize::try_from(read_uvarint(&mut r)?)
.map_err(|_| CodecError::LengthOverflow(u64::MAX))?;
let mut order: Vec<TopicIdPartition> = Vec::new();
let mut by_tp: BTreeMap<
(uuid::Uuid, i32),
(
Vec<RemoteLogSegmentMetadata>,
Option<RemotePartitionDeleteState>,
),
> = BTreeMap::new();
for _ in 0..n_entries {
let len = usize::try_from(read_uvarint(&mut r)?)
.map_err(|_| CodecError::LengthOverflow(u64::MAX))?;
let raw = r.read_n(len)?;
match MetadataEvent::decode(raw)? {
MetadataEvent::AddSegment(md) => {
let tp = md.remote_log_segment_id().topic_id_partition.clone();
let key = (tp.topic_id, tp.partition);
if !by_tp.contains_key(&key) {
order.push(tp.clone());
}
by_tp.entry(key).or_default().0.push(md);
}
MetadataEvent::PartitionDelete(d) => {
let key = (
d.topic_id_partition.topic_id,
d.topic_id_partition.partition,
);
if !by_tp.contains_key(&key) {
order.push(d.topic_id_partition.clone());
}
by_tp.entry(key).or_default().1 = Some(d.state);
}
MetadataEvent::UpdateSegment(_) => {
return Err(SnapshotError::Malformed(CodecError::Domain(
"snapshot must not contain an UpdateSegment event".to_string(),
)));
}
}
}
if r.remaining() != 0 {
return Err(SnapshotError::TrailingBytes(r.remaining()));
}
let partitions = order
.into_iter()
.map(|tp| {
let key = (tp.topic_id, tp.partition);
let (segments, delete_state) = by_tp.remove(&key).expect("key present");
PartitionDump {
topic_id_partition: tp,
segments,
delete_state,
}
})
.collect();
Ok(Self {
committed_offsets,
dump: RlmmCacheDump { partitions },
})
}
pub fn write_atomic(&self, path: &Path) -> Result<(), SnapshotError> {
use std::io::Write;
let bytes = self.encode();
let parent = path.parent().unwrap_or_else(|| Path::new("."));
std::fs::create_dir_all(parent)?;
let tmp = path.with_extension("tmp");
let write_tmp = || -> Result<(), SnapshotError> {
let mut f = std::fs::File::create(&tmp)?;
f.write_all(&bytes)?;
f.sync_all()?;
std::fs::rename(&tmp, path)?;
Ok(())
};
if let Err(e) = write_tmp() {
let _ = std::fs::remove_file(&tmp);
return Err(e);
}
if let Ok(dir) = std::fs::File::open(parent) {
let _ = dir.sync_all();
}
Ok(())
}
pub fn load(path: &Path) -> Result<Option<Self>, SnapshotError> {
let bytes = match std::fs::read(path) {
Ok(b) => b,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(e) => return Err(SnapshotError::Io(e)),
};
Ok(Some(Self::decode(&bytes)?))
}
}
/// Reads a big-endian `u16` from `r`: two consecutive bytes, high byte first.
fn read_u16(r: &mut Reader<'_>) -> Result<u16, CodecError> {
    let high = r.read_u8()?;
    let low = r.read_u8()?;
    Ok(u16::from_be_bytes([high, low]))
}
// Unit tests for `Snapshot` encoding/decoding and file persistence.
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use std::collections::BTreeMap;
use uuid::Uuid;
use crabka_remote_storage::{
RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentState,
RemotePartitionDeleteState, TopicIdPartition,
};
// Fixed topic-partition shared by every fixture segment.
fn tp() -> TopicIdPartition {
TopicIdPartition::new(Uuid::from_u128(1), "orders", 0)
}
// Builds a `CopySegmentStarted` segment with segment id `id` and start/end offsets
// `start`/`end`; the remaining constructor arguments are fixed fixture values.
fn started(id: u128, start: i64, end: i64) -> RemoteLogSegmentMetadata {
RemoteLogSegmentMetadata::new(
RemoteLogSegmentId::new(tp(), Uuid::from_u128(id)),
start,
end,
end + 1,
1,
100,
2048,
RemoteLogSegmentState::CopySegmentStarted,
BTreeMap::from([(0, start)]),
)
.unwrap()
}
// One partition with two segments and a delete marker, plus committed offsets that
// include a `-1` entry.
fn sample_snapshot() -> Snapshot {
let dump = RlmmCacheDump {
partitions: vec![PartitionDump {
topic_id_partition: tp(),
segments: vec![started(10, 0, 99), started(11, 100, 199)],
delete_state: Some(RemotePartitionDeleteState::DeletePartitionMarked),
}],
};
Snapshot {
committed_offsets: vec![5, -1, 2],
dump,
}
}
// Decoding the encoded form yields an equal snapshot.
#[test]
fn encode_decode_round_trip() {
let snap = sample_snapshot();
let bytes = snap.encode();
let back = Snapshot::decode(&bytes).expect("decodes");
assert!(back == snap);
}
// Dropping the last 3 bytes must produce an error, not a panic.
#[test]
fn truncated_file_is_error_not_panic() {
let bytes = sample_snapshot().encode();
let err = Snapshot::decode(&bytes[..bytes.len() - 3]).unwrap_err();
assert!(matches!(
err,
SnapshotError::Malformed(_) | SnapshotError::TrailingBytes(_)
));
}
// Arbitrary bytes (here a 0xFFFF version header) must produce an error, not a panic.
#[test]
fn garbage_bytes_are_error_not_panic() {
let err = Snapshot::decode(&[0xFF, 0xFF, 0xFF, 0xFF]).unwrap_err();
assert!(matches!(
err,
SnapshotError::UnsupportedVersion(_) | SnapshotError::Malformed(_)
));
}
// An empty input fails while reading the version header and is reported as `Malformed`.
#[test]
fn empty_buffer_is_error_not_panic() {
let err = Snapshot::decode(&[]).unwrap_err();
assert!(matches!(err, SnapshotError::Malformed(_)));
}
// `write_atomic` + `load` round-trip through a real file, and no temporary file is
// left behind in the directory afterwards.
#[test]
fn write_then_load_round_trips_through_a_file() {
let dir = std::env::temp_dir().join(format!("crabka-snap-{}", std::process::id()));
std::fs::create_dir_all(&dir).unwrap();
let path = dir.join("snapshot");
let snap = sample_snapshot();
snap.write_atomic(&path).expect("write");
let loaded = Snapshot::load(&path).expect("load").expect("present");
assert!(loaded == snap);
assert!(
std::fs::read_dir(&dir)
.unwrap()
.filter_map(Result::ok)
.all(|e| e.file_name() == "snapshot")
);
std::fs::remove_dir_all(&dir).ok();
}
// A missing file loads as `Ok(None)` rather than an error.
#[test]
fn load_absent_file_is_ok_none() {
let path = std::env::temp_dir().join("crabka-snap-does-not-exist-xyz");
let _ = std::fs::remove_file(&path);
assert!(Snapshot::load(&path).unwrap() == None);
}
// A file with an unsupported version header (0xFFFF) fails to load.
#[test]
fn load_corrupt_file_is_err() {
let dir = std::env::temp_dir().join(format!("crabka-snap-corrupt-{}", std::process::id()));
std::fs::create_dir_all(&dir).unwrap();
let path = dir.join("snapshot");
std::fs::write(&path, [0xFF, 0xFF, 0x00, 0x01]).unwrap();
assert!(Snapshot::load(&path).is_err());
std::fs::remove_dir_all(&dir).ok();
}
}