1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
/// Top-level `FetchSnapshotRequest` message.
///
/// The `Message` impl in this file declares wire versions `0..=0` for it.
/// The type is `#[non_exhaustive]`, so code outside this crate cannot build it
/// with a struct literal; start from `Default::default()` and chain the
/// `with_*` builder methods instead.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct FetchSnapshotRequest {
    /// Optional cluster id. It travels as tagged field `0` and is written only
    /// when it is `Some`. Defaults to `None`.
    pub cluster_id: Option<StrBytes>,

    /// Replica id carried by the request, encoded as an `Int32`. Defaults to `-1`.
    pub replica_id: super::BrokerId,

    /// Byte limit carried by the request, encoded as an `Int32`.
    /// Defaults to `0x7fffffff`.
    /// NOTE(review): presumably the maximum response size; confirm against the Kafka schema.
    pub max_bytes: i32,

    /// Per-topic entries, encoded as a compact array of `TopicSnapshot`.
    pub topics: Vec<TopicSnapshot>,

    /// Tagged fields the decoder does not recognise, keyed by tag and kept as raw bytes.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
47
48impl FetchSnapshotRequest {
49 pub fn with_cluster_id(mut self, value: Option<StrBytes>) -> Self {
55 self.cluster_id = value;
56 self
57 }
58 pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
64 self.replica_id = value;
65 self
66 }
67 pub fn with_max_bytes(mut self, value: i32) -> Self {
73 self.max_bytes = value;
74 self
75 }
76 pub fn with_topics(mut self, value: Vec<TopicSnapshot>) -> Self {
82 self.topics = value;
83 self
84 }
85 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87 self.unknown_tagged_fields = value;
88 self
89 }
90 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92 self.unknown_tagged_fields.insert(key, value);
93 self
94 }
95}
96
/// Serialisation of `FetchSnapshotRequest`; compiled only with the `client` feature.
#[cfg(feature = "client")]
impl Encodable for FetchSnapshotRequest {
    /// Writes the request into `buf`.
    ///
    /// Layout: `replica_id`, `max_bytes`, the compact array of `topics`, then
    /// the tagged-field section (count, optional `cluster_id` under tag `0`,
    /// followed by the unknown tagged fields).
    ///
    /// # Errors
    ///
    /// Fails if any nested encoder fails, if the number of tagged fields does
    /// not fit in a `u32`, or if the encoded `cluster_id` is larger than
    /// `u32::MAX` bytes.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // Fixed fields, in wire order.
        types::Int32.encode(buf, &self.replica_id)?;
        types::Int32.encode(buf, &self.max_bytes)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

        // `cluster_id` occupies a tagged-field slot only when it is set.
        let has_cluster_id = self.cluster_id.is_some();
        let tagged_field_count = self.unknown_tagged_fields.len() + usize::from(has_cluster_id);
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_field_count as u32)?;

        if has_cluster_id {
            // A tagged field is written as: tag, payload size, payload.
            let payload_size = types::CompactString.compute_size(&self.cluster_id)?;
            if payload_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_size
                );
            }
            types::UnsignedVarInt.encode(buf, 0)?;
            types::UnsignedVarInt.encode(buf, payload_size as u32)?;
            types::CompactString.encode(buf, &self.cluster_id)?;
        }

        // Tag 0 is taken by `cluster_id`, hence the `1..` range for unknown tags.
        write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for this request.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        // Fixed fields.
        let fixed_size = types::Int32.compute_size(&self.replica_id)?
            + types::Int32.compute_size(&self.max_bytes)?
            + types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

        // Tagged-field count prefix.
        let has_cluster_id = self.cluster_id.is_some();
        let tagged_field_count = self.unknown_tagged_fields.len() + usize::from(has_cluster_id);
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        let mut tagged_size = types::UnsignedVarInt.compute_size(tagged_field_count as u32)?;

        if has_cluster_id {
            // Tag + size prefix + payload for the `cluster_id` tagged field.
            let payload_size = types::CompactString.compute_size(&self.cluster_id)?;
            if payload_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_size
                );
            }
            tagged_size += types::UnsignedVarInt.compute_size(0)?;
            tagged_size += types::UnsignedVarInt.compute_size(payload_size as u32)?;
            tagged_size += payload_size;
        }

        tagged_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(fixed_size + tagged_size)
    }
}
163
/// Deserialisation of `FetchSnapshotRequest`; compiled only with the `broker` feature.
#[cfg(feature = "broker")]
impl Decodable for FetchSnapshotRequest {
    /// Reads a request from `buf`.
    ///
    /// The fixed fields are read first (`replica_id`, `max_bytes`, `topics`),
    /// followed by the tagged-field section. Tag `0` is decoded into
    /// `cluster_id`; every other tag is stored verbatim in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the nested decoders or from reading the raw
    /// bytes of an unknown tagged field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let replica_id = types::Int32.decode(buf)?;
        let max_bytes = types::Int32.decode(buf)?;
        let topics = types::CompactArray(types::Struct { version }).decode(buf)?;

