use crate::{
socket_services::socket_server::common::upgrade_to_termination_tracking,
types::{
ProtocolEvent,
MessagingMutinyStream,
}, socket_connection::{
peer::Peer,
socket_connection_handler::SocketConnectionHandler,
connection_provider::{ServerConnectionHandler, ConnectionChannel},
},
serde::ReactiveMessagingConfig,
};
use crate::socket_connection::connection::SocketConnection;
use crate::socket_services::types::MessagingService;
use crate::types::ConnectionEvent;
use crate::socket_connection::socket_dialog::dialog_types::SocketDialog;
use std::{
error::Error,
fmt::Debug,
future::Future,
sync::Arc,
};
use reactive_mutiny::prelude::advanced::{
FullDuplexUniChannel,
GenericUni,
};
use std::net::SocketAddr;
use futures::{future::BoxFuture, Stream};
use tokio::io::AsyncWriteExt;
use log::{error, trace, warn};
/// Instantiates a socket server whose connections carry no custom state.
///
/// This is a thin convenience over `new_composite_socket_server!`: the three arguments
/// are forwarded untouched, in the same order, and the connection state type is fixed to `()`.
///
/// Parameters:
///   - `$const_config`: forwarded as the configuration argument;
///   - `$interface_ip`: forwarded as the interface to listen on;
///   - `$port`: forwarded as the port to listen on.
#[macro_export]
macro_rules! new_socket_server {
    ($const_config: expr, $interface_ip: expr, $port: expr) => {
        $crate::new_composite_socket_server!($const_config, $interface_ip, $port, ())
    };
}
pub use new_socket_server;
/// Instantiates a `CompositeSocketServer` whose connections carry a state of type `$state_type`.
///
/// Parameters:
///   - `$const_config`: a `ConstConfig` value usable in a `const` context -- it is converted
///     to the `u64` used as the server's `CONFIG` const generic parameter;
///   - `$interface_ip`: anything accepted by `CompositeSocketServer::new()` as the interface;
///   - `$port`: the port handed to `CompositeSocketServer::new()`;
///   - `$state_type`: the type of the state carried along with each connection.
///
/// The macro evaluates to the freshly built server; nothing is started at this point.
#[macro_export]
macro_rules! new_composite_socket_server {
    ($const_config: expr,
     $interface_ip: expr,
     $port: expr,
     $state_type: ty) => {{
        // `ConstConfig` is fully qualified so call sites don't have to `use` it themselves:
        // type names inside a `macro_rules!` expansion are resolved where the macro is invoked.
        const _CONST_CONFIG: $crate::prelude::ConstConfig = $const_config;
        const _CONFIG: u64 = _CONST_CONFIG.into();
        $crate::prelude::CompositeSocketServer::<_CONFIG, $state_type>::new($interface_ip, $port)
    }}
}
pub use new_composite_socket_server;
/// Spawns a processor on `$socket_server` for one protocol, hiding the generic parameters
/// `spawn_processor()` needs.
///
/// Every arm follows the same recipe:
///   1. invokes `_define_processor_uni_and_sender_channel_types!` -- judging by the names used
///      right after it, that macro is expected to bring `_CONFIG`, `ProcessorUniType` and
///      `SenderChannel` into scope (it is defined elsewhere -- confirm there);
///   2. builds the dialog for the chosen message form through `Default`;
///   3. calls `$socket_server.spawn_processor(..)` and `.await`s it.
///
/// Because of the `.await`, the macro must be used in an async context. It evaluates to whatever
/// `spawn_processor()` returns.
///
/// Message forms / arms:
///   - `Textual`: `TextualDialog`, either with the RON serializer & deserializer or with
///     the given `$serializer` / `$deserializer`;
///   - `VariableBinary`: `SerializedBinaryDialog`, either with the Rkyv serializer & fast
///     deserializer or with the given `$serializer` / `$deserializer`. The processor receives
///     `SerializedWrapperType<$remote_messages, Deserializer>` rather than `$remote_messages`;
///   - `MmapBinary`: `MmapBinaryDialog`, which takes no serializer / deserializer.
#[macro_export]
macro_rules! spawn_server_processor {

    // `Textual` -- default (RON) serializer & deserializer
    ($const_config: expr,
     Textual,
     $channel_type: tt, // one of `Atomic`, `FullSync`, `Crossbeam`
     $socket_server: expr,
     $remote_messages: ty,
     $local_messages: ty,
     $connection_events_handler_fn: expr,
     $dialog_processor_builder_fn: expr) => {{
        $crate::_define_processor_uni_and_sender_channel_types!($const_config, $channel_type, $remote_messages, $local_messages);
        let socket_dialog = $crate::socket_connection::socket_dialog::textual_dialog::TextualDialog::<_CONFIG, $remote_messages, $local_messages, $crate::prelude::ReactiveMessagingRonSerializer, $crate::prelude::ReactiveMessagingRonDeserializer, ProcessorUniType, SenderChannel, _>::default();
        use $crate::prelude::MessagingService;
        $socket_server.spawn_processor::<$remote_messages, $local_messages, ProcessorUniType, SenderChannel, _, _, _, _, _, _>(socket_dialog, $connection_events_handler_fn, $dialog_processor_builder_fn).await
    }};

    // `Textual` -- custom serializer & deserializer
    ($const_config: expr,
     Textual,
     $serializer: tt, // a type implementing `ReactiveMessagingSerializer`
     $deserializer: tt, // a type implementing `ReactiveMessagingDeserializer`
     $channel_type: tt, // one of `Atomic`, `FullSync`, `Crossbeam`
     $socket_server: expr,
     $remote_messages: ty,
     $local_messages: ty,
     $connection_events_handler_fn: expr,
     $dialog_processor_builder_fn: expr) => {{
        $crate::_define_processor_uni_and_sender_channel_types!($const_config, $channel_type, $remote_messages, $local_messages);
        let socket_dialog = $crate::socket_connection::socket_dialog::textual_dialog::TextualDialog::<_CONFIG, $remote_messages, $local_messages, $serializer, $deserializer, ProcessorUniType, SenderChannel, _>::default();
        use $crate::prelude::MessagingService;
        $socket_server.spawn_processor::<$remote_messages, $local_messages, ProcessorUniType, SenderChannel, _, _, _, _, _, _>(socket_dialog, $connection_events_handler_fn, $dialog_processor_builder_fn).await
    }};

    // `VariableBinary` -- default (Rkyv) serializer & fast deserializer
    ($const_config: expr,
     VariableBinary,
     $channel_type: tt, // one of `Atomic`, `FullSync`, `Crossbeam`
     $socket_server: expr,
     $remote_messages: ty,
     $local_messages: ty,
     $connection_events_handler_fn: expr,
     $dialog_processor_builder_fn: expr) => {{
        type _DERIVED_REMOTE_MESSAGES = $crate::socket_connection::socket_dialog::serialized_binary_dialog::SerializedWrapperType::<$remote_messages, $crate::prelude::ReactiveMessagingRkyvFastDeserializer>;
        $crate::_define_processor_uni_and_sender_channel_types!($const_config, $channel_type, _DERIVED_REMOTE_MESSAGES, $local_messages);
        let socket_dialog = $crate::socket_connection::socket_dialog::serialized_binary_dialog::SerializedBinaryDialog::<_CONFIG, $remote_messages, $local_messages, $crate::prelude::ReactiveMessagingRkyvSerializer, $crate::prelude::ReactiveMessagingRkyvFastDeserializer, ProcessorUniType, SenderChannel, _>::default();
        use $crate::prelude::MessagingService;
        $socket_server.spawn_processor::<_DERIVED_REMOTE_MESSAGES, $local_messages, ProcessorUniType, SenderChannel, _, _, _, _, _, _>(socket_dialog, $connection_events_handler_fn, $dialog_processor_builder_fn).await
    }};

    // `VariableBinary` -- custom serializer & deserializer
    ($const_config: expr,
     VariableBinary,
     $serializer: tt, // a type implementing `ReactiveMessagingSerializer`
     $deserializer: tt, // a type implementing `ReactiveMessagingDeserializer`
     $channel_type: tt, // one of `Atomic`, `FullSync`, `Crossbeam`
     $socket_server: expr,
     $remote_messages: ty,
     $local_messages: ty,
     $connection_events_handler_fn: expr,
     $dialog_processor_builder_fn: expr) => {{
        // the wrapper handed to the processor must be tied to the very same deserializer the dialog
        // is built with below -- hence `$deserializer` here, not the default one
        type _DERIVED_REMOTE_MESSAGES = $crate::socket_connection::socket_dialog::serialized_binary_dialog::SerializedWrapperType::<$remote_messages, $deserializer>;
        $crate::_define_processor_uni_and_sender_channel_types!($const_config, $channel_type, _DERIVED_REMOTE_MESSAGES, $local_messages);
        let socket_dialog = $crate::socket_connection::socket_dialog::serialized_binary_dialog::SerializedBinaryDialog::<_CONFIG, $remote_messages, $local_messages, $serializer, $deserializer, ProcessorUniType, SenderChannel, _>::default();
        use $crate::prelude::MessagingService;
        $socket_server.spawn_processor::<_DERIVED_REMOTE_MESSAGES, $local_messages, ProcessorUniType, SenderChannel, _, _, _, _, _, _>(socket_dialog, $connection_events_handler_fn, $dialog_processor_builder_fn).await
    }};

    // `MmapBinary` -- no serializer / deserializer involved
    ($const_config: expr,
     MmapBinary,
     $channel_type: tt, // one of `Atomic`, `FullSync`, `Crossbeam`
     $socket_server: expr,
     $remote_messages: ty,
     $local_messages: ty,
     $connection_events_handler_fn: expr,
     $dialog_processor_builder_fn: expr) => {{
        $crate::_define_processor_uni_and_sender_channel_types!($const_config, $channel_type, $remote_messages, $local_messages);
        let socket_dialog = $crate::socket_connection::socket_dialog::mmap_binary_dialog::MmapBinaryDialog::<_CONFIG, $remote_messages, $local_messages, ProcessorUniType, SenderChannel, _>::default();
        use $crate::prelude::MessagingService;
        $socket_server.spawn_processor::<$remote_messages, $local_messages, ProcessorUniType, SenderChannel, _, _, _, _, _, _>(socket_dialog, $connection_events_handler_fn, $dialog_processor_builder_fn).await
    }};
}
pub use spawn_server_processor;
/// Spawns a processor on `$socket_server` (through `spawn_server_processor!`) and, if that
/// succeeds, starts the server in single-protocol mode with the resulting connection channel.
///
/// Two forms are accepted -- they only differ in whether a custom `$serializer` / `$deserializer`
/// pair is forwarded to `spawn_server_processor!`.
///
/// The expansion `.await`s, so it must be used in an async context. It evaluates to the `Result`
/// of `start_single_protocol()`, or to the spawning error if the processor could not be spawned.
/// Note that `$socket_server` appears twice in the expansion.
#[macro_export]
macro_rules! start_server_processor {

    // default serializer & deserializer for the chosen `$message_form`
    ($const_config: expr,
     $message_form: tt, // one of `Textual`, `VariableBinary` or `MmapBinary`
     $channel_type: tt, // one of `Atomic`, `FullSync`, `Crossbeam`
     $socket_server: expr,
     $remote_messages: ty,
     $local_messages: ty,
     $connection_events_handler_fn: expr,
     $dialog_processor_builder_fn: expr) => {{
        use $crate::prelude::MessagingService;
        match $crate::spawn_server_processor!($const_config, $message_form, $channel_type, $socket_server, $remote_messages, $local_messages, $connection_events_handler_fn, $dialog_processor_builder_fn) {
            Err(spawn_error) => Err(spawn_error),
            Ok(spawned_channel) => $socket_server.start_single_protocol(spawned_channel).await,
        }
    }};

    // custom serializer & deserializer
    ($const_config: expr,
     $message_form: tt, // one of `Textual`, `VariableBinary` or `MmapBinary`
     $serializer: tt, // a type implementing `ReactiveMessagingSerializer`
     $deserializer: tt, // a type implementing `ReactiveMessagingDeserializer`
     $channel_type: tt, // one of `Atomic`, `FullSync`, `Crossbeam`
     $socket_server: expr,
     $remote_messages: ty,
     $local_messages: ty,
     $connection_events_handler_fn: expr,
     $dialog_processor_builder_fn: expr) => {{
        use $crate::prelude::MessagingService;
        match $crate::spawn_server_processor!($const_config, $message_form, $serializer, $deserializer, $channel_type, $socket_server, $remote_messages, $local_messages, $connection_events_handler_fn, $dialog_processor_builder_fn) {
            Err(spawn_error) => Err(spawn_error),
            Ok(spawned_channel) => $socket_server.start_single_protocol(spawned_channel).await,
        }
    }};
}
pub use start_server_processor;
/// A socket server able to host several protocol processors, with connections -- each carrying
/// a `StateType` value -- being handed from one processor to another.
///
/// `CONFIG` is the `u64` form of the configuration shared by the server and its processors.
pub struct CompositeSocketServer<const CONFIG: u64, StateType>
where
    StateType: Send + Sync + Clone + Debug + 'static,
{
    /// The interface to listen on
    interface_ip: String,
    /// The port to listen on
    port: u16,
    /// The connection handler -- set by `start_multi_protocol()` and consumed by `terminate()`
    connection_provider: Option<ServerConnectionHandler<StateType>>,
    /// One receiver per spawned processor -- pushed by `spawn_processor()` and moved out,
    /// all at once, by `termination_waiter()`
    processor_termination_complete_receivers: Option<Vec<tokio::sync::oneshot::Receiver<()>>>,
    /// Receiving end of the "returned connections" channel -- moved out by `start_multi_protocol()`
    returned_connections_source: Option<tokio::sync::mpsc::Receiver<SocketConnection<StateType>>>,
    /// Sending end of the "returned connections" channel -- a clone is given to each spawned processor
    returned_connections_sink: tokio::sync::mpsc::Sender<SocketConnection<StateType>>,
}
impl<const CONFIG: u64, StateType> CompositeSocketServer<CONFIG, StateType>
where
    StateType: Send + Sync + Clone + Debug + 'static,
{
    /// Creates a server for `interface_ip`:`port` -- nothing is bound nor started here.
    ///
    /// The channel through which connections are given back to the server is created
    /// upfront, with a capacity of 2, and both of its ends are kept in the new instance.
    pub fn new<IntoString: Into<String>>(interface_ip: IntoString, port: u16) -> Self {
        let (sink, source) = tokio::sync::mpsc::channel::<SocketConnection<StateType>>(2);
        Self {
            interface_ip: interface_ip.into(),
            port,
            connection_provider: None,
            processor_termination_complete_receivers: Some(Vec::new()),
            returned_connections_source: Some(source),
            returned_connections_sink: sink,
        }
    }
}
impl<const CONFIG: u64,
StateType: Send + Sync + Clone + Debug + 'static>
MessagingService<CONFIG>
for CompositeSocketServer<CONFIG, StateType> {
type StateType = StateType;
/// Spawns a processor for one protocol and returns the [ConnectionChannel] through which
/// connections are to be handed to it.
///
/// Parameters:
///   - `socket_dialog`: the dialog (message framing & serialization) the processor speaks;
///   - `connection_events_callback`: called with the [ProtocolEvent]s of this processor. It is
///     wrapped by `upgrade_to_termination_tracking()` together with a one-shot sender whose
///     receiving end is kept for `termination_waiter()`;
///   - `dialog_processor_builder_fn`: builds, for each connection, the stream that processes
///     the remote messages.
///
/// Errors:
///   - if `termination_waiter()` was already called on this server -- the termination tracking
///     was handed over and a new processor could no longer be tracked;
///   - if the receiver couldn't be moved out of the new [ConnectionChannel];
///   - if the `server_loop()` of the connection handler fails to start.
async fn spawn_processor<RemoteMessages: Send + Sync + PartialEq + Debug + 'static,
                         LocalMessages: ReactiveMessagingConfig<LocalMessages> + Send + Sync + PartialEq + Debug + 'static,
                         ProcessorUniType: GenericUni<ItemType=RemoteMessages> + Send + Sync + 'static,
                         SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync + 'static,
                         OutputStreamItemsType: Send + Sync + Debug + 'static,
                         ServerStreamType: Stream<Item=OutputStreamItemsType> + Send + 'static,
                         ConnectionEventsCallbackFuture: Future<Output=()> + Send + 'static,
                         ConnectionEventsCallback: Fn(/*event: */ProtocolEvent<CONFIG, LocalMessages, SenderChannel, StateType>) -> ConnectionEventsCallbackFuture + Send + Sync + 'static,
                         ProcessorBuilderFn: Fn(/*client_addr: */String, /*connected_port: */u16, /*peer: */Arc<Peer<CONFIG, LocalMessages, SenderChannel, StateType>>, /*client_messages_stream: */MessagingMutinyStream<ProcessorUniType>) -> ServerStreamType + Send + Sync + 'static,
                         OriginalRemoteMessages: Send + Sync + PartialEq + Debug + 'static>
                        (&mut self,
                         socket_dialog: impl SocketDialog<CONFIG, RemoteMessages=OriginalRemoteMessages, LocalMessages=LocalMessages, ProcessorUni=ProcessorUniType, SenderChannel=SenderChannel, State=StateType> + 'static,
                         connection_events_callback: ConnectionEventsCallback,
                         dialog_processor_builder_fn: ProcessorBuilderFn)
                        -> Result<ConnectionChannel<StateType>, Box<dyn Error + Sync + Send>> {

    // `termination_waiter()` moves the receivers out for good: report that as an error to the
    // caller rather than panicking, as this is reachable through the public API
    let Some(termination_receivers) = self.processor_termination_complete_receivers.as_mut() else {
        return Err(Box::from(format!("CompositeSocketServer::spawn_processor(): can't spawn a new processor for server @ {}:{}, as `termination_waiter()` was already called and the new processor's termination could no longer be tracked", self.interface_ip, self.port)))
    };

    // termination tracking for this processor
    let (local_termination_sender, local_termination_receiver) = tokio::sync::oneshot::channel::<()>();
    termination_receivers.push(local_termination_receiver);
    let connection_events_callback = upgrade_to_termination_tracking(local_termination_sender, connection_events_callback);

    // the channel feeding connections to this processor
    let mut connection_provider = ConnectionChannel::new();
    let new_connections_source = connection_provider.receiver()
        .ok_or_else(|| String::from("couldn't move the Connection Receiver out of the Connection Provider"))?;

    // the processor is also given a sender of the "returned connections" channel of this server
    let socket_communications_handler = SocketConnectionHandler::<CONFIG, _>::new(socket_dialog);
    socket_communications_handler.server_loop(&self.interface_ip,
                                              self.port,
                                              new_connections_source,
                                              self.returned_connections_sink.clone(),
                                              connection_events_callback,
                                              dialog_processor_builder_fn).await
        .map_err(|err| format!("Error starting an unresponsive GenericCompositeSocketServer @ {}:{}: {:?}", self.interface_ip, self.port, err))?;
    Ok(connection_provider)
}
/// Starts the server: creates the connection handler for the configured interface & port and spawns
/// the "Connection Routing Task", which decides the processor each connection goes to.
///
/// Parameters:
///   - `initial_connection_state`: handed to `ServerConnectionHandler::new()`;
///   - `connection_routing_closure`: called with each connection and an `is_reused` flag -- `false` for
///     connections just received from the connection handler, `true` for connections given back to
///     the server. Returning `Some(sender)` sends the connection through `sender`; returning `None`
///     causes the connection to be shut down;
///   - `connection_events_callback`: called (and awaited) with `ConnectionEvent::Connected` for each new
///     connection and with `ConnectionEvent::Disconnected` right before a connection is shut down.
///
/// Errors:
///   - if the connection handler could not be created;
///   - if the receiver of new connections could not be moved out of the connection handler;
///   - if the "returned connections" receiver was already taken -- i.e., the server was started before.
///
/// NOTE: the routing task ends when any of the two channels it listens to is closed, or when a
///       connection could not be sent to the processor chosen by the routing closure.
async fn start_multi_protocol<ConnectionEventsCallbackFuture: Future<Output=()> + Send>
(&mut self,
initial_connection_state: StateType,
mut connection_routing_closure: impl FnMut(/*socket_connection: */&SocketConnection<StateType>, /*is_reused: */bool) -> Option<tokio::sync::mpsc::Sender<SocketConnection<StateType>>> + Send + 'static,
connection_events_callback: impl for <'r> Fn(/*event: */ConnectionEvent<'r, StateType>) -> ConnectionEventsCallbackFuture + Send + 'static)
-> Result<(), Box<dyn Error + Sync + Send>> {
let mut connection_provider = ServerConnectionHandler::new(&self.interface_ip, self.port, initial_connection_state).await
.map_err(|err| format!("couldn't start the Connection Provider server event loop: {err}"))?;
let mut new_connections_source = connection_provider.connection_receiver()
.ok_or_else(|| String::from("couldn't move the Connection Receiver out of the Connection Provider"))?;
// kept in `self`, so `terminate()` is able to shut it down
_ = self.connection_provider.insert(connection_provider);
// can only be taken once -- a second start attempt fails here
let mut returned_connections_source = self.returned_connections_source.take()
.ok_or_else(|| String::from("couldn't `take()` from the `returned_connections_source`. Has the server been `.start()`ed more than once?"))?;
// owned copies, to be moved into the routing task (used in the log messages)
let interface_ip = self.interface_ip.clone();
let port = self.port;
// the "Connection Routing Task"
tokio::spawn(async move {
loop {
// waits for whatever comes first: a new connection or a returned one
let (mut connection, sender) = tokio::select! {
new_connection = new_connections_source.recv() => {
// `None` means the channel is closed -- ends the task
let Some(new_socket_connection) = new_connection else { break };
connection_events_callback(ConnectionEvent::Connected(&new_socket_connection)).await;
let sender = connection_routing_closure(&new_socket_connection, false);
(new_socket_connection, sender)
},
returned_connection_and_state = returned_connections_source.recv() => {
// `None` means the channel is closed -- ends the task
let Some(returned_socket_connection) = returned_connection_and_state else { break };
// the routing closure is only consulted for connections that are not closed
let sender = (!returned_socket_connection.closed())
.then_some(())
.and_then(|_| connection_routing_closure(&returned_socket_connection, true));
(returned_socket_connection, sender)
},
};
match sender {
// route the connection to the chosen processor
Some(sender) => {
// the peer address is resolved before the connection is moved into `send()`
let (client_ip, client_port) = connection.connection().peer_addr()
.map(|peer_addr| match peer_addr {
SocketAddr::V4(v4) => (v4.ip().to_string(), v4.port()),
SocketAddr::V6(v6) => (v6.ip().to_string(), v6.port()),
})
.unwrap_or_else(|err| (format!("<unknown -- err:{err}>"), 0));
trace!("`reactive-messaging::CompositeSocketServer`: ROUTING the client {client_ip}:{client_port} of the server @ {interface_ip}:{port} to another processor");
if let Err(err) = sender.send(connection).await {
error!("`reactive-messaging::CompositeSocketServer`: BUG(?) in server @ {interface_ip}:{port} while re-routing the client {client_ip}:{client_port}'s socket: THE NEW (ROUTED) PROCESSOR CAN NO LONGER RECEIVE CONNECTIONS -- THE CONNECTION WILL BE DROPPED: {err}");
// NOTE: this ends the whole routing task, not only the handling of this connection
break
}
},
// no processor for the connection: report the disconnection and shut the socket down
None => {
connection_events_callback(ConnectionEvent::Disconnected(&connection)).await;
if let Err(err) = connection.connection_mut().shutdown().await {
let (client_ip, client_port) = connection.connection().peer_addr()
.map(|peer_addr| match peer_addr {
SocketAddr::V4(v4) => (v4.ip().to_string(), v4.port()),
SocketAddr::V6(v6) => (v6.ip().to_string(), v6.port()),
})
.unwrap_or_else(|err| (format!("<unknown -- err:{err}>"), 0));
error!("`reactive-messaging::CompositeSocketServer`: ERROR in server @ {interface_ip}:{port} while shutting down the socket with client {client_ip}:{client_port}: {err}");
}
}
}
}
trace!("`reactive-messaging::CompositeSocketServer`: The 'Connection Routing Task' for server @ {interface_ip}:{port} ended -- hopefully, due to a graceful server termination.");
});
Ok(())
}
fn termination_waiter(&mut self) -> Box< dyn FnOnce() -> BoxFuture<'static, Result<(), Box<dyn Error + Send + Sync>>> > {
let mut local_termination_receiver = self.processor_termination_complete_receivers.take();
let interface_ip = self.interface_ip.clone();
let port = self.port;
Box::new(move || Box::pin(async move {
let Some(local_termination_receiver) = local_termination_receiver.take() else {
return Err(Box::from(format!("GenericCompositeSocketServer::termination_waiter(): termination requested for server @ {interface_ip}:{port}, but the server was not started (or a previous termination was commanded) at the moment the `termination_waiter()`'s returned closure was called")))
};
for (i, processor_termination_complete_receiver) in local_termination_receiver.into_iter().enumerate() {
if let Err(err) = processor_termination_complete_receiver.await {
error!("GenericCompositeSocketServer::termination_waiter(): It is no longer possible to tell when the processor {i} will be termination for server @ {interface_ip}:{port}: `one_shot` signal error: {err}")
}
}
Ok(())
}))
}
/// Initiates the termination of the server, consuming it: the connection handler set when the
/// server was started is shut down.
///
/// Returns an `Err` if there is no connection handler to shut down.
async fn terminate(mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
    let Some(connection_provider) = self.connection_provider.take() else {
        return Err(Box::from("GenericCompositeSocketServer: Termination requested, but the service was not started -- no `self.start_with_*()` was called. Ignoring..."))
    };
    warn!("GenericCompositeSocketServer: Termination asked & initiated for server @ {}:{}", self.interface_ip, self.port);
    connection_provider.shutdown().await;
    Ok(())
}
}
#[cfg(any(test,doc))]
mod tests {
use super::*;
use crate::prelude::*;
use crate::socket_connection::socket_dialog::textual_dialog::TextualDialog;
use std::{
future,
ops::Deref,
sync::atomic::{AtomicU32, Ordering::Relaxed},
time::Duration,
};
use std::sync::atomic::AtomicBool;
use serde::{Deserialize, Serialize};
use futures::StreamExt;
use tokio::sync::Mutex;
use crate::unit_test_utils::TestString;
/// The interface (loopback) every server of these tests is created for
const LISTENING_INTERFACE: &str = "127.0.0.1";
/// Base port for these tests: each test uses `PORT_START + n`, with a distinct `n` per test
/// (presumably so tests running concurrently don't compete for the same port)
const PORT_START: u16 = 8040;
/// Assures `new_socket_server!` expands & compiles, instantiating (without starting) three servers
#[cfg_attr(not(doc),test)]
fn single_protocol_instantiation() {
    const DEFAULT_CONFIG: ConstConfig = ConstConfig::default();
    let _atomic_server    = new_socket_server!(DEFAULT_CONFIG, LISTENING_INTERFACE, PORT_START);
    let _fullsync_server  = new_socket_server!(DEFAULT_CONFIG, LISTENING_INTERFACE, PORT_START+1);
    let _crossbeam_server = new_socket_server!(DEFAULT_CONFIG, LISTENING_INTERFACE, PORT_START+2);
}
/// Assures `new_composite_socket_server!` expands & compiles, instantiating (without starting)
/// three servers whose connections carry the `()` state
#[cfg_attr(not(doc),test)]
fn composite_protocol_instantiation() {
    const DEFAULT_CONFIG: ConstConfig = ConstConfig::default();
    let _atomic_server    = new_composite_socket_server!(DEFAULT_CONFIG, LISTENING_INTERFACE, PORT_START+3, ());
    let _fullsync_server  = new_composite_socket_server!(DEFAULT_CONFIG, LISTENING_INTERFACE, PORT_START+4, ());
    let _crossbeam_server = new_composite_socket_server!(DEFAULT_CONFIG, LISTENING_INTERFACE, PORT_START+5, ());
}
/// Walks through the supported ways of declaring, starting and terminating a single protocol server:
///   1. macros + named functions, with a processor that never answers;
///   2. macros + named functions, with a processor that answers each message;
///   3. macros + closures;
///   4. no macros at all, with a custom configuration.
/// In every scenario the server is terminated right after being started -- no client ever connects.
#[cfg_attr(not(doc),tokio::test)]
async fn doc_usage() -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
const PORT: u16 = PORT_START+6; const TEST_CONFIG: ConstConfig = ConstConfig::default();
// scenario 1: macros + named functions / unresponsive processor
let mut server = new_socket_server!(
TEST_CONFIG,
LISTENING_INTERFACE,
PORT);
start_server_processor!(TEST_CONFIG, Textual, Atomic, server,
DummyClientAndServerMessages,
DummyClientAndServerMessages,
connection_events_handler,
unresponsive_processor
)?;
// connection events handler that ignores every event (shared by scenarios 1 & 2)
async fn connection_events_handler<const CONFIG: u64,
LocalMessages: ReactiveMessagingConfig<LocalMessages> + Send + Sync + PartialEq + Debug,
SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync>
(_event: ProtocolEvent<CONFIG, LocalMessages, SenderChannel>) {
}
// processor that consumes the client messages without sending anything back
fn unresponsive_processor<const CONFIG: u64,
LocalMessages: ReactiveMessagingConfig<LocalMessages> + Send + Sync + PartialEq + Debug,
SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
StreamItemType: Deref<Target=DummyClientAndServerMessages>>
(_client_addr: String,
_connected_port: u16,
_peer: Arc<Peer<CONFIG, LocalMessages, SenderChannel>>,
client_messages_stream: impl Stream<Item=StreamItemType>)
-> impl Stream<Item=()> {
client_messages_stream.map(|_payload| ())
}
// the waiter must be obtained before `terminate()`, which consumes the server
let termination_waiter = server.termination_waiter();
server.terminate().await?;
termination_waiter().await?;
// scenario 2: macros + named functions / responsive processor
let mut server = new_socket_server!(
TEST_CONFIG,
LISTENING_INTERFACE,
PORT);
start_server_processor!(TEST_CONFIG, Textual, Atomic, server,
DummyClientAndServerMessages,
DummyClientAndServerMessages,
connection_events_handler,
responsive_processor
)?;
// processor that maps every client message to a `FloodPing` answer, sent through `peer`
fn responsive_processor<const CONFIG: u64,
SenderChannel: FullDuplexUniChannel<ItemType=DummyClientAndServerMessages, DerivedItemType=DummyClientAndServerMessages> + Send + Sync,
StreamItemType: Deref<Target=DummyClientAndServerMessages>>
(_client_addr: String,
_connected_port: u16,
peer: Arc<Peer<CONFIG, DummyClientAndServerMessages, SenderChannel>>,
client_messages_stream: impl Stream<Item=StreamItemType>)
-> impl Stream<Item=()> {
client_messages_stream
.map(|_payload| DummyClientAndServerMessages::FloodPing)
.to_responsive_stream(peer, |_, _| ())
}
let termination_waiter = server.termination_waiter();
server.terminate().await?;
termination_waiter().await?;
// scenario 3: macros + closures
let mut server = new_socket_server!(
TEST_CONFIG,
LISTENING_INTERFACE,
PORT);
start_server_processor!(TEST_CONFIG, Textual, Atomic, server,
DummyClientAndServerMessages,
DummyClientAndServerMessages,
|_| future::ready(()),
|_, _, _, client_messages_stream| client_messages_stream.map(|_payload| DummyClientAndServerMessages::FloodPing)
)?;
let termination_waiter = server.termination_waiter();
server.terminate().await?;
termination_waiter().await?;
// scenario 4: no macros -- every type is spelled out, using a custom configuration
const CUSTOM_CONFIG: ConstConfig = ConstConfig {
receiver_channel_size: 2048,
sender_channel_size: 1024,
executor_instruments: reactive_mutiny::prelude::Instruments::LogsWithExpensiveMetrics,
..ConstConfig::default()
};
let mut server = CompositeSocketServer :: <{CUSTOM_CONFIG.into()},
()>
:: new(LISTENING_INTERFACE, PORT);
// the channel types are sized after the custom configuration
type ProcessorUniType = UniZeroCopyFullSync<DummyClientAndServerMessages, {CUSTOM_CONFIG.receiver_channel_size as usize}, 1, {CUSTOM_CONFIG.executor_instruments.into()}>;
type SenderChannelType = ChannelUniMoveFullSync<DummyClientAndServerMessages, {CUSTOM_CONFIG.sender_channel_size as usize}, 1>;
let socket_dialog_handler = TextualDialog::<{CUSTOM_CONFIG.into()}, DummyClientAndServerMessages, DummyClientAndServerMessages, ReactiveMessagingRonSerializer, ReactiveMessagingRonDeserializer, ProcessorUniType, SenderChannelType, ()>::default();
let connection_channel = server.spawn_processor::<DummyClientAndServerMessages,
DummyClientAndServerMessages,
ProcessorUniType,
SenderChannelType,
_, _, _, _, _, _ > (
socket_dialog_handler,
|_| future::ready(()),
|_, _, _, client_messages_stream| client_messages_stream.map(|_payload| DummyClientAndServerMessages::FloodPing)
).await?;
server.start_single_protocol(connection_channel).await?;
let termination_waiter = server.termination_waiter();
server.terminate().await?;
termination_waiter().await?;
Ok(())
}
/// Assures the termination process of a server is able to:
///   1. notify the connected client -- here, by sending it a message upon `LocalServiceTermination`;
///   2. complete within `max_time_ms` after `terminate()` returned, even though the client never
///      stops answering on its own.
#[cfg_attr(not(doc),tokio::test(flavor = "multi_thread"))]
async fn termination_process() {
    const PORT: u16 = PORT_START+7;

    // the termination is expected to be complete within this time
    let max_time_ms = 20;

    // counters & the slot for the peer representing the client, shared with the processors
    let client_received_messages_count_ref1 = Arc::new(AtomicU32::new(0));
    let client_received_messages_count_ref2 = Arc::clone(&client_received_messages_count_ref1);
    let server_received_messages_count_ref1 = Arc::new(AtomicU32::new(0));
    let server_received_messages_count_ref2 = Arc::clone(&server_received_messages_count_ref1);
    let client_peer_ref1 = Arc::new(Mutex::new(None));
    let client_peer_ref2 = Arc::clone(&client_peer_ref1);

    const TEST_CONFIG: ConstConfig = ConstConfig {
        ..ConstConfig::default()
    };

    // server: answers every message and, upon termination, notifies & closes the client connection
    let mut server = CompositeSocketServer :: <{TEST_CONFIG.into()},
                                               () >
                                           :: new(LISTENING_INTERFACE, PORT);
    type ProcessorUniType = UniZeroCopyFullSync<DummyClientAndServerMessages, {TEST_CONFIG.receiver_channel_size as usize}, 1, {TEST_CONFIG.executor_instruments.into()}>;
    type SenderChannelType = ChannelUniMoveFullSync<DummyClientAndServerMessages, {TEST_CONFIG.sender_channel_size as usize}, 1>;
    let socket_dialog_handler = TextualDialog::<{TEST_CONFIG.into()}, DummyClientAndServerMessages, DummyClientAndServerMessages, ReactiveMessagingRonSerializer, ReactiveMessagingRonDeserializer, ProcessorUniType, SenderChannelType, ()>::default();
    let connection_channel = server.spawn_processor :: <DummyClientAndServerMessages,
                                                        DummyClientAndServerMessages,
                                                        ProcessorUniType,
                                                        SenderChannelType,
                                                        _, _, _, _, _, _> (
        socket_dialog_handler,
        move |connection_event: ProtocolEvent<{TEST_CONFIG.into()}, DummyClientAndServerMessages, SenderChannelType>| {
            let client_peer = Arc::clone(&client_peer_ref1);
            async move {
                match connection_event {
                    ProtocolEvent::PeerArrived { peer } => {
                        client_peer.lock().await.replace(peer);
                    },
                    ProtocolEvent::PeerLeft { peer: _, stream_stats: _ } => (),
                    ProtocolEvent::LocalServiceTermination => {
                        let client_peer = client_peer.lock().await;
                        let client_peer = client_peer.as_ref().expect("No client is connected");
                        let _ = client_peer.send_async(DummyClientAndServerMessages::FloodPing).await;
                        client_peer.flush_and_close(Duration::ZERO).await;
                    }
                }
            }
        },
        move |_, _, peer, client_messages: MessagingMutinyStream<ProcessorUniType>| {
            let server_received_messages_count = Arc::clone(&server_received_messages_count_ref1);
            client_messages
                .inspect(move |_| { server_received_messages_count.fetch_add(1, Relaxed); })
                .map(move |_client_message| DummyClientAndServerMessages::FloodPing )
                .to_responsive_stream(peer, |_, _| ())
        }
    ).await.expect("Spawning a server processor");
    server.start_single_protocol(connection_channel).await.expect("Starting the server");

    // client: answers every message it receives
    let mut client = new_socket_client!(
        TEST_CONFIG,
        LISTENING_INTERFACE,
        PORT);
    start_client_processor!(TEST_CONFIG, Textual, FullSync, client,
        DummyClientAndServerMessages,
        DummyClientAndServerMessages,
        |_| async {},
        move |_, _, peer, server_messages| {
            let client_received_messages_count = Arc::clone(&client_received_messages_count_ref1);
            server_messages
                .inspect(move |_| { client_received_messages_count.fetch_add(1, Relaxed); } )
                .map(move |_server_message| DummyClientAndServerMessages::FloodPing)
                .to_responsive_stream(peer, |_, _| ())
        }
    ).expect("Starting the client");

    // waits until the server has seen the client
    while client_peer_ref2.lock().await.is_none() {
        tokio::time::sleep(Duration::from_millis(2)).await;
    }

    // terminates the server, measuring how long the termination takes to complete
    let wait_for_server_termination = server.termination_waiter();
    server.terminate().await
        .expect("ERROR Signaling the server of the termination intention");
    // `Instant` is monotonic: unlike `SystemTime`, the measurement can't be disturbed
    // (nor made to fail) by adjustments to the system clock
    let start = std::time::Instant::now();
    _ = tokio::time::timeout(Duration::from_secs(5), wait_for_server_termination()).await
        .expect("TIMED OUT (>5s) Waiting for the server to live it's life and to complete the termination process");
    let elapsed_ms = start.elapsed().as_millis();

    assert!(client_received_messages_count_ref2.load(Relaxed) > 1, "The client didn't receive any messages (not even the 'server is shutting down' notification)");
    assert!(server_received_messages_count_ref2.load(Relaxed) > 1, "The server didn't receive any messages (not even 'gracefully disconnecting' after being notified that the server is shutting down)");
    assert!(elapsed_ms <= max_time_ms as u128,
            "The server termination (of a never complying client) didn't complete in a reasonable time, meaning the termination code is wrong. Maximum acceptable time: {}ms; Measured Time: {}ms",
            max_time_ms, elapsed_ms);
}
/// Exercises a multi-protocol server in which the state of each connection (a `Protocols` variant)
/// tells which processor should handle it next:
/// `IncomingClient` -> `WelcomeAuthenticatedFriend` -> `AccountSettings` -> `GoodbyeOptions` -> `Disconnect`.
/// Each processor, upon receiving a message, sets the next state and closes its side of the dialog;
/// the routing closure then picks the processor for the connection according to that state.
/// The test passes if all four processors were exercised.
#[cfg_attr(not(doc),tokio::test(flavor = "multi_thread"))]
async fn composite_protocol_stacking_pattern() -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
const PORT: u16 = PORT_START+8;
const TEST_CONFIG: ConstConfig = ConstConfig::default();
let mut server = new_composite_socket_server!(
TEST_CONFIG,
LISTENING_INTERFACE,
PORT,
Protocols);
// the connection states, naming the processor in charge of the connection
#[derive(Debug,PartialEq,Clone)]
enum Protocols {
IncomingClient,
WelcomeAuthenticatedFriend,
AccountSettings,
GoodbyeOptions,
Disconnect,
}
// first level processor: greets the client upon the first message and hands it over to `WelcomeAuthenticatedFriend`
let incoming_client_processor_greeted = Arc::new(AtomicBool::new(false));
let incoming_client_processor_greeted_ref = Arc::clone(&incoming_client_processor_greeted);
let incoming_client_processor = spawn_server_processor!(TEST_CONFIG, Textual, Atomic, server, TestString, TestString,
|_| future::ready(()),
move |_, _, peer, client_messages_stream| {
assert_eq!(peer.try_take_state(), Some(Some(Protocols::IncomingClient)), "Connection is in a wrong state");
let incoming_client_processor_greeted_ref = Arc::clone(&incoming_client_processor_greeted_ref);
client_messages_stream.then(move |_payload| {
let peer = Arc::clone(&peer);
incoming_client_processor_greeted_ref.store(true, Relaxed);
async move {
peer.send_async(TestString::from(format!("`IncomingClient`: New peer {peer:?} ended up initial authentication proceedings. SAY SOMETHING and you will be routed to 'WelcomeAuthenticatedFriend'"))).await
.expect("Sending failed");
peer.set_state(Protocols::WelcomeAuthenticatedFriend).await;
peer.flush_and_close(Duration::from_secs(1)).await;
}
})
}
)?;
// second level processor: greets the client when it arrives; upon a message, hands it over to `AccountSettings`
let welcome_authenticated_friend_processor_greeted = Arc::new(AtomicBool::new(false));
let welcome_authenticated_friend_processor_greeted_ref = Arc::clone(&welcome_authenticated_friend_processor_greeted);
let welcome_authenticated_friend_processor = spawn_server_processor!(TEST_CONFIG, Textual, Atomic, server, TestString, TestString,
|connection_event| async {
if let ProtocolEvent::PeerArrived { peer } = connection_event {
peer.send_async(TestString::from(format!("`WelcomeAuthenticatedFriend`: Now dealing with client {peer:?}. SAY SOMETHING and you will be routed to 'AccountSettings'"))).await
.expect("Sending failed");
}
},
move |_, _, peer, client_messages_stream| {
assert_eq!(peer.try_take_state(), Some(Some(Protocols::WelcomeAuthenticatedFriend)), "Connection is in a wrong state");
let welcome_authenticated_friend_processor_greeted_ref = Arc::clone(&welcome_authenticated_friend_processor_greeted_ref);
client_messages_stream.then(move |_payload| {
let peer = Arc::clone(&peer);
welcome_authenticated_friend_processor_greeted_ref.store(true, Relaxed);
async move {
peer.set_state(Protocols::AccountSettings).await;
peer.flush_and_close(Duration::from_secs(1)).await;
}
})
}
)?;
// third level processor: greets the client when it arrives; upon a message, hands it over to `GoodbyeOptions`
let account_settings_processor_greeted = Arc::new(AtomicBool::new(false));
let account_settings_processor_greeted_ref = Arc::clone(&account_settings_processor_greeted);
let account_settings_processor = spawn_server_processor!(TEST_CONFIG, Textual, Atomic, server, TestString, TestString,
|connection_event| async {
if let ProtocolEvent::PeerArrived { peer } = connection_event {
peer.send_async(TestString::from(format!("`AccountSettings`: Now dealing with client {peer:?}. SAY SOMETHING and you will be routed to 'GoodbyeOptions'"))).await
.expect("Sending failed");
}
},
move |_, _, peer, client_messages_stream| {
assert_eq!(peer.try_take_state(), Some(Some(Protocols::AccountSettings)), "Connection is in a wrong state");
let account_settings_processor_greeted_ref = Arc::clone(&account_settings_processor_greeted_ref);
client_messages_stream.then(move |_payload| {
let peer = Arc::clone(&peer);
account_settings_processor_greeted_ref.store(true, Relaxed);
async move {
peer.set_state(Protocols::GoodbyeOptions).await;
peer.flush_and_close(Duration::from_secs(1)).await;
}
})
}
)?;
// fourth level processor: greets the client when it arrives; upon a message, sets the `Disconnect` state
let goodbye_options_processor_greeted = Arc::new(AtomicBool::new(false));
let goodbye_options_processor_greeted_ref = Arc::clone(&goodbye_options_processor_greeted);
let goodbye_options_processor = spawn_server_processor!(TEST_CONFIG, Textual, Atomic, server, TestString, TestString,
|connection_event| async {
if let ProtocolEvent::PeerArrived { peer } = connection_event {
peer.send_async(TestString::from(format!("`GoodbyeOptions`: Now dealing with client {peer:?}. SAY SOMETHING and you will be DISCONNECTED, as our talking is over. Thank you."))).await
.expect("Sending failed");
}
},
move |_, _, peer, client_messages_stream| {
assert_eq!(peer.try_take_state(), Some(Some(Protocols::GoodbyeOptions)), "Connection is in a wrong state");
let goodbye_options_processor_greeted_ref = Arc::clone(&goodbye_options_processor_greeted_ref);
client_messages_stream.then(move |_payload| {
let peer = Arc::clone(&peer);
goodbye_options_processor_greeted_ref.store(true, Relaxed);
async move {
peer.set_state(Protocols::Disconnect).await;
peer.flush_and_close(Duration::from_secs(1)).await;
}
})
}
)?;
// maps the state of a connection to the channel of the processor in charge of it -- `None` for `Disconnect`
let connection_routing_closure = move |socket_connection: &SocketConnection<Protocols>, _|
match socket_connection.state() {
Protocols::IncomingClient => Some(incoming_client_processor.clone_sender()),
Protocols::WelcomeAuthenticatedFriend => Some(welcome_authenticated_friend_processor.clone_sender()),
Protocols::AccountSettings => Some(account_settings_processor.clone_sender()),
Protocols::GoodbyeOptions => Some(goodbye_options_processor.clone_sender()),
Protocols::Disconnect => None,
};
// starts the server: new connections begin in the `IncomingClient` state
server.start_multi_protocol(Protocols::IncomingClient, connection_routing_closure, |_| future::ready(())).await?;
let server_termination_waiter = server.termination_waiter();
// the client: says hello when connected and answers "OK" to everything it receives
let mut client = new_socket_client!(
TEST_CONFIG,
LISTENING_INTERFACE,
PORT);
start_client_processor!(TEST_CONFIG, Textual, Atomic, client, TestString, TestString,
|connection_event| async {
match connection_event {
ProtocolEvent::PeerArrived { peer } => peer.send_async(TestString::from("Hello! Am I in?")).await.expect("Sending failed"),
ProtocolEvent::PeerLeft { peer: _, stream_stats: _ } => (),
ProtocolEvent::LocalServiceTermination => (),
}
},
move |_, _, peer, server_messages| server_messages
.map(|msg| {
println!("RECEIVED: {msg} -- answering with 'OK'");
TestString::from("OK")
})
.to_responsive_stream(peer, |_, _| ())
)?;
// waits (up to 5s) for the client to finish, then terminates the server
let client_waiter = client.termination_waiter();
_ = tokio::time::timeout(Duration::from_secs(5), client_waiter()).await
.expect("TIMED OUT (>5s) Waiting for the client & server to do their stuff & disconnect the client");
server.terminate().await?;
server_termination_waiter().await?;
// every processor must have received at least one message
assert!(incoming_client_processor_greeted.load(Relaxed), "`IncomingClient` processor wasn't requested");
assert!(welcome_authenticated_friend_processor_greeted.load(Relaxed), "`WelcomeAuthenticatedFriend` processor wasn't requested");
assert!(account_settings_processor_greeted.load(Relaxed), "`AccountSettings` processor wasn't requested");
assert!(goodbye_options_processor_greeted.load(Relaxed), "`GoodbyeOptions` processor wasn't requested");
Ok(())
}
/// The messages exchanged by the single protocol tests of this module -- the same type is used
/// both for what the client sends and for what the server sends
#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
enum DummyClientAndServerMessages {
    /// The message sent around by the tests (also the default)
    #[default]
    FloodPing,
    /// Not constructed by the tests of this module
    AnythingElse,
}

/// Accepts all the defaults offered by `ReactiveMessagingConfig`
impl ReactiveMessagingConfig<DummyClientAndServerMessages> for DummyClientAndServerMessages {}
}