use byteorder::ReadBytesExt;
use crate::codec::*;
use crate::IoResult;
/// Decoded body of a fetch request.
///
/// Instances are produced by this type's `Decodable::read` implementation; which
/// fields are actually taken from the wire depends on the request version.
/// NOTE(review): the layout appears to follow the Apache Kafka Fetch request
/// schema — confirm against the protocol definition.
///
/// The derived `Default` zero-initialises every numeric field, whereas `read`
/// starts from `replica_id = -1`, `max_bytes = i32::MAX` and `session_epoch = -1`.
#[derive(Debug, Default, Clone)]
pub struct FetchRequest {
/// Read from tagged field 0 (tagged fields are only decoded for versions >= 12);
/// `None` when the tag is absent or carries a null string.
pub cluster_id: Option<String>,
/// Read for versions <= 14; `read` leaves it at -1 for later versions.
pub replica_id: i32,
/// Read from tagged field 1 for versions >= 15.
pub replica_state: ReplicaState,
/// Read for every version.
pub max_wait_ms: i32,
/// Read for every version.
pub min_bytes: i32,
/// Read for versions >= 3; `read` leaves it at `i32::MAX` otherwise.
pub max_bytes: i32,
/// Read for versions >= 4; 0 otherwise.
pub isolation_level: i8,
/// Read for versions >= 7; 0 otherwise.
pub session_id: i32,
/// Read for versions >= 7; `read` leaves it at -1 otherwise.
pub session_epoch: i32,
/// Read for every version; a null array is rejected by `read`.
pub topics: Vec<FetchTopic>,
/// Read for versions >= 7 (a null array is rejected); empty otherwise.
pub forgotten_topics_data: Vec<ForgottenTopic>,
/// Read for versions >= 11 (a null string is rejected); empty otherwise.
pub rack_id: String,
/// Tagged fields that `read` did not recognise (versions >= 12 only).
pub unknown_tagged_fields: Vec<RawTaggedField>,
}
impl Decodable for FetchRequest {
fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
let mut this = FetchRequest {
replica_id: -1,
max_bytes: i32::MAX,
session_epoch: -1,
..Default::default()
};
if version <= 14 {
this.replica_id = Int32.decode(buf)?
}
this.max_wait_ms = Int32.decode(buf)?;
this.min_bytes = Int32.decode(buf)?;
if version >= 3 {
this.max_bytes = Int32.decode(buf)?;
}
if version >= 4 {
this.isolation_level = Int8.decode(buf)?;
}
if version >= 7 {
this.session_id = Int32.decode(buf)?;
}
if version >= 7 {
this.session_epoch = Int32.decode(buf)?;
}
this.topics = NullableArray(Struct(version), version >= 12)
.decode(buf)?
.ok_or_else(|| err_decode_message_null("topics"))?;
if version >= 7 {
this.forgotten_topics_data = NullableArray(Struct(version), version >= 12)
.decode(buf)?
.ok_or_else(|| err_decode_message_null("forgotten_topics_data"))?;
}
if version >= 11 {
this.rack_id = NullableString(version >= 12)
.decode(buf)?
.ok_or_else(|| err_decode_message_null("rack_id"))?;
}
if version >= 12 {
this.unknown_tagged_fields =
RawTaggedFieldList.decode_with(buf, |buf, tag, _| match tag {
0 => {
this.cluster_id = NullableString(true).decode(buf)?;
Ok(true)
}
1 => {
if version >= 15 {
this.replica_state = ReplicaState::read(buf, version)?;
}
Ok(true)
}
_ => Ok(false),
})?;
}
Ok(this)
}
}
/// Replica information nested inside a `FetchRequest`.
///
/// The derived `Default` sets both numeric fields to 0.
#[derive(Debug, Default, Clone)]
pub struct ReplicaState {
/// Read unconditionally by `read`.
/// NOTE(review): presumably the follower's broker id — confirm against the schema.
pub replica_id: i32,
/// Read unconditionally by `read`, directly after `replica_id`.
pub replica_epoch: i64,
/// Tagged fields attached to this struct; always decoded by `read`.
pub unknown_tagged_fields: Vec<RawTaggedField>,
}
impl Decodable for ReplicaState {
fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
if version > 15 {
Err(err_decode_message_unsupported(version, "ReplicaState"))?
}
Ok(ReplicaState {
replica_id: Int32.decode(buf)?,
replica_epoch: Int64.decode(buf)?,
unknown_tagged_fields: RawTaggedFieldList.decode(buf)?,
})
}
}
/// One topic entry of a `FetchRequest`, together with the partitions to fetch.
#[derive(Debug, Default, Clone)]
pub struct FetchTopic {
/// Read for versions <= 12 (a null string is rejected); empty otherwise.
pub topic: String,
/// Read for versions >= 13; the default `Uuid` otherwise.
pub topic_id: uuid::Uuid,
/// Read for every version; a null array is rejected by `read`.
pub partitions: Vec<FetchPartition>,
/// Tagged fields attached to this struct (decoded for versions >= 12).
pub unknown_tagged_fields: Vec<RawTaggedField>,
}
impl Decodable for FetchTopic {
    /// Decodes a `FetchTopic` for the given `version`.
    ///
    /// Versions up to 12 identify the topic by name, later versions by id; the
    /// partition array follows, and a tagged-field list closes the struct from
    /// version 12 on.
    ///
    /// # Errors
    ///
    /// Fails for `version > 15`, when the topic name or the partition array is
    /// encoded as null, or when any underlying decoder fails.
    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
        if version > 15 {
            return Err(err_decode_message_unsupported(version, "FetchTopic").into());
        }
        // The codec helpers take a flag that switches on at version 12, the same
        // threshold at which tagged fields appear (presumably the compact
        // encoding — confirm against the codec module).
        let flexible = version >= 12;
        let mut entry = FetchTopic::default();
        if version >= 13 {
            entry.topic_id = Uuid.decode(buf)?;
        } else {
            entry.topic = match NullableString(flexible).decode(buf)? {
                Some(name) => name,
                None => return Err(err_decode_message_null("topic").into()),
            };
        }
        entry.partitions = match NullableArray(Struct(version), flexible).decode(buf)? {
            Some(partitions) => partitions,
            None => return Err(err_decode_message_null("partitions").into()),
        };
        if flexible {
            entry.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
        }
        Ok(entry)
    }
}
/// One partition entry inside a `FetchTopic`.
///
/// The derived `Default` zero-initialises every field, whereas `read` uses -1
/// for the version-gated fields that are absent from the wire.
#[derive(Debug, Default, Clone)]
pub struct FetchPartition {
/// Read for every version.
pub partition: i32,
/// Read for versions >= 9; `read` sets it to -1 otherwise.
pub current_leader_epoch: i32,
/// Read for every version.
pub fetch_offset: i64,
/// Read for versions >= 12; `read` sets it to -1 otherwise.
pub last_fetched_epoch: i32,
/// Read for versions >= 5; `read` sets it to -1 otherwise.
pub log_start_offset: i64,
/// Read for every version.
pub partition_max_bytes: i32,
/// Tagged fields attached to this struct (decoded for versions >= 12).
pub unknown_tagged_fields: Vec<RawTaggedField>,
}
impl Decodable for FetchPartition {
fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
if version > 15 {
Err(err_decode_message_unsupported(version, "FetchPartition"))?
}
let mut this = FetchPartition {
partition: Int32.decode(buf)?,
..Default::default()
};
this.current_leader_epoch = if version >= 9 { Int32.decode(buf)? } else { -1 };
this.fetch_offset = Int64.decode(buf)?;
this.last_fetched_epoch = if version >= 12 {
Int32.decode(buf)?
} else {
-1
};
this.log_start_offset = if version >= 5 { Int64.decode(buf)? } else { -1 };
this.partition_max_bytes = Int32.decode(buf)?;
if version >= 12 {
this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
}
Ok(this)
}
}
/// One entry of `FetchRequest::forgotten_topics_data`: a topic plus a list of
/// partition indexes.
#[derive(Debug, Default, Clone)]
pub struct ForgottenTopic {
/// Read for versions <= 12 (a null string is rejected); empty otherwise.
pub topic: String,
/// Read for versions >= 13; the default `Uuid` otherwise.
pub topic_id: uuid::Uuid,
/// Read for every version; a null array is rejected by `read`.
pub partitions: Vec<i32>,
/// Tagged fields attached to this struct (decoded for versions >= 12).
pub unknown_tagged_fields: Vec<RawTaggedField>,
}
impl Decodable for ForgottenTopic {
    /// Decodes a `ForgottenTopic` for the given `version`.
    ///
    /// Versions up to 12 identify the topic by name, later versions by id; the
    /// array of partition indexes follows, and a tagged-field list closes the
    /// struct from version 12 on.
    ///
    /// # Errors
    ///
    /// Fails for `version > 15`, when the topic name or the partition array is
    /// encoded as null, or when any underlying decoder fails.
    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
        if version > 15 {
            return Err(err_decode_message_unsupported(version, "ForgottenTopic").into());
        }
        // The codec helpers take a flag that switches on at version 12, the same
        // threshold at which tagged fields appear (presumably the compact
        // encoding — confirm against the codec module).
        let flexible = version >= 12;
        let mut forgotten = ForgottenTopic::default();
        if version >= 13 {
            forgotten.topic_id = Uuid.decode(buf)?;
        } else {
            forgotten.topic = match NullableString(flexible).decode(buf)? {
                Some(name) => name,
                None => return Err(err_decode_message_null("topic").into()),
            };
        }
        forgotten.partitions = match NullableArray(Int32, flexible).decode(buf)? {
            Some(indexes) => indexes,
            None => return Err(err_decode_message_null("partitions").into()),
        };
        if flexible {
            forgotten.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
        }
        Ok(forgotten)
    }
}