reactive_messaging/socket_services/
types.rs

1//! Common types used across this submodule
2
3use std::error::Error;
4use crate::serde::{ReactiveMessagingDeserializer, ReactiveMessagingSerializer};
5use crate::prelude::Peer;
6use crate::socket_connection::connection_provider::ConnectionChannel;
7use crate::types::{ProtocolEvent, MessagingMutinyStream, ConnectionEvent};
8use crate::socket_connection::connection::SocketConnection;
9use std::fmt::Debug;
10use std::future;
11use std::future::Future;
12use std::sync::Arc;
13use futures::future::BoxFuture;
14use futures::Stream;
15use reactive_mutiny::prelude::{FullDuplexUniChannel, GenericUni};
16
17
18/// Base trait for server and client services functionalities
19pub trait MessagingService<const CONFIG: u64> {
20    type StateType: Send + Sync + Clone + Debug + 'static;
21
22
23    /// Spawns a task dedicated to the given "protocol processor", returning immediately.\
24    /// The given `dialog_processor_builder_fn` will be called for each new connection and should return a `Stream`
25    /// that will produce non-futures & non-fallible items that **may be, optionally, sent to the remote party** (see [crate::prelude::ResponsiveStream]):
26    ///   - `protocol_events_callback`: -- a generic function (or closure) to handle "new peer", "peer left" and "service termination" events (possibly to manage sessions). Sign it as:
27    ///     ```nocompile
28    ///     async fn protocol_events_handler<const CONFIG:  u64,
29    ///                                      LocalMessages: ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
30    ///                                      SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync>
31    ///                                     (_event: ProtocolEvent<CONFIG, LocalMessages, SenderChannel, StateType>) {...}
32    ///     ```
33    ///   - `dialog_processor_builder_fn` -- the generic function (or closure) that receives the `Stream` of remote messages and returns another `Stream`, possibly yielding
34    ///                                      messages of the "local" type to be sent to the remote party -- see [crate::prelude::ResponsiveStream]. Sign the processor as:
35    ///     ```nocompile
36    ///     fn processor<const CONFIG:   u64,
37    ///                  LocalMessages:  ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
38    ///                  SenderChannel:  FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
39    ///                  StreamItemType: Deref<Target=[your type for messages produced by the REMOTE party]>>
40    ///                 (remote_addr:            String,
41    ///                  connected_port:         u16,
42    ///                  peer:                   Arc<Peer<CONFIG, LocalMessages, SenderChannel, StateType>>,
43    ///                  remote_messages_stream: impl Stream<Item=StreamItemType>)
44    ///                 -> impl Stream<Item=ANY_TYPE> {...}
45    ///     ```
46    /// -- if you want the processor to produce answer messages of type `LocalMessages` to be sent to clients, see [Self::spawn_responsive_processor()]:
47    async fn spawn_processor<RemoteMessages:                ReactiveMessagingDeserializer<RemoteMessages>                                                                                                                                                                                         + Send + Sync + PartialEq + Debug + 'static,
48                             LocalMessages:                 ReactiveMessagingSerializer<LocalMessages>                                                                                                                                                                                            + Send + Sync + PartialEq + Debug + 'static,
49                             ProcessorUniType:              GenericUni<ItemType=RemoteMessages>                                                                                                                                                                                                   + Send + Sync                     + 'static,
50                             SenderChannel:                 FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages>                                                                                                                                                           + Send + Sync                     + 'static,
51                             OutputStreamItemsType:                                                                                                                                                                                                                                                 Send + Sync             + Debug + 'static,
52                             RemoteStreamType:              Stream<Item=OutputStreamItemsType>                                                                                                                                                                                                    + Send                            + 'static,
53                             ProtocolEventsCallbackFuture:  Future<Output=()>                                                                                                                                                                                                                     + Send                            + 'static,
54                             ProtocolEventsCallback:        Fn(/*event: */ProtocolEvent<CONFIG, LocalMessages, SenderChannel, Self::StateType>)                                                                                                                   -> ProtocolEventsCallbackFuture + Send + Sync                     + 'static,
55                             ProcessorBuilderFn:            Fn(/*remote_addr: */String, /*connected_port: */u16, /*peer: */Arc<Peer<CONFIG, LocalMessages, SenderChannel, Self::StateType>>, /*remote_messages_stream: */MessagingMutinyStream<ProcessorUniType>) -> RemoteStreamType             + Send + Sync                     + 'static>
56
57                            (&mut self,
58                             connection_events_callback:  ProtocolEventsCallback,
59                             dialog_processor_builder_fn: ProcessorBuilderFn)
60
61                            -> Result<ConnectionChannel<Self::StateType>, Box<dyn Error + Sync + Send>>;
62
63    /// Start the service with a single processor (after calling either [Self::spawn_unresponsive_processor()]
64    /// or [Self::spawn_responsive_processor()] once) -- A.K.A. "The Single Protocol Mode".\
65    /// See [Self::start_multi_protocol()] if you want a service that shares connections among
66    /// different protocol processors.
67    ///
68    /// Starts the service using the provided `connection_channel` to distribute the connections.
69    async fn start_single_protocol(&mut self, connection_channel: ConnectionChannel<Self::StateType>)
70                                  -> Result<(), Box<dyn Error + Sync + Send>>
71                                  where Self::StateType: Default {
72        // this closure will cause incoming or just-opened connections to be sent to `connection_channel` and returned connections to be dropped
73        let connection_routing_closure = move |_socket_connection: &SocketConnection<Self::StateType>, is_reused: bool|
74            if is_reused {
75                None
76            } else {
77                Some(connection_channel.clone_sender())
78            };
79        // tracking the connection events is not really necessary for the "single protocol" case here, as, for this specific case, the "protocol events" contain that information already
80        let connection_events_callback = |_: ConnectionEvent<'_, Self::StateType>| future::ready(());
81        self.start_multi_protocol(Self::StateType::default(), connection_routing_closure, connection_events_callback).await
82    }
83
84    /// Starts the service using the provided `connection_routing_closure` to distribute the connections among the configured processors
85    /// -- previously fed in by [Self::spawn_responsive_processor()] & [Self::spawn_unresponsive_processor()].
86    ///
87    /// `protocol_stacking_closure := FnMut(socket_connection: &SocketConnection<StateType>, is_reused: bool) -> connection_receiver: Option<tokio::sync::mpsc::Sender<TcpStream>>`
88    ///
89    /// -- this closure "decides what to do" with available connections, routing them to the appropriate processors:
90    ///   - Newly received connections will have `last_state` set to `None` -- otherwise, this will either be set by the processor
91    ///     before the [Peer] is closed -- see [Peer::set_state()] -- or will have the `Default` value.
92    ///   - The returned value must be one of the "handles" returned by [Self::spawn_responsive_processor()] or
93    ///     [Self::spawn_unresponsive_processor()].
94    ///   - If `None` is returned, the connection will be closed.
95    ///
96    /// This method returns an error in the following cases:
97    ///   1) if the connecting/binding process fails;
98    ///   2) if no processors were configured.
99    async fn start_multi_protocol<ConnectionEventsCallbackFuture:  Future<Output=()> + Send>
100                                 (&mut self,
101                                  initial_connection_state:    Self::StateType,
102                                  connection_routing_closure:  impl FnMut(/*socket_connection: */&SocketConnection<Self::StateType>, /*is_reused: */bool) -> Option<tokio::sync::mpsc::Sender<SocketConnection<Self::StateType>>> + Send + 'static,
103                                  connection_events_callback:  impl for <'r> Fn(/*event: */ConnectionEvent<'r, Self::StateType>)                          -> ConnectionEventsCallbackFuture                                       + Send + 'static)
104                                 -> Result<(), Box<dyn Error + Sync + Send>>;
105
106    /// Returns an async closure that blocks until [Self::terminate()] is called.
107    /// Example:
108    /// ```no_compile
109    ///     self.start_the_service();
110    ///     self.termination_waiter()().await;
111    fn termination_waiter(&mut self) -> Box< dyn FnOnce() -> BoxFuture<'static, Result<(), Box<dyn std::error::Error + Send + Sync>>> >;
112
113    /// Notifies the service it is time to stop / shutdown / terminate.\
114    /// It is a recommended practice that the `connection_events_handler()` you provided (when starting each dialog processor)
115    /// inform all clients that a remote-initiated disconnection (due to the call to this function) is happening -- the protocol must support that, though.
116    async fn terminate(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
117
118}
119
120
121/// For internal use: defines `ProcessorUniType` & `SenderChannel` based on the given [Channels] parameter
122/// (for use when spawning processors with [MessagingService::spawn_unresponsive_processor()] &
123///  [MessagingService::spawn_responsive_processor()].)
124#[macro_export]
125macro_rules! _define_processor_uni_and_sender_channel_types {
126    ($const_config: expr, Atomic, $remote_messages: ty, $local_messages: ty) => {
127        const _CONST_CONFIG:              ConstConfig  = $const_config;
128        const _PROCESSOR_BUFFER:          usize        = _CONST_CONFIG.receiver_buffer as usize;
129        const _PROCESSOR_UNI_INSTRUMENTS: usize        = _CONST_CONFIG.executor_instruments.into();
130        const _SENDER_BUFFER:             usize        = _CONST_CONFIG.sender_buffer   as usize;
131        type ProcessorUniType = UniZeroCopyAtomic<$remote_messages, _PROCESSOR_BUFFER, 1, _PROCESSOR_UNI_INSTRUMENTS>;
132        type SenderChannel = ChannelUniMoveAtomic<$local_messages, _SENDER_BUFFER, 1>;
133    };
134    ($const_config: expr, FullSync, $remote_messages: ty, $local_messages: ty) => {
135        const _CONST_CONFIG:              ConstConfig  = $const_config;
136        const _PROCESSOR_BUFFER:          usize        = _CONST_CONFIG.receiver_buffer as usize;
137        const _PROCESSOR_UNI_INSTRUMENTS: usize        = _CONST_CONFIG.executor_instruments.into();
138        const _SENDER_BUFFER:             usize        = _CONST_CONFIG.sender_buffer   as usize;
139        type ProcessorUniType = UniZeroCopyFullSync<$remote_messages, _PROCESSOR_BUFFER, 1, _PROCESSOR_UNI_INSTRUMENTS>;
140        type SenderChannel = ChannelUniMoveFullSync<$local_messages, _SENDER_BUFFER, 1>;
141    };
142    ($const_config: expr, Crossbeam, $remote_messages: ty, $local_messages: ty) => {
143        const _CONST_CONFIG:              ConstConfig  = $const_config;
144        const _PROCESSOR_BUFFER:          usize        = _CONST_CONFIG.receiver_buffer as usize;
145        const _PROCESSOR_UNI_INSTRUMENTS: usize        = _CONST_CONFIG.executor_instruments.into();
146        const _SENDER_BUFFER:             usize        = _CONST_CONFIG.sender_buffer   as usize;
147        type ProcessorUniType = UniMoveCrossbeam<$remote_messages, _PROCESSOR_BUFFER, 1, _PROCESSOR_UNI_INSTRUMENTS>;
148        type SenderChannel = ChannelUniMoveCrossbeam<$local_messages, _SENDER_BUFFER, 1>;
149    };
150}
151pub use _define_processor_uni_and_sender_channel_types;