fluvio_dataplane_protocol/produce/
request.rs1use std::fmt::Debug;
2use std::io::{Error, ErrorKind};
3use std::marker::PhantomData;
4use std::time::Duration;
5use bytes::{Buf, BufMut};
6
7use crate::batch::RawRecords;
8use crate::core::Encoder;
9use crate::core::Decoder;
10use crate::derive::FluvioDefault;
11use crate::core::Version;
12
13use crate::api::Request;
14use crate::record::RecordSet;
15use crate::Isolation;
16
17use super::ProduceResponse;
18
19pub type DefaultProduceRequest = ProduceRequest<RecordSet<RawRecords>>;
20pub type DefaultPartitionRequest = PartitionProduceData<RecordSet<RawRecords>>;
21pub type DefaultTopicRequest = TopicProduceData<RecordSet<RawRecords>>;
22
23#[derive(FluvioDefault, Debug)]
24pub struct ProduceRequest<R>
25where
26 R: Encoder + Decoder + Default + Debug,
27{
28 #[fluvio(min_version = 3)]
30 pub transactional_id: Option<String>,
31
32 pub isolation: Isolation,
35
36 pub timeout: Duration,
38
39 pub topics: Vec<TopicProduceData<R>>,
41 pub data: PhantomData<R>,
42}
43
44impl<R> Request for ProduceRequest<R>
45where
46 R: Debug + Decoder + Encoder,
47{
48 const API_KEY: u16 = 0;
49
50 const MIN_API_VERSION: i16 = 0;
51 const MAX_API_VERSION: i16 = 7;
52 const DEFAULT_API_VERSION: i16 = 7;
53
54 type Response = ProduceResponse;
55}
56
57#[derive(Encoder, Decoder, FluvioDefault, Debug)]
58pub struct TopicProduceData<R>
59where
60 R: Encoder + Decoder + Default + Debug,
61{
62 pub name: String,
64
65 pub partitions: Vec<PartitionProduceData<R>>,
67 pub data: PhantomData<R>,
68}
69
70#[derive(Encoder, Decoder, FluvioDefault, Debug)]
71pub struct PartitionProduceData<R>
72where
73 R: Encoder + Decoder + Default + Debug,
74{
75 pub partition_index: i32,
77
78 pub records: R,
80}
81
82impl<R> Encoder for ProduceRequest<R>
83where
84 R: Encoder + Decoder + Default + Debug,
85{
86 fn write_size(&self, version: Version) -> usize {
87 self.transactional_id.write_size(version)
88 + IsolationData(0i16).write_size(version)
89 + TimeoutData(0i32).write_size(version)
90 + self.topics.write_size(version)
91 }
92
93 fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
94 where
95 T: BufMut,
96 {
97 self.transactional_id.encode(dest, version)?;
98 IsolationData::from(self.isolation).encode(dest, version)?;
99 TimeoutData::try_from(self.timeout)?.encode(dest, version)?;
100 self.topics.encode(dest, version)?;
101 Ok(())
102 }
103}
104
105impl<R> Decoder for ProduceRequest<R>
106where
107 R: Decoder + Encoder + Default + Debug,
108{
109 fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
110 where
111 T: Buf,
112 {
113 self.transactional_id = Decoder::decode_from(src, version)?;
114 self.isolation = Isolation::from(IsolationData::decode_from(src, version)?);
115 self.timeout = Duration::try_from(TimeoutData::decode_from(src, version)?)?;
116 self.topics = Decoder::decode_from(src, version)?;
117 Ok(())
118 }
119}
120
121impl<R: Encoder + Decoder + Default + Debug + Clone> Clone for ProduceRequest<R> {
122 fn clone(&self) -> Self {
123 Self {
124 transactional_id: self.transactional_id.clone(),
125 isolation: self.isolation,
126 timeout: self.timeout,
127 topics: self.topics.clone(),
128 data: self.data,
129 }
130 }
131}
132
133impl<R: Encoder + Decoder + Default + Debug + Clone> Clone for TopicProduceData<R> {
134 fn clone(&self) -> Self {
135 Self {
136 name: self.name.clone(),
137 partitions: self.partitions.clone(),
138 data: self.data,
139 }
140 }
141}
142
143impl<R: Encoder + Decoder + Default + Debug + Clone> Clone for PartitionProduceData<R> {
144 fn clone(&self) -> Self {
145 Self {
146 partition_index: self.partition_index,
147 records: self.records.clone(),
148 }
149 }
150}
151
152#[derive(Encoder, Decoder, FluvioDefault, Debug)]
154struct IsolationData(i16);
155
156impl From<Isolation> for IsolationData {
157 fn from(isolation: Isolation) -> Self {
158 IsolationData(match isolation {
159 Isolation::ReadUncommitted => 1,
160 Isolation::ReadCommitted => -1,
161 })
162 }
163}
164
165impl From<IsolationData> for Isolation {
166 fn from(data: IsolationData) -> Self {
167 match data.0 {
168 acks if acks < 0 => Isolation::ReadCommitted,
169 _ => Isolation::ReadUncommitted,
170 }
171 }
172}
173
174#[derive(Encoder, Decoder, FluvioDefault, Debug)]
176struct TimeoutData(i32);
177
178impl TryFrom<Duration> for TimeoutData {
179 type Error = Error;
180
181 fn try_from(value: Duration) -> Result<Self, Self::Error> {
182 value.as_millis().try_into().map(TimeoutData).map_err(|_e| {
183 Error::new(
184 ErrorKind::InvalidInput,
185 "Timeout must fit into 4 bytes integer value",
186 )
187 })
188 }
189}
190
191impl TryFrom<TimeoutData> for Duration {
192 type Error = Error;
193
194 fn try_from(value: TimeoutData) -> Result<Self, Self::Error> {
195 u64::try_from(value.0)
196 .map(Duration::from_millis)
197 .map_err(|_e| {
198 Error::new(
199 ErrorKind::InvalidInput,
200 "Timeout must be positive integer value",
201 )
202 })
203 }
204}
205
#[cfg(feature = "file")]
pub use file::*;

