fluvio_spu_schema/server/
fetch_offset.rs1use std::fmt;
6
7use fluvio_protocol::api::Request;
8use fluvio_protocol::{Encoder, Decoder};
9use fluvio_protocol::record::PartitionOffset;
10use fluvio_protocol::record::ReplicaKey;
11
12use fluvio_types::PartitionId;
13
14use crate::COMMON_VERSION;
15use crate::errors::ErrorCode;
16use super::SpuServerApiKey;
17
18#[derive(Decoder, Encoder, Default, Debug)]
24pub struct FetchOffsetsRequest {
25 pub topics: Vec<FetchOffsetTopic>,
27
28 #[deprecated(note = "to get consumer offest use `FetchConsumerOffsetsRequest` instead")]
30 #[fluvio(min_version = 23, max_version = 23)]
31 pub consumer_id: Option<String>,
32}
33
34impl Request for FetchOffsetsRequest {
35 const API_KEY: u16 = SpuServerApiKey::FetchOffsets as u16;
36 const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
37 type Response = FetchOffsetsResponse;
38}
39
40impl FetchOffsetsRequest {
41 pub fn new(topic: String, partition: u32) -> Self {
43 Self {
44 topics: vec![FetchOffsetTopic {
45 name: topic,
46 partitions: vec![FetchOffsetPartition {
47 partition_index: partition,
48 }],
49 }],
50 ..Default::default()
51 }
52 }
53}
54
55#[derive(Decoder, Encoder, Default, Debug)]
56pub struct FetchOffsetTopic {
57 pub name: String,
59
60 pub partitions: Vec<FetchOffsetPartition>,
62}
63
64#[derive(Decoder, Encoder, Default, Debug)]
65pub struct FetchOffsetPartition {
66 pub partition_index: PartitionId,
68}
69
70#[derive(Encoder, Decoder, Default, Debug)]
75pub struct FetchOffsetsResponse {
76 pub topics: Vec<FetchOffsetTopicResponse>,
78}
79
80impl FetchOffsetsResponse {
81 pub fn find_partition(self, replica: &ReplicaKey) -> Option<FetchOffsetPartitionResponse> {
82 for topic_res in self.topics {
83 if topic_res.name == replica.topic {
84 for partition_res in topic_res.partitions {
85 if partition_res.partition_index == replica.partition {
86 return Some(partition_res);
87 }
88 }
89 }
90 }
91
92 None
93 }
94}
95
96#[derive(Encoder, Decoder, Default, Debug)]
97pub struct FetchOffsetTopicResponse {
98 pub name: String,
100
101 pub partitions: Vec<FetchOffsetPartitionResponse>,
103}
104
105#[derive(Encoder, Decoder, Default, Debug)]
106pub struct FetchOffsetPartitionResponse {
107 pub error_code: ErrorCode,
109
110 pub partition_index: PartitionId,
112
113 pub start_offset: i64,
115
116 pub last_stable_offset: i64,
118}
119
120impl fmt::Display for FetchOffsetPartitionResponse {
121 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
122 write!(
123 f,
124 "error: {:#?}, partition: {}, start: {}, last: {}",
125 self.error_code, self.partition_index, self.start_offset, self.last_stable_offset
126 )
127 }
128}
129
130impl PartitionOffset for FetchOffsetPartitionResponse {
131 fn last_stable_offset(&self) -> i64 {
132 self.last_stable_offset
133 }
134
135 fn start_offset(&self) -> i64 {
136 self.start_offset
137 }
138}