// rocketmq_remoting/protocol/header/reply_message_request_header.rs
use cheetah_string::CheetahString;
use rocketmq_macros::RequestHeaderCodecV2;
use serde::Deserialize;
use serde::Serialize;

use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;

22#[derive(Serialize, Deserialize, Debug, Default, RequestHeaderCodecV2)]
24#[serde(rename_all = "camelCase")]
25pub struct ReplyMessageRequestHeader {
26 #[required]
28 pub producer_group: CheetahString,
29
30 #[required]
32 pub topic: CheetahString,
33
34 #[required]
36 pub default_topic: CheetahString,
37
38 #[required]
40 pub default_topic_queue_nums: i32,
41
42 #[required]
44 pub queue_id: i32,
45
46 #[required]
48 pub sys_flag: i32,
49
50 #[required]
52 pub born_timestamp: i64,
53
54 #[required]
56 pub flag: i32,
57
58 pub properties: Option<CheetahString>,
60
61 pub reconsume_times: Option<i32>,
63
64 pub unit_mode: Option<bool>,
66
67 #[required]
69 pub born_host: CheetahString,
70
71 #[required]
73 pub store_host: CheetahString,
74
75 #[required]
77 pub store_timestamp: i64,
78
79 #[serde(flatten)]
80 pub topic_request: Option<TopicRequestHeader>,
81}
82
#[cfg(test)]
mod reply_message_request_header_tests {
    use std::collections::HashMap;

    use super::*;
    use crate::protocol::command_custom_header::CommandCustomHeader;
    use crate::protocol::command_custom_header::FromMap;

    /// Builds a header with every field populated, so the serialization tests share one
    /// fixture and differ only in the flattened `topic_request` they pass in.
    fn sample_header(topic_request: Option<TopicRequestHeader>) -> ReplyMessageRequestHeader {
        ReplyMessageRequestHeader {
            topic: "test_topic".into(),
            producer_group: "test_producer_group".into(),
            default_topic: "test_default_topic".into(),
            default_topic_queue_nums: 10,
            queue_id: 1,
            flag: 2,
            sys_flag: 0,
            born_timestamp: 1622547800,
            born_host: "test_born_host".into(),
            store_host: "test_store_host".into(),
            store_timestamp: 1622547800,
            properties: Some("test_properties".into()),
            reconsume_times: Some(1),
            unit_mode: Some(true),
            topic_request,
        }
    }

    /// A map holding every required key (plus `unitMode`) decodes into matching fields;
    /// optional keys that are absent decode to `None`.
    #[test]
    fn deserialize_from_map_with_all_fields_populates_struct_correctly() {
        let mut map: HashMap<CheetahString, CheetahString> = HashMap::new();
        map.insert("producerGroup".into(), "test_producer_group".into());
        map.insert("topic".into(), "test_topic".into());
        map.insert("defaultTopic".into(), "test_default_topic".into());
        map.insert("defaultTopicQueueNums".into(), "10".into());
        map.insert("queueId".into(), "1".into());
        map.insert("sysFlag".into(), "0".into());
        map.insert("flag".into(), "0".into());
        map.insert("bornTimestamp".into(), "1622547800".into());
        map.insert("bornHost".into(), "test_born_host".into());
        map.insert("storeHost".into(), "test_store_host".into());
        map.insert("storeTimestamp".into(), "1622547800".into());
        map.insert("unitMode".into(), "true".into());

        let header: ReplyMessageRequestHeader =
            <ReplyMessageRequestHeader as FromMap>::from(&map).unwrap();

        assert_eq!(header.topic, "test_topic");
        assert_eq!(header.producer_group, "test_producer_group");
        assert_eq!(header.default_topic, "test_default_topic");
        assert_eq!(header.default_topic_queue_nums, 10);
        assert_eq!(header.queue_id, 1);
        assert_eq!(header.sys_flag, 0);
        assert_eq!(header.flag, 0);
        assert_eq!(header.born_timestamp, 1622547800);
        assert_eq!(header.born_host, "test_born_host");
        assert_eq!(header.store_host, "test_store_host");
        assert_eq!(header.store_timestamp, 1622547800);
        assert_eq!(header.properties, None);
        assert_eq!(header.reconsume_times, None);
        assert_eq!(header.unit_mode, Some(true));
    }

    /// `FromMap::from` returns a `Result`, so a rejected map surfaces as `Err` (the old
    /// test name said "returns_none", which did not match the assertion).
    #[test]
    fn deserialize_from_map_with_invalid_number_fields_returns_err() {
        let mut map: HashMap<CheetahString, CheetahString> = HashMap::new();
        map.insert("producerGroup".into(), "test_producer_group".into());
        map.insert("topic".into(), "test_topic".into());
        map.insert("defaultTopic".into(), "test_default_topic".into());
        // Not a valid i32.
        map.insert("defaultTopicQueueNums".into(), "invalid".into());

        let header: Result<ReplyMessageRequestHeader, rocketmq_error::RocketMQError> =
            <ReplyMessageRequestHeader as FromMap>::from(&map);

        assert!(header.is_err());
    }

    /// Every populated field is written under its camelCase key, and an unset
    /// `topic_request` contributes no `topicRequest` entry.
    #[test]
    fn serialize_header_to_map() {
        let map: HashMap<CheetahString, CheetahString> = sample_header(None).to_map().unwrap();

        assert_eq!(map.get("topic").unwrap(), "test_topic");
        assert_eq!(map.get("producerGroup").unwrap(), "test_producer_group");
        assert_eq!(map.get("defaultTopic").unwrap(), "test_default_topic");
        assert_eq!(map.get("defaultTopicQueueNums").unwrap(), "10");
        assert_eq!(map.get("bornTimestamp").unwrap(), "1622547800");
        assert!(!map.contains_key("topicRequest"));
        assert_eq!(map.get("queueId").unwrap(), "1");
        assert_eq!(map.get("sysFlag").unwrap(), "0");
        assert_eq!(map.get("bornHost").unwrap(), "test_born_host");
        assert_eq!(map.get("storeHost").unwrap(), "test_store_host");
        assert_eq!(map.get("storeTimestamp").unwrap(), "1622547800");
        assert_eq!(map.get("flag").unwrap(), "2");
        assert_eq!(map.get("properties").unwrap(), "test_properties");
        assert_eq!(map.get("reconsumeTimes").unwrap(), "1");
        assert_eq!(map.get("unitMode").unwrap(), "true");
    }

    /// Attaching a default `TopicRequestHeader` must not introduce an unrelated key and
    /// must leave the top-level entries intact.
    #[test]
    fn serialize_header_to_map_with_topic_request_header_keeps_top_level_fields() {
        let header = sample_header(Some(TopicRequestHeader::default()));
        let map: HashMap<CheetahString, CheetahString> = header.to_map().unwrap();

        assert!(!map.contains_key("nestedField"));
        assert_eq!(map.get("topic").unwrap(), "test_topic");
        assert_eq!(map.get("producerGroup").unwrap(), "test_producer_group");
        assert_eq!(map.get("defaultTopic").unwrap(), "test_default_topic");
    }
}