rocketmq_namesrv/processor/
client_request_processor.rs1use std::sync::atomic::AtomicBool;
18use std::sync::atomic::Ordering;
19
20use cheetah_string::CheetahString;
21use rocketmq_common::common::FAQUrl;
22use rocketmq_common::TimeUtils;
23use rocketmq_remoting::code::request_code::RequestCode;
24use rocketmq_remoting::code::response_code::RemotingSysResponseCode;
25use rocketmq_remoting::code::response_code::ResponseCode;
26use rocketmq_remoting::net::channel::Channel;
27use rocketmq_remoting::protocol::header::client_request_header::GetRouteInfoRequestHeader;
28use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
29use rocketmq_remoting::protocol::RemotingSerializable;
30use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
31use rocketmq_remoting::runtime::processor::RequestProcessor;
32use rocketmq_rust::ArcMut;
33use tracing::info;
34use tracing::warn;
35
36use crate::bootstrap::NameServerRuntimeInner;
37use crate::processor::NAMESPACE_ORDER_TOPIC_CONFIG;
38
39pub struct ClientRequestProcessor {
41 name_server_runtime_inner: ArcMut<NameServerRuntimeInner>,
42 need_check_namesrv_ready: AtomicBool,
43 startup_time_millis: u64,
44 wait_seconds_millis: u64,
46 need_wait_for_service: bool,
47 order_message_enable: bool,
48}
49
50impl RequestProcessor for ClientRequestProcessor {
51 #[inline]
52 async fn process_request(
53 &mut self,
54 _channel: Channel,
55 _ctx: ConnectionHandlerContext,
56 request: &mut RemotingCommand,
57 ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
58 let request_code = RequestCode::from(request.code());
59 info!(
60 "Name server ClientRequestProcessor Received request code: {:?}",
61 request_code
62 );
63
64 self.get_route_info_by_topic(request)
65 }
66}
67
68impl ClientRequestProcessor {
69 pub(crate) fn new(name_server_runtime_inner: ArcMut<NameServerRuntimeInner>) -> Self {
70 let config = name_server_runtime_inner.name_server_config();
71 let wait_seconds_millis = config.wait_seconds_for_service as u64 * 1000;
72 let need_wait_for_service = config.need_wait_for_service;
73 let order_message_enable = config.order_message_enable;
74
75 Self {
76 need_check_namesrv_ready: AtomicBool::new(true),
77 startup_time_millis: TimeUtils::get_current_millis(),
78 wait_seconds_millis,
79 need_wait_for_service,
80 order_message_enable,
81 name_server_runtime_inner,
82 }
83 }
84
85 #[inline]
87 fn get_route_info_by_topic(
88 &self,
89 request: &mut RemotingCommand,
90 ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
91 let request_header = request.decode_command_custom_header::<GetRouteInfoRequestHeader>()?;
92
93 if self.need_wait_for_service {
95 let elapsed_millis =
96 TimeUtils::get_current_millis().saturating_sub(self.startup_time_millis);
97 let namesrv_ready = !self.need_check_namesrv_ready.load(Ordering::Relaxed)
98 || elapsed_millis >= self.wait_seconds_millis;
99
100 if !namesrv_ready {
101 warn!("name server not ready. request code {}", request.code());
102 return Ok(Some(
103 RemotingCommand::create_response_command_with_code(
104 RemotingSysResponseCode::SystemError,
105 )
106 .set_remark("name server not ready"),
107 ));
108 }
109 }
110
111 let mut topic_route_data = match self
113 .name_server_runtime_inner
114 .route_info_manager()
115 .pickup_topic_route_data(request_header.topic.as_ref())
116 {
117 Some(data) => data,
118 None => {
119 return Ok(Some(
120 RemotingCommand::create_response_command_with_code(ResponseCode::TopicNotExist)
121 .set_remark(format!(
122 "No topic route info in name server for the topic: {}{}",
123 request_header.topic,
124 FAQUrl::suggest_todo(FAQUrl::APPLY_TOPIC_URL)
125 )),
126 ));
127 }
128 };
129
130 if self.need_check_namesrv_ready.load(Ordering::Relaxed) {
131 self.need_check_namesrv_ready
132 .store(false, Ordering::Relaxed);
133 }
134
135 if self.order_message_enable {
136 topic_route_data.order_topic_conf = self
137 .name_server_runtime_inner
138 .kvconfig_manager()
139 .get_kvconfig(
140 &CheetahString::from_static_str(NAMESPACE_ORDER_TOPIC_CONFIG),
141 &request_header.topic,
142 );
143 }
144
145 let content = topic_route_data.encode()?;
147 Ok(Some(
148 RemotingCommand::create_response_command_with_code(ResponseCode::Success)
149 .set_body(content),
150 ))
151 }
152}