rocketmq_remoting/protocol/header/
consumer_send_msg_back_request_header.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use cheetah_string::CheetahString;
18use rocketmq_macros::RequestHeaderCodecV2;
19use serde::Deserialize;
20use serde::Serialize;
21
22use crate::rpc::rpc_request_header::RpcRequestHeader;
23
24#[derive(Clone, Debug, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
25#[serde(rename_all = "camelCase")]
26pub struct ConsumerSendMsgBackRequestHeader {
27    #[required]
28    pub offset: i64,
29    #[required]
30    pub group: CheetahString,
31    #[required]
32    pub delay_level: i32,
33    pub origin_msg_id: Option<CheetahString>,
34    pub origin_topic: Option<CheetahString>,
35    pub unit_mode: bool,
36    pub max_reconsume_times: Option<i32>,
37    #[serde(flatten)]
38    pub rpc_request_header: Option<RpcRequestHeader>,
39}
40
41#[cfg(test)]
42mod tests {
43    use std::collections::HashMap;
44
45    use cheetah_string::CheetahString;
46
47    use super::*;
48    use crate::protocol::command_custom_header::CommandCustomHeader;
49    use crate::protocol::command_custom_header::FromMap;
50
51    #[test]
52    fn consumer_send_msg_back_request_header_serializes_correctly() {
53        let header = ConsumerSendMsgBackRequestHeader {
54            offset: 12345,
55            group: CheetahString::from_static_str("test_group"),
56            delay_level: 2,
57            origin_msg_id: Some(CheetahString::from_static_str("msg_id")),
58            origin_topic: Some(CheetahString::from_static_str("topic")),
59            unit_mode: true,
60            max_reconsume_times: Some(3),
61            rpc_request_header: None,
62        };
63        let map = header.to_map().unwrap();
64        assert_eq!(
65            map.get(&CheetahString::from_static_str("offset")).unwrap(),
66            "12345"
67        );
68        assert_eq!(
69            map.get(&CheetahString::from_static_str("group")).unwrap(),
70            "test_group"
71        );
72        assert_eq!(
73            map.get(&CheetahString::from_static_str("delayLevel"))
74                .unwrap(),
75            "2"
76        );
77        assert_eq!(
78            map.get(&CheetahString::from_static_str("originMsgId"))
79                .unwrap(),
80            "msg_id"
81        );
82        assert_eq!(
83            map.get(&CheetahString::from_static_str("originTopic"))
84                .unwrap(),
85            "topic"
86        );
87        assert_eq!(
88            map.get(&CheetahString::from_static_str("unitMode"))
89                .unwrap(),
90            "true"
91        );
92        assert_eq!(
93            map.get(&CheetahString::from_static_str("maxReconsumeTimes"))
94                .unwrap(),
95            "3"
96        );
97    }
98
99    #[test]
100    fn consumer_send_msg_back_request_header_deserializes_correctly() {
101        let mut map = HashMap::new();
102        map.insert(
103            CheetahString::from_static_str("offset"),
104            CheetahString::from_static_str("12345"),
105        );
106        map.insert(
107            CheetahString::from_static_str("group"),
108            CheetahString::from_static_str("test_group"),
109        );
110        map.insert(
111            CheetahString::from_static_str("delayLevel"),
112            CheetahString::from_static_str("2"),
113        );
114        map.insert(
115            CheetahString::from_static_str("originMsgId"),
116            CheetahString::from_static_str("msg_id"),
117        );
118        map.insert(
119            CheetahString::from_static_str("originTopic"),
120            CheetahString::from_static_str("topic"),
121        );
122        map.insert(
123            CheetahString::from_static_str("unitMode"),
124            CheetahString::from_static_str("true"),
125        );
126        map.insert(
127            CheetahString::from_static_str("maxReconsumeTimes"),
128            CheetahString::from_static_str("3"),
129        );
130
131        let header = <ConsumerSendMsgBackRequestHeader as FromMap>::from(&map).unwrap();
132        assert_eq!(header.offset, 12345);
133        assert_eq!(header.group, "test_group");
134        assert_eq!(header.delay_level, 2);
135        assert_eq!(header.origin_msg_id.unwrap(), "msg_id");
136        assert_eq!(header.origin_topic.unwrap(), "topic");
137        assert!(header.unit_mode);
138        assert_eq!(header.max_reconsume_times.unwrap(), 3);
139    }
140
141    #[test]
142    fn consumer_send_msg_back_request_header_handles_missing_optional_fields() {
143        let mut map = HashMap::new();
144        map.insert(
145            CheetahString::from_static_str("offset"),
146            CheetahString::from_static_str("12345"),
147        );
148        map.insert(
149            CheetahString::from_static_str("group"),
150            CheetahString::from_static_str("test_group"),
151        );
152        map.insert(
153            CheetahString::from_static_str("delayLevel"),
154            CheetahString::from_static_str("2"),
155        );
156        map.insert(
157            CheetahString::from_static_str("unitMode"),
158            CheetahString::from_static_str("true"),
159        );
160
161        let header = <ConsumerSendMsgBackRequestHeader as FromMap>::from(&map).unwrap();
162        assert_eq!(header.offset, 12345);
163        assert_eq!(header.group, "test_group");
164        assert_eq!(header.delay_level, 2);
165        assert!(header.origin_msg_id.is_none());
166        assert!(header.origin_topic.is_none());
167        assert!(header.unit_mode);
168        assert!(header.max_reconsume_times.is_none());
169    }
170
171    #[test]
172    fn consumer_send_msg_back_request_header_handles_invalid_data() {
173        let mut map = HashMap::new();
174        map.insert(
175            CheetahString::from_static_str("offset"),
176            CheetahString::from_static_str("invalid"),
177        );
178        map.insert(
179            CheetahString::from_static_str("group"),
180            CheetahString::from_static_str("test_group"),
181        );
182        map.insert(
183            CheetahString::from_static_str("delayLevel"),
184            CheetahString::from_static_str("invalid"),
185        );
186        map.insert(
187            CheetahString::from_static_str("unitMode"),
188            CheetahString::from_static_str("true"),
189        );
190
191        let result = <ConsumerSendMsgBackRequestHeader as FromMap>::from(&map);
192        assert!(result.is_err());
193    }
194}
195
196/*impl ConsumerSendMsgBackRequestHeader {
197    pub const OFFSET: &'static str = "offset";
198    pub const GROUP: &'static str = "group";
199    pub const DELAY_LEVEL: &'static str = "delayLevel";
200    pub const ORIGIN_MSG_ID: &'static str = "originMsgId";
201    pub const ORIGIN_TOPIC: &'static str = "originTopic";
202    pub const UNIT_MODE: &'static str = "unitMode";
203    pub const MAX_RECONSUME_TIMES: &'static str = "maxReconsumeTimes";
204}
205
206impl CommandCustomHeader for ConsumerSendMsgBackRequestHeader {
207    fn to_map(&self) -> Option<std::collections::HashMap<CheetahString, CheetahString>> {
208        let mut map = std::collections::HashMap::new();
209        map.insert(
210            CheetahString::from_static_str(Self::OFFSET),
211            CheetahString::from_string(self.offset.to_string()),
212        );
213        map.insert(
214            CheetahString::from_static_str(Self::GROUP),
215            self.group.clone(),
216        );
217        map.insert(
218            CheetahString::from_static_str(Self::DELAY_LEVEL),
219            CheetahString::from_string(self.delay_level.to_string()),
220        );
221        if let Some(value) = &self.origin_msg_id {
222            map.insert(
223                CheetahString::from_static_str(Self::ORIGIN_MSG_ID),
224                value.clone(),
225            );
226        }
227        if let Some(value) = &self.origin_topic {
228            map.insert(
229                CheetahString::from_static_str(Self::ORIGIN_TOPIC),
230                value.clone(),
231            );
232        }
233        map.insert(
234            CheetahString::from_static_str(Self::UNIT_MODE),
235            CheetahString::from_string(self.unit_mode.to_string()),
236        );
237        if let Some(value) = self.max_reconsume_times {
238            map.insert(
239                CheetahString::from_static_str(Self::MAX_RECONSUME_TIMES),
240                CheetahString::from_string(value.to_string()),
241            );
242        }
243        if let Some(ref rpc) = self.rpc_request_header {
244            if let Some(rpc_map) = rpc.to_map() {
245                map.extend(rpc_map);
246            }
247        }
248        Some(map)
249    }
250}
251
252impl FromMap for ConsumerSendMsgBackRequestHeader {
253    type Error = rocketmq_error::RocketMQError;
254
255    type Target = Self;
256
257    fn from(
258        map: &std::collections::HashMap<CheetahString, CheetahString>,
259    ) -> Result<Self::Target, Self::Error> {
260        Ok(ConsumerSendMsgBackRequestHeader {
261            offset: map
262                .get(&CheetahString::from_static_str(Self::OFFSET))
263                .cloned()
264                .ok_or_else(|| {
265                    rocketmq_error::RocketMQError::Protocol(
266                        rocketmq_error::ProtocolError::header_missing("offset"),
267                    )
268                })?
269                .parse()
270                .map_err(|_| {
271                    rocketmq_error::RocketMQError::Protocol(
272                        rocketmq_error::ProtocolError::invalid_message("Invalid offset"),
273                    )
274                })?,
275            group: map
276                .get(&CheetahString::from_static_str(Self::GROUP))
277                .cloned()
278                .ok_or_else(|| {
279                    rocketmq_error::RocketMQError::Protocol(
280                        rocketmq_error::ProtocolError::header_missing("group"),
281                    )
282                })?,
283            delay_level: map
284                .get(&CheetahString::from_static_str(Self::DELAY_LEVEL))
285                .cloned()
286                .ok_or_else(|| {
287                    rocketmq_error::RocketMQError::Protocol(
288                        rocketmq_error::ProtocolError::header_missing("delay_level"),
289                    )
290                })?
291                .parse()
292                .map_err(|_| {
293                    rocketmq_error::RocketMQError::Protocol(
294                        rocketmq_error::ProtocolError::invalid_message("Invalid delay level"),
295                    )
296                })?,
297            origin_msg_id: map
298                .get(&CheetahString::from_static_str(Self::ORIGIN_MSG_ID))
299                .cloned(),
300            origin_topic: map
301                .get(&CheetahString::from_static_str(Self::ORIGIN_TOPIC))
302                .cloned(),
303            unit_mode: map
304                .get(&CheetahString::from_static_str(Self::UNIT_MODE))
305                .cloned()
306                .unwrap_or(CheetahString::from_static_str("false"))
307                .parse()
308                .unwrap_or(false),
309            max_reconsume_times: map
310                .get(&CheetahString::from_static_str(Self::MAX_RECONSUME_TIMES))
311                .and_then(|value| value.parse().ok()),
312            rpc_request_header: Some(<RpcRequestHeader as FromMap>::from(map)?),
313        })
314    }
315}
316*/