krpc-core 0.2.0

RPC framework for service registration and discovery through API exposure, compatible with the Dubbo3 protocol and interoperable with Java projects
Documentation

use std::collections::HashMap;
use std::sync::Arc;
use krpc_common::RpcServer;
use tokio::net::TcpListener;
use tokio::signal;
use tokio::sync::{broadcast, mpsc};
use tracing::{error, debug};

use crate::protocol::StreamHandler;

/// TCP front end that accepts connections and hands each one to a
/// [`StreamHandler`] task, with Ctrl-C triggered shutdown coordination.
pub struct TcpServer {
    /// Port to listen on, kept as text; `run` binds to `0.0.0.0:<port>`.
    port: String,
    /// Registered RPC services keyed by string; a clone of this map is given
    /// to every connection handler.
    rpc_servers: HashMap<String, Arc<Box<dyn RpcServer>>>,
    /// Broadcast sender each connection handler subscribes to; `run` drops it
    /// when Ctrl-C is received.
    notify_shutdown: broadcast::Sender<()>,
    /// Sender cloned into every connection handler; `run` drops its own copy
    /// before waiting on `shutdown_complete_rx`.
    shutdown_complete_tx: mpsc::Sender<()>,
    /// Receiver `run` awaits during shutdown, after dropping its own sender.
    shutdown_complete_rx: mpsc::Receiver<()>,
}

impl TcpServer {
    pub fn init(port: &str,rpc_servers: HashMap<String, Arc<Box<dyn RpcServer>>>) -> Self {
        let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);
        return TcpServer {
            port: port.to_string(),
            rpc_servers,
            notify_shutdown: broadcast::channel(1).0,
            shutdown_complete_tx,
            shutdown_complete_rx,
        };
    }
    pub async fn run(self) -> crate::Result<()> {
        let listener = TcpListener::bind(&format!("0.0.0.0:{}", self.port)).await?;
        loop {
            let tcp_stream = tokio::select! {
                _ = signal::ctrl_c() => {
                    let TcpServer {
                        mut shutdown_complete_rx,
                        shutdown_complete_tx,
                        notify_shutdown,
                        ..
                    } = self;
                    drop(notify_shutdown);
                    drop(shutdown_complete_tx);
                    let _ = shutdown_complete_rx.recv().await;
                    tracing::info!("krpc server shut");
                    return Ok(());
                },
                res = listener.accept() => res
            };
            match tcp_stream {
                Ok(stream) => {
                    let filter_list = vec![];
                    let stream_handler = StreamHandler {
                        tcp_stream: stream.0,
                        filter_list,
                        rpc_server : self.rpc_servers.clone(),
                        shutdown: self.notify_shutdown.subscribe(),
                        _shutdown_complete: self.shutdown_complete_tx.clone(),
                    };
                    debug!("socket stream connect, addr: {:?}", stream.1);
                    tokio::spawn(stream_handler.run_v2());
                }
                Err(err) => error!("tcp connect, err: {:?}", err),
            }
        }
    }
}