rocketmq_common/common/message/
message_batch.rs1use std::any::Any;
18use std::collections::HashMap;
19use std::fmt;
20use std::fmt::Debug;
21use std::fmt::Display;
22use std::fmt::Formatter;
23use std::path::Iter;
24
25use bytes::Bytes;
26use cheetah_string::CheetahString;
27use rocketmq_error::RocketmqError::UnsupportedOperationException;
28
29use crate::common::message::message_decoder;
30use crate::common::message::message_ext_broker_inner::MessageExtBrokerInner;
31use crate::common::message::message_single::Message;
32use crate::common::message::MessageTrait;
33use crate::common::mix_all;
34
35#[derive(Clone, Default, Debug)]
36pub struct MessageBatch {
37 pub final_message: Message,
39
40 pub messages: Option<Vec<Message>>,
42}
43
44impl Iterator for MessageBatch {
45 type Item = Message;
46
47 fn next(&mut self) -> Option<Self::Item> {
48 match &self.messages {
49 Some(messages) => {
50 if let Some(message) = messages.iter().next() {
51 return Some(message.clone());
52 }
53 None
54 }
55 None => None,
56 }
57 }
58}
59
60impl MessageBatch {
61 #[inline]
62 pub fn encode(&self) -> Bytes {
63 message_decoder::encode_messages(self.messages.as_ref().unwrap())
64 }
65
66 pub fn generate_from_vec(
67 messages: Vec<Message>,
68 ) -> rocketmq_error::RocketMQResult<MessageBatch> {
69 if messages.is_empty() {
70 return Err(UnsupportedOperationException(
71 "MessageBatch::generate_from_vec: messages is empty".to_string(),
72 ));
73 }
74 let mut first: Option<&Message> = None;
75 for message in &messages {
76 if message.get_delay_time_level() > 0 {
77 return Err(
78 rocketmq_error::RocketmqError::UnsupportedOperationException(
79 "TimeDelayLevel is not supported for batching".to_string(),
80 ),
81 );
82 }
83 if message
84 .get_topic()
85 .starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX)
86 {
87 return Err(UnsupportedOperationException(
88 "Retry group topic is not supported for batching".to_string(),
89 ));
90 }
91
92 if let Some(first_message) = first {
93 let first_message = first.unwrap();
94 if first_message.get_topic() != message.get_topic() {
95 return Err(UnsupportedOperationException(
96 "The topic of the messages in one batch should be the same".to_string(),
97 ));
98 }
99 if first_message.is_wait_store_msg_ok() != message.is_wait_store_msg_ok() {
100 return Err(UnsupportedOperationException(
101 "The waitStoreMsgOK of the messages in one batch should the same"
102 .to_string(),
103 ));
104 }
105 } else {
106 first = Some(message);
107 }
108 }
109 let first = first.unwrap();
110 let mut final_message = Message {
111 topic: first.topic.clone(),
112 ..Message::default()
113 };
114 final_message.set_wait_store_msg_ok(first.is_wait_store_msg_ok());
115 Ok(MessageBatch {
116 final_message,
117 messages: Some(messages),
118 })
119 }
120}
121
122impl fmt::Display for MessageBatch {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 let messages_str = match &self.messages {
125 Some(messages) => messages
126 .iter()
127 .map(|msg| msg.to_string())
128 .collect::<Vec<_>>()
129 .join(", "),
130 None => "None".to_string(),
131 };
132
133 write!(
134 f,
135 "MessageBatch {{ final_message: {}, messages: {} }}",
136 self.final_message, messages_str
137 )
138 }
139}
140
141#[allow(unused_variables)]
142impl MessageTrait for MessageBatch {
143 #[inline]
144 fn put_property(&mut self, key: CheetahString, value: CheetahString) {
145 self.final_message.properties.insert(key, value);
146 }
147
148 #[inline]
149 fn clear_property(&mut self, name: &str) {
150 self.final_message.properties.remove(name);
151 }
152
153 #[inline]
154 fn get_property(&self, name: &CheetahString) -> Option<CheetahString> {
155 self.final_message.properties.get(name).cloned()
156 }
157
158 #[inline]
159 fn get_topic(&self) -> &CheetahString {
160 &self.final_message.topic
161 }
162
163 #[inline]
164 fn set_topic(&mut self, topic: CheetahString) {
165 self.final_message.topic = topic;
166 }
167
168 #[inline]
169 fn get_flag(&self) -> i32 {
170 self.final_message.flag
171 }
172
173 #[inline]
174 fn set_flag(&mut self, flag: i32) {
175 self.final_message.flag = flag;
176 }
177
178 #[inline]
179 fn get_body(&self) -> Option<&Bytes> {
180 self.final_message.body.as_ref()
181 }
182
183 #[inline]
184 fn set_body(&mut self, body: Bytes) {
185 self.final_message.body = Some(body);
186 }
187
188 #[inline]
189 fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
190 &self.final_message.properties
191 }
192
193 #[inline]
194 fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
195 self.final_message.properties = properties;
196 }
197
198 #[inline]
199 fn get_transaction_id(&self) -> Option<&CheetahString> {
200 self.final_message.transaction_id.as_ref()
201 }
202
203 #[inline]
204 fn set_transaction_id(&mut self, transaction_id: CheetahString) {
205 self.final_message.transaction_id = Some(transaction_id);
206 }
207
208 #[inline]
209 fn get_compressed_body_mut(&mut self) -> &mut Option<Bytes> {
210 &mut self.final_message.compressed_body
211 }
212
213 #[inline]
214 fn get_compressed_body(&self) -> Option<&Bytes> {
215 self.final_message.compressed_body.as_ref()
216 }
217
218 #[inline]
219 fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
220 self.final_message.compressed_body = Some(compressed_body);
221 }
222
223 #[inline]
224 fn take_body(&mut self) -> Option<Bytes> {
225 self.final_message.take_body()
226 }
227
228 fn as_any(&self) -> &dyn Any {
229 self
230 }
231
232 fn as_any_mut(&mut self) -> &mut dyn Any {
233 self
234 }
235}
236
237#[derive(Debug, Default)]
238pub struct MessageExtBatch {
239 pub message_ext_broker_inner: MessageExtBrokerInner,
240 pub is_inner_batch: bool,
241 pub encoded_buff: Option<bytes::BytesMut>,
242}
243
244impl MessageExtBatch {
245 pub fn wrap(&self) -> Option<Bytes> {
246 self.message_ext_broker_inner.body()
247 }
248
249 pub fn get_tags(&self) -> Option<CheetahString> {
250 self.message_ext_broker_inner.get_tags()
251 }
252}