rocketmq_remoting/protocol/header/
pull_message_request_header.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use cheetah_string::CheetahString;
19use rocketmq_macros::RequestHeaderCodecV2;
20use serde::Deserialize;
21use serde::Serialize;
22
23use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
24use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
25
/// Custom request header carried by a pull-message request.
///
/// Field names are converted to camelCase for serde (`rename_all = "camelCase"`), so
/// `consumer_group` is exchanged under the key `consumerGroup`, and so on. The
/// `RequestHeaderCodecV2` derive comes from `rocketmq_macros`; presumably it generates the
/// string-map encode/decode used with this header — confirm against the macro crate.
///
/// Fields annotated with `#[required]` are plain values; the remaining fields are `Option`s
/// and may be absent.
#[derive(Debug, Clone, Deserialize, Serialize, Default, RequestHeaderCodecV2)]
#[serde(rename_all = "camelCase")]
pub struct PullMessageRequestHeader {
    /// Name of the consumer group (key `consumerGroup`).
    #[required]
    pub consumer_group: CheetahString,

    /// Name of the topic (key `topic`).
    #[required]
    pub topic: CheetahString,

    /// Identifier of the queue (key `queueId`).
    #[required]
    pub queue_id: i32,

    /// Offset within the queue (key `queueOffset`).
    // NOTE(review): presumably the offset the pull starts from — confirm against the broker.
    #[required]
    pub queue_offset: i64,

    /// Message-count value for the request (key `maxMsgNums`).
    // NOTE(review): presumably an upper bound on returned messages — confirm.
    #[required]
    pub max_msg_nums: i32,

    /// Flag value for the request (key `sysFlag`).
    // NOTE(review): bit layout is not visible in this file — see the sys-flag helpers.
    #[required]
    pub sys_flag: i32,

    /// Commit offset sent along with the request (key `commitOffset`).
    #[required]
    pub commit_offset: i64,

    /// Suspend timeout (key `suspendTimeoutMillis`); milliseconds, going by the field name.
    #[required]
    pub suspend_timeout_millis: u64,

    /// Subscription version (key `subVersion`).
    #[required]
    pub sub_version: i64,

