rocketmq_controller/rpc/
server.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::net::SocketAddr;
19use std::sync::Arc;
20
21use futures::stream::StreamExt;
22use futures::SinkExt;
23use tokio::net::TcpListener;
24use tokio::net::TcpStream;
25use tokio::sync::RwLock;
26use tokio_util::codec::Framed;
27use tracing::debug;
28use tracing::error;
29use tracing::info;
30use tracing::warn;
31
32use crate::error::Result;
33use crate::processor::ProcessorManager;
34use crate::rpc::codec::RpcCodec;
35use crate::rpc::codec::RpcRequest;
36use crate::rpc::codec::RpcResponse;
37
38/// RPC server
39///
40/// Handles incoming TCP connections from brokers and clients,
41/// decodes RPC requests, routes them to appropriate processors,
42/// and sends back responses.
43pub struct RpcServer {
44    /// Listen address
45    listen_addr: SocketAddr,
46
47    /// Processor manager
48    processor_manager: Arc<ProcessorManager>,
49
50    /// Server state
51    state: Arc<RwLock<ServerState>>,
52}
53
/// Lifecycle phase of an [`RpcServer`].
///
/// A server is created `Stopped`, becomes `Running` when started, and passes
/// through `ShuttingDown` on its way back to `Stopped`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ServerState {
    /// Not started yet, or fully shut down.
    Stopped,

    /// Started and accepting connections.
    Running,

    /// Shutdown was requested; the accept loop is expected to exit.
    ShuttingDown,
}
66
67impl RpcServer {
68    /// Create a new RPC server
69    pub fn new(listen_addr: SocketAddr, processor_manager: Arc<ProcessorManager>) -> Self {
70        Self {
71            listen_addr,
72            processor_manager,
73            state: Arc::new(RwLock::new(ServerState::Stopped)),
74        }
75    }
76
77    /// Start the RPC server
78    pub async fn start(&self) -> Result<()> {
79        // Check state
80        {
81            let mut state = self.state.write().await;
82            if *state != ServerState::Stopped {
83                warn!("RPC server already started");
84                return Ok(());
85            }
86            *state = ServerState::Running;
87        }
88
89        info!("Starting RPC server on {}", self.listen_addr);
90
91        // Bind to the address
92        let listener = TcpListener::bind(self.listen_addr).await?;
93        info!("RPC server listening on {}", self.listen_addr);
94
95        // Clone Arc for the task
96        let processor_manager = self.processor_manager.clone();
97        let state = self.state.clone();
98
99        // Spawn accept loop
100        tokio::spawn(async move {
101            loop {
102                // Check if we should stop
103                {
104                    let current_state = state.read().await;
105                    if *current_state == ServerState::ShuttingDown {
106                        info!("RPC server accept loop stopping");
107                        break;
108                    }
109                }
110
111                // Accept new connection
112                match listener.accept().await {
113                    Ok((stream, addr)) => {
114                        debug!("Accepted connection from {}", addr);
115
116                        // Spawn handler for this connection
117                        let processor_manager = processor_manager.clone();
118                        tokio::spawn(async move {
119                            if let Err(e) =
120                                Self::handle_connection(stream, addr, processor_manager).await
121                            {
122                                error!("Error handling connection from {}: {}", addr, e);
123                            }
124                        });
125                    }
126                    Err(e) => {
127                        error!("Failed to accept connection: {}", e);
128                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
129                    }
130                }
131            }
132        });
133
134        Ok(())
135    }
136
137    /// Handle a single connection
138    async fn handle_connection(
139        stream: TcpStream,
140        addr: SocketAddr,
141        processor_manager: Arc<ProcessorManager>,
142    ) -> Result<()> {
143        info!("Handling connection from {}", addr);
144
145        // Create framed stream with codec
146        let mut framed = Framed::new(stream, RpcCodec::new());
147
148        // Process requests
149        while let Some(result) = framed.next().await {
150            match result {
151                Ok(request) => {
152                    debug!(
153                        "Received request from {}: id={}, type={:?}",
154                        addr, request.request_id, request.request_type
155                    );
156
157                    // Process the request
158                    let response = Self::process_request(request, &processor_manager).await;
159
160                    // Send response
161                    if let Err(e) = framed.send(response).await {
162                        error!("Failed to send response to {}: {}", addr, e);
163                        break;
164                    }
165                }
166                Err(e) => {
167                    error!("Failed to decode request from {}: {}", addr, e);
168                    break;
169                }
170            }
171        }
172
173        info!("Connection from {} closed", addr);
174        Ok(())
175    }
176
177    /// Process a single request
178    async fn process_request(
179        request: RpcRequest,
180        processor_manager: &Arc<ProcessorManager>,
181    ) -> RpcResponse {
182        debug!(
183            "Processing request: id={}, type={:?}",
184            request.request_id, request.request_type
185        );
186
187        // Process the request
188        match processor_manager
189            .process_request(request.request_type.clone(), &request.payload)
190            .await
191        {
192            Ok(response_data) => {
193                debug!("Request {} processed successfully", request.request_id);
194                RpcResponse {
195                    request_id: request.request_id,
196                    success: true,
197                    error: None,
198                    payload: response_data,
199                }
200            }
201            Err(e) => {
202                error!("Failed to process request {}: {}", request.request_id, e);
203                RpcResponse {
204                    request_id: request.request_id,
205                    success: false,
206                    error: Some(e.to_string()),
207                    payload: Vec::new(),
208                }
209            }
210        }
211    }
212
213    /// Shutdown the RPC server
214    pub async fn shutdown(&self) -> Result<()> {
215        info!("Shutting down RPC server");
216
217        // Update state
218        {
219            let mut state = self.state.write().await;
220            *state = ServerState::ShuttingDown;
221        }
222
223        // Wait a bit for accept loop to stop
224        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
225
226        // Update state
227        {
228            let mut state = self.state.write().await;
229            *state = ServerState::Stopped;
230        }
231
232        info!("RPC server stopped");
233        Ok(())
234    }
235
236    /// Check if server is running
237    pub async fn is_running(&self) -> bool {
238        let state = self.state.read().await;
239        *state == ServerState::Running
240    }
241}
242
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: `ServerState` values compare as expected and an
    /// `RpcCodec` can be constructed.
    #[test]
    fn test_rpc_server_state() {
        let initial = ServerState::Stopped;

        // Equal to itself, different from another variant.
        assert!(initial == ServerState::Stopped);
        assert!(initial != ServerState::Running);

        // Constructing the codec must not panic.
        let _unused_codec = RpcCodec::new();
    }
}