fluvio_spu_schema/fetch/
response.rs

1use std::fmt::Debug;
2use std::marker::PhantomData;
3
4use fluvio_protocol::record::BatchRecords;
5use fluvio_protocol::{Decoder, Encoder};
6use fluvio_protocol::derive::FluvioDefault;
7use fluvio_protocol::record::RecordSet;
8use fluvio_protocol::link::ErrorCode;
9use fluvio_protocol::record::Offset;
10
/// [`FetchResponse`] specialized to carry [`RecordSet`] record data in each partition.
pub type DefaultFetchResponse = FetchResponse<RecordSet>;
12
/// Response to a fetch request, generic over the record payload type `R`
/// carried by each partition response.
///
/// NOTE(review): the codec is derived, so the wire layout presumably follows
/// the field declaration order below — do not reorder fields without
/// confirming against the derive macro.
#[derive(Encoder, Decoder, FluvioDefault, Debug)]
pub struct FetchResponse<R> {
    /// The duration in milliseconds for which the request was throttled due to a quota violation,
    /// or zero if the request did not violate any quota.
    pub throttle_time_ms: i32,
    /// Error code reported for the response as a whole.
    pub error_code: ErrorCode,
    /// Identifier of the fetch session (semantics are defined by the caller/server; not visible here).
    pub session_id: i32,

    /// The response topics.
    pub topics: Vec<FetchableTopicResponse<R>>,
}
24
25impl<R> FetchResponse<R> {
26    pub fn find_partition(
27        self,
28        topic: &str,
29        partition: u32,
30    ) -> Option<FetchablePartitionResponse<R>> {
31        for topic_res in self.topics {
32            if topic_res.name == topic {
33                for partition_res in topic_res.partitions {
34                    if partition_res.partition_index == partition {
35                        return Some(partition_res);
36                    }
37                }
38            }
39        }
40
41        None
42    }
43}
44
/// Per-topic portion of a [`FetchResponse`]: the topic name plus one
/// response per fetched partition.
#[derive(Encoder, Decoder, FluvioDefault, Debug)]
pub struct FetchableTopicResponse<R> {
    /// The topic name.
    pub name: String,

    /// The topic partitions.
    pub partitions: Vec<FetchablePartitionResponse<R>>,
    /// Zero-sized marker tying the struct to the record type `R`; it holds no data.
    pub data: PhantomData<R>,
}
54
/// Per-partition portion of a fetch response: status, offsets and the
/// record payload of type `R`.
#[derive(Encoder, Decoder, FluvioDefault, Debug)]
pub struct FetchablePartitionResponse<R> {
    /// The partition index.
    pub partition_index: PartitionId,

    /// The error code, or 0 if there was no fetch error
    pub error_code: ErrorCode,

    /// The current high water mark.
    pub high_watermark: i64,

    /// Next offset to fetch when a filter is in use.
    /// The consumer should send this back to the SPU; otherwise the SPU will
    /// return the same filtered records again.
    ///
    /// Only part of the wire format from protocol version 11 onwards
    /// (see the `min_version` attribute below).
    #[fluvio(min_version = 11, ignorable)]
    pub next_filter_offset: i64,

    /// The current log start offset.
    pub log_start_offset: i64,

    /// The aborted transactions.
    pub aborted: Option<Vec<AbortedTransaction>>,

    /// The record data.
    pub records: R,
}
80
81impl<R: BatchRecords> FetchablePartitionResponse<RecordSet<R>> {
82    /// offset that will be use for fetching rest of offsets
83    /// this will be 1 greater than last offset of previous query
84    /// If all records have been read then it will be either HW or LEO
85    pub fn next_offset_for_fetch(&self) -> Option<Offset> {
86        if self.next_filter_offset > 0 {
87            Some(self.next_filter_offset)
88        } else {
89            self.records.last_offset()
90        }
91    }
92}
93
/// Entry of the `aborted` list in a [`FetchablePartitionResponse`].
#[derive(Encoder, Decoder, FluvioDefault, Debug)]
pub struct AbortedTransaction {
    /// Producer id associated with the aborted transaction.
    pub producer_id: i64,
    /// First offset of the aborted transaction.
    pub first_offset: i64,
}
99
100// -----------------------------------
101// Implementation
102// -----------------------------------
103
104impl<R> FetchResponse<R> {
105    pub fn find_topic(&self, topic: &str) -> Option<&FetchableTopicResponse<R>>
106    where
107        R: Debug,
108    {
109        self.topics.iter().find(|&r_topic| r_topic.name == *topic)
110    }
111}
112
113#[cfg(feature = "file")]
114pub use file::*;
115use fluvio_types::PartitionId;
116
117#[cfg(feature = "file")]
118mod file {
119
120    use std::io::Error as IoError;
121
122    use tracing::trace;
123    use bytes::BytesMut;
124
125    use fluvio_protocol::store::FileWrite;
126    use fluvio_protocol::store::StoreValue;
127    use fluvio_protocol::core::Version;
128
129    use crate::file::FileRecordSet;
130
131    use super::*;
132
    /// [`FetchResponse`] whose partitions carry a [`FileRecordSet`] as record data.
    pub type FileFetchResponse = FetchResponse<FileRecordSet>;
    /// [`FetchableTopicResponse`] whose partitions carry a [`FileRecordSet`] as record data.
    pub type FileTopicResponse = FetchableTopicResponse<FileRecordSet>;
    /// [`FetchablePartitionResponse`] carrying a [`FileRecordSet`] as record data.
    pub type FilePartitionResponse = FetchablePartitionResponse<FileRecordSet>;
136
137    impl FileWrite for FilePartitionResponse {
138        fn file_encode(
139            &self,
140            src: &mut BytesMut,
141            data: &mut Vec<StoreValue>,
142            version: Version,
143        ) -> Result<(), IoError> {
144            trace!("file encoding fetch partition response");
145            self.partition_index.encode(src, version)?;
146            self.error_code.encode(src, version)?;
147            self.high_watermark.encode(src, version)?;
148            if version >= 11 {
149                self.next_filter_offset.encode(src, version)?;
150            } else {
151                tracing::trace!("v: {} is less than last fetched version 11", version);
152            }
153            self.log_start_offset.encode(src, version)?;
154            self.aborted.encode(src, version)?;
155            self.records.file_encode(src, data, version)?;
156            Ok(())
157        }
158    }
159
160    impl FileWrite for FileFetchResponse {
161        fn file_encode(
162            &self,
163            src: &mut BytesMut,
164            data: &mut Vec<StoreValue>,
165            version: Version,
166        ) -> Result<(), IoError> {
167            trace!("file encoding FileFetchResponse");
168            trace!("encoding throttle_time_ms {}", self.throttle_time_ms);
169            self.throttle_time_ms.encode(src, version)?;
170            trace!("encoding error code {:#?}", self.error_code);
171            self.error_code.encode(src, version)?;
172            trace!("encoding session code {}", self.session_id);
173            self.session_id.encode(src, version)?;
174            trace!("encoding topics len: {}", self.topics.len());
175            self.topics.file_encode(src, data, version)?;
176            Ok(())
177        }
178    }
179
180    impl FileWrite for FileTopicResponse {
181        fn file_encode(
182            &self,
183            src: &mut BytesMut,
184            data: &mut Vec<StoreValue>,
185            version: Version,
186        ) -> Result<(), IoError> {
187            trace!("file encoding fetch topic response");
188            self.name.encode(src, version)?;
189            self.partitions.file_encode(src, data, version)?;
190            Ok(())
191        }
192    }
193}