        let mut cluster_id = None;
        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let size: u32 = types::UnsignedVarInt.decode(buf)?;
            if tag == 0 {
                // The declared `size` is consumed but not used to bound this read;
                // the compact string carries its own length prefix.
                cluster_id = types::CompactString.decode(buf)?;
            } else {
                let raw_value = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw_value);
            }
        }

        Ok(Self {
            cluster_id,
            replica_id,
            max_bytes,
            topics,
            unknown_tagged_fields,
        })
    }
}
195
196impl Default for FetchSnapshotRequest {
197 fn default() -> Self {
198 Self {
199 cluster_id: None,
200 replica_id: (-1).into(),
201 max_bytes: 0x7fffffff,
202 topics: Default::default(),
203 unknown_tagged_fields: BTreeMap::new(),
204 }
205 }
206}
207
208impl Message for FetchSnapshotRequest {
209 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
210 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
211}
212
/// Per-partition entry of a `TopicSnapshot` (see `TopicSnapshot::partitions`).
///
/// The `Message` impl in this file declares wire versions `0..=0` for it.
/// The type is `#[non_exhaustive]`; build it from `Default::default()` and the
/// `with_*` builder methods.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct PartitionSnapshot {
    /// Partition number, encoded as an `Int32`. Defaults to `0`.
    pub partition: i32,

    /// Leader epoch value, encoded as an `Int32`. Defaults to `0`.
    pub current_leader_epoch: i32,

    /// Nested `SnapshotId` struct, encoded with the same `version` as this entry.
    pub snapshot_id: SnapshotId,

    /// Position value, encoded as an `Int64`. Defaults to `0`.
    /// NOTE(review): presumably the byte offset within the snapshot to fetch from; confirm.
    pub position: i64,

    /// Tagged fields read from the wire, keyed by tag and kept as raw bytes.
    /// This struct defines no known tags, so every tagged field ends up here.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
240
241impl PartitionSnapshot {
242 pub fn with_partition(mut self, value: i32) -> Self {
248 self.partition = value;
249 self
250 }
251 pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
257 self.current_leader_epoch = value;
258 self
259 }
260 pub fn with_snapshot_id(mut self, value: SnapshotId) -> Self {
266 self.snapshot_id = value;
267 self
268 }
269 pub fn with_position(mut self, value: i64) -> Self {
275 self.position = value;
276 self
277 }
278 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
280 self.unknown_tagged_fields = value;
281 self
282 }
283 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
285 self.unknown_tagged_fields.insert(key, value);
286 self
287 }
288}
289
/// Serialisation of `PartitionSnapshot`; compiled only with the `client` feature.
#[cfg(feature = "client")]
impl Encodable for PartitionSnapshot {
    /// Writes the entry into `buf`.
    ///
    /// Layout: `partition`, `current_leader_epoch`, the nested `snapshot_id`
    /// struct, `position`, then the tagged-field count and the unknown tagged
    /// fields.
    ///
    /// # Errors
    ///
    /// Fails if any nested encoder fails or if the number of tagged fields
    /// does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // Fixed fields, in wire order.
        types::Int32.encode(buf, &self.partition)?;
        types::Int32.encode(buf, &self.current_leader_epoch)?;
        types::Struct { version }.encode(buf, &self.snapshot_id)?;
        types::Int64.encode(buf, &self.position)?;

