iggy_common/commands/messages/
poll_messages.rs1use crate::error::IggyError;
20use crate::{BytesSerializable, Identifier, PollingKind, PollingStrategy, Sizeable, Validatable};
21use crate::{Command, POLL_MESSAGES_CODE};
22use crate::{Consumer, ConsumerKind};
23use bytes::{BufMut, Bytes, BytesMut};
24use serde::{Deserialize, Serialize};
25use std::fmt::Display;
26
27pub const DEFAULT_PARTITION_ID: u32 = 0;
28pub const DEFAULT_NUMBER_OF_MESSAGES_TO_POLL: u32 = 10;
29
30#[derive(Debug, Serialize, Deserialize, PartialEq)]
40pub struct PollMessages {
41 #[serde(flatten)]
43 pub consumer: Consumer,
44 #[serde(skip)]
46 pub stream_id: Identifier,
47 #[serde(skip)]
49 pub topic_id: Identifier,
50 #[serde(default = "PollMessages::default_partition_id")]
52 pub partition_id: Option<u32>,
53 #[serde(default = "PollingStrategy::default", flatten)]
55 pub strategy: PollingStrategy,
56 #[serde(default = "PollMessages::default_number_of_messages_to_poll")]
58 pub count: u32,
59 #[serde(default)]
61 pub auto_commit: bool,
62}
63
64impl PollMessages {
65 pub fn bytes(
66 stream_id: &Identifier,
67 topic_id: &Identifier,
68 partition_id: Option<u32>,
69 consumer: &Consumer,
70 strategy: &PollingStrategy,
71 count: u32,
72 auto_commit: bool,
73 ) -> Bytes {
74 let consumer_bytes = consumer.to_bytes();
75 let stream_id_bytes = stream_id.to_bytes();
76 let topic_id_bytes = topic_id.to_bytes();
77 let strategy_bytes = strategy.to_bytes();
78 let mut bytes = BytesMut::with_capacity(
79 10 + consumer_bytes.len()
80 + stream_id_bytes.len()
81 + topic_id_bytes.len()
82 + strategy_bytes.len(),
83 );
84 bytes.put_slice(&consumer_bytes);
85 bytes.put_slice(&stream_id_bytes);
86 bytes.put_slice(&topic_id_bytes);
87 if let Some(partition_id) = partition_id {
89 bytes.put_u8(1);
90 bytes.put_u32_le(partition_id);
91 } else {
92 bytes.put_u8(0);
93 bytes.put_u32_le(0); }
95 bytes.put_slice(&strategy_bytes);
96 bytes.put_u32_le(count);
97 if auto_commit {
98 bytes.put_u8(1);
99 } else {
100 bytes.put_u8(0);
101 }
102
103 bytes.freeze()
104 }
105
106 pub fn default_number_of_messages_to_poll() -> u32 {
107 DEFAULT_NUMBER_OF_MESSAGES_TO_POLL
108 }
109
110 pub fn default_partition_id() -> Option<u32> {
111 Some(DEFAULT_PARTITION_ID)
112 }
113}
114
115impl Default for PollMessages {
116 fn default() -> Self {
117 Self {
118 consumer: Consumer::default(),
119 stream_id: Identifier::numeric(1).unwrap(),
120 topic_id: Identifier::numeric(1).unwrap(),
121 partition_id: PollMessages::default_partition_id(),
122 strategy: PollingStrategy::default(),
123 count: PollMessages::default_number_of_messages_to_poll(),
124 auto_commit: false,
125 }
126 }
127}
128
129impl Command for PollMessages {
130 fn code(&self) -> u32 {
131 POLL_MESSAGES_CODE
132 }
133}
134
135impl Validatable<IggyError> for PollMessages {
136 fn validate(&self) -> Result<(), IggyError> {
137 Ok(())
138 }
139}
140
141impl BytesSerializable for PollMessages {
142 fn to_bytes(&self) -> Bytes {
143 PollMessages::bytes(
144 &self.stream_id,
145 &self.topic_id,
146 self.partition_id,
147 &self.consumer,
148 &self.strategy,
149 self.count,
150 self.auto_commit,
151 )
152 }
153
154 fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> {
155 if bytes.len() < 30 {
156 return Err(IggyError::InvalidCommand);
157 }
158
159 let mut position = 0;
160 let consumer_kind =
161 ConsumerKind::from_code(*bytes.first().ok_or(IggyError::InvalidCommand)?)?;
162 let consumer_id = Identifier::from_bytes(bytes.slice(1..))?;
163 position += 1 + consumer_id.get_size_bytes().as_bytes_usize();
164 let consumer = Consumer {
165 kind: consumer_kind,
166 id: consumer_id,
167 };
168 let stream_id = Identifier::from_bytes(bytes.slice(position..))?;
169 position += stream_id.get_size_bytes().as_bytes_usize();
170 let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
171 position += topic_id.get_size_bytes().as_bytes_usize();
172 let has_partition_id = *bytes.get(position).ok_or(IggyError::InvalidCommand)?;
173 let partition_id_value = u32::from_le_bytes(
174 bytes
175 .get(position + 1..position + 5)
176 .ok_or(IggyError::InvalidCommand)?
177 .try_into()
178 .map_err(|_| IggyError::InvalidNumberEncoding)?,
179 );
180 let partition_id = if has_partition_id == 1 {
181 Some(partition_id_value)
182 } else {
183 None
184 };
185 let polling_kind =
186 PollingKind::from_code(*bytes.get(position + 5).ok_or(IggyError::InvalidCommand)?)?;
187 position += 6;
188 let value = u64::from_le_bytes(
189 bytes
190 .get(position..position + 8)
191 .ok_or(IggyError::InvalidCommand)?
192 .try_into()
193 .map_err(|_| IggyError::InvalidNumberEncoding)?,
194 );
195 let strategy = PollingStrategy {
196 kind: polling_kind,
197 value,
198 };
199 let count = u32::from_le_bytes(
200 bytes
201 .get(position + 8..position + 12)
202 .ok_or(IggyError::InvalidCommand)?
203 .try_into()
204 .map_err(|_| IggyError::InvalidNumberEncoding)?,
205 );
206 let auto_commit = *bytes.get(position + 12).ok_or(IggyError::InvalidCommand)? == 1;
207 let command = PollMessages {
208 consumer,
209 stream_id,
210 topic_id,
211 partition_id,
212 strategy,
213 count,
214 auto_commit,
215 };
216 Ok(command)
217 }
218}
219
220impl Display for PollMessages {
221 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222 write!(
223 f,
224 "{}|{}|{}|{}|{}|{}|{}",
225 self.consumer,
226 self.stream_id,
227 self.topic_id,
228 self.partition_id.unwrap_or(0),
229 self.strategy,
230 self.count,
231 auto_commit_to_string(self.auto_commit)
232 )
233 }
234}
235
236fn auto_commit_to_string(auto_commit: bool) -> &'static str {
237 if auto_commit { "a" } else { "n" }
238}
239
240#[cfg(test)]
241mod tests {
242 use super::*;
243
244 #[test]
245 fn should_be_serialized_as_bytes() {
246 let command = PollMessages {
247 consumer: Consumer::new(Identifier::numeric(1).unwrap()),
248 stream_id: Identifier::numeric(2).unwrap(),
249 topic_id: Identifier::numeric(3).unwrap(),
250 partition_id: Some(4),
251 strategy: PollingStrategy::offset(2),
252 count: 3,
253 auto_commit: true,
254 };
255
256 let bytes = command.to_bytes();
257 let mut position = 0;
258 let consumer_kind = ConsumerKind::from_code(bytes[0]).unwrap();
259 let consumer_id = Identifier::from_bytes(bytes.slice(1..)).unwrap();
260 position += 1 + consumer_id.get_size_bytes().as_bytes_usize();
261 let consumer = Consumer {
262 kind: consumer_kind,
263 id: consumer_id,
264 };
265 let stream_id = Identifier::from_bytes(bytes.slice(position..)).unwrap();
266 position += stream_id.get_size_bytes().as_bytes_usize();
267 let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap();
268 position += topic_id.get_size_bytes().as_bytes_usize();
269 let has_partition_id = bytes[position];
270 let partition_id =
271 u32::from_le_bytes(bytes[position + 1..position + 5].try_into().unwrap());
272 let partition_id = if has_partition_id == 1 {
273 Some(partition_id)
274 } else {
275 None
276 };
277 let polling_kind = PollingKind::from_code(bytes[position + 5]).unwrap();
278 position += 6;
279 let value = u64::from_le_bytes(bytes[position..position + 8].try_into().unwrap());
280 let strategy = PollingStrategy {
281 kind: polling_kind,
282 value,
283 };
284 let count = u32::from_le_bytes(bytes[position + 8..position + 12].try_into().unwrap());
285 let auto_commit = bytes[position + 12];
286 let auto_commit = matches!(auto_commit, 1);
287
288 assert!(!bytes.is_empty());
289 assert_eq!(consumer, command.consumer);
290 assert_eq!(stream_id, command.stream_id);
291 assert_eq!(topic_id, command.topic_id);
292 assert_eq!(partition_id, command.partition_id);
293 assert_eq!(strategy, command.strategy);
294 assert_eq!(count, command.count);
295 assert_eq!(auto_commit, command.auto_commit);
296 }
297
298 #[test]
299 fn should_be_deserialized_from_bytes() {
300 let consumer = Consumer::new(Identifier::numeric(1).unwrap());
301 let stream_id = Identifier::numeric(2).unwrap();
302 let topic_id = Identifier::numeric(3).unwrap();
303 let partition_id = 4u32;
304 let strategy = PollingStrategy::offset(2);
305 let count = 3u32;
306 let auto_commit = 1u8;
307
308 let consumer_bytes = consumer.to_bytes();
309 let stream_id_bytes = stream_id.to_bytes();
310 let topic_id_bytes = topic_id.to_bytes();
311 let strategy_bytes = strategy.to_bytes();
312 let mut bytes = BytesMut::with_capacity(
313 10 + consumer_bytes.len()
314 + stream_id_bytes.len()
315 + topic_id_bytes.len()
316 + strategy_bytes.len(),
317 );
318 bytes.put_slice(&consumer_bytes);
319 bytes.put_slice(&stream_id_bytes);
320 bytes.put_slice(&topic_id_bytes);
321 bytes.put_u8(1); bytes.put_u32_le(partition_id);
323 bytes.put_slice(&strategy_bytes);
324 bytes.put_u32_le(count);
325 bytes.put_u8(auto_commit);
326
327 let command = PollMessages::from_bytes(bytes.freeze());
328 assert!(command.is_ok());
329
330 let auto_commit = matches!(auto_commit, 1);
331
332 let command = command.unwrap();
333 assert_eq!(command.consumer, consumer);
334 assert_eq!(command.stream_id, stream_id);
335 assert_eq!(command.topic_id, topic_id);
336 assert_eq!(command.partition_id, Some(partition_id));
337 assert_eq!(command.strategy, strategy);
338 assert_eq!(command.count, count);
339 assert_eq!(command.auto_commit, auto_commit);
340 }
341}