use bytes::{Buf, BufMut};
use super::{VersionedDecode, VersionedEncode, non_nullable_string};
use crate::error::{ErrorCode, KrafkaError, Result};
use crate::protocol::primitives::{Decode, KafkaString, TaggedFields, TryEncode};
use crate::protocol::{check_compact_array_len, encode_compact_array_len};
/// Request body for the ConsumerGroupDescribe API.
///
/// Serialized by [`ConsumerGroupDescribeRequest::encode_v0`].
#[derive(Debug, Clone)]
pub struct ConsumerGroupDescribeRequest {
/// Group ids to describe; encoded as a compact array of compact strings.
pub group_ids: Vec<String>,
/// Encoded as a single byte (`0` or `1`) after the group ids.
/// [`ConsumerGroupDescribeRequest::new`] initialises it to `false`.
pub include_authorized_operations: bool,
}
impl ConsumerGroupDescribeRequest {
    /// Builds a request for the given group ids.
    ///
    /// `include_authorized_operations` starts out as `false`; set the public
    /// field directly to change it.
    pub fn new(group_ids: Vec<String>) -> Self {
        let include_authorized_operations = false;
        Self {
            group_ids,
            include_authorized_operations,
        }
    }

    /// Writes the version-0 request body into `buf`.
    ///
    /// Layout, in order: compact array of compact-string group ids, one byte
    /// for `include_authorized_operations`, then an empty tagged-field section.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the array-length, string or
    /// tagged-field encoders.
    pub fn encode_v0(&self, buf: &mut impl BufMut) -> Result<()> {
        let groups = &self.group_ids;
        encode_compact_array_len(groups.len(), buf)?;
        for group_id in groups.iter() {
            KafkaString::new(group_id).try_encode_compact(buf)?;
        }

        // Boolean is carried on the wire as a single 0/1 byte.
        let flag: u8 = self.include_authorized_operations.into();
        buf.put_u8(flag);

        TaggedFields::default().try_encode(buf)?;
        Ok(())
    }
}
/// One topic entry inside a [`DescribeGroupAssignment`], together with the
/// partition numbers decoded for it.
#[derive(Debug, Clone)]
pub struct DescribeGroupTopicPartition {
/// 16 raw bytes copied verbatim from the response buffer.
/// NOTE(review): presumably a topic UUID — confirm against the protocol schema.
pub topic_id: [u8; 16],
/// Topic name; decoded as a compact string passed through `non_nullable_string`.
pub topic_name: String,
/// Partition values, each decoded as an `i32`.
pub partitions: Vec<i32>,
}
/// A set of topic/partition entries; used for both `assignment` and
/// `target_assignment` on [`ConsumerGroupDescribeMember`].
#[derive(Debug, Clone)]
pub struct DescribeGroupAssignment {
/// Entries in the order they were decoded from the response.
pub topic_partitions: Vec<DescribeGroupTopicPartition>,
}
/// One member of a described consumer group.
///
/// Fields are listed in the order the response decoders read them.
#[derive(Debug, Clone)]
pub struct ConsumerGroupDescribeMember {
/// Member id (decoded through `non_nullable_string`).
pub member_id: String,
/// Nullable compact string; `None` when the wire value is null.
pub instance_id: Option<String>,
/// Nullable compact string; `None` when the wire value is null.
pub rack_id: Option<String>,
/// Decoded as an `i32`.
pub member_epoch: i32,
/// Client id (decoded through `non_nullable_string`).
pub client_id: String,
/// Client host (decoded through `non_nullable_string`).
pub client_host: String,
/// Compact array of compact strings, each passed through `non_nullable_string`.
pub subscribed_topic_names: Vec<String>,
/// Nullable compact string; `None` when the wire value is null.
pub subscribed_topic_regex: Option<String>,
/// First of the two assignment structures decoded for the member.
pub assignment: DescribeGroupAssignment,
/// Second assignment structure, decoded immediately after `assignment`.
pub target_assignment: DescribeGroupAssignment,
/// Read as an `i8` by `decode_v1`; `decode_v0` does not read it from the
/// buffer and stores `-1` instead.
pub member_type: i8,
}
/// Description of a single consumer group inside a
/// [`ConsumerGroupDescribeResponse`].
///
/// Fields are listed in the order the response decoders read them.
#[derive(Debug, Clone)]
pub struct ConsumerGroupDescribeGroup {
/// Built from the wire `i16` via `ErrorCode::from_i16`.
pub error_code: ErrorCode,
/// Nullable compact string; `None` when the wire value is null.
pub error_message: Option<String>,
/// Group id (decoded through `non_nullable_string`).
pub group_id: String,
/// Group state as a string (decoded through `non_nullable_string`).
pub group_state: String,
/// Decoded as an `i32`.
pub group_epoch: i32,
/// Decoded as an `i32`.
pub assignment_epoch: i32,
/// Assignor name (decoded through `non_nullable_string`).
pub assignor_name: String,
/// Members in the order they were decoded.
pub members: Vec<ConsumerGroupDescribeMember>,
/// Decoded as a raw `i32` after the member array.
/// NOTE(review): presumably a bitfield of permitted operations — confirm.
pub authorized_operations: i32,
}
/// Decoded body of a ConsumerGroupDescribe response.
///
/// Produced by `decode_v0` / `decode_v1`, or through the `VersionedDecode` impl.
#[derive(Debug, Clone)]
pub struct ConsumerGroupDescribeResponse {
/// First field of the body, decoded as an `i32`.
pub throttle_time_ms: i32,
/// Group descriptions in the order they were decoded.
pub groups: Vec<ConsumerGroupDescribeGroup>,
}
impl ConsumerGroupDescribeResponse {
fn decode_assignment(buf: &mut impl Buf) -> Result<DescribeGroupAssignment> {
let tp_count = check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut topic_partitions = Vec::with_capacity(tp_count);
for _ in 0..tp_count {
let mut topic_id = [0u8; 16];
if buf.remaining() < 16 {
return Err(KrafkaError::protocol("short buf for topic_id"));
}
buf.copy_to_slice(&mut topic_id);
let topic_name =
non_nullable_string("topic_name", KafkaString::decode_compact(buf)?.0)?;
let partition_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut partitions = Vec::with_capacity(partition_count);
for _ in 0..partition_count {
partitions.push(i32::decode(buf)?);
}
let _ = TaggedFields::decode(buf)?;
topic_partitions.push(DescribeGroupTopicPartition {
topic_id,
topic_name,
partitions,
});
}
let _ = TaggedFields::decode(buf)?;
Ok(DescribeGroupAssignment { topic_partitions })
}
pub fn decode_v0(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let group_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut groups = Vec::with_capacity(group_count);
for _ in 0..group_count {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode_compact(buf)?.0;
let group_id = non_nullable_string("group_id", KafkaString::decode_compact(buf)?.0)?;
let group_state =
non_nullable_string("group_state", KafkaString::decode_compact(buf)?.0)?;
let group_epoch = i32::decode(buf)?;
let assignment_epoch = i32::decode(buf)?;
let assignor_name =
non_nullable_string("assignor_name", KafkaString::decode_compact(buf)?.0)?;
let member_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut members = Vec::with_capacity(member_count);
for _ in 0..member_count {
let member_id =
non_nullable_string("member_id", KafkaString::decode_compact(buf)?.0)?;
let instance_id = KafkaString::decode_compact(buf)?.0;
let rack_id = KafkaString::decode_compact(buf)?.0;
let member_epoch = i32::decode(buf)?;
let client_id =
non_nullable_string("client_id", KafkaString::decode_compact(buf)?.0)?;
let client_host =
non_nullable_string("client_host", KafkaString::decode_compact(buf)?.0)?;
let sub_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut subscribed_topic_names = Vec::with_capacity(sub_count);
for _ in 0..sub_count {
subscribed_topic_names.push(non_nullable_string(
"subscribed_topic",
KafkaString::decode_compact(buf)?.0,
)?);
}
let subscribed_topic_regex = KafkaString::decode_compact(buf)?.0;
let assignment = Self::decode_assignment(buf)?;
let target_assignment = Self::decode_assignment(buf)?;
let _ = TaggedFields::decode(buf)?;
members.push(ConsumerGroupDescribeMember {
member_id,
instance_id,
rack_id,
member_epoch,
client_id,
client_host,
subscribed_topic_names,
subscribed_topic_regex,
assignment,
target_assignment,
member_type: -1,
});
}
let authorized_operations = i32::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
groups.push(ConsumerGroupDescribeGroup {
error_code,
error_message,
group_id,
group_state,
group_epoch,
assignment_epoch,
assignor_name,
members,
authorized_operations,
});
}
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
groups,
})
}
pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let group_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut groups = Vec::with_capacity(group_count);
for _ in 0..group_count {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode_compact(buf)?.0;
let group_id = non_nullable_string("group_id", KafkaString::decode_compact(buf)?.0)?;
let group_state =
non_nullable_string("group_state", KafkaString::decode_compact(buf)?.0)?;
let group_epoch = i32::decode(buf)?;
let assignment_epoch = i32::decode(buf)?;
let assignor_name =
non_nullable_string("assignor_name", KafkaString::decode_compact(buf)?.0)?;
let member_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut members = Vec::with_capacity(member_count);
for _ in 0..member_count {
let member_id =
non_nullable_string("member_id", KafkaString::decode_compact(buf)?.0)?;
let instance_id = KafkaString::decode_compact(buf)?.0;
let rack_id = KafkaString::decode_compact(buf)?.0;
let member_epoch = i32::decode(buf)?;
let client_id =
non_nullable_string("client_id", KafkaString::decode_compact(buf)?.0)?;
let client_host =
non_nullable_string("client_host", KafkaString::decode_compact(buf)?.0)?;
let sub_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut subscribed_topic_names = Vec::with_capacity(sub_count);
for _ in 0..sub_count {
subscribed_topic_names.push(non_nullable_string(
"subscribed_topic",
KafkaString::decode_compact(buf)?.0,
)?);
}
let subscribed_topic_regex = KafkaString::decode_compact(buf)?.0;
let assignment = Self::decode_assignment(buf)?;
let target_assignment = Self::decode_assignment(buf)?;
let member_type = i8::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
members.push(ConsumerGroupDescribeMember {
member_id,
instance_id,
rack_id,
member_epoch,
client_id,
client_host,
subscribed_topic_names,
subscribed_topic_regex,
assignment,
target_assignment,
member_type,
});
}
let authorized_operations = i32::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
groups.push(ConsumerGroupDescribeGroup {
error_code,
error_message,
group_id,
group_state,
group_epoch,
assignment_epoch,
assignor_name,
members,
authorized_operations,
});
}
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
groups,
})
}
}
impl VersionedEncode for ConsumerGroupDescribeRequest {
    /// Encodes the request for the given API `version`.
    ///
    /// Versions 0 and 1 both use the `encode_v0` layout; any other version
    /// yields the result of `unsupported_encode!`.
    fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
        match version {
            0 | 1 => self.encode_v0(buf),
            _ => unsupported_encode!("ConsumerGroupDescribeRequest", version),
        }
    }
}
impl VersionedDecode for ConsumerGroupDescribeResponse {
    /// Dispatches to the decoder matching the API `version`.
    ///
    /// Version 0 uses `decode_v0`, version 1 uses `decode_v1`; any other
    /// version yields the result of `unsupported_decode!`.
    fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
        if version == 0 {
            return Self::decode_v0(buf);
        }
        if version == 1 {
            return Self::decode_v1(buf);
        }
        unsupported_decode!("ConsumerGroupDescribeResponse", version)
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::util::varint;
    use bytes::BytesMut;

