1use crabka_compression::CompressionType;
5
/// Timestamp interpretation encoded by [`Attributes::TIMESTAMP_TYPE_BIT`].
///
/// [`Attributes::timestamp_type`] yields `CreateTime` when that bit is clear
/// and `LogAppendTime` when it is set.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampType {
    /// The timestamp-type bit is clear.
    CreateTime,
    /// The timestamp-type bit is set.
    LogAppendTime,
}
12
/// Bit-packed attribute word wrapping the raw `i16` value.
///
/// Layout as decoded by the accessors on this type:
/// - bits 0–2: compression codec
/// - bit 3: timestamp type
/// - bit 4: transactional flag
/// - bit 5: control-batch flag
///
/// The `Default` value is `Attributes(0)`: no flag bits set.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct Attributes(pub i16);
24
25impl Attributes {
26 pub const TIMESTAMP_TYPE_BIT: i16 = 1 << 3;
27 pub const TRANSACTIONAL_BIT: i16 = 1 << 4;
28 pub const CONTROL_BIT: i16 = 1 << 5;
29
30 #[must_use]
31 pub fn compression(self) -> CompressionType {
32 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
34 let byte = (self.0 & 0x07) as u8;
35 CompressionType::from_attribute_bits(byte).unwrap_or(CompressionType::None)
36 }
37
38 #[must_use]
39 pub fn timestamp_type(self) -> TimestampType {
40 if self.0 & Self::TIMESTAMP_TYPE_BIT != 0 {
41 TimestampType::LogAppendTime
42 } else {
43 TimestampType::CreateTime
44 }
45 }
46
47 #[must_use]
48 pub fn is_transactional(self) -> bool {
49 self.0 & Self::TRANSACTIONAL_BIT != 0
50 }
51
52 #[must_use]
53 pub fn is_control_batch(self) -> bool {
54 self.0 & Self::CONTROL_BIT != 0
55 }
56
57 #[must_use]
58 pub fn with_compression(self, c: CompressionType) -> Self {
59 let cleared = self.0 & !0x07;
60 Self(cleared | i16::from(c.as_attribute_bits()))
61 }
62
63 #[must_use]
64 pub fn with_timestamp_type(self, t: TimestampType) -> Self {
65 match t {
66 TimestampType::CreateTime => Self(self.0 & !Self::TIMESTAMP_TYPE_BIT),
67 TimestampType::LogAppendTime => Self(self.0 | Self::TIMESTAMP_TYPE_BIT),
68 }
69 }
70
71 #[must_use]
72 pub fn with_transactional(self, b: bool) -> Self {
73 if b {
74 Self(self.0 | Self::TRANSACTIONAL_BIT)
75 } else {
76 Self(self.0 & !Self::TRANSACTIONAL_BIT)
77 }
78 }
79
80 #[must_use]
81 pub fn with_control(self, b: bool) -> Self {
82 if b {
83 Self(self.0 | Self::CONTROL_BIT)
84 } else {
85 Self(self.0 & !Self::CONTROL_BIT)
86 }
87 }
88}
89
90use std::mem::size_of;
91use zerocopy::byteorder::{I16, I32, I64, U32};
92use zerocopy::{BigEndian, FromBytes, Immutable, KnownLayout, Unaligned};
93
/// Fixed-layout view of a record batch header.
///
/// The struct is `#[repr(C)]` and derives `Unaligned`, and every multi-byte
/// field is stored big-endian, so the fields sit back to back with no
/// padding. The byte ranges noted on each field follow from the field sizes
/// in declaration order and add up to 61 bytes.
#[derive(Debug, Clone, Copy, FromBytes, KnownLayout, Immutable, Unaligned)]
#[repr(C)]
pub struct RecordBatchHeader {
    /// Bytes 0..8.
    pub base_offset: I64<BigEndian>,
    /// Bytes 8..12.
    pub batch_length: I32<BigEndian>,
    /// Bytes 12..16.
    pub partition_leader_epoch: I32<BigEndian>,
    /// Byte 16. A plain `i8`, so it is read directly rather than via `.get()`.
    pub magic: i8,
    /// Bytes 17..21.
    pub crc: U32<BigEndian>,
    /// Bytes 21..23. Presumably the raw value wrapped by [`Attributes`] —
    /// confirm against callers.
    pub attributes: I16<BigEndian>,
    /// Bytes 23..27.
    pub last_offset_delta: I32<BigEndian>,
    /// Bytes 27..35.
    pub base_timestamp: I64<BigEndian>,
    /// Bytes 35..43.
    pub max_timestamp: I64<BigEndian>,
    /// Bytes 43..51.
    pub producer_id: I64<BigEndian>,
    /// Bytes 51..53.
    pub producer_epoch: I16<BigEndian>,
    /// Bytes 53..57.
    pub base_sequence: I32<BigEndian>,
    /// Bytes 57..61.
    pub records_count: I32<BigEndian>,
}
113
/// Size in bytes of a [`RecordBatchHeader`].
pub const HEADER_LEN: usize = 61;

