rocketmq_remoting/runtime/connection_handler_context.rs
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::net::SocketAddr;
18
19use rocketmq_rust::ArcMut;
20use tracing::error;
21
22use crate::connection::Connection;
23use crate::net::channel::Channel;
24use crate::protocol::remoting_command::RemotingCommand;
25
26/// Shared mutable context for request handlers.
27///
28/// This type alias wraps `ConnectionHandlerContextWrapper` in an `ArcMut` to allow
29/// efficient sharing and mutation across async tasks. Handlers receive this context
30/// and use it to access the channel and send responses.
31pub type ConnectionHandlerContext = ArcMut<ConnectionHandlerContextWrapper>;
32
33/// Request handler context - provides access to the channel for a specific connection.
34///
35/// `ConnectionHandlerContextWrapper` is the execution context passed to request processors.
36/// It encapsulates the channel associated with the incoming request, allowing handlers to:
37///
38/// - **Send responses**: Via `write()` or `write_ref()`
39/// - **Access connection metadata**: Remote address, connection state
40/// - **Perform advanced operations**: Direct connection access if needed
41///
42/// ## Design Rationale
43///
44/// - **Thin wrapper**: Delegates most operations to the underlying `Channel`
45/// - **Hash/Eq based on channel**: Contexts for the same channel are equal
46/// - **Wrapped in ArcMut**: Shared across async tasks, enables interior mutability
47///
48/// ## Naming Note
49///
50/// The "Wrapper" suffix indicates this is the concrete type wrapped by the
51/// `ConnectionHandlerContext` type alias. It's rarely used directly - prefer
52/// using the type alias in function signatures.
53#[derive(Hash, Eq, PartialEq)]
54pub struct ConnectionHandlerContextWrapper {
55 // === Core State ===
56 /// The channel associated with this request handler context.
57 ///
58 /// Provides access to:
59 /// - Underlying connection for I/O
60 /// - Address information (local/remote)
61 /// - Channel identity (ID)
62 pub(crate) channel: Channel,
63}
64
65impl ConnectionHandlerContextWrapper {
66 /// Creates a new handler context wrapping the given channel.
67 ///
68 /// # Arguments
69 ///
70 /// * `channel` - The channel associated with this handler invocation
71 ///
72 /// # Returns
73 ///
74 /// A new context ready for use by request processors
75 pub fn new(channel: Channel) -> Self {
76 Self { channel }
77 }
78
79 // === Connection Access ===
80
81 /// Gets an immutable reference to the underlying connection.
82 ///
83 /// # Returns
84 ///
85 /// Immutable reference to the `Connection` for inspection
86 ///
87 /// # Use Case
88 ///
89 /// Checking connection health, reading connection ID, etc.
90 pub fn connection_ref(&self) -> &Connection {
91 self.channel.connection_ref()
92 }
93
94 /// Gets a mutable reference to the underlying connection.
95 ///
96 /// # Returns
97 ///
98 /// Mutable reference to the `Connection` for advanced I/O
99 ///
100 /// # Use Case
101 ///
102 /// Direct send/receive operations bypassing channel abstractions
103 pub fn connection_mut(&mut self) -> &mut Connection {
104 self.channel.connection_mut()
105 }
106
107 // === Response Writing ===
108
109 /// Sends a response command back to the client (consumes command).
110 ///
111 /// This is the primary method for responding to requests. Errors are
112 /// logged but not propagated to allow handlers to continue processing.
113 ///
114 /// # Arguments
115 ///
116 /// * `cmd` - The response command to send (consumed)
117 ///
118 /// # Behavior
119 ///
120 /// - **Success**: Command encoded and sent
121 /// - **Error**: Logged at ERROR level, method returns normally
122 ///
123 /// # Example
124 ///
125 /// ```ignore
126 /// async fn handle_request(ctx: &mut ConnectionHandlerContext, request: RemotingCommand) {
127 /// let response = RemotingCommand::create_response_command()
128 /// .set_opaque(request.opaque());
129 /// ctx.write(response).await;
130 /// }
131 /// ```
132 pub async fn write_response(&mut self, cmd: RemotingCommand) {
133 match self.channel.connection_mut().send_command(cmd).await {
134 Ok(_) => {}
135 Err(error) => {
136 error!("failed to send response: {}", error);
137 }
138 }
139 }
140
141 /// Sends a response command back to the client (borrows command).
142 ///
143 /// Similar to `write_response`, but borrows the command instead of consuming it.
144 /// Use when the caller needs to retain ownership of the command.
145 ///
146 /// # Arguments
147 ///
148 /// * `cmd` - Mutable reference to the response command to send
149 ///
150 /// # Behavior
151 ///
152 /// - **Success**: Command encoded and sent
153 /// - **Error**: Logged at ERROR level, method returns normally
154 ///
155 /// # Note
156 ///
157 /// The command's body may be consumed during sending (`take_body()`).
158 pub async fn write_response_ref(&mut self, cmd: &mut RemotingCommand) {
159 match self.channel.connection_mut().send_command_ref(cmd).await {
160 Ok(_) => {}
161 Err(error) => {
162 error!("failed to send response: {}", error);
163 }
164 }
165 }
166
167 /// Legacy alias for `write_response()` - kept for backward compatibility.
168 ///
169 /// # Deprecated
170 ///
171 /// Use `write_response()` for clearer semantics.
172 #[deprecated(since = "0.6.0", note = "Use `write_response()` instead")]
173 pub async fn write(&mut self, cmd: RemotingCommand) {
174 self.write_response(cmd).await;
175 }
176
177 /// Legacy alias for `write_response_ref()` - kept for backward compatibility.
178 ///
179 /// # Deprecated
180 ///
181 /// Use `write_response_ref()` for clearer semantics.
182 #[deprecated(since = "0.6.0", note = "Use `write_response_ref()` instead")]
183 pub async fn write_ref(&mut self, cmd: &mut RemotingCommand) {
184 self.write_response_ref(cmd).await;
185 }
186
187 // === Channel Access ===
188
189 /// Gets an immutable reference to the channel.
190 ///
191 /// # Returns
192 ///
193 /// Immutable reference to the `Channel`
194 ///
195 /// # Use Case
196 ///
197 /// Accessing channel metadata (ID, addresses, etc.)
198 pub fn channel(&self) -> &Channel {
199 &self.channel
200 }
201
202 /// Gets a mutable reference to the channel.
203 ///
204 /// # Returns
205 ///
206 /// Mutable reference to the `Channel`
207 ///
208 /// # Use Case
209 ///
210 /// Advanced channel operations (modify addresses, access inner state)
211 pub fn channel_mut(&mut self) -> &mut Channel {
212 &mut self.channel
213 }
214
215 // === Convenience Accessors ===
216
217 /// Gets the remote peer's socket address.
218 ///
219 /// # Returns
220 ///
221 /// Socket address of the remote peer
222 ///
223 /// # Use Case
224 ///
225 /// Logging, authorization checks, rate limiting by IP
226 pub fn remote_address(&self) -> SocketAddr {
227 self.channel.remote_address()
228 }
229}
230
231impl AsRef<ConnectionHandlerContextWrapper> for ConnectionHandlerContextWrapper {
232 fn as_ref(&self) -> &ConnectionHandlerContextWrapper {
233 self
234 }
235}
236
237impl AsMut<ConnectionHandlerContextWrapper> for ConnectionHandlerContextWrapper {
238 fn as_mut(&mut self) -> &mut ConnectionHandlerContextWrapper {
239 self
240 }
241}