krpc_core/protocol/
server.rs1
2use std::collections::HashMap;
3use std::sync::Arc;
4use krpc_common::RpcServer;
5use tokio::net::TcpListener;
6use tokio::signal;
7use tokio::sync::{broadcast, mpsc};
8use tracing::{error, debug};
9
10use crate::protocol::StreamHandler;
11
12pub struct TcpServer {
13 port: String,
14 rpc_servers: HashMap<String, Arc<Box<dyn RpcServer>>>,
15 notify_shutdown: broadcast::Sender<()>,
16 shutdown_complete_tx: mpsc::Sender<()>,
17 shutdown_complete_rx: mpsc::Receiver<()>,
18}
19
20impl TcpServer {
21 pub fn init(port: &str,rpc_servers: HashMap<String, Arc<Box<dyn RpcServer>>>) -> Self {
22 let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);
23 return TcpServer {
24 port: port.to_string(),
25 rpc_servers,
26 notify_shutdown: broadcast::channel(1).0,
27 shutdown_complete_tx,
28 shutdown_complete_rx,
29 };
30 }
31 pub async fn run(self) -> crate::Result<()> {
32 let listener = TcpListener::bind(&format!("0.0.0.0:{}", self.port)).await?;
33 loop {
34 let tcp_stream = tokio::select! {
35 _ = signal::ctrl_c() => {
36 let TcpServer {
37 mut shutdown_complete_rx,
38 shutdown_complete_tx,
39 notify_shutdown,
40 ..
41 } = self;
42 drop(notify_shutdown);
43 drop(shutdown_complete_tx);
44 let _ = shutdown_complete_rx.recv().await;
45 tracing::info!("krpc server shut");
46 return Ok(());
47 },
48 res = listener.accept() => res
49 };
50 match tcp_stream {
51 Ok(stream) => {
52 let filter_list = vec![];
53 let stream_handler = StreamHandler {
54 tcp_stream: stream.0,
55 filter_list,
56 rpc_server : self.rpc_servers.clone(),
57 shutdown: self.notify_shutdown.subscribe(),
58 _shutdown_complete: self.shutdown_complete_tx.clone(),
59 };
60 debug!("socket stream connect, addr: {:?}", stream.1);
61 tokio::spawn(stream_handler.run_v2());
62 }
63 Err(err) => error!("tcp connect, err: {:?}", err),
64 }
65 }
66 }
67}
68