rocketmq_namesrv/processor/
client_request_processor.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering;
20use std::time::Duration;
21
22use cheetah_string::CheetahString;
23use rocketmq_common::common::FAQUrl;
24use rocketmq_common::TimeUtils;
25use rocketmq_remoting::code::request_code::RequestCode;
26use rocketmq_remoting::code::response_code::RemotingSysResponseCode;
27use rocketmq_remoting::code::response_code::ResponseCode;
28use rocketmq_remoting::net::channel::Channel;
29use rocketmq_remoting::protocol::header::client_request_header::GetRouteInfoRequestHeader;
30use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
31use rocketmq_remoting::protocol::RemotingSerializable;
32use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
33use rocketmq_rust::ArcMut;
34use tracing::warn;
35
36use crate::bootstrap::NameServerRuntimeInner;
37use crate::processor::NAMESPACE_ORDER_TOPIC_CONFIG;
38
39pub struct ClientRequestProcessor {
40    name_server_runtime_inner: ArcMut<NameServerRuntimeInner>,
41    need_check_namesrv_ready: AtomicBool,
42    startup_time_millis: u64,
43}
44
45impl ClientRequestProcessor {
46    pub(crate) fn new(name_server_runtime_inner: ArcMut<NameServerRuntimeInner>) -> Self {
47        Self {
48            need_check_namesrv_ready: AtomicBool::new(true),
49            startup_time_millis: TimeUtils::get_current_millis(),
50            name_server_runtime_inner,
51        }
52    }
53
54    fn get_route_info_by_topic(
55        &self,
56        request: RemotingCommand,
57    ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
58        let request_header = request.decode_command_custom_header::<GetRouteInfoRequestHeader>()?;
59        let namesrv_ready = self.need_check_namesrv_ready.load(Ordering::Relaxed)
60            && TimeUtils::get_current_millis() - self.startup_time_millis
61                >= Duration::from_secs(
62                    self.name_server_runtime_inner
63                        .name_server_config()
64                        .wait_seconds_for_service as u64,
65                )
66                .as_millis() as u64;
67        if self
68            .name_server_runtime_inner
69            .name_server_config()
70            .need_wait_for_service
71            && !namesrv_ready
72        {
73            warn!(
74                "name remoting_server not ready. request code {} ",
75                request.code()
76            );
77            return Ok(Some(
78                RemotingCommand::create_response_command_with_code(
79                    RemotingSysResponseCode::SystemError,
80                )
81                .set_remark("name remoting_server not ready"),
82            ));
83        }
84        match self
85            .name_server_runtime_inner
86            .route_info_manager()
87            .pickup_topic_route_data(request_header.topic.as_ref())
88        {
89            None => Ok(Some(
90                RemotingCommand::create_response_command_with_code(ResponseCode::TopicNotExist)
91                    .set_remark(format!(
92                        "No topic route info in name remoting_server for the topic:{}{}",
93                        request_header.topic,
94                        FAQUrl::suggest_todo(FAQUrl::APPLY_TOPIC_URL)
95                    )),
96            )),
97            Some(mut topic_route_data) => {
98                if self.need_check_namesrv_ready.load(Ordering::Acquire) {
99                    self.need_check_namesrv_ready
100                        .store(false, Ordering::Release);
101                }
102                if self
103                    .name_server_runtime_inner
104                    .name_server_config()
105                    .order_message_enable
106                {
107                    //get kv config
108                    let order_topic_config = self
109                        .name_server_runtime_inner
110                        .kvconfig_manager()
111                        .get_kvconfig(
112                            &CheetahString::from_static_str(NAMESPACE_ORDER_TOPIC_CONFIG),
113                            &request_header.topic,
114                        );
115                    topic_route_data.order_topic_conf = order_topic_config;
116                };
117                /*let standard_json_only = request_header.accept_standard_json_only.unwrap_or(false);
118                let content = if request.version() >= RocketMqVersion::into(RocketMqVersion::V494)
119                    || standard_json_only
120                {
121                    //topic_route_data.encode()
122                    topic_route_data.encode()
123                } else {
124                    topic_route_data.encode()
125                };*/
126                let content = topic_route_data.encode()?;
127                Ok(Some(
128                    RemotingCommand::create_response_command_with_code(ResponseCode::Success)
129                        .set_body(content),
130                ))
131            }
132        }
133    }
134}
135
136impl ClientRequestProcessor {
137    pub fn process_request(
138        &mut self,
139        _channel: Channel,
140        _ctx: ConnectionHandlerContext,
141        _request_code: RequestCode,
142        request: RemotingCommand,
143    ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
144        self.get_route_info_by_topic(request)
145    }
146}