use bytes::{Buf, BufMut, Bytes};
use super::{VersionedDecode, VersionedEncode};
use crate::error::{ErrorCode, KrafkaError, ProtocolErrorKind, Result};
use crate::protocol::primitives::{
Decode, Encode, KafkaBytes, KafkaString, TaggedFields, TryEncode,
};
use crate::protocol::{check_compact_array_len, encode_compact_array_len};
#[derive(Debug, Clone)]
pub struct ShareGroupHeartbeatRequest {
pub group_id: String,
pub member_id: String,
pub member_epoch: i32,
pub rack_id: Option<String>,
pub subscribed_topic_names: Option<Vec<String>>,
}
impl ShareGroupHeartbeatRequest {
pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
KafkaString::new(&self.group_id).try_encode_compact(buf)?;
KafkaString::new(&self.member_id).try_encode_compact(buf)?;
self.member_epoch.encode(buf);
KafkaString(self.rack_id.clone()).try_encode_compact(buf)?;
if let Some(ref topics) = self.subscribed_topic_names {
let len = u32::try_from(topics.len().saturating_add(1)).map_err(|_| {
KrafkaError::protocol_kind(
ProtocolErrorKind::InvalidLength,
"subscribed topics array too large",
)
})?;
crate::util::varint::encode_unsigned_varint(len, buf);
for topic in topics {
KafkaString::new(topic).try_encode_compact(buf)?;
}
} else {
crate::util::varint::encode_unsigned_varint(0, buf);
}
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ShareGroupHeartbeatResponse {
pub throttle_time_ms: i32,
pub error_code: ErrorCode,
pub error_message: Option<String>,
pub member_id: Option<String>,
pub member_epoch: i32,
pub heartbeat_interval_ms: i32,
pub assignment: Option<Vec<ShareGroupTopicPartitions>>,
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ShareGroupTopicPartitions {
pub topic_id: [u8; 16],
pub partitions: Vec<i32>,
}
impl ShareGroupHeartbeatResponse {
pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode_compact(buf)?.0;
let member_id = KafkaString::decode_compact(buf)?.0;
let member_epoch = i32::decode(buf)?;
let heartbeat_interval_ms = i32::decode(buf)?;
let assignment = Self::decode_assignment(buf)?;
let _ = TaggedFields::decode(buf)?; Ok(Self {
throttle_time_ms,
error_code,
error_message,
member_id,
member_epoch,
heartbeat_interval_ms,
assignment,
})
}
fn decode_assignment(buf: &mut impl Buf) -> Result<Option<Vec<ShareGroupTopicPartitions>>> {
if buf.remaining() < 1 {
return Err(KrafkaError::protocol_kind(
ProtocolErrorKind::TruncatedFrame,
"not enough bytes for assignment presence tag",
));
}
let presence = buf.get_i8();
if presence < 0 {
return Ok(None);
}
if presence != 1 {
return Err(KrafkaError::protocol_kind(
ProtocolErrorKind::Malformed,
format!(
"invalid assignment presence tag: expected negative for null or 1 for present, got {presence}"
),
));
}
let tp_count = check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut topic_partitions = Vec::with_capacity(tp_count);
for _ in 0..tp_count {
let mut topic_id = [0u8; 16];
if buf.remaining() < 16 {
return Err(KrafkaError::protocol_kind(
ProtocolErrorKind::TruncatedFrame,
"not enough bytes for topic_id UUID",
));
}
buf.copy_to_slice(&mut topic_id);
let p_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut partitions = Vec::with_capacity(p_count);
for _ in 0..p_count {
partitions.push(i32::decode(buf)?);
}
let _ = TaggedFields::decode(buf)?;
topic_partitions.push(ShareGroupTopicPartitions {
topic_id,
partitions,
});
}
let _ = TaggedFields::decode(buf)?; Ok(Some(topic_partitions))
}
}
#[derive(Debug, Clone)]
pub struct ShareGroupDescribeRequest {
pub group_ids: Vec<String>,
pub include_authorized_operations: bool,
}
impl ShareGroupDescribeRequest {
pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
let len = u32::try_from(self.group_ids.len().saturating_add(1)).map_err(|_| {
KrafkaError::protocol_kind(
ProtocolErrorKind::InvalidLength,
"group_ids array too large",
)
})?;
crate::util::varint::encode_unsigned_varint(len, buf);
for gid in &self.group_ids {
KafkaString::new(gid).try_encode_compact(buf)?;
}
buf.put_u8(u8::from(self.include_authorized_operations));
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ShareGroupDescribeResponse {
pub throttle_time_ms: i32,
pub groups: Vec<ShareGroupDescription>,
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ShareGroupDescription {
pub error_code: ErrorCode,
pub error_message: Option<String>,
pub group_id: String,
pub group_state: String,
pub group_epoch: i32,
pub assignment_epoch: i32,
pub assignor_name: String,
pub members: Vec<ShareGroupMember>,
pub authorized_operations: i32,
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ShareGroupMember {
pub member_id: String,
pub rack_id: Option<String>,
pub member_epoch: i32,
pub client_id: String,
pub client_host: String,
pub subscribed_topic_names: Vec<String>,
pub assignment: Vec<ShareGroupDescribeTopicPartitions>,
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ShareGroupDescribeTopicPartitions {
pub topic_id: [u8; 16],
pub topic_name: String,
pub partitions: Vec<i32>,
}
impl ShareGroupDescribeResponse {
pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let group_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut groups = Vec::with_capacity(group_count);
for _ in 0..group_count {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode_compact(buf)?.0;
let group_id =
super::non_nullable_string("group_id", KafkaString::decode_compact(buf)?.0)?;
let group_state =
super::non_nullable_string("group_state", KafkaString::decode_compact(buf)?.0)?;
let group_epoch = i32::decode(buf)?;
let assignment_epoch = i32::decode(buf)?;
let assignor_name =
super::non_nullable_string("assignor_name", KafkaString::decode_compact(buf)?.0)?;
let member_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut members = Vec::with_capacity(member_count);
for _ in 0..member_count {
let member_id =
super::non_nullable_string("member_id", KafkaString::decode_compact(buf)?.0)?;
let rack_id = KafkaString::decode_compact(buf)?.0;
let member_epoch = i32::decode(buf)?;
let client_id =
super::non_nullable_string("client_id", KafkaString::decode_compact(buf)?.0)?;
let client_host =
super::non_nullable_string("client_host", KafkaString::decode_compact(buf)?.0)?;
let stn_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut subscribed_topic_names = Vec::with_capacity(stn_count);
for _ in 0..stn_count {
subscribed_topic_names.push(super::non_nullable_string(
"subscribed_topic_name",
KafkaString::decode_compact(buf)?.0,
)?);
}
let tp_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut assignment = Vec::with_capacity(tp_count);
for _ in 0..tp_count {
let mut topic_id = [0u8; 16];
if buf.remaining() < 16 {
return Err(KrafkaError::protocol_kind(
ProtocolErrorKind::TruncatedFrame,
"not enough bytes for topic_id",
));
}
buf.copy_to_slice(&mut topic_id);
let topic_name = super::non_nullable_string(
"topic_name",
KafkaString::decode_compact(buf)?.0,
)?;
let p_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut partitions = Vec::with_capacity(p_count);
for _ in 0..p_count {
partitions.push(i32::decode(buf)?);
}
let _ = TaggedFields::decode(buf)?;
assignment.push(ShareGroupDescribeTopicPartitions {
topic_id,
topic_name,
partitions,
});
}
let _ = TaggedFields::decode(buf)?; let _ = TaggedFields::decode(buf)?; members.push(ShareGroupMember {
member_id,
rack_id,
member_epoch,
client_id,
client_host,
subscribed_topic_names,
assignment,
});
}
let authorized_operations = i32::decode(buf)?;
let _ = TaggedFields::decode(buf)?; groups.push(ShareGroupDescription {
error_code,
error_message,
group_id,
group_state,
group_epoch,
assignment_epoch,
assignor_name,
members,
authorized_operations,
});
}
let _ = TaggedFields::decode(buf)?; Ok(Self {
throttle_time_ms,
groups,
})
}
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ShareAcknowledgementBatch {
pub first_offset: i64,
pub last_offset: i64,
pub acknowledge_types: Vec<i8>,
}
#[derive(Debug, Clone)]
pub struct ShareFetchRequest {
pub group_id: Option<String>,
pub member_id: Option<String>,
pub share_session_epoch: i32,
pub max_wait_ms: i32,
pub min_bytes: i32,
pub max_bytes: i32,
pub max_records: i32,
pub batch_size: i32,
pub topics: Vec<ShareFetchTopic>,
pub forgotten_topics: Vec<ShareFetchForgottenTopic>,
}
#[derive(Debug, Clone)]
pub struct ShareFetchTopic {
pub topic_id: [u8; 16],
pub partitions: Vec<ShareFetchPartition>,
}
#[derive(Debug, Clone)]
pub struct ShareFetchPartition {
pub partition_index: i32,
pub acknowledgement_batches: Vec<ShareAcknowledgementBatch>,
}
#[derive(Debug, Clone)]
pub struct ShareFetchForgottenTopic {
pub topic_id: [u8; 16],
pub partitions: Vec<i32>,
}
impl ShareFetchRequest {
fn encode_topics_and_forgotten(&self, buf: &mut impl BufMut) -> Result<()> {
encode_compact_array_len(self.topics.len(), buf)?;
for topic in &self.topics {
buf.put_slice(&topic.topic_id);
encode_compact_array_len(topic.partitions.len(), buf)?;
for partition in &topic.partitions {
partition.partition_index.encode(buf);
encode_compact_array_len(partition.acknowledgement_batches.len(), buf)?;
for batch in &partition.acknowledgement_batches {
batch.first_offset.encode(buf);
batch.last_offset.encode(buf);
encode_compact_array_len(batch.acknowledge_types.len(), buf)?;
for &t in &batch.acknowledge_types {
t.encode(buf);
}
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
}
encode_compact_array_len(self.forgotten_topics.len(), buf)?;
for forgotten in &self.forgotten_topics {
buf.put_slice(&forgotten.topic_id);
encode_compact_array_len(forgotten.partitions.len(), buf)?;
for &p in &forgotten.partitions {
p.encode(buf);
}
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
Ok(())
}
pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
KafkaString(self.group_id.clone()).try_encode_compact(buf)?;
KafkaString(self.member_id.clone()).try_encode_compact(buf)?;
self.share_session_epoch.encode(buf);
self.max_wait_ms.encode(buf);
self.min_bytes.encode(buf);
self.max_bytes.encode(buf);
self.max_records.encode(buf);
self.batch_size.encode(buf);
self.encode_topics_and_forgotten(buf)
}
pub fn encode_v2(
&self,
buf: &mut impl BufMut,
share_acquire_mode: i8,
is_renew_ack: bool,
) -> Result<()> {
KafkaString(self.group_id.clone()).try_encode_compact(buf)?;
KafkaString(self.member_id.clone()).try_encode_compact(buf)?;
self.share_session_epoch.encode(buf);
self.max_wait_ms.encode(buf);
self.min_bytes.encode(buf);
self.max_bytes.encode(buf);
self.max_records.encode(buf);
self.batch_size.encode(buf);
share_acquire_mode.encode(buf);
buf.put_u8(u8::from(is_renew_ack));
self.encode_topics_and_forgotten(buf)
}
}
#[derive(Debug, Clone, Default)]
pub struct ShareLeaderIdAndEpoch {
pub leader_id: i32,
pub leader_epoch: i32,
}
#[derive(Debug, Clone)]
pub struct ShareAcquiredRecords {
pub first_offset: i64,
pub last_offset: i64,
pub delivery_count: i16,
}
#[derive(Debug, Clone)]
pub struct ShareNodeEndpoint {
pub node_id: i32,
pub host: String,
pub port: i32,
pub rack: Option<String>,
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ShareFetchResponse {
pub throttle_time_ms: i32,
pub error_code: ErrorCode,
pub error_message: Option<String>,
pub acquisition_lock_timeout_ms: i32,
pub responses: Vec<ShareFetchTopicResponse>,
pub node_endpoints: Vec<ShareNodeEndpoint>,
}
#[derive(Debug, Clone)]
pub struct ShareFetchTopicResponse {
pub topic_id: [u8; 16],
pub partitions: Vec<ShareFetchPartitionResponse>,
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ShareFetchPartitionResponse {
pub partition_index: i32,
pub error_code: ErrorCode,
pub error_message: Option<String>,
pub acknowledge_error_code: ErrorCode,
pub acknowledge_error_message: Option<String>,
pub current_leader: ShareLeaderIdAndEpoch,
pub records: Option<Bytes>,
pub acquired_records: Vec<ShareAcquiredRecords>,
}
impl ShareFetchResponse {
pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode_compact(buf)?.0;
let acquisition_lock_timeout_ms = i32::decode(buf)?;
let topic_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut responses = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let mut topic_id = [0u8; 16];
if buf.remaining() < 16 {
return Err(KrafkaError::protocol_kind(
ProtocolErrorKind::TruncatedFrame,
"not enough bytes for topic_id UUID",
));
}
buf.copy_to_slice(&mut topic_id);
let part_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut partitions = Vec::with_capacity(part_count);
for _ in 0..part_count {
let partition_index = i32::decode(buf)?;
let part_error = ErrorCode::from_i16(i16::decode(buf)?);
let part_error_message = KafkaString::decode_compact(buf)?.0;
let ack_error = ErrorCode::from_i16(i16::decode(buf)?);
let ack_error_message = KafkaString::decode_compact(buf)?.0;
let leader_id = i32::decode(buf)?;
let leader_epoch = i32::decode(buf)?;
let _ = TaggedFields::decode(buf)?; let records = KafkaBytes::decode_compact(buf)?.0;
let ar_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut acquired_records = Vec::with_capacity(ar_count);
for _ in 0..ar_count {
acquired_records.push(ShareAcquiredRecords {
first_offset: i64::decode(buf)?,
last_offset: i64::decode(buf)?,
delivery_count: i16::decode(buf)?,
});
let _ = TaggedFields::decode(buf)?;
}
let _ = TaggedFields::decode(buf)?; partitions.push(ShareFetchPartitionResponse {
partition_index,
error_code: part_error,
error_message: part_error_message,
acknowledge_error_code: ack_error,
acknowledge_error_message: ack_error_message,
current_leader: ShareLeaderIdAndEpoch {
leader_id,
leader_epoch,
},
records,
acquired_records,
});
}
let _ = TaggedFields::decode(buf)?; responses.push(ShareFetchTopicResponse {
topic_id,
partitions,
});
}
let ne_count = check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut node_endpoints = Vec::with_capacity(ne_count);
for _ in 0..ne_count {
let node_id = i32::decode(buf)?;
let host = super::non_nullable_string("host", KafkaString::decode_compact(buf)?.0)?;
let port = i32::decode(buf)?;
let rack = KafkaString::decode_compact(buf)?.0;
let _ = TaggedFields::decode(buf)?;
node_endpoints.push(ShareNodeEndpoint {
node_id,
host,
port,
rack,
});
}
let _ = TaggedFields::decode(buf)?; Ok(Self {
throttle_time_ms,
error_code,
error_message,
acquisition_lock_timeout_ms,
responses,
node_endpoints,
})
}
pub fn decode_v2(buf: &mut impl Buf) -> Result<Self> {
Self::decode_v1(buf)
}
}
#[derive(Debug, Clone)]
pub struct ShareAcknowledgeRequest {
pub group_id: Option<String>,
pub member_id: Option<String>,
pub share_session_epoch: i32,
pub topics: Vec<ShareAcknowledgeTopic>,
}
#[derive(Debug, Clone)]
pub struct ShareAcknowledgeTopic {
pub topic_id: [u8; 16],
pub partitions: Vec<ShareAcknowledgePartition>,
}
#[derive(Debug, Clone)]
pub struct ShareAcknowledgePartition {
pub partition_index: i32,
pub acknowledgement_batches: Vec<ShareAcknowledgementBatch>,
}
impl ShareAcknowledgeRequest {
fn encode_topics(&self, buf: &mut impl BufMut) -> Result<()> {
encode_compact_array_len(self.topics.len(), buf)?;
for topic in &self.topics {
buf.put_slice(&topic.topic_id);
encode_compact_array_len(topic.partitions.len(), buf)?;
for partition in &topic.partitions {
partition.partition_index.encode(buf);
encode_compact_array_len(partition.acknowledgement_batches.len(), buf)?;
for batch in &partition.acknowledgement_batches {
batch.first_offset.encode(buf);
batch.last_offset.encode(buf);
encode_compact_array_len(batch.acknowledge_types.len(), buf)?;
for &t in &batch.acknowledge_types {
t.encode(buf);
}
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
Ok(())
}
pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
KafkaString(self.group_id.clone()).try_encode_compact(buf)?;
KafkaString(self.member_id.clone()).try_encode_compact(buf)?;
self.share_session_epoch.encode(buf);
self.encode_topics(buf)
}
pub fn encode_v2(&self, buf: &mut impl BufMut, is_renew_ack: bool) -> Result<()> {
KafkaString(self.group_id.clone()).try_encode_compact(buf)?;
KafkaString(self.member_id.clone()).try_encode_compact(buf)?;
self.share_session_epoch.encode(buf);
buf.put_u8(u8::from(is_renew_ack));
self.encode_topics(buf)
}
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ShareAcknowledgeResponse {
pub throttle_time_ms: i32,
pub error_code: ErrorCode,
pub error_message: Option<String>,
pub acquisition_lock_timeout_ms: i32,
pub responses: Vec<ShareAcknowledgeTopicResponse>,
pub node_endpoints: Vec<ShareNodeEndpoint>,
}
#[derive(Debug, Clone)]
pub struct ShareAcknowledgeTopicResponse {
pub topic_id: [u8; 16],
pub partitions: Vec<ShareAcknowledgePartitionResponse>,
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ShareAcknowledgePartitionResponse {
pub partition_index: i32,
pub error_code: ErrorCode,
pub error_message: Option<String>,
pub current_leader: ShareLeaderIdAndEpoch,
}
impl ShareAcknowledgeResponse {
fn decode_responses_and_endpoints(
buf: &mut impl Buf,
) -> Result<(Vec<ShareAcknowledgeTopicResponse>, Vec<ShareNodeEndpoint>)> {
let topic_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut responses = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let mut topic_id = [0u8; 16];
if buf.remaining() < 16 {
return Err(KrafkaError::protocol_kind(
ProtocolErrorKind::TruncatedFrame,
"not enough bytes for topic_id UUID",
));
}
buf.copy_to_slice(&mut topic_id);
let part_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut partitions = Vec::with_capacity(part_count);
for _ in 0..part_count {
let partition_index = i32::decode(buf)?;
let part_error = ErrorCode::from_i16(i16::decode(buf)?);
let part_error_message = KafkaString::decode_compact(buf)?.0;
let leader_id = i32::decode(buf)?;
let leader_epoch = i32::decode(buf)?;
let _ = TaggedFields::decode(buf)?; let _ = TaggedFields::decode(buf)?; partitions.push(ShareAcknowledgePartitionResponse {
partition_index,
error_code: part_error,
error_message: part_error_message,
current_leader: ShareLeaderIdAndEpoch {
leader_id,
leader_epoch,
},
});
}
let _ = TaggedFields::decode(buf)?; responses.push(ShareAcknowledgeTopicResponse {
topic_id,
partitions,
});
}
let ne_count = check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut node_endpoints = Vec::with_capacity(ne_count);
for _ in 0..ne_count {
let node_id = i32::decode(buf)?;
let host = super::non_nullable_string("host", KafkaString::decode_compact(buf)?.0)?;
let port = i32::decode(buf)?;
let rack = KafkaString::decode_compact(buf)?.0;
let _ = TaggedFields::decode(buf)?;
node_endpoints.push(ShareNodeEndpoint {
node_id,
host,
port,
rack,
});
}
let _ = TaggedFields::decode(buf)?; Ok((responses, node_endpoints))
}
pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode_compact(buf)?.0;
let (responses, node_endpoints) = Self::decode_responses_and_endpoints(buf)?;
Ok(Self {
throttle_time_ms,
error_code,
error_message,
acquisition_lock_timeout_ms: -1,
responses,
node_endpoints,
})
}
pub fn decode_v2(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode_compact(buf)?.0;
let acquisition_lock_timeout_ms = i32::decode(buf)?;
let (responses, node_endpoints) = Self::decode_responses_and_endpoints(buf)?;
Ok(Self {
throttle_time_ms,
error_code,
error_message,
acquisition_lock_timeout_ms,
responses,
node_endpoints,
})
}
}
impl VersionedEncode for ShareGroupHeartbeatRequest {
fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
match version {
1 => self.encode_v1(buf)?,
_ => return unsupported_encode!("ShareGroupHeartbeatRequest", version),
}
Ok(())
}
}
impl VersionedDecode for ShareGroupHeartbeatResponse {
fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
match version {
1 => Self::decode_v1(buf),
_ => unsupported_decode!("ShareGroupHeartbeatResponse", version),
}
}
}
impl VersionedEncode for ShareGroupDescribeRequest {
fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
match version {
1 => self.encode_v1(buf)?,
_ => return unsupported_encode!("ShareGroupDescribeRequest", version),
}
Ok(())
}
}
impl VersionedDecode for ShareGroupDescribeResponse {
fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
match version {
1 => Self::decode_v1(buf),
_ => unsupported_decode!("ShareGroupDescribeResponse", version),
}
}
}
impl VersionedDecode for ShareFetchResponse {
fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
match version {
1 => Self::decode_v1(buf),
2 => Self::decode_v2(buf),
_ => unsupported_decode!("ShareFetchResponse", version),
}
}
}
impl VersionedDecode for ShareAcknowledgeResponse {
fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
match version {
1 => Self::decode_v1(buf),
2 => Self::decode_v2(buf),
_ => unsupported_decode!("ShareAcknowledgeResponse", version),
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use crate::util::varint;
use bytes::BytesMut;
#[test]
fn test_share_group_heartbeat_encode_v1() {
let req = ShareGroupHeartbeatRequest {
group_id: "sg-1".to_string(),
member_id: "m-1".to_string(),
member_epoch: 0,
rack_id: Some("us-east-1a".to_string()),
subscribed_topic_names: Some(vec!["topic-a".to_string()]),
};
let mut buf = BytesMut::new();
req.encode_versioned(1, &mut buf).unwrap();
assert!(!buf.is_empty());
}
#[test]
fn test_share_group_heartbeat_response_decode_v1() {
let mut buf = BytesMut::new();
buf.put_i32(10); buf.put_i16(0); varint::encode_unsigned_varint(0, &mut buf);
varint::encode_unsigned_varint(4, &mut buf);
buf.put_slice(b"m-1");
buf.put_i32(1); buf.put_i32(5000); buf.put_i8(-1); varint::encode_unsigned_varint(0, &mut buf);
let resp = ShareGroupHeartbeatResponse::decode_versioned(1, &mut buf.freeze()).unwrap();
assert_eq!(resp.throttle_time_ms, 10);
assert_eq!(resp.member_id.as_deref(), Some("m-1"));
assert_eq!(resp.member_epoch, 1);
assert_eq!(resp.heartbeat_interval_ms, 5000);
assert!(resp.assignment.is_none());
}
#[test]
fn test_share_group_describe_encode_v1() {
let req = ShareGroupDescribeRequest {
group_ids: vec!["sg-1".to_string(), "sg-2".to_string()],
include_authorized_operations: true,
};
let mut buf = BytesMut::new();
req.encode_versioned(1, &mut buf).unwrap();
assert!(!buf.is_empty());
}
#[test]
fn test_share_fetch_encode_v1_and_v2() {
let req = ShareFetchRequest {
group_id: Some("sg-1".to_string()),
member_id: Some("m-1".to_string()),
share_session_epoch: 0,
max_wait_ms: 500,
min_bytes: 1,
max_bytes: 1_048_576,
max_records: 100,
batch_size: 50,
topics: vec![ShareFetchTopic {
topic_id: [4u8; 16],
partitions: vec![ShareFetchPartition {
partition_index: 0,
acknowledgement_batches: vec![],
}],
}],
forgotten_topics: vec![],
};
let mut buf_v1 = BytesMut::new();
req.encode_v1(&mut buf_v1).unwrap();
assert!(!buf_v1.is_empty());
let mut buf_v2 = BytesMut::new();
req.encode_v2(&mut buf_v2, 0, false).unwrap();
assert!(
buf_v2.len() > buf_v1.len(),
"v2 should be longer than v1 due to share_acquire_mode + is_renew_ack"
);
}
#[test]
fn test_share_acknowledge_encode_v1_and_v2() {
let req = ShareAcknowledgeRequest {
group_id: Some("sg-1".to_string()),
member_id: Some("m-1".to_string()),
share_session_epoch: 0,
topics: vec![ShareAcknowledgeTopic {
topic_id: [5u8; 16],
partitions: vec![ShareAcknowledgePartition {
partition_index: 0,
acknowledgement_batches: vec![ShareAcknowledgementBatch {
first_offset: 0,
last_offset: 99,
acknowledge_types: vec![1; 100], }],
}],
}],
};
let mut buf_v1 = BytesMut::new();
req.encode_v1(&mut buf_v1).unwrap();
assert!(!buf_v1.is_empty());
let mut buf_v2 = BytesMut::new();
req.encode_v2(&mut buf_v2, true).unwrap();
assert!(
buf_v2.len() > buf_v1.len(),
"v2 should be longer than v1 due to is_renew_ack"
);
}
#[test]
fn test_share_acknowledge_response_decode_v1() {
let mut buf = BytesMut::new();
buf.put_i32(0); buf.put_i16(0); varint::encode_unsigned_varint(0, &mut buf);
varint::encode_unsigned_varint(2, &mut buf);
buf.put_slice(&[6u8; 16]); varint::encode_unsigned_varint(2, &mut buf);
buf.put_i32(0); buf.put_i16(0); varint::encode_unsigned_varint(0, &mut buf);
buf.put_i32(1); buf.put_i32(5); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(1, &mut buf);
varint::encode_unsigned_varint(0, &mut buf);
let resp = ShareAcknowledgeResponse::decode_versioned(1, &mut buf.freeze()).unwrap();
assert_eq!(resp.acquisition_lock_timeout_ms, -1);
assert_eq!(resp.responses.len(), 1);
assert_eq!(resp.responses[0].partitions[0].current_leader.leader_id, 1);
assert_eq!(
resp.responses[0].partitions[0].current_leader.leader_epoch,
5
);
}
#[test]
fn test_share_acknowledge_response_decode_v2() {
let mut buf = BytesMut::new();
buf.put_i32(0); buf.put_i16(0); varint::encode_unsigned_varint(0, &mut buf);
buf.put_i32(30_000); varint::encode_unsigned_varint(2, &mut buf);
buf.put_slice(&[6u8; 16]); varint::encode_unsigned_varint(2, &mut buf);
buf.put_i32(0); buf.put_i16(0); varint::encode_unsigned_varint(0, &mut buf);
buf.put_i32(1); buf.put_i32(5); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(1, &mut buf);
varint::encode_unsigned_varint(0, &mut buf);
let resp = ShareAcknowledgeResponse::decode_versioned(2, &mut buf.freeze()).unwrap();
assert_eq!(resp.acquisition_lock_timeout_ms, 30_000);
assert_eq!(resp.responses.len(), 1);
assert_eq!(resp.responses[0].partitions[0].current_leader.leader_id, 1);
}
}