rocketmq_remoting/protocol/header/consumer_send_msg_back_request_header.rs
1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use cheetah_string::CheetahString;
16use rocketmq_macros::RequestHeaderCodecV2;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::rpc::rpc_request_header::RpcRequestHeader;
21
22#[derive(Clone, Debug, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
23#[serde(rename_all = "camelCase")]
24pub struct ConsumerSendMsgBackRequestHeader {
25 #[required]
26 pub offset: i64,
27 #[required]
28 pub group: CheetahString, //consumer group
29 #[required]
30 pub delay_level: i32,
31 pub origin_msg_id: Option<CheetahString>,
32 pub origin_topic: Option<CheetahString>,
33 pub unit_mode: bool,
34 pub max_reconsume_times: Option<i32>,
35 #[serde(flatten)]
36 pub rpc_request_header: Option<RpcRequestHeader>,
37}
38
39#[cfg(test)]
40mod tests {
41 use std::collections::HashMap;
42
43 use cheetah_string::CheetahString;
44
45 use super::*;
46 use crate::protocol::command_custom_header::CommandCustomHeader;
47 use crate::protocol::command_custom_header::FromMap;
48
49 #[test]
50 fn consumer_send_msg_back_request_header_serializes_correctly() {
51 let header = ConsumerSendMsgBackRequestHeader {
52 offset: 12345,
53 group: CheetahString::from_static_str("test_group"),
54 delay_level: 2,
55 origin_msg_id: Some(CheetahString::from_static_str("msg_id")),
56 origin_topic: Some(CheetahString::from_static_str("topic")),
57 unit_mode: true,
58 max_reconsume_times: Some(3),
59 rpc_request_header: None,
60 };
61 let map = header.to_map().unwrap();
62 assert_eq!(map.get(&CheetahString::from_static_str("offset")).unwrap(), "12345");
63 assert_eq!(map.get(&CheetahString::from_static_str("group")).unwrap(), "test_group");
64 assert_eq!(map.get(&CheetahString::from_static_str("delayLevel")).unwrap(), "2");
65 assert_eq!(
66 map.get(&CheetahString::from_static_str("originMsgId")).unwrap(),
67 "msg_id"
68 );
69 assert_eq!(
70 map.get(&CheetahString::from_static_str("originTopic")).unwrap(),
71 "topic"
72 );
73 assert_eq!(map.get(&CheetahString::from_static_str("unitMode")).unwrap(), "true");
74 assert_eq!(
75 map.get(&CheetahString::from_static_str("maxReconsumeTimes")).unwrap(),
76 "3"
77 );
78 }
79
80 #[test]
81 fn consumer_send_msg_back_request_header_deserializes_correctly() {
82 let mut map = HashMap::new();
83 map.insert(
84 CheetahString::from_static_str("offset"),
85 CheetahString::from_static_str("12345"),
86 );
87 map.insert(
88 CheetahString::from_static_str("group"),
89 CheetahString::from_static_str("test_group"),
90 );
91 map.insert(
92 CheetahString::from_static_str("delayLevel"),
93 CheetahString::from_static_str("2"),
94 );
95 map.insert(
96 CheetahString::from_static_str("originMsgId"),
97 CheetahString::from_static_str("msg_id"),
98 );
99 map.insert(
100 CheetahString::from_static_str("originTopic"),
101 CheetahString::from_static_str("topic"),
102 );
103 map.insert(
104 CheetahString::from_static_str("unitMode"),
105 CheetahString::from_static_str("true"),
106 );
107 map.insert(
108 CheetahString::from_static_str("maxReconsumeTimes"),
109 CheetahString::from_static_str("3"),
110 );
111
112 let header = <ConsumerSendMsgBackRequestHeader as FromMap>::from(&map).unwrap();
113 assert_eq!(header.offset, 12345);
114 assert_eq!(header.group, "test_group");
115 assert_eq!(header.delay_level, 2);
116 assert_eq!(header.origin_msg_id.unwrap(), "msg_id");
117 assert_eq!(header.origin_topic.unwrap(), "topic");
118 assert!(header.unit_mode);
119 assert_eq!(header.max_reconsume_times.unwrap(), 3);
120 }
121
122 #[test]
123 fn consumer_send_msg_back_request_header_handles_missing_optional_fields() {
124 let mut map = HashMap::new();
125 map.insert(
126 CheetahString::from_static_str("offset"),
127 CheetahString::from_static_str("12345"),
128 );
129 map.insert(
130 CheetahString::from_static_str("group"),
131 CheetahString::from_static_str("test_group"),
132 );
133 map.insert(
134 CheetahString::from_static_str("delayLevel"),
135 CheetahString::from_static_str("2"),
136 );
137 map.insert(
138 CheetahString::from_static_str("unitMode"),
139 CheetahString::from_static_str("true"),
140 );
141
142 let header = <ConsumerSendMsgBackRequestHeader as FromMap>::from(&map).unwrap();
143 assert_eq!(header.offset, 12345);
144 assert_eq!(header.group, "test_group");
145 assert_eq!(header.delay_level, 2);
146 assert!(header.origin_msg_id.is_none());
147 assert!(header.origin_topic.is_none());
148 assert!(header.unit_mode);
149 assert!(header.max_reconsume_times.is_none());
150 }
151
152 #[test]
153 fn consumer_send_msg_back_request_header_handles_invalid_data() {
154 let mut map = HashMap::new();
155 map.insert(
156 CheetahString::from_static_str("offset"),
157 CheetahString::from_static_str("invalid"),
158 );
159 map.insert(
160 CheetahString::from_static_str("group"),
161 CheetahString::from_static_str("test_group"),
162 );
163 map.insert(
164 CheetahString::from_static_str("delayLevel"),
165 CheetahString::from_static_str("invalid"),
166 );
167 map.insert(
168 CheetahString::from_static_str("unitMode"),
169 CheetahString::from_static_str("true"),
170 );
171
172 let result = <ConsumerSendMsgBackRequestHeader as FromMap>::from(&map);
173 assert!(result.is_err());
174 }
175}
176
177/*impl ConsumerSendMsgBackRequestHeader {
178 pub const OFFSET: &'static str = "offset";
179 pub const GROUP: &'static str = "group";
180 pub const DELAY_LEVEL: &'static str = "delayLevel";
181 pub const ORIGIN_MSG_ID: &'static str = "originMsgId";
182 pub const ORIGIN_TOPIC: &'static str = "originTopic";
183 pub const UNIT_MODE: &'static str = "unitMode";
184 pub const MAX_RECONSUME_TIMES: &'static str = "maxReconsumeTimes";
185}
186
187impl CommandCustomHeader for ConsumerSendMsgBackRequestHeader {
188 fn to_map(&self) -> Option<std::collections::HashMap<CheetahString, CheetahString>> {
189 let mut map = std::collections::HashMap::new();
190 map.insert(
191 CheetahString::from_static_str(Self::OFFSET),
192 CheetahString::from_string(self.offset.to_string()),
193 );
194 map.insert(
195 CheetahString::from_static_str(Self::GROUP),
196 self.group.clone(),
197 );
198 map.insert(
199 CheetahString::from_static_str(Self::DELAY_LEVEL),
200 CheetahString::from_string(self.delay_level.to_string()),
201 );
202 if let Some(value) = &self.origin_msg_id {
203 map.insert(
204 CheetahString::from_static_str(Self::ORIGIN_MSG_ID),
205 value.clone(),
206 );
207 }
208 if let Some(value) = &self.origin_topic {
209 map.insert(
210 CheetahString::from_static_str(Self::ORIGIN_TOPIC),
211 value.clone(),
212 );
213 }
214 map.insert(
215 CheetahString::from_static_str(Self::UNIT_MODE),
216 CheetahString::from_string(self.unit_mode.to_string()),
217 );
218 if let Some(value) = self.max_reconsume_times {
219 map.insert(
220 CheetahString::from_static_str(Self::MAX_RECONSUME_TIMES),
221 CheetahString::from_string(value.to_string()),
222 );
223 }
224 if let Some(ref rpc) = self.rpc_request_header {
225 if let Some(rpc_map) = rpc.to_map() {
226 map.extend(rpc_map);
227 }
228 }
229 Some(map)
230 }
231}
232
233impl FromMap for ConsumerSendMsgBackRequestHeader {
234 type Error = rocketmq_error::RocketMQError;
235
236 type Target = Self;
237
238 fn from(
239 map: &std::collections::HashMap<CheetahString, CheetahString>,
240 ) -> Result<Self::Target, Self::Error> {
241 Ok(ConsumerSendMsgBackRequestHeader {
242 offset: map
243 .get(&CheetahString::from_static_str(Self::OFFSET))
244 .cloned()
245 .ok_or_else(|| {
246 rocketmq_error::RocketMQError::Protocol(
247 rocketmq_error::ProtocolError::header_missing("offset"),
248 )
249 })?
250 .parse()
251 .map_err(|_| {
252 rocketmq_error::RocketMQError::Protocol(
253 rocketmq_error::ProtocolError::invalid_message("Invalid offset"),
254 )
255 })?,
256 group: map
257 .get(&CheetahString::from_static_str(Self::GROUP))
258 .cloned()
259 .ok_or_else(|| {
260 rocketmq_error::RocketMQError::Protocol(
261 rocketmq_error::ProtocolError::header_missing("group"),
262 )
263 })?,
264 delay_level: map
265 .get(&CheetahString::from_static_str(Self::DELAY_LEVEL))
266 .cloned()
267 .ok_or_else(|| {
268 rocketmq_error::RocketMQError::Protocol(
269 rocketmq_error::ProtocolError::header_missing("delay_level"),
270 )
271 })?
272 .parse()
273 .map_err(|_| {
274 rocketmq_error::RocketMQError::Protocol(
275 rocketmq_error::ProtocolError::invalid_message("Invalid delay level"),
276 )
277 })?,
278 origin_msg_id: map
279 .get(&CheetahString::from_static_str(Self::ORIGIN_MSG_ID))
280 .cloned(),
281 origin_topic: map
282 .get(&CheetahString::from_static_str(Self::ORIGIN_TOPIC))
283 .cloned(),
284 unit_mode: map
285 .get(&CheetahString::from_static_str(Self::UNIT_MODE))
286 .cloned()
287 .unwrap_or(CheetahString::from_static_str("false"))
288 .parse()
289 .unwrap_or(false),
290 max_reconsume_times: map
291 .get(&CheetahString::from_static_str(Self::MAX_RECONSUME_TIMES))
292 .and_then(|value| value.parse().ok()),
293 rpc_request_header: Some(<RpcRequestHeader as FromMap>::from(map)?),
294 })
295 }
296}
297*/