use std::io::{Read, Write};
use super::{
ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
};
use crate::protocol::api_version::ApiVersionRange;
use crate::protocol::messages::{read_versioned_array, write_versioned_array};
use crate::protocol::{
api_key::ApiKey,
api_version::ApiVersion,
error::Error,
primitives::*,
traits::{ReadType, WriteType},
};
/// Body of a Kafka `Metadata` request.
#[derive(Debug)]
pub struct MetadataRequest {
/// Topics to request metadata for; serialized as a versioned array.
///
/// NOTE(review): `None` is handed to `write_versioned_array` as-is and is
/// presumably encoded as a null array ("all topics" in newer protocol
/// versions) — confirm how version 0 is expected to treat it.
pub topics: Option<Vec<MetadataRequestTopic>>,
/// Auto-topic-creation flag.
///
/// Only serialized for version 4. Setting it for an older version makes
/// serialization fail with `FieldNotAvailable`; leaving it `None` on
/// version 4 writes `true`.
pub allow_auto_topic_creation: Option<Boolean>,
}
// Declares the API key, supported version range and response type that go
// with a `MetadataRequest`.
impl RequestBody for MetadataRequest {
/// Response type paired with this request.
type ResponseBody = MetadataResponse;
/// API key identifying this request kind.
const API_KEY: ApiKey = ApiKey::Metadata;
/// Range built from versions 0 and 4 (presumably inclusive min/max — the
/// (de)serializers in this file assert `version <= 4`).
const API_VERSION_RANGE: ApiVersionRange =
ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(4)));
/// First request version carrying tagged fields; 9 lies above the upper
/// bound of `API_VERSION_RANGE` declared above.
const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(9));
}
impl<W> WriteVersionedType<W> for MetadataRequest
where
    W: Write,
{
    /// Serializes the request body for the given protocol `version`.
    ///
    /// The topic array is written first; for version 4 the
    /// `allow_auto_topic_creation` flag follows it.
    ///
    /// # Errors
    ///
    /// Returns `WriteVersionedError::FieldNotAvailable` when
    /// `allow_auto_topic_creation` is set but `version` is below 4, and
    /// propagates any error raised while writing the individual fields.
    ///
    /// # Panics
    ///
    /// Panics if `version` is greater than 4.
    fn write_versioned(
        &self,
        writer: &mut W,
        version: ApiVersion,
    ) -> Result<(), WriteVersionedError> {
        let v = version.0.0;
        assert!(v <= 4);

        // The auto-creation flag only exists on the wire from version 4 on.
        let has_auto_creation_flag = v >= 4;

        // Refuse to silently drop a value the caller set explicitly.
        if !has_auto_creation_flag && self.allow_auto_topic_creation.is_some() {
            return Err(WriteVersionedError::FieldNotAvailable {
                version,
                field: "allow_auto_topic_creation".to_string(),
            });
        }

        // NOTE(review): `None` topics are forwarded unchanged; confirm that the
        // resulting encoding is valid for version 0 as well.
        write_versioned_array(writer, version, self.topics.as_deref())?;

        if has_auto_creation_flag {
            // An unset flag is sent as `true`.
            let flag = self.allow_auto_topic_creation.unwrap_or(Boolean(true));
            flag.write(writer)?;
        }

        Ok(())
    }
}
/// A single topic entry inside a [`MetadataRequest`].
#[derive(Debug)]
pub struct MetadataRequestTopic {
/// Topic name; the only field written for this entry.
pub name: String_,
}
impl<W> WriteVersionedType<W> for MetadataRequestTopic
where
    W: Write,
{
    /// Serializes this topic entry, which consists solely of the topic name.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while writing the name.
    ///
    /// # Panics
    ///
    /// Panics if `version` is greater than 4.
    fn write_versioned(
        &self,
        writer: &mut W,
        version: ApiVersion,
    ) -> Result<(), WriteVersionedError> {
        let v = version.0.0;
        assert!(v <= 4);

        // The layout is the same for every supported version.
        self.name.write(writer)?;
        Ok(())
    }
}
/// Body of a Kafka `Metadata` response.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct MetadataResponse {
/// Throttle time in milliseconds (per the field name).
///
/// Only read for version 3 and above; `None` otherwise.
pub throttle_time_ms: Option<Int32>,
/// Broker entries; empty when the array reader yields `None`.
pub brokers: Vec<MetadataResponseBroker>,
/// Cluster ID.
///
/// Only read for version 2 and above; `None` otherwise.
pub cluster_id: Option<NullableString>,
/// Controller ID.
///
/// Only read for version 1 and above; `None` otherwise.
pub controller_id: Option<Int32>,
/// Topic entries; empty when the array reader yields `None`.
pub topics: Vec<MetadataResponseTopic>,
}
impl<R> ReadVersionedType<R> for MetadataResponse
where
R: Read,
{
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
let v = version.0.0;
assert!(v <= 4);
let throttle_time_ms = (v >= 3).then(|| Int32::read(reader)).transpose()?;
let brokers = read_versioned_array(reader, version)?.unwrap_or_default();
let cluster_id = (v >= 2).then(|| NullableString::read(reader)).transpose()?;
let controller_id = (v >= 1).then(|| Int32::read(reader)).transpose()?;
let topics = read_versioned_array(reader, version)?.unwrap_or_default();
Ok(Self {
throttle_time_ms,
brokers,
topics,
cluster_id,
controller_id,
})
}
}
/// A broker entry inside a [`MetadataResponse`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct MetadataResponseBroker {
/// Broker node ID.
pub node_id: Int32,
/// Broker host.
pub host: String_,
/// Broker port.
pub port: Int32,
/// Broker rack.
///
/// Only read for version 1 and above; `None` otherwise.
pub rack: Option<NullableString>,
}
impl<R> ReadVersionedType<R> for MetadataResponseBroker
where
    R: Read,
{
    /// Deserializes one broker entry encoded with protocol `version`.
    ///
    /// Reads node ID, host and port, followed by the rack for version 1+.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while reading the individual fields.
    ///
    /// # Panics
    ///
    /// Panics if `version` is greater than 4.
    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
        let v = version.0.0;
        assert!(v <= 4);

        let node_id = Int32::read(reader)?;
        let host = String_::read(reader)?;
        let port = Int32::read(reader)?;

        // The rack is not part of the version 0 layout.
        let rack = if v >= 1 {
            Some(NullableString::read(reader)?)
        } else {
            None
        };

        Ok(Self {
            node_id,
            host,
            port,
            rack,
        })
    }
}
/// A topic entry inside a [`MetadataResponse`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct MetadataResponseTopic {
/// Topic-level error, built by `Error::new` from the 16-bit code on the wire.
///
/// NOTE(review): `None` presumably means "no error" — confirm against `Error::new`.
pub error: Option<Error>,
/// Topic name.
pub name: String_,
/// Internal-topic flag.
///
/// Only read for version 1 and above; `None` otherwise.
pub is_internal: Option<Boolean>,
/// Partition entries; empty when the array reader yields `None`.
pub partitions: Vec<MetadataResponsePartition>,
}
impl<R> ReadVersionedType<R> for MetadataResponseTopic
where
R: Read,
{
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
let v = version.0.0;
assert!(v <= 4);
let error = Error::new(Int16::read(reader)?.0);
let name = String_::read(reader)?;
let is_internal = (v >= 1).then(|| Boolean::read(reader)).transpose()?;
let partitions = read_versioned_array(reader, version)?.unwrap_or_default();
Ok(Self {
error,
name,
is_internal,
partitions,
})
}
}
/// A partition entry inside a [`MetadataResponseTopic`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct MetadataResponsePartition {
/// Partition-level error, built by `Error::new` from the 16-bit code on the wire.
///
/// NOTE(review): `None` presumably means "no error" — confirm against `Error::new`.
pub error: Option<Error>,
/// Partition index.
pub partition_index: Int32,
/// Leader ID (presumably the node ID of the leading broker — confirm).
pub leader_id: Int32,
/// Replica node IDs.
pub replica_nodes: Array<Int32>,
/// ISR node IDs (presumably the in-sync replicas, per the Kafka field name — confirm).
pub isr_nodes: Array<Int32>,
}
impl<R> ReadVersionedType<R> for MetadataResponsePartition
where
R: Read,
{
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
let v = version.0.0;
assert!(v <= 4);
Ok(Self {
error: Error::new(Int16::read(reader)?.0),
partition_index: Int32::read(reader)?,
leader_id: Int32::read(reader)?,
replica_nodes: Array::read(reader)?,
isr_nodes: Array::read(reader)?,
})
}
}