use bytes::{Bytes, BytesMut};
use crabka_compression::CompressionType;
use crabka_protocol::records::RecordBatch;
use crabka_protocol::records::RecordsPayload;
use crabka_records_legacy::{Magic, v2_to_legacy};
use crate::codes;
pub(crate) fn down_convert_for_fetch(
batch: &RecordBatch,
request_version: i16,
) -> Result<Option<RecordsPayload>, i16> {
if request_version >= 4 {
return Ok(Some(RecordsPayload::V2(vec![batch.clone()])));
}
if batch.attributes.is_control_batch() {
return Ok(None);
}
let working = if batch.attributes.compression() == CompressionType::Zstd {
let mut clone = batch.clone();
clone.attributes = clone.attributes.with_compression(CompressionType::Snappy);
clone
} else {
batch.clone()
};
let target = if request_version >= 2 {
Magic::V1
} else {
Magic::V0
};
let bytes = v2_to_legacy(&working, target).map_err(|e| {
tracing::warn!(error = %e, "v2_to_legacy failed during fetch down-conversion");
codes::CORRUPT_MESSAGE
})?;
Ok(Some(RecordsPayload::Legacy(bytes)))
}
pub(crate) fn down_convert_payload_for_fetch(
payload: &RecordsPayload,
request_version: i16,
) -> Result<Option<RecordsPayload>, i16> {
let batches: Vec<RecordBatch> = match payload {
RecordsPayload::V2(b) => b.clone(),
RecordsPayload::Raw(bytes) => match RecordsPayload::from_bytes(bytes.clone()) {
Ok(RecordsPayload::V2(b)) => b,
_ => return Err(crate::codes::CORRUPT_MESSAGE),
},
RecordsPayload::Legacy(_) => return Ok(Some(payload.clone())),
};
let mut out = BytesMut::new();
for batch in &batches {
match down_convert_for_fetch(batch, request_version)? {
Some(RecordsPayload::Legacy(b)) => out.extend_from_slice(&b),
Some(RecordsPayload::V2(_) | RecordsPayload::Raw(_)) => {
return Err(crate::codes::CORRUPT_MESSAGE);
}
None => {}
}
}
if out.is_empty() {
Ok(None)
} else {
Ok(Some(RecordsPayload::Legacy(Bytes::from(out))))
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use bytes::Bytes;
use crabka_protocol::records::{Attributes, Record, RecordBatch};
fn make_batch(codec: CompressionType, records: Vec<Record>) -> RecordBatch {
RecordBatch {
base_offset: 0,
partition_leader_epoch: -1,
attributes: Attributes::default().with_compression(codec),
last_offset_delta: i32::try_from(records.len().saturating_sub(1)).unwrap_or(i32::MAX),
base_timestamp: 1_700_000_000,
max_timestamp: 1_700_000_000,
producer_id: -1,
producer_epoch: -1,
base_sequence: -1,
records,
}
}
fn sample_record(key: &'static str, value: &'static str) -> Record {
Record {
attributes: 0,
offset_delta: 0,
timestamp_delta: 0,
key: Some(Bytes::from_static(key.as_bytes())),
value: Some(Bytes::from_static(value.as_bytes())),
headers: vec![],
}
}
#[test]
fn version_gte_4_returns_v2_unchanged() {
let batch = make_batch(CompressionType::None, vec![sample_record("k", "v")]);
let result = down_convert_for_fetch(&batch, 5).unwrap();
let payload = result.expect("should have Some payload");
match payload {
RecordsPayload::V2(v) => assert!(v == vec![batch]),
RecordsPayload::Raw(_) | RecordsPayload::Legacy(_) => {
panic!("expected V2 for version >= 4")
}
}
}
#[test]
fn control_batch_returns_none() {
let mut batch = make_batch(CompressionType::None, vec![sample_record("k", "v")]);
batch.attributes = batch.attributes.with_control(true);
let result = down_convert_for_fetch(&batch, 3).unwrap();
assert!(
result.is_none(),
"control batch must be dropped on legacy path"
);
}
#[test]
fn zstd_batch_recompressed_as_snappy() {
let records: Vec<Record> = (0..50)
.map(|i| Record {
attributes: 0,
offset_delta: i,
timestamp_delta: i64::from(i) * 1000,
key: Some(Bytes::from(format!("key-{i}"))),
value: Some(Bytes::from(format!("val-{i} hello world this is a test"))),
headers: vec![],
})
.collect();
let batch = make_batch(CompressionType::Zstd, records);
let result = down_convert_for_fetch(&batch, 3).unwrap();
let payload = result.expect("zstd batch should produce Some payload");
let bytes = match payload {
RecordsPayload::Legacy(b) => b,
RecordsPayload::V2(_) | RecordsPayload::Raw(_) => {
panic!("expected Legacy for version < 4")
}
};
assert!(
bytes.len() > 17,
"expected non-empty legacy bytes, got len={}",
bytes.len()
);
let attrs = bytes[17] & 0x07;
assert!(
attrs == 2,
"expected snappy codec (2) in wrapper message attributes, got {attrs}"
);
}
#[test]
fn uncompressed_batch_decodes_correctly() {
use crabka_records_legacy::decode_message_set;
let batch = make_batch(CompressionType::None, vec![sample_record("hello", "world")]);
let result = down_convert_for_fetch(&batch, 3).unwrap();
let payload = result.expect("should have Some payload");
let bytes = match payload {
RecordsPayload::Legacy(b) => b,
RecordsPayload::V2(_) | RecordsPayload::Raw(_) => panic!("expected Legacy"),
};
let mut cur: &[u8] = &bytes;
let recs = decode_message_set(&mut cur, bytes.len()).unwrap();
assert!(recs.len() == 1);
assert!(recs[0].key.as_deref() == Some(b"hello".as_ref()));
assert!(recs[0].value.as_deref() == Some(b"world".as_ref()));
}
#[test]
fn version_0_uses_magic_v0() {
use crabka_records_legacy::decode_message_set;
let mut batch = make_batch(CompressionType::None, vec![sample_record("k", "v")]);
batch.base_timestamp = 1_700_000_000;
batch.records[0].timestamp_delta = 500;
let result = down_convert_for_fetch(&batch, 0).unwrap();
let payload = result.expect("should have Some payload");
let bytes = match payload {
RecordsPayload::Legacy(b) => b,
RecordsPayload::V2(_) | RecordsPayload::Raw(_) => panic!("expected Legacy"),
};
let mut cur: &[u8] = &bytes;
let recs = decode_message_set(&mut cur, bytes.len()).unwrap();
assert!(recs[0].timestamp == None, "v0 should have no timestamps");
}
#[test]
fn payload_multi_batch_concats_legacy() {
let b0 = make_batch(CompressionType::None, vec![sample_record("a", "1")]);
let b1 = make_batch(CompressionType::None, vec![sample_record("b", "2")]);
let payload = RecordsPayload::V2(vec![b0, b1]);
let out = down_convert_payload_for_fetch(&payload, 3)
.unwrap()
.expect("some");
let RecordsPayload::Legacy(bytes) = out else {
panic!("expected Legacy");
};
let mut cur: &[u8] = &bytes;
let recs = crabka_records_legacy::decode_message_set(&mut cur, bytes.len()).unwrap();
assert!(recs.len() == 2);
}
}