use byteorder::WriteBytesExt;
use crate::codec::*;
use crate::IoResult;
/// Top-level body of a fetch response, serialized through its `Encodable` impl.
///
/// Which fields reach the wire depends on the protocol `version` handed to the encoder.
// NOTE(review): presumably mirrors the Kafka Fetch response schema — confirm against the
// protocol definition.
#[derive(Debug, Default, Clone)]
pub struct FetchResponse {
/// Throttle time in milliseconds (going by the field name); written as `Int32` only for
/// version >= 1.
pub throttle_time_ms: i32,
/// Response-level error code; written as `Int16` only for version >= 7.
pub error_code: i16,
/// Session id; written as `Int32` only for version >= 7.
pub session_id: i32,
/// Per-topic responses; written for every version as an array of structs.
pub responses: Vec<FetchableTopicResponse>,
/// Tagged fields carried through as raw values; written only for version >= 12.
pub unknown_tagged_fields: Vec<RawTaggedField>,
}
impl Encodable for FetchResponse {
    /// Serializes this response into `buf` using the layout of the given `version`.
    ///
    /// Fields are emitted in declaration order; version-gated fields are skipped for
    /// versions that do not carry them. Any error from an underlying encoder is propagated.
    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
        // The same `version >= 12` test drives both the array encoding flag and the
        // presence of the trailing tagged-field list.
        let v12_plus = version >= 12;

        if version >= 1 {
            Int32.encode(buf, self.throttle_time_ms)?;
        }
        if version >= 7 {
            Int16.encode(buf, self.error_code)?;
            Int32.encode(buf, self.session_id)?;
        }

        let topics = NullableArray(Struct(version), v12_plus);
        topics.encode(buf, self.responses.as_slice())?;

        if v12_plus {
            RawTaggedFieldList.encode(buf, self.unknown_tagged_fields.as_slice())?;
        }
        Ok(())
    }

    /// Computes the size contribution of every field that `write` emits for `version`,
    /// using the same version gates.
    fn calculate_size(&self, version: i16) -> usize {
        let v12_plus = version >= 12;

        // The topic array is present in every version, so start from it.
        let mut total =
            NullableArray(Struct(version), v12_plus).calculate_size(self.responses.as_slice());

        if version >= 1 {
            total += Int32::SIZE;
        }
        if version >= 7 {
            total += Int16::SIZE + Int32::SIZE;
        }
        if v12_plus {
            total += RawTaggedFieldList.calculate_size(self.unknown_tagged_fields.as_slice());
        }
        total
    }
}
/// Per-topic section of a fetch response.
///
/// The topic is identified by `topic` for versions up to and including 12 and by
/// `topic_id` from version 13 on; the encoder writes exactly one of the two.
#[derive(Debug, Default, Clone)]
pub struct FetchableTopicResponse {
/// Topic name; written as a string only for version <= 12.
pub topic: String,
/// Topic id; written as a UUID only for version >= 13.
pub topic_id: uuid::Uuid,
/// Per-partition entries; written for every version as an array of structs.
pub partitions: Vec<PartitionData>,
/// Tagged fields carried through as raw values; written only for version >= 12.
pub unknown_tagged_fields: Vec<RawTaggedField>,
}
impl Encodable for FetchableTopicResponse {
    /// Serializes this topic entry into `buf` using the layout of the given `version`.
    ///
    /// The topic is written as its name for version <= 12 and as its id otherwise.
    /// Any error from an underlying encoder is propagated.
    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
        let v12_plus = version >= 12;

        // `version <= 12` and `version >= 13` are mutually exclusive and cover every
        // `i16`, so exactly one of the two identifiers is written.
        if version <= 12 {
            NullableString(v12_plus).encode(buf, self.topic.as_str())?;
        } else {
            Uuid.encode(buf, self.topic_id)?;
        }

        let partitions = NullableArray(Struct(version), v12_plus);
        partitions.encode(buf, self.partitions.as_slice())?;

