Skip to main content

over_there/core/server/
mod.rs

1mod action;
2mod custom;
3pub mod fs;
4mod listening;
5pub mod proc;
6pub mod state;
7
8pub use listening::ListeningServer;
9
10use crate::core::{event::AddrEventManager, Msg, Transport};
11use derive_builder::Builder;
12use log::error;
13use crate::core::transport::{Authenticator, Bicrypter, NetTransmission, Wire};
14use std::net::SocketAddr;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::{
18    io,
19    net::{TcpListener, UdpSocket},
20    runtime::Handle,
21    sync::mpsc,
22    time,
23};
24
25/// Represents a server configuration prior to listening
26#[derive(Builder, Clone)]
27pub struct Server<A, B>
28where
29    A: Authenticator,
30    B: Bicrypter,
31{
32    /// TTL to collect all packets for a msg
33    #[builder(default = "crate::core::transport::constants::DEFAULT_TTL")]
34    packet_ttl: Duration,
35
36    /// Used to sign & verify msgs
37    authenticator: A,
38
39    /// Used to encrypt & decrypt msgs
40    bicrypter: B,
41
42    /// Transportation mechanism & address to listen on
43    transport: Transport,
44
45    /// Internal buffer for cross-thread messaging
46    #[builder(default = "1000")]
47    buffer: usize,
48
49    /// Interval at which cleanup of dangling resources is performed
50    #[builder(default = "Duration::from_secs(60)")]
51    cleanup_interval: Duration,
52
53    /// TTL for an untouched, open file before it is closed during cleanup
54    #[builder(default = "state::constants::DEFAULT_FILE_TTL")]
55    file_ttl: Duration,
56
57    /// TTL for an untouched, running process before it is killed during cleanup
58    #[builder(default = "state::constants::DEFAULT_PROC_TTL")]
59    proc_ttl: Duration,
60
61    /// TTL for an untouched, dead process before it is removed during cleanup
62    #[builder(default = "state::constants::DEFAULT_DEAD_PROC_TTL")]
63    dead_proc_ttl: Duration,
64}
65
66impl<A, B> Server<A, B>
67where
68    A: Authenticator + Send + Sync + 'static,
69    B: Bicrypter + Send + Sync + 'static,
70{
71    /// Starts actively listening for msgs via the specified transport medium
72    ///
73    /// Will fail if using TCP transport as requires Clone; should instead
74    /// use `cloneable_listen` if using TCP
75    pub async fn listen(self) -> io::Result<ListeningServer> {
76        let handle = Handle::current();
77        let state = Arc::new(state::ServerState::new(
78            self.file_ttl,
79            self.proc_ttl,
80            self.dead_proc_ttl,
81        ));
82
83        handle.spawn(cleanup_loop(Arc::clone(&state), self.cleanup_interval));
84
85        match self.transport.clone() {
86            Transport::Tcp(_) => Err(io::Error::new(
87                io::ErrorKind::InvalidInput,
88                "Authenticator or Bicrypter is not clonable",
89            )),
90            Transport::Udp(addrs) => {
91                build_and_listen_udp_server(self, state, &addrs).await
92            }
93        }
94    }
95}
96
97impl<A, B> Server<A, B>
98where
99    A: Authenticator + Send + Sync + Clone + 'static,
100    B: Bicrypter + Send + Sync + Clone + 'static,
101{
102    /// Starts actively listening for msgs via the specified transport medium,
103    /// using cloneable methods for Authenticator and Bicrypter operations
104    pub async fn cloneable_listen(self) -> io::Result<ListeningServer> {
105        let handle = Handle::current();
106        let state = Arc::new(state::ServerState::new(
107            self.file_ttl,
108            self.proc_ttl,
109            self.dead_proc_ttl,
110        ));
111
112        handle.spawn(cleanup_loop(Arc::clone(&state), self.cleanup_interval));
113
114        match self.transport.clone() {
115            Transport::Tcp(addrs) => {
116                build_and_listen_tcp_server(self, state, &addrs).await
117            }
118            Transport::Udp(addrs) => {
119                build_and_listen_udp_server(self, state, &addrs).await
120            }
121        }
122    }
123}
124
125async fn build_and_listen_tcp_server<A, B>(
126    server: Server<A, B>,
127    state: Arc<state::ServerState>,
128    addrs: &[SocketAddr],
129) -> io::Result<ListeningServer>
130where
131    A: Authenticator + Send + Sync + Clone + 'static,
132    B: Bicrypter + Send + Sync + Clone + 'static,
133{
134    let handle = Handle::current();
135
136    // NOTE: Tokio does not support &[SocketAddr] -> ToSocketAddrs,
137    //       so we have to loop through manually
138    // See https://github.com/tokio-rs/tokio/pull/1760#discussion_r379120864
139    let listener = {
140        let mut listener = None;
141        for addr in addrs.iter() {
142            let result = TcpListener::bind(addr).await;
143            if result.is_ok() {
144                listener = result.ok();
145                break;
146            }
147        }
148        listener
149            .ok_or_else(|| io::Error::from(io::ErrorKind::AddrNotAvailable))?
150    };
151    let addr = listener.local_addr()?;
152
153    let wire = Wire::new(
154        NetTransmission::TcpEthernet.into(),
155        server.packet_ttl,
156        server.authenticator,
157        server.bicrypter,
158    );
159
160    let (tx, rx) = mpsc::channel(server.buffer);
161    let event_handle = handle.spawn(tcp_event_loop(Arc::clone(&state), rx));
162    let addr_event_manager = AddrEventManager::for_tcp_listener(
163        handle.clone(),
164        server.buffer,
165        listener,
166        wire,
167        tx,
168    );
169
170    Ok(ListeningServer {
171        addr,
172        addr_event_manager,
173        state,
174        event_handle,
175    })
176}
177
178async fn build_and_listen_udp_server<A, B>(
179    server: Server<A, B>,
180    state: Arc<state::ServerState>,
181    addrs: &[SocketAddr],
182) -> io::Result<ListeningServer>
183where
184    A: Authenticator + Send + Sync + 'static,
185    B: Bicrypter + Send + Sync + 'static,
186{
187    let handle = Handle::current();
188
189    // NOTE: Tokio does not support &[SocketAddr] -> ToSocketAddrs,
190    //       so we have to loop through manually
191    // See https://github.com/tokio-rs/tokio/pull/1760#discussion_r379120864
192    let socket = {
193        let mut socket = None;
194        for addr in addrs.iter() {
195            let result = UdpSocket::bind(addr).await;
196            if result.is_ok() {
197                socket = result.ok();
198                break;
199            }
200        }
201        socket
202            .ok_or_else(|| io::Error::from(io::ErrorKind::AddrNotAvailable))?
203    };
204    let addr = socket.local_addr()?;
205    let transmission = NetTransmission::udp_from_addr(addr);
206
207    let wire = Wire::new(
208        transmission.into(),
209        server.packet_ttl,
210        server.authenticator,
211        server.bicrypter,
212    );
213
214    let (tx, rx) = mpsc::channel(server.buffer);
215    let event_handle = handle.spawn(udp_event_loop(Arc::clone(&state), rx));
216    let addr_event_manager = AddrEventManager::for_udp_socket(
217        handle.clone(),
218        server.buffer,
219        socket,
220        wire,
221        tx,
222    );
223
224    Ok(ListeningServer {
225        addr,
226        addr_event_manager,
227        state,
228        event_handle,
229    })
230}
231
232async fn tcp_event_loop(
233    state: Arc<state::ServerState>,
234    mut rx: mpsc::Receiver<(Msg, SocketAddr, mpsc::Sender<Vec<u8>>)>,
235) {
236    while let Some((msg, addr, tx)) = rx.recv().await {
237        if let Err(x) = action::Executor::<Vec<u8>>::new(
238            tx,
239            addr,
240            action::Executor::<Vec<u8>>::DEFAULT_MAX_DEPTH,
241        )
242        .execute(Arc::clone(&state), msg)
243        .await
244        {
245            error!("Failed to execute action: {}", x);
246        }
247    }
248}
249
250async fn udp_event_loop(
251    state: Arc<state::ServerState>,
252    mut rx: mpsc::Receiver<(
253        Msg,
254        SocketAddr,
255        mpsc::Sender<(Vec<u8>, SocketAddr)>,
256    )>,
257) {
258    while let Some((msg, addr, tx)) = rx.recv().await {
259        if let Err(x) = action::Executor::<(Vec<u8>, SocketAddr)>::new(
260            tx,
261            addr,
262            action::Executor::<(Vec<u8>, SocketAddr)>::DEFAULT_MAX_DEPTH,
263        )
264        .execute(Arc::clone(&state), msg)
265        .await
266        {
267            error!("Failed to execute action: {}", x);
268        }
269    }
270}
271
272async fn cleanup_loop(state: Arc<state::ServerState>, period: Duration) {
273    while state.is_running() {
274        state.evict_files().await;
275        state.evict_procs().await;
276        time::delay_for(period).await;
277    }
278}
279
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn cleanup_loop_should_evict_unused_files_every_period() {
        let state = Arc::new(state::ServerState::default());

        // Touch some file ids so we can verify that the loop will remove
        // some of them, but don't bother opening them as full evict is
        // tested elsewhere
        state
            .touch_file_id_with_ttl(0, Duration::from_micros(5))
            .await;
        state
            .touch_file_id_with_ttl(1, Duration::from_secs(60))
            .await;

        // Run loop with a very short period so we can ensure we check quickly
        Handle::current()
            .spawn(cleanup_loop(Arc::clone(&state), Duration::from_millis(1)));

        // Ensure that we've waited long enough for some files to be evicted
        time::delay_for(Duration::from_millis(10)).await;

        // Verify expected files were evicted
        let file_ids = state.file_ids.lock().await;
        assert!(!file_ids.contains(&From::from(0)), "File not evicted");
        assert!(
            file_ids.contains(&From::from(1)),
            "File unexpectedly evicted"
        );
    }

    #[tokio::test]
    async fn cleanup_loop_should_evict_unused_processes_every_period() {
        let state = Arc::new(state::ServerState::default());

        // Touch some proc ids so we can verify that the loop will remove
        // some of them, but don't bother spawning them as full evict is
        // tested elsewhere
        state
            .touch_proc_id_with_ttl(0, Duration::from_micros(5))
            .await;
        state
            .touch_proc_id_with_ttl(1, Duration::from_secs(60))
            .await;

        // Run loop with a very short period so we ensure we check quickly
        Handle::current()
            .spawn(cleanup_loop(Arc::clone(&state), Duration::from_millis(1)));

        // Ensure that we've waited long enough for some procs to be evicted
        time::delay_for(Duration::from_millis(10)).await;

        // Verify expected procs were evicted
        let proc_ids = state.proc_ids.lock().await;
        assert!(!proc_ids.contains(&From::from(0)), "Proc not evicted");
        assert!(
            proc_ids.contains(&From::from(1)),
            "Proc unexpectedly evicted"
        );
    }

    #[tokio::test]
    async fn cleanup_loop_should_only_run_if_state_marked_running() {
        let state = Arc::new(state::ServerState::default());

        // Register ids that would be evicted on the very first pass if the
        // loop were to run at all
        state.touch_file_id_with_ttl(0, Duration::new(0, 0)).await;
        state.touch_proc_id_with_ttl(0, Duration::new(0, 0)).await;

        // Shutdown this same state before we even start to ensure that
        // cleanup never happens
        state.shutdown();

        let task = Handle::current()
            .spawn(cleanup_loop(Arc::clone(&state), Duration::from_millis(1)));

        // Wait for the loop to finish so the assertions below run after it
        // has had its chance to evict; the timeout keeps the test from
        // hanging if the loop ignores the running flag
        time::timeout(Duration::from_secs(1), task)
            .await
            .expect("Cleanup loop still running after shutdown")
            .expect("Cleanup loop task failed");

        assert!(
            state.file_ids.lock().await.contains(&From::from(0)),
            "File unexpectedly evicted"
        );
        assert!(
            state.proc_ids.lock().await.contains(&From::from(0)),
            "Proc unexpectedly evicted"
        );
    }
}