Skip to main content

crabka_protocol/records/
header.rs

1//! Record-batch v2 header types: `RecordBatchHeader` (zerocopy),
2//! `Attributes`, `TimestampType`.
3
4use crabka_compression::CompressionType;
5
/// Value of the timestamp-type bit in the batch attributes word.
///
/// The discriminants mirror the value of that bit on the wire.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TimestampType {
    /// The timestamp-type bit is clear (`0`).
    CreateTime = 0,
    /// The timestamp-type bit is set (`1`).
    LogAppendTime = 1,
}
12
/// Batch-level attribute flags, kept as the raw 16-bit word that the wire
/// header stores in big-endian order.
///
/// | bits | meaning                                                              |
/// |------|----------------------------------------------------------------------|
/// | 0-2  | compression type (matches `CompressionType::as_attribute_bits`)      |
/// | 3    | timestamp type (0 = `CreateTime`, 1 = `LogAppendTime`)               |
/// | 4    | `is_transactional`                                                   |
/// | 5    | `is_control_batch`                                                   |
/// | 6    | `has_delete_horizon_ms` (Kafka 2.8+; not surfaced separately here)   |
/// | 7-15 | reserved                                                             |
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct Attributes(
    /// The raw attribute word, already converted to native byte order.
    pub i16,
);
24
25impl Attributes {
26    pub const TIMESTAMP_TYPE_BIT: i16 = 1 << 3;
27    pub const TRANSACTIONAL_BIT: i16 = 1 << 4;
28    pub const CONTROL_BIT: i16 = 1 << 5;
29
30    #[must_use]
31    pub fn compression(self) -> CompressionType {
32        // The low 3 bits are the codec id. Wider attribute bits are ignored.
33        #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
34        let byte = (self.0 & 0x07) as u8;
35        CompressionType::from_attribute_bits(byte).unwrap_or(CompressionType::None)
36    }
37
38    #[must_use]
39    pub fn timestamp_type(self) -> TimestampType {
40        if self.0 & Self::TIMESTAMP_TYPE_BIT != 0 {
41            TimestampType::LogAppendTime
42        } else {
43            TimestampType::CreateTime
44        }
45    }
46
47    #[must_use]
48    pub fn is_transactional(self) -> bool {
49        self.0 & Self::TRANSACTIONAL_BIT != 0
50    }
51
52    #[must_use]
53    pub fn is_control_batch(self) -> bool {
54        self.0 & Self::CONTROL_BIT != 0
55    }
56
57    #[must_use]
58    pub fn with_compression(self, c: CompressionType) -> Self {
59        let cleared = self.0 & !0x07;
60        Self(cleared | i16::from(c.as_attribute_bits()))
61    }
62
63    #[must_use]
64    pub fn with_timestamp_type(self, t: TimestampType) -> Self {
65        match t {
66            TimestampType::CreateTime => Self(self.0 & !Self::TIMESTAMP_TYPE_BIT),
67            TimestampType::LogAppendTime => Self(self.0 | Self::TIMESTAMP_TYPE_BIT),
68        }
69    }
70
71    #[must_use]
72    pub fn with_transactional(self, b: bool) -> Self {
73        if b {
74            Self(self.0 | Self::TRANSACTIONAL_BIT)
75        } else {
76            Self(self.0 & !Self::TRANSACTIONAL_BIT)
77        }
78    }
79
80    #[must_use]
81    pub fn with_control(self, b: bool) -> Self {
82        if b {
83            Self(self.0 | Self::CONTROL_BIT)
84        } else {
85            Self(self.0 & !Self::CONTROL_BIT)
86        }
87    }
88}
89
90use std::mem::size_of;
91use zerocopy::byteorder::{I16, I32, I64, U32};
92use zerocopy::{BigEndian, FromBytes, Immutable, KnownLayout, Unaligned};
93
94/// The fixed 61-byte v2 record-batch header, reinterpreted in place from
95/// the wire bytes via `zerocopy`.
96#[derive(Debug, Clone, Copy, FromBytes, KnownLayout, Immutable, Unaligned)]
97#[repr(C)]
98pub struct RecordBatchHeader {
99    pub base_offset: I64<BigEndian>,
100    pub batch_length: I32<BigEndian>,
101    pub partition_leader_epoch: I32<BigEndian>,
102    pub magic: i8,
103    pub crc: U32<BigEndian>,
104    pub attributes: I16<BigEndian>,
105    pub last_offset_delta: I32<BigEndian>,
106    pub base_timestamp: I64<BigEndian>,
107    pub max_timestamp: I64<BigEndian>,
108    pub producer_id: I64<BigEndian>,
109    pub producer_epoch: I16<BigEndian>,
110    pub base_sequence: I32<BigEndian>,
111    pub records_count: I32<BigEndian>,
112}
113
114/// Size of the v2 record-batch header in bytes.
115pub const HEADER_LEN: usize = 61;
116
117// Compile-time assertion that the layout is exactly 61 bytes.
118const _: () = assert!(size_of::<RecordBatchHeader>() == HEADER_LEN);
119
#[cfg(test)]
mod tests {
    use super::*;
    use crabka_compression::CompressionType;