        if v12_plus {
            RawTaggedFieldList.encode(buf, self.unknown_tagged_fields.as_slice())?;
        }
        Ok(())
    }

    /// Computes the size contribution of every field that `write` emits for `version`,
    /// using the same version gates.
    fn calculate_size(&self, version: i16) -> usize {
        let v12_plus = version >= 12;

        // Name for version <= 12, id for version >= 13.
        let identifier = if version <= 12 {
            NullableString(v12_plus).calculate_size(self.topic.as_str())
        } else {
            Uuid::SIZE
        };

        let mut total = identifier
            + NullableArray(Struct(version), v12_plus).calculate_size(self.partitions.as_slice());
        if v12_plus {
            total += RawTaggedFieldList.calculate_size(self.unknown_tagged_fields.as_slice());
        }
        total
    }
}
/// Per-partition section of a fetch response.
///
/// `partition_index`, `error_code`, `high_watermark` and `records` are written for every
/// version; the remaining fields are version-gated as noted on each field. The three
/// `Option` struct fields are written as tagged fields (tags 0, 1 and 2) and only when
/// they are `Some`.
#[derive(Debug, Clone)]
pub struct PartitionData {
/// Partition index; written as `Int32` for every version.
pub partition_index: i32,
/// Partition-level error code; written as `Int16` for every version.
pub error_code: i16,
/// High watermark; written as `Int64` for every version.
pub high_watermark: i64,
/// Last stable offset; written as `Int64` only for version >= 4. Defaults to -1.
pub last_stable_offset: i64,
/// Log start offset; written as `Int64` only for version >= 5. Defaults to -1.
pub log_start_offset: i64,
/// Written as tagged field 0 for version >= 12 when present.
pub diverging_epoch: Option<EpochEndOffset>,
/// Written as tagged field 1 for version >= 12 when present.
pub current_leader: Option<LeaderIdAndEpoch>,
/// Written as tagged field 2 for version >= 12 when present.
pub snapshot_id: Option<SnapshotId>,
/// Aborted transactions; written as a nullable array only for version >= 4.
pub aborted_transactions: Option<Vec<AbortedTransaction>>,
/// Preferred read replica; written as `Int32` only for version >= 11. Defaults to -1.
pub preferred_read_replica: i32,
/// Raw record bytes; written for every version.
pub records: Vec<u8>,
/// Tagged fields carried through as raw values; written only for version >= 12.
pub unknown_tagged_fields: Vec<RawTaggedField>,
}
impl Default for PartitionData {
fn default() -> Self {
PartitionData {
partition_index: 0,
error_code: 0,
high_watermark: 0,
last_stable_offset: -1,
log_start_offset: -1,
diverging_epoch: None,
current_leader: None,
snapshot_id: None,
aborted_transactions: None,
preferred_read_replica: -1,
records: Default::default(),
unknown_tagged_fields: vec![],
}
}
}
impl Encodable for PartitionData {
    /// Writes this partition entry to `buf` in the layout of the given `version`.
    ///
    /// Fixed fields come first, then the version-gated ones, then the record bytes.
    /// From version 12 on a tagged-field section follows: `diverging_epoch` (tag 0),
    /// `current_leader` (tag 1) and `snapshot_id` (tag 2) are emitted only when they
    /// are `Some`, together with `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying encoders.
    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
        Int32.encode(buf, self.partition_index)?;
        Int16.encode(buf, self.error_code)?;
        Int64.encode(buf, self.high_watermark)?;
        if version >= 4 {
            Int64.encode(buf, self.last_stable_offset)?;
        }
        if version >= 5 {
            Int64.encode(buf, self.log_start_offset)?;
        }
        if version >= 4 {
            NullableArray(Struct(version), version >= 12)
                .encode(buf, self.aborted_transactions.as_deref())?;
        }
        if version >= 11 {
            Int32.encode(buf, self.preferred_read_replica)?;
        }
        NullableBytes(version >= 12).encode(buf, &self.records)?;
        if version >= 12 {
            // Count of known tagged fields that are present; passed to `encode_with`
            // alongside the unknown ones.
            let n = usize::from(self.diverging_epoch.is_some())
                + usize::from(self.current_leader.is_some())
                + usize::from(self.snapshot_id.is_some());
            RawTaggedFieldList.encode_with(buf, n, &self.unknown_tagged_fields, |buf| {
                if let Some(diverging_epoch) = &self.diverging_epoch {
                    RawTaggedFieldWriter.write_field(buf, 0, Struct(version), diverging_epoch)?;
                }
                if let Some(current_leader) = &self.current_leader {
                    RawTaggedFieldWriter.write_field(buf, 1, Struct(version), current_leader)?;
                }
                if let Some(snapshot_id) = &self.snapshot_id {
                    RawTaggedFieldWriter.write_field(buf, 2, Struct(version), snapshot_id)?;
                }
                Ok(())
            })?;
        }
        Ok(())
    }

    /// Computes the size contribution of every field that `write` emits for `version`,
    /// using the same version gates and the same tagged-field tags.
    fn calculate_size(&self, version: i16) -> usize {
        let mut res = Int32::SIZE + Int16::SIZE + Int64::SIZE;
        if version >= 4 {
            res += Int64::SIZE;
        }
        if version >= 5 {
            res += Int64::SIZE;
        }
        if version >= 4 {
            res += NullableArray(Struct(version), version >= 12)
                .calculate_size(self.aborted_transactions.as_deref());
        }
        if version >= 11 {
            res += Int32::SIZE;
        }
        res += NullableBytes(version >= 12).calculate_size(&self.records);
        if version >= 12 {
            // `n` counts the known tagged fields that are present and `bs` accumulates
            // their sizes. Each tag must be the one `write` uses for the same field
            // (0, 1, 2) so the two methods cannot drift apart.
            let mut n = 0;
            let mut bs = 0;
            if let Some(diverging_epoch) = &self.diverging_epoch {
                n += 1;
                bs +=
                    RawTaggedFieldWriter.calculate_field_size(0, Struct(version), diverging_epoch);
            }
            if let Some(current_leader) = &self.current_leader {
                n += 1;
                bs += RawTaggedFieldWriter.calculate_field_size(1, Struct(version), current_leader);
            }
            if let Some(snapshot_id) = &self.snapshot_id {
                n += 1;
                bs += RawTaggedFieldWriter.calculate_field_size(2, Struct(version), snapshot_id);
            }
            res += RawTaggedFieldList.calculate_size_with(n, bs, &self.unknown_tagged_fields);
        }
        res
    }
}
/// Epoch / end-offset pair used as the type of `PartitionData::diverging_epoch`.
///
/// Its encoder rejects versions below 12.
#[derive(Debug, Clone)]
pub struct EpochEndOffset {
/// Epoch; written as `Int32`. Defaults to -1.
pub epoch: i32,
/// End offset; written as `Int64`. Defaults to -1.
pub end_offset: i64,
/// Tagged fields carried through as raw values; always written after the fixed fields.
pub unknown_tagged_fields: Vec<RawTaggedField>,
}
impl Default for EpochEndOffset {
fn default() -> Self {
EpochEndOffset {
epoch: -1,
end_offset: -1,
unknown_tagged_fields: vec![],
}
}
}
impl Encodable for EpochEndOffset {
    /// Writes `epoch`, `end_offset` and the raw tagged fields to `buf`, in that order.
    ///
    /// # Errors
    ///
    /// Fails with the "unsupported" encode error when `version < 12`; otherwise
    /// propagates any error from the underlying encoders.
    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
        // This struct has no encoding before version 12.
        if version < 12 {
            Err(err_encode_message_unsupported(version, "EpochEndOffset"))?
        }

