1use bytes::Bytes;
4use zerocopy::FromBytes as _;
5
6use crate::primitives::varint::{get_varint, get_varlong};
7use crate::records::RecordsError;
8use crate::records::crc::{crc32c, crc32c_append};
9use crate::records::header::{Attributes, HEADER_LEN, RecordBatchHeader};
10
/// Number of header bytes that are counted by the header's `batch_length` field.
///
/// The decoder subtracts this from `batch_length` to obtain the length of the
/// record body that follows the fixed-size header.
///
/// The per-field widths below follow the Kafka v2 record batch layout —
/// TODO confirm they match the fields of `RecordBatchHeader`.
const HEADER_TAIL_LEN: i32 = 4 // partition_leader_epoch
    + 1 // magic
    + 4 // crc
    + 2 // attributes
    + 4 // last_offset_delta
    + 8 // base_timestamp
    + 8 // max_timestamp
    + 8 // producer_id
    + 2 // producer_epoch
    + 4 // base_sequence
    + 4; // records_count
17
/// A record batch whose header (and, when possible, body) borrows from the
/// input buffer it was decoded from.
pub struct RecordBatch<'a> {
    /// Fixed-size batch header, viewed in place over the input bytes.
    pub(crate) header: &'a RecordBatchHeader,
    /// Encoded records: either a slice of the input or an owned buffer.
    pub(crate) body: RecordBody<'a>,
}
22
/// Storage for the encoded records of a batch.
pub(crate) enum RecordBody<'a> {
    /// Records are read directly out of the input buffer (used when the batch
    /// is not compressed).
    Borrowed(&'a [u8]),
    /// Records live in an owned buffer (used for the decompressed body of a
    /// compressed batch, and for the empty default batch).
    Owned(Bytes),
}
27
/// One record parsed out of a batch body; all byte slices borrow from that body.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Record<'a> {
    /// Raw per-record attributes byte, reinterpreted as a signed value.
    pub attributes: i8,
    /// Timestamp delta as encoded in the record (a variable-length long).
    pub timestamp_delta: i64,
    /// Offset delta as encoded in the record (a variable-length int).
    pub offset_delta: i32,
    /// Record key; `None` when the encoded key length is negative.
    pub key: Option<&'a [u8]>,
    /// Record value; `None` when the encoded value length is negative.
    pub value: Option<&'a [u8]>,
    /// Record headers, in encoded order.
    pub headers: Vec<RecordHeader<'a>>,
}
37
/// A single key/value header attached to a [`Record`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecordHeader<'a> {
    /// Header key; validated as UTF-8 while parsing.
    pub key: &'a str,
    /// Header value; `None` when the encoded value length is negative.
    pub value: Option<&'a [u8]>,
}
43
44impl RecordBatch<'_> {
45 #[must_use]
46 pub fn header(&self) -> &RecordBatchHeader {
47 self.header
48 }
49
50 #[must_use]
51 pub fn attributes(&self) -> Attributes {
52 Attributes(self.header.attributes.get())
53 }
54}
55
56impl<'a> Default for RecordBatch<'a> {
57 fn default() -> Self {
61 use zerocopy::FromZeros as _;
62 let header: &'a RecordBatchHeader = Box::leak(Box::new(RecordBatchHeader::new_zeroed()));
64 Self {
65 header,
66 body: RecordBody::Owned(bytes::Bytes::new()),
67 }
68 }
69}
70
71impl<'de> crate::DecodeBorrow<'de> for RecordBatch<'de> {
74 fn decode_borrow(buf: &mut &'de [u8], _version: i16) -> Result<Self, crate::ProtocolError> {
75 decode_borrow_impl(buf).map_err(Into::into)
76 }
77}
78
79fn decode_borrow_impl<'de>(buf: &mut &'de [u8]) -> Result<RecordBatch<'de>, RecordsError> {
80 if buf.len() < HEADER_LEN {
81 return Err(RecordsError::HeaderTooShort {
82 needed: HEADER_LEN - buf.len(),
83 });
84 }
85 let (hdr_slice, rest) = buf.split_at(HEADER_LEN);
87 let hdr: &'de RecordBatchHeader =
88 RecordBatchHeader::ref_from_bytes(hdr_slice).map_err(|_| RecordsError::ZerocopyFailure)?;
89 if hdr.magic != 2 {
90 return Err(RecordsError::UnsupportedMagic { found: hdr.magic });
91 }
92
93 let body_len = i32::checked_sub(hdr.batch_length.get(), HEADER_TAIL_LEN)
94 .and_then(|n| usize::try_from(n).ok())
95 .ok_or_else(|| RecordsError::RecordParse("negative or oversized batch_length".into()))?;
96
97 if rest.len() < body_len {
98 return Err(RecordsError::BodyTooShort {
99 needed: body_len - rest.len(),
100 });
101 }
102 let (raw_body, after) = rest.split_at(body_len);
103 *buf = after;
104
105 let expected = hdr.crc.get();
108 let mut computed = crc32c(&hdr_slice[21..HEADER_LEN]);
109 computed = crc32c_append(computed, raw_body);
110 if computed != expected {
111 return Err(RecordsError::CrcMismatch { expected, computed });
112 }
113
114 let attributes = Attributes(hdr.attributes.get());
115 let codec = attributes.compression();
116 let body = if codec == crabka_compression::CompressionType::None {
117 RecordBody::Borrowed(raw_body)
118 } else {
119 let decompressed = crabka_compression::decompress(codec, raw_body)?;
120 RecordBody::Owned(decompressed)
121 };
122
123 Ok(RecordBatch { header: hdr, body })
124}
125
126impl RecordBatch<'_> {
129 pub fn iter(&self) -> RecordIter<'_> {
136 let body: &[u8] = match &self.body {
137 RecordBody::Borrowed(s) => s,
138 RecordBody::Owned(b) => b.as_ref(),
139 };
140 #[allow(clippy::cast_sign_loss)] let count = self.header.records_count.get().max(0) as usize;
142 RecordIter {
143 remaining: body,
144 count,
145 index: 0,
146 }
147 }
148}
149
150impl<'a> IntoIterator for &'a RecordBatch<'_> {
151 type Item = Result<Record<'a>, RecordsError>;
152 type IntoIter = RecordIter<'a>;
153
154 fn into_iter(self) -> Self::IntoIter {
155 self.iter()
156 }
157}
158
/// Lazy iterator over the records of a [`RecordBatch`].
pub struct RecordIter<'a> {
    /// Bytes of the batch body that have not been consumed yet.
    remaining: &'a [u8],
    /// Total number of items this iterator will yield.
    count: usize,
    /// Number of items yielded so far.
    index: usize,
}
164
165impl<'a> Iterator for RecordIter<'a> {
166 type Item = Result<Record<'a>, RecordsError>;
167
168 fn next(&mut self) -> Option<Self::Item> {
169 if self.index >= self.count {
170 return None;
171 }
172 self.index += 1;
173 Some(parse_one_record(&mut self.remaining))
174 }
175}
176
177fn parse_one_record<'a>(buf: &mut &'a [u8]) -> Result<Record<'a>, RecordsError> {
178 let body_len =
179 get_varlong(buf).map_err(|e| RecordsError::RecordParse(format!("record length: {e}")))?;
180 let body_len = usize::try_from(body_len).map_err(|_| {
181 RecordsError::RecordParse(format!("record length negative or too large: {body_len}"))
182 })?;
183 if buf.len() < body_len {
184 return Err(RecordsError::BodyTooShort {
185 needed: body_len - buf.len(),
186 });
187 }
188 let (body, rest) = buf.split_at(body_len);
189 *buf = rest;
190 let mut body_cur = body;
191 let r = parse_body(&mut body_cur)?;
192 if !body_cur.is_empty() {
193 return Err(RecordsError::RecordParse(format!(
194 "trailing bytes inside record (left={})",
195 body_cur.len()
196 )));
197 }
198 Ok(r)
199}
200
201fn parse_body<'a>(buf: &mut &'a [u8]) -> Result<Record<'a>, RecordsError> {
202 if buf.is_empty() {
203 return Err(RecordsError::RecordParse("record body empty".into()));
204 }
205 #[allow(clippy::cast_possible_wrap)] let attributes = buf[0] as i8;
207 *buf = &buf[1..];
208 let timestamp_delta =
209 get_varlong(buf).map_err(|e| RecordsError::RecordParse(format!("timestamp_delta: {e}")))?;
210 let offset_delta =
211 get_varint(buf).map_err(|e| RecordsError::RecordParse(format!("offset_delta: {e}")))?;
212
213 let key = read_nullable_slice(buf, "key")?;
214 let value = read_nullable_slice(buf, "value")?;
215
216 let header_count =
217 get_varint(buf).map_err(|e| RecordsError::RecordParse(format!("header_count: {e}")))?;
218 if header_count < 0 {
219 return Err(RecordsError::RecordParse(format!(
220 "negative header count {header_count}"
221 )));
222 }
223 #[allow(clippy::cast_sign_loss)] let mut headers = Vec::with_capacity(header_count as usize);
225 for i in 0..header_count {
226 let key_len = get_varint(buf)
227 .map_err(|e| RecordsError::RecordParse(format!("header[{i}] key length: {e}")))?;
228 if key_len < 0 {
229 return Err(RecordsError::RecordParse(format!(
230 "header[{i}] negative key length"
231 )));
232 }
233 #[allow(clippy::cast_sign_loss)] let n = key_len as usize;
235 if buf.len() < n {
236 return Err(RecordsError::BodyTooShort {
237 needed: n - buf.len(),
238 });
239 }
240 let (key_bytes, rest) = buf.split_at(n);
241 *buf = rest;
242 let key_str = std::str::from_utf8(key_bytes)
243 .map_err(|e| RecordsError::RecordParse(format!("header[{i}] key utf-8: {e}")))?;
244
245 let value = read_nullable_slice(buf, &format!("header[{i}] value"))?;
246 headers.push(RecordHeader {
247 key: key_str,
248 value,
249 });
250 }
251
252 Ok(Record {
253 attributes,
254 timestamp_delta,
255 offset_delta,
256 key,
257 value,
258 headers,
259 })
260}
261
262fn read_nullable_slice<'a>(
263 buf: &mut &'a [u8],
264 label: &str,
265) -> Result<Option<&'a [u8]>, RecordsError> {
266 let len =
267 get_varint(buf).map_err(|e| RecordsError::RecordParse(format!("{label} length: {e}")))?;
268 if len < 0 {
269 Ok(None)
270 } else {
271 #[allow(clippy::cast_sign_loss)] let n = len as usize;
273 if buf.len() < n {
274 return Err(RecordsError::BodyTooShort {
275 needed: n - buf.len(),
276 });
277 }
278 let (head, rest) = buf.split_at(n);
279 *buf = rest;
280 Ok(Some(head))
281 }
282}
283
284impl RecordBatch<'_> {
287 pub fn to_owned(&self) -> Result<super::owned::RecordBatch, RecordsError> {
290 let mut records = Vec::new();
291 for r in self {
292 let r = r?;
293 records.push(super::owned::Record {
294 attributes: r.attributes,
295 timestamp_delta: r.timestamp_delta,
296 offset_delta: r.offset_delta,
297 key: r.key.map(Bytes::copy_from_slice),
298 value: r.value.map(Bytes::copy_from_slice),
299 headers: r
300 .headers
301 .into_iter()
302 .map(|h| super::owned::RecordHeader {
303 key: h.key.to_string(),
304 value: h.value.map(Bytes::copy_from_slice),
305 })
306 .collect(),
307 });
308 }
309 Ok(super::owned::RecordBatch {
310 base_offset: self.header.base_offset.get(),
311 partition_leader_epoch: self.header.partition_leader_epoch.get(),
312 attributes: self.attributes(),
313 last_offset_delta: self.header.last_offset_delta.get(),
314 base_timestamp: self.header.base_timestamp.get(),
315 max_timestamp: self.header.max_timestamp.get(),
316 producer_id: self.header.producer_id.get(),
317 producer_epoch: self.header.producer_epoch.get(),
318 base_sequence: self.header.base_sequence.get(),
319 records,
320 })
321 }
322}
323
324impl std::fmt::Debug for RecordBatch<'_> {
330 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
331 match self.to_owned() {
332 Ok(o) => o.fmt(f),
333 Err(e) => write!(f, "RecordBatch(<decode error: {e}>)"),
334 }
335 }
336}
337
338impl Clone for RecordBatch<'_> {
339 fn clone(&self) -> Self {
343 RecordBatch {
344 header: self.header,
345 body: match &self.body {
346 RecordBody::Borrowed(s) => RecordBody::Borrowed(s),
347 RecordBody::Owned(b) => RecordBody::Owned(b.clone()),
348 },
349 }
350 }
351}
352
353impl PartialEq for RecordBatch<'_> {
354 fn eq(&self, other: &Self) -> bool {
355 match (self.to_owned(), other.to_owned()) {
356 (Ok(a), Ok(b)) => a == b,
357 _ => false,
358 }
359 }
360}
361
// Marker impl on top of `PartialEq`.
// NOTE(review): `eq` returns `false` whenever a side fails to decode, so an
// undecodable batch is not equal to itself — confirm this is acceptable for `Eq`.
impl Eq for RecordBatch<'_> {}
363
364impl crate::Encode for RecordBatch<'_> {
367 fn encode<B: bytes::BufMut>(
368 &self,
369 buf: &mut B,
370 version: i16,
371 ) -> Result<(), crate::ProtocolError> {
372 let owned = self.to_owned().map_err(crate::ProtocolError::from)?;
373 crate::Encode::encode(&owned, buf, version)
374 }
375
376 fn encoded_len(&self, version: i16) -> usize {
377 match self.to_owned() {
378 Ok(o) => crate::Encode::encoded_len(&o, version),
379 Err(_) => 0,
380 }
381 }
382}
383
#[cfg(test)]
mod tests {
    use super::super::owned;
    use super::*;
    use crate::DecodeBorrow;
    use assert2::assert;
    use bytes::BytesMut;
    use crabka_compression::CompressionType;

