use super::*;
use connection_table::ConnectionRefKind;
use connection_table::*;
use network_connection::*;
use stop_token::future::FutureExt;
impl_veilid_log_facility!("net");
/// Length of the window over which drops of a protected connection are counted.
/// `report_connection_finished` compares the time elapsed since a
/// `ProtectedAddress::span_start_ts` against this value.
const PROTECTED_CONNECTION_DROP_SPAN: TimestampDuration = TimestampDuration::new_secs(10);
/// Number of drops within one span at which the drop is reported on the
/// protected node ref and no reconnection is spawned for that drop.
const PROTECTED_CONNECTION_DROP_COUNT: usize = 3;
/// Initial value of the retry counter in `get_or_create_connection`.
/// With 0, the outcome of the first connect attempt is final.
const NEW_CONNECTION_RETRY_COUNT: usize = 0;
/// Value passed to `sleep` between connect attempts in `get_or_create_connection`
/// (milliseconds, going by the name).
const NEW_CONNECTION_RETRY_DELAY_MS: u32 = 500;
/// Events sent over the flume channel to the connection manager's async processor.
#[derive(Debug)]
enum ConnectionManagerEvent {
/// A protocol connection was accepted; the processor registers it as a new
/// connection (only while accepting is still allowed).
Accepted(ProtocolNetworkConnection),
/// A connection that is to be closed and awaited by the processor.
Dead(Box<NetworkConnection>),
}
/// Scope guard for a reference on a connection: `try_new` issues
/// `ConnectionRefKind::AddRef` for `id`, and dropping the value issues the
/// matching `ConnectionRefKind::RemoveRef`.
#[derive(Debug)]
pub struct ConnectionRefScope {
/// Manager through which the add/remove reference calls are made.
connection_manager: ConnectionManager,
/// Id of the connection this scope refers to.
id: NetworkConnectionId,
}
impl ConnectionRefScope {
    /// Tries to add a reference to connection `id` through `connection_manager`.
    ///
    /// Returns `Some(scope)` when the add-ref call reports success, and `None`
    /// otherwise. No scope is constructed on failure, so no remove-ref is ever
    /// issued for a reference that was not added.
    pub fn try_new(connection_manager: ConnectionManager, id: NetworkConnectionId) -> Option<Self> {
        let referenced = connection_manager.connection_ref(id, ConnectionRefKind::AddRef);
        if referenced {
            Some(Self {
                connection_manager,
                id,
            })
        } else {
            None
        }
    }
}
impl Drop for ConnectionRefScope {
    /// Issues the remove-ref call that balances the add-ref made in `try_new`.
    fn drop(&mut self) {
        let Self {
            connection_manager,
            id,
        } = self;
        connection_manager.connection_ref(*id, ConnectionRefKind::RemoveRef);
    }
}
/// Bookkeeping for one protected socket address (an address taken from a
/// relay node's dial info in `update_protections`).
#[derive(Debug)]
struct ProtectedAddress {
/// Node ref of the relay this address belongs to.
node_ref: NodeRef,
/// Start of the current drop-counting span.
span_start_ts: Timestamp,
/// Number of protected-connection drops counted in the current span.
drops_in_span: usize,
}
/// Mutable state that exists only between `startup` and `shutdown`.
#[derive(Debug)]
struct ConnectionManagerInner {
/// Id handed to the next connection created; incremented on each use.
next_id: NetworkConnectionId,
/// Sending half of the event channel consumed by the async processor.
sender: flume::Sender<ConnectionManagerEvent>,
/// Join handle of the async processor task; taken and awaited in `shutdown`.
async_processor_jh: Option<MustJoinHandle<()>>,
/// Source of the stop tokens given to the async processor and to new
/// connections; taken and dropped in `shutdown`.
stop_source: Option<StopSource>,
/// Protected addresses keyed by socket address; rebuilt by `update_protections`.
protected_addresses: HashMap<SocketAddress, ProtectedAddress>,
}
/// State shared by all clones of a `ConnectionManager`.
struct ConnectionManagerArc {
/// Read from `config.network.connection_initial_timeout_ms`; used as the
/// timeout for acquiring the per-address lock and passed to `connect`.
connection_initial_timeout_ms: u32,
/// Read from `config.network.connection_inactivity_timeout_ms`; exposed via
/// `ConnectionManager::connection_inactivity_timeout_ms`.
connection_inactivity_timeout_ms: u32,
/// Table holding the managed connections.
connection_table: ConnectionTable,
/// Async locks tagged by remote socket address, taken while creating,
/// accepting, or closing a connection for that address.
address_lock_table: AsyncTagLockTable<SocketAddr>,
/// Guards startup/shutdown and entry into the public operations.
startup_lock: StartupLock,
/// `Some` between `startup` and `shutdown`, `None` otherwise.
inner: Mutex<Option<ConnectionManagerInner>>,
/// Runs the reconnection futures queued by `spawn_reconnector`.
reconnection_processor: DeferredStreamProcessor,
}
impl core::fmt::Debug for ConnectionManagerArc {
    /// Formats only the `inner` state.
    ///
    /// The remaining fields are deliberately left out, so the output is
    /// finished with `finish_non_exhaustive()` to render a trailing `..` and
    /// make it clear that the struct has more fields than are shown.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("ConnectionManagerArc")
            .field("inner", &self.inner)
            .finish_non_exhaustive()
    }
}
/// Cloneable handle to the connection manager; clones share the same
/// `ConnectionManagerArc` through the `Arc`.
#[derive(Debug, Clone)]
pub struct ConnectionManager {
/// Component registry this manager was created with.
registry: VeilidComponentRegistry,
/// Shared state.
arc: Arc<ConnectionManagerArc>,
}
impl_veilid_component_accessors!(ConnectionManager);
impl ConnectionManager {
fn new_inner(
stop_source: StopSource,
sender: flume::Sender<ConnectionManagerEvent>,
async_processor_jh: MustJoinHandle<()>,
) -> ConnectionManagerInner {
ConnectionManagerInner {
next_id: 0.into(),
stop_source: Some(stop_source),
sender,
async_processor_jh: Some(async_processor_jh),
protected_addresses: HashMap::new(),
}
}
/// Builds the shared state from the registry's configuration.
///
/// Both timeouts are copied out of `config.network`; `inner` starts as `None`
/// until `startup` is called.
fn new_arc(registry: VeilidComponentRegistry) -> ConnectionManagerArc {
    // Copy the two timeouts out in a scoped block so the config value is
    // released before `registry` is moved into the connection table.
    let (connection_initial_timeout_ms, connection_inactivity_timeout_ms) = {
        let config = registry.config();
        (
            config.network.connection_initial_timeout_ms,
            config.network.connection_inactivity_timeout_ms,
        )
    };

    ConnectionManagerArc {
        reconnection_processor: DeferredStreamProcessor::new(),
        connection_initial_timeout_ms,
        connection_inactivity_timeout_ms,
        connection_table: ConnectionTable::new(registry),
        address_lock_table: AsyncTagLockTable::new(),
        startup_lock: StartupLock::new(),
        inner: Mutex::new(None),
    }
}
/// Creates a connection manager for `registry`. The manager is not started;
/// call `startup` before using it.
pub fn new(registry: VeilidComponentRegistry) -> Self {
    let shared = Self::new_arc(registry.clone());
    let arc = Arc::new(shared);
    Self { registry, arc }
}
/// Returns the connection inactivity timeout captured from the configuration
/// when this manager was constructed.
pub fn connection_inactivity_timeout_ms(&self) -> u32 {
self.arc.connection_inactivity_timeout_ms
}
/// Starts the connection manager.
///
/// Creates the event channel and stop source, spawns the async processor
/// task, installs the started state, and initializes the reconnection
/// processor.
///
/// # Errors
/// Propagates the error from the startup lock if it cannot be acquired.
///
/// # Panics
/// Panics if the started state is already present.
pub fn startup(&self) -> EyreResult<()> {
    let guard = self.arc.startup_lock.startup()?;
    veilid_log!(self debug "startup connection manager");

    // Event channel and stop signal for the background processor.
    let (sender, receiver) = flume::unbounded();
    let stop_source = StopSource::new();
    let stop_token = stop_source.token();
    let async_processor_jh = spawn(
        "connection manager async processor",
        self.clone().async_processor(stop_token, receiver),
    );

    // Install the started state; the lock is released at the end of this scope.
    {
        let mut slot = self.arc.inner.lock();
        assert!(
            slot.is_none(),
            "shouldn't start connection manager twice without shutting it down first"
        );
        *slot = Some(Self::new_inner(stop_source, sender, async_processor_jh));
    }

    self.arc.reconnection_processor.init();
    guard.success();
    Ok(())
}
/// Shuts the connection manager down.
///
/// In order: terminates the reconnection processor, removes the started
/// state, drops the stop source, waits for the async processor task, and
/// then waits for the connection table to join. Returns early, with an
/// error log, if the startup lock reports the manager is not running.
///
/// # Panics
/// Panics if the started state is missing after the shutdown lock was acquired.
pub async fn shutdown(&self) {
    veilid_log!(self debug "starting connection manager shutdown");
    let guard = match self.arc.startup_lock.shutdown().await {
        Ok(guard) => guard,
        Err(_) => {
            veilid_log!(self error "connection manager is already shut down");
            return;
        }
    };

    veilid_log!(self debug "stopping reconnection processor task");
    self.arc.reconnection_processor.terminate().await;

    // Take the started state; the mutex guard is a temporary of this
    // statement, so it is released before any await below.
    let taken = self.arc.inner.lock().take();
    let Some(mut inner) = taken else {
        panic!("not started");
    };

    veilid_log!(self debug "stopping async processor task");
    drop(inner.stop_source.take());
    let processor_jh = inner.async_processor_jh.take().unwrap_or_log();

    veilid_log!(self debug "waiting for async processor to stop");
    processor_jh.await;

    veilid_log!(self debug "waiting for connection handlers to complete");
    self.arc.connection_table.join().await;

    guard.success();
    veilid_log!(self debug "finished connection manager shutdown");
}
/// Looks up the remote address of `conn`'s flow in the protected address map.
///
/// Returns a clone of the node ref stored for that address, or `None` when
/// the address is not protected.
fn should_protect_connection(
    &self,
    inner: &mut ConnectionManagerInner,
    conn: &NetworkConnection,
) -> Option<NodeRef> {
    let protected = inner
        .protected_addresses
        .get(conn.flow().remote_address())?;
    Some(protected.node_ref.clone())
}
/// Rebuilds the protected address map from the current relays and brings
/// the protection flag of every existing connection in line with it.
///
/// Does nothing if the manager is not started.
pub fn update_protections(&self) {
    let Ok(_guard) = self.arc.startup_lock.enter() else {
        return;
    };
    let mut lock = self.arc.inner.lock();
    let Some(inner) = lock.as_mut() else {
        return;
    };

    // Begin with every known address marked stale; each address still
    // advertised by a relay is un-marked below.
    let mut stale_addresses: HashSet<_> = inner.protected_addresses.keys().cloned().collect();

    for routing_domain in RoutingDomainSet::all() {
        for relay in self
            .network_manager()
            .routing_table()
            .relays(routing_domain)
        {
            for did in relay.relay_node.dial_info_details() {
                let protected_address = did.dial_info.socket_address();
                stale_addresses.remove(&protected_address);
                // Refresh the node ref of a known address, or start tracking
                // a new one with an empty drop span.
                inner
                    .protected_addresses
                    .entry(protected_address)
                    .and_modify(|pa| pa.node_ref = relay.relay_node.unfiltered())
                    .or_insert_with(|| ProtectedAddress {
                        node_ref: relay.relay_node.unfiltered(),
                        span_start_ts: Timestamp::now_non_decreasing(),
                        drops_in_span: 0usize,
                    });
            }
        }
    }

    // Forget addresses that no relay advertises any more.
    inner
        .protected_addresses
        .retain(|address, _| !stale_addresses.contains(address));

    // Reconcile each connection's protection state with the updated map.
    self.arc
        .connection_table
        .with_all_connections_mut(|conn| {
            if let Some(protect_nr) = conn.protected_node_ref() {
                if self.should_protect_connection(inner, conn).is_none() {
                    veilid_log!(self debug "== Unprotecting connection: {} -> {} for node {}", conn.connection_id(), conn.debug_print(Timestamp::now()), protect_nr);
                    conn.unprotect();
                }
            } else if let Some(protect_nr) = self.should_protect_connection(inner, conn) {
                veilid_log!(self debug "== Protecting existing connection: {} -> {} for node {}", conn.connection_id(), conn.debug_print(Timestamp::now()), protect_nr);
                conn.protect(protect_nr);
            }
            None::<()>
        });
}
fn on_new_protocol_network_connection(
&self,
inner: &mut ConnectionManagerInner,
prot_conn: ProtocolNetworkConnection,
opt_dial_info: Option<DialInfo>,
) -> EyreResult<NetworkResult<ConnectionHandle>> {
let id = inner.next_id;
inner.next_id += 1u64;
veilid_log!(self trace
"on_new_protocol_network_connection: id={} prot_conn={:?}",
id,
prot_conn
);
let stop_token = match &inner.stop_source {
Some(ss) => ss.token(),
None => bail!("not creating connection because we are stopping"),
};
let mut conn = NetworkConnection::from_protocol(
self.clone(),
stop_token,
prot_conn,
id,
opt_dial_info,
);
let handle = conn.get_handle();
if let Some(protect_nr) = self.should_protect_connection(inner, &conn) {
veilid_log!(self debug "== Protecting new connection: {} -> {} for node {}", id, conn.debug_print(Timestamp::now()), protect_nr);
conn.protect(protect_nr);
}
match self.arc.connection_table.add_connection(conn) {
Ok(None) => {
}
Ok(Some(conn)) => {
#[cfg(feature = "verbose-tracing")]
veilid_log!(self debug "== LRU kill connection due to limit: {:?}", conn.debug_print(Timestamp::now()));
let _ = inner
.sender
.send(ConnectionManagerEvent::Dead(Box::new(conn)));
}
Err(ConnectionTableAddError::AddressFilter(conn, e)) => {
let desc = conn.flow();
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
return Ok(NetworkResult::no_connection_other(format!(
"connection filtered: {:?} ({})",
desc, e
)));
}
Err(ConnectionTableAddError::AlreadyExists(conn)) => {
let desc = conn.flow();
veilid_log!(self debug "== Connection already exists: {:?}", conn.debug_print(Timestamp::now()));
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
return Ok(NetworkResult::no_connection_other(format!(
"connection already exists: {:?}",
desc
)));
}
Err(ConnectionTableAddError::TableFull(conn)) => {
let desc = conn.flow();
veilid_log!(self debug "== Connection table full: {:?}", conn.debug_print(Timestamp::now()));
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
return Ok(NetworkResult::no_connection_other(format!(
"connection table is full: {:?}",
desc
)));
}
};
Ok(NetworkResult::Value(handle))
}
/// Peeks at the connection table for a connection matching `flow`.
///
/// Returns `None` if the manager is not started or the table has no match.
pub fn get_connection(&self, flow: Flow) -> Option<ConnectionHandle> {
    let _guard = self.arc.startup_lock.enter().ok()?;
    self.arc.connection_table.peek_connection_by_flow(flow)
}
/// Forwards to `ConnectionTable::touch_connection_by_id` for connection `id`.
/// Unlike most public operations here, this does not enter the startup lock.
pub(super) fn touch_connection_by_id(&self, id: NetworkConnectionId) {
self.arc.connection_table.touch_connection_by_id(id)
}
/// Forwards an add/remove reference request of the given `kind` for
/// connection `id` to the connection table and returns the table's result.
/// `ConnectionRefScope::try_new` treats `false` as failure.
fn connection_ref(&self, id: NetworkConnectionId, kind: ConnectionRefKind) -> bool {
self.arc.connection_table.ref_connection_by_id(id, kind)
}
/// Creates a `ConnectionRefScope` holding a reference on connection `id`.
///
/// Returns `None` if the manager is not started or the reference could not
/// be added.
pub fn try_connection_ref_scope(&self, id: NetworkConnectionId) -> Option<ConnectionRefScope> {
    let _guard = self.arc.startup_lock.enter().ok()?;
    ConnectionRefScope::try_new(self.clone(), id)
}
pub async fn get_or_create_connection(
&self,
dial_info: DialInfo,
) -> EyreResult<NetworkResult<ConnectionHandle>> {
let Ok(_guard) = self.arc.startup_lock.enter() else {
return Ok(NetworkResult::service_unavailable(
"connection manager is not started",
));
};
let peer_address = dial_info.peer_address();
let remote_addr = peer_address.socket_addr();
let mut preferred_local_address = self
.network_manager()
.net()
.get_preferred_local_address(&dial_info);
let best_port = preferred_local_address.map(|pla| pla.port());
let Ok(_lock_guard) = timeout(
self.arc.connection_initial_timeout_ms,
self.arc.address_lock_table.lock_tag(remote_addr),
)
.await
else {
veilid_log!(self debug "== get_or_create_connection: connection timeout trying to connect to dial_info={:?}", dial_info);
return Ok(NetworkResult::no_connection_other("connection timeout"));
};
veilid_log!(self trace "== get_or_create_connection dial_info={:?}", dial_info);
if let Some(best_existing_conn) = self
.arc
.connection_table
.get_best_connection_by_remote(best_port, peer_address)
{
veilid_log!(self trace
"== Returning best existing connection {:?}",
best_existing_conn
);
return Ok(NetworkResult::Value(best_existing_conn));
}
if self
.arc
.connection_table
.check_for_colliding_connection(&dial_info)
{
preferred_local_address = None;
}
let mut retry_count = NEW_CONNECTION_RETRY_COUNT;
let network_manager = self.network_manager();
let nres = loop {
veilid_log!(self trace "== get_or_create_connection connect({}) {:?} -> {}", retry_count, preferred_local_address, dial_info);
let result_net_res = ProtocolNetworkConnection::connect(
self.registry(),
preferred_local_address,
&dial_info,
self.arc.connection_initial_timeout_ms,
network_manager.address_filter(),
)
.await;
match result_net_res {
Ok(net_res) => {
if net_res.is_value() || retry_count == 0 {
break net_res;
}
}
Err(e) => {
if retry_count == 0 {
bail!(
"failed to connect: {:?} -> {:?}: {:#?}",
preferred_local_address,
dial_info,
e
);
}
}
};
retry_count -= 1;
sleep(NEW_CONNECTION_RETRY_DELAY_MS).await;
};
let prot_conn = network_result_value_or_log!(self target:"network_result", nres => [ format!("== get_or_create_connection failed {:?} -> {}", preferred_local_address, dial_info) ] {
network_result_raise!(nres);
});
let mut inner = self.arc.inner.lock();
let inner = match &mut *inner {
Some(v) => v,
None => {
bail!("shutting down");
}
};
self.on_new_protocol_network_connection(inner, prot_conn, Some(dial_info))
}
/// Registers `flow` as a priority flow in the connection table.
/// Does nothing if the manager is not started.
pub fn add_relaying_flow(&self, flow: Flow) {
    if let Ok(_guard) = self.arc.startup_lock.enter() {
        self.arc.connection_table.add_priority_flow(flow);
    }
}
/// Handles one event from the manager's channel.
///
/// * `Accepted`: ignored when `allow_accept` is false or the manager is not
///   started. Otherwise the connection is registered under the
///   remote-address lock; a registration failure is deliberately ignored.
/// * `Dead`: the connection is closed and awaited under the remote-address
///   lock.
async fn process_connection_manager_event(
    &self,
    event: ConnectionManagerEvent,
    allow_accept: bool,
) {
    match event {
        ConnectionManagerEvent::Accepted(prot_conn) => {
            if !allow_accept {
                return;
            }
            let Ok(_guard) = self.arc.startup_lock.enter() else {
                return;
            };

            let remote = prot_conn.flow().remote_address().socket_addr();
            let _lock_guard = self.arc.address_lock_table.lock_tag(remote).await;

            let mut inner_guard = self.arc.inner.lock();
            if let Some(inner) = inner_guard.as_mut() {
                let _ = self.on_new_protocol_network_connection(inner, prot_conn, None);
            }
        }
        ConnectionManagerEvent::Dead(mut conn) => {
            let remote = conn.flow().remote_address().socket_addr();
            let _lock_guard = self.arc.address_lock_table.lock_tag(remote).await;

            conn.close();
            conn.await;
        }
    }
}
/// Background task that handles events from `receiver`.
///
/// Events are processed (accepts allowed) until the stop token fires or the
/// receive fails. Events still queued at that point are then drained with
/// accepts disallowed.
async fn async_processor(
    self,
    stop_token: StopToken,
    receiver: flume::Receiver<ConnectionManagerEvent>,
) {
    loop {
        let next = receiver.recv_async().timeout_at(stop_token.clone()).await;
        let Ok(Ok(event)) = next else {
            break;
        };
        self.process_connection_manager_event(event, true).await;
    }

    // Handle whatever is left in the queue without accepting anything new.
    for event in receiver.drain() {
        self.process_connection_manager_event(event, false).await;
    }
}
/// Queues an accepted protocol connection for the async processor.
///
/// Returns `Ok(())` in every case: when the manager has no started state
/// the connection is simply not queued, and a failed send is ignored.
#[cfg_attr(all(target_arch = "wasm32", target_os = "unknown"), expect(dead_code))]
pub(super) async fn on_accepted_protocol_network_connection(
    &self,
    protocol_connection: ProtocolNetworkConnection,
) -> EyreResult<()> {
    // Clone the sender under the lock, then release the lock before awaiting.
    let sender = {
        let inner_guard = self.arc.inner.lock();
        let Some(inner) = &*inner_guard else {
            return Ok(());
        };
        inner.sender.clone()
    };

    let event = ConnectionManagerEvent::Accepted(protocol_connection);
    let _ = sender.send_async(event).await;
    Ok(())
}
/// Called when connection `connection_id` has finished.
///
/// Removes the connection from the table and sends it to the async
/// processor as `Dead`. If the connection was protected, the drop is
/// counted against the matching protected address:
///
/// * within `PROTECTED_CONNECTION_DROP_SPAN` of the span start the counter
///   is incremented, otherwise a new span starts with a count of 1;
/// * when the count reaches `PROTECTED_CONNECTION_DROP_COUNT` inside a span,
///   the drop is reported on the node ref, the span is reset, and no
///   reconnection is attempted;
/// * otherwise a reconnection is spawned if the connection has dial info.
///
/// Does nothing if the manager has no started state.
pub(super) fn report_connection_finished(&self, connection_id: NetworkConnectionId) {
    let sender = {
        let inner_guard = self.arc.inner.lock();
        let Some(inner) = &*inner_guard else {
            return;
        };
        inner.sender.clone()
    };

    let removed = self
        .arc
        .connection_table
        .remove_connection_by_id(connection_id);
    let Some(conn) = removed else {
        return;
    };

    if let Some(protect_nr) = conn.protected_node_ref() {
        if let Some(inner) = self.arc.inner.lock().as_mut() {
            // Only the first protected address tracking this node is updated.
            let tracked = inner
                .protected_addresses
                .values_mut()
                .find(|pa| pa.node_ref.same_entry(&protect_nr));
            if let Some(pa) = tracked {
                let cur_ts = Timestamp::now_non_decreasing();
                let within_span =
                    cur_ts.duration_since(pa.span_start_ts) < PROTECTED_CONNECTION_DROP_SPAN;
                if within_span {
                    pa.drops_in_span += 1;
                } else {
                    // Span expired: this drop opens a new one.
                    pa.drops_in_span = 1;
                    pa.span_start_ts = cur_ts;
                }
                veilid_log!(self debug "== Protected connection dropped (count={}): {} -> {} for node {}", pa.drops_in_span, conn.connection_id(), conn.debug_print(Timestamp::now()), protect_nr);

                let give_up =
                    within_span && pa.drops_in_span >= PROTECTED_CONNECTION_DROP_COUNT;
                if give_up {
                    // Too many drops in one span: report and reset instead
                    // of reconnecting.
                    protect_nr.report_protected_connection_dropped();
                    pa.drops_in_span = 0;
                    pa.span_start_ts = cur_ts;
                } else if let Some(dial_info) = conn.dial_info() {
                    self.spawn_reconnector(dial_info);
                } else {
                    veilid_log!(self debug "Can't reconnect to accepted protected connection: {} -> {} for node {}", conn.connection_id(), conn.debug_print(Timestamp::now()), protect_nr);
                }
            }
        }
    }

    let _ = sender.send(ConnectionManagerEvent::Dead(Box::new(conn)));
}
/// Queues a future on the reconnection processor that tries to
/// (re)establish a connection to `dial_info` and logs the outcome.
fn spawn_reconnector(&self, dial_info: DialInfo) {
    let this = self.clone();
    let reconnect = async move {
        let outcome = this.get_or_create_connection(dial_info.clone()).await;
        match outcome {
            Ok(NetworkResult::Value(conn)) => {
                veilid_log!(this debug "Reconnection successful to {}: {:?}", dial_info,conn);
            }
            Ok(res) => {
                veilid_log!(this debug "Reconnection unsuccessful to {}: {:?}", dial_info, res);
            }
            Err(e) => {
                veilid_log!(this debug "Reconnection error to {}: {}", dial_info, e);
            }
        };
    };
    self.arc
        .reconnection_processor
        .add_future(Box::pin(reconnect));
}
/// Returns the connection table's debug dump under a "Connection Table:" header.
pub fn debug_print(&self) -> String {
    let table = self.arc.connection_table.debug_print_table();
    format!("Connection Table:\n\n{}", table)
}
}