        Int32.encode(buf, self.epoch)?;
        Int64.encode(buf, self.end_offset)?;
        RawTaggedFieldList.encode(buf, self.unknown_tagged_fields.as_slice())?;
        Ok(())
    }

    /// Sums the sizes of the two fixed fields and the tagged-field list.
    ///
    /// `_version` is unused: unlike `write`, no version check is made here.
    fn calculate_size(&self, _version: i16) -> usize {
        let tagged = RawTaggedFieldList.calculate_size(self.unknown_tagged_fields.as_slice());
        Int32::SIZE + Int64::SIZE + tagged
    }
}
/// Leader id / leader epoch pair used as the type of `PartitionData::current_leader`.
///
/// Its encoder rejects versions below 12.
#[derive(Debug, Clone)]
pub struct LeaderIdAndEpoch {
/// Leader id; written as `Int32`. Defaults to -1.
pub leader_id: i32,
/// Leader epoch; written as `Int32`. Defaults to -1.
pub leader_epoch: i32,
/// Tagged fields carried through as raw values; always written after the fixed fields.
pub unknown_tagged_fields: Vec<RawTaggedField>,
}
impl Default for LeaderIdAndEpoch {
fn default() -> Self {
LeaderIdAndEpoch {
leader_id: -1,
leader_epoch: -1,
unknown_tagged_fields: vec![],
}
}
}
impl Encodable for LeaderIdAndEpoch {
    /// Writes `leader_id`, `leader_epoch` and the raw tagged fields to `buf`, in that order.
    ///
    /// # Errors
    ///
    /// Fails with the "unsupported" encode error when `version < 12`; otherwise
    /// propagates any error from the underlying encoders.
    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
        // This struct has no encoding before version 12.
        if version < 12 {
            Err(err_encode_message_unsupported(version, "LeaderIdAndEpoch"))?
        }

