rocketmq_remoting/runtime/
connection_handler_context.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::net::SocketAddr;
18
19use rocketmq_rust::ArcMut;
20use tracing::error;
21
22use crate::connection::Connection;
23use crate::net::channel::Channel;
24use crate::protocol::remoting_command::RemotingCommand;
25
26/// Shared mutable context for request handlers.
27///
28/// This type alias wraps `ConnectionHandlerContextWrapper` in an `ArcMut` to allow
29/// efficient sharing and mutation across async tasks. Handlers receive this context
30/// and use it to access the channel and send responses.
31pub type ConnectionHandlerContext = ArcMut<ConnectionHandlerContextWrapper>;
32
33/// Request handler context - provides access to the channel for a specific connection.
34///
35/// `ConnectionHandlerContextWrapper` is the execution context passed to request processors.
36/// It encapsulates the channel associated with the incoming request, allowing handlers to:
37///
38/// - **Send responses**: Via `write()` or `write_ref()`
39/// - **Access connection metadata**: Remote address, connection state
40/// - **Perform advanced operations**: Direct connection access if needed
41///
42/// ## Design Rationale
43///
44/// - **Thin wrapper**: Delegates most operations to the underlying `Channel`
45/// - **Hash/Eq based on channel**: Contexts for the same channel are equal
46/// - **Wrapped in ArcMut**: Shared across async tasks, enables interior mutability
47///
48/// ## Naming Note
49///
50/// The "Wrapper" suffix indicates this is the concrete type wrapped by the
51/// `ConnectionHandlerContext` type alias. It's rarely used directly - prefer
52/// using the type alias in function signatures.
53#[derive(Hash, Eq, PartialEq)]
54pub struct ConnectionHandlerContextWrapper {
55    // === Core State ===
56    /// The channel associated with this request handler context.
57    ///
58    /// Provides access to:
59    /// - Underlying connection for I/O
60    /// - Address information (local/remote)
61    /// - Channel identity (ID)
62    pub(crate) channel: Channel,
63}
64
65impl ConnectionHandlerContextWrapper {
66    /// Creates a new handler context wrapping the given channel.
67    ///
68    /// # Arguments
69    ///
70    /// * `channel` - The channel associated with this handler invocation
71    ///
72    /// # Returns
73    ///
74    /// A new context ready for use by request processors
75    pub fn new(channel: Channel) -> Self {
76        Self { channel }
77    }
78
79    // === Connection Access ===
80
81    /// Gets an immutable reference to the underlying connection.
82    ///
83    /// # Returns
84    ///
85    /// Immutable reference to the `Connection` for inspection
86    ///
87    /// # Use Case
88    ///
89    /// Checking connection health, reading connection ID, etc.
90    pub fn connection_ref(&self) -> &Connection {
91        self.channel.connection_ref()
92    }
93
94    /// Gets a mutable reference to the underlying connection.
95    ///
96    /// # Returns
97    ///
98    /// Mutable reference to the `Connection` for advanced I/O
99    ///
100    /// # Use Case
101    ///
102    /// Direct send/receive operations bypassing channel abstractions
103    pub fn connection_mut(&mut self) -> &mut Connection {
104        self.channel.connection_mut()
105    }
106
107    // === Response Writing ===
108
109    /// Sends a response command back to the client (consumes command).
110    ///
111    /// This is the primary method for responding to requests. Errors are
112    /// logged but not propagated to allow handlers to continue processing.
113    ///
114    /// # Arguments
115    ///
116    /// * `cmd` - The response command to send (consumed)
117    ///
118    /// # Behavior
119    ///
120    /// - **Success**: Command encoded and sent
121    /// - **Error**: Logged at ERROR level, method returns normally
122    ///
123    /// # Example
124    ///
125    /// ```ignore
126    /// async fn handle_request(ctx: &mut ConnectionHandlerContext, request: RemotingCommand) {
127    ///     let response = RemotingCommand::create_response_command()
128    ///         .set_opaque(request.opaque());
129    ///     ctx.write(response).await;
130    /// }
131    /// ```
132    pub async fn write_response(&mut self, cmd: RemotingCommand) {
133        match self.channel.connection_mut().send_command(cmd).await {
134            Ok(_) => {}
135            Err(error) => {
136                error!("failed to send response: {}", error);
137            }
138        }
139    }
140
141    /// Sends a response command back to the client (borrows command).
142    ///
143    /// Similar to `write_response`, but borrows the command instead of consuming it.
144    /// Use when the caller needs to retain ownership of the command.
145    ///
146    /// # Arguments
147    ///
148    /// * `cmd` - Mutable reference to the response command to send
149    ///
150    /// # Behavior
151    ///
152    /// - **Success**: Command encoded and sent
153    /// - **Error**: Logged at ERROR level, method returns normally
154    ///
155    /// # Note
156    ///
157    /// The command's body may be consumed during sending (`take_body()`).
158    pub async fn write_response_ref(&mut self, cmd: &mut RemotingCommand) {
159        match self.channel.connection_mut().send_command_ref(cmd).await {
160            Ok(_) => {}
161            Err(error) => {
162                error!("failed to send response: {}", error);
163            }
164        }
165    }
166
167    /// Legacy alias for `write_response()` - kept for backward compatibility.
168    ///
169    /// # Deprecated
170    ///
171    /// Use `write_response()` for clearer semantics.
172    #[deprecated(since = "0.6.0", note = "Use `write_response()` instead")]
173    pub async fn write(&mut self, cmd: RemotingCommand) {
174        self.write_response(cmd).await;
175    }
176
177    /// Legacy alias for `write_response_ref()` - kept for backward compatibility.
178    ///
179    /// # Deprecated
180    ///
181    /// Use `write_response_ref()` for clearer semantics.
182    #[deprecated(since = "0.6.0", note = "Use `write_response_ref()` instead")]
183    pub async fn write_ref(&mut self, cmd: &mut RemotingCommand) {
184        self.write_response_ref(cmd).await;
185    }
186
187    // === Channel Access ===
188
189    /// Gets an immutable reference to the channel.
190    ///
191    /// # Returns
192    ///
193    /// Immutable reference to the `Channel`
194    ///
195    /// # Use Case
196    ///
197    /// Accessing channel metadata (ID, addresses, etc.)
198    pub fn channel(&self) -> &Channel {
199        &self.channel
200    }
201
202    /// Gets a mutable reference to the channel.
203    ///
204    /// # Returns
205    ///
206    /// Mutable reference to the `Channel`
207    ///
208    /// # Use Case
209    ///
210    /// Advanced channel operations (modify addresses, access inner state)
211    pub fn channel_mut(&mut self) -> &mut Channel {
212        &mut self.channel
213    }
214
215    // === Convenience Accessors ===
216
217    /// Gets the remote peer's socket address.
218    ///
219    /// # Returns
220    ///
221    /// Socket address of the remote peer
222    ///
223    /// # Use Case
224    ///
225    /// Logging, authorization checks, rate limiting by IP
226    pub fn remote_address(&self) -> SocketAddr {
227        self.channel.remote_address()
228    }
229}
230
231impl AsRef<ConnectionHandlerContextWrapper> for ConnectionHandlerContextWrapper {
232    fn as_ref(&self) -> &ConnectionHandlerContextWrapper {
233        self
234    }
235}
236
237impl AsMut<ConnectionHandlerContextWrapper> for ConnectionHandlerContextWrapper {
238    fn as_mut(&mut self) -> &mut ConnectionHandlerContextWrapper {
239        self
240    }
241}