1mod action;
2mod custom;
3pub mod fs;
4mod listening;
5pub mod proc;
6pub mod state;
7
8pub use listening::ListeningServer;
9
10use crate::core::{event::AddrEventManager, Msg, Transport};
11use derive_builder::Builder;
12use log::error;
13use crate::core::transport::{Authenticator, Bicrypter, NetTransmission, Wire};
14use std::net::SocketAddr;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::{
18 io,
19 net::{TcpListener, UdpSocket},
20 runtime::Handle,
21 sync::mpsc,
22 time,
23};
24
/// Configuration for a server, consumed by `listen` / `cloneable_listen`
/// to bind a socket and start handling incoming messages.
///
/// The `Builder` derive generates a builder for this type; every field that
/// carries a `#[builder(default = ...)]` attribute may be left unset there.
#[derive(Builder, Clone)]
pub struct Server<A, B>
where
    A: Authenticator,
    B: Bicrypter,
{
    /// TTL handed to the `Wire` created when the server starts listening.
    /// NOTE(review): presumably the lifetime of in-flight packets — confirm
    /// against `Wire`.
    #[builder(default = "crate::core::transport::constants::DEFAULT_TTL")]
    packet_ttl: Duration,

    /// Authenticator handed to the `Wire` created when the server starts
    /// listening.
    authenticator: A,

    /// Bicrypter handed to the `Wire` created when the server starts
    /// listening.
    bicrypter: B,

    /// Transport to listen on. The variant (`Tcp` / `Udp`) selects the
    /// protocol and carries the candidate addresses, which are tried in
    /// order until one binds.
    transport: Transport,

    /// Capacity of the channel feeding the event loop; the same value is
    /// also passed to the `AddrEventManager`.
    #[builder(default = "1000")]
    buffer: usize,

    /// Delay between passes of the background loop that evicts files and
    /// processes from the server state.
    #[builder(default = "Duration::from_secs(60)")]
    cleanup_interval: Duration,

    /// First argument to `ServerState::new`.
    /// NOTE(review): presumably how long an untouched file stays tracked
    /// before eviction — confirm against `ServerState`.
    #[builder(default = "state::constants::DEFAULT_FILE_TTL")]
    file_ttl: Duration,

    /// Second argument to `ServerState::new`.
    /// NOTE(review): presumably how long an untouched process stays tracked
    /// before eviction — confirm against `ServerState`.
    #[builder(default = "state::constants::DEFAULT_PROC_TTL")]
    proc_ttl: Duration,

    /// Third argument to `ServerState::new`.
    /// NOTE(review): presumably how long an exited process stays tracked
    /// before eviction — confirm against `ServerState`.
    #[builder(default = "state::constants::DEFAULT_DEAD_PROC_TTL")]
    dead_proc_ttl: Duration,
}
65
66impl<A, B> Server<A, B>
67where
68 A: Authenticator + Send + Sync + 'static,
69 B: Bicrypter + Send + Sync + 'static,
70{
71 pub async fn listen(self) -> io::Result<ListeningServer> {
76 let handle = Handle::current();
77 let state = Arc::new(state::ServerState::new(
78 self.file_ttl,
79 self.proc_ttl,
80 self.dead_proc_ttl,
81 ));
82
83 handle.spawn(cleanup_loop(Arc::clone(&state), self.cleanup_interval));
84
85 match self.transport.clone() {
86 Transport::Tcp(_) => Err(io::Error::new(
87 io::ErrorKind::InvalidInput,
88 "Authenticator or Bicrypter is not clonable",
89 )),
90 Transport::Udp(addrs) => {
91 build_and_listen_udp_server(self, state, &addrs).await
92 }
93 }
94 }
95}
96
97impl<A, B> Server<A, B>
98where
99 A: Authenticator + Send + Sync + Clone + 'static,
100 B: Bicrypter + Send + Sync + Clone + 'static,
101{
102 pub async fn cloneable_listen(self) -> io::Result<ListeningServer> {
105 let handle = Handle::current();
106 let state = Arc::new(state::ServerState::new(
107 self.file_ttl,
108 self.proc_ttl,
109 self.dead_proc_ttl,
110 ));
111
112 handle.spawn(cleanup_loop(Arc::clone(&state), self.cleanup_interval));
113
114 match self.transport.clone() {
115 Transport::Tcp(addrs) => {
116 build_and_listen_tcp_server(self, state, &addrs).await
117 }
118 Transport::Udp(addrs) => {
119 build_and_listen_udp_server(self, state, &addrs).await
120 }
121 }
122 }
123}
124
125async fn build_and_listen_tcp_server<A, B>(
126 server: Server<A, B>,
127 state: Arc<state::ServerState>,
128 addrs: &[SocketAddr],
129) -> io::Result<ListeningServer>
130where
131 A: Authenticator + Send + Sync + Clone + 'static,
132 B: Bicrypter + Send + Sync + Clone + 'static,
133{
134 let handle = Handle::current();
135
136 let listener = {
140 let mut listener = None;
141 for addr in addrs.iter() {
142 let result = TcpListener::bind(addr).await;
143 if result.is_ok() {
144 listener = result.ok();
145 break;
146 }
147 }
148 listener
149 .ok_or_else(|| io::Error::from(io::ErrorKind::AddrNotAvailable))?
150 };
151 let addr = listener.local_addr()?;
152
153 let wire = Wire::new(
154 NetTransmission::TcpEthernet.into(),
155 server.packet_ttl,
156 server.authenticator,
157 server.bicrypter,
158 );
159
160 let (tx, rx) = mpsc::channel(server.buffer);
161 let event_handle = handle.spawn(tcp_event_loop(Arc::clone(&state), rx));
162 let addr_event_manager = AddrEventManager::for_tcp_listener(
163 handle.clone(),
164 server.buffer,
165 listener,
166 wire,
167 tx,
168 );
169
170 Ok(ListeningServer {
171 addr,
172 addr_event_manager,
173 state,
174 event_handle,
175 })
176}
177
178async fn build_and_listen_udp_server<A, B>(
179 server: Server<A, B>,
180 state: Arc<state::ServerState>,
181 addrs: &[SocketAddr],
182) -> io::Result<ListeningServer>
183where
184 A: Authenticator + Send + Sync + 'static,
185 B: Bicrypter + Send + Sync + 'static,
186{
187 let handle = Handle::current();
188
189 let socket = {
193 let mut socket = None;
194 for addr in addrs.iter() {
195 let result = UdpSocket::bind(addr).await;
196 if result.is_ok() {
197 socket = result.ok();
198 break;
199 }
200 }
201 socket
202 .ok_or_else(|| io::Error::from(io::ErrorKind::AddrNotAvailable))?
203 };
204 let addr = socket.local_addr()?;
205 let transmission = NetTransmission::udp_from_addr(addr);
206
207 let wire = Wire::new(
208 transmission.into(),
209 server.packet_ttl,
210 server.authenticator,
211 server.bicrypter,
212 );
213
214 let (tx, rx) = mpsc::channel(server.buffer);
215 let event_handle = handle.spawn(udp_event_loop(Arc::clone(&state), rx));
216 let addr_event_manager = AddrEventManager::for_udp_socket(
217 handle.clone(),
218 server.buffer,
219 socket,
220 wire,
221 tx,
222 );
223
224 Ok(ListeningServer {
225 addr,
226 addr_event_manager,
227 state,
228 event_handle,
229 })
230}
231
232async fn tcp_event_loop(
233 state: Arc<state::ServerState>,
234 mut rx: mpsc::Receiver<(Msg, SocketAddr, mpsc::Sender<Vec<u8>>)>,
235) {
236 while let Some((msg, addr, tx)) = rx.recv().await {
237 if let Err(x) = action::Executor::<Vec<u8>>::new(
238 tx,
239 addr,
240 action::Executor::<Vec<u8>>::DEFAULT_MAX_DEPTH,
241 )
242 .execute(Arc::clone(&state), msg)
243 .await
244 {
245 error!("Failed to execute action: {}", x);
246 }
247 }
248}
249
250async fn udp_event_loop(
251 state: Arc<state::ServerState>,
252 mut rx: mpsc::Receiver<(
253 Msg,
254 SocketAddr,
255 mpsc::Sender<(Vec<u8>, SocketAddr)>,
256 )>,
257) {
258 while let Some((msg, addr, tx)) = rx.recv().await {
259 if let Err(x) = action::Executor::<(Vec<u8>, SocketAddr)>::new(
260 tx,
261 addr,
262 action::Executor::<(Vec<u8>, SocketAddr)>::DEFAULT_MAX_DEPTH,
263 )
264 .execute(Arc::clone(&state), msg)
265 .await
266 {
267 error!("Failed to execute action: {}", x);
268 }
269 }
270}
271
272async fn cleanup_loop(state: Arc<state::ServerState>, period: Duration) {
273 while state.is_running() {
274 state.evict_files().await;
275 state.evict_procs().await;
276 time::delay_for(period).await;
277 }
278}
279
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn cleanup_loop_should_evict_unused_files_every_period() {
        let state = Arc::new(state::ServerState::default());

        // One file that expires almost immediately and one that outlives
        // the test
        state
            .touch_file_id_with_ttl(0, Duration::from_micros(5))
            .await;
        state
            .touch_file_id_with_ttl(1, Duration::from_secs(60))
            .await;

        Handle::current()
            .spawn(cleanup_loop(Arc::clone(&state), Duration::from_millis(1)));

        // Wait long enough for several cleanup passes to run
        time::delay_for(Duration::from_millis(10)).await;

        let file_ids = state.file_ids.lock().await;
        assert!(!file_ids.contains(&From::from(0)), "File not evicted");
        assert!(
            file_ids.contains(&From::from(1)),
            "File unexpectedly evicted"
        );
    }

    #[tokio::test]
    async fn cleanup_loop_should_evict_unused_processes_every_period() {
        let state = Arc::new(state::ServerState::default());

        // One process that expires almost immediately and one that outlives
        // the test
        state
            .touch_proc_id_with_ttl(0, Duration::from_micros(5))
            .await;
        state
            .touch_proc_id_with_ttl(1, Duration::from_secs(60))
            .await;

        Handle::current()
            .spawn(cleanup_loop(Arc::clone(&state), Duration::from_millis(1)));

        // Wait long enough for several cleanup passes to run
        time::delay_for(Duration::from_millis(10)).await;

        let proc_ids = state.proc_ids.lock().await;
        assert!(!proc_ids.contains(&From::from(0)), "Proc not evicted");
        assert!(
            proc_ids.contains(&From::from(1)),
            "Proc unexpectedly evicted"
        );
    }

    #[tokio::test]
    async fn cleanup_loop_should_only_run_if_state_marked_running() {
        let state = Arc::new(state::ServerState::default());

        // Mark as not running. The SAME state must be handed to the loop
        // below; creating a fresh state here would test a running server.
        state.shutdown();

        // Touch ids with a zero TTL so a single cleanup pass would evict
        // them if the loop ran at all
        state.touch_file_id_with_ttl(0, Duration::new(0, 0)).await;
        state.touch_proc_id_with_ttl(0, Duration::new(0, 0)).await;

        Handle::current()
            .spawn(cleanup_loop(Arc::clone(&state), Duration::from_millis(1)));

        // Give the spawned task time to be polled; asserting immediately
        // would pass even if the loop were about to evict everything
        time::delay_for(Duration::from_millis(10)).await;

        assert!(
            state.file_ids.lock().await.contains(&From::from(0)),
            "File unexpectedly evicted"
        );
        assert!(
            state.proc_ids.lock().await.contains(&From::from(0)),
            "Proc unexpectedly evicted"
        );
    }
}