rocketmq_common/common/message/
message_batch.rs

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17use std::any::Any;
18use std::collections::HashMap;
19use std::fmt;
20use std::fmt::Debug;
21use std::fmt::Display;
22use std::fmt::Formatter;
23use std::path::Iter;
24
25use bytes::Bytes;
26use cheetah_string::CheetahString;
27use rocketmq_error::RocketmqError::UnsupportedOperationException;
28
29use crate::common::message::message_decoder;
30use crate::common::message::message_ext_broker_inner::MessageExtBrokerInner;
31use crate::common::message::message_single::Message;
32use crate::common::message::MessageTrait;
33use crate::common::mix_all;
34
35#[derive(Clone, Default, Debug)]
36pub struct MessageBatch {
37    ///`final_message` stores the batch-encoded messages.
38    pub final_message: Message,
39
40    ///`messages` stores the batch of initialized messages.
41    pub messages: Option<Vec<Message>>,
42}
43
44impl Iterator for MessageBatch {
45    type Item = Message;
46
47    fn next(&mut self) -> Option<Self::Item> {
48        match &self.messages {
49            Some(messages) => {
50                if let Some(message) = messages.iter().next() {
51                    return Some(message.clone());
52                }
53                None
54            }
55            None => None,
56        }
57    }
58}
59
60impl MessageBatch {
61    #[inline]
62    pub fn encode(&self) -> Bytes {
63        message_decoder::encode_messages(self.messages.as_ref().unwrap())
64    }
65
66    pub fn generate_from_vec(
67        messages: Vec<Message>,
68    ) -> rocketmq_error::RocketMQResult<MessageBatch> {
69        if messages.is_empty() {
70            return Err(UnsupportedOperationException(
71                "MessageBatch::generate_from_vec: messages is empty".to_string(),
72            ));
73        }
74        let mut first: Option<&Message> = None;
75        for message in &messages {
76            if message.get_delay_time_level() > 0 {
77                return Err(
78                    rocketmq_error::RocketmqError::UnsupportedOperationException(
79                        "TimeDelayLevel is not supported for batching".to_string(),
80                    ),
81                );
82            }
83            if message
84                .get_topic()
85                .starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX)
86            {
87                return Err(UnsupportedOperationException(
88                    "Retry group topic is not supported for batching".to_string(),
89                ));
90            }
91
92            if let Some(first_message) = first {
93                let first_message = first.unwrap();
94                if first_message.get_topic() != message.get_topic() {
95                    return Err(UnsupportedOperationException(
96                        "The topic of the messages in one batch should be the same".to_string(),
97                    ));
98                }
99                if first_message.is_wait_store_msg_ok() != message.is_wait_store_msg_ok() {
100                    return Err(UnsupportedOperationException(
101                        "The waitStoreMsgOK of the messages in one batch should the same"
102                            .to_string(),
103                    ));
104                }
105            } else {
106                first = Some(message);
107            }
108        }
109        let first = first.unwrap();
110        let mut final_message = Message {
111            topic: first.topic.clone(),
112            ..Message::default()
113        };
114        final_message.set_wait_store_msg_ok(first.is_wait_store_msg_ok());
115        Ok(MessageBatch {
116            final_message,
117            messages: Some(messages),
118        })
119    }
120}
121
122impl fmt::Display for MessageBatch {
123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124        let messages_str = match &self.messages {
125            Some(messages) => messages
126                .iter()
127                .map(|msg| msg.to_string())
128                .collect::<Vec<_>>()
129                .join(", "),
130            None => "None".to_string(),
131        };
132
133        write!(
134            f,
135            "MessageBatch {{ final_message: {}, messages: {} }}",
136            self.final_message, messages_str
137        )
138    }
139}
140
141#[allow(unused_variables)]
142impl MessageTrait for MessageBatch {
143    #[inline]
144    fn put_property(&mut self, key: CheetahString, value: CheetahString) {
145        self.final_message.properties.insert(key, value);
146    }
147
148    #[inline]
149    fn clear_property(&mut self, name: &str) {
150        self.final_message.properties.remove(name);
151    }
152
153    #[inline]
154    fn get_property(&self, name: &CheetahString) -> Option<CheetahString> {
155        self.final_message.properties.get(name).cloned()
156    }
157
158    #[inline]
159    fn get_topic(&self) -> &CheetahString {
160        &self.final_message.topic
161    }
162
163    #[inline]
164    fn set_topic(&mut self, topic: CheetahString) {
165        self.final_message.topic = topic;
166    }
167
168    #[inline]
169    fn get_flag(&self) -> i32 {
170        self.final_message.flag
171    }
172
173    #[inline]
174    fn set_flag(&mut self, flag: i32) {
175        self.final_message.flag = flag;
176    }
177
178    #[inline]
179    fn get_body(&self) -> Option<&Bytes> {
180        self.final_message.body.as_ref()
181    }
182
183    #[inline]
184    fn set_body(&mut self, body: Bytes) {
185        self.final_message.body = Some(body);
186    }
187
188    #[inline]
189    fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
190        &self.final_message.properties
191    }
192
193    #[inline]
194    fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
195        self.final_message.properties = properties;
196    }
197
198    #[inline]
199    fn get_transaction_id(&self) -> Option<&CheetahString> {
200        self.final_message.transaction_id.as_ref()
201    }
202
203    #[inline]
204    fn set_transaction_id(&mut self, transaction_id: CheetahString) {
205        self.final_message.transaction_id = Some(transaction_id);
206    }
207
208    #[inline]
209    fn get_compressed_body_mut(&mut self) -> &mut Option<Bytes> {
210        &mut self.final_message.compressed_body
211    }
212
213    #[inline]
214    fn get_compressed_body(&self) -> Option<&Bytes> {
215        self.final_message.compressed_body.as_ref()
216    }
217
218    #[inline]
219    fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
220        self.final_message.compressed_body = Some(compressed_body);
221    }
222
223    #[inline]
224    fn take_body(&mut self) -> Option<Bytes> {
225        self.final_message.take_body()
226    }
227
228    fn as_any(&self) -> &dyn Any {
229        self
230    }
231
232    fn as_any_mut(&mut self) -> &mut dyn Any {
233        self
234    }
235}
236
237#[derive(Debug, Default)]
238pub struct MessageExtBatch {
239    pub message_ext_broker_inner: MessageExtBrokerInner,
240    pub is_inner_batch: bool,
241    pub encoded_buff: Option<bytes::BytesMut>,
242}
243
244impl MessageExtBatch {
245    pub fn wrap(&self) -> Option<Bytes> {
246        self.message_ext_broker_inner.body()
247    }
248
249    pub fn get_tags(&self) -> Option<CheetahString> {
250        self.message_ext_broker_inner.get_tags()
251    }
252}