// rskafka/protocol/messages/produce.rs
use std::io::{Read, Write};

use crate::protocol::{
    api_key::ApiKey,
    api_version::{ApiVersion, ApiVersionRange},
    error::Error,
    messages::{read_versioned_array, write_versioned_array},
    primitives::{Int16, Int32, Int64, NullableString, Records, String_},
    traits::{ReadType, WriteType},
};

use super::{
    ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
};

16#[derive(Debug)]
17pub struct ProduceRequestPartitionData {
18    pub index: Int32,
20
21    pub records: Records,
23}
24
25impl<W> WriteVersionedType<W> for ProduceRequestPartitionData
26where
27    W: Write,
28{
29    fn write_versioned(
30        &self,
31        writer: &mut W,
32        version: ApiVersion,
33    ) -> Result<(), WriteVersionedError> {
34        let v = version.0 .0;
35        assert!(v <= 7);
36
37        self.index.write(writer)?;
38        self.records.write(writer)?;
39        Ok(())
40    }
41}
42
43#[derive(Debug)]
44pub struct ProduceRequestTopicData {
45    pub name: String_,
47
48    pub partition_data: Vec<ProduceRequestPartitionData>,
50}
51
52impl<W> WriteVersionedType<W> for ProduceRequestTopicData
53where
54    W: Write,
55{
56    fn write_versioned(
57        &self,
58        writer: &mut W,
59        version: ApiVersion,
60    ) -> Result<(), WriteVersionedError> {
61        let v = version.0 .0;
62        assert!(v <= 7);
63
64        self.name.write(writer)?;
65        write_versioned_array(writer, version, Some(&self.partition_data))?;
66
67        Ok(())
68    }
69}
70
71#[derive(Debug)]
72pub struct ProduceRequest {
73    pub transactional_id: NullableString,
77
78    pub acks: Int16,
82
83    pub timeout_ms: Int32,
85
86    pub topic_data: Vec<ProduceRequestTopicData>,
88}
89
90impl<W> WriteVersionedType<W> for ProduceRequest
91where
92    W: Write,
93{
94    fn write_versioned(
95        &self,
96        writer: &mut W,
97        version: ApiVersion,
98    ) -> Result<(), WriteVersionedError> {
99        let v = version.0 .0;
100        assert!(v <= 7);
101
102        if v >= 3 {
103            self.transactional_id.write(writer)?;
104        }
105        self.acks.write(writer)?;
106        self.timeout_ms.write(writer)?;
107        write_versioned_array(writer, version, Some(&self.topic_data))?;
108
109        Ok(())
110    }
111}
112
113impl RequestBody for ProduceRequest {
114    type ResponseBody = ProduceResponse;
115
116    const API_KEY: ApiKey = ApiKey::Produce;
117
118    const API_VERSION_RANGE: ApiVersionRange =
125        ApiVersionRange::new(ApiVersion(Int16(3)), ApiVersion(Int16(7)));
126
127    const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(9));
128}
129
130#[derive(Debug)]
131#[allow(missing_copy_implementations)]
132pub struct ProduceResponsePartitionResponse {
133    pub index: Int32,
135
136    pub error: Option<Error>,
138
139    pub base_offset: Int64,
141
142    pub log_append_time_ms: Option<Int64>,
149
150    pub log_start_offset: Option<Int64>,
154}
155
156impl<R> ReadVersionedType<R> for ProduceResponsePartitionResponse
157where
158    R: Read,
159{
160    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
161        let v = version.0 .0;
162        assert!(v <= 7);
163
164        Ok(Self {
165            index: Int32::read(reader)?,
166            error: Error::new(Int16::read(reader)?.0),
167            base_offset: Int64::read(reader)?,
168            log_append_time_ms: (v >= 2).then(|| Int64::read(reader)).transpose()?,
169            log_start_offset: (v >= 5).then(|| Int64::read(reader)).transpose()?,
170        })
171    }
172}
173
174#[derive(Debug)]
175pub struct ProduceResponseResponse {
176    pub name: String_,
178
179    pub partition_responses: Vec<ProduceResponsePartitionResponse>,
181}
182
183impl<R> ReadVersionedType<R> for ProduceResponseResponse
184where
185    R: Read,
186{
187    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
188        let v = version.0 .0;
189        assert!(v <= 7);
190
191        Ok(Self {
192            name: String_::read(reader)?,
193            partition_responses: read_versioned_array(reader, version)?.unwrap_or_default(),
194        })
195    }
196}
197
198#[derive(Debug)]
199pub struct ProduceResponse {
200    pub responses: Vec<ProduceResponseResponse>,
202
203    pub throttle_time_ms: Option<Int32>,
207}
208
209impl<R> ReadVersionedType<R> for ProduceResponse
210where
211    R: Read,
212{
213    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
214        let v = version.0 .0;
215        assert!(v <= 7);
216
217        Ok(Self {
218            responses: read_versioned_array(reader, version)?.unwrap_or_default(),
219            throttle_time_ms: (v >= 1).then(|| Int32::read(reader)).transpose()?,
220        })
221    }
222}