1use bytes::{Buf, BufMut, Bytes};
20
21use crate::records::RecordsError;
22use crate::records::borrowed::RecordBatch as RecordBatchBorrowed;
23use crate::records::owned::RecordBatch;
24
/// Record data carried by a message, held either as decoded batches or as
/// opaque bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RecordsPayload {
    /// Fully decoded record batches. [`RecordsPayload::from_bytes`] yields this
    /// variant when the byte at offset 16 of the input equals `2`.
    V2(Vec<RecordBatch>),
    /// Opaque bytes that are written out verbatim on encode. Nothing in this
    /// module's decode path produces this variant; callers construct it
    /// directly.
    Raw(Bytes),
    /// Bytes that did not pass the v2 sniff test in
    /// [`RecordsPayload::from_bytes`]. They are kept undecoded and written out
    /// verbatim on encode.
    Legacy(Bytes),
}
37
38impl RecordsPayload {
39 pub fn from_bytes(bytes: Bytes) -> Result<Self, RecordsError> {
42 if looks_like_v2(&bytes) {
43 let mut cur: &[u8] = &bytes;
44 let mut batches = Vec::new();
45 while !cur.is_empty() {
46 batches.push(RecordBatch::decode(&mut cur)?);
47 }
48 Ok(Self::V2(batches))
49 } else {
50 Ok(Self::Legacy(bytes))
51 }
52 }
53
54 #[must_use]
56 pub fn payload_len(&self) -> usize {
57 match self {
58 Self::V2(batches) => batches.iter().map(RecordBatch::encoded_len).sum(),
59 Self::Raw(b) | Self::Legacy(b) => b.len(),
60 }
61 }
62
63 pub fn encode_to<B: BufMut>(&self, buf: &mut B) -> Result<(), RecordsError> {
65 match self {
66 Self::V2(batches) => {
67 for b in batches {
68 b.encode(buf)?;
69 }
70 Ok(())
71 }
72 Self::Raw(b) | Self::Legacy(b) => {
73 buf.put_slice(b);
74 Ok(())
75 }
76 }
77 }
78
79 #[must_use]
82 pub fn as_v2(&self) -> Option<&[RecordBatch]> {
83 match self {
84 Self::V2(batches) => Some(batches),
85 Self::Raw(_) | Self::Legacy(_) => None,
86 }
87 }
88
89 #[must_use]
91 pub fn as_legacy(&self) -> Option<&Bytes> {
92 match self {
93 Self::Legacy(b) => Some(b),
94 Self::V2(_) | Self::Raw(_) => None,
95 }
96 }
97}
98
99impl From<RecordBatch> for RecordsPayload {
100 fn from(rb: RecordBatch) -> Self {
101 Self::V2(vec![rb])
102 }
103}
104
105impl From<Vec<RecordBatch>> for RecordsPayload {
106 fn from(v: Vec<RecordBatch>) -> Self {
107 Self::V2(v)
108 }
109}
110
111impl Default for RecordsPayload {
112 fn default() -> Self {
113 Self::V2(Vec::new())
114 }
115}
116
117impl crate::Encode for RecordsPayload {
118 fn encode<B: BufMut>(&self, buf: &mut B, _version: i16) -> Result<(), crate::ProtocolError> {
119 self.encode_to(buf).map_err(Into::into)
120 }
121
122 fn encoded_len(&self, _version: i16) -> usize {
123 self.payload_len()
124 }
125}
126
127impl crate::Decode<'_> for RecordsPayload {
128 fn decode<B: Buf>(buf: &mut B, _version: i16) -> Result<Self, crate::ProtocolError> {
129 let bytes = buf.copy_to_bytes(buf.remaining());
133 Self::from_bytes(bytes).map_err(Into::into)
134 }
135}
136
/// Borrowed counterpart of [`RecordsPayload`]: its data refers to an input
/// slice with lifetime `'a`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RecordsPayloadBorrowed<'a> {
    /// Decoded batches borrowing from the input. Produced by
    /// [`RecordsPayloadBorrowed::from_slice`] when the byte at offset 16 of the
    /// input equals `2`.
    V2(Vec<RecordBatchBorrowed<'a>>),
    /// Input that did not pass the v2 sniff test, kept as the original slice.
    Legacy(&'a [u8]),
}
143
144impl<'a> RecordsPayloadBorrowed<'a> {
145 pub fn from_slice(bytes: &'a [u8]) -> Result<Self, RecordsError> {
146 if looks_like_v2(bytes) {
147 let mut cur: &'a [u8] = bytes;
148 let mut batches = Vec::new();
149 while !cur.is_empty() {
150 let rb = <RecordBatchBorrowed<'a> as crate::DecodeBorrow<'a>>::decode_borrow(
151 &mut cur, 0,
152 )
153 .map_err(|e| RecordsError::RecordParse(format!("borrowed v2 decode: {e}")))?;
154 batches.push(rb);
155 }
156 Ok(Self::V2(batches))
157 } else {
158 Ok(Self::Legacy(bytes))
159 }
160 }
161
162 #[must_use]
163 pub fn payload_len(&self) -> usize {
164 match self {
165 Self::V2(batches) => batches
166 .iter()
167 .map(|rb| crate::Encode::encoded_len(rb, 0))
168 .sum(),
169 Self::Legacy(b) => b.len(),
170 }
171 }
172
173 pub fn encode_to<B: BufMut>(&self, buf: &mut B) -> Result<(), RecordsError> {
174 match self {
175 Self::V2(batches) => {
176 for rb in batches {
177 crate::Encode::encode(rb, buf, 0).map_err(|e| {
178 RecordsError::RecordParse(format!("borrowed v2 encode: {e}"))
179 })?;
180 }
181 Ok(())
182 }
183 Self::Legacy(b) => {
184 buf.put_slice(b);
185 Ok(())
186 }
187 }
188 }
189
190 pub fn to_owned(&self) -> Result<RecordsPayload, RecordsError> {
192 match self {
193 Self::V2(batches) => {
194 let mut owned = Vec::with_capacity(batches.len());
195 for rb in batches {
196 owned.push(rb.to_owned()?);
197 }
198 Ok(RecordsPayload::V2(owned))
199 }
200 Self::Legacy(b) => Ok(RecordsPayload::Legacy(Bytes::copy_from_slice(b))),
201 }
202 }
203}
204
205impl Default for RecordsPayloadBorrowed<'_> {
206 fn default() -> Self {
207 Self::V2(Vec::new())
208 }
209}
210
211impl crate::Encode for RecordsPayloadBorrowed<'_> {
212 fn encode<B: BufMut>(&self, buf: &mut B, _version: i16) -> Result<(), crate::ProtocolError> {
213 self.encode_to(buf).map_err(Into::into)
214 }
215
216 fn encoded_len(&self, _version: i16) -> usize {
217 self.payload_len()
218 }
219}
220
221impl<'de> crate::DecodeBorrow<'de> for RecordsPayloadBorrowed<'de> {
222 fn decode_borrow(buf: &mut &'de [u8], _version: i16) -> Result<Self, crate::ProtocolError> {
223 let bytes = std::mem::take(buf);
226 Self::from_slice(bytes).map_err(Into::into)
227 }
228}
229
/// Reports whether `bytes` carries the value `2` at byte offset 16.
///
/// Buffers of 16 bytes or fewer have no byte at that offset and are rejected.
// NOTE(review): offset 16 presumably is the magic byte of a Kafka record
// batch header — confirm against the wire format spec.
#[inline]
fn looks_like_v2(bytes: &[u8]) -> bool {
    const MAGIC_OFFSET: usize = 16;
    const V2_MAGIC: u8 = 2;
    // `get` returns `None` for short buffers, so no separate length check.
    bytes.get(MAGIC_OFFSET).copied() == Some(V2_MAGIC)
}
247
#[cfg(test)]
mod tests {
    use super::*;
    use crate::records::{Record, RecordBatch};
    use bytes::BytesMut;

