1use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{
6 get_bool, get_i8, get_i32, get_i64, put_bool, put_i8, put_i32, put_i64,
7};
8use crate::primitives::string_bytes::{
9 compact_nullable_bytes_len, get_compact_nullable_bytes_owned, get_nullable_bytes_owned,
10 nullable_bytes_len, put_compact_nullable_bytes, put_nullable_bytes,
11};
12use crate::primitives::string_bytes::{
13 compact_string_len, get_compact_string_owned, get_string_owned, put_compact_string, put_string,
14 string_len,
15};
16use crate::tagged_fields::{
17 WriteTaggedFields, encode_to_bytes, read_tagged_fields, tagged_fields_len,
18};
19use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
20pub const MIN_VERSION: i16 = 0;
21pub const MAX_VERSION: i16 = 0;
22pub const FLEXIBLE_MIN: i16 = 0;
23
24#[inline]
25fn is_flexible(version: i16) -> bool {
26 version >= FLEXIBLE_MIN
27}
28
29#[derive(Debug, Clone, PartialEq, Eq, Default)]
30pub struct RemoteLogSegmentMetadataRecord {
31 pub remote_log_segment_id: RemoteLogSegmentIdEntry,
32 pub start_offset: i64,
33 pub end_offset: i64,
34 pub broker_id: i32,
35 pub max_timestamp_ms: i64,
36 pub event_timestamp_ms: i64,
37 pub segment_leader_epochs: Vec<SegmentLeaderEpochEntry>,
38 pub segment_size_in_bytes: i32,
39 pub custom_metadata: Option<::bytes::Bytes>,
40 pub remote_log_segment_state: i8,
41 pub txn_index_empty: bool,
42 pub unknown_tagged_fields: UnknownTaggedFields,
43}
44impl Encode for RemoteLogSegmentMetadataRecord {
45 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
46 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
47 return Err(ProtocolError::SchemaMismatch(
48 "RemoteLogSegmentMetadataRecord version out of range",
49 ));
50 }
51 let flex = is_flexible(version);
52 if version >= 0 {
53 self.remote_log_segment_id.encode(buf, version)?;
54 }
55 if version >= 0 {
56 put_i64(buf, self.start_offset);
57 }
58 if version >= 0 {
59 put_i64(buf, self.end_offset);
60 }
61 if version >= 0 {
62 put_i32(buf, self.broker_id);
63 }
64 if version >= 0 {
65 put_i64(buf, self.max_timestamp_ms);
66 }
67 if version >= 0 {
68 put_i64(buf, self.event_timestamp_ms);
69 }
70 if version >= 0 {
71 {
72 crate::primitives::array::put_array_len(
73 buf,
74 (self.segment_leader_epochs).len(),
75 flex,
76 );
77 for it in &self.segment_leader_epochs {
78 it.encode(buf, version)?;
79 }
80 }
81 }
82 if version >= 0 {
83 put_i32(buf, self.segment_size_in_bytes);
84 }
85 if version >= 0 {
86 if flex {
87 put_compact_nullable_bytes(buf, self.custom_metadata.as_deref());
88 } else {
89 put_nullable_bytes(buf, self.custom_metadata.as_deref());
90 }
91 }
92 if version >= 0 {
93 put_i8(buf, self.remote_log_segment_state);
94 }
95 if flex {
96 let mut tagged = WriteTaggedFields::new();
97 if !(crate::codegen_helpers::is_default(&self.txn_index_empty)) {
98 let payload = encode_to_bytes(1, |b| {
99 put_bool(b, self.txn_index_empty);
100 Ok(())
101 });
102 tagged.add(0, payload);
103 }
104 tagged.write(buf, &self.unknown_tagged_fields);
105 }
106 Ok(())
107 }
108 fn encoded_len(&self, version: i16) -> usize {
109 let flex = is_flexible(version);
110 let mut n: usize = 0;
111 if version >= 0 {
112 n += self.remote_log_segment_id.encoded_len(version);
113 }
114 if version >= 0 {
115 n += 8;
116 }
117 if version >= 0 {
118 n += 8;
119 }
120 if version >= 0 {
121 n += 4;
122 }
123 if version >= 0 {
124 n += 8;
125 }
126 if version >= 0 {
127 n += 8;
128 }
129 if version >= 0 {
130 n += {
131 let prefix = crate::primitives::array::array_len_prefix_len(
132 (self.segment_leader_epochs).len(),
133 flex,
134 );
135 let body: usize = (self.segment_leader_epochs)
136 .iter()
137 .map(|it| it.encoded_len(version))
138 .sum();
139 prefix + body
140 };
141 }
142 if version >= 0 {
143 n += 4;
144 }
145 if version >= 0 {
146 n += if flex {
147 compact_nullable_bytes_len(self.custom_metadata.as_deref())
148 } else {
149 nullable_bytes_len(self.custom_metadata.as_deref())
150 };
151 }
152 if version >= 0 {
153 n += 1;
154 }
155 if flex {
156 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
157 if !(crate::codegen_helpers::is_default(&self.txn_index_empty)) {
158 known_pairs.push((0, 1));
159 }
160 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
161 }
162 n
163 }
164}
165impl Decode<'_> for RemoteLogSegmentMetadataRecord {
166 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
167 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
168 return Err(ProtocolError::SchemaMismatch(
169 "RemoteLogSegmentMetadataRecord version out of range",
170 ));
171 }
172 let flex = is_flexible(version);
173 let mut out = Self::default();
174 if version >= 0 {
175 out.remote_log_segment_id = RemoteLogSegmentIdEntry::decode(buf, version)?;
176 }
177 if version >= 0 {
178 out.start_offset = get_i64(buf)?;
179 }
180 if version >= 0 {
181 out.end_offset = get_i64(buf)?;
182 }
183 if version >= 0 {
184 out.broker_id = get_i32(buf)?;
185 }
186 if version >= 0 {
187 out.max_timestamp_ms = get_i64(buf)?;
188 }
189 if version >= 0 {
190 out.event_timestamp_ms = get_i64(buf)?;
191 }
192 if version >= 0 {
193 out.segment_leader_epochs = {
194 let n = crate::primitives::array::get_array_len(buf, flex)?;
195 let mut v = Vec::with_capacity(n);
196 for _ in 0..n {
197 v.push(SegmentLeaderEpochEntry::decode(buf, version)?);
198 }
199 v
200 };
201 }
202 if version >= 0 {
203 out.segment_size_in_bytes = get_i32(buf)?;
204 }
205 if version >= 0 {
206 out.custom_metadata = if flex {
207 get_compact_nullable_bytes_owned(buf)?
208 } else {
209 get_nullable_bytes_owned(buf)?
210 };
211 }
212 if version >= 0 {
213 out.remote_log_segment_state = get_i8(buf)?;
214 }
215 if flex {
216 let mut tag_txn_index_empty = None;
217 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| match tag {
218 0 => {
219 tag_txn_index_empty = Some({
220 let b: &mut &[u8] = payload;
221 get_bool(b)?
222 });
223 Ok(true)
224 }
225 _ => Ok(false),
226 })?;
227 if let Some(v) = tag_txn_index_empty {
228 out.txn_index_empty = v;
229 }
230 }
231 Ok(out)
232 }
233}
234#[cfg(test)]
235impl RemoteLogSegmentMetadataRecord {
236 #[must_use]
237 pub fn populated(version: i16) -> Self {
238 let mut m = Self::default();
239 if version >= 0 {
240 m.remote_log_segment_id = RemoteLogSegmentIdEntry::populated(version);
241 }
242 if version >= 0 {
243 m.start_offset = 1i64;
244 }
245 if version >= 0 {
246 m.end_offset = 1i64;
247 }
248 if version >= 0 {
249 m.broker_id = 1i32;
250 }
251 if version >= 0 {
252 m.max_timestamp_ms = 1i64;
253 }
254 if version >= 0 {
255 m.event_timestamp_ms = 1i64;
256 }
257 if version >= 0 {
258 m.segment_leader_epochs = vec![SegmentLeaderEpochEntry::populated(version)];
259 }
260 if version >= 0 {
261 m.segment_size_in_bytes = 1i32;
262 }
263 if version >= 0 {
264 m.custom_metadata = Some(::bytes::Bytes::from_static(b"x"));
265 }
266 if version >= 0 {
267 m.remote_log_segment_state = 1i8;
268 }
269 if version >= 0 {
270 m.txn_index_empty = true;
271 }
272 m
273 }
274}
275#[derive(Debug, Clone, PartialEq, Eq, Default)]
276pub struct RemoteLogSegmentIdEntry {
277 pub topic_id_partition: TopicIdPartitionEntry,
278 pub id: crate::primitives::uuid::Uuid,
279 pub unknown_tagged_fields: UnknownTaggedFields,
280}
281impl Encode for RemoteLogSegmentIdEntry {
282 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
283 let flex = version >= 0;
284 if version >= 0 {
285 self.topic_id_partition.encode(buf, version)?;
286 }
287 if version >= 0 {
288 crate::primitives::uuid::put_uuid(buf, self.id);
289 }
290 if flex {
291 let tagged = WriteTaggedFields::new();
292 tagged.write(buf, &self.unknown_tagged_fields);
293 }
294 Ok(())
295 }
296 fn encoded_len(&self, version: i16) -> usize {
297 let flex = version >= 0;
298 let mut n: usize = 0;
299 if version >= 0 {
300 n += self.topic_id_partition.encoded_len(version);
301 }
302 if version >= 0 {
303 n += 16;
304 }
305 if flex {
306 let known_pairs: Vec<(u32, usize)> = Vec::new();
307 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
308 }
309 n
310 }
311}
312impl Decode<'_> for RemoteLogSegmentIdEntry {
313 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
314 let flex = version >= 0;
315 let mut out = Self::default();
316 if version >= 0 {
317 out.topic_id_partition = TopicIdPartitionEntry::decode(buf, version)?;
318 }
319 if version >= 0 {
320 out.id = crate::primitives::uuid::get_uuid(buf)?;
321 }
322 if flex {
323 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
324 }
325 Ok(out)
326 }
327}
328#[cfg(test)]
329impl RemoteLogSegmentIdEntry {
330 #[must_use]
331 pub fn populated(version: i16) -> Self {
332 let mut m = Self::default();
333 if version >= 0 {
334 m.topic_id_partition = TopicIdPartitionEntry::populated(version);
335 }
336 if version >= 0 {
337 m.id = crate::primitives::uuid::Uuid([1u8; 16]);
338 }
339 m
340 }
341}
342#[derive(Debug, Clone, PartialEq, Eq, Default)]
343pub struct TopicIdPartitionEntry {
344 pub name: String,
345 pub id: crate::primitives::uuid::Uuid,
346 pub partition: i32,
347 pub unknown_tagged_fields: UnknownTaggedFields,
348}
349impl Encode for TopicIdPartitionEntry {
350 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
351 let flex = version >= 0;
352 if version >= 0 {
353 if flex {
354 put_compact_string(buf, &self.name);
355 } else {
356 put_string(buf, &self.name);
357 }
358 }
359 if version >= 0 {
360 crate::primitives::uuid::put_uuid(buf, self.id);
361 }
362 if version >= 0 {
363 put_i32(buf, self.partition);
364 }
365 if flex {
366 let tagged = WriteTaggedFields::new();
367 tagged.write(buf, &self.unknown_tagged_fields);
368 }
369 Ok(())
370 }
371 fn encoded_len(&self, version: i16) -> usize {
372 let flex = version >= 0;
373 let mut n: usize = 0;
374 if version >= 0 {
375 n += if flex {
376 compact_string_len(&self.name)
377 } else {
378 string_len(&self.name)
379 };
380 }
381 if version >= 0 {
382 n += 16;
383 }
384 if version >= 0 {
385 n += 4;
386 }
387 if flex {
388 let known_pairs: Vec<(u32, usize)> = Vec::new();
389 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
390 }
391 n
392 }
393}
394impl Decode<'_> for TopicIdPartitionEntry {
395 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
396 let flex = version >= 0;
397 let mut out = Self::default();
398 if version >= 0 {
399 out.name = if flex {
400 get_compact_string_owned(buf)?
401 } else {
402 get_string_owned(buf)?
403 };
404 }
405 if version >= 0 {
406 out.id = crate::primitives::uuid::get_uuid(buf)?;
407 }
408 if version >= 0 {
409 out.partition = get_i32(buf)?;
410 }
411 if flex {
412 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
413 }
414 Ok(out)
415 }
416}
417#[cfg(test)]
418impl TopicIdPartitionEntry {
419 #[must_use]
420 pub fn populated(version: i16) -> Self {
421 let mut m = Self::default();
422 if version >= 0 {
423 m.name = "x".to_string();
424 }
425 if version >= 0 {
426 m.id = crate::primitives::uuid::Uuid([1u8; 16]);
427 }
428 if version >= 0 {
429 m.partition = 1i32;
430 }
431 m
432 }
433}
434#[derive(Debug, Clone, PartialEq, Eq, Default)]
435pub struct SegmentLeaderEpochEntry {
436 pub leader_epoch: i32,
437 pub offset: i64,
438 pub unknown_tagged_fields: UnknownTaggedFields,
439}
440impl Encode for SegmentLeaderEpochEntry {
441 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
442 let flex = version >= 0;
443 if version >= 0 {
444 put_i32(buf, self.leader_epoch);
445 }
446 if version >= 0 {
447 put_i64(buf, self.offset);
448 }
449 if flex {
450 let tagged = WriteTaggedFields::new();
451 tagged.write(buf, &self.unknown_tagged_fields);
452 }
453 Ok(())
454 }
455 fn encoded_len(&self, version: i16) -> usize {
456 let flex = version >= 0;
457 let mut n: usize = 0;
458 if version >= 0 {
459 n += 4;
460 }
461 if version >= 0 {
462 n += 8;
463 }
464 if flex {
465 let known_pairs: Vec<(u32, usize)> = Vec::new();
466 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
467 }
468 n
469 }
470}
471impl Decode<'_> for SegmentLeaderEpochEntry {
472 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
473 let flex = version >= 0;
474 let mut out = Self::default();
475 if version >= 0 {
476 out.leader_epoch = get_i32(buf)?;
477 }
478 if version >= 0 {
479 out.offset = get_i64(buf)?;
480 }
481 if flex {
482 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
483 }
484 Ok(out)
485 }
486}
487#[cfg(test)]
488impl SegmentLeaderEpochEntry {
489 #[must_use]
490 pub fn populated(version: i16) -> Self {
491 let mut m = Self::default();
492 if version >= 0 {
493 m.leader_epoch = 1i32;
494 }
495 if version >= 0 {
496 m.offset = 1i64;
497 }
498 m
499 }
500}
501
502#[must_use]
505#[allow(unused_comparisons)]
506pub fn default_json(version: i16) -> ::serde_json::Value {
507 let mut obj = ::serde_json::Map::new();
508 obj.insert("remoteLogSegmentId".to_string(), {
509 let mut m = ::serde_json::Map::new();
510 m.insert("topicIdPartition".to_string(), {
511 let mut m = ::serde_json::Map::new();
512 m.insert(
513 "name".to_string(),
514 ::serde_json::Value::String(String::new()),
515 );
516 m.insert(
517 "id".to_string(),
518 ::serde_json::Value::String("AAAAAAAAAAAAAAAAAAAAAA".to_string()),
519 );
520 m.insert("partition".to_string(), ::serde_json::json!(0));
521 ::serde_json::Value::Object(m)
522 });
523 m.insert(
524 "id".to_string(),
525 ::serde_json::Value::String("AAAAAAAAAAAAAAAAAAAAAA".to_string()),
526 );
527 ::serde_json::Value::Object(m)
528 });
529 obj.insert("startOffset".to_string(), ::serde_json::json!(0));
530 obj.insert("endOffset".to_string(), ::serde_json::json!(0));
531 obj.insert("brokerId".to_string(), ::serde_json::json!(0));
532 obj.insert("maxTimestampMs".to_string(), ::serde_json::json!(0));
533 obj.insert("eventTimestampMs".to_string(), ::serde_json::json!(0));
534 obj.insert(
535 "segmentLeaderEpochs".to_string(),
536 ::serde_json::Value::Array(vec![]),
537 );
538 obj.insert("segmentSizeInBytes".to_string(), ::serde_json::json!(0));
539 obj.insert("customMetadata".to_string(), ::serde_json::Value::Null);
540 obj.insert("remoteLogSegmentState".to_string(), ::serde_json::json!(0));
541 obj.insert(
542 "txnIndexEmpty".to_string(),
543 ::serde_json::Value::Bool(false),
544 );
545 ::serde_json::Value::Object(obj)
546}