// rocketmq_remoting/remoting_server/rocketmq_tokio_server.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18use std::future::Future;
19use std::net::SocketAddr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use rocketmq_common::common::server::config::ServerConfig;
24use rocketmq_rust::wait_for_signal;
25use rocketmq_rust::ArcMut;
26use tokio::net::TcpListener;
27use tokio::net::TcpStream;
28use tokio::sync::broadcast;
29use tokio::sync::mpsc;
30use tokio::sync::Semaphore;
31use tokio::time;
32use tracing::error;
33use tracing::info;
34use tracing::warn;
35
36use crate::base::channel_event_listener::ChannelEventListener;
37use crate::base::connection_net_event::ConnectionNetEvent;
38use crate::base::tokio_event::TokioEvent;
39use crate::connection::Connection;
40use crate::net::channel::Channel;
41use crate::net::channel::ChannelInner;
42use crate::remoting::inner::RemotingGeneralHandler;
43use crate::runtime::connection_handler_context::ConnectionHandlerContext;
44use crate::runtime::connection_handler_context::ConnectionHandlerContextWrapper;
45use crate::runtime::processor::RequestProcessor;
46use crate::runtime::RPCHook;
47
/// Default limit for the max number of concurrent connections.
///
/// Used to size the accept-loop semaphore; one permit is held per live connection.
const DEFAULT_MAX_CONNECTIONS: usize = 1000;

/// Default idle timeout in seconds (aligned with Java version: 120s).
///
/// A connection with no received data for this long is closed and an IDLE event is fired.
const DEFAULT_CHANNEL_IDLE_TIMEOUT_SECONDS: u64 = 120;
53
/// Per-connection handler managing the lifecycle of a single client connection.
///
/// # Performance Notes
/// - Uses reference-counted handler to avoid cloning heavyweight objects
/// - Shutdown signal via broadcast for efficient multi-connection coordination
/// - Connection context wrapped in ArcMut for safe concurrent access
///
/// # Lifecycle
/// 1. Created when TCP connection accepted
/// 2. Spawned into dedicated Tokio task
/// 3. Processes commands until shutdown, idle timeout, or disconnection
/// 4. Notifies listeners on drop (see the `Drop` impl below)
pub struct ConnectionHandler<RP> {
    /// Connection-specific context (channel, state, metrics)
    ///
    /// Wrapped in ArcMut to allow sharing with async tasks without excessive cloning
    connection_handler_context: ConnectionHandlerContext,

    /// Shutdown coordination signal
    ///
    /// Receives broadcast when server initiates graceful shutdown
    shutdown: Shutdown,

    /// Completion notification sender
    ///
    /// Dropped when handler completes, signaling to shutdown coordinator
    /// (the underscore name marks it as held only for its drop side effect)
    _shutdown_complete: mpsc::Sender<()>,

    /// Optional disconnect event broadcaster
    ///
    /// If Some, sends `SocketAddr` when connection closes (for routing table cleanup)
    conn_disconnect_notify: Option<broadcast::Sender<SocketAddr>>,

    /// Shared command processing handler
    ///
    /// Reference-counted to avoid cloning per-connection (contains processor + hooks)
    cmd_handler: ArcMut<RemotingGeneralHandler<RP>>,

    /// Event notification channel for ChannelEventListener
    ///
    /// Used to send IDLE and EXCEPTION events to the event dispatcher
    event_tx: Option<mpsc::UnboundedSender<TokioEvent>>,

