rocketmq_client/
protocol.rs

//!
//! Defines the request headers and route-data types used when talking to Apache RocketMQ servers.
//!
4use serde::Deserialize;
5use std::collections::HashMap;
6use std::vec::Vec;
7
/// Header of a get-route-info request; it carries a single topic name.
///
/// Convert it with `HashMap::from` (or `.into()`) to obtain the string key/value form,
/// which contains exactly one entry under the key `"topic"`.
#[derive(Debug)]
pub struct GetRouteInfoRequestHeader {
    /// Topic name stored under the `"topic"` key when the header is converted to a map.
    topic: String,
}

impl GetRouteInfoRequestHeader {
    /// Creates a header for `topic`, copying the name into an owned `String`.
    pub fn new(topic: &str) -> Self {
        Self {
            topic: topic.to_owned(),
        }
    }
}

impl From<GetRouteInfoRequestHeader> for HashMap<String, String> {
    /// Consumes the header and returns a map holding its topic under the key `"topic"`.
    fn from(header: GetRouteInfoRequestHeader) -> Self {
        // The header has exactly one field, so size the map for one entry.
        let mut map = HashMap::with_capacity(1);
        map.insert("topic".to_owned(), header.topic);
        map
    }
}
27
28#[derive(Debug, Deserialize)]
29#[serde(rename_all = "camelCase")]
30pub struct QueueData {
31    pub(crate) broker_name: String,
32    pub(crate) read_queue_nums: i32,
33    pub(crate) write_queue_nums: i32,
34    pub(crate) perm: i32,
35    pub(crate) topic_syn_flag: i32,
36}
37
/// Broker entry of a topic route, deserialized from JSON whose keys are the camelCase
/// form of the field names (`cluster`, `brokerName`, `brokerAddrs`).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BrokerData {
    /// Value of the `cluster` JSON key.
    pub(crate) cluster: String,
    /// Value of the `brokerName` JSON key.
    pub(crate) broker_name: String,
    /// Value of the `brokerAddrs` JSON key: address strings keyed by a 64-bit integer.
    // NOTE(review): the integer key is presumably the broker id; confirm against the server.
    pub(crate) broker_addrs: HashMap<i64, String>,
}
45
46#[derive(Debug, Deserialize)]
47#[serde(rename_all = "camelCase")]
48pub struct TopicRouteData {
49    pub(crate) order_topic_conf: Option<String>,
50
51    pub(crate) queue_datas: Vec<QueueData>,
52
53    pub(crate) broker_datas: Vec<BrokerData>,
54
55    // deprecated
56    pub(crate) filter_server_table: HashMap<String, Vec<String>>,
57}
58
/// Header fields of a send-message request.
///
/// Converting the header with `HashMap::from` (or `.into()`) yields string key/value
/// pairs keyed by the camelCase form of each field name. The eight non-optional fields
/// are always present; an optional field is emitted only when it is `Some`.
#[derive(Debug)]
pub(crate) struct SendMessageRequestHeader {
    /// Emitted as `producerGroup`.
    pub(crate) producer_group: String,
    /// Emitted as `topic`.
    pub(crate) topic: String,
    /// Emitted as `defaultTopic`.
    pub(crate) default_topic: String,
    /// Emitted as `defaultTopicQueueNums`.
    pub(crate) default_topic_queue_nums: i32,
    /// Emitted as `queueId`.
    pub(crate) queue_id: i32,
    /// Emitted as `sysFlag`.
    pub(crate) sys_flag: i32,
    /// Emitted as `bornTimestamp`.
    pub(crate) born_timestamp: i64,
    /// Emitted as `flag`.
    pub(crate) flag: i32,
    /// Emitted verbatim as `properties` when present.
    pub(crate) properties: Option<String>,
    /// Emitted as `reconsumeTimes` when present.
    pub(crate) reconsume_times: Option<i32>,
    /// Emitted as `unitMode` (`"true"` / `"false"`) when present.
    pub(crate) unit_mode: Option<bool>,
    /// Emitted as `batch` (`"true"` / `"false"`) when present.
    pub(crate) batch: Option<bool>,
    /// Emitted as `maxReconsumeTimes` when present.
    pub(crate) max_reconsume_times: Option<i32>,
}

impl From<SendMessageRequestHeader> for HashMap<String, String> {
    /// Consumes the header and renders every field as a string entry.
    ///
    /// Numbers and booleans use their `Display` form; `None` fields produce no entry.
    fn from(header: SendMessageRequestHeader) -> Self {
        // Fields that are always written.
        let required = [
            ("producerGroup", header.producer_group),
            ("topic", header.topic),
            ("defaultTopic", header.default_topic),
            (
                "defaultTopicQueueNums",
                header.default_topic_queue_nums.to_string(),
            ),
            ("queueId", header.queue_id.to_string()),
            ("sysFlag", header.sys_flag.to_string()),
            ("bornTimestamp", header.born_timestamp.to_string()),
            ("flag", header.flag.to_string()),
        ];

        // Fields that are written only when the caller supplied a value.
        let optional = [
            ("properties", header.properties),
            (
                "reconsumeTimes",
                header.reconsume_times.map(|times| times.to_string()),
            ),
            ("unitMode", header.unit_mode.map(|flag| flag.to_string())),
            ("batch", header.batch.map(|flag| flag.to_string())),
            (
                "maxReconsumeTimes",
                header.max_reconsume_times.map(|times| times.to_string()),
            ),
        ];

        // Upper bound on the entry count, so the map never has to grow.
        let mut map = HashMap::with_capacity(required.len() + optional.len());
        for (key, value) in required {
            map.insert(key.to_owned(), value);
        }
        for (key, value) in optional {
            if let Some(value) = value {
                map.insert(key.to_owned(), value);
            }
        }
        map
    }
}
119
#[cfg(test)]
mod tests {
    //! Unit tests for the header-to-map conversions and the route-data JSON decoding.
    use super::*;
    use std::collections::HashMap;

