rocketmq_namesrv/processor/
default_request_processor.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use core::str;
19use std::collections::HashMap;
20use std::net::SocketAddr;
21
22use cheetah_string::CheetahString;
23use rocketmq_common::common::mix_all;
24use rocketmq_common::common::mix_all::string_to_properties;
25use rocketmq_common::common::mq_version::RocketMqVersion;
26use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
27use rocketmq_common::CRC32Utils;
28use rocketmq_remoting::code::request_code::RequestCode;
29use rocketmq_remoting::code::response_code::RemotingSysResponseCode;
30use rocketmq_remoting::code::response_code::ResponseCode;
31use rocketmq_remoting::net::channel::Channel;
32use rocketmq_remoting::protocol::body::broker_body::broker_member_group::GetBrokerMemberGroupResponseBody;
33use rocketmq_remoting::protocol::body::broker_body::register_broker_body::RegisterBrokerBody;
34use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper;
35use rocketmq_remoting::protocol::header::namesrv::broker_request::BrokerHeartbeatRequestHeader;
36use rocketmq_remoting::protocol::header::namesrv::broker_request::GetBrokerMemberGroupRequestHeader;
37use rocketmq_remoting::protocol::header::namesrv::broker_request::UnRegisterBrokerRequestHeader;
38use rocketmq_remoting::protocol::header::namesrv::kv_config_header::DeleteKVConfigRequestHeader;
39use rocketmq_remoting::protocol::header::namesrv::kv_config_header::GetKVConfigRequestHeader;
40use rocketmq_remoting::protocol::header::namesrv::kv_config_header::GetKVConfigResponseHeader;
41use rocketmq_remoting::protocol::header::namesrv::kv_config_header::GetKVListByNamespaceRequestHeader;
42use rocketmq_remoting::protocol::header::namesrv::kv_config_header::PutKVConfigRequestHeader;
43use rocketmq_remoting::protocol::header::namesrv::perm_broker_header::AddWritePermOfBrokerRequestHeader;
44use rocketmq_remoting::protocol::header::namesrv::perm_broker_header::AddWritePermOfBrokerResponseHeader;
45use rocketmq_remoting::protocol::header::namesrv::perm_broker_header::WipeWritePermOfBrokerRequestHeader;
46use rocketmq_remoting::protocol::header::namesrv::perm_broker_header::WipeWritePermOfBrokerResponseHeader;
47use rocketmq_remoting::protocol::header::namesrv::query_data_version_header::QueryDataVersionRequestHeader;
48use rocketmq_remoting::protocol::header::namesrv::query_data_version_header::QueryDataVersionResponseHeader;
49use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerRequestHeader;
50use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerResponseHeader;
51use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::DeleteTopicFromNamesrvRequestHeader;
52use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::GetTopicsByClusterRequestHeader;
53use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::RegisterTopicRequestHeader;
54use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
55use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
56use rocketmq_remoting::protocol::DataVersion;
57use rocketmq_remoting::protocol::RemotingDeserializable;
58use rocketmq_remoting::protocol::RemotingSerializable;
59use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
60use rocketmq_rust::ArcMut;
61use tracing::warn;
62
63use crate::bootstrap::NameServerRuntimeInner;
64use crate::processor::NAMESPACE_ORDER_TOPIC_CONFIG;
65
66pub struct DefaultRequestProcessor {
67    name_server_runtime_inner: ArcMut<NameServerRuntimeInner>,
68}
69
70impl DefaultRequestProcessor {
71    pub fn process_request(
72        &mut self,
73        channel: Channel,
74        _ctx: ConnectionHandlerContext,
75        request_code: RequestCode,
76        request: RemotingCommand,
77    ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
78        let response = match request_code {
79            RequestCode::PutKvConfig => self.put_kv_config(request),
80            RequestCode::GetKvConfig => self.get_kv_config(request),
81            RequestCode::DeleteKvConfig => self.delete_kv_config(request),
82            RequestCode::QueryDataVersion => self.query_broker_topic_config(request),
83            //handle register broker
84            RequestCode::RegisterBroker => {
85                self.process_register_broker(channel.remote_address(), request)
86            }
87            RequestCode::UnregisterBroker => self.process_unregister_broker(request),
88            RequestCode::BrokerHeartbeat => self.process_broker_heartbeat(request),
89            RequestCode::GetBrokerMemberGroup => self.get_broker_member_group(request),
90            //handle get broker cluster info
91            RequestCode::GetBrokerClusterInfo => self.get_broker_cluster_info(request),
92            RequestCode::WipeWritePermOfBroker => self.wipe_write_perm_of_broker(request),
93            RequestCode::AddWritePermOfBroker => self.add_write_perm_of_broker(request),
94            RequestCode::GetAllTopicListFromNameserver => {
95                self.get_all_topic_list_from_nameserver(request)
96            }
97            RequestCode::DeleteTopicInNamesrv => self.delete_topic_in_name_srv(request),
98            RequestCode::RegisterTopicInNamesrv => self.register_topic_to_name_srv(request),
99            RequestCode::GetKvlistByNamespace => self.get_kv_list_by_namespace(request),
100            RequestCode::GetTopicsByCluster => self.get_topics_by_cluster(request),
101            RequestCode::GetSystemTopicListFromNs => self.get_system_topic_list_from_ns(request),
102            RequestCode::GetUnitTopicList => self.get_unit_topic_list(request),
103            RequestCode::GetHasUnitSubTopicList => self.get_has_unit_sub_topic_list(request),
104            RequestCode::GetHasUnitSubUnunitTopicList => {
105                self.get_has_unit_sub_un_unit_topic_list(request)
106            }
107            RequestCode::UpdateNamesrvConfig => self.update_config(request),
108            RequestCode::GetNamesrvConfig => self.get_config(request),
109            _ => Ok(RemotingCommand::create_response_command_with_code(
110                RemotingSysResponseCode::SystemError,
111            )),
112        }?;
113        Ok(Some(response))
114    }
115}
116
117///implementation put KV config
118impl DefaultRequestProcessor {
119    fn put_kv_config(
120        &mut self,
121        request: RemotingCommand,
122    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
123        let request_header = request.decode_command_custom_header::<PutKVConfigRequestHeader>()?;
124        //check namespace and key, need?
125        if request_header.namespace.is_empty() || request_header.key.is_empty() {
126            return Ok(RemotingCommand::create_response_command_with_code(
127                RemotingSysResponseCode::SystemError,
128            )
129            .set_remark(CheetahString::from_static_str("namespace or key is empty")));
130        }
131
132        self.name_server_runtime_inner
133            .kvconfig_manager_mut()
134            .put_kv_config(
135                request_header.namespace,
136                request_header.key,
137                request_header.value,
138            );
139        Ok(RemotingCommand::create_response_command())
140    }
141
142    fn get_kv_config(
143        &self,
144        request: RemotingCommand,
145    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
146        let request_header = request.decode_command_custom_header::<GetKVConfigRequestHeader>()?;
147
148        let value = self
149            .name_server_runtime_inner
150            .kvconfig_manager()
151            .get_kvconfig(&request_header.namespace, &request_header.key);
152
153        if value.is_some() {
154            return Ok(RemotingCommand::create_response_command()
155                .set_command_custom_header(GetKVConfigResponseHeader::new(value)));
156        }
157        Ok(
158            RemotingCommand::create_response_command_with_code(ResponseCode::QueryNotFound)
159                .set_remark(format!(
160                    "No config item, Namespace: {} Key: {}",
161                    request_header.namespace, request_header.key
162                )),
163        )
164    }
165
166    fn delete_kv_config(
167        &mut self,
168        request: RemotingCommand,
169    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
170        let request_header =
171            request.decode_command_custom_header::<DeleteKVConfigRequestHeader>()?;
172
173        self.name_server_runtime_inner
174            .kvconfig_manager_mut()
175            .delete_kv_config(&request_header.namespace, &request_header.key);
176        Ok(RemotingCommand::create_response_command())
177    }
178
179    fn query_broker_topic_config(
180        &mut self,
181        request: RemotingCommand,
182    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
183        let request_header =
184            request.decode_command_custom_header::<QueryDataVersionRequestHeader>()?;
185        let data_version = DataVersion::decode(request.get_body().expect("body is empty"))
186            .expect("decode DataVersion failed");
187        let changed = self
188            .name_server_runtime_inner
189            .route_info_manager()
190            .is_broker_topic_config_changed(
191                &request_header.cluster_name,
192                &request_header.broker_addr,
193                &data_version,
194            );
195
196        self.name_server_runtime_inner
197            .route_info_manager_mut()
198            .update_broker_info_update_timestamp(
199                request_header.cluster_name.clone(),
200                request_header.broker_addr.clone(),
201            );
202        let mut command = RemotingCommand::create_response_command()
203            .set_command_custom_header(QueryDataVersionResponseHeader::new(changed));
204        if let Some(value) = self
205            .name_server_runtime_inner
206            .route_info_manager()
207            .query_broker_topic_config(request_header.cluster_name, request_header.broker_addr)
208        {
209            command = command.set_body(value.encode().expect("encode DataVersion failed"));
210        }
211        Ok(command)
212    }
213}
214
215#[allow(clippy::new_without_default)]
216impl DefaultRequestProcessor {
217    pub(crate) fn new(name_server_runtime_inner: ArcMut<NameServerRuntimeInner>) -> Self {
218        Self {
219            name_server_runtime_inner,
220        }
221    }
222}
223impl DefaultRequestProcessor {
224    fn process_register_broker(
225        &mut self,
226        remote_addr: SocketAddr,
227        request: RemotingCommand,
228    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
229        let request_header =
230            request.decode_command_custom_header::<RegisterBrokerRequestHeader>()?;
231        if !check_sum_crc32(&request, &request_header) {
232            return Ok(RemotingCommand::create_response_command_with_code(
233                RemotingSysResponseCode::SystemError,
234            )
235            .set_remark(CheetahString::from_static_str("crc32 not match")));
236        }
237
238        let mut response_command = RemotingCommand::create_response_command();
239        let broker_version =
240            RocketMqVersion::try_from(request.version() as u32).expect("invalid version");
241        let topic_config_wrapper;
242        let mut filter_server_list = Vec::new();
243        if broker_version >= RocketMqVersion::V3_0_11 {
244            let register_broker_body =
245                extract_register_broker_body_from_request(&request, &request_header);
246            topic_config_wrapper = register_broker_body.topic_config_serialize_wrapper;
247            filter_server_list = register_broker_body.filter_server_list;
248        } else {
249            topic_config_wrapper = extract_register_topic_config_from_request(&request);
250        }
251        let result = self
252            .name_server_runtime_inner
253            .route_info_manager()
254            .register_broker(
255                request_header.cluster_name,
256                request_header.broker_addr,
257                request_header.broker_name,
258                request_header.broker_id,
259                request_header.ha_server_addr,
260                request
261                    .ext_fields()
262                    .and_then(|map| map.get(mix_all::ZONE_NAME).cloned()),
263                request_header.heartbeat_timeout_millis,
264                request_header.enable_acting_master,
265                topic_config_wrapper,
266                filter_server_list,
267                remote_addr,
268            );
269        if result.is_none() {
270            return Ok(response_command
271                .set_code(RemotingSysResponseCode::SystemError)
272                .set_remark(CheetahString::from_static_str("register broker failed")));
273        }
274        if self
275            .name_server_runtime_inner
276            .name_server_config()
277            .return_order_topic_config_to_broker
278        {
279            if let Some(value) = self
280                .name_server_runtime_inner
281                .kvconfig_manager()
282                .get_kv_list_by_namespace(&CheetahString::from_static_str(
283                    NAMESPACE_ORDER_TOPIC_CONFIG,
284                ))
285            {
286                response_command = response_command.set_body(value);
287            }
288        }
289        let register_broker_result = result.unwrap();
290        Ok(response_command
291            .set_code(RemotingSysResponseCode::Success)
292            .set_command_custom_header(RegisterBrokerResponseHeader::new(
293                Some(register_broker_result.ha_server_addr),
294                Some(register_broker_result.master_addr),
295            )))
296    }
297
298    fn process_unregister_broker(
299        &mut self,
300        request: RemotingCommand,
301    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
302        let request_header =
303            request.decode_command_custom_header::<UnRegisterBrokerRequestHeader>()?;
304        /*self.name_server_runtime_inner
305        .route_info_manager_mut()
306        .un_register_broker(vec![request_header]);*/
307        if !self
308            .name_server_runtime_inner
309            .route_info_manager()
310            .submit_unregister_broker_request(request_header)
311        {
312            warn!("Couldn't submit the unregister broker request to handler");
313            return Ok(RemotingCommand::create_response_command_with_code(
314                ResponseCode::SystemError,
315            ));
316        }
317        Ok(RemotingCommand::create_response_command())
318    }
319}
320
321impl DefaultRequestProcessor {
322    fn process_broker_heartbeat(
323        &mut self,
324        request: RemotingCommand,
325    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
326        let request_header =
327            request.decode_command_custom_header::<BrokerHeartbeatRequestHeader>()?;
328        self.name_server_runtime_inner
329            .route_info_manager_mut()
330            .update_broker_info_update_timestamp(
331                request_header.cluster_name,
332                request_header.broker_addr,
333            );
334        Ok(RemotingCommand::create_response_command())
335    }
336
337    fn get_broker_member_group(
338        &mut self,
339        request: RemotingCommand,
340    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
341        let request_header =
342            request.decode_command_custom_header::<GetBrokerMemberGroupRequestHeader>()?;
343
344        let broker_member_group = self
345            .name_server_runtime_inner
346            .route_info_manager_mut()
347            .get_broker_member_group(&request_header.cluster_name, &request_header.broker_name);
348        let response_body = GetBrokerMemberGroupResponseBody {
349            broker_member_group,
350        };
351        let body = response_body.encode()?;
352        Ok(RemotingCommand::create_response_command().set_body(body))
353    }
354
355    fn get_broker_cluster_info(
356        &self,
357        _request: RemotingCommand,
358    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
359        let vec = self
360            .name_server_runtime_inner
361            .route_info_manager()
362            .get_all_cluster_info()
363            .encode()?;
364        Ok(
365            RemotingCommand::create_response_command_with_code(RemotingSysResponseCode::Success)
366                .set_body(vec),
367        )
368    }
369
370    fn wipe_write_perm_of_broker(
371        &mut self,
372        request: RemotingCommand,
373    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
374        let request_header =
375            request.decode_command_custom_header::<WipeWritePermOfBrokerRequestHeader>()?;
376        let wipe_topic_cnt = self
377            .name_server_runtime_inner
378            .route_info_manager_mut()
379            .wipe_write_perm_of_broker_by_lock(&request_header.broker_name);
380        Ok(RemotingCommand::create_response_command()
381            .set_command_custom_header(WipeWritePermOfBrokerResponseHeader::new(wipe_topic_cnt)))
382    }
383
384    fn add_write_perm_of_broker(
385        &mut self,
386        request: RemotingCommand,
387    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
388        let request_header =
389            request.decode_command_custom_header::<AddWritePermOfBrokerRequestHeader>()?;
390        let add_topic_cnt = self
391            .name_server_runtime_inner
392            .route_info_manager_mut()
393            .add_write_perm_of_broker_by_lock(&request_header.broker_name);
394        Ok(RemotingCommand::create_response_command()
395            .set_command_custom_header(AddWritePermOfBrokerResponseHeader::new(add_topic_cnt)))
396    }
397
398    fn get_all_topic_list_from_nameserver(
399        &self,
400        _request: RemotingCommand,
401    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
402        if self
403            .name_server_runtime_inner
404            .name_server_config()
405            .enable_all_topic_list
406        {
407            let topics = self
408                .name_server_runtime_inner
409                .route_info_manager()
410                .get_all_topic_list();
411            let body = topics.encode()?;
412            return Ok(RemotingCommand::create_response_command().set_body(body));
413        }
414        Ok(
415            RemotingCommand::create_response_command_with_code(
416                RemotingSysResponseCode::SystemError,
417            )
418            .set_remark(CheetahString::from_static_str("disable")),
419        )
420    }
421
422    fn delete_topic_in_name_srv(
423        &mut self,
424        request: RemotingCommand,
425    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
426        let request_header =
427            request.decode_command_custom_header::<DeleteTopicFromNamesrvRequestHeader>()?;
428        self.name_server_runtime_inner
429            .route_info_manager_mut()
430            .delete_topic(request_header.topic, request_header.cluster_name);
431        Ok(RemotingCommand::create_response_command())
432    }
433
434    fn register_topic_to_name_srv(
435        &mut self,
436        request: RemotingCommand,
437    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
438        let request_header =
439            request.decode_command_custom_header::<RegisterTopicRequestHeader>()?;
440        if let Some(ref body) = request.body() {
441            let topic_route_data = TopicRouteData::decode(body).unwrap_or_default();
442            if !topic_route_data.queue_datas.is_empty() {
443                self.name_server_runtime_inner
444                    .route_info_manager_mut()
445                    .register_topic(request_header.topic, topic_route_data.queue_datas)
446            }
447        }
448        Ok(RemotingCommand::create_response_command())
449    }
450
451    fn get_kv_list_by_namespace(
452        &self,
453        request: RemotingCommand,
454    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
455        let request_header =
456            request.decode_command_custom_header::<GetKVListByNamespaceRequestHeader>()?;
457        let value = self
458            .name_server_runtime_inner
459            .kvconfig_manager()
460            .get_kv_list_by_namespace(&request_header.namespace);
461        if let Some(value) = value {
462            return Ok(RemotingCommand::create_response_command().set_body(value));
463        }
464        Ok(
465            RemotingCommand::create_response_command_with_code(ResponseCode::QueryNotFound)
466                .set_remark(format!(
467                    "No config item, Namespace: {}",
468                    request_header.namespace.as_str()
469                )),
470        )
471    }
472
473    fn get_topics_by_cluster(
474        &self,
475        request: RemotingCommand,
476    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
477        if !self
478            .name_server_runtime_inner
479            .name_server_config()
480            .enable_topic_list
481        {
482            return Ok(RemotingCommand::create_response_command_with_code(
483                RemotingSysResponseCode::SystemError,
484            )
485            .set_remark(CheetahString::from_static_str("disable")));
486        }
487
488        let request_header =
489            request.decode_command_custom_header::<GetTopicsByClusterRequestHeader>()?;
490        let topics_by_cluster = self
491            .name_server_runtime_inner
492            .route_info_manager()
493            .get_topics_by_cluster(&request_header.cluster);
494        let body = topics_by_cluster.encode()?;
495        Ok(RemotingCommand::create_response_command().set_body(body))
496    }
497
498    fn get_system_topic_list_from_ns(
499        &self,
500        _request: RemotingCommand,
501    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
502        let topic_list = self
503            .name_server_runtime_inner
504            .route_info_manager()
505            .get_system_topic_list();
506        let body = topic_list.encode()?;
507        Ok(RemotingCommand::create_response_command().set_body(body))
508    }
509
510    fn get_unit_topic_list(
511        &self,
512        _request: RemotingCommand,
513    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
514        if self
515            .name_server_runtime_inner
516            .name_server_config()
517            .enable_topic_list
518        {
519            let topic_list = self
520                .name_server_runtime_inner
521                .route_info_manager()
522                .get_unit_topics();
523            let body = topic_list.encode()?;
524            return Ok(RemotingCommand::create_response_command().set_body(body));
525        }
526        Ok(
527            RemotingCommand::create_response_command_with_code(
528                RemotingSysResponseCode::SystemError,
529            )
530            .set_remark("disable"),
531        )
532    }
533
534    fn get_has_unit_sub_topic_list(
535        &self,
536        _request: RemotingCommand,
537    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
538        if self
539            .name_server_runtime_inner
540            .name_server_config()
541            .enable_topic_list
542        {
543            let topic_list = self
544                .name_server_runtime_inner
545                .route_info_manager()
546                .get_has_unit_sub_topic_list();
547            let body = topic_list.encode()?;
548            return Ok(RemotingCommand::create_response_command().set_body(body));
549        }
550        Ok(
551            RemotingCommand::create_response_command_with_code(
552                RemotingSysResponseCode::SystemError,
553            )
554            .set_remark("disable"),
555        )
556    }
557
558    fn get_has_unit_sub_un_unit_topic_list(
559        &self,
560        _request: RemotingCommand,
561    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
562        if self
563            .name_server_runtime_inner
564            .name_server_config()
565            .enable_topic_list
566        {
567            let topic_list = self
568                .name_server_runtime_inner
569                .route_info_manager()
570                .get_has_unit_sub_un_unit_topic_list();
571            return Ok(RemotingCommand::create_response_command()
572                .set_body(topic_list.encode().expect("encode TopicList failed")));
573        }
574        Ok(
575            RemotingCommand::create_response_command_with_code(
576                RemotingSysResponseCode::SystemError,
577            )
578            .set_remark("disable"),
579        )
580    }
581
582    fn update_config(
583        &mut self,
584        request: RemotingCommand,
585    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
586        if let Some(body) = request.body() {
587            let body_str = match str::from_utf8(body) {
588                Ok(s) => s,
589                Err(e) => {
590                    return Ok(RemotingCommand::create_response_command_with_code(
591                        RemotingSysResponseCode::SystemError,
592                    )
593                    .set_remark(format!("UnsupportedEncodingException {e:?}")));
594                }
595            };
596
597            let properties = match string_to_properties(body_str) {
598                Some(props) => props,
599                None => {
600                    return Ok(RemotingCommand::create_response_command_with_code(
601                        RemotingSysResponseCode::SystemError,
602                    )
603                    .set_remark("string_to_properties error".to_string()));
604                }
605            };
606            if validate_blacklist_config_exist(
607                &properties,
608                &self
609                    .name_server_runtime_inner
610                    .name_server_config()
611                    .get_config_blacklist(),
612            ) {
613                return Ok(RemotingCommand::create_response_command_with_code(
614                    RemotingSysResponseCode::NoPermission,
615                )
616                .set_remark("Cannot update config in blacklist.".to_string()));
617            }
618
619            let result = self
620                .name_server_runtime_inner
621                .kvconfig_manager_mut()
622                .update_namesrv_config(properties);
623            if let Err(e) = result {
624                return Ok(RemotingCommand::create_response_command_with_code(
625                    RemotingSysResponseCode::SystemError,
626                )
627                .set_remark(format!("Update error {e:?}")));
628            }
629        }
630
631        Ok(
632            RemotingCommand::create_response_command_with_code(RemotingSysResponseCode::Success)
633                .set_remark(CheetahString::empty()),
634        )
635    }
636
637    fn get_config(
638        &mut self,
639        _request: RemotingCommand,
640    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
641        let config = self.name_server_runtime_inner.name_server_config();
642        let result = match config.get_all_configs_format_string() {
643            Ok(content) => {
644                let response = RemotingCommand::create_response_command_with_code_remark(
645                    RemotingSysResponseCode::Success,
646                    CheetahString::empty(),
647                );
648                response.set_body(content.into_bytes())
649            }
650            Err(e) => RemotingCommand::create_response_command_with_code_remark(
651                ResponseCode::SystemError,
652                format!("UnsupportedEncodingException {e}"),
653            ),
654        };
655        Ok(result)
656    }
657}
658
659fn extract_register_topic_config_from_request(
660    request: &RemotingCommand,
661) -> TopicConfigAndMappingSerializeWrapper {
662    if let Some(body_inner) = request.body() {
663        if body_inner.is_empty() {
664            return TopicConfigAndMappingSerializeWrapper::default();
665        }
666        return SerdeJsonUtils::decode::<TopicConfigAndMappingSerializeWrapper>(
667            body_inner.as_ref(),
668        )
669        .expect("decode TopicConfigAndMappingSerializeWrapper failed");
670    }
671    TopicConfigAndMappingSerializeWrapper::default()
672}
673
674fn extract_register_broker_body_from_request(
675    request: &RemotingCommand,
676    request_header: &RegisterBrokerRequestHeader,
677) -> RegisterBrokerBody {
678    if let Some(body_inner) = request.body() {
679        if body_inner.is_empty() {
680            return RegisterBrokerBody::default();
681        }
682        let version = request.rocketmq_version();
683        return RegisterBrokerBody::decode(body_inner, request_header.compressed, version);
684    }
685    RegisterBrokerBody::default()
686}
687
688fn check_sum_crc32(
689    request: &RemotingCommand,
690    request_header: &RegisterBrokerRequestHeader,
691) -> bool {
692    if request_header.body_crc32 == 0 {
693        return true;
694    }
695    if let Some(bytes) = request.get_body() {
696        let crc_32 = CRC32Utils::crc32(bytes.as_ref());
697        if crc_32 != request_header.body_crc32 {
698            warn!(
699                "receive registerBroker request,crc32 not match,origin:{}, cal:{}",
700                request_header.body_crc32, crc_32,
701            );
702            return false;
703        }
704    }
705    true
706}
707
708fn validate_blacklist_config_exist(
709    properties: &HashMap<CheetahString, CheetahString>,
710    config_blacklist: &[CheetahString],
711) -> bool {
712    for black_config in config_blacklist {
713        if properties.contains_key(black_config.as_str()) {
714            return true;
715        }
716    }
717    false
718}
719
#[cfg(test)]
mod tests {
    use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerRequestHeader;

