1use crabka_compression::CompressionType;
5
/// Which clock a batch's timestamps come from.
///
/// The value is carried in a single attribute bit
/// ([`Attributes::TIMESTAMP_TYPE_BIT`]): clear means [`TimestampType::CreateTime`],
/// set means [`TimestampType::LogAppendTime`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TimestampType {
    /// Reported when the timestamp-type bit is clear.
    CreateTime,
    /// Reported when the timestamp-type bit is set.
    LogAppendTime,
}
12
/// Raw 16-bit attribute word of a record batch.
///
/// The wrapped `i16` is public so the value can be taken from, or written back to,
/// the wire unchanged. The default value is `0`: every flag clear and compression
/// bits zero.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct Attributes(pub i16);
24
25impl Attributes {
26 pub const TIMESTAMP_TYPE_BIT: i16 = 1 << 3;
27 pub const TRANSACTIONAL_BIT: i16 = 1 << 4;
28 pub const CONTROL_BIT: i16 = 1 << 5;
29
30 #[must_use]
31 pub fn compression(self) -> CompressionType {
32 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
34 let byte = (self.0 & 0x07) as u8;
35 CompressionType::from_attribute_bits(byte).unwrap_or(CompressionType::None)
36 }
37
38 #[must_use]
39 pub fn timestamp_type(self) -> TimestampType {
40 if self.0 & Self::TIMESTAMP_TYPE_BIT != 0 {
41 TimestampType::LogAppendTime
42 } else {
43 TimestampType::CreateTime
44 }
45 }
46
47 #[must_use]
48 pub fn is_transactional(self) -> bool {
49 self.0 & Self::TRANSACTIONAL_BIT != 0
50 }
51
52 #[must_use]
53 pub fn is_control_batch(self) -> bool {
54 self.0 & Self::CONTROL_BIT != 0
55 }
56
57 #[must_use]
58 pub fn with_compression(self, c: CompressionType) -> Self {
59 let cleared = self.0 & !0x07;
60 Self(cleared | i16::from(c.as_attribute_bits()))
61 }
62
63 #[must_use]
64 pub fn with_timestamp_type(self, t: TimestampType) -> Self {
65 match t {
66 TimestampType::CreateTime => Self(self.0 & !Self::TIMESTAMP_TYPE_BIT),
67 TimestampType::LogAppendTime => Self(self.0 | Self::TIMESTAMP_TYPE_BIT),
68 }
69 }
70
71 #[must_use]
72 pub fn with_transactional(self, b: bool) -> Self {
73 if b {
74 Self(self.0 | Self::TRANSACTIONAL_BIT)
75 } else {
76 Self(self.0 & !Self::TRANSACTIONAL_BIT)
77 }
78 }
79
80 #[must_use]
81 pub fn with_control(self, b: bool) -> Self {
82 if b {
83 Self(self.0 | Self::CONTROL_BIT)
84 } else {
85 Self(self.0 & !Self::CONTROL_BIT)
86 }
87 }
88}
89
90use std::mem::size_of;
91use zerocopy::byteorder::{I16, I32, I64, U32};
92use zerocopy::{BigEndian, FromBytes, Immutable, KnownLayout, Unaligned};
93
94#[derive(Debug, Clone, Copy, FromBytes, KnownLayout, Immutable, Unaligned)]
97#[repr(C)]
98pub struct RecordBatchHeader {
99 pub base_offset: I64<BigEndian>,
100 pub batch_length: I32<BigEndian>,
101 pub partition_leader_epoch: I32<BigEndian>,
102 pub magic: i8,
103 pub crc: U32<BigEndian>,
104 pub attributes: I16<BigEndian>,
105 pub last_offset_delta: I32<BigEndian>,
106 pub base_timestamp: I64<BigEndian>,
107 pub max_timestamp: I64<BigEndian>,
108 pub producer_id: I64<BigEndian>,
109 pub producer_epoch: I16<BigEndian>,
110 pub base_sequence: I32<BigEndian>,
111 pub records_count: I32<BigEndian>,
112}
113
114pub const HEADER_LEN: usize = 61;
116
117const _: () = assert!(size_of::<RecordBatchHeader>() == HEADER_LEN);
119
#[cfg(test)]
mod tests {
    // Unit tests for the attribute bit-field accessors/builders and for the
    // zero-copy view of `RecordBatchHeader` over a raw byte buffer.

    use super::*;
    use assert2::assert;
    use crabka_compression::CompressionType;

    // Expands to one `#[test]` that wraps `$bits` in `Attributes` and checks all
    // four accessors, naming the failing case in each assertion message.
    macro_rules! attr_case {
        ($name:ident, $bits:expr, $codec:expr, $ts:expr, $txn:expr, $ctrl:expr) => {
            #[test]
            fn $name() {
                let a = Attributes($bits);
                assert!(
                    a.compression() == $codec,
                    "compression mismatch in {}",
                    stringify!($name)
                );
                assert!(
                    a.timestamp_type() == $ts,
                    "timestamp_type mismatch in {}",
                    stringify!($name)
                );
                assert!(
                    a.is_transactional() == $txn,
                    "is_transactional mismatch in {}",
                    stringify!($name)
                );
                assert!(
                    a.is_control_batch() == $ctrl,
                    "is_control_batch mismatch in {}",
                    stringify!($name)
                );
            }
        };
    }

