Skip to main content

crabka_protocol/records/
header.rs

1//! Record-batch v2 header types: `RecordBatchHeader` (zerocopy),
2//! `Attributes`, `TimestampType`.
3
4use crabka_compression::CompressionType;
5
/// The two values of the timestamp-type flag carried in the batch
/// attributes word.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TimestampType {
    /// Flag bit is clear (0).
    CreateTime,
    /// Flag bit is set (1).
    LogAppendTime,
}
12
/// Batch-level attribute flags, stored as the raw 16-bit word that the
/// wire header carries in big-endian order.
///
/// Bit layout (bit 0 is the least significant):
///
/// | bits | meaning                                                        |
/// |------|----------------------------------------------------------------|
/// | 0-2  | compression type (matches `CompressionType::as_attribute_bits`) |
/// | 3    | timestamp type (0 = `CreateTime`, 1 = `LogAppendTime`)          |
/// | 4    | `is_transactional`                                             |
/// | 5    | `is_control_batch`                                             |
/// | 6    | `has_delete_horizon_ms` (Kafka 2.8+; no dedicated accessor)     |
/// | 7-15 | reserved                                                       |
///
/// The default value is the all-zero word.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct Attributes(pub i16);
24
25impl Attributes {
26    pub const TIMESTAMP_TYPE_BIT: i16 = 1 << 3;
27    pub const TRANSACTIONAL_BIT: i16 = 1 << 4;
28    pub const CONTROL_BIT: i16 = 1 << 5;
29
30    #[must_use]
31    pub fn compression(self) -> CompressionType {
32        // The low 3 bits are the codec id. Wider attribute bits are ignored.
33        #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
34        let byte = (self.0 & 0x07) as u8;
35        CompressionType::from_attribute_bits(byte).unwrap_or(CompressionType::None)
36    }
37
38    #[must_use]
39    pub fn timestamp_type(self) -> TimestampType {
40        if self.0 & Self::TIMESTAMP_TYPE_BIT != 0 {
41            TimestampType::LogAppendTime
42        } else {
43            TimestampType::CreateTime
44        }
45    }
46
47    #[must_use]
48    pub fn is_transactional(self) -> bool {
49        self.0 & Self::TRANSACTIONAL_BIT != 0
50    }
51
52    #[must_use]
53    pub fn is_control_batch(self) -> bool {
54        self.0 & Self::CONTROL_BIT != 0
55    }
56
57    #[must_use]
58    pub fn with_compression(self, c: CompressionType) -> Self {
59        let cleared = self.0 & !0x07;
60        Self(cleared | i16::from(c.as_attribute_bits()))
61    }
62
63    #[must_use]
64    pub fn with_timestamp_type(self, t: TimestampType) -> Self {
65        match t {
66            TimestampType::CreateTime => Self(self.0 & !Self::TIMESTAMP_TYPE_BIT),
67            TimestampType::LogAppendTime => Self(self.0 | Self::TIMESTAMP_TYPE_BIT),
68        }
69    }
70
71    #[must_use]
72    pub fn with_transactional(self, b: bool) -> Self {
73        if b {
74            Self(self.0 | Self::TRANSACTIONAL_BIT)
75        } else {
76            Self(self.0 & !Self::TRANSACTIONAL_BIT)
77        }
78    }
79
80    #[must_use]
81    pub fn with_control(self, b: bool) -> Self {
82        if b {
83            Self(self.0 | Self::CONTROL_BIT)
84        } else {
85            Self(self.0 & !Self::CONTROL_BIT)
86        }
87    }
88}
89
90use std::mem::size_of;
91use zerocopy::byteorder::{I16, I32, I64, U32};
92use zerocopy::{BigEndian, FromBytes, Immutable, KnownLayout, Unaligned};
93
94/// The fixed 61-byte v2 record-batch header, reinterpreted in place from
95/// the wire bytes via `zerocopy`.
96#[derive(Debug, Clone, Copy, FromBytes, KnownLayout, Immutable, Unaligned)]
97#[repr(C)]
98pub struct RecordBatchHeader {
99    pub base_offset: I64<BigEndian>,
100    pub batch_length: I32<BigEndian>,
101    pub partition_leader_epoch: I32<BigEndian>,
102    pub magic: i8,
103    pub crc: U32<BigEndian>,
104    pub attributes: I16<BigEndian>,
105    pub last_offset_delta: I32<BigEndian>,
106    pub base_timestamp: I64<BigEndian>,
107    pub max_timestamp: I64<BigEndian>,
108    pub producer_id: I64<BigEndian>,
109    pub producer_epoch: I16<BigEndian>,
110    pub base_sequence: I32<BigEndian>,
111    pub records_count: I32<BigEndian>,
112}
113
114/// Size of the v2 record-batch header in bytes.
115pub const HEADER_LEN: usize = 61;
116
117// Compile-time assertion that the layout is exactly 61 bytes.
118const _: () = assert!(size_of::<RecordBatchHeader>() == HEADER_LEN);
119
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use crabka_compression::CompressionType;

    /// Generates one `#[test]` named `$name` that wraps the raw word `$bits`
    /// in `Attributes` and checks all four decoded views against the
    /// expected codec, timestamp type, transactional and control flags.
    macro_rules! attr_case {
        ($name:ident, $bits:expr, $codec:expr, $ts:expr, $txn:expr, $ctrl:expr) => {
            #[test]
            fn $name() {
                let a = Attributes($bits);
                assert!(
                    a.compression() == $codec,
                    "compression mismatch in {}",
                    stringify!($name)
                );
                assert!(
                    a.timestamp_type() == $ts,
                    "timestamp_type mismatch in {}",
                    stringify!($name)
                );
                assert!(
                    a.is_transactional() == $txn,
                    "is_transactional mismatch in {}",
                    stringify!($name)
                );
                assert!(
                    a.is_control_batch() == $ctrl,
                    "is_control_batch mismatch in {}",
                    stringify!($name)
                );
            }
        };
    }

    attr_case!(
        zero,
        0,
        CompressionType::None,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        gzip_only,
        0b0000_0000_0000_0001,
        CompressionType::Gzip,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        snappy_only,
        0b0000_0000_0000_0010,
        CompressionType::Snappy,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        lz4_only,
        0b0000_0000_0000_0011,
        CompressionType::Lz4,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        zstd_only,
        0b0000_0000_0000_0100,
        CompressionType::Zstd,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        log_append,
        0b0000_0000_0000_1000,
        CompressionType::None,
        TimestampType::LogAppendTime,
        false,
        false
    );
    attr_case!(
        transactional,
        0b0000_0000_0001_0000,
        CompressionType::None,
        TimestampType::CreateTime,
        true,
        false
    );
    attr_case!(
        control,
        0b0000_0000_0010_0000,
        CompressionType::None,
        TimestampType::CreateTime,
        false,
        true
    );
    attr_case!(
        all_set,
        0b0000_0000_0011_1100,
        CompressionType::Zstd,
        TimestampType::LogAppendTime,
        true,
        true
    );

    #[test]
    fn builder_round_trip() {
        let a = Attributes::default()
            .with_compression(CompressionType::Snappy)
            .with_timestamp_type(TimestampType::LogAppendTime)
            .with_transactional(true)
            .with_control(false);

        assert!(a.compression() == CompressionType::Snappy);
        assert!(a.timestamp_type() == TimestampType::LogAppendTime);
        assert!(a.is_transactional());
        assert!(!a.is_control_batch());
    }

    #[test]
    fn replacing_compression_clears_old_bits() {
        // Starting with Lz4 (bits 0-2 = 011), switching to Gzip (= 001)
        // must clear bit 1, not OR over it.
        let a = Attributes::default().with_compression(CompressionType::Lz4);
        let b = a.with_compression(CompressionType::Gzip);
        assert!(b.compression() == CompressionType::Gzip);
        assert!(b.0 & 0x07 == 1);
    }

    /// Attributes word stored in the sample header: timestamp-type bit and
    /// transactional bit set. Deliberately non-zero so that reading the
    /// field from the wrong offset cannot pass by accident on a zero-filled
    /// buffer.
    const SAMPLE_ATTRIBUTES: i16 = 0b0000_0000_0001_1000;

    /// Build a sample 61-byte header with known values. Reused across the
    /// header table tests below.
    fn sample_header_bytes() -> [u8; HEADER_LEN] {
        let mut buf = [0u8; HEADER_LEN];
        buf[0..8].copy_from_slice(&100i64.to_be_bytes()); // base_offset
        buf[8..12].copy_from_slice(&77i32.to_be_bytes()); // batch_length
        buf[12..16].copy_from_slice(&1i32.to_be_bytes()); // partition_leader_epoch
        buf[16] = 2; // magic
        buf[17..21].copy_from_slice(&0x1234_5678u32.to_be_bytes()); // crc
        buf[21..23].copy_from_slice(&SAMPLE_ATTRIBUTES.to_be_bytes()); // attributes
        buf[23..27].copy_from_slice(&3i32.to_be_bytes()); // last_offset_delta
        buf[27..35].copy_from_slice(&111i64.to_be_bytes()); // base_timestamp
        buf[35..43].copy_from_slice(&222i64.to_be_bytes()); // max_timestamp
        buf[43..51].copy_from_slice(&(-1i64).to_be_bytes()); // producer_id
        buf[51..53].copy_from_slice(&7i16.to_be_bytes()); // producer_epoch
        buf[53..57].copy_from_slice(&(-1i32).to_be_bytes()); // base_sequence
        buf[57..61].copy_from_slice(&4i32.to_be_bytes()); // records_count
        buf
    }

    /// Generates one `#[test]` named `$name` that reinterprets the sample
    /// bytes as a header and checks that `$field.get()` equals `$expected`.
    macro_rules! header_field {
        ($name:ident, $field:ident, $expected:expr) => {
            #[test]
            fn $name() {
                let buf = sample_header_bytes();
                let h = RecordBatchHeader::ref_from_bytes(&buf[..]).expect("header reinterpret");
                assert!(h.$field.get() == $expected);
            }
        };
    }

    header_field!(reads_base_offset, base_offset, 100i64);
    header_field!(reads_batch_length, batch_length, 77i32);
    header_field!(reads_partition_leader_epoch, partition_leader_epoch, 1i32);
    header_field!(reads_crc, crc, 0x1234_5678u32);
    header_field!(reads_attributes, attributes, SAMPLE_ATTRIBUTES);
    header_field!(reads_last_offset_delta, last_offset_delta, 3i32);
    header_field!(reads_base_timestamp, base_timestamp, 111i64);
    header_field!(reads_max_timestamp, max_timestamp, 222i64);
    header_field!(reads_producer_id, producer_id, -1i64);
    header_field!(reads_producer_epoch, producer_epoch, 7i16);
    header_field!(reads_base_sequence, base_sequence, -1i32);
    header_field!(reads_records_count, records_count, 4i32);

    #[test]
    fn reads_magic_directly() {
        // `magic` is a plain `i8`, so it has no `.get()` and cannot go
        // through `header_field!`.
        let buf = sample_header_bytes();
        let h = RecordBatchHeader::ref_from_bytes(&buf[..]).expect("header reinterpret");
        assert!(h.magic == 2);
    }

    #[test]
    fn header_attributes_decode() {
        // End-to-end: the raw header word, wrapped in `Attributes`, yields
        // the flags encoded in `SAMPLE_ATTRIBUTES`.
        let buf = sample_header_bytes();
        let h = RecordBatchHeader::ref_from_bytes(&buf[..]).expect("header reinterpret");
        let attrs = Attributes(h.attributes.get());

        assert!(attrs.compression() == CompressionType::None);
        assert!(attrs.timestamp_type() == TimestampType::LogAppendTime);
        assert!(attrs.is_transactional());
        assert!(!attrs.is_control_batch());
    }

    #[test]
    fn header_is_exactly_61_bytes() {
        assert!(std::mem::size_of::<RecordBatchHeader>() == HEADER_LEN);
    }

    #[test]
    fn too_short_buffer_errors() {
        let buf = [0u8; HEADER_LEN - 1];
        assert!(RecordBatchHeader::ref_from_bytes(&buf[..]).is_err());
    }

    #[test]
    fn too_long_buffer_errors() {
        // `ref_from_bytes` requires an exact-size source; trailing bytes
        // must be rejected rather than silently ignored.
        let buf = [0u8; HEADER_LEN + 1];
        assert!(RecordBatchHeader::ref_from_bytes(&buf[..]).is_err());
    }
}