rocketmq_common/common/message/
message_batch.rs1use std::any::Any;
18use std::collections::HashMap;
19use std::fmt;
20use std::fmt::Debug;
21use std::fmt::Display;
22use std::fmt::Formatter;
23use std::path::Iter;
24
25use bytes::Bytes;
26use cheetah_string::CheetahString;
27use rocketmq_error::RocketMQError;
29
30use crate::common::message::message_decoder;
31use crate::common::message::message_ext_broker_inner::MessageExtBrokerInner;
32use crate::common::message::message_single::Message;
33use crate::common::message::MessageTrait;
34use crate::common::mix_all;
35
36#[derive(Clone, Default, Debug)]
37pub struct MessageBatch {
38 pub final_message: Message,
40
41 pub messages: Option<Vec<Message>>,
43}
44
45impl Iterator for MessageBatch {
46 type Item = Message;
47
48 fn next(&mut self) -> Option<Self::Item> {
49 match &self.messages {
50 Some(messages) => {
51 if let Some(message) = messages.iter().next() {
52 return Some(message.clone());
53 }
54 None
55 }
56 None => None,
57 }
58 }
59}
60
61impl MessageBatch {
62 #[inline]
63 pub fn encode(&self) -> Bytes {
64 message_decoder::encode_messages(self.messages.as_ref().unwrap())
65 }
66
67 pub fn generate_from_vec(
68 messages: Vec<Message>,
69 ) -> rocketmq_error::RocketMQResult<MessageBatch> {
70 if messages.is_empty() {
71 return Err(RocketMQError::illegal_argument(
72 "MessageBatch::generate_from_vec: messages is empty",
73 ));
74 }
75 let mut first: Option<&Message> = None;
76 for message in &messages {
77 if message.get_delay_time_level() > 0 {
78 return Err(RocketMQError::illegal_argument(
79 "TimeDelayLevel is not supported for batching",
80 ));
81 }
82 if message
83 .get_topic()
84 .starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX)
85 {
86 return Err(RocketMQError::illegal_argument(
87 "Retry group topic is not supported for batching",
88 ));
89 }
90
91 if let Some(first_message) = first {
92 let first_message = first.unwrap();
93 if first_message.get_topic() != message.get_topic() {
94 return Err(RocketMQError::illegal_argument(
95 "The topic of the messages in one batch should be the same",
96 ));
97 }
98 if first_message.is_wait_store_msg_ok() != message.is_wait_store_msg_ok() {
99 return Err(RocketMQError::illegal_argument(
100 "The waitStoreMsgOK of the messages in one batch should the same",
101 ));
102 }
103 } else {
104 first = Some(message);
105 }
106 }
107 let first = first.unwrap();
108 let mut final_message = Message {
109 topic: first.topic.clone(),
110 ..Message::default()
111 };
112 final_message.set_wait_store_msg_ok(first.is_wait_store_msg_ok());
113 Ok(MessageBatch {
114 final_message,
115 messages: Some(messages),
116 })
117 }
118}
119
120impl fmt::Display for MessageBatch {
121 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122 let messages_str = match &self.messages {
123 Some(messages) => messages
124 .iter()
125 .map(|msg| msg.to_string())
126 .collect::<Vec<_>>()
127 .join(", "),
128 None => "None".to_string(),
129 };
130
131 write!(
132 f,
133 "MessageBatch {{ final_message: {}, messages: {} }}",
134 self.final_message, messages_str
135 )
136 }
137}
138
139#[allow(unused_variables)]
140impl MessageTrait for MessageBatch {
141 #[inline]
142 fn put_property(&mut self, key: CheetahString, value: CheetahString) {
143 self.final_message.properties.insert(key, value);
144 }
145
146 #[inline]
147 fn clear_property(&mut self, name: &str) {
148 self.final_message.properties.remove(name);
149 }
150
151 #[inline]
152 fn get_property(&self, name: &CheetahString) -> Option<CheetahString> {
153 self.final_message.properties.get(name).cloned()
154 }
155
156 #[inline]
157 fn get_topic(&self) -> &CheetahString {
158 &self.final_message.topic
159 }
160
161 #[inline]
162 fn set_topic(&mut self, topic: CheetahString) {
163 self.final_message.topic = topic;
164 }
165
166 #[inline]
167 fn get_flag(&self) -> i32 {
168 self.final_message.flag
169 }
170
171 #[inline]
172 fn set_flag(&mut self, flag: i32) {
173 self.final_message.flag = flag;
174 }
175
176 #[inline]
177 fn get_body(&self) -> Option<&Bytes> {
178 self.final_message.body.as_ref()
179 }
180
181 #[inline]
182 fn set_body(&mut self, body: Bytes) {
183 self.final_message.body = Some(body);
184 }
185
186 #[inline]
187 fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
188 &self.final_message.properties
189 }
190
191 #[inline]
192 fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
193 self.final_message.properties = properties;
194 }
195
196 #[inline]
197 fn get_transaction_id(&self) -> Option<&CheetahString> {
198 self.final_message.transaction_id.as_ref()
199 }
200
201 #[inline]
202 fn set_transaction_id(&mut self, transaction_id: CheetahString) {
203 self.final_message.transaction_id = Some(transaction_id);
204 }
205
206 #[inline]
207 fn get_compressed_body_mut(&mut self) -> &mut Option<Bytes> {
208 &mut self.final_message.compressed_body
209 }
210
211 #[inline]
212 fn get_compressed_body(&self) -> Option<&Bytes> {
213 self.final_message.compressed_body.as_ref()
214 }
215
216 #[inline]
217 fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
218 self.final_message.compressed_body = Some(compressed_body);
219 }
220
221 #[inline]
222 fn take_body(&mut self) -> Option<Bytes> {
223 self.final_message.take_body()
224 }
225
226 fn as_any(&self) -> &dyn Any {
227 self
228 }
229
230 fn as_any_mut(&mut self) -> &mut dyn Any {
231 self
232 }
233}
234
235#[derive(Debug, Default)]
236pub struct MessageExtBatch {
237 pub message_ext_broker_inner: MessageExtBrokerInner,
238 pub is_inner_batch: bool,
239 pub encoded_buff: Option<bytes::BytesMut>,
240}
241
242impl MessageExtBatch {
243 pub fn wrap(&self) -> Option<Bytes> {
244 self.message_ext_broker_inner.body()
245 }
246
247 pub fn get_tags(&self) -> Option<CheetahString> {
248 self.message_ext_broker_inner.get_tags()
249 }
250}