rocketmq_client_v4/protocols/header/
query_consumer_offset_request_header.rs

1use crate::protocols::mq_command::MqCommand;
2use crate::protocols::request_code::QUERY_CONSUMER_OFFSET;
3use crate::protocols::{ConvertUtil, SerializeDeserialize};
4use bytes::{Buf, Bytes};
5use log::debug;
6use serde::{Deserialize, Serialize};
7
8#[derive(Debug, Serialize, Deserialize)]
9#[allow(non_snake_case)]
10pub struct QueryConsumerOffsetRequestHeader {
11    pub consumerGroup: String,
12    pub topic: String,
13    pub queueId: i32,
14}
15
16impl SerializeDeserialize for QueryConsumerOffsetRequestHeader {}
17
18impl QueryConsumerOffsetRequestHeader {
19    pub fn new(consumer_group: String, topic: String, queue_id: i32) -> Self {
20        Self {
21            consumerGroup: consumer_group,
22            queueId: queue_id,
23            topic,
24        }
25    }
26
27    pub fn to_command(&self) -> MqCommand {
28        MqCommand::new_with_body(QUERY_CONSUMER_OFFSET, vec![], self.to_bytes_1(), vec![])
29    }
30
31
32    pub fn convert_from_cmd(cmd: &MqCommand) -> Self {
33        //  e_body:Ok(\"\\0\\rconsumerGroup\\0\\0\\0 consume_pushNoticeMessage_test_2\\0\\u{7}queueId\\0\\0\\0\\u{1}0\\0\\u{5}topic\\0\\0\\0\\u{14}pushNoticeMessage_To\")
34        // debug!(
35        //     "QueryConsumerOffsetRequestHeader: body:{:?}, r_body:{:?}, e_body:{:?}",
36        //     String::from_utf8(cmd.body.clone()),
37        //     String::from_utf8(cmd.r_body.clone()),
38        //     String::from_utf8(cmd.e_body.clone())
39        // );
40        let body = &cmd.e_body;
41        let mut body = Bytes::copy_from_slice(body);
42
43        let consumer_group_len = body.get_i16();
44        let _ = body.copy_to_bytes(consumer_group_len as usize);
45        let consumer_group_v_len = body.get_i32();
46        let consumer_group_body = body.copy_to_bytes(consumer_group_v_len as usize);
47
48        let queue_id_key_len = body.get_i16();
49        let _ = body.copy_to_bytes(queue_id_key_len as usize);
50        let queue_id_value_len = body.get_i32();
51        let queue_id_body = body.copy_to_bytes(queue_id_value_len as usize);
52        let queue_id = ConvertUtil::convert_string_bytes_to_i32(queue_id_body.to_vec());
53
54        let topic_key_len = body.get_i16();
55        let _ = body.copy_to_bytes(topic_key_len as usize);
56        let topic_value_len = body.get_i32();
57        let topic_body = body.copy_to_bytes(topic_value_len as usize);
58
59        let ret = Self {
60            consumerGroup: String::from_utf8(consumer_group_body.to_vec()).unwrap(),
61            topic: String::from_utf8(topic_body.to_vec()).unwrap(),
62            queueId: queue_id,
63        };
64        debug!("QueryConsumerOffsetRequestHeader:{:?}", ret);
65
66        ret
67    }
68}