use std::fmt;
use fluvio_protocol::api::Request;
use fluvio_protocol::{Encoder, Decoder};
use fluvio_protocol::record::PartitionOffset;
use fluvio_protocol::record::ReplicaKey;
use fluvio_types::PartitionId;
use crate::COMMON_VERSION;
use crate::errors::ErrorCode;
use super::SpuServerApiKey;
/// Request body for the SPU `FetchOffsets` API (see the `Request` impl for
/// this type, which supplies the API key and the response type).
///
/// Holds one entry per topic; each entry lists the partitions of that topic
/// that are part of the request.
#[derive(Decoder, Encoder, Default, Debug)]
pub struct FetchOffsetsRequest {
/// Topics included in this request.
pub topics: Vec<FetchOffsetTopic>,
}
/// Registers [`FetchOffsetsRequest`] as an API request by giving it an API key,
/// a default version and an associated response type.
impl Request for FetchOffsetsRequest {
    /// Response message paired with this request.
    type Response = FetchOffsetsResponse;

    /// Numeric key taken from the `FetchOffsets` variant of `SpuServerApiKey`.
    const API_KEY: u16 = SpuServerApiKey::FetchOffsets as u16;

    /// Default version; shares the crate-wide `COMMON_VERSION`.
    const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
}
impl FetchOffsetsRequest {
    /// Builds a request that targets exactly one partition of one topic.
    ///
    /// * `topic` - name stored in the single [`FetchOffsetTopic`] entry.
    /// * `partition` - index stored in the single [`FetchOffsetPartition`]
    ///   entry of that topic.
    pub fn new(topic: String, partition: u32) -> Self {
        // Assemble the request from the innermost element outwards.
        let partition_entry = FetchOffsetPartition {
            partition_index: partition,
        };
        let topic_entry = FetchOffsetTopic {
            name: topic,
            partitions: vec![partition_entry],
        };

        Self {
            topics: vec![topic_entry],
        }
    }
}
/// One topic entry inside a [`FetchOffsetsRequest`]: the topic name together
/// with the partitions of that topic included in the request.
#[derive(Decoder, Encoder, Default, Debug)]
pub struct FetchOffsetTopic {
/// Name of the topic.
pub name: String,
/// Partitions of this topic included in the request.
pub partitions: Vec<FetchOffsetPartition>,
}
/// One partition entry inside a [`FetchOffsetTopic`], identified by its index.
#[derive(Decoder, Encoder, Default, Debug)]
pub struct FetchOffsetPartition {
/// Index of the partition within its topic.
pub partition_index: PartitionId,
}
/// Response type associated with [`FetchOffsetsRequest`] (it is the
/// `Response` type named in that request's `Request` impl).
///
/// Holds one entry per topic, each carrying per-partition results.
#[derive(Encoder, Decoder, Default, Debug)]
pub struct FetchOffsetsResponse {
/// Per-topic results.
pub topics: Vec<FetchOffsetTopicResponse>,
}
impl FetchOffsetsResponse {
    /// Consumes the response and extracts the partition entry addressed by
    /// `replica`.
    ///
    /// Topics are visited in stored order; within every topic whose name equals
    /// `replica.topic`, partitions are visited in stored order and the first one
    /// whose `partition_index` equals `replica.partition` is returned.
    ///
    /// Returns `None` when no topic/partition pair matches.
    pub fn find_partition(self, replica: &ReplicaKey) -> Option<FetchOffsetPartitionResponse> {
        self.topics
            .into_iter()
            // Keep every topic with a matching name (not just the first), so a
            // later same-named topic is still searched.
            .filter(|topic| topic.name == replica.topic)
            .flat_map(|topic| topic.partitions)
            .find(|partition| partition.partition_index == replica.partition)
    }
}
/// One topic entry inside a [`FetchOffsetsResponse`]: the topic name together
/// with the results for its partitions.
#[derive(Encoder, Decoder, Default, Debug)]
pub struct FetchOffsetTopicResponse {
/// Name of the topic; compared against `ReplicaKey::topic` by
/// `FetchOffsetsResponse::find_partition`.
pub name: String,
/// Per-partition results for this topic.
pub partitions: Vec<FetchOffsetPartitionResponse>,
}
/// Result for a single partition inside a [`FetchOffsetTopicResponse`].
///
/// The two offsets are also exposed through this type's `PartitionOffset`
/// impl.
#[derive(Encoder, Decoder, Default, Debug)]
pub struct FetchOffsetPartitionResponse {
/// Error status reported for this partition.
pub error_code: ErrorCode,
/// Index of the partition this result belongs to; compared against
/// `ReplicaKey::partition` by `FetchOffsetsResponse::find_partition`.
pub partition_index: PartitionId,
/// Start offset of the partition.
/// NOTE(review): presumably the earliest offset still available — confirm.
pub start_offset: i64,
/// Last stable offset of the partition.
/// NOTE(review): exact semantics are not visible in this file — confirm
/// against the SPU implementation.
pub last_stable_offset: i64,
}
/// Single-line, human-readable summary of a partition result.
impl fmt::Display for FetchOffsetPartitionResponse {
    /// Writes the error code (pretty `Debug` form), the partition index and
    /// both offsets, labelled `error`, `partition`, `start` and `last`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let error = &self.error_code;
        let partition = &self.partition_index;
        let start = &self.start_offset;
        let last = &self.last_stable_offset;

        write!(
            f,
            "error: {:#?}, partition: {}, start: {}, last: {}",
            error, partition, start, last
        )
    }
}
/// Exposes the offsets stored in the response through the `PartitionOffset`
/// trait; both accessors are plain field reads.
impl PartitionOffset for FetchOffsetPartitionResponse {
    /// Returns the stored `start_offset` field.
    fn start_offset(&self) -> i64 {
        self.start_offset
    }

    /// Returns the stored `last_stable_offset` field.
    fn last_stable_offset(&self) -> i64 {
        self.last_stable_offset
    }
}