use std::io::{Read, Write};
use crate::protocol::{
api_key::ApiKey,
api_version::{ApiVersion, ApiVersionRange},
error::Error,
messages::{
read_compact_versioned_array, read_versioned_array, write_compact_versioned_array,
write_versioned_array,
},
primitives::{CompactString, CompactStringRef, Int16, Int32, Int64, String_, TaggedFields},
traits::{ReadType, WriteType},
};
use super::{
ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
};
#[derive(Debug)]
pub struct DeleteRequestPartition {
pub partition_index: Int32,
pub offset: Int64,
pub tagged_fields: Option<TaggedFields>,
}
impl<W> WriteVersionedType<W> for DeleteRequestPartition
where
W: Write,
{
fn write_versioned(
&self,
writer: &mut W,
version: ApiVersion,
) -> Result<(), WriteVersionedError> {
let v = version.0.0;
assert!(v <= 2);
self.partition_index.write(writer)?;
self.offset.write(writer)?;
if v >= 2 {
match self.tagged_fields.as_ref() {
Some(tagged_fields) => {
tagged_fields.write(writer)?;
}
None => {
TaggedFields::default().write(writer)?;
}
}
}
Ok(())
}
}
#[derive(Debug)]
pub struct DeleteRequestTopic {
pub name: String_,
pub partitions: Vec<DeleteRequestPartition>,
pub tagged_fields: Option<TaggedFields>,
}
impl<W> WriteVersionedType<W> for DeleteRequestTopic
where
W: Write,
{
fn write_versioned(
&self,
writer: &mut W,
version: ApiVersion,
) -> Result<(), WriteVersionedError> {
let v = version.0.0;
assert!(v <= 2);
if v >= 2 {
CompactStringRef(&self.name.0).write(writer)?
} else {
self.name.write(writer)?;
}
if v >= 2 {
write_compact_versioned_array(writer, version, Some(self.partitions.as_slice()))?;
} else {
write_versioned_array(writer, version, Some(self.partitions.as_slice()))?;
}
if v >= 2 {
match self.tagged_fields.as_ref() {
Some(tagged_fields) => {
tagged_fields.write(writer)?;
}
None => {
TaggedFields::default().write(writer)?;
}
}
}
Ok(())
}
}
#[derive(Debug)]
pub struct DeleteRecordsRequest {
pub topics: Vec<DeleteRequestTopic>,
pub timeout_ms: Int32,
pub tagged_fields: Option<TaggedFields>,
}
impl<W> WriteVersionedType<W> for DeleteRecordsRequest
where
W: Write,
{
fn write_versioned(
&self,
writer: &mut W,
version: ApiVersion,
) -> Result<(), WriteVersionedError> {
let v = version.0.0;
assert!(v <= 2);
if v >= 2 {
write_compact_versioned_array(writer, version, Some(self.topics.as_slice()))?;
} else {
write_versioned_array(writer, version, Some(self.topics.as_slice()))?;
}
self.timeout_ms.write(writer)?;
if v >= 2 {
match self.tagged_fields.as_ref() {
Some(tagged_fields) => {
tagged_fields.write(writer)?;
}
None => {
TaggedFields::default().write(writer)?;
}
}
}
Ok(())
}
}
impl RequestBody for DeleteRecordsRequest {
type ResponseBody = DeleteRecordsResponse;
const API_KEY: ApiKey = ApiKey::DeleteRecords;
const API_VERSION_RANGE: ApiVersionRange =
ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(2)));
const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(2));
}
#[derive(Debug)]
pub struct DeleteResponsePartition {
pub partition_index: Int32,
pub low_watermark: Int64,
pub error: Option<Error>,
pub tagged_fields: Option<TaggedFields>,
}
impl<R> ReadVersionedType<R> for DeleteResponsePartition
where
R: Read,
{
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
let v = version.0.0;
assert!(v <= 2);
let partition_index = Int32::read(reader)?;
let low_watermark = Int64::read(reader)?;
let error = Error::new(Int16::read(reader)?.0);
let tagged_fields = (v >= 2).then(|| TaggedFields::read(reader)).transpose()?;
Ok(Self {
partition_index,
low_watermark,
error,
tagged_fields,
})
}
}
#[derive(Debug)]
pub struct DeleteResponseTopic {
pub name: String_,
pub partitions: Vec<DeleteResponsePartition>,
pub tagged_fields: Option<TaggedFields>,
}
impl<R> ReadVersionedType<R> for DeleteResponseTopic
where
R: Read,
{
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
let v = version.0.0;
assert!(v <= 2);
let name = if v >= 2 {
String_(CompactString::read(reader)?.0)
} else {
String_::read(reader)?
};
let partitions = if v >= 2 {
read_compact_versioned_array(reader, version)?.unwrap_or_default()
} else {
read_versioned_array(reader, version)?.unwrap_or_default()
};
let tagged_fields = (v >= 2).then(|| TaggedFields::read(reader)).transpose()?;
Ok(Self {
name,
partitions,
tagged_fields,
})
}
}
#[derive(Debug)]
pub struct DeleteRecordsResponse {
pub throttle_time_ms: Int32,
pub topics: Vec<DeleteResponseTopic>,
pub tagged_fields: Option<TaggedFields>,
}
impl<R> ReadVersionedType<R> for DeleteRecordsResponse
where
R: Read,
{
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
let v = version.0.0;
assert!(v <= 2);
let throttle_time_ms = Int32::read(reader)?;
let topics = if v >= 2 {
read_compact_versioned_array(reader, version)?.unwrap_or_default()
} else {
read_versioned_array(reader, version)?.unwrap_or_default()
};
let tagged_fields = (v >= 2).then(|| TaggedFields::read(reader)).transpose()?;
Ok(Self {
throttle_time_ms,
topics,
tagged_fields,
})
}
}