rocketmq_remoting/protocol/header/
pull_message_request_header.rs1use cheetah_string::CheetahString;
19use rocketmq_macros::RequestHeaderCodecV2;
20use serde::Deserialize;
21use serde::Serialize;
22
23use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
24use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
25
26#[derive(Debug, Clone, Deserialize, Serialize, Default, RequestHeaderCodecV2)]
27#[serde(rename_all = "camelCase")]
28pub struct PullMessageRequestHeader {
29 #[required]
30 pub consumer_group: CheetahString,
31
32 #[required]
33 pub topic: CheetahString,
34
35 #[required]
36 pub queue_id: i32,
37
38 #[required]
39 pub queue_offset: i64,
40
41 #[required]
42 pub max_msg_nums: i32,
43
44 #[required]
45 pub sys_flag: i32,
46
47 #[required]
48 pub commit_offset: i64,
49
50 #[required]
51 pub suspend_timeout_millis: u64,
52
53 #[required]
54 pub sub_version: i64,
55
56 pub subscription: Option<CheetahString>,
57 pub expression_type: Option<CheetahString>,
58 pub max_msg_bytes: Option<i32>,
59 pub request_source: Option<i32>,
60 pub proxy_forward_client_id: Option<CheetahString>,
61 #[serde(flatten)]
62 pub topic_request: Option<TopicRequestHeader>,
63}
64
65impl TopicRequestHeaderTrait for PullMessageRequestHeader {
66 fn set_lo(&mut self, lo: Option<bool>) {
67 self.topic_request.as_mut().unwrap().lo = lo;
68 }
69
70 fn lo(&self) -> Option<bool> {
71 self.topic_request.as_ref().unwrap().lo
72 }
73
74 fn set_topic(&mut self, topic: CheetahString) {
75 self.topic = topic;
76 }
77
78 fn topic(&self) -> &CheetahString {
79 &self.topic
80 }
81
82 fn broker_name(&self) -> Option<&CheetahString> {
83 self.topic_request
84 .as_ref()
85 .unwrap()
86 .rpc
87 .as_ref()
88 .unwrap()
89 .broker_name
90 .as_ref()
91 }
92
93 fn set_broker_name(&mut self, broker_name: CheetahString) {
94 self.topic_request
95 .as_mut()
96 .unwrap()
97 .rpc
98 .as_mut()
99 .unwrap()
100 .broker_name = Some(broker_name);
101 }
102
103 fn namespace(&self) -> Option<&str> {
104 self.topic_request
105 .as_ref()
106 .unwrap()
107 .rpc
108 .as_ref()
109 .unwrap()
110 .namespace
111 .as_deref()
112 }
113
114 fn set_namespace(&mut self, namespace: CheetahString) {
115 self.topic_request
116 .as_mut()
117 .unwrap()
118 .rpc
119 .as_mut()
120 .unwrap()
121 .namespace = Some(namespace);
122 }
123
124 fn namespaced(&self) -> Option<bool> {
125 self.topic_request
126 .as_ref()
127 .unwrap()
128 .rpc
129 .as_ref()
130 .unwrap()
131 .namespaced
132 }
133
134 fn set_namespaced(&mut self, namespaced: bool) {
135 self.topic_request
136 .as_mut()
137 .unwrap()
138 .rpc
139 .as_mut()
140 .unwrap()
141 .namespaced = Some(namespaced);
142 }
143
144 fn oneway(&self) -> Option<bool> {
145 self.topic_request
146 .as_ref()
147 .unwrap()
148 .rpc
149 .as_ref()
150 .unwrap()
151 .oneway
152 }
153
154 fn set_oneway(&mut self, oneway: bool) {
155 self.topic_request
156 .as_mut()
157 .unwrap()
158 .rpc
159 .as_mut()
160 .unwrap()
161 .oneway = Some(oneway);
162 }
163
164 fn queue_id(&self) -> i32 {
165 self.queue_id
166 }
167
168 fn set_queue_id(&mut self, queue_id: i32) {
169 self.queue_id = queue_id;
170 }
171}
172
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use cheetah_string::CheetahString;

    use super::*;
    use crate::protocol::command_custom_header::CommandCustomHeader;
    use crate::protocol::command_custom_header::FromMap;

    /// Wire entries for every `#[required]` field, all with valid values.
    const REQUIRED_ENTRIES: &[(&str, &str)] = &[
        ("consumerGroup", "test_consumer_group"),
        ("topic", "test_topic"),
        ("queueId", "1"),
        ("queueOffset", "100"),
        ("maxMsgNums", "10"),
        ("sysFlag", "0"),
        ("commitOffset", "50"),
        ("suspendTimeoutMillis", "3000"),
        ("subVersion", "1"),
    ];

    /// Wire entries for the optional fields checked by these tests.
    const OPTIONAL_ENTRIES: &[(&str, &str)] = &[
        ("subscription", "test_subscription"),
        ("expressionType", "test_expression"),
        ("maxMsgBytes", "1024"),
        ("requestSource", "1"),
        ("proxyForwardClientId", "test_client_id"),
    ];

    /// Builds the string map handed to `FromMap::from` out of key/value pairs.
    fn header_map(entries: &[(&'static str, &'static str)]) -> HashMap<CheetahString, CheetahString> {
        entries
            .iter()
            .map(|&(key, value)| {
                (
                    CheetahString::from_static_str(key),
                    CheetahString::from_static_str(value),
                )
            })
            .collect()
    }

    /// Checks that the required fields hold the values from `REQUIRED_ENTRIES`.
    fn assert_required_fields(header: &PullMessageRequestHeader) {
        assert_eq!(header.consumer_group, "test_consumer_group");
        assert_eq!(header.topic, "test_topic");
        assert_eq!(header.queue_id, 1);
        assert_eq!(header.queue_offset, 100);
        assert_eq!(header.max_msg_nums, 10);
        assert_eq!(header.sys_flag, 0);
        assert_eq!(header.commit_offset, 50);
        assert_eq!(header.suspend_timeout_millis, 3000);
        assert_eq!(header.sub_version, 1);
    }

    #[test]
    fn pull_message_request_header_serializes_correctly() {
        let header = PullMessageRequestHeader {
            consumer_group: CheetahString::from_static_str("test_consumer_group"),
            topic: CheetahString::from_static_str("test_topic"),
            queue_id: 1,
            queue_offset: 100,
            max_msg_nums: 10,
            sys_flag: 0,
            commit_offset: 50,
            suspend_timeout_millis: 3000,
            subscription: Some(CheetahString::from_static_str("test_subscription")),
            sub_version: 1,
            expression_type: Some(CheetahString::from_static_str("test_expression")),
            max_msg_bytes: Some(1024),
            request_source: Some(1),
            proxy_forward_client_id: Some(CheetahString::from_static_str("test_client_id")),
            topic_request: None,
        };

        let map: HashMap<CheetahString, CheetahString> = header.to_map().unwrap();

        // The header above holds exactly the values listed in the two tables.
        for &(key, expected) in REQUIRED_ENTRIES.iter().chain(OPTIONAL_ENTRIES) {
            assert_eq!(
                map.get(&CheetahString::from_static_str(key)).unwrap(),
                expected
            );
        }
    }

    #[test]
    fn pull_message_request_header_deserializes_correctly() {
        let entries = [REQUIRED_ENTRIES, OPTIONAL_ENTRIES].concat();
        let map = header_map(&entries);

        let header = <PullMessageRequestHeader as FromMap>::from(&map).unwrap();

        assert_required_fields(&header);
        assert_eq!(header.subscription.unwrap(), "test_subscription");
        assert_eq!(header.expression_type.unwrap(), "test_expression");
        assert_eq!(header.max_msg_bytes.unwrap(), 1024);
        assert_eq!(header.request_source.unwrap(), 1);
        assert_eq!(header.proxy_forward_client_id.unwrap(), "test_client_id");
    }

    #[test]
    fn pull_message_request_header_handles_missing_optional_fields() {
        let map = header_map(REQUIRED_ENTRIES);

        let header = <PullMessageRequestHeader as FromMap>::from(&map).unwrap();

        assert_required_fields(&header);
        assert!(header.subscription.is_none());
        assert!(header.expression_type.is_none());
        assert!(header.max_msg_bytes.is_none());
        assert!(header.request_source.is_none());
        assert!(header.proxy_forward_client_id.is_none());
    }

    #[test]
    fn pull_message_request_header_handles_invalid_data() {
        // Every numeric required field carries a value that is not a number.
        let map = header_map(&[
            ("consumerGroup", "test_consumer_group"),
            ("topic", "test_topic"),
            ("queueId", "invalid"),
            ("queueOffset", "invalid"),
            ("maxMsgNums", "invalid"),
            ("sysFlag", "invalid"),
            ("commitOffset", "invalid"),
            ("suspendTimeoutMillis", "invalid"),
            ("subVersion", "invalid"),
        ]);

        let result = <PullMessageRequestHeader as FromMap>::from(&map);
        assert!(result.is_err());
    }
}