Skip to main content

rocketmq_common/common/message/
message_batch.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::fmt;
18use std::fmt::Debug;
19use std::fmt::Display;
20use std::fmt::Formatter;
21
22use bytes::Bytes;
23use cheetah_string::CheetahString;
24// Use new unified error system
25use rocketmq_error::RocketMQError;
26
27use crate::common::message::message_decoder;
28use crate::common::message::message_ext_broker_inner::MessageExtBrokerInner;
29use crate::common::message::message_property::MessageProperties;
30use crate::common::message::message_single::Message;
31use crate::common::message::MessageTrait;
32use crate::common::mix_all;
33
34#[derive(Clone, Default, Debug)]
35pub struct MessageBatch {
36    ///`final_message` stores the batch-encoded messages.
37    pub final_message: Message,
38
39    ///`messages` stores the batch of initialized messages.
40    pub messages: Vec<Message>,
41}
42
43impl MessageBatch {
44    /// Encode all messages in the batch.
45    #[inline]
46    pub fn encode(&self) -> Bytes {
47        message_decoder::encode_messages(&self.messages)
48    }
49
50    /// Get an iterator over the messages in the batch.
51    #[inline]
52    pub fn iter(&self) -> std::slice::Iter<'_, Message> {
53        self.messages.iter()
54    }
55
56    /// Get the number of messages in the batch.
57    #[inline]
58    pub fn len(&self) -> usize {
59        self.messages.len()
60    }
61
62    /// Check if the batch is empty.
63    #[inline]
64    pub fn is_empty(&self) -> bool {
65        self.messages.is_empty()
66    }
67
68    pub fn generate_from_vec<M>(messages: Vec<M>) -> rocketmq_error::RocketMQResult<MessageBatch>
69    where
70        M: MessageTrait,
71    {
72        if messages.is_empty() {
73            return Err(RocketMQError::illegal_argument(
74                "MessageBatch::generate_from_vec: messages is empty",
75            ));
76        }
77
78        let mut message_list = Vec::with_capacity(messages.len());
79        for msg in messages {
80            if let Some(m) = msg.as_any().downcast_ref::<Message>() {
81                message_list.push(m.clone());
82            } else {
83                let mut m = Message::default();
84                m.set_topic(msg.topic().clone());
85                if let Some(body) = msg.get_body() {
86                    m.set_body(Some(body.clone()));
87                }
88                m.set_flag(msg.get_flag());
89                if let Some(transaction_id) = msg.transaction_id() {
90                    m.set_transaction_id(transaction_id.clone());
91                }
92                m.set_properties(msg.get_properties().clone());
93                message_list.push(m);
94            }
95        }
96
97        let mut first: Option<&Message> = None;
98        for message in &message_list {
99            if message.delay_time_level() > 0 {
100                return Err(RocketMQError::illegal_argument(
101                    "TimeDelayLevel is not supported for batching",
102                ));
103            }
104            if message.topic().starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX) {
105                return Err(RocketMQError::illegal_argument(
106                    "Retry group topic is not supported for batching",
107                ));
108            }
109
110            if let Some(first_message) = first {
111                if first_message.topic() != message.topic() {
112                    return Err(RocketMQError::illegal_argument(
113                        "The topic of the messages in one batch should be the same",
114                    ));
115                }
116                if first_message.is_wait_store_msg_ok() != message.is_wait_store_msg_ok() {
117                    return Err(RocketMQError::illegal_argument(
118                        "The waitStoreMsgOK of the messages in one batch should the same",
119                    ));
120                }
121            } else {
122                first = Some(message);
123            }
124        }
125        let first = first.unwrap();
126        let mut final_message = Message::default();
127        final_message.set_topic(first.topic().clone());
128        final_message.set_wait_store_msg_ok(first.is_wait_store_msg_ok());
129        Ok(MessageBatch {
130            final_message,
131            messages: message_list,
132        })
133    }
134}
135
136impl IntoIterator for MessageBatch {
137    type Item = Message;
138    type IntoIter = std::vec::IntoIter<Message>;
139
140    fn into_iter(self) -> Self::IntoIter {
141        self.messages.into_iter()
142    }
143}
144
145impl<'a> IntoIterator for &'a MessageBatch {
146    type Item = &'a Message;
147    type IntoIter = std::slice::Iter<'a, Message>;
148
149    fn into_iter(self) -> Self::IntoIter {
150        self.messages.iter()
151    }
152}
153
154impl fmt::Display for MessageBatch {
155    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
156        let messages_str = self
157            .messages
158            .iter()
159            .map(|msg| msg.to_string())
160            .collect::<Vec<_>>()
161            .join(", ");
162
163        write!(
164            f,
165            "MessageBatch {{ final_message: {}, messages: {} }}",
166            self.final_message, messages_str
167        )
168    }
169}
170
171#[allow(unused_variables)]
172impl MessageTrait for MessageBatch {
173    #[inline]
174    fn put_property(&mut self, key: CheetahString, value: CheetahString) {
175        self.final_message.properties_mut().as_map_mut().insert(key, value);
176    }
177
178    #[inline]
179    fn clear_property(&mut self, name: &str) {
180        self.final_message.properties_mut().as_map_mut().remove(name);
181    }
182
183    #[inline]
184    fn property(&self, name: &CheetahString) -> Option<CheetahString> {
185        self.final_message.properties().as_map().get(name).cloned()
186    }
187
188    fn property_ref(&self, name: &CheetahString) -> Option<&CheetahString> {
189        self.final_message.properties().as_map().get(name)
190    }
191
192    #[inline]
193    fn topic(&self) -> &CheetahString {
194        self.final_message.topic()
195    }
196
197    #[inline]
198    fn set_topic(&mut self, topic: CheetahString) {
199        self.final_message.set_topic(topic);
200    }
201
202    #[inline]
203    fn get_flag(&self) -> i32 {
204        self.final_message.flag()
205    }
206
207    #[inline]
208    fn set_flag(&mut self, flag: i32) {
209        self.final_message.set_flag(flag);
210    }
211
212    #[inline]
213    fn get_body(&self) -> Option<&Bytes> {
214        self.final_message.get_body()
215    }
216
217    #[inline]
218    fn set_body(&mut self, body: Bytes) {
219        self.final_message.set_body(Some(body));
220    }
221
222    #[inline]
223    fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
224        self.final_message.properties().as_map()
225    }
226
227    #[inline]
228    fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
229        *self.final_message.properties_mut() = MessageProperties::from_map(properties);
230    }
231
232    #[inline]
233    fn transaction_id(&self) -> Option<&CheetahString> {
234        MessageTrait::transaction_id(&self.final_message)
235    }
236
237    #[inline]
238    fn set_transaction_id(&mut self, transaction_id: CheetahString) {
239        *self.final_message.transaction_id_mut() = Some(transaction_id);
240    }
241
242    #[inline]
243    fn get_compressed_body_mut(&mut self) -> Option<&mut Bytes> {
244        self.final_message.body_mut().compressed_mut().as_mut()
245    }
246
247    #[inline]
248    fn get_compressed_body(&self) -> Option<&Bytes> {
249        self.final_message.compressed_body()
250    }
251
252    #[inline]
253    fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
254        self.final_message.set_compressed_body_mut(compressed_body);
255    }
256
257    #[inline]
258    fn take_body(&mut self) -> Option<Bytes> {
259        self.final_message.take_body()
260    }
261
262    fn as_any(&self) -> &dyn Any {
263        self
264    }
265
266    fn as_any_mut(&mut self) -> &mut dyn Any {
267        self
268    }
269}
270
271#[cfg(test)]
272mod tests {
273    use bytes::Bytes;
274    use cheetah_string::CheetahString;
275
276    use super::*;
277
278    fn create_test_message(topic: &str) -> Message {
279        let mut msg = Message::default();
280        msg.set_topic(CheetahString::from_string(topic.to_string()));
281        msg.set_body(Some(Bytes::from_static(b"test body")));
282        msg
283    }
284
285    #[test]
286    fn test_generate_from_vec_ok() {
287        let messages = vec![create_test_message("topic1"), create_test_message("topic1")];
288        let result = MessageBatch::generate_from_vec(messages);
289        assert!(result.is_ok());
290        let batch = result.unwrap();
291        assert_eq!(batch.messages.len(), 2);
292        assert_eq!(batch.len(), 2);
293        assert!(!batch.is_empty());
294    }
295
296    #[test]
297    fn test_generate_from_vec_empty() {
298        let messages: Vec<Message> = vec![];
299        let result = MessageBatch::generate_from_vec(messages);
300        assert!(result.is_err());
301        let err = result.unwrap_err();
302        assert!(err.to_string().contains("empty"));
303    }
304
305    #[test]
306    fn test_generate_from_vec_different_topic() {
307        let messages = vec![
308            create_test_message("topic1"),
309            create_test_message("topic2"), // Different topic
310        ];
311        let result = MessageBatch::generate_from_vec(messages);
312        assert!(result.is_err());
313        let err = result.unwrap_err();
314        assert!(err.to_string().contains("topic"));
315    }
316
317    #[test]
318    fn test_generate_from_vec_delay_message() {
319        let msg1 = create_test_message("topic1");
320        let mut msg2 = create_test_message("topic1");
321        msg2.set_delay_time_level(1); // Set delay level
322
323        let messages = vec![msg1, msg2];
324        let result = MessageBatch::generate_from_vec(messages);
325        assert!(result.is_err());
326        let err = result.unwrap_err();
327        assert!(err.to_string().contains("TimeDelayLevel"));
328    }
329
330    #[test]
331    fn test_generate_from_vec_retry_group() {
332        let messages = vec![
333            create_test_message("topic1"),
334            create_test_message("%RETRY%topic1"), // Retry group
335        ];
336        let result = MessageBatch::generate_from_vec(messages);
337        assert!(result.is_err());
338        let err = result.unwrap_err();
339        assert!(err.to_string().contains("Retry"));
340    }
341
342    #[test]
343    fn test_generate_from_vec_different_wait_store_msg_ok() {
344        let mut msg1 = create_test_message("topic1");
345        let mut msg2 = create_test_message("topic1");
346        msg1.set_wait_store_msg_ok(true);
347        msg2.set_wait_store_msg_ok(false); // Different waitStoreMsgOK
348
349        let messages = vec![msg1, msg2];
350        let result = MessageBatch::generate_from_vec(messages);
351        assert!(result.is_err());
352        let err = result.unwrap_err();
353        assert!(err.to_string().contains("waitStoreMsgOK"));
354    }
355
356    #[test]
357    fn test_encode() {
358        let messages = vec![create_test_message("topic1"), create_test_message("topic1")];
359        let batch = MessageBatch::generate_from_vec(messages).unwrap();
360        let encoded = batch.encode();
361        assert!(!encoded.is_empty());
362    }
363
364    #[test]
365    fn test_iterator_slice() {
366        let messages = vec![
367            create_test_message("topic1"),
368            create_test_message("topic1"),
369            create_test_message("topic1"),
370        ];
371        let batch = MessageBatch::generate_from_vec(messages).unwrap();
372
373        let count = batch.iter().count();
374        assert_eq!(count, 3);
375    }
376
377    #[test]
378    fn test_into_iterator() {
379        let messages = vec![
380            create_test_message("topic1"),
381            create_test_message("topic1"),
382            create_test_message("topic1"),
383        ];
384        let batch = MessageBatch::generate_from_vec(messages).unwrap();
385
386        let count = batch.into_iter().count();
387        assert_eq!(count, 3);
388    }
389
390    #[test]
391    fn test_into_iterator_ref() {
392        let messages = vec![
393            create_test_message("topic1"),
394            create_test_message("topic1"),
395            create_test_message("topic1"),
396        ];
397        let batch = MessageBatch::generate_from_vec(messages).unwrap();
398
399        let count = (&batch).into_iter().count();
400        assert_eq!(count, 3);
401
402        // Can still use batch after borrowing
403        assert_eq!(batch.len(), 3);
404    }
405
406    #[test]
407    fn test_batch_properties_inherited() {
408        let mut msg1 = create_test_message("topic1");
409        msg1.set_wait_store_msg_ok(true);
410
411        let messages = vec![msg1.clone(), msg1];
412        let batch = MessageBatch::generate_from_vec(messages).unwrap();
413
414        assert_eq!(batch.final_message.topic().as_str(), "topic1");
415        assert!(batch.final_message.is_wait_store_msg_ok());
416    }
417
418    #[test]
419    fn test_display_format() {
420        let messages = vec![create_test_message("topic1"), create_test_message("topic1")];
421        let batch = MessageBatch::generate_from_vec(messages).unwrap();
422
423        let display = format!("{}", batch);
424        assert!(display.contains("MessageBatch"));
425        assert!(display.contains("topic1"));
426    }
427}
428
429#[derive(Debug, Default)]
430pub struct MessageExtBatch {
431    pub message_ext_broker_inner: MessageExtBrokerInner,
432    pub is_inner_batch: bool,
433    pub encoded_buff: Option<bytes::BytesMut>,
434}
435
436impl MessageExtBatch {
437    pub fn wrap(&self) -> Option<Bytes> {
438        self.message_ext_broker_inner.body()
439    }
440
441    pub fn get_tags(&self) -> Option<CheetahString> {
442        self.message_ext_broker_inner.get_tags()
443    }
444}