Skip to main content

crabka_protocol/records/metadata/
control.rs

1//! `KRaft` control-record framing. Control records (`LeaderChange`,
2//! `SnapshotHeader`, `SnapshotFooter`) live in a batch with the control bit set;
3//! the record key is `version (i16) + type (i16)` and the value is the message
4//! body.
5
6use bytes::{BufMut, Bytes, BytesMut};
7
8use crate::records::{Attributes, Record, RecordBatch};
9
/// Kinds of `KRaft` control record.
///
/// The discriminant of each variant is the `i16` that follows the `i16`
/// version in a control record key.
#[repr(i16)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ControlRecordType {
    /// Leader-change record; written as type `2`.
    LeaderChange = 2,
    /// Snapshot-header record; written as type `3`.
    SnapshotHeader = 3,
    /// Snapshot-footer record; written as type `4`.
    SnapshotFooter = 4,
}
19
/// Version number written as the first `i16` of every key produced by
/// `control_record_key` (Kafka writes 0).
const CONTROL_KEY_VERSION: i16 = 0;
22
23/// Build a control record key: `version(i16) + type(i16)`.
24#[must_use]
25pub fn control_record_key(ty: ControlRecordType) -> Bytes {
26    let mut key = BytesMut::with_capacity(4);
27    key.put_i16(CONTROL_KEY_VERSION);
28    key.put_i16(ty as i16);
29    key.freeze()
30}
31
32/// Encode a single-record control batch at `base_offset` with the control bit
33/// set, returning the full v2 `RecordBatch` bytes (CRC computed by the encoder).
34///
35/// # Panics
36/// Panics only if the underlying record-batch encoder fails, which cannot happen
37/// for an uncompressed in-range single-record batch.
38#[must_use]
39pub fn encode_control_batch(base_offset: i64, key: Bytes, value: Bytes) -> Bytes {
40    let batch = RecordBatch {
41        base_offset,
42        attributes: Attributes::default().with_control(true),
43        records: vec![Record {
44            key: Some(key),
45            value: Some(value),
46            ..Default::default()
47        }],
48        ..Default::default()
49    };
50    let mut out = BytesMut::new();
51    batch
52        .encode(&mut out)
53        .expect("control batch encodes (no compression, in-range)");
54    out.freeze()
55}
56
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use bytes::Buf;

    /// Where the attributes `i16` sits in an encoded batch (the magic byte is
    /// at offset 16, the attributes occupy bytes 21..23).
    const ATTRIBUTES_RANGE: std::ops::Range<usize> = 21..23;
    /// Mask of the control flag inside the attributes field.
    const CONTROL_BIT: i16 = 0x20;

    /// The key must start with the version and be followed by the record type.
    #[test]
    fn control_key_is_version_then_type() {
        let mut key = control_record_key(ControlRecordType::SnapshotHeader);

        let version = key.get_i16();
        assert!(version == 0);

        let record_type = key.get_i16();
        assert!(record_type == 3); // SnapshotHeader
    }

    /// An encoded control batch must have the control flag set in its
    /// attributes field.
    #[test]
    fn control_batch_sets_control_bit() {
        let key = control_record_key(ControlRecordType::LeaderChange);
        let value = bytes::Bytes::from_static(b"\x00\x00");
        let encoded = encode_control_batch(0, key, value);

        let mut attrs_field: &[u8] = &encoded[ATTRIBUTES_RANGE];
        let attrs = attrs_field.get_i16();
        assert!(attrs & CONTROL_BIT != 0);
    }
}