rocketmq_client_v4/protocols/body/
heartbeat_data.rs1use crate::protocols::body::consumer_data::{ConsumerData, CONSUME_TYPE_PUSH};
2use crate::protocols::body::producer_data::ProducerData;
3use crate::protocols::body::subscription_data::SubscriptionData;
4use crate::protocols::mq_command::MqCommand;
5use crate::protocols::{request_code, SerializeDeserialize};
6use log::{debug, info};
7use serde::Serialize;
8use tokio::io::AsyncWriteExt;
9use tokio::net::TcpStream;
10
11#[derive(Debug, Serialize)]
12#[allow(non_snake_case)]
13pub struct HeartbeatData {
14 pub clientID: String,
15 pub producerDataSet: Vec<ProducerData>,
16 pub consumerDataSet: Vec<ConsumerData>,
17}
18
19impl HeartbeatData {
20 pub fn new_producer_data(client_id: String, group_name: String) -> Self {
21 HeartbeatData {
22 clientID: client_id,
23 producerDataSet: vec![ProducerData {
24 groupName: group_name,
25 }],
26 consumerDataSet: vec![],
27 }
28 }
29
30 pub fn new_push_consumer_data(
31 client_id: String,
32 group_name: String,
33 consume_from_where: i32,
34 subscription_data: SubscriptionData,
35 message_model: String,
36 ) -> Self {
37 HeartbeatData {
38 clientID: client_id,
39 producerDataSet: vec![],
40 consumerDataSet: vec![ConsumerData {
41 groupName: group_name,
42 consumeFromWhere: consume_from_where,
43 subscriptionDataSet: vec![subscription_data],
44 consumeType: CONSUME_TYPE_PUSH.to_string(),
45 messageModel: message_model,
46 unitMode: false,
47 }],
48 }
49 }
50
51 pub async fn send_heartbeat(&self, broker_stream: &mut TcpStream) {
52 let body = self.to_json_bytes();
53 let body = MqCommand::new_with_body(request_code::HEART_BEAT, vec![], vec![], body);
54 let opa = body.opaque;
55 let body = body.to_bytes();
56 let result = broker_stream.write_all(&body).await;
57 if result.is_err() {
58 panic!("send heartbeat failed: {:?}", result.err());
59 }
60 let _ = broker_stream.flush().await;
61 let resp = MqCommand::read_from_stream_with_opaque(broker_stream, opa).await;
62 debug!("send_heartbeat req opa:{}, resp opa:{}", opa, resp.opaque);
63 if resp.req_code != 0 {
64 info!(
65 "send heartbeat resp:{}, remark:{:?},extends:{:?} ",
66 resp.req_code,
67 String::from_utf8(resp.r_body),
68 String::from_utf8(resp.e_body)
69 );
70 }
71 }
72}
73
74impl SerializeDeserialize for HeartbeatData {}