    /// Appends a compact (nullable) string: length-plus-one prefix followed by
    /// the bytes, or a single `0` byte for `None`.
    ///
    /// The prefix is written as one raw byte, which is only a valid unsigned
    /// varint while the value stays below `0x80`; longer fixtures are rejected
    /// instead of being silently mis-encoded.
    fn put_compact_string(buf: &mut BytesMut, s: Option<&str>) {
        match s {
            Some(val) => {
                let encoded_len = val.len() + 1;
                assert!(
                    encoded_len < 0x80,
                    "fixture string too long for a one-byte compact length"
                );
                // Lossless: guarded by the assertion above.
                buf.put_u8(encoded_len as u8);
                buf.put_slice(val.as_bytes());
            }
            None => buf.put_u8(0),
        }
    }

    /// Appends an empty tagged-field section (a single zero count byte).
    fn put_tagged_fields(buf: &mut BytesMut) {
        buf.put_u8(0);
    }

    #[test]
    fn test_consumer_group_describe_request_encode_v0() {
        let req = ConsumerGroupDescribeRequest::new(vec!["g1".to_string()]);
        let mut buf = BytesMut::new();
        req.encode_v0(&mut buf).unwrap();

        let mut cur = &buf[..];
        // Compact array length: element count + 1.
        let arr = varint::decode_unsigned_varint(&mut cur).unwrap();
        assert_eq!(arr, 2);
        // Compact string length: byte length + 1.
        let id_len = varint::decode_unsigned_varint(&mut cur).unwrap();
        assert_eq!(id_len, 3);
        let mut id_bytes = vec![0u8; 2];
        cur.copy_to_slice(&mut id_bytes);
        assert_eq!(id_bytes, b"g1");
        // include_authorized_operations = false
        assert_eq!(cur.get_u8(), 0);
        // empty tagged fields
        assert_eq!(cur.get_u8(), 0);
        assert!(cur.is_empty());
    }

