apollo_framework/
server.rs

1//! Contains the server component of Apollo.
2//!
3//! Opens a server-socket on the specified port (**server.port** in the config or 2410 as fallback)
4//! and binds it to the selected IP (**server.host** in the config or 0.0.0.0 as fallback). Each
5//! incoming client is expected to send RESP requests and will be provided with the appropriate
6//! responses.
7//!
//! Note that in order to meet zero-downtime / ultra-high-availability demands, the server will
//! periodically try to bind the socket to the selected port. Therefore a "new" instance can be
//! started while the "old" one bleeds out, and the port will be "handed over" with minimal
//! downtime. The server also listens for config change events and will relocate to another port
//! or host if these settings change.
13//!
14//! # Example
15//!
16//! ```no_run
17//! use apollo_framework::server::Server;
18//! use tokio::time::Duration;
19//!
20//! #[tokio::main]
21//! async fn main() {
22//! }
23//! ```
24use std::sync::{Arc, Mutex};
25use std::time::{Duration, Instant};
26
27use tokio::net::{TcpListener, TcpStream};
28
29use crate::config::Config;
30use crate::platform::Platform;
31use std::future::Future;
32use std::sync::atomic::{AtomicBool, Ordering};
33
/// Upper bound for a single wait on the listener for a new incoming connection.
///
/// Accepting is interrupted at least this often, which gives the server loop a chance to
/// re-check whether the platform has been shut down in the meantime.
const CONNECT_WAIT_TIMEOUT: Duration = Duration::new(0, 500_000_000);
39
/// A single client connection which is currently handled by the server.
pub struct Connection<P>
where
    P: Default + Send + Sync,
{
    /// The remote address of the connected client.
    peer_address: String,
    /// Remains `true` until a termination of this connection has been requested.
    active: AtomicBool,
    /// Custom per-connection data.
    payload: P,
}
46
47impl<P: Default + Send + Sync> PartialEq for Connection<P> {
48    fn eq(&self, other: &Self) -> bool {
49        self.peer_address == other.peer_address
50    }
51}
52
53impl<P: Default + Send + Sync> Connection<P> {
54    /// Determines if the connection is active or if a termination has been requested.
55    pub fn is_active(&self) -> bool {
56        self.active.load(Ordering::Acquire)
57    }
58
59    /// Terminates the connection.
60    pub fn quit(&self) {
61        self.active.store(false, Ordering::Release);
62    }
63
64    /// TODO
65    pub fn payload(&self) -> &P {
66        &self.payload
67    }
68}
69
/// Provides a snapshot of the metadata of a client connection.
///
/// Instances are detached copies created by [Server::connections]; modifying them has no effect
/// on the underlying connection.
#[derive(Clone, Debug)]
pub struct ConnectionInfo<P: Default + Send + Sync> {
    /// Contains the peer address of the connected client.
    pub peer_address: String,

