rocketmq_common/common/message/
message_batch.rs1use std::any::Any;
16use std::collections::HashMap;
17use std::fmt;
18use std::fmt::Debug;
19use std::fmt::Display;
20use std::fmt::Formatter;
21
22use bytes::Bytes;
23use cheetah_string::CheetahString;
24use rocketmq_error::RocketMQError;
26
27use crate::common::message::message_decoder;
28use crate::common::message::message_ext_broker_inner::MessageExtBrokerInner;
29use crate::common::message::message_property::MessageProperties;
30use crate::common::message::message_single::Message;
31use crate::common::message::MessageTrait;
32use crate::common::mix_all;
33
34#[derive(Clone, Default, Debug)]
35pub struct MessageBatch {
36 pub final_message: Message,
38
39 pub messages: Vec<Message>,
41}
42
43impl MessageBatch {
44 #[inline]
46 pub fn encode(&self) -> Bytes {
47 message_decoder::encode_messages(&self.messages)
48 }
49
50 #[inline]
52 pub fn iter(&self) -> std::slice::Iter<'_, Message> {
53 self.messages.iter()
54 }
55
56 #[inline]
58 pub fn len(&self) -> usize {
59 self.messages.len()
60 }
61
62 #[inline]
64 pub fn is_empty(&self) -> bool {
65 self.messages.is_empty()
66 }
67
68 pub fn generate_from_vec<M>(messages: Vec<M>) -> rocketmq_error::RocketMQResult<MessageBatch>
69 where
70 M: MessageTrait,
71 {
72 if messages.is_empty() {
73 return Err(RocketMQError::illegal_argument(
74 "MessageBatch::generate_from_vec: messages is empty",
75 ));
76 }
77
78 let mut message_list = Vec::with_capacity(messages.len());
79 for msg in messages {
80 if let Some(m) = msg.as_any().downcast_ref::<Message>() {
81 message_list.push(m.clone());
82 } else {
83 let mut m = Message::default();
84 m.set_topic(msg.topic().clone());
85 if let Some(body) = msg.get_body() {
86 m.set_body(Some(body.clone()));
87 }
88 m.set_flag(msg.get_flag());
89 if let Some(transaction_id) = msg.transaction_id() {
90 m.set_transaction_id(transaction_id.clone());
91 }
92 m.set_properties(msg.get_properties().clone());
93 message_list.push(m);
94 }
95 }
96
97 let mut first: Option<&Message> = None;
98 for message in &message_list {
99 if message.delay_time_level() > 0 {
100 return Err(RocketMQError::illegal_argument(
101 "TimeDelayLevel is not supported for batching",
102 ));
103 }
104 if message.topic().starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX) {
105 return Err(RocketMQError::illegal_argument(
106 "Retry group topic is not supported for batching",
107 ));
108 }
109
110 if let Some(first_message) = first {
111 if first_message.topic() != message.topic() {
112 return Err(RocketMQError::illegal_argument(
113 "The topic of the messages in one batch should be the same",
114 ));
115 }
116 if first_message.is_wait_store_msg_ok() != message.is_wait_store_msg_ok() {
117 return Err(RocketMQError::illegal_argument(
118 "The waitStoreMsgOK of the messages in one batch should the same",
119 ));
120 }
121 } else {
122 first = Some(message);
123 }
124 }
125 let first = first.unwrap();
126 let mut final_message = Message::default();
127 final_message.set_topic(first.topic().clone());
128 final_message.set_wait_store_msg_ok(first.is_wait_store_msg_ok());
129 Ok(MessageBatch {
130 final_message,
131 messages: message_list,
132 })
133 }
134}
135
136impl IntoIterator for MessageBatch {
137 type Item = Message;
138 type IntoIter = std::vec::IntoIter<Message>;
139
140 fn into_iter(self) -> Self::IntoIter {
141 self.messages.into_iter()
142 }
143}
144
145impl<'a> IntoIterator for &'a MessageBatch {
146 type Item = &'a Message;
147 type IntoIter = std::slice::Iter<'a, Message>;
148
149 fn into_iter(self) -> Self::IntoIter {
150 self.messages.iter()
151 }
152}
153
154impl fmt::Display for MessageBatch {
155 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
156 let messages_str = self
157 .messages
158 .iter()
159 .map(|msg| msg.to_string())
160 .collect::<Vec<_>>()
161 .join(", ");
162
163 write!(
164 f,
165 "MessageBatch {{ final_message: {}, messages: {} }}",
166 self.final_message, messages_str
167 )
168 }
169}
170
171#[allow(unused_variables)]
172impl MessageTrait for MessageBatch {
173 #[inline]
174 fn put_property(&mut self, key: CheetahString, value: CheetahString) {
175 self.final_message.properties_mut().as_map_mut().insert(key, value);
176 }
177
178 #[inline]
179 fn clear_property(&mut self, name: &str) {
180 self.final_message.properties_mut().as_map_mut().remove(name);
181 }
182
183 #[inline]
184 fn property(&self, name: &CheetahString) -> Option<CheetahString> {
185 self.final_message.properties().as_map().get(name).cloned()
186 }
187
188 fn property_ref(&self, name: &CheetahString) -> Option<&CheetahString> {
189 self.final_message.properties().as_map().get(name)
190 }
191
192 #[inline]
193 fn topic(&self) -> &CheetahString {
194 self.final_message.topic()
195 }
196
197 #[inline]
198 fn set_topic(&mut self, topic: CheetahString) {
199 self.final_message.set_topic(topic);
200 }
201
202 #[inline]
203 fn get_flag(&self) -> i32 {
204 self.final_message.flag()
205 }
206
207 #[inline]
208 fn set_flag(&mut self, flag: i32) {
209 self.final_message.set_flag(flag);
210 }
211
212 #[inline]
213 fn get_body(&self) -> Option<&Bytes> {
214 self.final_message.get_body()
215 }
216
217 #[inline]
218 fn set_body(&mut self, body: Bytes) {
219 self.final_message.set_body(Some(body));
220 }
221
222 #[inline]
223 fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
224 self.final_message.properties().as_map()
225 }
226
227 #[inline]
228 fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
229 *self.final_message.properties_mut() = MessageProperties::from_map(properties);
230 }
231
232 #[inline]
233 fn transaction_id(&self) -> Option<&CheetahString> {
234 MessageTrait::transaction_id(&self.final_message)
235 }
236
237 #[inline]
238 fn set_transaction_id(&mut self, transaction_id: CheetahString) {
239 *self.final_message.transaction_id_mut() = Some(transaction_id);
240 }
241
242 #[inline]
243 fn get_compressed_body_mut(&mut self) -> Option<&mut Bytes> {
244 self.final_message.body_mut().compressed_mut().as_mut()
245 }
246
247 #[inline]
248 fn get_compressed_body(&self) -> Option<&Bytes> {
249 self.final_message.compressed_body()
250 }
251
252 #[inline]
253 fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
254 self.final_message.set_compressed_body_mut(compressed_body);
255 }
256
257 #[inline]
258 fn take_body(&mut self) -> Option<Bytes> {
259 self.final_message.take_body()
260 }
261
262 fn as_any(&self) -> &dyn Any {
263 self
264 }
265
266 fn as_any_mut(&mut self) -> &mut dyn Any {
267 self
268 }
269}
270
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use cheetah_string::CheetahString;

    use super::*;

    /// Builds a message on `topic` with a fixed body.
    fn create_test_message(topic: &str) -> Message {
        let mut msg = Message::default();
        msg.set_topic(CheetahString::from_string(topic.to_owned()));
        msg.set_body(Some(Bytes::from_static(b"test body")));
        msg
    }

    /// Builds `count` identical messages on "topic1".
    fn topic1_messages(count: usize) -> Vec<Message> {
        (0..count).map(|_| create_test_message("topic1")).collect()
    }

    /// Builds a valid batch of `count` messages on "topic1".
    fn topic1_batch(count: usize) -> MessageBatch {
        MessageBatch::generate_from_vec(topic1_messages(count)).unwrap()
    }

    /// Requires `generate_from_vec` to reject `messages` and returns the
    /// rendered error text.
    fn rejection_text(messages: Vec<Message>) -> String {
        match MessageBatch::generate_from_vec(messages) {
            Ok(batch) => panic!("expected the batch to be rejected, got {:?}", batch),
            Err(err) => err.to_string(),
        }
    }

    #[test]
    fn test_generate_from_vec_ok() {
        let outcome = MessageBatch::generate_from_vec(topic1_messages(2));
        assert!(outcome.is_ok());

        let batch = outcome.unwrap();
        assert_eq!(batch.messages.len(), 2);
        assert_eq!(batch.len(), 2);
        assert!(!batch.is_empty());
    }

    #[test]
    fn test_generate_from_vec_empty() {
        let text = rejection_text(Vec::<Message>::new());
        assert!(text.contains("empty"));
    }

    #[test]
    fn test_generate_from_vec_different_topic() {
        let mixed = vec![create_test_message("topic1"), create_test_message("topic2")];
        let text = rejection_text(mixed);
        assert!(text.contains("topic"));
    }

    #[test]
    fn test_generate_from_vec_delay_message() {
        let plain = create_test_message("topic1");
        let mut delayed = create_test_message("topic1");
        delayed.set_delay_time_level(1);

        let text = rejection_text(vec![plain, delayed]);
        assert!(text.contains("TimeDelayLevel"));
    }

    #[test]
    fn test_generate_from_vec_retry_group() {
        let with_retry = vec![
            create_test_message("topic1"),
            create_test_message("%RETRY%topic1"),
        ];
        let text = rejection_text(with_retry);
        assert!(text.contains("Retry"));
    }

    #[test]
    fn test_generate_from_vec_different_wait_store_msg_ok() {
        let mut waits = create_test_message("topic1");
        let mut skips = create_test_message("topic1");
        waits.set_wait_store_msg_ok(true);
        skips.set_wait_store_msg_ok(false);

        let text = rejection_text(vec![waits, skips]);
        assert!(text.contains("waitStoreMsgOK"));
    }

    #[test]
    fn test_encode() {
        let encoded = topic1_batch(2).encode();
        assert!(!encoded.is_empty());
    }

    #[test]
    fn test_iterator_slice() {
        let batch = topic1_batch(3);
        assert_eq!(batch.iter().count(), 3);
    }

    #[test]
    fn test_into_iterator() {
        let batch = topic1_batch(3);

        let mut seen = 0;
        for _message in batch {
            seen += 1;
        }
        assert_eq!(seen, 3);
    }

    #[test]
    fn test_into_iterator_ref() {
        let batch = topic1_batch(3);

        let mut seen = 0;
        for _message in &batch {
            seen += 1;
        }
        assert_eq!(seen, 3);

        // Borrowing iteration must leave the batch usable afterwards.
        assert_eq!(batch.len(), 3);
    }

    #[test]
    fn test_batch_properties_inherited() {
        let mut template = create_test_message("topic1");
        template.set_wait_store_msg_ok(true);

        let pair = vec![template.clone(), template];
        let batch = MessageBatch::generate_from_vec(pair).unwrap();

        assert_eq!(batch.final_message.topic().as_str(), "topic1");
        assert!(batch.final_message.is_wait_store_msg_ok());
    }

    #[test]
    fn test_display_format() {
        let rendered = topic1_batch(2).to_string();
        assert!(rendered.contains("MessageBatch"));
        assert!(rendered.contains("topic1"));
    }
}
428
429#[derive(Debug, Default)]
430pub struct MessageExtBatch {
431 pub message_ext_broker_inner: MessageExtBrokerInner,
432 pub is_inner_batch: bool,
433 pub encoded_buff: Option<bytes::BytesMut>,
434}
435
436impl MessageExtBatch {
437 pub fn wrap(&self) -> Option<Bytes> {
438 self.message_ext_broker_inner.body()
439 }
440
441 pub fn get_tags(&self) -> Option<CheetahString> {
442 self.message_ext_broker_inner.get_tags()
443 }
444}