rocketmq_remoting/remoting_server/rocketmq_tokio_server.rs
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18use std::future::Future;
19use std::net::SocketAddr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use rocketmq_common::common::server::config::ServerConfig;
24use rocketmq_rust::wait_for_signal;
25use rocketmq_rust::ArcMut;
26use tokio::net::TcpListener;
27use tokio::net::TcpStream;
28use tokio::sync::broadcast;
29use tokio::sync::mpsc;
30use tokio::sync::Semaphore;
31use tokio::time;
32use tracing::error;
33use tracing::info;
34use tracing::warn;
35
36use crate::base::channel_event_listener::ChannelEventListener;
37use crate::base::connection_net_event::ConnectionNetEvent;
38use crate::base::tokio_event::TokioEvent;
39use crate::connection::Connection;
40use crate::net::channel::Channel;
41use crate::net::channel::ChannelInner;
42use crate::remoting::inner::RemotingGeneralHandler;
43use crate::runtime::connection_handler_context::ConnectionHandlerContext;
44use crate::runtime::connection_handler_context::ConnectionHandlerContextWrapper;
45use crate::runtime::processor::RequestProcessor;
46use crate::runtime::RPCHook;
47
/// Default limit on the maximum number of concurrently accepted connections.
///
/// Enforced via a `Semaphore` in `ConnectionListener::run`: a permit is
/// acquired before each `accept()`, providing backpressure at capacity.
const DEFAULT_MAX_CONNECTIONS: usize = 1000;

/// Default idle timeout in seconds (aligned with Java version: 120s).
///
/// A connection receiving no data for this long is closed and an IDLE event
/// is emitted to the `ChannelEventListener` (if configured).
const DEFAULT_CHANNEL_IDLE_TIMEOUT_SECONDS: u64 = 120;
53
54/// Per-connection handler managing the lifecycle of a single client connection.
55///
56/// # Performance Notes
57/// - Uses reference-counted handler to avoid cloning heavyweight objects
58/// - Shutdown signal via broadcast for efficient multi-connection coordination
59/// - Connection context wrapped in ArcMut for safe concurrent access
60///
61/// # Lifecycle
62/// 1. Created when TCP connection accepted
63/// 2. Spawned into dedicated Tokio task
64/// 3. Processes commands until shutdown or disconnection
65/// 4. Notifies listeners on drop
pub struct ConnectionHandler<RP> {
    /// Connection-specific context (channel, state, metrics).
    ///
    /// Wrapped in `ArcMut` to allow sharing with async tasks without excessive
    /// cloning; also read by the spawn site after `handle()` returns to emit
    /// the DISCONNECTED event.
    connection_handler_context: ConnectionHandlerContext,

    /// Shutdown coordination signal.
    ///
    /// Receives broadcast when the server initiates graceful shutdown; also
    /// carries the sticky `is_shutdown` flag checked by the receive loop.
    shutdown: Shutdown,

    /// Completion notification sender.
    ///
    /// Never used to send; dropped when the handler is dropped, which (once
    /// every handler has dropped its clone) unblocks the shutdown coordinator
    /// waiting on the paired receiver.
    _shutdown_complete: mpsc::Sender<()>,

    /// Optional disconnect event broadcaster.
    ///
    /// If `Some`, the peer `SocketAddr` is broadcast from `Drop::drop`
    /// (for e.g. routing table cleanup).
    conn_disconnect_notify: Option<broadcast::Sender<SocketAddr>>,

    /// Shared command processing handler.
    ///
    /// Reference-counted to avoid cloning per-connection (contains the request
    /// processor, RPC hooks, and response routing table).
    cmd_handler: ArcMut<RemotingGeneralHandler<RP>>,

    /// Event notification channel for `ChannelEventListener`.
    ///
    /// Used by the receive loop to send IDLE and EXCEPTION events to the
    /// event dispatcher task.
    event_tx: Option<mpsc::UnboundedSender<TokioEvent>>,

