rocketmq_client_v4/protocols/header/
get_consumer_status_request_header.rs

1use crate::protocols::body::get_consumer_status_body::GetConsumerStatusBody;
2use crate::protocols::body::message_queue::MessageQueue;
3use crate::protocols::mq_command::MqCommand;
4use crate::protocols::{request_code, SerializeDeserialize};
5use log::info;
6use serde::{Deserialize, Serialize};
7use tokio::io::AsyncWriteExt;
8use tokio::net::TcpStream;
9
10#[derive(Debug, Serialize, Deserialize)]
11#[allow(non_snake_case)]
12pub struct GetConsumerStatusRequestHeader {
13    pub topic: String,
14    pub group: String,
15    pub clientAddr: String,
16}
17
18impl GetConsumerStatusRequestHeader {
19    pub fn new(topic: String, group: String, client_addr: String) -> Self {
20        GetConsumerStatusRequestHeader {
21            topic,
22            group,
23            clientAddr: client_addr,
24        }
25    }
26
27    pub async fn send_request(
28        &mut self,
29        broker_stream: &mut TcpStream,
30        queue_list: &Vec<MessageQueue>,
31    ) {
32        let header = self.to_bytes_1();
33        let cmd = MqCommand::new_with_body(
34            request_code::INVOKE_BROKER_TO_GET_CONSUMER_STATUS,
35            vec![],
36            header,
37            vec![],
38        );
39        let write = broker_stream.write_all(&cmd.to_bytes()).await;
40        if write.is_err() {
41            panic!(
42                "send INVOKE_BROKER_TO_GET_CONSUMER_STATUS failed.:{:?}",
43                write.err()
44            );
45        }
46        for _ in 0..5 {
47            let resp_cmd = MqCommand::read_from_stream(broker_stream).await;
48            info!("INVOKE_BROKER_TO_GET_CONSUMER_STATUS response:resp code:{:?}, opaque:{:?},req_opaque:{}, remark:{:?}, extend:{:?}, body:{:?}",
49                 resp_cmd.req_code, resp_cmd.opaque, cmd.opaque, String::from_utf8(resp_cmd.r_body), String::from_utf8(resp_cmd.e_body), String::from_utf8(resp_cmd.body)
50            );
51            if resp_cmd.req_code == request_code::GET_CONSUMER_STATUS_FROM_CLIENT {
52                // send response to broker
53                let resp_body = GetConsumerStatusBody::new_from_queues(queue_list);
54                resp_body.send_request(broker_stream, resp_cmd.opaque).await;
55            }
56
57            if resp_cmd.opaque == cmd.opaque {
58                break;
59            }
60        }
61    }
62}
63
64impl SerializeDeserialize for GetConsumerStatusRequestHeader {}