    attr_case!(
        zero,
        0,
        CompressionType::None,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        gzip_only,
        0b0000_0000_0000_0001,
        CompressionType::Gzip,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        snappy_only,
        0b0000_0000_0000_0010,
        CompressionType::Snappy,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        lz4_only,
        0b0000_0000_0000_0011,
        CompressionType::Lz4,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        zstd_only,
        0b0000_0000_0000_0100,
        CompressionType::Zstd,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        log_append,
        0b0000_0000_0000_1000,
        CompressionType::None,
        TimestampType::LogAppendTime,
        false,
        false
    );
    attr_case!(
        transactional,
        0b0000_0000_0001_0000,
        CompressionType::None,
        TimestampType::CreateTime,
        true,
        false
    );
    attr_case!(
        control,
        0b0000_0000_0010_0000,
        CompressionType::None,
        TimestampType::CreateTime,
        false,
        true
    );
    attr_case!(
        all_set,
        0b0000_0000_0011_1100,
        CompressionType::Zstd,
        TimestampType::LogAppendTime,
        true,
        true
    );

    #[test]
    fn builder_round_trip() {
        let a = Attributes::default()
            .with_compression(CompressionType::Snappy)
            .with_timestamp_type(TimestampType::LogAppendTime)
            .with_transactional(true)
            .with_control(false);

        assert!(a.compression() == CompressionType::Snappy);
        assert!(a.timestamp_type() == TimestampType::LogAppendTime);
        assert!(a.is_transactional());
        assert!(!a.is_control_batch());
    }

    #[test]
    fn replacing_compression_clears_old_bits() {
        // Lz4 (0b011) and Gzip (0b001) overlap, so stale bits would show up here.
        let a = Attributes::default().with_compression(CompressionType::Lz4);
        let b = a.with_compression(CompressionType::Gzip);
        assert!(b.compression() == CompressionType::Gzip);
        assert!(b.0 & 0x07 == 1);
    }

    // Non-zero on purpose: a zero value could not reveal a misplaced `attributes`
    // field, because the rest of a fresh buffer is zero as well.
    const SAMPLE_ATTRIBUTES: i16 = 0x0012;

    // Builds a header image with a distinct, recognisable value in every field,
    // written big-endian at the offset the field occupies in `RecordBatchHeader`.
    fn sample_header_bytes() -> [u8; HEADER_LEN] {
        let mut buf = [0u8; HEADER_LEN];
        buf[0..8].copy_from_slice(&100i64.to_be_bytes()); // base_offset
        buf[8..12].copy_from_slice(&77i32.to_be_bytes()); // batch_length
        buf[12..16].copy_from_slice(&1i32.to_be_bytes()); // partition_leader_epoch
        buf[16] = 2; // magic
        buf[17..21].copy_from_slice(&0x1234_5678u32.to_be_bytes()); // crc
        buf[21..23].copy_from_slice(&SAMPLE_ATTRIBUTES.to_be_bytes()); // attributes
        buf[23..27].copy_from_slice(&3i32.to_be_bytes()); // last_offset_delta
        buf[27..35].copy_from_slice(&111i64.to_be_bytes()); // base_timestamp
        buf[35..43].copy_from_slice(&222i64.to_be_bytes()); // max_timestamp
        buf[43..51].copy_from_slice(&(-1i64).to_be_bytes()); // producer_id
        buf[51..53].copy_from_slice(&7i16.to_be_bytes()); // producer_epoch
        buf[53..57].copy_from_slice(&(-1i32).to_be_bytes()); // base_sequence
        buf[57..61].copy_from_slice(&4i32.to_be_bytes()); // records_count
        buf
    }

    // Expands to one `#[test]` that views the sample bytes as a header and checks
    // that `$field` decodes to `$expected`.
    macro_rules! header_field {
        ($name:ident, $field:ident, $expected:expr) => {
            #[test]
            fn $name() {
                let buf = sample_header_bytes();
                let h = RecordBatchHeader::ref_from_bytes(&buf[..]).expect("header reinterpret");
                assert!(h.$field.get() == $expected);
            }
        };
    }

    header_field!(reads_base_offset, base_offset, 100i64);
    header_field!(reads_batch_length, batch_length, 77i32);
    header_field!(reads_partition_leader_epoch, partition_leader_epoch, 1i32);
    header_field!(reads_crc, crc, 0x1234_5678u32);
    header_field!(reads_attributes, attributes, SAMPLE_ATTRIBUTES);
    header_field!(reads_last_offset_delta, last_offset_delta, 3i32);
    header_field!(reads_base_timestamp, base_timestamp, 111i64);
    header_field!(reads_max_timestamp, max_timestamp, 222i64);
    header_field!(reads_producer_id, producer_id, -1i64);
    header_field!(reads_producer_epoch, producer_epoch, 7i16);
    header_field!(reads_base_sequence, base_sequence, -1i32);
    header_field!(reads_records_count, records_count, 4i32);

    // `magic` is a plain `i8` with no `.get()`, so it cannot go through `header_field!`.
    #[test]
    fn reads_magic_directly() {
        let buf = sample_header_bytes();
        let h = RecordBatchHeader::ref_from_bytes(&buf[..]).expect("header reinterpret");
        assert!(h.magic == 2);
    }

    #[test]
    fn header_is_exactly_61_bytes() {
        assert!(std::mem::size_of::<RecordBatchHeader>() == HEADER_LEN);
    }

    #[test]
    fn too_short_buffer_errors() {
        let buf = [0u8; HEADER_LEN - 1];
        assert!(RecordBatchHeader::ref_from_bytes(&buf[..]).is_err());
    }
}