// Compile-time guard: the build fails if the struct's size ever stops
// matching `HEADER_LEN` (for example after adding or resizing a field).
const _: () = assert!(size_of::<RecordBatchHeader>() == HEADER_LEN);
119
#[cfg(test)]
mod tests {
    use super::*;
    use crabka_compression::CompressionType;

    /// Generates a `#[test]` named `$name` that decodes `$bits` as
    /// [`Attributes`] and checks all four accessors against the expected
    /// codec, timestamp type, transactional flag and control flag.
    macro_rules! attr_case {
        ($name:ident, $bits:expr, $codec:expr, $ts:expr, $txn:expr, $ctrl:expr) => {
            #[test]
            fn $name() {
                let a = Attributes($bits);
                assert_eq!(
                    a.compression(),
                    $codec,
                    "compression mismatch in {}",
                    stringify!($name)
                );
                assert_eq!(
                    a.timestamp_type(),
                    $ts,
                    "timestamp_type mismatch in {}",
                    stringify!($name)
                );
                assert_eq!(
                    a.is_transactional(),
                    $txn,
                    "is_transactional mismatch in {}",
                    stringify!($name)
                );
                assert_eq!(
                    a.is_control_batch(),
                    $ctrl,
                    "is_control_batch mismatch in {}",
                    stringify!($name)
                );
            }
        };
    }

    attr_case!(
        zero,
        0,
        CompressionType::None,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        gzip_only,
        0b0000_0000_0000_0001,
        CompressionType::Gzip,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        snappy_only,
        0b0000_0000_0000_0010,
        CompressionType::Snappy,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        lz4_only,
        0b0000_0000_0000_0011,
        CompressionType::Lz4,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        zstd_only,
        0b0000_0000_0000_0100,
        CompressionType::Zstd,
        TimestampType::CreateTime,
        false,
        false
    );
    attr_case!(
        log_append,
        0b0000_0000_0000_1000,
        CompressionType::None,
        TimestampType::LogAppendTime,
        false,
        false
    );
    attr_case!(
        transactional,
        0b0000_0000_0001_0000,
        CompressionType::None,
        TimestampType::CreateTime,
        true,
        false
    );
    attr_case!(
        control,
        0b0000_0000_0010_0000,
        CompressionType::None,
        TimestampType::CreateTime,
        false,
        true
    );
    attr_case!(
        all_set,
        0b0000_0000_0011_1100,
        CompressionType::Zstd,
        TimestampType::LogAppendTime,
        true,
        true
    );

    #[test]
    fn builder_round_trip() {
        let a = Attributes::default()
            .with_compression(CompressionType::Snappy)
            .with_timestamp_type(TimestampType::LogAppendTime)
            .with_transactional(true)
            .with_control(false);

        assert_eq!(a.compression(), CompressionType::Snappy);
        assert_eq!(a.timestamp_type(), TimestampType::LogAppendTime);
        assert!(a.is_transactional());
        assert!(!a.is_control_batch());
    }

    #[test]
    fn replacing_compression_clears_old_bits() {
        let a = Attributes::default().with_compression(CompressionType::Lz4);
        let b = a.with_compression(CompressionType::Gzip);
        assert_eq!(b.compression(), CompressionType::Gzip);
        assert_eq!(b.0 & 0x07, 1);
    }

    /// Setting the codec last must leave the three flag bits untouched.
    #[test]
    fn with_compression_preserves_flag_bits() {
        let a = Attributes::default()
            .with_timestamp_type(TimestampType::LogAppendTime)
            .with_transactional(true)
            .with_control(true)
            .with_compression(CompressionType::Gzip);

        assert_eq!(a.compression(), CompressionType::Gzip);
        assert_eq!(a.timestamp_type(), TimestampType::LogAppendTime);
        assert!(a.is_transactional());
        assert!(a.is_control_batch());
    }

