1use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{get_i16, get_i32, get_i64, get_u16, put_i16, put_i32, put_i64, put_u16};
6use crate::primitives::string_bytes::{
7 compact_string_len, get_compact_string_owned, get_string_owned,
8 put_compact_string, put_string, string_len,
9};
10use crate::primitives::string_bytes::{get_bytes_owned, get_compact_bytes_owned, put_bytes, put_compact_bytes};
11use crate::tagged_fields::{encode_to_bytes, read_tagged_fields, tagged_fields_len, WriteTaggedFields};
12use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
13
14pub const API_KEY: i16 = 59;
15pub const MIN_VERSION: i16 = 0;
16pub const MAX_VERSION: i16 = 1;
17pub const FLEXIBLE_MIN: i16 = 0;
18
19#[inline]
20fn is_flexible(version: i16) -> bool { version >= FLEXIBLE_MIN }
21
22#[derive(Debug, Clone, PartialEq, Eq, Default)]
23pub struct FetchSnapshotResponse {
24 pub throttle_time_ms: i32,
25 pub error_code: i16,
26 pub topics: Vec<TopicSnapshot>,
27 pub node_endpoints: Vec<NodeEndpoint>,
28 pub unknown_tagged_fields: UnknownTaggedFields,
29}
30
31impl Encode for FetchSnapshotResponse {
32 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
33 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
34 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
35 }
36 let flex = is_flexible(version);
37 if version >= 0 { put_i32(buf, self.throttle_time_ms) }
38 if version >= 0 { put_i16(buf, self.error_code) }
39 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.topics).len(), flex); for it in &self.topics { it.encode(buf, version)?; } } }
40 if flex {
41 let mut tagged = WriteTaggedFields::new();
42 if !(crate::codegen_helpers::is_default(&self.node_endpoints)) {
43 let payload = encode_to_bytes({ let prefix = crate::primitives::array::array_len_prefix_len((self.node_endpoints).len(), flex); let body: usize = (self.node_endpoints).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }, |b| { { crate::primitives::array::put_array_len(b, (self.node_endpoints).len(), flex); for it in &self.node_endpoints { it.encode(b, version)?; } }; Ok(()) });
44 tagged.add(0, payload);
45 }
46 tagged.write(buf, &self.unknown_tagged_fields);
47 }
48 Ok(())
49 }
50 fn encoded_len(&self, version: i16) -> usize {
51 let flex = is_flexible(version);
52 let mut n: usize = 0;
53 if version >= 0 { n += 4; }
54 if version >= 0 { n += 2; }
55 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.topics).len(), flex); let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
56 if flex {
57 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
58 if !(crate::codegen_helpers::is_default(&self.node_endpoints)) {
59 known_pairs.push((0, { let prefix = crate::primitives::array::array_len_prefix_len((self.node_endpoints).len(), flex); let body: usize = (self.node_endpoints).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }));
60 }
61 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
62 }
63 n
64 }
65}
66
67impl<'de> Decode<'de> for FetchSnapshotResponse {
68 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
69 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
70 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
71 }
72 let flex = is_flexible(version);
73 let mut out = Self::default();
74 if version >= 0 { out.throttle_time_ms = get_i32(buf)?; }
75 if version >= 0 { out.error_code = get_i16(buf)?; }
76 if version >= 0 { out.topics = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(TopicSnapshot::decode(buf, version)?); } v }; }
77 if flex {
78 let mut tag_node_endpoints = None;
80 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| {
81 match tag {
82 0 => { tag_node_endpoints = Some({ let b: &mut &[u8] = payload; { let n = crate::primitives::array::get_array_len(b, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(NodeEndpoint::decode(b, version)?); } v } }); Ok(true) }
83 _ => Ok(false),
84 }
85 })?;
86 if let Some(v) = tag_node_endpoints { out.node_endpoints = v; }
87 }
88 Ok(out)
89 }
90}
91
92#[derive(Debug, Clone, PartialEq, Eq, Default)]
93pub struct TopicSnapshot {
94 pub name: String,
95 pub partitions: Vec<PartitionSnapshot>,
96 pub unknown_tagged_fields: UnknownTaggedFields,
97}
98
99impl Encode for TopicSnapshot {
100 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
101 let flex = version >= 0;
102 if version >= 0 { if flex { put_compact_string(buf, &self.name) } else { put_string(buf, &self.name) } }
103 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex); for it in &self.partitions { it.encode(buf, version)?; } } }
104 if flex {
105 let tagged = WriteTaggedFields::new();
106 tagged.write(buf, &self.unknown_tagged_fields);
107 }
108 Ok(())
109 }
110 fn encoded_len(&self, version: i16) -> usize {
111 let flex = version >= 0;
112 let mut n: usize = 0;
113 if version >= 0 { n += if flex { compact_string_len(&self.name) } else { string_len(&self.name) }; }
114 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex); let body: usize = (self.partitions).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
115 if flex {
116 let known_pairs: Vec<(u32, usize)> = Vec::new();
117 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
118 }
119 n
120 }
121}
122
123impl<'de> Decode<'de> for TopicSnapshot {
124 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
125 let flex = version >= 0;
126 let mut out = Self::default();
127 if version >= 0 { out.name = if flex { get_compact_string_owned(buf)? } else { get_string_owned(buf)? }; }
128 if version >= 0 { out.partitions = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(PartitionSnapshot::decode(buf, version)?); } v }; }
129 if flex {
130 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
131 Ok(false)
132 })?;
133 }
134 Ok(out)
135 }
136}
137
138#[derive(Debug, Clone, PartialEq, Eq, Default)]
139pub struct PartitionSnapshot {
140 pub index: i32,
141 pub error_code: i16,
142 pub snapshot_id: SnapshotId,
143 pub size: i64,
144 pub position: i64,
145 pub unaligned_records: crate::records::RecordsPayload,
146 pub current_leader: LeaderIdAndEpoch,
147 pub unknown_tagged_fields: UnknownTaggedFields,
148}
149
150impl Encode for PartitionSnapshot {
151 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
152 let flex = version >= 0;
153 if version >= 0 { put_i32(buf, self.index) }
154 if version >= 0 { put_i16(buf, self.error_code) }
155 if version >= 0 { self.snapshot_id.encode(buf, version)? }
156 if version >= 0 { put_i64(buf, self.size) }
157 if version >= 0 { put_i64(buf, self.position) }
158 if version >= 0 { { let mut __rb_buf = bytes::BytesMut::new(); <crate::records::RecordsPayload as crate::Encode>::encode(&self.unaligned_records, &mut __rb_buf, version)?; if flex { put_compact_bytes(buf, &__rb_buf) } else { put_bytes(buf, &__rb_buf) } } }
159 if flex {
160 let mut tagged = WriteTaggedFields::new();
161 if !(crate::codegen_helpers::is_default(&self.current_leader)) {
162 let payload = encode_to_bytes(self.current_leader.encoded_len(version), |b| { self.current_leader.encode(b, version)?; Ok(()) });
163 tagged.add(0, payload);
164 }
165 tagged.write(buf, &self.unknown_tagged_fields);
166 }
167 Ok(())
168 }
169 fn encoded_len(&self, version: i16) -> usize {
170 let flex = version >= 0;
171 let mut n: usize = 0;
172 if version >= 0 { n += 4; }
173 if version >= 0 { n += 2; }
174 if version >= 0 { n += self.snapshot_id.encoded_len(version); }
175 if version >= 0 { n += 8; }
176 if version >= 0 { n += 8; }
177 if version >= 0 { n += { let __rb_len = <crate::records::RecordsPayload as crate::Encode>::encoded_len(&self.unaligned_records, version); if flex { crate::primitives::string_bytes::compact_bytes_len_from_size(__rb_len) } else { 4 + __rb_len } }; }
178 if flex {
179 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
180 if !(crate::codegen_helpers::is_default(&self.current_leader)) {
181 known_pairs.push((0, self.current_leader.encoded_len(version)));
182 }
183 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
184 }
185 n
186 }
187}
188
189impl<'de> Decode<'de> for PartitionSnapshot {
190 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
191 let flex = version >= 0;
192 let mut out = Self::default();
193 if version >= 0 { out.index = get_i32(buf)?; }
194 if version >= 0 { out.error_code = get_i16(buf)?; }
195 if version >= 0 { out.snapshot_id = SnapshotId::decode(buf, version)?; }
196 if version >= 0 { out.size = get_i64(buf)?; }
197 if version >= 0 { out.position = get_i64(buf)?; }
198 if version >= 0 { out.unaligned_records = { let __rb_bytes = if flex { get_compact_bytes_owned(buf)? } else { get_bytes_owned(buf)? }; let mut __rb_cur: &[u8] = &__rb_bytes; <crate::records::RecordsPayload as crate::Decode>::decode(&mut __rb_cur, version)? }; }
199 if flex {
200 let mut tag_current_leader = None;
202 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| {
203 match tag {
204 0 => { tag_current_leader = Some({ let b: &mut &[u8] = payload; LeaderIdAndEpoch::decode(b, version)? }); Ok(true) }
205 _ => Ok(false),
206 }
207 })?;
208 if let Some(v) = tag_current_leader { out.current_leader = v; }
209 }
210 Ok(out)
211 }
212}
213
214#[derive(Debug, Clone, PartialEq, Eq, Default)]
215pub struct SnapshotId {
216 pub end_offset: i64,
217 pub epoch: i32,
218 pub unknown_tagged_fields: UnknownTaggedFields,
219}
220
221impl Encode for SnapshotId {
222 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
223 let flex = version >= 0;
224 if version >= 0 { put_i64(buf, self.end_offset) }
225 if version >= 0 { put_i32(buf, self.epoch) }
226 if flex {
227 let tagged = WriteTaggedFields::new();
228 tagged.write(buf, &self.unknown_tagged_fields);
229 }
230 Ok(())
231 }
232 fn encoded_len(&self, version: i16) -> usize {
233 let flex = version >= 0;
234 let mut n: usize = 0;
235 if version >= 0 { n += 8; }
236 if version >= 0 { n += 4; }
237 if flex {
238 let known_pairs: Vec<(u32, usize)> = Vec::new();
239 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
240 }
241 n
242 }
243}
244
245impl<'de> Decode<'de> for SnapshotId {
246 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
247 let flex = version >= 0;
248 let mut out = Self::default();
249 if version >= 0 { out.end_offset = get_i64(buf)?; }
250 if version >= 0 { out.epoch = get_i32(buf)?; }
251 if flex {
252 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
253 Ok(false)
254 })?;
255 }
256 Ok(out)
257 }
258}
259
260#[derive(Debug, Clone, PartialEq, Eq, Default)]
261pub struct LeaderIdAndEpoch {
262 pub leader_id: i32,
263 pub leader_epoch: i32,
264 pub unknown_tagged_fields: UnknownTaggedFields,
265}
266
267impl Encode for LeaderIdAndEpoch {
268 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
269 let flex = version >= 0;
270 if version >= 0 { put_i32(buf, self.leader_id) }
271 if version >= 0 { put_i32(buf, self.leader_epoch) }
272 if flex {
273 let tagged = WriteTaggedFields::new();
274 tagged.write(buf, &self.unknown_tagged_fields);
275 }
276 Ok(())
277 }
278 fn encoded_len(&self, version: i16) -> usize {
279 let flex = version >= 0;
280 let mut n: usize = 0;
281 if version >= 0 { n += 4; }
282 if version >= 0 { n += 4; }
283 if flex {
284 let known_pairs: Vec<(u32, usize)> = Vec::new();
285 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
286 }
287 n
288 }
289}
290
291impl<'de> Decode<'de> for LeaderIdAndEpoch {
292 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
293 let flex = version >= 0;
294 let mut out = Self::default();
295 if version >= 0 { out.leader_id = get_i32(buf)?; }
296 if version >= 0 { out.leader_epoch = get_i32(buf)?; }
297 if flex {
298 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
299 Ok(false)
300 })?;
301 }
302 Ok(out)
303 }
304}
305
306#[derive(Debug, Clone, PartialEq, Eq, Default)]
307pub struct NodeEndpoint {
308 pub node_id: i32,
309 pub host: String,
310 pub port: u16,
311 pub unknown_tagged_fields: UnknownTaggedFields,
312}
313
314impl Encode for NodeEndpoint {
315 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
316 let flex = version >= 0;
317 if version >= 1 { put_i32(buf, self.node_id) }
318 if version >= 1 { if flex { put_compact_string(buf, &self.host) } else { put_string(buf, &self.host) } }
319 if version >= 1 { put_u16(buf, self.port) }
320 if flex {
321 let tagged = WriteTaggedFields::new();
322 tagged.write(buf, &self.unknown_tagged_fields);
323 }
324 Ok(())
325 }
326 fn encoded_len(&self, version: i16) -> usize {
327 let flex = version >= 0;
328 let mut n: usize = 0;
329 if version >= 1 { n += 4; }
330 if version >= 1 { n += if flex { compact_string_len(&self.host) } else { string_len(&self.host) }; }
331 if version >= 1 { n += 2; }
332 if flex {
333 let known_pairs: Vec<(u32, usize)> = Vec::new();
334 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
335 }
336 n
337 }
338}
339
340impl<'de> Decode<'de> for NodeEndpoint {
341 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
342 let flex = version >= 0;
343 let mut out = Self::default();
344 if version >= 1 { out.node_id = get_i32(buf)?; }
345 if version >= 1 { out.host = if flex { get_compact_string_owned(buf)? } else { get_string_owned(buf)? }; }
346 if version >= 1 { out.port = get_u16(buf)?; }
347 if flex {
348 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
349 Ok(false)
350 })?;
351 }
352 Ok(out)
353 }
354}
355
356#[must_use]
359#[allow(unused_comparisons)]
360pub fn default_json(version: i16) -> ::serde_json::Value {
361 let mut obj = ::serde_json::Map::new();
362 obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
363 obj.insert("errorCode".to_string(), ::serde_json::json!(0));
364 obj.insert("topics".to_string(), ::serde_json::Value::Array(vec![]));
365 if version >= 1 {
366 obj.insert("nodeEndpoints".to_string(), ::serde_json::Value::Array(vec![]));
367 }
368 ::serde_json::Value::Object(obj)
369}