krpc_core/protocol/
server.rs

1
2use std::collections::HashMap;
3use std::sync::Arc;
4use krpc_common::RpcServer;
5use tokio::net::TcpListener;
6use tokio::signal;
7use tokio::sync::{broadcast, mpsc};
8use tracing::{error, debug};
9
10use crate::protocol::StreamHandler;
11
12pub struct TcpServer {
13    port: String,
14    rpc_servers: HashMap<String, Arc<Box<dyn RpcServer>>>,
15    notify_shutdown: broadcast::Sender<()>,
16    shutdown_complete_tx: mpsc::Sender<()>,
17    shutdown_complete_rx: mpsc::Receiver<()>,
18}
19
20impl TcpServer {
21    pub fn init(port: &str,rpc_servers: HashMap<String, Arc<Box<dyn RpcServer>>>) -> Self {
22        let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);
23        return TcpServer {
24            port: port.to_string(),
25            rpc_servers,
26            notify_shutdown: broadcast::channel(1).0,
27            shutdown_complete_tx,
28            shutdown_complete_rx,
29        };
30    }
31    pub async fn run(self) -> crate::Result<()> {
32        let listener = TcpListener::bind(&format!("0.0.0.0:{}", self.port)).await?;
33        loop {
34            let tcp_stream = tokio::select! {
35                _ = signal::ctrl_c() => {
36                    let TcpServer {
37                        mut shutdown_complete_rx,
38                        shutdown_complete_tx,
39                        notify_shutdown,
40                        ..
41                    } = self;
42                    drop(notify_shutdown);
43                    drop(shutdown_complete_tx);
44                    let _ = shutdown_complete_rx.recv().await;
45                    tracing::info!("krpc server shut");
46                    return Ok(());
47                },
48                res = listener.accept() => res
49            };
50            match tcp_stream {
51                Ok(stream) => {
52                    let filter_list = vec![];
53                    let stream_handler = StreamHandler {
54                        tcp_stream: stream.0,
55                        filter_list,
56                        rpc_server : self.rpc_servers.clone(),
57                        shutdown: self.notify_shutdown.subscribe(),
58                        _shutdown_complete: self.shutdown_complete_tx.clone(),
59                    };
60                    debug!("socket stream connect, addr: {:?}", stream.1);
61                    tokio::spawn(stream_handler.run_v2());
62                }
63                Err(err) => error!("tcp connect, err: {:?}", err),
64            }
65        }
66    }
67}
68