    /// Raw attribute word written into the sample header: codec id 3 (Lz4)
    /// with the transactional bit set. It is non-zero so that reading the
    /// field is distinguishable from reading the zero-initialised buffer.
    const SAMPLE_ATTRIBUTES: i16 = 0b0000_0000_0001_0011;

    /// Generates one `#[test]` that decodes a raw attribute word and checks
    /// all four accessors against the expected values.
    macro_rules! attr_case {
        ($name:ident, $bits:expr, $codec:expr, $ts:expr, $txn:expr, $ctrl:expr) => {
            #[test]
            fn $name() {
                let a = Attributes($bits);
                assert_eq!(
                    a.compression(),
                    $codec,
                    "compression mismatch in {}",
                    stringify!($name)
                );
                assert_eq!(
                    a.timestamp_type(),
                    $ts,
                    "timestamp_type mismatch in {}",
                    stringify!($name)
                );
                assert_eq!(
                    a.is_transactional(),
                    $txn,
                    "is_transactional mismatch in {}",
                    stringify!($name)
                );
                assert_eq!(
                    a.is_control_batch(),
                    $ctrl,
                    "is_control_batch mismatch in {}",
                    stringify!($name)
                );
            }
        };
    }

    attr_case!(
        zero,
        0,
        CompressionType::None,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        gzip_only,
        0b0000_0000_0000_0001,
        CompressionType::Gzip,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        snappy_only,
        0b0000_0000_0000_0010,
        CompressionType::Snappy,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        lz4_only,
        0b0000_0000_0000_0011,
        CompressionType::Lz4,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        zstd_only,
        0b0000_0000_0000_0100,
        CompressionType::Zstd,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        log_append,
        0b0000_0000_0000_1000,
        CompressionType::None,
        TimestampType::LogAppendTime,
        false,
        false
    );
    attr_case!(
        transactional,
        0b0000_0000_0001_0000,
        CompressionType::None,
        TimestampType::CreateTime,
        true,
        false
    );
    attr_case!(
        control,
        0b0000_0000_0010_0000,
        CompressionType::None,
        TimestampType::CreateTime,
        false,
        true
    );
    attr_case!(
        all_set,
        0b0000_0000_0011_1100,
        CompressionType::Zstd,
        TimestampType::LogAppendTime,
        true,
        true
    );

    #[test]
    fn builder_round_trip() {
        let a = Attributes::default()
            .with_compression(CompressionType::Snappy)
            .with_timestamp_type(TimestampType::LogAppendTime)
            .with_transactional(true)
            .with_control(false);

        assert_eq!(a.compression(), CompressionType::Snappy);
        assert_eq!(a.timestamp_type(), TimestampType::LogAppendTime);
        assert!(a.is_transactional());
        assert!(!a.is_control_batch());
    }

    #[test]
    fn replacing_compression_clears_old_bits() {
        // Starting with Lz4 (bits 0-2 = 011), switching to Gzip (= 001)
        // must clear bit 1, not OR over it.
        let a = Attributes::default().with_compression(CompressionType::Lz4);
        let b = a.with_compression(CompressionType::Gzip);
        assert_eq!(b.compression(), CompressionType::Gzip);
        assert_eq!(b.0 & 0x07, 1);
    }

    #[test]
    fn setting_flags_preserves_other_bits() {
        // Start with only the codec bits populated and turn every flag on.
        let a = Attributes(0b0000_0000_0000_0100)
            .with_timestamp_type(TimestampType::LogAppendTime)
            .with_transactional(true)
            .with_control(true);
        assert_eq!(a.0, 0b0000_0000_0011_1100);
    }

    #[test]
    fn clearing_flags_preserves_other_bits() {
        // Start with every flag on and turn them all off again; the codec
        // bits must survive.
        let a = Attributes(0b0000_0000_0011_1100)
            .with_timestamp_type(TimestampType::CreateTime)
            .with_transactional(false)
            .with_control(false);
        assert_eq!(a.0, 0b0000_0000_0000_0100);
    }