        // Tagged-field section: this struct has no known tags, only unknown ones.
        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_field_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for this entry.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fixed_size = types::Int32.compute_size(&self.partition)?
            + types::Int32.compute_size(&self.current_leader_epoch)?
            + types::Struct { version }.compute_size(&self.snapshot_id)?
            + types::Int64.compute_size(&self.position)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        let count_prefix_size = types::UnsignedVarInt.compute_size(tagged_field_count as u32)?;
        let unknown_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fixed_size + count_prefix_size + unknown_size)
    }
}
328
/// Deserialisation of `PartitionSnapshot`; compiled only with the `broker` feature.
#[cfg(feature = "broker")]
impl Decodable for PartitionSnapshot {
    /// Reads an entry from `buf`.
    ///
    /// The fixed fields are read in wire order, then every tagged field is
    /// stored verbatim in `unknown_tagged_fields` (this struct defines no
    /// known tags).
    ///
    /// # Errors
    ///
    /// Propagates any error from the nested decoders or from reading the raw
    /// bytes of a tagged field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let partition = types::Int32.decode(buf)?;
        let current_leader_epoch = types::Int32.decode(buf)?;
        let snapshot_id = types::Struct { version }.decode(buf)?;
        let position = types::Int64.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            // Each tagged field is: tag, payload size, payload bytes.
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let size: u32 = types::UnsignedVarInt.decode(buf)?;
            let raw_value = buf.try_get_bytes(size as usize)?;
            unknown_tagged_fields.insert(tag as i32, raw_value);
        }

        Ok(Self {
            partition,
            current_leader_epoch,
            snapshot_id,
            position,
            unknown_tagged_fields,
        })
    }
}
353
354impl Default for PartitionSnapshot {
355 fn default() -> Self {
356 Self {
357 partition: 0,
358 current_leader_epoch: 0,
359 snapshot_id: Default::default(),
360 position: 0,
361 unknown_tagged_fields: BTreeMap::new(),
362 }
363 }
364}
365
366impl Message for PartitionSnapshot {
367 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
368 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
369}
370
/// Snapshot identifier nested inside `PartitionSnapshot` (see
/// `PartitionSnapshot::snapshot_id`).
///
/// The `Message` impl in this file declares wire versions `0..=0` for it.
/// The type is `#[non_exhaustive]`; build it from `Default::default()` and the
/// `with_*` builder methods.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct SnapshotId {
    /// End offset value, encoded as an `Int64`. Defaults to `0`.
    pub end_offset: i64,

    /// Epoch value, encoded as an `Int32`. Defaults to `0`.
    pub epoch: i32,

    /// Tagged fields read from the wire, keyed by tag and kept as raw bytes.
    /// This struct defines no known tags, so every tagged field ends up here.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
388
389impl SnapshotId {
390 pub fn with_end_offset(mut self, value: i64) -> Self {
396 self.end_offset = value;
397 self
398 }
399 pub fn with_epoch(mut self, value: i32) -> Self {
405 self.epoch = value;
406 self
407 }
408 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
410 self.unknown_tagged_fields = value;
411 self
412 }
413 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
415 self.unknown_tagged_fields.insert(key, value);
416 self
417 }
418}
419
/// Serialisation of `SnapshotId`; compiled only with the `client` feature.
#[cfg(feature = "client")]
impl Encodable for SnapshotId {
    /// Writes the id into `buf`.
    ///
    /// Layout: `end_offset`, `epoch`, then the tagged-field count and the
    /// unknown tagged fields. The `version` argument is not consulted.
    ///
    /// # Errors
    ///
    /// Fails if any nested encoder fails or if the number of tagged fields
    /// does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // Fixed fields, in wire order.
        types::Int64.encode(buf, &self.end_offset)?;
        types::Int32.encode(buf, &self.epoch)?;