    use super::*;

    /// Smoke test: a command built with `set_body` and an empty payload is
    /// accepted by the topic-config extractor.
    #[test]
    fn extract_register_topic_config_from_request_with_body() {
        let payload = vec![];
        let command =
            RemotingCommand::create_remoting_command(RequestCode::RegisterTopicInNamesrv)
                .set_body(payload);

        let _wrapper = extract_register_topic_config_from_request(&command);
    }

    /// Smoke test: a request created with an empty payload is accepted by the
    /// topic-config extractor.
    #[test]
    fn extract_register_topic_config_from_request_without_body() {
        let command = RemotingCommand::new_request(0, vec![]);
        let _wrapper = extract_register_topic_config_from_request(&command);
    }

    /// Smoke test: the broker-body extractor accepts an explicitly typed empty
    /// payload together with a default header.
    #[test]
    fn extract_register_broker_body_from_request_with_body() {
        let payload: Vec<u8> = vec![];
        let command = RemotingCommand::new_request(0, payload);
        let header = RegisterBrokerRequestHeader::default();
        let _broker_body = extract_register_broker_body_from_request(&command, &header);
    }

    /// Smoke test: the broker-body extractor accepts a request created with an
    /// empty payload and a default header.
    #[test]
    fn extract_register_broker_body_from_request_without_body() {
        let command = RemotingCommand::new_request(0, vec![]);
        let header = RegisterBrokerRequestHeader::default();
        let _broker_body = extract_register_broker_body_from_request(&command, &header);
    }

    /// A header checksum computed from the payload itself must pass.
    #[test]
    fn check_sum_crc32_valid_crc() {
        let payload = vec![];
        let checksum = CRC32Utils::crc32(&payload);
        let command = RemotingCommand::new_request(0, payload);
        let mut header = RegisterBrokerRequestHeader::default();
        header.body_crc32 = checksum;
        assert!(check_sum_crc32(&command, &header));
    }

    /// An arbitrary header checksum that does not belong to the payload must
    /// be rejected.
    #[test]
    fn check_sum_crc32_invalid_crc() {
        let payload = vec![];
        let command = RemotingCommand::new_request(0, payload);
        let mut header = RegisterBrokerRequestHeader::default();
        header.body_crc32 = 12345;
        assert!(!check_sum_crc32(&command, &header));
    }

    /// A header checksum of zero disables verification and must pass.
    #[test]
    fn check_sum_crc32_zero_crc() {
        let payload = vec![];
        let command = RemotingCommand::new_request(0, payload);
        let mut header = RegisterBrokerRequestHeader::default();
        header.body_crc32 = 0;
        assert!(check_sum_crc32(&command, &header));
    }
}