rocketmq_remoting/protocol/header/consumer_send_msg_back_request_header.rs
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use cheetah_string::CheetahString;
18use rocketmq_macros::RequestHeaderCodecV2;
19use serde::Deserialize;
20use serde::Serialize;
21
22use crate::rpc::rpc_request_header::RpcRequestHeader;
23
24#[derive(Clone, Debug, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
25#[serde(rename_all = "camelCase")]
26pub struct ConsumerSendMsgBackRequestHeader {
27 #[required]
28 pub offset: i64,
29 #[required]
30 pub group: CheetahString,
31 #[required]
32 pub delay_level: i32,
33 pub origin_msg_id: Option<CheetahString>,
34 pub origin_topic: Option<CheetahString>,
35 pub unit_mode: bool,
36 pub max_reconsume_times: Option<i32>,
37 #[serde(flatten)]
38 pub rpc_request_header: Option<RpcRequestHeader>,
39}
40
41#[cfg(test)]
42mod tests {
43 use std::collections::HashMap;
44
45 use cheetah_string::CheetahString;
46
47 use super::*;
48 use crate::protocol::command_custom_header::CommandCustomHeader;
49 use crate::protocol::command_custom_header::FromMap;
50
51 #[test]
52 fn consumer_send_msg_back_request_header_serializes_correctly() {
53 let header = ConsumerSendMsgBackRequestHeader {
54 offset: 12345,
55 group: CheetahString::from_static_str("test_group"),
56 delay_level: 2,
57 origin_msg_id: Some(CheetahString::from_static_str("msg_id")),
58 origin_topic: Some(CheetahString::from_static_str("topic")),
59 unit_mode: true,
60 max_reconsume_times: Some(3),
61 rpc_request_header: None,
62 };
63 let map = header.to_map().unwrap();
64 assert_eq!(
65 map.get(&CheetahString::from_static_str("offset")).unwrap(),
66 "12345"
67 );
68 assert_eq!(
69 map.get(&CheetahString::from_static_str("group")).unwrap(),
70 "test_group"
71 );
72 assert_eq!(
73 map.get(&CheetahString::from_static_str("delayLevel"))
74 .unwrap(),
75 "2"
76 );
77 assert_eq!(
78 map.get(&CheetahString::from_static_str("originMsgId"))
79 .unwrap(),
80 "msg_id"
81 );
82 assert_eq!(
83 map.get(&CheetahString::from_static_str("originTopic"))
84 .unwrap(),
85 "topic"
86 );
87 assert_eq!(
88 map.get(&CheetahString::from_static_str("unitMode"))
89 .unwrap(),
90 "true"
91 );
92 assert_eq!(
93 map.get(&CheetahString::from_static_str("maxReconsumeTimes"))
94 .unwrap(),
95 "3"
96 );
97 }
98
99 #[test]
100 fn consumer_send_msg_back_request_header_deserializes_correctly() {
101 let mut map = HashMap::new();
102 map.insert(
103 CheetahString::from_static_str("offset"),
104 CheetahString::from_static_str("12345"),
105 );
106 map.insert(
107 CheetahString::from_static_str("group"),
108 CheetahString::from_static_str("test_group"),
109 );
110 map.insert(
111 CheetahString::from_static_str("delayLevel"),
112 CheetahString::from_static_str("2"),
113 );
114 map.insert(
115 CheetahString::from_static_str("originMsgId"),
116 CheetahString::from_static_str("msg_id"),
117 );
118 map.insert(
119 CheetahString::from_static_str("originTopic"),
120 CheetahString::from_static_str("topic"),
121 );
122 map.insert(
123 CheetahString::from_static_str("unitMode"),
124 CheetahString::from_static_str("true"),
125 );
126 map.insert(
127 CheetahString::from_static_str("maxReconsumeTimes"),
128 CheetahString::from_static_str("3"),
129 );
130
131 let header = <ConsumerSendMsgBackRequestHeader as FromMap>::from(&map).unwrap();
132 assert_eq!(header.offset, 12345);
133 assert_eq!(header.group, "test_group");
134 assert_eq!(header.delay_level, 2);
135 assert_eq!(header.origin_msg_id.unwrap(), "msg_id");
136 assert_eq!(header.origin_topic.unwrap(), "topic");
137 assert!(header.unit_mode);
138 assert_eq!(header.max_reconsume_times.unwrap(), 3);
139 }
140
141 #[test]
142 fn consumer_send_msg_back_request_header_handles_missing_optional_fields() {
143 let mut map = HashMap::new();
144 map.insert(
145 CheetahString::from_static_str("offset"),
146 CheetahString::from_static_str("12345"),
147 );
148 map.insert(
149 CheetahString::from_static_str("group"),
150 CheetahString::from_static_str("test_group"),
151 );
152 map.insert(
153 CheetahString::from_static_str("delayLevel"),
154 CheetahString::from_static_str("2"),
155 );
156 map.insert(
157 CheetahString::from_static_str("unitMode"),
158 CheetahString::from_static_str("true"),
159 );
160
161 let header = <ConsumerSendMsgBackRequestHeader as FromMap>::from(&map).unwrap();
162 assert_eq!(header.offset, 12345);
163 assert_eq!(header.group, "test_group");
164 assert_eq!(header.delay_level, 2);
165 assert!(header.origin_msg_id.is_none());
166 assert!(header.origin_topic.is_none());
167 assert!(header.unit_mode);
168 assert!(header.max_reconsume_times.is_none());
169 }
170
171 #[test]
172 fn consumer_send_msg_back_request_header_handles_invalid_data() {
173 let mut map = HashMap::new();
174 map.insert(
175 CheetahString::from_static_str("offset"),
176 CheetahString::from_static_str("invalid"),
177 );
178 map.insert(
179 CheetahString::from_static_str("group"),
180 CheetahString::from_static_str("test_group"),
181 );
182 map.insert(
183 CheetahString::from_static_str("delayLevel"),
184 CheetahString::from_static_str("invalid"),
185 );
186 map.insert(
187 CheetahString::from_static_str("unitMode"),
188 CheetahString::from_static_str("true"),
189 );
190
191 let result = <ConsumerSendMsgBackRequestHeader as FromMap>::from(&map);
192 assert!(result.is_err());
193 }
194}
195
196/*impl ConsumerSendMsgBackRequestHeader {
197 pub const OFFSET: &'static str = "offset";
198 pub const GROUP: &'static str = "group";
199 pub const DELAY_LEVEL: &'static str = "delayLevel";
200 pub const ORIGIN_MSG_ID: &'static str = "originMsgId";
201 pub const ORIGIN_TOPIC: &'static str = "originTopic";
202 pub const UNIT_MODE: &'static str = "unitMode";
203 pub const MAX_RECONSUME_TIMES: &'static str = "maxReconsumeTimes";
204}
205
206impl CommandCustomHeader for ConsumerSendMsgBackRequestHeader {
207 fn to_map(&self) -> Option<std::collections::HashMap<CheetahString, CheetahString>> {
208 let mut map = std::collections::HashMap::new();
209 map.insert(
210 CheetahString::from_static_str(Self::OFFSET),
211 CheetahString::from_string(self.offset.to_string()),
212 );
213 map.insert(
214 CheetahString::from_static_str(Self::GROUP),
215 self.group.clone(),
216 );
217 map.insert(
218 CheetahString::from_static_str(Self::DELAY_LEVEL),
219 CheetahString::from_string(self.delay_level.to_string()),
220 );
221 if let Some(value) = &self.origin_msg_id {
222 map.insert(
223 CheetahString::from_static_str(Self::ORIGIN_MSG_ID),
224 value.clone(),
225 );
226 }
227 if let Some(value) = &self.origin_topic {
228 map.insert(
229 CheetahString::from_static_str(Self::ORIGIN_TOPIC),
230 value.clone(),
231 );
232 }
233 map.insert(
234 CheetahString::from_static_str(Self::UNIT_MODE),
235 CheetahString::from_string(self.unit_mode.to_string()),
236 );
237 if let Some(value) = self.max_reconsume_times {
238 map.insert(
239 CheetahString::from_static_str(Self::MAX_RECONSUME_TIMES),
240 CheetahString::from_string(value.to_string()),
241 );
242 }
243 if let Some(ref rpc) = self.rpc_request_header {
244 if let Some(rpc_map) = rpc.to_map() {
245 map.extend(rpc_map);
246 }
247 }
248 Some(map)
249 }
250}
251
252impl FromMap for ConsumerSendMsgBackRequestHeader {
253 type Error = rocketmq_error::RocketMQError;
254
255 type Target = Self;
256
257 fn from(
258 map: &std::collections::HashMap<CheetahString, CheetahString>,
259 ) -> Result<Self::Target, Self::Error> {
260 Ok(ConsumerSendMsgBackRequestHeader {
261 offset: map
262 .get(&CheetahString::from_static_str(Self::OFFSET))
263 .cloned()
264 .ok_or_else(|| {
265 rocketmq_error::RocketMQError::Protocol(
266 rocketmq_error::ProtocolError::header_missing("offset"),
267 )
268 })?
269 .parse()
270 .map_err(|_| {
271 rocketmq_error::RocketMQError::Protocol(
272 rocketmq_error::ProtocolError::invalid_message("Invalid offset"),
273 )
274 })?,
275 group: map
276 .get(&CheetahString::from_static_str(Self::GROUP))
277 .cloned()
278 .ok_or_else(|| {
279 rocketmq_error::RocketMQError::Protocol(
280 rocketmq_error::ProtocolError::header_missing("group"),
281 )
282 })?,
283 delay_level: map
284 .get(&CheetahString::from_static_str(Self::DELAY_LEVEL))
285 .cloned()
286 .ok_or_else(|| {
287 rocketmq_error::RocketMQError::Protocol(
288 rocketmq_error::ProtocolError::header_missing("delay_level"),
289 )
290 })?
291 .parse()
292 .map_err(|_| {
293 rocketmq_error::RocketMQError::Protocol(
294 rocketmq_error::ProtocolError::invalid_message("Invalid delay level"),
295 )
296 })?,
297 origin_msg_id: map
298 .get(&CheetahString::from_static_str(Self::ORIGIN_MSG_ID))
299 .cloned(),
300 origin_topic: map
301 .get(&CheetahString::from_static_str(Self::ORIGIN_TOPIC))
302 .cloned(),
303 unit_mode: map
304 .get(&CheetahString::from_static_str(Self::UNIT_MODE))
305 .cloned()
306 .unwrap_or(CheetahString::from_static_str("false"))
307 .parse()
308 .unwrap_or(false),
309 max_reconsume_times: map
310 .get(&CheetahString::from_static_str(Self::MAX_RECONSUME_TIMES))
311 .and_then(|value| value.parse().ok()),
312 rpc_request_header: Some(<RpcRequestHeader as FromMap>::from(map)?),
313 })
314 }
315}
316*/