reactive_messaging/socket_services/socket_server/
socket_server.rs

1//! Provides the [new_socket_server!()] & [new_composite_socket_server!()] macros for instantiating servers, which may then be started by
2//! [start_unresponsive_server_processor!()] & [start_responsive_server_processor!()] -- with each taking different variants of the reactive processor logic --
3//! or have the composite processors spawned by [spawn_unresponsive_composite_server_processor!()] & [spawn_responsive_composite_server_processor!()].
4//!
//! Both reactive processing logic variants will take in a `Stream` as parameter and should return another `Stream` as output:
//!   * Responsive: the reactive processor's output `Stream` yields items that are messages to be sent back to each client;
//!   * Unresponsive: the reactive processor's output `Stream` won't be sent to the clients, allowing the `Stream`s to return items of any type.
8//!
//! The Unresponsive processors still allow you to send messages to the client and should be preferred (as far as performance is concerned)
//! for protocols where the count of messages IN doesn't map nearly 1/1 to the count of messages OUT.
11//!
12//! For every server variant, the `Stream` items may be a combination of fallible/non-fallible (using `Result<>` or not) & future/non-future.
13//!
14//! Instead of using the mentioned macros, you might want to take a look at [CompositeSocketServer] to access the inner implementation directly
15//! -- both ways have the same flexibility, but the macro version takes in all parameters in the conveniently packed and documented [ConstConfig]
16//! struct, instead of requiring several const generic parameters.
17
18
19use crate::{
20    socket_services::socket_server::common::upgrade_to_termination_tracking,
21    types::{
22        ProtocolEvent,
23        MessagingMutinyStream,
24    }, socket_connection::{
25        peer::Peer,
26        socket_connection_handler::SocketConnectionHandler,
27        connection_provider::{ServerConnectionHandler, ConnectionChannel},
28    },
29    serde::{ ReactiveMessagingDeserializer, ReactiveMessagingSerializer},
30};
31use crate::socket_connection::connection::SocketConnection;
32use crate::socket_services::types::MessagingService;
33use crate::types::ConnectionEvent;
34use std::{
35    error::Error,
36    fmt::Debug,
37    future::Future,
38    sync::Arc,
39};
40use reactive_mutiny::prelude::advanced::{
41    FullDuplexUniChannel,
42    GenericUni,
43};
44use std::net::SocketAddr;
45use futures::{future::BoxFuture, Stream};
46use tokio::io::AsyncWriteExt;
47use log::{error, trace, warn};
48
49
/// Instantiates & allocates resources for a stateless [CompositeSocketServer] (suitable for single protocol communications),
/// ready to be later started by [`start_server_processor!()`].
///
/// This is a thin wrapper over [`new_composite_socket_server!()`] that fixes the state type to `()`.
///
/// Params:
///   - `const_config`: [ConstConfig] -- the configurations for the server, enforcing const/compile time optimizations;
///   - `interface_ip`: `Into<String>` -- the interface to listen to incoming connections;
///   - `port`: `u16` -- the port to listen to incoming connections.
///
/// Example:
/// ```nocompile
///     let mut server = new_socket_server!(CONFIG, "127.0.0.1", 8768);
/// ```
/// See [`new_composite_socket_server!()`] if you want to use the "Composite Protocol Stacking" pattern.
#[macro_export]
macro_rules! new_socket_server {
    ($const_config:    expr,
     $interface_ip:    expr,
     $port:            expr) => {
        // `$crate::` (rather than `crate::`) keeps the path valid when this exported macro is expanded in another crate
        $crate::new_composite_socket_server!($const_config, $interface_ip, $port, ())
    }
}
71pub use new_socket_server;
72
73
/// Instantiates & allocates resources for a stateful [CompositeSocketServer] (suitable for the "Composite Protocol Stacking" pattern),
/// ready to have processors added by [`spawn_server_processor!()`]
/// and to be later started by [CompositeSocketServer::start_multi_protocol()].
///
/// Params:
///   - `const_config`: [ConstConfig] -- the configurations for the server, enforcing const/compile time optimizations
///     (it is evaluated in a `const` context, so it must be a constant expression);
///   - `interface_ip`: `Into<String>` -- the interface to listen to incoming connections;
///   - `port`: `u16` -- the port to listen to incoming connections;
///   - `state_type`: the state type used by the "connection routing closure" (to be provided) to promote the
///     "Composite Protocol Stacking" pattern -- it must satisfy the bounds [CompositeSocketServer] places on `StateType`.
///
/// The expansion names `ConstConfig` and `CompositeSocketServer` unqualified, so both must be in scope where the macro is used.
///
/// See [`new_socket_server!()`] if you don't need the "Composite Protocol Stacking" pattern.
#[macro_export]
macro_rules! new_composite_socket_server {
    ($const_config:    expr,
     $interface_ip:    expr,
     $port:            expr,
     $state_type:      ty) => {{
        // the config travels as a `u64` const generic: both steps below happen at compile time
        const _CONST_CONFIG: ConstConfig = $const_config;
        const _CONFIG:       u64         = _CONST_CONFIG.into();
        CompositeSocketServer::<_CONFIG, $state_type>::new($interface_ip, $port)
    }}
}
97pub use new_composite_socket_server;
98
99
/// Adds a processor to a server -- see [CompositeSocketServer::spawn_processor()], which this macro calls.
///
/// Params:
///   - `const_config`: the [ConstConfig] expression, forwarded to `_define_processor_uni_and_sender_channel_types!()`;
///   - `channel_type`: a single token, also forwarded to that helper macro (this file's tests pass `Atomic`);
///   - `socket_server`: the server on which `.spawn_processor()` will be called;
///   - `remote_messages`: the type of the messages produced by the clients;
///   - `local_messages`: the type of the messages produced by this server;
///   - `connection_events_handler_fn`: the connection events callback handed to `spawn_processor()`;
///   - `dialog_processor_builder_fn`: the processor builder handed to `spawn_processor()`.
///
/// The expansion `.await`s, so the macro may only be used inside `async` code; it evaluates to whatever
/// `spawn_processor()` returns.
#[macro_export]
macro_rules! spawn_server_processor {
    ($const_config:                 expr,
     $channel_type:                 tt,
     $socket_server:                expr,
     $remote_messages:              ty,
     $local_messages:               ty,
     $connection_events_handler_fn: expr,
     $dialog_processor_builder_fn:  expr) => {{
        // presumably this helper (defined elsewhere) declares the `ProcessorUniType` & `SenderChannel` types used below -- confirm against its definition
        _define_processor_uni_and_sender_channel_types!($const_config, $channel_type, $remote_messages, $local_messages);
        $socket_server.spawn_processor::<$remote_messages, $local_messages, ProcessorUniType, SenderChannel, _, _, _, _, _>($connection_events_handler_fn, $dialog_processor_builder_fn).await
    }}
}
114pub use spawn_server_processor;
115
116
/// Starts a server (previously instantiated by [`new_socket_server!()`]) that will communicate with clients using a single protocol -- as defined by the given
/// `dialog_processor_builder_fn`, a builder of `Stream`s as specified in [CompositeSocketServer::spawn_processor()].
///
/// The processor is first spawned through [`spawn_server_processor!()`] (same parameters, same order); on success,
/// the resulting connection channel is handed to `$socket_server.start_single_protocol()`. Any error from either step
/// is the value of the macro.
///
/// Notes:
///   - the expansion `.await`s, so the macro may only be used inside `async` code;
///   - `$socket_server` appears twice in the expansion -- pass a plain variable rather than an expression with side effects.
///
/// If you want to follow the "Composite Protocol Stacking" pattern, use [`spawn_server_processor!()`] directly instead.
#[macro_export]
macro_rules! start_server_processor {
    ($const_config:                 expr,
     $channel_type:                 tt,
     $socket_server:                expr,
     $remote_messages:              ty,
     $local_messages:               ty,
     $connection_events_handler_fn: expr,
     $dialog_processor_builder_fn:  expr) => {{
        // `$crate::` keeps the sibling macro reachable when this exported macro is expanded in another crate
        match $crate::spawn_server_processor!($const_config, $channel_type, $socket_server, $remote_messages, $local_messages, $connection_events_handler_fn, $dialog_processor_builder_fn) {
            Ok(connection_channel) => $socket_server.start_single_protocol(connection_channel).await,
            Err(err) => Err(err),
        }
    }}
}
135pub use start_server_processor;
136
137
138/// Real definition & implementation for our Socket Server, full of generic parameters.\
139/// Probably you want to instantiate this structure through the sugared macros [new_socket_server!()] or [new_composite_socket_server!()] instead.
140/// Generic Parameters:
141///   - `CONFIG`:           the `u64` version of the [ConstConfig] instance used to build this struct -- from which `ProcessorUniType` and `SenderChannel` derive;
142///   - `RemoteMessages`:   the messages that are generated by the clients (usually an `enum`);
143///   - `LocalMessages`:    the messages that are generated by the server (usually an `enum`);
144///   - `ProcessorUniType`: an instance of a `reactive-mutiny`'s [Uni] type (using one of the zero-copy channels) --
145///                         This [Uni] will execute the given server reactive logic for each incoming message (see how it is used in [new_socket_server!()] or [new_composite_socket_server!()]);
146///   - `SenderChannel`:    an instance of a `reactive-mutiny`'s Uni movable `Channel`, which will provide a `Stream` of messages to be sent to the client;
147///   - `StateType`:        The state type used by the "connection routing closure" (to be provided), enabling the "Composite Protocol Stacking" pattern.
148pub struct CompositeSocketServer<const CONFIG: u64,
149                               StateType:      Send + Sync + Clone + Debug + 'static> {
150
151    /// The interface to listen to incoming connections
152    interface_ip: String,
153    /// The port to listen to incoming connections
154    port: u16,
155    /// The abstraction containing the network loop that accepts connections for us + facilities to start processing already
156    /// opened connections (enabling the "Composite Protocol Stacking" design pattern)
157    connection_provider: Option<ServerConnectionHandler<StateType>>,
158    /// Signalers to cause [Self::termination_waiter()]'s closure to return (once they all dispatch their signals)
159    processor_termination_complete_receivers:  Option<Vec<tokio::sync::oneshot::Receiver<()>>>,
160    /// Connections returned by processors after they are done with them -- these connections
161    /// may be routed to another processor if the "Composite Protocol Stacking" pattern is in play.
162    returned_connections_source: Option<tokio::sync::mpsc::Receiver<SocketConnection<StateType>>>,
163    returned_connections_sink: tokio::sync::mpsc::Sender<SocketConnection<StateType>>,
164}
165impl<const CONFIG: u64,
166     StateType:    Send + Sync + Clone + Debug + 'static>
167CompositeSocketServer<CONFIG, StateType> {
168
169    /// Creates a new server instance listening on TCP/IP:
170    ///   `interface_ip`:         the interface's IP to listen to -- 0.0.0.0 will cause listening to all network interfaces
171    ///   `port`:                 what port to listen to
172    pub fn new<IntoString: Into<String>>
173              (interface_ip: IntoString,
174               port:         u16)
175              -> Self {
176        let (returned_connections_sink, returned_connections_source) = tokio::sync::mpsc::channel::<SocketConnection<StateType>>(2);
177        Self {
178            interface_ip: interface_ip.into(),
179            port,
180            connection_provider: None,
181            processor_termination_complete_receivers: Some(vec![]),
182            returned_connections_source: Some(returned_connections_source),
183            returned_connections_sink,
184        }
185    }
186}
187
188impl<const CONFIG: u64,
189     StateType:    Send + Sync + Clone + Debug + 'static>
190MessagingService<CONFIG>
191for CompositeSocketServer<CONFIG, StateType> {
192
193    type StateType = StateType;
194
195    async fn spawn_processor<RemoteMessages:                 ReactiveMessagingDeserializer<RemoteMessages>                                                                                                                                                                                     + Send + Sync + PartialEq + Debug + 'static,
196                             LocalMessages:                  ReactiveMessagingSerializer<LocalMessages>                                                                                                                                                                                        + Send + Sync + PartialEq + Debug + 'static,
197                             ProcessorUniType:               GenericUni<ItemType=RemoteMessages>                                                                                                                                                                                               + Send + Sync                     + 'static,
198                             SenderChannel:                  FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages>                                                                                                                                                       + Send + Sync                     + 'static,
199                             OutputStreamItemsType:                                                                                                                                                                                                                                              Send + Sync             + Debug + 'static,
200                             ServerStreamType:               Stream<Item=OutputStreamItemsType>                                                                                                                                                                                                + Send                            + 'static,
201                             ConnectionEventsCallbackFuture: Future<Output=()>                                                                                                                                                                                                                 + Send                            + 'static,
202                             ConnectionEventsCallback:       Fn(/*event: */ProtocolEvent<CONFIG, LocalMessages, SenderChannel, StateType>)                                                                                                                 -> ConnectionEventsCallbackFuture + Send + Sync                     + 'static,
203                             ProcessorBuilderFn:             Fn(/*client_addr: */String, /*connected_port: */u16, /*peer: */Arc<Peer<CONFIG, LocalMessages, SenderChannel, StateType>>, /*client_messages_stream: */MessagingMutinyStream<ProcessorUniType>) -> ServerStreamType               + Send + Sync                     + 'static>
204
205                            (&mut self,
206                             connection_events_callback:  ConnectionEventsCallback,
207                             dialog_processor_builder_fn: ProcessorBuilderFn)
208
209                            -> Result<ConnectionChannel<StateType>, Box<dyn Error + Sync + Send>> {
210
211        // configure this processor's "termination is complete" signaler
212        let (local_termination_sender, local_termination_receiver) = tokio::sync::oneshot::channel::<()>();
213        self.processor_termination_complete_receivers.as_mut().expect("BUG!").push(local_termination_receiver);
214        let connection_events_callback = upgrade_to_termination_tracking(local_termination_sender, connection_events_callback);
215
216        // the source of connections for this processor to start working on
217        let mut connection_provider = ConnectionChannel::new();
218        let new_connections_source = connection_provider.receiver()
219            .ok_or_else(|| String::from("couldn't move the Connection Receiver out of the Connection Provider"))?;
220
221        // start the server
222        let socket_communications_handler = SocketConnectionHandler::<CONFIG, RemoteMessages, LocalMessages, ProcessorUniType, SenderChannel, StateType>::new();
223        socket_communications_handler.server_loop_for_text_protocol(&self.interface_ip,
224                                                                    self.port,
225                                                                    new_connections_source,
226                                                                    self.returned_connections_sink.clone(),
227                                                                    connection_events_callback,
228                                                                    dialog_processor_builder_fn).await
229            .map_err(|err| format!("Error starting an unresponsive GenericCompositeSocketServer @ {}:{}: {:?}", self.interface_ip, self.port, err))?;
230        Ok(connection_provider)
231    }
232
233    async fn start_multi_protocol<ConnectionEventsCallbackFuture:  Future<Output=()> + Send>
234                                 (&mut self,
235                                  initial_connection_state:       StateType,
236                                  mut connection_routing_closure: impl FnMut(/*socket_connection: */&SocketConnection<StateType>, /*is_reused: */bool) -> Option<tokio::sync::mpsc::Sender<SocketConnection<StateType>>> + Send + 'static,
237                                  connection_events_callback:     impl for <'r>  Fn(/*event: */ConnectionEvent<'r, StateType>)                         -> ConnectionEventsCallbackFuture                                 + Send + 'static)
238                                 -> Result<(), Box<dyn Error + Sync + Send>> {
239        let mut connection_provider = ServerConnectionHandler::new(&self.interface_ip, self.port, initial_connection_state).await
240            .map_err(|err| format!("couldn't start the Connection Provider server event loop: {err}"))?;
241        let mut new_connections_source = connection_provider.connection_receiver()
242            .ok_or_else(|| String::from("couldn't move the Connection Receiver out of the Connection Provider"))?;
243        _ = self.connection_provider.insert(connection_provider);
244
245        let mut returned_connections_source = self.returned_connections_source.take()
246            .ok_or_else(|| String::from("couldn't `take()` from the `returned_connections_source`. Has the server been `.start()`ed more than once?"))?;
247
248        let interface_ip = self.interface_ip.clone();
249        let port = self.port;
250
251        // Spawns the "connection routing task" to:
252        //   - Listen to newly incoming connections as well as upgraded/downgraded ones shared between processors
253        //   - Gives them to the `protocol_stacking_closure`
254        //   - Routes them to the right processor or close the connection
255        tokio::spawn(async move {
256
257            loop {
258                let (mut connection, sender) = tokio::select! {
259
260                    // process newly incoming connections
261                    new_connection = new_connections_source.recv() => {
262                        let Some(new_socket_connection) = new_connection else { break };
263                        connection_events_callback(ConnectionEvent::Connected(&new_socket_connection)).await;
264                        let sender = connection_routing_closure(&new_socket_connection, false);
265                        (new_socket_connection, sender)
266                    },
267
268                    // process connections returned by the processors (after they ended processing them)
269                    returned_connection_and_state = returned_connections_source.recv() => {
270                        let Some(returned_socket_connection) = returned_connection_and_state else { break };
271                        let sender = (!returned_socket_connection.closed())
272                            .then_some(())
273                            .and_then(|_| connection_routing_closure(&returned_socket_connection, true));
274                        (returned_socket_connection, sender)
275                    },
276                };
277
278                // route the connection to another processor or drop it
279                match sender {
280                    Some(sender) => {
281                        let (client_ip, client_port) = connection.connection().peer_addr()
282                            .map(|peer_addr| match peer_addr {
283                                SocketAddr::V4(v4) => (v4.ip().to_string(), v4.port()),
284                                SocketAddr::V6(v6) => (v6.ip().to_string(), v6.port()),
285                            })
286                            .unwrap_or_else(|err| (format!("<unknown -- err:{err}>"), 0));
287                        trace!("`reactive-messaging::CompositeSocketServer`: ROUTING the client {client_ip}:{client_port} of the server @ {interface_ip}:{port} to another processor");
288                        if let Err(_) = sender.send(connection).await {
289                            error!("`reactive-messaging::CompositeSocketServer`: BUG(?) in server @ {interface_ip}:{port} while re-routing the client {client_ip}:{client_port}'s socket: THE NEW (ROUTED) PROCESSOR CAN NO LONGER RECEIVE CONNECTIONS -- THE CONNECTION WILL BE DROPPED");
290                            break
291                        }
292                    },
293                    None => {
294                        connection_events_callback(ConnectionEvent::Disconnected(&connection)).await;
295                        if let Err(err) = connection.connection_mut().shutdown().await {
296                            let (client_ip, client_port) = connection.connection().peer_addr()
297                                .map(|peer_addr| match peer_addr {
298                                    SocketAddr::V4(v4) => (v4.ip().to_string(), v4.port()),
299                                    SocketAddr::V6(v6) => (v6.ip().to_string(), v6.port()),
300                                })
301                                .unwrap_or_else(|err| (format!("<unknown -- err:{err}>"), 0));
302                            error!("`reactive-messaging::CompositeSocketServer`: ERROR in server @ {interface_ip}:{port} while shutting down the socket with client {client_ip}:{client_port}: {err}");
303                        }
304                    }
305                }
306            }
307            // loop ended
308            trace!("`reactive-messaging::CompositeSocketServer`: The 'Connection Routing Task' for server @ {interface_ip}:{port} ended -- hopefully, due to a graceful server termination.");
309        });
310        Ok(())
311    }
312
313    fn termination_waiter(&mut self) -> Box< dyn FnOnce() -> BoxFuture<'static, Result<(), Box<dyn Error + Send + Sync>>> > {
314        let mut local_termination_receiver = self.processor_termination_complete_receivers.take();
315        let interface_ip = self.interface_ip.clone();
316        let port = self.port;
317        Box::new(move || Box::pin(async move {
318            let Some(local_termination_receiver) = local_termination_receiver.take() else {
319                return Err(Box::from(format!("GenericCompositeSocketServer::termination_waiter(): termination requested for server @ {interface_ip}:{port}, but the server was not started (or a previous termination was commanded) at the moment the `termination_waiter()`'s returned closure was called")))
320            };
321            for (i, processor_termination_complete_receiver) in local_termination_receiver.into_iter().enumerate() {
322                if let Err(err) = processor_termination_complete_receiver.await {
323                    error!("GenericCompositeSocketServer::termination_waiter(): It is no longer possible to tell when the processor {i} will be termination for server @ {interface_ip}:{port}: `one_shot` signal error: {err}")
324                }
325            }
326            Ok(())
327        }))
328    }
329
330    async fn terminate(mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
331        match self.connection_provider.take() {
332            Some(connection_provider) => {
333                warn!("GenericCompositeSocketServer: Termination asked & initiated for server @ {}:{}", self.interface_ip, self.port);
334                connection_provider.shutdown().await;
335                Ok(())
336            }
337            None => {
338                Err(Box::from("GenericCompositeSocketServer: Termination requested, but the service was not started -- no `self.start_with_*()` was called. Ignoring..."))
339            }
340        }
341    }
342}
343
344
345/// Unit tests the [socket_server](self) module
346#[cfg(any(test,doc))]
347mod tests {
348    use super::*;
349    use crate::prelude::*;
350    use std::{
351        future,
352        ops::Deref,
353        sync::atomic::{AtomicU32, Ordering::Relaxed},
354        time::Duration,
355    };
356    use std::sync::atomic::AtomicBool;
357    use serde::{Deserialize, Serialize};
358    use futures::StreamExt;
359    use tokio::sync::Mutex;
360
361
362    /// The interface for listening to connections
363    const LISTENING_INTERFACE: &str = "127.0.0.1";
364    /// The start of the port range for the tests in this module -- so not to clash with other modules when tests are run in parallel
365    const PORT_START: u16 = 8040;
366
367
368    /// Test that our instantiation macro for single protocol servers is able to produce servers backed by all possible channel types
369    #[cfg_attr(not(doc),test)]
370    fn single_protocol_instantiation() {
371        let _atomic_server = new_socket_server!(
372            ConstConfig {
373                ..ConstConfig::default()
374            },
375            LISTENING_INTERFACE, PORT_START);
376
377        let _fullsync_server  = new_socket_server!(
378            ConstConfig {
379                ..ConstConfig::default()
380            },
381            LISTENING_INTERFACE, PORT_START+1);
382
383        let _crossbeam_server = new_socket_server!(
384            ConstConfig {
385                ..ConstConfig::default()
386            },
387            LISTENING_INTERFACE, PORT_START+2);
388    }
389
390    /// Test that our instantiation macro for composite protocol servers is able to produce servers backed by all possible channel types
391    #[cfg_attr(not(doc),test)]
392    fn composite_protocol_instantiation() {
393        let _atomic_server = new_composite_socket_server!(
394            ConstConfig {
395                ..ConstConfig::default()
396            },
397            LISTENING_INTERFACE, PORT_START+3, () );
398
399        let _fullsync_server  = new_composite_socket_server!(
400            ConstConfig {
401                ..ConstConfig::default()
402            },
403            LISTENING_INTERFACE, PORT_START+4, () );
404
405        let _crossbeam_server = new_composite_socket_server!(
406            ConstConfig {
407                ..ConstConfig::default()
408            },
409            LISTENING_INTERFACE, PORT_START+5, () );
410    }
411
412    /// Test that our server types are ready for usage
413    /// (showcases the "single protocol" case)
414    #[cfg_attr(not(doc),tokio::test)]
415    async fn doc_usage() -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
416
417        const PORT: u16 = PORT_START+6;     // All the following servers are started on this same port, ensuring the `terminate()` proceedings are working fine
418        const TEST_CONFIG: ConstConfig = ConstConfig::default();
419
420        // demonstrates how to build an unresponsive server
421        ///////////////////////////////////////////////////
422        // using fully typed generic functions that will work with all possible configs
423        let mut server = new_socket_server!(
424            TEST_CONFIG,
425            LISTENING_INTERFACE,
426            PORT);
427        start_server_processor!(TEST_CONFIG, Atomic, server,
428            DummyClientAndServerMessages,
429            DummyClientAndServerMessages,
430            connection_events_handler,
431            unresponsive_processor
432        )?;
433        async fn connection_events_handler<const CONFIG:  u64,
434                                           LocalMessages: ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
435                                           SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync>
436                                          (_event: ProtocolEvent<CONFIG, LocalMessages, SenderChannel>) {
437        }
438        fn unresponsive_processor<const CONFIG:   u64,
439                                  LocalMessages:  ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
440                                  SenderChannel:  FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
441                                  StreamItemType: Deref<Target=DummyClientAndServerMessages>>
442                                 (_client_addr:           String,
443                                  _connected_port:        u16,
444                                  _peer:                  Arc<Peer<CONFIG, LocalMessages, SenderChannel>>,
445                                  client_messages_stream: impl Stream<Item=StreamItemType>)
446                                 -> impl Stream<Item=()> {
447            client_messages_stream.map(|_payload| ())
448        }
449        let termination_waiter = server.termination_waiter();
450        server.terminate().await?;
451        termination_waiter().await?;
452
453        // demonstrates how to build a responsive server
454        ////////////////////////////////////////////////
455        // using fully typed generic functions that will work with all possible configs
456        let mut server = new_socket_server!(
457            TEST_CONFIG,
458            LISTENING_INTERFACE,
459            PORT);
460        start_server_processor!(TEST_CONFIG, Atomic, server,
461            DummyClientAndServerMessages,
462            DummyClientAndServerMessages,
463            connection_events_handler,
464            responsive_processor
465        )?;
466        fn responsive_processor<const CONFIG:   u64,
467                                SenderChannel:  FullDuplexUniChannel<ItemType=DummyClientAndServerMessages, DerivedItemType=DummyClientAndServerMessages> + Send + Sync,
468                                StreamItemType: Deref<Target=DummyClientAndServerMessages>>
469                               (_client_addr:           String,
470                                _connected_port:        u16,
471                                peer:                   Arc<Peer<CONFIG, DummyClientAndServerMessages, SenderChannel>>,
472                                client_messages_stream: impl Stream<Item=StreamItemType>)
473                               -> impl Stream<Item=()> {
474            client_messages_stream
475                .map(|_payload| DummyClientAndServerMessages::FloodPing)
476                .to_responsive_stream(peer, |_, _| ())
477        }
478
479        let termination_waiter = server.termination_waiter();
480        server.terminate().await?;
481        termination_waiter().await?;
482
483        // demonstrates how to use it with closures -- also allowing for any channel in the configs
484        ///////////////////////////////////////////////////////////////////////////////////////////
485        let mut server = new_socket_server!(
486            TEST_CONFIG,
487            LISTENING_INTERFACE,
488            PORT);
489        start_server_processor!(TEST_CONFIG, Atomic, server,
490            DummyClientAndServerMessages,
491            DummyClientAndServerMessages,
492            |_| future::ready(()),
493            |_, _, _, client_messages_stream| client_messages_stream.map(|_payload| DummyClientAndServerMessages::FloodPing)
494        )?;
495        let termination_waiter = server.termination_waiter();
496        server.terminate().await?;
497        termination_waiter().await?;
498
499        // demonstrates how to use the internal & generic implementation
500        ////////////////////////////////////////////////////////////////
501        // notice there may be a discrepancy in the `ConstConfig` you provide and the actual concrete types
502        // you also provide for `UniProcessor` and `SenderChannel` -- therefore, this usage is not recommended
503        const CUSTOM_CONFIG: ConstConfig = ConstConfig {
504            receiver_buffer:      2048,
505            sender_buffer:        1024,
506            executor_instruments: reactive_mutiny::prelude::Instruments::LogsWithExpensiveMetrics,
507            ..ConstConfig::default()
508        };
509        let mut server = CompositeSocketServer :: <{CUSTOM_CONFIG.into()},
510                                                                           ()>
511                                                                       :: new(LISTENING_INTERFACE, PORT);
512        type ProcessorUniType = UniZeroCopyFullSync<DummyClientAndServerMessages, {CUSTOM_CONFIG.receiver_buffer as usize}, 1, {CUSTOM_CONFIG.executor_instruments.into()}>;
513        type SenderChannelType = ChannelUniMoveFullSync<DummyClientAndServerMessages, {CUSTOM_CONFIG.sender_buffer as usize}, 1>;
514        let connection_channel = server.spawn_processor::<DummyClientAndServerMessages,
515                                                                             DummyClientAndServerMessages,
516                                                                             ProcessorUniType,
517                                                                             SenderChannelType,
518                                                                             _, _, _, _, _ > (
519            |_| future::ready(()),
520            |_, _, _, client_messages_stream| client_messages_stream.map(|_payload| DummyClientAndServerMessages::FloodPing)
521        ).await?;
522        server.start_single_protocol(connection_channel).await?;
523        let termination_waiter = server.termination_waiter();
524        server.terminate().await?;
525        termination_waiter().await?;
526
527        Ok(())
528    }
529
530    /// assures the termination process is able to:
531    ///   1) communicate with all clients
532    ///   2) wait for up to the given timeout for them to gracefully disconnect
533    ///   3) forcibly disconnect, if needed
534    ///   4) notify any waiter on the server (after all the above steps are done) within the given timeout
535    #[cfg_attr(not(doc),tokio::test(flavor = "multi_thread"))]
536    async fn termination_process() {
537        const PORT: u16 = PORT_START+7;
538
539        // the tolerance, in milliseconds -- a too small termination duration means the server didn't wait for the client's disconnection; too much (possibly eternal) means it didn't enforce the timeout
540        let max_time_ms = 20;
541
542        // sensors
543        let client_received_messages_count_ref1 = Arc::new(AtomicU32::new(0));
544        let client_received_messages_count_ref2 = Arc::clone(&client_received_messages_count_ref1);
545        let server_received_messages_count_ref1 = Arc::new(AtomicU32::new(0));
546        let server_received_messages_count_ref2 = Arc::clone(&server_received_messages_count_ref1);
547
548        // start the server -- the test logic is here
549        let client_peer_ref1 = Arc::new(Mutex::new(None));
550        let client_peer_ref2 = Arc::clone(&client_peer_ref1);
551
552        const TEST_CONFIG: ConstConfig = ConstConfig {
553            ..ConstConfig::default()
554        };
555        let mut server = CompositeSocketServer :: <{TEST_CONFIG.into()},
556                                                                            () >
557                                                                       :: new(LISTENING_INTERFACE, PORT);
558        type ProcessorUniType = UniZeroCopyFullSync<DummyClientAndServerMessages, {TEST_CONFIG.receiver_buffer as usize}, 1, {TEST_CONFIG.executor_instruments.into()}>;
559        type SenderChannelType = ChannelUniMoveFullSync<DummyClientAndServerMessages, {TEST_CONFIG.sender_buffer as usize}, 1>;
560        let connection_channel = server.spawn_processor :: <DummyClientAndServerMessages,
561                                                                               DummyClientAndServerMessages,
562                                                                               ProcessorUniType,
563                                                                               SenderChannelType,
564                                                                               _, _, _, _, _> (
565                move |connection_event: ProtocolEvent<{TEST_CONFIG.into()}, DummyClientAndServerMessages, SenderChannelType>| {
566                let client_peer = Arc::clone(&client_peer_ref1);
567                async move {
568                    match connection_event {
569                        ProtocolEvent::PeerArrived { peer } => {
570                            // register the client -- which will initiate the server termination further down in this test
571                            client_peer.lock().await.replace(peer);
572                        },
573                        ProtocolEvent::PeerLeft { peer: _, stream_stats: _ } => (),
574                        ProtocolEvent::LocalServiceTermination => {
575                            // send a message to the client (the first message, actually... that will initiate a flood of back-and-forth messages)
576                            // then try to close the connection (which would only be gracefully done once all messages were sent... which may never happen).
577                            let client_peer = client_peer.lock().await;
578                            let client_peer = client_peer.as_ref().expect("No client is connected");
579                            // send the flood starting message
580                            let _ = client_peer.send_async(DummyClientAndServerMessages::FloodPing).await;
581                            client_peer.flush_and_close(Duration::ZERO).await;
582                        }
583                    }
584                }
585            },
586            move |_, _, peer, client_messages: MessagingMutinyStream<ProcessorUniType>| {
587                let server_received_messages_count = Arc::clone(&server_received_messages_count_ref1);
588                client_messages
589                    .map(move |client_message| {
590                        std::mem::forget(client_message);   // TODO 2023-07-15: investigate this reactive-mutiny/rust related bug: it seems OgreUnique doesn't like the fact that this type doesn't need dropping? (no internal strings)... or is it a reactive-messaging bug?
591                        server_received_messages_count.fetch_add(1, Relaxed);
592                        DummyClientAndServerMessages::FloodPing
593                    })
594                    .to_responsive_stream(peer, |_, _| ())
595            }
596        ).await.expect("Spawning a server processor");
597        server.start_single_protocol(connection_channel).await.expect("Starting the server");
598
599        // start a client that will engage in a flood ping with the server when provoked (never closing the connection)
600        let mut client = new_socket_client!(
601            TEST_CONFIG,
602            LISTENING_INTERFACE,
603            PORT);
604        start_client_processor!(TEST_CONFIG, Atomic, client,
605            DummyClientAndServerMessages,
606            DummyClientAndServerMessages,
607            |_| async {},
608            move |_, _, peer, server_messages| {
609                let client_received_messages_count = Arc::clone(&client_received_messages_count_ref1);
610                server_messages
611                    .map(move |server_message| {
612                        std::mem::forget(server_message);   // TODO 2023-07-15: investigate this reactive-mutiny/rust related bug: it seems OgreUnique doesn't like the fact that this type doesn't need dropping? (no internal strings)... or is it a reactive-messaging bug?
613                        client_received_messages_count.fetch_add(1, Relaxed);
614                        DummyClientAndServerMessages::FloodPing
615                    })
616                    .to_responsive_stream(peer, |_, _| ())
617            }
618        ).expect("Starting the client");
619
620        // wait for the client to connect
621        while client_peer_ref2.lock().await.is_none() {
622            tokio::time::sleep(Duration::from_millis(2)).await;
623        }
624        // terminate the server & wait until the shutdown process is complete
625        let wait_for_server_termination = server.termination_waiter();
626        server.terminate().await
627            .expect("ERROR Signaling the server of the termination intention");
628        let start = std::time::SystemTime::now();
629        _ = tokio::time::timeout(Duration::from_secs(5), wait_for_server_termination()).await
630            .expect("TIMED OUT (>5s) Waiting for the server to live it's life and to complete the termination process");
631        let elapsed_ms = start.elapsed().unwrap().as_millis();
632        assert!(client_received_messages_count_ref2.load(Relaxed) > 1, "The client didn't receive any messages (not even the 'server is shutting down' notification)");
633        assert!(server_received_messages_count_ref2.load(Relaxed) > 1, "The server didn't receive any messages (not even 'gracefully disconnecting' after being notified that the server is shutting down)");
634        assert!(elapsed_ms <= max_time_ms as u128,
635                "The server termination (of a never complying client) didn't complete in a reasonable time, meaning the termination code is wrong. Maximum acceptable time: {}ms; Measured Time: {}ms",
636                max_time_ms, elapsed_ms);
637    }
638
639    /// assures the "Composite Protocol Stacking" pattern is supported & correctly implemented:
640    ///   1) New server connections are always handled by the first processor
641    ///   2) Connections can be routed freely among processors
642    ///   3) "Last States" are taken into account, enabling the "connection routing closure"
643    ///   4) Connections can be closed after the last processor are through with them
644    /// -- for these, all processors (but the first) will answer with a "welcome message" (this is the suggested behavior for servers).
645    #[cfg_attr(not(doc),tokio::test(flavor = "multi_thread"))]
646    async fn composite_protocol_stacking_pattern() -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
647
648        const PORT: u16 = PORT_START+8;
649        const TEST_CONFIG: ConstConfig = ConstConfig::default();
650
651        let mut server = new_composite_socket_server!(
652            TEST_CONFIG,
653            LISTENING_INTERFACE,
654            PORT,
655            Protocols);
656
657        #[derive(Debug,PartialEq,Clone)]
658        enum Protocols {
659            IncomingClient,
660            WelcomeAuthenticatedFriend,
661            AccountSettings,
662            GoodbyeOptions,
663            Disconnect,
664        }
665
666        // first level processors shouldn't do anything until the client says something meaningful -- newcomers must know, a priori, who they are talking to (a security measure)
667        let incoming_client_processor_greeted = Arc::new(AtomicBool::new(false));
668        let incoming_client_processor_greeted_ref = Arc::clone(&incoming_client_processor_greeted);
669        let incoming_client_processor = spawn_server_processor!(TEST_CONFIG, Atomic, server, String, String,
670            |_| future::ready(()),
671            move |_, _, peer, client_messages_stream| {
672                assert_eq!(peer.try_take_state(), Some(Some(Protocols::IncomingClient)), "Connection is in a wrong state");
673                let incoming_client_processor_greeted_ref = Arc::clone(&incoming_client_processor_greeted_ref);
674                client_messages_stream.then(move |_payload| {
675                    let peer = Arc::clone(&peer);
676                    incoming_client_processor_greeted_ref.store(true, Relaxed);
677                    async move {
678                        peer.send_async(format!("`IncomingClient`: New peer {peer:?} ended up initial authentication proceedings. SAY SOMETHING and you will be routed to 'WelcomeAuthenticatedFriend'")).await
679                            .expect("Sending failed");
680                        peer.set_state(Protocols::WelcomeAuthenticatedFriend).await;
681                        peer.flush_and_close(Duration::from_secs(1)).await;
682                    }
683                })
684            }
685        )?;
686
687        // deeper processors should inform the client that they are now subjected to a new processor / protocol, so they may adjust accordingly
688        let welcome_authenticated_friend_processor_greeted = Arc::new(AtomicBool::new(false));
689        let welcome_authenticated_friend_processor_greeted_ref = Arc::clone(&welcome_authenticated_friend_processor_greeted);
690        let welcome_authenticated_friend_processor = spawn_server_processor!(TEST_CONFIG, Atomic, server, String, String,
691            |connection_event| async {
692                match connection_event {
693                    ProtocolEvent::PeerArrived { peer } =>
694                        peer.send_async(format!("`WelcomeAuthenticatedFriend`: Now dealing with client {peer:?}. SAY SOMETHING and you will be routed to 'AccountSettings'")).await
695                            .expect("Sending failed"),
696                    _ => (),
697                }
698            },
699            move |_, _, peer, client_messages_stream| {
700                assert_eq!(peer.try_take_state(), Some(Some(Protocols::WelcomeAuthenticatedFriend)), "Connection is in a wrong state");
701                let welcome_authenticated_friend_processor_greeted_ref = Arc::clone(&welcome_authenticated_friend_processor_greeted_ref);
702                client_messages_stream.then(move |_payload| {
703                    let peer = Arc::clone(&peer);
704                    welcome_authenticated_friend_processor_greeted_ref.store(true, Relaxed);
705                    async move {
706                        peer.set_state(Protocols::AccountSettings).await;
707                        peer.flush_and_close(Duration::from_secs(1)).await;
708                    }
709                })
710            }
711        )?;
712
713        let account_settings_processor_greeted = Arc::new(AtomicBool::new(false));
714        let account_settings_processor_greeted_ref = Arc::clone(&account_settings_processor_greeted);
715        let account_settings_processor = spawn_server_processor!(TEST_CONFIG, Atomic, server, String, String,
716            |connection_event| async {
717                match connection_event {
718                    ProtocolEvent::PeerArrived { peer } =>
719                        peer.send_async(format!("`AccountSettings`: Now dealing with client {peer:?}. SAY SOMETHING and you will be routed to 'GoodbyeOptions'")).await
720                            .expect("Sending failed"),
721                    _ => (),
722                }
723            },
724            move |_, _, peer, client_messages_stream| {
725                assert_eq!(peer.try_take_state(), Some(Some(Protocols::AccountSettings)), "Connection is in a wrong state");
726                let account_settings_processor_greeted_ref = Arc::clone(&account_settings_processor_greeted_ref);
727                client_messages_stream.then(move |_payload| {
728                    let peer = Arc::clone(&peer);
729                    account_settings_processor_greeted_ref.store(true, Relaxed);
730                    async move {
731                        peer.set_state(Protocols::GoodbyeOptions).await;
732                        peer.flush_and_close(Duration::from_secs(1)).await;
733                    }
734                })
735            }
736        )?;
737
738        let goodbye_options_processor_greeted = Arc::new(AtomicBool::new(false));
739        let goodbye_options_processor_greeted_ref = Arc::clone(&goodbye_options_processor_greeted);
740        let goodbye_options_processor = spawn_server_processor!(TEST_CONFIG, Atomic, server, String, String,
741            |connection_event| async {
742                match connection_event {
743                    ProtocolEvent::PeerArrived { peer } =>
744                        peer.send_async(format!("`GoodbyeOptions`: Now dealing with client {peer:?}. SAY SOMETHING and you will be DISCONNECTED, as our talking is over. Thank you.")).await
745                            .expect("Sending failed"),
746                    _ => (),
747                }
748            },
749            move |_, _, peer, client_messages_stream| {
750                assert_eq!(peer.try_take_state(), Some(Some(Protocols::GoodbyeOptions)), "Connection is in a wrong state");
751                let goodbye_options_processor_greeted_ref = Arc::clone(&goodbye_options_processor_greeted_ref);
752                client_messages_stream.then(move |_payload| {
753                    let peer = Arc::clone(&peer);
754                    goodbye_options_processor_greeted_ref.store(true, Relaxed);
755                    async move {
756                        peer.set_state(Protocols::Disconnect).await;
757                        peer.flush_and_close(Duration::from_secs(1)).await;
758                    }
759                })
760            }
761        )?;
762
763        // this closure will route the connections based on the states the processors above had set
764        // (it will be called whenever a protocol processor ends -- "returning" the connection)
765        let connection_routing_closure = move |socket_connection: &SocketConnection<Protocols>, _|
766            match socket_connection.state() {
767                Protocols::IncomingClient             => Some(incoming_client_processor.clone_sender()),
768                Protocols::WelcomeAuthenticatedFriend => Some(welcome_authenticated_friend_processor.clone_sender()),
769                Protocols::AccountSettings            => Some(account_settings_processor.clone_sender()),
770                Protocols::GoodbyeOptions             => Some(goodbye_options_processor.clone_sender()),
771                Protocols::Disconnect                 => None,
772            };
773        server.start_multi_protocol(Protocols::IncomingClient, connection_routing_closure, |_| future::ready(())).await?;
774        let server_termination_waiter = server.termination_waiter();
775
776        // start the client that will only connect and listen to messages until it is disconnected
777        let mut client = new_socket_client!(
778            TEST_CONFIG,
779            LISTENING_INTERFACE,
780            PORT);
781        start_client_processor!(TEST_CONFIG, Atomic, client, String, String,
782            |connection_event| async {
783                match connection_event {
784                    ProtocolEvent::PeerArrived { peer }           => peer.send_async(String::from("Hello! Am I in?")).await.expect("Sending failed"),
785                    ProtocolEvent::PeerLeft { peer: _, stream_stats: _ } => (),
786                    ProtocolEvent::LocalServiceTermination                       => (),
787                }
788            },
789            move |_, _, peer, server_messages| server_messages
790                .map(|msg| {
791                    println!("RECEIVED: {msg} -- answering with 'OK'");
792                    String::from("OK")
793                })
794                .to_responsive_stream(peer, |_, _| ())
795        )?;
796
797        let client_waiter = client.termination_waiter();
798        // wait for the client to do its stuff
799        _ = tokio::time::timeout(Duration::from_secs(5), client_waiter()).await
800            .expect("TIMED OUT (>5s) Waiting for the client & server to do their stuff & disconnect the client");
801
802        // terminate the server & wait until the shutdown process is complete
803        server.terminate().await?;
804        server_termination_waiter().await?;
805
806        assert!(incoming_client_processor_greeted.load(Relaxed),              "`IncomingClient` processor wasn't requested");
807        assert!(welcome_authenticated_friend_processor_greeted.load(Relaxed), "`WelcomeAuthenticatedFriend` processor wasn't requested");
808        assert!(account_settings_processor_greeted.load(Relaxed),             "`AccountSettings` processor wasn't requested");
809        assert!(goodbye_options_processor_greeted.load(Relaxed),              "`GoodbyeOptions` processor wasn't requested");
810
811        Ok(())
812    }
813
814
815    #[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
816    enum DummyClientAndServerMessages {
817        #[default]
818        FloodPing,
819    }
820
821    impl Deref for DummyClientAndServerMessages {
822        type Target = DummyClientAndServerMessages;
823        fn deref(&self) -> &Self::Target {
824            self
825        }
826    }
827
828    impl ReactiveMessagingSerializer<DummyClientAndServerMessages> for DummyClientAndServerMessages {
829        #[inline(always)]
830        fn serialize(remote_message: &DummyClientAndServerMessages, buffer: &mut Vec<u8>) {
831            ron_serializer(remote_message, buffer)
832                .expect("composite_socket_server.rs unit tests: No errors should have happened here!")
833        }
834        #[inline(always)]
835        fn processor_error_message(err: String) -> DummyClientAndServerMessages {
836            panic!("composite_socket_server.rs unit tests: protocol error when none should have happened: {err}");
837        }
838    }
839    impl ReactiveMessagingDeserializer<DummyClientAndServerMessages> for DummyClientAndServerMessages {
840        #[inline(always)]
841        fn deserialize(local_message: &[u8]) -> Result<DummyClientAndServerMessages, Box<dyn std::error::Error + Sync + Send>> {
842            ron_deserializer(local_message)
843        }
844    }
845}