rocketmq_client_v4/protocols/header/
get_route_info_request_header.rs

1use crate::protocols::body::topic_route_data::TopicRouteData;
2use crate::protocols::mq_command::MqCommand;
3use crate::protocols::request_code::GET_ROUTE_BY_TOPIC;
4use crate::protocols::{fixed_un_standard_json, response_code, SerializeDeserialize};
5use log::{debug, warn};
6use serde::{Deserialize, Serialize};
7use tokio::io::AsyncWriteExt;
8use tokio::net::TcpStream;
9
10#[derive(Serialize, Deserialize)]
11pub struct GetRouteInfoRequestHeader {
12    pub topic: String,
13}
14
15impl GetRouteInfoRequestHeader {
16    pub fn get_route_info_request(topic: &str) -> Self {
17        Self {
18            topic: topic.to_string(),
19        }
20    }
21
22    pub async fn get_topic_route_data(
23        &self,
24        name_server: &mut TcpStream,
25    ) -> Option<TopicRouteData> {
26        let header = self.to_bytes_1();
27        let bytes = MqCommand::new_with_body(GET_ROUTE_BY_TOPIC, vec![], header, vec![]);
28        let opa = bytes.opaque;
29        let bytes = bytes.to_bytes();
30        let req = name_server.write_all(&bytes).await;
31        if req.is_err() {
32            warn!("get_topic_route_data failed{:?}", req);
33            return None;
34        }
35        let _ = name_server.flush().await;
36
37        let cmd = MqCommand::read_from_stream_with_opaque(name_server, opa).await;
38
39        // info!(
40        //     "get_topic_route_data req opa:{}, resp opa{}, data:{}",
41        //     opa,
42        //     cmd.opaque,
43        //     String::from_utf8(cmd.body.clone()).unwrap()
44        // );
45        return match cmd.req_code {
46            response_code::SUCCESS => {
47                // info!(
48                //     "before fixed:{:?}",
49                //     String::from_utf8(cmd.body.clone()).unwrap()
50                // );
51                let body = fixed_un_standard_json(&cmd.body);
52                // info!("after fixed:{:?}", String::from_utf8(body.clone()).unwrap());
53                let data: TopicRouteData = serde_json::from_slice(&body).unwrap();
54                debug!("topic route info:{:?}", data);
55                return Some(data);
56            }
57            _ => {
58                warn!(
59                    "invalid response code:{}, {}",
60                    cmd.req_code,
61                    String::from_utf8(cmd.r_body).unwrap()
62                );
63                None
64            }
65        };
66    }
67}
68
69impl SerializeDeserialize for GetRouteInfoRequestHeader {}
70
71#[cfg(test)]
72mod test {
73    use crate::protocols::header::get_route_info_request_header::GetRouteInfoRequestHeader;
74    use crate::protocols::SerializeDeserialize;
75
76    #[test]
77    fn test_serialize() {
78        let header = GetRouteInfoRequestHeader {
79            topic: "topic111".to_string(),
80        };
81        let bytes = header.to_bytes_1();
82        println!("bytes:{:?}", bytes);
83    }
84}