use crate::ReactiveMessagingSerializer;
use super::{
config::*,
types::*,
prelude::ProcessorRemoteStreamType,
serde::ReactiveMessagingDeserializer,
};
use std::{
fmt::Debug,
future,
future::Future,
net::SocketAddr,
sync::{
Arc,
atomic::{AtomicU32, Ordering::Relaxed},
},
time::Duration,
fmt::Formatter,
net::{IpAddr, Ipv4Addr},
str::FromStr,
};
use reactive_mutiny::prelude::advanced::{ChannelCommon, ChannelUni, ChannelProducer};
use futures::{StreamExt, Stream};
use tokio::{
io::{self,AsyncReadExt,AsyncWriteExt},
net::{TcpListener, TcpStream},
};
use log::{trace, debug, warn, error};
#[inline(always)]
pub async fn server_loop_for_unresponsive_text_protocol<const BUFFERED_MESSAGES_PER_PEER_COUNT: usize,
ClientMessages: ReactiveMessagingDeserializer<ClientMessages> + Send + Sync + PartialEq + Debug + 'static,
ServerMessages: ReactiveMessagingSerializer<ServerMessages> + Send + Sync + PartialEq + Debug + 'static,
PipelineOutputType: Send + Sync + Debug + 'static,
OutputStreamType: Stream<Item=PipelineOutputType> + Send + 'static,
ConnectionEventsCallbackFuture: Future<Output=()> + Send,
ConnectionEventsCallback: Fn(ConnectionEvent<BUFFERED_MESSAGES_PER_PEER_COUNT, ServerMessages>) -> ConnectionEventsCallbackFuture + Send + Sync + 'static,
ProcessorBuilderFn: Fn(String, u16, Arc<Peer<BUFFERED_MESSAGES_PER_PEER_COUNT, ServerMessages>>, ProcessorRemoteStreamType<BUFFERED_MESSAGES_PER_PEER_COUNT, ClientMessages>) -> OutputStreamType + Send + Sync + 'static>
(listening_interface: String,
listening_port: u16,
mut shutdown_signaler: tokio::sync::oneshot::Receiver<u32>,
connection_events_callback: ConnectionEventsCallback,
dialog_processor_builder_fn: ProcessorBuilderFn)
-> Result<(), Box<dyn std::error::Error + Sync + Send>> {
let connection_events_callback = Arc::new(connection_events_callback);
let listener = TcpListener::bind(&format!("{}:{}", listening_interface, listening_port)).await?;
tokio::spawn( async move {
loop {
let (socket, addr) = if let Some(accepted_connection) = tokio::select! {
acceptance_result = listener.accept() => {
if let Err(err) = acceptance_result {
error!("PROBLEM while accepting a connection: {:?}", err);
None
} else {
Some(acceptance_result.unwrap())
}
}
result = &mut shutdown_signaler => {
let timeout_ms = match result {
Ok(timeout_millis) => {
trace!("SocketServer: SHUTDOWN requested for server @ {listening_interface}:{listening_port} -- with timeout {}ms -- bailing out from the network loop", timeout_millis);
timeout_millis
},
Err(err) => {
error!("SocketServer: PROBLEM in the `shutdown signaler` for server @ {listening_interface}:{listening_port} (a server shutdown will be commanded now due to this occurrence): {:?}", err);
5000 },
};
connection_events_callback(ConnectionEvent::ApplicationShutdown {timeout_ms}).await;
break
}
} {
accepted_connection
} else {
continue
};
let sender = SenderChannel::<ServerMessages, BUFFERED_MESSAGES_PER_PEER_COUNT>::new(format!("Sender for client {addr}"));
let peer = Arc::new(Peer::new(sender, addr));
let peer_ref1 = Arc::clone(&peer);
let peer_ref2 = Arc::clone(&peer);
connection_events_callback(ConnectionEvent::PeerConnected {peer: peer.clone()}).await;
let (client_ip, client_port) = match addr {
SocketAddr::V4(v4) => (v4.ip().to_string(), v4.port()),
SocketAddr::V6(v6) => (v6.ip().to_string(), v6.port()),
};
let connection_events_callback_ref = Arc::clone(&connection_events_callback);
let processor_sender = SocketProcessorUniType::<BUFFERED_MESSAGES_PER_PEER_COUNT, ClientMessages>
::new(format!("Server processor for remote client {addr} @ {listening_interface}:{listening_port}"))
.spawn_non_futures_non_fallibles_executors(1,
|in_stream| dialog_processor_builder_fn(client_ip.clone(), client_port, peer_ref1.clone(), in_stream),
move |executor| {
futures::executor::block_on(connection_events_callback_ref(ConnectionEvent::PeerDisconnected { peer: peer_ref2, stream_stats: executor }));
future::ready(())
});
tokio::spawn(tokio::task::unconstrained(dialog_loop_for_textual_protocol(socket, peer, processor_sender)));
}
debug!("SocketServer: bailing out of network loop -- we should be undergoing a shutdown...");
});
Ok(())
}
#[inline(always)]
pub async fn client_for_unresponsive_text_protocol<const BUFFERED_MESSAGES_PER_PEER_COUNT: usize,
ClientMessages: ReactiveMessagingSerializer<ClientMessages> + Send + Sync + PartialEq + Debug + 'static,
ServerMessages: ReactiveMessagingDeserializer<ServerMessages> + Send + Sync + PartialEq + Debug + 'static,
PipelineOutputType: Send + Sync + Debug + 'static,
OutputStreamType: Stream<Item=PipelineOutputType> + Send + 'static,
ConnectionEventsCallbackFuture: Future<Output=()>,
ConnectionEventsCallback: Fn(ConnectionEvent<BUFFERED_MESSAGES_PER_PEER_COUNT, ClientMessages>) -> ConnectionEventsCallbackFuture + Send + Sync + 'static,
ProcessorBuilderFn: Fn(String, u16, Arc<Peer<BUFFERED_MESSAGES_PER_PEER_COUNT, ClientMessages>>, ProcessorRemoteStreamType<BUFFERED_MESSAGES_PER_PEER_COUNT, ServerMessages>) -> OutputStreamType>
(server_ipv4_addr: String,
port: u16,
shutdown_signaler: tokio::sync::oneshot::Receiver<u32>,
connection_events_callback: ConnectionEventsCallback,
dialog_processor_builder_fn: ProcessorBuilderFn)
-> Result<(), Box<dyn std::error::Error + Sync + Send>> {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_str(server_ipv4_addr.as_str())?), port);
let socket = TcpStream::connect(addr).await?;
let sender = SenderChannel::<ClientMessages, BUFFERED_MESSAGES_PER_PEER_COUNT>::new(format!("Sender for client {addr}"));
let peer = Arc::new(Peer::new(sender, addr));
let peer_ref1 = Arc::clone(&peer);
let peer_ref2 = Arc::clone(&peer);
let peer_ref3 = Arc::clone(&peer);
connection_events_callback(ConnectionEvent::PeerConnected {peer: peer.clone()}).await;
let processor_sender = SocketProcessorUniType::<BUFFERED_MESSAGES_PER_PEER_COUNT, ServerMessages>
::new(format!("Client Processor for remote server @ {addr}"))
.spawn_non_futures_non_fallibles_executors(1,
|in_stream| dialog_processor_builder_fn(server_ipv4_addr.clone(), port, peer_ref1.clone(), in_stream),
move |executor| {
futures::executor::block_on(connection_events_callback(ConnectionEvent::PeerDisconnected { peer: peer_ref2, stream_stats: executor }));
future::ready(())
});
tokio::spawn(tokio::task::unconstrained(dialog_loop_for_textual_protocol(socket, peer, processor_sender)));
tokio::spawn(async move {
let timeout_ms = match shutdown_signaler.await {
Ok(timeout_millis) => {
trace!("reactive-messaging: SHUTDOWN requested for client connected to server @ {server_ipv4_addr}:{port} -- with timeout {}ms: notifying & dropping the connection", timeout_millis);
timeout_millis
},
Err(err) => {
error!("reactive-messaging: PROBLEM in the `shutdown signaler` client connected to server @ {server_ipv4_addr}:{port} (a client shutdown will be commanded now due to this occurrence): {:?}", err);
5000 },
};
peer_ref3.sender.gracefully_end_all_streams(Duration::from_millis(timeout_ms as u64)).await;
});
Ok(())
}
/// Drives the dialog over an established connection for a textual protocol where each message is a
/// line terminated by `\n`.
///
/// Two things are multiplexed until the connection ends:
///   * messages enqueued in `peer.sender` are serialized, terminated with `\n` and written to
///     `textual_socket` (sending is polled first, due to `biased`);
///   * bytes read from `textual_socket` are split into lines; each line is deserialized into a
///     `RemoteMessages` and enqueued in `processor_uni`. Lines that fail to deserialize, or that
///     arrive while the processor's buffer is full, are answered with
///     `LocalMessages::processor_error_message()`.
///
/// The loop ends when the sender stream ends, when a write fails, when the remote party closes the
/// connection (a read of 0 bytes) or when a read fails with anything other than `WouldBlock`.
/// After that, `processor_uni` is closed and the socket is flushed and shut down.
///
/// Returns `Err` if configuring, flushing or shutting down the socket fails.
#[inline(always)]
async fn dialog_loop_for_textual_protocol<const BUFFERED_MESSAGES_PER_PEER_COUNT: usize,
    RemoteMessages: ReactiveMessagingDeserializer<RemoteMessages> + Send + Sync + PartialEq + Debug + 'static,
    LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug + 'static>
    (mut textual_socket: TcpStream,
     peer: Arc<Peer<BUFFERED_MESSAGES_PER_PEER_COUNT, LocalMessages>>,
     processor_uni: Arc<SocketProcessorUniType<BUFFERED_MESSAGES_PER_PEER_COUNT, RemoteMessages>>)
    -> Result<(), Box<dyn std::error::Error + Sync + Send>> {

    // socket options
    textual_socket.set_nodelay(true).map_err(|err| format!("error setting nodelay() for the socket connected at {}:{}: {err}", peer.peer_address, peer.peer_id))?;
    textual_socket.set_ttl(30).map_err(|err| format!("error setting ttl(30) for the socket connected at {}:{}: {err}", peer.peer_address, peer.peer_id))?;
    textual_socket.set_linger(Some(Duration::from_secs(3))).map_err(|err| format!("error setting linger(Some(Duration::from_secs(3))) for the socket connected at {}:{}: {err}", peer.peer_address, peer.peer_id))?;

    // `read_buffer` accumulates received bytes until full lines are available;
    // `serialization_buffer` is reused for every outgoing message.
    // NOTE(review): this assumes `LocalMessages::serialize()` overwrites the buffer's previous contents -- confirm against the serializer implementations
    let mut read_buffer = Vec::with_capacity(CHAT_MSG_SIZE_HINT);
    let mut serialization_buffer = Vec::with_capacity(CHAT_MSG_SIZE_HINT);

    let (mut sender_stream, _) = peer.sender.create_stream();

    'connection: loop {
        tokio::select!(

            // sending has priority over receiving
            biased; to_send = sender_stream.next() => {
                match to_send {
                    Some(to_send) => {
                        LocalMessages::serialize(&to_send, &mut serialization_buffer);
                        serialization_buffer.push(b'\n');
                        if let Err(err) = textual_socket.write_all(&serialization_buffer).await {
                            warn!("reactive-messaging: PROBLEM in the connection with {:#?} while WRITING: '{:?}' -- dropping it", peer, err);
                            peer.sender.cancel_all_streams();
                            break 'connection
                        }
                    },
                    None => {
                        debug!("reactive-messaging: Sender for {:#?} ended (most likely, .cancel_all_streams() was called on the `peer` by the processor.", peer);
                        break 'connection
                    }
                }
            },

            // receiving: new bytes are appended to whatever partial line was left from previous reads
            read = textual_socket.read_buf(&mut read_buffer) => {
                match read {
                    Ok(n) if n > 0 => {
                        // `next_line_index`: where the next (still unprocessed) line starts;
                        // `this_line_search_start`: offset, from there, where the search for `\n` begins
                        // -- bytes from previous reads were already searched, so only the `n` new ones are looked at first
                        let mut next_line_index = 0;
                        let mut this_line_search_start = read_buffer.len() - n;
                        loop {
                            if let Some(mut eol_pos) = read_buffer[next_line_index+this_line_search_start..].iter().position(|&b| b == b'\n') {
                                // `position()` is relative to the searched slice: make it absolute
                                eol_pos += next_line_index+this_line_search_start;
                                let line_bytes = &read_buffer[next_line_index..eol_pos];
                                match RemoteMessages::deserialize(line_bytes) {
                                    Ok(client_message) => {
                                        // NOTE(review): `slot` is presumably uninitialized storage provided by the channel -- confirm against `reactive-mutiny`
                                        if !processor_uni.try_send(|slot| unsafe { std::ptr::write(slot, client_message) }) {
                                            // the processor can't keep up: the message is dropped & the remote party is told about it
                                            let msg = format!("Input message ignored: local processor is too busy -- `dialog_processor` is full of unprocessed messages ({}/{}) while attempting to enqueue another message from {:?} (peer id {})",
                                                              processor_uni.channel.pending_items_count(), processor_uni.channel.buffer_size(), peer.peer_address, peer.peer_id);
                                            peer.sender.try_send(|slot| unsafe { std::ptr::write(slot, LocalMessages::processor_error_message(msg.clone())) });
                                            error!("reactive-messaging: {}", msg);
                                        }
                                    },
                                    Err(err) => {
                                        let stripped_line = String::from_utf8_lossy(line_bytes);
                                        let error_message = format!("Unknown command received from {:?} (peer id {}): '{}': {}",
                                                                    peer.peer_address, peer.peer_id, stripped_line, err);
                                        warn!("reactive-messaging: {error_message}");
                                        let outgoing_error = LocalMessages::processor_error_message(error_message);
                                        let sent = peer.sender.try_send(|slot| unsafe { std::ptr::write(slot, outgoing_error) });
                                        if !sent {
                                            // the answer couldn't even be enqueued: pause before processing further input
                                            tokio::time::sleep(Duration::from_millis(1000)).await;
                                        }
                                    }
                                }
                                next_line_index = eol_pos + 1;
                                this_line_search_start = 0;
                                if next_line_index >= read_buffer.len() {
                                    // all received bytes were consumed
                                    read_buffer.clear();
                                    break
                                }
                            } else {
                                // no more complete lines: keep only the partial line (if any) for the next read
                                if next_line_index > 0 {
                                    read_buffer.drain(0..next_line_index);
                                }
                                break
                            }
                        }
                    },
                    Ok(_) => {
                        // 0 bytes read
                        warn!("reactive-messaging: PROBLEM with reading from {:?} (peer id {}) -- it is out of bytes! Dropping the connection", peer.peer_address, peer.peer_id);
                        peer.sender.cancel_all_streams();
                        break 'connection
                    },
                    Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {},
                    Err(err) => {
                        error!("reactive-messaging: ERROR in the connection with {:?} (peer id {}) while READING: '{:?}' -- dropping it", peer.peer_address, peer.peer_id, err);
                        break 'connection
                    },
                }
            },
        );
    }

    // wind down: close the processor, then the socket
    let _ = processor_uni.close(GRACEFUL_STREAM_ENDING_TIMEOUT_DURATION).await;
    textual_socket.flush().await.map_err(|err| format!("error flushing the textual socket connected to {}:{}: {}", peer.peer_address, peer.peer_id, err))?;
    textual_socket.shutdown().await.map_err(|err| format!("error shutting down the textual socket connected to {}:{}: {}", peer.peer_address, peer.peer_id, err))?;
    Ok(())
}
/// "Responsive" flavor of `server_loop_for_unresponsive_text_protocol()`: the stream returned by
/// `dialog_processor_builder_fn` yields `ServerMessages`, which are passed through
/// `to_responsive_stream()` so they get sent to the client that originated the dialog.
///
/// All parameters are forwarded to the unresponsive flavor, with the processor builder wrapped.
/// Any error from starting the server is re-wrapped in a message mentioning the listening
/// interface and port.
#[inline(always)]
pub async fn server_loop_for_responsive_text_protocol<const BUFFERED_MESSAGES_PER_PEER_COUNT: usize,
    ClientMessages: ReactiveMessagingDeserializer<ClientMessages> + Send + Sync + PartialEq + Debug + 'static,
    ServerMessages: ReactiveMessagingSerializer<ServerMessages> +
                    ResponsiveMessages<ServerMessages> + Send + Sync + PartialEq + Debug + 'static,
    OutputStreamType: Stream<Item=ServerMessages> + Send + 'static,
    ConnectionEventsCallbackFuture: Future<Output=()> + Send,
    ConnectionEventsCallback: Fn(ConnectionEvent<BUFFERED_MESSAGES_PER_PEER_COUNT, ServerMessages>) -> ConnectionEventsCallbackFuture + Send + Sync + 'static,
    ProcessorBuilderFn: Fn(String, u16, Arc<Peer<BUFFERED_MESSAGES_PER_PEER_COUNT, ServerMessages>>, ProcessorRemoteStreamType<BUFFERED_MESSAGES_PER_PEER_COUNT, ClientMessages>) -> OutputStreamType + Send + Sync + 'static>
    (listening_interface: String,
     listening_port: u16,
     shutdown_signaler: tokio::sync::oneshot::Receiver<u32>,
     connection_events_callback: ConnectionEventsCallback,
     dialog_processor_builder_fn: ProcessorBuilderFn)
    -> Result<(), Box<dyn std::error::Error + Sync + Send>> {

    // wraps the caller's processor so that whatever it yields is sent back to the client
    let responsive_processor_builder = move |remote_ip, remote_port, peer, incoming_messages| {
        let answers = dialog_processor_builder_fn(remote_ip, remote_port, Arc::clone(&peer), incoming_messages);
        to_responsive_stream(peer, answers)
    };

    let startup_result = server_loop_for_unresponsive_text_protocol(
        listening_interface.clone(),
        listening_port,
        shutdown_signaler,
        connection_events_callback,
        responsive_processor_builder,
    ).await;

    match startup_result {
        Ok(()) => Ok(()),
        Err(err) => Err(Box::from(format!("error when starting server @ {listening_interface}:{listening_port} with `server_loop_for_unresponsive_text_protocol()`: {err}"))),
    }
}
/// "Responsive" flavor of `client_for_unresponsive_text_protocol()`: the stream returned by
/// `dialog_processor_builder_fn` yields `ClientMessages`, which are passed through
/// `to_responsive_stream()` so they get sent to the server.
///
/// All parameters are forwarded to the unresponsive flavor, with the processor builder wrapped.
/// Any error from starting the client is re-wrapped in a message mentioning the server address
/// and port.
#[inline(always)]
pub async fn client_for_responsive_text_protocol<const BUFFERED_MESSAGES_PER_PEER_COUNT: usize,
    ClientMessages: ReactiveMessagingSerializer<ClientMessages> +
                    ResponsiveMessages<ClientMessages> + Send + Sync + PartialEq + Debug + 'static,
    ServerMessages: ReactiveMessagingDeserializer<ServerMessages> + Send + Sync + PartialEq + Debug + 'static,
    OutputStreamType: Stream<Item=ClientMessages> + Send + 'static,
    ConnectionEventsCallbackFuture: Future<Output=()>,
    ConnectionEventsCallback: Fn(ConnectionEvent<BUFFERED_MESSAGES_PER_PEER_COUNT, ClientMessages>) -> ConnectionEventsCallbackFuture + Send + Sync + 'static,
    ProcessorBuilderFn: Fn(String, u16, Arc<Peer<BUFFERED_MESSAGES_PER_PEER_COUNT, ClientMessages>>, ProcessorRemoteStreamType<BUFFERED_MESSAGES_PER_PEER_COUNT, ServerMessages>) -> OutputStreamType>
    (server_ipv4_addr: String,
     port: u16,
     shutdown_signaler: tokio::sync::oneshot::Receiver<u32>,
     connection_events_callback: ConnectionEventsCallback,
     dialog_processor_builder_fn: ProcessorBuilderFn)
    -> Result<(), Box<dyn std::error::Error + Sync + Send>> {

    // wraps the caller's processor so that whatever it yields is sent to the server
    let responsive_processor_builder = |remote_ip, remote_port, peer, incoming_messages| {
        let answers = dialog_processor_builder_fn(remote_ip, remote_port, Arc::clone(&peer), incoming_messages);
        to_responsive_stream(peer, answers)
    };

    let startup_result = client_for_unresponsive_text_protocol(
        server_ipv4_addr.clone(),
        port,
        shutdown_signaler,
        connection_events_callback,
        responsive_processor_builder,
    ).await;

    match startup_result {
        Ok(()) => Ok(()),
        Err(err) => Err(Box::from(format!("error when starting client for server @ {server_ipv4_addr}:{port} with `client_for_unresponsive_text_protocol()`: {err}"))),
    }
}
/// Turns a stream of `LocalMessages` (the processor's answers) into a stream of `()`, where consuming
/// each item has the side effect of handling the answer through `peer.sender`:
///   * regular answers are enqueued for sending -- if that fails, the peer's streams are cancelled;
///   * "disconnect" answers are enqueued (unless they are also "no answer" messages) and then the
///     peer's streams are cancelled;
///   * "no answer" messages that are not "disconnect" ones are simply dropped.
#[inline(always)]
fn to_responsive_stream<const BUFFERED_MESSAGES_PER_PEER_COUNT: usize,
    LocalMessages: ReactiveMessagingSerializer<LocalMessages> +
                   ResponsiveMessages<LocalMessages> + Send + Sync + PartialEq + Debug + 'static>
    (peer: Arc<Peer<BUFFERED_MESSAGES_PER_PEER_COUNT, LocalMessages>>,
     request_processor_stream: impl Stream<Item = LocalMessages>)
    -> impl Stream<Item = ()> {

    request_processor_stream
        .map(move |outgoing| {
            let disconnect_requested = LocalMessages::is_disconnect_message(&outgoing);
            let answer_suppressed = LocalMessages::is_no_answer_message(&outgoing);
            match (disconnect_requested, answer_suppressed) {
                // a regular answer: send it
                (false, false) => {
                    trace!("Sending Answer `{:?}` to {:?} (peer id {})", outgoing, peer.peer_address, peer.peer_id);
                    // NOTE(review): `slot` is presumably uninitialized storage provided by the channel -- confirm against `reactive-mutiny`
                    let enqueued = peer.sender.try_send(|slot| unsafe { std::ptr::write(slot, outgoing) } );
                    if !enqueued {
                        warn!("Slow reader detected -- {:?} (peer id {}). Closing the connection...", peer.peer_address, peer.peer_id);
                        peer.sender.cancel_all_streams();
                    }
                },
                // the processor wants the connection dropped -- optionally sending a last message
                (true, skip_last_message) => {
                    trace!("SocketServer: processor choose to drop connection with {} (peer id {}): '{:?}'", peer.peer_address, peer.peer_id, outgoing);
                    if !skip_last_message {
                        let _sent = peer.sender.try_send(|slot| unsafe { std::ptr::write(slot, outgoing) } );
                    }
                    peer.sender.cancel_all_streams();
                },
                // nothing to send, nothing to do
                (false, true) => {},
            }
        })
}
#[inline(always)]
fn _to_responsive_stream_of_fallibles<const BUFFERED_MESSAGES_PER_PEER_COUNT: usize,
LocalMessages: ReactiveMessagingSerializer<LocalMessages> +
ResponsiveMessages<LocalMessages> + Send + Sync + PartialEq + Debug + 'static>
(peer: Arc<Peer<BUFFERED_MESSAGES_PER_PEER_COUNT, LocalMessages>>,
request_processor_stream: impl Stream<Item = Result<LocalMessages, Box<dyn std::error::Error + Sync + Send>>>)
-> impl Stream<Item = ()> {
let peer_ref1 = peer;
let peer_ref2 = Arc::clone(&peer_ref1);
let request_processor_stream = request_processor_stream
.map(move |processor_response| {
match processor_response {
Ok(outgoing) => {
outgoing
},
Err(err) => {
let err_string = format!("{:?}", err);
error!("SocketServer: processor connected with {} (peer id {}) yielded an error: {}", peer_ref1.peer_address, peer_ref1.peer_id, err_string);
LocalMessages::processor_error_message(err_string)
},
}
});
to_responsive_stream(peer_ref2, request_processor_stream)
}
/// Source of the sequential ids given to each `Peer` created through `Peer::new()`.
/// NOTE(review): `fetch_add()` wraps around on overflow, so ids may repeat once `u32::MAX` peers were created.
static PEER_COUNTER: AtomicU32 = AtomicU32::new(0);
/// Type of the id identifying a `Peer` (see `Peer::peer_id`)
pub type PeerId = u32;
/// Represents the remote party of a connection, as seen by the local code: bundles its id, its
/// address and the channel through which messages are sent to it.
/// `BUFFERED_MESSAGES_PER_PEER_COUNT` parameterizes the `SenderChannel`.
pub struct Peer<const BUFFERED_MESSAGES_PER_PEER_COUNT: usize,
MessagesType: 'static + Send + Sync + PartialEq + Debug + ReactiveMessagingSerializer<MessagesType>> {
/// Sequential id, assigned by `Peer::new()`
pub peer_id: PeerId,
/// Channel where the messages to be sent to the remote party are enqueued
pub sender: Arc<SenderChannel<MessagesType, BUFFERED_MESSAGES_PER_PEER_COUNT>>,
/// Socket address of the remote party
pub peer_address: SocketAddr,
}
impl<const BUFFERED_MESSAGES_PER_PEER_COUNT: usize,
     MessagesType: 'static + Send + Sync + PartialEq + Debug + ReactiveMessagingSerializer<MessagesType>>
