rocketmq_client_v4/protocols/header/
update_consumer_offset_request_header.rs

1use crate::protocols::mq_command::MqCommand;
2use crate::protocols::{request_code, SerializeDeserialize};
3use serde::{Deserialize, Serialize};
4
5#[derive(Debug, Serialize, Deserialize)]
6#[allow(non_snake_case)]
7pub struct UpdateConsumerOffsetRequestHeader {
8    //    @CFNotNull
9    //     private String consumerGroup;
10    //     @CFNotNull
11    //     private String topic;
12    //     @CFNotNull
13    //     private Integer queueId;
14    //     @CFNotNull
15    //     private Long commitOffset;
16    pub consumerGroup: String,
17    pub topic: String,
18    pub queueId: i32,
19    pub commitOffset: i64,
20}
21
22impl UpdateConsumerOffsetRequestHeader {
23    pub fn new(consumer_group: String, topic: String, queue_id: i32, commit_offset: i64) -> Self {
24        UpdateConsumerOffsetRequestHeader {
25            consumerGroup: consumer_group,
26            topic,
27            queueId: queue_id,
28            commitOffset: commit_offset,
29        }
30    }
31
32    pub fn convert_from_command(cmd: &MqCommand) -> Self {
33        let map = Self::bytes_1_to_map(cmd.e_body.clone());
34        let consume_group = map.get("consumerGroup").unwrap();
35        let topic = map.get("topic").unwrap();
36        let queue_id = map.get("queueId").unwrap();
37        let commit_offset = map.get("commitOffset").unwrap();
38        Self {
39            consumerGroup: consume_group.to_string(),
40            topic: topic.to_string(),
41            queueId: queue_id.parse().unwrap(),
42            commitOffset: commit_offset.parse().unwrap(),
43        }
44    }
45
46    pub fn command(&self) -> MqCommand {
47        let body = self.to_bytes_1();
48        MqCommand::new_with_body(request_code::UPDATE_CONSUMER_OFFSET, vec![], body, vec![])
49    }
50}
51
// Empty impl: nothing is overridden, so this type uses only what `SerializeDeserialize`
// itself provides. Presumably that includes the `to_bytes_1` / `bytes_1_to_map` helpers
// called by this type's inherent methods — confirm against the trait definition.
impl SerializeDeserialize for UpdateConsumerOffsetRequestHeader {}