rocketmq_common/common/message/
message_builder.rs1use bytes::Bytes;
16use cheetah_string::CheetahString;
17use rocketmq_error::RocketMQError;
18use rocketmq_error::RocketMQResult;
19
20use crate::common::message::message_body::MessageBody;
21use crate::common::message::message_flag::MessageFlag;
22use crate::common::message::message_property::MessageProperties;
23use crate::common::message::message_property::MessagePropertyKey;
24use crate::common::message::MessageConst;
25
26#[derive(Default)]
41pub struct MessageBuilder {
42 topic: Option<CheetahString>,
43 body: MessageBody,
44 properties: MessageProperties,
45 flag: MessageFlag,
46 transaction_id: Option<CheetahString>,
47}
48
49impl MessageBuilder {
50 #[inline]
52 pub fn new() -> Self {
53 Self::default()
54 }
55
56 #[inline]
58 pub fn topic(mut self, topic: impl Into<CheetahString>) -> Self {
59 self.topic = Some(topic.into());
60 self
61 }
62
63 #[inline]
65 pub fn body(mut self, body: impl Into<Bytes>) -> Self {
66 self.body = MessageBody::new(body);
67 self
68 }
69
70 #[inline]
72 pub fn body_slice(mut self, body: &[u8]) -> Self {
73 self.body = MessageBody::new(Bytes::copy_from_slice(body));
74 self
75 }
76
77 #[inline]
79 pub fn empty_body(mut self) -> Self {
80 self.body = MessageBody::empty();
81 self
82 }
83
84 #[inline]
86 pub fn tags(mut self, tags: impl Into<CheetahString>) -> Self {
87 self.properties.insert(MessagePropertyKey::Tags, tags.into());
88 self
89 }
90
91 #[inline]
93 pub fn keys(mut self, keys: Vec<String>) -> Self {
94 if !keys.is_empty() {
95 let keys_str = keys.join(MessageConst::KEY_SEPARATOR);
96 self.properties.insert(MessagePropertyKey::Keys, keys_str);
97 }
98 self
99 }
100
101 #[inline]
103 pub fn key(mut self, key: impl Into<CheetahString>) -> Self {
104 self.properties.insert(MessagePropertyKey::Keys, key.into());
105 self
106 }
107
108 #[inline]
110 pub fn flag(mut self, flag: MessageFlag) -> Self {
111 self.flag = flag;
112 self
113 }
114
115 #[inline]
117 pub fn flag_bits(mut self, bits: i32) -> Self {
118 self.flag = MessageFlag::from_bits(bits);
119 self
120 }
121
122 #[inline]
124 pub fn delay_level(mut self, level: i32) -> Self {
125 self.properties
126 .insert(MessagePropertyKey::DelayTimeLevel, level.to_string());
127 self
128 }
129
130 #[inline]
132 pub fn delay_secs(mut self, secs: u64) -> Self {
133 self.properties
134 .insert(MessagePropertyKey::DelayTimeSec, secs.to_string());
135 self
136 }
137
138 #[inline]
140 pub fn delay_millis(mut self, millis: u64) -> Self {
141 self.properties
142 .insert(MessagePropertyKey::DelayTimeMs, millis.to_string());
143 self
144 }
145
146 #[inline]
148 pub fn deliver_time_ms(mut self, time_ms: u64) -> Self {
149 self.properties
150 .insert(MessagePropertyKey::DeliverTimeMs, time_ms.to_string());
151 self
152 }
153
154 #[inline]
156 pub fn wait_store_msg_ok(mut self, wait: bool) -> Self {
157 if !wait {
158 self.properties.insert(MessagePropertyKey::WaitStoreMsgOk, "false");
159 }
160 self
161 }
162
163 #[inline]
165 pub fn transaction_id(mut self, id: impl Into<CheetahString>) -> Self {
166 self.transaction_id = Some(id.into());
167 self
168 }
169
170 #[inline]
172 pub fn buyer_id(mut self, buyer_id: impl Into<CheetahString>) -> Self {
173 self.properties.insert(MessagePropertyKey::BuyerId, buyer_id.into());
174 self
175 }
176
177 #[inline]
179 pub fn instance_id(mut self, instance_id: impl Into<CheetahString>) -> Self {
180 self.properties
181 .insert(MessagePropertyKey::InstanceId, instance_id.into());
182 self
183 }
184
185 #[inline]
187 pub fn correlation_id(mut self, correlation_id: impl Into<CheetahString>) -> Self {
188 self.properties
189 .insert(MessagePropertyKey::CorrelationId, correlation_id.into());
190 self
191 }
192
193 #[inline]
195 pub fn sharding_key(mut self, sharding_key: impl Into<CheetahString>) -> Self {
196 self.properties
197 .insert(MessagePropertyKey::ShardingKey, sharding_key.into());
198 self
199 }
200
201 #[inline]
203 pub fn trace_switch(mut self, enabled: bool) -> Self {
204 self.properties
205 .insert(MessagePropertyKey::TraceSwitch, if enabled { "true" } else { "false" });
206 self
207 }
208
209 #[inline]
211 pub fn property(mut self, key: MessagePropertyKey, value: impl Into<CheetahString>) -> Self {
212 self.properties.insert(key, value.into());
213 self
214 }
215
216 pub fn raw_property(
222 mut self,
223 key: impl Into<CheetahString>,
224 value: impl Into<CheetahString>,
225 ) -> RocketMQResult<Self> {
226 let key = key.into();
227 let value = value.into();
228
229 if crate::common::message::STRING_HASH_SET.contains(key.as_str()) {
231 return Err(RocketMQError::InvalidProperty(format!(
232 "The Property<{key}> is used by system, input another please"
233 )));
234 }
235
236 self.properties.as_map_mut().insert(key, value);
237 Ok(self)
238 }
239
240 pub fn build(self) -> RocketMQResult<super::message_single::Message> {
246 let topic = self
247 .topic
248 .ok_or_else(|| RocketMQError::InvalidProperty("Topic is required for message".to_string()))?;
249
250 Ok(super::message_single::Message::from_builder(
251 topic,
252 self.body,
253 self.properties,
254 self.flag,
255 self.transaction_id,
256 ))
257 }
258
259 #[track_caller]
263 pub fn build_unchecked(self) -> super::message_single::Message {
264 self.build().expect("message validation failed")
265 }
266}
267
268#[cfg(test)]
269mod tests {
270 use super::*;
271
272 #[test]
273 fn test_builder_basic() {
274 let msg = MessageBuilder::new()
275 .topic("test-topic")
276 .body_slice(b"hello world")
277 .build_unchecked();
278
279 assert_eq!(msg.topic().as_str(), "test-topic");
280 assert_eq!(msg.body_slice(), b"hello world");
281 }
282
283 #[test]
284 fn test_builder_with_tags_and_keys() {
285 let msg = MessageBuilder::new()
286 .topic("test-topic")
287 .body_slice(b"test")
288 .tags("tag1")
289 .keys(vec!["key1".to_string(), "key2".to_string()])
290 .build_unchecked();
291
292 assert_eq!(msg.tags(), Some("tag1"));
293 assert_eq!(msg.keys(), Some(vec!["key1".to_string(), "key2".to_string()]));
294 }
295
296 #[test]
297 fn test_builder_with_delay() {
298 let msg = MessageBuilder::new()
299 .topic("test-topic")
300 .body_slice(b"test")
301 .delay_level(3)
302 .build_unchecked();
303
304 assert_eq!(msg.delay_time_level(), 3);
305 }
306
307 #[test]
308 fn test_builder_missing_topic() {
309 let result = MessageBuilder::new().body_slice(b"test").build();
310
311 assert!(result.is_err());
312 }
313
314 #[test]
315 fn test_builder_with_transaction_id() {
316 let msg = MessageBuilder::new()
317 .topic("test-topic")
318 .body_slice(b"test")
319 .transaction_id("tx-123")
320 .build_unchecked();
321
322 assert_eq!(msg.transaction_id(), Some("tx-123"));
323 }
324}