use bytes::{Buf, BufMut, Bytes, BytesMut};
use super::{VersionedDecode, VersionedEncode, non_nullable_string};
use crate::error::{ErrorCode, KrafkaError, Result};
use crate::protocol::api::ApiKey;
use crate::protocol::primitives::{
Decode, Encode, KafkaBytes, KafkaString, TaggedField, TaggedFields, TryEncode,
};
use crate::protocol::{
array_len_i32, check_compact_array_len, check_compact_nullable_array_len,
check_decode_array_len, check_decode_nullable_array_len,
};
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FetchRequest {
pub replica_id: i32,
pub max_wait_ms: i32,
pub min_bytes: i32,
pub max_bytes: i32,
pub isolation_level: i8,
pub session_id: i32,
pub session_epoch: i32,
pub topics: Vec<FetchTopicRequest>,
pub forgotten_topics: Vec<FetchForgottenTopic>,
pub rack_id: String,
}
impl Default for FetchRequest {
fn default() -> Self {
Self {
replica_id: -1,
max_wait_ms: 0,
min_bytes: 0,
max_bytes: 0,
isolation_level: 0,
session_id: 0,
session_epoch: -1,
topics: Vec::new(),
forgotten_topics: Vec::new(),
rack_id: String::new(),
}
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct FetchForgottenTopic {
pub topic: String,
pub topic_id: Option<[u8; 16]>,
pub partitions: Vec<i32>,
}
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct FetchTopicRequest {
pub topic: String,
pub topic_id: Option<[u8; 16]>,
pub partitions: Vec<FetchPartitionRequest>,
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FetchPartitionRequest {
pub partition: i32,
pub current_leader_epoch: i32,
pub fetch_offset: i64,
pub last_fetched_epoch: i32,
pub log_start_offset: i64,
pub partition_max_bytes: i32,
pub replica_directory_id: Option<[u8; 16]>,
pub high_watermark: Option<i64>,
}
impl Default for FetchPartitionRequest {
fn default() -> Self {
Self {
partition: 0,
current_leader_epoch: -1,
fetch_offset: 0,
last_fetched_epoch: -1,
log_start_offset: -1,
partition_max_bytes: 0,
replica_directory_id: None,
high_watermark: None,
}
}
}
impl FetchRequest {
pub fn api_key() -> ApiKey {
ApiKey::Fetch
}
pub fn encode_v4(&self, buf: &mut impl BufMut) -> Result<()> {
self.encode_inner_v4(buf, false)
}
pub fn encode_v5(&self, buf: &mut impl BufMut) -> Result<()> {
self.encode_inner_v4(buf, true)
}
fn encode_inner_v4(&self, buf: &mut impl BufMut, include_log_start_offset: bool) -> Result<()> {
self.replica_id.encode(buf);
self.max_wait_ms.encode(buf);
self.min_bytes.encode(buf);
self.max_bytes.encode(buf);
self.isolation_level.encode(buf);
self.encode_topics_inner(buf, include_log_start_offset)
}
fn encode_topics_inner(
&self,
buf: &mut impl BufMut,
include_log_start_offset: bool,
) -> Result<()> {
buf.put_i32(array_len_i32(self.topics.len())?);
for topic in &self.topics {
KafkaString::new(&topic.topic).try_encode(buf)?;
buf.put_i32(array_len_i32(topic.partitions.len())?);
for partition in &topic.partitions {
partition.partition.encode(buf);
partition.fetch_offset.encode(buf);
if include_log_start_offset {
partition.log_start_offset.encode(buf);
}
partition.partition_max_bytes.encode(buf);
}
}
Ok(())
}
pub fn encode_v7(&self, buf: &mut impl BufMut) -> Result<()> {
self.encode_inner_v7(buf, false)
}
pub fn encode_v9(&self, buf: &mut impl BufMut) -> Result<()> {
self.encode_inner_v7(buf, true)
}
fn encode_inner_v7(&self, buf: &mut impl BufMut, include_leader_epoch: bool) -> Result<()> {
self.replica_id.encode(buf);
self.max_wait_ms.encode(buf);
self.min_bytes.encode(buf);
self.max_bytes.encode(buf);
self.isolation_level.encode(buf);
self.session_id.encode(buf);
self.session_epoch.encode(buf);
buf.put_i32(array_len_i32(self.topics.len())?);
for topic in &self.topics {
KafkaString::new(&topic.topic).try_encode(buf)?;
buf.put_i32(array_len_i32(topic.partitions.len())?);
for partition in &topic.partitions {
partition.partition.encode(buf);
if include_leader_epoch {
partition.current_leader_epoch.encode(buf);
}
partition.fetch_offset.encode(buf);
partition.log_start_offset.encode(buf);
partition.partition_max_bytes.encode(buf);
}
}
buf.put_i32(array_len_i32(self.forgotten_topics.len())?);
for forgotten in &self.forgotten_topics {
KafkaString::new(&forgotten.topic).try_encode(buf)?;
buf.put_i32(array_len_i32(forgotten.partitions.len())?);
for &partition in &forgotten.partitions {
partition.encode(buf);
}
}
Ok(())
}
pub fn encode_v11(&self, buf: &mut impl BufMut) -> Result<()> {
self.encode_v9(buf)?;
KafkaString::new(&self.rack_id).try_encode(buf)?;
Ok(())
}
pub fn encode_v12(&self, buf: &mut impl BufMut) -> Result<()> {
self.replica_id.encode(buf);
self.max_wait_ms.encode(buf);
self.min_bytes.encode(buf);
self.max_bytes.encode(buf);
self.isolation_level.encode(buf);
self.session_id.encode(buf);
self.session_epoch.encode(buf);
let topics_len = u32::try_from(self.topics.len().saturating_add(1))
.map_err(|_| KrafkaError::protocol("topics array too large"))?;
crate::util::varint::encode_unsigned_varint(topics_len, buf);
for topic in &self.topics {
KafkaString::new(&topic.topic).try_encode_compact(buf)?;
let parts_len = u32::try_from(topic.partitions.len().saturating_add(1))
.map_err(|_| KrafkaError::protocol("partitions array too large"))?;
crate::util::varint::encode_unsigned_varint(parts_len, buf);
for partition in &topic.partitions {
partition.partition.encode(buf);
partition.current_leader_epoch.encode(buf);
partition.fetch_offset.encode(buf);
partition.last_fetched_epoch.encode(buf);
partition.log_start_offset.encode(buf);
partition.partition_max_bytes.encode(buf);
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
}
let forgotten_len = u32::try_from(self.forgotten_topics.len().saturating_add(1))
.map_err(|_| KrafkaError::protocol("forgotten topics array too large"))?;
crate::util::varint::encode_unsigned_varint(forgotten_len, buf);
for forgotten in &self.forgotten_topics {
KafkaString::new(&forgotten.topic).try_encode_compact(buf)?;
let fp_len = u32::try_from(forgotten.partitions.len().saturating_add(1))
.map_err(|_| KrafkaError::protocol("forgotten partitions array too large"))?;
crate::util::varint::encode_unsigned_varint(fp_len, buf);
for &partition in &forgotten.partitions {
partition.encode(buf);
}
TaggedFields::default().try_encode(buf)?;
}
KafkaString::new(&self.rack_id).try_encode_compact(buf)?;
TaggedFields::default().try_encode(buf)?;
Ok(())
}
pub fn encode_v13(&self, buf: &mut impl BufMut) -> Result<()> {
self.replica_id.encode(buf);
self.max_wait_ms.encode(buf);
self.min_bytes.encode(buf);
self.max_bytes.encode(buf);
self.isolation_level.encode(buf);
self.session_id.encode(buf);
self.session_epoch.encode(buf);
let topics_len = u32::try_from(self.topics.len().saturating_add(1))
.map_err(|_| KrafkaError::protocol("topics array too large"))?;
crate::util::varint::encode_unsigned_varint(topics_len, buf);
for topic in &self.topics {
buf.put_slice(&topic.topic_id.unwrap_or([0u8; 16]));
let parts_len = u32::try_from(topic.partitions.len().saturating_add(1))
.map_err(|_| KrafkaError::protocol("partitions array too large"))?;
crate::util::varint::encode_unsigned_varint(parts_len, buf);
for partition in &topic.partitions {
partition.partition.encode(buf);
partition.current_leader_epoch.encode(buf);
partition.fetch_offset.encode(buf);
partition.last_fetched_epoch.encode(buf);
partition.log_start_offset.encode(buf);
partition.partition_max_bytes.encode(buf);
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
}
let forgotten_len = u32::try_from(self.forgotten_topics.len().saturating_add(1))
.map_err(|_| KrafkaError::protocol("forgotten topics array too large"))?;
crate::util::varint::encode_unsigned_varint(forgotten_len, buf);
for forgotten in &self.forgotten_topics {
buf.put_slice(&forgotten.topic_id.unwrap_or([0u8; 16]));
let fp_len = u32::try_from(forgotten.partitions.len().saturating_add(1))
.map_err(|_| KrafkaError::protocol("forgotten partitions array too large"))?;
crate::util::varint::encode_unsigned_varint(fp_len, buf);
for &partition in &forgotten.partitions {
partition.encode(buf);
}
TaggedFields::default().try_encode(buf)?;
}
KafkaString::new(&self.rack_id).try_encode_compact(buf)?;
TaggedFields::default().try_encode(buf)?;
Ok(())
}
pub fn encode_v15(&self, buf: &mut impl BufMut) -> Result<()> {
self.encode_v15_inner(buf, false, false)
}
pub fn encode_v17(&self, buf: &mut impl BufMut) -> Result<()> {
self.encode_v15_inner(buf, true, false)
}
pub fn encode_v18(&self, buf: &mut impl BufMut) -> Result<()> {
self.encode_v15_inner(buf, true, true)
}
fn encode_v15_inner(
&self,
buf: &mut impl BufMut,
include_directory_id: bool,
include_high_watermark: bool,
) -> Result<()> {
self.max_wait_ms.encode(buf);
self.min_bytes.encode(buf);
self.max_bytes.encode(buf);
self.isolation_level.encode(buf);
self.session_id.encode(buf);
self.session_epoch.encode(buf);
let topics_len = u32::try_from(self.topics.len().saturating_add(1))
.map_err(|_| KrafkaError::protocol("topics array too large"))?;
crate::util::varint::encode_unsigned_varint(topics_len, buf);
for topic in &self.topics {
buf.put_slice(&topic.topic_id.unwrap_or([0u8; 16]));
let parts_len = u32::try_from(topic.partitions.len().saturating_add(1))
.map_err(|_| KrafkaError::protocol("partitions array too large"))?;
crate::util::varint::encode_unsigned_varint(parts_len, buf);
for partition in &topic.partitions {
partition.partition.encode(buf);
partition.current_leader_epoch.encode(buf);
partition.fetch_offset.encode(buf);
partition.last_fetched_epoch.encode(buf);
partition.log_start_offset.encode(buf);
partition.partition_max_bytes.encode(buf);
let mut partition_tags = Vec::new();
if include_directory_id && let Some(dir_id) = partition.replica_directory_id {
let mut tag_buf = BytesMut::with_capacity(16);
tag_buf.put_slice(&dir_id);
partition_tags.push(TaggedField {
tag: 0,
data: tag_buf.freeze(),
});
}
if include_high_watermark && let Some(hwm) = partition.high_watermark {
let mut tag_buf = BytesMut::with_capacity(8);
tag_buf.put_i64(hwm);
partition_tags.push(TaggedField {
tag: 1,
data: tag_buf.freeze(),
});
}
TaggedFields(partition_tags).try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
}
let forgotten_len = u32::try_from(self.forgotten_topics.len().saturating_add(1))
.map_err(|_| KrafkaError::protocol("forgotten topics array too large"))?;
crate::util::varint::encode_unsigned_varint(forgotten_len, buf);
for forgotten in &self.forgotten_topics {
buf.put_slice(&forgotten.topic_id.unwrap_or([0u8; 16]));
let fp_len = u32::try_from(forgotten.partitions.len().saturating_add(1))
.map_err(|_| KrafkaError::protocol("forgotten partitions array too large"))?;
crate::util::varint::encode_unsigned_varint(fp_len, buf);
for &partition in &forgotten.partitions {
partition.encode(buf);
}
TaggedFields::default().try_encode(buf)?;
}
KafkaString::new(&self.rack_id).try_encode_compact(buf)?;
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct FetchResponse {
pub throttle_time_ms: i32,
pub error_code: ErrorCode,
pub session_id: i32,
pub responses: Vec<FetchTopicResponse>,
}
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct FetchTopicResponse {
pub topic: String,
pub topic_id: Option<[u8; 16]>,
pub partitions: Vec<FetchPartitionResponse>,
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FetchPartitionResponse {
pub partition: i32,
pub error_code: ErrorCode,
pub high_watermark: i64,
pub last_stable_offset: i64,
pub log_start_offset: i64,
pub aborted_transactions: Vec<AbortedTransaction>,
pub preferred_read_replica: i32,
pub records: Option<Bytes>,
}
impl Default for FetchPartitionResponse {
fn default() -> Self {
Self {
partition: 0,
error_code: ErrorCode::None,
high_watermark: -1,
last_stable_offset: -1,
log_start_offset: -1,
aborted_transactions: Vec::new(),
preferred_read_replica: -1,
records: None,
}
}
}
#[derive(Debug, Clone)]
pub struct AbortedTransaction {
pub producer_id: i64,
pub first_offset: i64,
}
impl FetchResponse {
pub fn decode_v4(buf: &mut impl Buf) -> Result<Self> {
Self::decode_inner_v4(buf, false)
}
pub fn decode_v5(buf: &mut impl Buf) -> Result<Self> {
Self::decode_inner_v4(buf, true)
}
fn decode_inner_v4(buf: &mut impl Buf, include_log_start_offset: bool) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let topic_count = check_decode_array_len(i32::decode(buf)?)?;
let mut responses = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let topic = non_nullable_string("topic name", KafkaString::decode(buf)?.0)?;
let partition_count = check_decode_array_len(i32::decode(buf)?)?;
let mut partitions = Vec::with_capacity(partition_count);
for _ in 0..partition_count {
let partition = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let high_watermark = i64::decode(buf)?;
let last_stable_offset = i64::decode(buf)?;
let log_start_offset = if include_log_start_offset {
i64::decode(buf)?
} else {
-1
};
let aborted_tx_count = check_decode_nullable_array_len(i32::decode(buf)?)?;
let mut aborted_transactions = Vec::with_capacity(aborted_tx_count);
for _ in 0..aborted_tx_count {
aborted_transactions.push(AbortedTransaction {
producer_id: i64::decode(buf)?,
first_offset: i64::decode(buf)?,
});
}
let records = KafkaBytes::decode(buf)?.0;
partitions.push(FetchPartitionResponse {
partition,
error_code,
high_watermark,
last_stable_offset,
log_start_offset,
aborted_transactions,
preferred_read_replica: -1,
records,
});
}
responses.push(FetchTopicResponse {
topic,
topic_id: None,
partitions,
});
}
Ok(Self {
throttle_time_ms,
error_code: ErrorCode::None,
session_id: 0,
responses,
})
}
pub fn decode_v7(buf: &mut impl Buf) -> Result<Self> {
Self::decode_inner_v7(buf, false)
}
pub fn decode_v11(buf: &mut impl Buf) -> Result<Self> {
Self::decode_inner_v7(buf, true)
}
fn decode_inner_v7(buf: &mut impl Buf, has_preferred_read_replica: bool) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let session_id = i32::decode(buf)?;
let topic_count = check_decode_array_len(i32::decode(buf)?)?;
let mut responses = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let topic = non_nullable_string("topic name", KafkaString::decode(buf)?.0)?;
let partition_count = check_decode_array_len(i32::decode(buf)?)?;
let mut partitions = Vec::with_capacity(partition_count);
for _ in 0..partition_count {
let partition = i32::decode(buf)?;
let partition_error_code = ErrorCode::from_i16(i16::decode(buf)?);
let high_watermark = i64::decode(buf)?;
let last_stable_offset = i64::decode(buf)?;
let log_start_offset = i64::decode(buf)?;
let aborted_tx_count = check_decode_nullable_array_len(i32::decode(buf)?)?;
let mut aborted_transactions = Vec::with_capacity(aborted_tx_count);
for _ in 0..aborted_tx_count {
aborted_transactions.push(AbortedTransaction {
producer_id: i64::decode(buf)?,
first_offset: i64::decode(buf)?,
});
}
let preferred_read_replica = if has_preferred_read_replica {
i32::decode(buf)?
} else {
-1
};
let records = KafkaBytes::decode(buf)?.0;
partitions.push(FetchPartitionResponse {
partition,
error_code: partition_error_code,
high_watermark,
last_stable_offset,
log_start_offset,
aborted_transactions,
preferred_read_replica,
records,
});
}
responses.push(FetchTopicResponse {
topic,
topic_id: None,
partitions,
});
}
Ok(Self {
throttle_time_ms,
error_code,
session_id,
responses,
})
}
pub fn decode_v12(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let session_id = i32::decode(buf)?;
let topic_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut responses = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let topic = non_nullable_string("topic name", KafkaString::decode_compact(buf)?.0)?;
let part_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut partitions = Vec::with_capacity(part_count);
for _ in 0..part_count {
let partition = i32::decode(buf)?;
let partition_error_code = ErrorCode::from_i16(i16::decode(buf)?);
let high_watermark = i64::decode(buf)?;
let last_stable_offset = i64::decode(buf)?;
let log_start_offset = i64::decode(buf)?;
let aborted_tx_count = check_compact_nullable_array_len(
crate::util::varint::decode_unsigned_varint(buf)?,
)?;
let mut aborted_transactions = Vec::with_capacity(aborted_tx_count);
for _ in 0..aborted_tx_count {
aborted_transactions.push(AbortedTransaction {
producer_id: i64::decode(buf)?,
first_offset: i64::decode(buf)?,
});
let _ = TaggedFields::decode(buf)?;
}
let preferred_read_replica = i32::decode(buf)?;
let records = KafkaBytes::decode_compact(buf)?.0;
let _ = TaggedFields::decode(buf)?;
partitions.push(FetchPartitionResponse {
partition,
error_code: partition_error_code,
high_watermark,
last_stable_offset,
log_start_offset,
aborted_transactions,
preferred_read_replica,
records,
});
}
let _ = TaggedFields::decode(buf)?; responses.push(FetchTopicResponse {
topic,
topic_id: None,
partitions,
});
}
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
error_code,
session_id,
responses,
})
}
pub fn decode_v13(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let session_id = i32::decode(buf)?;
let topic_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut responses = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
if buf.remaining() < 16 {
return Err(KrafkaError::protocol("not enough bytes for topic_id UUID"));
}
let mut topic_id = [0u8; 16];
buf.copy_to_slice(&mut topic_id);
let part_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut partitions = Vec::with_capacity(part_count);
for _ in 0..part_count {
let partition = i32::decode(buf)?;
let partition_error_code = ErrorCode::from_i16(i16::decode(buf)?);
let high_watermark = i64::decode(buf)?;
let last_stable_offset = i64::decode(buf)?;
let log_start_offset = i64::decode(buf)?;
let aborted_tx_count = check_compact_nullable_array_len(
crate::util::varint::decode_unsigned_varint(buf)?,
)?;
let mut aborted_transactions = Vec::with_capacity(aborted_tx_count);
for _ in 0..aborted_tx_count {
aborted_transactions.push(AbortedTransaction {
producer_id: i64::decode(buf)?,
first_offset: i64::decode(buf)?,
});
let _ = TaggedFields::decode(buf)?;
}
let preferred_read_replica = i32::decode(buf)?;
let records = KafkaBytes::decode_compact(buf)?.0;
let _ = TaggedFields::decode(buf)?;
partitions.push(FetchPartitionResponse {
partition,
error_code: partition_error_code,
high_watermark,
last_stable_offset,
log_start_offset,
aborted_transactions,
preferred_read_replica,
records,
});
}
let _ = TaggedFields::decode(buf)?; responses.push(FetchTopicResponse {
topic: String::new(),
topic_id: Some(topic_id),
partitions,
});
}
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
error_code,
session_id,
responses,
})
}
}
impl VersionedEncode for FetchRequest {
fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
match version {
4 => self.encode_v4(buf)?,
5 | 6 => self.encode_v5(buf)?,
7 | 8 => self.encode_v7(buf)?,
9 | 10 => self.encode_v9(buf)?,
11 => self.encode_v11(buf)?,
12 => self.encode_v12(buf)?,
13 | 14 => self.encode_v13(buf)?,
15 | 16 => self.encode_v15(buf)?,
17 => self.encode_v17(buf)?,
18 => self.encode_v18(buf)?,
_ => return unsupported_encode!("FetchRequest", version),
}
Ok(())
}
}
impl VersionedDecode for FetchResponse {
fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
match version {
4 => Self::decode_v4(buf),
5 | 6 => Self::decode_v5(buf),
7..=10 => Self::decode_v7(buf),
11 => Self::decode_v11(buf),
12 => Self::decode_v12(buf),
13..=18 => Self::decode_v13(buf),
_ => unsupported_decode!("FetchResponse", version),
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use crate::protocol::*;
use crate::util::varint;
use bytes::BytesMut;
use rstest::rstest;
#[test]
fn test_fetch_request_encode_v7_includes_session_fields() {
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1048576,
isolation_level: 0,
session_id: 42,
session_epoch: 3,
topics: vec![FetchTopicRequest {
topic: "test-topic".to_string(),
topic_id: None,
partitions: vec![FetchPartitionRequest {
partition: 0,
current_leader_epoch: -1,
fetch_offset: 100,
last_fetched_epoch: -1,
log_start_offset: -1,
partition_max_bytes: 1048576,
replica_directory_id: None,
high_watermark: None,
}],
}],
forgotten_topics: vec![FetchForgottenTopic {
topic: "old-topic".to_string(),
topic_id: None,
partitions: vec![1, 2],
}],
rack_id: String::new(),
};
let mut buf_v4 = BytesMut::new();
request.encode_v4(&mut buf_v4).unwrap();
let mut buf_v7 = BytesMut::new();
request.encode_v7(&mut buf_v7).unwrap();
assert!(buf_v7.len() > buf_v4.len());
let session_id_bytes = &buf_v7[17..21];
assert_eq!(i32::from_be_bytes(session_id_bytes.try_into().unwrap()), 42);
let session_epoch_bytes = &buf_v7[21..25];
assert_eq!(
i32::from_be_bytes(session_epoch_bytes.try_into().unwrap()),
3
);
}
#[test]
fn test_fetch_request_encode_v7_empty_forgotten_topics() {
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1048576,
isolation_level: 0,
session_id: 0,
session_epoch: 0,
topics: vec![],
forgotten_topics: vec![],
rack_id: String::new(),
};
let mut buf = BytesMut::new();
request.encode_v7(&mut buf).unwrap();
assert_eq!(buf.len(), 33);
}
#[test]
fn test_fetch_response_decode_v7_round_trip() {
let mut raw = BytesMut::new();
raw.put_i32(100); raw.put_i16(0); raw.put_i32(42); raw.put_i32(1); raw.put_i16(5);
raw.put_slice(b"topic");
raw.put_i32(1); raw.put_i32(0); raw.put_i16(0); raw.put_i64(1000); raw.put_i64(999); raw.put_i64(0); raw.put_i32(0); raw.put_i32(-1);
let mut buf = raw.freeze();
let resp = FetchResponse::decode_v7(&mut buf).unwrap();
assert_eq!(resp.throttle_time_ms, 100);
assert_eq!(resp.error_code, ErrorCode::None);
assert_eq!(resp.session_id, 42);
assert_eq!(resp.responses.len(), 1);
assert_eq!(resp.responses[0].topic, "topic");
assert_eq!(resp.responses[0].partitions.len(), 1);
assert_eq!(resp.responses[0].partitions[0].partition, 0);
assert_eq!(resp.responses[0].partitions[0].high_watermark, 1000);
assert_eq!(resp.responses[0].partitions[0].last_stable_offset, 999);
assert_eq!(resp.responses[0].partitions[0].log_start_offset, 0);
}
#[test]
fn test_fetch_response_decode_v7_session_error() {
let mut raw = BytesMut::new();
raw.put_i32(0); raw.put_i16(70); raw.put_i32(0); raw.put_i32(0);
let mut buf = raw.freeze();
let resp = FetchResponse::decode_v7(&mut buf).unwrap();
assert_eq!(resp.error_code, ErrorCode::FetchSessionIdNotFound);
assert_eq!(resp.session_id, 0);
assert!(resp.responses.is_empty());
}
#[test]
fn test_fetch_response_decode_v7_vs_v4_extra_fields() {
let mut raw_v4 = BytesMut::new();
raw_v4.put_i32(50); raw_v4.put_i32(0);
let mut buf_v4 = raw_v4.freeze();
let resp_v4 = FetchResponse::decode_v4(&mut buf_v4).unwrap();
assert_eq!(resp_v4.throttle_time_ms, 50);
assert_eq!(resp_v4.error_code, ErrorCode::None); assert_eq!(resp_v4.session_id, 0);
let mut raw_v7 = BytesMut::new();
raw_v7.put_i32(50); raw_v7.put_i16(0); raw_v7.put_i32(99); raw_v7.put_i32(0);
let mut buf_v7 = raw_v7.freeze();
let resp_v7 = FetchResponse::decode_v7(&mut buf_v7).unwrap();
assert_eq!(resp_v7.throttle_time_ms, 50);
assert_eq!(resp_v7.session_id, 99);
}
#[test]
fn test_fetch_request_encode_v5_includes_log_start_offset() {
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1048576,
isolation_level: 1,
session_id: 0,
session_epoch: 0,
topics: vec![FetchTopicRequest {
topic: "t".to_string(),
topic_id: None,
partitions: vec![FetchPartitionRequest {
partition: 0,
current_leader_epoch: -1,
fetch_offset: 100,
last_fetched_epoch: -1,
log_start_offset: 42,
partition_max_bytes: 1048576,
replica_directory_id: None,
high_watermark: None,
}],
}],
forgotten_topics: vec![],
rack_id: String::new(),
};
let mut buf_v4 = BytesMut::new();
request.encode_v4(&mut buf_v4).unwrap();
let mut buf_v5 = BytesMut::new();
request.encode_v5(&mut buf_v5).unwrap();
assert_eq!(
buf_v5.len(),
buf_v4.len() + 8,
"v5 should be 8 bytes longer than v4 (log_start_offset per partition)"
);
fn header_and_partition_prefix_len(buf: &[u8]) -> usize {
let mut c: &[u8] = buf;
i32::decode(&mut c).unwrap(); i32::decode(&mut c).unwrap(); i32::decode(&mut c).unwrap(); i32::decode(&mut c).unwrap(); i8::decode(&mut c).unwrap(); i32::decode(&mut c).unwrap(); KafkaString::decode(&mut c).unwrap(); i32::decode(&mut c).unwrap(); i32::decode(&mut c).unwrap(); i64::decode(&mut c).unwrap(); buf.len() - c.len()
}
let skip = header_and_partition_prefix_len(&buf_v5);
let mut cursor: &[u8] = &buf_v5[skip..];
let log_start_offset = i64::decode(&mut cursor).unwrap();
let partition_max_bytes = i32::decode(&mut cursor).unwrap();
assert_eq!(
log_start_offset, 42,
"v5 log_start_offset at expected position"
);
assert_eq!(partition_max_bytes, 1048576);
let skip = header_and_partition_prefix_len(&buf_v4);
let mut cursor_v4: &[u8] = &buf_v4[skip..];
let v4_partition_max_bytes = i32::decode(&mut cursor_v4).unwrap();
assert_eq!(
v4_partition_max_bytes, 1048576,
"v4 has no log_start_offset"
);
assert!(cursor_v4.is_empty(), "v4 buffer fully consumed");
}
#[test]
fn test_fetch_request_encode_v6_same_as_v5() {
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1048576,
isolation_level: 0,
session_id: 0,
session_epoch: 0,
topics: vec![FetchTopicRequest {
topic: "t".to_string(),
topic_id: None,
partitions: vec![FetchPartitionRequest {
partition: 0,
current_leader_epoch: -1,
fetch_offset: 100,
last_fetched_epoch: -1,
log_start_offset: 10,
partition_max_bytes: 1048576,
replica_directory_id: None,
high_watermark: None,
}],
}],
forgotten_topics: vec![],
rack_id: String::new(),
};
let mut buf_v5 = BytesMut::new();
request.encode_versioned(5, &mut buf_v5).unwrap();
let mut buf_v6 = BytesMut::new();
request.encode_versioned(6, &mut buf_v6).unwrap();
assert_eq!(buf_v6, buf_v5, "v6 should produce same bytes as v5");
}
#[test]
fn test_fetch_response_decode_v5_round_trip() {
let mut raw = BytesMut::new();
raw.put_i32(100); raw.put_i32(1); raw.put_i16(5); raw.put_slice(b"topic");
raw.put_i32(1); raw.put_i32(0); raw.put_i16(0); raw.put_i64(1000); raw.put_i64(999); raw.put_i64(42); raw.put_i32(0); raw.put_i32(-1);
let mut buf = raw.freeze();
let resp = FetchResponse::decode_v5(&mut buf).unwrap();
assert_eq!(resp.throttle_time_ms, 100);
assert_eq!(resp.error_code, ErrorCode::None);
assert_eq!(resp.session_id, 0);
assert_eq!(resp.responses.len(), 1);
assert_eq!(resp.responses[0].topic, "topic");
assert_eq!(resp.responses[0].partitions.len(), 1);
let p = &resp.responses[0].partitions[0];
assert_eq!(p.partition, 0);
assert_eq!(p.high_watermark, 1000);
assert_eq!(p.last_stable_offset, 999);
assert_eq!(p.log_start_offset, 42);
assert_eq!(p.preferred_read_replica, -1);
}
#[test]
fn test_fetch_response_decode_v5_vs_v4_log_start_offset() {
let mut raw_v4 = BytesMut::new();
raw_v4.put_i32(50); raw_v4.put_i32(1); raw_v4.put_i16(1); raw_v4.put_slice(b"t");
raw_v4.put_i32(1); raw_v4.put_i32(0); raw_v4.put_i16(0); raw_v4.put_i64(500); raw_v4.put_i64(499); raw_v4.put_i32(-1); raw_v4.put_i32(-1);
let mut buf_v4 = raw_v4.freeze();
let resp_v4 = FetchResponse::decode_v4(&mut buf_v4).unwrap();
assert_eq!(resp_v4.responses[0].partitions[0].log_start_offset, -1);
let mut raw_v5 = BytesMut::new();
raw_v5.put_i32(50); raw_v5.put_i32(1); raw_v5.put_i16(1); raw_v5.put_slice(b"t");
raw_v5.put_i32(1); raw_v5.put_i32(0); raw_v5.put_i16(0); raw_v5.put_i64(500); raw_v5.put_i64(499); raw_v5.put_i64(10); raw_v5.put_i32(-1); raw_v5.put_i32(-1);
let mut buf_v5 = raw_v5.freeze();
let resp_v5 = FetchResponse::decode_v5(&mut buf_v5).unwrap();
assert_eq!(resp_v5.responses[0].partitions[0].log_start_offset, 10);
}
#[test]
fn test_versioned_decode_fetch_v5_v6_dispatches_to_decode_v5() {
let mut raw = BytesMut::new();
raw.put_i32(0); raw.put_i32(0);
let data = raw.freeze();
for version in 5..=6 {
let resp = FetchResponse::decode_versioned(version, &mut data.clone()).unwrap();
assert_eq!(resp.throttle_time_ms, 0);
assert!(resp.responses.is_empty());
}
}
#[test]
fn test_versioned_encode_fetch_request_dispatches_correctly() {
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1048576,
isolation_level: 0,
session_id: 0,
session_epoch: -1,
topics: vec![],
forgotten_topics: vec![],
rack_id: String::new(),
};
let mut buf_v4 = BytesMut::new();
request.encode_versioned(4, &mut buf_v4).unwrap();
let mut buf_v7 = BytesMut::new();
request.encode_versioned(7, &mut buf_v7).unwrap();
assert!(buf_v7.len() > buf_v4.len());
let mut buf_v0 = BytesMut::new();
assert!(request.encode_versioned(0, &mut buf_v0).is_err());
}
#[test]
fn test_fetch_request_encode_v11_appends_rack_id() {
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1048576,
isolation_level: 0,
session_id: 0,
session_epoch: 0,
topics: vec![FetchTopicRequest {
topic: "t".to_string(),
topic_id: None,
partitions: vec![FetchPartitionRequest {
partition: 0,
current_leader_epoch: 5,
fetch_offset: 100,
last_fetched_epoch: -1,
log_start_offset: 0,
partition_max_bytes: 1048576,
replica_directory_id: None,
high_watermark: None,
}],
}],
forgotten_topics: vec![],
rack_id: "us-east-1a".to_string(),
};
let mut buf_v9 = BytesMut::new();
request.encode_v9(&mut buf_v9).unwrap();
let mut buf_v11 = BytesMut::new();
request.encode_v11(&mut buf_v11).unwrap();
assert_eq!(buf_v11.len(), buf_v9.len() + 2 + 10);
assert_eq!(&buf_v11[..buf_v9.len()], &buf_v9[..]);
}
#[test]
fn test_fetch_request_encode_v11_empty_rack_id() {
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1048576,
isolation_level: 0,
session_id: 0,
session_epoch: 0,
topics: vec![],
forgotten_topics: vec![],
rack_id: String::new(),
};
let mut buf_v9 = BytesMut::new();
request.encode_v9(&mut buf_v9).unwrap();
let mut buf_v11 = BytesMut::new();
request.encode_v11(&mut buf_v11).unwrap();
assert_eq!(buf_v11.len(), buf_v9.len() + 2);
}
#[test]
fn test_fetch_response_decode_v11_preferred_read_replica() {
let mut raw = BytesMut::new();
raw.put_i32(100); raw.put_i16(0); raw.put_i32(42); raw.put_i32(1); raw.put_i16(5);
raw.put_slice(b"topic");
raw.put_i32(1); raw.put_i32(0); raw.put_i16(0); raw.put_i64(1000); raw.put_i64(999); raw.put_i64(0); raw.put_i32(0); raw.put_i32(3); raw.put_i32(-1);
let mut buf = raw.freeze();
let resp = FetchResponse::decode_v11(&mut buf).unwrap();
assert_eq!(resp.throttle_time_ms, 100);
assert_eq!(resp.session_id, 42);
assert_eq!(resp.responses.len(), 1);
let part = &resp.responses[0].partitions[0];
assert_eq!(part.partition, 0);
assert_eq!(part.high_watermark, 1000);
assert_eq!(part.preferred_read_replica, 3);
}
#[test]
fn test_fetch_response_decode_v11_no_preferred_replica() {
let mut raw = BytesMut::new();
raw.put_i32(0); raw.put_i16(0); raw.put_i32(0); raw.put_i32(1); raw.put_i16(1);
raw.put_slice(b"t");
raw.put_i32(1); raw.put_i32(0); raw.put_i16(0); raw.put_i64(500); raw.put_i64(499); raw.put_i64(0); raw.put_i32(0); raw.put_i32(-1); raw.put_i32(-1);
let mut buf = raw.freeze();
let resp = FetchResponse::decode_v11(&mut buf).unwrap();
assert_eq!(resp.responses[0].partitions[0].preferred_read_replica, -1);
}
#[test]
fn test_versioned_encode_fetch_v11_dispatches_to_encode_v11() {
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1048576,
isolation_level: 0,
session_id: 0,
session_epoch: 0,
topics: vec![],
forgotten_topics: vec![],
rack_id: "rack-a".to_string(),
};
let mut buf_direct = BytesMut::new();
request.encode_v11(&mut buf_direct).unwrap();
let mut buf_versioned = BytesMut::new();
request.encode_versioned(11, &mut buf_versioned).unwrap();
assert_eq!(buf_direct, buf_versioned);
}
#[test]
fn test_versioned_decode_fetch_v11_dispatches_to_decode_v11() {
let mut raw = BytesMut::new();
raw.put_i32(0); raw.put_i16(0); raw.put_i32(0); raw.put_i32(0);
let data = raw.freeze();
let resp = FetchResponse::decode_versioned(11, &mut data.clone()).unwrap();
assert_eq!(resp.throttle_time_ms, 0);
assert!(resp.responses.is_empty());
}
#[test]
fn test_versioned_encode_fetch_v8_dispatches_to_v7() {
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1048576,
isolation_level: 0,
session_id: 0,
session_epoch: 0,
topics: vec![FetchTopicRequest {
topic: "t".to_string(),
topic_id: None,
partitions: vec![FetchPartitionRequest {
partition: 0,
current_leader_epoch: -1,
fetch_offset: 100,
last_fetched_epoch: -1,
log_start_offset: 0,
partition_max_bytes: 1048576,
replica_directory_id: None,
high_watermark: None,
}],
}],
forgotten_topics: vec![],
rack_id: String::new(),
};
let mut buf_v7 = BytesMut::new();
request.encode_versioned(7, &mut buf_v7).unwrap();
let mut buf_v8 = BytesMut::new();
request.encode_versioned(8, &mut buf_v8).unwrap();
assert_eq!(buf_v8, buf_v7, "v8 should produce same bytes as v7");
}
#[test]
fn test_versioned_encode_fetch_v9_v10_dispatches_to_v9() {
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1048576,
isolation_level: 0,
session_id: 0,
session_epoch: 0,
topics: vec![FetchTopicRequest {
topic: "t".to_string(),
topic_id: None,
partitions: vec![FetchPartitionRequest {
partition: 0,
current_leader_epoch: 5,
fetch_offset: 100,
last_fetched_epoch: -1,
log_start_offset: 0,
partition_max_bytes: 1048576,
replica_directory_id: None,
high_watermark: None,
}],
}],
forgotten_topics: vec![],
rack_id: String::new(),
};
let mut buf_v9 = BytesMut::new();
request.encode_v9(&mut buf_v9).unwrap();
for version in 9..=10 {
let mut buf = BytesMut::new();
request.encode_versioned(version, &mut buf).unwrap();
assert_eq!(buf, buf_v9, "v{version} should produce same bytes as v9");
}
let mut buf_v7 = BytesMut::new();
request.encode_v7(&mut buf_v7).unwrap();
assert_eq!(
buf_v9.len(),
buf_v7.len() + 4,
"v9 should be 4 bytes longer than v7 (current_leader_epoch per partition)"
);
}
#[test]
fn test_fetch_request_v4_encodes() {
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1_048_576,
isolation_level: 0,
session_id: 0,
session_epoch: -1,
topics: vec![],
forgotten_topics: vec![],
rack_id: String::new(),
};
let mut buf = BytesMut::new();
request.encode_versioned(4, &mut buf).unwrap();
let mut r = buf.freeze();
assert_eq!(i32::decode(&mut r).unwrap(), -1); assert_eq!(i32::decode(&mut r).unwrap(), 500); assert_eq!(i32::decode(&mut r).unwrap(), 1); assert_eq!(i32::decode(&mut r).unwrap(), 1_048_576); assert_eq!(i8::decode(&mut r).unwrap(), 0); }
#[test]
fn test_fetch_request_v12_encodes() {
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1_048_576,
isolation_level: 0,
session_id: 0,
session_epoch: 0,
topics: vec![],
forgotten_topics: vec![],
rack_id: String::new(),
};
let mut buf = BytesMut::new();
request.encode_versioned(12, &mut buf).unwrap();
assert!(!buf.is_empty());
}
#[test]
fn test_fetch_request_below_min_rejected() {
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1_048_576,
isolation_level: 0,
session_id: 0,
session_epoch: -1,
topics: vec![],
forgotten_topics: vec![],
rack_id: String::new(),
};
let mut buf = BytesMut::new();
assert!(request.encode_versioned(0, &mut buf).is_err());
let mut buf2 = BytesMut::new();
assert!(request.encode_versioned(3, &mut buf2).is_err());
}
#[rstest]
#[case::fetch_v0(0)]
#[case::fetch_v1(1)]
#[case::fetch_v2(2)]
#[case::fetch_v3(3)]
fn test_fetch_encode_below_min(#[case] version: i16) {
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1_048_576,
isolation_level: 0,
session_id: 0,
session_epoch: -1,
topics: vec![],
forgotten_topics: vec![],
rack_id: String::new(),
};
let mut buf = BytesMut::new();
assert!(request.encode_versioned(version, &mut buf).is_err());
}
#[test]
fn test_fetch_request_encode_v12_flexible() {
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1_048_576,
isolation_level: 0,
session_id: 0,
session_epoch: -1,
topics: vec![FetchTopicRequest {
topic: "t1".to_string(),
topic_id: None,
partitions: vec![FetchPartitionRequest {
partition: 0,
current_leader_epoch: 5,
fetch_offset: 100,
last_fetched_epoch: 3,
log_start_offset: 0,
partition_max_bytes: 1_048_576,
replica_directory_id: None,
high_watermark: None,
}],
}],
forgotten_topics: vec![],
rack_id: "us-east-1a".to_string(),
};
let mut buf_v11 = BytesMut::new();
request.encode_versioned(11, &mut buf_v11).unwrap();
let mut buf_v12 = BytesMut::new();
request.encode_versioned(12, &mut buf_v12).unwrap();
assert!(!buf_v12.is_empty());
assert_ne!(buf_v11.as_ref(), buf_v12.as_ref());
}
#[test]
fn test_fetch_request_v12_last_fetched_epoch() {
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1_048_576,
isolation_level: 0,
session_id: 0,
session_epoch: -1,
topics: vec![FetchTopicRequest {
topic: "t".to_string(),
topic_id: None,
partitions: vec![FetchPartitionRequest {
partition: 0,
current_leader_epoch: 1,
fetch_offset: 0,
last_fetched_epoch: 42, log_start_offset: 0,
partition_max_bytes: 1_048_576,
replica_directory_id: None,
high_watermark: None,
}],
}],
forgotten_topics: vec![],
rack_id: String::new(),
};
let mut buf_v12 = BytesMut::new();
request.encode_versioned(12, &mut buf_v12).unwrap();
let mut buf_v11 = BytesMut::new();
request.encode_versioned(11, &mut buf_v11).unwrap();
assert_ne!(buf_v11.len(), buf_v12.len());
}
#[test]
fn test_fetch_response_decode_v12_flexible() {
let mut buf = BytesMut::new();
buf.put_i32(0); buf.put_i16(0); buf.put_i32(0); varint::encode_unsigned_varint(2, &mut buf);
let topic = b"t1";
varint::encode_unsigned_varint(topic.len() as u32 + 1, &mut buf);
buf.put_slice(topic);
varint::encode_unsigned_varint(2, &mut buf);
buf.put_i32(0); buf.put_i16(0); buf.put_i64(1000); buf.put_i64(999); buf.put_i64(0); varint::encode_unsigned_varint(1, &mut buf);
buf.put_i32(-1); varint::encode_unsigned_varint(0, &mut buf);
varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf);
let resp = FetchResponse::decode_versioned(12, &mut buf.freeze()).unwrap();
assert!(resp.error_code.is_ok());
assert_eq!(resp.responses.len(), 1);
assert_eq!(resp.responses[0].topic, "t1");
assert_eq!(resp.responses[0].partitions[0].high_watermark, 1000);
assert_eq!(resp.responses[0].partitions[0].last_stable_offset, 999);
assert_eq!(resp.responses[0].partitions[0].preferred_read_replica, -1);
}
#[test]
fn test_fetch_response_v11_still_decodes() {
let mut buf = BytesMut::new();
buf.put_i32(0); buf.put_i16(0); buf.put_i32(0); buf.put_i32(1); let topic = b"t";
buf.put_i16(topic.len() as i16);
buf.put_slice(topic);
buf.put_i32(1); buf.put_i32(0); buf.put_i16(0); buf.put_i64(100); buf.put_i64(100); buf.put_i64(0); buf.put_i32(0); buf.put_i32(-1); buf.put_i32(-1);
let resp = FetchResponse::decode_versioned(11, &mut buf.freeze()).unwrap();
assert_eq!(resp.responses[0].topic, "t");
assert_eq!(resp.responses[0].partitions[0].high_watermark, 100);
assert_eq!(resp.responses[0].partitions[0].preferred_read_replica, -1);
}
#[test]
fn test_fetch_response_decode_v13_topic_id() {
let mut buf = BytesMut::new();
buf.put_i32(0); buf.put_i16(0); buf.put_i32(0); varint::encode_unsigned_varint(2, &mut buf); let topic_id: [u8; 16] = [7; 16];
buf.put_slice(&topic_id);
varint::encode_unsigned_varint(2, &mut buf); buf.put_i32(0); buf.put_i16(0); buf.put_i64(200); buf.put_i64(200); buf.put_i64(0); varint::encode_unsigned_varint(1, &mut buf); buf.put_i32(-1); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf);
let resp = FetchResponse::decode_versioned(13, &mut buf.freeze()).unwrap();
assert_eq!(resp.responses.len(), 1);
assert_eq!(resp.responses[0].topic_id, Some(topic_id));
assert!(resp.responses[0].topic.is_empty());
assert_eq!(resp.responses[0].partitions[0].high_watermark, 200);
}
#[test]
fn test_fetch_request_encode_v13_topic_id() {
let topic_id: [u8; 16] = [0xBB; 16];
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1_048_576,
isolation_level: 0,
session_id: 0,
session_epoch: -1,
topics: vec![FetchTopicRequest {
topic: String::new(),
topic_id: Some(topic_id),
partitions: vec![],
}],
forgotten_topics: vec![],
rack_id: String::new(),
};
let mut buf = BytesMut::new();
request.encode_v13(&mut buf).unwrap();
let mut cur = &buf[..];
assert_eq!(cur.get_i32(), -1); assert_eq!(cur.get_i32(), 500); assert_eq!(cur.get_i32(), 1); assert_eq!(cur.get_i32(), 1_048_576); assert_eq!(cur.get_i8(), 0); assert_eq!(cur.get_i32(), 0); assert_eq!(cur.get_i32(), -1); let topics_varint = varint::decode_unsigned_varint(&mut cur).unwrap();
assert_eq!(topics_varint, 2); let mut read_id = [0u8; 16];
cur.copy_to_slice(&mut read_id);
assert_eq!(read_id, topic_id);
}
#[test]
fn test_fetch_request_encode_v15_no_replica_id() {
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 300,
min_bytes: 1,
max_bytes: 512,
isolation_level: 1,
session_id: 5,
session_epoch: 2,
topics: vec![],
forgotten_topics: vec![],
rack_id: String::new(),
};
let mut buf = BytesMut::new();
request.encode_v15(&mut buf).unwrap();
let mut cur = &buf[..];
assert_eq!(cur.get_i32(), 300); assert_eq!(cur.get_i32(), 1); assert_eq!(cur.get_i32(), 512); assert_eq!(cur.get_i8(), 1); assert_eq!(cur.get_i32(), 5); assert_eq!(cur.get_i32(), 2); }
#[test]
fn test_fetch_request_v17_encodes_replica_directory_id() {
let dir_id = [1u8; 16];
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1_048_576,
isolation_level: 0,
session_id: 0,
session_epoch: 0,
topics: vec![FetchTopicRequest {
topic: "t".to_string(),
topic_id: Some([2u8; 16]),
partitions: vec![FetchPartitionRequest {
partition: 0,
current_leader_epoch: -1,
fetch_offset: 100,
last_fetched_epoch: -1,
log_start_offset: -1,
partition_max_bytes: 1_048_576,
replica_directory_id: Some(dir_id),
high_watermark: None,
}],
}],
forgotten_topics: vec![],
rack_id: String::new(),
};
let mut buf = BytesMut::new();
request.encode_versioned(17, &mut buf).unwrap();
assert!(!buf.is_empty());
let encoded = buf.freeze();
assert!(
encoded.windows(16).any(|w| w == dir_id),
"expected replica_directory_id in v17 output"
);
}
#[test]
fn test_fetch_request_v18_encodes_high_watermark() {
let request = FetchRequest {
replica_id: -1,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1_048_576,
isolation_level: 0,
session_id: 0,
session_epoch: 0,
topics: vec![FetchTopicRequest {
topic: "t".to_string(),
topic_id: Some([3u8; 16]),
partitions: vec![FetchPartitionRequest {
partition: 0,
current_leader_epoch: -1,
fetch_offset: 100,
last_fetched_epoch: -1,
log_start_offset: -1,
partition_max_bytes: 1_048_576,
replica_directory_id: None,
high_watermark: Some(9999),
}],
}],
forgotten_topics: vec![],
rack_id: String::new(),
};
let mut buf_v17 = BytesMut::new();
request.encode_versioned(17, &mut buf_v17).unwrap();
let mut buf_v18 = BytesMut::new();
request.encode_versioned(18, &mut buf_v18).unwrap();
assert!(
buf_v18.len() > buf_v17.len(),
"v18 should be longer due to high_watermark tagged field"
);
}
#[test]
fn test_fetch_response_decode_v17_v18_reuses_v13() {
let mut buf = BytesMut::new();
buf.put_i32(10); buf.put_i16(0); buf.put_i32(0); varint::encode_unsigned_varint(1, &mut buf); varint::encode_unsigned_varint(0, &mut buf); for version in [17, 18] {
let clone = buf.clone();
let resp = FetchResponse::decode_versioned(version, &mut clone.freeze()).unwrap();
assert_eq!(resp.throttle_time_ms, 10);
}
}
}