1use bytes::Bytes;
4use zerocopy::FromBytes as _;
5
6use crate::primitives::varint::{get_varint, get_varlong};
7use crate::records::RecordsError;
8use crate::records::crc::{crc32c, crc32c_append};
9use crate::records::header::{Attributes, HEADER_LEN, RecordBatchHeader};
10
/// Number of header bytes included in a batch's `batch_length`.
///
/// `batch_length` counts everything after itself, so the covered part of the
/// 61-byte v2 header excludes the 8-byte `base_offset` and the 4-byte
/// `batch_length` fields. Subtracting this from `batch_length` yields the size
/// of the records section that follows the header.
const HEADER_TAIL_LEN: i32 = 61 - 8 - 4;
17
/// A v2 record batch decoded without copying its input.
///
/// When produced by `DecodeBorrow::decode_borrow`, `header` points straight
/// into the input buffer, and the batch has already passed its magic and CRC
/// checks. Records are not decoded up front; use [`RecordBatch::iter`].
pub struct RecordBatch<'a> {
    /// Fixed-size batch header.
    pub(crate) header: &'a RecordBatchHeader,
    /// Records section: borrowed from the input when uncompressed, owned after decompression.
    pub(crate) body: RecordBody<'a>,
}
22
/// Storage for a batch's records section.
pub(crate) enum RecordBody<'a> {
    /// Slice of the original input (uncompressed batches).
    Borrowed(&'a [u8]),
    /// Decompressed records, or an empty buffer for `RecordBatch::default()`.
    Owned(Bytes),
}
27
/// A single record decoded from a batch body.
///
/// Key, value and header slices borrow from the batch body, so no record bytes
/// are copied.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Record<'a> {
    /// Record-level attributes byte, reinterpreted as signed.
    pub attributes: i8,
    /// Timestamp delta exactly as decoded (a varlong on the wire); it is not
    /// combined with the batch's base timestamp here.
    pub timestamp_delta: i64,
    /// Offset delta exactly as decoded (a varint on the wire).
    pub offset_delta: i32,
    /// Record key; `None` when the encoded length is negative (null).
    pub key: Option<&'a [u8]>,
    /// Record value; `None` when the encoded length is negative (null).
    pub value: Option<&'a [u8]>,
    /// Record headers, in the order they appear in the record.
    pub headers: Vec<RecordHeader<'a>>,
}
37
/// One record header, borrowing its key and value from the batch body.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecordHeader<'a> {
    /// Header key; validated as UTF-8 while parsing.
    pub key: &'a str,
    /// Header value; `None` when the encoded length is negative (null).
    pub value: Option<&'a [u8]>,
}
43
44impl RecordBatch<'_> {
45 #[must_use]
46 pub fn header(&self) -> &RecordBatchHeader {
47 self.header
48 }
49
50 #[must_use]
51 pub fn attributes(&self) -> Attributes {
52 Attributes(self.header.attributes.get())
53 }
54}
55
56impl<'a> Default for RecordBatch<'a> {
57 fn default() -> Self {
61 use zerocopy::FromZeros as _;
62 let header: &'a RecordBatchHeader = Box::leak(Box::new(RecordBatchHeader::new_zeroed()));
64 Self {
65 header,
66 body: RecordBody::Owned(bytes::Bytes::new()),
67 }
68 }
69}
70
71impl<'de> crate::DecodeBorrow<'de> for RecordBatch<'de> {
74 fn decode_borrow(buf: &mut &'de [u8], _version: i16) -> Result<Self, crate::ProtocolError> {
75 decode_borrow_impl(buf).map_err(Into::into)
76 }
77}
78
79fn decode_borrow_impl<'de>(buf: &mut &'de [u8]) -> Result<RecordBatch<'de>, RecordsError> {
80 if buf.len() < HEADER_LEN {
81 return Err(RecordsError::HeaderTooShort {
82 needed: HEADER_LEN - buf.len(),
83 });
84 }
85 let (hdr_slice, rest) = buf.split_at(HEADER_LEN);
87 let hdr: &'de RecordBatchHeader =
88 RecordBatchHeader::ref_from_bytes(hdr_slice).map_err(|_| RecordsError::ZerocopyFailure)?;
89 if hdr.magic != 2 {
90 return Err(RecordsError::UnsupportedMagic { found: hdr.magic });
91 }
92
93 let body_len = i32::checked_sub(hdr.batch_length.get(), HEADER_TAIL_LEN)
94 .and_then(|n| usize::try_from(n).ok())
95 .ok_or_else(|| RecordsError::RecordParse("negative or oversized batch_length".into()))?;
96
97 if rest.len() < body_len {
98 return Err(RecordsError::BodyTooShort {
99 needed: body_len - rest.len(),
100 });
101 }
102 let (raw_body, after) = rest.split_at(body_len);
103 *buf = after;
104
105 let expected = hdr.crc.get();
108 let mut computed = crc32c(&hdr_slice[21..HEADER_LEN]);
109 computed = crc32c_append(computed, raw_body);
110 if computed != expected {
111 return Err(RecordsError::CrcMismatch { expected, computed });
112 }
113
114 let attributes = Attributes(hdr.attributes.get());
115 let codec = attributes.compression();
116 let body = if codec == crabka_compression::CompressionType::None {
117 RecordBody::Borrowed(raw_body)
118 } else {
119 const DECOMPRESS_MIN_CAP: usize = 16 * 1024 * 1024; const DECOMPRESS_MAX_RATIO: usize = 100; const DECOMPRESS_ABSOLUTE_CEILING: usize = 1024 * 1024 * 1024; let max_output = raw_body
126 .len()
127 .saturating_mul(DECOMPRESS_MAX_RATIO)
128 .clamp(DECOMPRESS_MIN_CAP, DECOMPRESS_ABSOLUTE_CEILING);
129 let decompressed = crabka_compression::decompress(codec, raw_body, max_output)?;
130 RecordBody::Owned(decompressed)
131 };
132
133 Ok(RecordBatch { header: hdr, body })
134}
135
136impl RecordBatch<'_> {
139 pub fn iter(&self) -> RecordIter<'_> {
146 let body: &[u8] = match &self.body {
147 RecordBody::Borrowed(s) => s,
148 RecordBody::Owned(b) => b.as_ref(),
149 };
150 #[allow(clippy::cast_sign_loss)] let count = self.header.records_count.get().max(0) as usize;
152 RecordIter {
153 remaining: body,
154 count,
155 index: 0,
156 }
157 }
158}
159
160impl<'a> IntoIterator for &'a RecordBatch<'_> {
161 type Item = Result<Record<'a>, RecordsError>;
162 type IntoIter = RecordIter<'a>;
163
164 fn into_iter(self) -> Self::IntoIter {
165 self.iter()
166 }
167}
168
/// Lazy iterator over the records of a [`RecordBatch`], created by
/// [`RecordBatch::iter`]. Each item is one decoded record or a decode error.
pub struct RecordIter<'a> {
    /// Bytes of the records section that have not been consumed yet.
    remaining: &'a [u8],
    /// Maximum number of items to yield, taken from the header's
    /// `records_count` (negative counts become 0).
    count: usize,
    /// Progress counter; iteration stops once it reaches `count`.
    index: usize,
}
174
175impl<'a> Iterator for RecordIter<'a> {
176 type Item = Result<Record<'a>, RecordsError>;
177
178 fn next(&mut self) -> Option<Self::Item> {
179 if self.index >= self.count {
180 return None;
181 }
182 self.index += 1;
183 Some(parse_one_record(&mut self.remaining))
184 }
185}
186
187fn parse_one_record<'a>(buf: &mut &'a [u8]) -> Result<Record<'a>, RecordsError> {
188 let body_len =
189 get_varlong(buf).map_err(|e| RecordsError::RecordParse(format!("record length: {e}")))?;
190 let body_len = usize::try_from(body_len).map_err(|_| {
191 RecordsError::RecordParse(format!("record length negative or too large: {body_len}"))
192 })?;
193 if buf.len() < body_len {
194 return Err(RecordsError::BodyTooShort {
195 needed: body_len - buf.len(),
196 });
197 }
198 let (body, rest) = buf.split_at(body_len);
199 *buf = rest;
200 let mut body_cur = body;
201 let r = parse_body(&mut body_cur)?;
202 if !body_cur.is_empty() {
203 return Err(RecordsError::RecordParse(format!(
204 "trailing bytes inside record (left={})",
205 body_cur.len()
206 )));
207 }
208 Ok(r)
209}
210
211fn parse_body<'a>(buf: &mut &'a [u8]) -> Result<Record<'a>, RecordsError> {
212 if buf.is_empty() {
213 return Err(RecordsError::RecordParse("record body empty".into()));
214 }
215 #[allow(clippy::cast_possible_wrap)] let attributes = buf[0] as i8;
217 *buf = &buf[1..];
218 let timestamp_delta =
219 get_varlong(buf).map_err(|e| RecordsError::RecordParse(format!("timestamp_delta: {e}")))?;
220 let offset_delta =
221 get_varint(buf).map_err(|e| RecordsError::RecordParse(format!("offset_delta: {e}")))?;
222
223 let key = read_nullable_slice(buf, "key")?;
224 let value = read_nullable_slice(buf, "value")?;
225
226 let header_count =
227 get_varint(buf).map_err(|e| RecordsError::RecordParse(format!("header_count: {e}")))?;
228 if header_count < 0 {
229 return Err(RecordsError::RecordParse(format!(
230 "negative header count {header_count}"
231 )));
232 }
233 #[allow(clippy::cast_sign_loss)] let mut headers = Vec::with_capacity((header_count as usize).min(buf.len()));
238 for i in 0..header_count {
239 let key_len = get_varint(buf)
240 .map_err(|e| RecordsError::RecordParse(format!("header[{i}] key length: {e}")))?;
241 if key_len < 0 {
242 return Err(RecordsError::RecordParse(format!(
243 "header[{i}] negative key length"
244 )));
245 }
246 #[allow(clippy::cast_sign_loss)] let n = key_len as usize;
248 if buf.len() < n {
249 return Err(RecordsError::BodyTooShort {
250 needed: n - buf.len(),
251 });
252 }
253 let (key_bytes, rest) = buf.split_at(n);
254 *buf = rest;
255 let key_str = std::str::from_utf8(key_bytes)
256 .map_err(|e| RecordsError::RecordParse(format!("header[{i}] key utf-8: {e}")))?;
257
258 let value = read_nullable_slice(buf, &format!("header[{i}] value"))?;
259 headers.push(RecordHeader {
260 key: key_str,
261 value,
262 });
263 }
264
265 Ok(Record {
266 attributes,
267 timestamp_delta,
268 offset_delta,
269 key,
270 value,
271 headers,
272 })
273}
274
275fn read_nullable_slice<'a>(
276 buf: &mut &'a [u8],
277 label: &str,
278) -> Result<Option<&'a [u8]>, RecordsError> {
279 let len =
280 get_varint(buf).map_err(|e| RecordsError::RecordParse(format!("{label} length: {e}")))?;
281 if len < 0 {
282 Ok(None)
283 } else {
284 #[allow(clippy::cast_sign_loss)] let n = len as usize;
286 if buf.len() < n {
287 return Err(RecordsError::BodyTooShort {
288 needed: n - buf.len(),
289 });
290 }
291 let (head, rest) = buf.split_at(n);
292 *buf = rest;
293 Ok(Some(head))
294 }
295}
296
297impl RecordBatch<'_> {
300 pub fn to_owned(&self) -> Result<super::owned::RecordBatch, RecordsError> {
303 let mut records = Vec::new();
304 for r in self {
305 let r = r?;
306 records.push(super::owned::Record {
307 attributes: r.attributes,
308 timestamp_delta: r.timestamp_delta,
309 offset_delta: r.offset_delta,
310 key: r.key.map(Bytes::copy_from_slice),
311 value: r.value.map(Bytes::copy_from_slice),
312 headers: r
313 .headers
314 .into_iter()
315 .map(|h| super::owned::RecordHeader {
316 key: h.key.to_string(),
317 value: h.value.map(Bytes::copy_from_slice),
318 })
319 .collect(),
320 });
321 }
322 Ok(super::owned::RecordBatch {
323 base_offset: self.header.base_offset.get(),
324 partition_leader_epoch: self.header.partition_leader_epoch.get(),
325 attributes: self.attributes(),
326 last_offset_delta: self.header.last_offset_delta.get(),
327 base_timestamp: self.header.base_timestamp.get(),
328 max_timestamp: self.header.max_timestamp.get(),
329 producer_id: self.header.producer_id.get(),
330 producer_epoch: self.header.producer_epoch.get(),
331 base_sequence: self.header.base_sequence.get(),
332 records,
333 })
334 }
335}
336
337impl std::fmt::Debug for RecordBatch<'_> {
343 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
344 match self.to_owned() {
345 Ok(o) => o.fmt(f),
346 Err(e) => write!(f, "RecordBatch(<decode error: {e}>)"),
347 }
348 }
349}
350
351impl Clone for RecordBatch<'_> {
352 fn clone(&self) -> Self {
356 RecordBatch {
357 header: self.header,
358 body: match &self.body {
359 RecordBody::Borrowed(s) => RecordBody::Borrowed(s),
360 RecordBody::Owned(b) => RecordBody::Owned(b.clone()),
361 },
362 }
363 }
364}
365
366impl PartialEq for RecordBatch<'_> {
367 fn eq(&self, other: &Self) -> bool {
368 match (self.to_owned(), other.to_owned()) {
369 (Ok(a), Ok(b)) => a == b,
370 _ => false,
371 }
372 }
373}
374
/// Marker impl: equality is defined by this type's `PartialEq` impl, which
/// compares decoded contents.
// NOTE(review): `Eq` requires reflexivity; confirm `PartialEq` returns true
// when a batch is compared with itself even if its records fail to decode.
impl Eq for RecordBatch<'_> {}
376
377impl crate::Encode for RecordBatch<'_> {
380 fn encode<B: bytes::BufMut>(
381 &self,
382 buf: &mut B,
383 version: i16,
384 ) -> Result<(), crate::ProtocolError> {
385 let owned = self.to_owned().map_err(crate::ProtocolError::from)?;
386 crate::Encode::encode(&owned, buf, version)
387 }
388
389 fn encoded_len(&self, version: i16) -> usize {
390 match self.to_owned() {
391 Ok(o) => crate::Encode::encoded_len(&o, version),
392 Err(_) => 0,
393 }
394 }
395}
396
#[cfg(test)]
mod tests {
    use super::*;
    use crate::DecodeBorrow;
    // assert2's `assert!` shadows the std macro for the whole module.
    use assert2::assert;
    use bytes::BytesMut;
    use crabka_compression::CompressionType;

