fluvio_spu_schema/fetch/
response.rs1use std::fmt::Debug;
2use std::marker::PhantomData;
3
4use fluvio_protocol::record::BatchRecords;
5use fluvio_protocol::{Decoder, Encoder};
6use fluvio_protocol::derive::FluvioDefault;
7use fluvio_protocol::record::RecordSet;
8use fluvio_protocol::link::ErrorCode;
9use fluvio_protocol::record::Offset;
10
11pub type DefaultFetchResponse = FetchResponse<RecordSet>;
12
13#[derive(Encoder, Decoder, FluvioDefault, Debug)]
14pub struct FetchResponse<R> {
15 pub throttle_time_ms: i32,
18 pub error_code: ErrorCode,
19 pub session_id: i32,
20
21 pub topics: Vec<FetchableTopicResponse<R>>,
23}
24
25impl<R> FetchResponse<R> {
26 pub fn find_partition(
27 self,
28 topic: &str,
29 partition: u32,
30 ) -> Option<FetchablePartitionResponse<R>> {
31 for topic_res in self.topics {
32 if topic_res.name == topic {
33 for partition_res in topic_res.partitions {
34 if partition_res.partition_index == partition {
35 return Some(partition_res);
36 }
37 }
38 }
39 }
40
41 None
42 }
43}
44
45#[derive(Encoder, Decoder, FluvioDefault, Debug)]
46pub struct FetchableTopicResponse<R> {
47 pub name: String,
49
50 pub partitions: Vec<FetchablePartitionResponse<R>>,
52 pub data: PhantomData<R>,
53}
54
55#[derive(Encoder, Decoder, FluvioDefault, Debug)]
56pub struct FetchablePartitionResponse<R> {
57 pub partition_index: PartitionId,
59
60 pub error_code: ErrorCode,
62
63 pub high_watermark: i64,
65
66 #[fluvio(min_version = 11, ignorable)]
69 pub next_filter_offset: i64,
70
71 pub log_start_offset: i64,
73
74 pub aborted: Option<Vec<AbortedTransaction>>,
76
77 pub records: R,
79}
80
81impl<R: BatchRecords> FetchablePartitionResponse<RecordSet<R>> {
82 pub fn next_offset_for_fetch(&self) -> Option<Offset> {
86 if self.next_filter_offset > 0 {
87 Some(self.next_filter_offset)
88 } else {
89 self.records.last_offset()
90 }
91 }
92}
93
94#[derive(Encoder, Decoder, FluvioDefault, Debug)]
95pub struct AbortedTransaction {
96 pub producer_id: i64,
97 pub first_offset: i64,
98}
99
100impl<R> FetchResponse<R> {
105 pub fn find_topic(&self, topic: &str) -> Option<&FetchableTopicResponse<R>>
106 where
107 R: Debug,
108 {
109 self.topics.iter().find(|&r_topic| r_topic.name == *topic)
110 }
111}
112
113#[cfg(feature = "file")]
114pub use file::*;
115use fluvio_types::PartitionId;
116
117#[cfg(feature = "file")]
118mod file {
119
120 use std::io::Error as IoError;
121
122 use tracing::trace;
123 use bytes::BytesMut;
124
125 use fluvio_protocol::store::FileWrite;
126 use fluvio_protocol::store::StoreValue;
127 use fluvio_protocol::core::Version;
128
129 use crate::file::FileRecordSet;
130
131 use super::*;
132
133 pub type FileFetchResponse = FetchResponse<FileRecordSet>;
134 pub type FileTopicResponse = FetchableTopicResponse<FileRecordSet>;
135 pub type FilePartitionResponse = FetchablePartitionResponse<FileRecordSet>;
136
137 impl FileWrite for FilePartitionResponse {
138 fn file_encode(
139 &self,
140 src: &mut BytesMut,
141 data: &mut Vec<StoreValue>,
142 version: Version,
143 ) -> Result<(), IoError> {
144 trace!("file encoding fetch partition response");
145 self.partition_index.encode(src, version)?;
146 self.error_code.encode(src, version)?;
147 self.high_watermark.encode(src, version)?;
148 if version >= 11 {
149 self.next_filter_offset.encode(src, version)?;
150 } else {
151 tracing::trace!("v: {} is less than last fetched version 11", version);
152 }
153 self.log_start_offset.encode(src, version)?;
154 self.aborted.encode(src, version)?;
155 self.records.file_encode(src, data, version)?;
156 Ok(())
157 }
158 }
159
160 impl FileWrite for FileFetchResponse {
161 fn file_encode(
162 &self,
163 src: &mut BytesMut,
164 data: &mut Vec<StoreValue>,
165 version: Version,
166 ) -> Result<(), IoError> {
167 trace!("file encoding FileFetchResponse");
168 trace!("encoding throttle_time_ms {}", self.throttle_time_ms);
169 self.throttle_time_ms.encode(src, version)?;
170 trace!("encoding error code {:#?}", self.error_code);
171 self.error_code.encode(src, version)?;
172 trace!("encoding session code {}", self.session_id);
173 self.session_id.encode(src, version)?;
174 trace!("encoding topics len: {}", self.topics.len());
175 self.topics.file_encode(src, data, version)?;
176 Ok(())
177 }
178 }
179
180 impl FileWrite for FileTopicResponse {
181 fn file_encode(
182 &self,
183 src: &mut BytesMut,
184 data: &mut Vec<StoreValue>,
185 version: Version,
186 ) -> Result<(), IoError> {
187 trace!("file encoding fetch topic response");
188 self.name.encode(src, version)?;
189 self.partitions.file_encode(src, data, version)?;
190 Ok(())
191 }
192 }
193}