fluvio_dataplane_protocol/produce/
request.rs

1use std::fmt::Debug;
2use std::io::{Error, ErrorKind};
3use std::marker::PhantomData;
4use std::time::Duration;
5use bytes::{Buf, BufMut};
6
7use crate::batch::RawRecords;
8use crate::core::Encoder;
9use crate::core::Decoder;
10use crate::derive::FluvioDefault;
11use crate::core::Version;
12
13use crate::api::Request;
14use crate::record::RecordSet;
15use crate::Isolation;
16
17use super::ProduceResponse;
18
19pub type DefaultProduceRequest = ProduceRequest<RecordSet<RawRecords>>;
20pub type DefaultPartitionRequest = PartitionProduceData<RecordSet<RawRecords>>;
21pub type DefaultTopicRequest = TopicProduceData<RecordSet<RawRecords>>;
22
23#[derive(FluvioDefault, Debug)]
24pub struct ProduceRequest<R>
25where
26    R: Encoder + Decoder + Default + Debug,
27{
28    /// The transactional ID, or null if the producer is not transactional.
29    #[fluvio(min_version = 3)]
30    pub transactional_id: Option<String>,
31
32    /// ReadUncommitted - Just wait for leader to write message (only wait for LEO update).
33    /// ReadCommitted - Wait for messages to be committed (wait for HW).
34    pub isolation: Isolation,
35
36    /// The timeout to await a response.
37    pub timeout: Duration,
38
39    /// Each topic to produce to.
40    pub topics: Vec<TopicProduceData<R>>,
41    pub data: PhantomData<R>,
42}
43
44impl<R> Request for ProduceRequest<R>
45where
46    R: Debug + Decoder + Encoder,
47{
48    const API_KEY: u16 = 0;
49
50    const MIN_API_VERSION: i16 = 0;
51    const MAX_API_VERSION: i16 = 7;
52    const DEFAULT_API_VERSION: i16 = 7;
53
54    type Response = ProduceResponse;
55}
56
57#[derive(Encoder, Decoder, FluvioDefault, Debug)]
58pub struct TopicProduceData<R>
59where
60    R: Encoder + Decoder + Default + Debug,
61{
62    /// The topic name.
63    pub name: String,
64
65    /// Each partition to produce to.
66    pub partitions: Vec<PartitionProduceData<R>>,
67    pub data: PhantomData<R>,
68}
69
70#[derive(Encoder, Decoder, FluvioDefault, Debug)]
71pub struct PartitionProduceData<R>
72where
73    R: Encoder + Decoder + Default + Debug,
74{
75    /// The partition index.
76    pub partition_index: i32,
77
78    /// The record data to be produced.
79    pub records: R,
80}
81
82impl<R> Encoder for ProduceRequest<R>
83where
84    R: Encoder + Decoder + Default + Debug,
85{
86    fn write_size(&self, version: Version) -> usize {
87        self.transactional_id.write_size(version)
88            + IsolationData(0i16).write_size(version)
89            + TimeoutData(0i32).write_size(version)
90            + self.topics.write_size(version)
91    }
92
93    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
94    where
95        T: BufMut,
96    {
97        self.transactional_id.encode(dest, version)?;
98        IsolationData::from(self.isolation).encode(dest, version)?;
99        TimeoutData::try_from(self.timeout)?.encode(dest, version)?;
100        self.topics.encode(dest, version)?;
101        Ok(())
102    }
103}
104
105impl<R> Decoder for ProduceRequest<R>
106where
107    R: Decoder + Encoder + Default + Debug,
108{
109    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
110    where
111        T: Buf,
112    {
113        self.transactional_id = Decoder::decode_from(src, version)?;
114        self.isolation = Isolation::from(IsolationData::decode_from(src, version)?);
115        self.timeout = Duration::try_from(TimeoutData::decode_from(src, version)?)?;
116        self.topics = Decoder::decode_from(src, version)?;
117        Ok(())
118    }
119}
120
121impl<R: Encoder + Decoder + Default + Debug + Clone> Clone for ProduceRequest<R> {
122    fn clone(&self) -> Self {
123        Self {
124            transactional_id: self.transactional_id.clone(),
125            isolation: self.isolation,
126            timeout: self.timeout,
127            topics: self.topics.clone(),
128            data: self.data,
129        }
130    }
131}
132
133impl<R: Encoder + Decoder + Default + Debug + Clone> Clone for TopicProduceData<R> {
134    fn clone(&self) -> Self {
135        Self {
136            name: self.name.clone(),
137            partitions: self.partitions.clone(),
138            data: self.data,
139        }
140    }
141}
142
143impl<R: Encoder + Decoder + Default + Debug + Clone> Clone for PartitionProduceData<R> {
144    fn clone(&self) -> Self {
145        Self {
146            partition_index: self.partition_index,
147            records: self.records.clone(),
148        }
149    }
150}
151
152/// Isolation is represented in binary format as i16 value (field `acks` in Kafka wire protocol).
153#[derive(Encoder, Decoder, FluvioDefault, Debug)]
154struct IsolationData(i16);
155
156impl From<Isolation> for IsolationData {
157    fn from(isolation: Isolation) -> Self {
158        IsolationData(match isolation {
159            Isolation::ReadUncommitted => 1,
160            Isolation::ReadCommitted => -1,
161        })
162    }
163}
164
165impl From<IsolationData> for Isolation {
166    fn from(data: IsolationData) -> Self {
167        match data.0 {
168            acks if acks < 0 => Isolation::ReadCommitted,
169            _ => Isolation::ReadUncommitted,
170        }
171    }
172}
173
174/// Timeout duration is represented in binary format as i32 value (field `timeout_ms` in Kafka wire protocol).
175#[derive(Encoder, Decoder, FluvioDefault, Debug)]
176struct TimeoutData(i32);
177
178impl TryFrom<Duration> for TimeoutData {
179    type Error = Error;
180
181    fn try_from(value: Duration) -> Result<Self, Self::Error> {
182        value.as_millis().try_into().map(TimeoutData).map_err(|_e| {
183            Error::new(
184                ErrorKind::InvalidInput,
185                "Timeout must fit into 4 bytes integer value",
186            )
187        })
188    }
189}
190
191impl TryFrom<TimeoutData> for Duration {
192    type Error = Error;
193
194    fn try_from(value: TimeoutData) -> Result<Self, Self::Error> {
195        u64::try_from(value.0)
196            .map(Duration::from_millis)
197            .map_err(|_e| {
198                Error::new(
199                    ErrorKind::InvalidInput,
200                    "Timeout must be positive integer value",
201                )
202            })
203    }
204}
205
206#[cfg(feature = "file")]
207pub use file::*;
208
/// `FileWrite` support for produce requests whose record payload is a
/// `FileRecordSet`. Compiled only when the `file` feature is enabled.
#[cfg(feature = "file")]
mod file {
    use std::io::Error as IoError;