    /// Idle timeout duration for this connection
    ///
    /// When no data received for this duration, connection is closed and IDLE event is triggered
    idle_timeout: Duration,
}
102
103impl<RP> Drop for ConnectionHandler<RP> {
104    fn drop(&mut self) {
105        if let Some(ref sender) = self.conn_disconnect_notify {
106            let socket_addr = self.connection_handler_context.remote_address();
107            warn!(
108                "connection[{}] disconnected, Send notify message.",
109                socket_addr
110            );
111            let _ = sender.send(socket_addr);
112        }
113    }
114}
115
impl<RP: RequestProcessor + Sync + 'static> ConnectionHandler<RP> {
    /// Main event loop processing incoming commands until shutdown or disconnect.
    ///
    /// # Flow
    /// 1. Wait for next command, shutdown signal, or idle timeout (via `tokio::select!`)
    /// 2. Decode and validate command
    /// 3. Dispatch to business logic processor
    /// 4. Repeat until connection closes or shutdown requested
    ///
    /// # Idle Detection
    /// The `sleep(idle_timeout)` future is re-created on every loop iteration,
    /// so the idle timer is effectively reset each time a frame arrives — it
    /// measures inactivity since the last received command, not total
    /// connection lifetime. Note that time spent inside
    /// `process_message_received` does not count toward the idle timer either.
    ///
    /// # Error Handling
    /// - Decode errors: EXCEPTION event emitted, connection closed, `Err` returned
    /// - Processor errors: handled inside `process_message_received` (per-request isolation)
    /// - Peer closed / shutdown / idle timeout: graceful return `Ok(())`
    #[inline]
    async fn handle(&mut self) -> rocketmq_error::RocketMQResult<()> {
        // Get idle timeout configuration from handler; copied out so the
        // select! arms below don't need to borrow `self` for it.
        let idle_timeout = self.idle_timeout;
        let remote_addr = self.connection_handler_context.remote_address();

        // HOT PATH: Main server receive loop
        while !self.shutdown.is_shutdown {
            let channel = self.connection_handler_context.channel_mut();

            let frame = tokio::select! {
                // Branch 1: Receive next command from peer
                res = channel.connection_mut().receive_command() => res,

                // Branch 2: Shutdown signal received
                _ = self.shutdown.recv() => {
                    // Mark connection as closed to prevent further sends
                    channel.connection_mut().close();
                    return Ok(());
                }

                // Branch 3: Idle timeout - no data received for configured duration
                _ = tokio::time::sleep(idle_timeout) => {
                    warn!(
                        "Connection idle timeout ({}s), remote: {}",
                        idle_timeout.as_secs(),
                        remote_addr
                    );

                    // Clone channel before closing to avoid borrow conflicts
                    let channel_clone = channel.clone();

                    // Send IDLE event to listener; dispatcher may be absent,
                    // so a send failure is ignored.
                    if let Some(ref event_tx) = self.event_tx {
                        let _ = event_tx.send(TokioEvent::new(
                            ConnectionNetEvent::IDLE,
                            remote_addr,
                            channel_clone,
                        ));
                    }

                    // Close connection due to idle timeout
                    channel.connection_mut().close();
                    return Ok(());
                }
            };

            // Extract command or handle end-of-stream
            let cmd = match frame {
                Some(Ok(frame)) => frame,
                Some(Err(e)) => {
                    // Decode error - log and close connection
                    error!("Failed to decode command: {:?}", e);

                    // Clone channel before closing to avoid borrow conflicts
                    let channel_clone = channel.clone();

                    // Send EXCEPTION event to listener
                    if let Some(ref event_tx) = self.event_tx {
                        let _ = event_tx.send(TokioEvent::new(
                            ConnectionNetEvent::EXCEPTION,
                            remote_addr,
                            channel_clone,
                        ));
                    }

                    channel.connection_mut().close();
                    return Err(e);
                }
                None => {
                    // Peer closed connection gracefully
                    return Ok(());
                }
            };

            // Dispatch command to business logic
            // Note: process_message_received handles errors internally
            self.cmd_handler
                .process_message_received(&mut self.connection_handler_context, cmd)
                .await;
        }
        Ok(())
    }
}
218
/// Server listener managing TCP connection acceptance and connection lifecycle.
///
/// # Architecture
/// ```text
/// TcpListener → ConnectionListener → ConnectionHandler (per-connection task)
///                      ↓
///               Event Dispatcher
/// ```
///
/// # Concurrency Control
/// - **Connection Limit**: Semaphore-based backpressure (DEFAULT_MAX_CONNECTIONS)
/// - **Graceful Shutdown**: Broadcast signal to all active handlers
/// - **Event Notification**: Optional async event dispatcher for connection lifecycle
///
/// # Performance Characteristics
/// - O(1) accept loop with backpressure
/// - Parallel connection handling via Tokio spawn
/// - Shared handler state (Arc) to avoid per-connection clones
struct ConnectionListener<RP> {
    /// TCP socket acceptor bound to server address
    listener: TcpListener,

    /// Semaphore controlling max concurrent connections
    ///
    /// Permits acquired before accept, released on handler drop.
    /// Provides backpressure when server reaches capacity.
    limit_connections: Arc<Semaphore>,

    /// Shutdown broadcast sender
    ///
    /// All connection handlers subscribe to this channel.
    /// Dropping this sender (or sending a signal) triggers graceful
    /// termination across all connections.
    notify_shutdown: broadcast::Sender<()>,

    /// Completion coordination channel
    ///
    /// Each handler holds a clone of this sender.
    /// When all handlers drop (server fully shutdown), receiver unblocks.
    shutdown_complete_tx: mpsc::Sender<()>,