    #[test]
    fn test_consumer_group_describe_response_decode_v0_empty_group() {
        let mut buf = BytesMut::new();
        buf.put_i32(5); // throttle_time_ms
        varint::encode_unsigned_varint(2, &mut buf); // groups: 1 entry
        buf.put_i16(0); // error_code
        put_compact_string(&mut buf, None); // error_message
        put_compact_string(&mut buf, Some("g1")); // group_id
        put_compact_string(&mut buf, Some("Stable")); // group_state
        buf.put_i32(1); // group_epoch
        buf.put_i32(1); // assignment_epoch
        put_compact_string(&mut buf, Some("range")); // assignor_name
        varint::encode_unsigned_varint(1, &mut buf); // members: empty
        buf.put_i32(0); // authorized_operations
        put_tagged_fields(&mut buf); // group tagged fields
        put_tagged_fields(&mut buf); // response tagged fields

        let resp = ConsumerGroupDescribeResponse::decode_v0(&mut buf.freeze()).unwrap();
        assert_eq!(resp.throttle_time_ms, 5);
        assert_eq!(resp.groups.len(), 1);
        let g = &resp.groups[0];
        assert!(g.error_code.is_ok());
        assert_eq!(g.group_id, "g1");
        assert_eq!(g.group_state, "Stable");
        assert_eq!(g.group_epoch, 1);
        assert_eq!(g.assignor_name, "range");
        assert!(g.members.is_empty());
    }