    /// Encodes `b` with the owned batch's `encode` and returns the wire bytes,
    /// ready to be fed to `RecordBatch::decode_borrow`.
    fn encode_owned_then_borrow(b: &super::super::owned::RecordBatch) -> Vec<u8> {
        let mut buf = BytesMut::new();
        b.encode(&mut buf).unwrap();
        buf.to_vec()
    }

    /// Generates a test that round-trips a one-record batch compressed with
    /// `$codec`: owned -> bytes -> borrowed (checking records) -> owned again.
    macro_rules! borrowed_roundtrip {
        ($name:ident, $codec:expr) => {
            #[test]
            fn $name() {
                let mut owned = super::super::owned::RecordBatch::default();
                owned.attributes = owned.attributes.with_compression($codec);
                owned.records.push(super::super::owned::Record {
                    key: Some(Bytes::from_static(b"key")),
                    value: Some(Bytes::from_static(b"value")),
                    ..Default::default()
                });

                let encoded = encode_owned_then_borrow(&owned);
                let mut cur: &[u8] = &encoded[..];
                let borrowed = RecordBatch::decode_borrow(&mut cur, 0).unwrap();
                // The whole batch must be consumed from the cursor.
                assert!(cur.is_empty());
                assert!(borrowed.attributes() == owned.attributes);

                let records: Vec<_> = borrowed.iter().collect::<Result<_, _>>().unwrap();
                assert!(records.len() == 1);
                assert!(records[0].key == Some(b"key".as_slice()));
                assert!(records[0].value == Some(b"value".as_slice()));

                let back_owned = borrowed.to_owned().unwrap();
                assert!(back_owned == owned);
            }
        };
    }

