use std::fmt::Debug;
use std::marker::PhantomData;
use crate::batch::BatchRecords;
use crate::core::Decoder;
use crate::core::Encoder;
use crate::derive::FluvioDefault;
use crate::record::RecordSet;
use crate::ErrorCode;
use crate::Offset;
pub type DefaultFetchResponse = FetchResponse<RecordSet>;
#[derive(Encoder, Decoder, FluvioDefault, Debug)]
pub struct FetchResponse<R>
where
R: Encoder + Decoder + Default + Debug,
{
pub throttle_time_ms: i32,
pub error_code: ErrorCode,
pub session_id: i32,
pub topics: Vec<FetchableTopicResponse<R>>,
}
impl<R> FetchResponse<R>
where
R: Encoder + Decoder + Default + Debug,
{
pub fn find_partition(
self,
topic: &str,
partition: i32,
) -> Option<FetchablePartitionResponse<R>> {
for topic_res in self.topics {
if topic_res.name == topic {
for partition_res in topic_res.partitions {
if partition_res.partition_index == partition {
return Some(partition_res);
}
}
}
}
None
}
}
#[derive(Encoder, Decoder, FluvioDefault, Debug)]
pub struct FetchableTopicResponse<R>
where
R: Encoder + Decoder + Default + Debug,
{
pub name: String,
pub partitions: Vec<FetchablePartitionResponse<R>>,
pub data: PhantomData<R>,
}
#[derive(Encoder, Decoder, FluvioDefault, Debug)]
pub struct FetchablePartitionResponse<R>
where
R: Encoder + Decoder + Default + Debug,
{
pub partition_index: i32,
pub error_code: ErrorCode,
pub high_watermark: i64,
#[fluvio(min_version = 11, ignorable)]
pub next_filter_offset: i64,
pub log_start_offset: i64,
pub aborted: Option<Vec<AbortedTransaction>>,
pub records: R,
}
impl<R: BatchRecords> FetchablePartitionResponse<RecordSet<R>> {
pub fn next_offset_for_fetch(&self) -> Option<Offset> {
if self.next_filter_offset > 0 {
Some(self.next_filter_offset)
} else {
self.records.last_offset()
}
}
}
#[derive(Encoder, Decoder, FluvioDefault, Debug)]
pub struct AbortedTransaction {
pub producer_id: i64,
pub first_offset: i64,
}
impl<R> FetchResponse<R>
where
R: Encoder + Decoder + Debug,
{
pub fn find_topic(&self, topic: &str) -> Option<&FetchableTopicResponse<R>>
where
R: Debug,
{
for r_topic in &self.topics {
if r_topic.name == *topic {
return Some(r_topic);
}
}
None
}
}
#[cfg(feature = "file")]
pub use file::*;
#[cfg(feature = "file")]
mod file {
use std::io::Error as IoError;
use tracing::trace;
use bytes::BytesMut;
use crate::record::FileRecordSet;
use crate::store::FileWrite;
use crate::store::StoreValue;
use crate::core::Version;
use super::*;
pub type FileFetchResponse = FetchResponse<FileRecordSet>;
pub type FileTopicResponse = FetchableTopicResponse<FileRecordSet>;
pub type FilePartitionResponse = FetchablePartitionResponse<FileRecordSet>;
impl FileWrite for FilePartitionResponse {
fn file_encode(
&self,
src: &mut BytesMut,
data: &mut Vec<StoreValue>,
version: Version,
) -> Result<(), IoError> {
trace!("file encoding fetch partition response");
self.partition_index.encode(src, version)?;
self.error_code.encode(src, version)?;
self.high_watermark.encode(src, version)?;
if version >= 11 {
self.next_filter_offset.encode(src, version)?;
} else {
tracing::trace!("v: {} is less than last fetched version 11", version);
}
self.log_start_offset.encode(src, version)?;
self.aborted.encode(src, version)?;
self.records.file_encode(src, data, version)?;
Ok(())
}
}
impl FileWrite for FileFetchResponse {
fn file_encode(
&self,
src: &mut BytesMut,
data: &mut Vec<StoreValue>,
version: Version,
) -> Result<(), IoError> {
trace!("file encoding FileFetchResponse");
trace!("encoding throttle_time_ms {}", self.throttle_time_ms);
self.throttle_time_ms.encode(src, version)?;
trace!("encoding error code {:#?}", self.error_code);
self.error_code.encode(src, version)?;
trace!("encoding session code {}", self.session_id);
self.session_id.encode(src, version)?;
trace!("encoding topics len: {}", self.topics.len());
self.topics.file_encode(src, data, version)?;
Ok(())
}
}
impl FileWrite for FileTopicResponse {
fn file_encode(
&self,
src: &mut BytesMut,
data: &mut Vec<StoreValue>,
version: Version,
) -> Result<(), IoError> {
trace!("file encoding fetch topic response");
self.name.encode(src, version)?;
self.partitions.file_encode(src, data, version)?;
Ok(())
}
}
}