use bytes::{BufMut, Bytes, BytesMut};
use crate::Encode;
use crate::owned::feature_level_record::FeatureLevelRecord;
use crate::owned::snapshot_footer_record::SnapshotFooterRecord;
use crate::owned::snapshot_header_record::SnapshotHeaderRecord;
use crate::records::metadata::control::{
ControlRecordType, control_record_key, encode_control_batch,
};
use crate::records::metadata::record::KraftMetadataRecord;
use crate::records::{Record, RecordBatch};
/// Version passed to `encode_value` when serializing each feature-level record.
const FEATURE_LEVEL_API_VERSION: i16 = 0;
/// Builds the bytes of a bootstrap checkpoint carrying the given feature levels.
///
/// The output is a concatenation of record batches, in this order:
///
/// 1. a control batch at offset 0 holding a [`SnapshotHeaderRecord`]
///    (version 0, last contained log timestamp 0);
/// 2. if `features` is non-empty, a single data batch with base offset 1 that
///    holds one [`FeatureLevelRecord`] per `(name, level)` pair, in slice
///    order, with offset deltas `0..features.len()`;
/// 3. a control batch at offset `1 + features.len()` holding a
///    [`SnapshotFooterRecord`] (version 0).
///
/// When `features` is empty no data batch is written. A zero-record batch
/// would still claim offset 1, which is the offset the footer is placed at in
/// that case, so the checkpoint then consists of the header and footer only.
///
/// # Panics
///
/// Panics if any record or batch fails to encode, or if `features` has more
/// entries than can be expressed as an `i32` offset delta.
#[must_use]
pub fn build_bootstrap_checkpoint(features: &[(&str, i16)]) -> Bytes {
    let mut out = BytesMut::new();

    // Snapshot header: control batch at offset 0.
    let header = SnapshotHeaderRecord {
        version: 0,
        last_contained_log_timestamp: 0,
        ..Default::default()
    };
    let mut header_body = BytesMut::new();
    header
        .encode(&mut header_body, 0)
        .expect("snapshot header encodes");
    out.put_slice(&encode_control_batch(
        0,
        control_record_key(ControlRecordType::SnapshotHeader),
        header_body.freeze(),
    ));

    // Feature levels: one data batch starting at offset 1. `checked_sub`
    // yields `None` for an empty slice, in which case the batch is skipped
    // rather than emitted with zero records.
    if let Some(last_index) = features.len().checked_sub(1) {
        let records: Vec<Record> = features
            .iter()
            .enumerate()
            .map(|(i, (name, level))| {
                let rec = KraftMetadataRecord::FeatureLevel(FeatureLevelRecord {
                    name: (*name).to_string(),
                    feature_level: *level,
                    ..Default::default()
                });
                Record {
                    offset_delta: i32::try_from(i).expect("few features"),
                    value: Some(
                        rec.encode_value(FEATURE_LEVEL_API_VERSION)
                            .expect("feature record encodes"),
                    ),
                    ..Default::default()
                }
            })
            .collect();
        let data = RecordBatch {
            base_offset: 1,
            // Same bound as the largest per-record `offset_delta` above, so
            // fail the same way instead of silently falling back to 0.
            last_offset_delta: i32::try_from(last_index).expect("few features"),
            records,
            ..Default::default()
        };
        data.encode(&mut out).expect("feature data batch encodes");
    }

    // Snapshot footer: control batch at the first offset after the feature
    // records (offset 1 when there are none).
    let footer_offset = 1 + i64::try_from(features.len()).expect("few features");
    let footer = SnapshotFooterRecord {
        version: 0,
        ..Default::default()
    };
    let mut footer_body = BytesMut::new();
    footer
        .encode(&mut footer_body, 0)
        .expect("snapshot footer encodes");
    out.put_slice(&encode_control_batch(
        footer_offset,
        control_record_key(ControlRecordType::SnapshotFooter),
        footer_body.freeze(),
    ));

    out.freeze()
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::records::RecordBatch;
    use assert2::assert;

    /// A checkpoint built from three features decodes as exactly three
    /// batches: a control batch at offset 0, a non-control batch at offset 1
    /// with three records, and a trailing control batch, with no bytes left.
    #[test]
    fn bootstrap_checkpoint_has_header_features_footer() {
        let checkpoint = build_bootstrap_checkpoint(&[
            ("metadata.version", 25),
            ("group.version", 1),
            ("transaction.version", 2),
        ]);
        // Each `decode` call advances this slice past the batch it consumed.
        let mut remaining: &[u8] = &checkpoint[..];

        // First batch: the snapshot header.
        let header_batch = RecordBatch::decode(&mut remaining).expect("header batch");
        assert!(header_batch.base_offset == 0);
        assert!(header_batch.attributes.is_control_batch());

        // Second batch: the feature-level records.
        let feature_batch = RecordBatch::decode(&mut remaining).expect("data batch");
        assert!(feature_batch.base_offset == 1);
        assert!(!feature_batch.attributes.is_control_batch());
        assert!(feature_batch.records.len() == 3);

        // Third batch: the snapshot footer, after which nothing may remain.
        let footer_batch = RecordBatch::decode(&mut remaining).expect("footer batch");
        assert!(footer_batch.attributes.is_control_batch());
        assert!(remaining.is_empty());
    }
}