    #[test]
    fn test_consumer_group_describe_response_decode_v0_with_member() {
        let mut buf = BytesMut::new();
        buf.put_i32(0); // throttle_time_ms
        varint::encode_unsigned_varint(2, &mut buf); // groups: 1 entry
        buf.put_i16(0); // error_code
        put_compact_string(&mut buf, None); // error_message
        put_compact_string(&mut buf, Some("g")); // group_id
        put_compact_string(&mut buf, Some("S")); // group_state
        buf.put_i32(2); // group_epoch
        buf.put_i32(2); // assignment_epoch
        put_compact_string(&mut buf, Some("a")); // assignor_name
        varint::encode_unsigned_varint(2, &mut buf); // members: 1 entry
        put_compact_string(&mut buf, Some("m1")); // member_id
        put_compact_string(&mut buf, None); // instance_id
        put_compact_string(&mut buf, Some("rack0")); // rack_id
        buf.put_i32(3); // member_epoch
        put_compact_string(&mut buf, Some("client")); // client_id
        put_compact_string(&mut buf, Some("/127.0.0.1")); // client_host
        varint::encode_unsigned_varint(2, &mut buf); // subscribed_topic_names: 1 entry
        put_compact_string(&mut buf, Some("tp1"));
        put_compact_string(&mut buf, None); // subscribed_topic_regex
        varint::encode_unsigned_varint(2, &mut buf); // assignment: 1 topic
        buf.put_slice(&[0u8; 16]); // topic_id
        put_compact_string(&mut buf, Some("tp1")); // topic_name
        varint::encode_unsigned_varint(3, &mut buf); // partitions: 2 entries
        buf.put_i32(0);
        buf.put_i32(1);
        put_tagged_fields(&mut buf); // topic entry tagged fields
        put_tagged_fields(&mut buf); // assignment tagged fields
        varint::encode_unsigned_varint(1, &mut buf); // target_assignment: empty
        put_tagged_fields(&mut buf); // target_assignment tagged fields
        put_tagged_fields(&mut buf); // member tagged fields
        buf.put_i32(-2_147_483_648); // authorized_operations
        put_tagged_fields(&mut buf); // group tagged fields
        put_tagged_fields(&mut buf); // response tagged fields

        let resp = ConsumerGroupDescribeResponse::decode_v0(&mut buf.freeze()).unwrap();
        let g = &resp.groups[0];
        assert_eq!(g.members.len(), 1);
        let m = &g.members[0];
        assert_eq!(m.member_id, "m1");
        assert!(m.instance_id.is_none());
        assert_eq!(m.rack_id.as_deref(), Some("rack0"));
        assert_eq!(m.member_epoch, 3);
        assert_eq!(m.subscribed_topic_names, vec!["tp1"]);
        assert_eq!(m.assignment.topic_partitions.len(), 1);
        assert_eq!(m.assignment.topic_partitions[0].partitions, vec![0, 1]);
        assert!(m.target_assignment.topic_partitions.is_empty());
        // decode_v0 never reads a member type and stores -1.
        assert_eq!(m.member_type, -1);
    }

