rocketmq_remoting/protocol/header/
pull_message_request_header.rs1use cheetah_string::CheetahString;
16use rocketmq_macros::RequestHeaderCodecV2;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
21use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
22
23#[derive(Debug, Clone, Deserialize, Serialize, Default, RequestHeaderCodecV2)]
24#[serde(rename_all = "camelCase")]
25pub struct PullMessageRequestHeader {
26 #[required]
27 pub consumer_group: CheetahString,
28
29 #[required]
30 pub topic: CheetahString,
31
32 #[required]
33 pub queue_id: i32,
34
35 #[required]
36 pub queue_offset: i64,
37
38 #[required]
39 pub max_msg_nums: i32,
40
41 #[required]
42 pub sys_flag: i32,
43
44 #[required]
45 pub commit_offset: i64,
46
47 #[required]
48 pub suspend_timeout_millis: u64,
49
50 #[required]
51 pub sub_version: i64,
52
53 pub subscription: Option<CheetahString>,
54 pub expression_type: Option<CheetahString>,
55 pub max_msg_bytes: Option<i32>,
56 pub request_source: Option<i32>,
57 pub proxy_forward_client_id: Option<CheetahString>,
58 #[serde(flatten)]
59 pub topic_request: Option<TopicRequestHeader>,
60}
61
62impl TopicRequestHeaderTrait for PullMessageRequestHeader {
63 fn set_lo(&mut self, lo: Option<bool>) {
64 if let Some(header) = self.topic_request.as_mut() {
65 header.lo = lo;
66 }
67 }
68
69 fn lo(&self) -> Option<bool> {
70 self.topic_request.as_ref().and_then(|h| h.lo)
71 }
72
73 fn set_topic(&mut self, topic: CheetahString) {
74 self.topic = topic;
75 }
76
77 fn topic(&self) -> &CheetahString {
78 &self.topic
79 }
80
81 fn broker_name(&self) -> Option<&CheetahString> {
82 self.topic_request
83 .as_ref()
84 .and_then(|h| h.rpc.as_ref())
85 .and_then(|h| h.broker_name.as_ref())
86 }
87
88 fn set_broker_name(&mut self, broker_name: CheetahString) {
89 if let Some(header) = self.topic_request.as_mut() {
90 if let Some(rpc_header) = header.rpc.as_mut() {
91 rpc_header.broker_name = Some(broker_name);
92 }
93 }
94 }
95
96 fn namespace(&self) -> Option<&str> {
97 self.topic_request
98 .as_ref()
99 .and_then(|h| h.rpc.as_ref())
100 .and_then(|r| r.namespace.as_deref())
101 }
102
103 fn set_namespace(&mut self, namespace: CheetahString) {
104 if let Some(header) = self.topic_request.as_mut() {
105 if let Some(rpc_header) = header.rpc.as_mut() {
106 rpc_header.namespace = Some(namespace);
107 }
108 }
109 }
110
111 fn namespaced(&self) -> Option<bool> {
112 self.topic_request
113 .as_ref()
114 .and_then(|h| h.rpc.as_ref())
115 .and_then(|r| r.namespaced)
116 }
117
118 fn set_namespaced(&mut self, namespaced: bool) {
119 if let Some(header) = self.topic_request.as_mut() {
120 if let Some(rpc_header) = header.rpc.as_mut() {
121 rpc_header.namespaced = Some(namespaced);
122 }
123 }
124 }
125
126 fn oneway(&self) -> Option<bool> {
127 self.topic_request
128 .as_ref()
129 .and_then(|h| h.rpc.as_ref())
130 .and_then(|r| r.oneway)
131 }
132
133 fn set_oneway(&mut self, oneway: bool) {
134 if let Some(header) = self.topic_request.as_mut() {
135 if let Some(rpc_header) = header.rpc.as_mut() {
136 rpc_header.oneway = Some(oneway);
137 }
138 }
139 }
140
141 fn queue_id(&self) -> i32 {
142 self.queue_id
143 }
144
145 fn set_queue_id(&mut self, queue_id: i32) {
146 self.queue_id = queue_id;
147 }
148}
149
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use cheetah_string::CheetahString;

    use super::*;
    use crate::protocol::command_custom_header::CommandCustomHeader;
    use crate::protocol::command_custom_header::FromMap;

    /// Wire entries for every `#[required]` field, all holding valid values.
    const REQUIRED_ENTRIES: [(&str, &str); 9] = [
        ("consumerGroup", "test_consumer_group"),
        ("topic", "test_topic"),
        ("queueId", "1"),
        ("queueOffset", "100"),
        ("maxMsgNums", "10"),
        ("sysFlag", "0"),
        ("commitOffset", "50"),
        ("suspendTimeoutMillis", "3000"),
        ("subVersion", "1"),
    ];

    /// Wire entries for the optional scalar fields.
    const OPTIONAL_ENTRIES: [(&str, &str); 5] = [
        ("subscription", "test_subscription"),
        ("expressionType", "test_expression"),
        ("maxMsgBytes", "1024"),
        ("requestSource", "1"),
        ("proxyForwardClientId", "test_client_id"),
    ];

    /// Builds the string map consumed by `FromMap::from` out of literal pairs.
    fn header_map(
        entries: &[(&'static str, &'static str)],
    ) -> HashMap<CheetahString, CheetahString> {
        entries
            .iter()
            .map(|&(key, value)| {
                (
                    CheetahString::from_static_str(key),
                    CheetahString::from_static_str(value),
                )
            })
            .collect()
    }

    /// Checks that the required fields decoded to the values in `REQUIRED_ENTRIES`.
    fn assert_required_fields(header: &PullMessageRequestHeader) {
        assert_eq!(header.consumer_group, "test_consumer_group");
        assert_eq!(header.topic, "test_topic");
        assert_eq!(header.queue_id, 1);
        assert_eq!(header.queue_offset, 100);
        assert_eq!(header.max_msg_nums, 10);
        assert_eq!(header.sys_flag, 0);
        assert_eq!(header.commit_offset, 50);
        assert_eq!(header.suspend_timeout_millis, 3000);
        assert_eq!(header.sub_version, 1);
    }

    #[test]
    fn pull_message_request_header_serializes_correctly() {
        let header = PullMessageRequestHeader {
            consumer_group: CheetahString::from_static_str("test_consumer_group"),
            topic: CheetahString::from_static_str("test_topic"),
            queue_id: 1,
            queue_offset: 100,
            max_msg_nums: 10,
            sys_flag: 0,
            commit_offset: 50,
            suspend_timeout_millis: 3000,
            subscription: Some(CheetahString::from_static_str("test_subscription")),
            sub_version: 1,
            expression_type: Some(CheetahString::from_static_str("test_expression")),
            max_msg_bytes: Some(1024),
            request_source: Some(1),
            proxy_forward_client_id: Some(CheetahString::from_static_str("test_client_id")),
            topic_request: None,
        };
        let map: HashMap<CheetahString, CheetahString> = header.to_map().unwrap();

        // Every populated field must appear under its camelCase key, stringified.
        for &(key, expected) in REQUIRED_ENTRIES.iter().chain(OPTIONAL_ENTRIES.iter()) {
            assert_eq!(
                map.get(&CheetahString::from_static_str(key)).unwrap(),
                expected,
                "unexpected value for key `{}`",
                key
            );
        }
    }

    #[test]
    fn pull_message_request_header_deserializes_correctly() {
        let mut map = header_map(&REQUIRED_ENTRIES);
        map.extend(header_map(&OPTIONAL_ENTRIES));

        let header = <PullMessageRequestHeader as FromMap>::from(&map).unwrap();
        assert_required_fields(&header);
        assert_eq!(header.subscription.unwrap(), "test_subscription");
        assert_eq!(header.expression_type.unwrap(), "test_expression");
        assert_eq!(header.max_msg_bytes.unwrap(), 1024);
        assert_eq!(header.request_source.unwrap(), 1);
        assert_eq!(header.proxy_forward_client_id.unwrap(), "test_client_id");
    }

    #[test]
    fn pull_message_request_header_handles_missing_optional_fields() {
        let map = header_map(&REQUIRED_ENTRIES);

        let header = <PullMessageRequestHeader as FromMap>::from(&map).unwrap();
        assert_required_fields(&header);
        assert!(header.subscription.is_none());
        assert!(header.expression_type.is_none());
        assert!(header.max_msg_bytes.is_none());
        assert!(header.request_source.is_none());
        assert!(header.proxy_forward_client_id.is_none());
    }

    #[test]
    fn pull_message_request_header_handles_invalid_data() {
        // String fields are valid; every numeric required field holds a non-number.
        let map = header_map(&[
            ("consumerGroup", "test_consumer_group"),
            ("topic", "test_topic"),
            ("queueId", "invalid"),
            ("queueOffset", "invalid"),
            ("maxMsgNums", "invalid"),
            ("sysFlag", "invalid"),
            ("commitOffset", "invalid"),
            ("suspendTimeoutMillis", "invalid"),
            ("subVersion", "invalid"),
        ]);

        let result = <PullMessageRequestHeader as FromMap>::from(&map);
        assert!(result.is_err());
    }
}