1use core::str;
19use std::collections::HashMap;
20use std::net::SocketAddr;
21
22use cheetah_string::CheetahString;
23use rocketmq_common::common::mix_all;
24use rocketmq_common::common::mix_all::string_to_properties;
25use rocketmq_common::common::mq_version::RocketMqVersion;
26use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
27use rocketmq_common::CRC32Utils;
28use rocketmq_remoting::code::request_code::RequestCode;
29use rocketmq_remoting::code::response_code::RemotingSysResponseCode;
30use rocketmq_remoting::code::response_code::ResponseCode;
31use rocketmq_remoting::net::channel::Channel;
32use rocketmq_remoting::protocol::body::broker_body::broker_member_group::GetBrokerMemberGroupResponseBody;
33use rocketmq_remoting::protocol::body::broker_body::register_broker_body::RegisterBrokerBody;
34use rocketmq_remoting::protocol::body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper;
35use rocketmq_remoting::protocol::header::namesrv::broker_request::BrokerHeartbeatRequestHeader;
36use rocketmq_remoting::protocol::header::namesrv::broker_request::GetBrokerMemberGroupRequestHeader;
37use rocketmq_remoting::protocol::header::namesrv::broker_request::UnRegisterBrokerRequestHeader;
38use rocketmq_remoting::protocol::header::namesrv::kv_config_header::DeleteKVConfigRequestHeader;
39use rocketmq_remoting::protocol::header::namesrv::kv_config_header::GetKVConfigRequestHeader;
40use rocketmq_remoting::protocol::header::namesrv::kv_config_header::GetKVConfigResponseHeader;
41use rocketmq_remoting::protocol::header::namesrv::kv_config_header::GetKVListByNamespaceRequestHeader;
42use rocketmq_remoting::protocol::header::namesrv::kv_config_header::PutKVConfigRequestHeader;
43use rocketmq_remoting::protocol::header::namesrv::perm_broker_header::AddWritePermOfBrokerRequestHeader;
44use rocketmq_remoting::protocol::header::namesrv::perm_broker_header::AddWritePermOfBrokerResponseHeader;
45use rocketmq_remoting::protocol::header::namesrv::perm_broker_header::WipeWritePermOfBrokerRequestHeader;
46use rocketmq_remoting::protocol::header::namesrv::perm_broker_header::WipeWritePermOfBrokerResponseHeader;
47use rocketmq_remoting::protocol::header::namesrv::query_data_version_header::QueryDataVersionRequestHeader;
48use rocketmq_remoting::protocol::header::namesrv::query_data_version_header::QueryDataVersionResponseHeader;
49use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerRequestHeader;
50use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerResponseHeader;
51use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::DeleteTopicFromNamesrvRequestHeader;
52use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::GetTopicsByClusterRequestHeader;
53use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::RegisterTopicRequestHeader;
54use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
55use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
56use rocketmq_remoting::protocol::DataVersion;
57use rocketmq_remoting::protocol::RemotingDeserializable;
58use rocketmq_remoting::protocol::RemotingSerializable;
59use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
60use rocketmq_rust::ArcMut;
61use tracing::warn;
62
63use crate::bootstrap::NameServerRuntimeInner;
64use crate::processor::NAMESPACE_ORDER_TOPIC_CONFIG;
65
66pub struct DefaultRequestProcessor {
67 name_server_runtime_inner: ArcMut<NameServerRuntimeInner>,
68}
69
70impl DefaultRequestProcessor {
71 pub fn process_request(
72 &mut self,
73 channel: Channel,
74 _ctx: ConnectionHandlerContext,
75 request_code: RequestCode,
76 request: RemotingCommand,
77 ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
78 let response = match request_code {
79 RequestCode::PutKvConfig => self.put_kv_config(request),
80 RequestCode::GetKvConfig => self.get_kv_config(request),
81 RequestCode::DeleteKvConfig => self.delete_kv_config(request),
82 RequestCode::QueryDataVersion => self.query_broker_topic_config(request),
83 RequestCode::RegisterBroker => {
85 self.process_register_broker(channel.remote_address(), request)
86 }
87 RequestCode::UnregisterBroker => self.process_unregister_broker(request),
88 RequestCode::BrokerHeartbeat => self.process_broker_heartbeat(request),
89 RequestCode::GetBrokerMemberGroup => self.get_broker_member_group(request),
90 RequestCode::GetBrokerClusterInfo => self.get_broker_cluster_info(request),
92 RequestCode::WipeWritePermOfBroker => self.wipe_write_perm_of_broker(request),
93 RequestCode::AddWritePermOfBroker => self.add_write_perm_of_broker(request),
94 RequestCode::GetAllTopicListFromNameserver => {
95 self.get_all_topic_list_from_nameserver(request)
96 }
97 RequestCode::DeleteTopicInNamesrv => self.delete_topic_in_name_srv(request),
98 RequestCode::RegisterTopicInNamesrv => self.register_topic_to_name_srv(request),
99 RequestCode::GetKvlistByNamespace => self.get_kv_list_by_namespace(request),
100 RequestCode::GetTopicsByCluster => self.get_topics_by_cluster(request),
101 RequestCode::GetSystemTopicListFromNs => self.get_system_topic_list_from_ns(request),
102 RequestCode::GetUnitTopicList => self.get_unit_topic_list(request),
103 RequestCode::GetHasUnitSubTopicList => self.get_has_unit_sub_topic_list(request),
104 RequestCode::GetHasUnitSubUnunitTopicList => {
105 self.get_has_unit_sub_un_unit_topic_list(request)
106 }
107 RequestCode::UpdateNamesrvConfig => self.update_config(request),
108 RequestCode::GetNamesrvConfig => self.get_config(request),
109 _ => Ok(RemotingCommand::create_response_command_with_code(
110 RemotingSysResponseCode::SystemError,
111 )),
112 }?;
113 Ok(Some(response))
114 }
115}
116
117impl DefaultRequestProcessor {
119 fn put_kv_config(
120 &mut self,
121 request: RemotingCommand,
122 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
123 let request_header = request.decode_command_custom_header::<PutKVConfigRequestHeader>()?;
124 if request_header.namespace.is_empty() || request_header.key.is_empty() {
126 return Ok(RemotingCommand::create_response_command_with_code(
127 RemotingSysResponseCode::SystemError,
128 )
129 .set_remark(CheetahString::from_static_str("namespace or key is empty")));
130 }
131
132 self.name_server_runtime_inner
133 .kvconfig_manager_mut()
134 .put_kv_config(
135 request_header.namespace,
136 request_header.key,
137 request_header.value,
138 );
139 Ok(RemotingCommand::create_response_command())
140 }
141
142 fn get_kv_config(
143 &self,
144 request: RemotingCommand,
145 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
146 let request_header = request.decode_command_custom_header::<GetKVConfigRequestHeader>()?;
147
148 let value = self
149 .name_server_runtime_inner
150 .kvconfig_manager()
151 .get_kvconfig(&request_header.namespace, &request_header.key);
152
153 if value.is_some() {
154 return Ok(RemotingCommand::create_response_command()
155 .set_command_custom_header(GetKVConfigResponseHeader::new(value)));
156 }
157 Ok(
158 RemotingCommand::create_response_command_with_code(ResponseCode::QueryNotFound)
159 .set_remark(format!(
160 "No config item, Namespace: {} Key: {}",
161 request_header.namespace, request_header.key
162 )),
163 )
164 }
165
166 fn delete_kv_config(
167 &mut self,
168 request: RemotingCommand,
169 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
170 let request_header =
171 request.decode_command_custom_header::<DeleteKVConfigRequestHeader>()?;
172
173 self.name_server_runtime_inner
174 .kvconfig_manager_mut()
175 .delete_kv_config(&request_header.namespace, &request_header.key);
176 Ok(RemotingCommand::create_response_command())
177 }
178
179 fn query_broker_topic_config(
180 &mut self,
181 request: RemotingCommand,
182 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
183 let request_header =
184 request.decode_command_custom_header::<QueryDataVersionRequestHeader>()?;
185 let data_version = DataVersion::decode(request.get_body().expect("body is empty"))
186 .expect("decode DataVersion failed");
187 let changed = self
188 .name_server_runtime_inner
189 .route_info_manager()
190 .is_broker_topic_config_changed(
191 &request_header.cluster_name,
192 &request_header.broker_addr,
193 &data_version,
194 );
195
196 self.name_server_runtime_inner
197 .route_info_manager_mut()
198 .update_broker_info_update_timestamp(
199 request_header.cluster_name.clone(),
200 request_header.broker_addr.clone(),
201 );
202 let mut command = RemotingCommand::create_response_command()
203 .set_command_custom_header(QueryDataVersionResponseHeader::new(changed));
204 if let Some(value) = self
205 .name_server_runtime_inner
206 .route_info_manager()
207 .query_broker_topic_config(request_header.cluster_name, request_header.broker_addr)
208 {
209 command = command.set_body(value.encode().expect("encode DataVersion failed"));
210 }
211 Ok(command)
212 }
213}
214
215#[allow(clippy::new_without_default)]
216impl DefaultRequestProcessor {
217 pub(crate) fn new(name_server_runtime_inner: ArcMut<NameServerRuntimeInner>) -> Self {
218 Self {
219 name_server_runtime_inner,
220 }
221 }
222}
223impl DefaultRequestProcessor {
224 fn process_register_broker(
225 &mut self,
226 remote_addr: SocketAddr,
227 request: RemotingCommand,
228 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
229 let request_header =
230 request.decode_command_custom_header::<RegisterBrokerRequestHeader>()?;
231 if !check_sum_crc32(&request, &request_header) {
232 return Ok(RemotingCommand::create_response_command_with_code(
233 RemotingSysResponseCode::SystemError,
234 )
235 .set_remark(CheetahString::from_static_str("crc32 not match")));
236 }
237
238 let mut response_command = RemotingCommand::create_response_command();
239 let broker_version =
240 RocketMqVersion::try_from(request.version() as u32).expect("invalid version");
241 let topic_config_wrapper;
242 let mut filter_server_list = Vec::new();
243 if broker_version >= RocketMqVersion::V3_0_11 {
244 let register_broker_body =
245 extract_register_broker_body_from_request(&request, &request_header);
246 topic_config_wrapper = register_broker_body.topic_config_serialize_wrapper;
247 filter_server_list = register_broker_body.filter_server_list;
248 } else {
249 topic_config_wrapper = extract_register_topic_config_from_request(&request);
250 }
251 let result = self
252 .name_server_runtime_inner
253 .route_info_manager()
254 .register_broker(
255 request_header.cluster_name,
256 request_header.broker_addr,
257 request_header.broker_name,
258 request_header.broker_id,
259 request_header.ha_server_addr,
260 request
261 .ext_fields()
262 .and_then(|map| map.get(mix_all::ZONE_NAME).cloned()),
263 request_header.heartbeat_timeout_millis,
264 request_header.enable_acting_master,
265 topic_config_wrapper,
266 filter_server_list,
267 remote_addr,
268 );
269 if result.is_none() {
270 return Ok(response_command
271 .set_code(RemotingSysResponseCode::SystemError)
272 .set_remark(CheetahString::from_static_str("register broker failed")));
273 }
274 if self
275 .name_server_runtime_inner
276 .name_server_config()
277 .return_order_topic_config_to_broker
278 {
279 if let Some(value) = self
280 .name_server_runtime_inner
281 .kvconfig_manager()
282 .get_kv_list_by_namespace(&CheetahString::from_static_str(
283 NAMESPACE_ORDER_TOPIC_CONFIG,
284 ))
285 {
286 response_command = response_command.set_body(value);
287 }
288 }
289 let register_broker_result = result.unwrap();
290 Ok(response_command
291 .set_code(RemotingSysResponseCode::Success)
292 .set_command_custom_header(RegisterBrokerResponseHeader::new(
293 Some(register_broker_result.ha_server_addr),
294 Some(register_broker_result.master_addr),
295 )))
296 }
297
298 fn process_unregister_broker(
299 &mut self,
300 request: RemotingCommand,
301 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
302 let request_header =
303 request.decode_command_custom_header::<UnRegisterBrokerRequestHeader>()?;
304 if !self
308 .name_server_runtime_inner
309 .route_info_manager()
310 .submit_unregister_broker_request(request_header)
311 {
312 warn!("Couldn't submit the unregister broker request to handler");
313 return Ok(RemotingCommand::create_response_command_with_code(
314 ResponseCode::SystemError,
315 ));
316 }
317 Ok(RemotingCommand::create_response_command())
318 }
319}
320
321impl DefaultRequestProcessor {
322 fn process_broker_heartbeat(
323 &mut self,
324 request: RemotingCommand,
325 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
326 let request_header =
327 request.decode_command_custom_header::<BrokerHeartbeatRequestHeader>()?;
328 self.name_server_runtime_inner
329 .route_info_manager_mut()
330 .update_broker_info_update_timestamp(
331 request_header.cluster_name,
332 request_header.broker_addr,
333 );
334 Ok(RemotingCommand::create_response_command())
335 }
336
337 fn get_broker_member_group(
338 &mut self,
339 request: RemotingCommand,
340 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
341 let request_header =
342 request.decode_command_custom_header::<GetBrokerMemberGroupRequestHeader>()?;
343
344 let broker_member_group = self
345 .name_server_runtime_inner
346 .route_info_manager_mut()
347 .get_broker_member_group(&request_header.cluster_name, &request_header.broker_name);
348 let response_body = GetBrokerMemberGroupResponseBody {
349 broker_member_group,
350 };
351 let body = response_body.encode()?;
352 Ok(RemotingCommand::create_response_command().set_body(body))
353 }
354
355 fn get_broker_cluster_info(
356 &self,
357 _request: RemotingCommand,
358 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
359 let vec = self
360 .name_server_runtime_inner
361 .route_info_manager()
362 .get_all_cluster_info()
363 .encode()?;
364 Ok(
365 RemotingCommand::create_response_command_with_code(RemotingSysResponseCode::Success)
366 .set_body(vec),
367 )
368 }
369
370 fn wipe_write_perm_of_broker(
371 &mut self,
372 request: RemotingCommand,
373 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
374 let request_header =
375 request.decode_command_custom_header::<WipeWritePermOfBrokerRequestHeader>()?;
376 let wipe_topic_cnt = self
377 .name_server_runtime_inner
378 .route_info_manager_mut()
379 .wipe_write_perm_of_broker_by_lock(&request_header.broker_name);
380 Ok(RemotingCommand::create_response_command()
381 .set_command_custom_header(WipeWritePermOfBrokerResponseHeader::new(wipe_topic_cnt)))
382 }
383
384 fn add_write_perm_of_broker(
385 &mut self,
386 request: RemotingCommand,
387 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
388 let request_header =
389 request.decode_command_custom_header::<AddWritePermOfBrokerRequestHeader>()?;
390 let add_topic_cnt = self
391 .name_server_runtime_inner
392 .route_info_manager_mut()
393 .add_write_perm_of_broker_by_lock(&request_header.broker_name);
394 Ok(RemotingCommand::create_response_command()
395 .set_command_custom_header(AddWritePermOfBrokerResponseHeader::new(add_topic_cnt)))
396 }
397
398 fn get_all_topic_list_from_nameserver(
399 &self,
400 _request: RemotingCommand,
401 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
402 if self
403 .name_server_runtime_inner
404 .name_server_config()
405 .enable_all_topic_list
406 {
407 let topics = self
408 .name_server_runtime_inner
409 .route_info_manager()
410 .get_all_topic_list();
411 let body = topics.encode()?;
412 return Ok(RemotingCommand::create_response_command().set_body(body));
413 }
414 Ok(
415 RemotingCommand::create_response_command_with_code(
416 RemotingSysResponseCode::SystemError,
417 )
418 .set_remark(CheetahString::from_static_str("disable")),
419 )
420 }
421
422 fn delete_topic_in_name_srv(
423 &mut self,
424 request: RemotingCommand,
425 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
426 let request_header =
427 request.decode_command_custom_header::<DeleteTopicFromNamesrvRequestHeader>()?;
428 self.name_server_runtime_inner
429 .route_info_manager_mut()
430 .delete_topic(request_header.topic, request_header.cluster_name);
431 Ok(RemotingCommand::create_response_command())
432 }
433
434 fn register_topic_to_name_srv(
435 &mut self,
436 request: RemotingCommand,
437 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
438 let request_header =
439 request.decode_command_custom_header::<RegisterTopicRequestHeader>()?;
440 if let Some(ref body) = request.body() {
441 let topic_route_data = TopicRouteData::decode(body).unwrap_or_default();
442 if !topic_route_data.queue_datas.is_empty() {
443 self.name_server_runtime_inner
444 .route_info_manager_mut()
445 .register_topic(request_header.topic, topic_route_data.queue_datas)
446 }
447 }
448 Ok(RemotingCommand::create_response_command())
449 }
450
451 fn get_kv_list_by_namespace(
452 &self,
453 request: RemotingCommand,
454 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
455 let request_header =
456 request.decode_command_custom_header::<GetKVListByNamespaceRequestHeader>()?;
457 let value = self
458 .name_server_runtime_inner
459 .kvconfig_manager()
460 .get_kv_list_by_namespace(&request_header.namespace);
461 if let Some(value) = value {
462 return Ok(RemotingCommand::create_response_command().set_body(value));
463 }
464 Ok(
465 RemotingCommand::create_response_command_with_code(ResponseCode::QueryNotFound)
466 .set_remark(format!(
467 "No config item, Namespace: {}",
468 request_header.namespace.as_str()
469 )),
470 )
471 }
472
473 fn get_topics_by_cluster(
474 &self,
475 request: RemotingCommand,
476 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
477 if !self
478 .name_server_runtime_inner
479 .name_server_config()
480 .enable_topic_list
481 {
482 return Ok(RemotingCommand::create_response_command_with_code(
483 RemotingSysResponseCode::SystemError,
484 )
485 .set_remark(CheetahString::from_static_str("disable")));
486 }
487
488 let request_header =
489 request.decode_command_custom_header::<GetTopicsByClusterRequestHeader>()?;
490 let topics_by_cluster = self
491 .name_server_runtime_inner
492 .route_info_manager()
493 .get_topics_by_cluster(&request_header.cluster);
494 let body = topics_by_cluster.encode()?;
495 Ok(RemotingCommand::create_response_command().set_body(body))
496 }
497
498 fn get_system_topic_list_from_ns(
499 &self,
500 _request: RemotingCommand,
501 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
502 let topic_list = self
503 .name_server_runtime_inner
504 .route_info_manager()
505 .get_system_topic_list();
506 let body = topic_list.encode()?;
507 Ok(RemotingCommand::create_response_command().set_body(body))
508 }
509
510 fn get_unit_topic_list(
511 &self,
512 _request: RemotingCommand,
513 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
514 if self
515 .name_server_runtime_inner
516 .name_server_config()
517 .enable_topic_list
518 {
519 let topic_list = self
520 .name_server_runtime_inner
521 .route_info_manager()
522 .get_unit_topics();
523 let body = topic_list.encode()?;
524 return Ok(RemotingCommand::create_response_command().set_body(body));
525 }
526 Ok(
527 RemotingCommand::create_response_command_with_code(
528 RemotingSysResponseCode::SystemError,
529 )
530 .set_remark("disable"),
531 )
532 }
533
534 fn get_has_unit_sub_topic_list(
535 &self,
536 _request: RemotingCommand,
537 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
538 if self
539 .name_server_runtime_inner
540 .name_server_config()
541 .enable_topic_list
542 {
543 let topic_list = self
544 .name_server_runtime_inner
545 .route_info_manager()
546 .get_has_unit_sub_topic_list();
547 let body = topic_list.encode()?;
548 return Ok(RemotingCommand::create_response_command().set_body(body));
549 }
550 Ok(
551 RemotingCommand::create_response_command_with_code(
552 RemotingSysResponseCode::SystemError,
553 )
554 .set_remark("disable"),
555 )
556 }
557
558 fn get_has_unit_sub_un_unit_topic_list(
559 &self,
560 _request: RemotingCommand,
561 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
562 if self
563 .name_server_runtime_inner
564 .name_server_config()
565 .enable_topic_list
566 {
567 let topic_list = self
568 .name_server_runtime_inner
569 .route_info_manager()
570 .get_has_unit_sub_un_unit_topic_list();
571 return Ok(RemotingCommand::create_response_command()
572 .set_body(topic_list.encode().expect("encode TopicList failed")));
573 }
574 Ok(
575 RemotingCommand::create_response_command_with_code(
576 RemotingSysResponseCode::SystemError,
577 )
578 .set_remark("disable"),
579 )
580 }
581
582 fn update_config(
583 &mut self,
584 request: RemotingCommand,
585 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
586 if let Some(body) = request.body() {
587 let body_str = match str::from_utf8(body) {
588 Ok(s) => s,
589 Err(e) => {
590 return Ok(RemotingCommand::create_response_command_with_code(
591 RemotingSysResponseCode::SystemError,
592 )
593 .set_remark(format!("UnsupportedEncodingException {e:?}")));
594 }
595 };
596
597 let properties = match string_to_properties(body_str) {
598 Some(props) => props,
599 None => {
600 return Ok(RemotingCommand::create_response_command_with_code(
601 RemotingSysResponseCode::SystemError,
602 )
603 .set_remark("string_to_properties error".to_string()));
604 }
605 };
606 if validate_blacklist_config_exist(
607 &properties,
608 &self
609 .name_server_runtime_inner
610 .name_server_config()
611 .get_config_blacklist(),
612 ) {
613 return Ok(RemotingCommand::create_response_command_with_code(
614 RemotingSysResponseCode::NoPermission,
615 )
616 .set_remark("Cannot update config in blacklist.".to_string()));
617 }
618
619 let result = self
620 .name_server_runtime_inner
621 .kvconfig_manager_mut()
622 .update_namesrv_config(properties);
623 if let Err(e) = result {
624 return Ok(RemotingCommand::create_response_command_with_code(
625 RemotingSysResponseCode::SystemError,
626 )
627 .set_remark(format!("Update error {e:?}")));
628 }
629 }
630
631 Ok(
632 RemotingCommand::create_response_command_with_code(RemotingSysResponseCode::Success)
633 .set_remark(CheetahString::empty()),
634 )
635 }
636
637 fn get_config(
638 &mut self,
639 _request: RemotingCommand,
640 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
641 let config = self.name_server_runtime_inner.name_server_config();
642 let result = match config.get_all_configs_format_string() {
643 Ok(content) => {
644 let response = RemotingCommand::create_response_command_with_code_remark(
645 RemotingSysResponseCode::Success,
646 CheetahString::empty(),
647 );
648 response.set_body(content.into_bytes())
649 }
650 Err(e) => RemotingCommand::create_response_command_with_code_remark(
651 ResponseCode::SystemError,
652 format!("UnsupportedEncodingException {e}"),
653 ),
654 };
655 Ok(result)
656 }
657}
658
659fn extract_register_topic_config_from_request(
660 request: &RemotingCommand,
661) -> TopicConfigAndMappingSerializeWrapper {
662 if let Some(body_inner) = request.body() {
663 if body_inner.is_empty() {
664 return TopicConfigAndMappingSerializeWrapper::default();
665 }
666 return SerdeJsonUtils::decode::<TopicConfigAndMappingSerializeWrapper>(
667 body_inner.as_ref(),
668 )
669 .expect("decode TopicConfigAndMappingSerializeWrapper failed");
670 }
671 TopicConfigAndMappingSerializeWrapper::default()
672}
673
674fn extract_register_broker_body_from_request(
675 request: &RemotingCommand,
676 request_header: &RegisterBrokerRequestHeader,
677) -> RegisterBrokerBody {
678 if let Some(body_inner) = request.body() {
679 if body_inner.is_empty() {
680 return RegisterBrokerBody::default();
681 }
682 let version = request.rocketmq_version();
683 return RegisterBrokerBody::decode(body_inner, request_header.compressed, version);
684 }
685 RegisterBrokerBody::default()
686}
687
688fn check_sum_crc32(
689 request: &RemotingCommand,
690 request_header: &RegisterBrokerRequestHeader,
691) -> bool {
692 if request_header.body_crc32 == 0 {
693 return true;
694 }
695 if let Some(bytes) = request.get_body() {
696 let crc_32 = CRC32Utils::crc32(bytes.as_ref());
697 if crc_32 != request_header.body_crc32 {
698 warn!(
699 "receive registerBroker request,crc32 not match,origin:{}, cal:{}",
700 request_header.body_crc32, crc_32,
701 );
702 return false;
703 }
704 }
705 true
706}
707
708fn validate_blacklist_config_exist(
709 properties: &HashMap<CheetahString, CheetahString>,
710 config_blacklist: &[CheetahString],
711) -> bool {
712 for black_config in config_blacklist {
713 if properties.contains_key(black_config.as_str()) {
714 return true;
715 }
716 }
717 false
718}
719
720#[cfg(test)]
721mod tests {
722 use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerRequestHeader;
723
724 use super::*;
725
726 #[test]
727 fn extract_register_topic_config_from_request_with_body() {
728 let body = vec![];
729 let request = RemotingCommand::create_remoting_command(RequestCode::RegisterTopicInNamesrv)
730 .set_body(body);
731
732 let _result = extract_register_topic_config_from_request(&request);
733 }
734
735 #[test]
736 fn extract_register_topic_config_from_request_without_body() {
737 let request = RemotingCommand::new_request(0, vec![]);
738 let _result = extract_register_topic_config_from_request(&request);
739 }
740
741 #[test]
742 fn extract_register_broker_body_from_request_with_body() {
743 let body: Vec<u8> = vec![];
744 let request = RemotingCommand::new_request(0, body);
745 let request_header = RegisterBrokerRequestHeader::default();
746 let _result = extract_register_broker_body_from_request(&request, &request_header);
747 }
748
749 #[test]
750 fn extract_register_broker_body_from_request_without_body() {
751 let request = RemotingCommand::new_request(0, vec![]);
752 let request_header = RegisterBrokerRequestHeader::default();
753 let _result = extract_register_broker_body_from_request(&request, &request_header);
754 }
755
756 #[test]
757 fn check_sum_crc32_valid_crc() {
758 let body = vec![];
759 let crc32 = CRC32Utils::crc32(&body);
760 let request = RemotingCommand::new_request(0, body);
761 let mut request_header = RegisterBrokerRequestHeader::default();
762 request_header.body_crc32 = crc32;
763 let result = check_sum_crc32(&request, &request_header);
764 assert!(result);
765 }
766
767 #[test]
768 fn check_sum_crc32_invalid_crc() {
769 let body = vec![];
770 let request = RemotingCommand::new_request(0, body);
771 let mut request_header = RegisterBrokerRequestHeader::default();
772 request_header.body_crc32 = 12345; let result = check_sum_crc32(&request, &request_header);
774 assert!(!result);
775 }
776
777 #[test]
778 fn check_sum_crc32_zero_crc() {
779 let body = vec![];
780 let request = RemotingCommand::new_request(0, body);
781 let mut request_header = RegisterBrokerRequestHeader::default();
782 request_header.body_crc32 = 0;
783 let result = check_sum_crc32(&request, &request_header);
784 assert!(result);
785 }
786}