    /// Encodes an owned batch and returns the wire bytes, ready to be decoded
    /// through the borrowed path.
    fn encode_owned_then_borrow(b: &owned::RecordBatch) -> Vec<u8> {
        let mut wire = BytesMut::new();
        b.encode(&mut wire).unwrap();
        wire.to_vec()
    }

    /// Appends one record with the given key and value to `batch`.
    fn push_record(batch: &mut owned::RecordBatch, key: &'static [u8], value: &'static [u8]) {
        batch.records.push(owned::Record {
            key: Some(Bytes::from_static(key)),
            value: Some(Bytes::from_static(value)),
            ..Default::default()
        });
    }

    /// Encodes a one-record batch with `codec`, decodes it through the borrowed
    /// path, and checks that nothing was lost on the way.
    fn assert_roundtrip(codec: CompressionType) {
        let mut original = owned::RecordBatch::default();
        original.attributes = original.attributes.with_compression(codec);
        push_record(&mut original, b"key", b"value");

        let encoded = encode_owned_then_borrow(&original);
        let mut cur: &[u8] = &encoded[..];
        let borrowed = RecordBatch::decode_borrow(&mut cur, 0).unwrap();
        // The decoder must consume exactly one batch.
        assert!(cur.is_empty());
        assert!(borrowed.attributes() == original.attributes);

        let records: Vec<_> = borrowed.iter().collect::<Result<_, _>>().unwrap();
        assert!(records.len() == 1);
        assert!(records[0].key == Some(b"key".as_slice()));
        assert!(records[0].value == Some(b"value".as_slice()));

        let back_owned = borrowed.to_owned().unwrap();
        assert!(back_owned == original);
    }

