Skip to main content

rocketmq_remoting/protocol/header/
reply_message_request_header.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use cheetah_string::CheetahString;
16use rocketmq_macros::RequestHeaderCodecV2;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
21
22/// Represents the header of a reply message request.
23#[derive(Serialize, Deserialize, Debug, Default, RequestHeaderCodecV2)]
24#[serde(rename_all = "camelCase")]
25pub struct ReplyMessageRequestHeader {
26    /// Producer group associated with the message.
27    #[required]
28    pub producer_group: CheetahString,
29
30    /// The topic of the message.
31    #[required]
32    pub topic: CheetahString,
33
34    /// Default topic used when the specified topic is not found.
35    #[required]
36    pub default_topic: CheetahString,
37
38    /// Number of queues in the default topic.
39    #[required]
40    pub default_topic_queue_nums: i32,
41
42    /// Queue ID of the message.
43    #[required]
44    pub queue_id: i32,
45
46    /// System flags associated with the message.
47    #[required]
48    pub sys_flag: i32,
49
50    /// Timestamp of when the message was born.
51    #[required]
52    pub born_timestamp: i64,
53
54    /// Flags associated with the message.
55    #[required]
56    pub flag: i32,
57
58    /// Properties of the message (nullable).
59    pub properties: Option<CheetahString>,
60
61    /// Number of times the message has been reconsumed (nullable).
62    pub reconsume_times: Option<i32>,
63
64    /// Whether the message processing is in unit mode (nullable).
65    pub unit_mode: Option<bool>,
66
67    /// Host where the message was born.
68    #[required]
69    pub born_host: CheetahString,
70
71    /// Host where the message is stored.
72    #[required]
73    pub store_host: CheetahString,
74
75    /// Timestamp of when the message was stored.
76    #[required]
77    pub store_timestamp: i64,
78
79    #[serde(flatten)]
80    pub topic_request: Option<TopicRequestHeader>,
81}
82
#[cfg(test)]
mod reply_message_request_header_tests {
    use std::collections::HashMap;

    use super::*;
    use crate::protocol::command_custom_header::CommandCustomHeader;
    use crate::protocol::command_custom_header::FromMap;

    /// Builds a fully populated header shared by the serialization tests.
    ///
    /// Only `topic_request` differs between those tests, so it is the sole parameter.
    fn sample_header(topic_request: Option<TopicRequestHeader>) -> ReplyMessageRequestHeader {
        ReplyMessageRequestHeader {
            topic: "test_topic".into(),
            producer_group: "test_producer_group".into(),
            default_topic: "test_default_topic".into(),
            default_topic_queue_nums: 10,
            queue_id: 1,
            flag: 2,
            sys_flag: 0,
            born_timestamp: 1622547800,
            born_host: "test_born_host".into(),
            store_host: "test_store_host".into(),
            store_timestamp: 1622547800,
            properties: Some("test_properties".into()),
            reconsume_times: Some(1),
            unit_mode: Some(true),
            topic_request,
        }
    }

    /// A map holding every required key (plus `unitMode`) decodes into a header whose
    /// fields mirror the map; optional keys that were not supplied stay `None`.
    #[test]
    fn deserialize_from_map_with_all_fields_populates_struct_correctly() {
        let mut map: HashMap<CheetahString, CheetahString> = HashMap::new();
        map.insert("producerGroup".into(), "test_producer_group".into());
        map.insert("topic".into(), "test_topic".into());
        map.insert("defaultTopic".into(), "test_default_topic".into());
        map.insert("defaultTopicQueueNums".into(), "10".into());
        map.insert("queueId".into(), "1".into());
        map.insert("sysFlag".into(), "0".into());
        map.insert("flag".into(), "0".into());
        map.insert("bornTimestamp".into(), "1622547800".into());
        map.insert("bornHost".into(), "test_born_host".into());
        map.insert("storeHost".into(), "test_store_host".into());
        map.insert("storeTimestamp".into(), "1622547800".into());
        map.insert("unitMode".into(), "true".into());

        let header: ReplyMessageRequestHeader = <ReplyMessageRequestHeader as FromMap>::from(&map).unwrap();

        assert_eq!(header.topic, "test_topic");
        assert_eq!(header.producer_group, "test_producer_group");
        assert_eq!(header.default_topic, "test_default_topic");
        assert_eq!(header.default_topic_queue_nums, 10);
        assert_eq!(header.queue_id, 1);
        assert_eq!(header.sys_flag, 0);
        assert_eq!(header.flag, 0);
        assert_eq!(header.born_timestamp, 1622547800);
        assert_eq!(header.born_host, "test_born_host");
        assert_eq!(header.store_host, "test_store_host");
        assert_eq!(header.store_timestamp, 1622547800);
        assert_eq!(header.properties, None);
        assert_eq!(header.reconsume_times, None);
        assert_eq!(header.unit_mode, Some(true));
    }

