use bytes::{BufMut, Bytes, BytesMut};
use crate::records::{Attributes, Record, RecordBatch};
/// Kinds of control record this module can build keys for.
///
/// The enum is `#[repr(i16)]`, and [`control_record_key`] casts the variant to
/// `i16` and writes that discriminant as the second field of the key, so the
/// numeric values below are part of the encoded output.
///
/// NOTE(review): the names and numbering (2, 3, 4) look like they mirror
/// Kafka/KRaft control record types, with 0 and 1 left to the abort/commit
/// markers — confirm against the protocol definition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i16)]
pub enum ControlRecordType {
/// Encoded as `2`.
LeaderChange = 2,
/// Encoded as `3`.
SnapshotHeader = 3,
/// Encoded as `4`.
SnapshotFooter = 4,
}
/// Version number written as the first `i16` of every key produced by
/// [`control_record_key`].
const CONTROL_KEY_VERSION: i16 = 0;
/// Builds the key for a control record of kind `ty`.
///
/// The key consists of two `i16` fields written back to back with
/// `put_i16` (big-endian): first `CONTROL_KEY_VERSION`, then the `i16`
/// discriminant of `ty`. The returned buffer is therefore four bytes long.
#[must_use]
pub fn control_record_key(ty: ControlRecordType) -> Bytes {
    // Two i16 fields; size the buffer for exactly that many bytes up front.
    const KEY_LEN: usize = 2 * std::mem::size_of::<i16>();

    let mut buf = BytesMut::with_capacity(KEY_LEN);
    for field in [CONTROL_KEY_VERSION, ty as i16] {
        buf.put_i16(field);
    }
    buf.freeze()
}
/// Encodes a record batch that holds exactly one control record.
///
/// The batch starts at `base_offset`, uses default attributes with the
/// control flag switched on, and carries a single record with the given
/// `key` and `value`. Every other batch and record field keeps its
/// `Default` value.
///
/// # Panics
///
/// Panics if `RecordBatch::encode` returns an error.
#[must_use]
pub fn encode_control_batch(base_offset: i64, key: Bytes, value: Bytes) -> Bytes {
    let attributes = Attributes::default().with_control(true);
    let control_record = Record {
        key: Some(key),
        value: Some(value),
        ..Default::default()
    };
    let batch = RecordBatch {
        base_offset,
        attributes,
        records: vec![control_record],
        ..Default::default()
    };

    let mut encoded = BytesMut::new();
    batch
        .encode(&mut encoded)
        .expect("control batch encodes (no compression, in-range)");
    encoded.freeze()
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use bytes::Buf;

    /// The key decodes as two consecutive `i16`s: the version, then the type.
    #[test]
    fn control_key_is_version_then_type() {
        let key = control_record_key(ControlRecordType::SnapshotHeader);
        let mut reader: &[u8] = &key[..];

        let version = reader.get_i16();
        assert!(version == 0);

        let record_type = reader.get_i16();
        assert!(record_type == 3);
    }

    /// An encoded control batch has the control bit set in its attributes.
    #[test]
    fn control_batch_sets_control_bit() {
        // Position of the big-endian i16 attributes field in the encoded batch.
        // NOTE(review): presumably 8 (base offset) + 4 (batch length)
        // + 4 (partition leader epoch) + 1 (magic) + 4 (CRC) in the Kafka v2
        // record-batch layout — confirm against `RecordBatch::encode`.
        const ATTRIBUTES_OFFSET: usize = 21;
        // Mask tested against the attributes to detect a control batch.
        const CONTROL_BIT: i16 = 0x20;

        let key = control_record_key(ControlRecordType::LeaderChange);
        let value = bytes::Bytes::from_static(b"\x00\x00");
        let batch = encode_control_batch(0, key, value);

        let attrs = i16::from_be_bytes([batch[ATTRIBUTES_OFFSET], batch[ATTRIBUTES_OFFSET + 1]]);
        assert!(attrs & CONTROL_BIT != 0);
    }
}