rocketmq_remoting/protocol/header/
pull_message_response_header.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use rocketmq_macros::RequestHeaderCodecV2;
19use serde::Deserialize;
20use serde::Serialize;
21
22#[derive(Serialize, Deserialize, Debug, Default, Clone, RequestHeaderCodecV2)]
23#[serde(rename_all = "camelCase")]
24pub struct PullMessageResponseHeader {
25    #[required]
26    pub suggest_which_broker_id: u64,
27
28    #[required]
29    pub next_begin_offset: i64,
30
31    #[required]
32    pub min_offset: i64,
33
34    #[required]
35    pub max_offset: i64,
36
37    pub offset_delta: Option<i64>,
38    pub topic_sys_flag: Option<i32>,
39    pub group_sys_flag: Option<i32>,
40    pub forbidden_type: Option<i32>,
41}
42
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use cheetah_string::CheetahString;

    use super::*;
    use crate::protocol::command_custom_header::CommandCustomHeader;
    use crate::protocol::command_custom_header::FromMap;

    /// Number of leading entries in [`WIRE_ENTRIES`] that belong to required fields.
    const REQUIRED_COUNT: usize = 4;

    /// Wire key / textual value pairs used by the tests below.
    /// The first [`REQUIRED_COUNT`] entries are the required fields, the rest are optional.
    const WIRE_ENTRIES: [(&str, &str); 8] = [
        ("suggestWhichBrokerId", "123"),
        ("nextBeginOffset", "456"),
        ("minOffset", "789"),
        ("maxOffset", "101112"),
        ("offsetDelta", "131415"),
        ("topicSysFlag", "161718"),
        ("groupSysFlag", "192021"),
        ("forbiddenType", "222324"),
    ];

    /// Turns a list of `(key, value)` string pairs into the map shape accepted by `FromMap`.
    fn build_map(
        entries: &[(&'static str, &'static str)],
    ) -> HashMap<CheetahString, CheetahString> {
        entries
            .iter()
            .map(|&(key, value)| {
                (
                    CheetahString::from_static_str(key),
                    CheetahString::from_static_str(value),
                )
            })
            .collect()
    }

    /// Every field, required and optional, must show up in the map under its camelCase key.
    #[test]
    fn pull_message_response_header_serializes_correctly() {
        let header = PullMessageResponseHeader {
            suggest_which_broker_id: 123,
            next_begin_offset: 456,
            min_offset: 789,
            max_offset: 101112,
            offset_delta: Some(131415),
            topic_sys_flag: Some(161718),
            group_sys_flag: Some(192021),
            forbidden_type: Some(222324),
        };

        let serialized = header.to_map().expect("to_map should produce a map");

        for &(key, expected) in WIRE_ENTRIES.iter() {
            let actual = serialized
                .get(&CheetahString::from_static_str(key))
                .unwrap();
            assert_eq!(actual, expected);
        }
    }

    /// A map holding all eight keys must decode into a fully populated header.
    #[test]
    fn pull_message_response_header_deserializes_correctly() {
        let source = build_map(&WIRE_ENTRIES);

        let decoded: PullMessageResponseHeader =
            <PullMessageResponseHeader as FromMap>::from(&source).unwrap();

        assert_eq!(decoded.suggest_which_broker_id, 123);
        assert_eq!(decoded.next_begin_offset, 456);
        assert_eq!(decoded.min_offset, 789);
        assert_eq!(decoded.max_offset, 101112);
        assert_eq!(decoded.offset_delta, Some(131415));
        assert_eq!(decoded.topic_sys_flag, Some(161718));
        assert_eq!(decoded.group_sys_flag, Some(192021));
        assert_eq!(decoded.forbidden_type, Some(222324));
    }

    /// With only the required keys present, the optional fields must come back as `None`.
    #[test]
    fn pull_message_response_header_handles_missing_optional_fields() {
        let source = build_map(&WIRE_ENTRIES[..REQUIRED_COUNT]);

        let decoded = <PullMessageResponseHeader as FromMap>::from(&source).unwrap();

        assert_eq!(decoded.suggest_which_broker_id, 123);
        assert_eq!(decoded.next_begin_offset, 456);
        assert_eq!(decoded.min_offset, 789);
        assert_eq!(decoded.max_offset, 101112);
        assert_eq!(decoded.offset_delta, None);
        assert_eq!(decoded.topic_sys_flag, None);
        assert_eq!(decoded.group_sys_flag, None);
        assert_eq!(decoded.forbidden_type, None);
    }

    /// Non-numeric text in the required fields must be reported as an error.
    #[test]
    fn pull_message_response_header_handles_invalid_data() {
        // Same required keys as above, but every value is replaced by unparsable text.
        let garbage: Vec<(&'static str, &'static str)> = WIRE_ENTRIES[..REQUIRED_COUNT]
            .iter()
            .map(|&(key, _)| (key, "invalid"))
            .collect();
        let source = build_map(&garbage);

        let outcome = <PullMessageResponseHeader as FromMap>::from(&source);

        assert!(outcome.is_err());
    }
}