    /// Optional connection disconnect broadcaster
    ///
    /// Used for routing table cleanup and metrics.
    conn_disconnect_notify: Option<broadcast::Sender<SocketAddr>>,

    /// Optional lifecycle event listener
    ///
    /// Receives CONNECTED/DISCONNECTED/EXCEPTION/IDLE events.
    /// Useful for external monitoring and orchestration.
    channel_event_listener: Option<Arc<dyn ChannelEventListener>>,

    /// Shared command processing handler
    ///
    /// Contains request processor, RPC hooks, and response routing table.
    /// Arc-wrapped to share across all connection handlers efficiently.
    cmd_handler: ArcMut<RemotingGeneralHandler<RP>>,
}
276
277impl<RP: RequestProcessor + Sync + 'static + Clone> ConnectionListener<RP> {
278    /// Main server event loop accepting and spawning connection handlers.
279    ///
280    /// # Architecture
281    /// ```text
282    /// ┌─────────────┐
283    /// │TcpListener  │ ← accept()
284    /// └──────┬──────┘
285    ///        │ spawn for each connection
286    ///        ↓
287    /// ┌──────────────────┐      ┌─────────────────┐
288    /// │ConnectionHandler │ ───► │Event Dispatcher │ ← optional
289    /// └──────────────────┘      └─────────────────┘
290    /// ```
291    ///
292    /// # Performance Optimizations
293    /// 1. **Permit acquisition before accept**: Backpressure at OS level
294    /// 2. **TCP_NODELAY**: Disable Nagle's algorithm for low latency
295    /// 3. **Event channel buffering**: Prevent blocking on event dispatch
296    /// 4. **Arc reuse**: cmd_handler cloned once per connection, not per message
297    ///
298    /// # Concurrency
299    /// - Accept loop: Single-threaded (TcpListener)
300    /// - Handler tasks: Multi-threaded (Tokio runtime)
301    /// - Event dispatcher: Independent task (non-blocking)
302    async fn run(&mut self) -> anyhow::Result<()> {
303        info!("Server ready to accept connections");
304
305        // Event notification channel (unbounded to prevent accept() blocking)
306        let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel::<TokioEvent>();
307
308        // Spawn event dispatcher task if listener configured
309        if let Some(listener) = self.channel_event_listener.take() {
310            tokio::spawn(async move {
311                while let Some(event) = event_rx.recv().await {
312                    let addr = event.remote_addr();
313                    let addr_str = addr.to_string();
314
315                    // HOT PATH: Match on event type and dispatch to listener
316                    match event.type_() {
317                        ConnectionNetEvent::CONNECTED(_) => {
318                            listener.on_channel_connect(&addr_str, event.channel());
319                        }
320                        ConnectionNetEvent::DISCONNECTED => {
321                            listener.on_channel_close(&addr_str, event.channel());
322                        }
323                        ConnectionNetEvent::EXCEPTION => {
324                            listener.on_channel_exception(&addr_str, event.channel());
325                        }
326                        ConnectionNetEvent::IDLE => {
327                            listener.on_channel_idle(&addr_str, event.channel());
328                        }
329                    }
330                }
331                info!("Event dispatcher task terminated");
332            });
333        }
334
335        // Main accept loop
336        loop {
337            // OPTIMIZATION: Acquire permit BEFORE accept() to provide backpressure
338            // If at capacity, accept() won't be called until a slot frees up
339            let permit = self
340                .limit_connections
341                .clone()
342                .acquire_owned()
343                .await
344                .expect("Semaphore closed unexpectedly");
345
346            // Accept next connection (with exponential backoff on errors)
347            let (socket, remote_addr) = self.accept().await?;
348
349            // OPTIMIZATION: Enable TCP_NODELAY for low-latency RPC
350            // Disables Nagle's algorithm to send small packets immediately
351            if let Err(e) = socket.set_nodelay(true) {
352                warn!("Failed to set TCP_NODELAY for {}: {}", remote_addr, e);
353            }
354
355            let local_addr = socket.local_addr()?;
356            info!("Accepted connection: {} → {}", remote_addr, local_addr);
357
358            // Create connection channel wrapper
359            let channel_inner = ArcMut::new(ChannelInner::new(
360                Connection::new(socket),
361                self.cmd_handler.response_table.clone(),
362            ));
363            let channel = Channel::new(channel_inner, local_addr, remote_addr);
364
365            // Notify CONNECTED event
366            let _ = event_tx.send(TokioEvent::new(
367                ConnectionNetEvent::CONNECTED(remote_addr),
368                remote_addr,
369                channel.clone(),
370            ));
371
372            // Build connection handler
373            let idle_timeout = Duration::from_secs(DEFAULT_CHANNEL_IDLE_TIMEOUT_SECONDS);
374            let handler = ConnectionHandler {
375                connection_handler_context: ArcMut::new(ConnectionHandlerContextWrapper {
376                    channel: channel.clone(),
377                }),
378                shutdown: Shutdown::new(self.notify_shutdown.subscribe()),
379                _shutdown_complete: self.shutdown_complete_tx.clone(),
380                conn_disconnect_notify: self.conn_disconnect_notify.clone(),
381                cmd_handler: self.cmd_handler.clone(),
382                event_tx: Some(event_tx.clone()),
383                idle_timeout,
384            };
385
386            // Spawn dedicated task for this connection
387            let event_tx_clone = event_tx.clone();
388            tokio::spawn(async move {
389                let mut handler = handler;
390
391                // Run handler until completion
392                if let Err(err) = handler.handle().await {
393                    error!(
394                        remote_addr = %remote_addr,
395                        error = ?err,
396                        "Connection handler terminated with error"
397                    );
398                }
399
400                // Notify DISCONNECTED event
401                let _ = event_tx_clone.send(TokioEvent::new(
402                    ConnectionNetEvent::DISCONNECTED,
403                    remote_addr,
404                    handler.connection_handler_context.channel.clone(),
405                ));
406
407                info!("Client {} disconnected", remote_addr);
408
409                // IMPORTANT: Permit released when `permit` drops here
410                drop(permit);
411            });
412        }
413    }
414
415    /// Accept new TCP connection with exponential backoff on transient errors.
416    ///
417    /// # Error Handling Strategy
418    /// - **Fatal errors** (e.g., listener closed): Return immediately
419    /// - **Transient errors** (e.g., too many open files): Retry with backoff
420    /// - **Max retries**: Give up after backoff reaches 64 seconds
421    ///
422    /// # Backoff Schedule
423    /// ```text
424    /// Attempt | Delay
425    /// --------|-------
426    /// 1       | 1s
427    /// 2       | 2s
428    /// 3       | 4s
429    /// 4       | 8s
430    /// 5       | 16s
431    /// 6       | 32s
432    /// 7       | 64s (final)
433    /// ```
434    ///
435    /// # Performance
436    /// - Fast path: Single syscall when no errors
437    /// - Slow path: Exponential backoff prevents thundering herd
438    async fn accept(&mut self) -> anyhow::Result<(TcpStream, SocketAddr)> {
439        let mut backoff = 1;
440        const MAX_BACKOFF: u64 = 64;
441
442        loop {
443            match self.listener.accept().await {
444                Ok((socket, remote_addr)) => {
445                    // Fast path: successful accept
446                    return Ok((socket, remote_addr));
447                }
448                Err(err) => {
449                    if backoff > MAX_BACKOFF {
450                        // Exceeded retry limit - fatal error
451                        error!(
452                            "Accept failed after {} retries, last error: {}",
453                            MAX_BACKOFF, err
454                        );
455                        return Err(err.into());
456                    }
457
458                    // Log transient error and retry
459                    warn!("Accept error (will retry in {}s): {}", backoff, err);
460                }
461            }
462
463            // Exponential backoff before retry
464            time::sleep(Duration::from_secs(backoff)).await;
465            backoff *= 2;
466        }
467    }
468}
469
/// TCP remoting server facade, parameterized over the request processor type `RP`.
pub struct RocketMQServer<RP> {
    // Bind address and listen port used when starting the server.
    config: Arc<ServerConfig>,
    // RPC hooks handed to the general handler; `take`n (set to None) when `run` starts.
    rpc_hooks: Option<Vec<Arc<dyn RPCHook>>>,
    // Ties the server to its processor type without storing an instance.
    _phantom_data: std::marker::PhantomData<RP>,
}
475
476impl<RP> RocketMQServer<RP> {
477    pub fn new(config: Arc<ServerConfig>) -> Self {
478        Self {
479            config,
480            rpc_hooks: Some(vec![]),
481            _phantom_data: std::marker::PhantomData,
482        }
483    }
484
485    pub fn register_rpc_hook(&mut self, hook: Arc<dyn RPCHook>) {
486        if let Some(ref mut hooks) = self.rpc_hooks {
487            hooks.push(hook);
488        } else {
489            self.rpc_hooks = Some(vec![hook]);
490        }
491    }
492}
493
494impl<RP: RequestProcessor + Sync + 'static + Clone> RocketMQServer<RP> {
495    pub async fn run(
496        &mut self,
497        request_processor: RP,
498        channel_event_listener: Option<Arc<dyn ChannelEventListener>>,
499    ) {
500        let addr = format!("{}:{}", self.config.bind_address, self.config.listen_port);
501        let listener = TcpListener::bind(&addr).await.unwrap();
502        let rpc_hooks = self.rpc_hooks.take().unwrap_or_default();
503        info!("Starting remoting_server at: {}", addr);
504        let (notify_conn_disconnect, _) = broadcast::channel::<SocketAddr>(100);
505        run(
506            listener,
507            wait_for_signal(),
508            request_processor,
509            Some(notify_conn_disconnect),
510            rpc_hooks,
511            channel_event_listener,
512        )
513        .await;
514    }
515}
516
/// Wires up shutdown coordination and the connection listener, then drives the
/// accept loop until it fails fatally or the provided `shutdown` future resolves.
///
/// # Shutdown sequence
/// 1. Dropping `notify_shutdown` closes every handler's broadcast receiver,
///    making each handler's `Shutdown::recv` complete.
/// 2. Dropping `shutdown_complete_tx` leaves only the per-handler clones;
///    once every handler drops, `shutdown_complete_rx.recv()` returns `None`
///    and this function exits.
pub async fn run<RP: RequestProcessor + Sync + 'static + Clone>(
    listener: TcpListener,
    shutdown: impl Future,
    request_processor: RP,
    conn_disconnect_notify: Option<broadcast::Sender<SocketAddr>>,
    rpc_hooks: Vec<Arc<dyn RPCHook>>,
    channel_event_listener: Option<Arc<dyn ChannelEventListener>>,
) {
    let (notify_shutdown, _) = broadcast::channel(1);
    let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel(1);
    // Initialize the shared command handler (processor + hooks + response table)
    let handler = RemotingGeneralHandler {
        request_processor,
        rpc_hooks,
        response_table: ArcMut::new(HashMap::with_capacity(512)),
    };
    let mut listener = ConnectionListener {
        listener,
        notify_shutdown,
        shutdown_complete_tx,
        conn_disconnect_notify,
        limit_connections: Arc::new(Semaphore::new(DEFAULT_MAX_CONNECTIONS)),
        channel_event_listener,
        cmd_handler: ArcMut::new(handler),
    };

    tokio::select! {
        res = listener.run() => {
            // If an error is received here, accepting connections from the TCP
            // listener failed multiple times and the remoting_server is giving up and
            // shutting down.
            //
            // Errors encountered when handling individual connections do not
            // bubble up to this point.
            if let Err(err) = res {
                error!(cause = %err, "failed to accept");
            }
        }
        _ = shutdown => {
            info!("Shutdown now.....");
        }
    }

    // Destructure so the two shutdown channels can be dropped explicitly;
    // the remaining fields (including the TcpListener) are dropped here too.
    let ConnectionListener {
        shutdown_complete_tx,
        notify_shutdown,
        ..
    } = listener;
    drop(notify_shutdown);
    drop(shutdown_complete_tx);

    // Blocks until every connection handler has dropped its sender clone.
    let _ = shutdown_complete_rx.recv().await;
}
571
/// Listens for the server-wide shutdown broadcast and latches the result.
///
/// Once the signal is observed (or the channel closes), `is_shutdown` stays
/// `true` for the lifetime of this value.
#[derive(Debug)]
pub(crate) struct Shutdown {
    /// `true` if the shutdown signal has been received
    is_shutdown: bool,

    /// The receive half of the channel used to listen for shutdown.
    notify: broadcast::Receiver<()>,
}
580
581impl Shutdown {
582    /// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
583    pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown {
584        Shutdown {
585            is_shutdown: false,
586            notify,
587        }
588    }
589
590    /// Returns `true` if the shutdown signal has been received.
591    pub(crate) fn is_shutdown(&self) -> bool {
592        self.is_shutdown
593    }
594
595    /// Receive the shutdown notice, waiting if necessary.
596    pub(crate) async fn recv(&mut self) {
597        // If the shutdown signal has already been received, then return
598        // immediately.
599        if self.is_shutdown {
600            return;
601        }
602
603        // Cannot receive a "lag error" as only one value is ever sent.
604        let _ = self.notify.recv().await;
605
606        // Remember that the signal has been received.
607        self.is_shutdown = true;
608    }
609}