rocketmq_remoting/protocol/header/message_operation_header/
send_message_request_header.rs1use cheetah_string::CheetahString;
19use rocketmq_macros::RequestHeaderCodecV2;
20use serde::Deserialize;
21use serde::Serialize;
22
23use crate::code::request_code::RequestCode;
24use crate::protocol::header::message_operation_header::send_message_request_header_v2::SendMessageRequestHeaderV2;
25use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
26use crate::protocol::remoting_command::RemotingCommand;
27use crate::rpc::topic_request_header::TopicRequestHeader;
28
29#[derive(Debug, Clone, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
30#[serde(rename_all = "camelCase")]
31pub struct SendMessageRequestHeader {
32 #[required]
33 pub producer_group: CheetahString,
34
35 #[required]
36 pub topic: CheetahString,
37
38 #[required]
39 pub default_topic: CheetahString,
40
41 #[required]
42 pub default_topic_queue_nums: i32,
43
44 #[required]
45 pub queue_id: i32,
46
47 #[required]
48 pub sys_flag: i32,
49
50 #[required]
51 pub born_timestamp: i64,
52
53 #[required]
54 pub flag: i32,
55
56 pub properties: Option<CheetahString>,
57 pub reconsume_times: Option<i32>,
58 pub unit_mode: Option<bool>,
59 pub batch: Option<bool>,
60 pub max_reconsume_times: Option<i32>,
61 #[serde(flatten)]
62 pub topic_request_header: Option<TopicRequestHeader>,
63}
64
65impl SendMessageRequestHeader {
66 #[inline(always)]
67 pub fn is_batch(&self) -> bool {
68 self.batch.unwrap_or_default()
69 }
70}
71
72impl TopicRequestHeaderTrait for SendMessageRequestHeader {
73 fn set_lo(&mut self, lo: Option<bool>) {
74 self.topic_request_header.as_mut().unwrap().lo = lo;
75 }
76
77 fn lo(&self) -> Option<bool> {
78 match self.topic_request_header {
79 None => None,
80 Some(ref value) => value.lo,
81 }
82 }
83
84 fn set_topic(&mut self, topic: CheetahString) {
85 self.topic = topic;
86 }
87
88 fn topic(&self) -> &CheetahString {
89 &self.topic
90 }
91
92 fn broker_name(&self) -> Option<&CheetahString> {
93 self.topic_request_header
94 .as_ref()?
95 .rpc_request_header
96 .as_ref()?
97 .broker_name
98 .as_ref()
99 }
100
101 fn set_broker_name(&mut self, broker_name: CheetahString) {
102 self.topic_request_header
103 .as_mut()
104 .unwrap()
105 .rpc_request_header
106 .as_mut()
107 .unwrap()
108 .broker_name = Some(broker_name);
109 }
110
111 fn namespace(&self) -> Option<&str> {
112 self.topic_request_header
113 .as_ref()?
114 .rpc_request_header
115 .as_ref()?
116 .namespace
117 .as_deref()
118 }
119
120 fn set_namespace(&mut self, namespace: CheetahString) {
121 self.topic_request_header
122 .as_mut()
123 .unwrap()
124 .rpc_request_header
125 .as_mut()
126 .unwrap()
127 .namespace = Some(namespace);
128 }
129
130 fn namespaced(&self) -> Option<bool> {
131 self.topic_request_header
132 .as_ref()?
133 .rpc_request_header
134 .as_ref()?
135 .namespaced
136 .as_ref()
137 .cloned()
138 }
139
140 fn set_namespaced(&mut self, namespaced: bool) {
141 self.topic_request_header
142 .as_mut()
143 .unwrap()
144 .rpc_request_header
145 .as_mut()
146 .unwrap()
147 .namespaced = Some(namespaced);
148 }
149
150 fn oneway(&self) -> Option<bool> {
151 self.topic_request_header
152 .as_ref()?
153 .rpc_request_header
154 .as_ref()?
155 .oneway
156 .as_ref()
157 .cloned()
158 }
159
160 fn set_oneway(&mut self, oneway: bool) {
161 self.topic_request_header
162 .as_mut()
163 .unwrap()
164 .rpc_request_header
165 .as_mut()
166 .unwrap()
167 .namespaced = Some(oneway);
168 }
169
170 fn queue_id(&self) -> i32 {
171 self.queue_id
172 }
173
174 fn set_queue_id(&mut self, queue_id: i32) {
175 self.queue_id = queue_id;
176 }
177}
178
179#[inline]
198pub fn parse_request_header(
199 request: &RemotingCommand,
200 request_code: RequestCode,
201) -> rocketmq_error::RocketMQResult<SendMessageRequestHeader> {
202 let mut request_header_v2 = None;
203 if RequestCode::SendMessageV2 == request_code || RequestCode::SendBatchMessage == request_code {
204 request_header_v2 = request
206 .decode_command_custom_header::<SendMessageRequestHeaderV2>()
207 .ok();
208 }
209 match request_header_v2 {
211 Some(header) => {
212 Ok(SendMessageRequestHeaderV2::create_send_message_request_header_v1(&header))
213 }
214 None => request.decode_command_custom_header::<SendMessageRequestHeader>(),
215 }
216}
217
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use cheetah_string::CheetahString;

    use super::*;
    use crate::code::request_code::RequestCode;
    use crate::protocol::command_custom_header::CommandCustomHeader;
    use crate::protocol::command_custom_header::FromMap;
    use crate::protocol::remoting_command::RemotingCommand;

    /// Wire-format key/value pairs for every `#[required]` field, all with valid values.
    const REQUIRED_ENTRIES: [(&str, &str); 8] = [
        ("producerGroup", "test_producer_group"),
        ("topic", "test_topic"),
        ("defaultTopic", "test_default_topic"),
        ("defaultTopicQueueNums", "8"),
        ("queueId", "1"),
        ("sysFlag", "0"),
        ("bornTimestamp", "1622547800000"),
        ("flag", "0"),
    ];

    /// Wire-format key/value pairs for the optional scalar fields.
    const OPTIONAL_ENTRIES: [(&str, &str); 5] = [
        ("properties", "test_properties"),
        ("reconsumeTimes", "3"),
        ("unitMode", "true"),
        ("batch", "false"),
        ("maxReconsumeTimes", "5"),
    ];

    /// Builds a header field map from static key/value pairs.
    fn header_map(
        entries: &[(&'static str, &'static str)],
    ) -> HashMap<CheetahString, CheetahString> {
        entries
            .iter()
            .map(|&(key, value)| {
                (
                    CheetahString::from_static_str(key),
                    CheetahString::from_static_str(value),
                )
            })
            .collect()
    }

    /// Asserts that every required field holds the value from `REQUIRED_ENTRIES`.
    fn assert_required_fields(header: &SendMessageRequestHeader) {
        assert_eq!(header.producer_group, "test_producer_group");
        assert_eq!(header.topic, "test_topic");
        assert_eq!(header.default_topic, "test_default_topic");
        assert_eq!(header.default_topic_queue_nums, 8);
        assert_eq!(header.queue_id, 1);
        assert_eq!(header.sys_flag, 0);
        assert_eq!(header.born_timestamp, 1622547800000);
        assert_eq!(header.flag, 0);
    }

    // Despite the name, the request code used here is valid; the request simply carries no
    // header fields, so decoding must fail.
    #[test]
    fn parse_request_header_handles_invalid_request_code() {
        let request = RemotingCommand::create_remoting_command(RequestCode::SendBatchMessage);
        let result = parse_request_header(&request, RequestCode::SendBatchMessage);
        assert!(result.is_err());
    }

    #[test]
    fn parse_request_header_handles_missing_header() {
        let request = RemotingCommand::create_remoting_command(RequestCode::SendMessageV2);
        let result = parse_request_header(&request, RequestCode::SendMessageV2);
        assert!(result.is_err());
    }

    #[test]
    fn send_message_request_header_serializes_correctly() {
        let header = SendMessageRequestHeader {
            producer_group: CheetahString::from_static_str("test_producer_group"),
            topic: CheetahString::from_static_str("test_topic"),
            default_topic: CheetahString::from_static_str("test_default_topic"),
            default_topic_queue_nums: 8,
            queue_id: 1,
            sys_flag: 0,
            born_timestamp: 1622547800000,
            flag: 0,
            properties: Some(CheetahString::from_static_str("test_properties")),
            reconsume_times: Some(3),
            unit_mode: Some(true),
            batch: Some(false),
            max_reconsume_times: Some(5),
            topic_request_header: None,
        };

        let map = header.to_map().unwrap();
        for &(key, expected) in REQUIRED_ENTRIES.iter().chain(OPTIONAL_ENTRIES.iter()) {
            assert_eq!(
                map.get(&CheetahString::from_static_str(key)).unwrap(),
                expected,
                "unexpected serialized value for key `{}`",
                key
            );
        }
    }

    #[test]
    fn send_message_request_header_deserializes_correctly() {
        let mut map = header_map(&REQUIRED_ENTRIES);
        map.extend(header_map(&OPTIONAL_ENTRIES));

        let header = <SendMessageRequestHeader as FromMap>::from(&map).unwrap();
        assert_required_fields(&header);
        assert_eq!(header.properties.unwrap(), "test_properties");
        assert_eq!(header.reconsume_times.unwrap(), 3);
        assert!(header.unit_mode.unwrap());
        assert!(!header.batch.unwrap());
        assert_eq!(header.max_reconsume_times.unwrap(), 5);
    }

    #[test]
    fn send_message_request_header_handles_missing_optional_fields() {
        let map = header_map(&REQUIRED_ENTRIES);

        let header = <SendMessageRequestHeader as FromMap>::from(&map).unwrap();
        // Includes `queue_id`, which is present in the map and must round-trip as well.
        assert_required_fields(&header);
        assert!(header.properties.is_none());
        assert!(header.reconsume_times.is_none());
        assert!(header.unit_mode.is_none());
        assert!(header.batch.is_none());
        assert!(header.max_reconsume_times.is_none());
    }

    #[test]
    fn send_message_request_header_handles_invalid_data() {
        // NOTE(review): `queueId` is absent from this map, so the error may come from the
        // missing required field rather than from the unparsable numbers — confirm intent.
        let map = header_map(&[
            ("producerGroup", "test_producer_group"),
            ("topic", "test_topic"),
            ("defaultTopic", "test_default_topic"),
            ("defaultTopicQueueNums", "invalid"),
            ("sysFlag", "invalid"),
            ("bornTimestamp", "invalid"),
            ("flag", "invalid"),
        ]);

        let result = <SendMessageRequestHeader as FromMap>::from(&map);
        assert!(result.is_err());
    }
}