use bytes::{Buf, BufMut};
use super::{VersionedDecode, VersionedEncode};
use crate::error::{ErrorCode, KrafkaError, Result};
use crate::protocol::api::ApiKey;
use crate::protocol::primitives::{Decode, Encode, KafkaString, TaggedFields, TryEncode};
use crate::protocol::{
array_len_i32, check_compact_array_len, check_decode_array_len, encode_compact_array_len,
};
#[derive(Debug, Clone)]
pub struct DeleteTopicsRequest {
pub topic_names: Vec<String>,
pub topics: Vec<DeleteTopicState>,
pub timeout_ms: i32,
}
#[derive(Debug, Clone)]
pub struct DeleteTopicState {
pub name: Option<String>,
pub topic_id: [u8; 16],
}
impl DeleteTopicsRequest {
pub fn api_key() -> ApiKey {
ApiKey::DeleteTopics
}
pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
buf.put_i32(array_len_i32(self.topic_names.len())?);
for name in &self.topic_names {
KafkaString::new(name).try_encode(buf)?;
}
self.timeout_ms.encode(buf);
Ok(())
}
pub fn encode_v4(&self, buf: &mut impl BufMut) -> Result<()> {
encode_compact_array_len(self.topic_names.len(), buf)?;
for name in &self.topic_names {
KafkaString::new(name).try_encode_compact(buf)?;
}
self.timeout_ms.encode(buf);
TaggedFields::default().try_encode(buf)?;
Ok(())
}
pub fn encode_v6(&self, buf: &mut impl BufMut) -> Result<()> {
encode_compact_array_len(self.topics.len(), buf)?;
for topic in &self.topics {
match &topic.name {
Some(n) => KafkaString::new(n).try_encode_compact(buf)?,
None => KafkaString::null().try_encode_compact(buf)?,
}
buf.put_slice(&topic.topic_id);
TaggedFields::default().try_encode(buf)?; }
self.timeout_ms.encode(buf);
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct DeletableTopicResult {
pub name: Option<String>,
pub topic_id: Option<[u8; 16]>,
pub error_code: ErrorCode,
pub error_message: Option<String>,
}
#[derive(Debug, Clone)]
pub struct DeleteTopicsResponse {
pub throttle_time_ms: i32,
pub responses: Vec<DeletableTopicResult>,
}
impl DeleteTopicsResponse {
pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
Ok(Self {
throttle_time_ms,
responses: Self::decode_responses_v1(buf)?,
})
}
pub fn decode_v4(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let raw = crate::util::varint::decode_unsigned_varint(buf)?;
let count = check_compact_array_len(raw)?;
let mut responses = Vec::with_capacity(count);
for _ in 0..count {
let name = KafkaString::decode_compact(buf)?.0;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
TaggedFields::decode(buf)?;
responses.push(DeletableTopicResult {
name,
topic_id: None,
error_code,
error_message: None,
});
}
TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
responses,
})
}
pub fn decode_v5(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let raw = crate::util::varint::decode_unsigned_varint(buf)?;
let count = check_compact_array_len(raw)?;
let mut responses = Vec::with_capacity(count);
for _ in 0..count {
let name = KafkaString::decode_compact(buf)?.0;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode_compact(buf)?.0;
TaggedFields::decode(buf)?;
responses.push(DeletableTopicResult {
name,
topic_id: None,
error_code,
error_message,
});
}
TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
responses,
})
}
pub fn decode_v6(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let raw = crate::util::varint::decode_unsigned_varint(buf)?;
let count = check_compact_array_len(raw)?;
let mut responses = Vec::with_capacity(count);
for _ in 0..count {
let name = KafkaString::decode_compact(buf)?.0;
if buf.remaining() < 16 {
return Err(KrafkaError::protocol("not enough bytes for topic_id UUID"));
}
let mut id = [0u8; 16];
buf.copy_to_slice(&mut id);
let topic_id = if id == [0u8; 16] { None } else { Some(id) };
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode_compact(buf)?.0;
TaggedFields::decode(buf)?;
responses.push(DeletableTopicResult {
name,
topic_id,
error_code,
error_message,
});
}
TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
responses,
})
}
fn decode_responses_v1(buf: &mut impl Buf) -> Result<Vec<DeletableTopicResult>> {
let response_count = check_decode_array_len(i32::decode(buf)?)?;
let mut responses = Vec::with_capacity(response_count);
for _ in 0..response_count {
let name = KafkaString::decode(buf)?.0;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
responses.push(DeletableTopicResult {
name,
topic_id: None,
error_code,
error_message: None,
});
}
Ok(responses)
}
}
impl VersionedEncode for DeleteTopicsRequest {
fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
match version {
1..=3 => self.encode_v1(buf)?,
4 | 5 => self.encode_v4(buf)?,
6 => self.encode_v6(buf)?,
_ => return unsupported_encode!("DeleteTopicsRequest", version),
}
Ok(())
}
}
impl VersionedDecode for DeleteTopicsResponse {
fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
match version {
1..=3 => Self::decode_v1(buf),
4 => Self::decode_v4(buf),
5 => Self::decode_v5(buf),
6 => Self::decode_v6(buf),
_ => unsupported_decode!("DeleteTopicsResponse", version),
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use bytes::BytesMut;
#[test]
fn test_delete_topics_v2_v3_same_wire_as_v1() {
let request = DeleteTopicsRequest {
topic_names: vec!["test".to_string()],
topics: vec![],
timeout_ms: 30_000,
};
let mut v1 = BytesMut::new();
request.encode_versioned(1, &mut v1).unwrap();
let mut v2 = BytesMut::new();
request.encode_versioned(2, &mut v2).unwrap();
let mut v3 = BytesMut::new();
request.encode_versioned(3, &mut v3).unwrap();
assert_eq!(v1, v2);
assert_eq!(v1, v3);
}
#[test]
fn test_delete_topics_v4_flexible() {
let request = DeleteTopicsRequest {
topic_names: vec!["test".to_string()],
topics: vec![],
timeout_ms: 30_000,
};
let mut v1 = BytesMut::new();
request.encode_v1(&mut v1).unwrap();
let mut v4 = BytesMut::new();
request.encode_v4(&mut v4).unwrap();
assert_ne!(v1.len(), v4.len());
let mut v5 = BytesMut::new();
request.encode_versioned(5, &mut v5).unwrap();
assert_eq!(v4, v5);
}
#[test]
fn test_delete_topics_v6_with_topic_id() {
let topic_uuid = [0xAA; 16];
let request = DeleteTopicsRequest {
topic_names: vec![],
topics: vec![DeleteTopicState {
name: None,
topic_id: topic_uuid,
}],
timeout_ms: 30_000,
};
let mut buf = BytesMut::new();
request.encode_v6(&mut buf).unwrap();
assert!(!buf.is_empty());
}
#[test]
fn test_delete_topics_response_v4_roundtrip() {
let mut buf = BytesMut::new();
buf.put_i32(0); buf.put_u8(2); buf.put_u8(5); buf.put_slice(b"test");
buf.put_i16(0); buf.put_u8(0); buf.put_u8(0);
let mut frozen = buf.freeze();
let resp = DeleteTopicsResponse::decode_v4(&mut frozen).unwrap();
assert_eq!(resp.responses.len(), 1);
assert_eq!(resp.responses[0].name.as_deref(), Some("test"));
assert!(resp.responses[0].topic_id.is_none());
assert!(resp.responses[0].error_message.is_none());
}
#[test]
fn test_delete_topics_response_v5_with_error_message() {
let mut buf = BytesMut::new();
buf.put_i32(0); buf.put_u8(2); buf.put_u8(5); buf.put_slice(b"test");
buf.put_i16(0); buf.put_u8(0); buf.put_u8(0); buf.put_u8(0);
let mut frozen = buf.freeze();
let resp = DeleteTopicsResponse::decode_v5(&mut frozen).unwrap();
assert_eq!(resp.responses[0].name.as_deref(), Some("test"));
assert!(resp.responses[0].error_message.is_none());
}
#[test]
fn test_delete_topics_response_v6_with_topic_id() {
let topic_uuid = [0xBB; 16];
let mut buf = BytesMut::new();
buf.put_i32(0); buf.put_u8(2); buf.put_u8(0); buf.put_slice(&topic_uuid); buf.put_i16(0); buf.put_u8(0); buf.put_u8(0); buf.put_u8(0);
let mut frozen = buf.freeze();
let resp = DeleteTopicsResponse::decode_v6(&mut frozen).unwrap();
assert!(resp.responses[0].name.is_none());
assert_eq!(resp.responses[0].topic_id, Some(topic_uuid));
}
}