1use serde::Deserialize;
5use std::collections::HashMap;
6use std::vec::Vec;
7
/// Header of a "get route info" request; it carries a single topic name.
///
/// The header converts into a `HashMap<String, String>` holding exactly one
/// entry, keyed by `"topic"`.
#[derive(Debug)]
pub struct GetRouteInfoRequestHeader {
    /// Topic name, emitted under the `"topic"` key on conversion.
    topic: String,
}

impl GetRouteInfoRequestHeader {
    /// Builds a header for `topic`, copying the name into an owned `String`.
    pub fn new(topic: &str) -> Self {
        Self {
            topic: topic.to_owned(),
        }
    }
}

impl From<GetRouteInfoRequestHeader> for HashMap<String, String> {
    /// Consumes the header and returns a map with the single entry
    /// `"topic" -> <topic name>`.
    fn from(header: GetRouteInfoRequestHeader) -> HashMap<String, String> {
        // Exactly one entry is ever inserted, so size the map up front.
        let mut map = HashMap::with_capacity(1);
        map.insert("topic".to_owned(), header.topic);
        map
    }
}
27
/// Queue settings for one broker, decoded from a JSON object.
///
/// `rename_all = "camelCase"` makes serde read each field from its camelCase
/// key (for example `broker_name` comes from `brokerName`). No field is
/// optional or defaulted, so every key must be present for deserialization
/// to succeed.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueueData {
    /// Read from the `brokerName` key.
    pub(crate) broker_name: String,
    /// Read from the `readQueueNums` key.
    pub(crate) read_queue_nums: i32,
    /// Read from the `writeQueueNums` key.
    pub(crate) write_queue_nums: i32,
    /// Read from the `perm` key.
    // NOTE(review): presumably a permission bit mask; confirm against the broker protocol.
    pub(crate) perm: i32,
    /// Read from the `topicSynFlag` key.
    pub(crate) topic_syn_flag: i32,
}
37
/// Description of one broker, decoded from a JSON object.
///
/// Fields are read from their camelCase keys (`cluster`, `brokerName`,
/// `brokerAddrs`). `deny_unknown_fields` is not set, so extra keys in the
/// input are ignored.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BrokerData {
    /// Read from the `cluster` key.
    pub(crate) cluster: String,
    /// Read from the `brokerName` key.
    pub(crate) broker_name: String,
    /// Read from the `brokerAddrs` key; the JSON object's keys are parsed as `i64`.
    // NOTE(review): presumably maps broker id to its network address; confirm with callers.
    pub(crate) broker_addrs: HashMap<i64, String>,
}
45
/// Route data for a topic, decoded from a JSON object.
///
/// Fields are read from their camelCase keys. Only `order_topic_conf` may be
/// absent from the input; the other three keys are required because they carry
/// no `#[serde(default)]`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TopicRouteData {
    /// Read from `orderTopicConf`; `None` when the key is missing.
    pub(crate) order_topic_conf: Option<String>,

    /// Read from the `queueDatas` array.
    pub(crate) queue_datas: Vec<QueueData>,

    /// Read from the `brokerDatas` array.
    pub(crate) broker_datas: Vec<BrokerData>,

    /// Read from the `filterServerTable` object (string keys, lists of strings as values).
    pub(crate) filter_server_table: HashMap<String, Vec<String>>,
}
58
/// Header fields of a "send message" request.
///
/// Converts into a `HashMap<String, String>` whose keys are the camelCase
/// forms of the field names. The first eight fields are always emitted; the
/// `Option` fields are emitted only when they are `Some`.
#[derive(Debug)]
pub(crate) struct SendMessageRequestHeader {
    /// Emitted as `producerGroup`.
    pub(crate) producer_group: String,
    /// Emitted as `topic`.
    pub(crate) topic: String,
    /// Emitted as `defaultTopic`.
    pub(crate) default_topic: String,
    /// Emitted as `defaultTopicQueueNums` (decimal text).
    pub(crate) default_topic_queue_nums: i32,
    /// Emitted as `queueId` (decimal text).
    pub(crate) queue_id: i32,
    /// Emitted as `sysFlag` (decimal text).
    pub(crate) sys_flag: i32,
    /// Emitted as `bornTimestamp` (decimal text).
    pub(crate) born_timestamp: i64,
    /// Emitted as `flag` (decimal text).
    pub(crate) flag: i32,
    /// Emitted as `properties` when present.
    pub(crate) properties: Option<String>,
    /// Emitted as `reconsumeTimes` (decimal text) when present.
    pub(crate) reconsume_times: Option<i32>,
    /// Emitted as `unitMode` (`"true"` / `"false"`) when present.
    pub(crate) unit_mode: Option<bool>,
    /// Emitted as `batch` (`"true"` / `"false"`) when present.
    pub(crate) batch: Option<bool>,
    /// Emitted as `maxReconsumeTimes` (decimal text) when present.
    pub(crate) max_reconsume_times: Option<i32>,
}

