ppaass_v3_common/
server.rs1use crate::config::RetrieveServerConfig;
2use crate::error::CommonError;
3use crate::event::{DownloadSpeedEvent, LogEvent, LogEventLevel, UploadSpeedEvent};
4use crate::publish_server_log_event;
5use std::any::{Any, TypeId};
6use std::collections::HashMap;
7use std::future::Future;
8use std::net::SocketAddr;
9use std::sync::Arc;
10use tokio::net::{TcpListener, TcpStream};
11use tokio::sync::mpsc::{channel, Receiver, Sender};
12use tokio_util::sync::CancellationToken;
13pub struct ServerState {
14 values: HashMap<TypeId, Arc<dyn Any + Send + Sync + 'static>>,
15}
16impl ServerState {
17 pub fn new() -> Self {
18 Self {
19 values: HashMap::new(),
20 }
21 }
22 pub fn add_value<T>(&mut self, value: T)
23 where
24 T: Send + Sync + 'static,
25 {
26 self.values.insert(TypeId::of::<T>(), Arc::new(value));
27 }
28 pub fn get_value<T>(&self) -> Option<&T>
29 where
30 T: Send + Sync + 'static,
31 {
32 let val = self.values.get(&TypeId::of::<T>())?;
33 val.downcast_ref::<T>()
34 }
35}
36
37pub struct ServerGuard {
38 pub upload_speed_event_receiver: Receiver<UploadSpeedEvent>,
39 pub download_speed_event_receiver: Receiver<DownloadSpeedEvent>,
40 pub log_event_receiver: Receiver<LogEvent>,
41 pub stop_signal: CancellationToken,
42}
43
44pub struct Server<C>
45where
46 C: RetrieveServerConfig + Send + Sync + 'static,
47{
48 config: Arc<C>,
49 server_state: Arc<ServerState>,
50 upload_speed_event_sender: Sender<UploadSpeedEvent>,
51 download_speed_event_sender: Sender<DownloadSpeedEvent>,
52 log_event_sender: Sender<LogEvent>,
53 stop_signal: CancellationToken,
54}
55
56impl<C> Server<C>
57where
58 C: RetrieveServerConfig + Send + Sync + 'static,
59{
60 pub fn new(config: Arc<C>, server_state: ServerState) -> (Self, ServerGuard) {
61 let (upload_speed_event_sender, upload_speed_event_receiver) =
62 channel::<UploadSpeedEvent>(1024);
63 let (download_speed_event_sender, download_speed_event_receiver) =
64 channel::<DownloadSpeedEvent>(1024);
65 let (log_event_sender, log_event_receiver) = channel::<LogEvent>(1024);
66 let stop_signal = CancellationToken::new();
67 (
68 Self {
69 config,
70 server_state: Arc::new(server_state),
71 upload_speed_event_sender,
72 download_speed_event_sender,
73 log_event_sender,
74 stop_signal: stop_signal.clone(),
75 },
76 ServerGuard {
77 upload_speed_event_receiver,
78 download_speed_event_receiver,
79 log_event_receiver,
80 stop_signal,
81 },
82 )
83 }
84 fn config(&self) -> Arc<C> {
85 self.config.clone()
86 }
87 fn server_state(&self) -> Arc<ServerState> {
88 self.server_state.clone()
89 }
90
91 pub async fn run<F1, Fut1, F2, Fut2>(
92 self,
93 create_listener: F1,
94 connection_handler: F2,
95 ) -> Result<(), CommonError>
96 where
97 F1: Fn(Arc<C>) -> Fut1 + Send + Sync + 'static,
98 Fut1: Future<Output = Result<TcpListener, CommonError>> + Send + 'static,
99 F2: Fn(Arc<C>, Arc<ServerState>, TcpStream, SocketAddr) -> Fut2
100 + Send
101 + Sync
102 + Clone
103 + 'static,
104 Fut2: Future<Output = Result<(), CommonError>> + Send + 'static,
105 {
106 let config = self.config();
107 let server_state = self.server_state();
108 let listener = create_listener(config.clone()).await?;
109 publish_server_log_event(
110 &self.log_event_sender,
111 LogEventLevel::Info,
112 format!("Server listening on port: {}", config.server_port()),
113 )
114 .await;
115
116 loop {
117 tokio::select! {
118 _ = self.stop_signal.cancelled()=>{
119 return Ok(())
120 }
121 accept_result=listener.accept()=>{
122 let (tcp_stream, socket_address) = match accept_result {
123 Ok(agent_tcp_accept_result) => agent_tcp_accept_result,
124 Err(e) => {
125 publish_server_log_event(
126 &self.log_event_sender,
127 LogEventLevel::Error,
128 format!("Failed to accept connection with IPv4 on port: {}", e),
129 )
130 .await;
131 continue;
132 }
133 };
134 publish_server_log_event(
135 &self.log_event_sender,
136 LogEventLevel::Info,
137 format!("Accept connection: {}", socket_address),
138 )
139 .await;
140 tcp_stream.set_nodelay(true)?;
141 let config = config.clone();
142 let server_state = server_state.clone();
143 let connection_handler = connection_handler.clone();
144 let log_event_sender = self.log_event_sender.clone();
145 tokio::spawn(async move {
146 if let Err(e) =
147 connection_handler(config, server_state, tcp_stream, socket_address)
148 .await
149 {
150 publish_server_log_event(
151 &log_event_sender,
152 LogEventLevel::Error,
153 format!(
154 "Fail to handle connection [{}] because of error: {e:?}",
155 socket_address
156 ),
157 )
158 .await;
159 }
160 });
161 }
162 }
163 }
164 }
165}