    /// Every flag setter must be able to clear a bit that is already set.
    #[test]
    fn flags_can_be_cleared() {
        let a = Attributes(0b0000_0000_0011_1000)
            .with_timestamp_type(TimestampType::CreateTime)
            .with_transactional(false)
            .with_control(false);
        assert_eq!(a, Attributes(0));
    }

    /// Attribute bits stored in the sample header. Deliberately non-zero:
    /// the buffer starts zero-filled, so a zero here could not reveal a
    /// misplaced field.
    const SAMPLE_ATTRIBUTES: i16 = 0x0018;

    /// Builds a header buffer with a distinct, known value in every field.
    fn sample_header_bytes() -> [u8; HEADER_LEN] {
        let mut buf = [0u8; HEADER_LEN];
        buf[0..8].copy_from_slice(&100i64.to_be_bytes()); // base_offset
        buf[8..12].copy_from_slice(&77i32.to_be_bytes()); // batch_length
        buf[12..16].copy_from_slice(&1i32.to_be_bytes()); // partition_leader_epoch
        buf[16] = 2; // magic
        buf[17..21].copy_from_slice(&0x1234_5678u32.to_be_bytes()); // crc
        buf[21..23].copy_from_slice(&SAMPLE_ATTRIBUTES.to_be_bytes()); // attributes
        buf[23..27].copy_from_slice(&3i32.to_be_bytes()); // last_offset_delta
        buf[27..35].copy_from_slice(&111i64.to_be_bytes()); // base_timestamp
        buf[35..43].copy_from_slice(&222i64.to_be_bytes()); // max_timestamp
        buf[43..51].copy_from_slice(&(-1i64).to_be_bytes()); // producer_id
        buf[51..53].copy_from_slice(&7i16.to_be_bytes()); // producer_epoch
        buf[53..57].copy_from_slice(&(-1i32).to_be_bytes()); // base_sequence
        buf[57..61].copy_from_slice(&4i32.to_be_bytes()); // records_count
        buf
    }

    /// Generates a `#[test]` named `$name` that reinterprets the sample
    /// buffer as a [`RecordBatchHeader`] and checks that `$field` decodes to
    /// `$expected`.
    macro_rules! header_field {
        ($name:ident, $field:ident, $expected:expr) => {
            #[test]
            fn $name() {
                let buf = sample_header_bytes();
                let h = RecordBatchHeader::ref_from_bytes(&buf[..]).expect("header reinterpret");
                assert_eq!(h.$field.get(), $expected);
            }
        };
    }

    header_field!(reads_base_offset, base_offset, 100i64);
    header_field!(reads_batch_length, batch_length, 77i32);
    header_field!(reads_partition_leader_epoch, partition_leader_epoch, 1i32);
    header_field!(reads_crc, crc, 0x1234_5678u32);
    header_field!(reads_attributes, attributes, SAMPLE_ATTRIBUTES);
    header_field!(reads_last_offset_delta, last_offset_delta, 3i32);
    header_field!(reads_base_timestamp, base_timestamp, 111i64);
    header_field!(reads_max_timestamp, max_timestamp, 222i64);
    header_field!(reads_producer_id, producer_id, -1i64);
    header_field!(reads_producer_epoch, producer_epoch, 7i16);
    header_field!(reads_base_sequence, base_sequence, -1i32);
    header_field!(reads_records_count, records_count, 4i32);

    // `magic` is a plain `i8` with no `.get()`, so it cannot use `header_field!`.
    #[test]
    fn reads_magic_directly() {
        let buf = sample_header_bytes();
        let h = RecordBatchHeader::ref_from_bytes(&buf[..]).unwrap();
        assert_eq!(h.magic, 2);
    }

    #[test]
    fn header_is_exactly_61_bytes() {
        assert_eq!(std::mem::size_of::<RecordBatchHeader>(), HEADER_LEN);
    }

    #[test]
    fn too_short_buffer_errors() {
        let buf = [0u8; HEADER_LEN - 1];
        assert!(RecordBatchHeader::ref_from_bytes(&buf[..]).is_err());
    }
}