fluvio_dataplane_protocol/fetch/
response.rs

1use std::fmt::Debug;
2use std::marker::PhantomData;
3
4use crate::batch::BatchRecords;
5use crate::core::Decoder;
6use crate::core::Encoder;
7use crate::derive::FluvioDefault;
8
9use crate::record::RecordSet;
10use crate::ErrorCode;
11use crate::Offset;
12
13pub type DefaultFetchResponse = FetchResponse<RecordSet>;
14
15#[derive(Encoder, Decoder, FluvioDefault, Debug)]
16pub struct FetchResponse<R>
17where
18    R: Encoder + Decoder + Default + Debug,
19{
20    /// The duration in milliseconds for which the request was throttled due to a quota violation,
21    /// or zero if the request did not violate any quota.
22    pub throttle_time_ms: i32,
23    pub error_code: ErrorCode,
24    pub session_id: i32,
25
26    /// The response topics.
27    pub topics: Vec<FetchableTopicResponse<R>>,
28}
29
30impl<R> FetchResponse<R>
31where
32    R: Encoder + Decoder + Default + Debug,
33{
34    pub fn find_partition(
35        self,
36        topic: &str,
37        partition: i32,
38    ) -> Option<FetchablePartitionResponse<R>> {
39        for topic_res in self.topics {
40            if topic_res.name == topic {
41                for partition_res in topic_res.partitions {
42                    if partition_res.partition_index == partition {
43                        return Some(partition_res);
44                    }
45                }
46            }
47        }
48
49        None
50    }
51}
52
53#[derive(Encoder, Decoder, FluvioDefault, Debug)]
54pub struct FetchableTopicResponse<R>
55where
56    R: Encoder + Decoder + Default + Debug,
57{
58    /// The topic name.
59    pub name: String,
60
61    /// The topic partitions.
62    pub partitions: Vec<FetchablePartitionResponse<R>>,
63    pub data: PhantomData<R>,
64}
65
66#[derive(Encoder, Decoder, FluvioDefault, Debug)]
67pub struct FetchablePartitionResponse<R>
68where
69    R: Encoder + Decoder + Default + Debug,
70{
71    /// The partition index.
72    pub partition_index: i32,
73
74    /// The error code, or 0 if there was no fetch error
75    pub error_code: ErrorCode,
76
77    /// The current high water mark.
78    pub high_watermark: i64,
79
80    /// next offset to fetch in case of filter
81    /// consumer should return that back to SPU, otherwise SPU will re-turn same filter records
82    #[fluvio(min_version = 11, ignorable)]
83    pub next_filter_offset: i64,
84
85    /// The current log start offset.
86    pub log_start_offset: i64,
87
88    /// The aborted transactions.
89    pub aborted: Option<Vec<AbortedTransaction>>,
90
91    /// The record data.
92    pub records: R,
93}
94
95impl<R: BatchRecords> FetchablePartitionResponse<RecordSet<R>> {
96    /// offset that will be use for fetching rest of offsets
97    /// this will be 1 greater than last offset of previous query
98    /// If all records have been read then it will be either HW or LEO
99    pub fn next_offset_for_fetch(&self) -> Option<Offset> {
100        if self.next_filter_offset > 0 {
101            Some(self.next_filter_offset)
102        } else {
103            self.records.last_offset()
104        }
105    }
106}
107
108#[derive(Encoder, Decoder, FluvioDefault, Debug)]
109pub struct AbortedTransaction {
110    pub producer_id: i64,
111    pub first_offset: i64,
112}
113
114// -----------------------------------
115// Implementation
116// -----------------------------------
117
118impl<R> FetchResponse<R>
119where
120    R: Encoder + Decoder + Debug,
121{
122    pub fn find_topic(&self, topic: &str) -> Option<&FetchableTopicResponse<R>>
123    where
124        R: Debug,
125    {
126        for r_topic in &self.topics {
127            if r_topic.name == *topic {
128                return Some(r_topic);
129            }
130        }
131        None
132    }
133}
134
135#[cfg(feature = "file")]
136pub use file::*;
137
138#[cfg(feature = "file")]
139mod file {
140
141    use std::io::Error as IoError;
142
143    use tracing::trace;
144    use bytes::BytesMut;
145
146    use crate::record::FileRecordSet;
147    use crate::store::FileWrite;
148    use crate::store::StoreValue;
149    use crate::core::Version;
150
151    use super::*;
152
153    pub type FileFetchResponse = FetchResponse<FileRecordSet>;
154    pub type FileTopicResponse = FetchableTopicResponse<FileRecordSet>;
155    pub type FilePartitionResponse = FetchablePartitionResponse<FileRecordSet>;
156
157    impl FileWrite for FilePartitionResponse {
158        fn file_encode(
159            &self,
160            src: &mut BytesMut,
161            data: &mut Vec<StoreValue>,
162            version: Version,
163        ) -> Result<(), IoError> {
164            trace!("file encoding fetch partition response");
165            self.partition_index.encode(src, version)?;
166            self.error_code.encode(src, version)?;
167            self.high_watermark.encode(src, version)?;
168            if version >= 11 {
169                self.next_filter_offset.encode(src, version)?;
170            } else {
171                tracing::trace!("v: {} is less than last fetched version 11", version);
172            }
173            self.log_start_offset.encode(src, version)?;
174            self.aborted.encode(src, version)?;
175            self.records.file_encode(src, data, version)?;
176            Ok(())
177        }
178    }
179
180    impl FileWrite for FileFetchResponse {
181        fn file_encode(
182            &self,
183            src: &mut BytesMut,
184            data: &mut Vec<StoreValue>,
185            version: Version,
186        ) -> Result<(), IoError> {
187            trace!("file encoding FileFetchResponse");
188            trace!("encoding throttle_time_ms {}", self.throttle_time_ms);
189            self.throttle_time_ms.encode(src, version)?;
190            trace!("encoding error code {:#?}", self.error_code);
191            self.error_code.encode(src, version)?;
192            trace!("encoding session code {}", self.session_id);
193            self.session_id.encode(src, version)?;
194            trace!("encoding topics len: {}", self.topics.len());
195            self.topics.file_encode(src, data, version)?;
196            Ok(())
197        }
198    }
199
200    impl FileWrite for FileTopicResponse {
201        fn file_encode(
202            &self,
203            src: &mut BytesMut,
204            data: &mut Vec<StoreValue>,
205            version: Version,
206        ) -> Result<(), IoError> {
207            trace!("file encoding fetch topic response");
208            self.name.encode(src, version)?;
209            self.partitions.file_encode(src, data, version)?;
210            Ok(())
211        }
212    }
213}