        // Tagged-field section: this struct has no known tags, only unknown ones.
        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_field_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for this id.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fixed_size = types::Int64.compute_size(&self.end_offset)?
            + types::Int32.compute_size(&self.epoch)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        let count_prefix_size = types::UnsignedVarInt.compute_size(tagged_field_count as u32)?;
        let unknown_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fixed_size + count_prefix_size + unknown_size)
    }
}
454
/// Deserialisation of `SnapshotId`; compiled only with the `broker` feature.
#[cfg(feature = "broker")]
impl Decodable for SnapshotId {
    /// Reads an id from `buf`.
    ///
    /// `end_offset` and `epoch` are read first, then every tagged field is
    /// stored verbatim in `unknown_tagged_fields` (this struct defines no
    /// known tags). The `version` argument is not consulted.
    ///
    /// # Errors
    ///
    /// Propagates any error from the nested decoders or from reading the raw
    /// bytes of a tagged field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let end_offset = types::Int64.decode(buf)?;
        let epoch = types::Int32.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            // Each tagged field is: tag, payload size, payload bytes.
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let size: u32 = types::UnsignedVarInt.decode(buf)?;
            let raw_value = buf.try_get_bytes(size as usize)?;
            unknown_tagged_fields.insert(tag as i32, raw_value);
        }

        Ok(Self {
            end_offset,
            epoch,
            unknown_tagged_fields,
        })
    }
}
475
476impl Default for SnapshotId {
477 fn default() -> Self {
478 Self {
479 end_offset: 0,
480 epoch: 0,
481 unknown_tagged_fields: BTreeMap::new(),
482 }
483 }
484}
485
486impl Message for SnapshotId {
487 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
488 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
489}
490
/// Per-topic entry of a `FetchSnapshotRequest` (see `FetchSnapshotRequest::topics`).
///
/// The `Message` impl in this file declares wire versions `0..=0` for it.
/// The type is `#[non_exhaustive]`; build it from `Default::default()` and the
/// `with_*` builder methods.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct TopicSnapshot {
    /// Topic name, encoded as a compact string.
    pub name: super::TopicName,

    /// Per-partition entries, encoded as a compact array of `PartitionSnapshot`.
    pub partitions: Vec<PartitionSnapshot>,

    /// Tagged fields read from the wire, keyed by tag and kept as raw bytes.
    /// This struct defines no known tags, so every tagged field ends up here.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
508
509impl TopicSnapshot {
510 pub fn with_name(mut self, value: super::TopicName) -> Self {
516 self.name = value;
517 self
518 }
519 pub fn with_partitions(mut self, value: Vec<PartitionSnapshot>) -> Self {
525 self.partitions = value;
526 self
527 }
528 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
530 self.unknown_tagged_fields = value;
531 self
532 }
533 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
535 self.unknown_tagged_fields.insert(key, value);
536 self
537 }
538}
539
/// Serialisation of `TopicSnapshot`; compiled only with the `client` feature.
#[cfg(feature = "client")]
impl Encodable for TopicSnapshot {
    /// Writes the entry into `buf`.
    ///
    /// Layout: `name` as a compact string, the compact array of `partitions`,
    /// then the tagged-field count and the unknown tagged fields.
    ///
    /// # Errors
    ///
    /// Fails if any nested encoder fails or if the number of tagged fields
    /// does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // Fixed fields, in wire order.
        types::CompactString.encode(buf, &self.name)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        // Tagged-field section: this struct has no known tags, only unknown ones.
        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_field_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for this entry.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fixed_size = types::CompactString.compute_size(&self.name)?
            + types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        let count_prefix_size = types::UnsignedVarInt.compute_size(tagged_field_count as u32)?;
        let unknown_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fixed_size + count_prefix_size + unknown_size)
    }
}
575
/// Deserialisation of `TopicSnapshot`; compiled only with the `broker` feature.
#[cfg(feature = "broker")]
impl Decodable for TopicSnapshot {
    /// Reads an entry from `buf`.
    ///
    /// `name` and `partitions` are read first, then every tagged field is
    /// stored verbatim in `unknown_tagged_fields` (this struct defines no
    /// known tags).
    ///
    /// # Errors
    ///
    /// Propagates any error from the nested decoders or from reading the raw
    /// bytes of a tagged field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let name = types::CompactString.decode(buf)?;
        let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            // Each tagged field is: tag, payload size, payload bytes.
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let size: u32 = types::UnsignedVarInt.decode(buf)?;
            let raw_value = buf.try_get_bytes(size as usize)?;
            unknown_tagged_fields.insert(tag as i32, raw_value);
        }

        Ok(Self {
            name,
            partitions,
            unknown_tagged_fields,
        })
    }
}
596
597impl Default for TopicSnapshot {
598 fn default() -> Self {
599 Self {
600 name: Default::default(),
601 partitions: Default::default(),
602 unknown_tagged_fields: BTreeMap::new(),
603 }
604 }
605}
606
607impl Message for TopicSnapshot {
608 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
609 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
610}
611
612impl HeaderVersion for FetchSnapshotRequest {
613 fn header_version(version: i16) -> i16 {
614 2
615 }
616}