use std::error::Error;
use std::fmt::Debug;
use std::future;
use std::future::Future;
use std::iter::Peekable;
use std::net::{SocketAddr, ToSocketAddrs};
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::vec::IntoIter;
use keen_retry::{ExponentialJitter, ResolvedResult, RetryProducerResult, RetryResult};
use tokio::net::{ TcpStream, TcpListener };
use log::{trace, error, warn};
use tokio::sync::Mutex;
use crate::config::{ConstConfig, RetryingStrategies};
use crate::socket_connection::connection::SocketConnection;
/// Boxed & pinned future returned by the connect closure: it resolves to a `RetryProducerResult`
/// carrying either the connected `TcpStream` or a boxed (`Send + Sync`) error.
type ConnectionFuture = Pin < Box < dyn Future < Output=RetryProducerResult<TcpStream, Box<dyn Error + Sync + Send>> > > >;
/// Client-side connection helper: keeps the target `host` & `port` together with a shareable, stateful
/// closure that performs one connection attempt per call.\
/// `CONFIG_U64` is turned into a `ConstConfig` by `connect_retryable()` in order to pick the retrying strategy.
pub struct ClientConnectionManager<const CONFIG_U64: u64> {
/// Host name (or address) to connect to, as given to `new()`
host: String,
/// TCP port to connect to
port: u16,
/// The closure built by `build_connect_continuation_closure()`, wrapped in `Arc<Mutex<..>>`
/// so it may also be handed out through `into_connect_continuation_closure()`
connect_continuation_closure: Arc < Mutex < Box<dyn FnMut() -> ConnectionFuture> > >,
}
impl<const CONFIG_U64: u64> ClientConnectionManager<CONFIG_U64> {
pub fn new<IntoString: Into<String>>(host: IntoString, port: u16) -> Self {
let host = host.into();
let connect_closure = Box::new(Self::build_connect_continuation_closure(&host, port));
Self { host, port, connect_continuation_closure: Arc::new(Mutex::new(connect_closure)) }
}
pub async fn connect_retryable(&mut self) -> Result<TcpStream, Box<dyn Error + Send + Sync>> {
let config = ConstConfig::from(CONFIG_U64);
let retry_result_supplier = |retrying_start_time: SystemTime| {
let mut connect_continuation_closure = self.connect_continuation_closure.try_lock().expect("BUG (locking)");
async move {
connect_continuation_closure().await
.map_ok(|_, connection| (retrying_start_time.elapsed(), connection) )
.map_input(|_| retrying_start_time)
}
};
let retryable = retry_result_supplier(SystemTime::now()).await
.retry_with_async(retry_result_supplier);
let resolved_result = match config.retrying_strategy {
RetryingStrategies::DoNotRetry |
RetryingStrategies::EndCommunications =>
ResolvedResult::from_retry_result(retryable.into()),
RetryingStrategies::RetryWithBackoffUpTo(attempts) =>
retryable
.with_exponential_jitter(|| ExponentialJitter::FromBackoffRange {
backoff_range_millis: 1..=(2.526_f32.powi(attempts as i32) as u32),
re_attempts: attempts,
jitter_ratio: 0.2,
})
.await,
RetryingStrategies::RetryYieldingForUpToMillis(millis) =>
retryable
.yielding_until_timeout(Duration::from_millis(millis as u64), || Box::from(format!("Timed out (>{millis}ms) while attempting to connect to {}:{}", self.host, self.port)))
.await,
RetryingStrategies::RetrySpinningForUpToMillis(_millis) =>
todo!("THIS OPTION SHOULD BE REMOVED, AS IT IS NOT SUPPORTED BY KEEN-RETRY")
};
resolved_result
.inspect_recovered(|retrying_duration, _, errors|
warn!("`reactive-messaging::SocketClient`: Connection to {}:{} SUCCEEDED Succeeded after retrying {} times in {:?}. Transient errors: {}",
self.host, self.port, errors.len(), retrying_duration, keen_retry::loggable_retry_errors(&errors)) )
.inspect_given_up(|retrying_duration, transient_errors, fatal_error|
error!("`reactive-messaging::SocketClient`: Connection to {}:{} was GIVEN UP after retrying {} times in {:?}, with transient errors {}. The last error was {}",
self.host, self.port, transient_errors.len()+1, retrying_duration, keen_retry::loggable_retry_errors(&transient_errors), fatal_error) )
.inspect_unrecoverable(|retrying_duration, transient_errors, fatal_error|
error!("`reactive-messaging::SocketClient`: Connection to {}:{} FAILED FATABLY after retrying {} times in {:?}, with transient errors {}. The fatal error was {}",
self.host, self.port, transient_errors.len(), retrying_duration, keen_retry::loggable_retry_errors(&transient_errors), fatal_error) )
.into_result()
}
fn into_connect_continuation_closure(self) -> Arc < Mutex < Box<dyn FnMut() -> ConnectionFuture> > > {
self.connect_continuation_closure
}
fn build_connect_continuation_closure(host: &str, port: u16) -> impl FnMut() -> ConnectionFuture {
let address = format!("{}:{}", host, port);
let mut opt_addrs: Option<Peekable<IntoIter<SocketAddr>>> = None;
move || {
macro_rules! resolve {
() => {
match address.to_socket_addrs() {
Ok(resolved) => {
opt_addrs = Some(resolved.peekable());
opt_addrs.as_mut().unwrap()
},
Err(err) => return Box::pin(future::ready(RetryResult::Fatal { input: (), error: Box::from(format!("Unable to resolve address '{}': {}", address, err)) })),
}
};
}
let resolved_addrs = if let Some(addrs) = opt_addrs.as_mut() {
if addrs.peek().is_none() {
resolve!()
} else {
addrs
}
} else {
resolve!()
};
let socket_addr = resolved_addrs.next().unwrap();
let address = address.clone();
Box::pin(async move {
match TcpStream::connect(socket_addr).await {
Ok(socket) => RetryResult::Ok { reported_input: (), output: socket },
Err(err) => RetryResult::Transient { input: (), error: Box::from(format!("Couldn't connect to socket address '{socket_addr}' resolved from '{address}': {err}")) },
}
})
}
}
}
/// Server-side connection acceptor: binds a TCP listener (see `new()`) and delivers every accepted
/// connection -- wrapped in a `SocketConnection` -- through an internal channel.
pub struct ServerConnectionHandler<StateType: Debug + Clone + Send + 'static> {
/// Channel through which accepted (or manually fed) connections reach the consumer
connection_channel: ConnectionChannel<StateType>,
/// Sending half of the oneshot used by `shutdown()` to tell the listener task to stop
network_event_loop_signaler: tokio::sync::oneshot::Sender<()>,
}
impl<StateType: Debug + Clone + Send + 'static> ServerConnectionHandler<StateType> {
pub async fn new(listening_interface: &str, listening_port: u16, connection_initial_state: StateType) -> Result<Self, Box<dyn Error + Sync + Send>> {
let connection_channel = ConnectionChannel::new();
let connection_sender = connection_channel.sender.clone();
let (network_event_loop_sender, network_event_loop_receiver) = tokio::sync::oneshot::channel::<()>();
Self::spawn_connection_listener(&listening_interface, listening_port, connection_initial_state, connection_sender, network_event_loop_receiver).await?;
Ok(Self {
connection_channel,
network_event_loop_signaler: network_event_loop_sender,
})
}
async fn spawn_connection_listener(listening_interface: &str,
listening_port: u16,
connection_initial_state: StateType,
sender: tokio::sync::mpsc::Sender<SocketConnection<StateType>>,
mut network_event_loop_signaler: tokio::sync::oneshot::Receiver<()>)
-> Result<(), Box<dyn Error + Sync + Send>> {
let listening_interface_and_port = format!("{}:{}", listening_interface, listening_port);
let listener = TcpListener::bind(&listening_interface_and_port).await?;
tokio::spawn( async move {
loop {
let (connection, client_address) = if let Some(accepted_connection_and_addr) = tokio::select! {
acceptance_result = listener.accept() => {
if let Err(err) = acceptance_result {
error!("`reactive-messaging::IncomingConnectionHandler`: ERROR while accepting a connection for the server @ {listening_interface_and_port}: {:?}", err);
None
} else {
Some(acceptance_result.unwrap())
}
}
result = &mut network_event_loop_signaler => {
match result {
Ok(()) => trace!("`reactive-messaging::IncomingConnectionHandler`: SHUTDOWN requested for the server @ {listening_interface_and_port} -- releasing the interface bind and bailing out of the network event loop"),
Err(err) => error!("`reactive-messaging::IncomingConnectionHandler`: ERROR in the `shutdown signaler` for the server @ {listening_interface_and_port} (a server shutdown will be commanded now due to this occurrence): {:?}", err),
};
break
}
} {
accepted_connection_and_addr
} else {
continue
};
let dispatching_result = sender.send(SocketConnection::new(connection, connection_initial_state.clone())).await;
if let Err(_unconsumed_connection) = dispatching_result {
error!("`reactive-messaging::IncomingConnectionHandler` BUG! -- The server @ {listening_interface_and_port} faced an ERROR when feeding an incoming connection (from '{client_address}') to the 'connections consumer': it had dropped the consumption receiver prematurely. The server's network event loop will be ABORTED and you should expect undefined behavior, as the application thinks the server is still running.");
break;
}
}
});
Ok(())
}
/// Hands out the receiving end of the connections channel.\
/// Delegates to `ConnectionChannel::receiver()`, which yields `Some` only on the first call.
pub fn connection_receiver(&mut self) -> Option<tokio::sync::mpsc::Receiver<SocketConnection<StateType>>> {
    let channel = &mut self.connection_channel;
    channel.receiver()
}
/// Injects `socket_connection` into the connections channel, as if it had been accepted by the listener.
///
/// # Errors
/// Returns `ReceiverDroppedErr` (carrying the connection back) when `ConnectionChannel::feed()` fails.
pub async fn feed_connection(&self, socket_connection: SocketConnection<StateType>) -> Result<(), ReceiverDroppedErr<SocketConnection<StateType>>> {
    let feeding = self.connection_channel.feed(socket_connection);
    feeding.await
}
pub async fn shutdown(self) {
_ = self.network_event_loop_signaler.send(());
self.connection_channel.close().await;
}
}
/// Thin wrapper around a bounded tokio `mpsc` channel of `SocketConnection`s.\
/// The receiving end may be taken out once, through `receiver()`.
pub struct ConnectionChannel<StateType: Debug> {
/// Sending end -- cloned for each producer (see `clone_sender()`)
pub(crate) sender: tokio::sync::mpsc::Sender<SocketConnection<StateType>>,
/// Receiving end -- `Some` until `receiver()` is called
receiver: Option<tokio::sync::mpsc::Receiver<SocketConnection<StateType>>>,
}
impl<StateType: Debug> ConnectionChannel<StateType> {
/// Creates the channel, with room for 2 pending connections.
pub fn new() -> Self {
    /// Number of connections that may be buffered before `feed()` has to wait
    const CHANNEL_CAPACITY: usize = 2;
    let (sender, receiver) = tokio::sync::mpsc::channel(CHANNEL_CAPACITY);
    Self { sender, receiver: Some(receiver) }
}
/// Takes the receiving end out of this channel: `Some` on the first call, `None` afterwards.
pub fn receiver(&mut self) -> Option<tokio::sync::mpsc::Receiver<SocketConnection<StateType>>> {
    // leaves `None` behind
    std::mem::take(&mut self.receiver)
}
/// Sends `socket_connection` through the channel, waiting for room if it is full.
///
/// # Errors
/// If the send fails, the unconsumed connection is given back inside `ReceiverDroppedErr`.
pub async fn feed(&self, socket_connection: SocketConnection<StateType>) -> Result<(), ReceiverDroppedErr<SocketConnection<StateType>>> {
    match self.sender.send(socket_connection).await {
        Ok(()) => Ok(()),
        Err(send_error) => Err(ReceiverDroppedErr(send_error.0)),
    }
}
/// Returns a new handle to the sending end of the channel.
pub fn clone_sender(&self) -> tokio::sync::mpsc::Sender<SocketConnection<StateType>> {
    Clone::clone(&self.sender)
}
pub async fn close(self) {
drop(self);
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
/// Error returned when a value could not be sent through a channel; it wraps the value that was not consumed.\
/// `Debug` is derived so `Result`s carrying this error may be `unwrap()`ed / `expect()`ed by callers.
#[derive(Debug)]
pub struct ReceiverDroppedErr<T>(T);
#[cfg(test)]
mod tests {
use super::*;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32};
use std::sync::atomic::Ordering::Relaxed;
#[tokio::test]
async fn connection_channel() -> Result<(), Box<dyn Error>> {
let expected_count = 10;
let received_count = Arc::new(AtomicU32::new(0));
let received_count_ref = received_count.clone();
let stream_ended = Arc::new(AtomicBool::new(false));
let stream_ended_ref = stream_ended.clone();
let mut connection_channel = ConnectionChannel::new();
let mut receiver = connection_channel.receiver().expect("The `receiver` should be available at this point");
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(100)).await; while let Some(_connection) = receiver.recv().await {
received_count_ref.fetch_add(1, Relaxed);
}
stream_ended_ref.store(true, Relaxed);
});
for _i in 0..10 {
let tokio_connection = TcpStream::connect(SocketAddr::new(IpAddr::V4(Ipv4Addr::from_str("66.45.249.218")?), 80)).await?;
let socket_connection = SocketConnection::new(tokio_connection, ());
connection_channel.feed(socket_connection).await.unwrap_or_else(|_| panic!("Failed to send value"));
}
assert_eq!(stream_ended.load(Relaxed), false, "The connections stream was prematurely closed");
connection_channel.close().await;
assert!(stream_ended.load(Relaxed), "The connections stream (on the receiver end) wasn't notified that closing had happened");
assert_eq!(received_count.load(Relaxed), expected_count, "The wrong number of connections were received");
Ok(())
}
/// Checks that a `ServerConnectionHandler` delivers every connection made to it -- plus one
/// connection fed manually through `feed_connection()` -- and that `shutdown()` ends the stream.
#[tokio::test]
async fn server_connection_handler() -> Result<(), Box<dyn Error + Sync + Send>> {
    // 10 connections accepted by the server + 1 fed manually
    let expected_count = 10 + 1;
    let interface = "127.0.0.1";
    let port = 8356;
    let received_count = Arc::new(AtomicU32::new(0));
    let stream_ended = Arc::new(AtomicBool::new(false));
    let mut server_connection_handler = ServerConnectionHandler::new(interface, port, ()).await?;
    let mut connection_receiver = server_connection_handler.connection_receiver().expect("The `receiver` should be available at this point");
    {
        let received_count = Arc::clone(&received_count);
        let stream_ended = Arc::clone(&stream_ended);
        tokio::spawn(async move {
            while let Some(_connection) = connection_receiver.recv().await {
                received_count.fetch_add(1, Relaxed);
            }
            stream_ended.store(true, Relaxed);
        });
    }
    let server_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_str(interface)?), port);
    for i in 0..10 {
        let tokio_connection = TcpStream::connect(server_address).await?;
        // only the first client connection is also fed back into the handler
        if i == 0 {
            let extra_socket_connection = SocketConnection::new(tokio_connection, ());
            server_connection_handler.feed_connection(extra_socket_connection).await.unwrap_or_else(|_| panic!("Failed to send value"));
        }
    }
    assert!(!stream_ended.load(Relaxed), "The connections stream was prematurely closed");
    server_connection_handler.shutdown().await;
    assert!(stream_ended.load(Relaxed), "The connections stream (on the receiver end) wasn't notified that closing had happened");
    assert_eq!(received_count.load(Relaxed), expected_count, "The wrong number of connections were received");
    Ok(())
}
/// Exercises the connect closure of `ClientConnectionManager` in three scenarios, in this order:
/// an unresolvable host (expects a `Fatal` result), a resolvable address with no server listening
/// (expects a `Transient` result) and, after a server is started on that same address, 10 successful connections.
#[tokio::test]
async fn client_connection() -> Result<(), Box<dyn Error + Sync + Send>> {
let expected_count = 10;
let interface = "127.0.0.1";
let port = 8357;
let received_count = Arc::new(AtomicU32::new(0));
let received_count_ref = received_count.clone();
// NOTE(review): `stream_ended` is set by the spawned task below, but never asserted in this test
let stream_ended = Arc::new(AtomicBool::new(false));
let stream_ended_ref = stream_ended.clone();
// scenario 1: the host name cannot be resolved
let connect_shareable = ClientConnectionManager::<{ConstConfig::default().into()}>::new("non-existing-host.com.br", port)
.into_connect_continuation_closure();
let mut connect = connect_shareable
.lock().await;
let error_message = connect().await
.expect_fatal(&format!("Tried to connect to a non-existing host, but the result of a connection attempt was not a `Fatal` error"))
.into_result()
.expect_err("A `Result::Err` should have been issued")
.to_string();
// NOTE(review): the tail of the expected message ("Name or service not known") looks OS / libc specific -- confirm this test is meant to run on a single platform
assert_eq!(error_message, "Unable to resolve address 'non-existing-host.com.br:8357': failed to lookup address information: Name or service not known", "Wrong error message");
// scenario 2: the address resolves, but no server is listening yet
let connect_shareable = ClientConnectionManager::<{ConstConfig::default().into()}>::new(interface, port)
.into_connect_continuation_closure();
let mut connect = connect_shareable
.lock().await;
let error_message = connect().await
.expect_transient(&format!("There is no server currently listening at {interface}:{port}, but the result of a connection attempt was not a `Transient` error"))
.into_result()
.expect_err("A `Result::Err` should have been issued")
.to_string();
// NOTE(review): "(os error 111)" looks platform specific as well -- confirm
assert_eq!(error_message, "Couldn't connect to socket address '127.0.0.1:8357' resolved from '127.0.0.1:8357': Connection refused (os error 111)", "Wrong error message");
// scenario 3: with a server now listening, the same closure should connect successfully
let mut server_connection_handler = ServerConnectionHandler::new(interface, port, ()).await?;
let mut connection_receiver = server_connection_handler.connection_receiver().expect("The `receiver` should be available at this point");
tokio::spawn(async move {
while let Some(_connection) = connection_receiver.recv().await {
received_count_ref.fetch_add(1, Relaxed);
}
stream_ended_ref.store(true, Relaxed);
});
// each returned connection is dropped right away -- only the server-side count is checked
for _i in 0..10 {
connect().await
.expect_ok(&format!("There is a server listening at {interface}:{port}, so the `connect()` closure should have worked"));
}
server_connection_handler.shutdown().await;
assert_eq!(received_count.load(Relaxed), expected_count, "The wrong number of connections were received");
Ok(())
}
}