    /// A v2 batch at base offset 42 holding one record with key `k` and
    /// value `v`.
    fn sample_v2() -> RecordBatch {
        let record = Record {
            key: Some(Bytes::from_static(b"k")),
            value: Some(Bytes::from_static(b"v")),
            ..Default::default()
        };
        RecordBatch {
            base_offset: 42,
            records: vec![record],
            ..RecordBatch::default()
        }
    }

    /// Encodes a single batch and returns its frozen wire bytes.
    fn wire_of(batch: &RecordBatch) -> Bytes {
        let mut wire = BytesMut::new();
        batch.encode(&mut wire).unwrap();
        wire.freeze()
    }

    /// A 24-byte buffer whose byte at offset 16 is `1`, followed by letters,
    /// so it does not sniff as v2.
    fn legacy_bytes() -> Bytes {
        let mut raw = vec![0u8; 24];
        raw[16] = 1;
        for (idx, letter) in (b'a'..=b'h').enumerate() {
            raw[17 + idx % 7] = letter;
        }
        Bytes::from(raw)
    }

    #[test]
    fn from_bytes_dispatches_v2() {
        let expected = sample_v2();
        let parsed = RecordsPayload::from_bytes(wire_of(&expected)).unwrap();
        match parsed {
            RecordsPayload::Raw(_) | RecordsPayload::Legacy(_) => panic!("expected V2"),
            RecordsPayload::V2(batches) => assert_eq!(batches, vec![expected]),
        }
    }