    /// Contains a clone of the custom payload attached to the connection.
    pub payload: P,
}
78
79/// Represents a TODO server which manages all TCP connections.
80pub struct Server<P: Default + Send + Sync> {
81    running: AtomicBool,
82    current_address: Mutex<Option<String>>,
83    platform: Arc<Platform>,
84    connections: Mutex<Vec<Arc<Connection<P>>>>,
85}
86
87impl<P: 'static + Default + Send + Sync + Clone> Server<P> {
88    /// Creates and installs a **Server** into the given **Platform**.
89    ///
90    /// Note that this is called by the [Builder](crate::builder::Builder) unless disabled.
91    ///
92    /// Also note, that this will not technically start the server. This has to be done manually
93    /// via [event_loop](Server::event_loop) as it is most probable done in the main thread.
94    pub fn install(platform: &Arc<Platform>) -> Arc<Self> {
95        let server = Arc::new(Server {
96            running: AtomicBool::new(false),
97            current_address: Mutex::new(None),
98            platform: platform.clone(),
99            connections: Mutex::new(Vec::new()),
100        });
101
102        platform.register::<Server<P>>(server.clone());
103
104        server
105    }
106
107    /// Lists all currently active connections.
108    pub fn connections(&self) -> Vec<ConnectionInfo<P>> {
109        let mut result = Vec::new();
110        for connection in self.connections.lock().unwrap().iter() {
111            result.push(ConnectionInfo {
112                peer_address: connection.peer_address.clone(),
113                payload: connection.payload.clone(),
114            });
115        }
116
117        result
118    }
119
120    /// Kills the connection of the given peer address.
121    pub fn kill(&self, peer_address: &str) -> bool {
122        self.connections
123            .lock()
124            .unwrap()
125            .iter()
126            .find(|c| c.peer_address == peer_address)
127            .map(|c| c.active.store(false, Ordering::Release))
128            .is_some()
129    }
130
131    /// Adds a newly created client connection.
132    ///
133    /// Note that this involves locking a **Mutex**. However, we expect our clients to use
134    /// connection pooling, so that only a few rather long running connections are present.
135    fn add_connection(&self, connection: Arc<Connection<P>>) {
136        self.connections.lock().unwrap().push(connection);
137    }
138
139    /// Removes a connection after it has been closed by either side.
140    fn remove_connection(&self, connection: Arc<Connection<P>>) {
141        let mut mut_connections = self.connections.lock().unwrap();
142        if let Some(index) = mut_connections
143            .iter()
144            .position(|other| *other == connection)
145        {
146            let _ = mut_connections.remove(index);
147        }
148    }
149
150    /// Determines if the server socket should keep listening for incoming connections.
151    ///
152    /// In contrast to **Platform::is_running** this is not used to control the shutdown of the
153    /// server. Rather we toggle this flag to false if a config and therefore address change was
154    /// detected. This way **server_loop** will exit and a new server socket for the appropriate
155    /// address will be setup by the **event_loop**.
156    fn is_running(&self) -> bool {
157        self.running.load(Ordering::Acquire)
158    }
159
160    /// Determines the server address based on the current configuration.
161    ///
162    /// If no, an invalid or a partial config is present, fallback values are used. By default we
163    /// use port 2410 and bind to "0.0.0.0".
164    fn address(&self) -> String {
165        self.platform
166            .find::<Config>()
167            .map(|config| {
168                let handle = config.current();
169                format!(
170                    "{}:{}",
171                    handle.config()["server"]["host"]
172                        .as_str()
173                        .unwrap_or("0.0.0.0"),
174                    handle.config()["server"]["port"]
175                        .as_i64()
176                        .filter(|port| port > &0 && port <= &(u16::MAX as i64))
177                        .unwrap_or(2410)
178                )
179            })
180            .unwrap_or_else(|| "0.0.0.0:2410".to_owned())
181    }
182
183    /// Starts the event loop in a separate thread.
184    ///
185    /// This is most probably used by test scenarios where the tests itself run in the main thread.
186    pub fn fork<F>(
187        server: &Arc<Server<P>>,
188        client_loop: &'static (impl Fn(Arc<Platform>, Arc<Connection<P>>, TcpStream) -> F + Send + Sync),
189    ) where
190        F: Future<Output = anyhow::Result<()>> + Send + Sync,
191    {
192        let cloned_server = server.clone();
193        let _ = tokio::spawn(async move {
194            cloned_server.event_loop(client_loop).await;
195        });
196    }
197
198    /// Starts the event loop in a separate thread and waits until the server is up and running.
199    ///
200    /// Just like **fork** this is intended to be used in test environments.
201    pub async fn fork_and_await<F>(
202        server: &Arc<Server<P>>,
203        client_loop: &'static (impl Fn(Arc<Platform>, Arc<Connection<P>>, TcpStream) -> F + Send + Sync),
204    ) where
205        F: Future<Output = anyhow::Result<()>> + Send + Sync,
206    {
207        Server::fork(server, client_loop);
208
209        while !server.is_running() {
210            tokio::time::sleep(Duration::from_secs(1)).await;
211        }
212    }
213
214    /// Tries to open a server socket on the specified address to serve incoming client connections.
215    ///
216    /// The task of this loop is to bind the server socket to the specified address. Once this was
217    /// successful, we enter the [server_loop](Server::server_loop) to actually handle incoming
218    /// connections. Once this loop returns, either the platform is no longer running and we should
219    /// exit, or the config has changed and we should try to bind the server to the new address.
220    pub async fn event_loop<F>(
221        &self,
222        client_loop: impl Fn(Arc<Platform>, Arc<Connection<P>>, TcpStream) -> F
223            + Send
224            + Sync
225            + Copy
226            + 'static,
227    ) where
228        F: Future<Output = anyhow::Result<()>> + Send,
229    {
230        let mut address = String::new();
231        let mut last_bind_error_reported = Instant::now();
232
233        while self.platform.is_running() {
234            // If the sever is started for the first time or if it has been restarted due to a
235            // config change, we need to reload the address...
236            if !self.is_running() {
237                address = self.address();
238                self.running.store(true, Ordering::Release);
239            }
240
241            // Bind and hopefully enter the server_loop...
242            if let Ok(mut listener) = TcpListener::bind(&address).await {
243                log::info!("Opened server socket on {}...", &address);
244                *self.current_address.lock().unwrap() = Some(address.clone());
245                self.server_loop(&mut listener, client_loop).await;
246                log::info!("Closing server socket on {}.", &address);
247            } else {
248                // If we were unable to bind to the server, we log this every once in a while
249                // (every 5s). Otherwise we would jam the log as re retry every 500ms.
250                if Instant::now()
251                    .duration_since(last_bind_error_reported)
252                    .as_secs()
253                    > 5
254                {
255                    log::error!(
256                        "Cannot open server address: {}. Retrying every 500ms...",
257                        &address
258                    );
259                    last_bind_error_reported = Instant::now();
260                }
261                tokio::time::sleep(Duration::from_millis(500)).await;
262            }
263        }
264    }
265
266    /// Runs the main server loop which processes incoming connections.
267    ///
268    /// This also listens on config changes and exits to the event_loop if necessary (server
269    /// address changed...).   
270    async fn server_loop<F>(
271        &self,
272        listener: &mut TcpListener,
273        client_loop: impl Fn(Arc<Platform>, Arc<Connection<P>>, TcpStream) -> F
274            + Copy
275            + Send
276            + Sync
277            + 'static,
278    ) where
279        F: Future<Output = anyhow::Result<()>> + Send,
280    {
281        let mut config_changed_flag = self.platform.require::<Config>().notifier();
282
283        while self.platform.is_running() && self.is_running() {
284            tokio::select! {
285                // We use a timeout here so that the while condition (esp. platform.is_running())
286                // is checked every once in a while...
287                timeout_stream = tokio::time::timeout(CONNECT_WAIT_TIMEOUT, listener.accept()) => {
288                    // We're only interested in a positive result here, as an Err simply indicates
289                    // that the timeout was hit - in this case we do nothing as the while condition
290                    // is all the needs to be checked...
291                    if let Ok(stream) = timeout_stream {
292                        // If a stream is present, we treat this as new connection and eventually
293                        // start a client_loop for it...
294                        if let Ok((stream, _)) = stream {
295                            self.handle_new_connection(stream, client_loop);
296                        } else {
297                            // Otherwise the socket has been closed therefore we exit to the
298                            // event_loop which will either complete exit or try to re-create
299                            // the socket.
300                            return;
301                        }
302                    }
303                }
304                _ = config_changed_flag.recv() => {
305                    // If the config was changed, we need to check if the address itself changed...
306                    let new_address = self.address();
307                    if let Some(current_address) = &*self.current_address.lock().unwrap() {
308                       if current_address != &new_address {
309                           log::info!("Server address has changed. Restarting server socket...");
310
311                           // Force the event_loop to re-evaluate the expected server address...
312                           self.running.store(false, Ordering::Release);
313
314                           // Return to event_loop so that the server socket is re-created...
315                           return;
316                       }
317                    }
318               }
319            }
320        }
321    }
322
323    /// Handles a new incoming connection.
324    ///
325    /// This will register the connection in the list of client connections and then fork a
326    /// "thread" which mainly simply executes the **client_loop** for this connection.
327    fn handle_new_connection<F>(
328        &self,
329        stream: TcpStream,
330        client_loop: impl FnOnce(Arc<Platform>, Arc<Connection<P>>, TcpStream) -> F
331            + 'static
332            + Send
333            + Sync
334            + Copy,
335    ) where
336        F: Future<Output = anyhow::Result<()>> + Send,
337    {
338        let platform = self.platform.clone();
339        let _ = tokio::spawn(async move {
340            // Mark the connection as nodelay, as we already optimize all writes as far as possible.
341            let _ = stream.set_nodelay(true);
342
343            // Register the new connection to that the can report it in the maintenance utilities...
344            let server = platform.require::<Server<P>>();
345            let connection = Arc::new(Connection {
346                peer_address: stream
347                    .peer_addr()
348                    .map(|addr| addr.to_string())
349                    .unwrap_or_else(|_| "<unknown>".to_owned()),
350                active: AtomicBool::new(true),
351                payload: P::default(),
352            });
353            log::debug!("Opened connection from {}...", connection.peer_address);
354            server.add_connection(connection.clone());
355
356            // Executes the client loop for this connection....
357            if let Err(error) = client_loop(platform, connection.clone(), stream).await {
358                log::debug!(
359                    "An IO error occurred in connection {}: {}",
360                    connection.peer_address,
361                    error
362                );
363            }
364
365            // Removes the connection as it has been closed...
366            log::debug!("Closing connection to {}...", connection.peer_address);
367            server.remove_connection(connection);
368        });
369    }
370}