use bytes::{Buf, BufMut, Bytes};
use crate::records::RecordsError;
use crate::records::borrowed::RecordBatch as RecordBatchBorrowed;
use crate::records::owned::RecordBatch;
/// Record data as carried on the wire, tagged by how its bytes were interpreted.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RecordsPayload {
    /// Fully decoded v2 (magic byte = 2) record batches.
    V2(Vec<RecordBatch>),
    /// Pre-encoded bytes that are written out verbatim; never produced by the parsers here.
    Raw(Bytes),
    /// Bytes that were not recognised as v2 and are kept undecoded.
    Legacy(Bytes),
}
impl RecordsPayload {
    /// Parses a records payload strictly.
    ///
    /// When the magic byte marks the data as v2 (see `looks_like_v2`), every batch in
    /// `bytes` is decoded and the result is [`RecordsPayload::V2`]. Anything else —
    /// including input too short to hold a magic byte — is kept verbatim as
    /// [`RecordsPayload::Legacy`]. This constructor never yields [`RecordsPayload::Raw`].
    ///
    /// # Errors
    ///
    /// Returns the first error produced by `RecordBatch::decode`, including truncation
    /// errors for an incomplete trailing batch. Use [`Self::from_fetch_bytes`] to tolerate
    /// those.
    pub fn from_bytes(bytes: Bytes) -> Result<Self, RecordsError> {
        if !looks_like_v2(&bytes) {
            return Ok(Self::Legacy(bytes));
        }
        Self::decode_v2_batches(&bytes, false).map(Self::V2)
    }

    /// Number of bytes [`Self::encode_to`] will write for this payload.
    #[must_use]
    pub fn payload_len(&self) -> usize {
        match self {
            Self::V2(batches) => batches.iter().map(RecordBatch::encoded_len).sum(),
            Self::Raw(b) | Self::Legacy(b) => b.len(),
        }
    }

    /// Writes the payload to `buf`.
    ///
    /// V2 batches are re-encoded in order; `Raw` and `Legacy` bytes are copied verbatim.
    ///
    /// # Errors
    ///
    /// Returns the first batch-encoding error; the byte variants cannot fail.
    pub fn encode_to<B: BufMut>(&self, buf: &mut B) -> Result<(), RecordsError> {
        match self {
            Self::V2(batches) => {
                for b in batches {
                    b.encode(buf)?;
                }
                Ok(())
            }
            Self::Raw(b) | Self::Legacy(b) => {
                buf.put_slice(b);
                Ok(())
            }
        }
    }

    /// Returns the decoded batches if this is a [`RecordsPayload::V2`] payload.
    #[must_use]
    pub fn as_v2(&self) -> Option<&[RecordBatch]> {
        match self {
            Self::V2(batches) => Some(batches),
            Self::Raw(_) | Self::Legacy(_) => None,
        }
    }

    /// Returns the undecoded bytes if this is a [`RecordsPayload::Legacy`] payload.
    #[must_use]
    pub fn as_legacy(&self) -> Option<&Bytes> {
        match self {
            Self::Legacy(b) => Some(b),
            Self::V2(_) | Self::Raw(_) => None,
        }
    }

    /// Lenient variant of [`Self::from_bytes`] for fetch-response data.
    ///
    /// Decoding stops silently at the first batch that fails with `HeaderTooShort` or
    /// `BodyTooShort`, and the complete batches before it are returned. Non-v2 input
    /// (including empty input) is returned as [`RecordsPayload::Legacy`].
    ///
    /// # Errors
    ///
    /// Every decode error other than the two truncation kinds (e.g. a CRC mismatch) is
    /// still returned.
    pub fn from_fetch_bytes(bytes: Bytes) -> Result<Self, RecordsError> {
        if !looks_like_v2(&bytes) {
            return Ok(Self::Legacy(bytes));
        }
        Self::decode_v2_batches(&bytes, true).map(Self::V2)
    }

    /// Drains every remaining byte of `buf` and parses it with [`Self::from_fetch_bytes`].
    ///
    /// `_version` is currently ignored.
    ///
    /// # Errors
    ///
    /// Any error from [`Self::from_fetch_bytes`], converted into `crate::ProtocolError`.
    pub fn decode_lenient<B: Buf>(
        buf: &mut B,
        _version: i16,
    ) -> Result<Self, crate::ProtocolError> {
        let bytes = buf.copy_to_bytes(buf.remaining());
        Self::from_fetch_bytes(bytes).map_err(Into::into)
    }

