rocketmq_namesrv/processor/
client_request_processor.rs1use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering;
20use std::time::Duration;
21
22use cheetah_string::CheetahString;
23use rocketmq_common::common::FAQUrl;
24use rocketmq_common::TimeUtils;
25use rocketmq_remoting::code::request_code::RequestCode;
26use rocketmq_remoting::code::response_code::RemotingSysResponseCode;
27use rocketmq_remoting::code::response_code::ResponseCode;
28use rocketmq_remoting::net::channel::Channel;
29use rocketmq_remoting::protocol::header::client_request_header::GetRouteInfoRequestHeader;
30use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
31use rocketmq_remoting::protocol::RemotingSerializable;
32use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
33use rocketmq_rust::ArcMut;
34use tracing::warn;
35
36use crate::bootstrap::NameServerRuntimeInner;
37use crate::processor::NAMESPACE_ORDER_TOPIC_CONFIG;
38
39pub struct ClientRequestProcessor {
40 name_server_runtime_inner: ArcMut<NameServerRuntimeInner>,
41 need_check_namesrv_ready: AtomicBool,
42 startup_time_millis: u64,
43}
44
45impl ClientRequestProcessor {
46 pub(crate) fn new(name_server_runtime_inner: ArcMut<NameServerRuntimeInner>) -> Self {
47 Self {
48 need_check_namesrv_ready: AtomicBool::new(true),
49 startup_time_millis: TimeUtils::get_current_millis(),
50 name_server_runtime_inner,
51 }
52 }
53
54 fn get_route_info_by_topic(
55 &self,
56 request: RemotingCommand,
57 ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
58 let request_header = request.decode_command_custom_header::<GetRouteInfoRequestHeader>()?;
59 let namesrv_ready = self.need_check_namesrv_ready.load(Ordering::Relaxed)
60 && TimeUtils::get_current_millis() - self.startup_time_millis
61 >= Duration::from_secs(
62 self.name_server_runtime_inner
63 .name_server_config()
64 .wait_seconds_for_service as u64,
65 )
66 .as_millis() as u64;
67 if self
68 .name_server_runtime_inner
69 .name_server_config()
70 .need_wait_for_service
71 && !namesrv_ready
72 {
73 warn!(
74 "name remoting_server not ready. request code {} ",
75 request.code()
76 );
77 return Ok(Some(
78 RemotingCommand::create_response_command_with_code(
79 RemotingSysResponseCode::SystemError,
80 )
81 .set_remark("name remoting_server not ready"),
82 ));
83 }
84 match self
85 .name_server_runtime_inner
86 .route_info_manager()
87 .pickup_topic_route_data(request_header.topic.as_ref())
88 {
89 None => Ok(Some(
90 RemotingCommand::create_response_command_with_code(ResponseCode::TopicNotExist)
91 .set_remark(format!(
92 "No topic route info in name remoting_server for the topic:{}{}",
93 request_header.topic,
94 FAQUrl::suggest_todo(FAQUrl::APPLY_TOPIC_URL)
95 )),
96 )),
97 Some(mut topic_route_data) => {
98 if self.need_check_namesrv_ready.load(Ordering::Acquire) {
99 self.need_check_namesrv_ready
100 .store(false, Ordering::Release);
101 }
102 if self
103 .name_server_runtime_inner
104 .name_server_config()
105 .order_message_enable
106 {
107 let order_topic_config = self
109 .name_server_runtime_inner
110 .kvconfig_manager()
111 .get_kvconfig(
112 &CheetahString::from_static_str(NAMESPACE_ORDER_TOPIC_CONFIG),
113 &request_header.topic,
114 );
115 topic_route_data.order_topic_conf = order_topic_config;
116 };
117 let content = topic_route_data.encode()?;
127 Ok(Some(
128 RemotingCommand::create_response_command_with_code(ResponseCode::Success)
129 .set_body(content),
130 ))
131 }
132 }
133 }
134}
135
136impl ClientRequestProcessor {
137 pub fn process_request(
138 &mut self,
139 _channel: Channel,
140 _ctx: ConnectionHandlerContext,
141 _request_code: RequestCode,
142 request: RemotingCommand,
143 ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
144 self.get_route_info_by_topic(request)
145 }
146}