1use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{get_i16, get_i32, get_i64, put_i16, put_i32, put_i64};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
8 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
9 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string,
10 string_len,
11};
12use crate::primitives::string_bytes::{get_bytes_owned, get_compact_bytes_owned, get_compact_nullable_bytes_owned, get_nullable_bytes_owned, put_bytes, put_compact_bytes, put_compact_nullable_bytes, put_nullable_bytes};
13use crate::tagged_fields::{read_tagged_fields, tagged_fields_len, WriteTaggedFields};
14use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
15
/// API key of this message; reported in `ProtocolError::UnsupportedVersion`.
pub const API_KEY: i16 = 78;
/// Lowest version accepted by the top-level `encode` / `decode`.
pub const MIN_VERSION: i16 = 1;
/// Highest version accepted by the top-level `encode` / `decode`.
pub const MAX_VERSION: i16 = 2;
/// First version for which [`is_flexible`] reports the flexible encoding.
pub const FLEXIBLE_MIN: i16 = 0;

/// Returns `true` when `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
23
24#[derive(Debug, Clone, PartialEq, Eq, Default)]
25pub struct ShareFetchResponse {
26 pub throttle_time_ms: i32,
27 pub error_code: i16,
28 pub error_message: Option<String>,
29 pub acquisition_lock_timeout_ms: i32,
30 pub responses: Vec<ShareFetchableTopicResponse>,
31 pub node_endpoints: Vec<NodeEndpoint>,
32 pub unknown_tagged_fields: UnknownTaggedFields,
33}
34
35impl Encode for ShareFetchResponse {
36 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
37 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
38 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
39 }
40 let flex = is_flexible(version);
41 if version >= 0 { put_i32(buf, self.throttle_time_ms) }
42 if version >= 0 { put_i16(buf, self.error_code) }
43 if version >= 0 { if flex { put_compact_nullable_string(buf, self.error_message.as_deref()) } else { put_nullable_string(buf, self.error_message.as_deref()) } }
44 if version >= 1 { put_i32(buf, self.acquisition_lock_timeout_ms) }
45 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.responses).len(), flex); for it in &self.responses { it.encode(buf, version)?; } } }
46 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.node_endpoints).len(), flex); for it in &self.node_endpoints { it.encode(buf, version)?; } } }
47 if flex {
48 let tagged = WriteTaggedFields::new();
49 tagged.write(buf, &self.unknown_tagged_fields);
50 }
51 Ok(())
52 }
53 fn encoded_len(&self, version: i16) -> usize {
54 let flex = is_flexible(version);
55 let mut n: usize = 0;
56 if version >= 0 { n += 4; }
57 if version >= 0 { n += 2; }
58 if version >= 0 { n += if flex { compact_nullable_string_len(self.error_message.as_deref()) } else { nullable_string_len(self.error_message.as_deref()) }; }
59 if version >= 1 { n += 4; }
60 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.responses).len(), flex); let body: usize = (self.responses).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
61 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.node_endpoints).len(), flex); let body: usize = (self.node_endpoints).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
62 if flex {
63 let known_pairs: Vec<(u32, usize)> = Vec::new();
64 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
65 }
66 n
67 }
68}
69
70impl<'de> Decode<'de> for ShareFetchResponse {
71 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
72 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
73 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
74 }
75 let flex = is_flexible(version);
76 let mut out = Self::default();
77 if version >= 0 { out.throttle_time_ms = get_i32(buf)?; }
78 if version >= 0 { out.error_code = get_i16(buf)?; }
79 if version >= 0 { out.error_message = if flex { get_compact_nullable_string_owned(buf)? } else { get_nullable_string_owned(buf)? }; }
80 if version >= 1 { out.acquisition_lock_timeout_ms = get_i32(buf)?; }
81 if version >= 0 { out.responses = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(ShareFetchableTopicResponse::decode(buf, version)?); } v }; }
82 if version >= 0 { out.node_endpoints = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(NodeEndpoint::decode(buf, version)?); } v }; }
83 if flex {
84 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
85 Ok(false)
86 })?;
87 }
88 Ok(out)
89 }
90}
91
92#[derive(Debug, Clone, PartialEq, Eq, Default)]
93pub struct ShareFetchableTopicResponse {
94 pub topic_id: crate::primitives::uuid::Uuid,
95 pub partitions: Vec<PartitionData>,
96 pub unknown_tagged_fields: UnknownTaggedFields,
97}
98
99impl Encode for ShareFetchableTopicResponse {
100 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
101 let flex = version >= 0;
102 if version >= 0 { crate::primitives::uuid::put_uuid(buf, self.topic_id) }
103 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex); for it in &self.partitions { it.encode(buf, version)?; } } }
104 if flex {
105 let tagged = WriteTaggedFields::new();
106 tagged.write(buf, &self.unknown_tagged_fields);
107 }
108 Ok(())
109 }
110 fn encoded_len(&self, version: i16) -> usize {
111 let flex = version >= 0;
112 let mut n: usize = 0;
113 if version >= 0 { n += 16; }
114 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex); let body: usize = (self.partitions).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
115 if flex {
116 let known_pairs: Vec<(u32, usize)> = Vec::new();
117 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
118 }
119 n
120 }
121}
122
123impl<'de> Decode<'de> for ShareFetchableTopicResponse {
124 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
125 let flex = version >= 0;
126 let mut out = Self::default();
127 if version >= 0 { out.topic_id = crate::primitives::uuid::get_uuid(buf)?; }
128 if version >= 0 { out.partitions = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(PartitionData::decode(buf, version)?); } v }; }
129 if flex {
130 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
131 Ok(false)
132 })?;
133 }
134 Ok(out)
135 }
136}
137
138#[derive(Debug, Clone, PartialEq, Eq, Default)]
139pub struct PartitionData {
140 pub partition_index: i32,
141 pub error_code: i16,
142 pub error_message: Option<String>,
143 pub acknowledge_error_code: i16,
144 pub acknowledge_error_message: Option<String>,
145 pub current_leader: LeaderIdAndEpoch,
146 pub records: Option<crate::records::RecordsPayload>,
147 pub acquired_records: Vec<AcquiredRecords>,
148 pub unknown_tagged_fields: UnknownTaggedFields,
149}
150
151impl Encode for PartitionData {
152 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
153 let flex = version >= 0;
154 if version >= 0 { put_i32(buf, self.partition_index) }
155 if version >= 0 { put_i16(buf, self.error_code) }
156 if version >= 0 { if flex { put_compact_nullable_string(buf, self.error_message.as_deref()) } else { put_nullable_string(buf, self.error_message.as_deref()) } }
157 if version >= 0 { put_i16(buf, self.acknowledge_error_code) }
158 if version >= 0 { if flex { put_compact_nullable_string(buf, self.acknowledge_error_message.as_deref()) } else { put_nullable_string(buf, self.acknowledge_error_message.as_deref()) } }
159 if version >= 0 { self.current_leader.encode(buf, version)? }
160 if version >= 0 { if version <= 0 { match &self.records { None => if flex { put_compact_nullable_bytes(buf, None) } else { put_nullable_bytes(buf, None) }, Some(__rb) => { let mut __rb_buf = bytes::BytesMut::new(); <crate::records::RecordsPayload as crate::Encode>::encode(__rb, &mut __rb_buf, version)?; if flex { put_compact_bytes(buf, &__rb_buf) } else { put_bytes(buf, &__rb_buf) } } } } else { match &self.records { None => { let __rb_buf = bytes::BytesMut::new(); if flex { put_compact_bytes(buf, &__rb_buf) } else { put_bytes(buf, &__rb_buf) } }, Some(__rb) => { let mut __rb_buf = bytes::BytesMut::new(); <crate::records::RecordsPayload as crate::Encode>::encode(__rb, &mut __rb_buf, version)?; if flex { put_compact_bytes(buf, &__rb_buf) } else { put_bytes(buf, &__rb_buf) } } } } }
161 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.acquired_records).len(), flex); for it in &self.acquired_records { it.encode(buf, version)?; } } }
162 if flex {
163 let tagged = WriteTaggedFields::new();
164 tagged.write(buf, &self.unknown_tagged_fields);
165 }
166 Ok(())
167 }
168 fn encoded_len(&self, version: i16) -> usize {
169 let flex = version >= 0;
170 let mut n: usize = 0;
171 if version >= 0 { n += 4; }
172 if version >= 0 { n += 2; }
173 if version >= 0 { n += if flex { compact_nullable_string_len(self.error_message.as_deref()) } else { nullable_string_len(self.error_message.as_deref()) }; }
174 if version >= 0 { n += 2; }
175 if version >= 0 { n += if flex { compact_nullable_string_len(self.acknowledge_error_message.as_deref()) } else { nullable_string_len(self.acknowledge_error_message.as_deref()) }; }
176 if version >= 0 { n += self.current_leader.encoded_len(version); }
177 if version >= 0 { n += if version <= 0 { match &self.records { None => if flex { crate::primitives::varint::uvarint_len(0) } else { 4 }, Some(__rb) => { let __rb_len = <crate::records::RecordsPayload as crate::Encode>::encoded_len(__rb, version); if flex { crate::primitives::string_bytes::compact_bytes_len_from_size(__rb_len) } else { 4 + __rb_len } } } } else { match &self.records { None => if flex { crate::primitives::string_bytes::compact_bytes_len_from_size(0) } else { 4 }, Some(__rb) => { let __rb_len = <crate::records::RecordsPayload as crate::Encode>::encoded_len(__rb, version); if flex { crate::primitives::string_bytes::compact_bytes_len_from_size(__rb_len) } else { 4 + __rb_len } } } }; }
178 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.acquired_records).len(), flex); let body: usize = (self.acquired_records).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
179 if flex {
180 let known_pairs: Vec<(u32, usize)> = Vec::new();
181 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
182 }
183 n
184 }
185}
186
187impl<'de> Decode<'de> for PartitionData {
188 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
189 let flex = version >= 0;
190 let mut out = Self::default();
191 if version >= 0 { out.partition_index = get_i32(buf)?; }
192 if version >= 0 { out.error_code = get_i16(buf)?; }
193 if version >= 0 { out.error_message = if flex { get_compact_nullable_string_owned(buf)? } else { get_nullable_string_owned(buf)? }; }
194 if version >= 0 { out.acknowledge_error_code = get_i16(buf)?; }
195 if version >= 0 { out.acknowledge_error_message = if flex { get_compact_nullable_string_owned(buf)? } else { get_nullable_string_owned(buf)? }; }
196 if version >= 0 { out.current_leader = LeaderIdAndEpoch::decode(buf, version)?; }
197 if version >= 0 { out.records = if version <= 0 { { let __rb_opt = if flex { get_compact_nullable_bytes_owned(buf)? } else { get_nullable_bytes_owned(buf)? }; match __rb_opt { None => None, Some(__rb_bytes) => { let mut __rb_cur: &[u8] = &__rb_bytes; Some(<crate::records::RecordsPayload as crate::Decode>::decode(&mut __rb_cur, version)?) } } } } else { Some({ let __rb_bytes = if flex { get_compact_bytes_owned(buf)? } else { get_bytes_owned(buf)? }; let mut __rb_cur: &[u8] = &__rb_bytes; <crate::records::RecordsPayload as crate::Decode>::decode(&mut __rb_cur, version)? }) }; }
198 if version >= 0 { out.acquired_records = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(AcquiredRecords::decode(buf, version)?); } v }; }
199 if flex {
200 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
201 Ok(false)
202 })?;
203 }
204 Ok(out)
205 }
206}
207
208#[derive(Debug, Clone, PartialEq, Eq, Default)]
209pub struct LeaderIdAndEpoch {
210 pub leader_id: i32,
211 pub leader_epoch: i32,
212 pub unknown_tagged_fields: UnknownTaggedFields,
213}
214
215impl Encode for LeaderIdAndEpoch {
216 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
217 let flex = version >= 0;
218 if version >= 0 { put_i32(buf, self.leader_id) }
219 if version >= 0 { put_i32(buf, self.leader_epoch) }
220 if flex {
221 let tagged = WriteTaggedFields::new();
222 tagged.write(buf, &self.unknown_tagged_fields);
223 }
224 Ok(())
225 }
226 fn encoded_len(&self, version: i16) -> usize {
227 let flex = version >= 0;
228 let mut n: usize = 0;
229 if version >= 0 { n += 4; }
230 if version >= 0 { n += 4; }
231 if flex {
232 let known_pairs: Vec<(u32, usize)> = Vec::new();
233 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
234 }
235 n
236 }
237}
238
239impl<'de> Decode<'de> for LeaderIdAndEpoch {
240 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
241 let flex = version >= 0;
242 let mut out = Self::default();
243 if version >= 0 { out.leader_id = get_i32(buf)?; }
244 if version >= 0 { out.leader_epoch = get_i32(buf)?; }
245 if flex {
246 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
247 Ok(false)
248 })?;
249 }
250 Ok(out)
251 }
252}
253
254#[derive(Debug, Clone, PartialEq, Eq, Default)]
255pub struct AcquiredRecords {
256 pub first_offset: i64,
257 pub last_offset: i64,
258 pub delivery_count: i16,
259 pub unknown_tagged_fields: UnknownTaggedFields,
260}
261
262impl Encode for AcquiredRecords {
263 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
264 let flex = version >= 0;
265 if version >= 0 { put_i64(buf, self.first_offset) }
266 if version >= 0 { put_i64(buf, self.last_offset) }
267 if version >= 0 { put_i16(buf, self.delivery_count) }
268 if flex {
269 let tagged = WriteTaggedFields::new();
270 tagged.write(buf, &self.unknown_tagged_fields);
271 }
272 Ok(())
273 }
274 fn encoded_len(&self, version: i16) -> usize {
275 let flex = version >= 0;
276 let mut n: usize = 0;
277 if version >= 0 { n += 8; }
278 if version >= 0 { n += 8; }
279 if version >= 0 { n += 2; }
280 if flex {
281 let known_pairs: Vec<(u32, usize)> = Vec::new();
282 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
283 }
284 n
285 }
286}
287
288impl<'de> Decode<'de> for AcquiredRecords {
289 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
290 let flex = version >= 0;
291 let mut out = Self::default();
292 if version >= 0 { out.first_offset = get_i64(buf)?; }
293 if version >= 0 { out.last_offset = get_i64(buf)?; }
294 if version >= 0 { out.delivery_count = get_i16(buf)?; }
295 if flex {
296 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
297 Ok(false)
298 })?;
299 }
300 Ok(out)
301 }
302}
303
304#[derive(Debug, Clone, PartialEq, Eq, Default)]
305pub struct NodeEndpoint {
306 pub node_id: i32,
307 pub host: String,
308 pub port: i32,
309 pub rack: Option<String>,
310 pub unknown_tagged_fields: UnknownTaggedFields,
311}
312
313impl Encode for NodeEndpoint {
314 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
315 let flex = version >= 0;
316 if version >= 0 { put_i32(buf, self.node_id) }
317 if version >= 0 { if flex { put_compact_string(buf, &self.host) } else { put_string(buf, &self.host) } }
318 if version >= 0 { put_i32(buf, self.port) }
319 if version >= 0 { if flex { put_compact_nullable_string(buf, self.rack.as_deref()) } else { put_nullable_string(buf, self.rack.as_deref()) } }
320 if flex {
321 let tagged = WriteTaggedFields::new();
322 tagged.write(buf, &self.unknown_tagged_fields);
323 }
324 Ok(())
325 }
326 fn encoded_len(&self, version: i16) -> usize {
327 let flex = version >= 0;
328 let mut n: usize = 0;
329 if version >= 0 { n += 4; }
330 if version >= 0 { n += if flex { compact_string_len(&self.host) } else { string_len(&self.host) }; }
331 if version >= 0 { n += 4; }
332 if version >= 0 { n += if flex { compact_nullable_string_len(self.rack.as_deref()) } else { nullable_string_len(self.rack.as_deref()) }; }
333 if flex {
334 let known_pairs: Vec<(u32, usize)> = Vec::new();
335 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
336 }
337 n
338 }
339}
340
341impl<'de> Decode<'de> for NodeEndpoint {
342 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
343 let flex = version >= 0;
344 let mut out = Self::default();
345 if version >= 0 { out.node_id = get_i32(buf)?; }
346 if version >= 0 { out.host = if flex { get_compact_string_owned(buf)? } else { get_string_owned(buf)? }; }
347 if version >= 0 { out.port = get_i32(buf)?; }
348 if version >= 0 { out.rack = if flex { get_compact_nullable_string_owned(buf)? } else { get_nullable_string_owned(buf)? }; }
349 if flex {
350 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
351 Ok(false)
352 })?;
353 }
354 Ok(out)
355 }
356}
357
358#[must_use]
361#[allow(unused_comparisons)]
362pub fn default_json(version: i16) -> ::serde_json::Value {
363 let mut obj = ::serde_json::Map::new();
364 obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
365 obj.insert("errorCode".to_string(), ::serde_json::json!(0));
366 obj.insert("errorMessage".to_string(), ::serde_json::Value::Null);
367 if version >= 1 {
368 obj.insert("acquisitionLockTimeoutMs".to_string(), ::serde_json::json!(0));
369 }
370 obj.insert("responses".to_string(), ::serde_json::Value::Array(vec![]));
371 obj.insert("nodeEndpoints".to_string(), ::serde_json::Value::Array(vec![]));
372 ::serde_json::Value::Object(obj)
373}