1use bytes::BufMut;
4
5use crate::primitives::fixed::{get_i32, get_i64, put_i32, put_i64};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, nullable_string_len,
8 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string,
9 string_len,
10};
11use crate::primitives::string_bytes_borrowed::{
12 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
13 get_nullable_string_borrowed, get_string_borrowed,
14};
15use crate::tagged_fields::{encode_to_bytes, read_tagged_fields, tagged_fields_len, WriteTaggedFields};
16use crate::{Decode, DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
17
18pub const API_KEY: i16 = 59;
19pub const MIN_VERSION: i16 = 0;
20pub const MAX_VERSION: i16 = 1;
21pub const FLEXIBLE_MIN: i16 = 0;
22
23#[inline]
24fn is_flexible(version: i16) -> bool { version >= FLEXIBLE_MIN }
25
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct FetchSnapshotRequest<'a> {
28 pub replica_id: i32,
29 pub max_bytes: i32,
30 pub topics: Vec<TopicSnapshot<'a>>,
31 pub cluster_id: Option<String>,
32 pub unknown_tagged_fields: UnknownTaggedFields,
33}
34
35impl<'a> Default for FetchSnapshotRequest<'a> {
36 fn default() -> Self {
37 Self {
38 replica_id: -1i32,
39 max_bytes: 2_147_483_647i32,
40 topics: Vec::new(),
41 cluster_id: None,
42 unknown_tagged_fields: Default::default(),
43 }
44 }
45}
46
47impl<'a> FetchSnapshotRequest<'a> {
48 pub fn to_owned(&self) -> crate::owned::fetch_snapshot_request::FetchSnapshotRequest {
49 crate::owned::fetch_snapshot_request::FetchSnapshotRequest {
50 replica_id: (self.replica_id),
51 max_bytes: (self.max_bytes),
52 topics: (self.topics).iter().map(|it| it.to_owned()).collect(),
53 cluster_id: self.cluster_id.clone(),
54 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
55 }
56 }
57}
58
59impl<'a> Encode for FetchSnapshotRequest<'a> {
60 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
61 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
62 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
63 }
64 let flex = is_flexible(version);
65 if version >= 0 { put_i32(buf, self.replica_id) }
66 if version >= 0 { put_i32(buf, self.max_bytes) }
67 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.topics).len(), flex); for it in &self.topics { it.encode(buf, version)?; } } }
68 if flex {
69 let mut tagged = WriteTaggedFields::new();
70 if !(self.cluster_id.is_none()) {
71 let payload = encode_to_bytes(if flex { compact_nullable_string_len(self.cluster_id.as_deref()) } else { nullable_string_len(self.cluster_id.as_deref()) }, |b| { if flex { put_compact_nullable_string(b, self.cluster_id.as_deref()) } else { put_nullable_string(b, self.cluster_id.as_deref()) }; Ok(()) });
72 tagged.add(0, payload);
73 }
74 tagged.write(buf, &self.unknown_tagged_fields);
75 }
76 Ok(())
77 }
78 fn encoded_len(&self, version: i16) -> usize {
79 let flex = is_flexible(version);
80 let mut n: usize = 0;
81 if version >= 0 { n += 4; }
82 if version >= 0 { n += 4; }
83 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.topics).len(), flex); let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
84 if flex {
85 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
86 if !(self.cluster_id.is_none()) {
87 known_pairs.push((0, if flex { compact_nullable_string_len(self.cluster_id.as_deref()) } else { nullable_string_len(self.cluster_id.as_deref()) }));
88 }
89 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
90 }
91 n
92 }
93}
94
95impl<'de> DecodeBorrow<'de> for FetchSnapshotRequest<'de> {
96 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
97 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
98 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
99 }
100 let flex = is_flexible(version);
101 let mut out = Self::default();
102 if version >= 0 { out.replica_id = get_i32(buf)?; }
103 if version >= 0 { out.max_bytes = get_i32(buf)?; }
104 if version >= 0 { out.topics = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(TopicSnapshot::decode_borrow(buf, version)?); } v }; }
105 if flex {
106 let mut tag_cluster_id = None;
108 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| {
109 match tag {
110 0 => { tag_cluster_id = Some({ let b: &mut &[u8] = payload; if flex { crate::primitives::string_bytes::get_compact_nullable_string_owned(b)? } else { crate::primitives::string_bytes::get_nullable_string_owned(b)? } }); Ok(true) }
111 _ => Ok(false),
112 }
113 })?;
114 if let Some(v) = tag_cluster_id { out.cluster_id = v; }
115 }
116 Ok(out)
117 }
118}
119
120#[derive(Debug, Clone, PartialEq, Eq)]
121pub struct TopicSnapshot<'a> {
122 pub name: &'a str,
123 pub partitions: Vec<PartitionSnapshot>,
124 pub unknown_tagged_fields: UnknownTaggedFields,
125}
126
127impl<'a> Default for TopicSnapshot<'a> {
128 fn default() -> Self {
129 Self {
130 name: "",
131 partitions: Vec::new(),
132 unknown_tagged_fields: Default::default(),
133 }
134 }
135}
136
137impl<'a> TopicSnapshot<'a> {
138 pub fn to_owned(&self) -> crate::owned::fetch_snapshot_request::TopicSnapshot {
139 crate::owned::fetch_snapshot_request::TopicSnapshot {
140 name: (self.name).to_string(),
141 partitions: (self.partitions).iter().map(|it| it.to_owned()).collect(),
142 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
143 }
144 }
145}
146
147impl<'a> Encode for TopicSnapshot<'a> {
148 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
149 let flex = version >= 0;
150 if version >= 0 { if flex { put_compact_string(buf, self.name) } else { put_string(buf, self.name) } }
151 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex); for it in &self.partitions { it.encode(buf, version)?; } } }
152 if flex {
153 let tagged = WriteTaggedFields::new();
154 tagged.write(buf, &self.unknown_tagged_fields);
155 }
156 Ok(())
157 }
158 fn encoded_len(&self, version: i16) -> usize {
159 let flex = version >= 0;
160 let mut n: usize = 0;
161 if version >= 0 { n += if flex { compact_string_len(self.name) } else { string_len(self.name) }; }
162 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex); let body: usize = (self.partitions).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
163 if flex {
164 let known_pairs: Vec<(u32, usize)> = Vec::new();
165 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
166 }
167 n
168 }
169}
170
171impl<'de> DecodeBorrow<'de> for TopicSnapshot<'de> {
172 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
173 let flex = version >= 0;
174 let mut out = Self::default();
175 if version >= 0 { out.name = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
176 if version >= 0 { out.partitions = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(PartitionSnapshot::decode_borrow(buf, version)?); } v }; }
177 if flex {
178 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
179 Ok(false)
180 })?;
181 }
182 Ok(out)
183 }
184}
185
186#[derive(Debug, Clone, PartialEq, Eq)]
187pub struct PartitionSnapshot {
188 pub partition: i32,
189 pub current_leader_epoch: i32,
190 pub snapshot_id: SnapshotId,
191 pub position: i64,
192 pub replica_directory_id: crate::primitives::uuid::Uuid,
193 pub unknown_tagged_fields: UnknownTaggedFields,
194}
195
196impl Default for PartitionSnapshot {
197 fn default() -> Self {
198 Self {
199 partition: 0i32,
200 current_leader_epoch: 0i32,
201 snapshot_id: Default::default(),
202 position: 0i64,
203 replica_directory_id: Default::default(),
204 unknown_tagged_fields: Default::default(),
205 }
206 }
207}
208
209impl PartitionSnapshot {
210 pub fn to_owned(&self) -> crate::owned::fetch_snapshot_request::PartitionSnapshot {
211 crate::owned::fetch_snapshot_request::PartitionSnapshot {
212 partition: (self.partition),
213 current_leader_epoch: (self.current_leader_epoch),
214 snapshot_id: (self.snapshot_id).to_owned(),
215 position: (self.position),
216 replica_directory_id: (self.replica_directory_id),
217 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
218 }
219 }
220}
221
222impl Encode for PartitionSnapshot {
223 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
224 let flex = version >= 0;
225 if version >= 0 { put_i32(buf, self.partition) }
226 if version >= 0 { put_i32(buf, self.current_leader_epoch) }
227 if version >= 0 { self.snapshot_id.encode(buf, version)? }
228 if version >= 0 { put_i64(buf, self.position) }
229 if flex {
230 let mut tagged = WriteTaggedFields::new();
231 if !(crate::codegen_helpers::is_default(&self.replica_directory_id)) {
232 let payload = encode_to_bytes(16, |b| { crate::primitives::uuid::put_uuid(b, self.replica_directory_id); Ok(()) });
233 tagged.add(0, payload);
234 }
235 tagged.write(buf, &self.unknown_tagged_fields);
236 }
237 Ok(())
238 }
239 fn encoded_len(&self, version: i16) -> usize {
240 let flex = version >= 0;
241 let mut n: usize = 0;
242 if version >= 0 { n += 4; }
243 if version >= 0 { n += 4; }
244 if version >= 0 { n += self.snapshot_id.encoded_len(version); }
245 if version >= 0 { n += 8; }
246 if flex {
247 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
248 if !(crate::codegen_helpers::is_default(&self.replica_directory_id)) {
249 known_pairs.push((0, 16));
250 }
251 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
252 }
253 n
254 }
255}
256
257impl<'de> DecodeBorrow<'de> for PartitionSnapshot {
258 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
259 let flex = version >= 0;
260 let mut out = Self::default();
261 if version >= 0 { out.partition = get_i32(buf)?; }
262 if version >= 0 { out.current_leader_epoch = get_i32(buf)?; }
263 if version >= 0 { out.snapshot_id = SnapshotId::decode_borrow(buf, version)?; }
264 if version >= 0 { out.position = get_i64(buf)?; }
265 if flex {
266 let mut tag_replica_directory_id = None;
268 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| {
269 match tag {
270 0 => { tag_replica_directory_id = Some({ let b: &mut &[u8] = payload; crate::primitives::uuid::get_uuid(b)? }); Ok(true) }
271 _ => Ok(false),
272 }
273 })?;
274 if let Some(v) = tag_replica_directory_id { out.replica_directory_id = v; }
275 }
276 Ok(out)
277 }
278}
279
280#[derive(Debug, Clone, PartialEq, Eq)]
281pub struct SnapshotId {
282 pub end_offset: i64,
283 pub epoch: i32,
284 pub unknown_tagged_fields: UnknownTaggedFields,
285}
286
287impl Default for SnapshotId {
288 fn default() -> Self {
289 Self {
290 end_offset: 0i64,
291 epoch: 0i32,
292 unknown_tagged_fields: Default::default(),
293 }
294 }
295}
296
297impl SnapshotId {
298 pub fn to_owned(&self) -> crate::owned::fetch_snapshot_request::SnapshotId {
299 crate::owned::fetch_snapshot_request::SnapshotId {
300 end_offset: (self.end_offset),
301 epoch: (self.epoch),
302 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
303 }
304 }
305}
306
307impl Encode for SnapshotId {
308 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
309 let flex = version >= 0;
310 if version >= 0 { put_i64(buf, self.end_offset) }
311 if version >= 0 { put_i32(buf, self.epoch) }
312 if flex {
313 let tagged = WriteTaggedFields::new();
314 tagged.write(buf, &self.unknown_tagged_fields);
315 }
316 Ok(())
317 }
318 fn encoded_len(&self, version: i16) -> usize {
319 let flex = version >= 0;
320 let mut n: usize = 0;
321 if version >= 0 { n += 8; }
322 if version >= 0 { n += 4; }
323 if flex {
324 let known_pairs: Vec<(u32, usize)> = Vec::new();
325 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
326 }
327 n
328 }
329}
330
331impl<'de> DecodeBorrow<'de> for SnapshotId {
332 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
333 let flex = version >= 0;
334 let mut out = Self::default();
335 if version >= 0 { out.end_offset = get_i64(buf)?; }
336 if version >= 0 { out.epoch = get_i32(buf)?; }
337 if flex {
338 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
339 Ok(false)
340 })?;
341 }
342 Ok(out)
343 }
344}