    /// Decodes consecutive v2 batches until `bytes` is exhausted.
    ///
    /// If `stop_on_truncation` is set, a `HeaderTooShort` / `BodyTooShort` error ends
    /// decoding and the batches decoded so far are returned. Otherwise — and for every
    /// other error kind — the error is propagated. Shared by the strict and lenient
    /// parsers so the two cannot drift apart.
    fn decode_v2_batches(
        bytes: &[u8],
        stop_on_truncation: bool,
    ) -> Result<Vec<RecordBatch>, RecordsError> {
        let mut cur: &[u8] = bytes;
        let mut batches = Vec::new();
        while !cur.is_empty() {
            match RecordBatch::decode(&mut cur) {
                Ok(rb) => batches.push(rb),
                Err(RecordsError::HeaderTooShort { .. } | RecordsError::BodyTooShort { .. })
                    if stop_on_truncation =>
                {
                    break;
                }
                Err(e) => return Err(e),
            }
        }
        Ok(batches)
    }
}
impl From<RecordBatch> for RecordsPayload {
fn from(rb: RecordBatch) -> Self {
Self::V2(vec![rb])
}
}
impl From<Vec<RecordBatch>> for RecordsPayload {
fn from(v: Vec<RecordBatch>) -> Self {
Self::V2(v)
}
}
impl Default for RecordsPayload {
fn default() -> Self {
Self::V2(Vec::new())
}
}
impl crate::Encode for RecordsPayload {
fn encode<B: BufMut>(&self, buf: &mut B, _version: i16) -> Result<(), crate::ProtocolError> {
self.encode_to(buf).map_err(Into::into)
}
fn encoded_len(&self, _version: i16) -> usize {
self.payload_len()
}
}
impl crate::Decode<'_> for RecordsPayload {
    /// Consumes every remaining byte of `buf` and parses it strictly via `from_bytes`.
    /// The protocol version is ignored.
    fn decode<B: Buf>(buf: &mut B, _version: i16) -> Result<Self, crate::ProtocolError> {
        let len = buf.remaining();
        let raw = buf.copy_to_bytes(len);
        match Self::from_bytes(raw) {
            Ok(payload) => Ok(payload),
            Err(err) => Err(err.into()),
        }
    }
}
/// Zero-copy counterpart of [`RecordsPayload`] whose contents borrow from the input slice.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RecordsPayloadBorrowed<'a> {
    /// v2 batches decoded in borrowed form.
    V2(Vec<RecordBatchBorrowed<'a>>),
    /// Input not recognised as v2, kept as the undecoded slice.
    Legacy(&'a [u8]),
}
impl<'a> RecordsPayloadBorrowed<'a> {
    /// Parses `bytes` without copying.
    ///
    /// Input whose magic byte is not 2 (or that is too short to have one) becomes
    /// [`RecordsPayloadBorrowed::Legacy`]. Otherwise every batch is decoded in borrowed form.
    ///
    /// # Errors
    ///
    /// Any batch decode failure is reported as `RecordsError::RecordParse`, with the
    /// underlying error's text embedded in the message.
    pub fn from_slice(bytes: &'a [u8]) -> Result<Self, RecordsError> {
        if !looks_like_v2(bytes) {
            return Ok(Self::Legacy(bytes));
        }
        let mut remaining: &'a [u8] = bytes;
        let mut decoded = Vec::new();
        while !remaining.is_empty() {
            let batch = <RecordBatchBorrowed<'a> as crate::DecodeBorrow<'a>>::decode_borrow(
                &mut remaining,
                0,
            )
            .map_err(|err| RecordsError::RecordParse(format!("borrowed v2 decode: {err}")))?;
            decoded.push(batch);
        }
        Ok(Self::V2(decoded))
    }

    /// Number of bytes [`Self::encode_to`] will write for this payload.
    #[must_use]
    pub fn payload_len(&self) -> usize {
        match self {
            Self::Legacy(raw) => raw.len(),
            Self::V2(batches) => {
                let mut total: usize = 0;
                for batch in batches {
                    total += crate::Encode::encoded_len(batch, 0);
                }
                total
            }
        }
    }

    /// Writes the payload to `buf`: batches are re-encoded in order, legacy bytes are copied.
    ///
    /// # Errors
    ///
    /// A batch encoding failure is reported as `RecordsError::RecordParse`.
    pub fn encode_to<B: BufMut>(&self, buf: &mut B) -> Result<(), RecordsError> {
        let batches = match self {
            Self::Legacy(raw) => {
                buf.put_slice(raw);
                return Ok(());
            }
            Self::V2(batches) => batches,
        };
        for batch in batches {
            if let Err(err) = crate::Encode::encode(batch, buf, 0) {
                return Err(RecordsError::RecordParse(format!(
                    "borrowed v2 encode: {err}"
                )));
            }
        }
        Ok(())
    }

    /// Copies the borrowed payload into an owned [`RecordsPayload`].
    ///
    /// # Errors
    ///
    /// Propagates the first failure from converting a borrowed batch to an owned one.
    pub fn to_owned(&self) -> Result<RecordsPayload, RecordsError> {
        Ok(match self {
            Self::Legacy(raw) => RecordsPayload::Legacy(Bytes::copy_from_slice(raw)),
            Self::V2(batches) => {
                let owned = batches
                    .iter()
                    .map(|batch| batch.to_owned())
                    .collect::<Result<Vec<_>, _>>()?;
                RecordsPayload::V2(owned)
            }
        })
    }
}
impl Default for RecordsPayloadBorrowed<'_> {
fn default() -> Self {
Self::V2(Vec::new())
}
}
impl crate::Encode for RecordsPayloadBorrowed<'_> {
fn encode<B: BufMut>(&self, buf: &mut B, _version: i16) -> Result<(), crate::ProtocolError> {
self.encode_to(buf).map_err(Into::into)
}
fn encoded_len(&self, _version: i16) -> usize {
self.payload_len()
}
}
impl<'de> crate::DecodeBorrow<'de> for RecordsPayloadBorrowed<'de> {
    /// Consumes the whole of `buf` (leaving it empty) and parses it with `from_slice`.
    /// The protocol version is ignored.
    fn decode_borrow(buf: &mut &'de [u8], _version: i16) -> Result<Self, crate::ProtocolError> {
        let whole: &'de [u8] = std::mem::replace(buf, &[]);
        match Self::from_slice(whole) {
            Ok(payload) => Ok(payload),
            Err(err) => Err(err.into()),
        }
    }
}
/// Returns `true` when `bytes` appears to begin with a v2 record batch.
///
/// Offset 16 is where the magic byte sits in Kafka's batch header (8-byte base offset,
/// 4-byte length, then 4 more bytes before the magic). Inputs too short to contain that
/// byte are reported as not-v2.
#[inline]
fn looks_like_v2(bytes: &[u8]) -> bool {
    const MAGIC_OFFSET: usize = 16;
    const MAGIC_V2: u8 = 2;
    bytes.get(MAGIC_OFFSET) == Some(&MAGIC_V2)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::records::{Record, RecordBatch};
    use assert2::assert;
    use bytes::BytesMut;

    /// A single-record v2 batch at base offset 42.
    fn sample_v2() -> RecordBatch {
        RecordBatch {
            base_offset: 42,
            records: vec![Record {
                key: Some(Bytes::from_static(b"k")),
                value: Some(Bytes::from_static(b"v")),
                ..Default::default()
            }],
            ..RecordBatch::default()
        }
    }

    #[test]
    fn from_bytes_dispatches_v2() {
        let rb = sample_v2();
        let mut buf = BytesMut::new();
        rb.encode(&mut buf).unwrap();
        let p = RecordsPayload::from_bytes(buf.freeze()).unwrap();
        match p {
            RecordsPayload::V2(batches) => assert!(batches == vec![rb]),
            RecordsPayload::Raw(_) | RecordsPayload::Legacy(_) => panic!("expected V2"),
        }
    }

    #[test]
    fn from_bytes_parses_all_batches() {
        let mut b0 = sample_v2();
        b0.base_offset = 0;
        let mut b1 = sample_v2();
        b1.base_offset = 1;
        let mut buf = BytesMut::new();
        b0.encode(&mut buf).unwrap();
        b1.encode(&mut buf).unwrap();
        let p = RecordsPayload::from_bytes(buf.freeze()).unwrap();
        let batches = p.as_v2().expect("v2");
        assert!(batches.len() == 2);
        assert!(batches[0].base_offset == 0);
        assert!(batches[1].base_offset == 1);
    }

    #[test]
    fn from_bytes_rejects_incomplete_trailing_batch() {
        // Strict parsing must surface the truncated tail that `from_fetch_bytes` tolerates.
        let rb = sample_v2();
        let mut buf = BytesMut::new();
        rb.encode(&mut buf).unwrap();
        buf.extend_from_slice(&[0u8; 7]);
        let err = RecordsPayload::from_bytes(buf.freeze()).unwrap_err();
        assert!(matches!(
            err,
            RecordsError::HeaderTooShort { .. } | RecordsError::BodyTooShort { .. }
        ));
    }

    #[test]
    fn raw_passthrough_roundtrips() {
        let mut b = sample_v2();
        b.base_offset = 7;
        let mut wire = BytesMut::new();
        b.encode(&mut wire).unwrap();
        let wire = wire.freeze();
        let p = RecordsPayload::Raw(wire.clone());
        assert!(p.payload_len() == wire.len());
        let mut out = BytesMut::new();
        p.encode_to(&mut out).unwrap();
        assert!(&out[..] == &wire[..]);
        // Raw bytes are never reinterpreted as decoded batches.
        assert!(p.as_v2().is_none());
    }

    #[test]
    fn from_bytes_dispatches_legacy() {
        let mut buf = vec![0u8; 17];
        buf[16] = 1;
        let p = RecordsPayload::from_bytes(Bytes::from(buf.clone())).unwrap();
        match p {
            RecordsPayload::Legacy(b) => assert!(&b[..] == &buf[..]),
            RecordsPayload::Raw(_) | RecordsPayload::V2(_) => panic!("expected Legacy"),
        }
    }

    #[test]
    fn roundtrip_v2() {
        let p: RecordsPayload = sample_v2().into();
        let mut buf = BytesMut::new();
        p.encode_to(&mut buf).unwrap();
        let back = RecordsPayload::from_bytes(buf.freeze()).unwrap();
        assert!(p == back);
        assert!(p.payload_len() == back.payload_len());
    }

    #[test]
    fn encode_decode_via_traits() {
        let p: RecordsPayload = sample_v2().into();
        let mut buf = BytesMut::new();
        <RecordsPayload as crate::Encode>::encode(&p, &mut buf, 0).unwrap();
        let mut cur: &[u8] = &buf;
        let back = <RecordsPayload as crate::Decode>::decode(&mut cur, 0).unwrap();
        assert!(p == back);
    }

    #[test]
    fn borrowed_dispatches() {
        let rb = sample_v2();
        let mut buf = BytesMut::new();
        rb.encode(&mut buf).unwrap();
        let frozen = buf.freeze();
        let p = RecordsPayloadBorrowed::from_slice(&frozen).unwrap();
        assert!(matches!(p, RecordsPayloadBorrowed::V2(_)));
        let owned = p.to_owned().unwrap();
        match owned {
            RecordsPayload::V2(batches) => assert!(batches[0].base_offset == 42),
            RecordsPayload::Raw(_) | RecordsPayload::Legacy(_) => panic!("expected V2"),
        }
    }

    #[test]
    fn from_record_batch() {
        let rb = sample_v2();
        let p: RecordsPayload = rb.clone().into();
        assert!(p.as_v2() == Some(&[rb][..]));
        assert!(p.as_legacy().is_none());
    }

    /// 24 bytes whose magic byte (offset 16) is 1, so they are classified as legacy.
    fn legacy_bytes() -> Bytes {
        let mut buf = vec![0u8; 24];
        buf[16] = 1;
        buf[17..].copy_from_slice(b"abcdefg");
        Bytes::from(buf)
    }

    #[test]
    fn legacy_payload_len_and_encode_owned() {
        let bytes = legacy_bytes();
        let p = RecordsPayload::from_bytes(bytes.clone()).unwrap();
        assert!(p.payload_len() == bytes.len());
        assert!(p.as_v2().is_none());
        assert!(p.as_legacy() == Some(&bytes));
        let mut out = BytesMut::new();
        p.encode_to(&mut out).unwrap();
        assert!(&out[..] == &bytes[..]);
    }

    #[test]
    fn legacy_roundtrip_via_traits() {
        let bytes = legacy_bytes();
        let p = RecordsPayload::from_bytes(bytes.clone()).unwrap();
        let mut buf = BytesMut::new();
        <RecordsPayload as crate::Encode>::encode(&p, &mut buf, 0).unwrap();
        assert!(<RecordsPayload as crate::Encode>::encoded_len(&p, 0) == bytes.len());
        let mut cur: &[u8] = &buf;
        let back = <RecordsPayload as crate::Decode>::decode(&mut cur, 0).unwrap();
        assert!(matches!(back, RecordsPayload::Legacy(_)));
        assert!(back.as_legacy().unwrap() == &bytes);
    }

    #[test]
    fn owned_default_is_empty_v2() {
        let p = RecordsPayload::default();
        assert!(matches!(p, RecordsPayload::V2(ref v) if v.is_empty()));
    }

    #[test]
    fn looks_like_v2_rejects_short_buffer() {
        let short = Bytes::from_static(&[0u8; 10]);
        let p = RecordsPayload::from_bytes(short.clone()).unwrap();
        assert!(p.as_legacy() == Some(&short));
    }

    #[test]
    fn borrowed_legacy_roundtrip() {
        let bytes = legacy_bytes();
        let p = RecordsPayloadBorrowed::from_slice(&bytes).unwrap();
        assert!(matches!(p, RecordsPayloadBorrowed::Legacy(_)));
        assert!(p.payload_len() == bytes.len());
        let mut out = BytesMut::new();
        p.encode_to(&mut out).unwrap();
        assert!(&out[..] == &bytes[..]);
        let owned = p.to_owned().unwrap();
        match owned {
            RecordsPayload::Legacy(b) => assert!(&b[..] == &bytes[..]),
            RecordsPayload::Raw(_) | RecordsPayload::V2(_) => panic!("expected Legacy"),
        }
    }

    #[test]
    fn borrowed_v2_payload_len_and_encode() {
        let rb = sample_v2();
        let mut buf = BytesMut::new();
        rb.encode(&mut buf).unwrap();
        let frozen = buf.freeze();
        let p = RecordsPayloadBorrowed::from_slice(&frozen).unwrap();
        assert!(p.payload_len() == frozen.len());
        let mut out = BytesMut::new();
        p.encode_to(&mut out).unwrap();
        assert!(&out[..] == &frozen[..]);
    }

    #[test]
    fn borrowed_encode_decode_via_traits() {
        let rb = sample_v2();
        let mut buf = BytesMut::new();
        rb.encode(&mut buf).unwrap();
        let frozen = buf.freeze();
        let p = RecordsPayloadBorrowed::from_slice(&frozen).unwrap();
        let mut out = BytesMut::new();
        <RecordsPayloadBorrowed as crate::Encode>::encode(&p, &mut out, 0).unwrap();
        assert!(<RecordsPayloadBorrowed as crate::Encode>::encoded_len(&p, 0) == frozen.len());
        let mut cur: &[u8] = &out;
        let back =
            <RecordsPayloadBorrowed as crate::DecodeBorrow>::decode_borrow(&mut cur, 0).unwrap();
        assert!(matches!(back, RecordsPayloadBorrowed::V2(_)));
    }

    #[test]
    fn borrowed_default_is_empty_v2() {
        let p = RecordsPayloadBorrowed::default();
        assert!(matches!(p, RecordsPayloadBorrowed::V2(ref v) if v.is_empty()));
    }

    #[test]
    fn from_fetch_bytes_drops_incomplete_trailing_batch() {
        let mut b0 = sample_v2();
        b0.base_offset = 0;
        let mut b1 = sample_v2();
        b1.base_offset = 1;
        let mut buf = BytesMut::new();
        b0.encode(&mut buf).unwrap();
        b1.encode(&mut buf).unwrap();
        // Simulate a fetch response that ends partway through a third batch.
        buf.extend_from_slice(&[0u8; 7]);
        let p = RecordsPayload::from_fetch_bytes(buf.freeze()).unwrap();
        let batches = p.as_v2().expect("v2");
        assert!(batches.len() == 2);
        assert!(batches[0].base_offset == 0);
        assert!(batches[1].base_offset == 1);
    }

    #[test]
    fn from_fetch_bytes_still_errors_on_corrupt_batch() {
        let rb = sample_v2();
        let mut buf = BytesMut::new();
        rb.encode(&mut buf).unwrap();
        let mut bytes = buf.to_vec();
        // Byte 61 is the first byte after the 61-byte v2 batch header. It lies inside the
        // CRC-covered region, so corrupting it must not be mistaken for truncation.
        bytes[61] ^= 0xFF;
        let err = RecordsPayload::from_fetch_bytes(Bytes::from(bytes)).unwrap_err();
        assert!(matches!(err, RecordsError::CrcMismatch { .. }));
    }

    #[test]
    fn from_fetch_bytes_legacy_passes_through() {
        let bytes = legacy_bytes();
        let p = RecordsPayload::from_fetch_bytes(bytes.clone()).unwrap();
        assert!(p.as_legacy() == Some(&bytes));
    }

    #[test]
    fn from_fetch_bytes_empty_is_legacy() {
        // Empty input is too short to carry a magic byte, so it is classified as legacy
        // (unlike `RecordsPayload::default()`, which is an empty V2 payload).
        let p = RecordsPayload::from_fetch_bytes(Bytes::new()).unwrap();
        assert!(matches!(p, RecordsPayload::Legacy(ref b) if b.is_empty()));
    }
}