rocketmq_namesrv/
processor.rs1use std::collections::HashMap;
18
19use rocketmq_remoting::code::request_code::RequestCode;
20use rocketmq_remoting::code::response_code::ResponseCode;
21use rocketmq_remoting::net::channel::Channel;
22use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
23use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
24use rocketmq_remoting::runtime::processor::RejectRequestResponse;
25use rocketmq_remoting::runtime::processor::RequestProcessor;
26use rocketmq_rust::ArcMut;
27
28pub use self::client_request_processor::ClientRequestProcessor;
29use crate::processor::default_request_processor::DefaultRequestProcessor;
30
31mod client_request_processor;
32pub mod default_request_processor;
33
/// Literal name `"ORDER_TOPIC_CONFIG"`.
///
/// NOTE(review): not referenced in the visible part of this module; presumably
/// consumed by the child processor modules — confirm before removing.
const NAMESPACE_ORDER_TOPIC_CONFIG: &str = "ORDER_TOPIC_CONFIG";
35
36#[derive(Clone)]
37pub enum NameServerRequestProcessorWrapper {
38 ClientRequestProcessor(ArcMut<ClientRequestProcessor>),
39 DefaultRequestProcessor(ArcMut<DefaultRequestProcessor>),
40}
41
42impl RequestProcessor for NameServerRequestProcessorWrapper {
43 async fn process_request(
44 &mut self,
45 channel: Channel,
46 ctx: ConnectionHandlerContext,
47 request: &mut RemotingCommand,
48 ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
49 match self {
50 NameServerRequestProcessorWrapper::ClientRequestProcessor(processor) => {
51 processor.process_request(channel, ctx, request).await
52 }
53 NameServerRequestProcessorWrapper::DefaultRequestProcessor(processor) => {
54 processor.process_request(channel, ctx, request).await
55 }
56 }
57 }
58
59 fn reject_request(&self, code: i32) -> RejectRequestResponse {
60 match self {
61 NameServerRequestProcessorWrapper::ClientRequestProcessor(processor) => {
62 RequestProcessor::reject_request(processor.as_ref(), code)
63 }
64 NameServerRequestProcessorWrapper::DefaultRequestProcessor(processor) => {
65 RequestProcessor::reject_request(processor.as_ref(), code)
66 }
67 }
68 }
69}
70
/// Integer form of a request code, used as the key of the processor table.
pub(crate) type RequestCodeType = i32;
72
73#[derive(Clone, Default)]
74pub struct NameServerRequestProcessor {
75 processor_table: HashMap<RequestCodeType, NameServerRequestProcessorWrapper>,
76 default_request_processor: Option<NameServerRequestProcessorWrapper>,
77}
78
79impl NameServerRequestProcessor {
80 pub fn new() -> Self {
81 Self {
82 processor_table: HashMap::new(),
83 default_request_processor: None,
84 }
85 }
86
87 pub fn register_processor(
88 &mut self,
89 request_code: RequestCode,
90 processor: NameServerRequestProcessorWrapper,
91 ) {
92 self.processor_table.insert(request_code as i32, processor);
93 }
94
95 pub fn register_default_processor(&mut self, processor: NameServerRequestProcessorWrapper) {
96 self.default_request_processor = Some(processor);
97 }
98}
99
100impl RequestProcessor for NameServerRequestProcessor {
101 async fn process_request(
102 &mut self,
103 channel: Channel,
104 ctx: ConnectionHandlerContext,
105 request: &mut RemotingCommand,
106 ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
107 match self.processor_table.get_mut(request.code_ref()) {
108 None => match self.default_request_processor.as_mut() {
109 None => {
110 let response_command =
111 RemotingCommand::create_response_command_with_code_remark(
112 ResponseCode::SystemError,
113 format!("The request code {} is not supported.", request.code_ref()),
114 );
115 Ok(Some(response_command.set_opaque(request.opaque())))
116 }
117 Some(processor) => {
118 RequestProcessor::process_request(processor, channel, ctx, request).await
119 }
120 },
121 Some(processor) => {
122 RequestProcessor::process_request(processor, channel, ctx, request).await
123 }
124 }
125 }
126}