use bytes::{Buf, BufMut};
use super::{VersionedDecode, VersionedEncode, non_nullable_string};
use crate::error::{ErrorCode, KrafkaError, ProtocolErrorKind, Result};
use crate::protocol::api::ApiKey;
use crate::protocol::primitives::{Decode, Encode, KafkaString, TaggedFields, TryEncode};
use crate::protocol::{array_len_i32, check_compact_array_len, check_decode_array_len};
#[derive(Debug, Clone)]
pub struct OffsetForLeaderEpochPartition {
pub partition: i32,
pub current_leader_epoch: i32,
pub leader_epoch: i32,
}
#[derive(Debug, Clone)]
pub struct OffsetForLeaderEpochTopic {
pub topic: String,
pub partitions: Vec<OffsetForLeaderEpochPartition>,
}
#[derive(Debug, Clone)]
pub struct OffsetForLeaderEpochRequest {
pub replica_id: i32,
pub topics: Vec<OffsetForLeaderEpochTopic>,
}
impl OffsetForLeaderEpochRequest {
pub fn api_key() -> ApiKey {
ApiKey::OffsetForLeaderEpoch
}
pub fn encode_v2(&self, buf: &mut impl BufMut) -> Result<()> {
buf.put_i32(array_len_i32(self.topics.len())?);
for topic in &self.topics {
KafkaString::new(&topic.topic).try_encode(buf)?;
buf.put_i32(array_len_i32(topic.partitions.len())?);
for partition in &topic.partitions {
partition.partition.encode(buf);
partition.current_leader_epoch.encode(buf);
partition.leader_epoch.encode(buf);
}
}
Ok(())
}
pub fn encode_v3(&self, buf: &mut impl BufMut) -> Result<()> {
self.replica_id.encode(buf);
buf.put_i32(array_len_i32(self.topics.len())?);
for topic in &self.topics {
KafkaString::new(&topic.topic).try_encode(buf)?;
buf.put_i32(array_len_i32(topic.partitions.len())?);
for partition in &topic.partitions {
partition.partition.encode(buf);
partition.current_leader_epoch.encode(buf);
partition.leader_epoch.encode(buf);
}
}
Ok(())
}
pub fn encode_v4(&self, buf: &mut impl BufMut) -> Result<()> {
self.replica_id.encode(buf);
let topics_len = u32::try_from(self.topics.len().saturating_add(1)).map_err(|_| {
KrafkaError::protocol_kind(ProtocolErrorKind::InvalidLength, "topics array too large")
})?;
crate::util::varint::encode_unsigned_varint(topics_len, buf);
for topic in &self.topics {
KafkaString::new(&topic.topic).try_encode_compact(buf)?;
let parts_len =
u32::try_from(topic.partitions.len().saturating_add(1)).map_err(|_| {
KrafkaError::protocol_kind(
ProtocolErrorKind::InvalidLength,
"partitions array too large",
)
})?;
crate::util::varint::encode_unsigned_varint(parts_len, buf);
for partition in &topic.partitions {
partition.partition.encode(buf);
partition.current_leader_epoch.encode(buf);
partition.leader_epoch.encode(buf);
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct OffsetForLeaderEpochPartitionResult {
pub error_code: ErrorCode,
pub partition: i32,
pub leader_epoch: i32,
pub end_offset: i64,
}
#[derive(Debug, Clone)]
pub struct OffsetForLeaderEpochTopicResult {
pub topic: String,
pub partitions: Vec<OffsetForLeaderEpochPartitionResult>,
}
#[derive(Debug, Clone)]
pub struct OffsetForLeaderEpochResponse {
pub throttle_time_ms: i32,
pub topics: Vec<OffsetForLeaderEpochTopicResult>,
}
impl OffsetForLeaderEpochResponse {
pub fn decode_v2(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let topic_count = check_decode_array_len(i32::decode(buf)?)?;
let mut topics = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let topic = non_nullable_string("topic name", KafkaString::decode(buf)?.0)?;
let partition_count = check_decode_array_len(i32::decode(buf)?)?;
let mut partitions = Vec::with_capacity(partition_count);
for _ in 0..partition_count {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let partition = i32::decode(buf)?;
let leader_epoch = i32::decode(buf)?;
let end_offset = i64::decode(buf)?;
partitions.push(OffsetForLeaderEpochPartitionResult {
error_code,
partition,
leader_epoch,
end_offset,
});
}
topics.push(OffsetForLeaderEpochTopicResult { topic, partitions });
}
Ok(Self {
throttle_time_ms,
topics,
})
}
pub fn decode_v4(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let topic_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut topics = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let topic = non_nullable_string("topic name", KafkaString::decode_compact(buf)?.0)?;
let partition_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut partitions = Vec::with_capacity(partition_count);
for _ in 0..partition_count {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let partition = i32::decode(buf)?;
let leader_epoch = i32::decode(buf)?;
let end_offset = i64::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
partitions.push(OffsetForLeaderEpochPartitionResult {
error_code,
partition,
leader_epoch,
end_offset,
});
}
let _ = TaggedFields::decode(buf)?;
topics.push(OffsetForLeaderEpochTopicResult { topic, partitions });
}
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
topics,
})
}
}
impl VersionedEncode for OffsetForLeaderEpochRequest {
fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
match version {
2 => self.encode_v2(buf)?,
3 => self.encode_v3(buf)?,
4 => self.encode_v4(buf)?,
_ => return unsupported_encode!("OffsetForLeaderEpochRequest", version),
}
Ok(())
}
}
impl VersionedDecode for OffsetForLeaderEpochResponse {
fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
match version {
2..=3 => Self::decode_v2(buf),
4 => Self::decode_v4(buf),
_ => unsupported_decode!("OffsetForLeaderEpochResponse", version),
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use bytes::BytesMut;
use rstest::rstest;
#[test]
fn test_offset_for_leader_epoch_request() {
let request = OffsetForLeaderEpochRequest {
replica_id: -1,
topics: vec![OffsetForLeaderEpochTopic {
topic: "my-topic".to_string(),
partitions: vec![OffsetForLeaderEpochPartition {
partition: 0,
current_leader_epoch: 5,
leader_epoch: 4,
}],
}],
};
assert_eq!(
OffsetForLeaderEpochRequest::api_key(),
ApiKey::OffsetForLeaderEpoch
);
let mut buf = BytesMut::new();
request.encode_v2(&mut buf).unwrap();
assert!(!buf.is_empty());
let mut buf3 = BytesMut::new();
request.encode_v3(&mut buf3).unwrap();
assert!(buf3.len() > buf.len());
}
#[test]
fn test_offset_for_leader_epoch_response_decode_v2() {
use bytes::BufMut;
let mut buf = BytesMut::new();
buf.put_i32(50);
buf.put_i32(1);
let topic = b"my-topic";
buf.put_i16(topic.len() as i16);
buf.put_slice(topic);
buf.put_i32(1);
buf.put_i16(0);
buf.put_i32(0);
buf.put_i32(5);
buf.put_i64(1000);
let mut data = buf.freeze();
let resp = OffsetForLeaderEpochResponse::decode_v2(&mut data).unwrap();
assert_eq!(resp.throttle_time_ms, 50);
assert_eq!(resp.topics.len(), 1);
assert_eq!(resp.topics[0].topic, "my-topic");
assert_eq!(resp.topics[0].partitions.len(), 1);
assert!(resp.topics[0].partitions[0].error_code.is_ok());
assert_eq!(resp.topics[0].partitions[0].partition, 0);
assert_eq!(resp.topics[0].partitions[0].leader_epoch, 5);
assert_eq!(resp.topics[0].partitions[0].end_offset, 1000);
}
#[rstest]
#[case::ofl_v0(0)]
#[case::ofl_v1(1)]
fn test_offset_for_leader_epoch_encode_below_min(#[case] version: i16) {
let request = OffsetForLeaderEpochRequest {
replica_id: -1,
topics: vec![],
};
let mut buf = BytesMut::new();
assert!(request.encode_versioned(version, &mut buf).is_err());
}
#[test]
fn test_offset_for_leader_epoch_request_encode_v4_flexible() {
let request = OffsetForLeaderEpochRequest {
replica_id: -1,
topics: vec![OffsetForLeaderEpochTopic {
topic: "my-topic".to_string(),
partitions: vec![OffsetForLeaderEpochPartition {
partition: 0,
current_leader_epoch: 5,
leader_epoch: 3,
}],
}],
};
let mut buf_v4 = BytesMut::new();
request.encode_v4(&mut buf_v4).unwrap();
assert!(!buf_v4.is_empty());
let mut buf_v3 = BytesMut::new();
request.encode_v3(&mut buf_v3).unwrap();
assert_ne!(buf_v4.as_ref(), buf_v3.as_ref());
}
#[test]
fn test_offset_for_leader_epoch_response_decode_v4_flexible() {
let mut buf = BytesMut::new();
buf.put_i32(20);
buf.put_u8(2);
let topic = b"my-topic";
buf.put_u8((topic.len() + 1) as u8);
buf.put_slice(topic);
buf.put_u8(2);
buf.put_i16(0);
buf.put_i32(0);
buf.put_i32(5);
buf.put_i64(1000);
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(0);
let resp = OffsetForLeaderEpochResponse::decode_v4(&mut buf.freeze()).unwrap();
assert_eq!(resp.throttle_time_ms, 20);
assert_eq!(resp.topics.len(), 1);
assert_eq!(resp.topics[0].topic, "my-topic");
assert_eq!(resp.topics[0].partitions.len(), 1);
assert!(resp.topics[0].partitions[0].error_code.is_ok());
assert_eq!(resp.topics[0].partitions[0].partition, 0);
assert_eq!(resp.topics[0].partitions[0].leader_epoch, 5);
assert_eq!(resp.topics[0].partitions[0].end_offset, 1000);
}
#[test]
fn test_offset_for_leader_epoch_v4_dispatch() {
let request = OffsetForLeaderEpochRequest {
replica_id: -1,
topics: vec![],
};
let mut buf = BytesMut::new();
request.encode_versioned(4, &mut buf).unwrap();
assert!(!buf.is_empty());
}
}