// rocketmq_common/common/message/message_ext_broker_inner.rs
use std::any::Any;
18use std::collections::HashMap;
19use std::fmt;
20use std::fmt::Debug;
21use std::fmt::Display;
22use std::fmt::Formatter;
23use std::net::SocketAddr;
24
25use bytes::Bytes;
26use cheetah_string::CheetahString;
27use rocketmq_rust::ArcMut;
28
29use crate::common::hasher::string_hasher::JavaStringHasher;
30use crate::common::message::message_ext::MessageExt;
31use crate::common::message::message_single::Message;
32use crate::common::message::MessageTrait;
33use crate::common::message::MessageVersion;
34use crate::common::TopicFilterType;
35use crate::MessageUtils;
36
37#[derive(Default)]
38pub struct MessageExtBrokerInner {
39 pub message_ext_inner: MessageExt,
40 pub properties_string: CheetahString,
41 pub tags_code: i64,
42 pub encoded_buff: Option<bytes::BytesMut>,
43 pub encode_completed: bool,
44 pub version: MessageVersion,
45}
46
47impl MessageExtBrokerInner {
48 const VERSION: MessageVersion = MessageVersion::V1;
49
50 #[inline]
51 pub fn delete_property(&mut self, name: impl Into<CheetahString>) {
52 let name = name.into();
53 self.message_ext_inner.message.clear_property(name.as_str());
54 self.properties_string = CheetahString::from_string(MessageUtils::delete_property(
55 self.properties_string.as_str(),
56 name.as_str(),
57 ));
58 }
59
60 #[inline]
61 pub fn with_version(&mut self, version: MessageVersion) {
62 self.version = version;
63 }
64
65 #[inline]
66 pub fn version(&self) -> MessageVersion {
67 self.version
68 }
69
70 #[inline]
71 pub fn topic(&self) -> &CheetahString {
72 self.message_ext_inner.topic()
73 }
74
75 #[inline]
76 pub fn get_topic(&self) -> &CheetahString {
77 self.message_ext_inner.get_topic()
78 }
79
80 #[inline]
81 pub fn born_host(&self) -> SocketAddr {
82 self.message_ext_inner.born_host()
83 }
84
85 #[inline]
86 pub fn store_host(&self) -> SocketAddr {
87 self.message_ext_inner.store_host()
88 }
89
90 #[inline]
91 pub fn with_born_host_v6_flag(&mut self) {
92 self.message_ext_inner.with_born_host_v6_flag()
93 }
94
95 #[inline]
96 pub fn with_store_host_v6_flag(&mut self) {
97 self.message_ext_inner.with_store_host_v6_flag()
98 }
99
100 #[inline]
101 pub fn body(&self) -> Option<bytes::Bytes> {
102 self.message_ext_inner.body()
103 }
104
105 #[inline]
106 pub fn sys_flag(&self) -> i32 {
107 self.message_ext_inner.sys_flag()
108 }
109
110 #[inline]
111 pub fn body_crc(&self) -> u32 {
112 self.message_ext_inner.body_crc()
113 }
114
115 #[inline]
116 pub fn queue_id(&self) -> i32 {
117 self.message_ext_inner.queue_id()
118 }
119
120 #[inline]
121 pub fn flag(&self) -> i32 {
122 self.message_ext_inner.flag()
123 }
124
125 #[inline]
126 pub fn born_timestamp(&self) -> i64 {
127 self.message_ext_inner.born_timestamp()
128 }
129
130 #[inline]
131 pub fn store_timestamp(&self) -> i64 {
132 self.message_ext_inner.store_timestamp()
133 }
134
135 #[inline]
136 pub fn born_host_bytes(&self) -> bytes::Bytes {
137 self.message_ext_inner.born_host_bytes()
138 }
139
140 #[inline]
141 pub fn store_host_bytes(&self) -> bytes::Bytes {
142 self.message_ext_inner.born_store_bytes()
143 }
144
145 #[inline]
146 pub fn reconsume_times(&self) -> i32 {
147 self.message_ext_inner.reconsume_times()
148 }
149
150 #[inline]
151 pub fn prepared_transaction_offset(&self) -> i64 {
152 self.message_ext_inner.prepared_transaction_offset()
153 }
154
155 #[inline]
156 pub fn property(&self, name: &str) -> Option<CheetahString> {
157 self.message_ext_inner.properties().get(name).cloned()
158 }
159
160 #[inline]
161 pub fn properties_string(&self) -> &str {
162 self.properties_string.as_str()
163 }
164
165 #[inline]
166 pub fn queue_offset(&self) -> i64 {
167 self.message_ext_inner.queue_offset()
168 }
169
170 #[inline]
171 pub fn tags_string2tags_code(_filter: &TopicFilterType, tags: &str) -> i64 {
172 if tags.is_empty() {
173 return 0;
174 }
175 JavaStringHasher::hash_str(tags) as i64
176 }
177
178 #[inline]
179 pub fn tags_string_to_tags_code(tags: &str) -> i64 {
180 if tags.is_empty() {
181 return 0;
182 }
183 JavaStringHasher::hash_str(tags) as i64
184 }
185
186 #[inline]
187 pub fn get_tags(&self) -> Option<CheetahString> {
188 self.message_ext_inner.get_tags()
189 }
190
191 #[inline]
192 pub fn is_wait_store_msg_ok(&self) -> bool {
193 self.message_ext_inner.message.is_wait_store_msg_ok()
194 }
195
196 #[inline]
197 pub fn body_len(&self) -> usize {
198 self.message_ext_inner.message.body.as_ref().unwrap().len()
199 }
200}
201
202impl fmt::Display for MessageExtBrokerInner {
203 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204 let encoded_buff_str = match &self.encoded_buff {
205 Some(encoded_buff) =>
206 {
208 "****".to_string()
209 }
210 None => "None".to_string(),
211 };
212
213 write!(
214 f,
215 "MessageExtBrokerInner {{ message_ext_inner: {}, properties_string: {}, tags_code: \
216 {}, encoded_buff: {}, encode_completed: {}, version: {} }}",
217 self.message_ext_inner,
218 self.properties_string,
219 self.tags_code,
220 encoded_buff_str,
221 self.encode_completed,
222 self.version
223 )
224 }
225}
226
227impl Debug for MessageExtBrokerInner {
228 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
229 let encoded_buff_str = match &self.encoded_buff {
230 Some(encoded_buff) =>
231 {
233 "****".to_string()
234 }
235 None => "None".to_string(),
236 };
237
238 write!(
239 f,
240 "MessageExtBrokerInner {{ message_ext_inner: {:?}, properties_string: {}, tags_code: \
241 {}, encoded_buff: {}, encode_completed: {}, version: {} }}",
242 self.message_ext_inner,
243 self.properties_string,
244 self.tags_code,
245 encoded_buff_str,
246 self.encode_completed,
247 self.version
248 )
249 }
250}
251
252impl MessageTrait for MessageExtBrokerInner {
253 #[inline]
254 fn put_property(&mut self, key: CheetahString, value: CheetahString) {
255 self.message_ext_inner.put_property(key, value);
256 }
257
258 #[inline]
259 fn clear_property(&mut self, name: &str) {
260 self.message_ext_inner.clear_property(name);
261 }
262
263 #[inline]
264 fn get_property(&self, name: &CheetahString) -> Option<CheetahString> {
265 self.message_ext_inner.get_property(name)
266 }
267
268 #[inline]
269 fn get_topic(&self) -> &CheetahString {
270 self.message_ext_inner.get_topic()
271 }
272
273 #[inline]
274 fn set_topic(&mut self, topic: CheetahString) {
275 self.message_ext_inner.set_topic(topic);
276 }
277
278 #[inline]
279 fn get_flag(&self) -> i32 {
280 self.message_ext_inner.get_flag()
281 }
282
283 #[inline]
284 fn set_flag(&mut self, flag: i32) {
285 self.message_ext_inner.set_flag(flag);
286 }
287
288 #[inline]
289 fn get_body(&self) -> Option<&Bytes> {
290 self.message_ext_inner.get_body()
291 }
292
293 #[inline]
294 fn set_body(&mut self, body: Bytes) {
295 self.message_ext_inner.set_body(body);
296 }
297
298 #[inline]
299 fn get_properties(&self) -> &HashMap<CheetahString, CheetahString> {
300 self.message_ext_inner.get_properties()
301 }
302
303 #[inline]
304 fn set_properties(&mut self, properties: HashMap<CheetahString, CheetahString>) {
305 self.message_ext_inner.set_properties(properties);
306 }
307
308 #[inline]
309 fn get_transaction_id(&self) -> Option<&CheetahString> {
310 self.message_ext_inner.get_transaction_id()
311 }
312
313 #[inline]
314 fn set_transaction_id(&mut self, transaction_id: CheetahString) {
315 self.message_ext_inner.set_transaction_id(transaction_id);
316 }
317
318 #[inline]
319 fn get_compressed_body_mut(&mut self) -> &mut Option<Bytes> {
320 self.message_ext_inner.get_compressed_body_mut()
321 }
322
323 #[inline]
324 fn get_compressed_body(&self) -> Option<&Bytes> {
325 self.message_ext_inner.get_compressed_body()
326 }
327
328 #[inline]
329 fn set_compressed_body_mut(&mut self, compressed_body: Bytes) {
330 self.message_ext_inner
331 .set_compressed_body_mut(compressed_body);
332 }
333
334 #[inline]
335 fn take_body(&mut self) -> Option<Bytes> {
336 self.message_ext_inner.take_body()
337 }
338
339 #[inline]
340 fn as_any(&self) -> &dyn Any {
341 self
342 }
343
344 #[inline]
345 fn as_any_mut(&mut self) -> &mut dyn Any {
346 self
347 }
348}