    /// Builds a send-message header with fixed required fields and no optional ones.
    fn sample_send_message_header() -> SendMessageRequestHeader {
        SendMessageRequestHeader {
            producer_group: "pg".to_owned(),
            topic: "t".to_owned(),
            default_topic: "dt".to_owned(),
            default_topic_queue_nums: 4,
            queue_id: 1,
            sys_flag: 0,
            born_timestamp: 1234567890123,
            flag: -2,
            properties: None,
            reconsume_times: None,
            unit_mode: None,
            batch: None,
            max_reconsume_times: None,
        }
    }

    #[test]
    fn test_get_route_info_request_header() {
        let header = GetRouteInfoRequestHeader::new("Test");
        let map: HashMap<String, String> = header.into();
        assert_eq!(map.len(), 1);
        assert_eq!(Some(&String::from("Test")), map.get("topic"));
    }

    #[test]
    fn test_send_message_request_header_required_only() {
        let map: HashMap<String, String> = sample_send_message_header().into();
        assert_eq!(map.len(), 8);
        assert_eq!(map["producerGroup"], "pg");
        assert_eq!(map["topic"], "t");
        assert_eq!(map["defaultTopic"], "dt");
        assert_eq!(map["defaultTopicQueueNums"], "4");
        assert_eq!(map["queueId"], "1");
        assert_eq!(map["sysFlag"], "0");
        assert_eq!(map["bornTimestamp"], "1234567890123");
        assert_eq!(map["flag"], "-2");
        // Optional fields that are `None` must not show up at all.
        for key in [
            "properties",
            "reconsumeTimes",
            "unitMode",
            "batch",
            "maxReconsumeTimes",
        ] {
            assert!(!map.contains_key(key), "unexpected key {}", key);
        }
    }

    #[test]
    fn test_send_message_request_header_with_optionals() {
        let header = SendMessageRequestHeader {
            properties: Some("k=v".to_owned()),
            reconsume_times: Some(2),
            unit_mode: Some(false),
            batch: Some(true),
            max_reconsume_times: Some(16),
            ..sample_send_message_header()
        };
        let map: HashMap<String, String> = header.into();
        assert_eq!(map.len(), 13);
        assert_eq!(map["properties"], "k=v");
        assert_eq!(map["reconsumeTimes"], "2");
        assert_eq!(map["unitMode"], "false");
        assert_eq!(map["batch"], "true");
        assert_eq!(map["maxReconsumeTimes"], "16");
    }

    #[test]
    fn test_queue_data_deserialization() -> Result<(), Box<dyn std::error::Error>> {
        let json = r#"
        {"brokerName":"b1","perm":1,"readQueueNums":8,"topicSynFlag":2,"writeQueueNums":6}
        "#;
        let queue_data: QueueData = serde_json::from_str(json)?;
        assert_eq!(queue_data.broker_name, "b1");
        assert_eq!(queue_data.perm, 1);
        assert_eq!(queue_data.read_queue_nums, 8);
        assert_eq!(queue_data.write_queue_nums, 6);
        assert_eq!(queue_data.topic_syn_flag, 2);
        Ok(())
    }

    #[test]
    fn test_broker_data_deserialization() -> Result<(), Box<dyn std::error::Error>> {
        // `enableActingMaster` is not a field of `BrokerData`; it must be ignored.
        let json = r#"
        {"brokerAddrs":{"0":"localhost:8888","1":"localhost:1234"},"brokerName":"b1","cluster":"C1","enableActingMaster":false}
        "#;
        let broker_data: BrokerData = serde_json::from_str(json)?;
        assert_eq!(broker_data.broker_name, "b1");
        assert_eq!(broker_data.cluster, "C1");
        assert_eq!(broker_data.broker_addrs.len(), 2);
        assert_eq!(
            broker_data.broker_addrs.get(&0).map(String::as_str),
            Some("localhost:8888")
        );
        assert_eq!(
            broker_data.broker_addrs.get(&1).map(String::as_str),
            Some("localhost:1234")
        );
        Ok(())
    }

    #[test]
    fn test_topic_route_data_deserialization() -> Result<(), Box<dyn std::error::Error>> {
        let json = r#"
        {"brokerDatas":[{"brokerAddrs":{"0":"localhost:8888","1":"localhost:1234"},"brokerName":"b1","cluster":"C1","enableActingMaster":false}],"filterServerTable":{},"queueDatas":[{"brokerName":"b1","perm":1,"readQueueNums":8,"topicSynFlag":2,"writeQueueNums":6}]}
        "#;
        let topic_route_data: TopicRouteData = serde_json::from_str(json)?;
        assert_eq!(topic_route_data.order_topic_conf, None);
        assert!(topic_route_data.filter_server_table.is_empty());
        assert_eq!(topic_route_data.broker_datas.len(), 1);
        assert_eq!(topic_route_data.queue_datas.len(), 1);

        // The lengths were asserted above, so index directly: an `if let` here would
        // silently skip the assertions if the vectors were ever empty.
        assert_eq!(topic_route_data.broker_datas[0].broker_name, "b1");
        assert_eq!(topic_route_data.queue_datas[0].broker_name, "b1");

        Ok(())
    }
}