Skip to main content

rocketmq_remoting/protocol/header/namesrv/
register_broker_header.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use cheetah_string::CheetahString;
16use rocketmq_macros::RequestHeaderCodecV2;
17use serde::Deserialize;
18use serde::Serialize;
19
/// Represents the header for a broker registration request.
///
/// Each field is mapped to a camelCase key by an explicit `#[serde(rename = "...")]`
/// attribute (for example `broker_name` is written as `brokerName`). The two `Option`
/// fields may be absent from the input: with `serde_json` a missing key deserializes to
/// `None`, as exercised by the tests in this file.
///
/// NOTE(review): the `#[required]` attribute is consumed by the `RequestHeaderCodecV2`
/// derive; presumably it marks fields that must be present when the header is decoded
/// from a remoting command. Confirm against the macro implementation.
#[derive(Debug, Clone, Deserialize, Serialize, Default, RequestHeaderCodecV2)]
pub struct RegisterBrokerRequestHeader {
    /// The name of the broker. Serialized as `brokerName`.
    #[serde(rename = "brokerName")]
    #[required]
    pub broker_name: CheetahString,

    /// The address of the broker. Serialized as `brokerAddr`.
    #[serde(rename = "brokerAddr")]
    #[required]
    pub broker_addr: CheetahString,

    /// The name of the cluster to which the broker belongs. Serialized as `clusterName`.
    #[serde(rename = "clusterName")]
    #[required]
    pub cluster_name: CheetahString,

    /// The address of the high-availability (HA) server associated with the broker.
    /// Serialized as `haServerAddr`.
    #[serde(rename = "haServerAddr")]
    #[required]
    pub ha_server_addr: CheetahString,

    /// The unique identifier for the broker. Serialized as `brokerId`.
    #[serde(rename = "brokerId")]
    #[required]
    pub broker_id: u64,

    /// The optional heartbeat timeout in milliseconds. Serialized as
    /// `heartbeatTimeoutMillis`; not marked `#[required]`.
    #[serde(rename = "heartbeatTimeoutMillis")]
    pub heartbeat_timeout_millis: Option<i64>,

    /// The optional flag indicating whether acting as the master is enabled. Serialized as
    /// `enableActingMaster`; not marked `#[required]`.
    #[serde(rename = "enableActingMaster")]
    pub enable_acting_master: Option<bool>,

    /// Indicates whether the data is compressed.
    ///
    /// No `rename` attribute is needed here: the Rust field name is already the wire key
    /// `compressed`.
    #[required]
    pub compressed: bool,

    /// The CRC32 checksum for the message body. Serialized as `bodyCrc32`.
    #[serde(rename = "bodyCrc32")]
    #[required]
    pub body_crc32: u32,
}
65
66impl RegisterBrokerRequestHeader {
67    /// Creates a new instance of `RegisterBrokerRequestHeader`.
68    ///
69    /// # Arguments
70    ///
71    /// * `broker_name` - The name of the broker.
72    /// * `broker_addr` - The address of the broker.
73    /// * `cluster_name` - The name of the cluster.
74    /// * `ha_server_addr` - The address of the HA remoting_server.
75    /// * `broker_id` - The unique identifier for the broker.
76    /// * `heartbeat_timeout_millis` - The optional heartbeat timeout in milliseconds.
77    /// * `enable_acting_master` - The optional flag indicating whether acting as the master is
78    ///   enabled.
79    /// * `compressed` - Indicates whether the data is compressed.
80    /// * `body_crc32` - The CRC32 checksum for the message body.
81    ///
82    /// # Returns
83    ///
84    /// A new `RegisterBrokerRequestHeader` instance.
85    pub fn new(
86        broker_name: CheetahString,
87        broker_addr: CheetahString,
88        cluster_name: CheetahString,
89        ha_server_addr: CheetahString,
90        broker_id: u64,
91        heartbeat_timeout_millis: Option<i64>,
92        enable_acting_master: Option<bool>,
93        compressed: bool,
94        body_crc32: u32,
95    ) -> Self {
96        RegisterBrokerRequestHeader {
97            broker_name,
98            broker_addr,
99            cluster_name,
100            ha_server_addr,
101            broker_id,
102            heartbeat_timeout_millis,
103            enable_acting_master,
104            compressed,
105            body_crc32,
106        }
107    }
108}
109
/// Header for the response to a broker registration request (per the type name).
///
/// `#[serde(rename_all = "camelCase")]` maps the fields to the keys `haServerAddr` and
/// `masterAddr`. Both fields are optional: with `serde_json` a missing key deserializes to
/// `None`, as exercised by the tests in this file.
#[derive(Debug, Clone, Deserialize, Serialize, Default, RequestHeaderCodecV2)]
#[serde(rename_all = "camelCase")]
pub struct RegisterBrokerResponseHeader {
    /// Optional HA server address, serialized as `haServerAddr`.
    /// NOTE(review): presumably the HA endpoint reported back to the registering broker;
    /// confirm with the name server implementation.
    pub ha_server_addr: Option<CheetahString>,
    /// Optional master address, serialized as `masterAddr`.
    /// NOTE(review): presumably the address of the broker group's master; confirm.
    pub master_addr: Option<CheetahString>,
}
116
117impl RegisterBrokerResponseHeader {
118    pub fn new(ha_server_addr: Option<CheetahString>, master_addr: Option<CheetahString>) -> Self {
119        RegisterBrokerResponseHeader {
120            ha_server_addr,
121            master_addr,
122        }
123    }
124}
125
#[cfg(test)]
mod tests {
    use cheetah_string::CheetahString;

