Skip to main content

iggy_common/commands/messages/
poll_messages.rs

1/* Licensed to the Apache Software Foundation (ASF) under one
2 * or more contributor license agreements.  See the NOTICE file
3 * distributed with this work for additional information
4 * regarding copyright ownership.  The ASF licenses this file
5 * to you under the Apache License, Version 2.0 (the
6 * "License"); you may not use this file except in compliance
7 * with the License.  You may obtain a copy of the License at
8 *
9 *   http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied.  See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19use crate::error::IggyError;
20use crate::{BytesSerializable, Identifier, PollingKind, PollingStrategy, Sizeable, Validatable};
21use crate::{Command, POLL_MESSAGES_CODE};
22use crate::{Consumer, ConsumerKind};
23use bytes::{BufMut, Bytes, BytesMut};
24use serde::{Deserialize, Serialize};
25use std::fmt::Display;
26
/// Partition ID that `PollMessages::default_partition_id` wraps in `Some`.
pub const DEFAULT_PARTITION_ID: u32 = 0;
/// Message count returned by `PollMessages::default_number_of_messages_to_poll`.
pub const DEFAULT_NUMBER_OF_MESSAGES_TO_POLL: u32 = 10;
29
30/// `PollMessages` command is used to poll messages from a topic in a stream.
31/// It has additional payload:
32/// - `consumer` - consumer which will poll messages. Either regular consumer or consumer group.
33/// - `stream_id` - unique stream ID (numeric or name).
34/// - `topic_id` - unique topic ID (numeric or name).
35/// - `partition_id` - partition ID from which messages will be polled. Has to be specified for the regular consumer. For consumer group it is ignored (use `None`).
36/// - `strategy` - polling strategy which specifies from where to start polling messages.
37/// - `count` - number of messages to poll.
38/// - `auto_commit` - whether to commit offset on the server automatically after polling the messages.
39#[derive(Debug, Serialize, Deserialize, PartialEq)]
40pub struct PollMessages {
41    /// Consumer which will poll messages. Either regular consumer or consumer group.
42    #[serde(flatten)]
43    pub consumer: Consumer,
44    /// Unique stream ID (numeric or name).
45    #[serde(skip)]
46    pub stream_id: Identifier,
47    /// Unique topic ID (numeric or name).
48    #[serde(skip)]
49    pub topic_id: Identifier,
50    /// Partition ID from which messages will be polled. Has to be specified for the regular consumer. For consumer group it is ignored (use `None`).
51    #[serde(default = "PollMessages::default_partition_id")]
52    pub partition_id: Option<u32>,
53    /// Polling strategy which specifies from where to start polling messages.
54    #[serde(default = "PollingStrategy::default", flatten)]
55    pub strategy: PollingStrategy,
56    /// Number of messages to poll.
57    #[serde(default = "PollMessages::default_number_of_messages_to_poll")]
58    pub count: u32,
59    /// Whether to commit offset on the server automatically after polling the messages.
60    #[serde(default)]
61    pub auto_commit: bool,
62}
63
64impl PollMessages {
65    pub fn bytes(
66        stream_id: &Identifier,
67        topic_id: &Identifier,
68        partition_id: Option<u32>,
69        consumer: &Consumer,
70        strategy: &PollingStrategy,
71        count: u32,
72        auto_commit: bool,
73    ) -> Bytes {
74        let consumer_bytes = consumer.to_bytes();
75        let stream_id_bytes = stream_id.to_bytes();
76        let topic_id_bytes = topic_id.to_bytes();
77        let strategy_bytes = strategy.to_bytes();
78        let mut bytes = BytesMut::with_capacity(
79            10 + consumer_bytes.len()
80                + stream_id_bytes.len()
81                + topic_id_bytes.len()
82                + strategy_bytes.len(),
83        );
84        bytes.put_slice(&consumer_bytes);
85        bytes.put_slice(&stream_id_bytes);
86        bytes.put_slice(&topic_id_bytes);
87        // Encode partition_id with a flag byte: 1 = Some, 0 = None
88        if let Some(partition_id) = partition_id {
89            bytes.put_u8(1);
90            bytes.put_u32_le(partition_id);
91        } else {
92            bytes.put_u8(0);
93            bytes.put_u32_le(0); // Padding to keep structure consistent
94        }
95        bytes.put_slice(&strategy_bytes);
96        bytes.put_u32_le(count);
97        if auto_commit {
98            bytes.put_u8(1);
99        } else {
100            bytes.put_u8(0);
101        }
102
103        bytes.freeze()
104    }
105
106    pub fn default_number_of_messages_to_poll() -> u32 {
107        DEFAULT_NUMBER_OF_MESSAGES_TO_POLL
108    }
109
110    pub fn default_partition_id() -> Option<u32> {
111        Some(DEFAULT_PARTITION_ID)
112    }
113}
114
115impl Default for PollMessages {
116    fn default() -> Self {
117        Self {
118            consumer: Consumer::default(),
119            stream_id: Identifier::numeric(1).unwrap(),
120            topic_id: Identifier::numeric(1).unwrap(),
121            partition_id: PollMessages::default_partition_id(),
122            strategy: PollingStrategy::default(),
123            count: PollMessages::default_number_of_messages_to_poll(),
124            auto_commit: false,
125        }
126    }
127}
128
129impl Command for PollMessages {
130    fn code(&self) -> u32 {
131        POLL_MESSAGES_CODE
132    }
133}
134
135impl Validatable<IggyError> for PollMessages {
136    fn validate(&self) -> Result<(), IggyError> {
137        Ok(())
138    }
139}
140
141impl BytesSerializable for PollMessages {
142    fn to_bytes(&self) -> Bytes {
143        PollMessages::bytes(
144            &self.stream_id,
145            &self.topic_id,
146            self.partition_id,
147            &self.consumer,
148            &self.strategy,
149            self.count,
150            self.auto_commit,
151        )
152    }
153
154    fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> {
155        if bytes.len() < 30 {
156            return Err(IggyError::InvalidCommand);
157        }
158
159        let mut position = 0;
160        let consumer_kind =
161            ConsumerKind::from_code(*bytes.first().ok_or(IggyError::InvalidCommand)?)?;
162        let consumer_id = Identifier::from_bytes(bytes.slice(1..))?;
163        position += 1 + consumer_id.get_size_bytes().as_bytes_usize();
164        let consumer = Consumer {
165            kind: consumer_kind,
166            id: consumer_id,
167        };
168        let stream_id = Identifier::from_bytes(bytes.slice(position..))?;
169        position += stream_id.get_size_bytes().as_bytes_usize();
170        let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
171        position += topic_id.get_size_bytes().as_bytes_usize();
172        let has_partition_id = *bytes.get(position).ok_or(IggyError::InvalidCommand)?;
173        let partition_id_value = u32::from_le_bytes(
174            bytes
175                .get(position + 1..position + 5)
176                .ok_or(IggyError::InvalidCommand)?
177                .try_into()
178                .map_err(|_| IggyError::InvalidNumberEncoding)?,
179        );
180        let partition_id = if has_partition_id == 1 {
181            Some(partition_id_value)
182        } else {
183            None
184        };
185        let polling_kind =
186            PollingKind::from_code(*bytes.get(position + 5).ok_or(IggyError::InvalidCommand)?)?;
187        position += 6;
188        let value = u64::from_le_bytes(
189            bytes
190                .get(position..position + 8)
191                .ok_or(IggyError::InvalidCommand)?
192                .try_into()
193                .map_err(|_| IggyError::InvalidNumberEncoding)?,
194        );
195        let strategy = PollingStrategy {
196            kind: polling_kind,
197            value,
198        };
199        let count = u32::from_le_bytes(
200            bytes
201                .get(position + 8..position + 12)
202                .ok_or(IggyError::InvalidCommand)?
203                .try_into()
204                .map_err(|_| IggyError::InvalidNumberEncoding)?,
205        );
206        let auto_commit = *bytes.get(position + 12).ok_or(IggyError::InvalidCommand)? == 1;
207        let command = PollMessages {
208            consumer,
209            stream_id,
210            topic_id,
211            partition_id,
212            strategy,
213            count,
214            auto_commit,
215        };
216        Ok(command)
217    }
218}
219
220impl Display for PollMessages {
221    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222        write!(
223            f,
224            "{}|{}|{}|{}|{}|{}|{}",
225            self.consumer,
226            self.stream_id,
227            self.topic_id,
228            self.partition_id.unwrap_or(0),
229            self.strategy,
230            self.count,
231            auto_commit_to_string(self.auto_commit)
232        )
233    }
234}
235
/// Maps the auto-commit flag to its one-letter marker: `"a"` when enabled,
/// `"n"` when disabled.
fn auto_commit_to_string(auto_commit: bool) -> &'static str {
    match auto_commit {
        true => "a",
        false => "n",
    }
}
239
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the command used across the tests; only the partition varies.
    fn sample_command(partition_id: Option<u32>) -> PollMessages {
        PollMessages {
            consumer: Consumer::new(Identifier::numeric(1).unwrap()),
            stream_id: Identifier::numeric(2).unwrap(),
            topic_id: Identifier::numeric(3).unwrap(),
            partition_id,
            strategy: PollingStrategy::offset(2),
            count: 3,
            auto_commit: true,
        }
    }

    #[test]
    fn should_be_serialized_as_bytes() {
        let command = sample_command(Some(4));

        // Decode the buffer by hand so the test pins the wire layout itself
        // rather than relying on `from_bytes`.
        let bytes = command.to_bytes();
        let mut position = 0;
        let consumer_kind = ConsumerKind::from_code(bytes[0]).unwrap();
        let consumer_id = Identifier::from_bytes(bytes.slice(1..)).unwrap();
        position += 1 + consumer_id.get_size_bytes().as_bytes_usize();
        let consumer = Consumer {
            kind: consumer_kind,
            id: consumer_id,
        };
        let stream_id = Identifier::from_bytes(bytes.slice(position..)).unwrap();
        position += stream_id.get_size_bytes().as_bytes_usize();
        let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap();
        position += topic_id.get_size_bytes().as_bytes_usize();
        let has_partition_id = bytes[position];
        let partition_id =
            u32::from_le_bytes(bytes[position + 1..position + 5].try_into().unwrap());
        let partition_id = if has_partition_id == 1 {
            Some(partition_id)
        } else {
            None
        };
        let polling_kind = PollingKind::from_code(bytes[position + 5]).unwrap();
        position += 6;
        let value = u64::from_le_bytes(bytes[position..position + 8].try_into().unwrap());
        let strategy = PollingStrategy {
            kind: polling_kind,
            value,
        };
        let count = u32::from_le_bytes(bytes[position + 8..position + 12].try_into().unwrap());
        let auto_commit = matches!(bytes[position + 12], 1);

        assert!(!bytes.is_empty());
        // The buffer must end right after the auto-commit flag.
        assert_eq!(bytes.len(), position + 13);
        assert_eq!(consumer, command.consumer);
        assert_eq!(stream_id, command.stream_id);
        assert_eq!(topic_id, command.topic_id);
        assert_eq!(partition_id, command.partition_id);
        assert_eq!(strategy, command.strategy);
        assert_eq!(count, command.count);
        assert_eq!(auto_commit, command.auto_commit);
    }

    #[test]
    fn should_be_deserialized_from_bytes() {
        let consumer = Consumer::new(Identifier::numeric(1).unwrap());
        let stream_id = Identifier::numeric(2).unwrap();
        let topic_id = Identifier::numeric(3).unwrap();
        let partition_id = 4u32;
        let strategy = PollingStrategy::offset(2);
        let count = 3u32;
        let auto_commit = 1u8;

        let consumer_bytes = consumer.to_bytes();
        let stream_id_bytes = stream_id.to_bytes();
        let topic_id_bytes = topic_id.to_bytes();
        let strategy_bytes = strategy.to_bytes();
        let mut bytes = BytesMut::with_capacity(
            10 + consumer_bytes.len()
                + stream_id_bytes.len()
                + topic_id_bytes.len()
                + strategy_bytes.len(),
        );
        bytes.put_slice(&consumer_bytes);
        bytes.put_slice(&stream_id_bytes);
        bytes.put_slice(&topic_id_bytes);
        bytes.put_u8(1); // Flag: partition_id is Some
        bytes.put_u32_le(partition_id);
        bytes.put_slice(&strategy_bytes);
        bytes.put_u32_le(count);
        bytes.put_u8(auto_commit);

        let command = PollMessages::from_bytes(bytes.freeze());
        assert!(command.is_ok());

        let auto_commit = matches!(auto_commit, 1);

        let command = command.unwrap();
        assert_eq!(command.consumer, consumer);
        assert_eq!(command.stream_id, stream_id);
        assert_eq!(command.topic_id, topic_id);
        assert_eq!(command.partition_id, Some(partition_id));
        assert_eq!(command.strategy, strategy);
        assert_eq!(command.count, count);
        assert_eq!(command.auto_commit, auto_commit);
    }

    #[test]
    fn should_encode_missing_partition_id_as_cleared_flag_with_padding() {
        let command = sample_command(None);
        let bytes = command.to_bytes();

        // The partition section starts right after consumer, stream and topic.
        let offset = command.consumer.to_bytes().len()
            + command.stream_id.to_bytes().len()
            + command.topic_id.to_bytes().len();

        assert_eq!(bytes[offset], 0);
        assert_eq!(&bytes[offset + 1..offset + 5], &[0u8; 4][..]);

        let decoded = PollMessages::from_bytes(bytes).unwrap();
        assert_eq!(decoded.partition_id, None);
    }

    #[test]
    fn should_round_trip_through_bytes() {
        for partition_id in [Some(4), Some(0), None] {
            let command = sample_command(partition_id);
            let decoded = PollMessages::from_bytes(command.to_bytes()).unwrap();
            assert_eq!(decoded, command);
        }
    }

    #[test]
    fn should_reject_too_short_payload() {
        let empty = PollMessages::from_bytes(Bytes::new());
        assert!(matches!(empty, Err(IggyError::InvalidCommand)));

        // One byte below the length guard in `from_bytes`.
        let short = PollMessages::from_bytes(Bytes::from(vec![0u8; 29]));
        assert!(matches!(short, Err(IggyError::InvalidCommand)));
    }
}