use bytes::{Buf, BufMut};
use super::{VersionedDecode, VersionedEncode, non_nullable_string};
use crate::error::{ErrorCode, Result};
use crate::protocol::api::ApiKey;
use crate::protocol::primitives::{Decode, Encode, KafkaString, TaggedFields, TryEncode};
use crate::protocol::{
array_len_i32, check_compact_array_len, check_decode_array_len, encode_compact_array_len,
};
#[derive(Debug, Clone)]
pub struct DeleteRecordsPartition {
pub partition_index: i32,
pub offset: i64,
}
#[derive(Debug, Clone)]
pub struct DeleteRecordsTopic {
pub name: String,
pub partitions: Vec<DeleteRecordsPartition>,
}
#[derive(Debug, Clone)]
pub struct DeleteRecordsRequest {
pub topics: Vec<DeleteRecordsTopic>,
pub timeout_ms: i32,
}
impl DeleteRecordsRequest {
pub fn api_key() -> ApiKey {
ApiKey::DeleteRecords
}
pub fn encode_v0(&self, buf: &mut impl BufMut) -> Result<()> {
buf.put_i32(array_len_i32(self.topics.len())?);
for topic in &self.topics {
KafkaString::new(&topic.name).try_encode(buf)?;
buf.put_i32(array_len_i32(topic.partitions.len())?);
for partition in &topic.partitions {
partition.partition_index.encode(buf);
partition.offset.encode(buf);
}
}
self.timeout_ms.encode(buf);
Ok(())
}
pub fn encode_v2(&self, buf: &mut impl BufMut) -> Result<()> {
encode_compact_array_len(self.topics.len(), buf)?;
for topic in &self.topics {
KafkaString::new(&topic.name).try_encode_compact(buf)?;
encode_compact_array_len(topic.partitions.len(), buf)?;
for partition in &topic.partitions {
partition.partition_index.encode(buf);
partition.offset.encode(buf);
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
}
self.timeout_ms.encode(buf);
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct DeleteRecordsPartitionResult {
pub partition_index: i32,
pub low_watermark: i64,
pub error_code: ErrorCode,
}
#[derive(Debug, Clone)]
pub struct DeleteRecordsTopicResult {
pub name: String,
pub partitions: Vec<DeleteRecordsPartitionResult>,
}
#[derive(Debug, Clone)]
pub struct DeleteRecordsResponse {
pub throttle_time_ms: i32,
pub topics: Vec<DeleteRecordsTopicResult>,
}
impl DeleteRecordsResponse {
pub fn decode_v0(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let topic_count = check_decode_array_len(i32::decode(buf)?)?;
let mut topics = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let name = non_nullable_string("topic name", KafkaString::decode(buf)?.0)?;
let partition_count = check_decode_array_len(i32::decode(buf)?)?;
let mut partitions = Vec::with_capacity(partition_count);
for _ in 0..partition_count {
let partition_index = i32::decode(buf)?;
let low_watermark = i64::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
partitions.push(DeleteRecordsPartitionResult {
partition_index,
low_watermark,
error_code,
});
}
topics.push(DeleteRecordsTopicResult { name, partitions });
}
Ok(Self {
throttle_time_ms,
topics,
})
}
pub fn decode_v2(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let topic_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut topics = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let name = non_nullable_string("topic name", KafkaString::decode_compact(buf)?.0)?;
let partition_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut partitions = Vec::with_capacity(partition_count);
for _ in 0..partition_count {
let partition_index = i32::decode(buf)?;
let low_watermark = i64::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let _ = TaggedFields::decode(buf)?;
partitions.push(DeleteRecordsPartitionResult {
partition_index,
low_watermark,
error_code,
});
}
let _ = TaggedFields::decode(buf)?;
topics.push(DeleteRecordsTopicResult { name, partitions });
}
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
topics,
})
}
}
impl VersionedEncode for DeleteRecordsRequest {
fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
match version {
0 | 1 => self.encode_v0(buf)?,
2 => self.encode_v2(buf)?,
_ => return unsupported_encode!("DeleteRecordsRequest", version),
}
Ok(())
}
}
impl VersionedDecode for DeleteRecordsResponse {
fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
match version {
0 | 1 => Self::decode_v0(buf),
2 => Self::decode_v2(buf),
_ => unsupported_decode!("DeleteRecordsResponse", version),
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use bytes::BytesMut;
#[test]
fn test_delete_records_request() {
let request = DeleteRecordsRequest {
topics: vec![DeleteRecordsTopic {
name: "my-topic".to_string(),
partitions: vec![
DeleteRecordsPartition {
partition_index: 0,
offset: 100,
},
DeleteRecordsPartition {
partition_index: 1,
offset: 200,
},
],
}],
timeout_ms: 30000,
};
assert_eq!(request.topics.len(), 1);
assert_eq!(request.topics[0].partitions.len(), 2);
assert_eq!(DeleteRecordsRequest::api_key(), ApiKey::DeleteRecords);
let mut buf = BytesMut::new();
request.encode_v0(&mut buf).unwrap();
assert!(!buf.is_empty());
}
#[test]
fn test_delete_records_response_decode_v0() {
let mut buf = BytesMut::new();
buf.put_i32(0);
buf.put_i32(1);
let topic = "my-topic";
buf.put_i16(topic.len() as i16);
buf.put_slice(topic.as_bytes());
buf.put_i32(1);
buf.put_i32(0);
buf.put_i64(100);
buf.put_i16(0);
let response = DeleteRecordsResponse::decode_v0(&mut buf.freeze()).unwrap();
assert_eq!(response.throttle_time_ms, 0);
assert_eq!(response.topics.len(), 1);
assert_eq!(response.topics[0].name, "my-topic");
assert_eq!(response.topics[0].partitions.len(), 1);
assert_eq!(response.topics[0].partitions[0].partition_index, 0);
assert_eq!(response.topics[0].partitions[0].low_watermark, 100);
assert!(response.topics[0].partitions[0].error_code.is_ok());
}
#[test]
fn test_delete_records_v1_same_as_v0() {
let request = DeleteRecordsRequest {
topics: vec![DeleteRecordsTopic {
name: "test".to_string(),
partitions: vec![DeleteRecordsPartition {
partition_index: 0,
offset: 100,
}],
}],
timeout_ms: 30_000,
};
let mut v0 = BytesMut::new();
request.encode_versioned(0, &mut v0).unwrap();
let mut v1 = BytesMut::new();
request.encode_versioned(1, &mut v1).unwrap();
assert_eq!(v0, v1); }
#[test]
fn test_delete_records_v2_flexible() {
let request = DeleteRecordsRequest {
topics: vec![DeleteRecordsTopic {
name: "test".to_string(),
partitions: vec![DeleteRecordsPartition {
partition_index: 0,
offset: 100,
}],
}],
timeout_ms: 30_000,
};
let mut v0 = BytesMut::new();
request.encode_v0(&mut v0).unwrap();
let mut v2 = BytesMut::new();
request.encode_v2(&mut v2).unwrap();
assert_ne!(v0.len(), v2.len());
}
#[test]
fn test_delete_records_response_v2_roundtrip() {
let mut buf = BytesMut::new();
buf.put_i32(0); buf.put_u8(2); buf.put_u8(5); buf.put_slice(b"test");
buf.put_u8(2); buf.put_i32(0); buf.put_i64(50); buf.put_i16(0); buf.put_u8(0); buf.put_u8(0); buf.put_u8(0);
let mut frozen = buf.freeze();
let resp = DeleteRecordsResponse::decode_v2(&mut frozen).unwrap();
assert_eq!(resp.topics.len(), 1);
assert_eq!(resp.topics[0].name, "test");
assert_eq!(resp.topics[0].partitions[0].low_watermark, 50);
}
}