use crate::{
config::*,
types::*,
socket_connection::{
common::{ReactiveMessagingSender, ReactiveMessagingUniSender, upgrade_processor_uni_retrying_logic},
peer::Peer,
},
};
use std::{error::Error, fmt::Debug, future::Future, net::SocketAddr, sync::Arc, time::Duration};
use reactive_mutiny::prelude::advanced::GenericUni;
use futures::Stream;
use tokio::io::AsyncWriteExt;
use log::{trace, debug, warn, error};
use crate::socket_connection::connection::SocketConnection;
use crate::socket_connection::socket_dialog::dialog_types::SocketDialog;
/// Runs the server-side ([`Self::server_loop()`]) and client-side ([`Self::client()`]) dialogs over
/// socket connections, for the dialog flavor designated by `SocketDialogType` and the compile-time
/// configuration encoded in `CONFIG`.
pub struct SocketConnectionHandler<const CONFIG: u64, SocketDialogType>
where
    SocketDialogType: SocketDialog<CONFIG> + Send + Sync + 'static,
{
    /// The dialog instance received by [`Self::new()`] -- stored, but not read by the code of this type
    /// (the dialogs are run by fresh `SocketDialogType::default()` instances)
    _socket_dialog: SocketDialogType,
}
impl<const CONFIG: u64,
SocketDialogType: SocketDialog<CONFIG> + Send + Sync + 'static>
SocketConnectionHandler<CONFIG, SocketDialogType> {
/// The [ConstConfig] view of the `CONFIG` const generic parameter, computed at compile time
const CONST_CONFIG: ConstConfig = ConstConfig::from(CONFIG);
/// Creates a handler that simply takes ownership of the given `_socket_dialog` instance
pub fn new(_socket_dialog: SocketDialogType) -> Self {
    Self { _socket_dialog }
}
/// Starts the server-side processing of incoming connections, returning right after spawning the
/// background task that does all the work.
///
/// The spawned task receives connections from `connection_source` until that channel yields `None`.
/// For each received connection, it:
///   1. resolves the peer's address -- logging the error and skipping the connection if that fails;
///   2. creates the [Peer] and awaits `connection_events_callback` with [ProtocolEvent::PeerArrived]
///      -- the next connection is only received after that future completes;
///   3. builds the messages processor through `dialog_processor_builder_fn`;
///   4. spawns another task to run the dialog: if it ends with `Ok`, the connection is sent to
///      `connection_sink`; if it ends with `Err`, the error is logged and the connection is not returned.
///
/// Once `connection_source` is exhausted, `connection_events_callback` is awaited with
/// [ProtocolEvent::LocalServiceTermination].
///
/// `listening_interface` & `listening_port` are only used here to label the log messages and to name the processor.
///
/// As far as the code below shows, this method always returns `Ok(())`: failures happening in the
/// spawned tasks are only logged.
#[inline(always)]
pub async fn server_loop<PipelineOutputType: Send + Sync + Debug + 'static,
OutputStreamType: Stream<Item=PipelineOutputType> + Send + 'static,
ConnectionEventsCallbackFuture: Future<Output=()> + Send,
ConnectionEventsCallback: Fn(/*server_event: */ProtocolEvent<CONFIG, SocketDialogType::LocalMessages, SocketDialogType::SenderChannel, SocketDialogType::State>) -> ConnectionEventsCallbackFuture + Send + Sync + 'static,
ProcessorBuilderFn: Fn(/*client_addr: */String, /*connected_port: */u16, /*peer: */Arc<Peer<CONFIG, SocketDialogType::LocalMessages, SocketDialogType::SenderChannel, SocketDialogType::State>>, /*client_messages_stream: */MessagingMutinyStream<SocketDialogType::ProcessorUni>) -> OutputStreamType + Send + Sync + 'static>
(self,
listening_interface: &str,
listening_port: u16,
mut connection_source: tokio::sync::mpsc::Receiver<SocketConnection<SocketDialogType::State>>,
connection_sink: tokio::sync::mpsc::Sender<SocketConnection<SocketDialogType::State>>,
connection_events_callback: ConnectionEventsCallback,
dialog_processor_builder_fn: ProcessorBuilderFn)
-> Result<(), Box<dyn Error + Sync + Send>> {
// shared handles: each per-connection task gets its own clone of these `Arc`s
let arc_self = Arc::new(self);
let connection_events_callback = Arc::new(connection_events_callback);
let connection_sink = Arc::new(connection_sink);
// label used in log messages & in the processor's name
let listening_interface_and_port = format!("{}:{}", listening_interface, listening_port);
// the connections loop runs in the background: this method returns without waiting for it
tokio::spawn( async move {
while let Some(socket_connection) = connection_source.recv().await {
// a connection whose peer address cannot be determined is logged and skipped
let addr = match socket_connection.connection().peer_addr() {
Ok(addr) => addr,
Err(err) => {
error!("`reactive-messaging::SocketServer`: ERROR in server @ {listening_interface_and_port} while determining an incoming client's address: {err}");
continue;
},
};
let (client_ip, client_port) = match addr {
SocketAddr::V4(v4) => (v4.ip().to_string(), v4.port()),
SocketAddr::V6(v6) => (v6.ip().to_string(), v6.port()),
};
let sender = ReactiveMessagingSender::<CONFIG, SocketDialogType::LocalMessages, SocketDialogType::SenderChannel>::new(format!("Sender for client {addr}"));
// `peer_ref1` goes to the processor builder; `peer_ref2` & `peer_ref3` go to the executor completion closure
let peer = Arc::new(Peer::new(sender, addr, &socket_connection)); let peer_ref1 = Arc::clone(&peer);
let peer_ref2 = Arc::clone(&peer);
let peer_ref3 = Arc::clone(&peer);
// awaited inline: no other connection is received until the callback's future completes
connection_events_callback(ProtocolEvent::PeerArrived {peer: peer.clone()}).await;
let connection_events_callback_ref = Arc::clone(&connection_events_callback);
let processor_sender = SocketDialogType::ProcessorUni::new(format!("Server processor for remote client {addr} @ {listening_interface_and_port}"))
.spawn_non_futures_non_fallibles_executors(1,
|in_stream| dialog_processor_builder_fn(client_ip.clone(), client_port, peer_ref1.clone(), in_stream),
// NOTE(review): presumably invoked once the executor finishes -- it reads the executor's finish time
// and the warning below mentions the `Stream` being dropped; confirm against `reactive-mutiny`
move |executor| async move {
let execution_duration = Duration::from_nanos(executor.execution_finish_delta_nanos() - executor.execution_start_delta_nanos());
let executor_name = executor.executor_name().clone();
// the event is reported before the peer is flushed & closed
connection_events_callback_ref(ProtocolEvent::PeerLeft { peer: peer_ref2, stream_stats: executor }).await;
let flush_timeout_millis = Self::CONST_CONFIG.flush_timeout_millis as u64;
let unsent_messages = peer_ref3.flush_and_close(Duration::from_millis(flush_timeout_millis)).await;
if unsent_messages > 0 {
warn!("{executor_name} (in execution for {execution_duration:?}) left with {unsent_messages} unsent messages to {peer_ref3:?}, even after flushing with a timeout of {flush_timeout_millis}ms after, probably, its `Stream` being dropped. Consider increasing `CONST_CONFIG.flush_timeout_millis` or revisiting the processor logic.");
}
});
let processor_sender = upgrade_processor_uni_retrying_logic::<CONFIG, SocketDialogType::DeserializedRemoteMessages, <<SocketDialogType as SocketDialog<CONFIG>>::ProcessorUni as GenericUni>::DerivedItemType, SocketDialogType::ProcessorUni>
(processor_sender);
// per-connection clones, to be moved into the dialog task
let cloned_self = Arc::clone(&arc_self);
let listening_interface_and_port = listening_interface_and_port.clone();
let cloned_connection_sink = Arc::clone(&connection_sink);
// the dialog runs in its own task, wrapped in `tokio::task::unconstrained()`
tokio::spawn(tokio::task::unconstrained(async move {
let socket_connection = match cloned_self.dialog_loop(socket_connection, peer.clone(), processor_sender).await {
Ok(socket_connection) => socket_connection,
Err(err) => {
error!("`reactive-messaging::SocketServer`: ERROR in server @ {listening_interface_and_port} while starting the dialog with client {client_ip}:{client_port}: {err}");
return
}
};
// the dialog is over: hand the connection back to whoever holds the receiving end of `connection_sink`
if let Err(err) = cloned_connection_sink.send(socket_connection).await {
error!("`reactive-messaging::SocketServer`: ERROR in server @ {listening_interface_and_port} while returning the connection with {client_ip}:{client_port} to the caller. It will now be forcibly closed: {err}");
};
}));
}
// reached only after `connection_source` yielded `None`
debug!("`reactive-messaging::SocketServer`: bailing out of network loop -- we should be undergoing a shutdown...");
connection_events_callback(ProtocolEvent::LocalServiceTermination).await;
});
Ok(())
}
#[inline(always)]
pub async fn client<PipelineOutputType: Send + Sync + Debug + 'static,
OutputStreamType: Stream<Item=PipelineOutputType> + Send + 'static,
ConnectionEventsCallbackFuture: Future<Output=()> + Send,
ConnectionEventsCallback: Fn(/*server_event: */ProtocolEvent<CONFIG, SocketDialogType::LocalMessages, SocketDialogType::SenderChannel, SocketDialogType::State>) -> ConnectionEventsCallbackFuture + Send + Sync + 'static,
ProcessorBuilderFn: Fn(/*client_addr: */String, /*connected_port: */u16, /*peer: */Arc<Peer<CONFIG, SocketDialogType::LocalMessages, SocketDialogType::SenderChannel, SocketDialogType::State>>, /*server_messages_stream: */MessagingMutinyStream<SocketDialogType::ProcessorUni>) -> OutputStreamType>
(self,
socket_connection: SocketConnection<SocketDialogType::State>,
mut shutdown_signaler: tokio::sync::broadcast::Receiver<()>,
connection_events_callback: ConnectionEventsCallback,
dialog_processor_builder_fn: ProcessorBuilderFn)
-> Result<SocketConnection<SocketDialogType::State>, Box<dyn Error + Sync + Send>> {
let addr = socket_connection.connection().peer_addr()?;
let sender = ReactiveMessagingSender::<CONFIG, SocketDialogType::LocalMessages, SocketDialogType::SenderChannel>::new(format!("Sender for client {addr}"));
let peer = Arc::new(Peer::new(sender, addr, &socket_connection));
let peer_ref1 = Arc::clone(&peer);
let peer_ref2 = Arc::clone(&peer);
let peer_ref3 = Arc::clone(&peer);
let peer_ref4 = Arc::clone(&peer);
connection_events_callback(ProtocolEvent::PeerArrived {peer: peer.clone()}).await;
let connection_events_callback_ref1 = Arc::new(connection_events_callback);
let connection_events_callback_ref2 = Arc::clone(&connection_events_callback_ref1);
let processor_sender = SocketDialogType::ProcessorUni::new(format!("Client Processor for remote server @ {addr}"))
.spawn_non_futures_non_fallibles_executors(1,
|in_stream| dialog_processor_builder_fn(addr.ip().to_string(), addr.port(), peer_ref1.clone(), in_stream),
move |executor| async move {
let execution_duration = Duration::from_nanos(executor.execution_finish_delta_nanos() - executor.execution_start_delta_nanos());
let executor_name = executor.executor_name().clone();
connection_events_callback_ref1(ProtocolEvent::PeerLeft { peer: peer_ref2, stream_stats: executor }).await;
let flush_timeout_millis = Self::CONST_CONFIG.flush_timeout_millis as u64;
let unsent_messages = peer_ref3.flush_and_close(Duration::from_millis(flush_timeout_millis)).await;
if unsent_messages > 0 {
warn!("{executor_name} (in execution for {execution_duration:?}) left with {unsent_messages} unsent messages to {peer_ref3:?}, even after flushing with a timeout of {flush_timeout_millis}ms after, probably, its `Stream` being dropped. Consider increasing `CONST_CONFIG.flush_timeout_millis` or revisiting the processor logic.");
}
});
let processor_sender = upgrade_processor_uni_retrying_logic::<CONFIG, SocketDialogType::DeserializedRemoteMessages, <<SocketDialogType as SocketDialog<CONFIG>>::ProcessorUni as GenericUni>::DerivedItemType, SocketDialogType::ProcessorUni>
(processor_sender);
let addr = addr.to_string();
tokio::spawn(async move {
match shutdown_signaler.recv().await {
Ok(()) => trace!("reactive-messaging: SHUTDOWN requested for client connected to server @ {addr} -- notifying & dropping the connection"),
Err(err) => error!("reactive-messaging: PROBLEM in the `shutdown signaler` client connected to server @ {addr} (a client shutdown will be commanded now due to this occurrence): {err:?}"),
};
connection_events_callback_ref2(ProtocolEvent::LocalServiceTermination).await;
peer_ref4.flush_and_close(Duration::from_millis(Self::CONST_CONFIG.flush_timeout_millis as u64)).await;
});
let arc_self = Arc::new(self);
arc_self.dialog_loop(socket_connection, peer.clone(), processor_sender).await
}
/// Applies the socket options present in `CONST_CONFIG` to `socket_connection` and, then, runs the dialog
/// between the local processor (fed through `processor_sender`) and the remote `peer`.
///
/// The dialog itself is conducted by a fresh `SocketDialogType::default()` instance.
///
/// When the dialog ends with `Ok`, the processor is closed (waiting for up to `flush_timeout_millis`),
/// the peer is cancelled & closed, the socket is flushed and, if the peer holds a state, it is moved
/// into the connection -- which is, then, returned.
///
/// Returns `Err` if any of the socket options cannot be set, if the dialog fails or if flushing the
/// socket fails.
#[inline(always)]
async fn dialog_loop(self: Arc<Self>,
                     mut socket_connection: SocketConnection<SocketDialogType::State>,
                     peer: Arc<Peer<CONFIG, SocketDialogType::LocalMessages, SocketDialogType::SenderChannel, SocketDialogType::State>>,
                     processor_sender: ReactiveMessagingUniSender<CONFIG, SocketDialogType::DeserializedRemoteMessages, <<SocketDialogType as SocketDialog<CONFIG>>::ProcessorUni as GenericUni>::DerivedItemType, SocketDialogType::ProcessorUni>)
                    -> Result<SocketConnection<SocketDialogType::State>, Box<dyn Error + Sync + Send>> {

    // socket options -- only the ones present in the configuration are touched.
    // In the error messages, the peer id is shown apart from the address, so it is not mistaken for a port.
    if let Some(no_delay) = Self::CONST_CONFIG.socket_options.no_delay {
        socket_connection.connection_mut().set_nodelay(no_delay)
            .map_err(|err| format!("error setting nodelay({no_delay}) for the socket connected to {} (peer_id #{}): {err}", peer.peer_address, peer.peer_id))?;
    }
    if let Some(hops_to_live) = Self::CONST_CONFIG.socket_options.hops_to_live {
        let hops_to_live = hops_to_live.get() as u32;
        socket_connection.connection_mut().set_ttl(hops_to_live)
            .map_err(|err| format!("error setting ttl({hops_to_live}) for the socket connected to {} (peer_id #{}): {err}", peer.peer_address, peer.peer_id))?;
    }
    if let Some(millis) = Self::CONST_CONFIG.socket_options.linger_millis {
        socket_connection.connection_mut().set_linger(Some(Duration::from_millis(millis as u64)))
            .map_err(|err| format!("error setting linger({millis}ms) for the socket connected to {} (peer_id #{}): {err}", peer.peer_address, peer.peer_id))?;
    }

    // a failed dialog is reported as is: none of the wind down steps below are executed in that case
    SocketDialogType::default()
        .dialog_loop(&mut socket_connection, &peer, &processor_sender)
        .await?;

    // the dialog ended gracefully: wind down the processor & the peer before handing the connection back
    _ = processor_sender.close(Duration::from_millis(Self::CONST_CONFIG.flush_timeout_millis as u64)).await;
    peer.cancel_and_close();
    socket_connection.connection_mut()
        .flush()
        .await
        .map_err(|err| format!("error flushing the socket connected to {} (peer_id #{}): {}", peer.peer_address, peer.peer_id, err))?;
    if let Some(state) = peer.take_state().await {
        socket_connection.set_state(state);
    }
    Ok(socket_connection)
}
}
#[cfg(any(test,doc))]
pub mod tests {
use super::*;
use crate::unit_test_utils::{next_server_port, TestString, TestStringDeserializer, TestStringSerializer};
use crate::socket_connection::{
socket_dialog::textual_dialog::TextualDialog,
connection_provider::ServerConnectionHandler,
};
use std::{
future,
sync::atomic::{
AtomicBool,
AtomicU32,
Ordering::Relaxed,
},
net::ToSocketAddrs,
};
use std::fmt::Display;
use std::ops::Deref;
use std::time::Instant;
use reactive_mutiny::{prelude::advanced::{UniZeroCopyAtomic, ChannelUniMoveAtomic, ChannelUniZeroCopyAtomic}, types::{ChannelCommon, ChannelUni, ChannelProducer}};
use futures::stream::{self, StreamExt};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
/// `true` when compiled with debug assertions enabled, `false` otherwise --
/// used by the performance tests to pick the figures to compare against
const DEBUG: bool = cfg!(debug_assertions);
/// Checks that the building blocks the other tests rely on can be instantiated & used:
/// the `ConstConfig` <==> `u64` conversions, the `reactive-mutiny` channels & `Uni`
/// and, finally, that a server can be started with [SocketConnectionHandler::server_loop()].
#[cfg_attr(not(doc),tokio::test)]
async fn sanity_check() {
    const LISTENING_INTERFACE: &str = "127.0.0.1";
    const DEFAULT_TEST_CONFIG: ConstConfig = ConstConfig {
        retrying_strategy: RetryingStrategies::RetryYieldingForUpToMillis(30),
        ..ConstConfig::default()
    };
    const DEFAULT_TEST_CONFIG_U64: u64 = DEFAULT_TEST_CONFIG.into();
    const DEFAULT_TEST_UNI_INSTRUMENTS: usize = DEFAULT_TEST_CONFIG.executor_instruments.into();
    type DefaultTestUni<PayloadType = TestString> = UniZeroCopyAtomic<PayloadType, {DEFAULT_TEST_CONFIG.receiver_channel_size as usize}, 1, DEFAULT_TEST_UNI_INSTRUMENTS>;
    type SenderChannel<PayloadType = TestString> = ChannelUniMoveAtomic<PayloadType, {DEFAULT_TEST_CONFIG.sender_channel_size as usize}, 1>;

    // the configuration must survive the round trip through its `u64` representation
    println!("Original object: {:#?}", DEFAULT_TEST_CONFIG);
    println!("Reconverted: {:#?}", ConstConfig::from(DEFAULT_TEST_CONFIG_U64));
    assert_eq!(ConstConfig::from(DEFAULT_TEST_CONFIG_U64), DEFAULT_TEST_CONFIG, "Configs don't match");

    // try to create the objects based on the config (a compilation error is expected if wrong const generic parameters are provided)
    let uni = DefaultTestUni::new("Can it be instantiated?");
    let sender = SenderChannel::new("Can it be instantiated?");

    // `ChannelUniMoveAtomic`: send, receive & gracefully end the stream
    let channel_payload = String::from("Sender Payload");
    let (mut stream, _stream_id) = sender.create_stream();
    assert!(sender.send(channel_payload.clone()).is_ok(), "`reactive-mutiny`: channel: couldn't send");
    assert_eq!(stream.next().await, Some(channel_payload), "`reactive-mutiny`: channel: couldn't receive");
    assert_eq!(sender.gracefully_end_all_streams(Duration::from_millis(100)).await, 1, "`reactive-mutiny` Streams should only be reported as 'gracefully ended' after they are consumed to exhaustion");
    assert_eq!(stream.next().await, None, "`reactive-mutiny`: channel: couldn't end the Stream");

    // `ChannelUniZeroCopyAtomic`: the same sequence
    let sender = ChannelUniZeroCopyAtomic::<String, {DEFAULT_TEST_CONFIG.sender_channel_size as usize}, 1>::new("Please work also...");
    let channel_payload = String::from("Sender Payload");
    let (mut stream, _stream_id) = sender.create_stream();
    assert!(sender.send(channel_payload.clone()).is_ok(), "`reactive-mutiny`: channel: couldn't send");
    assert_eq!(stream.next().await.expect("empty"), channel_payload, "`reactive-mutiny`: channel: couldn't receive");
    assert_eq!(sender.gracefully_end_all_streams(Duration::from_millis(100)).await, 1, "`reactive-mutiny` Streams should only be reported as 'gracefully ended' after they are consumed to exhaustion");
    assert!(stream.next().await.is_none(), "`reactive-mutiny`: channel: couldn't end the Stream");

    // the `Uni`: send an event and close it
    let uni = uni.spawn_non_futures_non_fallibles_executors(1,
                                                            |stream_in| stream_in.inspect(|e| println!("Received {}", e)),
                                                            |_| { println!("ENDED"); future::ready(()) });
    assert!(uni.send(String::from("delivered?")).is_ok(), "`reactive-mutiny`: Uni: couldn't send");
    assert!(uni.close(Duration::from_millis(100)).await, "`reactive-mutiny` Streams being executed by an `Uni` should be reported as 'gracefully ended' as they should be promptly consumed to exhaustion");

    // the server: the very same port is given to the connection provider (which listens on it)
    // and to `server_loop()` (which labels its logs & processors with it)
    let port = next_server_port();
    let mut connection_provider = ServerConnectionHandler::new(LISTENING_INTERFACE, port, ()).await
        .expect("couldn't start the Connection Provider server event loop");
    let new_connections_source = connection_provider.connection_receiver()
        .expect("couldn't move the Connection Receiver out of the Connection Provider");
    let (returned_connections_sink, _returned_connections_source) = tokio::sync::mpsc::channel::<SocketConnection<()>>(2);
    let socket_communications_handler = SocketConnectionHandler::<DEFAULT_TEST_CONFIG_U64, TextualDialog<DEFAULT_TEST_CONFIG_U64, TestString, TestString, TestStringSerializer, TestStringDeserializer, DefaultTestUni, SenderChannel, ()>>::new(TextualDialog::default());
    socket_communications_handler.server_loop(
        LISTENING_INTERFACE, port, new_connections_source, returned_connections_sink,
        |connection_event| async move {
            match connection_event {
                ProtocolEvent::PeerArrived { .. } => tokio::time::sleep(Duration::from_millis(100)).await,
                ProtocolEvent::PeerLeft { .. } => tokio::time::sleep(Duration::from_millis(100)).await,
                ProtocolEvent::LocalServiceTermination => tokio::time::sleep(Duration::from_millis(100)).await,
            }
        },
        move |_client_addr, _client_port, _peer, client_messages_stream|
            client_messages_stream
    ).await.expect("Starting the server");
}
pub async fn unresponsive_dialogs<const CONFIG: u64,
SocketDialogType: SocketDialog<CONFIG, State = ()> + 'static,
DerivedType: Debug + PartialEq<String>>
()
where SocketDialogType::LocalMessages: From<&'static str> + From<String>,
<<SocketDialogType as SocketDialog<CONFIG>>::ProcessorUni as GenericUni>::DerivedItemType: std::ops::Deref<Target=DerivedType> {
const LISTENING_INTERFACE: &str = "127.0.0.1";
let port = next_server_port();
let client_secret = String::from("open, sesame");
let server_secret = String::from("now the 40 of you may enter");
let observed_secret = Arc::new(Mutex::new(None));
let (_client_shutdown_sender, client_shutdown_receiver) = tokio::sync::broadcast::channel(1);
let client_secret_clone = client_secret.clone();
let server_secret_clone = server_secret.clone();
let mut connection_provider = ServerConnectionHandler::new(LISTENING_INTERFACE, port, ()).await
.expect("Sanity Check: couldn't start the Connection Provider server event loop");
let new_connections_source = connection_provider.connection_receiver()
.expect("Sanity Check: couldn't move the Connection Receiver out of the Connection Provider");
let socket_communications_handler = SocketConnectionHandler::<CONFIG, SocketDialogType>::new(SocketDialogType::default());
let (returned_connections_sink, mut server_connections_source) = tokio::sync::mpsc::channel::<SocketConnection<()>>(2);
socket_communications_handler.server_loop(
LISTENING_INTERFACE, port, new_connections_source, returned_connections_sink,
|connection_event| {
match connection_event {
ProtocolEvent::PeerArrived { peer } => {
assert!(peer.send(SocketDialogType::LocalMessages::from("Welcome! State your business!")).is_ok(), "couldn't send");
},
ProtocolEvent::PeerLeft { peer: _, stream_stats: _ } => {},
ProtocolEvent::LocalServiceTermination => {
println!("Test Server: shutdown was requested... No connection will receive the drop message (nor will be even closed) because I, the lib caller, intentionally didn't keep track of the connected peers for this test!");
}
}
future::ready(())
},
move |_client_addr, _client_port, peer, client_messages_stream| {
let client_secret_clone = client_secret_clone.clone();
let server_secret_clone = server_secret_clone.clone();
client_messages_stream.inspect(move |client_message| {
assert!(peer.send(SocketDialogType::LocalMessages::from(format!("Client just sent {:?}", client_message))).is_ok(), "couldn't send");
if client_message.deref() == &client_secret_clone {
assert!(peer.send(SocketDialogType::LocalMessages::from(server_secret_clone.clone())).is_ok(), "couldn't send");
} else {
panic!("Client sent the wrong secret: {:?} -- I was expecting '{}'", client_message, client_secret_clone);
}
})
}
).await.expect("Starting the server");
println!("### Waiting a little for the server to start...");
tokio::time::sleep(Duration::from_millis(10)).await;
let client_secret = client_secret.clone();
let observed_secret_clone = Arc::clone(&observed_secret);
let tokio_connection = TcpStream::connect(format!("{}:{}", LISTENING_INTERFACE, port).to_socket_addrs().expect("Error resolving address").next().unwrap()).await.expect("Error connecting");
let socket_connection = SocketConnection::new(tokio_connection, ());
let client_communications_handler = SocketConnectionHandler::<CONFIG, SocketDialogType>::new(SocketDialogType::default());
tokio::spawn(
client_communications_handler.client(
socket_connection, client_shutdown_receiver,
move |connection_event| {
match connection_event {
ProtocolEvent::PeerArrived { peer } => {
assert!(peer.send(SocketDialogType::LocalMessages::from(client_secret.clone())).is_ok(), "couldn't send");
},
ProtocolEvent::PeerLeft { peer, stream_stats: _ } => {
println!("Test Client: connection with {} (peer_id #{}) was dropped -- should not happen in this test", peer.peer_address, peer.peer_id);
},
ProtocolEvent::LocalServiceTermination => {}
}
future::ready(())
},
move |_client_addr, _client_port, _peer, server_messages_stream| {
let observed_secret_clone = Arc::clone(&observed_secret_clone);
server_messages_stream.then(move |server_message| {
let observed_secret_clone = Arc::clone(&observed_secret_clone);
async move {
println!("Server said: {:?}", server_message);
let _ = observed_secret_clone.lock().await.insert(server_message);
}
})
}
)
);
println!("### Started a client -- which is running concurrently, in the background... it has 100ms to do its thing!");
tokio::time::sleep(Duration::from_millis(100)).await;
server_connections_source.close();
println!("### Waiting a little for the shutdown signal to reach the server...");
tokio::time::sleep(Duration::from_millis(10)).await;
let locked_observed_secret = observed_secret.lock().await;
let observed_secret = locked_observed_secret.as_ref().expect("Server secret has not been computed");
assert_eq!(observed_secret.deref(), &server_secret, "Communications didn't go according the plan");
}
pub async fn responsive_dialogs<const CONFIG: u64,
SocketDialogType: SocketDialog<CONFIG, State = ()> + 'static,
DerivedType: Debug + PartialEq<String>>
()
where SocketDialogType::LocalMessages: From<&'static str> + From<String>,
<<SocketDialogType as SocketDialog<CONFIG>>::ProcessorUni as GenericUni>::DerivedItemType: std::ops::Deref<Target=DerivedType> {
const LISTENING_INTERFACE: &str = "127.0.0.1";
let port = next_server_port();
let client_secret = String::from("open, sesame");
let server_secret = String::from("now the 40 of you may enter");
let observed_secret = Arc::new(Mutex::new(None));
let (_client_shutdown_sender, client_shutdown_receiver) = tokio::sync::broadcast::channel(1);
let client_secret_clone = client_secret.clone();
let server_secret_clone = server_secret.clone();
let mut connection_provider = ServerConnectionHandler::new(LISTENING_INTERFACE, port, ()).await
.expect("couldn't start the Connection Provider server event loop");
let new_connections_source = connection_provider.connection_receiver()
.expect("couldn't move the Connection Receiver out of the Connection Provider");
let (returned_connections_sink, mut server_connections_source) = tokio::sync::mpsc::channel::<SocketConnection<()>>(2);
let socket_communications_handler = SocketConnectionHandler::<CONFIG, SocketDialogType>::new(SocketDialogType::default());
socket_communications_handler.server_loop(LISTENING_INTERFACE, port, new_connections_source, returned_connections_sink,
|_connection_event| future::ready(()),
move |_client_addr, _client_port, peer, client_messages_stream| {
let client_secret_clone = client_secret_clone.clone();
let server_secret_clone = server_secret_clone.clone();
assert!(peer.send(SocketDialogType::LocalMessages::from("Welcome! State your business!")).is_ok(), "couldn't send");
client_messages_stream
.flat_map(move |client_message| {
stream::iter([
SocketDialogType::LocalMessages::from(format!("Client just sent {:?}", client_message)),
if client_message.deref() == &client_secret_clone {
SocketDialogType::LocalMessages::from(server_secret_clone.clone())
} else {
panic!("Client sent the wrong secret: {:?} -- I was expecting '{}'", client_message, client_secret_clone);
}])
})
.to_responsive_stream(peer, |_, _| ())
}
).await.expect("ERROR starting the server");
println!("### Waiting a little for the server to start...");
tokio::time::sleep(Duration::from_millis(10)).await;
let observed_secret_clone = observed_secret.clone();
let tokio_connection = TcpStream::connect(format!("{}:{}", LISTENING_INTERFACE, port).to_socket_addrs().expect("Error resolving address").next().unwrap()).await.expect("Error connecting");
let socket_connection = SocketConnection::new(tokio_connection, ());
let socket_communications_handler = SocketConnectionHandler::<CONFIG, SocketDialogType>::new(SocketDialogType::default());
tokio::spawn(
socket_communications_handler.client(socket_connection, client_shutdown_receiver,
move |_connection_event| future::ready(()),
move |_client_addr, _client_port, peer, server_messages_stream| {
let observed_secret_clone = Arc::clone(&observed_secret_clone);
assert!(peer.send(SocketDialogType::LocalMessages::from(client_secret.clone())).is_ok(), "couldn't send");
server_messages_stream
.then(move |server_message| {
let observed_secret_clone = observed_secret_clone.clone();
println!("Server said: (type {}) {:?}", get_type(&server_message), server_message);
async move {
let _ = observed_secret_clone.lock().await.insert(server_message);
}
})
}
)
);
println!("### Started a client -- which is running concurrently, in the background... it has 100ms to do its thing!");
tokio::time::sleep(Duration::from_millis(100)).await;
server_connections_source.close();
println!("### Waiting a little for the shutdown signal to reach the server...");
tokio::time::sleep(Duration::from_millis(10)).await;
let locked_observed_secret = observed_secret.lock().await;
let observed_secret = locked_observed_secret.as_ref().expect("Server secret has not been computed");
assert_eq!(observed_secret.deref(), &server_secret, "Communications didn't go according the plan");
}
pub async fn client_termination<const CONFIG: u64,
SocketDialogType: SocketDialog<CONFIG, State = ()> + 'static,
DerefType: Display>
()
where <<SocketDialogType as SocketDialog<CONFIG>>::ProcessorUni as GenericUni>::DerivedItemType: std::ops::Deref<Target=DerefType> {
const LISTENING_INTERFACE: &str = "127.0.0.1";
let port = next_server_port();
let server_disconnected = Arc::new(AtomicBool::new(false));
let server_disconnected_ref = Arc::clone(&server_disconnected);
let client_disconnected = Arc::new(AtomicBool::new(false));
let client_disconnected_ref = Arc::clone(&client_disconnected);
let (client_shutdown_sender, client_shutdown_receiver) = tokio::sync::broadcast::channel(2);
let mut connection_provider = ServerConnectionHandler::new(LISTENING_INTERFACE, port, ()).await
.expect("couldn't start the Connection Provider server event loop");
let new_connections_source = connection_provider.connection_receiver()
.expect("couldn't move the Connection Receiver out of the Connection Provider");
let (returned_connections_sink, mut server_connections_source) = tokio::sync::mpsc::channel::<SocketConnection<()>>(2);
let socket_communications_handler = SocketConnectionHandler::<CONFIG, SocketDialogType>::new(SocketDialogType::default());
socket_communications_handler.server_loop(
LISTENING_INTERFACE, port, new_connections_source, returned_connections_sink,
move |connection_event| {
let server_disconnected = Arc::clone(&server_disconnected_ref);
async move {
if let ProtocolEvent::PeerLeft { .. } = connection_event {
server_disconnected.store(true, Relaxed);
}
}
},
move |_client_addr, _client_port, _peer, client_messages_stream| client_messages_stream
).await.expect("ERROR starting the server");
tokio::time::sleep(Duration::from_millis(10)).await;
let tokio_connection = TcpStream::connect(format!("{}:{}", LISTENING_INTERFACE, port).to_socket_addrs().expect("Error resolving address").next().unwrap()).await.expect("Error connecting");
let socket_connection = SocketConnection::new(tokio_connection, ());
let socket_communications_handler = SocketConnectionHandler::<CONFIG, SocketDialogType>::new(SocketDialogType::default());
tokio::spawn(
socket_communications_handler.client(
socket_connection, client_shutdown_receiver,
move |connection_event| {
let client_disconnected = Arc::clone(&client_disconnected_ref);
async move {
if let ProtocolEvent::PeerLeft { .. } = connection_event {
client_disconnected.store(true, Relaxed);
}
}
},
move |_client_addr, _client_port, _peer, server_messages_stream| server_messages_stream
)
);
tokio::time::sleep(Duration::from_millis(1000)).await;
client_shutdown_sender.send(()).expect("sending client shutdown signal");
tokio::time::sleep(Duration::from_millis(1000)).await;
assert!(client_disconnected.load(Relaxed), "Client didn't drop the connection with the server");
assert!(server_disconnected.load(Relaxed), "Server didn't notice the drop in the connection made by the client");
server_connections_source.close(); }
pub async fn latency_measurements<const CONFIG: u64,
SocketDialogType: SocketDialog<CONFIG, State = ()> + 'static,
DerefType: AsRef<str>>
(tolerance: f64, debug_expected_count: u32, release_expected_count: u32)
where SocketDialogType::LocalMessages: From<&'static str> + From<String>,
<<SocketDialogType as SocketDialog<CONFIG>>::ProcessorUni as GenericUni>::DerivedItemType: std::ops::Deref<Target=DerefType> {
const TEST_DURATION_MS: u64 = 2000;
const TEST_DURATION_NS: u64 = TEST_DURATION_MS * 1e6 as u64;
const LISTENING_INTERFACE: &str = "127.0.0.1";
let port = next_server_port();
let (client_shutdown_sender, client_shutdown_receiver) = tokio::sync::broadcast::channel(1);
let mut connection_provider = ServerConnectionHandler::new(LISTENING_INTERFACE, port, ()).await
.expect("couldn't start the Connection Provider server event loop");
let new_connections_source = connection_provider.connection_receiver()
.expect("couldn't move the Connection Receiver out of the Connection Provider");
let (returned_connections_sink, mut server_connections_source) = tokio::sync::mpsc::channel::<SocketConnection<()>>(2);
let socket_communications_handler = SocketConnectionHandler::<CONFIG, SocketDialogType>::new(SocketDialogType::default());
socket_communications_handler.server_loop(
LISTENING_INTERFACE, port, new_connections_source, returned_connections_sink,
|_connection_event| async {},
|_listening_interface, _listening_port, peer, client_messages| {
client_messages.inspect(move |client_message| {
let client_message = client_message.as_ref();
let n_str = &client_message[5..(client_message.len() - 1)];
let n = str::parse::<u32>(n_str).unwrap_or_else(|err| panic!("could not convert '{}' to number. Original client message: {:?}. Parsing error: {:?}", n_str, client_message, err));
assert!(peer.send(SocketDialogType::LocalMessages::from(format!("Pong({})", n))).is_ok(), "couldn't send");
})
}
).await.expect("Starting the server");
println!("### Waiting a little for the server to start...");
tokio::time::sleep(Duration::from_millis(10)).await;
let counter = Arc::new(AtomicU32::new(0));
let counter_ref = Arc::clone(&counter);
let tokio_connection = TcpStream::connect(format!("{}:{}", LISTENING_INTERFACE, port).to_socket_addrs().expect("Error resolving address").next().unwrap()).await.expect("Error connecting");
let socket_connection = SocketConnection::new(tokio_connection, ());
let socket_communications_handler = SocketConnectionHandler::<CONFIG, SocketDialogType>::new(SocketDialogType::default());
tokio::spawn(
socket_communications_handler.client(
socket_connection, client_shutdown_receiver,
|connection_event| {
match connection_event {
ProtocolEvent::PeerArrived { peer } => {
assert!(peer.send(SocketDialogType::LocalMessages::from("Ping(0)")).is_ok(), "couldn't send");
},
ProtocolEvent::PeerLeft { .. } => {},
ProtocolEvent::LocalServiceTermination => {},
}
future::ready(())
},
move |_listening_interface, _listening_port, peer, server_messages| {
let counter_ref = Arc::clone(&counter_ref);
server_messages.inspect(move |server_message| {
let server_message = server_message.as_ref();
let n_str = &server_message[5..(server_message.len()-1)];
let n = str::parse::<u32>(n_str).unwrap_or_else(|err| panic!("could not convert '{}' to number. Original server message: {:?}. Parsing error: {:?}", n_str, server_message, err));
let current_count = counter_ref.fetch_add(1, Relaxed);
if n != current_count {
panic!("Received {:?}, where Client was expecting 'Pong({})'", server_message, current_count);
}
assert!(peer.send(SocketDialogType::LocalMessages::from(format!("Ping({})", current_count+1))).is_ok(), "couldn't send");
})
}
)
);
println!("### Measuring latency for {TEST_DURATION_MS} milliseconds...");
tokio::time::sleep(Duration::from_millis(TEST_DURATION_MS)).await;
server_connections_source.close(); client_shutdown_sender.send(()).expect("sending client termination signal");
println!("### Waiting a little for the shutdown signal to reach the server...");
tokio::time::sleep(Duration::from_millis(10)).await;
let counter = counter.load(Relaxed);
let latency = Duration::from_nanos((TEST_DURATION_NS as f64 / (2.0 * counter as f64)) as u64);
println!("Round trips counter: {}", counter);
println!("Measured latency: {:?} ({})", latency, if DEBUG {"Debug mode"} else {"Release mode"});
if DEBUG {
assert!(counter > ((1.0 - tolerance) * debug_expected_count as f64) as u32, "Latency regression detected: we used to make {debug_expected_count} round trips in 2 seconds (Debug mode) -- now only {counter} were made");
} else {
assert!(counter > ((1.0 - tolerance) * release_expected_count as f64) as u32, "Latency regression detected: we used to make {release_expected_count} round trips in 2 seconds (Release mode) -- now only {counter} were made");
}
}
pub async fn message_flooding_throughput<const CONFIG: u64,
SocketDialogType: SocketDialog<CONFIG, State = ()> + 'static,
DerefType: AsRef<str>>
(tolerance: f64, debug_expected_count: u32, release_expected_count: u32)
where SocketDialogType::LocalMessages: From<&'static str> + From<String>,
<<SocketDialogType as SocketDialog<CONFIG>>::ProcessorUni as GenericUni>::DerivedItemType: std::ops::Deref<Target=DerefType> {
const TEST_DURATION_MS: u64 = 2000;
const LISTENING_INTERFACE: &str = "127.0.0.1";
let port = next_server_port();
let (client_shutdown_sender, client_shutdown_receiver) = tokio::sync::broadcast::channel(1);
let received_messages_count = Arc::new(AtomicU32::new(0));
let unordered = Arc::new(AtomicU32::new(0)); let received_messages_count_ref = Arc::clone(&received_messages_count);
let unordered_ref = Arc::clone(&unordered);
let mut connection_provider = ServerConnectionHandler::new(LISTENING_INTERFACE, port, ()).await
.expect("couldn't start the Connection Provider server event loop");
let new_connections_source = connection_provider.connection_receiver()
.expect("couldn't move the Connection Receiver out of the Connection Provider");
let (returned_connections_sink, mut server_connections_source) = tokio::sync::mpsc::channel::<SocketConnection<()>>(2);
let socket_communications_handler = SocketConnectionHandler::<CONFIG, SocketDialogType>::new(SocketDialogType::default());
socket_communications_handler.server_loop(
LISTENING_INTERFACE, port, new_connections_source, returned_connections_sink,
|_connection_event| async {},
move |_listening_interface, _listening_port, _peer, client_messages| {
let received_messages_count = Arc::clone(&received_messages_count_ref);
let unordered = Arc::clone(&unordered_ref);
client_messages.inspect(move |client_message| {
let client_message = client_message.as_ref();
let n_str = &client_message[12..(client_message.len()-1)];
let n = str::parse::<u32>(n_str).unwrap_or_else(|_| panic!("could not convert '{}' to number. Original message: {:?}", n_str, client_message));
let count = received_messages_count.fetch_add(1, Relaxed);
if count != n && unordered.compare_exchange(0, count, Relaxed, Relaxed).is_ok() {
println!("Server: ERROR: received order of messages broke at message #{}", count);
}
})
}
).await.expect("Starting the server");
println!("### Waiting a little for the server to start...");
tokio::time::sleep(Duration::from_millis(10)).await;
let sent_messages_count = Arc::new(AtomicU32::new(0));
let sent_messages_count_ref = Arc::clone(&sent_messages_count);
let tokio_connection = TcpStream::connect(format!("{}:{}", LISTENING_INTERFACE, port).to_socket_addrs().expect("Error resolving address").next().unwrap()).await.expect("Error connecting");
let socket_connection = SocketConnection::new(tokio_connection, ());
let socket_connection_handler = SocketConnectionHandler::<CONFIG, SocketDialogType>::new(SocketDialogType::default());
tokio::spawn(
socket_connection_handler.client(
socket_connection,
client_shutdown_receiver,
move |connection_event| {
let sent_messages_count = Arc::clone(&sent_messages_count_ref);
match connection_event {
ProtocolEvent::PeerArrived { peer } => {
tokio::spawn(async move {
let start = Instant::now();
let mut n = 0;
loop {
let send_result = peer.send_async(SocketDialogType::LocalMessages::from(format!("DoNotAnswer({})", n))).await;
assert!(send_result.is_ok(), "couldn't send: {:?}", send_result.unwrap_err());
n += 1;
if n % (1<<15) == 0 && start.elapsed().as_millis() as u64 >= TEST_DURATION_MS {
println!("Client sent {} messages before bailing out", n);
sent_messages_count.store(n, Relaxed);
assert_eq!(peer.flush_and_close(Duration::from_secs(1)).await, 0, "couldn't flush!");
break;
}
}
});
},
ProtocolEvent::PeerLeft { .. } => {},
ProtocolEvent::LocalServiceTermination => {},
}
future::ready(())
},
move |_listening_interface, _listening_port, _peer, server_messages| {
server_messages
}
)
);
println!("### Measuring latency for 2 seconds...");
tokio::time::sleep(Duration::from_millis((TEST_DURATION_MS as f64 * 1.01) as u64)).await; client_shutdown_sender.send(()).expect("sending client termination signal");
server_connections_source.close();
println!("### Waiting a little for the shutdown signal to reach the server...");
tokio::time::sleep(Duration::from_millis(100)).await;
println!("### Server saw:");
let received_messages_count = received_messages_count.load(Relaxed);
let unordered = unordered.load(Relaxed);
let sent_messages_count = sent_messages_count.load(Relaxed);
let received_messages_percent = 100.0 * (received_messages_count as f64 / sent_messages_count as f64);
println!(" {} received messages {}", received_messages_count, if unordered == 0 {"in order".into()} else {format!("unordered -- ordering broke at message #{}", unordered)});
println!(" {:.2}% of sent ones", received_messages_percent);
assert_eq!(unordered, 0, "Server should have received messages in order, but it was broken at message #{} -- total received was {}", unordered, received_messages_count);
assert!(received_messages_percent >= 99.99, "Client flooding regression detected: the server used to receive 100% of the sent messages -- now only {:.2}% made it through", received_messages_percent);
if DEBUG {
assert!(received_messages_count > ((1.0 - tolerance) * debug_expected_count as f64) as u32, "Client flooding throughput regression detected: we used to send/receive {debug_expected_count} flood messages in this test (Debug mode) -- now only {received_messages_count} were made");
} else {
assert!(received_messages_count > ((1.0 - tolerance) * release_expected_count as f64) as u32, "Client flooding throughput regression detected: we used to send/receive {release_expected_count} flood messages in this test (Release mode) -- now only {received_messages_count} were made");
}
}
/// Returns the compiler-provided name of `Type`, as given by [`std::any::type_name`].
///
/// `_val` is only used to let `Type` be inferred at the call site; its value is never read.
fn get_type<Type>(_val: &Type) -> String {
    std::any::type_name::<Type>().to_string()
}
}