Skip to main content

rocketmq_common/common/message/
message_builder.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use bytes::Bytes;
16use cheetah_string::CheetahString;
17use rocketmq_error::RocketMQError;
18use rocketmq_error::RocketMQResult;
19
20use crate::common::message::message_body::MessageBody;
21use crate::common::message::message_flag::MessageFlag;
22use crate::common::message::message_property::MessageProperties;
23use crate::common::message::message_property::MessagePropertyKey;
24use crate::common::message::MessageConst;
25
26/// Builder for constructing messages.
27///
28/// # Examples
29///
30/// ```
31/// use rocketmq_common::common::message::message_builder::MessageBuilder;
32///
33/// let msg = MessageBuilder::new()
34///     .topic("test-topic")
35///     .body_slice(b"hello world")
36///     .tags("important")
37///     .keys(vec!["key1".to_string(), "key2".to_string()])
38///     .build();
39/// ```
40#[derive(Default)]
41pub struct MessageBuilder {
42    topic: Option<CheetahString>,
43    body: MessageBody,
44    properties: MessageProperties,
45    flag: MessageFlag,
46    transaction_id: Option<CheetahString>,
47}
48
49impl MessageBuilder {
50    /// Creates a new message builder.
51    #[inline]
52    pub fn new() -> Self {
53        Self::default()
54    }
55
56    /// Sets the topic name (required).
57    #[inline]
58    pub fn topic(mut self, topic: impl Into<CheetahString>) -> Self {
59        self.topic = Some(topic.into());
60        self
61    }
62
63    /// Sets the message body from bytes.
64    #[inline]
65    pub fn body(mut self, body: impl Into<Bytes>) -> Self {
66        self.body = MessageBody::new(body);
67        self
68    }
69
70    /// Sets the message body from a byte slice.
71    #[inline]
72    pub fn body_slice(mut self, body: &[u8]) -> Self {
73        self.body = MessageBody::new(Bytes::copy_from_slice(body));
74        self
75    }
76
77    /// Sets an empty body.
78    #[inline]
79    pub fn empty_body(mut self) -> Self {
80        self.body = MessageBody::empty();
81        self
82    }
83
84    /// Sets the message tags.
85    #[inline]
86    pub fn tags(mut self, tags: impl Into<CheetahString>) -> Self {
87        self.properties.insert(MessagePropertyKey::Tags, tags.into());
88        self
89    }
90
91    /// Sets the message keys from a vector.
92    #[inline]
93    pub fn keys(mut self, keys: Vec<String>) -> Self {
94        if !keys.is_empty() {
95            let keys_str = keys.join(MessageConst::KEY_SEPARATOR);
96            self.properties.insert(MessagePropertyKey::Keys, keys_str);
97        }
98        self
99    }
100
101    /// Sets a single message key.
102    #[inline]
103    pub fn key(mut self, key: impl Into<CheetahString>) -> Self {
104        self.properties.insert(MessagePropertyKey::Keys, key.into());
105        self
106    }
107
108    /// Sets the message flag.
109    #[inline]
110    pub fn flag(mut self, flag: MessageFlag) -> Self {
111        self.flag = flag;
112        self
113    }
114
115    /// Sets the message flag from raw bits.
116    #[inline]
117    pub fn flag_bits(mut self, bits: i32) -> Self {
118        self.flag = MessageFlag::from_bits(bits);
119        self
120    }
121
122    /// Sets the delay time level (1-18 for predefined levels).
123    #[inline]
124    pub fn delay_level(mut self, level: i32) -> Self {
125        self.properties
126            .insert(MessagePropertyKey::DelayTimeLevel, level.to_string());
127        self
128    }
129
130    /// Sets the delay time in seconds.
131    #[inline]
132    pub fn delay_secs(mut self, secs: u64) -> Self {
133        self.properties
134            .insert(MessagePropertyKey::DelayTimeSec, secs.to_string());
135        self
136    }
137
138    /// Sets the delay time in milliseconds.
139    #[inline]
140    pub fn delay_millis(mut self, millis: u64) -> Self {
141        self.properties
142            .insert(MessagePropertyKey::DelayTimeMs, millis.to_string());
143        self
144    }
145
146    /// Sets the delivery time in milliseconds.
147    #[inline]
148    pub fn deliver_time_ms(mut self, time_ms: u64) -> Self {
149        self.properties
150            .insert(MessagePropertyKey::DeliverTimeMs, time_ms.to_string());
151        self
152    }
153
154    /// Sets whether to wait for store confirmation.
155    #[inline]
156    pub fn wait_store_msg_ok(mut self, wait: bool) -> Self {
157        if !wait {
158            self.properties.insert(MessagePropertyKey::WaitStoreMsgOk, "false");
159        }
160        self
161    }
162
163    /// Sets the transaction ID.
164    #[inline]
165    pub fn transaction_id(mut self, id: impl Into<CheetahString>) -> Self {
166        self.transaction_id = Some(id.into());
167        self
168    }
169
170    /// Sets the buyer ID.
171    #[inline]
172    pub fn buyer_id(mut self, buyer_id: impl Into<CheetahString>) -> Self {
173        self.properties.insert(MessagePropertyKey::BuyerId, buyer_id.into());
174        self
175    }
176
177    /// Sets the instance ID.
178    #[inline]
179    pub fn instance_id(mut self, instance_id: impl Into<CheetahString>) -> Self {
180        self.properties
181            .insert(MessagePropertyKey::InstanceId, instance_id.into());
182        self
183    }
184
185    /// Sets the correlation ID.
186    #[inline]
187    pub fn correlation_id(mut self, correlation_id: impl Into<CheetahString>) -> Self {
188        self.properties
189            .insert(MessagePropertyKey::CorrelationId, correlation_id.into());
190        self
191    }
192
193    /// Sets the sharding key.
194    #[inline]
195    pub fn sharding_key(mut self, sharding_key: impl Into<CheetahString>) -> Self {
196        self.properties
197            .insert(MessagePropertyKey::ShardingKey, sharding_key.into());
198        self
199    }
200
201    /// Sets the trace switch.
202    #[inline]
203    pub fn trace_switch(mut self, enabled: bool) -> Self {
204        self.properties
205            .insert(MessagePropertyKey::TraceSwitch, if enabled { "true" } else { "false" });
206        self
207    }
208
209    /// Sets a custom property directly using MessagePropertyKey.
210    #[inline]
211    pub fn property(mut self, key: MessagePropertyKey, value: impl Into<CheetahString>) -> Self {
212        self.properties.insert(key, value.into());
213        self
214    }
215
216    /// Sets a raw property with string key (for advanced use).
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if the property name is reserved by the system.
221    pub fn raw_property(
222        mut self,
223        key: impl Into<CheetahString>,
224        value: impl Into<CheetahString>,
225    ) -> RocketMQResult<Self> {
226        let key = key.into();
227        let value = value.into();
228
229        // Validate not a reserved key
230        if crate::common::message::STRING_HASH_SET.contains(key.as_str()) {
231            return Err(RocketMQError::InvalidProperty(format!(
232                "The Property<{key}> is used by system, input another please"
233            )));
234        }
235
236        self.properties.as_map_mut().insert(key, value);
237        Ok(self)
238    }
239
240    /// Builds the message.
241    ///
242    /// # Errors
243    ///
244    /// Returns an error if required fields (topic) are not set.
245    pub fn build(self) -> RocketMQResult<super::message_single::Message> {
246        let topic = self
247            .topic
248            .ok_or_else(|| RocketMQError::InvalidProperty("Topic is required for message".to_string()))?;
249
250        Ok(super::message_single::Message::from_builder(
251            topic,
252            self.body,
253            self.properties,
254            self.flag,
255            self.transaction_id,
256        ))
257    }
258
259    /// Builds the message, panicking if validation fails.
260    ///
261    /// This is convenient for test code where you know the message is valid.
262    #[track_caller]
263    pub fn build_unchecked(self) -> super::message_single::Message {
264        self.build().expect("message validation failed")
265    }
266}
267
#[cfg(test)]
mod tests {
    use super::*;

