use bytes::{Buf, BufMut};
use super::{VersionedDecode, VersionedEncode};
use crate::error::{ErrorCode, KrafkaError, Result};
use crate::protocol::api::ApiKey;
use crate::protocol::check_compact_array_len;
use crate::protocol::primitives::{Decode, Encode, KafkaString, TaggedFields, TryEncode};
/// A topic, identified by its 16-byte ID, together with a list of partition indexes.
///
/// The same shape is written into the request's `topic_partitions` array and read back
/// from the assignment carried by the response.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsumerGroupTopicPartitions {
    /// Raw 16-byte topic ID; copied to and from the wire verbatim.
    pub topic_id: [u8; 16],
    /// Partition indexes, each encoded on the wire as an `i32`.
    pub partitions: Vec<i32>,
}
/// Body of a ConsumerGroupHeartbeat request.
///
/// Fields are listed in the order in which the encoders write them. Optional string
/// fields are written as a null compact string when `None`; optional arrays are
/// written as a null compact array (a single unsigned varint `0`) when `None`.
#[derive(Debug, Clone)]
pub struct ConsumerGroupHeartbeatRequest {
    /// Group identifier; always written as a compact string.
    pub group_id: String,
    /// Member identifier; always written as a compact string.
    pub member_id: String,
    /// Member epoch, written as an `i32`.
    // NOTE(review): the tests use -1 as a "leave" epoch; confirm the full set of
    // special values against the protocol definition.
    pub member_epoch: i32,
    /// Optional instance ID (nullable compact string).
    pub instance_id: Option<String>,
    /// Optional rack ID (nullable compact string).
    pub rack_id: Option<String>,
    /// Rebalance timeout in milliseconds, written as an `i32`.
    pub rebalance_timeout_ms: i32,
    /// Optional list of subscribed topic names (nullable compact array of compact strings).
    pub subscribed_topic_names: Option<Vec<String>>,
    /// Optional subscription regex (nullable compact string).
    ///
    /// Only `encode_v1` writes this field; `encode_v0` does not write it at all.
    pub subscribed_topic_regex: Option<String>,
    /// Optional server-side assignor name (nullable compact string).
    pub server_assignor: Option<String>,
    /// Optional list of topic partitions (nullable compact array).
    pub topic_partitions: Option<Vec<ConsumerGroupTopicPartitions>>,
}
impl ConsumerGroupHeartbeatRequest {
    /// Returns the API key under which this request is sent.
    pub fn api_key() -> ApiKey {
        ApiKey::ConsumerGroupHeartbeat
    }

    /// Encodes the request body using the version 0 layout.
    ///
    /// Version 0 has no slot for `subscribed_topic_regex`, so that field is not written
    /// even when it is `Some`.
    // NOTE(review): a set regex is silently dropped here; consider whether callers
    // should get an error instead when they negotiate version 0.
    ///
    /// # Errors
    ///
    /// Returns an error if a string or tagged-field encoder fails, or if an array
    /// length does not fit the `u32` compact-array prefix. Bytes written before the
    /// failure remain in `buf`.
    pub fn encode_v0(&self, buf: &mut impl BufMut) -> Result<()> {
        self.encode_body(false, buf)
    }

