rocketmq_remoting/protocol/header/
update_consumer_offset_header.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use cheetah_string::CheetahString;
19use rocketmq_macros::RequestHeaderCodecV2;
20use serde::Deserialize;
21use serde::Serialize;
22
23use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
24use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
25
26#[derive(Debug, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
27pub struct UpdateConsumerOffsetResponseHeader {}
28
29#[derive(Debug, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
30#[serde(rename_all = "camelCase")]
31pub struct UpdateConsumerOffsetRequestHeader {
32    #[required]
33    pub consumer_group: CheetahString,
34    #[required]
35    pub topic: CheetahString,
36    #[required]
37    pub queue_id: i32,
38    #[required]
39    pub commit_offset: i64,
40    #[serde(flatten)]
41    pub topic_request_header: Option<TopicRequestHeader>,
42}
43/*impl UpdateConsumerOffsetRequestHeader {
44    pub const CONSUMER_GROUP: &'static str = "consumerGroup";
45    pub const TOPIC: &'static str = "topic";
46    pub const QUEUE_ID: &'static str = "queueId";
47    pub const COMMIT_OFFSET: &'static str = "commitOffset";
48}
49
50impl CommandCustomHeader for UpdateConsumerOffsetRequestHeader {
51    fn to_map(&self) -> Option<HashMap<CheetahString, CheetahString>> {
52        let mut map = HashMap::new();
53
54        map.insert(
55            CheetahString::from_static_str(Self::CONSUMER_GROUP),
56            self.consumer_group.clone(),
57        );
58        map.insert(
59            CheetahString::from_static_str(Self::TOPIC),
60            self.topic.clone(),
61        );
62        if let Some(queue_id) = self.queue_id {
63            map.insert(
64                CheetahString::from_static_str(Self::QUEUE_ID),
65                CheetahString::from_string(queue_id.to_string()),
66            );
67        }
68        if let Some(commit_offset) = self.commit_offset {
69            map.insert(
70                CheetahString::from_static_str(Self::COMMIT_OFFSET),
71                CheetahString::from_string(commit_offset.to_string()),
72            );
73        }
74        if let Some(ref value) = self.topic_request_header {
75            if let Some(val) = value.to_map() {
76                map.extend(val);
77            }
78        }
79        Some(map)
80    }
81}
82
83impl FromMap for UpdateConsumerOffsetRequestHeader {
84    type Error = rocketmq_error::RocketMQError;
85
86    type Target = Self;
87
88    fn from(map: &HashMap<CheetahString, CheetahString>) -> Result<Self::Target, Self::Error> {
89        Ok(UpdateConsumerOffsetRequestHeader {
90            consumer_group: map
91                .get(&CheetahString::from_static_str(
92                    UpdateConsumerOffsetRequestHeader::CONSUMER_GROUP,
93                ))
94                .cloned()
95                .unwrap_or_default(),
96            topic: map
97                .get(&CheetahString::from_static_str(
98                    UpdateConsumerOffsetRequestHeader::TOPIC,
99                ))
100                .cloned()
101                .unwrap_or_default(),
102            queue_id: map
103                .get(&CheetahString::from_static_str(
104                    UpdateConsumerOffsetRequestHeader::QUEUE_ID,
105                ))
106                .and_then(|v| v.parse().ok()),
107            commit_offset: map
108                .get(&CheetahString::from_static_str(
109                    UpdateConsumerOffsetRequestHeader::COMMIT_OFFSET,
110                ))
111                .and_then(|v| v.parse().ok()),
112            topic_request_header: Some(<TopicRequestHeader as FromMap>::from(map)?),
113        })
114    }
115}*/
116
117impl TopicRequestHeaderTrait for UpdateConsumerOffsetRequestHeader {
118    fn set_lo(&mut self, lo: Option<bool>) {
119        self.topic_request_header.as_mut().unwrap().lo = lo;
120    }
121
122    fn lo(&self) -> Option<bool> {
123        self.topic_request_header.as_ref().unwrap().lo
124    }
125
126    fn set_topic(&mut self, topic: CheetahString) {
127        self.topic = topic;
128    }
129
130    fn topic(&self) -> &CheetahString {
131        &self.topic
132    }
133
134    fn broker_name(&self) -> Option<&CheetahString> {
135        self.topic_request_header
136            .as_ref()
137            .unwrap()
138            .rpc
139            .as_ref()
140            .unwrap()
141            .broker_name
142            .as_ref()
143    }
144
145    fn set_broker_name(&mut self, broker_name: CheetahString) {
146        self.topic_request_header
147            .as_mut()
148            .unwrap()
149            .rpc
150            .as_mut()
151            .unwrap()
152            .broker_name = Some(broker_name);
153    }
154
155    fn namespace(&self) -> Option<&str> {
156        self.topic_request_header
157            .as_ref()
158            .unwrap()
159            .rpc
160            .as_ref()
161            .unwrap()
162            .namespace
163            .as_deref()
164    }
165
166    fn set_namespace(&mut self, namespace: CheetahString) {
167        self.topic_request_header
168            .as_mut()
169            .unwrap()
170            .rpc
171            .as_mut()
172            .unwrap()
173            .namespace = Some(namespace);
174    }
175
176    fn namespaced(&self) -> Option<bool> {
177        self.topic_request_header
178            .as_ref()
179            .unwrap()
180            .rpc
181            .as_ref()
182            .unwrap()
183            .namespaced
184    }
185
186    fn set_namespaced(&mut self, namespaced: bool) {
187        self.topic_request_header
188            .as_mut()
189            .unwrap()
190            .rpc
191            .as_mut()
192            .unwrap()
193            .namespaced = Some(namespaced);
194    }
195
196    fn oneway(&self) -> Option<bool> {
197        self.topic_request_header
198            .as_ref()
199            .unwrap()
200            .rpc
201            .as_ref()
202            .unwrap()
203            .oneway
204    }
205
206    fn set_oneway(&mut self, oneway: bool) {
207        self.topic_request_header
208            .as_mut()
209            .unwrap()
210            .rpc
211            .as_mut()
212            .unwrap()
213            .oneway = Some(oneway);
214    }
215
216    fn queue_id(&self) -> i32 {
217        self.queue_id
218    }
219
220    fn set_queue_id(&mut self, queue_id: i32) {
221        self.queue_id = queue_id;
222    }
223}
224
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use cheetah_string::CheetahString;

    use super::*;
    use crate::protocol::command_custom_header::CommandCustomHeader;
    use crate::protocol::command_custom_header::FromMap;

    /// Map shape consumed by `FromMap::from` and produced by `to_map`.
    type HeaderMap = HashMap<CheetahString, CheetahString>;

    /// Builds a header map from static key/value pairs.
    fn header_map(entries: &[(&'static str, &'static str)]) -> HeaderMap {
        entries
            .iter()
            .map(|&(key, value)| {
                (
                    CheetahString::from_static_str(key),
                    CheetahString::from_static_str(value),
                )
            })
            .collect()
    }

    /// Map holding the four required keys with well-formed values.
    fn well_formed_map() -> HeaderMap {
        header_map(&[
            ("consumerGroup", "test_consumer_group"),
            ("topic", "test_topic"),
            ("queueId", "1"),
            ("commitOffset", "100"),
        ])
    }

    /// `to_map` emits every required field under its camelCase key.
    #[test]
    fn update_consumer_offset_request_header_serializes_correctly() {
        let serialized = UpdateConsumerOffsetRequestHeader {
            consumer_group: CheetahString::from_static_str("test_consumer_group"),
            topic: CheetahString::from_static_str("test_topic"),
            queue_id: 1,
            commit_offset: 100,
            topic_request_header: None,
        }
        .to_map()
        .unwrap();

        let expected = [
            ("consumerGroup", "test_consumer_group"),
            ("topic", "test_topic"),
            ("queueId", "1"),
            ("commitOffset", "100"),
        ];
        for (key, value) in expected {
            assert_eq!(
                serialized
                    .get(&CheetahString::from_static_str(key))
                    .unwrap(),
                value
            );
        }
    }

    /// `FromMap::from` restores every required field from a well-formed map.
    #[test]
    fn update_consumer_offset_request_header_deserializes_correctly() {
        let source = well_formed_map();

        let decoded = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&source).unwrap();
        assert_eq!(decoded.consumer_group, "test_consumer_group");
        assert_eq!(decoded.topic, "test_topic");
        assert_eq!(decoded.queue_id, 1);
        assert_eq!(decoded.commit_offset, 100);
    }

    /// A map without any topic-request-header keys still decodes, and the
    /// nested header is populated.
    #[test]
    fn update_consumer_offset_request_header_handles_missing_optional_fields() {
        let source = well_formed_map();

        let decoded = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&source).unwrap();
        assert_eq!(decoded.consumer_group, "test_consumer_group");
        assert_eq!(decoded.topic, "test_topic");
        assert_eq!(decoded.queue_id, 1);
        assert_eq!(decoded.commit_offset, 100);
        assert!(decoded.topic_request_header.is_some());
    }

    /// Non-numeric `queueId` / `commitOffset` values are rejected.
    #[test]
    fn update_consumer_offset_request_header_handles_invalid_data() {
        let source = header_map(&[
            ("consumerGroup", "test_consumer_group"),
            ("topic", "test_topic"),
            ("queueId", "invalid"),
            ("commitOffset", "invalid"),
        ]);

        let outcome = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&source);
        assert!(outcome.is_err());
    }
}
343}