Trait MessagingService

Source
pub trait MessagingService<const CONFIG: u64> {
    type StateType: Send + Sync + Clone + Debug + 'static;

    // Required methods
    async fn spawn_processor<RemoteMessages: ReactiveMessagingDeserializer<RemoteMessages> + Send + Sync + PartialEq + Debug + 'static, LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug + 'static, ProcessorUniType: GenericUni<ItemType = RemoteMessages> + Send + Sync + 'static, SenderChannel: FullDuplexUniChannel<ItemType = LocalMessages, DerivedItemType = LocalMessages> + Send + Sync + 'static, OutputStreamItemsType: Send + Sync + Debug + 'static, RemoteStreamType: Stream<Item = OutputStreamItemsType> + Send + 'static, ProtocolEventsCallbackFuture: Future<Output = ()> + Send + 'static, ProtocolEventsCallback: Fn(ProtocolEvent<CONFIG, LocalMessages, SenderChannel, Self::StateType>) -> ProtocolEventsCallbackFuture + Send + Sync + 'static, ProcessorBuilderFn: Fn(String, u16, Arc<Peer<CONFIG, LocalMessages, SenderChannel, Self::StateType>>, MessagingMutinyStream<ProcessorUniType>) -> RemoteStreamType + Send + Sync + 'static>(
        &mut self,
        connection_events_callback: ProtocolEventsCallback,
        dialog_processor_builder_fn: ProcessorBuilderFn,
    ) -> Result<ConnectionChannel<Self::StateType>, Box<dyn Error + Sync + Send>>;
    async fn start_multi_protocol<ConnectionEventsCallbackFuture: Future<Output = ()> + Send>(
        &mut self,
        initial_connection_state: Self::StateType,
        connection_routing_closure: impl FnMut(&SocketConnection<Self::StateType>, bool) -> Option<Sender<SocketConnection<Self::StateType>>> + Send + 'static,
        connection_events_callback: impl for<'r> Fn(ConnectionEvent<'r, Self::StateType>) -> ConnectionEventsCallbackFuture + Send + 'static,
    ) -> Result<(), Box<dyn Error + Sync + Send>>;
    fn termination_waiter(
        &mut self,
    ) -> Box<dyn FnOnce() -> BoxFuture<'static, Result<(), Box<dyn Error + Send + Sync>>>>;
    async fn terminate(self) -> Result<(), Box<dyn Error + Send + Sync>>;

    // Provided method
    async fn start_single_protocol(
        &mut self,
        connection_channel: ConnectionChannel<Self::StateType>,
    ) -> Result<(), Box<dyn Error + Sync + Send>>
       where Self::StateType: Default { ... }
}
Expand description

Base trait for server and client services functionalities

Required Associated Types§

Source

type StateType: Send + Sync + Clone + Debug + 'static

Required Methods§

Source

async fn spawn_processor<RemoteMessages: ReactiveMessagingDeserializer<RemoteMessages> + Send + Sync + PartialEq + Debug + 'static, LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug + 'static, ProcessorUniType: GenericUni<ItemType = RemoteMessages> + Send + Sync + 'static, SenderChannel: FullDuplexUniChannel<ItemType = LocalMessages, DerivedItemType = LocalMessages> + Send + Sync + 'static, OutputStreamItemsType: Send + Sync + Debug + 'static, RemoteStreamType: Stream<Item = OutputStreamItemsType> + Send + 'static, ProtocolEventsCallbackFuture: Future<Output = ()> + Send + 'static, ProtocolEventsCallback: Fn(ProtocolEvent<CONFIG, LocalMessages, SenderChannel, Self::StateType>) -> ProtocolEventsCallbackFuture + Send + Sync + 'static, ProcessorBuilderFn: Fn(String, u16, Arc<Peer<CONFIG, LocalMessages, SenderChannel, Self::StateType>>, MessagingMutinyStream<ProcessorUniType>) -> RemoteStreamType + Send + Sync + 'static>( &mut self, connection_events_callback: ProtocolEventsCallback, dialog_processor_builder_fn: ProcessorBuilderFn, ) -> Result<ConnectionChannel<Self::StateType>, Box<dyn Error + Sync + Send>>

