use bytes::{Buf, BufMut};
use super::{VersionedDecode, VersionedEncode, non_nullable_string};
use crate::error::{ErrorCode, Result};
use crate::protocol::api::ApiKey;
use crate::protocol::primitives::{Decode, Encode, KafkaString, TryEncode};
use crate::protocol::{array_len_i32, check_decode_array_len};
/// Per-topic entry carried by an [`OffsetDeleteRequest`].
#[derive(Clone, Debug)]
pub struct OffsetDeleteTopicRequest {
    /// Topic name; `OffsetDeleteRequest::encode_v0` writes it as a `KafkaString`.
    pub name: String,
    /// Partition entries for this topic, encoded in the order stored here.
    pub partitions: Vec<OffsetDeletePartitionRequest>,
}
/// Per-partition entry nested inside an [`OffsetDeleteTopicRequest`].
#[derive(Clone, Debug)]
pub struct OffsetDeletePartitionRequest {
    /// Index of the partition; encoded as a single `i32`.
    pub partition_index: i32,
}
/// Body of a Kafka OffsetDelete request.
#[derive(Clone, Debug)]
pub struct OffsetDeleteRequest {
    /// Consumer group identifier; the first field written by `encode_v0`.
    pub group_id: String,
    /// Topic entries, encoded in the order stored here.
    pub topics: Vec<OffsetDeleteTopicRequest>,
}
impl OffsetDeleteRequest {
pub fn api_key() -> ApiKey {
ApiKey::OffsetDelete
}
pub fn encode_v0(&self, buf: &mut impl BufMut) -> Result<()> {
KafkaString::new(&self.group_id).try_encode(buf)?;
buf.put_i32(array_len_i32(self.topics.len())?);
for topic in &self.topics {
KafkaString::new(&topic.name).try_encode(buf)?;
buf.put_i32(array_len_i32(topic.partitions.len())?);
for p in &topic.partitions {
p.partition_index.encode(buf);
}
}
Ok(())
}
}
/// Version dispatch for encoding an [`OffsetDeleteRequest`].
impl VersionedEncode for OffsetDeleteRequest {
    /// Encodes the request into `buf` using the layout selected by `version`.
    ///
    /// Version 0 is delegated to [`OffsetDeleteRequest::encode_v0`]. Every
    /// other version is handed to the `unsupported_encode!` macro, which is
    /// defined elsewhere (presumably it yields an "unsupported version" error).
    fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
        match version {
            0 => Self::encode_v0(self, buf),
            other => unsupported_encode!("OffsetDeleteRequest", other),
        }
    }
}
/// Per-partition result nested inside an [`OffsetDeleteTopicResponse`].
#[derive(Clone, Debug)]
pub struct OffsetDeletePartitionResponse {
    /// Index of the partition; decoded from an `i32`.
    pub partition_index: i32,
    /// Error code for this partition, built from a decoded `i16`.
    pub error_code: ErrorCode,
}
/// Per-topic result carried by an [`OffsetDeleteResponse`].
#[derive(Clone, Debug)]
pub struct OffsetDeleteTopicResponse {
    /// Topic name; decoding rejects a null string for this field.
    pub name: String,
    /// Partition results, in the order they were decoded.
    pub partitions: Vec<OffsetDeletePartitionResponse>,
}
/// Body of a Kafka OffsetDelete response.
#[derive(Clone, Debug)]
pub struct OffsetDeleteResponse {
    /// Top-level error code; the first field read by `decode_v0`.
    pub error_code: ErrorCode,
    /// Throttle time carried by the response, in milliseconds (per the field name).
    pub throttle_time_ms: i32,
    /// Per-topic results, in the order they were decoded.
    pub topics: Vec<OffsetDeleteTopicResponse>,
}
impl OffsetDeleteResponse {
pub fn decode_v0(buf: &mut impl Buf) -> Result<Self> {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let throttle_time_ms = i32::decode(buf)?;
let topic_count = check_decode_array_len(i32::decode(buf)?)?;
let mut topics = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let name = non_nullable_string("topic_name", KafkaString::decode(buf)?.0)?;
let partition_count = check_decode_array_len(i32::decode(buf)?)?;
let mut partitions = Vec::with_capacity(partition_count);
for _ in 0..partition_count {
let partition_index = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
partitions.push(OffsetDeletePartitionResponse {
partition_index,
error_code,
});
}
topics.push(OffsetDeleteTopicResponse { name, partitions });
}
Ok(Self {
error_code,
throttle_time_ms,
topics,
})
}
}
impl VersionedDecode for OffsetDeleteResponse {
fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
match version {
0 => Self::decode_v0(buf),
_ => unsupported_decode!("OffsetDeleteResponse", version),
}
}
}
#[cfg(test)]
// Tests deliberately unwrap/panic on failure.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use bytes::BytesMut;

    /// Appends `s` in the layout these tests feed to `KafkaString::decode`:
    /// an `i16` byte length followed by the raw bytes.
    fn put_kafka_string(buf: &mut BytesMut, s: &str) {
        // Fail loudly instead of letting the cast silently truncate the length.
        assert!(
            s.len() <= i16::MAX as usize,
            "test string does not fit an i16 length prefix"
        );
        buf.put_i16(s.len() as i16);
        buf.put_slice(s.as_bytes());
    }

    #[test]
    fn test_offset_delete_api_key() {
        assert_eq!(OffsetDeleteRequest::api_key(), ApiKey::OffsetDelete);
    }

    /// Pins the complete v0 wire layout of a request, not just its first field.
    #[test]
    fn test_offset_delete_request_encode_v0() {
        let request = OffsetDeleteRequest {
            group_id: "my-group".to_string(),
            topics: vec![OffsetDeleteTopicRequest {
                name: "my-topic".to_string(),
                partitions: vec![
                    OffsetDeletePartitionRequest { partition_index: 0 },
                    OffsetDeletePartitionRequest { partition_index: 1 },
                ],
            }],
        };
        let mut buf = BytesMut::new();
        request.encode_v0(&mut buf).unwrap();

        let mut expected = BytesMut::new();
        put_kafka_string(&mut expected, "my-group"); // group_id
        expected.put_i32(1); // topic count
        put_kafka_string(&mut expected, "my-topic"); // topic name
        expected.put_i32(2); // partition count
        expected.put_i32(0); // partition_index of first partition
        expected.put_i32(1); // partition_index of second partition
        assert_eq!(&buf[..], &expected[..]);
    }

    #[test]
    fn test_offset_delete_versioned_encode_unsupported() {
        let request = OffsetDeleteRequest {
            group_id: "g".to_string(),
            topics: Vec::new(),
        };
        let mut buf = BytesMut::new();
        assert!(request.encode_versioned(1, &mut buf).is_err());
    }

    #[test]
    fn test_offset_delete_response_decode_v0() {
        let mut buf = BytesMut::new();
        buf.put_i16(0); // top-level error_code
        buf.put_i32(0); // throttle_time_ms
        buf.put_i32(1); // topic count
        put_kafka_string(&mut buf, "my-topic");
        buf.put_i32(2); // partition count
        buf.put_i32(0); // partition 0: index
        buf.put_i16(0); // partition 0: error_code
        buf.put_i32(1); // partition 1: index
        buf.put_i16(3); // partition 1: error_code (non-zero)

        let resp = OffsetDeleteResponse::decode_v0(&mut buf.freeze()).unwrap();
        assert!(resp.error_code.is_ok());
        assert_eq!(resp.throttle_time_ms, 0);
        assert_eq!(resp.topics.len(), 1);

        let topic = &resp.topics[0];
        assert_eq!(topic.name, "my-topic");
        assert_eq!(topic.partitions.len(), 2);
        assert_eq!(topic.partitions[0].partition_index, 0);
        assert!(topic.partitions[0].error_code.is_ok());
        assert_eq!(topic.partitions[1].partition_index, 1);
        assert!(!topic.partitions[1].error_code.is_ok());
    }

    #[test]
    fn test_offset_delete_response_top_level_error() {
        let mut buf = BytesMut::new();
        buf.put_i16(15); // top-level error_code (non-zero)
        buf.put_i32(50); // throttle_time_ms
        buf.put_i32(0); // topic count

        let resp = OffsetDeleteResponse::decode_v0(&mut buf.freeze()).unwrap();
        assert!(!resp.error_code.is_ok());
        assert_eq!(resp.throttle_time_ms, 50);
        assert!(resp.topics.is_empty());
    }

    /// Version 0 must be routed to `decode_v0` by the versioned entry point.
    #[test]
    fn test_offset_delete_versioned_decode_v0() {
        let mut buf = BytesMut::new();
        buf.put_i16(0); // top-level error_code
        buf.put_i32(7); // throttle_time_ms
        buf.put_i32(0); // topic count

        let resp = OffsetDeleteResponse::decode_versioned(0, &mut buf.freeze()).unwrap();
        assert!(resp.error_code.is_ok());
        assert_eq!(resp.throttle_time_ms, 7);
        assert!(resp.topics.is_empty());
    }

    #[test]
    fn test_offset_delete_versioned_decode_unsupported() {
        let buf = BytesMut::new();
        assert!(OffsetDeleteResponse::decode_versioned(1, &mut buf.freeze()).is_err());
    }

    /// Encodes through the versioned entry point, then reads every field back
    /// with the primitive decoders and compares it with the request.
    #[test]
    fn test_offset_delete_round_trip_encode_decode() {
        let request = OffsetDeleteRequest {
            group_id: "test-group".to_string(),
            topics: vec![OffsetDeleteTopicRequest {
                name: "t1".to_string(),
                partitions: vec![OffsetDeletePartitionRequest { partition_index: 0 }],
            }],
        };
        let mut buf = BytesMut::new();
        request.encode_versioned(0, &mut buf).unwrap();

        let mut read = buf.freeze();

        // group_id: i16 length prefix + bytes.
        assert_eq!(i16::decode(&mut read).unwrap(), 10);
        let group_id = read.split_to(request.group_id.len());
        assert_eq!(&group_id[..], request.group_id.as_bytes());

        // One topic.
        assert_eq!(i32::decode(&mut read).unwrap(), 1);
        assert_eq!(i16::decode(&mut read).unwrap(), 2);
        let topic_name = read.split_to(request.topics[0].name.len());
        assert_eq!(&topic_name[..], request.topics[0].name.as_bytes());

        // One partition with index 0.
        assert_eq!(i32::decode(&mut read).unwrap(), 1);
        assert_eq!(
            i32::decode(&mut read).unwrap(),
            request.topics[0].partitions[0].partition_index
        );

        // Nothing may follow the last partition index.
        assert!(read.is_empty());
    }
}