// rocketmq_remoting/protocol/header/update_consumer_offset_header.rs

// Copyright 2023 The RocketMQ Rust Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use cheetah_string::CheetahString;
16use rocketmq_macros::RequestHeaderCodecV2;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
21use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
22
23#[derive(Debug, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
24pub struct UpdateConsumerOffsetResponseHeader {}
25
26#[derive(Debug, Clone, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
27#[serde(rename_all = "camelCase")]
28pub struct UpdateConsumerOffsetRequestHeader {
29    #[required]
30    pub consumer_group: CheetahString,
31    #[required]
32    pub topic: CheetahString,
33    #[required]
34    pub queue_id: i32,
35    #[required]
36    pub commit_offset: i64,
37    #[serde(flatten)]
38    pub topic_request_header: Option<TopicRequestHeader>,
39}
40
41impl TopicRequestHeaderTrait for UpdateConsumerOffsetRequestHeader {
42    fn set_lo(&mut self, lo: Option<bool>) {
43        if let Some(header) = self.topic_request_header.as_mut() {
44            header.lo = lo;
45        }
46    }
47
48    fn lo(&self) -> Option<bool> {
49        self.topic_request_header.as_ref().and_then(|h| h.lo)
50    }
51
52    fn set_topic(&mut self, topic: CheetahString) {
53        self.topic = topic;
54    }
55
56    fn topic(&self) -> &CheetahString {
57        &self.topic
58    }
59
60    fn broker_name(&self) -> Option<&CheetahString> {
61        self.topic_request_header
62            .as_ref()
63            .and_then(|h| h.rpc.as_ref())
64            .and_then(|h| h.broker_name.as_ref())
65    }
66
67    fn set_broker_name(&mut self, broker_name: CheetahString) {
68        if let Some(header) = self.topic_request_header.as_mut() {
69            if let Some(rpc_header) = header.rpc.as_mut() {
70                rpc_header.broker_name = Some(broker_name);
71            }
72        }
73    }
74
75    fn namespace(&self) -> Option<&str> {
76        self.topic_request_header
77            .as_ref()
78            .and_then(|h| h.rpc.as_ref())
79            .and_then(|r| r.namespace.as_deref())
80    }
81
82    fn set_namespace(&mut self, namespace: CheetahString) {
83        if let Some(header) = self.topic_request_header.as_mut() {
84            if let Some(rpc_header) = header.rpc.as_mut() {
85                rpc_header.namespace = Some(namespace);
86            }
87        }
88    }
89
90    fn namespaced(&self) -> Option<bool> {
91        self.topic_request_header
92            .as_ref()
93            .and_then(|h| h.rpc.as_ref())
94            .and_then(|r| r.namespaced)
95    }
96
97    fn set_namespaced(&mut self, namespaced: bool) {
98        if let Some(header) = self.topic_request_header.as_mut() {
99            if let Some(rpc_header) = header.rpc.as_mut() {
100                rpc_header.namespaced = Some(namespaced);
101            }
102        }
103    }
104
105    fn oneway(&self) -> Option<bool> {
106        self.topic_request_header
107            .as_ref()
108            .and_then(|h| h.rpc.as_ref())
109            .and_then(|r| r.oneway)
110    }
111
112    fn set_oneway(&mut self, oneway: bool) {
113        if let Some(header) = self.topic_request_header.as_mut() {
114            if let Some(rpc_header) = header.rpc.as_mut() {
115                rpc_header.oneway = Some(oneway);
116            }
117        }
118    }
119
120    fn queue_id(&self) -> i32 {
121        self.queue_id
122    }
123
124    fn set_queue_id(&mut self, queue_id: i32) {
125        self.queue_id = queue_id;
126    }
127}
128
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use cheetah_string::CheetahString;

    use super::*;
    use crate::protocol::command_custom_header::CommandCustomHeader;
    use crate::protocol::command_custom_header::FromMap;
    use crate::rpc::rpc_request_header::RpcRequestHeader;

    /// Shorthand for turning a string literal into a `CheetahString`.
    fn cs(text: &'static str) -> CheetahString {
        CheetahString::from_static_str(text)
    }

    /// Request header with fixed group/topic, zeroed numeric fields and the
    /// supplied nested topic header.
    fn request_with(topic_request_header: Option<TopicRequestHeader>) -> UpdateConsumerOffsetRequestHeader {
        UpdateConsumerOffsetRequestHeader {
            consumer_group: cs("test_group"),
            topic: cs("test_topic"),
            queue_id: 0,
            commit_offset: 0,
            topic_request_header,
        }
    }

    /// Request header whose nested topic header carries a default RPC header.
    fn request_with_rpc() -> UpdateConsumerOffsetRequestHeader {
        request_with(Some(TopicRequestHeader {
            lo: None,
            rpc: Some(RpcRequestHeader::default()),
        }))
    }

    /// Key/value map holding the four required entries, with the textual
    /// `queueId` and `commitOffset` values chosen by the caller.
    fn wire_map(queue_id: &'static str, commit_offset: &'static str) -> HashMap<CheetahString, CheetahString> {
        let mut entries = HashMap::new();
        for (key, value) in [
            ("consumerGroup", "test_consumer_group"),
            ("topic", "test_topic"),
            ("queueId", queue_id),
            ("commitOffset", commit_offset),
        ] {
            entries.insert(cs(key), cs(value));
        }
        entries
    }

    /// Checks the four required fields decoded from `wire_map("1", "100")`.
    fn assert_decoded_fields(header: &UpdateConsumerOffsetRequestHeader) {
        assert_eq!(header.consumer_group, "test_consumer_group");
        assert_eq!(header.topic, "test_topic");
        assert_eq!(header.queue_id, 1);
        assert_eq!(header.commit_offset, 100);
    }

    /// `Debug` rendering of a response header; the tests compare these strings.
    fn debug_repr(header: &UpdateConsumerOffsetResponseHeader) -> String {
        format!("{:?}", header)
    }

    #[test]
    fn update_consumer_offset_request_header_serializes_correctly() {
        let header = UpdateConsumerOffsetRequestHeader {
            consumer_group: cs("test_consumer_group"),
            topic: cs("test_topic"),
            queue_id: 1,
            commit_offset: 100,
            topic_request_header: None,
        };

        let encoded = header.to_map().unwrap();

        for (key, expected) in [
            ("consumerGroup", "test_consumer_group"),
            ("topic", "test_topic"),
            ("queueId", "1"),
            ("commitOffset", "100"),
        ] {
            assert_eq!(encoded.get(&cs(key)).unwrap(), expected);
        }
    }

    #[test]
    fn update_consumer_offset_request_header_deserializes_correctly() {
        let entries = wire_map("1", "100");

        let decoded = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&entries).unwrap();

        assert_decoded_fields(&decoded);
    }

    #[test]
    fn update_consumer_offset_request_header_handles_missing_optional_fields() {
        // Only the required keys are supplied; nothing for the nested header.
        let entries = wire_map("1", "100");

        let decoded = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&entries).unwrap();

        assert_decoded_fields(&decoded);
        assert!(decoded.topic_request_header.is_some());
    }

    #[test]
    fn update_consumer_offset_request_header_handles_invalid_data() {
        // Non-numeric text for both numeric fields.
        let entries = wire_map("invalid", "invalid");

        let outcome = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&entries);

        assert!(outcome.is_err());
    }

    #[test]
    fn topic_request_header_trait_set_and_get_lo() {
        let mut with_topic_header = request_with(Some(TopicRequestHeader::default()));

        for value in [Some(true), Some(false), None] {
            with_topic_header.set_lo(value);
            assert_eq!(with_topic_header.lo(), value);
        }

        // Without a nested topic header the setter has nowhere to store the flag.
        let mut bare = request_with(None);
        bare.set_lo(Some(true));
        assert_eq!(bare.lo(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_topic() {
        let mut header = UpdateConsumerOffsetRequestHeader::default();

        header.set_topic(cs("new_topic"));

        assert_eq!(header.topic(), "new_topic");
    }

    #[test]
    fn topic_request_header_trait_set_and_get_broker_name() {
        let mut with_rpc = request_with_rpc();
        with_rpc.set_broker_name(cs("new_broker"));
        assert_eq!(with_rpc.broker_name(), Some(&cs("new_broker")));

        let mut bare = request_with(None);
        bare.set_broker_name(cs("new_broker"));
        assert_eq!(bare.broker_name(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_namespace() {
        let mut with_rpc = request_with_rpc();
        with_rpc.set_namespace(cs("new_namespace"));
        assert_eq!(with_rpc.namespace(), Some("new_namespace"));

        let mut bare = request_with(None);
        bare.set_namespace(cs("new_namespace"));
        assert_eq!(bare.namespace(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_namespaced() {
        let mut with_rpc = request_with_rpc();
        for flag in [true, false] {
            with_rpc.set_namespaced(flag);
            assert_eq!(with_rpc.namespaced(), Some(flag));
        }

        let mut bare = request_with(None);
        bare.set_namespaced(true);
        assert_eq!(bare.namespaced(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_oneway() {
        let mut with_rpc = request_with_rpc();
        for flag in [true, false] {
            with_rpc.set_oneway(flag);
            assert_eq!(with_rpc.oneway(), Some(flag));
        }

        let mut bare = request_with(None);
        bare.set_oneway(true);
        assert_eq!(bare.oneway(), None);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_queue_id() {
        let mut header = UpdateConsumerOffsetRequestHeader::default();

        for id in [10, -1] {
            header.set_queue_id(id);
            assert_eq!(header.queue_id(), id);
        }
    }

    #[test]
    fn response_header_can_be_created() {
        let literal: UpdateConsumerOffsetResponseHeader = UpdateConsumerOffsetResponseHeader {};
        let defaulted: UpdateConsumerOffsetResponseHeader = UpdateConsumerOffsetResponseHeader::default();

        assert_eq!(debug_repr(&literal), debug_repr(&defaulted));
    }

    #[test]
    fn response_header_is_zero_sized() {
        assert_eq!(std::mem::size_of::<UpdateConsumerOffsetResponseHeader>(), 0);
    }

    #[test]
    fn response_header_serializes_to_empty_json() {
        let encoded = serde_json::to_string(&UpdateConsumerOffsetResponseHeader::default()).unwrap();

        assert_eq!(encoded, "{}");
    }

    #[test]
    fn response_header_deserializes_from_empty_json() {
        let decoded: UpdateConsumerOffsetResponseHeader = serde_json::from_str("{}").unwrap();

        let defaulted = UpdateConsumerOffsetResponseHeader::default();
        assert_eq!(debug_repr(&decoded), debug_repr(&defaulted));
    }

    #[test]
    fn response_header_round_trip() {
        let original = UpdateConsumerOffsetResponseHeader::default();

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: UpdateConsumerOffsetResponseHeader = serde_json::from_str(&encoded).unwrap();

        assert_eq!(debug_repr(&original), debug_repr(&decoded));
    }

    #[test]
    fn response_header_ignores_extra_fields() {
        let decoded: UpdateConsumerOffsetResponseHeader = serde_json::from_str(r#"{"unexpected":"field"}"#).unwrap();

        let defaulted = UpdateConsumerOffsetResponseHeader::default();
        assert_eq!(debug_repr(&decoded), debug_repr(&defaulted));
    }

    #[test]
    fn all_instances_are_equivalent() {
        let literal = UpdateConsumerOffsetResponseHeader {};
        let defaulted = UpdateConsumerOffsetResponseHeader::default();

        assert_eq!(debug_repr(&literal), debug_repr(&defaulted));
    }

    #[test]
    fn response_header_encodes_to_empty_map() {
        let encoded = UpdateConsumerOffsetResponseHeader::default().to_map().unwrap();

        assert!(encoded.is_empty());
    }
}