    #[test]
    fn roundtrip_none() {
        assert_roundtrip(CompressionType::None);
    }

    #[test]
    fn roundtrip_gzip() {
        assert_roundtrip(CompressionType::Gzip);
    }

    #[test]
    fn roundtrip_snappy() {
        assert_roundtrip(CompressionType::Snappy);
    }

    #[test]
    fn roundtrip_lz4() {
        assert_roundtrip(CompressionType::Lz4);
    }

    #[test]
    fn roundtrip_zstd() {
        assert_roundtrip(CompressionType::Zstd);
    }

    /// For an uncompressed batch, record values must point into the input
    /// buffer rather than into a copy.
    #[test]
    fn zero_copy_for_uncompressed() {
        let mut original = owned::RecordBatch::default();
        push_record(&mut original, b"k", b"v");

        let encoded = encode_owned_then_borrow(&original);
        let encoded_start = encoded.as_ptr() as usize;
        let encoded_end = encoded_start + encoded.len();

        let mut cur: &[u8] = &encoded[..];
        let borrowed = RecordBatch::decode_borrow(&mut cur, 0).unwrap();
        let records: Vec<_> = borrowed.iter().collect::<Result<_, _>>().unwrap();

        let v_ptr = records[0].value.unwrap().as_ptr() as usize;
        assert!(
            (encoded_start..encoded_end).contains(&v_ptr),
            "value slice does not point into the input buffer: \
             input range [{encoded_start:#x}, {encoded_end:#x}), value ptr {v_ptr:#x}",
        );
    }

    /// Encoding a borrowed batch through the `Encode` trait must reproduce the
    /// bytes it was decoded from.
    #[test]
    fn borrowed_encode_via_trait_roundtrips() {
        use crate::Encode as _;

        let owned_in = owned::RecordBatch {
            records: vec![owned::Record {
                key: Some(Bytes::from_static(b"x")),
                value: Some(Bytes::from_static(b"y")),
                ..Default::default()
            }],
            ..Default::default()
        };
        let bytes_in = encode_owned_then_borrow(&owned_in);
        let mut cur: &[u8] = &bytes_in[..];
        let borrowed = RecordBatch::decode_borrow(&mut cur, 0).unwrap();

        let mut out = BytesMut::new();
        borrowed.encode(&mut out, 0).unwrap();
        assert!(&out[..] == &bytes_in[..]);
    }
}