rocketmq_remoting/protocol/header/
reply_message_request_header.rs1use cheetah_string::CheetahString;
18use rocketmq_macros::RequestHeaderCodecV2;
19use serde::Deserialize;
20use serde::Serialize;
21
22use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
23
24#[derive(Serialize, Deserialize, Debug, Default, RequestHeaderCodecV2)]
26#[serde(rename_all = "camelCase")]
27pub struct ReplyMessageRequestHeader {
28 #[required]
30 pub producer_group: CheetahString,
31
32 #[required]
34 pub topic: CheetahString,
35
36 #[required]
38 pub default_topic: CheetahString,
39
40 #[required]
42 pub default_topic_queue_nums: i32,
43
44 #[required]
46 pub queue_id: i32,
47
48 #[required]
50 pub sys_flag: i32,
51
52 #[required]
54 pub born_timestamp: i64,
55
56 #[required]
58 pub flag: i32,
59
60 pub properties: Option<CheetahString>,
62
63 pub reconsume_times: Option<i32>,
65
66 pub unit_mode: Option<bool>,
68
69 #[required]
71 pub born_host: CheetahString,
72
73 #[required]
75 pub store_host: CheetahString,
76
77 #[required]
79 pub store_timestamp: i64,
80
81 #[serde(flatten)]
82 pub topic_request: Option<TopicRequestHeader>,
83}
84
#[cfg(test)]
mod reply_message_request_header_tests {
    // Tests for the map <-> struct conversion of `ReplyMessageRequestHeader`
    // (`FromMap::from` for decoding, `CommandCustomHeader::to_map` for encoding).

    use std::collections::HashMap;

    use super::*;
    use crate::protocol::command_custom_header::CommandCustomHeader;
    use crate::protocol::command_custom_header::FromMap;

    /// Builds the string-map form of a header from `(key, value)` pairs.
    fn header_map(
        entries: &[(&'static str, &'static str)],
    ) -> HashMap<CheetahString, CheetahString> {
        entries
            .iter()
            .map(|&(key, value)| (CheetahString::from(key), CheetahString::from(value)))
            .collect()
    }

    /// Builds a fully populated header; only the flattened `topic_request` differs per test.
    fn sample_header(topic_request: Option<TopicRequestHeader>) -> ReplyMessageRequestHeader {
        ReplyMessageRequestHeader {
            producer_group: "test_producer_group".into(),
            topic: "test_topic".into(),
            default_topic: "test_default_topic".into(),
            default_topic_queue_nums: 10,
            queue_id: 1,
            sys_flag: 0,
            born_timestamp: 1622547800,
            flag: 2,
            properties: Some("test_properties".into()),
            reconsume_times: Some(1),
            unit_mode: Some(true),
            born_host: "test_born_host".into(),
            store_host: "test_store_host".into(),
            store_timestamp: 1622547800,
            topic_request,
        }
    }

    /// Asserts that `map[key] == expected`, naming the key in the failure message so a broken
    /// entry can be identified without a debugger.
    fn assert_entry(map: &HashMap<CheetahString, CheetahString>, key: &str, expected: &str) {
        let actual = map
            .get(key)
            .unwrap_or_else(|| panic!("missing key `{}` in serialized header", key));
        assert_eq!(actual, expected, "unexpected value for key `{}`", key);
    }

    /// Asserts every top-level entry that `sample_header` is expected to serialize to.
    fn assert_sample_entries(map: &HashMap<CheetahString, CheetahString>) {
        let expected = [
            ("producerGroup", "test_producer_group"),
            ("topic", "test_topic"),
            ("defaultTopic", "test_default_topic"),
            ("defaultTopicQueueNums", "10"),
            ("queueId", "1"),
            ("sysFlag", "0"),
            ("bornTimestamp", "1622547800"),
            ("flag", "2"),
            ("properties", "test_properties"),
            ("reconsumeTimes", "1"),
            ("unitMode", "true"),
            ("bornHost", "test_born_host"),
            ("storeHost", "test_store_host"),
            ("storeTimestamp", "1622547800"),
        ];
        for (key, value) in expected {
            assert_entry(map, key, value);
        }
    }

    #[test]
    fn deserialize_from_map_with_all_fields_populates_struct_correctly() {
        let map = header_map(&[
            ("producerGroup", "test_producer_group"),
            ("topic", "test_topic"),
            ("defaultTopic", "test_default_topic"),
            ("defaultTopicQueueNums", "10"),
            ("queueId", "1"),
            ("sysFlag", "0"),
            ("flag", "0"),
            ("bornTimestamp", "1622547800"),
            ("bornHost", "test_born_host"),
            ("storeHost", "test_store_host"),
            ("storeTimestamp", "1622547800"),
            ("unitMode", "true"),
        ]);

        let header: ReplyMessageRequestHeader =
            <ReplyMessageRequestHeader as FromMap>::from(&map).unwrap();

        assert_eq!(header.topic, "test_topic");
        assert_eq!(header.producer_group, "test_producer_group");
        assert_eq!(header.default_topic, "test_default_topic");
        assert_eq!(header.default_topic_queue_nums, 10);
        assert_eq!(header.queue_id, 1);
        assert_eq!(header.sys_flag, 0);
        assert_eq!(header.flag, 0);
        assert_eq!(header.born_timestamp, 1622547800);
        assert_eq!(header.born_host, "test_born_host");
        assert_eq!(header.store_host, "test_store_host");
        assert_eq!(header.store_timestamp, 1622547800);
        // Optional keys that were not supplied must decode to `None`.
        assert_eq!(header.properties, None);
        assert_eq!(header.reconsume_times, None);
        assert_eq!(header.unit_mode, Some(true));
    }

    // Renamed from `..._returns_none`: `FromMap::from` yields a `Result`, and the assertion
    // checks for `Err`, so the old name described behavior the API does not have.
    #[test]
    fn deserialize_from_map_with_invalid_number_fields_returns_err() {
        let map = header_map(&[
            ("producerGroup", "test_producer_group"),
            ("topic", "test_topic"),
            ("defaultTopic", "test_default_topic"),
            // Not parseable as `i32`.
            ("defaultTopicQueueNums", "invalid"),
        ]);

        let header: Result<ReplyMessageRequestHeader, rocketmq_error::RocketMQError> =
            <ReplyMessageRequestHeader as FromMap>::from(&map);

        assert!(header.is_err());
    }

    #[test]
    fn serialize_header_to_map() {
        let map: HashMap<CheetahString, CheetahString> = sample_header(None).to_map().unwrap();

        assert_sample_entries(&map);
        // The flattened field never appears under its own name.
        assert!(!map.contains_key("topicRequest"));
    }

    // Renamed from `..._includes_nested_fields`: the only assertion was that a made-up key is
    // absent, which neither matched the name nor verified anything about the header itself.
    // The test now also checks that a default nested header leaves the top-level entries intact.
    #[test]
    fn serialize_header_to_map_with_default_topic_request_header_keeps_top_level_fields() {
        let header = sample_header(Some(TopicRequestHeader::default()));

        let map: HashMap<CheetahString, CheetahString> = header.to_map().unwrap();

        assert_sample_entries(&map);
        assert!(!map.contains_key("nestedField"));
    }
}