1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
//! Common types used across this submodule
use std::error::Error;
use crate::serde::ReactiveMessagingConfig;
use crate::prelude::Peer;
use crate::socket_connection::connection_provider::ConnectionChannel;
use crate::types::{ProtocolEvent, MessagingMutinyStream, ConnectionEvent};
use crate::socket_connection::{
socket_dialog::dialog_types::SocketDialog,
connection::SocketConnection,
};
use std::fmt::Debug;
use std::future;
use std::future::Future;
use std::sync::Arc;
use futures::future::BoxFuture;
use futures::Stream;
use reactive_mutiny::prelude::{FullDuplexUniChannel, GenericUni};
/// Base trait for server and client services functionalities
pub trait MessagingService<const CONFIG: u64> {
type StateType: Send + Sync + Clone + Debug + 'static;
/// Spawns a task dedicated to the given "protocol processor", returning immediately.\
/// The given `dialog_processor_builder_fn` will be called for each new connection and should return a `Stream`
/// that will produce non-futures & non-fallible items that **may be, optionally, sent to the remote party** (see [crate::prelude::ResponsiveStream]):
/// - `protocol_events_callback`: -- a generic function (or closure) to handle "new peer", "peer left" and "service termination" events (possibly to manage sessions). Sign it as:
/// ```nocompile
/// async fn protocol_events_handler<const CONFIG: u64,
/// LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug,
/// SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync>
/// (_event: ProtocolEvent<CONFIG, LocalMessages, SenderChannel, StateType>) {...}
/// ```
/// - `dialog_processor_builder_fn` -- the generic function (or closure) that receives the `Stream` of remote messages and returns another `Stream`, possibly yielding
/// messages of the "local" type to be sent to the remote party -- see [crate::prelude::ResponsiveStream]. Sign the processor as:
/// ```nocompile
/// fn processor<const CONFIG: u64,
/// LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug,
/// SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
/// StreamItemType: Deref<Target=[your type for messages produced by the REMOTE party]>>
/// (remote_addr: String,
/// connected_port: u16,
/// peer: Arc<Peer<CONFIG, LocalMessages, SenderChannel, StateType>>,
/// remote_messages_stream: impl Stream<Item=StreamItemType>)
/// -> impl Stream<Item=ANY_TYPE> {...}
/// ```
/// -- if you want the processor to produce answer messages of type `LocalMessages` to be sent to clients, see [Self::spawn_responsive_processor()]:
fn spawn_processor<RemoteMessages: Send + Sync + PartialEq + Debug + 'static,
LocalMessages: ReactiveMessagingConfig<LocalMessages> + Send + Sync + PartialEq + Debug + 'static,
ProcessorUniType: GenericUni<ItemType=RemoteMessages> + Send + Sync + 'static,
SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync + 'static,
OutputStreamItemsType: Send + Sync + Debug + 'static,
RemoteStreamType: Stream<Item=OutputStreamItemsType> + Send + 'static,
ProtocolEventsCallbackFuture: Future<Output=()> + Send + 'static,
ProtocolEventsCallback: Fn(/*event: */ProtocolEvent<CONFIG, LocalMessages, SenderChannel, Self::StateType>) -> ProtocolEventsCallbackFuture + Send + Sync + 'static,
ProcessorBuilderFn: Fn(/*remote_addr: */String, /*connected_port: */u16, /*peer: */Arc<Peer<CONFIG, LocalMessages, SenderChannel, Self::StateType>>, /*remote_messages_stream: */MessagingMutinyStream<ProcessorUniType>) -> RemoteStreamType + Send + Sync + 'static,
OriginalRemoteMessages: Send + Sync + PartialEq + Debug + 'static>
(&mut self,
socket_dialog: impl SocketDialog<CONFIG, RemoteMessages=OriginalRemoteMessages, LocalMessages=LocalMessages, ProcessorUni=ProcessorUniType, SenderChannel=SenderChannel, State=Self::StateType> + 'static,
connection_events_callback: ProtocolEventsCallback,
dialog_processor_builder_fn: ProcessorBuilderFn)
-> impl Future<Output=Result<ConnectionChannel<Self::StateType>, Box<dyn Error + Sync + Send>>> + Send;
/// Start the service with a single processor (after calling either [Self::spawn_unresponsive_processor()]
/// or [Self::spawn_responsive_processor()] once) -- A.K.A. "The Single Protocol Mode".\
/// See [Self::start_multi_protocol()] if you want a service that shares connections among
/// different protocol processors.
///
/// Starts the service using the provided `connection_channel` to distribute the connections.
fn start_single_protocol(&mut self, connection_channel: ConnectionChannel<Self::StateType>)
-> impl Future<Output=Result<(), Box<dyn Error + Sync + Send>>> + Send
where Self: Send, Self::StateType: Default {
async {
// this closure will cause incoming or just-opened connections to be sent to `connection_channel` and returned connections to be dropped
let connection_routing_closure = move |_socket_connection: &SocketConnection<Self::StateType>, is_reused: bool|
if is_reused {
None
} else {
Some(connection_channel.clone_sender())
};
// tracking the connection events is not really necessary for the "single protocol" case here, as, for this specific case, the "protocol events" contain that information already
let connection_events_callback = |_: ConnectionEvent<'_, Self::StateType>| future::ready(());
self.start_multi_protocol(Self::StateType::default(), connection_routing_closure, connection_events_callback).await
}
}
/// Starts the service using the provided `connection_routing_closure` to distribute the connections among the configured processors
/// -- previously fed in by [Self::spawn_responsive_processor()] & [Self::spawn_unresponsive_processor()].
///
/// `protocol_stacking_closure := FnMut(socket_connection: &SocketConnection<StateType>, is_reused: bool) -> connection_receiver: Option<tokio::sync::mpsc::Sender<TcpStream>>`
///
/// -- this closure "decides what to do" with available connections, routing them to the appropriate processors:
/// - Newly received connections will have `last_state` set to `None` -- otherwise, this will either be set by the processor
/// before the [Peer] is closed -- see [Peer::set_state()] -- or will have the `Default` value.
/// - The returned value must be one of the "handles" returned by [Self::spawn_responsive_processor()] or
/// [Self::spawn_unresponsive_processor()].
/// - If `None` is returned, the connection will be closed.
///
/// This method returns an error in the following cases:
/// 1) if the connecting/binding process fails;
/// 2) if no processors were configured.
fn start_multi_protocol<ConnectionEventsCallbackFuture: Future<Output=()> + Send>
(&mut self,
initial_connection_state: Self::StateType,
connection_routing_closure: impl FnMut(/*socket_connection: */&SocketConnection<Self::StateType>, /*is_reused: */bool) -> Option<tokio::sync::mpsc::Sender<SocketConnection<Self::StateType>>> + Send + 'static,
connection_events_callback: impl for <'r> Fn(/*event: */ConnectionEvent<'r, Self::StateType>) -> ConnectionEventsCallbackFuture + Send + 'static)
-> impl Future<Output=Result<(), Box<dyn Error + Sync + Send>>> + Send;
/// Returns an async closure that blocks until [Self::terminate()] is called.
/// Example:
/// ```no_compile
/// self.start_the_service();
/// self.termination_waiter()().await;
fn termination_waiter(&mut self) -> Box< dyn FnOnce() -> BoxFuture<'static, Result<(), Box<dyn std::error::Error + Send + Sync>>> >;
/// Notifies the service it is time to stop / shutdown / terminate.\
/// It is a recommended practice that the `connection_events_handler()` you provided (when starting each dialog processor)
/// inform all clients that a remote-initiated disconnection (due to the call to this function) is happening -- the protocol must support that, though.
fn terminate(self) -> impl Future<Output=Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send;
}
/// For internal use: defines the `ProcessorUniType` & `SenderChannel` type aliases for the chosen channel flavour
/// (for use when spawning processors with [MessagingService::spawn_processor()]).
///
/// Arguments, in order:
///   1. an expression yielding the `ConstConfig` to use -- `ConstConfig` must be in scope at the call site;
///   2. the channel flavour -- one of the literal tokens `Atomic`, `FullSync` or `Crossbeam`;
///   3. the type of the messages produced by the remote party;
///   4. the type of the messages produced by the local party.
///
/// Besides the two type aliases, the following constants are also emitted at the call site:
/// `_CONST_CONFIG`, `_CONFIG` (the `u64` form of the config), `_PROCESSOR_BUFFER`,
/// `_PROCESSOR_UNI_INSTRUMENTS` and `_SENDER_BUFFER`.
#[macro_export]
macro_rules! _define_processor_uni_and_sender_channel_types {

    // `Atomic` flavour
    ($config:expr, Atomic, $remote:ty, $local:ty) => {
        const _CONST_CONFIG:              ConstConfig = $config;
        const _CONFIG:                    u64         = _CONST_CONFIG.into();
        const _PROCESSOR_BUFFER:          usize       = _CONST_CONFIG.receiver_channel_size as usize;
        const _PROCESSOR_UNI_INSTRUMENTS: usize       = _CONST_CONFIG.executor_instruments.into();
        const _SENDER_BUFFER:             usize       = _CONST_CONFIG.sender_channel_size as usize;
        type ProcessorUniType = $crate::prelude::UniZeroCopyAtomic<$remote, _PROCESSOR_BUFFER, 1, _PROCESSOR_UNI_INSTRUMENTS>;
        type SenderChannel    = $crate::prelude::ChannelUniMoveAtomic<$local, _SENDER_BUFFER, 1>;
    };

    // `FullSync` flavour
    ($config:expr, FullSync, $remote:ty, $local:ty) => {
        const _CONST_CONFIG:              ConstConfig = $config;
        const _CONFIG:                    u64         = _CONST_CONFIG.into();
        const _PROCESSOR_BUFFER:          usize       = _CONST_CONFIG.receiver_channel_size as usize;
        const _PROCESSOR_UNI_INSTRUMENTS: usize       = _CONST_CONFIG.executor_instruments.into();
        const _SENDER_BUFFER:             usize       = _CONST_CONFIG.sender_channel_size as usize;
        type ProcessorUniType = $crate::prelude::UniZeroCopyFullSync<$remote, _PROCESSOR_BUFFER, 1, _PROCESSOR_UNI_INSTRUMENTS>;
        type SenderChannel    = $crate::prelude::ChannelUniMoveFullSync<$local, _SENDER_BUFFER, 1>;
    };

    // `Crossbeam` flavour
    ($config:expr, Crossbeam, $remote:ty, $local:ty) => {
        const _CONST_CONFIG:              ConstConfig = $config;
        const _CONFIG:                    u64         = _CONST_CONFIG.into();
        const _PROCESSOR_BUFFER:          usize       = _CONST_CONFIG.receiver_channel_size as usize;
        const _PROCESSOR_UNI_INSTRUMENTS: usize       = _CONST_CONFIG.executor_instruments.into();
        const _SENDER_BUFFER:             usize       = _CONST_CONFIG.sender_channel_size as usize;
        type ProcessorUniType = $crate::prelude::UniMoveCrossbeam<$remote, _PROCESSOR_BUFFER, 1, _PROCESSOR_UNI_INSTRUMENTS>;
        type SenderChannel    = $crate::prelude::ChannelUniMoveCrossbeam<$local, _SENDER_BUFFER, 1>;
    };
}
pub use _define_processor_uni_and_sender_channel_types;