/// Produce request types backed by `FileRecordSet`, together with their
/// `FileWrite` implementations. Only compiled with the `file` feature.
#[cfg(feature = "file")]
mod file {
    use std::io::Error as IoError;

    use bytes::BytesMut;
    use tracing::trace;

    use crate::core::Version;
    use crate::record::FileRecordSet;
    use crate::store::{FileWrite, StoreValue};

    use super::*;

    pub type FileProduceRequest = ProduceRequest<FileRecordSet>;
    pub type FileTopicRequest = TopicProduceData<FileRecordSet>;
    pub type FilePartitionRequest = PartitionProduceData<FileRecordSet>;

    /// Writes the request header fields into `src` with the regular encoders,
    /// then hands `topics` to `file_encode`.
    ///
    /// NOTE(review): `transactional_id` is tagged `min_version = 3` on the
    /// struct but is written for every `version` — confirm that is intended.
    impl FileWrite for FileProduceRequest {
        fn file_encode(
            &self,
            src: &mut BytesMut,
            data: &mut Vec<StoreValue>,
            version: Version,
        ) -> Result<(), IoError> {
            trace!("file encoding produce request");

            self.transactional_id.encode(src, version)?;

            let isolation = IsolationData::from(self.isolation);
            isolation.encode(src, version)?;

            let timeout = TimeoutData::try_from(self.timeout)?;
            timeout.encode(src, version)?;

            self.topics.file_encode(src, data, version)
        }
    }

    /// Writes the topic name with the regular encoder, then hands
    /// `partitions` to `file_encode`.
    impl FileWrite for FileTopicRequest {
        fn file_encode(
            &self,
            src: &mut BytesMut,
            data: &mut Vec<StoreValue>,
            version: Version,
        ) -> Result<(), IoError> {
            trace!("file encoding produce topic request");

            self.name.encode(src, version)?;
            self.partitions.file_encode(src, data, version)
        }
    }

    /// Writes the partition index with the regular encoder, then hands
    /// `records` to `file_encode`.
    impl FileWrite for FilePartitionRequest {
        fn file_encode(
            &self,
            src: &mut BytesMut,
            data: &mut Vec<StoreValue>,
            version: Version,
        ) -> Result<(), IoError> {
            trace!("file encoding for partition request");

            self.partition_index.encode(src, version)?;
            self.records.file_encode(src, data, version)
        }
    }
}
271
#[cfg(test)]
mod tests {
    use std::io::{Error, ErrorKind};
    use std::time::Duration;

    use fluvio_protocol::api::Request;
    use fluvio_protocol::{Decoder, Encoder};

    use crate::batch::Batch;
    use crate::produce::{DefaultProduceRequest, PartitionProduceData, TopicProduceData};
    use crate::record::{Record, RecordData, RecordSet};
    use crate::Isolation;

    /// API version used by every test in this module.
    const VERSION: i16 = DefaultProduceRequest::DEFAULT_API_VERSION;

    /// Isolation and timeout survive an encode/decode round trip.
    #[test]
    fn test_encode_decode_produce_request_isolation_timeout() -> Result<(), Error> {
        let original = DefaultProduceRequest {
            isolation: Isolation::ReadCommitted,
            timeout: Duration::from_millis(123456),
            ..Default::default()
        };

        let mut encoded = original.as_bytes(VERSION)?;
        let round_tripped = DefaultProduceRequest::decode_from(&mut encoded, VERSION)?;

        assert_eq!(original.isolation, round_tripped.isolation);
        assert_eq!(original.timeout, round_tripped.timeout);
        Ok(())
    }

    /// A timeout that cannot be represented on the wire is rejected at encode time.
    #[test]
    fn test_encode_produce_request_timeout_too_big() {
        let oversized = DefaultProduceRequest {
            isolation: Isolation::ReadCommitted,
            timeout: Duration::from_millis(u64::MAX),
            ..Default::default()
        };

        let err = oversized.as_bytes(VERSION).expect_err("expected error");

        assert_eq!(err.kind(), ErrorKind::InvalidInput);
        assert_eq!(
            err.to_string(),
            "Timeout must fit into 4 bytes integer value"
        );
    }

    /// A cloned request encodes to exactly the same bytes as the original.
    #[test]
    fn test_default_produce_request_clone() {
        let original = DefaultProduceRequest {
            transactional_id: Some("transaction_id".to_string()),
            isolation: Default::default(),
            timeout: Duration::from_millis(100),
            topics: vec![TopicProduceData {
                name: "topic".to_string(),
                partitions: vec![PartitionProduceData {
                    partition_index: 1,
                    records: RecordSet {
                        batches: vec![Batch::from(vec![Record::new(RecordData::from(
                            "some raw data",
                        ))])
                        .try_into()
                        .expect("compressed batch")],
                    },
                }],
                data: Default::default(),
            }],
            data: Default::default(),
        };

        let copy = original.clone();

        let original_bytes = original.as_bytes(VERSION).expect("encoded request");
        let copy_bytes = copy.as_bytes(VERSION).expect("encoded cloned request");

        assert_eq!(original_bytes, copy_bytes);
    }
}