    use super::*;

    /// JSON form of a request header with every field populated.
    const FULL_REQUEST_JSON: &str = r#"{"brokerName":"broker1","brokerAddr":"127.0.0.1","clusterName":"cluster1","haServerAddr":"127.0.0.2","brokerId":1,"heartbeatTimeoutMillis":3000,"enableActingMaster":true,"compressed":true,"bodyCrc32":12345}"#;

    /// JSON form of a request header with both optional keys left out.
    const REQUEST_JSON_WITHOUT_OPTIONALS: &str = r#"{"brokerName":"broker1","brokerAddr":"127.0.0.1","clusterName":"cluster1","haServerAddr":"127.0.0.2","brokerId":1,"compressed":true,"bodyCrc32":12345}"#;

    /// JSON form of a response header with both addresses present.
    const FULL_RESPONSE_JSON: &str = r#"{"haServerAddr":"127.0.0.2","masterAddr":"127.0.0.3"}"#;

    /// Builds the fully populated request header shared by several tests.
    fn sample_request() -> RegisterBrokerRequestHeader {
        RegisterBrokerRequestHeader::new(
            CheetahString::from("broker1"),
            CheetahString::from("127.0.0.1"),
            CheetahString::from("cluster1"),
            CheetahString::from("127.0.0.2"),
            1,
            Some(3000),
            Some(true),
            true,
            12345,
        )
    }

    /// Builds the response header (both addresses set) shared by several tests.
    fn sample_response() -> RegisterBrokerResponseHeader {
        RegisterBrokerResponseHeader::new(
            Some(CheetahString::from("127.0.0.2")),
            Some(CheetahString::from("127.0.0.3")),
        )
    }

    /// Checks every non-optional field of a request header against the sample values.
    fn assert_mandatory_request_fields(header: &RegisterBrokerRequestHeader) {
        assert_eq!(header.broker_name, CheetahString::from("broker1"));
        assert_eq!(header.broker_addr, CheetahString::from("127.0.0.1"));
        assert_eq!(header.cluster_name, CheetahString::from("cluster1"));
        assert_eq!(header.ha_server_addr, CheetahString::from("127.0.0.2"));
        assert_eq!(header.broker_id, 1);
        assert!(header.compressed);
        assert_eq!(header.body_crc32, 12345);
    }

    #[test]
    fn register_broker_request_header_new() {
        let built = sample_request();
        assert_mandatory_request_fields(&built);
        assert_eq!(built.heartbeat_timeout_millis, Some(3000));
        assert_eq!(built.enable_acting_master, Some(true));
    }

    #[test]
    fn register_broker_request_header_serialization() {
        let encoded = serde_json::to_string(&sample_request()).unwrap();
        assert_eq!(encoded, FULL_REQUEST_JSON);
    }

    #[test]
    fn register_broker_request_header_deserialization() {
        let decoded: RegisterBrokerRequestHeader =
            serde_json::from_str(FULL_REQUEST_JSON).unwrap();
        assert_mandatory_request_fields(&decoded);
        assert_eq!(decoded.heartbeat_timeout_millis, Some(3000));
        assert_eq!(decoded.enable_acting_master, Some(true));
    }

    #[test]
    fn register_broker_request_header_deserialization_missing_fields() {
        let decoded: RegisterBrokerRequestHeader =
            serde_json::from_str(REQUEST_JSON_WITHOUT_OPTIONALS).unwrap();
        assert_mandatory_request_fields(&decoded);
        // Keys absent from the input must come back as `None`.
        assert_eq!(decoded.heartbeat_timeout_millis, None);
        assert_eq!(decoded.enable_acting_master, None);
    }

    #[test]
    fn register_broker_response_header_new() {
        let built = sample_response();
        assert_eq!(built.ha_server_addr, Some(CheetahString::from("127.0.0.2")));
        assert_eq!(built.master_addr, Some(CheetahString::from("127.0.0.3")));
    }

    #[test]
    fn register_broker_response_header_serialization() {
        let encoded = serde_json::to_string(&sample_response()).unwrap();
        assert_eq!(encoded, FULL_RESPONSE_JSON);
    }

    #[test]
    fn register_broker_response_header_deserialization() {
        let decoded: RegisterBrokerResponseHeader =
            serde_json::from_str(FULL_RESPONSE_JSON).unwrap();
        assert_eq!(decoded.ha_server_addr, Some(CheetahString::from("127.0.0.2")));
        assert_eq!(decoded.master_addr, Some(CheetahString::from("127.0.0.3")));
    }

    #[test]
    fn register_broker_response_header_deserialization_missing_fields() {
        let decoded: RegisterBrokerResponseHeader =
            serde_json::from_str(r#"{"haServerAddr":"127.0.0.2"}"#).unwrap();
        assert_eq!(decoded.ha_server_addr, Some(CheetahString::from("127.0.0.2")));
        assert_eq!(decoded.master_addr, None);
    }
}