rocketmq_client_v4/protocols/header/
get_consumer_status_request_header.rs1use crate::protocols::body::get_consumer_status_body::GetConsumerStatusBody;
2use crate::protocols::body::message_queue::MessageQueue;
3use crate::protocols::mq_command::MqCommand;
4use crate::protocols::{request_code, SerializeDeserialize};
5use log::info;
6use serde::{Deserialize, Serialize};
7use tokio::io::AsyncWriteExt;
8use tokio::net::TcpStream;
9
10#[derive(Debug, Serialize, Deserialize)]
11#[allow(non_snake_case)]
12pub struct GetConsumerStatusRequestHeader {
13 pub topic: String,
14 pub group: String,
15 pub clientAddr: String,
16}
17
18impl GetConsumerStatusRequestHeader {
19 pub fn new(topic: String, group: String, client_addr: String) -> Self {
20 GetConsumerStatusRequestHeader {
21 topic,
22 group,
23 clientAddr: client_addr,
24 }
25 }
26
27 pub async fn send_request(
28 &mut self,
29 broker_stream: &mut TcpStream,
30 queue_list: &Vec<MessageQueue>,
31 ) {
32 let header = self.to_bytes_1();
33 let cmd = MqCommand::new_with_body(
34 request_code::INVOKE_BROKER_TO_GET_CONSUMER_STATUS,
35 vec![],
36 header,
37 vec![],
38 );
39 let write = broker_stream.write_all(&cmd.to_bytes()).await;
40 if write.is_err() {
41 panic!(
42 "send INVOKE_BROKER_TO_GET_CONSUMER_STATUS failed.:{:?}",
43 write.err()
44 );
45 }
46 for _ in 0..5 {
47 let resp_cmd = MqCommand::read_from_stream(broker_stream).await;
48 info!("INVOKE_BROKER_TO_GET_CONSUMER_STATUS response:resp code:{:?}, opaque:{:?},req_opaque:{}, remark:{:?}, extend:{:?}, body:{:?}",
49 resp_cmd.req_code, resp_cmd.opaque, cmd.opaque, String::from_utf8(resp_cmd.r_body), String::from_utf8(resp_cmd.e_body), String::from_utf8(resp_cmd.body)
50 );
51 if resp_cmd.req_code == request_code::GET_CONSUMER_STATUS_FROM_CLIENT {
52 let resp_body = GetConsumerStatusBody::new_from_queues(queue_list);
54 resp_body.send_request(broker_stream, resp_cmd.opaque).await;
55 }
56
57 if resp_cmd.opaque == cmd.opaque {
58 break;
59 }
60 }
61 }
62}
63
64impl SerializeDeserialize for GetConsumerStatusRequestHeader {}