Skip to main content

rocketmq_remoting/protocol/header/
pull_message_request_header.rs

// Copyright 2023 The RocketMQ Rust Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use cheetah_string::CheetahString;
16use rocketmq_macros::RequestHeaderCodecV2;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
21use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
22
23#[derive(Debug, Clone, Deserialize, Serialize, Default, RequestHeaderCodecV2)]
24#[serde(rename_all = "camelCase")]
25pub struct PullMessageRequestHeader {
26    #[required]
27    pub consumer_group: CheetahString,
28
29    #[required]
30    pub topic: CheetahString,
31
32    #[required]
33    pub queue_id: i32,
34
35    #[required]
36    pub queue_offset: i64,
37
38    #[required]
39    pub max_msg_nums: i32,
40
41    #[required]
42    pub sys_flag: i32,
43
44    #[required]
45    pub commit_offset: i64,
46
47    #[required]
48    pub suspend_timeout_millis: u64,
49
50    #[required]
51    pub sub_version: i64,
52
53    pub subscription: Option<CheetahString>,
54    pub expression_type: Option<CheetahString>,
55    pub max_msg_bytes: Option<i32>,
56    pub request_source: Option<i32>,
57    pub proxy_forward_client_id: Option<CheetahString>,
58    #[serde(flatten)]
59    pub topic_request: Option<TopicRequestHeader>,
60}
61
62impl TopicRequestHeaderTrait for PullMessageRequestHeader {
63    fn set_lo(&mut self, lo: Option<bool>) {
64        if let Some(header) = self.topic_request.as_mut() {
65            header.lo = lo;
66        }
67    }
68
69    fn lo(&self) -> Option<bool> {
70        self.topic_request.as_ref().and_then(|h| h.lo)
71    }
72
73    fn set_topic(&mut self, topic: CheetahString) {
74        self.topic = topic;
75    }
76
77    fn topic(&self) -> &CheetahString {
78        &self.topic
79    }
80
81    fn broker_name(&self) -> Option<&CheetahString> {
82        self.topic_request
83            .as_ref()
84            .and_then(|h| h.rpc.as_ref())
85            .and_then(|h| h.broker_name.as_ref())
86    }
87
88    fn set_broker_name(&mut self, broker_name: CheetahString) {
89        if let Some(header) = self.topic_request.as_mut() {
90            if let Some(rpc_header) = header.rpc.as_mut() {
91                rpc_header.broker_name = Some(broker_name);
92            }
93        }
94    }
95
96    fn namespace(&self) -> Option<&str> {
97        self.topic_request
98            .as_ref()
99            .and_then(|h| h.rpc.as_ref())
100            .and_then(|r| r.namespace.as_deref())
101    }
102
103    fn set_namespace(&mut self, namespace: CheetahString) {
104        if let Some(header) = self.topic_request.as_mut() {
105            if let Some(rpc_header) = header.rpc.as_mut() {
106                rpc_header.namespace = Some(namespace);
107            }
108        }
109    }
110
111    fn namespaced(&self) -> Option<bool> {
112        self.topic_request
113            .as_ref()
114            .and_then(|h| h.rpc.as_ref())
115            .and_then(|r| r.namespaced)
116    }
117
118    fn set_namespaced(&mut self, namespaced: bool) {
119        if let Some(header) = self.topic_request.as_mut() {
120            if let Some(rpc_header) = header.rpc.as_mut() {
121                rpc_header.namespaced = Some(namespaced);
122            }
123        }
124    }
125
126    fn oneway(&self) -> Option<bool> {
127        self.topic_request
128            .as_ref()
129            .and_then(|h| h.rpc.as_ref())
130            .and_then(|r| r.oneway)
131    }
132
133    fn set_oneway(&mut self, oneway: bool) {
134        if let Some(header) = self.topic_request.as_mut() {
135            if let Some(rpc_header) = header.rpc.as_mut() {
136                rpc_header.oneway = Some(oneway);
137            }
138        }
139    }
140
141    fn queue_id(&self) -> i32 {
142        self.queue_id
143    }
144
145    fn set_queue_id(&mut self, queue_id: i32) {
146        self.queue_id = queue_id;
147    }
148}
149
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use cheetah_string::CheetahString;

    use super::*;
    use crate::protocol::command_custom_header::CommandCustomHeader;
    use crate::protocol::command_custom_header::FromMap;

    /// Key/value pairs for every `#[required]` field, holding valid values.
    const REQUIRED_ENTRIES: [(&str, &str); 9] = [
        ("consumerGroup", "test_consumer_group"),
        ("topic", "test_topic"),
        ("queueId", "1"),
        ("queueOffset", "100"),
        ("maxMsgNums", "10"),
        ("sysFlag", "0"),
        ("commitOffset", "50"),
        ("suspendTimeoutMillis", "3000"),
        ("subVersion", "1"),
    ];

    /// Key/value pairs for the optional scalar/string fields.
    const OPTIONAL_ENTRIES: [(&str, &str); 5] = [
        ("subscription", "test_subscription"),
        ("expressionType", "test_expression"),
        ("maxMsgBytes", "1024"),
        ("requestSource", "1"),
        ("proxyForwardClientId", "test_client_id"),
    ];

    /// Shorthand for turning a string literal into a `CheetahString`.
    fn cs(text: &'static str) -> CheetahString {
        CheetahString::from_static_str(text)
    }

    /// Builds a header map from literal key/value pairs.
    fn header_map(pairs: &[(&'static str, &'static str)]) -> HashMap<CheetahString, CheetahString> {
        pairs.iter().map(|&(key, value)| (cs(key), cs(value))).collect()
    }

    /// Encoding a fully populated header must emit every field under its camelCase key.
    #[test]
    fn pull_message_request_header_serializes_correctly() {
        let header = PullMessageRequestHeader {
            consumer_group: cs("test_consumer_group"),
            topic: cs("test_topic"),
            queue_id: 1,
            queue_offset: 100,
            max_msg_nums: 10,
            sys_flag: 0,
            commit_offset: 50,
            suspend_timeout_millis: 3000,
            sub_version: 1,
            subscription: Some(cs("test_subscription")),
            expression_type: Some(cs("test_expression")),
            max_msg_bytes: Some(1024),
            request_source: Some(1),
            proxy_forward_client_id: Some(cs("test_client_id")),
            topic_request: None,
        };

        let map: HashMap<CheetahString, CheetahString> = header.to_map().unwrap();

        // The expected output is exactly the required plus optional tables above.
        for (key, expected) in REQUIRED_ENTRIES.into_iter().chain(OPTIONAL_ENTRIES) {
            assert_eq!(map.get(&cs(key)).unwrap(), expected);
        }
    }

    /// Decoding a map with every key present must populate every field.
    #[test]
    fn pull_message_request_header_deserializes_correctly() {
        let mut map = header_map(&REQUIRED_ENTRIES);
        map.extend(header_map(&OPTIONAL_ENTRIES));

        let header = <PullMessageRequestHeader as FromMap>::from(&map).unwrap();

        assert_eq!(header.consumer_group, "test_consumer_group");
        assert_eq!(header.topic, "test_topic");
        assert_eq!(header.queue_id, 1);
        assert_eq!(header.queue_offset, 100);
        assert_eq!(header.max_msg_nums, 10);
        assert_eq!(header.sys_flag, 0);
        assert_eq!(header.commit_offset, 50);
        assert_eq!(header.suspend_timeout_millis, 3000);
        assert_eq!(header.subscription.unwrap(), "test_subscription");
        assert_eq!(header.sub_version, 1);
        assert_eq!(header.expression_type.unwrap(), "test_expression");
        assert_eq!(header.max_msg_bytes.unwrap(), 1024);
        assert_eq!(header.request_source.unwrap(), 1);
        assert_eq!(header.proxy_forward_client_id.unwrap(), "test_client_id");
    }

    /// Decoding a map with only the required keys must leave the optional fields `None`.
    #[test]
    fn pull_message_request_header_handles_missing_optional_fields() {
        let map = header_map(&REQUIRED_ENTRIES);

        let header = <PullMessageRequestHeader as FromMap>::from(&map).unwrap();

        assert_eq!(header.consumer_group, "test_consumer_group");
        assert_eq!(header.topic, "test_topic");
        assert_eq!(header.queue_id, 1);
        assert_eq!(header.queue_offset, 100);
        assert_eq!(header.max_msg_nums, 10);
        assert_eq!(header.sys_flag, 0);
        assert_eq!(header.commit_offset, 50);
        assert_eq!(header.suspend_timeout_millis, 3000);
        assert_eq!(header.sub_version, 1);
        assert!(header.subscription.is_none());
        assert!(header.expression_type.is_none());
        assert!(header.max_msg_bytes.is_none());
        assert!(header.request_source.is_none());
        assert!(header.proxy_forward_client_id.is_none());
    }

    /// Decoding must fail when the numeric fields hold non-numeric text.
    #[test]
    fn pull_message_request_header_handles_invalid_data() {
        let map = header_map(&[
            ("consumerGroup", "test_consumer_group"),
            ("topic", "test_topic"),
            ("queueId", "invalid"),
            ("queueOffset", "invalid"),
            ("maxMsgNums", "invalid"),
            ("sysFlag", "invalid"),
            ("commitOffset", "invalid"),
            ("suspendTimeoutMillis", "invalid"),
            ("subVersion", "invalid"),
        ]);

        let result = <PullMessageRequestHeader as FromMap>::from(&map);
        assert!(result.is_err());
    }
}
390}