rocketmq_remoting/protocol/header/
reply_message_request_header.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use cheetah_string::CheetahString;
18use rocketmq_macros::RequestHeaderCodecV2;
19use serde::Deserialize;
20use serde::Serialize;
21
22use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
23
24/// Represents the header of a reply message request.
25#[derive(Serialize, Deserialize, Debug, Default, RequestHeaderCodecV2)]
26#[serde(rename_all = "camelCase")]
27pub struct ReplyMessageRequestHeader {
28    /// Producer group associated with the message.
29    #[required]
30    pub producer_group: CheetahString,
31
32    /// The topic of the message.
33    #[required]
34    pub topic: CheetahString,
35
36    /// Default topic used when the specified topic is not found.
37    #[required]
38    pub default_topic: CheetahString,
39
40    /// Number of queues in the default topic.
41    #[required]
42    pub default_topic_queue_nums: i32,
43
44    /// Queue ID of the message.
45    #[required]
46    pub queue_id: i32,
47
48    /// System flags associated with the message.
49    #[required]
50    pub sys_flag: i32,
51
52    /// Timestamp of when the message was born.
53    #[required]
54    pub born_timestamp: i64,
55
56    /// Flags associated with the message.
57    #[required]
58    pub flag: i32,
59
60    /// Properties of the message (nullable).
61    pub properties: Option<CheetahString>,
62
63    /// Number of times the message has been reconsumed (nullable).
64    pub reconsume_times: Option<i32>,
65
66    /// Whether the message processing is in unit mode (nullable).
67    pub unit_mode: Option<bool>,
68
69    /// Host where the message was born.
70    #[required]
71    pub born_host: CheetahString,
72
73    /// Host where the message is stored.
74    #[required]
75    pub store_host: CheetahString,
76
77    /// Timestamp of when the message was stored.
78    #[required]
79    pub store_timestamp: i64,
80
81    #[serde(flatten)]
82    pub topic_request: Option<TopicRequestHeader>,
83}
84
85#[cfg(test)]
86mod reply_message_request_header_tests {
87    use std::collections::HashMap;
88
89    use super::*;
90    use crate::protocol::command_custom_header::CommandCustomHeader;
91    use crate::protocol::command_custom_header::FromMap;
92    #[test]
93    fn deserialize_from_map_with_all_fields_populates_struct_correctly() {
94        let mut map: HashMap<CheetahString, CheetahString> = HashMap::new();
95        map.insert("producerGroup".into(), "test_producer_group".into());
96        map.insert("topic".into(), "test_topic".into());
97        map.insert("defaultTopic".into(), "test_default_topic".into());
98        map.insert("defaultTopicQueueNums".into(), "10".into());
99        map.insert("queueId".into(), "1".into());
100        map.insert("sysFlag".into(), "0".into());
101        map.insert("flag".into(), "0".into());
102        map.insert("bornTimestamp".into(), "1622547800".into());
103        map.insert("bornHost".into(), "test_born_host".into());
104        map.insert("storeHost".into(), "test_store_host".into());
105        map.insert("storeTimestamp".into(), "1622547800".into());
106        map.insert("unitMode".into(), "true".into());
107
108        let header: ReplyMessageRequestHeader =
109            <ReplyMessageRequestHeader as FromMap>::from(&map).unwrap();
110
111        assert_eq!(header.topic, "test_topic");
112        assert_eq!(header.producer_group, "test_producer_group");
113        assert_eq!(header.default_topic, "test_default_topic");
114        assert_eq!(header.default_topic_queue_nums, 10);
115        assert_eq!(header.queue_id, 1);
116        assert_eq!(header.sys_flag, 0);
117        assert_eq!(header.flag, 0);
118        assert_eq!(header.born_timestamp, 1622547800);
119        assert_eq!(header.born_host, "test_born_host");
120        assert_eq!(header.store_host, "test_store_host");
121        assert_eq!(header.store_timestamp, 1622547800);
122        assert_eq!(header.properties, None);
123        assert_eq!(header.reconsume_times, None);
124        assert_eq!(header.unit_mode, Some(true));
125    }
126
127    #[test]
128    fn deserialize_from_map_with_invalid_number_fields_returns_none() {
129        let mut map = HashMap::new();
130        map.insert("producerGroup".into(), "test_producer_group".into());
131        map.insert("topic".into(), "test_topic".into());
132        map.insert("defaultTopic".into(), "test_default_topic".into());
133        map.insert("defaultTopicQueueNums".into(), "invalid".into());
134        let header: Result<ReplyMessageRequestHeader, rocketmq_error::RocketMQError> =
135            <ReplyMessageRequestHeader as FromMap>::from(&map);
136        assert!(header.is_err());
137    }
138
139    #[test]
140    fn serialize_header_to_map() {
141        let header = ReplyMessageRequestHeader {
142            topic: "test_topic".into(),
143            producer_group: "test_producer_group".into(),
144            default_topic: "test_default_topic".into(),
145            default_topic_queue_nums: 10,
146            queue_id: 1,
147            flag: 2,
148            sys_flag: 0,
149            born_timestamp: 1622547800,
150            born_host: "test_born_host".into(),
151            store_host: "test_store_host".into(),
152            store_timestamp: 1622547800,
153            properties: Some("test_properties".into()),
154            reconsume_times: Some(1),
155            unit_mode: Some(true),
156            topic_request: None,
157        };
158        let map: HashMap<CheetahString, CheetahString> = header.to_map().unwrap();
159
160        assert_eq!(map.get("topic").unwrap(), "test_topic");
161        assert_eq!(map.get("producerGroup").unwrap(), "test_producer_group");
162        assert_eq!(map.get("defaultTopicQueueNums").unwrap(), "10");
163        assert_eq!(map.get("bornTimestamp").unwrap(), "1622547800");
164        assert!(!map.contains_key("topicRequest"));
165        assert_eq!(map.get("queueId").unwrap(), "1");
166        assert_eq!(map.get("sysFlag").unwrap(), "0");
167        assert_eq!(map.get("bornHost").unwrap(), "test_born_host");
168        assert_eq!(map.get("storeHost").unwrap(), "test_store_host");
169        assert_eq!(map.get("storeTimestamp").unwrap(), "1622547800");
170        assert_eq!(map.get("flag").unwrap(), "2");
171        assert_eq!(map.get("properties").unwrap(), "test_properties");
172        assert_eq!(map.get("reconsumeTimes").unwrap(), "1");
173        assert_eq!(map.get("unitMode").unwrap(), "true");
174    }
175
176    #[test]
177    fn serialize_header_to_map_with_topic_request_header_includes_nested_fields() {
178        let topic_request_header = TopicRequestHeader::default();
179        let header = ReplyMessageRequestHeader {
180            topic: "test_topic".into(),
181            producer_group: "test_producer_group".into(),
182            default_topic: "test_default_topic".into(),
183            default_topic_queue_nums: 10,
184            queue_id: 1,
185            flag: 2,
186            sys_flag: 0,
187            born_timestamp: 1622547800,
188            born_host: "test_born_host".into(),
189            store_host: "test_store_host".into(),
190            store_timestamp: 1622547800,
191            properties: Some("test_properties".into()),
192            reconsume_times: Some(1),
193            unit_mode: Some(true),
194            topic_request: Some(topic_request_header),
195        };
196        let map: HashMap<CheetahString, CheetahString> = header.to_map().unwrap();
197        assert!(!map.contains_key("nestedField"));
198    }
199}