use std::any::type_name;
use std::ops::Deref;
use bytes_parser::BytesParser;
use crate::errors::{
KonsumerOffsetsError,
KonsumerOffsetsError::{
ByteParsingError, UnableToParseForVersion, UnsupportedConsumerProtocolAssignmentVersion,
UnsupportedConsumerProtocolSubscriptionVersion, UnsupportedGroupMetadataSchema,
},
};
use crate::utils::{parse_i16, parse_i32, parse_str, parse_vec_bytes};
/// Metadata describing a group, filled in two steps: `GroupMetadata::try_from`
/// reads the group name, and `GroupMetadata::parse_payload` reads everything else.
///
/// The concrete type of `current_state_timestamp` is chosen by the `ts_int`,
/// `ts_chrono` or `ts_time` crate feature; enabling more than one of them would
/// declare the field more than once.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(any(feature = "ts_int", feature = "ts_chrono"), derive(Default))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct GroupMetadata {
/// Message version handed to `GroupMetadata::try_from` by the caller.
pub message_version: i16,
/// Group name, the only field read by `GroupMetadata::try_from`.
pub group: String,
/// `true` after `GroupMetadata::try_from`; set to `false` as soon as
/// `GroupMetadata::parse_payload` starts.
pub is_tombstone: bool,
/// Schema version read at the start of the payload; only `0..=3` is accepted.
pub schema_version: i16,
/// Protocol type string read from the payload.
pub protocol_type: String,
/// Generation number read from the payload.
pub generation: i32,
/// Protocol string read from the payload.
pub protocol: String,
/// Leader string read from the payload.
pub leader: String,
/// Read from the payload when `schema_version >= 2`; otherwise `-1`.
#[cfg(feature = "ts_int")]
pub current_state_timestamp: i64,
/// Read from the payload when `schema_version >= 2`; otherwise the
/// `chrono::DateTime<chrono::Utc>` default value.
#[cfg(feature = "ts_chrono")]
pub current_state_timestamp: chrono::DateTime<chrono::Utc>,
/// Read from the payload when `schema_version >= 2`; otherwise
/// `time::OffsetDateTime::UNIX_EPOCH`.
#[cfg(feature = "ts_time")]
pub current_state_timestamp: time::OffsetDateTime,
/// One entry per member record found in the payload.
pub members: Vec<MemberMetadata>,
}
/// Hand-written `Default` used with the `ts_time` feature (the other timestamp
/// features derive it): every field is empty/zero and the timestamp is the Unix epoch.
#[cfg(feature = "ts_time")]
impl Default for GroupMetadata {
    fn default() -> Self {
        Self {
            message_version: 0,
            group: String::new(),
            is_tombstone: false,
            schema_version: 0,
            protocol_type: String::new(),
            generation: 0,
            protocol: String::new(),
            leader: String::new(),
            current_state_timestamp: time::OffsetDateTime::UNIX_EPOCH,
            members: Vec::new(),
        }
    }
}
impl GroupMetadata {
pub(crate) fn try_from(parser: &mut BytesParser, message_version: i16) -> Result<Self, KonsumerOffsetsError> {
Ok(GroupMetadata {
message_version,
group: parse_str(parser)?,
is_tombstone: true,
..Default::default()
})
}
pub(crate) fn parse_payload(&mut self, parser: &mut BytesParser) -> Result<(), KonsumerOffsetsError> {
self.is_tombstone = false;
self.schema_version = parse_i16(parser)?;
if !(0..=3).contains(&self.schema_version) {
return Err(UnsupportedGroupMetadataSchema(self.schema_version));
}
self.protocol_type = parse_str(parser)?;
self.generation = parse_i32(parser)?;
self.protocol = parse_str(parser)?;
self.leader = parse_str(parser)?;
self.current_state_timestamp = if self.schema_version >= 2 {
#[cfg(feature = "ts_int")]
{
crate::utils::parse_i64(parser)?
}
#[cfg(feature = "ts_chrono")]
{
crate::utils::parse_chrono_datetime_utc(parser)?
}
#[cfg(feature = "ts_time")]
{
crate::utils::parse_time_offset_datetime(parser)?
}
} else {
#[cfg(feature = "ts_int")]
{
-1
}
#[cfg(feature = "ts_chrono")]
{
chrono::DateTime::<chrono::Utc>::default()
}
#[cfg(feature = "ts_time")]
{
time::OffsetDateTime::UNIX_EPOCH
}
};
let members_len = parse_i32(parser)?;
self.members = Vec::with_capacity(members_len as usize);
for _ in 0..members_len {
self.members.push(MemberMetadata::try_from(parser, self.schema_version)?);
}
Ok(())
}
}
/// Metadata for a single group member, as read by `MemberMetadata::try_from`.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct MemberMetadata {
/// Member identifier; always the first field read.
pub id: String,
/// Only read when the group schema version is at least 3; empty otherwise.
pub group_instance_id: String,
/// Client identifier string.
pub client_id: String,
/// Client host string.
pub client_host: String,
/// Only read when the group schema version is at least 1; `0` otherwise.
/// NOTE(review): unit is not visible here (presumably milliseconds) — confirm.
pub rebalance_timeout: i32,
/// Session timeout as read from the payload.
/// NOTE(review): unit is not visible here (presumably milliseconds) — confirm.
pub session_timeout: i32,
/// Subscription decoded from a length-prefixed byte slice of the member record.
pub subscription: ConsumerProtocolSubscription,
/// Assignment decoded from a length-prefixed byte slice of the member record.
pub assignment: ConsumerProtocolAssignment,
}
impl MemberMetadata {
    /// Reads one member record from `parser`, honouring the group `schema_version`.
    ///
    /// `group_instance_id` is only present from schema version 3 and
    /// `rebalance_timeout` only from schema version 1; when absent they are left
    /// as an empty string and `0` respectively. The subscription and the
    /// assignment are each decoded from their own length-prefixed sub-slice.
    ///
    /// # Errors
    ///
    /// Propagates field-parser errors, wraps a failed sub-slice extraction in
    /// `ByteParsingError`, and propagates subscription/assignment decoding errors.
    fn try_from(parser: &mut BytesParser, schema_version: i16) -> Result<Self, KonsumerOffsetsError> {
        let id = parse_str(parser)?;
        let group_instance_id = if schema_version >= 3 {
            parse_str(parser)?
        } else {
            String::default()
        };
        let client_id = parse_str(parser)?;
        let client_host = parse_str(parser)?;
        let rebalance_timeout = if schema_version >= 1 { parse_i32(parser)? } else { 0 };
        let session_timeout = parse_i32(parser)?;

        // Subscription: length prefix, then a dedicated parser over exactly that many bytes.
        let subscription_size = parse_i32(parser)? as usize;
        let mut subscription_bytes = parser.from_slice(subscription_size).map_err(ByteParsingError)?;
        let subscription = ConsumerProtocolSubscription::try_from(&mut subscription_bytes)?;

        // Assignment: same layout as the subscription.
        let assignment_size = parse_i32(parser)? as usize;
        let mut assignment_bytes = parser.from_slice(assignment_size).map_err(ByteParsingError)?;
        let assignment = ConsumerProtocolAssignment::try_from(&mut assignment_bytes)?;

        Ok(Self {
            id,
            group_instance_id,
            client_id,
            client_host,
            rebalance_timeout,
            session_timeout,
            subscription,
            assignment,
        })
    }
}
/// Subscription information of a group member, decoded from raw bytes or from a
/// `BytesParser` through the `TryFrom` implementations in this file.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ConsumerProtocolSubscription {
/// Schema version read first; decoding rejects anything outside `0..=3`.
pub schema_version: i16,
/// Topic names listed in the subscription.
pub subscribed_topics: Vec<String>,
/// Raw bytes read with `parse_vec_bytes`; not interpreted by this type.
pub user_data: Vec<u8>,
/// Only read when `schema_version >= 1`; empty otherwise.
pub owned_topic_partitions: Vec<TopicPartitions>,
/// Only read when `schema_version >= 2`; `-1` otherwise.
pub generation_id: i32,
/// Only read when `schema_version >= 3`; empty otherwise.
pub rack_id: String,
}
impl<'a> TryFrom<&mut BytesParser<'a>> for ConsumerProtocolSubscription {
    type Error = KonsumerOffsetsError;