    /// Encodes the request body using the version 1 layout.
    ///
    /// Identical to version 0 except that `subscribed_topic_regex` is written (as a
    /// nullable compact string) between the subscribed topic names and the server
    /// assignor.
    ///
    /// # Errors
    ///
    /// Same failure conditions as [`Self::encode_v0`].
    pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
        self.encode_body(true, buf)
    }

    /// Writes every field in wire order; `include_regex` selects whether the
    /// `subscribed_topic_regex` slot is emitted.
    fn encode_body(&self, include_regex: bool, buf: &mut impl BufMut) -> Result<()> {
        KafkaString::new(&self.group_id).try_encode_compact(buf)?;
        KafkaString::new(&self.member_id).try_encode_compact(buf)?;
        self.member_epoch.encode(buf);
        Self::encode_nullable_compact_string(&self.instance_id, buf)?;
        Self::encode_nullable_compact_string(&self.rack_id, buf)?;
        self.rebalance_timeout_ms.encode(buf);
        Self::encode_subscribed_topic_names(&self.subscribed_topic_names, buf)?;
        if include_regex {
            Self::encode_nullable_compact_string(&self.subscribed_topic_regex, buf)?;
        }
        Self::encode_nullable_compact_string(&self.server_assignor, buf)?;
        self.encode_topic_partitions(buf)?;
        // No tagged fields are sent; an empty set still has to be written.
        TaggedFields::default().try_encode(buf)?;
        Ok(())
    }

    /// Writes `value` as a compact string, or a null compact string when it is `None`.
    fn encode_nullable_compact_string(
        value: &Option<String>,
        buf: &mut impl BufMut,
    ) -> Result<()> {
        match value {
            Some(s) => KafkaString::new(s).try_encode_compact(buf)?,
            None => KafkaString::null().try_encode_compact(buf)?,
        }
        Ok(())
    }

    /// Writes the compact-array length prefix (`len + 1` as an unsigned varint).
    ///
    /// `what` names the array in the error raised when `len + 1` does not fit in `u32`.
    fn encode_compact_array_len(len: usize, what: &str, buf: &mut impl BufMut) -> Result<()> {
        let len_plus_one = u32::try_from(len.saturating_add(1)).map_err(|_| {
            KrafkaError::protocol(format!("{what} array length {len} exceeds u32 limit"))
        })?;
        crate::util::varint::encode_unsigned_varint(len_plus_one, buf);
        Ok(())
    }

    /// Writes the subscribed topic names as a nullable compact array of compact strings.
    fn encode_subscribed_topic_names(
        names: &Option<Vec<String>>,
        buf: &mut impl BufMut,
    ) -> Result<()> {
        match names {
            None => {
                // A length prefix of 0 denotes a null array.
                crate::util::varint::encode_unsigned_varint(0, buf);
            }
            Some(names) => {
                Self::encode_compact_array_len(names.len(), "subscribed topic names", buf)?;
                for name in names {
                    KafkaString::new(name).try_encode_compact(buf)?;
                }
            }
        }
        Ok(())
    }

    /// Writes `topic_partitions` as a nullable compact array.
    ///
    /// Each entry is the raw 16-byte topic ID, a compact array of `i32` partition
    /// indexes, and an empty tagged-field set.
    fn encode_topic_partitions(&self, buf: &mut impl BufMut) -> Result<()> {
        match &self.topic_partitions {
            None => {
                // A length prefix of 0 denotes a null array.
                crate::util::varint::encode_unsigned_varint(0, buf);
            }
            Some(tps) => {
                Self::encode_compact_array_len(tps.len(), "topic partitions", buf)?;
                for tp in tps {
                    buf.put_slice(&tp.topic_id);
                    Self::encode_compact_array_len(tp.partitions.len(), "partitions", buf)?;
                    for &p in &tp.partitions {
                        p.encode(buf);
                    }
                    TaggedFields::default().try_encode(buf)?;
                }
            }
        }
        Ok(())
    }
}
impl VersionedEncode for ConsumerGroupHeartbeatRequest {
    /// Encodes the request for the given protocol `version`.
    ///
    /// Version 0 is handled by `encode_v0` and version 1 by `encode_v1`; every other
    /// version yields whatever `unsupported_encode!` produces for this request name.
    fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
        match version {
            0 => self.encode_v0(buf),
            1 => self.encode_v1(buf),
            _ => unsupported_encode!("ConsumerGroupHeartbeatRequest", version),
        }
    }
}
/// Assignment carried by a heartbeat response when its presence byte marks it as set.
#[derive(Debug, Clone, Default)]
pub struct ConsumerGroupAssignment {
    /// Assigned topics with their partition indexes, in wire order.
    pub topic_partitions: Vec<ConsumerGroupTopicPartitions>,
}
/// Decoded body of a ConsumerGroupHeartbeat response.
///
/// Fields are listed in the order in which `decode_v0` reads them.
#[derive(Debug, Clone)]
pub struct ConsumerGroupHeartbeatResponse {
    /// Throttle time in milliseconds, read as an `i32`.
    pub throttle_time_ms: i32,
    /// Error code, converted from the wire `i16` via `ErrorCode::from_i16`.
    pub error_code: ErrorCode,
    /// Optional error message (nullable compact string).
    pub error_message: Option<String>,
    /// Optional member ID (nullable compact string).
    pub member_id: Option<String>,
    /// Member epoch, read as an `i32`.
    pub member_epoch: i32,
    /// Heartbeat interval in milliseconds, read as an `i32`.
    pub heartbeat_interval_ms: i32,
    /// Assignment, or `None` when the presence byte on the wire is negative.
    pub assignment: Option<ConsumerGroupAssignment>,
}
impl ConsumerGroupHeartbeatResponse {
    /// Decodes a response body laid out as version 0.
    ///
    /// Read order: throttle time (`i32`), error code (`i16`), error message and member
    /// ID (nullable compact strings), member epoch (`i32`), heartbeat interval (`i32`),
    /// the optional assignment, then the trailing tagged fields, which are decoded and
    /// discarded.
    ///
    /// # Errors
    ///
    /// Returns an error if any field fails to decode or the assignment presence byte
    /// is neither negative nor `1`.
    pub fn decode_v0(buf: &mut impl Buf) -> Result<Self> {
        let throttle_time_ms = i32::decode(buf)?;
        let error_code = ErrorCode::from_i16(i16::decode(buf)?);
        let error_message = KafkaString::decode_compact(buf)?.0;
        let member_id = KafkaString::decode_compact(buf)?.0;
        let member_epoch = i32::decode(buf)?;
        let heartbeat_interval_ms = i32::decode(buf)?;
        let assignment = Self::decode_assignment(buf)?;
        let _ = TaggedFields::decode(buf)?;
        Ok(Self {
            throttle_time_ms,
            error_code,
            error_message,
            member_id,
            member_epoch,
            heartbeat_interval_ms,
            assignment,
        })
    }

