use bytes::{Buf, BufMut};
use super::{VersionedDecode, VersionedEncode, non_nullable_string};
use crate::error::{ErrorCode, KrafkaError, Result};
use crate::protocol::api::ApiKey;
use crate::protocol::primitives::{Decode, Encode, KafkaString, TaggedFields, TryEncode};
use crate::protocol::{array_len_i32, check_compact_array_len, check_decode_array_len};
/// Offset to commit for a single partition of an [`OffsetCommitRequestTopic`].
#[derive(Debug, Clone)]
pub struct OffsetCommitRequestPartition {
    /// Partition index; every encoder writes it first for each partition.
    pub partition_index: i32,
    /// Offset being committed; written by every encoder.
    pub committed_offset: i64,
    /// Leader epoch of the committed offset; only `encode_v6` and the later
    /// encoders put it on the wire.
    pub committed_leader_epoch: i32,
    /// Commit timestamp. None of the `OffsetCommitRequest` encoders in this
    /// file (v2 through v10) write this field.
    pub commit_timestamp: i64,
    /// Optional metadata attached to the commit; `None` is encoded as a null
    /// string.
    pub committed_metadata: Option<String>,
}
/// Per-topic section of an [`OffsetCommitRequest`].
#[derive(Debug, Clone)]
pub struct OffsetCommitRequestTopic {
    /// Topic name; written by the v2 through v8 encoders (`encode_v10` sends
    /// `topic_id` instead and ignores this field).
    pub name: String,
    /// Topic UUID; only `encode_v10` writes it, substituting sixteen zero
    /// bytes when it is `None`.
    pub topic_id: Option<[u8; 16]>,
    /// Partitions whose offsets are committed for this topic.
    pub partitions: Vec<OffsetCommitRequestPartition>,
}
/// Body of an OffsetCommit request; which fields reach the wire depends on
/// the encoder chosen (see the `encode_v*` methods).
#[derive(Debug, Clone)]
pub struct OffsetCommitRequest {
    /// Consumer group id; written by every encoder.
    pub group_id: String,
    /// Group generation id; written by every encoder.
    pub generation_id: i32,
    /// Member id; written by every encoder.
    pub member_id: String,
    /// Group instance id; written (nullable) by `encode_v7` and later.
    pub group_instance_id: Option<String>,
    /// Retention time in milliseconds; only `encode_v2` writes it.
    pub retention_time_ms: i64,
    /// Topics and partitions to commit.
    pub topics: Vec<OffsetCommitRequestTopic>,
}
impl OffsetCommitRequest {
pub fn api_key() -> ApiKey {
ApiKey::OffsetCommit
}
pub fn encode_v2(&self, buf: &mut impl BufMut) -> Result<()> {
KafkaString::new(&self.group_id).try_encode(buf)?;
self.generation_id.encode(buf);
KafkaString::new(&self.member_id).try_encode(buf)?;
self.retention_time_ms.encode(buf);
buf.put_i32(array_len_i32(self.topics.len())?);
for topic in &self.topics {
KafkaString::new(&topic.name).try_encode(buf)?;
buf.put_i32(array_len_i32(topic.partitions.len())?);
for partition in &topic.partitions {
partition.partition_index.encode(buf);
partition.committed_offset.encode(buf);
match &partition.committed_metadata {
Some(m) => KafkaString::new(m).try_encode(buf)?,
None => KafkaString::null().try_encode(buf)?,
}
}
}
Ok(())
}
pub fn encode_v5(&self, buf: &mut impl BufMut) -> Result<()> {
KafkaString::new(&self.group_id).try_encode(buf)?;
self.generation_id.encode(buf);
KafkaString::new(&self.member_id).try_encode(buf)?;
buf.put_i32(array_len_i32(self.topics.len())?);
for topic in &self.topics {
KafkaString::new(&topic.name).try_encode(buf)?;
buf.put_i32(array_len_i32(topic.partitions.len())?);
for partition in &topic.partitions {
partition.partition_index.encode(buf);
partition.committed_offset.encode(buf);
match &partition.committed_metadata {
Some(m) => KafkaString::new(m).try_encode(buf)?,
None => KafkaString::null().try_encode(buf)?,
}
}
}
Ok(())
}
pub fn encode_v6(&self, buf: &mut impl BufMut) -> Result<()> {
KafkaString::new(&self.group_id).try_encode(buf)?;
self.generation_id.encode(buf);
KafkaString::new(&self.member_id).try_encode(buf)?;
buf.put_i32(array_len_i32(self.topics.len())?);
for topic in &self.topics {
KafkaString::new(&topic.name).try_encode(buf)?;
buf.put_i32(array_len_i32(topic.partitions.len())?);
for partition in &topic.partitions {
partition.partition_index.encode(buf);
partition.committed_offset.encode(buf);
partition.committed_leader_epoch.encode(buf);
match &partition.committed_metadata {
Some(m) => KafkaString::new(m).try_encode(buf)?,
None => KafkaString::null().try_encode(buf)?,
}
}
}
Ok(())
}
pub fn encode_v7(&self, buf: &mut impl BufMut) -> Result<()> {
KafkaString::new(&self.group_id).try_encode(buf)?;
self.generation_id.encode(buf);
KafkaString::new(&self.member_id).try_encode(buf)?;
match &self.group_instance_id {
Some(id) => KafkaString::new(id).try_encode(buf)?,
None => KafkaString::null().try_encode(buf)?,
}
buf.put_i32(array_len_i32(self.topics.len())?);
for topic in &self.topics {
KafkaString::new(&topic.name).try_encode(buf)?;
buf.put_i32(array_len_i32(topic.partitions.len())?);
for partition in &topic.partitions {
partition.partition_index.encode(buf);
partition.committed_offset.encode(buf);
partition.committed_leader_epoch.encode(buf);
match &partition.committed_metadata {
Some(m) => KafkaString::new(m).try_encode(buf)?,
None => KafkaString::null().try_encode(buf)?,
}
}
}
Ok(())
}
pub fn encode_v8(&self, buf: &mut impl BufMut) -> Result<()> {
KafkaString::new(&self.group_id).try_encode_compact(buf)?;
self.generation_id.encode(buf);
KafkaString::new(&self.member_id).try_encode_compact(buf)?;
match &self.group_instance_id {
Some(id) => KafkaString::new(id).try_encode_compact(buf)?,
None => KafkaString::null().try_encode_compact(buf)?,
}
let topics_len = u32::try_from(self.topics.len().saturating_add(1))
.map_err(|_| KrafkaError::protocol("topics array too large"))?;
crate::util::varint::encode_unsigned_varint(topics_len, buf);
for topic in &self.topics {
KafkaString::new(&topic.name).try_encode_compact(buf)?;
let parts_len = u32::try_from(topic.partitions.len().saturating_add(1))
.map_err(|_| KrafkaError::protocol("partitions array too large"))?;
crate::util::varint::encode_unsigned_varint(parts_len, buf);
for partition in &topic.partitions {
partition.partition_index.encode(buf);
partition.committed_offset.encode(buf);
partition.committed_leader_epoch.encode(buf);
match &partition.committed_metadata {
Some(m) => KafkaString::new(m).try_encode_compact(buf)?,
None => KafkaString::null().try_encode_compact(buf)?,
}
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
Ok(())
}
pub fn encode_v10(&self, buf: &mut impl BufMut) -> Result<()> {
KafkaString::new(&self.group_id).try_encode_compact(buf)?;
self.generation_id.encode(buf);
KafkaString::new(&self.member_id).try_encode_compact(buf)?;
match &self.group_instance_id {
Some(id) => KafkaString::new(id).try_encode_compact(buf)?,
None => KafkaString::null().try_encode_compact(buf)?,
}
let topics_len = u32::try_from(self.topics.len().saturating_add(1))
.map_err(|_| KrafkaError::protocol("topics array too large"))?;
crate::util::varint::encode_unsigned_varint(topics_len, buf);
for topic in &self.topics {
buf.put_slice(&topic.topic_id.unwrap_or([0u8; 16]));
let parts_len = u32::try_from(topic.partitions.len().saturating_add(1))
.map_err(|_| KrafkaError::protocol("partitions array too large"))?;
crate::util::varint::encode_unsigned_varint(parts_len, buf);
for partition in &topic.partitions {
partition.partition_index.encode(buf);
partition.committed_offset.encode(buf);
partition.committed_leader_epoch.encode(buf);
match &partition.committed_metadata {
Some(m) => KafkaString::new(m).try_encode_compact(buf)?,
None => KafkaString::null().try_encode_compact(buf)?,
}
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
/// Commit outcome for a single partition.
#[derive(Debug, Clone)]
pub struct OffsetCommitResponsePartition {
    /// Partition index as read from the wire.
    pub partition_index: i32,
    /// Outcome for this partition, mapped from the wire `i16` with
    /// `ErrorCode::from_i16`.
    pub error_code: ErrorCode,
}
/// Per-topic section of an [`OffsetCommitResponse`].
#[derive(Debug, Clone)]
pub struct OffsetCommitResponseTopic {
    /// Topic name; left empty by `decode_v10`, which receives only the id.
    pub name: String,
    /// Topic UUID; `Some` only when decoded by `decode_v10`, `None` otherwise.
    pub topic_id: Option<[u8; 16]>,
    /// Per-partition results for this topic.
    pub partitions: Vec<OffsetCommitResponsePartition>,
}
/// Decoded body of an OffsetCommit response.
#[derive(Debug, Clone)]
pub struct OffsetCommitResponse {
    /// Throttle time in milliseconds; `decode_v2` sets it to 0 because that
    /// layout has no such field.
    pub throttle_time_ms: i32,
    /// Per-topic results.
    pub topics: Vec<OffsetCommitResponseTopic>,
}
impl OffsetCommitResponse {
pub fn decode_v2(buf: &mut impl Buf) -> Result<Self> {
Ok(Self {
throttle_time_ms: 0,
topics: Self::decode_topics(buf)?,
})
}
pub fn decode_v3(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
Ok(Self {
throttle_time_ms,
topics: Self::decode_topics(buf)?,
})
}
pub fn decode_v8(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let topics = Self::decode_topics_compact(buf)?;
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
topics,
})
}
pub fn decode_v10(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let topic_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut topics = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
if buf.remaining() < 16 {
return Err(KrafkaError::protocol("not enough bytes for topic_id UUID"));
}
let mut topic_id = [0u8; 16];
buf.copy_to_slice(&mut topic_id);
let part_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut partitions = Vec::with_capacity(part_count);
for _ in 0..part_count {
let partition_index = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let _ = TaggedFields::decode(buf)?;
partitions.push(OffsetCommitResponsePartition {
partition_index,
error_code,
});
}
let _ = TaggedFields::decode(buf)?;
topics.push(OffsetCommitResponseTopic {
name: String::new(),
topic_id: Some(topic_id),
partitions,
});
}
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
topics,
})
}
fn decode_topics(buf: &mut impl Buf) -> Result<Vec<OffsetCommitResponseTopic>> {
let topic_count = check_decode_array_len(i32::decode(buf)?)?;
let mut topics = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let name = non_nullable_string("topic name", KafkaString::decode(buf)?.0)?;
let partition_count = check_decode_array_len(i32::decode(buf)?)?;
let mut partitions = Vec::with_capacity(partition_count);
for _ in 0..partition_count {
let partition_index = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
partitions.push(OffsetCommitResponsePartition {
partition_index,
error_code,
});
}
topics.push(OffsetCommitResponseTopic {
name,
topic_id: None,
partitions,
});
}
Ok(topics)
}
pub fn all_success(&self) -> bool {
self.topics
.iter()
.flat_map(|t| t.partitions.iter())
.all(|p| p.error_code.is_ok())
}
fn decode_topics_compact(buf: &mut impl Buf) -> Result<Vec<OffsetCommitResponseTopic>> {
let topic_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut topics = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let name = non_nullable_string("topic name", KafkaString::decode_compact(buf)?.0)?;
let part_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut partitions = Vec::with_capacity(part_count);
for _ in 0..part_count {
let partition_index = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let _ = TaggedFields::decode(buf)?;
partitions.push(OffsetCommitResponsePartition {
partition_index,
error_code,
});
}
let _ = TaggedFields::decode(buf)?;
topics.push(OffsetCommitResponseTopic {
name,
topic_id: None,
partitions,
});
}
Ok(topics)
}
}
/// Version dispatch for encoding [`OffsetCommitRequest`].
impl VersionedEncode for OffsetCommitRequest {
    /// Selects the encoder for `version`: 2–4 share `encode_v2`, 8–9 share
    /// `encode_v8`, and 5, 6, 7 and 10 each have their own. Any other
    /// version yields whatever `unsupported_encode!` produces.
    fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
        match version {
            2..=4 => self.encode_v2(buf),
            5 => self.encode_v5(buf),
            6 => self.encode_v6(buf),
            7 => self.encode_v7(buf),
            8 | 9 => self.encode_v8(buf),
            10 => self.encode_v10(buf),
            _ => unsupported_encode!("OffsetCommitRequest", version),
        }
    }
}
/// Version dispatch for decoding [`OffsetCommitResponse`].
impl VersionedDecode for OffsetCommitResponse {
    /// Selects the decoder for `version`: 2 uses `decode_v2`, 3–7 share
    /// `decode_v3`, 8–9 share `decode_v8`, and 10 uses `decode_v10`. Any
    /// other version yields whatever `unsupported_decode!` produces.
    fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
        let response = match version {
            2 => Self::decode_v2(buf)?,
            3..=7 => Self::decode_v3(buf)?,
            8 | 9 => Self::decode_v8(buf)?,
            10 => Self::decode_v10(buf)?,
            _ => return unsupported_decode!("OffsetCommitResponse", version),
        };
        Ok(response)
    }
}
#[cfg(test)]
// Tests may unwrap/expect/panic freely: a panic is simply a failed test.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::protocol::*;
    use crate::util::varint;
    use bytes::BytesMut;
    use rstest::rstest;

    /// Appends a classic string: i16 length prefix followed by the bytes.
    fn put_classic_string(wire: &mut BytesMut, text: &[u8]) {
        wire.put_i16(text.len() as i16);
        wire.put_slice(text);
    }

    /// Appends a compact string: unsigned varint of `len + 1`, then the bytes.
    fn put_compact_string(wire: &mut BytesMut, text: &[u8]) {
        varint::encode_unsigned_varint(text.len() as u32 + 1, wire);
        wire.put_slice(text);
    }

    /// Runs `encode_versioned` into a fresh buffer, panicking on failure.
    fn encode_offset_commit(request: &OffsetCommitRequest, version: i16) -> BytesMut {
        let mut out = BytesMut::new();
        request.encode_versioned(version, &mut out).unwrap();
        out
    }

    /// Request without topics, used by the unsupported-version tests.
    fn empty_offset_commit_request() -> OffsetCommitRequest {
        OffsetCommitRequest {
            group_id: "g".to_string(),
            generation_id: 0,
            member_id: String::new(),
            group_instance_id: None,
            retention_time_ms: -1,
            topics: vec![],
        }
    }

    /// Request with one topic and two partitions (one with metadata, one
    /// without) plus a group instance id, so every optional field is covered.
    fn sample_offset_commit_request() -> OffsetCommitRequest {
        let with_metadata = OffsetCommitRequestPartition {
            partition_index: 0,
            committed_offset: 42,
            committed_leader_epoch: 3,
            commit_timestamp: -1,
            committed_metadata: Some("metadata".to_string()),
        };
        let without_metadata = OffsetCommitRequestPartition {
            partition_index: 1,
            committed_offset: 100,
            committed_leader_epoch: 5,
            commit_timestamp: -1,
            committed_metadata: None,
        };
        let orders = OffsetCommitRequestTopic {
            name: "orders".to_string(),
            topic_id: None,
            partitions: vec![with_metadata, without_metadata],
        };
        OffsetCommitRequest {
            group_id: "my-group".to_string(),
            generation_id: 5,
            member_id: "member-1".to_string(),
            group_instance_id: Some("instance-x".to_string()),
            retention_time_ms: 86_400_000,
            topics: vec![orders],
        }
    }

    #[test]
    fn test_txn_offset_commit_v3_flexible() {
        let txn_request = TxnOffsetCommitRequest::new("txn-1", "grp-1", 100, 0)
            .add_offset("topic1", 0, 50, None);

        let mut classic = BytesMut::new();
        txn_request.encode_v2(&mut classic).unwrap();
        let mut flexible = BytesMut::new();
        txn_request.encode_v3(&mut flexible).unwrap();
        assert_ne!(classic.freeze(), flexible.clone().freeze());

        // Dispatching version 5 must produce exactly the v3 bytes.
        let mut dispatched = BytesMut::new();
        txn_request.encode_versioned(5, &mut dispatched).unwrap();
        assert_eq!(flexible.freeze(), dispatched.freeze());
    }

    #[test]
    fn test_txn_offset_commit_v3_with_member_info() {
        let base_request = || {
            TxnOffsetCommitRequest::new("txn-1", "grp-1", 100, 0).add_offset(
                "topic1",
                0,
                50,
                Some("meta".to_string()),
            )
        };

        let mut with_member = base_request();
        with_member.generation_id = 42;
        with_member.member_id = "member-1".to_string();
        with_member.group_instance_id = Some("instance-1".to_string());
        let mut populated = BytesMut::new();
        with_member.encode_v3(&mut populated).unwrap();
        assert!(!populated.is_empty());

        // The same request with default member info must encode shorter.
        let mut defaults = BytesMut::new();
        base_request().encode_v3(&mut defaults).unwrap();
        assert!(populated.len() > defaults.len());
    }

    #[test]
    fn test_txn_offset_commit_response_v3_flexible() {
        let mut wire = BytesMut::new();
        // throttle_time_ms (asserted below)
        wire.put_i32(5);
        // one topic (compact length = count + 1)
        varint::encode_unsigned_varint(2, &mut wire);
        put_compact_string(&mut wire, b"t1");
        // one partition entry: i32 then i16 (presumably index and error
        // code — the Txn decoder is not part of this file)
        varint::encode_unsigned_varint(2, &mut wire);
        wire.put_i32(0);
        wire.put_i16(0);
        // three zero bytes: presumably the empty tagged-field sections
        wire.put_u8(0);
        wire.put_u8(0);
        wire.put_u8(0);

        let decoded = TxnOffsetCommitResponse::decode_v3(&mut wire.freeze()).unwrap();
        assert_eq!(decoded.throttle_time_ms, 5);
        assert_eq!(decoded.topics.len(), 1);
        assert_eq!(decoded.topics[0].name, "t1");
        assert!(decoded.is_ok());
    }

    #[rstest]
    #[case::v3(3)]
    #[case::v4(4)]
    #[case::v5(5)]
    fn test_txn_offset_commit_response_v3_v5_decode(#[case] version: i16) {
        let mut wire = BytesMut::new();
        wire.put_i32(0);
        // one topic (compact length = count + 1)
        varint::encode_unsigned_varint(2, &mut wire);
        put_compact_string(&mut wire, b"t1");
        // empty partition array (compact length 1 = zero elements)
        varint::encode_unsigned_varint(1, &mut wire);
        wire.put_u8(0);
        wire.put_u8(0);

        let decoded =
            TxnOffsetCommitResponse::decode_versioned(version, &mut wire.freeze()).unwrap();
        assert_eq!(decoded.topics[0].name, "t1");
        assert!(decoded.topics[0].partitions.is_empty());
    }

    #[test]
    fn test_txn_offset_commit_request() {
        let txn_request = TxnOffsetCommitRequest::new("my-txn", "my-group", 12345, 0)
            .add_offset("topic1", 0, 100, Some("metadata".to_string()))
            .add_offset("topic1", 1, 200, None);

        assert_eq!(txn_request.transactional_id, "my-txn");
        assert_eq!(txn_request.group_id, "my-group");
        // Both offsets target the same topic, so they share one topic entry.
        assert_eq!(txn_request.topics.len(), 1);
        assert_eq!(txn_request.topics[0].partitions.len(), 2);

        let mut encoded = BytesMut::new();
        txn_request.encode_v0(&mut encoded).unwrap();
        assert!(!encoded.is_empty());
    }

    #[test]
    fn test_txn_offset_commit_v2_dispatch() {
        let txn_request = TxnOffsetCommitRequest::new("txn-1", "grp-1", 100, 0)
            .add_offset("topic1", 0, 50, None);

        let mut as_v2 = BytesMut::new();
        txn_request.encode_versioned(2, &mut as_v2).unwrap();
        assert!(!as_v2.is_empty());

        let mut as_v0 = BytesMut::new();
        txn_request.encode_versioned(0, &mut as_v0).unwrap();
        assert!(as_v2.len() > as_v0.len());
    }

    #[test]
    fn test_txn_offset_commit_response_decode_v2() {
        let mut wire = BytesMut::new();
        // throttle_time_ms (asserted below)
        wire.put_i32(0);
        // one topic
        wire.put_i32(1);
        put_classic_string(&mut wire, b"t1");
        // one partition entry: i32 then i16
        wire.put_i32(1);
        wire.put_i32(0);
        wire.put_i16(0);

        let decoded = TxnOffsetCommitResponse::decode_versioned(2, &mut wire.freeze()).unwrap();
        assert_eq!(decoded.throttle_time_ms, 0);
        assert_eq!(decoded.topics.len(), 1);
        assert!(decoded.is_ok());
    }

    #[test]
    fn test_offset_commit_request_v2_encode() {
        let partition = OffsetCommitRequestPartition {
            partition_index: 0,
            committed_offset: 100,
            committed_leader_epoch: -1,
            commit_timestamp: -1,
            committed_metadata: Some("meta".to_string()),
        };
        let request = OffsetCommitRequest {
            group_id: "grp".to_string(),
            generation_id: 1,
            member_id: "m1".to_string(),
            group_instance_id: None,
            retention_time_ms: 86_400_000,
            topics: vec![OffsetCommitRequestTopic {
                name: "t".to_string(),
                topic_id: None,
                partitions: vec![partition],
            }],
        };

        let encoded = encode_offset_commit(&request, 2);
        assert!(!encoded.is_empty());
    }

    #[test]
    fn test_offset_commit_request_below_min_rejected() {
        let request = empty_offset_commit_request();
        // Versions 0 and 1 have no encoder; each attempt gets a fresh buffer.
        for version in [0i16, 1] {
            let mut scratch = BytesMut::new();
            assert!(request.encode_versioned(version, &mut scratch).is_err());
        }
    }

    #[test]
    fn test_offset_commit_response_decode_v2() {
        let mut wire = BytesMut::new();
        // one topic named "t"
        wire.put_i32(1);
        put_classic_string(&mut wire, b"t");
        // one partition: index 0, error code 0
        wire.put_i32(1);
        wire.put_i32(0);
        wire.put_i16(0);

        let decoded = OffsetCommitResponse::decode_versioned(2, &mut wire.freeze()).unwrap();
        assert_eq!(decoded.topics.len(), 1);
        assert_eq!(decoded.topics[0].partitions[0].error_code, ErrorCode::None);
    }

    #[rstest]
    #[case::oc_v0(0)]
    #[case::oc_v1(1)]
    fn test_offset_commit_encode_below_min(#[case] version: i16) {
        let request = empty_offset_commit_request();
        let mut scratch = BytesMut::new();
        assert!(request.encode_versioned(version, &mut scratch).is_err());
    }

    #[rstest]
    #[case::v2(2)]
    #[case::v4(4)]
    fn test_offset_commit_request_v2_to_v4_has_retention_time(#[case] version: i16) {
        let request = sample_offset_commit_request();
        let dispatched = encode_offset_commit(&request, version);
        assert!(!dispatched.is_empty());

        // The dispatcher must route these versions to `encode_v2`.
        let mut direct = BytesMut::new();
        request.encode_v2(&mut direct).unwrap();
        assert_eq!(dispatched, direct);
    }

    #[test]
    fn test_offset_commit_request_v5_drops_retention_time() {
        let request = sample_offset_commit_request();
        let as_v2 = encode_offset_commit(&request, 2);
        let as_v5 = encode_offset_commit(&request, 5);
        assert!(
            as_v5.len() < as_v2.len(),
            "v5 should be shorter (no retention_time_ms)"
        );
    }

    #[test]
    fn test_offset_commit_request_v6_has_leader_epoch() {
        let request = sample_offset_commit_request();
        let as_v5 = encode_offset_commit(&request, 5);
        let as_v6 = encode_offset_commit(&request, 6);
        assert!(
            as_v6.len() > as_v5.len(),
            "v6 should be longer (committed_leader_epoch)"
        );
    }

    #[test]
    fn test_offset_commit_request_v8_flexible() {
        let request = sample_offset_commit_request();
        let as_v7 = encode_offset_commit(&request, 7);
        let as_v8 = encode_offset_commit(&request, 8);
        assert_ne!(&as_v7[..], &as_v8[..], "v8 flexible should differ from v7");
    }

    #[rstest]
    #[case::v8(8)]
    #[case::v9(9)]
    fn test_offset_commit_request_v8_v9_wire_identical(#[case] version: i16) {
        let request = sample_offset_commit_request();
        let dispatched = encode_offset_commit(&request, version);

        let mut direct = BytesMut::new();
        request.encode_v8(&mut direct).unwrap();
        assert_eq!(dispatched, direct, "v8 and v9 should be wire-identical");
    }

    #[test]
    fn test_offset_commit_response_decode_v2_wire_format() {
        let mut wire = BytesMut::new();
        // one topic named "orders"
        wire.put_i32(1);
        put_classic_string(&mut wire, b"orders");
        // one partition: index 0, error code 0
        wire.put_i32(1);
        wire.put_i32(0);
        wire.put_i16(0);

        let decoded = OffsetCommitResponse::decode_versioned(2, &mut wire.freeze()).unwrap();
        assert_eq!(decoded.topics.len(), 1);
        let topic = &decoded.topics[0];
        assert_eq!(topic.name, "orders");
        assert_eq!(topic.partitions[0].partition_index, 0);
        assert!(topic.partitions[0].error_code.is_ok());
        // v2 has no throttle field on the wire; the decoder reports 0.
        assert_eq!(decoded.throttle_time_ms, 0);
    }

    #[test]
    fn test_offset_commit_response_decode_v3_throttle() {
        let mut wire = BytesMut::new();
        // throttle_time_ms
        wire.put_i32(200);
        // one topic named "t"
        wire.put_i32(1);
        put_classic_string(&mut wire, b"t");
        // one partition: index 0, error code 0
        wire.put_i32(1);
        wire.put_i32(0);
        wire.put_i16(0);

        let decoded = OffsetCommitResponse::decode_versioned(3, &mut wire.freeze()).unwrap();
        assert_eq!(decoded.throttle_time_ms, 200);
        assert_eq!(decoded.topics[0].name, "t");
    }

    #[test]
    fn test_offset_commit_response_decode_v8_flexible() {
        let mut wire = BytesMut::new();
        // throttle_time_ms
        wire.put_i32(100);
        // one topic (compact length = count + 1)
        varint::encode_unsigned_varint(2, &mut wire);
        put_compact_string(&mut wire, b"committed-topic");
        // one partition: index 3, error code 0
        varint::encode_unsigned_varint(2, &mut wire);
        wire.put_i32(3);
        wire.put_i16(0);
        // empty tagged fields: partition, topic, response
        varint::encode_unsigned_varint(0, &mut wire);
        varint::encode_unsigned_varint(0, &mut wire);
        varint::encode_unsigned_varint(0, &mut wire);

        let decoded = OffsetCommitResponse::decode_versioned(8, &mut wire.freeze()).unwrap();
        assert_eq!(decoded.throttle_time_ms, 100);
        let topic = &decoded.topics[0];
        assert_eq!(topic.name, "committed-topic");
        assert_eq!(topic.partitions[0].partition_index, 3);
        assert!(topic.partitions[0].error_code.is_ok());
    }

    #[test]
    fn test_offset_commit_response_v9_decodes_same_as_v8() {
        let mut wire = BytesMut::new();
        // throttle_time_ms
        wire.put_i32(0);
        // one topic named "t1" (compact string length 3 = 2 bytes + 1)
        varint::encode_unsigned_varint(2, &mut wire);
        put_compact_string(&mut wire, b"t1");
        // one partition: index 0, error code 0
        varint::encode_unsigned_varint(2, &mut wire);
        wire.put_i32(0);
        wire.put_i16(0);
        // empty tagged fields: partition, topic, response
        varint::encode_unsigned_varint(0, &mut wire);
        varint::encode_unsigned_varint(0, &mut wire);
        varint::encode_unsigned_varint(0, &mut wire);

        let decoded = OffsetCommitResponse::decode_versioned(9, &mut wire.freeze()).unwrap();
        assert!(decoded.topics[0].partitions[0].error_code.is_ok());
    }

    #[test]
    fn test_offset_commit_request_encode_v10_topic_id() {
        let topic_id: [u8; 16] = [0xCC; 16];
        let partition = OffsetCommitRequestPartition {
            partition_index: 0,
            committed_offset: 50,
            committed_leader_epoch: 2,
            commit_timestamp: -1,
            committed_metadata: None,
        };
        let request = OffsetCommitRequest {
            group_id: "grp".to_string(),
            generation_id: 1,
            member_id: "m".to_string(),
            group_instance_id: None,
            retention_time_ms: -1,
            topics: vec![OffsetCommitRequestTopic {
                name: String::new(),
                topic_id: Some(topic_id),
                partitions: vec![partition],
            }],
        };

        let mut encoded = BytesMut::new();
        request.encode_v10(&mut encoded).unwrap();
        let mut cursor = &encoded[..];

        // group_id: compact length 4 = 3 bytes + 1, then "grp"
        assert_eq!(varint::decode_unsigned_varint(&mut cursor).unwrap(), 4);
        let mut group_id = [0u8; 3];
        cursor.copy_to_slice(&mut group_id);
        assert_eq!(&group_id, b"grp");

        // generation_id
        assert_eq!(cursor.get_i32(), 1);

        // member_id: compact length 2 = 1 byte + 1; skip the single byte "m"
        assert_eq!(varint::decode_unsigned_varint(&mut cursor).unwrap(), 2);
        cursor.advance(1);

        // group_instance_id: compact length 0 marks a null string
        assert_eq!(varint::decode_unsigned_varint(&mut cursor).unwrap(), 0);

        // topics: compact length 2 = one topic, which starts with its UUID
        assert_eq!(varint::decode_unsigned_varint(&mut cursor).unwrap(), 2);
        let mut wire_topic_id = [0u8; 16];
        cursor.copy_to_slice(&mut wire_topic_id);
        assert_eq!(wire_topic_id, topic_id);
    }

    #[test]
    fn test_offset_commit_response_decode_v10_topic_id() {
        let topic_id: [u8; 16] = [0xDD; 16];

        let mut wire = BytesMut::new();
        // throttle_time_ms
        wire.put_i32(0);
        // one topic, identified by its 16-byte id
        varint::encode_unsigned_varint(2, &mut wire);
        wire.put_slice(&topic_id);
        // one partition: index 0, error code 0
        varint::encode_unsigned_varint(2, &mut wire);
        wire.put_i32(0);
        wire.put_i16(0);
        // empty tagged fields: partition, topic, response
        varint::encode_unsigned_varint(0, &mut wire);
        varint::encode_unsigned_varint(0, &mut wire);
        varint::encode_unsigned_varint(0, &mut wire);

        let decoded = OffsetCommitResponse::decode_versioned(10, &mut wire.freeze()).unwrap();
        assert_eq!(decoded.topics.len(), 1);
        assert_eq!(decoded.topics[0].topic_id, Some(topic_id));
        assert!(decoded.topics[0].name.is_empty());
    }
}