    /// Decodes a subscription from `parser`.
    ///
    /// Read order: schema version, topic count and topic names, user data, then
    /// (schema version >= 1) owned topic-partitions, (>= 2) generation id and
    /// (>= 3) rack id. Fields that are not present for the given schema version
    /// stay empty, except `generation_id`, which becomes `-1`.
    ///
    /// # Errors
    ///
    /// `UnsupportedConsumerProtocolSubscriptionVersion` for a schema version outside
    /// `0..=3`; any other error is propagated from the underlying parsers.
    fn try_from(parser: &mut BytesParser) -> Result<Self, Self::Error> {
        let schema_version = parse_i16(parser)?;
        if !matches!(schema_version, 0..=3) {
            return Err(UnsupportedConsumerProtocolSubscriptionVersion(schema_version));
        }

        let topic_count = parse_i32(parser)?;
        let subscribed_topics = if topic_count > 0 {
            let mut topics = Vec::with_capacity(topic_count as usize);
            for _ in 0..topic_count {
                topics.push(parse_str(parser)?);
            }
            topics
        } else {
            Vec::new()
        };

        let user_data = parse_vec_bytes(parser)?;

        let mut owned_topic_partitions = Vec::new();
        if schema_version >= 1 {
            let owned_count = parse_i32(parser)?;
            if owned_count > 0 {
                owned_topic_partitions = Vec::with_capacity(owned_count as usize);
                for _ in 0..owned_count {
                    owned_topic_partitions.push(TopicPartitions::try_from(parser, schema_version)?);
                }
            }
        }

        let generation_id = if schema_version >= 2 { parse_i32(parser)? } else { -1 };

        let rack_id = if schema_version >= 3 {
            parse_str(parser)?
        } else {
            String::new()
        };

        Ok(Self {
            schema_version,
            subscribed_topics,
            user_data,
            owned_topic_partitions,
            generation_id,
            rack_id,
        })
    }
}
impl TryFrom<&[u8]> for ConsumerProtocolSubscription {
    type Error = KonsumerOffsetsError;