    /// Decoding a map whose `defaultTopicQueueNums` is not a number yields `Err`.
    ///
    /// `FromMap::from` returns a `Result`, so the failure surfaces as `Err`, not `None`.
    #[test]
    fn deserialize_from_map_with_invalid_number_fields_returns_err() {
        let mut map = HashMap::new();
        map.insert("producerGroup".into(), "test_producer_group".into());
        map.insert("topic".into(), "test_topic".into());
        map.insert("defaultTopic".into(), "test_default_topic".into());
        map.insert("defaultTopicQueueNums".into(), "invalid".into());

        let result: Result<ReplyMessageRequestHeader, rocketmq_error::RocketMQError> =
            <ReplyMessageRequestHeader as FromMap>::from(&map);

        assert!(result.is_err());
    }

    /// A fully populated header without a nested topic request serializes every
    /// field under its camelCase key and emits no `topicRequest` entry.
    #[test]
    fn serialize_header_to_map() {
        let header = sample_header(None);
        let map: HashMap<CheetahString, CheetahString> = header.to_map().unwrap();

        assert_eq!(map.get("topic").unwrap(), "test_topic");
        assert_eq!(map.get("producerGroup").unwrap(), "test_producer_group");
        assert_eq!(map.get("defaultTopic").unwrap(), "test_default_topic");
        assert_eq!(map.get("defaultTopicQueueNums").unwrap(), "10");
        assert_eq!(map.get("bornTimestamp").unwrap(), "1622547800");
        assert!(!map.contains_key("topicRequest"));
        assert_eq!(map.get("queueId").unwrap(), "1");
        assert_eq!(map.get("sysFlag").unwrap(), "0");
        assert_eq!(map.get("bornHost").unwrap(), "test_born_host");
        assert_eq!(map.get("storeHost").unwrap(), "test_store_host");
        assert_eq!(map.get("storeTimestamp").unwrap(), "1622547800");
        assert_eq!(map.get("flag").unwrap(), "2");
        assert_eq!(map.get("properties").unwrap(), "test_properties");
        assert_eq!(map.get("reconsumeTimes").unwrap(), "1");
        assert_eq!(map.get("unitMode").unwrap(), "true");
    }

    /// Attaching a default `TopicRequestHeader` must not disturb the top-level
    /// entries, and no `nestedField` key may appear in the resulting map.
    #[test]
    fn serialize_header_to_map_with_default_topic_request_header_keeps_top_level_fields() {
        let header = sample_header(Some(TopicRequestHeader::default()));
        let map: HashMap<CheetahString, CheetahString> = header.to_map().unwrap();

        // The original check, kept as-is.
        assert!(!map.contains_key("nestedField"));

        // Guard against the nested header clobbering or suppressing real fields.
        assert_eq!(map.get("topic").unwrap(), "test_topic");
        assert_eq!(map.get("producerGroup").unwrap(), "test_producer_group");
        assert_eq!(map.get("defaultTopicQueueNums").unwrap(), "10");
        assert_eq!(map.get("queueId").unwrap(), "1");
        assert_eq!(map.get("storeTimestamp").unwrap(), "1622547800");
    }
}