rocketmq_client_v4/protocols/body/
heartbeat_data.rs

1use crate::protocols::body::consumer_data::{ConsumerData, CONSUME_TYPE_PUSH};
2use crate::protocols::body::producer_data::ProducerData;
3use crate::protocols::body::subscription_data::SubscriptionData;
4use crate::protocols::mq_command::MqCommand;
5use crate::protocols::{request_code, SerializeDeserialize};
6use log::{debug, info};
7use serde::Serialize;
8use tokio::io::AsyncWriteExt;
9use tokio::net::TcpStream;
10
/// Heartbeat payload: one client id plus the producer and consumer groups
/// attached to it.
///
/// The struct derives `Serialize`, and `send_heartbeat` uses its serialized
/// form as the body of a `HEART_BEAT` command.
#[derive(Debug, Serialize)]
// Field names are camelCase on purpose: the serde derive uses the Rust field
// name as the serialized key. Presumably these match the JSON keys the broker
// expects — confirm against the broker protocol before renaming.
#[allow(non_snake_case)]
pub struct HeartbeatData {
    /// Identifier of the client sending the heartbeat.
    pub clientID: String,
    /// Producer entries carried by this heartbeat (empty for a consumer-only heartbeat).
    pub producerDataSet: Vec<ProducerData>,
    /// Consumer entries carried by this heartbeat (empty for a producer-only heartbeat).
    pub consumerDataSet: Vec<ConsumerData>,
}
18
19impl HeartbeatData {
20    pub fn new_producer_data(client_id: String, group_name: String) -> Self {
21        HeartbeatData {
22            clientID: client_id,
23            producerDataSet: vec![ProducerData {
24                groupName: group_name,
25            }],
26            consumerDataSet: vec![],
27        }
28    }
29
30    pub fn new_push_consumer_data(
31        client_id: String,
32        group_name: String,
33        consume_from_where: i32,
34        subscription_data: SubscriptionData,
35        message_model: String,
36    ) -> Self {
37        HeartbeatData {
38            clientID: client_id,
39            producerDataSet: vec![],
40            consumerDataSet: vec![ConsumerData {
41                groupName: group_name,
42                consumeFromWhere: consume_from_where,
43                subscriptionDataSet: vec![subscription_data],
44                consumeType: CONSUME_TYPE_PUSH.to_string(),
45                messageModel: message_model,
46                unitMode: false,
47            }],
48        }
49    }
50
51    pub async fn send_heartbeat(&self, broker_stream: &mut TcpStream) {
52        let body = self.to_json_bytes();
53        let body = MqCommand::new_with_body(request_code::HEART_BEAT, vec![], vec![], body);
54        let opa = body.opaque;
55        let body = body.to_bytes();
56        let result = broker_stream.write_all(&body).await;
57        if result.is_err() {
58            panic!("send heartbeat failed: {:?}", result.err());
59        }
60        let _ = broker_stream.flush().await;
61        let resp = MqCommand::read_from_stream_with_opaque(broker_stream, opa).await;
62        debug!("send_heartbeat req opa:{}, resp opa:{}", opa, resp.opaque);
63        if resp.req_code != 0 {
64            info!(
65                "send heartbeat resp:{}, remark:{:?},extends:{:?} ",
66                resp.req_code,
67                String::from_utf8(resp.r_body),
68                String::from_utf8(resp.e_body)
69            );
70        }
71    }
72}
73
// Empty impl: no trait method is overridden for `HeartbeatData`.
// `send_heartbeat` calls `self.to_json_bytes()`, which the inherent impl does
// not define, so it is presumably a default method of this trait — confirm
// against the `SerializeDeserialize` definition.
impl SerializeDeserialize for HeartbeatData {}