    /// Decodes a subscription from a byte slice by wrapping it in a `BytesParser`
    /// and delegating to the parser-based conversion.
    fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
        let mut parser = BytesParser::from(bytes);
        Self::try_from(&mut parser)
    }
}
impl TryFrom<Vec<u8>> for ConsumerProtocolSubscription {
    type Error = KonsumerOffsetsError;

    /// Decodes a subscription from an owned byte vector by borrowing it as a
    /// slice and delegating to the slice-based conversion.
    fn try_from(bytes_vec: Vec<u8>) -> Result<Self, Self::Error> {
        let bytes: &[u8] = bytes_vec.deref();
        Self::try_from(bytes)
    }
}
/// A topic name together with a list of partition numbers of that topic.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TopicPartitions {
/// Topic name.
pub topic: String,
/// Partition numbers, in the order they were read; empty when the encoded
/// count was zero or negative.
pub partitions: Vec<i32>,
}
impl TopicPartitions {
    /// Reads a topic name followed by a counted list of partition numbers.
    ///
    /// `version` is the schema version of the enclosing structure; it is only
    /// used to reject versions above 3.
    ///
    /// # Errors
    ///
    /// `UnableToParseForVersion` when `version > 3`; otherwise any error
    /// propagated from `parse_str` / `parse_i32`.
    fn try_from(parser: &mut BytesParser, version: i16) -> Result<Self, KonsumerOffsetsError> {
        if version > 3 {
            let own_type = String::from(type_name::<TopicPartitions>());
            let parent_type = String::from(type_name::<ConsumerProtocolSubscription>());
            return Err(UnableToParseForVersion(own_type, version, parent_type));
        }

        let topic = parse_str(parser)?;

        let partition_count = parse_i32(parser)?;
        let partitions = if partition_count > 0 {
            let mut numbers = Vec::with_capacity(partition_count as usize);
            for _ in 0..partition_count {
                numbers.push(parse_i32(parser)?);
            }
            numbers
        } else {
            Vec::new()
        };

        Ok(Self { topic, partitions })
    }
}
/// Assignment information of a group member, decoded from raw bytes or from a
/// `BytesParser` through the `TryFrom` implementations in this file.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ConsumerProtocolAssignment {
/// Schema version read first; decoding rejects anything outside `0..=3`.
pub schema_version: i16,
/// Topic-partitions listed in the assignment; empty when the encoded count
/// was zero or negative.
pub assigned_topic_partitions: Vec<TopicPartitions>,
/// Raw bytes read with `parse_vec_bytes`; not interpreted by this type.
pub user_data: Vec<u8>,
}
impl<'a> TryFrom<&mut BytesParser<'a>> for ConsumerProtocolAssignment {
    type Error = KonsumerOffsetsError;