    /// Decodes the optional assignment struct.
    ///
    /// The struct is preceded by a one-byte presence tag: any negative value means
    /// "null" and yields `Ok(None)`; `1` means the struct follows. Every other value
    /// is rejected as a protocol error.
    fn decode_assignment(buf: &mut impl Buf) -> Result<Option<ConsumerGroupAssignment>> {
        // `get_i8` panics on an empty buffer, so check first and report an error instead.
        if buf.remaining() < 1 {
            return Err(KrafkaError::protocol(
                "not enough bytes for assignment presence tag",
            ));
        }
        let presence = buf.get_i8();
        if presence < 0 {
            return Ok(None);
        }
        if presence != 1 {
            return Err(KrafkaError::protocol(format!(
                "invalid assignment presence tag: expected negative for null or 1 for present, got {presence}"
            )));
        }
        let tp_count_raw = crate::util::varint::decode_unsigned_varint(buf)?;
        let topic_partitions = Self::decode_topic_partitions_from_count(tp_count_raw, buf)?;
        let _ = TaggedFields::decode(buf)?;
        Ok(Some(ConsumerGroupAssignment { topic_partitions }))
    }

    /// Decodes the entries of a compact topic-partitions array whose raw length prefix
    /// (`count`) has already been read.
    ///
    /// Each entry is a 16-byte topic ID, a compact array of `i32` partition indexes,
    /// and a tagged-field set that is decoded and discarded.
    fn decode_topic_partitions_from_count(
        count: u32,
        buf: &mut impl Buf,
    ) -> Result<Vec<ConsumerGroupTopicPartitions>> {
        /// Size of the raw topic ID that starts every entry.
        const TOPIC_ID_LEN: usize = 16;

        let len = check_compact_array_len(count)?;
        // The count comes off the wire. Never preallocate for more entries than the
        // remaining bytes could possibly contain; the vector still grows on demand.
        let mut result = Vec::with_capacity(len.min(buf.remaining() / TOPIC_ID_LEN));
        for _ in 0..len {
            // `copy_to_slice` panics on a short buffer, so check first.
            if buf.remaining() < TOPIC_ID_LEN {
                return Err(KrafkaError::protocol("not enough bytes for topic ID UUID"));
            }
            let mut topic_id = [0u8; TOPIC_ID_LEN];
            buf.copy_to_slice(&mut topic_id);
            let part_count = crate::util::varint::decode_unsigned_varint(buf)?;
            let part_len = check_compact_array_len(part_count)?;
            // Same clamp as above: capacity is only a hint, bounded by the bytes left.
            let mut partitions =
                Vec::with_capacity(part_len.min(buf.remaining() / std::mem::size_of::<i32>()));
            for _ in 0..part_len {
                partitions.push(i32::decode(buf)?);
            }
            let _ = TaggedFields::decode(buf)?;
            result.push(ConsumerGroupTopicPartitions {
                topic_id,
                partitions,
            });
        }
        Ok(result)
    }
}
impl VersionedDecode for ConsumerGroupHeartbeatResponse {
    /// Decodes a response for the given protocol `version`.
    ///
    /// Versions 0 and 1 are both decoded with `decode_v0`; every other version yields
    /// whatever `unsupported_decode!` produces for this response name.
    fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
        if !(0..=1).contains(&version) {
            return unsupported_decode!("ConsumerGroupHeartbeatResponse", version);
        }
        Self::decode_v0(buf)
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::protocol::primitives::KafkaArray;
    use crate::protocol::*;
    use bytes::BytesMut;