    borrowed_roundtrip!(roundtrip_none, CompressionType::None);
    borrowed_roundtrip!(roundtrip_gzip, CompressionType::Gzip);
    borrowed_roundtrip!(roundtrip_snappy, CompressionType::Snappy);
    borrowed_roundtrip!(roundtrip_lz4, CompressionType::Lz4);
    borrowed_roundtrip!(roundtrip_zstd, CompressionType::Zstd);

    /// For a batch built without setting a codec, decoded record slices must
    /// point into the input buffer (zero-copy) rather than into a copy.
    #[test]
    fn zero_copy_for_uncompressed() {
        let mut owned = super::super::owned::RecordBatch::default();
        owned.records.push(super::super::owned::Record {
            key: Some(Bytes::from_static(b"k")),
            value: Some(Bytes::from_static(b"v")),
            ..Default::default()
        });
        let encoded = encode_owned_then_borrow(&owned);
        let encoded_start = encoded.as_ptr() as usize;
        let encoded_end = encoded_start + encoded.len();

        let mut cur: &[u8] = &encoded[..];
        let borrowed = RecordBatch::decode_borrow(&mut cur, 0).unwrap();
        let records: Vec<_> = borrowed.iter().collect::<Result<_, _>>().unwrap();

        // Address-range check: the value slice must lie inside `encoded`.
        let v_ptr = records[0].value.unwrap().as_ptr() as usize;
        assert!(
            v_ptr >= encoded_start && v_ptr < encoded_end,
            "value slice does not point into the input buffer: \
             input range [{encoded_start:#x}, {encoded_end:#x}), value ptr {v_ptr:#x}",
        );
    }

    /// Encoding a borrowed batch through `crate::Encode` must reproduce the
    /// exact bytes it was decoded from.
    #[test]
    fn borrowed_encode_via_trait_roundtrips() {
        use crate::Encode as _;
        let owned_in = super::super::owned::RecordBatch {
            records: vec![super::super::owned::Record {
                key: Some(Bytes::from_static(b"x")),
                value: Some(Bytes::from_static(b"y")),
                ..Default::default()
            }],
            ..Default::default()
        };
        let bytes_in = encode_owned_then_borrow(&owned_in);
        let mut cur: &[u8] = &bytes_in[..];
        let borrowed = RecordBatch::decode_borrow(&mut cur, 0).unwrap();

        let mut out = BytesMut::new();
        borrowed.encode(&mut out, 0).unwrap();
        assert!(&out[..] == &bytes_in[..]);
    }
}