use std::io::ErrorKind;
use std::time::Duration;
#[cfg(feature = "async-tokio")]
use std::net::ToSocketAddrs;
#[cfg(feature = "async-tokio")]
use tokio::net::TcpStream;
#[cfg(feature = "async-tokio")]
use tokio::task::JoinHandle;
#[cfg(feature = "async-tokio")]
use tokio::time::sleep;
use crate::channel::{self, Receiver, Sender};
use crate::network::remote::remote_send;
use crate::network::{DemuxCoord, NetworkMessage, ReceiverEndpoint};
use crate::operator::ExchangeData;
use crate::network::NetworkSender;
/// Number of times `connect_remote` tries to reach the remote before panicking.
const CONNECT_ATTEMPTS: usize = 32;
/// Connection timeout, only compiled when the `async-tokio` feature is disabled.
// NOTE(review): its use site is not visible in this part of the file — presumably the
// blocking connect path; confirm.
#[cfg(not(feature = "async-tokio"))]
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Delay waited after the first failed connection attempt; it is doubled after every
/// further failed attempt.
const RETRY_INITIAL_TIMEOUT: Duration = Duration::from_millis(8);
/// Upper bound for the delay between two connection attempts.
const RETRY_MAX_TIMEOUT: Duration = Duration::from_secs(1);
/// Capacity of the bounded channel that feeds the multiplexer task.
const MUX_CHANNEL_CAPACITY: usize = 10;
/// Sending half of a multiplexer: holds the channel sender on which
/// `(ReceiverEndpoint, NetworkMessage<Out>)` pairs are queued.
#[derive(Debug)]
pub struct MultiplexingSender<Out: Send + 'static> {
// Sender towards the multiplexer; set to `Some` by the constructor.
// NOTE(review): presumably wrapped in `Option` so it can be taken/dropped to close the
// channel — confirm against the rest of the file.
tx: Option<Sender<(ReceiverEndpoint, NetworkMessage<Out>)>>,
}
#[cfg(feature = "async-tokio")]
impl<Out: ExchangeData> MultiplexingSender<Out> {
    /// Creates a multiplexing sender towards the remote identified by `coord` at `address`.
    ///
    /// A tokio task is spawned that first connects to the remote via `connect_remote` and
    /// then runs `mux_thread`, which writes every message received on the internal bounded
    /// channel (capacity `MUX_CHANNEL_CAPACITY`) to the TCP stream.
    ///
    /// Returns the sender together with the [`JoinHandle`] of the spawned task.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside of a tokio runtime.
    pub fn new(coord: DemuxCoord, address: (String, u16)) -> (Self, JoinHandle<()>) {
        let (tx, rx) = channel::bounded(MUX_CHANNEL_CAPACITY);
        let join_handle = tokio::spawn(async move {
            // Log the address exactly as configured. Resolving it here only for the log line
            // would perform a blocking lookup inside the async task and could panic with an
            // uninformative message before `connect_remote` reports the real error.
            log::debug!("mux connecting to {}:{}", address.0, address.1);
            let stream = connect_remote(coord, address).await;
            mux_thread::<Out>(coord, rx, stream).await;
        });
        (Self { tx: Some(tx) }, join_handle)
    }