    #[test]
    fn builders_leave_upper_bits_untouched() {
        // Bits 6-14 are not managed by any builder and must pass through.
        const UPPER: i16 = 0b0111_1111_1100_0000;
        let a = Attributes(UPPER)
            .with_compression(CompressionType::Gzip)
            .with_transactional(true)
            .with_transactional(false)
            .with_control(true)
            .with_control(false);
        assert_eq!(a.0, UPPER | 1);
    }

    /// Build a sample 61-byte header with known values. Reused across the
    /// header table tests below.
    fn sample_header_bytes() -> [u8; HEADER_LEN] {
        let mut buf = [0u8; HEADER_LEN];
        buf[0..8].copy_from_slice(&100i64.to_be_bytes()); // base_offset
        buf[8..12].copy_from_slice(&77i32.to_be_bytes()); // batch_length
        buf[12..16].copy_from_slice(&1i32.to_be_bytes()); // partition_leader_epoch
        buf[16] = 2; // magic
        buf[17..21].copy_from_slice(&0x1234_5678u32.to_be_bytes()); // crc
        buf[21..23].copy_from_slice(&SAMPLE_ATTRIBUTES.to_be_bytes()); // attributes
        buf[23..27].copy_from_slice(&3i32.to_be_bytes()); // last_offset_delta
        buf[27..35].copy_from_slice(&111i64.to_be_bytes()); // base_timestamp
        buf[35..43].copy_from_slice(&222i64.to_be_bytes()); // max_timestamp
        buf[43..51].copy_from_slice(&(-1i64).to_be_bytes()); // producer_id
        buf[51..53].copy_from_slice(&7i16.to_be_bytes()); // producer_epoch
        buf[53..57].copy_from_slice(&(-1i32).to_be_bytes()); // base_sequence
        buf[57..61].copy_from_slice(&4i32.to_be_bytes()); // records_count
        buf
    }

    /// Generates one `#[test]` that reinterprets the sample bytes as a
    /// header and checks a single big-endian field via `.get()`.
    macro_rules! header_field {
        ($name:ident, $field:ident, $expected:expr) => {
            #[test]
            fn $name() {
                let buf = sample_header_bytes();
                let h = RecordBatchHeader::ref_from_bytes(&buf[..]).expect("header reinterpret");
                assert_eq!(h.$field.get(), $expected);
            }
        };
    }

    header_field!(reads_base_offset, base_offset, 100i64);
    header_field!(reads_batch_length, batch_length, 77i32);
    header_field!(reads_partition_leader_epoch, partition_leader_epoch, 1i32);
    header_field!(reads_crc, crc, 0x1234_5678u32);
    header_field!(reads_attributes, attributes, SAMPLE_ATTRIBUTES);
    header_field!(reads_last_offset_delta, last_offset_delta, 3i32);
    header_field!(reads_base_timestamp, base_timestamp, 111i64);
    header_field!(reads_max_timestamp, max_timestamp, 222i64);
    header_field!(reads_producer_id, producer_id, -1i64);
    header_field!(reads_producer_epoch, producer_epoch, 7i16);
    header_field!(reads_base_sequence, base_sequence, -1i32);
    header_field!(reads_records_count, records_count, 4i32);

    #[test]
    fn reads_magic_directly() {
        // `magic` is a plain `i8`, so it has no `.get()` and cannot go
        // through the `header_field!` macro.
        let buf = sample_header_bytes();
        let h = RecordBatchHeader::ref_from_bytes(&buf[..]).expect("header reinterpret");
        assert_eq!(h.magic, 2);
    }

    #[test]
    fn header_attributes_decode() {
        // End-to-end: raw wire word -> `Attributes` -> typed accessors.
        let buf = sample_header_bytes();
        let h = RecordBatchHeader::ref_from_bytes(&buf[..]).expect("header reinterpret");
        let attrs = Attributes(h.attributes.get());
        assert_eq!(attrs.compression(), CompressionType::Lz4);
        assert_eq!(attrs.timestamp_type(), TimestampType::CreateTime);
        assert!(attrs.is_transactional());
        assert!(!attrs.is_control_batch());
    }

    #[test]
    fn header_is_exactly_61_bytes() {
        assert_eq!(std::mem::size_of::<RecordBatchHeader>(), HEADER_LEN);
    }

    #[test]
    fn too_short_buffer_errors() {
        let buf = [0u8; HEADER_LEN - 1];
        assert!(RecordBatchHeader::ref_from_bytes(&buf[..]).is_err());
    }

    #[test]
    fn too_long_buffer_errors() {
        // `ref_from_bytes` needs an exact-size slice; callers holding a
        // whole batch must slice out the first `HEADER_LEN` bytes first.
        let buf = [0u8; HEADER_LEN + 1];
        assert!(RecordBatchHeader::ref_from_bytes(&buf[..]).is_err());
    }
}
315}