reactive_messaging/socket_services/socket_client/
socket_client.rs

1//! Provides the [new_socket_client!()] & [new_composite_socket_client!()] macros for instantiating clients, which may then be started by
//! [start_client_processor!()], or have the composite processors spawned by [spawn_client_processor!()].
3//!
4//! The reactive processing logic will take in a `Stream` as parameter and may return another `Stream` as output,
5//! which can be configured to automatically send yielded items to the remote party by composing it with the functions defined in
6//! [crate::prelude::ResponsiveStream].
7//!
8//! Even if the `Stream` isn't upgraded to a "responsive stream", processors still allow you to send messages to the server and should be
9//! preferred (as far as performance is concerned) for protocols where the count of messages IN don't nearly map 1/1 with the count of messages OUT.
10//!
11//! For every client processor, the `Stream` items may be a combination of fallible/non-fallible (using `Result<>` or not) & future/non-future.
12//!
13//! Instead of using the mentioned macros, you might want to take a look at [CompositeSocketClient] to access the inner implementation directly
14//! -- both ways have the same flexibility, but the macro version takes in all parameters in the conveniently packed and documented [ConstConfig]
15//! struct, instead of requiring several const generic parameters.
16
17
18use crate::{
19    socket_services::socket_client::common::upgrade_to_connection_event_tracking,
20    types::{
21        ProtocolEvent,
22        MessagingMutinyStream,
23    },
24    socket_connection::{
25        peer::Peer,
26        socket_connection_handler::SocketConnectionHandler,
27        connection_provider::{ClientConnectionManager, ConnectionChannel},
28    },
29    serde::{ReactiveMessagingDeserializer, ReactiveMessagingSerializer},
30};
31use crate::socket_connection::connection::SocketConnection;
32use crate::socket_services::types::MessagingService;
33use crate::types::ConnectionEvent;
34use std::{
35    error::Error,
36    fmt::Debug,
37    future::Future,
38    sync::{
39        Arc,
40        atomic::AtomicBool,
41    },
42};
43use std::sync::atomic::Ordering::Relaxed;
44use reactive_mutiny::prelude::{FullDuplexUniChannel, GenericUni};
45use futures::{future::BoxFuture, Stream};
46use tokio::io::AsyncWriteExt;
47use log::{trace, warn, error, debug};
48
49
/// Instantiates & allocates resources for a stateless [CompositeSocketClient] (suitable for single protocol communications),
/// ready to be later started by [`start_client_processor!()`].
///
/// This is a thin wrapper: it expands to [`new_composite_socket_client!()`] using `()` as the state type.
///
/// Params:
///   - `const_config`: [ConstConfig] -- the configurations for the client, enforcing const/compile time optimizations;
///   - `host`: IntoString -- the address of the server to connect to;
///   - `port`: u16 -- the server port to connect to.
///
/// Example:
/// ```nocompile
///     let mut client = new_socket_client!(CONFIG, "google.com", 80);
/// ```
/// See [`new_composite_socket_client!()`] if you want to use the "Composite Protocol Stacking" pattern.
#[macro_export]
macro_rules! new_socket_client {
    ($const_config:    expr,
     $host:            expr,
     $port:            expr) => {
        // `$crate::` makes the delegation resolve no matter which of our macros the caller imported
        $crate::new_composite_socket_client!($const_config, $host, $port, ())
    }
}
71pub use new_socket_client;
72
73
/// Instantiates & allocates resources for a stateful [CompositeSocketClient] (suitable for the "Composite Protocol Stacking" pattern),
/// ready to have processors added by [spawn_client_processor!()].
///
/// The expansion references `CompositeSocketClient` by its bare name, so that type must be in scope at the call site.
///
/// Params:
///   - `const_config`: [ConstConfig] -- the configurations for the client, enforcing const/compile time optimizations
///                     (it is converted to `u64` in a `const` context, through `.into()`);
///   - `host`: IntoString -- the address of the server to connect to;
///   - `port`: u16 -- the server port to connect to;
///   - `state_type` -- The type (suggested: a `enum`) used by the "connection routing closure" (to be provided) to promote the "Composite Protocol Stacking" pattern.
///
/// Example:
/// ```nocompile
///     let mut client = new_composite_socket_client!(CONFIG, "localhost", 8768, MyStatesEnum);
/// ```
/// See [new_socket_client!()] if you don't want to use the "Composite Protocol Stacking" pattern.
#[macro_export]
macro_rules! new_composite_socket_client {
    ($const_config: expr,
     $host:         expr,
     $port:         expr,
     $state_type:   ty) => {
        {
            // the configuration must be a constant, as it is used as a const generic argument
            const _CONFIG: u64 = $const_config.into();
            CompositeSocketClient::<_CONFIG, $state_type>::new($host, $port)
        }
    }
}
99pub use new_composite_socket_client;
100
101
/// Spawns a processor for a client previously instantiated by [`new_composite_socket_client!()`]. The client may communicate with the
/// server using multiple protocols: call this macro once per protocol (with different parameters) and, when the "Composite Protocol Stacking"
/// is complete, call [CompositeSocketClient::start_multi_protocol()].
///
/// The given `dialog_processor_builder_fn` is a builder of `Stream`s, as specified in [CompositeSocketClient::spawn_processor()].\
/// If you don't need multiple protocols and don't want to follow the "Composite Protocol Stacking" pattern, see the [`start_client_processor!()`] macro instead.
///
/// The expansion `.await`s the call to `spawn_processor()`, so this macro can only be used in an `async` context.
///
/// Params:
///   - `const_config`: [ConstConfig] -- the configurations for the client, enforcing const/compile time optimizations;
///   - `channel_type`: One of the following `reactive-mutiny` channels to be used for message passing. Either `Atomic`, `FullSync` or `Crossbeam`;
///   - `socket_client`: [CompositeSocketClient] -- The object returned by the call to [`new_composite_socket_client!()`];
///   - `remote_messages`: [ReactiveMessagingDeserializer<>] -- the type of the messages produced by the server;
///   - `local_messages`: [ReactiveMessagingSerializer<>] -- the type of the messages produced by this client -- should, additionally, implement the `Default` trait;
///   - `protocol_events_handler_fn`: async [Fn] -- The callback that receives the connection/protocol events [ProtocolEvent];
///   - `dialog_processor_builder_fn`: [FnOnce] -- The builder for the `Stream` that consumes server messages.
///
/// `protocol_events_handler_fn` -- a generic function (or closure) to handle "new peer", "peer left" and "service termination" events (possibly to manage sessions). Sign it as:
/// ```nocompile
///     async fn protocol_events_handler<const CONFIG:  u64,
///                                      LocalMessages: ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
///                                      SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync>
///                                     (_event: ProtocolEvent<CONFIG, LocalMessages, SenderChannel, StateType>) {...}
/// ```
///
/// `dialog_processor_builder_fn` -- the generic function (or closure) -- called once for each connection -- that receives the `Stream` of remote messages and returns
///                                  another `Stream`, that may be, optionally, sent out to the `peer` (see [crate::prelude::ResponsiveStream]). Sign it as:
/// ```nocompile
///     fn processor<const CONFIG:   u64,
///                  LocalMessages:  ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
///                  SenderChannel:  FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
///                  StreamItemType: Deref<Target=[your type for messages produced by the SERVER]>>
///                 (peer_addr:              String,
///                  connected_port:         u16,
///                  peer:                   Arc<Peer<CONFIG, LocalMessages, SenderChannel, StateType>>,
///                  remote_messages_stream: impl Stream<Item=StreamItemType>)
///                 -> impl Stream<Item=()> {...}
/// ```
///
/// Evaluates to the `Result` of `spawn_processor()`: on success, the `handle` that should be used by the closure given to
/// [CompositeSocketClient::start_multi_protocol()] to refer to this processor / protocol.
///
/// For examples, please consult the unit tests at the end of this module.
#[macro_export]
macro_rules! spawn_client_processor {
    ($const_config:                expr,
     $channel_type:                tt,
     $socket_client:               expr,
     $remote_messages:             ty,
     $local_messages:              ty,
     $protocol_events_handler_fn:  expr,
     $dialog_processor_builder_fn: expr) => {
        {
            // brings the `ProcessorUniType` & `SenderChannel` types, used below, into this block's scope
            _define_processor_uni_and_sender_channel_types!($const_config, $channel_type, $remote_messages, $local_messages);
            $socket_client
                .spawn_processor::<$remote_messages, $local_messages, ProcessorUniType, SenderChannel, _, _, _, _, _>
                    ($protocol_events_handler_fn, $dialog_processor_builder_fn)
                .await
        }
    }
}
155pub use spawn_client_processor;
156
157
/// Starts a client (previously instantiated by [`new_socket_client!()`]) that will communicate with the server using a single protocol -- as defined by the given
/// `dialog_processor_builder_fn`, a builder of `Stream`s as specified in [CompositeSocketClient::spawn_processor()].
///
/// If you want to follow the "Composite Protocol Stacking" pattern, see the [`spawn_client_processor!()`] macro instead.
///
/// The expansion contains `.await` points, so this macro can only be used in an `async` context.\
/// Note that the `socket_client` expression appears twice in the expansion (once to spawn the processor, once to start the client):
/// pass a plain variable rather than an expression with side effects.
///
/// Params:
///   - `const_config`: [ConstConfig] -- the configurations for the client, enforcing const/compile time optimizations;
///   - `channel_type`: One of the following `reactive-mutiny` channels to be used for message passing. Either `Atomic`, `FullSync` or `Crossbeam`;
///   - `socket_client`: [CompositeSocketClient] -- The object returned by the call to [`new_socket_client!()`];
///   - `remote_messages`: [ReactiveMessagingDeserializer<>] -- the type of the messages produced by the server;
///   - `local_messages`: [ReactiveMessagingSerializer<>] -- the type of the messages produced by this client -- should, additionally, implement the `Default` trait;
///   - `connection_events_handler_fn`: async [Fn] -- The callback that receives the connection/protocol events [ProtocolEvent];
///   - `dialog_processor_builder_fn`: [FnOnce] -- The builder for the `Stream` that consumes server messages.
///
/// `connection_events_handler_fn` -- a generic function (or closure) to handle "new peer", "peer left" and "service termination" events (possibly to manage sessions). Sign it as:
/// ```nocompile
///     async fn connection_events_handler<const CONFIG:  u64,
///                                        LocalMessages: ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
///                                        SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync>
///                                       (_event: ProtocolEvent<CONFIG, LocalMessages, SenderChannel, StateType>) {...}
/// ```
///
/// `dialog_processor_builder_fn` -- the generic function (or closure) -- called once for each connection -- that receives the `Stream` of remote messages and returns
///                                  another `Stream`, that may be, optionally, sent out to the `peer` (see [crate::prelude::ResponsiveStream]). Sign it as:
/// ```nocompile
///     fn unresponsive_processor<const CONFIG:   u64,
///                               LocalMessages:  ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
///                               SenderChannel:  FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
///                               StreamItemType: Deref<Target=[your type for messages produced by the SERVER]>>
///                              (peer_addr:              String,
///                               connected_port:         u16,
///                               peer:                   Arc<Peer<CONFIG, LocalMessages, SenderChannel, StateType>>,
///                               remote_messages_stream: impl Stream<Item=StreamItemType>)
///                              -> impl Stream<Item=ANY_TYPE> {...}
/// ```
///
/// Evaluates to a `Result`: the error from spawning the processor, if any, or else whatever `start_single_protocol()` returned.
///
/// For examples, please consult the unit tests at the end of this module.
#[macro_export]
macro_rules! start_client_processor {
    ($const_config:                 expr,
     $channel_type:                 tt,
     $socket_client:                expr,
     $remote_messages:              ty,
     $local_messages:               ty,
     $connection_events_handler_fn: expr,
     $dialog_processor_builder_fn:  expr) => {{
        // `$crate::` makes the nested macro resolve no matter which of our macros the caller imported.
        // A `match` is used (instead of `?`) so the expansion never returns from the caller's function.
        match $crate::spawn_client_processor!($const_config, $channel_type, $socket_client, $remote_messages, $local_messages, $connection_events_handler_fn, $dialog_processor_builder_fn) {
            Ok(connection_channel) => $socket_client.start_single_protocol(connection_channel).await,
            Err(err) => Err(err),
        }
    }}
}
210pub use start_client_processor;
211
212
213/// Real definition & implementation for our Socket Client, full of generic parameters.\
214/// Probably you want to instantiate this structure through the sugared macros [new_socket_client!()] or [new_composite_socket_client!()] instead.
215/// Generic Parameters:
216///   - `CONFIG`:           the `u64` version of the [ConstConfig] instance used to build this struct -- from which `ProcessorUniType` and `SenderChannel` derive;
217///   - `RemoteMessages`:   the messages that are generated by the server (usually an `enum`);
218///   - `LocalMessages`:    the messages that are generated by this client (usually an `enum`);
219///   - `ProcessorUniType`: an instance of a `reactive-mutiny`'s [Uni] type (using one of the zero-copy channels) --
220///                         This [Uni] will execute the given client reactive logic for each incoming message (see how it is used in [new_socket_client!()]);
221///   - `SenderChannel`:    an instance of a `reactive-mutiny`'s Uni movable `Channel`, which will provide a `Stream` of messages to be sent to the server;
222///   - `StateType`:        The state type used by the "connection routing closure" (to be provided), enabling the "Composite Protocol Stacking" pattern.
223pub struct CompositeSocketClient<const CONFIG: u64,
224                                 StateType:    Send + Sync + Clone + Debug + 'static> {
225
226    /// false if a disconnection happened, as tracked by the socket logic
227    connected: Arc<AtomicBool>,
228    /// The interface to listen to incoming connections
229    ip: String,
230    /// The port to listen to incoming connections
231    port: u16,
232    /// Signaler to stop this client -- allowing multiple subscribers
233    client_termination_signaler: Option<tokio::sync::broadcast::Sender<()>>,
234    /// Signaler to cause [Self::termination_waiter()]'s closure to return
235    local_termination_is_complete_receiver: Option<tokio::sync::mpsc::Receiver<()>>,
236    local_termination_is_complete_sender: tokio::sync::mpsc::Sender<()>,
237    /// The client connection is returned by processors after they are done with it -- this connection
238    /// may be routed to another processor if the "Composite Protocol Stacking" pattern is in play.
239    returned_connection_source: Option<tokio::sync::mpsc::Receiver<SocketConnection<StateType>>>,
240    returned_connection_sink: tokio::sync::mpsc::Sender<SocketConnection<StateType>>,
241    /// The count of processors, for termination notification purposes
242    spawned_processors_count: u32,
243}
244impl<const CONFIG: u64,
245     StateType:    Send + Sync + Clone + Debug + 'static>
246CompositeSocketClient<CONFIG, StateType> {
247
248    /// Instantiates a client to connect to a TCP/IP Server:
249    ///   `ip`:                   the server IP to connect to
250    ///   `port`:                 the server port to connect to
251    pub fn new<IntoString: Into<String>>
252              (ip:   IntoString,
253               port: u16)
254              -> Self {
255        let (returned_connection_sink, returned_connection_source) = tokio::sync::mpsc::channel::<SocketConnection<StateType>>(1);
256        let (client_termination_signaler, _) = tokio::sync::broadcast::channel(1);
257        let (local_termination_is_complete_sender, local_termination_is_complete_receiver) = tokio::sync::mpsc::channel(1);
258        Self {
259            connected:                               Arc::new(AtomicBool::new(false)),
260            ip:                                      ip.into(),
261            port,
262            client_termination_signaler:             Some(client_termination_signaler),
263            local_termination_is_complete_receiver:  Some(local_termination_is_complete_receiver),
264            local_termination_is_complete_sender,
265            returned_connection_source:              Some(returned_connection_source),
266            returned_connection_sink,
267            spawned_processors_count:                0,
268        }
269    }
270
271    /// Returns the const configuration used for `self`
272    const fn config() -> u64 {
273        CONFIG
274    }
275
276    /// Tells if the connection is active & valid
277    pub fn is_connected(&self) -> bool {
278        self.connected.load(Relaxed)
279    }
280}
281
282impl<const CONFIG: u64,
283     StateType:    Send + Sync + Clone + Debug + 'static>
284MessagingService<CONFIG>
285for CompositeSocketClient<CONFIG, StateType> {
286    type StateType = StateType;
287
288    // TODO 2024-01-03: make this able to process the same connection as many times as needed, for symmetry with the server -- practically, allowing connection reuse
289    #[inline(always)]
290    async fn spawn_processor<RemoteMessages:                 ReactiveMessagingDeserializer<RemoteMessages>                                                                                                                                                                                     + Send + Sync + PartialEq + Debug + 'static,
291                             LocalMessages:                  ReactiveMessagingSerializer<LocalMessages>                                                                                                                                                                                        + Send + Sync + PartialEq + Debug + 'static,
292                             ProcessorUniType:               GenericUni<ItemType=RemoteMessages>                                                                                                                                                                                               + Send + Sync                     + 'static,
293                             SenderChannel:                  FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages>                                                                                                                                                       + Send + Sync                     + 'static,
294                             OutputStreamItemsType:                                                                                                                                                                                                                                              Send + Sync             + Debug + 'static,
295                             ServerStreamType:               Stream<Item=OutputStreamItemsType>                                                                                                                                                                                                + Send                            + 'static,
296                             ConnectionEventsCallbackFuture: Future<Output=()>                                                                                                                                                                                                                 + Send                            + 'static,
297                             ConnectionEventsCallback:       Fn(/*event: */ProtocolEvent<CONFIG, LocalMessages, SenderChannel, StateType>)                                                                                                                 -> ConnectionEventsCallbackFuture + Send + Sync                     + 'static,
298                             ProcessorBuilderFn:             Fn(/*server_addr: */String, /*connected_port: */u16, /*peer: */Arc<Peer<CONFIG, LocalMessages, SenderChannel, StateType>>, /*server_messages_stream: */MessagingMutinyStream<ProcessorUniType>) -> ServerStreamType               + Send + Sync                     + 'static>
299
300                            (&mut self,
301                             connection_events_callback:  ConnectionEventsCallback,
302                             dialog_processor_builder_fn: ProcessorBuilderFn)
303
304                            -> Result<ConnectionChannel<StateType>, Box<dyn Error + Sync + Send>> {
305
306        let ip = self.ip.clone();
307        let port = self.port;
308
309        let returned_connection_sink = self.returned_connection_sink.clone();
310        let local_termination_is_complete_sender = self.local_termination_is_complete_sender.clone();
311        let client_termination_signaler = self.client_termination_signaler.clone();
312
313        let connection_events_callback = upgrade_to_connection_event_tracking(&self.connected, local_termination_is_complete_sender, connection_events_callback);
314
315        // the source of connections for this processor to start working on
316        let mut connection_provider = ConnectionChannel::new();
317        let mut connection_source = connection_provider.receiver()
318            .ok_or_else(|| String::from("couldn't move the Connection Receiver out of the Connection Provider"))?;
319
320        tokio::spawn(async move {
321            /*while*/ if let Some(connection) = connection_source.recv().await {
322                let client_termination_receiver = client_termination_signaler.expect("BUG! client_termination_signaler is NONE").subscribe();
323                let socket_communications_handler = SocketConnectionHandler::<CONFIG, RemoteMessages, LocalMessages, ProcessorUniType, SenderChannel, StateType>::new();
324                let result = socket_communications_handler.client_for_text_protocol(connection,
325                                                                                                                       client_termination_receiver,
326                                                                                                                       connection_events_callback,
327                                                                                                                       dialog_processor_builder_fn).await
328                    .map_err(|err| format!("Error while executing the dialog processor: {err}"));
329                match result {
330                    Ok(socket_connection) => {
331                        if let Err(err) = returned_connection_sink.send(socket_connection).await {
332                            warn!("`reactive-messaging::CompositeGenericSocketClient`: ERROR returning the connection (after the unresponsive & textual processor ended) @ {ip}:{port}: {err}");
333                        }
334                    }
335                    Err(err) => {
336                        error!("`reactive-messaging::CompositeGenericSocketClient`: ERROR in client (unresponsive & textual) @ {ip}:{port}: {err}");
337                    }
338                }
339            }
340        });
341        self.spawned_processors_count += 1;
342        Ok(connection_provider)
343    }
344
345    async fn start_multi_protocol<ConnectionEventsCallbackFuture:  Future<Output=()> + Send>
346                                 (&mut self,
347                                  initial_connection_state:       StateType,
348                                  mut connection_routing_closure: impl FnMut(/*socket_connection: */&SocketConnection<StateType>, /*is_reused: */bool) -> Option<tokio::sync::mpsc::Sender<SocketConnection<StateType>>> + Send + 'static,
349                                  connection_events_callback:     impl Fn(/*event: */ConnectionEvent<StateType>)                                       -> ConnectionEventsCallbackFuture                                 + Send + 'static)
350                                 -> Result<(), Box<dyn Error + Sync + Send>> {
351
352        let mut connection_manager = ClientConnectionManager::<CONFIG>::new(&self.ip, self.port);
353        let connection = connection_manager.connect_retryable().await
354            .map_err(|err| format!("Error making client connection to {}:{} -- {err}", self.ip, self.port))?;
355        let mut just_opened_connection = Some(connection);
356
357        let mut returned_connection_source = self.returned_connection_source.take()
358            .ok_or_else(|| String::from("couldn't move the 'Returned Connection Source' out of the Connection Channel"))?;
359
360        let ip = self.ip.clone();
361        let port = self.port;
362
363        // let shutdown_signaler = self.client_termination_signaler.as_ref().expect("BUG! client_termination_signaler is NONE").clone();
364
365        // Spawns the "connection routing task" to:
366        //   - Listen to newly incoming connections as well as upgraded/downgraded ones shared between processors
367        //   - Gives them to the `protocol_stacking_closure`
368        //   - Routes them to the right processor or close the connection
369        tokio::spawn(async move {
370
371            loop {
372                let (mut socket_connection, sender) = match just_opened_connection.take() {
373                    // process the just-opened connection (once)
374                    Some(just_opened_connection) => {
375                        let just_opened_socket_connection = SocketConnection::new(just_opened_connection, initial_connection_state.clone());
376                        connection_events_callback(ConnectionEvent::Connected(&just_opened_socket_connection)).await;
377                        let sender = connection_routing_closure(&just_opened_socket_connection, false);
378                        (just_opened_socket_connection, sender)
379                    },
380                    // process connections returned by the processors (after they ended processing them)
381                    None => {
382                        let Some(returned_socket_connection) = returned_connection_source.recv().await else { break };
383                        let sender = (!returned_socket_connection.closed())
384                            .then_some(())
385                            .and_then(|_| connection_routing_closure(&returned_socket_connection, true));
386                        (returned_socket_connection, sender)
387                    },
388                };
389
390                // route the connection to another processor or drop it
391                match sender {
392                    Some(sender) => {
393                        trace!("`reactive-messaging::CompositeSocketClient`: ROUTING the connection with the server @ {ip}:{port} to another processor");
394                        if let Err(_) = sender.send(socket_connection).await {
395                            error!("`reactive-messaging::CompositeSocketClient`: BUG(?) in the client connected to the server @ {ip}:{port} while re-routing the connection: THE NEW (ROUTED) PROCESSOR CAN NO LONGER RECEIVE CONNECTIONS -- THE CONNECTION WILL BE DROPPED");
396                            break
397                        }
398                    },
399                    None => {
400                        connection_events_callback(ConnectionEvent::Disconnected(&socket_connection)).await;
401                        if let Err(err) = socket_connection.connection_mut().shutdown().await {
402                            debug!("`reactive-messaging::CompositeSocketClient`: ERROR in the client connected to the server @ {ip}:{port} while shutting down the connection (after the processors ended): {err}");
403                        }
404                        break
405                    },
406                }
407            }
408            // loop ended
409            trace!("`reactive-messaging::CompositeSocketClient`: The 'Connection Routing Task' for the client connected to the server @ {ip}:{port} ended -- hopefully, due to a graceful client termination.");
410            // // guarantees this client is properly shutdown
411            //_ = shutdown_signaler.send(())
412        });
413        Ok(())
414    }
415
416    fn termination_waiter(&mut self) -> Box<dyn FnOnce() -> BoxFuture<'static, Result<(), Box<dyn Error + Send + Sync>>>> {
417        let mut local_termination_receiver = self.local_termination_is_complete_receiver.take();
418        let mut latch = self.spawned_processors_count;
419        Box::new(move || Box::pin({
420            async move {
421                if let Some(mut local_termination_receiver) = local_termination_receiver.take() {
422                    while latch > 0 {
423                        match local_termination_receiver.recv().await {
424                            Some(()) => latch -= 1,
425                            None     => return Err(Box::from(String::from("CompositeGenericSocketClient::termination_waiter(): It is no longer possible to tell when the client will be terminated: the broadcast channel was closed")))
426                        }
427                    }
428                    Ok(())
429                } else {
430                    Err(Box::from("CompositeGenericSocketClient: \"wait for termination\" requested, but the client was not started (or a previous service termination was commanded) at the moment `termination_waiter()` was called"))
431                }
432            }
433        }))
434    }
435
436    async fn terminate(mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
437        match self.client_termination_signaler.take() {
438            Some(client_sender) => {
439                warn!("`reactive-messaging::CompositeGenericSocketClient`: Shutdown asked & initiated for client connected @ {}:{}", self.ip, self.port);
440                _ = client_sender.send(());
441                Ok(())
442            }
443            None => {
444                Err(Box::from("Shutdown requested, but the service was not started. Ignoring..."))
445            }
446        }
447    }
448}
449
450
/// Unit tests the [socket_client](self) module
452#[cfg(any(test,doc))]
453mod tests {
454    use super::*;
455    use crate::prelude::*;
456    use std::{future, ops::Deref};
457    use std::sync::atomic::Ordering::Relaxed;
458    use std::time::Duration;
459    use futures::StreamExt;
460    use serde::{Deserialize, Serialize};
461    use crate::{new_socket_server, start_server_processor};
462    use crate::serde::{ron_deserializer, ron_serializer};
463
464
465    const REMOTE_SERVER: &str = "66.45.249.218";
466
467
    /// Test that our instantiation macro [new_socket_client!()] expands & builds a client out of a [ConstConfig] literal.
    ///
    /// NOTE(review): the three instantiations below are identical -- the macro takes no channel type (that is only
    /// given to the processor macros), so the variable names do not reflect different channels.
    #[cfg_attr(not(doc), test)]
    fn single_protocol_instantiation() {
        let _atomic_client = new_socket_client!(
            ConstConfig {
                ..ConstConfig::default()
            },
            REMOTE_SERVER, 443);

        let _fullsync_client = new_socket_client!(
            ConstConfig {
                ..ConstConfig::default()
            },
            REMOTE_SERVER, 443);

        let _crossbeam_client = new_socket_client!(
            ConstConfig {
                ..ConstConfig::default()
            },
            REMOTE_SERVER, 443);
    }
489
490    /// Test that our instantiation macro is able to produce clients backed by all possible channel types
491    #[cfg_attr(not(doc), test)]
492    fn composite_protocol_instantiation() {
493        let _atomic_client = new_composite_socket_client!(
494            ConstConfig {
495                ..ConstConfig::default()
496            },
497            REMOTE_SERVER, 443, () );
498
499        let _fullsync_client = new_composite_socket_client!(
500            ConstConfig {
501                ..ConstConfig::default()
502            },
503            REMOTE_SERVER, 443, () );
504
505        let _crossbeam_client = new_composite_socket_client!(
506            ConstConfig {
507                ..ConstConfig::default()
508            },
509            REMOTE_SERVER, 443, () );
510    }
511
512    /// Test that our client types are ready for usage
513    /// (showcases the "single protocol" case)
514    #[cfg_attr(not(doc),tokio::test)]
515    async fn doc_usage() {
516
517        const TEST_CONFIG: ConstConfig = ConstConfig::default();
518
519        // demonstrates how to build an unresponsive client
520        ///////////////////////////////////////////////////
521        // using fully typed generic functions that will work with all possible configs
522        let mut client = new_socket_client!(
523            TEST_CONFIG,
524            REMOTE_SERVER,
525            443);
526        start_client_processor!(TEST_CONFIG, Atomic, client,
527            DummyClientAndServerMessages,
528            DummyClientAndServerMessages,
529            connection_events_handler,
530            unresponsive_processor
531        ).expect("Error starting a single protocol client");
532        async fn connection_events_handler<const CONFIG:  u64,
533                                           LocalMessages: ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
534                                           SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync>
535                                          (_event: ProtocolEvent<CONFIG, LocalMessages, SenderChannel>) {
536        }
537        fn unresponsive_processor<const CONFIG:   u64,
538                                  LocalMessages:  ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
539                                  SenderChannel:  FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
540                                  StreamItemType: Deref<Target=DummyClientAndServerMessages>>
541                                 (_client_addr:           String,
542                                  _connected_port:        u16,
543                                  _peer:                  Arc<Peer<CONFIG, LocalMessages, SenderChannel>>,
544                                  client_messages_stream: impl Stream<Item=StreamItemType>)
545                                 -> impl Stream<Item=()> {
546            client_messages_stream.map(|_payload| ())
547        }
548        let wait_for_termination = client.termination_waiter();
549        client.terminate().await.expect("Error on client Termination command");
550        wait_for_termination().await.expect("Error waiting for client Termination");
551        warn!("1st DONE");
552
553        // demonstrates how to build a responsive client
554        ////////////////////////////////////////////////
555        // using fully typed generic functions that will work with all possible configs
556        let mut client = new_socket_client!(
557            TEST_CONFIG,
558            REMOTE_SERVER,
559            443);
560        start_client_processor!(TEST_CONFIG, Atomic, client,
561            DummyClientAndServerMessages,
562            DummyClientAndServerMessages,
563            connection_events_handler,
564            responsive_processor
565        ).expect("Error starting a single protocol client");
566        fn responsive_processor<const CONFIG:   u64,
567                                SenderChannel:  FullDuplexUniChannel<ItemType=DummyClientAndServerMessages, DerivedItemType=DummyClientAndServerMessages> + Send + Sync,
568                                StreamItemType: Deref<Target=DummyClientAndServerMessages>>
569                               (_client_addr:           String,
570                                _connected_port:        u16,
571                                peer:                   Arc<Peer<CONFIG, DummyClientAndServerMessages, SenderChannel>>,
572                                client_messages_stream: impl Stream<Item=StreamItemType>)
573                               -> impl Stream<Item=()> {
574            client_messages_stream
575                .map(|_payload| DummyClientAndServerMessages::FloodPing)
576                .to_responsive_stream(peer, |_, _| ())
577        }
578        let wait_for_termination = client.termination_waiter();
579        client.terminate().await.expect("Error on client Termination command");
580        wait_for_termination().await.expect("Error waiting for client Termination");
581        warn!("2nd DONE");
582
583        // demonstrates how to use it with closures -- also allowing for any channel in the configs
584        ///////////////////////////////////////////////////////////////////////////////////////////
585        let mut client = new_socket_client!(
586            TEST_CONFIG,
587            REMOTE_SERVER,
588            443);
589        start_client_processor!(TEST_CONFIG, Atomic, client,
590            DummyClientAndServerMessages,
591            DummyClientAndServerMessages,
592            |_| future::ready(()),
593            |_, _, _, client_messages_stream| client_messages_stream.map(|_payload| DummyClientAndServerMessages::FloodPing)
594        ).expect("Error starting a single protocol client");
595        let wait_for_termination = client.termination_waiter();
596        client.terminate().await.expect("Error on client Termination command");
597        wait_for_termination().await.expect("Error waiting for client Termination");
598        warn!("3rd DONE");
599
600        // demonstrates how to use the concrete type
601        ////////////////////////////////////////////
602        // notice there may be a discrepancy in the `ConstConfig` you provide and the actual concrete types
603        // you also provide for `UniProcessor` and `SenderChannel` -- therefore, this usage is not recommended
604        // (but it is here anyway since it may bring, theoretically, an infinitesimal performance benefit)
605        const CUSTOM_CONFIG: ConstConfig = ConstConfig {
606            receiver_buffer:      2048,
607            sender_buffer:        1024,
608            executor_instruments: reactive_mutiny::prelude::Instruments::LogsWithExpensiveMetrics,
609            ..ConstConfig::default()
610        };
611        let mut client = CompositeSocketClient :: <{CUSTOM_CONFIG.into()},
612                                                                          () >
613                                                                      :: new(REMOTE_SERVER,443);
614        type ProcessorUniType = UniZeroCopyFullSync<DummyClientAndServerMessages, {CUSTOM_CONFIG.receiver_buffer as usize}, 1, {CUSTOM_CONFIG.executor_instruments.into()}>;
615        type SenderChannelType = ChannelUniMoveFullSync<DummyClientAndServerMessages, {CUSTOM_CONFIG.sender_buffer as usize}, 1>;
616        let connection_channel = client.spawn_processor::<DummyClientAndServerMessages,
617                                                                             DummyClientAndServerMessages,
618                                                                             ProcessorUniType,
619                                                                             SenderChannelType,
620                                                                             _, _, _, _, _ > (
621            |_| future::ready(()),
622            |_, _, _, client_messages_stream| client_messages_stream.map(|_payload| DummyClientAndServerMessages::FloodPing)
623        ).await.expect("Error spawning a protocol processor");
624        client.start_single_protocol(connection_channel).await.expect("Error starting a single protocol client");
625        let wait_for_termination = client.termination_waiter();
626        client.terminate().await.expect("Error on client Termination command");
627        wait_for_termination().await.expect("Error waiting for client Termination");
628        warn!("4th DONE");
629    }
630
631    /// Ensures the termination of a client works according to the specification.\
632    /// The following clients and servers will only exchange a message when
633    /// they want the other party to disconnect.
634    #[cfg_attr(not(doc),tokio::test(flavor = "multi_thread"))]
635    async fn termination_process() {
636        const IP: &str = "127.0.0.1";
637        const PORT: u16 = 8030;
638        const TEST_CONFIG: ConstConfig = ConstConfig::default();
639
640        // CASE 1: locally initiated termination -- client still being active
641        let connected_to_client = Arc::new(AtomicBool::new(false));
642        let connected_to_client_ref = Arc::clone(&connected_to_client);
643        let mut server = new_socket_server!(TEST_CONFIG, IP, PORT);
644        start_server_processor!(TEST_CONFIG, Atomic, server, String, String,
645            move |event| {
646                let connected_to_client_ref = Arc::clone(&connected_to_client_ref);
647                async move {
648                    match event {
649                        ProtocolEvent::PeerArrived { .. } => {
650                            connected_to_client_ref.store(true, Relaxed);
651                        },
652                        ProtocolEvent::PeerLeft { .. } => {},
653                        ProtocolEvent::LocalServiceTermination => {},
654                    }
655                }
656            },
657            |_, _, _, stream| stream
658        ).expect("Error starting the server");
659        let mut client = new_socket_client!(TEST_CONFIG, IP, PORT);
660        start_client_processor!(TEST_CONFIG, Atomic, client, String, String,
661            |_| future::ready(()),
662            |_, _, _, stream| stream
663        ).expect("Error starting the client");
664        let termination_waiter = client.termination_waiter();
665        // sleep a little for the connection to be established.
666        tokio::time::sleep(Duration::from_millis(20)).await;
667        assert!(connected_to_client.load(Relaxed), "Client didn't connect to server");
668        assert!(client.is_connected(), "`client` didn't report any connection");
669        client.terminate().await.expect("Could not terminate the client");
670        _ = tokio::time::timeout(Duration::from_millis(100), termination_waiter()).await
671            .expect("Timed out (>100ms) waiting the the client's termination");
672        server.terminate().await.expect("Could not terminate the server");
673
674        // CASE 2: automatic termination after disconnection
675        let connected_to_client = Arc::new(AtomicBool::new(false));
676        let connected_to_client_ref = Arc::clone(&connected_to_client);
677        let mut server = new_socket_server!(TEST_CONFIG, IP, PORT);
678        start_server_processor!(TEST_CONFIG, Atomic, server, String, String,
679            move |event| {
680                let connected_to_client_ref = Arc::clone(&connected_to_client_ref);
681                async move {
682                    match event {
683                        ProtocolEvent::PeerArrived { peer } => {
684                            connected_to_client_ref.store(true, Relaxed);
685                            peer.send(String::from("Goodbye")).expect("Couldn't send");
686                        },
687                        ProtocolEvent::PeerLeft { .. } => {},
688                        ProtocolEvent::LocalServiceTermination => {},
689                    }
690                }
691            },
692            |_, _, _, stream| stream
693        ).expect("Error starting the server");
694        let mut client = new_socket_client!(TEST_CONFIG, IP, PORT);
695        start_client_processor!(TEST_CONFIG, Atomic, client, String, String,
696            |_| future::ready(()),
697            |_, _, peer, stream| stream.map(move |_msg| peer.cancel_and_close())     // close the connection when any message arrives
698        ).expect("Error starting the client");
699        let termination_waiter = client.termination_waiter();
700        // sleep a little for the communications to go on.
701        // After this, the server should have disconnected the client and calling `termination_waiter()` should return immediately
702        // (without the need to call `client.terminate()`)
703        tokio::time::sleep(Duration::from_millis(100)).await;
704        assert!(connected_to_client.load(Relaxed), "Client didn't connect to server");
705        _ = tokio::time::timeout(Duration::from_millis(1), termination_waiter()).await
706            .expect("A disconnected client should signal its `termination_waiter()` for an immediate return -- what didn't happen");
707        assert!(!client.is_connected(), "`client` didn't report the disconnection");
708        server.terminate().await.expect("Could not terminate the server");
709
710    }
711
712    /// assures the "Composite Protocol Stacking" pattern is supported & correctly implemented:
713    ///   1) Just-opened client connections are always handled by the first processor
714    ///   2) Connections can be routed freely among processors
715    ///   3) "Last States" are taken into account, enabling the "connection routing closure"
716    ///   4) Connections can be closed after the last processor are through with them
717    /// -- for these, the client will send a message whenever it enters a state -- then the server will say "OK"
718    ///    and the client will proceed to the next state, until the last one -- which closes the connection.
719    #[cfg_attr(not(doc),tokio::test(flavor = "multi_thread"))]
720    async fn composite_protocol_stacking_pattern() -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
721
722        const IP: &str = "127.0.0.1";
723        const PORT: u16 = 8031;
724        const TEST_CONFIG: ConstConfig = ConstConfig::default();
725
726        // start the server that will only listen to messages until it is disconnected
727        let mut server = new_socket_server!(
728            TEST_CONFIG,
729            IP,
730            PORT);
731        start_server_processor!(TEST_CONFIG, Atomic, server, String, String,
732            |_| future::ready(()),
733            move |_, _, peer, client_messages| client_messages
734                .map(|msg| {
735                    println!("SERVER RECEIVED: {msg} -- answering with 'OK'");
736                    String::from("OK")
737                })
738                .to_responsive_stream(peer, |_, _| ())
739
740        )?;
741        let server_termination_waiter = server.termination_waiter();
742
743        let mut client = new_composite_socket_client!(
744            TEST_CONFIG,
745            IP,
746            PORT,
747            Protocols );
748
749        #[derive(Debug,PartialEq,Clone)]
750        enum Protocols {
751            Handshake,
752            WelcomeAuthenticatedFriend,
753            AccountSettings,
754            GoodbyeOptions,
755            Disconnect,
756        }
757
758        // first level processors shouldn't do anything until the client says something meaningful -- newcomers must know, a priori, who they are talking to (a security measure)
759        let handshake_processor_greeted = Arc::new(AtomicBool::new(false));
760        let handshake_processor_greeted_ref = Arc::clone(&handshake_processor_greeted);
761        let handshake_processor = spawn_client_processor!(TEST_CONFIG, Atomic, client, String, String,
762            |connection_event| async {
763                match connection_event {
764                    ProtocolEvent::PeerArrived { peer  } => peer.send_async(String::from("Client is at `Handshake`")).await
765                                                                    .expect("Sending failed"),
766                    _ => {},
767                }
768            },
769            move |_, _, peer, server_messages_stream| {
770                assert_eq!(peer.try_take_state(), Some(Some(Protocols::Handshake)), "Connection is in a wrong state");
771                let handshake_processor_greeted_ref = Arc::clone(&handshake_processor_greeted_ref);
772                server_messages_stream.then(move |_payload| {
773                    let peer = Arc::clone(&peer);
774                    handshake_processor_greeted_ref.store(true, Relaxed);
775                    async move {
776                        peer.set_state(Protocols::WelcomeAuthenticatedFriend).await;
777                        peer.flush_and_close(Duration::from_secs(1)).await;
778                    }
779                })
780            }
781        )?;
782
783        // deeper processors should inform the server that they are now subjected to a new processor / protocol, so they may adjust accordingly
784        let welcome_authenticated_friend_processor_greeted = Arc::new(AtomicBool::new(false));
785        let welcome_authenticated_friend_processor_greeted_ref = Arc::clone(&welcome_authenticated_friend_processor_greeted);
786        let welcome_authenticated_friend_processor = spawn_client_processor!(TEST_CONFIG, Atomic, client, String, String,
787            |connection_event| async {
788                match connection_event {
789                    ProtocolEvent::PeerArrived { peer  } => peer.send_async(String::from("Client is at `WelcomeAuthenticatedFriend`")).await
790                                                                    .expect("Sending failed"),
791                    _ => {},
792                }
793            },
794            move |_, _, peer, server_messages_stream| {
795                assert_eq!(peer.try_take_state(), Some(Some(Protocols::WelcomeAuthenticatedFriend)), "Connection is in a wrong state");
796                let welcome_authenticated_friend_processor_greeted_ref = Arc::clone(&welcome_authenticated_friend_processor_greeted_ref);
797                server_messages_stream.then(move |_payload| {
798                    let peer = Arc::clone(&peer);
799                    welcome_authenticated_friend_processor_greeted_ref.store(true, Relaxed);
800                    async move {
801                        peer.set_state(Protocols::AccountSettings).await;
802                        peer.flush_and_close(Duration::from_secs(1)).await;
803                    }
804                })
805            }
806        )?;
807
808        let account_settings_processor_greeted = Arc::new(AtomicBool::new(false));
809        let account_settings_processor_greeted_ref = Arc::clone(&account_settings_processor_greeted);
810        let account_settings_processor = spawn_client_processor!(TEST_CONFIG, Atomic, client, String, String,
811            |connection_event| async {
812                match connection_event {
813                    ProtocolEvent::PeerArrived { peer  } => peer.send_async(String::from("Client is at `AccountSettings`")).await
814                                                                    .expect("Sending failed"),
815                    _ => {},
816                }
817            },
818            move |_, _, peer, server_messages_stream| {
819                assert_eq!(peer.try_take_state(), Some(Some(Protocols::AccountSettings)), "Connection is in a wrong state");
820                let account_settings_processor_greeted_ref = Arc::clone(&account_settings_processor_greeted_ref);
821                server_messages_stream.then(move |_payload| {
822                    let peer = Arc::clone(&peer);
823                    account_settings_processor_greeted_ref.store(true, Relaxed);
824                    async move {
825                        peer.set_state(Protocols::GoodbyeOptions).await;
826                        peer.flush_and_close(Duration::from_secs(1)).await;
827                    }
828                })
829            }
830        )?;
831
832        let goodbye_options_processor_greeted = Arc::new(AtomicBool::new(false));
833        let goodbye_options_processor_greeted_ref = Arc::clone(&goodbye_options_processor_greeted);
834        let goodbye_options_processor = spawn_client_processor!(TEST_CONFIG, Atomic, client, String, String,
835            |connection_event| async {
836                match connection_event {
837                    ProtocolEvent::PeerArrived { peer  } => peer.send_async(String::from("Client is at `GoodbyeOptions`")).await
838                                                                    .expect("Sending failed"),
839                    _ => {},
840                }
841            },
842            move |_, _, peer, server_messages_stream| {
843                assert_eq!(peer.try_take_state(), Some(Some(Protocols::GoodbyeOptions)), "Connection is in a wrong state");
844                let goodbye_options_processor_greeted_ref = Arc::clone(&goodbye_options_processor_greeted_ref);
845                server_messages_stream.then(move |_payload| {
846                    let peer = Arc::clone(&peer);
847                    goodbye_options_processor_greeted_ref.store(true, Relaxed);
848                    async move {
849                        peer.set_state(Protocols::Disconnect).await;
850                        peer.flush_and_close(Duration::from_secs(1)).await;
851                    }
852                })
853            }
854        )?;
855
856        // this closure will route the connections based on the states the processors above had set
857        // (it will be called whenever a protocol processor ends -- "returning" the connection)
858        let connection_routing_closure = move |socket_connection: &SocketConnection<Protocols>, _|
859            match socket_connection.state() {
860                Protocols::Handshake                  => Some(handshake_processor.clone_sender()),
861                Protocols::WelcomeAuthenticatedFriend => Some(welcome_authenticated_friend_processor.clone_sender()),
862                Protocols::AccountSettings            => Some(account_settings_processor.clone_sender()),
863                Protocols::GoodbyeOptions             => Some(goodbye_options_processor.clone_sender()),
864                Protocols::Disconnect                 => None,
865            };
866        client.start_multi_protocol(Protocols::Handshake, connection_routing_closure, |_| future::ready(())).await?;
867
868        let client_waiter = client.termination_waiter();
869        // wait for the client to do its stuff
870        _ = tokio::time::timeout(Duration::from_secs(5), client_waiter()).await
871            .expect("TIMED OUT (>5s) Waiting for the server & client to do their stuff & disconnect the client");
872
873        // terminate the server & wait until the shutdown process is complete
874        server.terminate().await?;
875        server_termination_waiter().await?;
876
877        assert!(handshake_processor_greeted.load(Relaxed),                    "`Handshake` processor wasn't requested");
878        assert!(welcome_authenticated_friend_processor_greeted.load(Relaxed), "`WelcomeAuthenticatedFriend` processor wasn't requested");
879        assert!(account_settings_processor_greeted.load(Relaxed),             "`AccountSettings` processor wasn't requested");
880        assert!(goodbye_options_processor_greeted.load(Relaxed),              "`GoodbyeOptions` processor wasn't requested");
881
882        Ok(())
883    }
884
885
886    #[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
887    enum DummyClientAndServerMessages {
888        #[default]
889        FloodPing,
890    }
891
892    impl Deref for DummyClientAndServerMessages {
893        type Target = DummyClientAndServerMessages;
894        fn deref(&self) -> &Self::Target {
895            self
896        }
897    }
898
899    impl ReactiveMessagingSerializer<DummyClientAndServerMessages> for DummyClientAndServerMessages {
900        #[inline(always)]
901        fn serialize(remote_message: &DummyClientAndServerMessages, buffer: &mut Vec<u8>) {
902            ron_serializer(remote_message, buffer)
903                .expect("socket_client.rs unit tests: No errors should have happened here!")
904        }
905        #[inline(always)]
906        fn processor_error_message(err: String) -> DummyClientAndServerMessages {
907            panic!("socket_client.rs unit tests: protocol error when none should have happened: {err}");
908        }
909    }
910    impl ReactiveMessagingDeserializer<DummyClientAndServerMessages> for DummyClientAndServerMessages {
911        #[inline(always)]
912        fn deserialize(local_message: &[u8]) -> Result<DummyClientAndServerMessages, Box<dyn std::error::Error + Sync + Send>> {
913            ron_deserializer(local_message)
914        }
915    }
916}