rocketmq_remoting/protocol/header/message_operation_header/
send_message_request_header.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use cheetah_string::CheetahString;
19use rocketmq_macros::RequestHeaderCodecV2;
20use serde::Deserialize;
21use serde::Serialize;
22
23use crate::code::request_code::RequestCode;
24use crate::protocol::header::message_operation_header::send_message_request_header_v2::SendMessageRequestHeaderV2;
25use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
26use crate::protocol::remoting_command::RemotingCommand;
27use crate::rpc::topic_request_header::TopicRequestHeader;
28
29#[derive(Debug, Clone, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
30#[serde(rename_all = "camelCase")]
31pub struct SendMessageRequestHeader {
32    #[required]
33    pub producer_group: CheetahString,
34
35    #[required]
36    pub topic: CheetahString,
37
38    #[required]
39    pub default_topic: CheetahString,
40
41    #[required]
42    pub default_topic_queue_nums: i32,
43
44    #[required]
45    pub queue_id: i32,
46
47    #[required]
48    pub sys_flag: i32,
49
50    #[required]
51    pub born_timestamp: i64,
52
53    #[required]
54    pub flag: i32,
55
56    pub properties: Option<CheetahString>,
57    pub reconsume_times: Option<i32>,
58    pub unit_mode: Option<bool>,
59    pub batch: Option<bool>,
60    pub max_reconsume_times: Option<i32>,
61    #[serde(flatten)]
62    pub topic_request_header: Option<TopicRequestHeader>,
63}
64
65impl SendMessageRequestHeader {
66    #[inline(always)]
67    pub fn is_batch(&self) -> bool {
68        self.batch.unwrap_or_default()
69    }
70}
71
72impl TopicRequestHeaderTrait for SendMessageRequestHeader {
73    fn set_lo(&mut self, lo: Option<bool>) {
74        self.topic_request_header.as_mut().unwrap().lo = lo;
75    }
76
77    fn lo(&self) -> Option<bool> {
78        match self.topic_request_header {
79            None => None,
80            Some(ref value) => value.lo,
81        }
82    }
83
84    fn set_topic(&mut self, topic: CheetahString) {
85        self.topic = topic;
86    }
87
88    fn topic(&self) -> &CheetahString {
89        &self.topic
90    }
91
92    fn broker_name(&self) -> Option<&CheetahString> {
93        self.topic_request_header
94            .as_ref()?
95            .rpc_request_header
96            .as_ref()?
97            .broker_name
98            .as_ref()
99    }
100
101    fn set_broker_name(&mut self, broker_name: CheetahString) {
102        self.topic_request_header
103            .as_mut()
104            .unwrap()
105            .rpc_request_header
106            .as_mut()
107            .unwrap()
108            .broker_name = Some(broker_name);
109    }
110
111    fn namespace(&self) -> Option<&str> {
112        self.topic_request_header
113            .as_ref()?
114            .rpc_request_header
115            .as_ref()?
116            .namespace
117            .as_deref()
118    }
119
120    fn set_namespace(&mut self, namespace: CheetahString) {
121        self.topic_request_header
122            .as_mut()
123            .unwrap()
124            .rpc_request_header
125            .as_mut()
126            .unwrap()
127            .namespace = Some(namespace);
128    }
129
130    fn namespaced(&self) -> Option<bool> {
131        self.topic_request_header
132            .as_ref()?
133            .rpc_request_header
134            .as_ref()?
135            .namespaced
136            .as_ref()
137            .cloned()
138    }
139
140    fn set_namespaced(&mut self, namespaced: bool) {
141        self.topic_request_header
142            .as_mut()
143            .unwrap()
144            .rpc_request_header
145            .as_mut()
146            .unwrap()
147            .namespaced = Some(namespaced);
148    }
149
150    fn oneway(&self) -> Option<bool> {
151        self.topic_request_header
152            .as_ref()?
153            .rpc_request_header
154            .as_ref()?
155            .oneway
156            .as_ref()
157            .cloned()
158    }
159
160    fn set_oneway(&mut self, oneway: bool) {
161        self.topic_request_header
162            .as_mut()
163            .unwrap()
164            .rpc_request_header
165            .as_mut()
166            .unwrap()
167            .namespaced = Some(oneway);
168    }
169
170    fn queue_id(&self) -> i32 {
171        self.queue_id
172    }
173
174    fn set_queue_id(&mut self, queue_id: i32) {
175        self.queue_id = queue_id;
176    }
177}
178
179/// Parses the request header from a `RemotingCommand` based on the `RequestCode`.
180///
181/// This function attempts to decode the command custom header from the provided `RemotingCommand`.
182/// If the `RequestCode` is `SendMessageV2` or `SendBatchMessage`, it tries to decode the header
183/// as `SendMessageRequestHeaderV2`. If successful, it converts the `V2` header to a `V1` header.
184/// Otherwise, it decodes the header directly as `SendMessageRequestHeader`.
185///
186/// # Arguments
187///
188/// * `request` - A reference to the `RemotingCommand` containing the command and its headers.
189/// * `request_code` - The `RequestCode` that indicates which version of the request header to
190///   expect.
191///
192/// # Returns
193///
194/// * `Ok(SendMessageRequestHeader)` if the header is successfully parsed and converted (if
195///   necessary).
196/// * `Err(crate::Error)` if there is an error in decoding the header.
197#[inline]
198pub fn parse_request_header(
199    request: &RemotingCommand,
200    request_code: RequestCode,
201) -> rocketmq_error::RocketMQResult<SendMessageRequestHeader> {
202    let mut request_header_v2 = None;
203    if RequestCode::SendMessageV2 == request_code || RequestCode::SendBatchMessage == request_code {
204        // Attempt to decode the command custom header as SendMessageRequestHeaderV2
205        request_header_v2 = request
206            .decode_command_custom_header::<SendMessageRequestHeaderV2>()
207            .ok();
208    }
209    // Match on the result of the V2 header decoding
210    match request_header_v2 {
211        Some(header) => {
212            Ok(SendMessageRequestHeaderV2::create_send_message_request_header_v1(&header))
213        }
214        None => request.decode_command_custom_header::<SendMessageRequestHeader>(),
215    }
216}
217
218#[cfg(test)]
219mod tests {
220    use std::collections::HashMap;
221
222    use cheetah_string::CheetahString;
223
224    use super::*;
225    use crate::code::request_code::RequestCode;
226    use crate::protocol::command_custom_header::CommandCustomHeader;
227    use crate::protocol::command_custom_header::FromMap;
228    use crate::protocol::remoting_command::RemotingCommand;
229
230    #[test]
231    fn parse_request_header_handles_invalid_request_code() {
232        let request = RemotingCommand::create_remoting_command(RequestCode::SendBatchMessage);
233        let request_code = RequestCode::SendBatchMessage;
234        let result = parse_request_header(&request, request_code);
235        assert!(result.is_err());
236    }
237
238    #[test]
239    fn parse_request_header_handles_missing_header() {
240        let request = RemotingCommand::create_remoting_command(RequestCode::SendMessageV2);
241        let request_code = RequestCode::SendMessageV2;
242        let result = parse_request_header(&request, request_code);
243        assert!(result.is_err());
244    }
245
246    #[test]
247    fn send_message_request_header_serializes_correctly() {
248        let header = SendMessageRequestHeader {
249            producer_group: CheetahString::from_static_str("test_producer_group"),
250            topic: CheetahString::from_static_str("test_topic"),
251            default_topic: CheetahString::from_static_str("test_default_topic"),
252            default_topic_queue_nums: 8,
253            queue_id: 1,
254            sys_flag: 0,
255            born_timestamp: 1622547800000,
256            flag: 0,
257            properties: Some(CheetahString::from_static_str("test_properties")),
258            reconsume_times: Some(3),
259            unit_mode: Some(true),
260            batch: Some(false),
261            max_reconsume_times: Some(5),
262            topic_request_header: None,
263        };
264        let map = header.to_map().unwrap();
265        assert_eq!(
266            map.get(&CheetahString::from_static_str("producerGroup"))
267                .unwrap(),
268            "test_producer_group"
269        );
270        assert_eq!(
271            map.get(&CheetahString::from_static_str("topic")).unwrap(),
272            "test_topic"
273        );
274        assert_eq!(
275            map.get(&CheetahString::from_static_str("defaultTopic"))
276                .unwrap(),
277            "test_default_topic"
278        );
279        assert_eq!(
280            map.get(&CheetahString::from_static_str("defaultTopicQueueNums"))
281                .unwrap(),
282            "8"
283        );
284        assert_eq!(
285            map.get(&CheetahString::from_static_str("queueId")).unwrap(),
286            "1"
287        );
288        assert_eq!(
289            map.get(&CheetahString::from_static_str("sysFlag")).unwrap(),
290            "0"
291        );
292        assert_eq!(
293            map.get(&CheetahString::from_static_str("bornTimestamp"))
294                .unwrap(),
295            "1622547800000"
296        );
297        assert_eq!(
298            map.get(&CheetahString::from_static_str("flag")).unwrap(),
299            "0"
300        );
301        assert_eq!(
302            map.get(&CheetahString::from_static_str("properties"))
303                .unwrap(),
304            "test_properties"
305        );
306        assert_eq!(
307            map.get(&CheetahString::from_static_str("reconsumeTimes"))
308                .unwrap(),
309            "3"
310        );
311        assert_eq!(
312            map.get(&CheetahString::from_static_str("unitMode"))
313                .unwrap(),
314            "true"
315        );
316        assert_eq!(
317            map.get(&CheetahString::from_static_str("batch")).unwrap(),
318            "false"
319        );
320        assert_eq!(
321            map.get(&CheetahString::from_static_str("maxReconsumeTimes"))
322                .unwrap(),
323            "5"
324        );
325    }
326
327    #[test]
328    fn send_message_request_header_deserializes_correctly() {
329        let mut map = HashMap::new();
330        map.insert(
331            CheetahString::from_static_str("producerGroup"),
332            CheetahString::from_static_str("test_producer_group"),
333        );
334        map.insert(
335            CheetahString::from_static_str("topic"),
336            CheetahString::from_static_str("test_topic"),
337        );
338        map.insert(
339            CheetahString::from_static_str("defaultTopic"),
340            CheetahString::from_static_str("test_default_topic"),
341        );
342        map.insert(
343            CheetahString::from_static_str("defaultTopicQueueNums"),
344            CheetahString::from_static_str("8"),
345        );
346        map.insert(
347            CheetahString::from_static_str("queueId"),
348            CheetahString::from_static_str("1"),
349        );
350        map.insert(
351            CheetahString::from_static_str("sysFlag"),
352            CheetahString::from_static_str("0"),
353        );
354        map.insert(
355            CheetahString::from_static_str("bornTimestamp"),
356            CheetahString::from_static_str("1622547800000"),
357        );
358        map.insert(
359            CheetahString::from_static_str("flag"),
360            CheetahString::from_static_str("0"),
361        );
362        map.insert(
363            CheetahString::from_static_str("properties"),
364            CheetahString::from_static_str("test_properties"),
365        );
366        map.insert(
367            CheetahString::from_static_str("reconsumeTimes"),
368            CheetahString::from_static_str("3"),
369        );
370        map.insert(
371            CheetahString::from_static_str("unitMode"),
372            CheetahString::from_static_str("true"),
373        );
374        map.insert(
375            CheetahString::from_static_str("batch"),
376            CheetahString::from_static_str("false"),
377        );
378        map.insert(
379            CheetahString::from_static_str("maxReconsumeTimes"),
380            CheetahString::from_static_str("5"),
381        );
382
383        let header = <SendMessageRequestHeader as FromMap>::from(&map).unwrap();
384        assert_eq!(header.producer_group, "test_producer_group");
385        assert_eq!(header.topic, "test_topic");
386        assert_eq!(header.default_topic, "test_default_topic");
387        assert_eq!(header.default_topic_queue_nums, 8);
388        assert_eq!(header.queue_id, 1);
389        assert_eq!(header.sys_flag, 0);
390        assert_eq!(header.born_timestamp, 1622547800000);
391        assert_eq!(header.flag, 0);
392        assert_eq!(header.properties.unwrap(), "test_properties");
393        assert_eq!(header.reconsume_times.unwrap(), 3);
394        assert!(header.unit_mode.unwrap());
395        assert!(!header.batch.unwrap());
396        assert_eq!(header.max_reconsume_times.unwrap(), 5);
397    }
398
399    #[test]
400    fn send_message_request_header_handles_missing_optional_fields() {
401        let mut map = HashMap::new();
402        map.insert(
403            CheetahString::from_static_str("queueId"),
404            CheetahString::from_static_str("1"),
405        );
406        map.insert(
407            CheetahString::from_static_str("producerGroup"),
408            CheetahString::from_static_str("test_producer_group"),
409        );
410        map.insert(
411            CheetahString::from_static_str("topic"),
412            CheetahString::from_static_str("test_topic"),
413        );
414        map.insert(
415            CheetahString::from_static_str("defaultTopic"),
416            CheetahString::from_static_str("test_default_topic"),
417        );
418        map.insert(
419            CheetahString::from_static_str("defaultTopicQueueNums"),
420            CheetahString::from_static_str("8"),
421        );
422        map.insert(
423            CheetahString::from_static_str("sysFlag"),
424            CheetahString::from_static_str("0"),
425        );
426        map.insert(
427            CheetahString::from_static_str("bornTimestamp"),
428            CheetahString::from_static_str("1622547800000"),
429        );
430        map.insert(
431            CheetahString::from_static_str("flag"),
432            CheetahString::from_static_str("0"),
433        );
434
435        let header = <SendMessageRequestHeader as FromMap>::from(&map).unwrap();
436        assert_eq!(header.producer_group, "test_producer_group");
437        assert_eq!(header.topic, "test_topic");
438        assert_eq!(header.default_topic, "test_default_topic");
439        assert_eq!(header.default_topic_queue_nums, 8);
440        //assert!(header.queue_id.is_some());
441        assert_eq!(header.sys_flag, 0);
442        assert_eq!(header.born_timestamp, 1622547800000);
443        assert_eq!(header.flag, 0);
444        assert!(header.properties.is_none());
445        assert!(header.reconsume_times.is_none());
446        assert!(header.unit_mode.is_none());
447        assert!(header.batch.is_none());
448        assert!(header.max_reconsume_times.is_none());
449    }
450
451    #[test]
452    fn send_message_request_header_handles_invalid_data() {
453        let mut map = HashMap::new();
454        map.insert(
455            CheetahString::from_static_str("producerGroup"),
456            CheetahString::from_static_str("test_producer_group"),
457        );
458        map.insert(
459            CheetahString::from_static_str("topic"),
460            CheetahString::from_static_str("test_topic"),
461        );
462        map.insert(
463            CheetahString::from_static_str("defaultTopic"),
464            CheetahString::from_static_str("test_default_topic"),
465        );
466        map.insert(
467            CheetahString::from_static_str("defaultTopicQueueNums"),
468            CheetahString::from_static_str("invalid"),
469        );
470        map.insert(
471            CheetahString::from_static_str("sysFlag"),
472            CheetahString::from_static_str("invalid"),
473        );
474        map.insert(
475            CheetahString::from_static_str("bornTimestamp"),
476            CheetahString::from_static_str("invalid"),
477        );
478        map.insert(
479            CheetahString::from_static_str("flag"),
480            CheetahString::from_static_str("invalid"),
481        );
482
483        let result = <SendMessageRequestHeader as FromMap>::from(&map);
484        assert!(result.is_err());
485    }
486}