    /// Idle timeout duration for this connection.
    ///
    /// When no data is received for this duration, the connection is closed
    /// and an IDLE event is emitted.
    idle_timeout: Duration,
}
102
103impl<RP> Drop for ConnectionHandler<RP> {
104 fn drop(&mut self) {
105 if let Some(ref sender) = self.conn_disconnect_notify {
106 let socket_addr = self.connection_handler_context.remote_address();
107 warn!(
108 "connection[{}] disconnected, Send notify message.",
109 socket_addr
110 );
111 let _ = sender.send(socket_addr);
112 }
113 }
114}
115
impl<RP: RequestProcessor + Sync + 'static> ConnectionHandler<RP> {
    /// Main event loop processing incoming commands until shutdown, idle
    /// timeout, decode error, or peer disconnect.
    ///
    /// # Flow
    /// 1. Wait (via `tokio::select!`) for: next command, shutdown signal, or
    ///    idle timeout — whichever fires first
    /// 2. Decode and validate the command
    /// 3. Dispatch to the business logic processor
    /// 4. Repeat until the connection closes or shutdown is requested
    ///
    /// # Idle detection
    /// The `sleep(idle_timeout)` future is re-created on every loop iteration,
    /// so the idle timer is effectively re-armed each time a command arrives;
    /// it only fires after a full `idle_timeout` with no completed receive.
    ///
    /// # Error Handling
    /// - Decode errors: EXCEPTION event emitted, connection closed, `Err` returned
    /// - Processor errors: handled inside `process_message_received` (per-request isolation)
    /// - Peer closed (`None` frame) / shutdown / idle: graceful `Ok(())`
    ///
    /// NOTE(review): correctness of the select! relies on
    /// `receive_command()` being cancellation-safe (no partially consumed
    /// frame lost when the shutdown/idle branches win) — confirm against the
    /// `Connection` implementation.
    #[inline]
    async fn handle(&mut self) -> rocketmq_error::RocketMQResult<()> {
        // Copy idle timeout out of self so the select! below doesn't borrow it.
        let idle_timeout = self.idle_timeout;
        let remote_addr = self.connection_handler_context.remote_address();

        // HOT PATH: Main server receive loop.
        // `is_shutdown` is a sticky flag set by `Shutdown::recv`.
        while !self.shutdown.is_shutdown {
            let channel = self.connection_handler_context.channel_mut();

            let frame = tokio::select! {
                // Branch 1: Receive next command from peer
                res = channel.connection_mut().receive_command() => res,

                // Branch 2: Shutdown signal received
                _ = self.shutdown.recv() => {
                    // Mark connection as closed to prevent further sends
                    channel.connection_mut().close();
                    return Ok(());
                }

                // Branch 3: Idle timeout - no data received for configured duration
                _ = tokio::time::sleep(idle_timeout) => {
                    warn!(
                        "Connection idle timeout ({}s), remote: {}",
                        idle_timeout.as_secs(),
                        remote_addr
                    );

                    // Clone channel before closing to avoid borrow conflicts
                    let channel_clone = channel.clone();

                    // Send IDLE event to listener (best-effort; dispatcher may be gone)
                    if let Some(ref event_tx) = self.event_tx {
                        let _ = event_tx.send(TokioEvent::new(
                            ConnectionNetEvent::IDLE,
                            remote_addr,
                            channel_clone,
                        ));
                    }

                    // Close connection due to idle timeout
                    channel.connection_mut().close();
                    return Ok(());
                }
            };

            // Extract command or handle end-of-stream
            let cmd = match frame {
                Some(Ok(frame)) => frame,
                Some(Err(e)) => {
                    // Decode error - log, notify listener, close connection,
                    // and surface the error to the spawn site for logging.
                    error!("Failed to decode command: {:?}", e);

                    // Clone channel before closing to avoid borrow conflicts
                    let channel_clone = channel.clone();

                    // Send EXCEPTION event to listener
                    if let Some(ref event_tx) = self.event_tx {
                        let _ = event_tx.send(TokioEvent::new(
                            ConnectionNetEvent::EXCEPTION,
                            remote_addr,
                            channel_clone,
                        ));
                    }

                    channel.connection_mut().close();
                    return Err(e);
                }
                None => {
                    // Peer closed connection gracefully (end of stream)
                    return Ok(());
                }
            };

            // Dispatch command to business logic.
            // Note: process_message_received handles errors internally.
            self.cmd_handler
                .process_message_received(&mut self.connection_handler_context, cmd)
                .await;
        }
        Ok(())
    }
}
218
219/// Server listener managing TCP connection acceptance and connection lifecycle.
220///
221/// # Architecture
222/// ```text
223/// TcpListener → ConnectionListener → ConnectionHandler (per-connection task)
224/// ↓
225/// Event Dispatcher
226/// ```
227///
228/// # Concurrency Control
229/// - **Connection Limit**: Semaphore-based backpressure (DEFAULT_MAX_CONNECTIONS)
230/// - **Graceful Shutdown**: Broadcast signal to all active handlers
231/// - **Event Notification**: Optional async event dispatcher for connection lifecycle
232///
233/// # Performance Characteristics
234/// - O(1) accept loop with backpressure
235/// - Parallel connection handling via Tokio spawn
236/// - Shared handler state (Arc) to avoid per-connection clones
/// Server listener managing TCP connection acceptance and connection lifecycle.
///
/// # Architecture
/// ```text
/// TcpListener → ConnectionListener → ConnectionHandler (per-connection task)
///                                  ↓
///                           Event Dispatcher
/// ```
///
/// # Concurrency Control
/// - **Connection Limit**: Semaphore-based backpressure (DEFAULT_MAX_CONNECTIONS)
/// - **Graceful Shutdown**: Broadcast signal to all active handlers
/// - **Event Notification**: Optional async event dispatcher for connection lifecycle
///
/// # Performance Characteristics
/// - O(1) accept loop with backpressure
/// - Parallel connection handling via Tokio spawn
/// - Shared handler state (Arc) to avoid per-connection clones
struct ConnectionListener<RP> {
    /// TCP socket acceptor bound to server address
    listener: TcpListener,

