crabka_protocol/records/metadata/
control.rs1use bytes::{BufMut, Bytes, BytesMut};
7
8use crate::records::{Attributes, Record, RecordBatch};
9
/// Kinds of control record this module can build keys for.
///
/// The enum is `#[repr(i16)]`; each variant's discriminant is the value that
/// `control_record_key` writes as the type field of the key.
#[repr(i16)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ControlRecordType {
    /// Leader-change record, type code 2.
    LeaderChange = 2,
    /// Snapshot-header record, type code 3.
    SnapshotHeader = 3,
    /// Snapshot-footer record, type code 4.
    SnapshotFooter = 4,
}
19
/// Version number written as the first `i16` of every control record key
/// produced by `control_record_key`.
const CONTROL_KEY_VERSION: i16 = 0;
22
23#[must_use]
25pub fn control_record_key(ty: ControlRecordType) -> Bytes {
26 let mut key = BytesMut::with_capacity(4);
27 key.put_i16(CONTROL_KEY_VERSION);
28 key.put_i16(ty as i16);
29 key.freeze()
30}
31
32#[must_use]
39pub fn encode_control_batch(base_offset: i64, key: Bytes, value: Bytes) -> Bytes {
40 let batch = RecordBatch {
41 base_offset,
42 attributes: Attributes::default().with_control(true),
43 records: vec![Record {
44 key: Some(key),
45 value: Some(value),
46 ..Default::default()
47 }],
48 ..Default::default()
49 };
50 let mut out = BytesMut::new();
51 batch
52 .encode(&mut out)
53 .expect("control batch encodes (no compression, in-range)");
54 out.freeze()
55}
56
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use bytes::Buf;

    /// The key starts with the version (0) followed by the record type code.
    #[test]
    fn control_key_is_version_then_type() {
        let key = control_record_key(ControlRecordType::SnapshotHeader);
        let mut reader: &[u8] = &key;

        let version = reader.get_i16();
        assert!(version == 0);

        let type_code = reader.get_i16();
        assert!(type_code == 3);
    }

    /// An encoded control batch has the control flag set in its attributes.
    #[test]
    fn control_batch_sets_control_bit() {
        // In the Kafka v2 batch header the i16 attributes field follows
        // baseOffset(8) + batchLength(4) + partitionLeaderEpoch(4) + magic(1) + crc(4).
        // NOTE(review): assumes `RecordBatch::encode` emits that layout.
        const ATTRIBUTES_OFFSET: usize = 21;
        // Mask tested against the attributes field for the control flag.
        const CONTROL_BIT: i16 = 0x20;

        let key = control_record_key(ControlRecordType::LeaderChange);
        let payload = bytes::Bytes::from_static(b"\x00\x00");
        let encoded = encode_control_batch(0, key, payload);

        let attrs = i16::from_be_bytes([
            encoded[ATTRIBUTES_OFFSET],
            encoded[ATTRIBUTES_OFFSET + 1],
        ]);
        assert!(attrs & CONTROL_BIT != 0);
    }
}