Skip to main content

rocketmq_remoting/protocol/header/
polling_info_request_header.rs

// Copyright 2023 The RocketMQ Rust Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use cheetah_string::CheetahString;
16use rocketmq_macros::RequestHeaderCodecV2;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
21use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
22
/// Request header that identifies a consumer group, a topic and a queue id, and optionally
/// carries the shared fields of a [`TopicRequestHeader`].
///
/// Serde renames the fields to camelCase, so the map/JSON keys are `consumerGroup`, `topic`
/// and `queueId`.
///
/// NOTE(review): the `#[required]` markers are consumed by the `RequestHeaderCodecV2` derive;
/// presumably decoding fails when such a key is missing — confirm against the macro.
#[derive(Debug, Clone, Serialize, Deserialize, RequestHeaderCodecV2)]
#[serde(rename_all = "camelCase")]
pub struct PollingInfoRequestHeader {
    /// Consumer group this request refers to (key `consumerGroup`).
    #[required]
    pub consumer_group: CheetahString,

    /// Topic this request refers to (key `topic`).
    #[required]
    pub topic: CheetahString,

    /// Queue id this request refers to (key `queueId`).
    #[required]
    pub queue_id: i32,

    /// Optional shared topic-level fields (`lo` and the nested RPC header).
    ///
    /// `#[serde(flatten)]` places these fields at the same level as the ones above instead of
    /// nesting them under their own key. While this is `None`, the `TopicRequestHeaderTrait`
    /// setters on this type that target these fields do nothing.
    #[serde(flatten)]
    pub topic_request_header: Option<TopicRequestHeader>,
}
38
39impl TopicRequestHeaderTrait for PollingInfoRequestHeader {
40    fn set_lo(&mut self, lo: Option<bool>) {
41        if let Some(header) = self.topic_request_header.as_mut() {
42            header.lo = lo;
43        }
44    }
45
46    fn lo(&self) -> Option<bool> {
47        self.topic_request_header.as_ref().and_then(|h| h.lo)
48    }
49
50    fn set_topic(&mut self, topic: CheetahString) {
51        self.topic = topic;
52    }
53
54    fn topic(&self) -> &CheetahString {
55        &self.topic
56    }
57
58    fn broker_name(&self) -> Option<&CheetahString> {
59        self.topic_request_header
60            .as_ref()
61            .and_then(|h| h.rpc.as_ref())
62            .and_then(|h| h.broker_name.as_ref())
63    }
64
65    fn set_broker_name(&mut self, broker_name: CheetahString) {
66        if let Some(header) = self.topic_request_header.as_mut() {
67            if let Some(rpc_header) = header.rpc.as_mut() {
68                rpc_header.broker_name = Some(broker_name);
69            }
70        }
71    }
72
73    fn namespace(&self) -> Option<&str> {
74        self.topic_request_header
75            .as_ref()
76            .and_then(|h| h.rpc.as_ref())
77            .and_then(|r| r.namespace.as_deref())
78    }
79
80    fn set_namespace(&mut self, namespace: CheetahString) {
81        if let Some(header) = self.topic_request_header.as_mut() {
82            if let Some(rpc_header) = header.rpc.as_mut() {
83                rpc_header.namespace = Some(namespace);
84            }
85        }
86    }
87
88    fn namespaced(&self) -> Option<bool> {
89        self.topic_request_header
90            .as_ref()
91            .and_then(|h| h.rpc.as_ref())
92            .and_then(|r| r.namespaced)
93    }
94
95    fn set_namespaced(&mut self, namespaced: bool) {
96        if let Some(header) = self.topic_request_header.as_mut() {
97            if let Some(rpc_header) = header.rpc.as_mut() {
98                rpc_header.namespaced = Some(namespaced);
99            }
100        }
101    }
102
103    fn oneway(&self) -> Option<bool> {
104        self.topic_request_header
105            .as_ref()
106            .and_then(|h| h.rpc.as_ref())
107            .and_then(|r| r.oneway)
108    }
109
110    fn set_oneway(&mut self, oneway: bool) {
111        if let Some(header) = self.topic_request_header.as_mut() {
112            if let Some(rpc_header) = header.rpc.as_mut() {
113                rpc_header.oneway = Some(oneway);
114            }
115        }
116    }
117
118    fn queue_id(&self) -> i32 {
119        self.queue_id
120    }
121
122    fn set_queue_id(&mut self, queue_id: i32) {
123        self.queue_id = queue_id;
124    }
125}
126
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;
    use crate::protocol::command_custom_header::CommandCustomHeader;
    use crate::protocol::command_custom_header::FromMap;
    use crate::rpc::rpc_request_header::RpcRequestHeader;

    /// Shorthand for building a `CheetahString` from a static literal.
    fn cs(text: &'static str) -> CheetahString {
        CheetahString::from_static_str(text)
    }

    /// Field map as it would arrive off the wire, with a caller-chosen `queueId` value.
    fn encoded_fields(queue_id: &'static str) -> HashMap<CheetahString, CheetahString> {
        HashMap::from([
            (cs("consumerGroup"), cs("test_consumer_group")),
            (cs("topic"), cs("test_topic")),
            (cs("queueId"), cs(queue_id)),
        ])
    }

    /// Header fixture for the trait tests; only the embedded topic request header varies.
    fn header_with(topic_request_header: Option<TopicRequestHeader>) -> PollingInfoRequestHeader {
        PollingInfoRequestHeader {
            consumer_group: cs("test_group"),
            topic: cs("test_topic"),
            queue_id: 0,
            topic_request_header,
        }
    }

    /// Header fixture whose topic request header contains a default RPC header.
    fn header_with_rpc() -> PollingInfoRequestHeader {
        header_with(Some(TopicRequestHeader {
            lo: None,
            rpc: Some(RpcRequestHeader::default()),
        }))
    }

    #[test]
    fn polling_info_request_header_serializes_correctly() {
        let header = PollingInfoRequestHeader {
            consumer_group: cs("test_consumer_group"),
            topic: cs("test_topic"),
            queue_id: 1,
            topic_request_header: None,
        };

        let encoded = header.to_map().unwrap();

        assert_eq!(encoded.get(&cs("consumerGroup")).unwrap(), "test_consumer_group");
        assert_eq!(encoded.get(&cs("topic")).unwrap(), "test_topic");
        assert_eq!(encoded.get(&cs("queueId")).unwrap(), "1");
    }

    #[test]
    fn polling_info_request_header_deserializes_correctly() {
        let fields = encoded_fields("1");

        let decoded = <PollingInfoRequestHeader as FromMap>::from(&fields).unwrap();

        assert_eq!(decoded.consumer_group, "test_consumer_group");
        assert_eq!(decoded.topic, "test_topic");
        assert_eq!(decoded.queue_id, 1);
        assert!(decoded.topic_request_header.is_some());
    }

    #[test]
    fn polling_info_request_header_handles_invalid_data() {
        // A non-numeric queue id must be rejected rather than defaulted.
        let fields = encoded_fields("invalid");

        let outcome = <PollingInfoRequestHeader as FromMap>::from(&fields);

        assert!(outcome.is_err());
    }

    #[test]
    fn topic_request_header_trait_set_and_get_lo() {
        let mut header = header_with(Some(TopicRequestHeader::default()));
        for value in [Some(true), Some(false), None] {
            header.set_lo(value);
            assert_eq!(header.lo(), value);
        }

        // Without an embedded topic request header the setter has nowhere to write.
        let mut header = header_with(None);
        header.set_lo(Some(true));
        assert_eq!(header.lo(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_topic() {
        let mut header = header_with(None);

        header.set_topic(cs("new_topic"));

        assert_eq!(header.topic(), "new_topic");
    }

    #[test]
    fn topic_request_header_trait_set_and_get_broker_name() {
        let mut header = header_with_rpc();
        header.set_broker_name(cs("new_broker"));
        assert_eq!(header.broker_name(), Some(&cs("new_broker")));

        // Without an embedded topic request header the value is dropped.
        let mut header = header_with(None);
        header.set_broker_name(cs("new_broker"));
        assert_eq!(header.broker_name(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_namespace() {
        let mut header = header_with_rpc();
        header.set_namespace(cs("new_namespace"));
        assert_eq!(header.namespace(), Some("new_namespace"));

        // Without an embedded topic request header the value is dropped.
        let mut header = header_with(None);
        header.set_namespace(cs("new_namespace"));
        assert_eq!(header.namespace(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_namespaced() {
        let mut header = header_with_rpc();
        for flag in [true, false] {
            header.set_namespaced(flag);
            assert_eq!(header.namespaced(), Some(flag));
        }

        // Without an embedded topic request header the value is dropped.
        let mut header = header_with(None);
        header.set_namespaced(true);
        assert_eq!(header.namespaced(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_oneway() {
        let mut header = header_with_rpc();
        for flag in [true, false] {
            header.set_oneway(flag);
            assert_eq!(header.oneway(), Some(flag));
        }

        // Without an embedded topic request header the value is dropped.
        let mut header = header_with(None);
        header.set_oneway(true);
        assert_eq!(header.oneway(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_queue_id() {
        let mut header = header_with(None);

        for queue_id in [10, -1] {
            header.set_queue_id(queue_id);
            assert_eq!(header.queue_id(), queue_id);
        }
    }
}