rocketmq_namesrv/
processor.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use rocketmq_remoting::code::request_code::RequestCode;
19use rocketmq_remoting::net::channel::Channel;
20use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
21use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
22use rocketmq_remoting::runtime::processor::RequestProcessor;
23use rocketmq_rust::ArcMut;
24use tracing::info;
25
26pub use self::client_request_processor::ClientRequestProcessor;
27use crate::processor::default_request_processor::DefaultRequestProcessor;
28
29mod client_request_processor;
30pub mod default_request_processor;
31
/// Namespace identifier with the literal value `"ORDER_TOPIC_CONFIG"`.
///
/// NOTE(review): nothing in this module's own code reads the constant; it is
/// presumably consumed by the child processor modules — confirm before removing.
const NAMESPACE_ORDER_TOPIC_CONFIG: &str = "ORDER_TOPIC_CONFIG";
33
34#[derive(Clone)]
35pub struct NameServerRequestProcessor {
36    pub(crate) client_request_processor: ArcMut<ClientRequestProcessor>,
37    pub(crate) default_request_processor: ArcMut<DefaultRequestProcessor>,
38}
39
40impl RequestProcessor for NameServerRequestProcessor {
41    async fn process_request(
42        &mut self,
43        channel: Channel,
44        ctx: ConnectionHandlerContext,
45        request: RemotingCommand,
46    ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
47        let request_code = RequestCode::from(request.code());
48        info!("Name server Received request code: {:?}", request_code);
49        match request_code {
50            RequestCode::GetRouteinfoByTopic => {
51                self.client_request_processor
52                    .process_request(channel, ctx, request_code, request)
53            }
54            _ => {
55                self.default_request_processor
56                    .process_request(channel, ctx, request_code, request)
57            }
58        }
59    }
60}