// rocketmq_namesrv/processor.rs
use rocketmq_remoting::code::request_code::RequestCode;
19use rocketmq_remoting::net::channel::Channel;
20use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
21use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
22use rocketmq_remoting::runtime::processor::RequestProcessor;
23use rocketmq_rust::ArcMut;
24use tracing::info;
25
26pub use self::client_request_processor::ClientRequestProcessor;
27use crate::processor::default_request_processor::DefaultRequestProcessor;
28
29mod client_request_processor;
30pub mod default_request_processor;
31
/// Namespace key with the literal value `"ORDER_TOPIC_CONFIG"`.
///
/// NOTE(review): nothing in this part of the file reads the constant; presumably the
/// processor submodules use it as the namespace for order-topic configuration — confirm
/// against its call sites.
const NAMESPACE_ORDER_TOPIC_CONFIG: &str = "ORDER_TOPIC_CONFIG";
33
34#[derive(Clone)]
35pub struct NameServerRequestProcessor {
36 pub(crate) client_request_processor: ArcMut<ClientRequestProcessor>,
37 pub(crate) default_request_processor: ArcMut<DefaultRequestProcessor>,
38}
39
40impl RequestProcessor for NameServerRequestProcessor {
41 async fn process_request(
42 &mut self,
43 channel: Channel,
44 ctx: ConnectionHandlerContext,
45 request: RemotingCommand,
46 ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
47 let request_code = RequestCode::from(request.code());
48 info!("Name server Received request code: {:?}", request_code);
49 match request_code {
50 RequestCode::GetRouteinfoByTopic => {
51 self.client_request_processor
52 .process_request(channel, ctx, request_code, request)
53 }
54 _ => {
55 self.default_request_processor
56 .process_request(channel, ctx, request_code, request)
57 }
58 }
59 }
60}