rocketmq_remoting/protocol/header/
polling_info_request_header.rs1use cheetah_string::CheetahString;
16use rocketmq_macros::RequestHeaderCodecV2;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
21use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
22
23#[derive(Debug, Clone, Serialize, Deserialize, RequestHeaderCodecV2)]
24#[serde(rename_all = "camelCase")]
25pub struct PollingInfoRequestHeader {
26 #[required]
27 pub consumer_group: CheetahString,
28
29 #[required]
30 pub topic: CheetahString,
31
32 #[required]
33 pub queue_id: i32,
34
35 #[serde(flatten)]
36 pub topic_request_header: Option<TopicRequestHeader>,
37}
38
39impl TopicRequestHeaderTrait for PollingInfoRequestHeader {
40 fn set_lo(&mut self, lo: Option<bool>) {
41 if let Some(header) = self.topic_request_header.as_mut() {
42 header.lo = lo;
43 }
44 }
45
46 fn lo(&self) -> Option<bool> {
47 self.topic_request_header.as_ref().and_then(|h| h.lo)
48 }
49
50 fn set_topic(&mut self, topic: CheetahString) {
51 self.topic = topic;
52 }
53
54 fn topic(&self) -> &CheetahString {
55 &self.topic
56 }
57
58 fn broker_name(&self) -> Option<&CheetahString> {
59 self.topic_request_header
60 .as_ref()
61 .and_then(|h| h.rpc.as_ref())
62 .and_then(|h| h.broker_name.as_ref())
63 }
64
65 fn set_broker_name(&mut self, broker_name: CheetahString) {
66 if let Some(header) = self.topic_request_header.as_mut() {
67 if let Some(rpc_header) = header.rpc.as_mut() {
68 rpc_header.broker_name = Some(broker_name);
69 }
70 }
71 }
72
73 fn namespace(&self) -> Option<&str> {
74 self.topic_request_header
75 .as_ref()
76 .and_then(|h| h.rpc.as_ref())
77 .and_then(|r| r.namespace.as_deref())
78 }
79
80 fn set_namespace(&mut self, namespace: CheetahString) {
81 if let Some(header) = self.topic_request_header.as_mut() {
82 if let Some(rpc_header) = header.rpc.as_mut() {
83 rpc_header.namespace = Some(namespace);
84 }
85 }
86 }
87
88 fn namespaced(&self) -> Option<bool> {
89 self.topic_request_header
90 .as_ref()
91 .and_then(|h| h.rpc.as_ref())
92 .and_then(|r| r.namespaced)
93 }
94
95 fn set_namespaced(&mut self, namespaced: bool) {
96 if let Some(header) = self.topic_request_header.as_mut() {
97 if let Some(rpc_header) = header.rpc.as_mut() {
98 rpc_header.namespaced = Some(namespaced);
99 }
100 }
101 }
102
103 fn oneway(&self) -> Option<bool> {
104 self.topic_request_header
105 .as_ref()
106 .and_then(|h| h.rpc.as_ref())
107 .and_then(|r| r.oneway)
108 }
109
110 fn set_oneway(&mut self, oneway: bool) {
111 if let Some(header) = self.topic_request_header.as_mut() {
112 if let Some(rpc_header) = header.rpc.as_mut() {
113 rpc_header.oneway = Some(oneway);
114 }
115 }
116 }
117
118 fn queue_id(&self) -> i32 {
119 self.queue_id
120 }
121
122 fn set_queue_id(&mut self, queue_id: i32) {
123 self.queue_id = queue_id;
124 }
125}
126
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;
    use crate::protocol::command_custom_header::CommandCustomHeader;
    use crate::protocol::command_custom_header::FromMap;
    use crate::rpc::rpc_request_header::RpcRequestHeader;

    /// Builds a header for `test_group` / `test_topic`, queue 0, with the given flattened part.
    fn header_with(topic_request_header: Option<TopicRequestHeader>) -> PollingInfoRequestHeader {
        PollingInfoRequestHeader {
            consumer_group: CheetahString::from_static_str("test_group"),
            topic: CheetahString::from_static_str("test_topic"),
            queue_id: 0,
            topic_request_header,
        }
    }

    /// Builds a header whose flattened part carries a default `rpc` header and no `lo`.
    fn header_with_rpc() -> PollingInfoRequestHeader {
        header_with(Some(TopicRequestHeader {
            lo: None,
            rpc: Some(RpcRequestHeader::default()),
        }))
    }

    /// Builds the wire-level map for `test_consumer_group` / `test_topic` with the given
    /// raw `queueId` value.
    fn wire_map(queue_id: &'static str) -> HashMap<CheetahString, CheetahString> {
        HashMap::from([
            (
                CheetahString::from_static_str("consumerGroup"),
                CheetahString::from_static_str("test_consumer_group"),
            ),
            (
                CheetahString::from_static_str("topic"),
                CheetahString::from_static_str("test_topic"),
            ),
            (
                CheetahString::from_static_str("queueId"),
                CheetahString::from_static_str(queue_id),
            ),
        ])
    }

    #[test]
    fn polling_info_request_header_serializes_correctly() {
        let header = PollingInfoRequestHeader {
            consumer_group: CheetahString::from_static_str("test_consumer_group"),
            queue_id: 1,
            ..header_with(None)
        };

        let map = header.to_map().unwrap();

        let expected = [
            ("consumerGroup", "test_consumer_group"),
            ("topic", "test_topic"),
            ("queueId", "1"),
        ];
        for (key, value) in expected {
            assert_eq!(
                map.get(&CheetahString::from_static_str(key)).unwrap(),
                value,
                "unexpected value for key {}",
                key
            );
        }
    }

    #[test]
    fn polling_info_request_header_deserializes_correctly() {
        let map = wire_map("1");

        let header = <PollingInfoRequestHeader as FromMap>::from(&map).unwrap();

        assert_eq!(header.consumer_group, "test_consumer_group");
        assert_eq!(header.topic, "test_topic");
        assert_eq!(header.queue_id, 1);
        assert!(header.topic_request_header.is_some());
    }

    #[test]
    fn polling_info_request_header_handles_invalid_data() {
        // A non-numeric queue id must be rejected.
        let map = wire_map("invalid");

        let result = <PollingInfoRequestHeader as FromMap>::from(&map);

        assert!(result.is_err());
    }

    #[test]
    fn topic_request_header_trait_set_and_get_lo() {
        let mut header = header_with(Some(TopicRequestHeader::default()));

        for lo in [Some(true), Some(false), None] {
            header.set_lo(lo);
            assert_eq!(header.lo(), lo);
        }

        // Without the flattened part the setter has nowhere to store the value.
        let mut header = header_with(None);
        header.set_lo(Some(true));
        assert_eq!(header.lo(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_topic() {
        let mut header = header_with(None);

        header.set_topic(CheetahString::from_static_str("new_topic"));

        assert_eq!(header.topic(), "new_topic");
    }

    #[test]
    fn topic_request_header_trait_set_and_get_broker_name() {
        let mut header = header_with_rpc();
        header.set_broker_name(CheetahString::from_static_str("new_broker"));
        assert_eq!(
            header.broker_name(),
            Some(&CheetahString::from_static_str("new_broker"))
        );

        // Without the flattened part the setter has nowhere to store the value.
        let mut header = header_with(None);
        header.set_broker_name(CheetahString::from_static_str("new_broker"));
        assert_eq!(header.broker_name(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_namespace() {
        let mut header = header_with_rpc();
        header.set_namespace(CheetahString::from_static_str("new_namespace"));
        assert_eq!(header.namespace(), Some("new_namespace"));

        // Without the flattened part the setter has nowhere to store the value.
        let mut header = header_with(None);
        header.set_namespace(CheetahString::from_static_str("new_namespace"));
        assert_eq!(header.namespace(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_namespaced() {
        let mut header = header_with_rpc();
        for flag in [true, false] {
            header.set_namespaced(flag);
            assert_eq!(header.namespaced(), Some(flag));
        }

        // Without the flattened part the setter has nowhere to store the value.
        let mut header = header_with(None);
        header.set_namespaced(true);
        assert_eq!(header.namespaced(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_oneway() {
        let mut header = header_with_rpc();
        for flag in [true, false] {
            header.set_oneway(flag);
            assert_eq!(header.oneway(), Some(flag));
        }

        // Without the flattened part the setter has nowhere to store the value.
        let mut header = header_with(None);
        header.set_oneway(true);
        assert_eq!(header.oneway(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_queue_id() {
        let mut header = header_with(None);

        for queue_id in [10, -1] {
            header.set_queue_id(queue_id);
            assert_eq!(header.queue_id(), queue_id);
        }
    }
}