rocketmq_common/common/message/
message_batch.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::any::Any;
18use std::collections::HashMap;
19use std::fmt;
20use std::fmt::Debug;
21use std::fmt::Display;
22use std::fmt::Formatter;
23use std::path::Iter;
24
25use bytes::Bytes;
26use cheetah_string::CheetahString;
27// Use new unified error system
28use rocketmq_error::RocketMQError;
29
30use crate::common::message::message_decoder;
31use crate::common::message::message_ext_broker_inner::MessageExtBrokerInner;
32use crate::common::message::message_single::Message;
33use crate::common::message::MessageTrait;
34use crate::common::mix_all;
35
36#[derive(Clone, Default, Debug)]
37pub struct MessageBatch {
38    ///`final_message` stores the batch-encoded messages.
39    pub final_message: Message,
40
41    ///`messages` stores the batch of initialized messages.
42    pub messages: Option<Vec<Message>>,
43}
44
45impl Iterator for MessageBatch {
46    type Item = Message;
47
48    fn next(&mut self) -> Option<Self::Item> {
49        match &self.messages {
50            Some(messages) => {
51                if let Some(message) = messages.iter().next() {
52                    return Some(message.clone());
53                }
54                None
55            }
56            None => None,
57        }
58    }
59}
60
61impl MessageBatch {
62    #[inline]
63    pub fn encode(&self) -> Bytes {
64        message_decoder::encode_messages(self.messages.as_ref().unwrap())
65    }
66
67    pub fn generate_from_vec(
68        messages: Vec<Message>,
69    ) -> rocketmq_error::RocketMQResult<MessageBatch> {
70        if messages.is_empty() {
71            return Err(RocketMQError::illegal_argument(
72                "MessageBatch::generate_from_vec: messages is empty",
73            ));
74        }
75        let mut first: Option<&Message> = None;
76        for message in &messages {
77            if message.get_delay_time_level() > 0 {
78                return Err(RocketMQError::illegal_argument(
79                    "TimeDelayLevel is not supported for batching",
80                ));
81            }
82            if message
83                .get_topic()
84                .starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX)
85            {
86                return Err(RocketMQError::illegal_argument(
87                    "Retry group topic is not supported for batching",
88                ));
89            }
90
91            if let Some(first_message) = first {
92                let first_message = first.unwrap();
93                if first_message.get_topic() != message.get_topic() {
94                    return Err(RocketMQError::illegal_argument(
95                        "The topic of the messages in one batch should be the same",
96                    ));
97                }
98                if first_message.is_wait_store_msg_ok() != message.is_wait_store_msg_ok() {
99                    return Err(RocketMQError::illegal_argument(
100                        "The waitStoreMsgOK of the messages in one batch should the same",
101                    ));
102                }
103            } else {
104                first = Some(message);
105            }
106        }
107        let first = first.unwrap();
108        let mut final_message = Message {
109            topic: first.topic.clone(),
110            ..Message::default()
111        };
112        final_message.set_wait_store_msg_ok(first.is_wait_store_msg_ok());
113        Ok(MessageBatch {
114            final_message,
115            messages: Some(messages),
116        })
117    }
118}
119
120impl fmt::Display for MessageBatch {
121    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122        let messages_str = match &self.messages {
123            Some(messages) => messages
124                .iter()
125                .map(|msg| msg.to_string())
126                .collect::<Vec<_>>()
127                .join(", "),
128            None => "None".to_string(),
129        };
130
131        write!(
132            f,
133            "MessageBatch {{ final_message: {}, messages: {} }}",
134            self.final_message, messages_str
135        )
136    }
137}
138
139#[allow(unused_variables)]
140impl MessageTrait for MessageBatch {
141    #[inline]
142    fn put_property(&mut self, key: CheetahString, value: CheetahString) {
143        self.final_message.properties.insert(key, value);
144    }
145
146    #[inline]
147    fn clear_property(&mut self, name: &str) {
148        self.final_message.properties.remove(name);
149    }
150
151    #[inline]
152    fn get_property(&self, name: &CheetahString) -> Option<CheetahString> {
153        self.final_message.properties.get(name).cloned()
154    }
155
156    #[inline]
157    fn get_topic(&self) -> &CheetahString {
158        &self.final_message.topic
159    }
160
161    #[inline]
162    fn set_topic(&mut self, topic: CheetahString) {
163        self.final_message.topic = topic;
164    }
165
166    #[inline]
167    fn get_flag(&self) -> i32 {
168        self.final_message.flag
169    }
170
171    #[inline]
172    fn set_flag(&mut self, flag: i32) {
173        self.final_message.flag = flag;
174    }
175
176    #[inline]
177    fn get_body(&self) -> Option<&Bytes> {
178        self.final_message.body.as_ref()
179    }
180
181    #[inline]
182    fn set_body(&mut self, body: Bytes) {
183        self.final_message.body = Some(body);
184    }
185
186    #[inline]
187    fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
188        &self.final_message.properties
189    }
190
191    #[inline]
192    fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
193        self.final_message.properties = properties;
194    }
195
196    #[inline]
197    fn get_transaction_id(&self) -> Option<&CheetahString> {
198        self.final_message.transaction_id.as_ref()
199    }
200
201    #[inline]
202    fn set_transaction_id(&mut self, transaction_id: CheetahString) {
203        self.final_message.transaction_id = Some(transaction_id);
204    }
205
206    #[inline]
207    fn get_compressed_body_mut(&mut self) -> &mut Option<Bytes> {
208        &mut self.final_message.compressed_body
209    }
210
211    #[inline]
212    fn get_compressed_body(&self) -> Option<&Bytes> {
213        self.final_message.compressed_body.as_ref()
214    }
215
216    #[inline]
217    fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
218        self.final_message.compressed_body = Some(compressed_body);
219    }
220
221    #[inline]
222    fn take_body(&mut self) -> Option<Bytes> {
223        self.final_message.take_body()
224    }
225
226    fn as_any(&self) -> &dyn Any {
227        self
228    }
229
230    fn as_any_mut(&mut self) -> &mut dyn Any {
231        self
232    }
233}
234
235#[derive(Debug, Default)]
236pub struct MessageExtBatch {
237    pub message_ext_broker_inner: MessageExtBrokerInner,
238    pub is_inner_batch: bool,
239    pub encoded_buff: Option<bytes::BytesMut>,
240}
241
242impl MessageExtBatch {
243    pub fn wrap(&self) -> Option<Bytes> {
244        self.message_ext_broker_inner.body()
245    }
246
247    pub fn get_tags(&self) -> Option<CheetahString> {
248        self.message_ext_broker_inner.get_tags()
249    }
250}