rocketmq_namesrv/processor/
client_request_processor.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::sync::atomic::AtomicBool;
18use std::sync::atomic::Ordering;
19
20use cheetah_string::CheetahString;
21use rocketmq_common::common::FAQUrl;
22use rocketmq_common::TimeUtils;
23use rocketmq_remoting::code::request_code::RequestCode;
24use rocketmq_remoting::code::response_code::RemotingSysResponseCode;
25use rocketmq_remoting::code::response_code::ResponseCode;
26use rocketmq_remoting::net::channel::Channel;
27use rocketmq_remoting::protocol::header::client_request_header::GetRouteInfoRequestHeader;
28use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
29use rocketmq_remoting::protocol::RemotingSerializable;
30use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
31use rocketmq_remoting::runtime::processor::RequestProcessor;
32use rocketmq_rust::ArcMut;
33use tracing::info;
34use tracing::warn;
35
36use crate::bootstrap::NameServerRuntimeInner;
37use crate::processor::NAMESPACE_ORDER_TOPIC_CONFIG;
38
39/// Client request processor for handling route info queries
40pub struct ClientRequestProcessor {
41    name_server_runtime_inner: ArcMut<NameServerRuntimeInner>,
42    need_check_namesrv_ready: AtomicBool,
43    startup_time_millis: u64,
44    // Cached configuration values (immutable after construction)
45    wait_seconds_millis: u64,
46    need_wait_for_service: bool,
47    order_message_enable: bool,
48}
49
50impl RequestProcessor for ClientRequestProcessor {
51    #[inline]
52    async fn process_request(
53        &mut self,
54        _channel: Channel,
55        _ctx: ConnectionHandlerContext,
56        request: &mut RemotingCommand,
57    ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
58        let request_code = RequestCode::from(request.code());
59        info!(
60            "Name server ClientRequestProcessor Received request code: {:?}",
61            request_code
62        );
63
64        self.get_route_info_by_topic(request)
65    }
66}
67
68impl ClientRequestProcessor {
69    pub(crate) fn new(name_server_runtime_inner: ArcMut<NameServerRuntimeInner>) -> Self {
70        let config = name_server_runtime_inner.name_server_config();
71        let wait_seconds_millis = config.wait_seconds_for_service as u64 * 1000;
72        let need_wait_for_service = config.need_wait_for_service;
73        let order_message_enable = config.order_message_enable;
74
75        Self {
76            need_check_namesrv_ready: AtomicBool::new(true),
77            startup_time_millis: TimeUtils::get_current_millis(),
78            wait_seconds_millis,
79            need_wait_for_service,
80            order_message_enable,
81            name_server_runtime_inner,
82        }
83    }
84
85    /// Handles route info query for a specific topic
86    #[inline]
87    fn get_route_info_by_topic(
88        &self,
89        request: &mut RemotingCommand,
90    ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
91        let request_header = request.decode_command_custom_header::<GetRouteInfoRequestHeader>()?;
92
93        // Early return: Check if nameserver is ready (using cached config)
94        if self.need_wait_for_service {
95            let elapsed_millis =
96                TimeUtils::get_current_millis().saturating_sub(self.startup_time_millis);
97            let namesrv_ready = !self.need_check_namesrv_ready.load(Ordering::Relaxed)
98                || elapsed_millis >= self.wait_seconds_millis;
99
100            if !namesrv_ready {
101                warn!("name server not ready. request code {}", request.code());
102                return Ok(Some(
103                    RemotingCommand::create_response_command_with_code(
104                        RemotingSysResponseCode::SystemError,
105                    )
106                    .set_remark("name server not ready"),
107                ));
108            }
109        }
110
111        // Lookup topic route data
112        let mut topic_route_data = match self
113            .name_server_runtime_inner
114            .route_info_manager()
115            .pickup_topic_route_data(request_header.topic.as_ref())
116        {
117            Some(data) => data,
118            None => {
119                return Ok(Some(
120                    RemotingCommand::create_response_command_with_code(ResponseCode::TopicNotExist)
121                        .set_remark(format!(
122                            "No topic route info in name server for the topic: {}{}",
123                            request_header.topic,
124                            FAQUrl::suggest_todo(FAQUrl::APPLY_TOPIC_URL)
125                        )),
126                ));
127            }
128        };
129
130        if self.need_check_namesrv_ready.load(Ordering::Relaxed) {
131            self.need_check_namesrv_ready
132                .store(false, Ordering::Relaxed);
133        }
134
135        if self.order_message_enable {
136            topic_route_data.order_topic_conf = self
137                .name_server_runtime_inner
138                .kvconfig_manager()
139                .get_kvconfig(
140                    &CheetahString::from_static_str(NAMESPACE_ORDER_TOPIC_CONFIG),
141                    &request_header.topic,
142                );
143        }
144
145        // Encode and return successful response
146        let content = topic_route_data.encode()?;
147        Ok(Some(
148            RemotingCommand::create_response_command_with_code(ResponseCode::Success)
149                .set_body(content),
150        ))
151    }
152}