    /// Returns `value` as the single byte that encodes it as an unsigned varint.
    ///
    /// Panics for values of 0x80 and above, which need more than one varint byte and
    /// would otherwise be silently truncated by the fixture builders.
    fn single_byte_varint(value: usize) -> u8 {
        match u8::try_from(value) {
            Ok(byte) if byte < 0x80 => byte,
            _ => panic!("test helper only supports single-byte varints, got {value}"),
        }
    }

    /// Appends a nullable compact string (length + 1 prefix, `0` for null).
    fn put_compact_string(buf: &mut BytesMut, s: Option<&str>) {
        match s {
            Some(val) => {
                buf.put_u8(single_byte_varint(val.len() + 1));
                buf.put_slice(val.as_bytes());
            }
            None => buf.put_u8(0),
        }
    }

    /// Appends a compact array length prefix (count + 1, `0` for a null array).
    fn put_compact_array_count(buf: &mut BytesMut, count: Option<usize>) {
        buf.put_u8(count.map_or(0, |n| single_byte_varint(n + 1)));
    }

    /// Appends an empty tagged-field set.
    fn put_tagged_fields(buf: &mut BytesMut) {
        buf.put_u8(0);
    }

    /// Asserts that the next bytes of `r` are a compact string holding `expected`.
    fn assert_compact_bytes(r: &mut bytes::Bytes, expected: &[u8]) {
        assert_eq!(usize::from(r.get_u8()), expected.len() + 1);
        let mut actual = vec![0u8; expected.len()];
        r.copy_to_slice(&mut actual);
        assert_eq!(actual.as_slice(), expected);
    }

    /// Smallest request used by the tests: one-character IDs and every optional unset.
    fn minimal_request() -> ConsumerGroupHeartbeatRequest {
        ConsumerGroupHeartbeatRequest {
            group_id: "g".to_string(),
            member_id: "m".to_string(),
            member_epoch: 0,
            instance_id: None,
            rack_id: None,
            rebalance_timeout_ms: -1,
            subscribed_topic_names: None,
            subscribed_topic_regex: None,
            server_assignor: None,
            topic_partitions: None,
        }
    }

    #[test]
    fn test_consumer_group_heartbeat_request_encode_v0_all_fields() {
        let topic_id: [u8; 16] = [
            0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e,
            0x0f, 0x10,
        ];
        let request = ConsumerGroupHeartbeatRequest {
            group_id: "grp".to_string(),
            member_id: "m1".to_string(),
            member_epoch: 5,
            instance_id: Some("inst".to_string()),
            rack_id: Some("rack-a".to_string()),
            rebalance_timeout_ms: 30_000,
            subscribed_topic_names: Some(vec!["topicA".to_string()]),
            subscribed_topic_regex: None,
            server_assignor: Some("uniform".to_string()),
            topic_partitions: Some(vec![ConsumerGroupTopicPartitions {
                topic_id,
                partitions: vec![0, 1, 2],
            }]),
        };
        let mut buf = BytesMut::new();
        request.encode_v0(&mut buf).unwrap();
        let mut r = buf.freeze();

        // group_id, member_id, member_epoch
        assert_compact_bytes(&mut r, b"grp");
        assert_compact_bytes(&mut r, b"m1");
        assert_eq!(r.get_i32(), 5);
        // instance_id, rack_id, rebalance_timeout_ms
        assert_compact_bytes(&mut r, b"inst");
        assert_compact_bytes(&mut r, b"rack-a");
        assert_eq!(r.get_i32(), 30_000);
        // subscribed_topic_names: one entry -> prefix 2
        assert_eq!(r.get_u8(), 2);
        assert_compact_bytes(&mut r, b"topicA");
        // server_assignor
        assert_compact_bytes(&mut r, b"uniform");
        // topic_partitions: one entry -> prefix 2
        assert_eq!(r.get_u8(), 2);
        let mut tid = [0u8; 16];
        r.copy_to_slice(&mut tid);
        assert_eq!(tid, topic_id);
        // three partitions -> prefix 4
        assert_eq!(r.get_u8(), 4);
        assert_eq!(r.get_i32(), 0);
        assert_eq!(r.get_i32(), 1);
        assert_eq!(r.get_i32(), 2);
        // tagged fields of the entry, then of the request
        assert_eq!(r.get_u8(), 0);
        assert_eq!(r.get_u8(), 0);
        assert_eq!(r.remaining(), 0);
    }

