apollo_framework/server.rs
1//! Contains the server component of Apollo.
2//!
3//! Opens a server-socket on the specified port (**server.port** in the config or 2410 as fallback)
4//! and binds it to the selected IP (**server.host** in the config or 0.0.0.0 as fallback). Each
5//! incoming client is expected to send RESP requests and will be provided with the appropriate
6//! responses.
7//!
8//! Note that in order to achieve zero downtime / ultra high availability demands, the server will
9//! periodically try to bind the socket to the selected port, therefore a "new" instance can
10//! be started and the "old" one can bleed out and the port will be "handed through" with minimal
11//! downtime. Also, this will listen to change events of the config and will relocate to another
12//! port or host if changed.
13//!
14//! # Example
15//!
16//! ```no_run
17//! use apollo_framework::server::Server;
18//! use tokio::time::Duration;
19//!
20//! #[tokio::main]
21//! async fn main() {
22//! }
23//! ```
24use std::sync::{Arc, Mutex};
25use std::time::{Duration, Instant};
26
27use tokio::net::{TcpListener, TcpStream};
28
29use crate::config::Config;
30use crate::platform::Platform;
31use std::future::Future;
32use std::sync::atomic::{AtomicBool, Ordering};
33
/// Upper bound for a single wait on the server socket for an incoming connection.
///
/// The accept call of the server loop is wrapped in this timeout, so that the loop condition
/// (is the platform still running?) is re-evaluated at least this often.
const CONNECT_WAIT_TIMEOUT: Duration = Duration::from_millis(500);
39
/// State which the server keeps for a single connected client.
///
/// Next to the address of the peer, this carries a flag which is cleared once a termination
/// of the connection was requested, as well as a custom payload of type `P`.
pub struct Connection<P>
where
    P: Default + Send + Sync,
{
    /// Textual address of the remote peer. This is also used to compare connections.
    peer_address: String,
    /// `true` as long as no termination has been requested for this connection.
    active: AtomicBool,
    /// Custom data attached to this connection.
    payload: P,
}
46
47impl<P: Default + Send + Sync> PartialEq for Connection<P> {
48 fn eq(&self, other: &Self) -> bool {
49 self.peer_address == other.peer_address
50 }
51}
52
53impl<P: Default + Send + Sync> Connection<P> {
54 /// Determines if the connection is active or if a termination has been requested.
55 pub fn is_active(&self) -> bool {
56 self.active.load(Ordering::Acquire)
57 }
58
59 /// Terminates the connection.
60 pub fn quit(&self) {
61 self.active.store(false, Ordering::Release);
62 }
63
64 /// TODO
65 pub fn payload(&self) -> &P {
66 &self.payload
67 }
68}
69
/// Provides some metadata for a client connection.
///
/// This is a plain data object which is detached from the connection it describes. It can
/// therefore be cloned and printed for diagnostic purposes (given that the payload type
/// permits this).
#[derive(Debug, Clone)]
pub struct ConnectionInfo<P: Default + Send + Sync> {
    /// Contains the peer address of the client being connected.
    pub peer_address: String,