    /// Builds a [`NetworkSender`] for `receiver_endpoint` backed by a clone of the internal
    /// channel sender.
    ///
    /// # Panics
    ///
    /// Panics if the internal channel sender is no longer present.
    pub(crate) fn get_sender(&mut self, receiver_endpoint: ReceiverEndpoint) -> NetworkSender<Out> {
        use crate::network::mux_sender;
        let tx = self
            .tx
            .as_ref()
            .expect("the multiplexer channel sender is not available")
            .clone();
        mux_sender(receiver_endpoint, tx)
    }
}
/// Connects to the remote `coord` listening at `address`, retrying on failure.
///
/// The address is resolved once; then, for at most `CONNECT_ATTEMPTS` rounds, every resolved
/// socket address is tried in order and the first stream that connects is returned. Between
/// two rounds the task sleeps for a delay that starts at `RETRY_INITIAL_TIMEOUT` and doubles
/// each time, capped at `RETRY_MAX_TIMEOUT`.
///
/// # Panics
///
/// Panics if the address cannot be resolved, if it resolves to no socket address, or if no
/// connection could be established after `CONNECT_ATTEMPTS` rounds.
#[cfg(feature = "async-tokio")]
async fn connect_remote(coord: DemuxCoord, address: (String, u16)) -> TcpStream {
    // NOTE(review): `to_socket_addrs` from std resolves synchronously, so a slow DNS lookup
    // stalls this runtime worker; consider `tokio::net::lookup_host` if that becomes an issue.
    let socket_addrs: Vec<_> = address
        .to_socket_addrs()
        .unwrap_or_else(|e| panic!("Failed to get the address for {}: {:?}", coord, e))
        .collect();
    // The list never changes after this point, so retrying on an empty one is pointless.
    assert!(
        !socket_addrs.is_empty(),
        "No socket address found for {} at {:?}",
        coord,
        address
    );

    let mut retry_delay = RETRY_INITIAL_TIMEOUT;
    for attempt in 1..=CONNECT_ATTEMPTS {
        log::debug!(
            "{} connecting to {:?} ({} attempt)",
            coord,
            socket_addrs,
            attempt,
        );
        for address in socket_addrs.iter() {
            match TcpStream::connect(address).await {
                Ok(stream) => {
                    return stream;
                }
                Err(err) => match err.kind() {
                    ErrorKind::TimedOut => {
                        log::debug!("{coord} timeout connecting to {address:?}");
                    }
                    ErrorKind::ConnectionRefused => {
                        // The remote may simply not be listening yet: stay quiet for the
                        // first few attempts and only warn when the refusals persist.
                        log::log!(
                            if attempt > 4 {
                                log::Level::Warn
                            } else {
                                log::Level::Debug
                            },
                            "{coord} connection refused connecting to {address:?} ({attempt})"
                        );
                    }
                    _ => {
                        log::warn!("{coord} failed to connect to {address:?}: {err:?}");
                    }
                },
            }
        }

        // No further round follows the last one: give up right away instead of announcing a
        // retry and sleeping for nothing.
        if attempt == CONNECT_ATTEMPTS {
            break;
        }

        log::debug!(
            "{coord} retrying connection to {socket_addrs:?} in {}s",
            retry_delay.as_secs_f32(),
        );
        sleep(retry_delay).await;
        retry_delay = (2 * retry_delay).min(RETRY_MAX_TIMEOUT);
    }

    panic!(
        "Failed to connect to remote {} at {:?} after {} attempts",
        coord, address, CONNECT_ATTEMPTS
    );
}
/// Body of the multiplexer task for the remote `coord`.
///
/// Every `(destination, message)` pair received from `rx` is handed to `remote_send`, which
/// is given the TCP `stream` and a textual form of the peer address. Once `rx` reports an
/// error the loop ends and the stream is shut down.
///
/// # Panics
///
/// Panics if shutting down the stream fails.
#[cfg(feature = "async-tokio")]
async fn mux_thread<Out: ExchangeData>(
    coord: DemuxCoord,
    rx: Receiver<(ReceiverEndpoint, NetworkMessage<Out>)>,
    mut stream: TcpStream,
) {
    use tokio::io::AsyncWriteExt;

    // Textual peer address, used for the log line below and passed on to `remote_send`;
    // falls back to a placeholder when the peer address cannot be retrieved.
    let peer_name = match stream.peer_addr() {
        Ok(peer) => peer.to_string(),
        Err(_) => "unknown".to_string(),
    };
    log::debug!("{} connected to {:?}", coord, peer_name);

    // Forward messages until the receiving end of the channel returns an error.
    while let Ok((dest, message)) = rx.recv_async().await {
        remote_send(message, dest, &mut stream, &peer_name).await;
    }

    stream.shutdown().await.unwrap();
    log::debug!("{} finished", coord);
}