use std::io::{Read, Write};
use crate::protocol::{
api_key::ApiKey,
api_version::{ApiVersion, ApiVersionRange},
error::Error,
messages::{read_versioned_array, write_versioned_array},
primitives::{Int16, Int32, Int64, NullableString, Records, String_},
traits::{ReadType, WriteType},
};
use super::{
ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
};
#[derive(Debug)]
pub struct ProduceRequestPartitionData {
pub index: Int32,
pub records: Records,
}
impl<W> WriteVersionedType<W> for ProduceRequestPartitionData
where
W: Write,
{
fn write_versioned(
&self,
writer: &mut W,
version: ApiVersion,
) -> Result<(), WriteVersionedError> {
let v = version.0.0;
assert!(v <= 7);
self.index.write(writer)?;
self.records.write(writer)?;
Ok(())
}
}
#[derive(Debug)]
pub struct ProduceRequestTopicData {
pub name: String_,
pub partition_data: Vec<ProduceRequestPartitionData>,
}
impl<W> WriteVersionedType<W> for ProduceRequestTopicData
where
W: Write,
{
fn write_versioned(
&self,
writer: &mut W,
version: ApiVersion,
) -> Result<(), WriteVersionedError> {
let v = version.0.0;
assert!(v <= 7);
self.name.write(writer)?;
write_versioned_array(writer, version, Some(&self.partition_data))?;
Ok(())
}
}
#[derive(Debug)]
pub struct ProduceRequest {
pub transactional_id: NullableString,
pub acks: Int16,
pub timeout_ms: Int32,
pub topic_data: Vec<ProduceRequestTopicData>,
}
impl<W> WriteVersionedType<W> for ProduceRequest
where
W: Write,
{
fn write_versioned(
&self,
writer: &mut W,
version: ApiVersion,
) -> Result<(), WriteVersionedError> {
let v = version.0.0;
assert!(v <= 7);
if v >= 3 {
self.transactional_id.write(writer)?;
}
self.acks.write(writer)?;
self.timeout_ms.write(writer)?;
write_versioned_array(writer, version, Some(&self.topic_data))?;
Ok(())
}
}
impl RequestBody for ProduceRequest {
type ResponseBody = ProduceResponse;
const API_KEY: ApiKey = ApiKey::Produce;
const API_VERSION_RANGE: ApiVersionRange =
ApiVersionRange::new(ApiVersion(Int16(3)), ApiVersion(Int16(7)));
const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(9));
}
#[derive(Debug)]
#[allow(missing_copy_implementations)]
pub struct ProduceResponsePartitionResponse {
pub index: Int32,
pub error: Option<Error>,
pub base_offset: Int64,
pub log_append_time_ms: Option<Int64>,
pub log_start_offset: Option<Int64>,
}
impl<R> ReadVersionedType<R> for ProduceResponsePartitionResponse
where
R: Read,
{
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
let v = version.0.0;
assert!(v <= 7);
Ok(Self {
index: Int32::read(reader)?,
error: Error::new(Int16::read(reader)?.0),
base_offset: Int64::read(reader)?,
log_append_time_ms: (v >= 2).then(|| Int64::read(reader)).transpose()?,
log_start_offset: (v >= 5).then(|| Int64::read(reader)).transpose()?,
})
}
}
#[derive(Debug)]
pub struct ProduceResponseResponse {
pub name: String_,
pub partition_responses: Vec<ProduceResponsePartitionResponse>,
}
impl<R> ReadVersionedType<R> for ProduceResponseResponse
where
R: Read,
{
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
let v = version.0.0;
assert!(v <= 7);
Ok(Self {
name: String_::read(reader)?,
partition_responses: read_versioned_array(reader, version)?.unwrap_or_default(),
})
}
}
#[derive(Debug)]
pub struct ProduceResponse {
pub responses: Vec<ProduceResponseResponse>,
pub throttle_time_ms: Option<Int32>,
}
impl<R> ReadVersionedType<R> for ProduceResponse
where
R: Read,
{
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
let v = version.0.0;
assert!(v <= 7);
Ok(Self {
responses: read_versioned_array(reader, version)?.unwrap_or_default(),
throttle_time_ms: (v >= 1).then(|| Int32::read(reader)).transpose()?,
})
}
}