    #[test]
    fn test_consumer_group_heartbeat_request_encode_v0_null_optionals() {
        let request = minimal_request();
        let mut buf = BytesMut::new();
        request.encode_v0(&mut buf).unwrap();
        let mut r = buf.freeze();

        // group_id "g" and member_id "m"
        assert_eq!(r.get_u8(), 2);
        assert_eq!(r.get_u8(), b'g');
        assert_eq!(r.get_u8(), 2);
        assert_eq!(r.get_u8(), b'm');
        // member_epoch
        assert_eq!(r.get_i32(), 0);
        // null instance_id, null rack_id
        assert_eq!(r.get_u8(), 0);
        assert_eq!(r.get_u8(), 0);
        // rebalance_timeout_ms
        assert_eq!(r.get_i32(), -1);
        // null subscribed_topic_names, null server_assignor, null topic_partitions
        assert_eq!(r.get_u8(), 0);
        assert_eq!(r.get_u8(), 0);
        assert_eq!(r.get_u8(), 0);
        // tagged fields
        assert_eq!(r.get_u8(), 0);
        assert_eq!(r.remaining(), 0);
    }

    #[test]
    fn test_consumer_group_heartbeat_request_leave_epoch() {
        let request = ConsumerGroupHeartbeatRequest {
            member_epoch: -1,
            ..minimal_request()
        };
        let mut buf = BytesMut::new();
        request.encode_v0(&mut buf).unwrap();
        let mut r = buf.freeze();

        // Skip group_id "g" and member_id "m" (two bytes each).
        r.advance(4);
        assert_eq!(r.get_i32(), -1);
    }

    #[test]
    fn test_consumer_group_heartbeat_request_versioned_encode_v0() {
        let request = minimal_request();
        let mut buf_direct = BytesMut::new();
        request.encode_v0(&mut buf_direct).unwrap();
        let mut buf_versioned = BytesMut::new();
        request.encode_versioned(0, &mut buf_versioned).unwrap();
        assert_eq!(buf_direct, buf_versioned);
    }

    #[test]
    fn test_consumer_group_heartbeat_request_versioned_encode_unsupported() {
        let request = minimal_request();
        let mut buf = BytesMut::new();
        let result = request.encode_versioned(2, &mut buf);
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(msg.contains("unsupported"), "got: {msg}");
    }

    #[test]
    fn test_consumer_group_heartbeat_response_decode_v0_with_assignment() {
        let mut buf = BytesMut::new();
        buf.put_i32(100); // throttle_time_ms
        buf.put_i16(0); // error_code
        put_compact_string(&mut buf, None); // error_message
        put_compact_string(&mut buf, Some("member-1")); // member_id
        buf.put_i32(3); // member_epoch
        buf.put_i32(5000); // heartbeat_interval_ms
        buf.put_i8(1); // assignment present
        put_compact_array_count(&mut buf, Some(1));
        let topic_id: [u8; 16] = [
            0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77,
            0x88, 0x99,
        ];
        buf.put_slice(&topic_id);
        put_compact_array_count(&mut buf, Some(2));
        buf.put_i32(0);
        buf.put_i32(1);
        put_tagged_fields(&mut buf); // topic entry
        put_tagged_fields(&mut buf); // assignment
        put_tagged_fields(&mut buf); // response

        let resp = ConsumerGroupHeartbeatResponse::decode_v0(&mut buf.freeze()).unwrap();
        assert_eq!(resp.throttle_time_ms, 100);
        assert!(resp.error_code.is_ok());
        assert!(resp.error_message.is_none());
        assert_eq!(resp.member_id.as_deref(), Some("member-1"));
        assert_eq!(resp.member_epoch, 3);
        assert_eq!(resp.heartbeat_interval_ms, 5000);
        let assignment = resp.assignment.expect("assignment should be present");
        assert_eq!(assignment.topic_partitions.len(), 1);
        assert_eq!(assignment.topic_partitions[0].topic_id, topic_id);
        assert_eq!(assignment.topic_partitions[0].partitions, vec![0, 1]);
    }