impl From<SendMessageRequestHeader> for HashMap<String, String> {
    /// Consumes the header and flattens it into string key/value pairs.
    ///
    /// Numeric and boolean values are rendered with their `Display` form.
    /// Optional fields that are `None` produce no entry at all.
    fn from(header: SendMessageRequestHeader) -> Self {
        // 8 mandatory + 5 optional entries at most; reserve once up front.
        let mut map = HashMap::with_capacity(13);

        // Mandatory entries.
        map.insert("producerGroup".to_owned(), header.producer_group);
        map.insert("topic".to_owned(), header.topic);
        map.insert("defaultTopic".to_owned(), header.default_topic);
        map.insert(
            "defaultTopicQueueNums".to_owned(),
            header.default_topic_queue_nums.to_string(),
        );
        map.insert("queueId".to_owned(), header.queue_id.to_string());
        map.insert("sysFlag".to_owned(), header.sys_flag.to_string());
        map.insert(
            "bornTimestamp".to_owned(),
            header.born_timestamp.to_string(),
        );
        map.insert("flag".to_owned(), header.flag.to_string());

        // Optional entries: skipped entirely when the field is `None`.
        if let Some(properties) = header.properties {
            map.insert("properties".to_owned(), properties);
        }
        if let Some(reconsume_times) = header.reconsume_times {
            map.insert("reconsumeTimes".to_owned(), reconsume_times.to_string());
        }
        if let Some(unit_mode) = header.unit_mode {
            map.insert("unitMode".to_owned(), unit_mode.to_string());
        }
        if let Some(batch) = header.batch {
            map.insert("batch".to_owned(), batch.to_string());
        }
        if let Some(max_reconsume_times) = header.max_reconsume_times {
            map.insert(
                "maxReconsumeTimes".to_owned(),
                max_reconsume_times.to_string(),
            );
        }

        map
    }
}
119
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Result type shared by the JSON tests so `?` can surface parse errors.
    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// The route-info header converts into a map holding only the topic.
    #[test]
    fn test_get_route_info_request_header() {
        let converted: HashMap<String, String> = GetRouteInfoRequestHeader::new("Test").into();
        assert_eq!(converted.len(), 1);
        assert_eq!(converted.get("topic"), Some(&String::from("Test")));
    }

    /// Every `QueueData` field is populated from its camelCase JSON key.
    #[test]
    fn test_queue_data_deserialization() -> TestResult {
        let payload = r#"
        {"brokerName":"b1","perm":1,"readQueueNums":8,"topicSynFlag":2,"writeQueueNums":6}
        "#;
        let parsed: QueueData = serde_json::from_str(payload)?;
        let QueueData {
            broker_name,
            read_queue_nums,
            write_queue_nums,
            perm,
            topic_syn_flag,
        } = parsed;
        assert_eq!(broker_name, "b1");
        assert_eq!(perm, 1);
        assert_eq!(read_queue_nums, 8);
        assert_eq!(write_queue_nums, 6);
        assert_eq!(topic_syn_flag, 2);
        Ok(())
    }

    /// `BrokerData` parses even when the payload carries an unknown key.
    #[test]
    fn test_broker_data_deserialization() -> TestResult {
        let payload = r#"
        {"brokerAddrs":{"0":"localhost:8888","1":"localhost:1234"},"brokerName":"b1","cluster":"C1","enableActingMaster":false}
        "#;
        let parsed: BrokerData = serde_json::from_str(payload)?;
        assert_eq!(parsed.broker_name, "b1");
        assert_eq!(parsed.cluster, "C1");
        assert_eq!(parsed.broker_addrs.len(), 2);
        Ok(())
    }

    /// A full route payload yields one broker and one queue entry, with no
    /// order-topic configuration when that key is absent.
    #[test]
    fn test_topic_route_data_deserialization() -> TestResult {
        let payload = r#"
        {"brokerDatas":[{"brokerAddrs":{"0":"localhost:8888","1":"localhost:1234"},"brokerName":"b1","cluster":"C1","enableActingMaster":false}],"filterServerTable":{},"queueDatas":[{"brokerName":"b1","perm":1,"readQueueNums":8,"topicSynFlag":2,"writeQueueNums":6}]}
        "#;
        let route: TopicRouteData = serde_json::from_str(payload)?;
        assert_eq!(route.order_topic_conf, None);
        assert_eq!(route.broker_datas.len(), 1);
        assert_eq!(route.queue_datas.len(), 1);

        // Lengths are pinned to 1 above, so each loop inspects the first entry only.
        for broker in route.broker_datas.iter().take(1) {
            assert_eq!(broker.broker_name, "b1");
        }
        for queue in route.queue_datas.iter().take(1) {
            assert_eq!(queue.broker_name, "b1");
        }

        Ok(())
    }
}