Skip to main content

rocketmq_remoting/protocol/header/
consumer_send_msg_back_request_header.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use cheetah_string::CheetahString;
16use rocketmq_macros::RequestHeaderCodecV2;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::rpc::rpc_request_header::RpcRequestHeader;
21
/// Custom request header for the "consumer send message back" request.
///
/// The map codec (`to_map` / `FromMap::from`, exercised by the tests in this
/// file) is supplied by the `RequestHeaderCodecV2` derive rather than written
/// by hand. Because of `#[serde(rename_all = "camelCase")]`, and as the tests
/// confirm, fields are keyed in camelCase: `delayLevel`, `originMsgId`,
/// `originTopic`, `unitMode`, `maxReconsumeTimes`.
///
/// Fields tagged `#[required]` are flagged for the derive macro; how the macro
/// enforces that is defined in `rocketmq_macros` and is not visible here.
22#[derive(Clone, Debug, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
23#[serde(rename_all = "camelCase")]
24pub struct ConsumerSendMsgBackRequestHeader {
    /// Required, encoded under the key `offset`.
    /// NOTE(review): presumably the commit-log offset of the message being
    /// sent back — confirm against the broker-side handler.
25    #[required]
26    pub offset: i64,
    /// Required, encoded under the key `group`.
27    #[required]
28    pub group: CheetahString, // consumer group
    /// Required, encoded under the key `delayLevel`.
    /// NOTE(review): presumably selects the redelivery delay — confirm.
29    #[required]
30    pub delay_level: i32,
    /// Optional (`originMsgId`); decodes to `None` when the key is absent.
31    pub origin_msg_id: Option<CheetahString>,
    /// Optional (`originTopic`); decodes to `None` when the key is absent.
32    pub origin_topic: Option<CheetahString>,
    /// Boolean flag encoded as the string `"true"` / `"false"` under `unitMode`.
    /// NOTE(review): semantics of "unit mode" are not shown in this file.
33    pub unit_mode: bool,
    /// Optional (`maxReconsumeTimes`); decodes to `None` when the key is absent.
34    pub max_reconsume_times: Option<i32>,
    /// Optional common RPC header. `#[serde(flatten)]` makes serde inline its
    /// fields at the same level as this struct's own fields instead of nesting
    /// them under an `rpcRequestHeader` key.
35    #[serde(flatten)]
36    pub rpc_request_header: Option<RpcRequestHeader>,
37}
38
// Unit tests for the string-map codec of `ConsumerSendMsgBackRequestHeader`:
// encoding through `CommandCustomHeader::to_map` and decoding through
// `FromMap::from`. All keys used below are the camelCase wire names.
39#[cfg(test)]
40mod tests {
41    use std::collections::HashMap;
42
43    use cheetah_string::CheetahString;
44
45    use super::*;
    // Brings `to_map` into scope for the header type.
46    use crate::protocol::command_custom_header::CommandCustomHeader;
    // Brings the associated `from(&map)` decoder into scope.
47    use crate::protocol::command_custom_header::FromMap;
48
    // Encoding: a header with every optional field populated (and no RPC
    // header) must expose each value, rendered as a string, under its
    // camelCase key.
49    #[test]
50    fn consumer_send_msg_back_request_header_serializes_correctly() {
51        let header = ConsumerSendMsgBackRequestHeader {
52            offset: 12345,
53            group: CheetahString::from_static_str("test_group"),
54            delay_level: 2,
55            origin_msg_id: Some(CheetahString::from_static_str("msg_id")),
56            origin_topic: Some(CheetahString::from_static_str("topic")),
57            unit_mode: true,
58            max_reconsume_times: Some(3),
59            rpc_request_header: None,
60        };
61        let map = header.to_map().unwrap();
62        assert_eq!(map.get(&CheetahString::from_static_str("offset")).unwrap(), "12345");
63        assert_eq!(map.get(&CheetahString::from_static_str("group")).unwrap(), "test_group");
64        assert_eq!(map.get(&CheetahString::from_static_str("delayLevel")).unwrap(), "2");
65        assert_eq!(
66            map.get(&CheetahString::from_static_str("originMsgId")).unwrap(),
67            "msg_id"
68        );
69        assert_eq!(
70            map.get(&CheetahString::from_static_str("originTopic")).unwrap(),
71            "topic"
72        );
73        assert_eq!(map.get(&CheetahString::from_static_str("unitMode")).unwrap(), "true");
74        assert_eq!(
75            map.get(&CheetahString::from_static_str("maxReconsumeTimes")).unwrap(),
76            "3"
77        );
78    }
79
    // Decoding: a map containing every key must parse into the matching
    // typed fields (numbers and the boolean are parsed from their strings).
80    #[test]
81    fn consumer_send_msg_back_request_header_deserializes_correctly() {
82        let mut map = HashMap::new();
83        map.insert(
84            CheetahString::from_static_str("offset"),
85            CheetahString::from_static_str("12345"),
86        );
87        map.insert(
88            CheetahString::from_static_str("group"),
89            CheetahString::from_static_str("test_group"),
90        );
91        map.insert(
92            CheetahString::from_static_str("delayLevel"),
93            CheetahString::from_static_str("2"),
94        );
95        map.insert(
96            CheetahString::from_static_str("originMsgId"),
97            CheetahString::from_static_str("msg_id"),
98        );
99        map.insert(
100            CheetahString::from_static_str("originTopic"),
101            CheetahString::from_static_str("topic"),
102        );
103        map.insert(
104            CheetahString::from_static_str("unitMode"),
105            CheetahString::from_static_str("true"),
106        );
107        map.insert(
108            CheetahString::from_static_str("maxReconsumeTimes"),
109            CheetahString::from_static_str("3"),
110        );
111
112        let header = <ConsumerSendMsgBackRequestHeader as FromMap>::from(&map).unwrap();
113        assert_eq!(header.offset, 12345);
114        assert_eq!(header.group, "test_group");
115        assert_eq!(header.delay_level, 2);
116        assert_eq!(header.origin_msg_id.unwrap(), "msg_id");
117        assert_eq!(header.origin_topic.unwrap(), "topic");
118        assert!(header.unit_mode);
119        assert_eq!(header.max_reconsume_times.unwrap(), 3);
120    }
121
    // Decoding with only `offset`, `group`, `delayLevel` and `unitMode`
    // present: the call must still succeed and the three `Option` fields
    // whose keys are absent must come back as `None`.
122    #[test]
123    fn consumer_send_msg_back_request_header_handles_missing_optional_fields() {
124        let mut map = HashMap::new();
125        map.insert(
126            CheetahString::from_static_str("offset"),
127            CheetahString::from_static_str("12345"),
128        );
129        map.insert(
130            CheetahString::from_static_str("group"),
131            CheetahString::from_static_str("test_group"),
132        );
133        map.insert(
134            CheetahString::from_static_str("delayLevel"),
135            CheetahString::from_static_str("2"),
136        );
137        map.insert(
138            CheetahString::from_static_str("unitMode"),
139            CheetahString::from_static_str("true"),
140        );
141
142        let header = <ConsumerSendMsgBackRequestHeader as FromMap>::from(&map).unwrap();
143        assert_eq!(header.offset, 12345);
144        assert_eq!(header.group, "test_group");
145        assert_eq!(header.delay_level, 2);
146        assert!(header.origin_msg_id.is_none());
147        assert!(header.origin_topic.is_none());
148        assert!(header.unit_mode);
149        assert!(header.max_reconsume_times.is_none());
150    }
151
    // Decoding must fail (return `Err`) when the numeric fields `offset` and
    // `delayLevel` carry non-numeric text. Both are invalid at once, so this
    // test does not pin which of the two triggers the error.
152    #[test]
153    fn consumer_send_msg_back_request_header_handles_invalid_data() {
154        let mut map = HashMap::new();
155        map.insert(
156            CheetahString::from_static_str("offset"),
157            CheetahString::from_static_str("invalid"),
158        );
159        map.insert(
160            CheetahString::from_static_str("group"),
161            CheetahString::from_static_str("test_group"),
162        );
163        map.insert(
164            CheetahString::from_static_str("delayLevel"),
165            CheetahString::from_static_str("invalid"),
166        );
167        map.insert(
168            CheetahString::from_static_str("unitMode"),
169            CheetahString::from_static_str("true"),
170        );
171
172        let result = <ConsumerSendMsgBackRequestHeader as FromMap>::from(&map);
173        assert!(result.is_err());
174    }
175}
176
177/*impl ConsumerSendMsgBackRequestHeader {
178    pub const OFFSET: &'static str = "offset";
179    pub const GROUP: &'static str = "group";
180    pub const DELAY_LEVEL: &'static str = "delayLevel";
181    pub const ORIGIN_MSG_ID: &'static str = "originMsgId";
182    pub const ORIGIN_TOPIC: &'static str = "originTopic";
183    pub const UNIT_MODE: &'static str = "unitMode";
184    pub const MAX_RECONSUME_TIMES: &'static str = "maxReconsumeTimes";
185}
186
187impl CommandCustomHeader for ConsumerSendMsgBackRequestHeader {
188    fn to_map(&self) -> Option<std::collections::HashMap<CheetahString, CheetahString>> {
189        let mut map = std::collections::HashMap::new();
190        map.insert(
191            CheetahString::from_static_str(Self::OFFSET),
192            CheetahString::from_string(self.offset.to_string()),
193        );
194        map.insert(
195            CheetahString::from_static_str(Self::GROUP),
196            self.group.clone(),
197        );
198        map.insert(
199            CheetahString::from_static_str(Self::DELAY_LEVEL),
200            CheetahString::from_string(self.delay_level.to_string()),
201        );
202        if let Some(value) = &self.origin_msg_id {
203            map.insert(
204                CheetahString::from_static_str(Self::ORIGIN_MSG_ID),
205                value.clone(),
206            );
207        }
208        if let Some(value) = &self.origin_topic {
209            map.insert(
210                CheetahString::from_static_str(Self::ORIGIN_TOPIC),
211                value.clone(),
212            );
213        }
214        map.insert(
215            CheetahString::from_static_str(Self::UNIT_MODE),
216            CheetahString::from_string(self.unit_mode.to_string()),
217        );
218        if let Some(value) = self.max_reconsume_times {
219            map.insert(
220                CheetahString::from_static_str(Self::MAX_RECONSUME_TIMES),
221                CheetahString::from_string(value.to_string()),
222            );
223        }
224        if let Some(ref rpc) = self.rpc_request_header {
225            if let Some(rpc_map) = rpc.to_map() {
226                map.extend(rpc_map);
227            }
228        }
229        Some(map)
230    }
231}
232
233impl FromMap for ConsumerSendMsgBackRequestHeader {
234    type Error = rocketmq_error::RocketMQError;
235
236    type Target = Self;
237
238    fn from(
239        map: &std::collections::HashMap<CheetahString, CheetahString>,
240    ) -> Result<Self::Target, Self::Error> {
241        Ok(ConsumerSendMsgBackRequestHeader {
242            offset: map
243                .get(&CheetahString::from_static_str(Self::OFFSET))
244                .cloned()
245                .ok_or_else(|| {
246                    rocketmq_error::RocketMQError::Protocol(
247                        rocketmq_error::ProtocolError::header_missing("offset"),
248                    )
249                })?
250                .parse()
251                .map_err(|_| {
252                    rocketmq_error::RocketMQError::Protocol(
253                        rocketmq_error::ProtocolError::invalid_message("Invalid offset"),
254                    )
255                })?,
256            group: map
257                .get(&CheetahString::from_static_str(Self::GROUP))
258                .cloned()
259                .ok_or_else(|| {
260                    rocketmq_error::RocketMQError::Protocol(
261                        rocketmq_error::ProtocolError::header_missing("group"),
262                    )
263                })?,
264            delay_level: map
265                .get(&CheetahString::from_static_str(Self::DELAY_LEVEL))
266                .cloned()
267                .ok_or_else(|| {
268                    rocketmq_error::RocketMQError::Protocol(
269                        rocketmq_error::ProtocolError::header_missing("delay_level"),
270                    )
271                })?
272                .parse()
273                .map_err(|_| {
274                    rocketmq_error::RocketMQError::Protocol(
275                        rocketmq_error::ProtocolError::invalid_message("Invalid delay level"),
276                    )
277                })?,
278            origin_msg_id: map
279                .get(&CheetahString::from_static_str(Self::ORIGIN_MSG_ID))
280                .cloned(),
281            origin_topic: map
282                .get(&CheetahString::from_static_str(Self::ORIGIN_TOPIC))
283                .cloned(),
284            unit_mode: map
285                .get(&CheetahString::from_static_str(Self::UNIT_MODE))
286                .cloned()
287                .unwrap_or(CheetahString::from_static_str("false"))
288                .parse()
289                .unwrap_or(false),
290            max_reconsume_times: map
291                .get(&CheetahString::from_static_str(Self::MAX_RECONSUME_TIMES))
292                .and_then(|value| value.parse().ok()),
293            rpc_request_header: Some(<RpcRequestHeader as FromMap>::from(map)?),
294        })
295    }
296}
297*/