    /// Decodes an assignment from `parser`: schema version, a counted list of
    /// topic-partitions, then the user data bytes.
    ///
    /// # Errors
    ///
    /// `UnsupportedConsumerProtocolAssignmentVersion` for a schema version outside
    /// `0..=3`; any other error is propagated from the underlying parsers.
    fn try_from(parser: &mut BytesParser) -> Result<Self, Self::Error> {
        let schema_version = parse_i16(parser)?;
        if !matches!(schema_version, 0..=3) {
            return Err(UnsupportedConsumerProtocolAssignmentVersion(schema_version));
        }

        let assigned_count = parse_i32(parser)?;
        let assigned_topic_partitions = if assigned_count > 0 {
            let mut assigned = Vec::with_capacity(assigned_count as usize);
            for _ in 0..assigned_count {
                assigned.push(TopicPartitions::try_from(parser, schema_version)?);
            }
            assigned
        } else {
            Vec::new()
        };

        let user_data = parse_vec_bytes(parser)?;

        Ok(Self {
            schema_version,
            assigned_topic_partitions,
            user_data,
        })
    }
}
impl TryFrom<&[u8]> for ConsumerProtocolAssignment {
    type Error = KonsumerOffsetsError;

    /// Decodes an assignment from a byte slice by wrapping it in a `BytesParser`
    /// and delegating to the parser-based conversion.
    fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
        let mut parser = BytesParser::from(bytes);
        Self::try_from(&mut parser)
    }
}
impl TryFrom<Vec<u8>> for ConsumerProtocolAssignment {
    type Error = KonsumerOffsetsError;

    /// Decodes an assignment from an owned byte vector by borrowing it as a
    /// slice and delegating to the slice-based conversion.
    fn try_from(bytes_vec: Vec<u8>) -> Result<Self, Self::Error> {
        let bytes: &[u8] = bytes_vec.deref();
        Self::try_from(bytes)
    }
}
#[cfg(test)]
mod tests {
    use crate::{
        utils::is_thread_safe, ConsumerProtocolAssignment, ConsumerProtocolSubscription, GroupMetadata,
        MemberMetadata, TopicPartitions,
    };

    /// Runs `is_thread_safe` for every public type declared in this file.
    // NOTE(review): presumably a compile-time `Send`/`Sync` bound check — confirm in `crate::utils`.
    #[test]
    fn test_types_thread_safety() {
        // Leaf types first, then the containers built from them.
        is_thread_safe::<TopicPartitions>();
        is_thread_safe::<ConsumerProtocolSubscription>();
        is_thread_safe::<ConsumerProtocolAssignment>();
        is_thread_safe::<MemberMetadata>();
        is_thread_safe::<GroupMetadata>();
    }
}