ppaass_v3_common/
server.rs1use crate::config::RetrieveServerConfig;
2use crate::error::CommonError;
3use std::any::{Any, TypeId};
4use std::collections::HashMap;
5use std::future::Future;
6use std::net::SocketAddr;
7use std::sync::Arc;
8use tokio::net::{TcpListener, TcpStream};
9use tracing::{error, info};
/// Container for shared server-wide values, indexed by the value's concrete type.
///
/// Every entry is stored behind an [`Arc`] as a type-erased `dyn Any + Send + Sync` and is
/// keyed by the [`TypeId`] of the type it was created from.
///
/// `Default` yields an empty container and `Debug` is available for diagnostics.
#[derive(Debug, Default)]
pub struct ServerState {
    /// Type-erased values keyed by the `TypeId` of their concrete type.
    values: HashMap<TypeId, Arc<dyn Any + Send + Sync + 'static>>,
}
13impl ServerState {
14 pub fn new() -> Self {
15 Self {
16 values: HashMap::new(),
17 }
18 }
19 pub fn add_value<T>(&mut self, value: T)
20 where
21 T: Send + Sync + 'static,
22 {
23 self.values.insert(TypeId::of::<T>(), Arc::new(value));
24 }
25
26 pub fn get_value<T>(&self) -> Option<&T>
27 where
28 T: Send + Sync + 'static,
29 {
30 let val = self.values.get(&TypeId::of::<T>())?;
31 val.downcast_ref::<T>()
32 }
33}
34
35pub enum ServerListener {
36 TcpListener(TcpListener),
37}
38
39impl ServerListener {
40 pub async fn accept(&self) -> Result<(TcpStream, SocketAddr), CommonError> {
41 match self {
42 ServerListener::TcpListener(tcp_listener) => {
43 let accept_result = tcp_listener.accept().await?;
44 Ok((accept_result.0, accept_result.1))
45 }
46 }
47 }
48}
49#[async_trait::async_trait]
50pub trait Server<C>
51where
52 C: RetrieveServerConfig + Send + Sync + 'static,
53{
54 fn new(config: Arc<C>, server_state: ServerState) -> Self;
55
56 fn config(&self) -> Arc<C>;
57
58 fn server_state(&self) -> Arc<ServerState>;
59
60 async fn run<F1, Fut1, F2, Fut2>(
61 &self,
62 create_listener: F1,
63 connection_handler: F2,
64 ) -> Result<(), CommonError>
65 where
66 F1: Fn(Arc<C>) -> Fut1 + Send + Sync + 'static,
67 Fut1: Future<Output = Result<ServerListener, CommonError>> + Send + 'static,
68 F2: Fn(Arc<C>, Arc<ServerState>, TcpStream, SocketAddr) -> Fut2
69 + Send
70 + Sync
71 + Clone
72 + 'static,
73 Fut2: Future<Output = Result<(), CommonError>> + Send + 'static,
74 {
75 let config = self.config();
76 let server_state = self.server_state();
77 let listener = create_listener(config.clone()).await?;
78 info!("Server listening on port: {}", config.server_port());
79
80 loop {
81 let (agent_tcp_stream, agent_socket_address) = match listener.accept().await {
82 Ok(agent_tcp_accept_result) => agent_tcp_accept_result,
83 Err(e) => {
84 error!("Failed to accept connection with IPv4 on port: {}", e);
85 continue;
86 }
87 };
88 agent_tcp_stream.set_nodelay(true)?;
89
90 let config = config.clone();
91 let server_state = server_state.clone();
92
93 let connection_handler = connection_handler.clone();
94 tokio::spawn(async move {
95 if let Err(e) =
96 connection_handler(config, server_state, agent_tcp_stream, agent_socket_address)
97 .await
98 {
99 error!(
100 "Fail to handle agent tcp connection [{agent_socket_address}]: {}",
101 e
102 );
103 }
104 });
105 }
106 }
107}
108
109pub struct CommonServer<C>
110where
111 C: RetrieveServerConfig + Send + Sync + 'static,
112{
113 config: Arc<C>,
114 server_state: Arc<ServerState>,
115}
116
117#[async_trait::async_trait]
118impl<C> Server<C> for CommonServer<C>
119where
120 C: RetrieveServerConfig + Send + Sync + 'static,
121{
122 fn new(config: Arc<C>, server_state: ServerState) -> Self {
123 Self {
124 config,
125 server_state: Arc::new(server_state),
126 }
127 }
128 fn config(&self) -> Arc<C> {
129 self.config.clone()
130 }
131 fn server_state(&self) -> Arc<ServerState> {
132 self.server_state.clone()
133 }
134}