    /// Semaphore controlling max concurrent connections.
    ///
    /// An owned permit is acquired before each accept and moved into the
    /// spawned connection task; it is released when that task finishes.
    /// Provides backpressure when the server reaches capacity.
    limit_connections: Arc<Semaphore>,

    /// Shutdown broadcast sender.
    ///
    /// Every connection handler subscribes to this channel.
    /// Dropping this sender (or sending on it) triggers graceful termination
    /// across all connections.
    notify_shutdown: broadcast::Sender<()>,

    /// Completion coordination channel.
    ///
    /// Each handler holds a clone of this sender (never used to send).
    /// When all handlers drop (server fully shut down), the paired receiver
    /// unblocks.
    shutdown_complete_tx: mpsc::Sender<()>,

    /// Optional connection disconnect broadcaster.
    ///
    /// Used for routing table cleanup and metrics; forwarded into every
    /// `ConnectionHandler`, which broadcasts on drop.
    conn_disconnect_notify: Option<broadcast::Sender<SocketAddr>>,

    /// Optional lifecycle event listener.
    ///
    /// Receives CONNECTED/DISCONNECTED/EXCEPTION/IDLE events.
    /// Taken (moved into the dispatcher task) on the first call to `run`.
    channel_event_listener: Option<Arc<dyn ChannelEventListener>>,

