kafka_protocol/messages/
list_transactions_request.rs

1//! ListTransactionsRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ListTransactionsRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-2
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct ListTransactionsRequest {
24    /// The transaction states to filter by: if empty, all transactions are returned; if non-empty, then only transactions matching one of the filtered states will be returned.
25    ///
26    /// Supported API versions: 0-2
27    pub state_filters: Vec<StrBytes>,
28
29    /// The producerIds to filter by: if empty, all transactions will be returned; if non-empty, only transactions which match one of the filtered producerIds will be returned.
30    ///
31    /// Supported API versions: 0-2
32    pub producer_id_filters: Vec<super::ProducerId>,
33
34    /// Duration (in millis) to filter by: if < 0, all transactions will be returned; otherwise, only transactions running longer than this duration will be returned.
35    ///
36    /// Supported API versions: 1-2
37    pub duration_filter: i64,
38
39    /// The transactional ID regular expression pattern to filter by: if it is empty or null, all transactions are returned; Otherwise then only the transactions matching the given regular expression will be returned.
40    ///
41    /// Supported API versions: 2
42    pub transactional_id_pattern: Option<StrBytes>,
43
44    /// Other tagged fields
45    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl ListTransactionsRequest {
49    /// Sets `state_filters` to the passed value.
50    ///
51    /// The transaction states to filter by: if empty, all transactions are returned; if non-empty, then only transactions matching one of the filtered states will be returned.
52    ///
53    /// Supported API versions: 0-2
54    pub fn with_state_filters(mut self, value: Vec<StrBytes>) -> Self {
55        self.state_filters = value;
56        self
57    }
58    /// Sets `producer_id_filters` to the passed value.
59    ///
60    /// The producerIds to filter by: if empty, all transactions will be returned; if non-empty, only transactions which match one of the filtered producerIds will be returned.
61    ///
62    /// Supported API versions: 0-2
63    pub fn with_producer_id_filters(mut self, value: Vec<super::ProducerId>) -> Self {
64        self.producer_id_filters = value;
65        self
66    }
67    /// Sets `duration_filter` to the passed value.
68    ///
69    /// Duration (in millis) to filter by: if < 0, all transactions will be returned; otherwise, only transactions running longer than this duration will be returned.
70    ///
71    /// Supported API versions: 1-2
72    pub fn with_duration_filter(mut self, value: i64) -> Self {
73        self.duration_filter = value;
74        self
75    }
76    /// Sets `transactional_id_pattern` to the passed value.
77    ///
78    /// The transactional ID regular expression pattern to filter by: if it is empty or null, all transactions are returned; Otherwise then only the transactions matching the given regular expression will be returned.
79    ///
80    /// Supported API versions: 2
81    pub fn with_transactional_id_pattern(mut self, value: Option<StrBytes>) -> Self {
82        self.transactional_id_pattern = value;
83        self
84    }
85    /// Sets unknown_tagged_fields to the passed value.
86    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87        self.unknown_tagged_fields = value;
88        self
89    }
90    /// Inserts an entry into unknown_tagged_fields.
91    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92        self.unknown_tagged_fields.insert(key, value);
93        self
94    }
95}
96
#[cfg(feature = "client")]
impl Encodable for ListTransactionsRequest {
    /// Writes this request to `buf` using the layout of protocol `version`.
    ///
    /// Fields are written in this order: `state_filters`, `producer_id_filters`,
    /// `duration_filter` (version 1 and later), `transactional_id_pattern` (version 2),
    /// the number of unknown tagged fields, and finally the unknown tagged fields.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - `version` is outside `0..=2`;
    /// - `duration_filter` differs from `-1` while `version` is below 1;
    /// - `transactional_id_pattern` is `Some` while `version` is below 2;
    /// - the number of unknown tagged fields does not fit in a `u32`;
    /// - any of the underlying field encoders fails.
    ///
    /// Note that earlier fields may already have been written to `buf` when a later
    /// check fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::CompactArray(types::CompactString).encode(buf, &self.state_filters)?;
        types::CompactArray(types::Int64).encode(buf, &self.producer_id_filters)?;
        if version >= 1 {
            types::Int64.encode(buf, &self.duration_filter)?;
        } else if self.duration_filter != -1 {
            // -1 is the value this field takes when it is absent from the wire format.
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 2 {
            types::CompactString.encode(buf, &self.transactional_id_pattern)?;
        } else if self.transactional_id_pattern.is_some() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // The cast is lossless: the count was just checked against `u32::MAX`.
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for protocol `version`.
    ///
    /// The per-field sizes are accumulated in the same order and under the same
    /// version conditions that `encode` uses.
    ///
    /// # Errors
    ///
    /// Returns an error when a version-gated field is set for a `version` that does
    /// not carry it, when the number of unknown tagged fields does not fit in a
    /// `u32`, or when an underlying size computation fails. Unlike `encode`, this
    /// method does not reject a `version` outside `0..=2`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        total_size +=
            types::CompactArray(types::CompactString).compute_size(&self.state_filters)?;
        total_size += types::CompactArray(types::Int64).compute_size(&self.producer_id_filters)?;
        if version >= 1 {
            total_size += types::Int64.compute_size(&self.duration_filter)?;
        } else if self.duration_filter != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 2 {
            total_size += types::CompactString.compute_size(&self.transactional_id_pattern)?;
        } else if self.transactional_id_pattern.is_some() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // The cast is lossless: the count was just checked against `u32::MAX`.
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
163
#[cfg(feature = "broker")]
impl Decodable for ListTransactionsRequest {
    /// Reads a request from `buf` using the layout of protocol `version`.
    ///
    /// Fields are read in this order: `state_filters`, `producer_id_filters`,
    /// `duration_filter` (version 1 and later), `transactional_id_pattern` (version 2),
    /// then the tagged-field section. Fields that the given version does not carry
    /// are set to `-1` (`duration_filter`) and `None` (`transactional_id_pattern`).
    ///
    /// Every tagged field is kept verbatim in `unknown_tagged_fields`; if the same
    /// tag appears more than once, the last occurrence wins.
    ///
    /// # Errors
    ///
    /// Returns an error when `version` is outside `0..=2` or when any read from
    /// `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let state_filters = types::CompactArray(types::CompactString).decode(buf)?;
        let producer_id_filters = types::CompactArray(types::Int64).decode(buf)?;

        let duration_filter = if version < 1 {
            -1
        } else {
            types::Int64.decode(buf)?
        };
        let transactional_id_pattern = if version < 2 {
            None
        } else {
            types::CompactString.decode(buf)?
        };

        // Tagged-field section: an entry count followed by (tag, size, payload) triples.
        let tagged_field_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let unknown_tagged_fields = (0..tagged_field_count)
            .map(|_| -> Result<(i32, Bytes)> {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                Ok((tag as i32, payload))
            })
            .collect::<Result<BTreeMap<_, _>>>()?;

        Ok(Self {
            state_filters,
            producer_id_filters,
            duration_filter,
            transactional_id_pattern,
            unknown_tagged_fields,
        })
    }
}
199
200impl Default for ListTransactionsRequest {
201    fn default() -> Self {
202        Self {
203            state_filters: Default::default(),
204            producer_id_filters: Default::default(),
205            duration_filter: -1,
206            transactional_id_pattern: None,
207            unknown_tagged_fields: BTreeMap::new(),
208        }
209    }
210}
211
212impl Message for ListTransactionsRequest {
213    const VERSIONS: VersionRange = VersionRange { min: 0, max: 2 };
214    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
215}
216
217impl HeaderVersion for ListTransactionsRequest {
218    fn header_version(version: i16) -> i16 {
219        2
220    }
221}