use bytes::{Buf, BufMut};
use super::{VersionedDecode, VersionedEncode, non_nullable_string};
use crate::error::{ErrorCode, KrafkaError, Result};
use crate::protocol::api::ApiKey;
use crate::protocol::primitives::{Decode, Encode, KafkaString, TaggedFields, TryEncode};
use crate::protocol::{array_len_i32, check_compact_array_len, check_decode_array_len};
/// A single topic entry inside an [`OffsetFetchRequest`].
///
/// The v1-v9 encoders identify the topic by `name`; `encode_v10` writes
/// `topic_id` instead and never writes `name`.
#[derive(Debug, Clone)]
pub struct OffsetFetchRequestTopic {
/// Topic name, written by the non-flexible and compact topic encoders.
pub name: String,
/// 16-byte topic id, written only by `encode_v10` (all zeroes when `None`).
pub topic_id: Option<[u8; 16]>,
/// Partition indexes requested for this topic.
pub partition_indexes: Vec<i32>,
}
/// Request body for the OffsetFetch API (see [`OffsetFetchRequest::api_key`]).
///
/// The same value can be serialized at any supported version; fields that a
/// given version's encoder does not write are ignored by that encoder.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct OffsetFetchRequest {
/// Group identifier; written by every encoder.
pub group_id: String,
/// Topics to query. `None` is written as a null array: length `-1` in the
/// non-flexible layout, varint `0` in the compact layout.
pub topics: Option<Vec<OffsetFetchRequestTopic>>,
/// Written as a single byte by the v7 and later encoders only.
pub require_stable: bool,
/// Written as a nullable compact string by the v9 and v10 encoders only.
pub member_id: Option<String>,
/// Written directly after `member_id` by the v9 and v10 encoders only.
pub member_epoch: i32,
}
impl OffsetFetchRequest {
/// Returns the API key that identifies this request type,
/// [`ApiKey::OffsetFetch`].
pub fn api_key() -> ApiKey {
ApiKey::OffsetFetch
}
/// Writes the non-flexible request body (dispatched for versions 1 through 5).
///
/// Layout: `group_id` as a non-compact string, followed by the topics array
/// produced by `encode_topics_non_flexible`.
///
/// # Errors
///
/// Propagates any failure reported while encoding the group id or the
/// topics array.
pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
    let group_id = KafkaString::new(&self.group_id);
    group_id.try_encode(buf)?;

