rocketmq_controller/rpc/
server.rs1use std::net::SocketAddr;
19use std::sync::Arc;
20
21use futures::stream::StreamExt;
22use futures::SinkExt;
23use tokio::net::TcpListener;
24use tokio::net::TcpStream;
25use tokio::sync::RwLock;
26use tokio_util::codec::Framed;
27use tracing::debug;
28use tracing::error;
29use tracing::info;
30use tracing::warn;
31
32use crate::error::Result;
33use crate::processor::ProcessorManager;
34use crate::rpc::codec::RpcCodec;
35use crate::rpc::codec::RpcRequest;
36use crate::rpc::codec::RpcResponse;
37
38pub struct RpcServer {
44 listen_addr: SocketAddr,
46
47 processor_manager: Arc<ProcessorManager>,
49
50 state: Arc<RwLock<ServerState>>,
52}
53
/// Lifecycle phase of an [`RpcServer`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ServerState {
    /// The server is not accepting connections; this is the initial state.
    Stopped,

    /// The server has been started and its accept loop is active.
    Running,

    /// Shutdown has been requested and the accept loop is expected to exit.
    ShuttingDown,
}
66
67impl RpcServer {
68 pub fn new(listen_addr: SocketAddr, processor_manager: Arc<ProcessorManager>) -> Self {
70 Self {
71 listen_addr,
72 processor_manager,
73 state: Arc::new(RwLock::new(ServerState::Stopped)),
74 }
75 }
76
77 pub async fn start(&self) -> Result<()> {
79 {
81 let mut state = self.state.write().await;
82 if *state != ServerState::Stopped {
83 warn!("RPC server already started");
84 return Ok(());
85 }
86 *state = ServerState::Running;
87 }
88
89 info!("Starting RPC server on {}", self.listen_addr);
90
91 let listener = TcpListener::bind(self.listen_addr).await?;
93 info!("RPC server listening on {}", self.listen_addr);
94
95 let processor_manager = self.processor_manager.clone();
97 let state = self.state.clone();
98
99 tokio::spawn(async move {
101 loop {
102 {
104 let current_state = state.read().await;
105 if *current_state == ServerState::ShuttingDown {
106 info!("RPC server accept loop stopping");
107 break;
108 }
109 }
110
111 match listener.accept().await {
113 Ok((stream, addr)) => {
114 debug!("Accepted connection from {}", addr);
115
116 let processor_manager = processor_manager.clone();
118 tokio::spawn(async move {
119 if let Err(e) =
120 Self::handle_connection(stream, addr, processor_manager).await
121 {
122 error!("Error handling connection from {}: {}", addr, e);
123 }
124 });
125 }
126 Err(e) => {
127 error!("Failed to accept connection: {}", e);
128 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
129 }
130 }
131 }
132 });
133
134 Ok(())
135 }
136
137 async fn handle_connection(
139 stream: TcpStream,
140 addr: SocketAddr,
141 processor_manager: Arc<ProcessorManager>,
142 ) -> Result<()> {
143 info!("Handling connection from {}", addr);
144
145 let mut framed = Framed::new(stream, RpcCodec::new());
147
148 while let Some(result) = framed.next().await {
150 match result {
151 Ok(request) => {
152 debug!(
153 "Received request from {}: id={}, type={:?}",
154 addr, request.request_id, request.request_type
155 );
156
157 let response = Self::process_request(request, &processor_manager).await;
159
160 if let Err(e) = framed.send(response).await {
162 error!("Failed to send response to {}: {}", addr, e);
163 break;
164 }
165 }
166 Err(e) => {
167 error!("Failed to decode request from {}: {}", addr, e);
168 break;
169 }
170 }
171 }
172
173 info!("Connection from {} closed", addr);
174 Ok(())
175 }
176
177 async fn process_request(
179 request: RpcRequest,
180 processor_manager: &Arc<ProcessorManager>,
181 ) -> RpcResponse {
182 debug!(
183 "Processing request: id={}, type={:?}",
184 request.request_id, request.request_type
185 );
186
187 match processor_manager
189 .process_request(request.request_type.clone(), &request.payload)
190 .await
191 {
192 Ok(response_data) => {
193 debug!("Request {} processed successfully", request.request_id);
194 RpcResponse {
195 request_id: request.request_id,
196 success: true,
197 error: None,
198 payload: response_data,
199 }
200 }
201 Err(e) => {
202 error!("Failed to process request {}: {}", request.request_id, e);
203 RpcResponse {
204 request_id: request.request_id,
205 success: false,
206 error: Some(e.to_string()),
207 payload: Vec::new(),
208 }
209 }
210 }
211 }
212
213 pub async fn shutdown(&self) -> Result<()> {
215 info!("Shutting down RPC server");
216
217 {
219 let mut state = self.state.write().await;
220 *state = ServerState::ShuttingDown;
221 }
222
223 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
225
226 {
228 let mut state = self.state.write().await;
229 *state = ServerState::Stopped;
230 }
231
232 info!("RPC server stopped");
233 Ok(())
234 }
235
236 pub async fn is_running(&self) -> bool {
238 let state = self.state.read().await;
239 *state == ServerState::Running
240 }
241}
242
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test for `ServerState` comparisons and codec construction.
    #[test]
    fn test_rpc_server_state() {
        // `ServerState` must compare equal to itself and differ across variants.
        let initial = ServerState::Stopped;
        assert_eq!(initial, ServerState::Stopped);
        assert_ne!(initial, ServerState::Running);

        // Constructing the codec must succeed without panicking.
        let _rpc_codec = RpcCodec::new();
    }
}