rocketmq_client_v4/protocols/header/
query_consumer_offset_response_header.rs1use crate::protocols::mq_command::MqCommand;
2use crate::protocols::{mq_command, ConvertUtil};
3use bytes::{Buf, Bytes};
4use log::{info, warn};
5use serde::{Deserialize, Serialize};
6use tokio::net::TcpStream;
7
8#[derive(Debug, Deserialize, Serialize)]
9pub struct QueryConsumerOffsetResponseHeader {
10 pub offset: i64,
11}
12
13impl QueryConsumerOffsetResponseHeader {
14 pub fn convert_from_command(cmd: MqCommand) -> Option<Self> {
15 return match cmd.header_serialize_method {
16 mq_command::HEADER_SERIALIZE_METHOD_JSON => {
17 info!(
18 "QueryConsumerOffsetResponseHeader e_body:{:?}",
19 String::from_utf8(cmd.e_body)
20 );
21 None
22 }
23 _ => {
24 let mut bytes = Bytes::from(cmd.e_body);
25 let key_len = bytes.get_i16();
26 let _ = bytes.copy_to_bytes(key_len as usize);
27 let body_len = bytes.get_i32();
28 let body = bytes.copy_to_bytes(body_len as usize).to_vec();
29 let offset = ConvertUtil::convert_string_bytes_to_i64(body);
30 return Some(QueryConsumerOffsetResponseHeader { offset });
31 }
32 };
33 }
34 pub async fn read_from_broker(broker_stream: &mut TcpStream, opaque: i32) -> i64 {
35 let frame = MqCommand::read_from_stream_with_opaque(broker_stream, opaque).await;
36
37 if frame.e_len == 0 {
38 warn!(
39 "read from QueryConsumerOffsetResponseHeader failed, code:{}, r_body:{:?}",
40 frame.req_code,
41 String::from_utf8(frame.r_body)
42 );
43 return 0;
44 }
45
46 match Self::convert_from_command(frame) {
47 None => {
48 return 0;
49 }
50 Some(header) => header.offset,
51 }
52 }
53}
54
#[cfg(test)]
mod test {
    use crate::protocols::ConvertUtil;

    /// Smoke test: feeds a single byte (48, the ASCII code of `'0'`) through
    /// `ConvertUtil::convert_string_bytes_to_i64` and prints the result.
    /// Nothing is asserted; run with `--nocapture` to see the output.
    #[test]
    fn i64_test() {
        let ascii_zero: Vec<u8> = vec![48];
        let parsed = ConvertUtil::convert_string_bytes_to_i64(ascii_zero);
        println!("data:{:?}", parsed);
    }
}