    /// Contains a copy of the custom payload which is attached to the connection.
    pub payload: P,
}
78
79/// Represents a TODO server which manages all TCP connections.
80pub struct Server<P: Default + Send + Sync> {
81 running: AtomicBool,
82 current_address: Mutex<Option<String>>,
83 platform: Arc<Platform>,
84 connections: Mutex<Vec<Arc<Connection<P>>>>,
85}
86
87impl<P: 'static + Default + Send + Sync + Clone> Server<P> {
88 /// Creates and installs a **Server** into the given **Platform**.
89 ///
90 /// Note that this is called by the [Builder](crate::builder::Builder) unless disabled.
91 ///
92 /// Also note, that this will not technically start the server. This has to be done manually
93 /// via [event_loop](Server::event_loop) as it is most probable done in the main thread.
94 pub fn install(platform: &Arc<Platform>) -> Arc<Self> {
95 let server = Arc::new(Server {
96 running: AtomicBool::new(false),
97 current_address: Mutex::new(None),
98 platform: platform.clone(),
99 connections: Mutex::new(Vec::new()),
100 });
101
102 platform.register::<Server<P>>(server.clone());
103
104 server
105 }
106
107 /// Lists all currently active connections.
108 pub fn connections(&self) -> Vec<ConnectionInfo<P>> {
109 let mut result = Vec::new();
110 for connection in self.connections.lock().unwrap().iter() {
111 result.push(ConnectionInfo {
112 peer_address: connection.peer_address.clone(),
113 payload: connection.payload.clone(),
114 });
115 }
116
117 result
118 }
119
120 /// Kills the connection of the given peer address.
121 pub fn kill(&self, peer_address: &str) -> bool {
122 self.connections
123 .lock()
124 .unwrap()
125 .iter()
126 .find(|c| c.peer_address == peer_address)
127 .map(|c| c.active.store(false, Ordering::Release))
128 .is_some()
129 }
130
131 /// Adds a newly created client connection.
132 ///
133 /// Note that this involves locking a **Mutex**. However, we expect our clients to use
134 /// connection pooling, so that only a few rather long running connections are present.
135 fn add_connection(&self, connection: Arc<Connection<P>>) {
136 self.connections.lock().unwrap().push(connection);
137 }
138
139 /// Removes a connection after it has been closed by either side.
140 fn remove_connection(&self, connection: Arc<Connection<P>>) {
141 let mut mut_connections = self.connections.lock().unwrap();
142 if let Some(index) = mut_connections
143 .iter()
144 .position(|other| *other == connection)
145 {
146 let _ = mut_connections.remove(index);
147 }
148 }
149
150 /// Determines if the server socket should keep listening for incoming connections.
151 ///
152 /// In contrast to **Platform::is_running** this is not used to control the shutdown of the
153 /// server. Rather we toggle this flag to false if a config and therefore address change was
154 /// detected. This way **server_loop** will exit and a new server socket for the appropriate
155 /// address will be setup by the **event_loop**.
156 fn is_running(&self) -> bool {
157 self.running.load(Ordering::Acquire)
158 }
159
160 /// Determines the server address based on the current configuration.
161 ///
162 /// If no, an invalid or a partial config is present, fallback values are used. By default we
163 /// use port 2410 and bind to "0.0.0.0".
164 fn address(&self) -> String {
165 self.platform
166 .find::<Config>()
167 .map(|config| {
168 let handle = config.current();
169 format!(
170 "{}:{}",
171 handle.config()["server"]["host"]
172 .as_str()
173 .unwrap_or("0.0.0.0"),
174 handle.config()["server"]["port"]
175 .as_i64()
176 .filter(|port| port > &0 && port <= &(u16::MAX as i64))
177 .unwrap_or(2410)
178 )
179 })
180 .unwrap_or_else(|| "0.0.0.0:2410".to_owned())
181 }
182
183 /// Starts the event loop in a separate thread.
184 ///
185 /// This is most probably used by test scenarios where the tests itself run in the main thread.
186 pub fn fork<F>(
187 server: &Arc<Server<P>>,
188 client_loop: &'static (impl Fn(Arc<Platform>, Arc<Connection<P>>, TcpStream) -> F + Send + Sync),
189 ) where
190 F: Future<Output = anyhow::Result<()>> + Send + Sync,
191 {
192 let cloned_server = server.clone();
193 let _ = tokio::spawn(async move {
194 cloned_server.event_loop(client_loop).await;
195 });
196 }
197
198 /// Starts the event loop in a separate thread and waits until the server is up and running.
199 ///
200 /// Just like **fork** this is intended to be used in test environments.
201 pub async fn fork_and_await<F>(
202 server: &Arc<Server<P>>,
203 client_loop: &'static (impl Fn(Arc<Platform>, Arc<Connection<P>>, TcpStream) -> F + Send + Sync),
204 ) where
205 F: Future<Output = anyhow::Result<()>> + Send + Sync,
206 {
207 Server::fork(server, client_loop);
208
209 while !server.is_running() {
210 tokio::time::sleep(Duration::from_secs(1)).await;
211 }
212 }
213
214 /// Tries to open a server socket on the specified address to serve incoming client connections.
215 ///
216 /// The task of this loop is to bind the server socket to the specified address. Once this was
217 /// successful, we enter the [server_loop](Server::server_loop) to actually handle incoming
218 /// connections. Once this loop returns, either the platform is no longer running and we should
219 /// exit, or the config has changed and we should try to bind the server to the new address.
220 pub async fn event_loop<F>(
221 &self,
222 client_loop: impl Fn(Arc<Platform>, Arc<Connection<P>>, TcpStream) -> F
223 + Send
224 + Sync
225 + Copy
226 + 'static,
227 ) where
228 F: Future<Output = anyhow::Result<()>> + Send,
229 {
230 let mut address = String::new();
231 let mut last_bind_error_reported = Instant::now();
232
233 while self.platform.is_running() {
234 // If the sever is started for the first time or if it has been restarted due to a
235 // config change, we need to reload the address...
236 if !self.is_running() {
237 address = self.address();
238 self.running.store(true, Ordering::Release);
239 }
240
241 // Bind and hopefully enter the server_loop...
242 if let Ok(mut listener) = TcpListener::bind(&address).await {
243 log::info!("Opened server socket on {}...", &address);
244 *self.current_address.lock().unwrap() = Some(address.clone());
245 self.server_loop(&mut listener, client_loop).await;
246 log::info!("Closing server socket on {}.", &address);
247 } else {
248 // If we were unable to bind to the server, we log this every once in a while
249 // (every 5s). Otherwise we would jam the log as re retry every 500ms.
250 if Instant::now()
251 .duration_since(last_bind_error_reported)
252 .as_secs()
253 > 5
254 {
255 log::error!(
256 "Cannot open server address: {}. Retrying every 500ms...",
257 &address
258 );
259 last_bind_error_reported = Instant::now();
260 }
261 tokio::time::sleep(Duration::from_millis(500)).await;
262 }
263 }
264 }
265
266 /// Runs the main server loop which processes incoming connections.
267 ///
268 /// This also listens on config changes and exits to the event_loop if necessary (server
269 /// address changed...).
270 async fn server_loop<F>(
271 &self,
272 listener: &mut TcpListener,
273 client_loop: impl Fn(Arc<Platform>, Arc<Connection<P>>, TcpStream) -> F
274 + Copy
275 + Send
276 + Sync
277 + 'static,
278 ) where
279 F: Future<Output = anyhow::Result<()>> + Send,
280 {
281 let mut config_changed_flag = self.platform.require::<Config>().notifier();
282
283 while self.platform.is_running() && self.is_running() {
284 tokio::select! {
285 // We use a timeout here so that the while condition (esp. platform.is_running())
286 // is checked every once in a while...
287 timeout_stream = tokio::time::timeout(CONNECT_WAIT_TIMEOUT, listener.accept()) => {
288 // We're only interested in a positive result here, as an Err simply indicates
289 // that the timeout was hit - in this case we do nothing as the while condition
290 // is all the needs to be checked...
291 if let Ok(stream) = timeout_stream {
292 // If a stream is present, we treat this as new connection and eventually
293 // start a client_loop for it...
294 if let Ok((stream, _)) = stream {
295 self.handle_new_connection(stream, client_loop);
296 } else {
297 // Otherwise the socket has been closed therefore we exit to the
298 // event_loop which will either complete exit or try to re-create
299 // the socket.
300 return;
301 }
302 }
303 }
304 _ = config_changed_flag.recv() => {
305 // If the config was changed, we need to check if the address itself changed...
306 let new_address = self.address();
307 if let Some(current_address) = &*self.current_address.lock().unwrap() {
308 if current_address != &new_address {
309 log::info!("Server address has changed. Restarting server socket...");
310
311 // Force the event_loop to re-evaluate the expected server address...
312 self.running.store(false, Ordering::Release);
313
314 // Return to event_loop so that the server socket is re-created...
315 return;
316 }
317 }
318 }
319 }
320 }
321 }
322
323 /// Handles a new incoming connection.
324 ///
325 /// This will register the connection in the list of client connections and then fork a
326 /// "thread" which mainly simply executes the **client_loop** for this connection.
327 fn handle_new_connection<F>(
328 &self,
329 stream: TcpStream,
330 client_loop: impl FnOnce(Arc<Platform>, Arc<Connection<P>>, TcpStream) -> F
331 + 'static
332 + Send
333 + Sync
334 + Copy,
335 ) where
336 F: Future<Output = anyhow::Result<()>> + Send,
337 {
338 let platform = self.platform.clone();
339 let _ = tokio::spawn(async move {
340 // Mark the connection as nodelay, as we already optimize all writes as far as possible.
341 let _ = stream.set_nodelay(true);
342
343 // Register the new connection to that the can report it in the maintenance utilities...
344 let server = platform.require::<Server<P>>();
345 let connection = Arc::new(Connection {
346 peer_address: stream
347 .peer_addr()
348 .map(|addr| addr.to_string())
349 .unwrap_or_else(|_| "<unknown>".to_owned()),
350 active: AtomicBool::new(true),
351 payload: P::default(),
352 });
353 log::debug!("Opened connection from {}...", connection.peer_address);
354 server.add_connection(connection.clone());
355
356 // Executes the client loop for this connection....
357 if let Err(error) = client_loop(platform, connection.clone(), stream).await {
358 log::debug!(
359 "An IO error occurred in connection {}: {}",
360 connection.peer_address,
361 error
362 );
363 }
364
365 // Removes the connection as it has been closed...
366 log::debug!("Closing connection to {}...", connection.peer_address);
367 server.remove_connection(connection);
368 });
369 }
370}