rocketmq_remoting/protocol/header/namesrv/
register_broker_header.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use cheetah_string::CheetahString;
19use rocketmq_macros::RequestHeaderCodecV2;
20use serde::Deserialize;
21use serde::Serialize;
22
23/// Represents the header for a broker registration request.
24#[derive(Debug, Clone, Deserialize, Serialize, Default, RequestHeaderCodecV2)]
25pub struct RegisterBrokerRequestHeader {
26    /// The name of the broker.
27    #[serde(rename = "brokerName")]
28    #[required]
29    pub broker_name: CheetahString,
30
31    /// The address of the broker.
32    #[serde(rename = "brokerAddr")]
33    #[required]
34    pub broker_addr: CheetahString,
35
36    /// The name of the cluster to which the broker belongs.
37    #[serde(rename = "clusterName")]
38    #[required]
39    pub cluster_name: CheetahString,
40
41    /// The address of the highly available (HA) remoting_server associated with the broker.
42    #[serde(rename = "haServerAddr")]
43    #[required]
44    pub ha_server_addr: CheetahString,
45
46    /// The unique identifier for the broker.
47    #[serde(rename = "brokerId")]
48    #[required]
49    pub broker_id: u64,
50
51    /// The optional heartbeat timeout in milliseconds.
52    #[serde(rename = "heartbeatTimeoutMillis")]
53    pub heartbeat_timeout_millis: Option<i64>,
54
55    /// The optional flag indicating whether acting as the master is enabled.
56    #[serde(rename = "enableActingMaster")]
57    pub enable_acting_master: Option<bool>,
58
59    /// Indicates whether the data is compressed.
60    #[required]
61    pub compressed: bool,
62
63    /// The CRC32 checksum for the message body.
64    #[serde(rename = "bodyCrc32")]
65    #[required]
66    pub body_crc32: u32,
67}
68
69impl RegisterBrokerRequestHeader {
70    /// Creates a new instance of `RegisterBrokerRequestHeader`.
71    ///
72    /// # Arguments
73    ///
74    /// * `broker_name` - The name of the broker.
75    /// * `broker_addr` - The address of the broker.
76    /// * `cluster_name` - The name of the cluster.
77    /// * `ha_server_addr` - The address of the HA remoting_server.
78    /// * `broker_id` - The unique identifier for the broker.
79    /// * `heartbeat_timeout_millis` - The optional heartbeat timeout in milliseconds.
80    /// * `enable_acting_master` - The optional flag indicating whether acting as the master is
81    ///   enabled.
82    /// * `compressed` - Indicates whether the data is compressed.
83    /// * `body_crc32` - The CRC32 checksum for the message body.
84    ///
85    /// # Returns
86    ///
87    /// A new `RegisterBrokerRequestHeader` instance.
88    pub fn new(
89        broker_name: CheetahString,
90        broker_addr: CheetahString,
91        cluster_name: CheetahString,
92        ha_server_addr: CheetahString,
93        broker_id: u64,
94        heartbeat_timeout_millis: Option<i64>,
95        enable_acting_master: Option<bool>,
96        compressed: bool,
97        body_crc32: u32,
98    ) -> Self {
99        RegisterBrokerRequestHeader {
100            broker_name,
101            broker_addr,
102            cluster_name,
103            ha_server_addr,
104            broker_id,
105            heartbeat_timeout_millis,
106            enable_acting_master,
107            compressed,
108            body_crc32,
109        }
110    }
111}
112
113#[derive(Debug, Clone, Deserialize, Serialize, Default, RequestHeaderCodecV2)]
114#[serde(rename_all = "camelCase")]
115pub struct RegisterBrokerResponseHeader {
116    pub ha_server_addr: Option<CheetahString>,
117    pub master_addr: Option<CheetahString>,
118}
119
120impl RegisterBrokerResponseHeader {
121    pub fn new(ha_server_addr: Option<CheetahString>, master_addr: Option<CheetahString>) -> Self {
122        RegisterBrokerResponseHeader {
123            ha_server_addr,
124            master_addr,
125        }
126    }
127}
128
#[cfg(test)]
mod tests {
    use cheetah_string::CheetahString;

    use super::*;

    /// JSON form of a request header with every field present.
    const FULL_REQUEST_JSON: &str = r#"{"brokerName":"broker1","brokerAddr":"127.0.0.1","clusterName":"cluster1","haServerAddr":"127.0.0.2","brokerId":1,"heartbeatTimeoutMillis":3000,"enableActingMaster":true,"compressed":true,"bodyCrc32":12345}"#;

