kafka_api/schemata/
offset_fetch_request.rs

1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use byteorder::ReadBytesExt;
16
17use crate::codec::*;
18use crate::IoResult;
19
20// In version 0, the request read offsets from ZK.
21//
22// Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets
23// topic.
24//
25// Starting in version 2, the request can contain a null topics array to indicate that offsets
26// for all topics should be fetched. It also returns a top level error code
27// for group or coordinator level errors.
28//
29// Version 3, 4, and 5 are the same as version 2.
30//
31// Version 6 is the first flexible version.
32//
33// Version 7 is adding the require stable flag.
34//
35// Version 8 is adding support for fetching offsets for multiple groups at a time
36
37#[derive(Debug, Default, Clone)]
38pub struct OffsetFetchRequest {
39    /// The group to fetch offsets for.
40    pub group_id: String,
41    /// Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.
42    pub topics: Vec<OffsetFetchRequestTopic>,
43    /// Each group we would like to fetch offsets for.
44    pub groups: Vec<OffsetFetchRequestGroup>,
45    /// Whether broker should hold on returning unstable offsets but set a retryable error code for
46    /// the partitions.
47    pub require_stable: bool,
48    /// Unknown tagged fields.
49    pub unknown_tagged_fields: Vec<RawTaggedField>,
50}
51
52impl Decodable for OffsetFetchRequest {
53    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
54        let mut this = OffsetFetchRequest::default();
55        if version <= 7 {
56            this.group_id = NullableString(version >= 6)
57                .decode(buf)?
58                .ok_or_else(|| err_decode_message_null("groups"))?;
59            this.topics = NullableArray(Struct(version), version >= 6)
60                .decode(buf)?
61                .or_else(|| if version >= 2 { Some(vec![]) } else { None })
62                .ok_or_else(|| err_decode_message_null("topics"))?;
63        }
64        if version >= 8 {
65            this.groups = NullableArray(Struct(version), true)
66                .decode(buf)?
67                .ok_or_else(|| err_decode_message_null("groups"))?;
68        }
69        if version >= 7 {
70            this.require_stable = Bool.decode(buf)?;
71        }
72        if version >= 6 {
73            this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
74        }
75        Ok(this)
76    }
77}
78
79#[derive(Debug, Default, Clone)]
80pub struct OffsetFetchRequestTopic {
81    /// The topic name.
82    pub name: String,
83    /// The partition indexes we would like to fetch offsets for.
84    pub partition_indexes: Vec<i32>,
85    /// Unknown tagged fields.
86    pub unknown_tagged_fields: Vec<RawTaggedField>,
87}
88
89impl Decodable for OffsetFetchRequestTopic {
90    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
91        let mut this = OffsetFetchRequestTopic {
92            name: NullableString(version >= 6)
93                .decode(buf)?
94                .ok_or_else(|| err_decode_message_null("name"))?,
95            partition_indexes: NullableArray(Int32, version >= 6)
96                .decode(buf)?
97                .ok_or_else(|| err_decode_message_null("partition_indexes"))?,
98            ..Default::default()
99        };
100        if version >= 6 {
101            this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
102        }
103        Ok(this)
104    }
105}
106
107#[derive(Debug, Default, Clone)]
108pub struct OffsetFetchRequestGroup {
109    /// The group ID.
110    pub group_id: String,
111    /// Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.
112    pub topics: Vec<OffsetFetchRequestTopic>,
113    /// Unknown tagged fields.
114    pub unknown_tagged_fields: Vec<RawTaggedField>,
115}
116
117impl Decodable for OffsetFetchRequestGroup {
118    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
119        if version > 8 {
120            Err(err_decode_message_unsupported(
121                version,
122                "OffsetFetchRequestGroup",
123            ))?
124        }
125        let mut this = OffsetFetchRequestGroup {
126            group_id: NullableString(true)
127                .decode(buf)?
128                .ok_or_else(|| err_decode_message_null("group_id"))?,
129            topics: NullableArray(Struct(version), true)
130                .decode(buf)?
131                .unwrap_or_default(),
132            ..Default::default()
133        };
134        if version >= 6 {
135            this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
136        }
137        Ok(this)
138    }
139}