/// Base trait for server and client services functionalities.
///
/// `CONFIG` is a const generic that is forwarded to the `ProtocolEvent` and `Peer`
/// types used in the processor signatures below.
pub trait MessagingService<const CONFIG: u64> {
/// State type carried by `Peer`, `ProtocolEvent`, `SocketConnection`,
/// `ConnectionEvent` and `ConnectionChannel` in the methods of this trait.
type StateType: Send + Sync + Clone + Debug + 'static;
// Required methods
/// Spawns a task dedicated to the given "protocol processor", returning immediately.
///
/// * `connection_events_callback` -- a generic function (or closure) receiving
///   `ProtocolEvent`s: "new peer", "peer left" and "service termination"
///   (possibly to manage sessions).
/// * `dialog_processor_builder_fn` -- called for each new connection with the remote
///   address, the connected port, the `Peer` and the stream of remote messages; it
///   should return a `Stream` producing non-future, non-fallible items.
///
/// On success, returns the `ConnectionChannel` of the spawned processor -- the value
/// that `start_single_protocol()` accepts as its `connection_channel`.
async fn spawn_processor<RemoteMessages: ReactiveMessagingDeserializer<RemoteMessages> + Send + Sync + PartialEq + Debug + 'static, LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug + 'static, ProcessorUniType: GenericUni<ItemType = RemoteMessages> + Send + Sync + 'static, SenderChannel: FullDuplexUniChannel<ItemType = LocalMessages, DerivedItemType = LocalMessages> + Send + Sync + 'static, OutputStreamItemsType: Send + Sync + Debug + 'static, RemoteStreamType: Stream<Item = OutputStreamItemsType> + Send + 'static, ProtocolEventsCallbackFuture: Future<Output = ()> + Send + 'static, ProtocolEventsCallback: Fn(ProtocolEvent<CONFIG, LocalMessages, SenderChannel, Self::StateType>) -> ProtocolEventsCallbackFuture + Send + Sync + 'static, ProcessorBuilderFn: Fn(String, u16, Arc<Peer<CONFIG, LocalMessages, SenderChannel, Self::StateType>>, MessagingMutinyStream<ProcessorUniType>) -> RemoteStreamType + Send + Sync + 'static>(
&mut self,
connection_events_callback: ProtocolEventsCallback,
dialog_processor_builder_fn: ProcessorBuilderFn,
) -> Result<ConnectionChannel<Self::StateType>, Box<dyn Error + Sync + Send>>;
/// Starts the service using `connection_routing_closure` to distribute the
/// connections among the previously configured processors.
///
/// * `initial_connection_state` -- NOTE(review): presumably the state assigned to
///   newly received connections; confirm against the implementations.
/// * `connection_routing_closure` -- receives each available connection plus a flag
///   telling whether it is being reused, and returns the `Sender` of the processor
///   that should handle it; if `None` is returned, the connection will be closed.
/// * `connection_events_callback` -- receives the `ConnectionEvent`s of the service.
///
/// Returns an error if the connecting/binding process fails or if no processors
/// were configured.
async fn start_multi_protocol<ConnectionEventsCallbackFuture: Future<Output = ()> + Send>(
&mut self,
initial_connection_state: Self::StateType,
connection_routing_closure: impl FnMut(&SocketConnection<Self::StateType>, bool) -> Option<Sender<SocketConnection<Self::StateType>>> + Send + 'static,
connection_events_callback: impl for<'r> Fn(ConnectionEvent<'r, Self::StateType>) -> ConnectionEventsCallbackFuture + Send + 'static,
) -> Result<(), Box<dyn Error + Sync + Send>>;
/// Returns an async closure that blocks until `Self::terminate()` is called.
///
/// Usage: `self.termination_waiter()().await;`
fn termination_waiter(
&mut self,
) -> Box<dyn FnOnce() -> BoxFuture<'static, Result<(), Box<dyn Error + Send + Sync>>>>;
/// Notifies the service it is time to stop / shutdown / terminate, consuming `self`.
///
/// It is a recommended practice that the events callback given to each dialog
/// processor informs all clients that a remote-initiated disconnection is
/// happening -- provided the protocol supports that.
async fn terminate(self) -> Result<(), Box<dyn Error + Send + Sync>>;
// Provided method
/// Starts the service with a single processor -- "The Single Protocol Mode" --
/// using the provided `connection_channel` to distribute the connections.
///
/// Only available when `Self::StateType: Default`. See `Self::start_multi_protocol()`
/// for a service that shares connections among different protocol processors.
async fn start_single_protocol(
&mut self,
connection_channel: ConnectionChannel<Self::StateType>,
) -> Result<(), Box<dyn Error + Sync + Send>>
where Self::StateType: Default { ... }
}
Expand description
Base trait for server and client services functionalities
Required Associated Types§
Required Methods§
Sourceasync fn spawn_processor<RemoteMessages: ReactiveMessagingDeserializer<RemoteMessages> + Send + Sync + PartialEq + Debug + 'static, LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug + 'static, ProcessorUniType: GenericUni<ItemType = RemoteMessages> + Send + Sync + 'static, SenderChannel: FullDuplexUniChannel<ItemType = LocalMessages, DerivedItemType = LocalMessages> + Send + Sync + 'static, OutputStreamItemsType: Send + Sync + Debug + 'static, RemoteStreamType: Stream<Item = OutputStreamItemsType> + Send + 'static, ProtocolEventsCallbackFuture: Future<Output = ()> + Send + 'static, ProtocolEventsCallback: Fn(ProtocolEvent<CONFIG, LocalMessages, SenderChannel, Self::StateType>) -> ProtocolEventsCallbackFuture + Send + Sync + 'static, ProcessorBuilderFn: Fn(String, u16, Arc<Peer<CONFIG, LocalMessages, SenderChannel, Self::StateType>>, MessagingMutinyStream<ProcessorUniType>) -> RemoteStreamType + Send + Sync + 'static>(
&mut self,
connection_events_callback: ProtocolEventsCallback,
dialog_processor_builder_fn: ProcessorBuilderFn,
) -> Result<ConnectionChannel<Self::StateType>, Box<dyn Error + Sync + Send>>
async fn spawn_processor<RemoteMessages: ReactiveMessagingDeserializer<RemoteMessages> + Send + Sync + PartialEq + Debug + 'static, LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug + 'static, ProcessorUniType: GenericUni<ItemType = RemoteMessages> + Send + Sync + 'static, SenderChannel: FullDuplexUniChannel<ItemType = LocalMessages, DerivedItemType = LocalMessages> + Send + Sync + 'static, OutputStreamItemsType: Send + Sync + Debug + 'static, RemoteStreamType: Stream<Item = OutputStreamItemsType> + Send + 'static, ProtocolEventsCallbackFuture: Future<Output = ()> + Send + 'static, ProtocolEventsCallback: Fn(ProtocolEvent<CONFIG, LocalMessages, SenderChannel, Self::StateType>) -> ProtocolEventsCallbackFuture + Send + Sync + 'static, ProcessorBuilderFn: Fn(String, u16, Arc<Peer<CONFIG, LocalMessages, SenderChannel, Self::StateType>>, MessagingMutinyStream<ProcessorUniType>) -> RemoteStreamType + Send + Sync + 'static>( &mut self, connection_events_callback: ProtocolEventsCallback, dialog_processor_builder_fn: ProcessorBuilderFn, ) -> Result<ConnectionChannel<Self::StateType>, Box<dyn Error + Sync + Send>>
Spawns a task dedicated to the given “protocol processor”, returning immediately.
The given dialog_processor_builder_fn
will be called for each new connection and should return a Stream
that will produce non-futures & non-fallible items that may be, optionally, sent to the remote party (see crate::prelude::ResponsiveStream):
connection_events_callback
– a generic function (or closure) to handle “new peer”, “peer left” and “service termination” events (possibly to manage sessions). Sign it as: async fn protocol_events_handler<const CONFIG: u64, LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug, SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync> (_event: ProtocolEvent<CONFIG, LocalMessages, SenderChannel, StateType>) {...}
dialog_processor_builder_fn
– the generic function (or closure) that receives the Stream
of remote messages and returns another Stream
, possibly yielding messages of the “local” type to be sent to the remote party – see crate::prelude::ResponsiveStream. Sign the processor as: fn processor<const CONFIG: u64, LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug, SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync, StreamItemType: Deref<Target=[your type for messages produced by the REMOTE party]>> (remote_addr: String, connected_port: u16, peer: Arc<Peer<CONFIG, LocalMessages, SenderChannel, StateType>>, remote_messages_stream: impl Stream<Item=StreamItemType>) -> impl Stream<Item=ANY_TYPE> {...}
– if you want the processor to produce answer messages of type LocalMessages
to be sent to clients, see [Self::spawn_responsive_processor()]:
Sourceasync fn start_multi_protocol<ConnectionEventsCallbackFuture: Future<Output = ()> + Send>(
&mut self,
initial_connection_state: Self::StateType,
connection_routing_closure: impl FnMut(&SocketConnection<Self::StateType>, bool) -> Option<Sender<SocketConnection<Self::StateType>>> + Send + 'static,
connection_events_callback: impl for<'r> Fn(ConnectionEvent<'r, Self::StateType>) -> ConnectionEventsCallbackFuture + Send + 'static,
) -> Result<(), Box<dyn Error + Sync + Send>>
async fn start_multi_protocol<ConnectionEventsCallbackFuture: Future<Output = ()> + Send>( &mut self, initial_connection_state: Self::StateType, connection_routing_closure: impl FnMut(&SocketConnection<Self::StateType>, bool) -> Option<Sender<SocketConnection<Self::StateType>>> + Send + 'static, connection_events_callback: impl for<'r> Fn(ConnectionEvent<'r, Self::StateType>) -> ConnectionEventsCallbackFuture + Send + 'static, ) -> Result<(), Box<dyn Error + Sync + Send>>
Starts the service using the provided connection_routing_closure
to distribute the connections among the configured processors
– previously fed in by [Self::spawn_responsive_processor()] & [Self::spawn_unresponsive_processor()].
connection_routing_closure := FnMut(socket_connection: &SocketConnection<StateType>, is_reused: bool) -> connection_receiver: Option<tokio::sync::mpsc::Sender<SocketConnection<StateType>>>
– this closure “decides what to do” with available connections, routing them to the appropriate processors:
- Newly received connections will have
last_state
set to None
– otherwise, this will either be set by the processor before the Peer is closed – see Peer::set_state() – or will have the Default
value.
- The returned value must be one of the “handles” returned by [Self::spawn_responsive_processor()] or [Self::spawn_unresponsive_processor()].
- If
None
is returned, the connection will be closed.
This method returns an error in the following cases:
- if the connecting/binding process fails;
- if no processors were configured.
Sourcefn termination_waiter(
&mut self,
) -> Box<dyn FnOnce() -> BoxFuture<'static, Result<(), Box<dyn Error + Send + Sync>>>>
fn termination_waiter( &mut self, ) -> Box<dyn FnOnce() -> BoxFuture<'static, Result<(), Box<dyn Error + Send + Sync>>>>
Returns an async closure that blocks until Self::terminate() is called. Example:
self.start_the_service();
self.termination_waiter()().await;
Sourceasync fn terminate(self) -> Result<(), Box<dyn Error + Send + Sync>>
async fn terminate(self) -> Result<(), Box<dyn Error + Send + Sync>>
Notifies the service it is time to stop / shutdown / terminate.
It is a recommended practice that the connection_events_handler()
you provided (when starting each dialog processor)
inform all clients that a remote-initiated disconnection (due to the call to this function) is happening – the protocol must support that, though.
Provided Methods§
Sourceasync fn start_single_protocol(
&mut self,
connection_channel: ConnectionChannel<Self::StateType>,
) -> Result<(), Box<dyn Error + Sync + Send>>
async fn start_single_protocol( &mut self, connection_channel: ConnectionChannel<Self::StateType>, ) -> Result<(), Box<dyn Error + Sync + Send>>
Start the service with a single processor (after calling either [Self::spawn_unresponsive_processor()]
or [Self::spawn_responsive_processor()] once) – A.K.A. “The Single Protocol Mode”.
See Self::start_multi_protocol() if you want a service that shares connections among
different protocol processors.
Starts the service using the provided connection_channel
to distribute the connections.
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.