    self.encode_topics_non_flexible(buf)?;
    Ok(())
}
/// Writes the flexible request body for version 6.
///
/// Layout: compact `group_id`, compact topics array, then an empty
/// tagged-field section.
///
/// # Errors
///
/// Propagates any failure reported by the string, topics or tagged-field
/// encoders.
pub fn encode_v6(&self, buf: &mut impl BufMut) -> Result<()> {
    let group_id = KafkaString::new(&self.group_id);
    group_id.try_encode_compact(buf)?;

    self.encode_topics_compact(buf)?;

    let request_tags = TaggedFields::default();
    request_tags.try_encode(buf)?;
    Ok(())
}
/// Writes the flexible request body for version 7.
///
/// Same layout as `encode_v6`, with one extra byte carrying
/// `require_stable` between the topics array and the tagged-field section.
///
/// # Errors
///
/// Propagates any failure reported by the string, topics or tagged-field
/// encoders.
pub fn encode_v7(&self, buf: &mut impl BufMut) -> Result<()> {
    let group_id = KafkaString::new(&self.group_id);
    group_id.try_encode_compact(buf)?;

    self.encode_topics_compact(buf)?;

    // Booleans go on the wire as a single 0/1 byte.
    let require_stable = u8::from(self.require_stable);
    buf.put_u8(require_stable);

    let request_tags = TaggedFields::default();
    request_tags.try_encode(buf)?;
    Ok(())
}
/// Writes the batched-groups request body for version 8.
///
/// The request carries a groups array; this encoder always emits exactly
/// one group built from `self`. Layout: groups array header, then for the
/// group: compact `group_id`, compact topics array, group tagged fields;
/// followed by the `require_stable` byte and the request tagged fields.
///
/// # Errors
///
/// Propagates any failure reported by the string, topics or tagged-field
/// encoders.
pub fn encode_v8(&self, buf: &mut impl BufMut) -> Result<()> {
    // Compact arrays store their length as N + 1, so 2 announces one group.
    let groups_len_plus_one = 2;
    crate::util::varint::encode_unsigned_varint(groups_len_plus_one, buf);

    // The single group entry.
    let group_id = KafkaString::new(&self.group_id);
    group_id.try_encode_compact(buf)?;
    self.encode_topics_compact(buf)?;
    let group_tags = TaggedFields::default();
    group_tags.try_encode(buf)?;

    // Request-level trailer.
    let require_stable = u8::from(self.require_stable);
    buf.put_u8(require_stable);
    let request_tags = TaggedFields::default();
    request_tags.try_encode(buf)?;
    Ok(())
}
/// Writes the batched-groups request body for version 9.
///
/// Identical to `encode_v8` except that the group entry additionally
/// carries `member_id` (nullable compact string) and `member_epoch`
/// between the group id and the topics array.
///
/// # Errors
///
/// Propagates any failure reported by the string, topics or tagged-field
/// encoders.
pub fn encode_v9(&self, buf: &mut impl BufMut) -> Result<()> {
    // Compact arrays store their length as N + 1, so 2 announces one group.
    let groups_len_plus_one = 2;
    crate::util::varint::encode_unsigned_varint(groups_len_plus_one, buf);

    // The single group entry.
    let group_id = KafkaString::new(&self.group_id);
    group_id.try_encode_compact(buf)?;

    // An absent member id is sent as a null string.
    let member_id = match &self.member_id {
        Some(id) => KafkaString::new(id),
        None => KafkaString::null(),
    };
    member_id.try_encode_compact(buf)?;
    self.member_epoch.encode(buf);

    self.encode_topics_compact(buf)?;
    let group_tags = TaggedFields::default();
    group_tags.try_encode(buf)?;

    // Request-level trailer.
    let require_stable = u8::from(self.require_stable);
    buf.put_u8(require_stable);
    let request_tags = TaggedFields::default();
    request_tags.try_encode(buf)?;
    Ok(())
}
/// Writes the batched-groups request body for version 10.
///
/// Same envelope as `encode_v9`, but every topic is identified by its
/// 16-byte `topic_id` instead of its name; `name` is not written at all.
///
/// # Errors
///
/// Returns a protocol error when the topics array or a partition array is
/// too large for a `u32` length, and propagates any failure reported by the
/// string or tagged-field encoders.
pub fn encode_v10(&self, buf: &mut impl BufMut) -> Result<()> {
    // Compact arrays store their length as N + 1, so 2 announces one group.
    let groups_len_plus_one = 2;
    crate::util::varint::encode_unsigned_varint(groups_len_plus_one, buf);

    let group_id = KafkaString::new(&self.group_id);
    group_id.try_encode_compact(buf)?;

    // An absent member id is sent as a null string.
    let member_id = match &self.member_id {
        Some(id) => KafkaString::new(id),
        None => KafkaString::null(),
    };
    member_id.try_encode_compact(buf)?;
    self.member_epoch.encode(buf);

    if let Some(topics) = &self.topics {
        let topics_len_plus_one = u32::try_from(topics.len().saturating_add(1))
            .map_err(|_| KrafkaError::protocol("topics array too large"))?;
        crate::util::varint::encode_unsigned_varint(topics_len_plus_one, buf);

        for topic in topics {
            // NOTE(review): a topic without `topic_id` is sent as the all-zero
            // UUID rather than being rejected; confirm that is intended.
            let topic_id = topic.topic_id.unwrap_or([0u8; 16]);
            buf.put_slice(&topic_id);

            let partitions_len_plus_one =
                u32::try_from(topic.partition_indexes.len().saturating_add(1))
                    .map_err(|_| KrafkaError::protocol("partitions array too large"))?;
            crate::util::varint::encode_unsigned_varint(partitions_len_plus_one, buf);
            for partition in &topic.partition_indexes {
                partition.encode(buf);
            }

            let topic_tags = TaggedFields::default();
            topic_tags.try_encode(buf)?;
        }
    } else {
        // A compact length of 0 denotes a null array.
        crate::util::varint::encode_unsigned_varint(0, buf);
    }

    let group_tags = TaggedFields::default();
    group_tags.try_encode(buf)?;

    // Request-level trailer.
    let require_stable = u8::from(self.require_stable);
    buf.put_u8(require_stable);
    let request_tags = TaggedFields::default();
    request_tags.try_encode(buf)?;
    Ok(())
}
/// Writes `self.topics` in the non-flexible layout.
///
/// A missing topic list is written as array length `-1`. Otherwise the
/// array length is followed, per topic, by the topic name and an array of
/// partition indexes.
///
/// # Errors
///
/// Propagates failures from `array_len_i32` and from encoding a topic name.
fn encode_topics_non_flexible(&self, buf: &mut impl BufMut) -> Result<()> {
    let topics = match &self.topics {
        Some(topics) => topics,
        None => {
            // Null array marker.
            buf.put_i32(-1);
            return Ok(());
        }
    };

    let topic_count = array_len_i32(topics.len())?;
    buf.put_i32(topic_count);
    for topic in topics {
        let name = KafkaString::new(&topic.name);
        name.try_encode(buf)?;

        let partition_count = array_len_i32(topic.partition_indexes.len())?;
        buf.put_i32(partition_count);
        for partition in &topic.partition_indexes {
            partition.encode(buf);
        }
    }
    Ok(())
}
/// Writes `self.topics` in the compact (flexible) layout, keyed by name.
///
/// A missing topic list is written as varint `0`. Otherwise the array
/// length is written as `len + 1`, followed per topic by the compact name,
/// a compact array of partition indexes and an empty tagged-field section.
///
/// # Errors
///
/// Returns a protocol error when an array is too large for a `u32` length,
/// and propagates failures from the string or tagged-field encoders.
fn encode_topics_compact(&self, buf: &mut impl BufMut) -> Result<()> {
    let topics = match &self.topics {
        Some(topics) => topics,
        None => {
            // A compact length of 0 denotes a null array.
            crate::util::varint::encode_unsigned_varint(0, buf);
            return Ok(());
        }
    };

    let topics_len_plus_one = u32::try_from(topics.len().saturating_add(1))
        .map_err(|_| KrafkaError::protocol("topics array too large"))?;
    crate::util::varint::encode_unsigned_varint(topics_len_plus_one, buf);

    for topic in topics {
        let name = KafkaString::new(&topic.name);
        name.try_encode_compact(buf)?;

        let partitions_len_plus_one =
            u32::try_from(topic.partition_indexes.len().saturating_add(1))
                .map_err(|_| KrafkaError::protocol("partitions array too large"))?;
        crate::util::varint::encode_unsigned_varint(partitions_len_plus_one, buf);
        for partition in &topic.partition_indexes {
            partition.encode(buf);
        }

        let topic_tags = TaggedFields::default();
        topic_tags.try_encode(buf)?;
    }
    Ok(())
}
}
/// Per-partition result carried by an [`OffsetFetchResponse`].
#[derive(Debug, Clone)]
pub struct OffsetFetchResponsePartition {
/// Index of the partition this entry describes.
pub partition_index: i32,
/// Committed offset exactly as read from the wire.
pub committed_offset: i64,
/// Leader epoch read from the wire; the decoder used for versions 1-4
/// fills in `-1` because that layout has no such field.
pub committed_leader_epoch: i32,
/// Metadata string read from the wire; `None` when it was encoded as null.
pub metadata: Option<String>,
/// Error code reported for this partition.
pub error_code: ErrorCode,
}
/// Per-topic result carried by an [`OffsetFetchResponse`].
#[derive(Debug, Clone)]
pub struct OffsetFetchResponseTopic {
/// Topic name. The v10 decoder leaves this empty because that layout
/// identifies topics by id only.
pub name: String,
/// Topic id; `Some` only when produced by the v10 decoder.
pub topic_id: Option<[u8; 16]>,
/// Partition results, in wire order.
pub partitions: Vec<OffsetFetchResponsePartition>,
}
/// Decoded OffsetFetch response, normalized across protocol versions.
#[derive(Debug, Clone)]
pub struct OffsetFetchResponse {
/// Throttle time read from the wire; the v1 and v2 decoders set it to `0`.
pub throttle_time_ms: i32,
/// Topic results. For the batched layouts (v8 and later) only the first
/// group's topics are kept; further groups are skipped.
pub topics: Vec<OffsetFetchResponseTopic>,
/// Response-level error code: `ErrorCode::None` for v1, the top-level code
/// for v2-v7, and the first group's code for v8 and later.
pub error_code: ErrorCode,
}
impl OffsetFetchResponse {
/// Decodes a version 1 response body, which consists of the topics array
/// only.
///
/// `throttle_time_ms` is reported as `0` and `error_code` as
/// `ErrorCode::None`, since this layout carries neither.
///
/// # Errors
///
/// Propagates any failure from decoding the topics array.
pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
    let topics = Self::decode_topics(buf)?;
    Ok(Self {
        throttle_time_ms: 0,
        topics,
        error_code: ErrorCode::None,
    })
}
/// Decodes a version 2 response body: the topics array followed by a
/// top-level error code.
///
/// `throttle_time_ms` is reported as `0`, since this layout does not carry it.
///
/// # Errors
///
/// Propagates any failure from decoding the topics array or the error code.
pub fn decode_v2(buf: &mut impl Buf) -> Result<Self> {
    // Field initialisers are evaluated top to bottom, matching the wire order.
    Ok(Self {
        throttle_time_ms: 0,
        topics: Self::decode_topics(buf)?,
        error_code: ErrorCode::from_i16(i16::decode(buf)?),
    })
}
/// Decodes the response body shared by versions 3 and 4: throttle time,
/// topics array, then a top-level error code.
///
/// # Errors
///
/// Propagates any failure from decoding one of those three fields.
pub fn decode_v3(buf: &mut impl Buf) -> Result<Self> {
    // Field initialisers are evaluated top to bottom, matching the wire order.
    Ok(Self {
        throttle_time_ms: i32::decode(buf)?,
        topics: Self::decode_topics(buf)?,
        error_code: ErrorCode::from_i16(i16::decode(buf)?),
    })
}
/// Decodes a version 5 response body: throttle time, topics array whose
/// partitions carry a committed leader epoch, then a top-level error code.
///
/// # Errors
///
/// Propagates any failure from decoding one of those three fields.
pub fn decode_v5(buf: &mut impl Buf) -> Result<Self> {
    // Field initialisers are evaluated top to bottom, matching the wire order.
    Ok(Self {
        throttle_time_ms: i32::decode(buf)?,
        topics: Self::decode_topics_v5(buf)?,
        error_code: ErrorCode::from_i16(i16::decode(buf)?),
    })
}
/// Decodes the flexible response body shared by versions 6 and 7: throttle
/// time, compact topics array, top-level error code and a trailing
/// tagged-field section, which is read and discarded.
///
/// # Errors
///
/// Propagates any failure from decoding one of those fields.
pub fn decode_v6(buf: &mut impl Buf) -> Result<Self> {
    let throttle_time_ms = i32::decode(buf)?;
    let topics = Self::decode_topics_compact(buf)?;
    let raw_error_code = i16::decode(buf)?;
    let error_code = ErrorCode::from_i16(raw_error_code);

    // Tagged fields must be consumed even though nothing in them is used.
    let _ = TaggedFields::decode(buf)?;

    Ok(Self {
        throttle_time_ms,
        topics,
        error_code,
    })
}
/// Decodes the batched-groups response body shared by versions 8 and 9.
///
/// Only the first group is surfaced: its topics and error code populate the
/// returned value. Any further groups are read and discarded so that the
/// buffer is fully consumed.
///
/// # Errors
///
/// Returns a protocol error when the groups array is empty, and propagates
/// any decoding failure.
pub fn decode_v8(buf: &mut impl Buf) -> Result<Self> {
    let throttle_time_ms = i32::decode(buf)?;

    let raw_group_count = crate::util::varint::decode_unsigned_varint(buf)?;
    let group_count = check_compact_array_len(raw_group_count)?;
    if group_count == 0 {
        // Consume the response-level tagged fields before reporting.
        let _ = TaggedFields::decode(buf)?;
        return Err(KrafkaError::protocol(
            "OffsetFetchResponse v8-v9 contained empty Groups array",
        ));
    }

    // First group: the one whose contents are returned.
    let _group_id = KafkaString::decode_compact(buf)?;
    let topics = Self::decode_topics_compact(buf)?;
    let raw_error_code = i16::decode(buf)?;
    let error_code = ErrorCode::from_i16(raw_error_code);
    let _ = TaggedFields::decode(buf)?;

    // Remaining groups: same shape, contents ignored.
    for _ in 1..group_count {
        let _ = KafkaString::decode_compact(buf)?;
        Self::skip_topics_compact(buf)?;
        let _ = i16::decode(buf)?;
        let _ = TaggedFields::decode(buf)?;
    }

    // Response-level tagged fields.
    let _ = TaggedFields::decode(buf)?;

    Ok(Self {
        throttle_time_ms,
        topics,
        error_code,
    })
}
pub fn decode_v10(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let group_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
if group_count == 0 {
let _ = TaggedFields::decode(buf)?;
return Err(KrafkaError::protocol(
"OffsetFetchResponse v10 contained empty Groups array",
));
}
let _group_id = KafkaString::decode_compact(buf)?;
let topic_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut topics = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
if buf.remaining() < 16 {
return Err(KrafkaError::protocol("not enough bytes for topic_id UUID"));
}
let mut topic_id = [0u8; 16];
buf.copy_to_slice(&mut topic_id);
let part_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut partitions = Vec::with_capacity(part_count);
for _ in 0..part_count {
let partition_index = i32::decode(buf)?;
let committed_offset = i64::decode(buf)?;
let committed_leader_epoch = i32::decode(buf)?;
let metadata = KafkaString::decode_compact(buf)?.0;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let _ = TaggedFields::decode(buf)?;
partitions.push(OffsetFetchResponsePartition {
partition_index,
committed_offset,
committed_leader_epoch,
metadata,
error_code,
});
}
let _ = TaggedFields::decode(buf)?;
topics.push(OffsetFetchResponseTopic {
name: String::new(),
topic_id: Some(topic_id),
partitions,
});
}
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let _ = TaggedFields::decode(buf)?;
for _ in 1..group_count {
let _ = KafkaString::decode_compact(buf)?;
let tc = check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
for _ in 0..tc {
if buf.remaining() < 16 {
return Err(KrafkaError::protocol("not enough bytes for topic_id UUID"));
}
buf.advance(16); let pc =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
for _ in 0..pc {
let _ = i32::decode(buf)?; let _ = i64::decode(buf)?; let _ = i32::decode(buf)?; let _ = KafkaString::decode_compact(buf)?; let _ = i16::decode(buf)?; let _ = TaggedFields::decode(buf)?;
}
let _ = TaggedFields::decode(buf)?;
}
let _ = i16::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
}
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
topics,
error_code,
})
}
/// Decodes the non-flexible topics array used by versions 1 through 4.
///
/// Partitions in this layout have no leader epoch, so
/// `committed_leader_epoch` is filled with `-1`; `topic_id` is always `None`.
///
/// # Errors
///
/// Propagates array-length validation failures, a null topic name, and any
/// primitive decoding failure.
fn decode_topics(buf: &mut impl Buf) -> Result<Vec<OffsetFetchResponseTopic>> {
    let raw_topic_count = i32::decode(buf)?;
    let topic_total = check_decode_array_len(raw_topic_count)?;
    let mut decoded = Vec::with_capacity(topic_total);

    for _ in 0..topic_total {
        let raw_name = KafkaString::decode(buf)?.0;
        let name = non_nullable_string("topic name", raw_name)?;

        let raw_partition_count = i32::decode(buf)?;
        let partition_total = check_decode_array_len(raw_partition_count)?;
        let mut partitions = Vec::with_capacity(partition_total);
        for _ in 0..partition_total {
            // Field initialisers are evaluated top to bottom, which is the
            // wire order.
            let partition = OffsetFetchResponsePartition {
                partition_index: i32::decode(buf)?,
                committed_offset: i64::decode(buf)?,
                committed_leader_epoch: -1,
                metadata: KafkaString::decode(buf)?.0,
                error_code: ErrorCode::from_i16(i16::decode(buf)?),
            };
            partitions.push(partition);
        }

        decoded.push(OffsetFetchResponseTopic {
            name,
            topic_id: None,
            partitions,
        });
    }
    Ok(decoded)
}
/// Decodes the non-flexible topics array used by version 5.
///
/// Differs from `decode_topics` only in that each partition carries a
/// committed leader epoch between the offset and the metadata string.
///
/// # Errors
///
/// Propagates array-length validation failures, a null topic name, and any
/// primitive decoding failure.
fn decode_topics_v5(buf: &mut impl Buf) -> Result<Vec<OffsetFetchResponseTopic>> {
    let raw_topic_count = i32::decode(buf)?;
    let topic_total = check_decode_array_len(raw_topic_count)?;
    let mut decoded = Vec::with_capacity(topic_total);

    for _ in 0..topic_total {
        let raw_name = KafkaString::decode(buf)?.0;
        let name = non_nullable_string("topic name", raw_name)?;

        let raw_partition_count = i32::decode(buf)?;
        let partition_total = check_decode_array_len(raw_partition_count)?;
        let mut partitions = Vec::with_capacity(partition_total);
        for _ in 0..partition_total {
            // Field initialisers are evaluated top to bottom, which is the
            // wire order.
            let partition = OffsetFetchResponsePartition {
                partition_index: i32::decode(buf)?,
                committed_offset: i64::decode(buf)?,
                committed_leader_epoch: i32::decode(buf)?,
                metadata: KafkaString::decode(buf)?.0,
                error_code: ErrorCode::from_i16(i16::decode(buf)?),
            };
            partitions.push(partition);
        }

        decoded.push(OffsetFetchResponseTopic {
            name,
            topic_id: None,
            partitions,
        });
    }
    Ok(decoded)
}
/// Decodes the compact, name-keyed topics array used by versions 6
/// through 9.
///
/// Each partition and each topic is followed by a tagged-field section,
/// which is read and discarded. `topic_id` is always `None`.
///
/// # Errors
///
/// Propagates array-length validation failures, a null topic name, and any
/// primitive decoding failure.
fn decode_topics_compact(buf: &mut impl Buf) -> Result<Vec<OffsetFetchResponseTopic>> {
    let raw_topic_count = crate::util::varint::decode_unsigned_varint(buf)?;
    let topic_total = check_compact_array_len(raw_topic_count)?;
    let mut decoded = Vec::with_capacity(topic_total);

    for _ in 0..topic_total {
        let raw_name = KafkaString::decode_compact(buf)?.0;
        let name = non_nullable_string("topic name", raw_name)?;

        let raw_partition_count = crate::util::varint::decode_unsigned_varint(buf)?;
        let partition_total = check_compact_array_len(raw_partition_count)?;
        let mut partitions = Vec::with_capacity(partition_total);
        for _ in 0..partition_total {
            // Field initialisers are evaluated top to bottom, which is the
            // wire order.
            let partition = OffsetFetchResponsePartition {
                partition_index: i32::decode(buf)?,
                committed_offset: i64::decode(buf)?,
                committed_leader_epoch: i32::decode(buf)?,
                metadata: KafkaString::decode_compact(buf)?.0,
                error_code: ErrorCode::from_i16(i16::decode(buf)?),
            };
            // Partition-level tagged fields.
            let _ = TaggedFields::decode(buf)?;
            partitions.push(partition);
        }

        // Topic-level tagged fields.
        let _ = TaggedFields::decode(buf)?;
        decoded.push(OffsetFetchResponseTopic {
            name,
            topic_id: None,
            partitions,
        });
    }
    Ok(decoded)
}
/// Consumes a compact, name-keyed topics array without keeping any of it.
///
/// Reads the same fields, in the same order, as `decode_topics_compact`,
/// so the buffer ends up positioned just past the array.
///
/// # Errors
///
/// Propagates array-length validation failures and any primitive decoding
/// failure.
fn skip_topics_compact(buf: &mut impl Buf) -> Result<()> {
    let raw_topic_count = crate::util::varint::decode_unsigned_varint(buf)?;
    let topic_total = check_compact_array_len(raw_topic_count)?;

    for _ in 0..topic_total {
        let _ = KafkaString::decode_compact(buf)?; // topic name

        let raw_partition_count = crate::util::varint::decode_unsigned_varint(buf)?;
        let partition_total = check_compact_array_len(raw_partition_count)?;
        for _ in 0..partition_total {
            let _ = i32::decode(buf)?; // partition_index
            let _ = i64::decode(buf)?; // committed_offset
            let _ = i32::decode(buf)?; // committed_leader_epoch
            let _ = KafkaString::decode_compact(buf)?; // metadata
            let _ = i16::decode(buf)?; // error_code
            let _ = TaggedFields::decode(buf)?;
        }

        // Topic-level tagged fields.
        let _ = TaggedFields::decode(buf)?;
    }
    Ok(())
}
/// Looks up the committed offset for `partition` of the topic named `topic`.
///
/// Only the first topic whose name matches is searched, and within it the
/// first entry with a matching partition index. Returns `None` when either
/// lookup finds nothing. The offset is returned as decoded, regardless of
/// the partition's `error_code`.
///
/// Topics decoded from a version 10 response have an empty name, so they
/// can only be matched by passing `""`.
pub fn get_offset(&self, topic: &str, partition: i32) -> Option<i64> {
    let matching_topic = self.topics.iter().find(|entry| entry.name == topic)?;
    let matching_partition = matching_topic
        .partitions
        .iter()
        .find(|entry| entry.partition_index == partition)?;
    Some(matching_partition.committed_offset)
}
}
/// Version dispatch for encoding an [`OffsetFetchRequest`].
impl VersionedEncode for OffsetFetchRequest {
    /// Encodes the request body in the layout of protocol `version`.
    ///
    /// Versions 1 through 5 share the non-flexible layout; versions 6
    /// through 10 each have a dedicated encoder. Any other version yields
    /// the result of `unsupported_encode!`.
    fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
        match version {
            1..=5 => self.encode_v1(buf),
            6 => self.encode_v6(buf),
            7 => self.encode_v7(buf),
            8 => self.encode_v8(buf),
            9 => self.encode_v9(buf),
            10 => self.encode_v10(buf),
            _ => unsupported_encode!("OffsetFetchRequest", version),
        }
    }
}
/// Version dispatch for decoding an [`OffsetFetchResponse`].
impl VersionedDecode for OffsetFetchResponse {
/// Decodes a response body laid out for protocol `version`.
///
/// Versions that share a wire layout share a decoder: 3-4, 6-7 and 8-9.
/// Any version outside 1..=10 yields the result of `unsupported_decode!`.
fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
match version {
1 => Self::decode_v1(buf),
2 => Self::decode_v2(buf),
3..=4 => Self::decode_v3(buf),
5 => Self::decode_v5(buf),
6..=7 => Self::decode_v6(buf),
8..=9 => Self::decode_v8(buf),
10 => Self::decode_v10(buf),
_ => unsupported_decode!("OffsetFetchResponse", version),
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use crate::util::varint;
use bytes::BytesMut;
use rstest::rstest;
// Shared fixture: one group, one named topic with three partitions, and
// every optional field populated so that each encoder version has
// something to write.
fn sample_offset_fetch_request() -> OffsetFetchRequest {
    let topic = OffsetFetchRequestTopic {
        name: "topic-1".to_string(),
        topic_id: None,
        partition_indexes: vec![0, 1, 2],
    };
    OffsetFetchRequest {
        group_id: "my-group".to_string(),
        topics: Some(vec![topic]),
        require_stable: true,
        member_id: Some("consumer-1".to_string()),
        member_epoch: 7,
    }
}
#[test]
fn test_offset_fetch_request_v1_encode() {
    // Smoke test: a v1 request with one topic encodes to a non-empty buffer.
    let topic = OffsetFetchRequestTopic {
        name: "topic1".to_string(),
        topic_id: None,
        partition_indexes: vec![0, 1],
    };
    let request = OffsetFetchRequest {
        group_id: "grp1".to_string(),
        topics: Some(vec![topic]),
        require_stable: false,
        member_id: None,
        member_epoch: -1,
    };

    let mut encoded = BytesMut::new();
    request.encode_versioned(1, &mut encoded).unwrap();

    assert!(!encoded.is_empty());
}
#[test]
fn test_offset_fetch_request_below_min_rejected() {
    // Version 0 is below the lowest version the encoder dispatches on.
    let request = OffsetFetchRequest {
        group_id: "g".to_string(),
        topics: None,
        require_stable: false,
        member_id: None,
        member_epoch: -1,
    };

    let mut scratch = BytesMut::new();
    let outcome = request.encode_versioned(0, &mut scratch);

    assert!(outcome.is_err());
}
#[rstest]
#[case::of_v0(0)]
fn test_offset_fetch_encode_below_min(#[case] version: i16) {
    // Parameterised form of the below-minimum check: every listed version
    // must be rejected by the encoder.
    let request = OffsetFetchRequest {
        group_id: "g".to_string(),
        topics: None,
        require_stable: false,
        member_id: None,
        member_epoch: -1,
    };

    let mut scratch = BytesMut::new();
    let outcome = request.encode_versioned(version, &mut scratch);

    assert!(outcome.is_err());
}
#[rstest]
#[case::v1_min(1)]
#[case::v5(5)]
fn test_offset_fetch_request_encode_non_flexible(#[case] version: i16) {
    // Every non-flexible version must produce exactly the bytes that
    // `encode_v1` produces.
    let request = sample_offset_fetch_request();

    let mut via_dispatch = BytesMut::new();
    request.encode_versioned(version, &mut via_dispatch).unwrap();
    assert!(!via_dispatch.is_empty());

    let mut via_encode_v1 = BytesMut::new();
    request.encode_v1(&mut via_encode_v1).unwrap();

    assert_eq!(via_dispatch, via_encode_v1);
}
#[test]
fn test_offset_fetch_request_v6_flexible() {
    // Version 6 switches to the compact layout, so its bytes cannot match
    // the version 5 encoding of the same request.
    let request = sample_offset_fetch_request();

    let mut non_flexible = BytesMut::new();
    request.encode_versioned(5, &mut non_flexible).unwrap();

    let mut flexible = BytesMut::new();
    request.encode_versioned(6, &mut flexible).unwrap();

    assert_ne!(
        non_flexible.as_ref(),
        flexible.as_ref(),
        "v6 flexible should differ from v5"
    );
}
#[test]
fn test_offset_fetch_request_v7_has_require_stable() {
    // Version 7 appends the `require_stable` byte, so it must be longer
    // than the version 6 encoding of the same request.
    let request = sample_offset_fetch_request();

    let mut without_flag = BytesMut::new();
    request.encode_versioned(6, &mut without_flag).unwrap();

    let mut with_flag = BytesMut::new();
    request.encode_versioned(7, &mut with_flag).unwrap();

    assert!(
        with_flag.len() > without_flag.len(),
        "v7 should be longer (require_stable)"
    );
}
#[test]
fn test_offset_fetch_request_v8_batched_groups() {
    // Version 8 wraps the group in a groups array, so its bytes cannot
    // match the version 7 encoding of the same request.
    let request = sample_offset_fetch_request();

    let mut single_group = BytesMut::new();
    request.encode_versioned(7, &mut single_group).unwrap();

    let mut batched = BytesMut::new();
    request.encode_versioned(8, &mut batched).unwrap();

    assert_ne!(
        single_group.as_ref(),
        batched.as_ref(),
        "v8 batched format should differ from v7"
    );
}
#[test]
fn test_offset_fetch_request_v9_has_member_epoch() {
    // Version 9 adds `member_id` and `member_epoch` to the group entry, so
    // it must be longer than the version 8 encoding of the same request.
    let request = sample_offset_fetch_request();

    let mut without_member = BytesMut::new();
    request.encode_versioned(8, &mut without_member).unwrap();

    let mut with_member = BytesMut::new();
    request.encode_versioned(9, &mut with_member).unwrap();

    assert!(
        with_member.len() > without_member.len(),
        "v9 should be longer (member_id + member_epoch)"
    );
}
#[test]
fn test_offset_fetch_request_null_topics() {
    // A request without a topic list must still encode in both the
    // non-flexible (v1) and the compact (v6) layout.
    let request = OffsetFetchRequest {
        group_id: "g".to_string(),
        topics: None,
        require_stable: false,
        member_id: None,
        member_epoch: -1,
    };

    let mut non_flexible = BytesMut::new();
    request.encode_versioned(1, &mut non_flexible).unwrap();
    assert!(!non_flexible.is_empty());

    let mut flexible = BytesMut::new();
    request.encode_versioned(6, &mut flexible).unwrap();
    assert!(!flexible.is_empty());
}
#[test]
fn test_offset_fetch_response_decode_v1() {
    let topic = b"topic-1";
    let meta = b"meta";

    let mut wire = BytesMut::new();
    wire.put_i32(1); // topics: one entry
    wire.put_i16(topic.len() as i16); // topic name length
    wire.put_slice(topic);
    wire.put_i32(1); // partitions: one entry
    wire.put_i32(0); // partition_index
    wire.put_i64(42); // committed_offset
    wire.put_i16(meta.len() as i16); // metadata length
    wire.put_slice(meta);
    wire.put_i16(0); // partition error_code

    let resp = OffsetFetchResponse::decode_versioned(1, &mut wire.freeze()).unwrap();

    assert_eq!(resp.topics.len(), 1);
    let decoded_topic = &resp.topics[0];
    assert_eq!(decoded_topic.name, "topic-1");
    assert_eq!(decoded_topic.partitions[0].committed_offset, 42);
    assert_eq!(decoded_topic.partitions[0].metadata.as_deref(), Some("meta"));
}
#[test]
fn test_offset_fetch_response_decode_v2_has_error_code() {
    let topic = b"t";

    let mut wire = BytesMut::new();
    wire.put_i32(1); // topics: one entry
    wire.put_i16(topic.len() as i16); // topic name length
    wire.put_slice(topic);
    wire.put_i32(1); // partitions: one entry
    wire.put_i32(0); // partition_index
    wire.put_i64(10); // committed_offset
    wire.put_i16(-1); // metadata: null string
    wire.put_i16(0); // partition error_code
    wire.put_i16(0); // top-level error_code

    let resp = OffsetFetchResponse::decode_versioned(2, &mut wire.freeze()).unwrap();

    assert!(resp.error_code.is_ok());
    assert_eq!(resp.topics[0].partitions[0].committed_offset, 10);
}
#[test]
fn test_offset_fetch_response_decode_v5_leader_epoch() {
    let topic = b"t";

    let mut wire = BytesMut::new();
    wire.put_i32(50); // throttle_time_ms
    wire.put_i32(1); // topics: one entry
    wire.put_i16(topic.len() as i16); // topic name length
    wire.put_slice(topic);
    wire.put_i32(1); // partitions: one entry
    wire.put_i32(0); // partition_index
    wire.put_i64(100); // committed_offset
    wire.put_i32(7); // committed_leader_epoch
    wire.put_i16(-1); // metadata: null string
    wire.put_i16(0); // partition error_code
    wire.put_i16(0); // top-level error_code

    let resp = OffsetFetchResponse::decode_versioned(5, &mut wire.freeze()).unwrap();

    assert_eq!(resp.throttle_time_ms, 50);
    assert_eq!(resp.topics[0].partitions[0].committed_leader_epoch, 7);
}
#[test]
fn test_offset_fetch_response_decode_v6_flexible() {
    let topic = b"flex-topic";

    // Compact lengths are stored as N + 1; a value of 0 means null/empty.
    let mut wire = BytesMut::new();
    wire.put_i32(0); // throttle_time_ms
    varint::encode_unsigned_varint(2, &mut wire); // topics: one entry
    varint::encode_unsigned_varint(topic.len() as u32 + 1, &mut wire); // topic name length
    wire.put_slice(topic);
    varint::encode_unsigned_varint(2, &mut wire); // partitions: one entry
    wire.put_i32(0); // partition_index
    wire.put_i64(200); // committed_offset
    wire.put_i32(3); // committed_leader_epoch
    varint::encode_unsigned_varint(0, &mut wire); // metadata: null string
    wire.put_i16(0); // partition error_code
    varint::encode_unsigned_varint(0, &mut wire); // partition tagged fields
    varint::encode_unsigned_varint(0, &mut wire); // topic tagged fields
    wire.put_i16(0); // top-level error_code
    varint::encode_unsigned_varint(0, &mut wire); // response tagged fields

    let resp = OffsetFetchResponse::decode_versioned(6, &mut wire.freeze()).unwrap();

    let decoded_topic = &resp.topics[0];
    assert_eq!(decoded_topic.name, "flex-topic");
    assert_eq!(decoded_topic.partitions[0].committed_offset, 200);
    assert_eq!(decoded_topic.partitions[0].committed_leader_epoch, 3);
}
#[test]
fn test_offset_fetch_response_decode_v8_batched() {
    // Builds a batched response holding one group with one topic and one
    // partition (index 0, null metadata, no errors, no tagged fields).
    fn batched_response(
        group: &[u8],
        topic: &[u8],
        committed_offset: i64,
        leader_epoch: i32,
    ) -> BytesMut {
        let mut wire = BytesMut::new();
        wire.put_i32(0); // throttle_time_ms
        varint::encode_unsigned_varint(2, &mut wire); // groups: one entry
        varint::encode_unsigned_varint(group.len() as u32 + 1, &mut wire); // group id length
        wire.put_slice(group);
        varint::encode_unsigned_varint(2, &mut wire); // topics: one entry
        varint::encode_unsigned_varint(topic.len() as u32 + 1, &mut wire); // topic name length
        wire.put_slice(topic);
        varint::encode_unsigned_varint(2, &mut wire); // partitions: one entry
        wire.put_i32(0); // partition_index
        wire.put_i64(committed_offset);
        wire.put_i32(leader_epoch);
        varint::encode_unsigned_varint(0, &mut wire); // metadata: null string
        wire.put_i16(0); // partition error_code
        varint::encode_unsigned_varint(0, &mut wire); // partition tagged fields
        varint::encode_unsigned_varint(0, &mut wire); // topic tagged fields
        wire.put_i16(0); // group error_code
        varint::encode_unsigned_varint(0, &mut wire); // group tagged fields
        varint::encode_unsigned_varint(0, &mut wire); // response tagged fields
        wire
    }

    // Version 8.
    let wire_v8 = batched_response(b"my-group", b"t1", 500, 2);
    let resp = OffsetFetchResponse::decode_versioned(8, &mut wire_v8.freeze()).unwrap();
    assert_eq!(resp.topics[0].name, "t1");
    assert_eq!(resp.topics[0].partitions[0].committed_offset, 500);

    // Version 9 shares the same response layout.
    let wire_v9 = batched_response(b"g", b"t", 1, 0);
    let resp2 = OffsetFetchResponse::decode_versioned(9, &mut wire_v9.freeze()).unwrap();
    assert_eq!(resp2.topics[0].partitions[0].committed_offset, 1);
}
#[test]
fn test_offset_fetch_request_encode_v10_topic_id() {
    let topic_id: [u8; 16] = [0xEE; 16];
    let topic = OffsetFetchRequestTopic {
        name: String::new(),
        topic_id: Some(topic_id),
        partition_indexes: vec![0],
    };
    let request = OffsetFetchRequest {
        group_id: "g".to_string(),
        topics: Some(vec![topic]),
        require_stable: true,
        member_id: Some("m1".to_string()),
        member_epoch: 3,
    };

    let mut encoded = BytesMut::new();
    request.encode_v10(&mut encoded).unwrap();

    // Walk the encoded bytes field by field, up to the first topic id.
    let mut cur = &encoded[..];

    // Groups array: one entry (compact length N + 1).
    assert_eq!(varint::decode_unsigned_varint(&mut cur).unwrap(), 2);

    // group_id "g": compact length 2, then one byte of payload.
    assert_eq!(varint::decode_unsigned_varint(&mut cur).unwrap(), 2);
    cur.advance(1);

    // member_id "m1": compact length 3, then two bytes of payload.
    assert_eq!(varint::decode_unsigned_varint(&mut cur).unwrap(), 3);
    cur.advance(2);

    // member_epoch.
    assert_eq!(cur.get_i32(), 3);

    // Topics array: one entry.
    assert_eq!(varint::decode_unsigned_varint(&mut cur).unwrap(), 2);

    // The topic is identified by its raw 16-byte id.
    let mut read_id = [0u8; 16];
    cur.copy_to_slice(&mut read_id);
    assert_eq!(read_id, topic_id);
}
#[test]
fn test_offset_fetch_response_decode_v10_topic_id() {
    let topic_id: [u8; 16] = [0xFF; 16];

    // Compact lengths are stored as N + 1; a value of 0 means null/empty.
    let mut wire = BytesMut::new();
    wire.put_i32(0); // throttle_time_ms
    varint::encode_unsigned_varint(2, &mut wire); // groups: one entry
    varint::encode_unsigned_varint(3, &mut wire); // group id length (2 bytes)
    wire.put_slice(b"g1");
    varint::encode_unsigned_varint(2, &mut wire); // topics: one entry
    wire.put_slice(&topic_id);
    varint::encode_unsigned_varint(2, &mut wire); // partitions: one entry
    wire.put_i32(0); // partition_index
    wire.put_i64(77); // committed_offset
    wire.put_i32(4); // committed_leader_epoch
    varint::encode_unsigned_varint(0, &mut wire); // metadata: null string
    wire.put_i16(0); // partition error_code
    varint::encode_unsigned_varint(0, &mut wire); // partition tagged fields
    varint::encode_unsigned_varint(0, &mut wire); // topic tagged fields
    wire.put_i16(0); // group error_code
    varint::encode_unsigned_varint(0, &mut wire); // group tagged fields
    varint::encode_unsigned_varint(0, &mut wire); // response tagged fields

    let resp = OffsetFetchResponse::decode_versioned(10, &mut wire.freeze()).unwrap();

    assert_eq!(resp.topics.len(), 1);
    let decoded_topic = &resp.topics[0];
    assert_eq!(decoded_topic.topic_id, Some(topic_id));
    assert!(decoded_topic.name.is_empty());
    assert_eq!(decoded_topic.partitions[0].committed_offset, 77);
    assert_eq!(decoded_topic.partitions[0].committed_leader_epoch, 4);
}
}