rocketmq_remoting/
remoting.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::sync::Arc;
18
19use rocketmq_rust::WeakArcMut;
20
21use crate::base::response_future::ResponseFuture;
22use crate::protocol::remoting_command::RemotingCommand;
23use crate::runtime::RPCHook;
24
25/// `RemotingService` trait defines the core functionalities for a remoting service.
26///
27/// This trait outlines the essential operations for starting, shutting down, and managing RPC hooks
28/// within a remoting service. Implementors of this trait are expected to provide concrete
29/// implementations for these operations, facilitating the management of remote procedure calls.
30///
31/// # Requirements
32/// Implementors must be `Send` to ensure thread safety, allowing instances to be transferred
33/// across thread boundaries.
34#[allow(async_fn_in_trait)]
35pub trait RemotingService: Send {
36    /// Asynchronously starts the remoting service.
37    ///
38    /// This function should initialize and start the service, making it ready to handle incoming
39    /// or outgoing remote procedure calls. The exact implementation details, such as opening
40    /// network connections or preparing internal state, are left to the implementor.
41    async fn start(&self, this: WeakArcMut<Self>);
42
43    /// Shuts down the remoting service.
44    ///
45    /// This function is responsible for gracefully shutting down the service. It should ensure
46    /// that all resources are released, and any ongoing operations are completed or aborted
47    /// appropriately before the service stops.
48    fn shutdown(&mut self);
49
50    /// Registers an RPC hook.
51    ///
52    /// This function allows for the registration of an RPC hook, which can be used to intercept
53    /// and modify the behavior of remote procedure calls. Hooks can be used for logging,
54    /// monitoring, or modifying the requests or responses of RPCs.
55    ///
56    /// # Arguments
57    /// * `hook` - An implementation of the `RPCHook` trait that will be registered.
58    fn register_rpc_hook(&mut self, hook: Arc<dyn RPCHook>);
59
60    /// Clears all registered RPC hooks.
61    ///
62    /// This function removes all previously registered RPC hooks, returning the service to its
63    /// default state without any hooks. This can be useful for cleanup or when changing the
64    /// configuration of the service.
65    fn clear_rpc_hook(&mut self);
66}
67pub trait InvokeCallback {
68    fn operation_complete(&self, response_future: ResponseFuture);
69    fn operation_succeed(&self, response: RemotingCommand);
70    fn operation_fail(&self, throwable: Box<dyn std::error::Error>);
71}
72
73#[allow(unused_variables)]
74pub(crate) mod inner {
75    use std::collections::HashMap;
76    use std::sync::Arc;
77
78    use rocketmq_error::RocketMQError;
79    use rocketmq_error::RocketMQResult;
80    use rocketmq_rust::ArcMut;
81    use tracing::error;
82    use tracing::warn;
83
84    use crate::base::response_future::ResponseFuture;
85    use crate::code::response_code::ResponseCode;
86    use crate::net::channel::Channel;
87    use crate::protocol::remoting_command::RemotingCommand;
88    use crate::protocol::RemotingCommandType;
89    use crate::runtime::connection_handler_context::ConnectionHandlerContext;
90    use crate::runtime::processor::RequestProcessor;
91    use crate::runtime::RPCHook;
92
93    pub(crate) struct RemotingGeneralHandler<RP> {
94        pub(crate) request_processor: RP,
95        pub(crate) rpc_hooks: Vec<Arc<dyn RPCHook>>,
96        pub(crate) response_table: ArcMut<HashMap<i32, ResponseFuture>>,
97    }
98
99    impl<RP> RemotingGeneralHandler<RP>
100    where
101        RP: RequestProcessor + Sync + 'static,
102    {
103        pub async fn process_message_received(
104            &mut self,
105            ctx: &mut ConnectionHandlerContext,
106            cmd: RemotingCommand,
107        ) {
108            match cmd.get_type() {
109                RemotingCommandType::REQUEST => {
110                    match self.process_request_command(ctx, cmd).await {
111                        Ok(_) => {}
112                        Err(e) => {
113                            error!("process request command failed: {}", e);
114                        }
115                    }
116                }
117                RemotingCommandType::RESPONSE => {
118                    self.process_response_command(ctx, cmd);
119                }
120            }
121        }
122
123        async fn process_request_command(
124            &mut self,
125            ctx: &mut ConnectionHandlerContext,
126            mut cmd: RemotingCommand,
127        ) -> RocketMQResult<()> {
128            let opaque = cmd.opaque();
129            let reject_request = self.request_processor.reject_request(cmd.code());
130            const REJECT_REQUEST_MSG: &str =
131                "[REJECT REQUEST]system busy, start flow control for a while";
132            if reject_request.0 {
133                let response = if let Some(response) = reject_request.1 {
134                    response
135                } else {
136                    RemotingCommand::create_response_command_with_code_remark(
137                        ResponseCode::SystemBusy,
138                        REJECT_REQUEST_MSG,
139                    )
140                };
141                ctx.channel
142                    .connection_mut()
143                    .send_command(response.set_opaque(opaque))
144                    .await?;
145                return Ok(());
146            }
147            let oneway_rpc = cmd.is_oneway_rpc();
148            //before handle request hooks
149            let exception = self
150                .do_before_rpc_hooks(ctx.channel(), Some(&mut cmd))
151                .err();
152            //handle error if return have
153            match handle_error(ctx, oneway_rpc, opaque, exception).await {
154                HandleErrorResult::ReturnMethod => return Ok(()),
155                HandleErrorResult::GoHead => {}
156            }
157
158            let mut response = {
159                let channel = ctx.channel.clone();
160                let ctx = ctx.clone();
161                let result = self
162                    .request_processor
163                    .process_request(channel, ctx, &mut cmd)
164                    .await
165                    .unwrap_or_else(|_err| {
166                        Some(RemotingCommand::create_response_command_with_code(
167                            ResponseCode::SystemError,
168                        ))
169                    });
170                result
171            };
172
173            let exception = self
174                .do_after_rpc_hooks(ctx.channel(), &cmd, response.as_mut())
175                .err();
176
177            match handle_error(ctx, oneway_rpc, opaque, exception).await {
178                HandleErrorResult::ReturnMethod => return Ok(()),
179                HandleErrorResult::GoHead => {}
180            }
181            if response.is_none() || oneway_rpc {
182                return Ok(());
183            }
184            let response = response.unwrap();
185            let result = ctx
186                .channel_mut()
187                .connection_mut()
188                .send_command(response.set_opaque(opaque))
189                .await;
190            match result {
191                Ok(_) => {}
192                Err(err) => match err {
193                    RocketMQError::IO(io_error) => {
194                        error!("connection disconnect: {}", io_error);
195                        return Ok(());
196                    }
197                    _ => {
198                        error!("send response failed: {}", err);
199                    }
200                },
201            };
202            Ok(())
203        }
204
205        fn process_response_command(
206            &mut self,
207            ctx: &mut ConnectionHandlerContext,
208            cmd: RemotingCommand,
209        ) {
210            if let Some(future) = self.response_table.remove(&cmd.opaque()) {
211                match future.tx.send(Ok(cmd)) {
212                    Ok(_) => {}
213                    Err(e) => {
214                        warn!("send response to future failed, maybe timeout");
215                    }
216                }
217            } else {
218                warn!(
219                    "receive response, cmd={}, but not matched any request, address={}, \
220                     channelId={}",
221                    cmd,
222                    ctx.channel().remote_address(),
223                    ctx.channel().channel_id(),
224                );
225            }
226        }
227
228        fn do_after_rpc_hooks(
229            &self,
230            channel: &Channel,
231            request: &RemotingCommand,
232            response: Option<&mut RemotingCommand>,
233        ) -> rocketmq_error::RocketMQResult<()> {
234            if let Some(response) = response {
235                for hook in self.rpc_hooks.iter() {
236                    hook.do_after_response(channel.remote_address(), request, response)?;
237                }
238            }
239            Ok(())
240        }
241
242        pub fn do_before_rpc_hooks(
243            &self,
244            channel: &Channel,
245            request: Option<&mut RemotingCommand>,
246        ) -> rocketmq_error::RocketMQResult<()> {
247            if let Some(request) = request {
248                for hook in self.rpc_hooks.iter() {
249                    hook.do_before_request(channel.remote_address(), request)?;
250                }
251            }
252            Ok(())
253        }
254
255        pub fn register_rpc_hook(&mut self, hook: Arc<dyn RPCHook>) {
256            self.rpc_hooks.push(hook);
257        }
258    }
259    async fn handle_error(
260        ctx: &mut ConnectionHandlerContext,
261        oneway_rpc: bool,
262        opaque: i32,
263        exception: Option<RocketMQError>,
264    ) -> HandleErrorResult {
265        if let Some(exception_inner) = exception {
266            match exception_inner {
267                RocketMQError::Internal(message) if message.starts_with("Abort") => {
268                    let code = ResponseCode::SystemError;
269                    if oneway_rpc {
270                        return HandleErrorResult::ReturnMethod;
271                    }
272                    let response =
273                        RemotingCommand::create_response_command_with_code_remark(code, message);
274                    tokio::select! {
275                        result =ctx.connection_mut().send_command(response.set_opaque(opaque)) => match result{
276                            Ok(_) =>{},
277                            Err(err) => {
278                                match err {
279                                    RocketMQError::IO(io_error) => {
280                                        error!("send response failed: {}", io_error);
281                                        return HandleErrorResult::ReturnMethod;
282                                    }
283                                    _ => { error!("send response failed: {}", err);}
284                                }
285                            },
286                        },
287                    }
288                }
289                _ => {
290                    if !oneway_rpc {
291                        let response = RemotingCommand::create_response_command_with_code_remark(
292                            ResponseCode::SystemError,
293                            exception_inner.to_string(),
294                        );
295                        tokio::select! {
296                            result =ctx.connection_mut().send_command(response.set_opaque(opaque)) => match result{
297                                Ok(_) =>{},
298                                Err(err) => {
299                                    match err {
300                                        RocketMQError::IO(io_error) => {
301                                            error!("send response failed: {}", io_error);
302                                            return HandleErrorResult::ReturnMethod;
303                                        }
304                                        _ => { error!("send response failed: {}", err);}
305                                    }
306                                },
307                            },
308                        }
309                    }
310                }
311            }
312            HandleErrorResult::ReturnMethod
313        } else {
314            HandleErrorResult::GoHead
315        }
316    }
317    enum HandleErrorResult {
318        ReturnMethod,
319        GoHead,
320    }
321}