ppaass_v3_common/
server.rs

1use crate::config::RetrieveServerConfig;
2use crate::error::CommonError;
3use std::any::{Any, TypeId};
4use std::collections::HashMap;
5use std::future::Future;
6use std::net::SocketAddr;
7use std::sync::Arc;
8use tokio::net::{TcpListener, TcpStream};
9use tracing::{error, info};
/// A type-indexed container of shared values.
///
/// At most one value is kept per concrete type: the value's [`TypeId`] is the
/// map key, so storing a second value of the same type replaces the first.
#[derive(Default)]
pub struct ServerState {
    values: HashMap<TypeId, Arc<dyn Any + Send + Sync + 'static>>,
}

impl ServerState {
    /// Creates an empty state; equivalent to [`ServerState::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Stores `value` keyed by its own type `T`.
    ///
    /// Any value of type `T` that was stored earlier is replaced.
    pub fn add_value<T>(&mut self, value: T)
    where
        T: Send + Sync + 'static,
    {
        self.values.insert(TypeId::of::<T>(), Arc::new(value));
    }

    /// Returns a reference to the stored value of type `T`.
    ///
    /// Returns `None` when no value of type `T` has been added.
    pub fn get_value<T>(&self) -> Option<&T>
    where
        T: Send + Sync + 'static,
    {
        self.values
            .get(&TypeId::of::<T>())
            .and_then(|stored| stored.downcast_ref::<T>())
    }
}
34
35pub enum ServerListener {
36    TcpListener(TcpListener),
37}
38
39impl ServerListener {
40    pub async fn accept(&self) -> Result<(TcpStream, SocketAddr), CommonError> {
41        match self {
42            ServerListener::TcpListener(tcp_listener) => {
43                let accept_result = tcp_listener.accept().await?;
44                Ok((accept_result.0, accept_result.1))
45            }
46        }
47    }
48}
49#[async_trait::async_trait]
50pub trait Server<C>
51where
52    C: RetrieveServerConfig + Send + Sync + 'static,
53{
54    fn new(config: Arc<C>, server_state: ServerState) -> Self;
55
56    fn config(&self) -> Arc<C>;
57
58    fn server_state(&self) -> Arc<ServerState>;
59
60    async fn run<F1, Fut1, F2, Fut2>(
61        &self,
62        create_listener: F1,
63        connection_handler: F2,
64    ) -> Result<(), CommonError>
65    where
66        F1: Fn(Arc<C>) -> Fut1 + Send + Sync + 'static,
67        Fut1: Future<Output = Result<ServerListener, CommonError>> + Send + 'static,
68        F2: Fn(Arc<C>, Arc<ServerState>, TcpStream, SocketAddr) -> Fut2
69            + Send
70            + Sync
71            + Clone
72            + 'static,
73        Fut2: Future<Output = Result<(), CommonError>> + Send + 'static,
74    {
75        let config = self.config();
76        let server_state = self.server_state();
77        let listener = create_listener(config.clone()).await?;
78        info!("Server listening on port: {}", config.server_port());
79
80        loop {
81            let (agent_tcp_stream, agent_socket_address) = match listener.accept().await {
82                Ok(agent_tcp_accept_result) => agent_tcp_accept_result,
83                Err(e) => {
84                    error!("Failed to accept connection with IPv4 on port: {}", e);
85                    continue;
86                }
87            };
88            agent_tcp_stream.set_nodelay(true)?;
89
90            let config = config.clone();
91            let server_state = server_state.clone();
92
93            let connection_handler = connection_handler.clone();
94            tokio::spawn(async move {
95                if let Err(e) =
96                    connection_handler(config, server_state, agent_tcp_stream, agent_socket_address)
97                        .await
98                {
99                    error!(
100                        "Fail to handle agent tcp connection [{agent_socket_address}]: {}",
101                        e
102                    );
103                }
104            });
105        }
106    }
107}
108
109pub struct CommonServer<C>
110where
111    C: RetrieveServerConfig + Send + Sync + 'static,
112{
113    config: Arc<C>,
114    server_state: Arc<ServerState>,
115}
116
117#[async_trait::async_trait]
118impl<C> Server<C> for CommonServer<C>
119where
120    C: RetrieveServerConfig + Send + Sync + 'static,
121{
122    fn new(config: Arc<C>, server_state: ServerState) -> Self {
123        Self {
124            config,
125            server_state: Arc::new(server_state),
126        }
127    }
128    fn config(&self) -> Arc<C> {
129        self.config.clone()
130    }
131    fn server_state(&self) -> Arc<ServerState> {
132        self.server_state.clone()
133    }
134}