rocketmq_remoting/protocol/route/
topic_route_data.rs1use std::collections::HashMap;
18
19use cheetah_string::CheetahString;
20use serde::Deserialize;
21use serde::Serialize;
22
23use crate::protocol::route::route_data_view::BrokerData;
24use crate::protocol::route::route_data_view::QueueData;
25use crate::protocol::static_topic::topic_queue_info::TopicQueueMappingInfo;
26
27#[derive(Debug, Serialize, Deserialize, Clone, Default, Eq, PartialEq)]
28pub struct TopicRouteData {
29 #[serde(rename = "orderTopicConf")]
30 pub order_topic_conf: Option<CheetahString>,
31 #[serde(rename = "queueDatas")]
32 pub queue_datas: Vec<QueueData>,
33 #[serde(rename = "brokerDatas")]
34 pub broker_datas: Vec<BrokerData>,
35 #[serde(rename = "filterServerTable")]
36 pub filter_server_table: HashMap<CheetahString, Vec<CheetahString>>,
37 #[serde(rename = "topicQueueMappingInfo")]
38 pub topic_queue_mapping_by_broker: Option<HashMap<CheetahString, TopicQueueMappingInfo>>,
39}
40
41impl TopicRouteData {
42 pub fn topic_route_data_changed(&self, old_data: Option<&TopicRouteData>) -> bool {
43 if old_data.is_none() {
44 return true;
45 }
46 let mut now = TopicRouteData::from_existing(self);
47 let mut old = TopicRouteData::from_existing(old_data.unwrap());
48 now.queue_datas.sort();
49 now.broker_datas.sort();
50 old.queue_datas.sort();
51 old.broker_datas.sort();
52 now != old
53 }
54
55 pub fn new() -> Self {
56 TopicRouteData {
57 order_topic_conf: None,
58 queue_datas: Vec::new(),
59 broker_datas: Vec::new(),
60 filter_server_table: HashMap::new(),
61 topic_queue_mapping_by_broker: None,
62 }
63 }
64
65 pub fn from_existing(topic_route_data: &TopicRouteData) -> Self {
66 TopicRouteData {
67 order_topic_conf: topic_route_data.order_topic_conf.clone(),
68 queue_datas: topic_route_data.queue_datas.clone(),
69 broker_datas: topic_route_data.broker_datas.clone(),
70 filter_server_table: topic_route_data.filter_server_table.clone(),
71 topic_queue_mapping_by_broker: topic_route_data.topic_queue_mapping_by_broker.clone(),
72 }
73 }
74}
75
#[cfg(test)]
mod tests {
    use serde_json;

    use super::*;

    /// Builds a `TopicRouteData` in which every field holds exactly one entry;
    /// shared by the tests that need populated data.
    fn populated_route_data() -> TopicRouteData {
        let filter_servers: HashMap<CheetahString, Vec<CheetahString>> = std::iter::once((
            CheetahString::from("key"),
            vec![CheetahString::from("value")],
        ))
        .collect();
        let mapping_by_broker: HashMap<CheetahString, TopicQueueMappingInfo> = std::iter::once((
            CheetahString::from("broker"),
            TopicQueueMappingInfo::default(),
        ))
        .collect();

        TopicRouteData {
            order_topic_conf: Some(CheetahString::from("conf")),
            queue_datas: vec![QueueData::default()],
            broker_datas: vec![BrokerData::default()],
            filter_server_table: filter_servers,
            topic_queue_mapping_by_broker: Some(mapping_by_broker),
        }
    }

    /// A default-constructed value has no optional data and empty collections.
    #[test]
    fn topic_route_data_default_values() {
        let TopicRouteData {
            order_topic_conf,
            queue_datas,
            broker_datas,
            filter_server_table,
            topic_queue_mapping_by_broker,
        } = TopicRouteData::default();

        assert!(order_topic_conf.is_none());
        assert!(queue_datas.is_empty());
        assert!(broker_datas.is_empty());
        assert!(filter_server_table.is_empty());
        assert!(topic_queue_mapping_by_broker.is_none());
    }

    /// Explicitly supplied field values are stored as given.
    #[test]
    fn topic_route_data_with_values() {
        let route = populated_route_data();

        assert_eq!(route.order_topic_conf, Some(CheetahString::from("conf")));
        assert_eq!(route.queue_datas.len(), 1);
        assert_eq!(route.broker_datas.len(), 1);
        assert_eq!(route.filter_server_table.len(), 1);
        assert!(route.topic_queue_mapping_by_broker.is_some());
    }

    /// JSON output uses the renamed keys for every field.
    #[test]
    fn serialize_topic_route_data() {
        let json = serde_json::to_string(&populated_route_data()).unwrap();

        let expected_fragments = [
            "\"orderTopicConf\":\"conf\"",
            "\"queueDatas\":[",
            "\"brokerDatas\":[",
            "\"filterServerTable\":{\"key\":[\"value\"]}",
            "\"topicQueueMappingInfo\":{\"broker\":{",
        ];
        for fragment in expected_fragments.iter() {
            assert!(json.contains(fragment));
        }
    }

    /// With no previous data, the route always counts as changed.
    #[test]
    fn topic_route_data_changed_with_none_old_data() {
        let current = TopicRouteData::default();
        assert!(current.topic_route_data_changed(None));
    }

    /// A difference in any field is reported as a change.
    #[test]
    fn topic_route_data_changed_with_different_data() {
        let current = TopicRouteData::default();
        let previous = TopicRouteData {
            order_topic_conf: Some(CheetahString::from("conf")),
            ..Default::default()
        };
        assert!(current.topic_route_data_changed(Some(&previous)));
    }

    /// Two identical values are not reported as a change.
    #[test]
    fn topic_route_data_changed_with_same_data() {
        let current = TopicRouteData::default();
        let previous = TopicRouteData::default();
        assert!(!current.topic_route_data_changed(Some(&previous)));
    }
}