rocketmq_namesrv/
processor.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18
19use rocketmq_remoting::code::request_code::RequestCode;
20use rocketmq_remoting::code::response_code::ResponseCode;
21use rocketmq_remoting::net::channel::Channel;
22use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
23use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
24use rocketmq_remoting::runtime::processor::RejectRequestResponse;
25use rocketmq_remoting::runtime::processor::RequestProcessor;
26use rocketmq_rust::ArcMut;
27
28pub use self::client_request_processor::ClientRequestProcessor;
29use crate::processor::default_request_processor::DefaultRequestProcessor;
30
31mod client_request_processor;
32pub mod default_request_processor;
33
/// The literal string `"ORDER_TOPIC_CONFIG"`.
///
/// NOTE(review): nothing in the visible part of this file reads this constant;
/// presumably the processor submodules use it as a config namespace name —
/// confirm against `default_request_processor` before changing or removing it.
const NAMESPACE_ORDER_TOPIC_CONFIG: &str = "ORDER_TOPIC_CONFIG";
35
/// Closed set of request processors that the name server can dispatch to.
///
/// Each variant stores its concrete processor behind an [`ArcMut`]. The wrapper
/// implements [`RequestProcessor`] by forwarding every call to the processor it
/// holds, which lets both kinds be stored in the same table.
///
/// NOTE(review): `Clone` clones the `ArcMut` handle; presumably the clone still
/// refers to the same underlying processor — confirm against `rocketmq_rust::ArcMut`.
#[derive(Clone)]
pub enum NameServerRequestProcessorWrapper {
    /// Wraps a [`ClientRequestProcessor`].
    ClientRequestProcessor(ArcMut<ClientRequestProcessor>),
    /// Wraps a [`DefaultRequestProcessor`].
    DefaultRequestProcessor(ArcMut<DefaultRequestProcessor>),
}
41
42impl RequestProcessor for NameServerRequestProcessorWrapper {
43    async fn process_request(
44        &mut self,
45        channel: Channel,
46        ctx: ConnectionHandlerContext,
47        request: &mut RemotingCommand,
48    ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
49        match self {
50            NameServerRequestProcessorWrapper::ClientRequestProcessor(processor) => {
51                processor.process_request(channel, ctx, request).await
52            }
53            NameServerRequestProcessorWrapper::DefaultRequestProcessor(processor) => {
54                processor.process_request(channel, ctx, request).await
55            }
56        }
57    }
58
59    fn reject_request(&self, code: i32) -> RejectRequestResponse {
60        match self {
61            NameServerRequestProcessorWrapper::ClientRequestProcessor(processor) => {
62                RequestProcessor::reject_request(processor.as_ref(), code)
63            }
64            NameServerRequestProcessorWrapper::DefaultRequestProcessor(processor) => {
65                RequestProcessor::reject_request(processor.as_ref(), code)
66            }
67        }
68    }
69}
70
/// Integer form of a request code; this is the key type of the processor table
/// in [`NameServerRequestProcessor`].
pub(crate) type RequestCodeType = i32;
72
/// Routes incoming requests to a processor chosen by request code.
///
/// A request whose code has an entry in the table goes to that processor;
/// otherwise it goes to the default processor when one has been registered.
#[derive(Clone, Default)]
pub struct NameServerRequestProcessor {
    /// Processors registered for specific request codes, keyed by the code's integer value.
    processor_table: HashMap<RequestCodeType, NameServerRequestProcessorWrapper>,
    /// Fallback processor for codes with no table entry; `None` until one is registered.
    default_request_processor: Option<NameServerRequestProcessorWrapper>,
}
78
79impl NameServerRequestProcessor {
80    pub fn new() -> Self {
81        Self {
82            processor_table: HashMap::new(),
83            default_request_processor: None,
84        }
85    }
86
87    pub fn register_processor(
88        &mut self,
89        request_code: RequestCode,
90        processor: NameServerRequestProcessorWrapper,
91    ) {
92        self.processor_table.insert(request_code as i32, processor);
93    }
94
95    pub fn register_default_processor(&mut self, processor: NameServerRequestProcessorWrapper) {
96        self.default_request_processor = Some(processor);
97    }
98}
99
100impl RequestProcessor for NameServerRequestProcessor {
101    async fn process_request(
102        &mut self,
103        channel: Channel,
104        ctx: ConnectionHandlerContext,
105        request: &mut RemotingCommand,
106    ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
107        match self.processor_table.get_mut(request.code_ref()) {
108            None => match self.default_request_processor.as_mut() {
109                None => {
110                    let response_command =
111                        RemotingCommand::create_response_command_with_code_remark(
112                            ResponseCode::SystemError,
113                            format!("The request code {} is not supported.", request.code_ref()),
114                        );
115                    Ok(Some(response_command.set_opaque(request.opaque())))
116                }
117                Some(processor) => {
118                    RequestProcessor::process_request(processor, channel, ctx, request).await
119                }
120            },
121            Some(processor) => {
122                RequestProcessor::process_request(processor, channel, ctx, request).await
123            }
124        }
125    }
126}