    #[test]
    fn from_bytes_parses_all_batches() {
        let mut first = sample_v2();
        first.base_offset = 0;
        let mut second = sample_v2();
        second.base_offset = 1;

        // Two batches written back to back into one buffer.
        let mut wire = BytesMut::new();
        first.encode(&mut wire).unwrap();
        second.encode(&mut wire).unwrap();

        let parsed = RecordsPayload::from_bytes(wire.freeze()).unwrap();
        let batches = parsed.as_v2().expect("v2");
        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].base_offset, 0);
        assert_eq!(batches[1].base_offset, 1);
    }

    #[test]
    fn raw_passthrough_roundtrips() {
        let mut batch = sample_v2();
        batch.base_offset = 7;
        let wire = wire_of(&batch);

        let payload = RecordsPayload::Raw(wire.clone());
        assert_eq!(payload.payload_len(), wire.len());

        let mut out = BytesMut::new();
        payload.encode_to(&mut out).unwrap();
        assert_eq!(&out[..], &wire[..]);
        assert!(payload.as_v2().is_none());
    }

    #[test]
    fn from_bytes_dispatches_legacy() {
        let mut raw = vec![0u8; 17];
        raw[16] = 1;
        let parsed = RecordsPayload::from_bytes(Bytes::from(raw.clone())).unwrap();
        match parsed {
            RecordsPayload::Raw(_) | RecordsPayload::V2(_) => panic!("expected Legacy"),
            RecordsPayload::Legacy(stored) => assert_eq!(&stored[..], &raw[..]),
        }
    }

    #[test]
    fn roundtrip_v2() {
        let original: RecordsPayload = sample_v2().into();
        let mut wire = BytesMut::new();
        original.encode_to(&mut wire).unwrap();

        let decoded = RecordsPayload::from_bytes(wire.freeze()).unwrap();
        assert_eq!(original, decoded);
        assert_eq!(original.payload_len(), decoded.payload_len());
    }

    #[test]
    fn encode_decode_via_traits() {
        let original: RecordsPayload = sample_v2().into();
        let mut wire = BytesMut::new();
        <RecordsPayload as crate::Encode>::encode(&original, &mut wire, 0).unwrap();

        let mut cursor: &[u8] = &wire;
        let decoded = <RecordsPayload as crate::Decode>::decode(&mut cursor, 0).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn borrowed_dispatches() {
        let frozen = wire_of(&sample_v2());
        let borrowed = RecordsPayloadBorrowed::from_slice(&frozen).unwrap();
        assert!(matches!(borrowed, RecordsPayloadBorrowed::V2(_)));

        let owned = borrowed.to_owned().unwrap();
        match owned {
            RecordsPayload::Raw(_) | RecordsPayload::Legacy(_) => panic!("expected V2"),
            RecordsPayload::V2(batches) => assert_eq!(batches[0].base_offset, 42),
        }
    }

    #[test]
    fn from_record_batch() {
        let batch = sample_v2();
        let payload: RecordsPayload = batch.clone().into();
        assert_eq!(payload.as_v2(), Some(&[batch][..]));
        assert!(payload.as_legacy().is_none());
    }

    #[test]
    fn legacy_payload_len_and_encode_owned() {
        let bytes = legacy_bytes();
        let payload = RecordsPayload::from_bytes(bytes.clone()).unwrap();
        assert_eq!(payload.payload_len(), bytes.len());
        assert!(payload.as_v2().is_none());
        assert_eq!(payload.as_legacy(), Some(&bytes));

        let mut out = BytesMut::new();
        payload.encode_to(&mut out).unwrap();
        assert_eq!(&out[..], &bytes[..]);
    }

    #[test]
    fn legacy_roundtrip_via_traits() {
        let bytes = legacy_bytes();
        let payload = RecordsPayload::from_bytes(bytes.clone()).unwrap();

        let mut wire = BytesMut::new();
        <RecordsPayload as crate::Encode>::encode(&payload, &mut wire, 0).unwrap();
        assert_eq!(
            <RecordsPayload as crate::Encode>::encoded_len(&payload, 0),
            bytes.len()
        );

        let mut cursor: &[u8] = &wire;
        let decoded = <RecordsPayload as crate::Decode>::decode(&mut cursor, 0).unwrap();
        assert!(matches!(decoded, RecordsPayload::Legacy(_)));
        assert_eq!(decoded.as_legacy().unwrap(), &bytes);
    }

    #[test]
    fn owned_default_is_empty_v2() {
        let payload = RecordsPayload::default();
        assert!(matches!(payload, RecordsPayload::V2(ref v) if v.is_empty()));
    }

    #[test]
    fn looks_like_v2_rejects_short_buffer() {
        // Ten bytes cannot reach offset 16, so the buffer falls back to Legacy.
        let short = Bytes::from_static(&[0u8; 10]);
        let payload = RecordsPayload::from_bytes(short.clone()).unwrap();
        assert_eq!(payload.as_legacy(), Some(&short));
    }

    #[test]
    fn borrowed_legacy_roundtrip() {
        let bytes = legacy_bytes();
        let borrowed = RecordsPayloadBorrowed::from_slice(&bytes).unwrap();
        assert!(matches!(borrowed, RecordsPayloadBorrowed::Legacy(_)));
        assert_eq!(borrowed.payload_len(), bytes.len());

        let mut out = BytesMut::new();
        borrowed.encode_to(&mut out).unwrap();
        assert_eq!(&out[..], &bytes[..]);

        let owned = borrowed.to_owned().unwrap();
        match owned {
            RecordsPayload::Raw(_) | RecordsPayload::V2(_) => panic!("expected Legacy"),
            RecordsPayload::Legacy(stored) => assert_eq!(&stored[..], &bytes[..]),
        }
    }

    #[test]
    fn borrowed_v2_payload_len_and_encode() {
        let frozen = wire_of(&sample_v2());
        let borrowed = RecordsPayloadBorrowed::from_slice(&frozen).unwrap();
        assert_eq!(borrowed.payload_len(), frozen.len());

        let mut out = BytesMut::new();
        borrowed.encode_to(&mut out).unwrap();
        assert_eq!(&out[..], &frozen[..]);
    }

    #[test]
    fn borrowed_encode_decode_via_traits() {
        let frozen = wire_of(&sample_v2());

        let borrowed = RecordsPayloadBorrowed::from_slice(&frozen).unwrap();
        let mut out = BytesMut::new();
        <RecordsPayloadBorrowed as crate::Encode>::encode(&borrowed, &mut out, 0).unwrap();
        assert_eq!(
            <RecordsPayloadBorrowed as crate::Encode>::encoded_len(&borrowed, 0),
            frozen.len()
        );

        let mut cursor: &[u8] = &out;
        let decoded =
            <RecordsPayloadBorrowed as crate::DecodeBorrow>::decode_borrow(&mut cursor, 0).unwrap();
        assert!(matches!(decoded, RecordsPayloadBorrowed::V2(_)));
    }

    #[test]
    fn borrowed_default_is_empty_v2() {
        let payload = RecordsPayloadBorrowed::default();
        assert!(matches!(payload, RecordsPayloadBorrowed::V2(ref v) if v.is_empty()));
    }
}