reactive_messaging/socket_services/types.rs
1//! Common types used across this submodule
2
3use std::error::Error;
4use crate::serde::{ReactiveMessagingDeserializer, ReactiveMessagingSerializer};
5use crate::prelude::Peer;
6use crate::socket_connection::connection_provider::ConnectionChannel;
7use crate::types::{ProtocolEvent, MessagingMutinyStream, ConnectionEvent};
8use crate::socket_connection::connection::SocketConnection;
9use std::fmt::Debug;
10use std::future;
11use std::future::Future;
12use std::sync::Arc;
13use futures::future::BoxFuture;
14use futures::Stream;
15use reactive_mutiny::prelude::{FullDuplexUniChannel, GenericUni};
16
17
18/// Base trait for server and client services functionalities
19pub trait MessagingService<const CONFIG: u64> {
20 type StateType: Send + Sync + Clone + Debug + 'static;
21
22
23 /// Spawns a task dedicated to the given "protocol processor", returning immediately.\
24 /// The given `dialog_processor_builder_fn` will be called for each new connection and should return a `Stream`
25 /// that will produce non-futures & non-fallible items that **may be, optionally, sent to the remote party** (see [crate::prelude::ResponsiveStream]):
26 /// - `protocol_events_callback`: -- a generic function (or closure) to handle "new peer", "peer left" and "service termination" events (possibly to manage sessions). Sign it as:
27 /// ```nocompile
28 /// async fn protocol_events_handler<const CONFIG: u64,
29 /// LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug,
30 /// SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync>
31 /// (_event: ProtocolEvent<CONFIG, LocalMessages, SenderChannel, StateType>) {...}
32 /// ```
33 /// - `dialog_processor_builder_fn` -- the generic function (or closure) that receives the `Stream` of remote messages and returns another `Stream`, possibly yielding
34 /// messages of the "local" type to be sent to the remote party -- see [crate::prelude::ResponsiveStream]. Sign the processor as:
35 /// ```nocompile
36 /// fn processor<const CONFIG: u64,
37 /// LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug,
38 /// SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
39 /// StreamItemType: Deref<Target=[your type for messages produced by the REMOTE party]>>
40 /// (remote_addr: String,
41 /// connected_port: u16,
42 /// peer: Arc<Peer<CONFIG, LocalMessages, SenderChannel, StateType>>,
43 /// remote_messages_stream: impl Stream<Item=StreamItemType>)
44 /// -> impl Stream<Item=ANY_TYPE> {...}
45 /// ```
46 /// -- if you want the processor to produce answer messages of type `LocalMessages` to be sent to clients, see [Self::spawn_responsive_processor()]:
47 async fn spawn_processor<RemoteMessages: ReactiveMessagingDeserializer<RemoteMessages> + Send + Sync + PartialEq + Debug + 'static,
48 LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug + 'static,
49 ProcessorUniType: GenericUni<ItemType=RemoteMessages> + Send + Sync + 'static,
50 SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync + 'static,
51 OutputStreamItemsType: Send + Sync + Debug + 'static,
52 RemoteStreamType: Stream<Item=OutputStreamItemsType> + Send + 'static,
53 ProtocolEventsCallbackFuture: Future<Output=()> + Send + 'static,
54 ProtocolEventsCallback: Fn(/*event: */ProtocolEvent<CONFIG, LocalMessages, SenderChannel, Self::StateType>) -> ProtocolEventsCallbackFuture + Send + Sync + 'static,
55 ProcessorBuilderFn: Fn(/*remote_addr: */String, /*connected_port: */u16, /*peer: */Arc<Peer<CONFIG, LocalMessages, SenderChannel, Self::StateType>>, /*remote_messages_stream: */MessagingMutinyStream<ProcessorUniType>) -> RemoteStreamType + Send + Sync + 'static>
56
57 (&mut self,
58 connection_events_callback: ProtocolEventsCallback,
59 dialog_processor_builder_fn: ProcessorBuilderFn)
60
61 -> Result<ConnectionChannel<Self::StateType>, Box<dyn Error + Sync + Send>>;
62
63 /// Start the service with a single processor (after calling either [Self::spawn_unresponsive_processor()]
64 /// or [Self::spawn_responsive_processor()] once) -- A.K.A. "The Single Protocol Mode".\
65 /// See [Self::start_multi_protocol()] if you want a service that shares connections among
66 /// different protocol processors.
67 ///
68 /// Starts the service using the provided `connection_channel` to distribute the connections.
69 async fn start_single_protocol(&mut self, connection_channel: ConnectionChannel<Self::StateType>)
70 -> Result<(), Box<dyn Error + Sync + Send>>
71 where Self::StateType: Default {
72 // this closure will cause incoming or just-opened connections to be sent to `connection_channel` and returned connections to be dropped
73 let connection_routing_closure = move |_socket_connection: &SocketConnection<Self::StateType>, is_reused: bool|
74 if is_reused {
75 None
76 } else {
77 Some(connection_channel.clone_sender())
78 };
79 // tracking the connection events is not really necessary for the "single protocol" case here, as, for this specific case, the "protocol events" contain that information already
80 let connection_events_callback = |_: ConnectionEvent<'_, Self::StateType>| future::ready(());
81 self.start_multi_protocol(Self::StateType::default(), connection_routing_closure, connection_events_callback).await
82 }
83
84 /// Starts the service using the provided `connection_routing_closure` to distribute the connections among the configured processors
85 /// -- previously fed in by [Self::spawn_responsive_processor()] & [Self::spawn_unresponsive_processor()].
86 ///
87 /// `protocol_stacking_closure := FnMut(socket_connection: &SocketConnection<StateType>, is_reused: bool) -> connection_receiver: Option<tokio::sync::mpsc::Sender<TcpStream>>`
88 ///
89 /// -- this closure "decides what to do" with available connections, routing them to the appropriate processors:
90 /// - Newly received connections will have `last_state` set to `None` -- otherwise, this will either be set by the processor
91 /// before the [Peer] is closed -- see [Peer::set_state()] -- or will have the `Default` value.
92 /// - The returned value must be one of the "handles" returned by [Self::spawn_responsive_processor()] or
93 /// [Self::spawn_unresponsive_processor()].
94 /// - If `None` is returned, the connection will be closed.
95 ///
96 /// This method returns an error in the following cases:
97 /// 1) if the connecting/binding process fails;
98 /// 2) if no processors were configured.
99 async fn start_multi_protocol<ConnectionEventsCallbackFuture: Future<Output=()> + Send>
100 (&mut self,
101 initial_connection_state: Self::StateType,
102 connection_routing_closure: impl FnMut(/*socket_connection: */&SocketConnection<Self::StateType>, /*is_reused: */bool) -> Option<tokio::sync::mpsc::Sender<SocketConnection<Self::StateType>>> + Send + 'static,
103 connection_events_callback: impl for <'r> Fn(/*event: */ConnectionEvent<'r, Self::StateType>) -> ConnectionEventsCallbackFuture + Send + 'static)
104 -> Result<(), Box<dyn Error + Sync + Send>>;
105
106 /// Returns an async closure that blocks until [Self::terminate()] is called.
107 /// Example:
108 /// ```no_compile
109 /// self.start_the_service();
110 /// self.termination_waiter()().await;
111 fn termination_waiter(&mut self) -> Box< dyn FnOnce() -> BoxFuture<'static, Result<(), Box<dyn std::error::Error + Send + Sync>>> >;
112
113 /// Notifies the service it is time to stop / shutdown / terminate.\
114 /// It is a recommended practice that the `connection_events_handler()` you provided (when starting each dialog processor)
115 /// inform all clients that a remote-initiated disconnection (due to the call to this function) is happening -- the protocol must support that, though.
116 async fn terminate(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
117
118}
119
120
121/// For internal use: defines `ProcessorUniType` & `SenderChannel` based on the given [Channels] parameter
122/// (for use when spawning processors with [MessagingService::spawn_unresponsive_processor()] &
123/// [MessagingService::spawn_responsive_processor()].)
124#[macro_export]
125macro_rules! _define_processor_uni_and_sender_channel_types {
126 ($const_config: expr, Atomic, $remote_messages: ty, $local_messages: ty) => {
127 const _CONST_CONFIG: ConstConfig = $const_config;
128 const _PROCESSOR_BUFFER: usize = _CONST_CONFIG.receiver_buffer as usize;
129 const _PROCESSOR_UNI_INSTRUMENTS: usize = _CONST_CONFIG.executor_instruments.into();
130 const _SENDER_BUFFER: usize = _CONST_CONFIG.sender_buffer as usize;
131 type ProcessorUniType = UniZeroCopyAtomic<$remote_messages, _PROCESSOR_BUFFER, 1, _PROCESSOR_UNI_INSTRUMENTS>;
132 type SenderChannel = ChannelUniMoveAtomic<$local_messages, _SENDER_BUFFER, 1>;
133 };
134 ($const_config: expr, FullSync, $remote_messages: ty, $local_messages: ty) => {
135 const _CONST_CONFIG: ConstConfig = $const_config;
136 const _PROCESSOR_BUFFER: usize = _CONST_CONFIG.receiver_buffer as usize;
137 const _PROCESSOR_UNI_INSTRUMENTS: usize = _CONST_CONFIG.executor_instruments.into();
138 const _SENDER_BUFFER: usize = _CONST_CONFIG.sender_buffer as usize;
139 type ProcessorUniType = UniZeroCopyFullSync<$remote_messages, _PROCESSOR_BUFFER, 1, _PROCESSOR_UNI_INSTRUMENTS>;
140 type SenderChannel = ChannelUniMoveFullSync<$local_messages, _SENDER_BUFFER, 1>;
141 };
142 ($const_config: expr, Crossbeam, $remote_messages: ty, $local_messages: ty) => {
143 const _CONST_CONFIG: ConstConfig = $const_config;
144 const _PROCESSOR_BUFFER: usize = _CONST_CONFIG.receiver_buffer as usize;
145 const _PROCESSOR_UNI_INSTRUMENTS: usize = _CONST_CONFIG.executor_instruments.into();
146 const _SENDER_BUFFER: usize = _CONST_CONFIG.sender_buffer as usize;
147 type ProcessorUniType = UniMoveCrossbeam<$remote_messages, _PROCESSOR_BUFFER, 1, _PROCESSOR_UNI_INSTRUMENTS>;
148 type SenderChannel = ChannelUniMoveCrossbeam<$local_messages, _SENDER_BUFFER, 1>;
149 };
150}
151pub use _define_processor_uni_and_sender_channel_types;