    /// Optional subscription string (key `subscription`).
    pub subscription: Option<CheetahString>,
    /// Optional expression type (key `expressionType`).
    pub expression_type: Option<CheetahString>,
    /// Optional byte-size value (key `maxMsgBytes`).
    // NOTE(review): presumably a cap on the total payload size — confirm.
    pub max_msg_bytes: Option<i32>,
    /// Optional request-source code (key `requestSource`).
    pub request_source: Option<i32>,
    /// Optional client id used when the request is forwarded by a proxy
    /// (key `proxyForwardClientId`).
    pub proxy_forward_client_id: Option<CheetahString>,
    /// Optional shared topic-request header; `#[serde(flatten)]` makes serde inline its
    /// fields into this header instead of nesting them under a `topicRequest` key.
    #[serde(flatten)]
    pub topic_request: Option<TopicRequestHeader>,
}
64
65impl TopicRequestHeaderTrait for PullMessageRequestHeader {
66    fn set_lo(&mut self, lo: Option<bool>) {
67        self.topic_request.as_mut().unwrap().lo = lo;
68    }
69
70    fn lo(&self) -> Option<bool> {
71        self.topic_request.as_ref().unwrap().lo
72    }
73
74    fn set_topic(&mut self, topic: CheetahString) {
75        self.topic = topic;
76    }
77
78    fn topic(&self) -> &CheetahString {
79        &self.topic
80    }
81
82    fn broker_name(&self) -> Option<&CheetahString> {
83        self.topic_request
84            .as_ref()
85            .unwrap()
86            .rpc
87            .as_ref()
88            .unwrap()
89            .broker_name
90            .as_ref()
91    }
92
93    fn set_broker_name(&mut self, broker_name: CheetahString) {
94        self.topic_request
95            .as_mut()
96            .unwrap()
97            .rpc
98            .as_mut()
99            .unwrap()
100            .broker_name = Some(broker_name);
101    }
102
103    fn namespace(&self) -> Option<&str> {
104        self.topic_request
105            .as_ref()
106            .unwrap()
107            .rpc
108            .as_ref()
109            .unwrap()
110            .namespace
111            .as_deref()
112    }
113
114    fn set_namespace(&mut self, namespace: CheetahString) {
115        self.topic_request
116            .as_mut()
117            .unwrap()
118            .rpc
119            .as_mut()
120            .unwrap()
121            .namespace = Some(namespace);
122    }
123
124    fn namespaced(&self) -> Option<bool> {
125        self.topic_request
126            .as_ref()
127            .unwrap()
128            .rpc
129            .as_ref()
130            .unwrap()
131            .namespaced
132    }
133
134    fn set_namespaced(&mut self, namespaced: bool) {
135        self.topic_request
136            .as_mut()
137            .unwrap()
138            .rpc
139            .as_mut()
140            .unwrap()
141            .namespaced = Some(namespaced);
142    }
143
144    fn oneway(&self) -> Option<bool> {
145        self.topic_request
146            .as_ref()
147            .unwrap()
148            .rpc
149            .as_ref()
150            .unwrap()
151            .oneway
152    }
153
154    fn set_oneway(&mut self, oneway: bool) {
155        self.topic_request
156            .as_mut()
157            .unwrap()
158            .rpc
159            .as_mut()
160            .unwrap()
161            .oneway = Some(oneway);
162    }
163
164    fn queue_id(&self) -> i32 {
165        self.queue_id
166    }
167
168    fn set_queue_id(&mut self, queue_id: i32) {
169        self.queue_id = queue_id;
170    }
171}
172
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use cheetah_string::CheetahString;

    use super::*;
    use crate::protocol::command_custom_header::CommandCustomHeader;
    use crate::protocol::command_custom_header::FromMap;

    /// Map entries for every `#[required]` field, each with a valid value.
    const REQUIRED_ENTRIES: &[(&str, &str)] = &[
        ("consumerGroup", "test_consumer_group"),
        ("topic", "test_topic"),
        ("queueId", "1"),
        ("queueOffset", "100"),
        ("maxMsgNums", "10"),
        ("sysFlag", "0"),
        ("commitOffset", "50"),
        ("suspendTimeoutMillis", "3000"),
        ("subVersion", "1"),
    ];

    /// Map entries for the optional scalar fields.
    const OPTIONAL_ENTRIES: &[(&str, &str)] = &[
        ("subscription", "test_subscription"),
        ("expressionType", "test_expression"),
        ("maxMsgBytes", "1024"),
        ("requestSource", "1"),
        ("proxyForwardClientId", "test_client_id"),
    ];

    /// Required entries where every numeric field holds an unparsable value.
    const INVALID_ENTRIES: &[(&str, &str)] = &[
        ("consumerGroup", "test_consumer_group"),
        ("topic", "test_topic"),
        ("queueId", "invalid"),
        ("queueOffset", "invalid"),
        ("maxMsgNums", "invalid"),
        ("sysFlag", "invalid"),
        ("commitOffset", "invalid"),
        ("suspendTimeoutMillis", "invalid"),
        ("subVersion", "invalid"),
    ];

    /// Builds a header map out of one or more groups of key/value pairs.
    fn build_map(
        groups: &[&[(&'static str, &'static str)]],
    ) -> HashMap<CheetahString, CheetahString> {
        groups
            .iter()
            .flat_map(|group| group.iter())
            .map(|&(key, value)| {
                (
                    CheetahString::from_static_str(key),
                    CheetahString::from_static_str(value),
                )
            })
            .collect()
    }

    /// Checks that all required fields carry the values from `REQUIRED_ENTRIES`.
    fn assert_required_fields(header: &PullMessageRequestHeader) {
        assert_eq!(header.consumer_group, "test_consumer_group");
        assert_eq!(header.topic, "test_topic");
        assert_eq!(header.queue_id, 1);
        assert_eq!(header.queue_offset, 100);
        assert_eq!(header.max_msg_nums, 10);
        assert_eq!(header.sys_flag, 0);
        assert_eq!(header.commit_offset, 50);
        assert_eq!(header.suspend_timeout_millis, 3000);
        assert_eq!(header.sub_version, 1);
    }

    #[test]
    fn pull_message_request_header_serializes_correctly() {
        let header = PullMessageRequestHeader {
            consumer_group: CheetahString::from_static_str("test_consumer_group"),
            topic: CheetahString::from_static_str("test_topic"),
            queue_id: 1,
            queue_offset: 100,
            max_msg_nums: 10,
            sys_flag: 0,
            commit_offset: 50,
            suspend_timeout_millis: 3000,
            sub_version: 1,
            subscription: Some(CheetahString::from_static_str("test_subscription")),
            expression_type: Some(CheetahString::from_static_str("test_expression")),
            max_msg_bytes: Some(1024),
            request_source: Some(1),
            proxy_forward_client_id: Some(CheetahString::from_static_str("test_client_id")),
            topic_request: None,
        };

        let map: HashMap<CheetahString, CheetahString> = header.to_map().unwrap();

        // Every populated field must appear under its camelCase key with its string form.
        for &(key, value) in REQUIRED_ENTRIES.iter().chain(OPTIONAL_ENTRIES) {
            assert_eq!(
                map.get(&CheetahString::from_static_str(key)).unwrap(),
                value
            );
        }
    }

    #[test]
    fn pull_message_request_header_deserializes_correctly() {
        let map = build_map(&[REQUIRED_ENTRIES, OPTIONAL_ENTRIES]);

        let header = <PullMessageRequestHeader as FromMap>::from(&map).unwrap();

        assert_required_fields(&header);
        assert_eq!(header.subscription.as_deref(), Some("test_subscription"));
        assert_eq!(header.expression_type.as_deref(), Some("test_expression"));
        assert_eq!(header.max_msg_bytes, Some(1024));
        assert_eq!(header.request_source, Some(1));
        assert_eq!(
            header.proxy_forward_client_id.as_deref(),
            Some("test_client_id")
        );
    }

    #[test]
    fn pull_message_request_header_handles_missing_optional_fields() {
        let map = build_map(&[REQUIRED_ENTRIES]);

        let header = <PullMessageRequestHeader as FromMap>::from(&map).unwrap();

        assert_required_fields(&header);
        assert!(header.subscription.is_none());
        assert!(header.expression_type.is_none());
        assert!(header.max_msg_bytes.is_none());
        assert!(header.request_source.is_none());
        assert!(header.proxy_forward_client_id.is_none());
    }

    #[test]
    fn pull_message_request_header_handles_invalid_data() {
        let map = build_map(&[INVALID_ENTRIES]);

        let outcome = <PullMessageRequestHeader as FromMap>::from(&map);

        assert!(outcome.is_err());
    }
}