ppaass_v3_common/
server.rs

1use crate::config::RetrieveServerConfig;
2use crate::error::CommonError;
3use crate::event::{DownloadSpeedEvent, LogEvent, LogEventLevel, UploadSpeedEvent};
4use crate::publish_server_log_event;
5use std::any::{Any, TypeId};
6use std::collections::HashMap;
7use std::future::Future;
8use std::net::SocketAddr;
9use std::sync::Arc;
10use tokio::net::{TcpListener, TcpStream};
11use tokio::sync::mpsc::{channel, Receiver, Sender};
12use tokio_util::sync::CancellationToken;
13pub struct ServerState {
14    values: HashMap<TypeId, Arc<dyn Any + Send + Sync + 'static>>,
15}
16impl ServerState {
17    pub fn new() -> Self {
18        Self {
19            values: HashMap::new(),
20        }
21    }
22    pub fn add_value<T>(&mut self, value: T)
23    where
24        T: Send + Sync + 'static,
25    {
26        self.values.insert(TypeId::of::<T>(), Arc::new(value));
27    }
28    pub fn get_value<T>(&self) -> Option<&T>
29    where
30        T: Send + Sync + 'static,
31    {
32        let val = self.values.get(&TypeId::of::<T>())?;
33        val.downcast_ref::<T>()
34    }
35}
36
37pub struct ServerGuard {
38    pub upload_speed_event_receiver: Receiver<UploadSpeedEvent>,
39    pub download_speed_event_receiver: Receiver<DownloadSpeedEvent>,
40    pub log_event_receiver: Receiver<LogEvent>,
41    pub stop_signal: CancellationToken,
42}
43
44pub struct Server<C>
45where
46    C: RetrieveServerConfig + Send + Sync + 'static,
47{
48    config: Arc<C>,
49    server_state: Arc<ServerState>,
50    upload_speed_event_sender: Sender<UploadSpeedEvent>,
51    download_speed_event_sender: Sender<DownloadSpeedEvent>,
52    log_event_sender: Sender<LogEvent>,
53    stop_signal: CancellationToken,
54}
55
56impl<C> Server<C>
57where
58    C: RetrieveServerConfig + Send + Sync + 'static,
59{
60    pub fn new(config: Arc<C>, server_state: ServerState) -> (Self, ServerGuard) {
61        let (upload_speed_event_sender, upload_speed_event_receiver) =
62            channel::<UploadSpeedEvent>(1024);
63        let (download_speed_event_sender, download_speed_event_receiver) =
64            channel::<DownloadSpeedEvent>(1024);
65        let (log_event_sender, log_event_receiver) = channel::<LogEvent>(1024);
66        let stop_signal = CancellationToken::new();
67        (
68            Self {
69                config,
70                server_state: Arc::new(server_state),
71                upload_speed_event_sender,
72                download_speed_event_sender,
73                log_event_sender,
74                stop_signal: stop_signal.clone(),
75            },
76            ServerGuard {
77                upload_speed_event_receiver,
78                download_speed_event_receiver,
79                log_event_receiver,
80                stop_signal,
81            },
82        )
83    }
84    fn config(&self) -> Arc<C> {
85        self.config.clone()
86    }
87    fn server_state(&self) -> Arc<ServerState> {
88        self.server_state.clone()
89    }
90
91    pub async fn run<F1, Fut1, F2, Fut2>(
92        self,
93        create_listener: F1,
94        connection_handler: F2,
95    ) -> Result<(), CommonError>
96    where
97        F1: Fn(Arc<C>) -> Fut1 + Send + Sync + 'static,
98        Fut1: Future<Output = Result<TcpListener, CommonError>> + Send + 'static,
99        F2: Fn(Arc<C>, Arc<ServerState>, TcpStream, SocketAddr) -> Fut2
100            + Send
101            + Sync
102            + Clone
103            + 'static,
104        Fut2: Future<Output = Result<(), CommonError>> + Send + 'static,
105    {
106        let config = self.config();
107        let server_state = self.server_state();
108        let listener = create_listener(config.clone()).await?;
109        publish_server_log_event(
110            &self.log_event_sender,
111            LogEventLevel::Info,
112            format!("Server listening on port: {}", config.server_port()),
113        )
114        .await;
115
116        loop {
117            tokio::select! {
118                _ = self.stop_signal.cancelled()=>{
119                    return Ok(())
120                }
121                accept_result=listener.accept()=>{
122                    let (tcp_stream, socket_address) = match accept_result {
123                        Ok(agent_tcp_accept_result) => agent_tcp_accept_result,
124                        Err(e) => {
125                            publish_server_log_event(
126                                &self.log_event_sender,
127                                LogEventLevel::Error,
128                                format!("Failed to accept connection with IPv4 on port: {}", e),
129                            )
130                            .await;
131                            continue;
132                        }
133                    };
134                    publish_server_log_event(
135                        &self.log_event_sender,
136                        LogEventLevel::Info,
137                        format!("Accept connection: {}", socket_address),
138                    )
139                    .await;
140                    tcp_stream.set_nodelay(true)?;
141                    let config = config.clone();
142                    let server_state = server_state.clone();
143                    let connection_handler = connection_handler.clone();
144                    let log_event_sender = self.log_event_sender.clone();
145                    tokio::spawn(async move {
146                        if let Err(e) =
147                            connection_handler(config, server_state, tcp_stream, socket_address)
148                                .await
149                        {
150                            publish_server_log_event(
151                                &log_event_sender,
152                                LogEventLevel::Error,
153                                format!(
154                                    "Fail to handle connection [{}] because of error: {e:?}",
155                                    socket_address
156                                ),
157                            )
158                            .await;
159                        }
160                    });
161                }
162            }
163        }
164    }
165}