rocketmq_remoting/protocol/header/message_operation_header/
send_message_request_header.rs1use cheetah_string::CheetahString;
16use rocketmq_macros::RequestHeaderCodecV2;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::code::request_code::RequestCode;
21use crate::protocol::header::message_operation_header::send_message_request_header_v2::SendMessageRequestHeaderV2;
22use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
23use crate::protocol::remoting_command::RemotingCommand;
24use crate::rpc::topic_request_header::TopicRequestHeader;
25
26#[derive(Debug, Clone, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
27#[serde(rename_all = "camelCase")]
28pub struct SendMessageRequestHeader {
29 #[required]
30 pub producer_group: CheetahString,
31
32 #[required]
33 pub topic: CheetahString,
34
35 #[required]
36 pub default_topic: CheetahString,
37
38 #[required]
39 pub default_topic_queue_nums: i32,
40
41 #[required]
42 pub queue_id: i32,
43
44 #[required]
45 pub sys_flag: i32,
46
47 #[required]
48 pub born_timestamp: i64,
49
50 #[required]
51 pub flag: i32,
52
53 pub properties: Option<CheetahString>,
54 pub reconsume_times: Option<i32>,
55 pub unit_mode: Option<bool>,
56 pub batch: Option<bool>,
57 pub max_reconsume_times: Option<i32>,
58 #[serde(flatten)]
59 pub topic_request_header: Option<TopicRequestHeader>,
60}
61
62impl SendMessageRequestHeader {
63 #[inline(always)]
64 pub fn is_batch(&self) -> bool {
65 self.batch.unwrap_or_default()
66 }
67}
68
69impl TopicRequestHeaderTrait for SendMessageRequestHeader {
70 fn set_lo(&mut self, lo: Option<bool>) {
71 self.topic_request_header.as_mut().unwrap().lo = lo;
72 }
73
74 fn lo(&self) -> Option<bool> {
75 match self.topic_request_header {
76 None => None,
77 Some(ref value) => value.lo,
78 }
79 }
80
81 fn set_topic(&mut self, topic: CheetahString) {
82 self.topic = topic;
83 }
84
85 fn topic(&self) -> &CheetahString {
86 &self.topic
87 }
88
89 fn broker_name(&self) -> Option<&CheetahString> {
90 self.topic_request_header
91 .as_ref()?
92 .rpc_request_header
93 .as_ref()?
94 .broker_name
95 .as_ref()
96 }
97
98 fn set_broker_name(&mut self, broker_name: CheetahString) {
99 self.topic_request_header
100 .as_mut()
101 .unwrap()
102 .rpc_request_header
103 .as_mut()
104 .unwrap()
105 .broker_name = Some(broker_name);
106 }
107
108 fn namespace(&self) -> Option<&str> {
109 self.topic_request_header
110 .as_ref()?
111 .rpc_request_header
112 .as_ref()?
113 .namespace
114 .as_deref()
115 }
116
117 fn set_namespace(&mut self, namespace: CheetahString) {
118 self.topic_request_header
119 .as_mut()
120 .unwrap()
121 .rpc_request_header
122 .as_mut()
123 .unwrap()
124 .namespace = Some(namespace);
125 }
126
127 fn namespaced(&self) -> Option<bool> {
128 self.topic_request_header
129 .as_ref()?
130 .rpc_request_header
131 .as_ref()?
132 .namespaced
133 .as_ref()
134 .cloned()
135 }
136
137 fn set_namespaced(&mut self, namespaced: bool) {
138 self.topic_request_header
139 .as_mut()
140 .unwrap()
141 .rpc_request_header
142 .as_mut()
143 .unwrap()
144 .namespaced = Some(namespaced);
145 }
146
147 fn oneway(&self) -> Option<bool> {
148 self.topic_request_header
149 .as_ref()?
150 .rpc_request_header
151 .as_ref()?
152 .oneway
153 .as_ref()
154 .cloned()
155 }
156
157 fn set_oneway(&mut self, oneway: bool) {
158 self.topic_request_header
159 .as_mut()
160 .unwrap()
161 .rpc_request_header
162 .as_mut()
163 .unwrap()
164 .namespaced = Some(oneway);
165 }
166
167 fn queue_id(&self) -> i32 {
168 self.queue_id
169 }
170
171 fn set_queue_id(&mut self, queue_id: i32) {
172 self.queue_id = queue_id;
173 }
174}
175
176#[inline]
195pub fn parse_request_header(
196 request: &RemotingCommand,
197 request_code: RequestCode,
198) -> rocketmq_error::RocketMQResult<SendMessageRequestHeader> {
199 let mut request_header_v2 = None;
200 if RequestCode::SendMessageV2 == request_code || RequestCode::SendBatchMessage == request_code {
201 request_header_v2 = request
203 .decode_command_custom_header::<SendMessageRequestHeaderV2>()
204 .ok();
205 }
206 match request_header_v2 {
208 Some(header) => Ok(SendMessageRequestHeaderV2::create_send_message_request_header_v1(
209 &header,
210 )),
211 None => request.decode_command_custom_header::<SendMessageRequestHeader>(),
212 }
213}
214
215#[cfg(test)]
216mod tests {
217 use std::collections::HashMap;
218
219 use cheetah_string::CheetahString;
220
221 use super::*;
222 use crate::code::request_code::RequestCode;
223 use crate::protocol::command_custom_header::CommandCustomHeader;
224 use crate::protocol::command_custom_header::FromMap;
225 use crate::protocol::remoting_command::RemotingCommand;
226
227 #[test]
228 fn parse_request_header_handles_invalid_request_code() {
229 let request = RemotingCommand::create_remoting_command(RequestCode::SendBatchMessage);
230 let request_code = RequestCode::SendBatchMessage;
231 let result = parse_request_header(&request, request_code);
232 assert!(result.is_err());
233 }
234
235 #[test]
236 fn parse_request_header_handles_missing_header() {
237 let request = RemotingCommand::create_remoting_command(RequestCode::SendMessageV2);
238 let request_code = RequestCode::SendMessageV2;
239 let result = parse_request_header(&request, request_code);
240 assert!(result.is_err());
241 }
242
243 #[test]
244 fn send_message_request_header_serializes_correctly() {
245 let header = SendMessageRequestHeader {
246 producer_group: CheetahString::from_static_str("test_producer_group"),
247 topic: CheetahString::from_static_str("test_topic"),
248 default_topic: CheetahString::from_static_str("test_default_topic"),
249 default_topic_queue_nums: 8,
250 queue_id: 1,
251 sys_flag: 0,
252 born_timestamp: 1622547800000,
253 flag: 0,
254 properties: Some(CheetahString::from_static_str("test_properties")),
255 reconsume_times: Some(3),
256 unit_mode: Some(true),
257 batch: Some(false),
258 max_reconsume_times: Some(5),
259 topic_request_header: None,
260 };
261 let map = header.to_map().unwrap();
262 assert_eq!(
263 map.get(&CheetahString::from_static_str("producerGroup")).unwrap(),
264 "test_producer_group"
265 );
266 assert_eq!(map.get(&CheetahString::from_static_str("topic")).unwrap(), "test_topic");
267 assert_eq!(
268 map.get(&CheetahString::from_static_str("defaultTopic")).unwrap(),
269 "test_default_topic"
270 );
271 assert_eq!(
272 map.get(&CheetahString::from_static_str("defaultTopicQueueNums"))
273 .unwrap(),
274 "8"
275 );
276 assert_eq!(map.get(&CheetahString::from_static_str("queueId")).unwrap(), "1");
277 assert_eq!(map.get(&CheetahString::from_static_str("sysFlag")).unwrap(), "0");
278 assert_eq!(
279 map.get(&CheetahString::from_static_str("bornTimestamp")).unwrap(),
280 "1622547800000"
281 );
282 assert_eq!(map.get(&CheetahString::from_static_str("flag")).unwrap(), "0");
283 assert_eq!(
284 map.get(&CheetahString::from_static_str("properties")).unwrap(),
285 "test_properties"
286 );
287 assert_eq!(map.get(&CheetahString::from_static_str("reconsumeTimes")).unwrap(), "3");
288 assert_eq!(map.get(&CheetahString::from_static_str("unitMode")).unwrap(), "true");
289 assert_eq!(map.get(&CheetahString::from_static_str("batch")).unwrap(), "false");
290 assert_eq!(
291 map.get(&CheetahString::from_static_str("maxReconsumeTimes")).unwrap(),
292 "5"
293 );
294 }
295
296 #[test]
297 fn send_message_request_header_deserializes_correctly() {
298 let mut map = HashMap::new();
299 map.insert(
300 CheetahString::from_static_str("producerGroup"),
301 CheetahString::from_static_str("test_producer_group"),
302 );
303 map.insert(
304 CheetahString::from_static_str("topic"),
305 CheetahString::from_static_str("test_topic"),
306 );
307 map.insert(
308 CheetahString::from_static_str("defaultTopic"),
309 CheetahString::from_static_str("test_default_topic"),
310 );
311 map.insert(
312 CheetahString::from_static_str("defaultTopicQueueNums"),
313 CheetahString::from_static_str("8"),
314 );
315 map.insert(
316 CheetahString::from_static_str("queueId"),
317 CheetahString::from_static_str("1"),
318 );
319 map.insert(
320 CheetahString::from_static_str("sysFlag"),
321 CheetahString::from_static_str("0"),
322 );
323 map.insert(
324 CheetahString::from_static_str("bornTimestamp"),
325 CheetahString::from_static_str("1622547800000"),
326 );
327 map.insert(
328 CheetahString::from_static_str("flag"),
329 CheetahString::from_static_str("0"),
330 );
331 map.insert(
332 CheetahString::from_static_str("properties"),
333 CheetahString::from_static_str("test_properties"),
334 );
335 map.insert(
336 CheetahString::from_static_str("reconsumeTimes"),
337 CheetahString::from_static_str("3"),
338 );
339 map.insert(
340 CheetahString::from_static_str("unitMode"),
341 CheetahString::from_static_str("true"),
342 );
343 map.insert(
344 CheetahString::from_static_str("batch"),
345 CheetahString::from_static_str("false"),
346 );
347 map.insert(
348 CheetahString::from_static_str("maxReconsumeTimes"),
349 CheetahString::from_static_str("5"),
350 );
351
352 let header = <SendMessageRequestHeader as FromMap>::from(&map).unwrap();
353 assert_eq!(header.producer_group, "test_producer_group");
354 assert_eq!(header.topic, "test_topic");
355 assert_eq!(header.default_topic, "test_default_topic");
356 assert_eq!(header.default_topic_queue_nums, 8);
357 assert_eq!(header.queue_id, 1);
358 assert_eq!(header.sys_flag, 0);
359 assert_eq!(header.born_timestamp, 1622547800000);
360 assert_eq!(header.flag, 0);
361 assert_eq!(header.properties.unwrap(), "test_properties");
362 assert_eq!(header.reconsume_times.unwrap(), 3);
363 assert!(header.unit_mode.unwrap());
364 assert!(!header.batch.unwrap());
365 assert_eq!(header.max_reconsume_times.unwrap(), 5);
366 }
367
368 #[test]
369 fn send_message_request_header_handles_missing_optional_fields() {
370 let mut map = HashMap::new();
371 map.insert(
372 CheetahString::from_static_str("queueId"),
373 CheetahString::from_static_str("1"),
374 );
375 map.insert(
376 CheetahString::from_static_str("producerGroup"),
377 CheetahString::from_static_str("test_producer_group"),
378 );
379 map.insert(
380 CheetahString::from_static_str("topic"),
381 CheetahString::from_static_str("test_topic"),
382 );
383 map.insert(
384 CheetahString::from_static_str("defaultTopic"),
385 CheetahString::from_static_str("test_default_topic"),
386 );
387 map.insert(
388 CheetahString::from_static_str("defaultTopicQueueNums"),
389 CheetahString::from_static_str("8"),
390 );
391 map.insert(
392 CheetahString::from_static_str("sysFlag"),
393 CheetahString::from_static_str("0"),
394 );
395 map.insert(
396 CheetahString::from_static_str("bornTimestamp"),
397 CheetahString::from_static_str("1622547800000"),
398 );
399 map.insert(
400 CheetahString::from_static_str("flag"),
401 CheetahString::from_static_str("0"),
402 );
403
404 let header = <SendMessageRequestHeader as FromMap>::from(&map).unwrap();
405 assert_eq!(header.producer_group, "test_producer_group");
406 assert_eq!(header.topic, "test_topic");
407 assert_eq!(header.default_topic, "test_default_topic");
408 assert_eq!(header.default_topic_queue_nums, 8);
409 assert_eq!(header.sys_flag, 0);
411 assert_eq!(header.born_timestamp, 1622547800000);
412 assert_eq!(header.flag, 0);
413 assert!(header.properties.is_none());
414 assert!(header.reconsume_times.is_none());
415 assert!(header.unit_mode.is_none());
416 assert!(header.batch.is_none());
417 assert!(header.max_reconsume_times.is_none());
418 }
419
420 #[test]
421 fn send_message_request_header_handles_invalid_data() {
422 let mut map = HashMap::new();
423 map.insert(
424 CheetahString::from_static_str("producerGroup"),
425 CheetahString::from_static_str("test_producer_group"),
426 );
427 map.insert(
428 CheetahString::from_static_str("topic"),
429 CheetahString::from_static_str("test_topic"),
430 );
431 map.insert(
432 CheetahString::from_static_str("defaultTopic"),
433 CheetahString::from_static_str("test_default_topic"),
434 );
435 map.insert(
436 CheetahString::from_static_str("defaultTopicQueueNums"),
437 CheetahString::from_static_str("invalid"),
438 );
439 map.insert(
440 CheetahString::from_static_str("sysFlag"),
441 CheetahString::from_static_str("invalid"),
442 );
443 map.insert(
444 CheetahString::from_static_str("bornTimestamp"),
445 CheetahString::from_static_str("invalid"),
446 );
447 map.insert(
448 CheetahString::from_static_str("flag"),
449 CheetahString::from_static_str("invalid"),
450 );
451
452 let result = <SendMessageRequestHeader as FromMap>::from(&map);
453 assert!(result.is_err());
454 }
455}