use crate::dedup::ConcurrentDedup;
use crate::endpoint_core::{run_stream_loop, EndpointCore, ExponentialBackoff};
use crate::error::{Result, RouterError};
use crate::filter::EndpointFilters;
use crate::router::{EndpointId, RoutedMessage};
use crate::routing::RoutingTable;
use parking_lot::RwLock;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
static NEXT_TCP_CLIENT_ID: AtomicUsize = AtomicUsize::new(10_000);
const MAX_TCP_CLIENTS: usize = 100;
use tokio::net::{TcpListener, TcpStream};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
#[allow(clippy::too_many_arguments)]
pub async fn run(
id: usize,
address: String,
mode: crate::config::EndpointMode,
bus_tx: broadcast::Sender<RoutedMessage>,
bus_rx: broadcast::Receiver<RoutedMessage>,
routing_table: Arc<RwLock<RoutingTable>>,
dedup: ConcurrentDedup,
filters: EndpointFilters,
token: CancellationToken,
) -> Result<()> {
drop(bus_rx);
let core = EndpointCore {
id: EndpointId(id),
bus_tx: bus_tx.clone(),
routing_table: routing_table.clone(),
dedup: dedup.clone(),
filters: filters.clone(),
update_routing: true, };
match mode {
crate::config::EndpointMode::Server => {
let listener = TcpListener::bind(&address)
.await
.map_err(|e| RouterError::network(&address, e))?;
info!("TCP Server listening on {}", address);
let mut join_set = JoinSet::new();
loop {
tokio::select! {
accept_res = listener.accept() => {
match accept_res {
Ok((stream, addr)) => {
if join_set.len() >= MAX_TCP_CLIENTS {
warn!("TCP Server {}: Max client limit ({}) reached, rejecting {}", address, MAX_TCP_CLIENTS, addr);
drop(stream);
continue;
}
let _ = stream.set_nodelay(true);
let client_id = NEXT_TCP_CLIENT_ID.fetch_add(1, Ordering::Relaxed);
info!("Accepted TCP connection from {} (EndpointId: {})", addr, client_id);
let core_client = EndpointCore {
id: EndpointId(client_id),
bus_tx: core.bus_tx.clone(),
routing_table: core.routing_table.clone(),
dedup: core.dedup.clone(),
filters: core.filters.clone(),
update_routing: true, };
let rx_client = core.bus_tx.subscribe();
let token_client = token.clone();
join_set.spawn(async move {
let (read, write) = stream.into_split();
let name = format!("TCP Client {}", addr);
let _ = run_stream_loop(read, write, rx_client, core_client, token_client, name).await;
});
}
Err(e) => error!("TCP Accept error: {}", e),
}
}
_ = join_set.join_next(), if !join_set.is_empty() => {}
_ = token.cancelled() => break,
}
}
join_set.shutdown().await;
Ok(())
}
crate::config::EndpointMode::Client => {
info!("Connecting to TCP server at {}", address);
let mut backoff =
ExponentialBackoff::new(Duration::from_secs(1), Duration::from_secs(60), 2.0);
loop {
if token.is_cancelled() {
break;
}
match TcpStream::connect(&address).await {
Ok(stream) => {
let _ = stream.set_nodelay(true);
info!("Connected to {}", address);
backoff.reset();
let (read, write) = stream.into_split();
let name = format!("TCP Client {}", address);
let _ = run_stream_loop(
read,
write,
bus_tx.subscribe(),
core.clone(),
token.clone(),
name,
)
.await;
warn!("Connection to {} lost, retrying...", address);
}
Err(e) => {
warn!("Failed to connect to {}: {}. Retrying...", address, e);
}
}
let wait = backoff.next_backoff();
tokio::select! {
_ = tokio::time::sleep(wait) => {},
_ = token.cancelled() => break,
}
}
Ok(())
}
}
}