Skip to main content

rocketmq_remoting/protocol/header/message_operation_header/
send_message_request_header.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use cheetah_string::CheetahString;
16use rocketmq_macros::RequestHeaderCodecV2;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::code::request_code::RequestCode;
21use crate::protocol::header::message_operation_header::send_message_request_header_v2::SendMessageRequestHeaderV2;
22use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
23use crate::protocol::remoting_command::RemotingCommand;
24use crate::rpc::topic_request_header::TopicRequestHeader;
25
26#[derive(Debug, Clone, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
27#[serde(rename_all = "camelCase")]
28pub struct SendMessageRequestHeader {
29    #[required]
30    pub producer_group: CheetahString,
31
32    #[required]
33    pub topic: CheetahString,
34
35    #[required]
36    pub default_topic: CheetahString,
37
38    #[required]
39    pub default_topic_queue_nums: i32,
40
41    #[required]
42    pub queue_id: i32,
43
44    #[required]
45    pub sys_flag: i32,
46
47    #[required]
48    pub born_timestamp: i64,
49
50    #[required]
51    pub flag: i32,
52
53    pub properties: Option<CheetahString>,
54    pub reconsume_times: Option<i32>,
55    pub unit_mode: Option<bool>,
56    pub batch: Option<bool>,
57    pub max_reconsume_times: Option<i32>,
58    #[serde(flatten)]
59    pub topic_request_header: Option<TopicRequestHeader>,
60}
61
62impl SendMessageRequestHeader {
63    #[inline(always)]
64    pub fn is_batch(&self) -> bool {
65        self.batch.unwrap_or_default()
66    }
67}
68
69impl TopicRequestHeaderTrait for SendMessageRequestHeader {
70    fn set_lo(&mut self, lo: Option<bool>) {
71        self.topic_request_header.as_mut().unwrap().lo = lo;
72    }
73
74    fn lo(&self) -> Option<bool> {
75        match self.topic_request_header {
76            None => None,
77            Some(ref value) => value.lo,
78        }
79    }
80
81    fn set_topic(&mut self, topic: CheetahString) {
82        self.topic = topic;
83    }
84
85    fn topic(&self) -> &CheetahString {
86        &self.topic
87    }
88
89    fn broker_name(&self) -> Option<&CheetahString> {
90        self.topic_request_header
91            .as_ref()?
92            .rpc_request_header
93            .as_ref()?
94            .broker_name
95            .as_ref()
96    }
97
98    fn set_broker_name(&mut self, broker_name: CheetahString) {
99        self.topic_request_header
100            .as_mut()
101            .unwrap()
102            .rpc_request_header
103            .as_mut()
104            .unwrap()
105            .broker_name = Some(broker_name);
106    }
107
108    fn namespace(&self) -> Option<&str> {
109        self.topic_request_header
110            .as_ref()?
111            .rpc_request_header
112            .as_ref()?
113            .namespace
114            .as_deref()
115    }
116
117    fn set_namespace(&mut self, namespace: CheetahString) {
118        self.topic_request_header
119            .as_mut()
120            .unwrap()
121            .rpc_request_header
122            .as_mut()
123            .unwrap()
124            .namespace = Some(namespace);
125    }
126
127    fn namespaced(&self) -> Option<bool> {
128        self.topic_request_header
129            .as_ref()?
130            .rpc_request_header
131            .as_ref()?
132            .namespaced
133            .as_ref()
134            .cloned()
135    }
136
137    fn set_namespaced(&mut self, namespaced: bool) {
138        self.topic_request_header
139            .as_mut()
140            .unwrap()
141            .rpc_request_header
142            .as_mut()
143            .unwrap()
144            .namespaced = Some(namespaced);
145    }
146
147    fn oneway(&self) -> Option<bool> {
148        self.topic_request_header
149            .as_ref()?
150            .rpc_request_header
151            .as_ref()?
152            .oneway
153            .as_ref()
154            .cloned()
155    }
156
157    fn set_oneway(&mut self, oneway: bool) {
158        self.topic_request_header
159            .as_mut()
160            .unwrap()
161            .rpc_request_header
162            .as_mut()
163            .unwrap()
164            .namespaced = Some(oneway);
165    }
166
167    fn queue_id(&self) -> i32 {
168        self.queue_id
169    }
170
171    fn set_queue_id(&mut self, queue_id: i32) {
172        self.queue_id = queue_id;
173    }
174}
175
176/// Parses the request header from a `RemotingCommand` based on the `RequestCode`.
177///
178/// This function attempts to decode the command custom header from the provided `RemotingCommand`.
179/// If the `RequestCode` is `SendMessageV2` or `SendBatchMessage`, it tries to decode the header
180/// as `SendMessageRequestHeaderV2`. If successful, it converts the `V2` header to a `V1` header.
181/// Otherwise, it decodes the header directly as `SendMessageRequestHeader`.
182///
183/// # Arguments
184///
185/// * `request` - A reference to the `RemotingCommand` containing the command and its headers.
186/// * `request_code` - The `RequestCode` that indicates which version of the request header to
187///   expect.
188///
189/// # Returns
190///
191/// * `Ok(SendMessageRequestHeader)` if the header is successfully parsed and converted (if
192///   necessary).
193/// * `Err(crate::Error)` if there is an error in decoding the header.
194#[inline]
195pub fn parse_request_header(
196    request: &RemotingCommand,
197    request_code: RequestCode,
198) -> rocketmq_error::RocketMQResult<SendMessageRequestHeader> {
199    let mut request_header_v2 = None;
200    if RequestCode::SendMessageV2 == request_code || RequestCode::SendBatchMessage == request_code {
201        // Attempt to decode the command custom header as SendMessageRequestHeaderV2
202        request_header_v2 = request
203            .decode_command_custom_header::<SendMessageRequestHeaderV2>()
204            .ok();
205    }
206    // Match on the result of the V2 header decoding
207    match request_header_v2 {
208        Some(header) => Ok(SendMessageRequestHeaderV2::create_send_message_request_header_v1(
209            &header,
210        )),
211        None => request.decode_command_custom_header::<SendMessageRequestHeader>(),
212    }
213}
214
215#[cfg(test)]
216mod tests {
217    use std::collections::HashMap;
218
219    use cheetah_string::CheetahString;
220
221    use super::*;
222    use crate::code::request_code::RequestCode;
223    use crate::protocol::command_custom_header::CommandCustomHeader;
224    use crate::protocol::command_custom_header::FromMap;
225    use crate::protocol::remoting_command::RemotingCommand;
226
227    #[test]
228    fn parse_request_header_handles_invalid_request_code() {
229        let request = RemotingCommand::create_remoting_command(RequestCode::SendBatchMessage);
230        let request_code = RequestCode::SendBatchMessage;
231        let result = parse_request_header(&request, request_code);
232        assert!(result.is_err());
233    }
234
235    #[test]
236    fn parse_request_header_handles_missing_header() {
237        let request = RemotingCommand::create_remoting_command(RequestCode::SendMessageV2);
238        let request_code = RequestCode::SendMessageV2;
239        let result = parse_request_header(&request, request_code);
240        assert!(result.is_err());
241    }
242
243    #[test]
244    fn send_message_request_header_serializes_correctly() {
245        let header = SendMessageRequestHeader {
246            producer_group: CheetahString::from_static_str("test_producer_group"),
247            topic: CheetahString::from_static_str("test_topic"),
248            default_topic: CheetahString::from_static_str("test_default_topic"),
249            default_topic_queue_nums: 8,
250            queue_id: 1,
251            sys_flag: 0,
252            born_timestamp: 1622547800000,
253            flag: 0,
254            properties: Some(CheetahString::from_static_str("test_properties")),
255            reconsume_times: Some(3),
256            unit_mode: Some(true),
257            batch: Some(false),
258            max_reconsume_times: Some(5),
259            topic_request_header: None,
260        };
261        let map = header.to_map().unwrap();
262        assert_eq!(
263            map.get(&CheetahString::from_static_str("producerGroup")).unwrap(),
264            "test_producer_group"
265        );
266        assert_eq!(map.get(&CheetahString::from_static_str("topic")).unwrap(), "test_topic");
267        assert_eq!(
268            map.get(&CheetahString::from_static_str("defaultTopic")).unwrap(),
269            "test_default_topic"
270        );
271        assert_eq!(
272            map.get(&CheetahString::from_static_str("defaultTopicQueueNums"))
273                .unwrap(),
274            "8"
275        );
276        assert_eq!(map.get(&CheetahString::from_static_str("queueId")).unwrap(), "1");
277        assert_eq!(map.get(&CheetahString::from_static_str("sysFlag")).unwrap(), "0");
278        assert_eq!(
279            map.get(&CheetahString::from_static_str("bornTimestamp")).unwrap(),
280            "1622547800000"
281        );
282        assert_eq!(map.get(&CheetahString::from_static_str("flag")).unwrap(), "0");
283        assert_eq!(
284            map.get(&CheetahString::from_static_str("properties")).unwrap(),
285            "test_properties"
286        );
287        assert_eq!(map.get(&CheetahString::from_static_str("reconsumeTimes")).unwrap(), "3");
288        assert_eq!(map.get(&CheetahString::from_static_str("unitMode")).unwrap(), "true");
289        assert_eq!(map.get(&CheetahString::from_static_str("batch")).unwrap(), "false");
290        assert_eq!(
291            map.get(&CheetahString::from_static_str("maxReconsumeTimes")).unwrap(),
292            "5"
293        );
294    }
295
296    #[test]
297    fn send_message_request_header_deserializes_correctly() {
298        let mut map = HashMap::new();
299        map.insert(
300            CheetahString::from_static_str("producerGroup"),
301            CheetahString::from_static_str("test_producer_group"),
302        );
303        map.insert(
304            CheetahString::from_static_str("topic"),
305            CheetahString::from_static_str("test_topic"),
306        );
307        map.insert(
308            CheetahString::from_static_str("defaultTopic"),
309            CheetahString::from_static_str("test_default_topic"),
310        );
311        map.insert(
312            CheetahString::from_static_str("defaultTopicQueueNums"),
313            CheetahString::from_static_str("8"),
314        );
315        map.insert(
316            CheetahString::from_static_str("queueId"),
317            CheetahString::from_static_str("1"),
318        );
319        map.insert(
320            CheetahString::from_static_str("sysFlag"),
321            CheetahString::from_static_str("0"),
322        );
323        map.insert(
324            CheetahString::from_static_str("bornTimestamp"),
325            CheetahString::from_static_str("1622547800000"),
326        );
327        map.insert(
328            CheetahString::from_static_str("flag"),
329            CheetahString::from_static_str("0"),
330        );
331        map.insert(
332            CheetahString::from_static_str("properties"),
333            CheetahString::from_static_str("test_properties"),
334        );
335        map.insert(
336            CheetahString::from_static_str("reconsumeTimes"),
337            CheetahString::from_static_str("3"),
338        );
339        map.insert(
340            CheetahString::from_static_str("unitMode"),
341            CheetahString::from_static_str("true"),
342        );
343        map.insert(
344            CheetahString::from_static_str("batch"),
345            CheetahString::from_static_str("false"),
346        );
347        map.insert(
348            CheetahString::from_static_str("maxReconsumeTimes"),
349            CheetahString::from_static_str("5"),
350        );
351
352        let header = <SendMessageRequestHeader as FromMap>::from(&map).unwrap();
353        assert_eq!(header.producer_group, "test_producer_group");
354        assert_eq!(header.topic, "test_topic");
355        assert_eq!(header.default_topic, "test_default_topic");
356        assert_eq!(header.default_topic_queue_nums, 8);
357        assert_eq!(header.queue_id, 1);
358        assert_eq!(header.sys_flag, 0);
359        assert_eq!(header.born_timestamp, 1622547800000);
360        assert_eq!(header.flag, 0);
361        assert_eq!(header.properties.unwrap(), "test_properties");
362        assert_eq!(header.reconsume_times.unwrap(), 3);
363        assert!(header.unit_mode.unwrap());
364        assert!(!header.batch.unwrap());
365        assert_eq!(header.max_reconsume_times.unwrap(), 5);
366    }
367
368    #[test]
369    fn send_message_request_header_handles_missing_optional_fields() {
370        let mut map = HashMap::new();
371        map.insert(
372            CheetahString::from_static_str("queueId"),
373            CheetahString::from_static_str("1"),
374        );
375        map.insert(
376            CheetahString::from_static_str("producerGroup"),
377            CheetahString::from_static_str("test_producer_group"),
378        );
379        map.insert(
380            CheetahString::from_static_str("topic"),
381            CheetahString::from_static_str("test_topic"),
382        );
383        map.insert(
384            CheetahString::from_static_str("defaultTopic"),
385            CheetahString::from_static_str("test_default_topic"),
386        );
387        map.insert(
388            CheetahString::from_static_str("defaultTopicQueueNums"),
389            CheetahString::from_static_str("8"),
390        );
391        map.insert(
392            CheetahString::from_static_str("sysFlag"),
393            CheetahString::from_static_str("0"),
394        );
395        map.insert(
396            CheetahString::from_static_str("bornTimestamp"),
397            CheetahString::from_static_str("1622547800000"),
398        );
399        map.insert(
400            CheetahString::from_static_str("flag"),
401            CheetahString::from_static_str("0"),
402        );
403
404        let header = <SendMessageRequestHeader as FromMap>::from(&map).unwrap();
405        assert_eq!(header.producer_group, "test_producer_group");
406        assert_eq!(header.topic, "test_topic");
407        assert_eq!(header.default_topic, "test_default_topic");
408        assert_eq!(header.default_topic_queue_nums, 8);
409        //assert!(header.queue_id.is_some());
410        assert_eq!(header.sys_flag, 0);
411        assert_eq!(header.born_timestamp, 1622547800000);
412        assert_eq!(header.flag, 0);
413        assert!(header.properties.is_none());
414        assert!(header.reconsume_times.is_none());
415        assert!(header.unit_mode.is_none());
416        assert!(header.batch.is_none());
417        assert!(header.max_reconsume_times.is_none());
418    }
419
420    #[test]
421    fn send_message_request_header_handles_invalid_data() {
422        let mut map = HashMap::new();
423        map.insert(
424            CheetahString::from_static_str("producerGroup"),
425            CheetahString::from_static_str("test_producer_group"),
426        );
427        map.insert(
428            CheetahString::from_static_str("topic"),
429            CheetahString::from_static_str("test_topic"),
430        );
431        map.insert(
432            CheetahString::from_static_str("defaultTopic"),
433            CheetahString::from_static_str("test_default_topic"),
434        );
435        map.insert(
436            CheetahString::from_static_str("defaultTopicQueueNums"),
437            CheetahString::from_static_str("invalid"),
438        );
439        map.insert(
440            CheetahString::from_static_str("sysFlag"),
441            CheetahString::from_static_str("invalid"),
442        );
443        map.insert(
444            CheetahString::from_static_str("bornTimestamp"),
445            CheetahString::from_static_str("invalid"),
446        );
447        map.insert(
448            CheetahString::from_static_str("flag"),
449            CheetahString::from_static_str("invalid"),
450        );
451
452        let result = <SendMessageRequestHeader as FromMap>::from(&map);
453        assert!(result.is_err());
454    }
455}