use bytes::{BufMut, Bytes, BytesMut};
use crabka_metadata::{MetadataImage, MetadataRecord, from_kraft_value, to_kraft_values};
use crabka_protocol::Encode;
use crabka_protocol::owned::snapshot_footer_record::SnapshotFooterRecord;
use crabka_protocol::owned::snapshot_header_record::SnapshotHeaderRecord;
use crabka_protocol::records::metadata::control::{
ControlRecordType, control_record_key, encode_control_batch,
};
use crabka_protocol::records::{Record, RecordBatch};
use uuid::Uuid;
use crate::error::RaftError;
/// Stateless serializer that turns a [`MetadataImage`] into snapshot bytes.
pub(crate) struct SnapshotWriter;
impl SnapshotWriter {
/// Serializes `image` into one contiguous snapshot byte buffer.
///
/// The buffer is a sequence of up to three batches, in this order:
///
/// 1. a control batch at base offset 0 carrying a `SnapshotHeaderRecord`
///    stamped with `last_contained_log_timestamp`;
/// 2. if the image yields at least one value blob, a single data batch at
///    base offset 1 holding every blob with consecutive offset deltas;
/// 3. a control batch carrying a `SnapshotFooterRecord`, placed at the offset
///    right after the last data record (offset 1 when there is no data batch).
///
/// `V1Voters` and `V1KRaftVersion` records are never written to the data batch.
///
/// # Errors
///
/// Propagates any encoding failure of the header, footer or data batch.
/// Returns `RaftError::ChangeRejected` when `to_kraft_values` rejects a record,
/// or when the image produces more value blobs than one batch can address
/// with an `i32` offset delta (previously this case was silently clamped,
/// producing a batch with duplicate deltas).
pub(crate) fn serialize(
    image: &MetadataImage,
    last_contained_log_timestamp: i64,
) -> Result<Bytes, RaftError> {
    let records = image.to_records();
    let mut out = BytesMut::new();

    // Header control batch, always at offset 0.
    let header = SnapshotHeaderRecord {
        version: 0,
        last_contained_log_timestamp,
        ..Default::default()
    };
    let mut header_body = BytesMut::new();
    header.encode(&mut header_body, 0)?;
    out.put_slice(&encode_control_batch(
        0,
        control_record_key(ControlRecordType::SnapshotHeader),
        header_body.freeze(),
    ));

    // Flatten every record (except the skipped kinds) into its encoded values.
    let mut value_blobs: Vec<Bytes> = Vec::new();
    for rec in &records {
        if matches!(
            rec,
            MetadataRecord::V1Voters(_) | MetadataRecord::V1KRaftVersion(_)
        ) {
            continue;
        }
        let blobs = to_kraft_values(rec, image)
            .map_err(|e| RaftError::ChangeRejected(format!("snapshot encode: {e}")))?;
        value_blobs.extend(blobs);
    }

    // `checked_sub` doubles as the emptiness test: `None` means no data batch.
    let footer_base_offset = match value_blobs.len().checked_sub(1) {
        None => 1,
        Some(last_index) => {
            // One checked conversion up front; every per-record delta below is
            // then guaranteed to fit, so nothing is ever clamped.
            let last_offset_delta = i32::try_from(last_index).map_err(|_| {
                RaftError::ChangeRejected(format!(
                    "snapshot encode: {} records exceed the i32 offset-delta range of one batch",
                    last_index + 1
                ))
            })?;
            let data_records = value_blobs
                .into_iter()
                .zip(0..=last_offset_delta)
                .map(|(blob, offset_delta)| Record {
                    offset_delta,
                    value: Some(blob),
                    ..Default::default()
                })
                .collect();
            let data_batch = RecordBatch {
                base_offset: 1,
                last_offset_delta,
                records: data_records,
                ..Default::default()
            };
            data_batch.encode(&mut out)?;
            // Data records occupy offsets 1..=1 + last_offset_delta; the
            // footer takes the next one. Widening i32 -> i64 cannot overflow.
            i64::from(last_offset_delta) + 2
        }
    };

    // Footer control batch closes the snapshot.
    let footer = SnapshotFooterRecord {
        version: 0,
        ..Default::default()
    };
    let mut footer_body = BytesMut::new();
    footer.encode(&mut footer_body, 0)?;
    out.put_slice(&encode_control_batch(
        footer_base_offset,
        control_record_key(ControlRecordType::SnapshotFooter),
        footer_body.freeze(),
    ));
    Ok(out.freeze())
}
}
/// Stateless helper for consuming snapshot bytes: decodes them back into
/// [`MetadataRecord`]s and slices byte windows out of a snapshot buffer.
pub(crate) struct SnapshotReader;
impl SnapshotReader {
/// Decodes every metadata record contained in the snapshot `bytes`.
///
/// Batches are decoded back to back until the input is exhausted. Control
/// batches are skipped, as are records without a value. Each value is decoded
/// against a scratch image (created with a nil cluster id) that is updated
/// with every record decoded so far, so later values see the earlier state.
///
/// # Errors
///
/// Propagates batch decoding failures, and returns
/// `RaftError::ChangeRejected` when a record value cannot be decoded.
pub(crate) fn read_records(bytes: &[u8]) -> Result<Vec<MetadataRecord>, RaftError> {
    let mut remaining: &[u8] = bytes;
    let mut scratch_image = MetadataImage::new(Uuid::nil());
    let mut decoded_records = Vec::new();

    while !remaining.is_empty() {
        let batch = RecordBatch::decode(&mut remaining)?;
        if !batch.attributes.is_control_batch() {
            let payloads = batch.records.iter().filter_map(|rec| rec.value.as_ref());
            for payload in payloads {
                let record = from_kraft_value(payload, &scratch_image)
                    .map_err(|e| RaftError::ChangeRejected(format!("snapshot decode: {e}")))?;
                scratch_image.apply(&record);
                decoded_records.push(record);
            }
        }
    }

    Ok(decoded_records)
}
/// Returns at most `max` bytes of `bytes` starting at `position`.
///
/// Both ends are clamped to the buffer, so the call never panics: a
/// `position` at or past the end yields an empty slice, and a window that
/// runs past the end is truncated.
pub(crate) fn byte_range(bytes: &[u8], position: usize, max: usize) -> &[u8] {
    // Everything from the (clamped) start to the end of the buffer.
    let tail = &bytes[position.min(bytes.len())..];
    // Keep no more than `max` bytes of it.
    &tail[..max.min(tail.len())]
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use crabka_metadata::{
FeatureLevelRecord, MetadataImage, MetadataRecord, PartitionRecord, TopicRecord,
};
use uuid::Uuid;
/// A topic with three partitions survives serialize -> read -> rebuild.
#[test]
fn writer_reader_round_trips_image() {
    let cluster_id = Uuid::new_v4();
    let mut original = MetadataImage::new(cluster_id);

    let topic = TopicRecord {
        name: "orders".into(),
        topic_id: Uuid::new_v4(),
        partitions: 3,
        replication_factor: 2,
    };
    original.apply(&MetadataRecord::V1Topic(topic));

    (0..3)
        .map(|partition| {
            MetadataRecord::V1Partition(PartitionRecord {
                topic: "orders".into(),
                partition,
                leader: 1,
                replicas: vec![1, 2],
                isr: vec![1, 2],
                leader_epoch: 0,
                adding_replicas: vec![],
                removing_replicas: vec![],
                directories: vec![],
                partition_epoch: 0,
            })
        })
        .for_each(|record| {
            original.apply(&record);
        });

    let snapshot = SnapshotWriter::serialize(&original, 1_700_000_000_000).unwrap();
    let decoded = SnapshotReader::read_records(&snapshot).unwrap();
    let rebuilt = MetadataImage::from_records(cluster_id, &decoded);
    assert!(rebuilt == original);
}
/// Finalized feature levels and their epoch survive a snapshot round trip.
#[test]
fn writer_reader_round_trips_image_with_features() {
    let cluster_id = Uuid::new_v4();
    let mut original = MetadataImage::new(cluster_id);

    // Four updates, the last of which repeats an already-applied level.
    let updates = [
        ("metadata.version", 24),
        ("metadata.version", 25),
        ("group.version", 1),
        ("metadata.version", 25),
    ];
    for (feature, level) in updates {
        let record = FeatureLevelRecord {
            name: feature.into(),
            level,
        };
        original.apply(&MetadataRecord::V1FeatureLevel(record));
    }
    assert!(original.finalized_features_epoch() == 3);

    let snapshot = SnapshotWriter::serialize(&original, 1_700_000_000_000).unwrap();
    let decoded = SnapshotReader::read_records(&snapshot).unwrap();
    let rebuilt = MetadataImage::from_records(cluster_id, &decoded);

    assert!(rebuilt == original);
    assert!(rebuilt.finalized_features().get("metadata.version") == Some(&25));
    assert!(rebuilt.finalized_features().get("group.version") == Some(&1));
    assert!(rebuilt.finalized_features_epoch() == 3);
}
/// An empty image serializes to a snapshot that decodes to zero records.
#[test]
fn writer_reader_round_trips_empty_image() {
    let cluster_id = Uuid::new_v4();
    let empty = MetadataImage::new(cluster_id);

    let snapshot = SnapshotWriter::serialize(&empty, 0).unwrap();
    let decoded = SnapshotReader::read_records(&snapshot).unwrap();
    assert!(decoded.is_empty());

    let rebuilt = MetadataImage::from_records(cluster_id, &decoded);
    assert!(rebuilt == empty);
}
/// Interoperability check: writes a snapshot produced by `SnapshotWriter` to a
/// temporary directory and runs `kafka-dump-log.sh` from the
/// `apache/kafka:4.0.0` Docker image over it, asserting that the tool exits
/// successfully and that its output mentions the expected record types.
///
/// Ignored by default because it needs a working Docker installation.
#[test]
#[ignore = "requires Docker"]
fn jvm_dump_log_parses_engine_snapshot() {
use crabka_metadata::{BrokerConfigRecord, BrokerRegistrationRecord, TopicConfigRecord};
use std::io::Write as _;
use std::process::Command;
// Build an image with one broker, one broker config, one topic with two
// partitions, and one topic-level config override.
let cid = Uuid::new_v4();
let mut image = MetadataImage::new(cid);
image.apply(&MetadataRecord::V1BrokerRegistration(
BrokerRegistrationRecord {
node_id: 1,
broker_epoch: 0,
// Deliberately non-nil: the dump output is checked below for the absence
// of a nil incarnationId.
incarnation_id: uuid::Uuid::from_u128(0x0102_0304_0506_0708_090a_0b0c_0d0e_0f10),
host: "broker-1".into(),
port: 9092,
rack: Some("rack-a".into()),
endpoints: vec![],
},
));
image.apply(&MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
node_id: 1,
config_name: "leader.replication.throttled.rate".into(),
config_value: Some("1048576".into()),
}));
image.apply(&MetadataRecord::V1Topic(TopicRecord {
name: "orders".into(),
topic_id: Uuid::new_v4(),
partitions: 2,
replication_factor: 1,
}));
for p in 0..2 {
image.apply(&MetadataRecord::V1Partition(PartitionRecord {
topic: "orders".into(),
partition: p,
leader: 1,
replicas: vec![1],
isr: vec![1],
leader_epoch: 0,
adding_replicas: vec![],
removing_replicas: vec![],
directories: vec![],
partition_epoch: 0,
}));
}
image.apply(&MetadataRecord::V1TopicConfig(TopicConfigRecord {
topic: "orders".into(),
overrides: [("retention.ms".to_string(), "604800000".to_string())].into(),
}));
let bytes = SnapshotWriter::serialize(&image, 1_700_000_000_000).unwrap();
// Persist the snapshot under a `.checkpoint` file name.
// NOTE(review): presumably the name Kafka uses for a snapshot at offset 0 /
// epoch 0 — confirm against the Kafka snapshot naming scheme.
let dir = tempfile::tempdir().expect("tempdir");
let path = dir
.path()
.join("00000000000000000000-0000000000.checkpoint");
std::fs::File::create(&path)
.unwrap()
.write_all(&bytes)
.unwrap();
// Mount the temp dir at /work inside the container and run the dump tool on
// the checkpoint file.
let out = Command::new("docker")
.args([
"run",
"--rm",
"-v",
&format!("{}:/work", dir.path().display()),
"apache/kafka:4.0.0",
"/opt/kafka/bin/kafka-dump-log.sh",
"--cluster-metadata-decoder",
"--files",
"/work/00000000000000000000-0000000000.checkpoint",
])
.output()
.expect("docker run kafka-dump-log");
// Merge stdout and stderr so that the checks below (and the failure
// messages) see everything the tool printed on either stream.
let text = format!(
"{}{}",
String::from_utf8_lossy(&out.stdout),
String::from_utf8_lossy(&out.stderr)
);
eprintln!("{text}");
assert!(out.status.success(), "kafka-dump-log failed: {text}");
// Every record type written above must show up in the dump.
for needle in [
"REGISTER_BROKER_RECORD",
"TOPIC_RECORD",
"PARTITION_RECORD",
"CONFIG_RECORD",
] {
assert!(text.contains(needle), "missing {needle} in dump: {text}");
}
// The tool must not flag any batch as invalid or report a decode problem.
assert!(
!text.contains("isvalid: false") && !text.to_lowercase().contains("could not"),
"dump-log reported an invalid record: {text}"
);
// The broker's incarnation id must not come out as the nil UUID.
assert!(
!text.contains("incarnationId=00000000-0000-0000-0000-000000000000"),
"all RegisterBroker records must have a non-nil incarnationId; found nil in dump output"
);
// No PartitionRecord line may report a partitionEpoch of -1.
assert!(
!text
.lines()
.any(|l| l.contains("PartitionRecord") && l.contains("partitionEpoch=-1")),
"all PartitionRecord entries must have partitionEpoch >= 0 after Slice 6; found -1 in dump"
);
}
/// `byte_range` returns in-bounds windows verbatim and clamps the rest.
#[test]
fn byte_range_returns_expected_slice() {
    let data: Vec<u8> = (0u8..=255).collect();

    // Window fully inside the buffer.
    let inside = SnapshotReader::byte_range(&data, 10, 5);
    assert!(inside == &data[10..15]);

    // Start position past the end: nothing to return.
    let past_end = SnapshotReader::byte_range(&data, 1000, 5);
    assert!(past_end.is_empty());

    // Window overrunning the end is truncated to the remaining bytes.
    let truncated = SnapshotReader::byte_range(&data, 250, 100);
    assert!(truncated == &data[250..]);
}
}