    /// Shared command processing handler.
    ///
    /// Contains request processor, RPC hooks, and response routing table.
    /// ArcMut-wrapped to share across all connection handlers efficiently.
    cmd_handler: ArcMut<RemotingGeneralHandler<RP>>,
}
276
277impl<RP: RequestProcessor + Sync + 'static + Clone> ConnectionListener<RP> {
278 /// Main server event loop accepting and spawning connection handlers.
279 ///
280 /// # Architecture
281 /// ```text
282 /// ┌─────────────┐
283 /// │TcpListener │ ← accept()
284 /// └──────┬──────┘
285 /// │ spawn for each connection
286 /// ↓
287 /// ┌──────────────────┐ ┌─────────────────┐
288 /// │ConnectionHandler │ ───► │Event Dispatcher │ ← optional
289 /// └──────────────────┘ └─────────────────┘
290 /// ```
291 ///
292 /// # Performance Optimizations
293 /// 1. **Permit acquisition before accept**: Backpressure at OS level
294 /// 2. **TCP_NODELAY**: Disable Nagle's algorithm for low latency
295 /// 3. **Event channel buffering**: Prevent blocking on event dispatch
296 /// 4. **Arc reuse**: cmd_handler cloned once per connection, not per message
297 ///
298 /// # Concurrency
299 /// - Accept loop: Single-threaded (TcpListener)
300 /// - Handler tasks: Multi-threaded (Tokio runtime)
301 /// - Event dispatcher: Independent task (non-blocking)
302 async fn run(&mut self) -> anyhow::Result<()> {
303 info!("Server ready to accept connections");
304
305 // Event notification channel (unbounded to prevent accept() blocking)
306 let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel::<TokioEvent>();
307
308 // Spawn event dispatcher task if listener configured
309 if let Some(listener) = self.channel_event_listener.take() {
310 tokio::spawn(async move {
311 while let Some(event) = event_rx.recv().await {
312 let addr = event.remote_addr();
313 let addr_str = addr.to_string();
314
315 // HOT PATH: Match on event type and dispatch to listener
316 match event.type_() {
317 ConnectionNetEvent::CONNECTED(_) => {
318 listener.on_channel_connect(&addr_str, event.channel());
319 }
320 ConnectionNetEvent::DISCONNECTED => {
321 listener.on_channel_close(&addr_str, event.channel());
322 }
323 ConnectionNetEvent::EXCEPTION => {
324 listener.on_channel_exception(&addr_str, event.channel());
325 }
326 ConnectionNetEvent::IDLE => {
327 listener.on_channel_idle(&addr_str, event.channel());
328 }
329 }
330 }
331 info!("Event dispatcher task terminated");
332 });
333 }
334
335 // Main accept loop
336 loop {
337 // OPTIMIZATION: Acquire permit BEFORE accept() to provide backpressure
338 // If at capacity, accept() won't be called until a slot frees up
339 let permit = self
340 .limit_connections
341 .clone()
342 .acquire_owned()
343 .await
344 .expect("Semaphore closed unexpectedly");
345
346 // Accept next connection (with exponential backoff on errors)
347 let (socket, remote_addr) = self.accept().await?;
348
349 // OPTIMIZATION: Enable TCP_NODELAY for low-latency RPC
350 // Disables Nagle's algorithm to send small packets immediately
351 if let Err(e) = socket.set_nodelay(true) {
352 warn!("Failed to set TCP_NODELAY for {}: {}", remote_addr, e);
353 }
354
355 let local_addr = socket.local_addr()?;
356 info!("Accepted connection: {} → {}", remote_addr, local_addr);
357
358 // Create connection channel wrapper
359 let channel_inner = ArcMut::new(ChannelInner::new(
360 Connection::new(socket),
361 self.cmd_handler.response_table.clone(),
362 ));
363 let channel = Channel::new(channel_inner, local_addr, remote_addr);
364
365 // Notify CONNECTED event
366 let _ = event_tx.send(TokioEvent::new(
367 ConnectionNetEvent::CONNECTED(remote_addr),
368 remote_addr,
369 channel.clone(),
370 ));
371
372 // Build connection handler
373 let idle_timeout = Duration::from_secs(DEFAULT_CHANNEL_IDLE_TIMEOUT_SECONDS);
374 let handler = ConnectionHandler {
375 connection_handler_context: ArcMut::new(ConnectionHandlerContextWrapper {
376 channel: channel.clone(),
377 }),
378 shutdown: Shutdown::new(self.notify_shutdown.subscribe()),
379 _shutdown_complete: self.shutdown_complete_tx.clone(),
380 conn_disconnect_notify: self.conn_disconnect_notify.clone(),
381 cmd_handler: self.cmd_handler.clone(),
382 event_tx: Some(event_tx.clone()),
383 idle_timeout,
384 };
385
386 // Spawn dedicated task for this connection
387 let event_tx_clone = event_tx.clone();
388 tokio::spawn(async move {
389 let mut handler = handler;
390
391 // Run handler until completion
392 if let Err(err) = handler.handle().await {
393 error!(
394 remote_addr = %remote_addr,
395 error = ?err,
396 "Connection handler terminated with error"
397 );
398 }
399
400 // Notify DISCONNECTED event
401 let _ = event_tx_clone.send(TokioEvent::new(
402 ConnectionNetEvent::DISCONNECTED,
403 remote_addr,
404 handler.connection_handler_context.channel.clone(),
405 ));
406
407 info!("Client {} disconnected", remote_addr);
408
409 // IMPORTANT: Permit released when `permit` drops here
410 drop(permit);
411 });
412 }
413 }
414
415 /// Accept new TCP connection with exponential backoff on transient errors.
416 ///
417 /// # Error Handling Strategy
418 /// - **Fatal errors** (e.g., listener closed): Return immediately
419 /// - **Transient errors** (e.g., too many open files): Retry with backoff
420 /// - **Max retries**: Give up after backoff reaches 64 seconds
421 ///
422 /// # Backoff Schedule
423 /// ```text
424 /// Attempt | Delay
425 /// --------|-------
426 /// 1 | 1s
427 /// 2 | 2s
428 /// 3 | 4s
429 /// 4 | 8s
430 /// 5 | 16s
431 /// 6 | 32s
432 /// 7 | 64s (final)
433 /// ```
434 ///
435 /// # Performance
436 /// - Fast path: Single syscall when no errors
437 /// - Slow path: Exponential backoff prevents thundering herd
438 async fn accept(&mut self) -> anyhow::Result<(TcpStream, SocketAddr)> {
439 let mut backoff = 1;
440 const MAX_BACKOFF: u64 = 64;
441
442 loop {
443 match self.listener.accept().await {
444 Ok((socket, remote_addr)) => {
445 // Fast path: successful accept
446 return Ok((socket, remote_addr));
447 }
448 Err(err) => {
449 if backoff > MAX_BACKOFF {
450 // Exceeded retry limit - fatal error
451 error!(
452 "Accept failed after {} retries, last error: {}",
453 MAX_BACKOFF, err
454 );
455 return Err(err.into());
456 }
457
458 // Log transient error and retry
459 warn!("Accept error (will retry in {}s): {}", backoff, err);
460 }
461 }
462
463 // Exponential backoff before retry
464 time::sleep(Duration::from_secs(backoff)).await;
465 backoff *= 2;
466 }
467 }
468}
469
/// High-level remoting server facade: binds a TCP listener from `ServerConfig`
/// and delegates connection handling to the module-level [`run`] function.
pub struct RocketMQServer<RP> {
    // Server configuration (bind address, listen port, ...)
    config: Arc<ServerConfig>,
    // RPC hooks registered before start; taken (moved into the handler) by `run`,
    // so hooks registered after `run` has been called are not applied.
    rpc_hooks: Option<Vec<Arc<dyn RPCHook>>>,
    // Marks the request-processor type without storing a value of it.
    _phantom_data: std::marker::PhantomData<RP>,
}
475
476impl<RP> RocketMQServer<RP> {
477 pub fn new(config: Arc<ServerConfig>) -> Self {
478 Self {
479 config,
480 rpc_hooks: Some(vec![]),
481 _phantom_data: std::marker::PhantomData,
482 }
483 }
484
485 pub fn register_rpc_hook(&mut self, hook: Arc<dyn RPCHook>) {
486 if let Some(ref mut hooks) = self.rpc_hooks {
487 hooks.push(hook);
488 } else {
489 self.rpc_hooks = Some(vec![hook]);
490 }
491 }
492}
493
494impl<RP: RequestProcessor + Sync + 'static + Clone> RocketMQServer<RP> {
495 pub async fn run(
496 &mut self,
497 request_processor: RP,
498 channel_event_listener: Option<Arc<dyn ChannelEventListener>>,
499 ) {
500 let addr = format!("{}:{}", self.config.bind_address, self.config.listen_port);
501 let listener = TcpListener::bind(&addr).await.unwrap();
502 let rpc_hooks = self.rpc_hooks.take().unwrap_or_default();
503 info!("Starting remoting_server at: {}", addr);
504 let (notify_conn_disconnect, _) = broadcast::channel::<SocketAddr>(100);
505 run(
506 listener,
507 wait_for_signal(),
508 request_processor,
509 Some(notify_conn_disconnect),
510 rpc_hooks,
511 channel_event_listener,
512 )
513 .await;
514 }
515}
516
/// Runs the remoting server on `listener` until the `shutdown` future resolves
/// or accepting connections fails fatally, then performs a graceful shutdown.
///
/// # Shutdown choreography
/// 1. `tokio::select!` races the accept loop against the `shutdown` future.
/// 2. On exit, `notify_shutdown` is dropped, which closes every handler's
///    broadcast subscription and makes their `Shutdown::recv` complete.
/// 3. The local `shutdown_complete_tx` clone is dropped; once every handler
///    task has dropped its own clone, `shutdown_complete_rx.recv()` returns
///    `None` and this function returns — i.e. it waits for all in-flight
///    connections to finish.
pub async fn run<RP: RequestProcessor + Sync + 'static + Clone>(
    listener: TcpListener,
    shutdown: impl Future,
    request_processor: RP,
    conn_disconnect_notify: Option<broadcast::Sender<SocketAddr>>,
    rpc_hooks: Vec<Arc<dyn RPCHook>>,
    channel_event_listener: Option<Arc<dyn ChannelEventListener>>,
) {
    let (notify_shutdown, _) = broadcast::channel(1);
    let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel(1);
    // Initialize the shared command handler (processor + hooks + response table)
    let handler = RemotingGeneralHandler {
        request_processor,
        rpc_hooks,
        // Pre-sized to avoid rehashing under typical in-flight request counts
        response_table: ArcMut::new(HashMap::with_capacity(512)),
    };
    let mut listener = ConnectionListener {
        listener,
        notify_shutdown,
        shutdown_complete_tx,
        conn_disconnect_notify,
        limit_connections: Arc::new(Semaphore::new(DEFAULT_MAX_CONNECTIONS)),
        channel_event_listener,
        cmd_handler: ArcMut::new(handler),
    };

    tokio::select! {
        res = listener.run() => {
            // If an error is received here, accepting connections from the TCP
            // listener failed multiple times and the remoting_server is giving up and
            // shutting down.
            //
            // Errors encountered when handling individual connections do not
            // bubble up to this point.
            if let Err(err) = res {
                error!(cause = %err, "failed to accept");
            }
        }
        _ = shutdown => {
            info!("Shutdown now.....");
        }
    }

    // Destructure to drop the shutdown-coordination handles explicitly;
    // the remaining fields (TcpListener, semaphore, ...) drop here too.
    let ConnectionListener {
        shutdown_complete_tx,
        notify_shutdown,
        ..
    } = listener;
    // Closing the broadcast channel signals all handlers to stop.
    drop(notify_shutdown);
    // Drop our sender so recv() below can observe "all handlers done".
    drop(shutdown_complete_tx);

    // Wait until every connection handler has dropped its sender clone.
    let _ = shutdown_complete_rx.recv().await;
}
571
/// Listens for a server-wide shutdown signal delivered over a broadcast
/// channel, latching the signal once observed.
#[derive(Debug)]
pub(crate) struct Shutdown {
    /// `true` once the shutdown signal has been received (sticky).
    is_shutdown: bool,

    /// The receive half of the channel used to listen for shutdown.
    /// Also completes when the sender side is dropped (channel closed).
    notify: broadcast::Receiver<()>,
}
580
581impl Shutdown {
582 /// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
583 pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown {
584 Shutdown {
585 is_shutdown: false,
586 notify,
587 }
588 }
589
590 /// Returns `true` if the shutdown signal has been received.
591 pub(crate) fn is_shutdown(&self) -> bool {
592 self.is_shutdown
593 }
594
595 /// Receive the shutdown notice, waiting if necessary.
596 pub(crate) async fn recv(&mut self) {
597 // If the shutdown signal has already been received, then return
598 // immediately.
599 if self.is_shutdown {
600 return;
601 }
602
603 // Cannot receive a "lag error" as only one value is ever sent.
604 let _ = self.notify.recv().await;
605
606 // Remember that the signal has been received.
607 self.is_shutdown = true;
608 }
609}