    #[test]
    fn test_consumer_group_heartbeat_response_decode_v0_null_assignment() {
        let mut buf = BytesMut::new();
        buf.put_i32(0); // throttle_time_ms
        buf.put_i16(0); // error_code
        put_compact_string(&mut buf, None); // error_message
        put_compact_string(&mut buf, Some("m")); // member_id
        buf.put_i32(1); // member_epoch
        buf.put_i32(3000); // heartbeat_interval_ms
        buf.put_i8(-1); // null assignment
        put_tagged_fields(&mut buf);

        let resp = ConsumerGroupHeartbeatResponse::decode_v0(&mut buf.freeze()).unwrap();
        assert_eq!(resp.throttle_time_ms, 0);
        assert!(resp.error_code.is_ok());
        assert_eq!(resp.member_id.as_deref(), Some("m"));
        assert_eq!(resp.member_epoch, 1);
        assert_eq!(resp.heartbeat_interval_ms, 3000);
        assert!(resp.assignment.is_none());
    }

    #[test]
    fn test_consumer_group_heartbeat_response_decode_v0_invalid_assignment_presence() {
        let mut buf = BytesMut::new();
        buf.put_i32(0); // throttle_time_ms
        buf.put_i16(0); // error_code
        put_compact_string(&mut buf, None); // error_message
        put_compact_string(&mut buf, Some("m")); // member_id
        buf.put_i32(1); // member_epoch
        buf.put_i32(3000); // heartbeat_interval_ms
        buf.put_i8(0); // neither negative (null) nor 1 (present)
        put_tagged_fields(&mut buf);

        let err = ConsumerGroupHeartbeatResponse::decode_v0(&mut buf.freeze()).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("invalid assignment presence tag"),
            "expected presence tag error, got: {msg}"
        );
    }

    #[test]
    fn test_consumer_group_heartbeat_response_decode_v0_with_error() {
        let mut buf = BytesMut::new();
        buf.put_i32(0); // throttle_time_ms
        buf.put_i16(110); // non-zero error_code
        put_compact_string(&mut buf, Some("Fenced")); // error_message
        put_compact_string(&mut buf, None); // member_id
        buf.put_i32(-1); // member_epoch
        buf.put_i32(0); // heartbeat_interval_ms
        buf.put_i8(-1); // null assignment
        put_tagged_fields(&mut buf);

        let resp = ConsumerGroupHeartbeatResponse::decode_v0(&mut buf.freeze()).unwrap();
        assert!(!resp.error_code.is_ok());
        assert_eq!(resp.error_message.as_deref(), Some("Fenced"));
        assert!(resp.member_id.is_none());
        assert_eq!(resp.member_epoch, -1);
    }

    #[test]
    fn test_consumer_group_heartbeat_response_decode_v0_empty_assignment() {
        let mut buf = BytesMut::new();
        buf.put_i32(0); // throttle_time_ms
        buf.put_i16(0); // error_code
        put_compact_string(&mut buf, None); // error_message
        put_compact_string(&mut buf, Some("m")); // member_id
        buf.put_i32(2); // member_epoch
        buf.put_i32(5000); // heartbeat_interval_ms
        buf.put_i8(1); // assignment present
        put_compact_array_count(&mut buf, Some(0)); // empty, non-null array
        put_tagged_fields(&mut buf); // assignment
        put_tagged_fields(&mut buf); // response

        let resp = ConsumerGroupHeartbeatResponse::decode_v0(&mut buf.freeze()).unwrap();
        let assignment = resp.assignment.expect("assignment should be present");
        assert!(assignment.topic_partitions.is_empty());
    }

    #[test]
    fn test_consumer_group_heartbeat_response_versioned_decode_v0() {
        let mut buf = BytesMut::new();
        buf.put_i32(0); // throttle_time_ms
        buf.put_i16(0); // error_code
        put_compact_string(&mut buf, None); // error_message
        put_compact_string(&mut buf, Some("m")); // member_id
        buf.put_i32(1); // member_epoch
        buf.put_i32(5000); // heartbeat_interval_ms
        buf.put_i8(-1); // null assignment
        put_tagged_fields(&mut buf);

        let resp = ConsumerGroupHeartbeatResponse::decode_versioned(0, &mut buf.freeze()).unwrap();
        assert!(resp.error_code.is_ok());
        assert!(resp.assignment.is_none());
    }

    #[test]
    fn test_consumer_group_heartbeat_response_versioned_decode_unsupported() {
        let mut buf = BytesMut::new();
        buf.put_u8(0);
        let result = ConsumerGroupHeartbeatResponse::decode_versioned(2, &mut buf.freeze());
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(msg.contains("unsupported"), "got: {msg}");
    }

    #[test]
    fn test_consumer_group_heartbeat_request_encode_decode_roundtrip() {
        let topic_id = [0xab_u8; 16];
        let request = ConsumerGroupHeartbeatRequest {
            group_id: "test-grp".to_string(),
            member_id: "consumer-1".to_string(),
            member_epoch: 7,
            instance_id: Some("static-1".to_string()),
            rack_id: None,
            rebalance_timeout_ms: 60_000,
            subscribed_topic_names: Some(vec!["t1".to_string(), "t2".to_string()]),
            subscribed_topic_regex: None,
            server_assignor: None,
            topic_partitions: Some(vec![
                ConsumerGroupTopicPartitions {
                    topic_id,
                    partitions: vec![0],
                },
                ConsumerGroupTopicPartitions {
                    topic_id: [0xcd; 16],
                    partitions: vec![1, 2, 3],
                },
            ]),
        };
        let mut buf = BytesMut::new();
        request.encode_v0(&mut buf).unwrap();
        let mut r = buf.freeze();

        let gid = KafkaString::decode_compact(&mut r).unwrap().0.unwrap();
        assert_eq!(gid, "test-grp");
        let mid = KafkaString::decode_compact(&mut r).unwrap().0.unwrap();
        assert_eq!(mid, "consumer-1");
        assert_eq!(i32::decode(&mut r).unwrap(), 7);
        let iid = KafkaString::decode_compact(&mut r).unwrap().0;
        assert_eq!(iid.as_deref(), Some("static-1"));
        let rid = KafkaString::decode_compact(&mut r).unwrap().0;
        assert!(rid.is_none());
        assert_eq!(i32::decode(&mut r).unwrap(), 60_000);
        let arr = KafkaArray::<KafkaString>::decode_compact(&mut r).unwrap();
        let names: Vec<String> = arr.0.unwrap().into_iter().map(|s| s.0.unwrap()).collect();
        assert_eq!(names, vec!["t1", "t2"]);
        let sa = KafkaString::decode_compact(&mut r).unwrap().0;
        assert!(sa.is_none());

        // Two topic entries -> prefix 3.
        let tp_count = crate::util::varint::decode_unsigned_varint(&mut r).unwrap();
        assert_eq!(tp_count, 3);

        // First entry: one partition -> prefix 2.
        let mut tid1 = [0u8; 16];
        r.copy_to_slice(&mut tid1);
        assert_eq!(tid1, [0xab; 16]);
        let pc1 = crate::util::varint::decode_unsigned_varint(&mut r).unwrap();
        assert_eq!(pc1, 2);
        assert_eq!(i32::decode(&mut r).unwrap(), 0);
        let _ = TaggedFields::decode(&mut r).unwrap();

        // Second entry: three partitions -> prefix 4.
        let mut tid2 = [0u8; 16];
        r.copy_to_slice(&mut tid2);
        assert_eq!(tid2, [0xcd; 16]);
        let pc2 = crate::util::varint::decode_unsigned_varint(&mut r).unwrap();
        assert_eq!(pc2, 4);
        assert_eq!(i32::decode(&mut r).unwrap(), 1);
        assert_eq!(i32::decode(&mut r).unwrap(), 2);
        assert_eq!(i32::decode(&mut r).unwrap(), 3);
        let _ = TaggedFields::decode(&mut r).unwrap();

        // Request-level tagged fields.
        let _ = TaggedFields::decode(&mut r).unwrap();
        assert_eq!(r.remaining(), 0);
    }

    #[test]
    fn test_consumer_group_heartbeat_response_multi_topic_assignment() {
        let mut buf = BytesMut::new();
        buf.put_i32(50); // throttle_time_ms
        buf.put_i16(0); // error_code
        put_compact_string(&mut buf, None); // error_message
        put_compact_string(&mut buf, Some("mem")); // member_id
        buf.put_i32(10); // member_epoch
        buf.put_i32(4000); // heartbeat_interval_ms
        buf.put_i8(1); // assignment present
        put_compact_array_count(&mut buf, Some(2));
        // First topic: one partition.
        buf.put_slice(&[0x11; 16]);
        put_compact_array_count(&mut buf, Some(1));
        buf.put_i32(5);
        put_tagged_fields(&mut buf);
        // Second topic: three partitions.
        buf.put_slice(&[0x22; 16]);
        put_compact_array_count(&mut buf, Some(3));
        buf.put_i32(0);
        buf.put_i32(1);
        buf.put_i32(2);
        put_tagged_fields(&mut buf);
        put_tagged_fields(&mut buf); // assignment
        put_tagged_fields(&mut buf); // response

        let resp = ConsumerGroupHeartbeatResponse::decode_v0(&mut buf.freeze()).unwrap();
        assert_eq!(resp.throttle_time_ms, 50);
        assert_eq!(resp.member_epoch, 10);
        let assignment = resp.assignment.unwrap();
        assert_eq!(assignment.topic_partitions.len(), 2);
        assert_eq!(assignment.topic_partitions[0].topic_id, [0x11; 16]);
        assert_eq!(assignment.topic_partitions[0].partitions, vec![5]);
        assert_eq!(assignment.topic_partitions[1].topic_id, [0x22; 16]);
        assert_eq!(assignment.topic_partitions[1].partitions, vec![0, 1, 2]);
    }

    #[test]
    fn test_consumer_group_heartbeat_request_encode_v1_with_regex() {
        let request = ConsumerGroupHeartbeatRequest {
            member_epoch: 1,
            subscribed_topic_names: Some(vec!["t1".to_string()]),
            subscribed_topic_regex: Some("topic-.*".to_string()),
            ..minimal_request()
        };
        let mut buf = BytesMut::new();
        request.encode_v1(&mut buf).unwrap();
        let mut r = buf.freeze();

        let gid = KafkaString::decode_compact(&mut r).unwrap().0.unwrap();
        assert_eq!(gid, "g");
        let mid = KafkaString::decode_compact(&mut r).unwrap().0.unwrap();
        assert_eq!(mid, "m");
        assert_eq!(i32::decode(&mut r).unwrap(), 1);
        // instance_id and rack_id are null.
        assert!(KafkaString::decode_compact(&mut r).unwrap().0.is_none());
        assert!(KafkaString::decode_compact(&mut r).unwrap().0.is_none());
        assert_eq!(i32::decode(&mut r).unwrap(), -1);
        // One subscribed topic name -> prefix 2.
        let stn_count = crate::util::varint::decode_unsigned_varint(&mut r).unwrap();
        assert_eq!(stn_count, 2);
        let t = KafkaString::decode_compact(&mut r).unwrap().0.unwrap();
        assert_eq!(t, "t1");
        // The regex sits between the topic names and the server assignor.
        let regex = KafkaString::decode_compact(&mut r).unwrap().0.unwrap();
        assert_eq!(regex, "topic-.*");
        assert!(KafkaString::decode_compact(&mut r).unwrap().0.is_none());
        // Null topic_partitions array.
        let tp = crate::util::varint::decode_unsigned_varint(&mut r).unwrap();
        assert_eq!(tp, 0);
        let _ = TaggedFields::decode(&mut r).unwrap();
        assert_eq!(r.remaining(), 0);
    }

    #[test]
    fn test_consumer_group_heartbeat_request_encode_v1_null_regex() {
        let request = minimal_request();
        let mut buf_v0 = BytesMut::new();
        request.encode_v0(&mut buf_v0).unwrap();
        let mut buf_v1 = BytesMut::new();
        request.encode_v1(&mut buf_v1).unwrap();
        // v1 adds exactly one byte: the null marker of the regex field.
        assert_eq!(buf_v1.len(), buf_v0.len() + 1);
    }

    #[test]
    fn test_consumer_group_heartbeat_request_versioned_encode_v1() {
        let request = minimal_request();
        let mut buf_direct = BytesMut::new();
        request.encode_v1(&mut buf_direct).unwrap();
        let mut buf_versioned = BytesMut::new();
        request.encode_versioned(1, &mut buf_versioned).unwrap();
        assert_eq!(buf_direct, buf_versioned);
    }

    #[test]
    fn test_consumer_group_heartbeat_response_versioned_decode_v1() {
        let mut buf = BytesMut::new();
        buf.put_i32(0); // throttle_time_ms
        buf.put_i16(0); // error_code
        put_compact_string(&mut buf, None); // error_message
        put_compact_string(&mut buf, Some("consumer-gen-id")); // member_id
        buf.put_i32(3); // member_epoch
        buf.put_i32(5000); // heartbeat_interval_ms
        buf.put_i8(-1); // null assignment
        put_tagged_fields(&mut buf);

        let resp = ConsumerGroupHeartbeatResponse::decode_versioned(1, &mut buf.freeze()).unwrap();
        assert_eq!(resp.error_code, ErrorCode::None);
        assert_eq!(resp.member_id.as_deref(), Some("consumer-gen-id"));
        assert_eq!(resp.member_epoch, 3);
        assert_eq!(resp.heartbeat_interval_ms, 5000);
        assert!(resp.assignment.is_none());
    }
}