fluvio_dataplane_protocol/fetch/
response.rs1use std::fmt::Debug;
2use std::marker::PhantomData;
3
4use crate::batch::BatchRecords;
5use crate::core::Decoder;
6use crate::core::Encoder;
7use crate::derive::FluvioDefault;
8
9use crate::record::RecordSet;
10use crate::ErrorCode;
11use crate::Offset;
12
13pub type DefaultFetchResponse = FetchResponse<RecordSet>;
14
15#[derive(Encoder, Decoder, FluvioDefault, Debug)]
16pub struct FetchResponse<R>
17where
18 R: Encoder + Decoder + Default + Debug,
19{
20 pub throttle_time_ms: i32,
23 pub error_code: ErrorCode,
24 pub session_id: i32,
25
26 pub topics: Vec<FetchableTopicResponse<R>>,
28}
29
30impl<R> FetchResponse<R>
31where
32 R: Encoder + Decoder + Default + Debug,
33{
34 pub fn find_partition(
35 self,
36 topic: &str,
37 partition: i32,
38 ) -> Option<FetchablePartitionResponse<R>> {
39 for topic_res in self.topics {
40 if topic_res.name == topic {
41 for partition_res in topic_res.partitions {
42 if partition_res.partition_index == partition {
43 return Some(partition_res);
44 }
45 }
46 }
47 }
48
49 None
50 }
51}
52
53#[derive(Encoder, Decoder, FluvioDefault, Debug)]
54pub struct FetchableTopicResponse<R>
55where
56 R: Encoder + Decoder + Default + Debug,
57{
58 pub name: String,
60
61 pub partitions: Vec<FetchablePartitionResponse<R>>,
63 pub data: PhantomData<R>,
64}
65
66#[derive(Encoder, Decoder, FluvioDefault, Debug)]
67pub struct FetchablePartitionResponse<R>
68where
69 R: Encoder + Decoder + Default + Debug,
70{
71 pub partition_index: i32,
73
74 pub error_code: ErrorCode,
76
77 pub high_watermark: i64,
79
80 #[fluvio(min_version = 11, ignorable)]
83 pub next_filter_offset: i64,
84
85 pub log_start_offset: i64,
87
88 pub aborted: Option<Vec<AbortedTransaction>>,
90
91 pub records: R,
93}
94
95impl<R: BatchRecords> FetchablePartitionResponse<RecordSet<R>> {
96 pub fn next_offset_for_fetch(&self) -> Option<Offset> {
100 if self.next_filter_offset > 0 {
101 Some(self.next_filter_offset)
102 } else {
103 self.records.last_offset()
104 }
105 }
106}
107
108#[derive(Encoder, Decoder, FluvioDefault, Debug)]
109pub struct AbortedTransaction {
110 pub producer_id: i64,
111 pub first_offset: i64,
112}
113
114impl<R> FetchResponse<R>
119where
120 R: Encoder + Decoder + Debug,
121{
122 pub fn find_topic(&self, topic: &str) -> Option<&FetchableTopicResponse<R>>
123 where
124 R: Debug,
125 {
126 for r_topic in &self.topics {
127 if r_topic.name == *topic {
128 return Some(r_topic);
129 }
130 }
131 None
132 }
133}
134
135#[cfg(feature = "file")]
136pub use file::*;
137
138#[cfg(feature = "file")]
139mod file {
140
141 use std::io::Error as IoError;
142
143 use tracing::trace;
144 use bytes::BytesMut;
145
146 use crate::record::FileRecordSet;
147 use crate::store::FileWrite;
148 use crate::store::StoreValue;
149 use crate::core::Version;
150
151 use super::*;
152
153 pub type FileFetchResponse = FetchResponse<FileRecordSet>;
154 pub type FileTopicResponse = FetchableTopicResponse<FileRecordSet>;
155 pub type FilePartitionResponse = FetchablePartitionResponse<FileRecordSet>;
156
157 impl FileWrite for FilePartitionResponse {
158 fn file_encode(
159 &self,
160 src: &mut BytesMut,
161 data: &mut Vec<StoreValue>,
162 version: Version,
163 ) -> Result<(), IoError> {
164 trace!("file encoding fetch partition response");
165 self.partition_index.encode(src, version)?;
166 self.error_code.encode(src, version)?;
167 self.high_watermark.encode(src, version)?;
168 if version >= 11 {
169 self.next_filter_offset.encode(src, version)?;
170 } else {
171 tracing::trace!("v: {} is less than last fetched version 11", version);
172 }
173 self.log_start_offset.encode(src, version)?;
174 self.aborted.encode(src, version)?;
175 self.records.file_encode(src, data, version)?;
176 Ok(())
177 }
178 }
179
180 impl FileWrite for FileFetchResponse {
181 fn file_encode(
182 &self,
183 src: &mut BytesMut,
184 data: &mut Vec<StoreValue>,
185 version: Version,
186 ) -> Result<(), IoError> {
187 trace!("file encoding FileFetchResponse");
188 trace!("encoding throttle_time_ms {}", self.throttle_time_ms);
189 self.throttle_time_ms.encode(src, version)?;
190 trace!("encoding error code {:#?}", self.error_code);
191 self.error_code.encode(src, version)?;
192 trace!("encoding session code {}", self.session_id);
193 self.session_id.encode(src, version)?;
194 trace!("encoding topics len: {}", self.topics.len());
195 self.topics.file_encode(src, data, version)?;
196 Ok(())
197 }
198 }
199
200 impl FileWrite for FileTopicResponse {
201 fn file_encode(
202 &self,
203 src: &mut BytesMut,
204 data: &mut Vec<StoreValue>,
205 version: Version,
206 ) -> Result<(), IoError> {
207 trace!("file encoding fetch topic response");
208 self.name.encode(src, version)?;
209 self.partitions.file_encode(src, data, version)?;
210 Ok(())
211 }
212 }
213}