1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
1415use byteorder::ReadBytesExt;
1617use crate::codec::*;
18use crate::IoResult;
// In version 0, the request read offsets from ZK.
//
// Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets
// topic.
//
// Starting in version 2, the request can contain a null topics array to indicate that offsets
// for all topics should be fetched. It also returns a top level error code
// for group or coordinator level errors.
//
// Versions 3, 4, and 5 are the same as version 2.
//
// Version 6 is the first flexible version.
//
// Version 7 adds the require stable flag.
//
// Version 8 adds support for fetching offsets for multiple groups at a time.
3637#[derive(Debug, Default, Clone)]
38pub struct OffsetFetchRequest {
39/// The group to fetch offsets for.
40pub group_id: String,
41/// Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.
42pub topics: Vec<OffsetFetchRequestTopic>,
43/// Each group we would like to fetch offsets for.
44pub groups: Vec<OffsetFetchRequestGroup>,
45/// Whether broker should hold on returning unstable offsets but set a retryable error code for
46 /// the partitions.
47pub require_stable: bool,
48/// Unknown tagged fields.
49pub unknown_tagged_fields: Vec<RawTaggedField>,
50}
5152impl Decodable for OffsetFetchRequest {
53fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
54let mut this = OffsetFetchRequest::default();
55if version <= 7 {
56 this.group_id = NullableString(version >= 6)
57 .decode(buf)?
58.ok_or_else(|| err_decode_message_null("groups"))?;
59 this.topics = NullableArray(Struct(version), version >= 6)
60 .decode(buf)?
61.or_else(|| if version >= 2 { Some(vec![]) } else { None })
62 .ok_or_else(|| err_decode_message_null("topics"))?;
63 }
64if version >= 8 {
65 this.groups = NullableArray(Struct(version), true)
66 .decode(buf)?
67.ok_or_else(|| err_decode_message_null("groups"))?;
68 }
69if version >= 7 {
70 this.require_stable = Bool.decode(buf)?;
71 }
72if version >= 6 {
73 this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
74 }
75Ok(this)
76 }
77}
7879#[derive(Debug, Default, Clone)]
80pub struct OffsetFetchRequestTopic {
81/// The topic name.
82pub name: String,
83/// The partition indexes we would like to fetch offsets for.
84pub partition_indexes: Vec<i32>,
85/// Unknown tagged fields.
86pub unknown_tagged_fields: Vec<RawTaggedField>,
87}
8889impl Decodable for OffsetFetchRequestTopic {
90fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
91let mut this = OffsetFetchRequestTopic {
92 name: NullableString(version >= 6)
93 .decode(buf)?
94.ok_or_else(|| err_decode_message_null("name"))?,
95 partition_indexes: NullableArray(Int32, version >= 6)
96 .decode(buf)?
97.ok_or_else(|| err_decode_message_null("partition_indexes"))?,
98 ..Default::default()
99 };
100if version >= 6 {
101 this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
102 }
103Ok(this)
104 }
105}
106107#[derive(Debug, Default, Clone)]
108pub struct OffsetFetchRequestGroup {
109/// The group ID.
110pub group_id: String,
111/// Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.
112pub topics: Vec<OffsetFetchRequestTopic>,
113/// Unknown tagged fields.
114pub unknown_tagged_fields: Vec<RawTaggedField>,
115}
116117impl Decodable for OffsetFetchRequestGroup {
118fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
119if version > 8 {
120Err(err_decode_message_unsupported(
121 version,
122"OffsetFetchRequestGroup",
123 ))?
124}
125let mut this = OffsetFetchRequestGroup {
126 group_id: NullableString(true)
127 .decode(buf)?
128.ok_or_else(|| err_decode_message_null("group_id"))?,
129 topics: NullableArray(Struct(version), true)
130 .decode(buf)?
131.unwrap_or_default(),
132 ..Default::default()
133 };
134if version >= 6 {
135 this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
136 }
137Ok(this)
138 }
139}