    use tracing::trace;
    use bytes::BytesMut;

    use crate::core::Version;
    use crate::record::FileRecordSet;
    use crate::store::FileWrite;
    use crate::store::StoreValue;

    use super::*;

    /// Produce request carrying `FileRecordSet` payloads.
    pub type FileProduceRequest = ProduceRequest<FileRecordSet>;
    /// Topic-level entry of a [`FileProduceRequest`].
    pub type FileTopicRequest = TopicProduceData<FileRecordSet>;
    /// Partition-level entry of a [`FileTopicRequest`].
    pub type FilePartitionRequest = PartitionProduceData<FileRecordSet>;

    impl FileWrite for FileProduceRequest {
        /// Writes the request header fields into `src` (which, despite its
        /// name, is the buffer being written to) and then hands `src` and
        /// `data` to the topics' own `file_encode`.
        ///
        /// The field order matches the hand-written `Encoder` impl of
        /// `ProduceRequest`: `transactional_id`, isolation, timeout, `topics`.
        fn file_encode(
            &self,
            src: &mut BytesMut,
            data: &mut Vec<StoreValue>,
            version: Version,
        ) -> Result<(), IoError> {
            trace!("file encoding produce request");
            self.transactional_id.encode(src, version)?;

            let isolation = IsolationData::from(self.isolation);
            isolation.encode(src, version)?;

            // Converted only after the preceding fields have been written.
            let timeout = TimeoutData::try_from(self.timeout)?;
            timeout.encode(src, version)?;

            self.topics.file_encode(src, data, version)
        }
    }

