rocketmq_client_v4/protocols/header/
query_consumer_offset_response_header.rs

1use crate::protocols::mq_command::MqCommand;
2use crate::protocols::{mq_command, ConvertUtil};
3use bytes::{Buf, Bytes};
4use log::{info, warn};
5use serde::{Deserialize, Serialize};
6use tokio::net::TcpStream;
7
8#[derive(Debug, Deserialize, Serialize)]
9pub struct QueryConsumerOffsetResponseHeader {
10    pub offset: i64,
11}
12
13impl QueryConsumerOffsetResponseHeader {
14    pub fn convert_from_command(cmd: MqCommand) -> Option<Self> {
15        return match cmd.header_serialize_method {
16            mq_command::HEADER_SERIALIZE_METHOD_JSON => {
17                info!(
18                    "QueryConsumerOffsetResponseHeader e_body:{:?}",
19                    String::from_utf8(cmd.e_body)
20                );
21                None
22            }
23            _ => {
24                let mut bytes = Bytes::from(cmd.e_body);
25                let key_len = bytes.get_i16();
26                let _ = bytes.copy_to_bytes(key_len as usize);
27                let body_len = bytes.get_i32();
28                let body = bytes.copy_to_bytes(body_len as usize).to_vec();
29                let offset = ConvertUtil::convert_string_bytes_to_i64(body);
30                return Some(QueryConsumerOffsetResponseHeader { offset });
31            }
32        };
33    }
34    pub async fn read_from_broker(broker_stream: &mut TcpStream, opaque: i32) -> i64 {
35        let frame = MqCommand::read_from_stream_with_opaque(broker_stream, opaque).await;
36
37        if frame.e_len == 0 {
38            warn!(
39                "read from QueryConsumerOffsetResponseHeader failed, code:{}, r_body:{:?}",
40                frame.req_code,
41                String::from_utf8(frame.r_body)
42            );
43            return 0;
44        }
45
46        match Self::convert_from_command(frame) {
47            None => {
48                return 0;
49            }
50            Some(header) => header.offset,
51        }
52    }
53}
54
#[cfg(test)]
mod test {
    use crate::protocols::ConvertUtil;

    /// Smoke test: feeds a single ASCII `'0'` byte to the converter and prints
    /// whatever comes back.
    ///
    /// NOTE(review): nothing is asserted here; the expected result is
    /// presumably `0` - confirm and turn this into a real assertion.
    #[test]
    fn i64_test() {
        // b'0' is byte value 48.
        let ascii_zero = vec![b'0'];
        let parsed = ConvertUtil::convert_string_bytes_to_i64(ascii_zero);
        println!("data:{:?}", parsed);
    }
}
65}