rocketmq_remoting/
remoting.rs1use std::sync::Arc;
18
19use rocketmq_rust::WeakArcMut;
20
21use crate::base::response_future::ResponseFuture;
22use crate::protocol::remoting_command::RemotingCommand;
23use crate::runtime::RPCHook;
24
25#[allow(async_fn_in_trait)]
35pub trait RemotingService: Send {
36 async fn start(&self, this: WeakArcMut<Self>);
42
43 fn shutdown(&mut self);
49
50 fn register_rpc_hook(&mut self, hook: Arc<dyn RPCHook>);
59
60 fn clear_rpc_hook(&mut self);
66}
67pub trait InvokeCallback {
68 fn operation_complete(&self, response_future: ResponseFuture);
69 fn operation_succeed(&self, response: RemotingCommand);
70 fn operation_fail(&self, throwable: Box<dyn std::error::Error>);
71}
72
73#[allow(unused_variables)]
74pub(crate) mod inner {
75 use std::collections::HashMap;
76 use std::sync::Arc;
77
78 use rocketmq_error::RocketMQError;
79 use rocketmq_error::RocketMQResult;
80 use rocketmq_rust::ArcMut;
81 use tracing::error;
82 use tracing::warn;
83
84 use crate::base::response_future::ResponseFuture;
85 use crate::code::response_code::ResponseCode;
86 use crate::net::channel::Channel;
87 use crate::protocol::remoting_command::RemotingCommand;
88 use crate::protocol::RemotingCommandType;
89 use crate::runtime::connection_handler_context::ConnectionHandlerContext;
90 use crate::runtime::processor::RequestProcessor;
91 use crate::runtime::RPCHook;
92
93 pub(crate) struct RemotingGeneralHandler<RP> {
94 pub(crate) request_processor: RP,
95 pub(crate) rpc_hooks: Vec<Arc<dyn RPCHook>>,
96 pub(crate) response_table: ArcMut<HashMap<i32, ResponseFuture>>,
97 }
98
99 impl<RP> RemotingGeneralHandler<RP>
100 where
101 RP: RequestProcessor + Sync + 'static,
102 {
103 pub async fn process_message_received(
104 &mut self,
105 ctx: &mut ConnectionHandlerContext,
106 cmd: RemotingCommand,
107 ) {
108 match cmd.get_type() {
109 RemotingCommandType::REQUEST => {
110 match self.process_request_command(ctx, cmd).await {
111 Ok(_) => {}
112 Err(e) => {
113 error!("process request command failed: {}", e);
114 }
115 }
116 }
117 RemotingCommandType::RESPONSE => {
118 self.process_response_command(ctx, cmd);
119 }
120 }
121 }
122
123 async fn process_request_command(
124 &mut self,
125 ctx: &mut ConnectionHandlerContext,
126 mut cmd: RemotingCommand,
127 ) -> RocketMQResult<()> {
128 let opaque = cmd.opaque();
129 let reject_request = self.request_processor.reject_request(cmd.code());
130 const REJECT_REQUEST_MSG: &str =
131 "[REJECT REQUEST]system busy, start flow control for a while";
132 if reject_request.0 {
133 let response = if let Some(response) = reject_request.1 {
134 response
135 } else {
136 RemotingCommand::create_response_command_with_code_remark(
137 ResponseCode::SystemBusy,
138 REJECT_REQUEST_MSG,
139 )
140 };
141 ctx.channel
142 .connection_mut()
143 .send_command(response.set_opaque(opaque))
144 .await?;
145 return Ok(());
146 }
147 let oneway_rpc = cmd.is_oneway_rpc();
148 let exception = self
150 .do_before_rpc_hooks(ctx.channel(), Some(&mut cmd))
151 .err();
152 match handle_error(ctx, oneway_rpc, opaque, exception).await {
154 HandleErrorResult::ReturnMethod => return Ok(()),
155 HandleErrorResult::GoHead => {}
156 }
157
158 let mut response = {
159 let channel = ctx.channel.clone();
160 let ctx = ctx.clone();
161 let result = self
162 .request_processor
163 .process_request(channel, ctx, &mut cmd)
164 .await
165 .unwrap_or_else(|_err| {
166 Some(RemotingCommand::create_response_command_with_code(
167 ResponseCode::SystemError,
168 ))
169 });
170 result
171 };
172
173 let exception = self
174 .do_after_rpc_hooks(ctx.channel(), &cmd, response.as_mut())
175 .err();
176
177 match handle_error(ctx, oneway_rpc, opaque, exception).await {
178 HandleErrorResult::ReturnMethod => return Ok(()),
179 HandleErrorResult::GoHead => {}
180 }
181 if response.is_none() || oneway_rpc {
182 return Ok(());
183 }
184 let response = response.unwrap();
185 let result = ctx
186 .channel_mut()
187 .connection_mut()
188 .send_command(response.set_opaque(opaque))
189 .await;
190 match result {
191 Ok(_) => {}
192 Err(err) => match err {
193 RocketMQError::IO(io_error) => {
194 error!("connection disconnect: {}", io_error);
195 return Ok(());
196 }
197 _ => {
198 error!("send response failed: {}", err);
199 }
200 },
201 };
202 Ok(())
203 }
204
205 fn process_response_command(
206 &mut self,
207 ctx: &mut ConnectionHandlerContext,
208 cmd: RemotingCommand,
209 ) {
210 if let Some(future) = self.response_table.remove(&cmd.opaque()) {
211 match future.tx.send(Ok(cmd)) {
212 Ok(_) => {}
213 Err(e) => {
214 warn!("send response to future failed, maybe timeout");
215 }
216 }
217 } else {
218 warn!(
219 "receive response, cmd={}, but not matched any request, address={}, \
220 channelId={}",
221 cmd,
222 ctx.channel().remote_address(),
223 ctx.channel().channel_id(),
224 );
225 }
226 }
227
228 fn do_after_rpc_hooks(
229 &self,
230 channel: &Channel,
231 request: &RemotingCommand,
232 response: Option<&mut RemotingCommand>,
233 ) -> rocketmq_error::RocketMQResult<()> {
234 if let Some(response) = response {
235 for hook in self.rpc_hooks.iter() {
236 hook.do_after_response(channel.remote_address(), request, response)?;
237 }
238 }
239 Ok(())
240 }
241
242 pub fn do_before_rpc_hooks(
243 &self,
244 channel: &Channel,
245 request: Option<&mut RemotingCommand>,
246 ) -> rocketmq_error::RocketMQResult<()> {
247 if let Some(request) = request {
248 for hook in self.rpc_hooks.iter() {
249 hook.do_before_request(channel.remote_address(), request)?;
250 }
251 }
252 Ok(())
253 }
254
255 pub fn register_rpc_hook(&mut self, hook: Arc<dyn RPCHook>) {
256 self.rpc_hooks.push(hook);
257 }
258 }
259 async fn handle_error(
260 ctx: &mut ConnectionHandlerContext,
261 oneway_rpc: bool,
262 opaque: i32,
263 exception: Option<RocketMQError>,
264 ) -> HandleErrorResult {
265 if let Some(exception_inner) = exception {
266 match exception_inner {
267 RocketMQError::Internal(message) if message.starts_with("Abort") => {
268 let code = ResponseCode::SystemError;
269 if oneway_rpc {
270 return HandleErrorResult::ReturnMethod;
271 }
272 let response =
273 RemotingCommand::create_response_command_with_code_remark(code, message);
274 tokio::select! {
275 result =ctx.connection_mut().send_command(response.set_opaque(opaque)) => match result{
276 Ok(_) =>{},
277 Err(err) => {
278 match err {
279 RocketMQError::IO(io_error) => {
280 error!("send response failed: {}", io_error);
281 return HandleErrorResult::ReturnMethod;
282 }
283 _ => { error!("send response failed: {}", err);}
284 }
285 },
286 },
287 }
288 }
289 _ => {
290 if !oneway_rpc {
291 let response = RemotingCommand::create_response_command_with_code_remark(
292 ResponseCode::SystemError,
293 exception_inner.to_string(),
294 );
295 tokio::select! {
296 result =ctx.connection_mut().send_command(response.set_opaque(opaque)) => match result{
297 Ok(_) =>{},
298 Err(err) => {
299 match err {
300 RocketMQError::IO(io_error) => {
301 error!("send response failed: {}", io_error);
302 return HandleErrorResult::ReturnMethod;
303 }
304 _ => { error!("send response failed: {}", err);}
305 }
306 },
307 },
308 }
309 }
310 }
311 }
312 HandleErrorResult::ReturnMethod
313 } else {
314 HandleErrorResult::GoHead
315 }
316 }
317 enum HandleErrorResult {
318 ReturnMethod,
319 GoHead,
320 }
321}