Peer<BUFFERED_MESSAGES_PER_PEER_COUNT, MessagesType> {

    /// Creates the handle for the remote party at `peer_address`, reachable through `sender`.
    /// The `peer_id` is taken from the `PEER_COUNTER` sequence.
    pub fn new(sender: Arc<SenderChannel<MessagesType, BUFFERED_MESSAGES_PER_PEER_COUNT>>, peer_address: SocketAddr) -> Self {
        let peer_id = PEER_COUNTER.fetch_add(1, Relaxed);
        Self { peer_id, sender, peer_address }
    }

}
/// Compact, single-line `Debug` representation: id, address and how full the sender's buffer is
impl<const BUFFERED_MESSAGES_PER_PEER_COUNT: usize,
     MessagesType: 'static + Send + Sync + PartialEq + Debug + ReactiveMessagingSerializer<MessagesType>>
Debug for
Peer<BUFFERED_MESSAGES_PER_PEER_COUNT, MessagesType> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self { peer_id, sender, peer_address } = self;
        f.write_fmt(format_args!(
            "Peer {{peer_id: {}, peer_address: '{}', sender: {}/{} pending messages}}",
            peer_id, peer_address, sender.pending_items_count(), BUFFERED_MESSAGES_PER_PEER_COUNT
        ))
    }
}
/// Contract for message types used with the "responsive" flavors of the server & client, in which
/// the processor's output is sent to the remote party: allows telling apart the special answers
/// that receive distinct treatment in `to_responsive_stream()`.
pub trait ResponsiveMessages<LocalPeerMessages: ResponsiveMessages<LocalPeerMessages> + Send + PartialEq + Debug> {
/// Tells if `processor_answer` means the connection should be dropped --
/// in `to_responsive_stream()`, the peer's streams are cancelled after such messages
fn is_disconnect_message(processor_answer: &LocalPeerMessages) -> bool;
/// Tells if `processor_answer` should NOT be sent to the remote party
fn is_no_answer_message(processor_answer: &LocalPeerMessages) -> bool;
}
#[cfg(any(test,doc))]
mod tests {
use futures::stream;
use tokio::sync::Mutex;
use super::*;
use std::{
time::SystemTime, sync::atomic::AtomicBool,
};
/// `true` when compiled with debug assertions (Debug mode), `false` otherwise (Release mode) --
/// used by the performance tests to pick the thresholds & texts for the current build
#[cfg(debug_assertions)]
const DEBUG: bool = true;
#[cfg(not(debug_assertions))]
const DEBUG: bool = false;
/// Initializes the logger (with UTC timestamps) for the tests in this module;
/// if the initialization fails, a notice is printed to stderr instead of panicking
#[ctor::ctor]
fn suite_setup() {
simple_logger::SimpleLogger::new().with_utc_timestamps().init().unwrap_or_else(|_| eprintln!("--> LOGGER WAS ALREADY STARTED"));
}
/// Exercises a full dialog between an "unresponsive" server and an "unresponsive" client (both
/// using `String` messages): the client sends a secret when it connects; the server, upon
/// receiving the expected secret, answers with its own; the test passes if the last message
/// seen by the client is the server's secret.
#[cfg_attr(not(doc),tokio::test)]
async fn unresponsive_dialogs() {
const LISTENING_INTERFACE: &str = "127.0.0.1";
const PORT : u16 = 8570;
let client_secret = String::from("open, sesame");
let server_secret = String::from("now the 40 of you may enter");
// holds the last message the client received from the server
let observed_secret = Arc::new(Mutex::new(None));
let (server_shutdown_sender, server_shutdown_receiver) = tokio::sync::oneshot::channel::<u32>();
// the client is never told to shut down in this test -- the sender is just kept alive
let (_client_shutdown_sender, client_shutdown_receiver) = tokio::sync::oneshot::channel::<u32>();
// server
let client_secret_ref = client_secret.clone();
let server_secret_ref = server_secret.clone();
server_loop_for_unresponsive_text_protocol::<1024, _, _, _, _, _, _, _>
(LISTENING_INTERFACE.to_string(), PORT, server_shutdown_receiver,
// connection events: greets every client that connects
|connection_event| {
match connection_event {
ConnectionEvent::PeerConnected { peer } => {
assert!(peer.sender.try_send_movable(String::from("Welcome! State your business!")), "couldn't send");
},
ConnectionEvent::PeerDisconnected { peer: _, stream_stats: _ } => {},
ConnectionEvent::ApplicationShutdown { timeout_ms } => {
println!("Test Server: shutdown was requested ({timeout_ms}ms timeout)... No connection will receive the drop message (nor will be even closed) because I, the lib caller, intentionally didn't keep track of the connected peers for this test!");
}
}
future::ready(())
},
// processor: echoes what the client said and, if it was the expected secret, reveals the server's one
move |_client_addr, _client_port, peer, client_messages_stream| {
let client_secret_ref = client_secret_ref.clone();
let server_secret_ref = server_secret_ref.clone();
client_messages_stream.inspect(move |client_message| {
assert!(peer.sender.try_send_movable(format!("Client just sent '{}'", client_message)), "couldn't send");
if *client_message == client_secret_ref {
assert!(peer.sender.try_send_movable(server_secret_ref.clone()), "couldn't send");
} else {
panic!("Client sent the wrong secret: '{}' -- I was expecting '{}'", client_message, client_secret_ref);
}
})
}
).await.expect("Starting the server");
println!("### Waiting a little for the server to start...");
tokio::time::sleep(Duration::from_millis(10)).await;
// client
let client_secret = client_secret.clone();
let observed_secret_ref = Arc::clone(&observed_secret);
client_for_unresponsive_text_protocol::<1024, _, _, _, _, _, _, _>
(LISTENING_INTERFACE.to_string(), PORT, client_shutdown_receiver,
// connection events: sends the client's secret as soon as the connection is made
move |connection_event| {
match connection_event {
ConnectionEvent::PeerConnected { peer } => {
assert!(peer.sender.try_send_movable(client_secret.clone()), "couldn't send");
},
ConnectionEvent::PeerDisconnected { peer, stream_stats: _ } => {
println!("Test Client: connection with {} (peer_id #{}) was dropped -- should not happen in this test", peer.peer_address, peer.peer_id);
},
ConnectionEvent::ApplicationShutdown { timeout_ms: _ } => {}
}
future::ready(())
},
// processor: records every message from the server -- each one overwrites the previous
move |_client_addr, _client_port, _peer, server_messages_stream: ProcessorRemoteStreamType<1024, String>| {
let observed_secret_ref = Arc::clone(&observed_secret_ref);
server_messages_stream.then(move |server_message| {
let observed_secret_ref = Arc::clone(&observed_secret_ref);
async move {
println!("Server said: '{}'", server_message);
let _ = observed_secret_ref.lock().await.insert(server_message);
}
})
}
).await.expect("Starting the client");
println!("### Started a client -- which is running concurrently, in the background... it has 100ms to do its thing!");
tokio::time::sleep(Duration::from_millis(100)).await;
server_shutdown_sender.send(500).expect("sending server shutdown signal");
println!("### Waiting a little for the shutdown signal to reach the server...");
tokio::time::sleep(Duration::from_millis(10)).await;
// verification: the last thing the client heard must be the server's secret
let locked_observed_secret = observed_secret.lock().await;
let observed_secret = locked_observed_secret.as_ref().expect("Server secret has not been computed");
assert_eq!(*observed_secret, server_secret, "Communications didn't go according the plan");
}
/// Performance test (ignored by default): measures the round-trip latency between a client and a
/// server that play ping-pong for `TEST_DURATION_MS` milliseconds -- the client sends `Ping(n)`,
/// the server answers `Pong(n)`, to which the client replies `Ping(n+1)`, and so on.
/// Fails if fewer round trips than the build-mode-specific threshold were made.
#[cfg_attr(not(doc),tokio::test(flavor = "multi_thread"))]
#[ignore] async fn latency_measurements() {
const TEST_DURATION_MS: u64 = 2000;
const TEST_DURATION_NS: u64 = TEST_DURATION_MS * 1e6 as u64;
const LISTENING_INTERFACE: &str = "127.0.0.1";
const PORT : u16 = 8571;
let (server_shutdown_sender, server_shutdown_receiver) = tokio::sync::oneshot::channel::<u32>();
let (client_shutdown_sender, client_shutdown_receiver) = tokio::sync::oneshot::channel::<u32>();
// server: answers each `Ping(n)` with `Pong(n)`
server_loop_for_unresponsive_text_protocol::<1024, _, _, _, _, _, _, _>
(LISTENING_INTERFACE.to_string(), PORT, server_shutdown_receiver,
|_connection_event| async {},
|_listening_interface, _listening_port, peer, client_messages: ProcessorRemoteStreamType<1024, String>| {
client_messages.inspect(move |client_message| {
// extracts the number between "Ping(" and ")"
let n_str = &client_message[5..(client_message.len() - 1)];
let n = str::parse::<u32>(n_str).unwrap_or_else(|err| panic!("could not convert '{}' to number. Original client message: '{}'. Parsing error: {:?}", n_str, client_message, err));
assert!(peer.sender.try_send_movable(format!("Pong({})", n)), "couldn't send");
})
}
).await.expect("Starting the server");
println!("### Waiting a little for the server to start...");
tokio::time::sleep(Duration::from_millis(10)).await;
// counts the answers received by the client -- which is also the number expected in the next `Pong`
let counter = Arc::new(AtomicU32::new(0));
let counter_ref = Arc::clone(&counter);
// client: starts the game with `Ping(0)` and, for each `Pong(n)`, checks the sequence & sends the next `Ping`
client_for_unresponsive_text_protocol::<1024, _, _, _, _, _, _, _>
(LISTENING_INTERFACE.to_string(), PORT, client_shutdown_receiver,
|connection_event| {
match connection_event {
ConnectionEvent::PeerConnected { peer } => {
assert!(peer.sender.try_send_movable(String::from("Ping(0)")), "couldn't send");
},
ConnectionEvent::PeerDisconnected { .. } => {},
ConnectionEvent::ApplicationShutdown { .. } => {},
}
future::ready(())
},
move |_listening_interface, _listening_port, peer, server_messages: ProcessorRemoteStreamType<1024, String>| {
let counter_ref = Arc::clone(&counter_ref);
server_messages.inspect(move |server_message| {
// extracts the number between "Pong(" and ")"
let n_str = &server_message[5..(server_message.len()-1)];
let n = str::parse::<u32>(n_str).unwrap_or_else(|err| panic!("could not convert '{}' to number. Original server message: '{}'. Parsing error: {:?}", n_str, server_message, err));
let current_count = counter_ref.fetch_add(1, Relaxed);
if n != current_count {
panic!("Received '{}', where Client was expecting 'Pong({})'", server_message, current_count);
}
assert!(peer.sender.try_send_movable(format!("Ping({})", current_count+1)), "couldn't send");
})
}
).await.expect("Starting the client");
println!("### Measuring latency for {TEST_DURATION_MS} milliseconds...");
tokio::time::sleep(Duration::from_millis(TEST_DURATION_MS)).await;
server_shutdown_sender.send(500).expect("sending server shutdown signal");
client_shutdown_sender.send(500).expect("sending client shutdown signal");
println!("### Waiting a little for the shutdown signal to reach the server...");
tokio::time::sleep(Duration::from_millis(10)).await;
let counter = counter.load(Relaxed);
// each round trip is made of two messages, hence the division by `2 * counter`
let latency = Duration::from_nanos((TEST_DURATION_NS as f64 / (2.0 * counter as f64)) as u64);
println!("Round trips counter: {}", counter);
println!("Measured latency: {:?} ({})", latency, if DEBUG {"Debug mode"} else {"Release mode"});
if DEBUG {
assert!(counter > 30000, "Latency regression detected: we used to make 36690 round trips in 2 seconds (Debug mode) -- now only {} were made", counter);
} else {
assert!(counter > 200000, "Latency regression detected: we used to make 276385 round trips in 2 seconds (Release mode) -- now only {} were made", counter);
}
}
/// Performance test (ignored by default): the client floods the server with `DoNotAnswer(n)`
/// messages for `TEST_DURATION_MS` milliseconds, flushing its sender every 1024 messages.
/// The server counts the received messages and records the first position (if any) at which
/// they arrived out of order.
///
/// Fails if any message arrived out of order, if less than 99.99% of the sent messages were
/// received or if the number of received messages is not above the regression threshold.
#[cfg_attr(not(doc),tokio::test(flavor = "multi_thread"))]
#[ignore] async fn message_flooding_throughput() {
    const TEST_DURATION_MS:    u64  = 2000;
    const LISTENING_INTERFACE: &str = "127.0.0.1";
    const PORT               : u16  = 8572;
    let (server_shutdown_sender, server_shutdown_receiver) = tokio::sync::oneshot::channel::<u32>();
    let (client_shutdown_sender, client_shutdown_receiver) = tokio::sync::oneshot::channel::<u32>();

    // server -- counts the messages and remembers where (if ever) the ordering broke
    let received_messages_count = Arc::new(AtomicU32::new(0));
    let unordered = Arc::new(AtomicU32::new(0));    // 0 means "no ordering problems detected"
    let received_messages_count_ref = Arc::clone(&received_messages_count);
    let unordered_ref = Arc::clone(&unordered);
    server_loop_for_unresponsive_text_protocol::<1024, _, _, _, _, _, _, _>
        (LISTENING_INTERFACE.to_string(), PORT, server_shutdown_receiver,
         |_connection_event: ConnectionEvent<1024, String>| async {},
         move |_listening_interface, _listening_port, _peer, client_messages: ProcessorRemoteStreamType<1024, String>| {
             let received_messages_count = Arc::clone(&received_messages_count_ref);
             let unordered = Arc::clone(&unordered_ref);
             client_messages.inspect(move |client_message| {
                 // extracts the number between "DoNotAnswer(" and ")"
                 let n_str = &client_message[12..(client_message.len()-1)];
                 let n = str::parse::<u32>(n_str).unwrap_or_else(|_| panic!("could not convert '{}' to number. Original message: '{}'", n_str, client_message));
                 let count = received_messages_count.fetch_add(1, Relaxed);
                 // only the first ordering problem is recorded & reported
                 if count != n && unordered.compare_exchange(0, count, Relaxed, Relaxed).is_ok() {
                     println!("Server: ERROR: received order of messages broke at message #{}", count);
                 }
             })
         }
        ).await.expect("Starting the server");

    println!("### Waiting a little for the server to start...");
    tokio::time::sleep(Duration::from_millis(10)).await;

    // client -- once connected, a task floods the server until the test duration elapses
    let sent_messages_count = Arc::new(AtomicU32::new(0));
    let sent_messages_count_ref = Arc::clone(&sent_messages_count);
    client_for_unresponsive_text_protocol::<1024, _, _, _, _, _, _, _>
        (LISTENING_INTERFACE.to_string(), PORT,
         client_shutdown_receiver,
         move |connection_event| {
             let sent_messages_count = Arc::clone(&sent_messages_count_ref);
             match connection_event {
                 ConnectionEvent::PeerConnected { peer } => {
                     tokio::spawn(async move {
                         let start = SystemTime::now();
                         let mut n = 0;
                         loop {
                             assert!(peer.sender.try_send_movable(format!("DoNotAnswer({})", n)), "couldn't send");
                             n += 1;
                             // every 1024 messages: flush & check whether it is time to stop
                             if n % (1<<10) == 0 {
                                 peer.sender.flush(Duration::from_millis(50)).await;
                                 if start.elapsed().unwrap().as_millis() as u64 >= TEST_DURATION_MS {
                                     println!("Client sent {} messages before bailing out", n);
                                     sent_messages_count.store(n, Relaxed);
                                     break;
                                 }
                             }
                         }
                     });
                 },
                 ConnectionEvent::PeerDisconnected { .. } => {},
                 ConnectionEvent::ApplicationShutdown { .. } => {},
             }
             future::ready(())
         },
         move |_listening_interface, _listening_port, _peer, server_messages: ProcessorRemoteStreamType<1024, String>| {
             server_messages
         }
        ).await.expect("Starting the client");

    println!("### Measuring throughput for {TEST_DURATION_MS} milliseconds...");
    tokio::time::sleep(Duration::from_millis(TEST_DURATION_MS)).await;
    server_shutdown_sender.send(500).expect("sending server shutdown signal");
    client_shutdown_sender.send(500).expect("sending client shutdown signal");

    println!("### Waiting a little for the shutdown signal to reach the server...");
    tokio::time::sleep(Duration::from_millis(1000)).await;

    // results & assertions
    println!("### Server saw:");
    let received_messages_count = received_messages_count.load(Relaxed);
    let unordered = unordered.load(Relaxed);
    let sent_messages_count = sent_messages_count.load(Relaxed);
    let received_messages_percent = 100.0 * (received_messages_count as f64 / sent_messages_count as f64);
    println!("    {} received messages {}", received_messages_count, if unordered == 0 {"in order".into()} else {format!("unordered -- ordering broke at message #{}", unordered)});
    println!("    {:.2}% of sent ones", received_messages_percent);

    assert_eq!(unordered, 0, "Server should have received messages in order, but it was broken at message #{} -- total received was {}", unordered, received_messages_count);
    assert!(received_messages_percent >= 99.99, "Client flooding regression detected: the server used to receive 100% of the sent messages -- now only {:.2}% made it through", received_messages_percent);
    if DEBUG {
        assert!(received_messages_count > 400000, "Client flooding throughput regression detected: we used to send/receive 451584 flood messages in this test (Debug mode) -- now only {} were made", received_messages_count);
    } else {
        assert!(received_messages_count > 400000, "Client flooding throughput regression detected: we used to send/receive 500736 flood messages in this test (Release mode) -- now only {} were made", received_messages_count);
    }
}
/// Exercises a complete question/answer exchange over the responsive text protocol.
///
/// The server greets every peer, echoes whatever the client says and then replies with the
/// server secret -- panicking if the client didn't present the expected client secret.
/// The client sends its secret right after connecting and records every message the server
/// sends; the last recorded message must be the server secret.
#[cfg_attr(not(doc),tokio::test)]
async fn responsive_dialogs() {
    const LISTENING_INTERFACE: &str = "127.0.0.1";
    const PORT: u16 = 8573;

    let client_secret = String::from("open, sesame");
    let server_secret = String::from("now the 40 of you may enter");
    // slot overwritten by the client with each message received from the server
    let observed_secret = Arc::new(Mutex::new(None));

    let (server_shutdown_sender, server_shutdown_receiver) = tokio::sync::oneshot::channel::<u32>();
    // the client is never told to shut down: its sender is simply kept alive until the test ends
    let (_client_shutdown_sender, client_shutdown_receiver) = tokio::sync::oneshot::channel::<u32>();

    // server
    let expected_client_secret = client_secret.clone();
    let secret_to_reveal = server_secret.clone();
    server_loop_for_responsive_text_protocol::<1024, _, _, _, _, _, _>(
        LISTENING_INTERFACE.to_string(),
        PORT,
        server_shutdown_receiver,
        |_connection_event| future::ready(()),
        move |_client_addr, _client_port, peer, client_messages_stream| {
            // each connection gets its own copies, as the inner closure takes ownership of them
            let expected_client_secret = expected_client_secret.clone();
            let secret_to_reveal = secret_to_reveal.clone();
            assert!(peer.sender.try_send_movable(String::from("Welcome! State your business!")), "couldn't send");
            client_messages_stream.flat_map(move |client_message: SocketProcessorDerivedType<1024, String>| {
                let echo = format!("Client just sent '{}'", client_message);
                if *client_message != expected_client_secret {
                    panic!("Client sent the wrong secret: '{}' -- I was expecting '{}'", client_message, expected_client_secret);
                }
                // two answers per client message: the echo, followed by the server secret
                stream::iter([echo, secret_to_reveal.clone()])
            })
        }
    ).await.expect("Starting the server");

    println!("### Waiting a little for the server to start...");
    tokio::time::sleep(Duration::from_millis(10)).await;

    // client
    let secret_to_send = client_secret.clone();
    let secret_slot = Arc::clone(&observed_secret);
    client_for_responsive_text_protocol::<1024, _, _, _, _, _, _>(
        LISTENING_INTERFACE.to_string(),
        PORT,
        client_shutdown_receiver,
        move |_connection_event| future::ready(()),
        move |_client_addr, _client_port, peer, server_messages_stream: ProcessorRemoteStreamType<1024, String>| {
            let secret_slot = Arc::clone(&secret_slot);
            assert!(peer.sender.try_send_movable(secret_to_send.clone()), "couldn't send");
            server_messages_stream
                .then(move |server_message| {
                    let secret_slot = Arc::clone(&secret_slot);
                    async move {
                        println!("Server said: '{}'", server_message);
                        *secret_slot.lock().await = Some(server_message);
                    }
                })
                // "." is what `is_no_answer_message()` recognizes for `String` messages
                .map(|_server_message| ".".to_string())
        }
    ).await.expect("Starting the client");
    println!("### Started a client -- which is running concurrently, in the background... it has 100ms to do its thing!");
    tokio::time::sleep(Duration::from_millis(100)).await;

    server_shutdown_sender.send(500).expect("sending server shutdown signal");
    println!("### Waiting a little for the shutdown signal to reach the server...");
    tokio::time::sleep(Duration::from_millis(10)).await;

    let secret_guard = observed_secret.lock().await;
    let last_server_message = secret_guard.as_ref().expect("Server secret has not been computed");
    assert_eq!(*last_server_message, server_secret, "Communications didn't go according the plan");
}
/// Checks that shutting down the client is noticed on both ends of the connection.
///
/// An unresponsive server and an unresponsive client are started, each raising a flag when
/// its connection events callback sees a `ConnectionEvent::PeerDisconnected`. After the client
/// receives its shutdown signal, both flags must be set.
#[cfg_attr(not(doc),tokio::test)]
async fn client_shutdown() {
    const LISTENING_INTERFACE: &str = "127.0.0.1";
    const PORT: u16 = 8574;

    // flags raised by the connection events callbacks
    let server_disconnected = Arc::new(AtomicBool::new(false));
    let client_disconnected = Arc::new(AtomicBool::new(false));
    let server_flag = Arc::clone(&server_disconnected);
    let client_flag = Arc::clone(&client_disconnected);

    let (server_shutdown_sender, server_shutdown_receiver) = tokio::sync::oneshot::channel::<u32>();
    let (client_shutdown_sender, client_shutdown_receiver) = tokio::sync::oneshot::channel::<u32>();

    // server: passes the incoming messages stream through untouched
    server_loop_for_unresponsive_text_protocol::<1024, _, _, _, _, _, _, _>(
        LISTENING_INTERFACE.to_string(),
        PORT,
        server_shutdown_receiver,
        move |connection_event| {
            let server_flag = Arc::clone(&server_flag);
            async move {
                if matches!(connection_event, ConnectionEvent::PeerDisconnected { .. }) {
                    server_flag.store(true, Relaxed);
                }
            }
        },
        move |_client_addr, _client_port, _peer: Arc<Peer<1024, String>>, client_messages_stream: ProcessorRemoteStreamType<1024, String>| client_messages_stream
    ).await.expect("Starting the server");

    // gives the server some time before connecting
    tokio::time::sleep(Duration::from_millis(10)).await;

    // client: also passes the incoming messages stream through untouched
    client_for_unresponsive_text_protocol::<1024, _, _, _, _, _, _, _>(
        LISTENING_INTERFACE.to_string(),
        PORT,
        client_shutdown_receiver,
        move |connection_event| {
            let client_flag = Arc::clone(&client_flag);
            async move {
                if matches!(connection_event, ConnectionEvent::PeerDisconnected { .. }) {
                    client_flag.store(true, Relaxed);
                }
            }
        },
        move |_client_addr, _client_port, _peer: Arc<Peer<1024, String>>, server_messages_stream: ProcessorRemoteStreamType<1024, String>| server_messages_stream
    ).await.expect("Starting the client");

    tokio::time::sleep(Duration::from_millis(100)).await;
    client_shutdown_sender.send(50).expect("sending client shutdown signal");
    // waits for the disconnection events to be delivered on both sides
    tokio::time::sleep(Duration::from_millis(100)).await;

    assert!(client_disconnected.load(Relaxed), "Client didn't drop the connection with the server");
    assert!(server_disconnected.load(Relaxed), "Server didn't notice the drop in the connection made by the client");
    server_shutdown_sender.send(50).expect("sending server shutdown signal");
}
/// Lets plain `String`s be used as outgoing messages: the text itself is the wire representation.
impl ReactiveMessagingSerializer<String> for String {
    /// Replaces whatever is in `buffer` with the UTF-8 bytes of `message`.
    #[inline(always)]
    fn serialize(message: &String, buffer: &mut Vec<u8>) {
        buffer.clear();
        buffer.extend(message.bytes());
    }
    /// Never returns: any processor error is treated as a bug and causes a panic carrying `err`.
    #[inline(always)]
    fn processor_error_message(err: String) -> String {
        let details = format!("ServerBug! Please, fix! Error: {}", err);
        panic!("SocketServerSerializer<String>::processor_error_message(): {}", details)
    }
}
/// Conventions for `String` answers: the empty string means "disconnect" and "." means "no answer".
impl ResponsiveMessages<String> for String {
    /// `true` when `processor_answer` is the empty string.
    #[inline(always)]
    fn is_disconnect_message(processor_answer: &String) -> bool {
        processor_answer.as_str().is_empty()
    }
    /// `true` when `processor_answer` is exactly ".".
    #[inline(always)]
    fn is_no_answer_message(processor_answer: &String) -> bool {
        matches!(processor_answer.as_str(), ".")
    }
}
/// Lets plain `String`s be used as incoming messages: the received bytes are taken as text.
impl ReactiveMessagingDeserializer<String> for String {
    /// Builds a `String` out of `message`, replacing invalid UTF-8 sequences with U+FFFD
    /// (lossy conversion) -- so this implementation always returns `Ok`.
    #[inline(always)]
    fn deserialize(message: &[u8]) -> Result<String, Box<dyn std::error::Error + Sync + Send + 'static>> {
        let text = String::from_utf8_lossy(message);
        Ok(text.into_owned())
    }
}
}