    #[test]
    fn test_consumer_group_describe_response_decode_v1_member_type() {
        let mut buf = BytesMut::new();
        buf.put_i32(0); // throttle_time_ms
        varint::encode_unsigned_varint(2, &mut buf); // groups: 1 entry
        buf.put_i16(0); // error_code
        put_compact_string(&mut buf, None); // error_message
        put_compact_string(&mut buf, Some("g")); // group_id
        put_compact_string(&mut buf, Some("S")); // group_state
        buf.put_i32(1); // group_epoch
        buf.put_i32(1); // assignment_epoch
        put_compact_string(&mut buf, Some("a")); // assignor_name
        varint::encode_unsigned_varint(2, &mut buf); // members: 1 entry
        put_compact_string(&mut buf, Some("m")); // member_id
        put_compact_string(&mut buf, None); // instance_id
        put_compact_string(&mut buf, None); // rack_id
        buf.put_i32(1); // member_epoch
        put_compact_string(&mut buf, Some("c")); // client_id
        put_compact_string(&mut buf, Some("h")); // client_host
        varint::encode_unsigned_varint(1, &mut buf); // subscribed_topic_names: empty
        put_compact_string(&mut buf, None); // subscribed_topic_regex
        varint::encode_unsigned_varint(1, &mut buf); // assignment: empty
        put_tagged_fields(&mut buf); // assignment tagged fields
        varint::encode_unsigned_varint(1, &mut buf); // target_assignment: empty
        put_tagged_fields(&mut buf); // target_assignment tagged fields
        buf.put_i8(1); // member_type
        put_tagged_fields(&mut buf); // member tagged fields
        buf.put_i32(0); // authorized_operations
        put_tagged_fields(&mut buf); // group tagged fields
        put_tagged_fields(&mut buf); // response tagged fields

        let resp = ConsumerGroupDescribeResponse::decode_v1(&mut buf.freeze()).unwrap();
        let m = &resp.groups[0].members[0];
        assert_eq!(m.member_type, 1);
    }
}