// kafka_protocol/messages/init_producer_id_request.rs
#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct InitProducerIdRequest {
24 pub transactional_id: Option<super::TransactionalId>,
28
29 pub transaction_timeout_ms: i32,
33
34 pub producer_id: super::ProducerId,
38
39 pub producer_epoch: i16,
43
44 pub enable_2_pc: bool,
48
49 pub keep_prepared_txn: bool,
53
54 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
56}
57
58impl InitProducerIdRequest {
59 pub fn with_transactional_id(mut self, value: Option<super::TransactionalId>) -> Self {
65 self.transactional_id = value;
66 self
67 }
68 pub fn with_transaction_timeout_ms(mut self, value: i32) -> Self {
74 self.transaction_timeout_ms = value;
75 self
76 }
77 pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
83 self.producer_id = value;
84 self
85 }
86 pub fn with_producer_epoch(mut self, value: i16) -> Self {
92 self.producer_epoch = value;
93 self
94 }
95 pub fn with_enable_2_pc(mut self, value: bool) -> Self {
101 self.enable_2_pc = value;
102 self
103 }
104 pub fn with_keep_prepared_txn(mut self, value: bool) -> Self {
110 self.keep_prepared_txn = value;
111 self
112 }
113 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
115 self.unknown_tagged_fields = value;
116 self
117 }
118 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
120 self.unknown_tagged_fields.insert(key, value);
121 self
122 }
123}
124
#[cfg(feature = "client")]
impl Encodable for InitProducerIdRequest {
    /// Writes this request to `buf` using the layout of protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=5`, when `producer_id` or
    /// `producer_epoch` is not `-1` for a version below 3, when there are more
    /// than `u32::MAX` unknown tagged fields, or when a field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=5).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Version 2 onward switches to the compact string encoding.
        if version < 2 {
            types::String.encode(buf, &self.transactional_id)?;
        } else {
            types::CompactString.encode(buf, &self.transactional_id)?;
        }
        types::Int32.encode(buf, &self.transaction_timeout_ms)?;

        // The producer id/epoch pair only exists on the wire from version 3;
        // before that, both must still hold the `-1` default.
        if version >= 3 {
            types::Int64.encode(buf, &self.producer_id)?;
            types::Int16.encode(buf, &self.producer_epoch)?;
        } else if self.producer_id != -1 || self.producer_epoch != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Versions below 2 carry no tagged-field section.
        if version < 2 {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes how many bytes `encode` would produce for protocol `version`.
    ///
    /// Unlike `encode`, this does not range-check `version` itself.
    ///
    /// # Errors
    ///
    /// Fails when `producer_id` or `producer_epoch` is not `-1` for a version
    /// below 3, when there are more than `u32::MAX` unknown tagged fields, or
    /// when a field size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let transactional_id_size = if version < 2 {
            types::String.compute_size(&self.transactional_id)?
        } else {
            types::CompactString.compute_size(&self.transactional_id)?
        };

        let timeout_size = types::Int32.compute_size(&self.transaction_timeout_ms)?;

        // Mirrors `encode`: the producer fields contribute bytes only from version 3.
        let producer_fields_size = if version >= 3 {
            types::Int64.compute_size(&self.producer_id)?
                + types::Int16.compute_size(&self.producer_epoch)?
        } else if self.producer_id != -1 || self.producer_epoch != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        } else {
            0
        };

        let tagged_fields_size = if version >= 2 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.compute_size(tagged_count as u32)?
                + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?
        } else {
            0
        };

        Ok(transactional_id_size + timeout_size + producer_fields_size + tagged_fields_size)
    }
}
202
#[cfg(feature = "broker")]
impl Decodable for InitProducerIdRequest {
    /// Reads a request from `buf` using the layout of protocol `version`.
    ///
    /// Fields that are not on the wire for `version` get fixed values:
    /// `producer_id` and `producer_epoch` become `-1` below version 3, and
    /// `enable_2_pc` / `keep_prepared_txn` are always `false`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=5` or when any field cannot be
    /// decoded from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=5).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Version 2 onward switches to the compact string encoding.
        let transactional_id = if version < 2 {
            types::String.decode(buf)?
        } else {
            types::CompactString.decode(buf)?
        };
        let transaction_timeout_ms = types::Int32.decode(buf)?;

        // The id is read before the epoch, matching the order `encode` writes them.
        let (producer_id, producer_epoch): (super::ProducerId, i16) = if version >= 3 {
            (types::Int64.decode(buf)?, types::Int16.decode(buf)?)
        } else {
            ((-1).into(), -1)
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 2 {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            // Each entry is a tag, a byte length, then that many payload bytes.
            // A repeated tag overwrites the earlier entry.
            (0..tagged_count).try_for_each(|_| -> Result<()> {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(payload_len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
                Ok(())
            })?;
        }

        Ok(Self {
            transactional_id,
            transaction_timeout_ms,
            producer_id,
            producer_epoch,
            enable_2_pc: false,
            keep_prepared_txn: false,
            unknown_tagged_fields,
        })
    }
}
248
249impl Default for InitProducerIdRequest {
250 fn default() -> Self {
251 Self {
252 transactional_id: Some(Default::default()),
253 transaction_timeout_ms: 0,
254 producer_id: (-1).into(),
255 producer_epoch: -1,
256 enable_2_pc: false,
257 keep_prepared_txn: false,
258 unknown_tagged_fields: BTreeMap::new(),
259 }
260 }
261}
262
263impl Message for InitProducerIdRequest {
264 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
265 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
266}
267
268impl HeaderVersion for InitProducerIdRequest {
269 fn header_version(version: i16) -> i16 {
270 if version >= 2 {
271 2
272 } else {
273 1
274 }
275 }
276}