        Int32.encode(buf, self.leader_id)?;
        Int32.encode(buf, self.leader_epoch)?;
        RawTaggedFieldList.encode(buf, self.unknown_tagged_fields.as_slice())?;
        Ok(())
    }

    /// Sums the sizes of the two fixed fields and the tagged-field list.
    ///
    /// `_version` is unused: unlike `write`, no version check is made here.
    fn calculate_size(&self, _version: i16) -> usize {
        let tagged = RawTaggedFieldList.calculate_size(self.unknown_tagged_fields.as_slice());
        Int32::SIZE + Int32::SIZE + tagged
    }
}
/// End-offset / epoch pair used as the type of `PartitionData::snapshot_id`.
///
/// Its encoder rejects versions below 12. Note that `end_offset` is written before
/// `epoch`.
#[derive(Debug, Clone)]
pub struct SnapshotId {
/// End offset; written as `Int64`. Defaults to -1.
pub end_offset: i64,
/// Epoch; written as `Int32`. Defaults to -1.
pub epoch: i32,
/// Tagged fields carried through as raw values; always written after the fixed fields.
pub unknown_tagged_fields: Vec<RawTaggedField>,
}
impl Default for SnapshotId {
fn default() -> Self {
SnapshotId {
end_offset: -1,
epoch: -1,
unknown_tagged_fields: vec![],
}
}
}
impl Encodable for SnapshotId {
    /// Writes `end_offset`, `epoch` and the raw tagged fields to `buf`, in that order.
    ///
    /// # Errors
    ///
    /// Fails with the "unsupported" encode error when `version < 12`; otherwise
    /// propagates any error from the underlying encoders.
    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
        // This struct has no encoding before version 12.
        if version < 12 {
            Err(err_encode_message_unsupported(version, "SnapshotId"))?
        }

        Int64.encode(buf, self.end_offset)?;
        Int32.encode(buf, self.epoch)?;
        RawTaggedFieldList.encode(buf, self.unknown_tagged_fields.as_slice())?;
        Ok(())
    }

    /// Sums the sizes of the two fixed fields and the tagged-field list.
    ///
    /// `_version` is unused: unlike `write`, no version check is made here.
    fn calculate_size(&self, _version: i16) -> usize {
        let tagged = RawTaggedFieldList.calculate_size(self.unknown_tagged_fields.as_slice());
        Int64::SIZE + Int32::SIZE + tagged
    }
}
/// Element type of `PartitionData::aborted_transactions`.
///
/// Its encoder rejects versions below 4.
#[derive(Debug, Default, Clone)]
pub struct AbortedTransaction {
/// Producer id; written as `Int64`.
pub producer_id: i64,
/// First offset; written as `Int64`.
pub first_offset: i64,
/// Tagged fields carried through as raw values; written only for version >= 12.
pub unknown_tagged_fields: Vec<RawTaggedField>,
}
impl Encodable for AbortedTransaction {
    /// Writes `producer_id` and `first_offset` to `buf`, followed by the raw tagged
    /// fields when `version >= 12`.
    ///
    /// # Errors
    ///
    /// Fails with the "unsupported" encode error when `version < 4`; otherwise
    /// propagates any error from the underlying encoders.
    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
        // This struct has no encoding before version 4.
        if version < 4 {
            Err(err_encode_message_unsupported(version, "AbortedTransaction"))?
        }

        Int64.encode(buf, self.producer_id)?;
        Int64.encode(buf, self.first_offset)?;
        if version >= 12 {
            RawTaggedFieldList.encode(buf, self.unknown_tagged_fields.as_slice())?;
        }
        Ok(())
    }

    /// Sums the sizes of the two fixed fields, plus the tagged-field list when
    /// `version >= 12`. No check for `version < 4` is made here.
    fn calculate_size(&self, version: i16) -> usize {
        let fixed = Int64::SIZE + Int64::SIZE;
        if version >= 12 {
            fixed + RawTaggedFieldList.calculate_size(self.unknown_tagged_fields.as_slice())
        } else {
            fixed
        }
    }
}