    /// JSON form of a request header without the two optional fields.
    const REQUEST_JSON_WITHOUT_OPTIONALS: &str = r#"{"brokerName":"broker1","brokerAddr":"127.0.0.1","clusterName":"cluster1","haServerAddr":"127.0.0.2","brokerId":1,"compressed":true,"bodyCrc32":12345}"#;

    /// JSON form of a response header with both addresses present.
    const FULL_RESPONSE_JSON: &str = r#"{"haServerAddr":"127.0.0.2","masterAddr":"127.0.0.3"}"#;

    /// JSON form of a response header that only carries the HA server address.
    const RESPONSE_JSON_WITHOUT_MASTER: &str = r#"{"haServerAddr":"127.0.0.2"}"#;

    /// Builds the fully populated request header shared by the request tests.
    fn sample_request() -> RegisterBrokerRequestHeader {
        RegisterBrokerRequestHeader::new(
            CheetahString::from("broker1"),
            CheetahString::from("127.0.0.1"),
            CheetahString::from("cluster1"),
            CheetahString::from("127.0.0.2"),
            1,
            Some(3000),
            Some(true),
            true,
            12345,
        )
    }

    /// Builds the fully populated response header shared by the response tests.
    fn sample_response() -> RegisterBrokerResponseHeader {
        RegisterBrokerResponseHeader::new(
            Some(CheetahString::from("127.0.0.2")),
            Some(CheetahString::from("127.0.0.3")),
        )
    }

    /// Checks every non-optional request field against the sample values.
    fn assert_mandatory_request_fields(header: &RegisterBrokerRequestHeader) {
        assert_eq!(header.broker_name, CheetahString::from("broker1"));
        assert_eq!(header.broker_addr, CheetahString::from("127.0.0.1"));
        assert_eq!(header.cluster_name, CheetahString::from("cluster1"));
        assert_eq!(header.ha_server_addr, CheetahString::from("127.0.0.2"));
        assert_eq!(header.broker_id, 1);
        assert!(header.compressed);
        assert_eq!(header.body_crc32, 12345);
    }

    #[test]
    fn register_broker_request_header_new() {
        let built = sample_request();
        assert_mandatory_request_fields(&built);
        assert_eq!(built.heartbeat_timeout_millis, Some(3000));
        assert_eq!(built.enable_acting_master, Some(true));
    }

    #[test]
    fn register_broker_request_header_serialization() {
        let encoded = serde_json::to_string(&sample_request()).unwrap();
        assert_eq!(encoded, FULL_REQUEST_JSON);
    }

    #[test]
    fn register_broker_request_header_deserialization() {
        let decoded: RegisterBrokerRequestHeader =
            serde_json::from_str(FULL_REQUEST_JSON).unwrap();
        assert_mandatory_request_fields(&decoded);
        assert_eq!(decoded.heartbeat_timeout_millis, Some(3000));
        assert_eq!(decoded.enable_acting_master, Some(true));
    }

    #[test]
    fn register_broker_request_header_deserialization_missing_fields() {
        let decoded: RegisterBrokerRequestHeader =
            serde_json::from_str(REQUEST_JSON_WITHOUT_OPTIONALS).unwrap();
        assert_mandatory_request_fields(&decoded);
        // Optional keys that are absent from the JSON come back as `None`.
        assert_eq!(decoded.heartbeat_timeout_millis, None);
        assert_eq!(decoded.enable_acting_master, None);
    }

    #[test]
    fn register_broker_response_header_new() {
        let built = sample_response();
        assert_eq!(built.ha_server_addr, Some(CheetahString::from("127.0.0.2")));
        assert_eq!(built.master_addr, Some(CheetahString::from("127.0.0.3")));
    }

    #[test]
    fn register_broker_response_header_serialization() {
        let encoded = serde_json::to_string(&sample_response()).unwrap();
        assert_eq!(encoded, FULL_RESPONSE_JSON);
    }

    #[test]
    fn register_broker_response_header_deserialization() {
        let decoded: RegisterBrokerResponseHeader =
            serde_json::from_str(FULL_RESPONSE_JSON).unwrap();
        assert_eq!(
            decoded.ha_server_addr,
            Some(CheetahString::from("127.0.0.2"))
        );
        assert_eq!(decoded.master_addr, Some(CheetahString::from("127.0.0.3")));
    }

    #[test]
    fn register_broker_response_header_deserialization_missing_fields() {
        let decoded: RegisterBrokerResponseHeader =
            serde_json::from_str(RESPONSE_JSON_WITHOUT_MASTER).unwrap();
        assert_eq!(
            decoded.ha_server_addr,
            Some(CheetahString::from("127.0.0.2"))
        );
        // The absent `masterAddr` key comes back as `None`.
        assert_eq!(decoded.master_addr, None);
    }
}