    impl FileWrite for FileTopicRequest {
        /// Writes the topic name into `src`, then delegates to the
        /// partitions' `file_encode`.
        fn file_encode(
            &self,
            src: &mut BytesMut,
            data: &mut Vec<StoreValue>,
            version: Version,
        ) -> Result<(), IoError> {
            trace!("file encoding produce topic request");
            self.name.encode(src, version)?;
            self.partitions.file_encode(src, data, version)
        }
    }

    impl FileWrite for FilePartitionRequest {
        /// Writes the partition index into `src`, then delegates to the
        /// record set's `file_encode`.
        fn file_encode(
            &self,
            src: &mut BytesMut,
            data: &mut Vec<StoreValue>,
            version: Version,
        ) -> Result<(), IoError> {
            trace!("file encoding for partition request");
            self.partition_index.encode(src, version)?;
            self.records.file_encode(src, data, version)
        }
    }
}
271
#[cfg(test)]
mod tests {
    use std::io::{Error, ErrorKind};
    use std::time::Duration;
    use fluvio_protocol::{Decoder, Encoder};
    use crate::Isolation;
    use crate::produce::DefaultProduceRequest;
    use crate::produce::TopicProduceData;
    use crate::produce::PartitionProduceData;
    use fluvio_protocol::api::Request;
    use crate::batch::Batch;
    use crate::record::{Record, RecordData, RecordSet};

    /// Isolation and timeout must survive an encode/decode round trip.
    #[test]
    fn test_encode_decode_produce_request_isolation_timeout() -> Result<(), Error> {
        let api_version = DefaultProduceRequest::DEFAULT_API_VERSION;
        let original = DefaultProduceRequest {
            isolation: Isolation::ReadCommitted,
            timeout: Duration::from_millis(123456),
            ..Default::default()
        };

        let mut encoded = original.as_bytes(api_version)?;
        let round_tripped: DefaultProduceRequest =
            Decoder::decode_from(&mut encoded, api_version)?;

        assert_eq!(original.isolation, round_tripped.isolation);
        assert_eq!(original.timeout, round_tripped.timeout);
        Ok(())
    }

    /// A timeout whose millisecond count does not fit the wire type must be
    /// rejected at encode time.
    #[test]
    fn test_encode_produce_request_timeout_too_big() {
        let api_version = DefaultProduceRequest::DEFAULT_API_VERSION;
        let oversized = DefaultProduceRequest {
            isolation: Isolation::ReadCommitted,
            timeout: Duration::from_millis(u64::MAX),
            ..Default::default()
        };

        let err = oversized
            .as_bytes(api_version)
            .expect_err("expected error");

        assert_eq!(err.kind(), ErrorKind::InvalidInput);
        assert_eq!(
            err.to_string(),
            "Timeout must fit into 4 bytes integer value"
        );
    }

    /// A cloned request must encode to the same bytes as the original.
    #[test]
    fn test_default_produce_request_clone() {
        //given
        let records = RecordSet {
            batches: vec![Batch::from(vec![Record::new(RecordData::from(
                "some raw data",
            ))])
            .try_into()
            .expect("compressed batch")],
        };
        let partition = PartitionProduceData {
            partition_index: 1,
            records,
        };
        let topic = TopicProduceData {
            name: "topic".to_string(),
            partitions: vec![partition],
            data: Default::default(),
        };
        let request = DefaultProduceRequest {
            transactional_id: Some("transaction_id".to_string()),
            isolation: Default::default(),
            timeout: Duration::from_millis(100),
            topics: vec![topic],
            data: Default::default(),
        };
        let api_version = DefaultProduceRequest::DEFAULT_API_VERSION;

        //when
        let cloned = request.clone();
        let bytes = request.as_bytes(api_version).expect("encoded request");
        let cloned_bytes = cloned
            .as_bytes(api_version)
            .expect("encoded cloned request");

        //then
        assert_eq!(bytes, cloned_bytes);
    }
}