Spawns a task dedicated to the given “protocol processor”, returning immediately.
The given dialog_processor_builder_fn will be called for each new connection and should return a Stream that will produce non-futures & non-fallible items that may be, optionally, sent to the remote party (see crate::prelude::ResponsiveStream):

  • protocol_events_callback: – a generic function (or closure) to handle “new peer”, “peer left” and “service termination” events (possibly to manage sessions). Sign it as:
    async fn protocol_events_handler<const CONFIG:  u64,
                                     LocalMessages: ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
                                     SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync>
                                    (_event: ProtocolEvent<CONFIG, LocalMessages, SenderChannel, StateType>) {...}
  • dialog_processor_builder_fn – the generic function (or closure) that receives the Stream of remote messages and returns another Stream, possibly yielding messages of the “local” type to be sent to the remote party – see crate::prelude::ResponsiveStream. Sign the processor as:
    fn processor<const CONFIG:   u64,
                 LocalMessages:  ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
                 SenderChannel:  FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
                 StreamItemType: Deref<Target=[your type for messages produced by the REMOTE party]>>
                (remote_addr:            String,
                 connected_port:         u16,
                 peer:                   Arc<Peer<CONFIG, LocalMessages, SenderChannel, StateType>>,
                 remote_messages_stream: impl Stream<Item=StreamItemType>)
                -> impl Stream<Item=ANY_TYPE> {...}

– if you want the processor to produce answer messages of type LocalMessages to be sent to clients, see [Self::spawn_responsive_processor()]:

Source

async fn start_multi_protocol<ConnectionEventsCallbackFuture: Future<Output = ()> + Send>( &mut self, initial_connection_state: Self::StateType, connection_routing_closure: impl FnMut(&SocketConnection<Self::StateType>, bool) -> Option<Sender<SocketConnection<Self::StateType>>> + Send + 'static, connection_events_callback: impl for<'r> Fn(ConnectionEvent<'r, Self::StateType>) -> ConnectionEventsCallbackFuture + Send + 'static, ) -> Result<(), Box<dyn Error + Sync + Send>>

Starts the service using the provided connection_routing_closure to distribute the connections among the configured processors – previously fed in by [Self::spawn_responsive_processor()] & [Self::spawn_unresponsive_processor()].

protocol_stacking_closure := FnMut(socket_connection: &SocketConnection<StateType>, is_reused: bool) -> connection_receiver: Option<tokio::sync::mpsc::Sender<TcpStream>>

– this closure “decides what to do” with available connections, routing them to the appropriate processors:

  • Newly received connections will have last_state set to None – otherwise, this will either be set by the processor before the Peer is closed – see Peer::set_state() – or will have the Default value.
  • The returned value must be one of the “handles” returned by [Self::spawn_responsive_processor()] or [Self::spawn_unresponsive_processor()].
  • If None is returned, the connection will be closed.

This method returns an error in the following cases:

  1. if the connecting/binding process fails;
  2. if no processors were configured.
Source

fn termination_waiter( &mut self, ) -> Box<dyn FnOnce() -> BoxFuture<'static, Result<(), Box<dyn Error + Send + Sync>>>>

Returns an async closure that blocks until Self::terminate() is called. Example:

    self.start_the_service();
    self.termination_waiter()().await;
Source

async fn terminate(self) -> Result<(), Box<dyn Error + Send + Sync>>

Notifies the service it is time to stop / shutdown / terminate.
It is a recommended practice that the connection_events_handler() you provided (when starting each dialog processor) inform all clients that a remote-initiated disconnection (due to the call to this function) is happening – the protocol must support that, though.

Provided Methods§

Source

async fn start_single_protocol( &mut self, connection_channel: ConnectionChannel<Self::StateType>, ) -> Result<(), Box<dyn Error + Sync + Send>>
where Self::StateType: Default,

Start the service with a single processor (after calling either [Self::spawn_unresponsive_processor()] or [Self::spawn_responsive_processor()] once) – A.K.A. “The Single Protocol Mode”.
See Self::start_multi_protocol() if you want a service that shares connections among different protocol processors.

Starts the service using the provided connection_channel to distribute the connections.

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

Source§

impl<const CONFIG: u64, StateType: Send + Sync + Clone + Debug + 'static> MessagingService<CONFIG> for CompositeSocketClient<CONFIG, StateType>

Source§

type StateType = StateType

Source§

impl<const CONFIG: u64, StateType: Send + Sync + Clone + Debug + 'static> MessagingService<CONFIG> for CompositeSocketServer<CONFIG, StateType>

Source§

type StateType = StateType