rocketmq_remoting/protocol/header/
update_consumer_offset_header.rs1use cheetah_string::CheetahString;
19use rocketmq_macros::RequestHeaderCodecV2;
20use serde::Deserialize;
21use serde::Serialize;
22
23use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
24use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
25
26#[derive(Debug, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
27pub struct UpdateConsumerOffsetResponseHeader {}
28
29#[derive(Debug, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
30#[serde(rename_all = "camelCase")]
31pub struct UpdateConsumerOffsetRequestHeader {
32 #[required]
33 pub consumer_group: CheetahString,
34 #[required]
35 pub topic: CheetahString,
36 #[required]
37 pub queue_id: i32,
38 #[required]
39 pub commit_offset: i64,
40 #[serde(flatten)]
41 pub topic_request_header: Option<TopicRequestHeader>,
42}
43impl TopicRequestHeaderTrait for UpdateConsumerOffsetRequestHeader {
118 fn set_lo(&mut self, lo: Option<bool>) {
119 self.topic_request_header.as_mut().unwrap().lo = lo;
120 }
121
122 fn lo(&self) -> Option<bool> {
123 self.topic_request_header.as_ref().unwrap().lo
124 }
125
126 fn set_topic(&mut self, topic: CheetahString) {
127 self.topic = topic;
128 }
129
130 fn topic(&self) -> &CheetahString {
131 &self.topic
132 }
133
134 fn broker_name(&self) -> Option<&CheetahString> {
135 self.topic_request_header
136 .as_ref()
137 .unwrap()
138 .rpc
139 .as_ref()
140 .unwrap()
141 .broker_name
142 .as_ref()
143 }
144
145 fn set_broker_name(&mut self, broker_name: CheetahString) {
146 self.topic_request_header
147 .as_mut()
148 .unwrap()
149 .rpc
150 .as_mut()
151 .unwrap()
152 .broker_name = Some(broker_name);
153 }
154
155 fn namespace(&self) -> Option<&str> {
156 self.topic_request_header
157 .as_ref()
158 .unwrap()
159 .rpc
160 .as_ref()
161 .unwrap()
162 .namespace
163 .as_deref()
164 }
165
166 fn set_namespace(&mut self, namespace: CheetahString) {
167 self.topic_request_header
168 .as_mut()
169 .unwrap()
170 .rpc
171 .as_mut()
172 .unwrap()
173 .namespace = Some(namespace);
174 }
175
176 fn namespaced(&self) -> Option<bool> {
177 self.topic_request_header
178 .as_ref()
179 .unwrap()
180 .rpc
181 .as_ref()
182 .unwrap()
183 .namespaced
184 }
185
186 fn set_namespaced(&mut self, namespaced: bool) {
187 self.topic_request_header
188 .as_mut()
189 .unwrap()
190 .rpc
191 .as_mut()
192 .unwrap()
193 .namespaced = Some(namespaced);
194 }
195
196 fn oneway(&self) -> Option<bool> {
197 self.topic_request_header
198 .as_ref()
199 .unwrap()
200 .rpc
201 .as_ref()
202 .unwrap()
203 .oneway
204 }
205
206 fn set_oneway(&mut self, oneway: bool) {
207 self.topic_request_header
208 .as_mut()
209 .unwrap()
210 .rpc
211 .as_mut()
212 .unwrap()
213 .oneway = Some(oneway);
214 }
215
216 fn queue_id(&self) -> i32 {
217 self.queue_id
218 }
219
220 fn set_queue_id(&mut self, queue_id: i32) {
221 self.queue_id = queue_id;
222 }
223}
224
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use cheetah_string::CheetahString;

    use super::*;
    use crate::protocol::command_custom_header::CommandCustomHeader;
    use crate::protocol::command_custom_header::FromMap;

    /// Builds the string map a request header is decoded from, using the fixed
    /// test group/topic and the given raw `queueId` / `commitOffset` values.
    fn header_fields(
        queue_id: &'static str,
        commit_offset: &'static str,
    ) -> HashMap<CheetahString, CheetahString> {
        let mut fields = HashMap::new();
        for (key, value) in [
            ("consumerGroup", "test_consumer_group"),
            ("topic", "test_topic"),
            ("queueId", queue_id),
            ("commitOffset", commit_offset),
        ] {
            fields.insert(
                CheetahString::from_static_str(key),
                CheetahString::from_static_str(value),
            );
        }
        fields
    }

    #[test]
    fn update_consumer_offset_request_header_serializes_correctly() {
        let header = UpdateConsumerOffsetRequestHeader {
            consumer_group: CheetahString::from_static_str("test_consumer_group"),
            topic: CheetahString::from_static_str("test_topic"),
            queue_id: 1,
            commit_offset: 100,
            // Leaves `topic_request_header` at its default, `None`.
            ..Default::default()
        };

        let map = header.to_map().unwrap();

        // Every required field must appear under its camelCase key.
        for (key, expected) in [
            ("consumerGroup", "test_consumer_group"),
            ("topic", "test_topic"),
            ("queueId", "1"),
            ("commitOffset", "100"),
        ] {
            assert_eq!(
                map.get(&CheetahString::from_static_str(key)).unwrap(),
                expected
            );
        }
    }

    #[test]
    fn update_consumer_offset_request_header_deserializes_correctly() {
        let map = header_fields("1", "100");

        let header = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&map).unwrap();

        assert_eq!(header.consumer_group, "test_consumer_group");
        assert_eq!(header.topic, "test_topic");
        assert_eq!(header.queue_id, 1);
        assert_eq!(header.commit_offset, 100);
    }

    #[test]
    fn update_consumer_offset_request_header_handles_missing_optional_fields() {
        // Only the required keys are present in the map.
        let map = header_fields("1", "100");

        let header = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&map).unwrap();

        assert_eq!(header.consumer_group, "test_consumer_group");
        assert_eq!(header.topic, "test_topic");
        assert_eq!(header.queue_id, 1);
        assert_eq!(header.commit_offset, 100);
        assert!(header.topic_request_header.is_some());
    }

    #[test]
    fn update_consumer_offset_request_header_handles_invalid_data() {
        // Non-numeric values for the numeric fields must be rejected.
        let map = header_fields("invalid", "invalid");

        let result = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&map);

        assert!(result.is_err());
    }
}