    /// Common starting point: a fresh builder already aimed at "test-topic".
    fn builder_for_test_topic() -> MessageBuilder {
        MessageBuilder::new().topic("test-topic")
    }

    /// Topic and body set on the builder are readable from the built message.
    #[test]
    fn test_builder_basic() {
        let message = builder_for_test_topic()
            .body_slice(b"hello world")
            .build_unchecked();

        assert_eq!(message.topic().as_str(), "test-topic");
        assert_eq!(message.body_slice(), b"hello world");
    }

    /// Tags are stored verbatim and a key list is readable back as a list.
    #[test]
    fn test_builder_with_tags_and_keys() {
        let key_list = vec!["key1".to_string(), "key2".to_string()];

        let message = builder_for_test_topic()
            .body_slice(b"test")
            .tags("tag1")
            .keys(key_list.clone())
            .build_unchecked();

        assert_eq!(message.tags(), Some("tag1"));
        assert_eq!(message.keys(), Some(key_list));
    }

    /// A delay level set on the builder is reported by the built message.
    #[test]
    fn test_builder_with_delay() {
        let message = builder_for_test_topic()
            .body_slice(b"test")
            .delay_level(3)
            .build_unchecked();

        assert_eq!(message.delay_time_level(), 3);
    }

    /// `build` must refuse to produce a message when no topic was given.
    #[test]
    fn test_builder_missing_topic() {
        let outcome = MessageBuilder::new().body_slice(b"test").build();

        assert!(outcome.is_err());
    }

    /// The transaction ID set on the builder is readable from the message.
    #[test]
    fn test_builder_with_transaction_id() {
        let message = builder_for_test_topic()
            .body_slice(b"test")
            .transaction_id("tx-123")
            .build_unchecked();

        assert_eq!(message.transaction_id(), Some("tx-123"));
    }
}