rocketmq_remoting/protocol/header/
update_consumer_offset_header.rs1use cheetah_string::CheetahString;
16use rocketmq_macros::RequestHeaderCodecV2;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
21use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
22
23#[derive(Debug, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
24pub struct UpdateConsumerOffsetResponseHeader {}
25
26#[derive(Debug, Clone, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
27#[serde(rename_all = "camelCase")]
28pub struct UpdateConsumerOffsetRequestHeader {
29 #[required]
30 pub consumer_group: CheetahString,
31 #[required]
32 pub topic: CheetahString,
33 #[required]
34 pub queue_id: i32,
35 #[required]
36 pub commit_offset: i64,
37 #[serde(flatten)]
38 pub topic_request_header: Option<TopicRequestHeader>,
39}
40
41impl TopicRequestHeaderTrait for UpdateConsumerOffsetRequestHeader {
42 fn set_lo(&mut self, lo: Option<bool>) {
43 if let Some(header) = self.topic_request_header.as_mut() {
44 header.lo = lo;
45 }
46 }
47
48 fn lo(&self) -> Option<bool> {
49 self.topic_request_header.as_ref().and_then(|h| h.lo)
50 }
51
52 fn set_topic(&mut self, topic: CheetahString) {
53 self.topic = topic;
54 }
55
56 fn topic(&self) -> &CheetahString {
57 &self.topic
58 }
59
60 fn broker_name(&self) -> Option<&CheetahString> {
61 self.topic_request_header
62 .as_ref()
63 .and_then(|h| h.rpc.as_ref())
64 .and_then(|h| h.broker_name.as_ref())
65 }
66
67 fn set_broker_name(&mut self, broker_name: CheetahString) {
68 if let Some(header) = self.topic_request_header.as_mut() {
69 if let Some(rpc_header) = header.rpc.as_mut() {
70 rpc_header.broker_name = Some(broker_name);
71 }
72 }
73 }
74
75 fn namespace(&self) -> Option<&str> {
76 self.topic_request_header
77 .as_ref()
78 .and_then(|h| h.rpc.as_ref())
79 .and_then(|r| r.namespace.as_deref())
80 }
81
82 fn set_namespace(&mut self, namespace: CheetahString) {
83 if let Some(header) = self.topic_request_header.as_mut() {
84 if let Some(rpc_header) = header.rpc.as_mut() {
85 rpc_header.namespace = Some(namespace);
86 }
87 }
88 }
89
90 fn namespaced(&self) -> Option<bool> {
91 self.topic_request_header
92 .as_ref()
93 .and_then(|h| h.rpc.as_ref())
94 .and_then(|r| r.namespaced)
95 }
96
97 fn set_namespaced(&mut self, namespaced: bool) {
98 if let Some(header) = self.topic_request_header.as_mut() {
99 if let Some(rpc_header) = header.rpc.as_mut() {
100 rpc_header.namespaced = Some(namespaced);
101 }
102 }
103 }
104
105 fn oneway(&self) -> Option<bool> {
106 self.topic_request_header
107 .as_ref()
108 .and_then(|h| h.rpc.as_ref())
109 .and_then(|r| r.oneway)
110 }
111
112 fn set_oneway(&mut self, oneway: bool) {
113 if let Some(header) = self.topic_request_header.as_mut() {
114 if let Some(rpc_header) = header.rpc.as_mut() {
115 rpc_header.oneway = Some(oneway);
116 }
117 }
118 }
119
120 fn queue_id(&self) -> i32 {
121 self.queue_id
122 }
123
124 fn set_queue_id(&mut self, queue_id: i32) {
125 self.queue_id = queue_id;
126 }
127}
128
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use cheetah_string::CheetahString;

    use super::*;
    use crate::protocol::command_custom_header::CommandCustomHeader;
    use crate::protocol::command_custom_header::FromMap;
    use crate::rpc::rpc_request_header::RpcRequestHeader;

    // ---------------------------------------------------------------------
    // Shared fixtures
    // ---------------------------------------------------------------------

    /// Shorthand for turning a string literal into a `CheetahString`.
    fn cs(text: &'static str) -> CheetahString {
        CheetahString::from_static_str(text)
    }

    /// Builds the four-entry map used by the decoding tests; only the two
    /// numeric fields vary between tests.
    fn wire_map(
        queue_id: &'static str,
        commit_offset: &'static str,
    ) -> HashMap<CheetahString, CheetahString> {
        let entries = [
            ("consumerGroup", "test_consumer_group"),
            ("topic", "test_topic"),
            ("queueId", queue_id),
            ("commitOffset", commit_offset),
        ];
        entries
            .iter()
            .map(|&(key, value)| (cs(key), cs(value)))
            .collect()
    }

    /// Checks the four plain fields produced by decoding `wire_map("1", "100")`.
    fn assert_decoded_fields(header: &UpdateConsumerOffsetRequestHeader) {
        assert_eq!(header.consumer_group, "test_consumer_group");
        assert_eq!(header.topic, "test_topic");
        assert_eq!(header.queue_id, 1);
        assert_eq!(header.commit_offset, 100);
    }

    /// Request header with fixed plain fields and the given nested header.
    fn header_with(nested: Option<TopicRequestHeader>) -> UpdateConsumerOffsetRequestHeader {
        UpdateConsumerOffsetRequestHeader {
            consumer_group: cs("test_group"),
            topic: cs("test_topic"),
            queue_id: 0,
            commit_offset: 0,
            topic_request_header: nested,
        }
    }

    /// Request header whose nested header contains a default RPC header.
    fn header_with_rpc() -> UpdateConsumerOffsetRequestHeader {
        header_with(Some(TopicRequestHeader {
            lo: None,
            rpc: Some(RpcRequestHeader::default()),
        }))
    }

    /// `Debug` rendering, used to compare response headers.
    fn debug_text<T: std::fmt::Debug>(value: &T) -> String {
        format!("{:?}", value)
    }

    // ---------------------------------------------------------------------
    // Request header: encoding / decoding
    // ---------------------------------------------------------------------

    #[test]
    fn update_consumer_offset_request_header_serializes_correctly() {
        let header = UpdateConsumerOffsetRequestHeader {
            consumer_group: cs("test_consumer_group"),
            topic: cs("test_topic"),
            queue_id: 1,
            commit_offset: 100,
            topic_request_header: None,
        };
        let encoded = header.to_map().unwrap();

        let expected = [
            ("consumerGroup", "test_consumer_group"),
            ("topic", "test_topic"),
            ("queueId", "1"),
            ("commitOffset", "100"),
        ];
        for &(key, value) in expected.iter() {
            assert_eq!(encoded.get(&cs(key)).unwrap(), value);
        }
    }

    #[test]
    fn update_consumer_offset_request_header_deserializes_correctly() {
        let map = wire_map("1", "100");

        let header = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&map).unwrap();
        assert_decoded_fields(&header);
    }

    #[test]
    fn update_consumer_offset_request_header_handles_missing_optional_fields() {
        let map = wire_map("1", "100");

        let header = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&map).unwrap();
        assert_decoded_fields(&header);
        assert!(header.topic_request_header.is_some());
    }

    #[test]
    fn update_consumer_offset_request_header_handles_invalid_data() {
        // Neither numeric field can be parsed.
        let map = wire_map("invalid", "invalid");

        let result = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&map);
        assert!(result.is_err());
    }

    // ---------------------------------------------------------------------
    // Request header: TopicRequestHeaderTrait accessors
    // ---------------------------------------------------------------------

    #[test]
    fn topic_request_header_trait_set_and_get_lo() {
        let mut nested = header_with(Some(TopicRequestHeader::default()));
        for &value in [Some(true), Some(false), None].iter() {
            nested.set_lo(value);
            assert_eq!(nested.lo(), value);
        }

        // Without a nested header the setter has nothing to write to.
        let mut bare = header_with(None);
        bare.set_lo(Some(true));
        assert_eq!(bare.lo(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_topic() {
        let mut header = UpdateConsumerOffsetRequestHeader::default();

        header.set_topic(cs("new_topic"));
        assert_eq!(header.topic(), "new_topic");
    }

    #[test]
    fn topic_request_header_trait_set_and_get_broker_name() {
        let mut nested = header_with_rpc();
        nested.set_broker_name(cs("new_broker"));
        assert_eq!(nested.broker_name(), Some(&cs("new_broker")));

        let mut bare = header_with(None);
        bare.set_broker_name(cs("new_broker"));
        assert_eq!(bare.broker_name(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_namespace() {
        let mut nested = header_with_rpc();
        nested.set_namespace(cs("new_namespace"));
        assert_eq!(nested.namespace(), Some("new_namespace"));

        let mut bare = header_with(None);
        bare.set_namespace(cs("new_namespace"));
        assert_eq!(bare.namespace(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_namespaced() {
        let mut nested = header_with_rpc();
        nested.set_namespaced(true);
        assert_eq!(nested.namespaced(), Some(true));
        nested.set_namespaced(false);
        assert_eq!(nested.namespaced(), Some(false));

        let mut bare = header_with(None);
        bare.set_namespaced(true);
        assert_eq!(bare.namespaced(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_oneway() {
        let mut nested = header_with_rpc();
        nested.set_oneway(true);
        assert_eq!(nested.oneway(), Some(true));
        nested.set_oneway(false);
        assert_eq!(nested.oneway(), Some(false));

        let mut bare = header_with(None);
        bare.set_oneway(true);
        assert_eq!(bare.oneway(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_queue_id() {
        let mut header = UpdateConsumerOffsetRequestHeader::default();

        for &id in [10, -1].iter() {
            header.set_queue_id(id);
            assert_eq!(header.queue_id(), id);
        }
    }

    // ---------------------------------------------------------------------
    // Response header
    // ---------------------------------------------------------------------

    #[test]
    fn response_header_can_be_created() {
        let literal: UpdateConsumerOffsetResponseHeader = UpdateConsumerOffsetResponseHeader {};
        let default: UpdateConsumerOffsetResponseHeader =
            UpdateConsumerOffsetResponseHeader::default();
        assert_eq!(debug_text(&literal), debug_text(&default));
    }

    #[test]
    fn response_header_is_zero_sized() {
        assert_eq!(std::mem::size_of::<UpdateConsumerOffsetResponseHeader>(), 0);
    }

    #[test]
    fn response_header_serializes_to_empty_json() {
        let json = serde_json::to_string(&UpdateConsumerOffsetResponseHeader::default()).unwrap();

        assert_eq!(json, "{}");
    }

    #[test]
    fn response_header_deserializes_from_empty_json() {
        let decoded: UpdateConsumerOffsetResponseHeader = serde_json::from_str("{}").unwrap();

        assert_eq!(
            debug_text(&decoded),
            debug_text(&UpdateConsumerOffsetResponseHeader::default())
        );
    }

    #[test]
    fn response_header_round_trip() {
        let original = UpdateConsumerOffsetResponseHeader::default();

        let json = serde_json::to_string(&original).unwrap();
        let decoded: UpdateConsumerOffsetResponseHeader = serde_json::from_str(&json).unwrap();

        assert_eq!(debug_text(&original), debug_text(&decoded));
    }

    #[test]
    fn response_header_ignores_extra_fields() {
        let json = r#"{"unexpected":"field"}"#;

        let decoded: UpdateConsumerOffsetResponseHeader = serde_json::from_str(json).unwrap();

        assert_eq!(
            debug_text(&decoded),
            debug_text(&UpdateConsumerOffsetResponseHeader::default())
        );
    }

    #[test]
    fn all_instances_are_equivalent() {
        let literal = UpdateConsumerOffsetResponseHeader {};
        let default = UpdateConsumerOffsetResponseHeader::default();

        assert_eq!(debug_text(&literal), debug_text(&default));
    }

    #[test]
    fn response_header_encodes_to_empty_map() {
        let encoded = UpdateConsumerOffsetResponseHeader::default()
            .to_map()
            .unwrap();

        assert!(encoded.is_empty());
    }
}