mod protocol;
use super::*;
use crate::routing_table::*;
pub use protocol::*;
use std::io;
impl_veilid_log_facility!("net");
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ProtocolConfig {
pub outbound: ProtocolTypeSet,
pub inbound: ProtocolTypeSet,
pub family_global: AddressTypeSet,
pub public_internet_capabilities: Vec<VeilidCapability>,
}
struct NetworkInner {
network_needs_restart: bool,
protocol_config: ProtocolConfig,
}
pub(super) struct NetworkUnlockedInner {
startup_lock: StartupLock,
}
#[derive(Clone)]
pub(super) struct Network {
registry: VeilidComponentRegistry,
inner: Arc<Mutex<NetworkInner>>,
unlocked_inner: Arc<NetworkUnlockedInner>,
}
impl_veilid_component_accessors!(Network);
impl core::ops::Deref for Network {
type Target = NetworkUnlockedInner;
fn deref(&self) -> &Self::Target {
&self.unlocked_inner
}
}
impl Network {
fn new_inner() -> NetworkInner {
NetworkInner {
network_needs_restart: false,
protocol_config: Default::default(),
}
}
fn new_unlocked_inner() -> NetworkUnlockedInner {
NetworkUnlockedInner {
startup_lock: StartupLock::new(),
}
}
pub fn new(registry: VeilidComponentRegistry) -> Self {
Self {
registry,
inner: Arc::new(Mutex::new(Self::new_inner())),
unlocked_inner: Arc::new(Self::new_unlocked_inner()),
}
}
async fn record_dial_info_failure<T, F: Future<Output = EyreResult<NetworkResult<T>>>>(
&self,
dial_info: DialInfo,
fut: F,
) -> EyreResult<NetworkResult<T>> {
let network_result = fut.await?;
if matches!(network_result, NetworkResult::NoConnection(_)) {
self.network_manager()
.address_filter()
.set_dial_info_failed(dial_info);
}
Ok(network_result)
}
#[cfg_attr(feature = "instrument", instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len())))]
pub async fn send_data_unbound_to_dial_info(
&self,
dial_info: DialInfo,
data: Bytes,
) -> EyreResult<NetworkResult<()>> {
let _guard = self.unlocked_inner.startup_lock.enter()?;
self.record_dial_info_failure(dial_info.clone(), async move {
let data_len = data.len();
let timeout_ms = self.config().network.connection_initial_timeout_ms;
if self
.network_manager()
.address_filter()
.is_ip_addr_punished(dial_info.address().ip_addr())
{
return Ok(NetworkResult::no_connection_other("punished"));
}
match dial_info.protocol_type() {
ProtocolType::UDP => {
bail!("no support for UDP protocol")
}
ProtocolType::TCP => {
bail!("no support for TCP protocol")
}
ProtocolType::WS => {
let pnc = network_result_try!(ws::WebsocketProtocolHandler::connect(
self.registry(),
&dial_info,
timeout_ms
)
.await
.wrap_err("connect failure")?);
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
}
#[cfg(feature = "enable-protocol-wss")]
ProtocolType::WSS => {
let pnc = network_result_try!(ws::WebsocketProtocolHandler::connect(
self.registry(),
&dial_info,
timeout_ms
)
.await
.wrap_err("connect failure")?);
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
}
};
self.network_manager()
.stats_packet_sent(dial_info.ip_addr(), ByteCount::new(data_len as u64));
Ok(NetworkResult::Value(()))
})
.await
}
#[cfg_attr(feature = "instrument", instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len())))]
pub async fn send_recv_data_unbound_to_dial_info(
&self,
dial_info: DialInfo,
data: Bytes,
timeout_ms: u32,
) -> EyreResult<NetworkResult<Bytes>> {
let _guard = self.startup_lock.enter()?;
self.record_dial_info_failure(dial_info.clone(), async move {
let data_len = data.len();
let connect_timeout_ms = self.config().network.connection_initial_timeout_ms;
if self
.network_manager()
.address_filter()
.is_ip_addr_punished(dial_info.address().ip_addr())
{
return Ok(NetworkResult::no_connection_other("punished"));
}
match dial_info.protocol_type() {
ProtocolType::UDP => {
bail!("no support for UDP protocol")
}
ProtocolType::TCP => {
bail!("no support for TCP protocol")
}
_ => {
let pnc = network_result_try!(match dial_info.protocol_type() {
ProtocolType::UDP => unreachable!(),
ProtocolType::TCP => unreachable!(),
ProtocolType::WS => {
ws::WebsocketProtocolHandler::connect(
self.registry(),
&dial_info,
connect_timeout_ms,
)
.await
.wrap_err("connect failure")?
}
#[cfg(feature = "enable-protocol-wss")]
ProtocolType::WSS => {
ws::WebsocketProtocolHandler::connect(
self.registry(),
&dial_info,
connect_timeout_ms,
)
.await
.wrap_err("connect failure")?
}
});
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
self.network_manager()
.stats_packet_sent(dial_info.ip_addr(), ByteCount::new(data_len as u64));
let out =
network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv())
.await
.into_network_result())
.wrap_err("recv failure")?);
self.network_manager()
.stats_packet_rcvd(dial_info.ip_addr(), ByteCount::new(out.len() as u64));
Ok(NetworkResult::Value(out))
}
}
})
.await
}
#[cfg_attr(feature = "instrument", instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len())))]
pub async fn send_data_to_existing_flow(
&self,
flow: Flow,
data: Bytes,
) -> EyreResult<SendDataToExistingFlowResult> {
let _guard = self.startup_lock.enter()?;
let data_len = data.len();
match flow.protocol_type() {
ProtocolType::UDP => {
bail!("no support for UDP protocol")
}
ProtocolType::TCP => {
bail!("no support for TCP protocol")
}
_ => {}
}
if let Some(conn) = self
.network_manager()
.connection_manager()
.get_connection(flow)
{
match conn.send_async(data).await {
ConnectionHandleSendResult::Sent => {
self.network_manager().stats_packet_sent(
flow.remote().socket_addr().ip(),
ByteCount::new(data_len as u64),
);
return Ok(SendDataToExistingFlowResult::Sent(conn.unique_flow()));
}
ConnectionHandleSendResult::NotSent(data) => {
return Ok(SendDataToExistingFlowResult::NotSent(data));
}
}
}
Ok(SendDataToExistingFlowResult::NotSent(data))
}
#[cfg_attr(feature = "instrument", instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len())))]
pub async fn send_data_to_dial_info(
&self,
dial_info: DialInfo,
data: Bytes,
) -> EyreResult<NetworkResult<UniqueFlow>> {
let _guard = self.startup_lock.enter()?;
self.record_dial_info_failure(dial_info.clone(), async move {
let data_len = data.len();
if dial_info.protocol_type() == ProtocolType::UDP {
bail!("no support for UDP protocol");
}
if dial_info.protocol_type() == ProtocolType::TCP {
bail!("no support for TCP protocol");
}
let conn = network_result_try!(
self.network_manager()
.connection_manager()
.get_or_create_connection(dial_info.clone())
.await?
);
if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await {
return Ok(NetworkResult::NoConnection(io::Error::new(
io::ErrorKind::ConnectionReset,
"failed to send",
)));
}
let unique_flow = conn.unique_flow();
self.network_manager()
.stats_packet_sent(dial_info.ip_addr(), ByteCount::new(data_len as u64));
Ok(NetworkResult::value(unique_flow))
})
.await
}
#[cfg_attr(
feature = "instrument",
instrument(level = "trace", target = "net", err, skip(self), fields(__VEILID_LOG_KEY = self.log_key()))
)]
#[allow(clippy::unused_async)]
pub async fn send_hole_punch(
&self,
_dial_info: DialInfo,
) -> EyreResult<NetworkResult<UniqueFlow>> {
Ok(NetworkResult::ServiceUnavailable(
"unimplemented for this platform".to_owned(),
))
}
pub async fn startup_internal(&self) -> EyreResult<StartupDisposition> {
veilid_log!(self debug "starting network");
let protocol_config = {
let config = self.config();
let inbound = ProtocolTypeSet::new();
let mut outbound = ProtocolTypeSet::new();
if config.network.protocol.ws.connect {
outbound.insert(ProtocolType::WS);
}
#[cfg(feature = "enable-protocol-wss")]
if config.network.protocol.wss.connect {
outbound.insert(ProtocolType::WSS);
}
let supported_address_types: AddressTypeSet = if is_ipv6_supported() {
AddressType::IPV4 | AddressType::IPV6
} else {
AddressType::IPV4.into()
};
let family_global = supported_address_types;
let public_internet_capabilities = {
PUBLIC_INTERNET_CAPABILITIES
.iter()
.copied()
.filter(|cap| !config.capabilities.disable.contains(cap))
.collect::<Vec<VeilidCapability>>()
};
ProtocolConfig {
outbound,
inbound,
family_global,
public_internet_capabilities,
}
};
self.inner.lock().protocol_config = protocol_config.clone();
let routing_table = self.routing_table();
let mut editor_public_internet = routing_table.edit_public_internet_routing_domain();
editor_public_internet.setup_network(
protocol_config.outbound,
protocol_config.inbound,
protocol_config.family_global,
protocol_config.public_internet_capabilities.clone(),
true,
);
if editor_public_internet.commit(true).await {
editor_public_internet.publish();
}
Ok(StartupDisposition::Success)
}
#[cfg_attr(feature = "instrument", instrument(level = "debug", err, skip_all, fields(__VEILID_LOG_KEY = self.log_key())))]
pub async fn startup(&self) -> EyreResult<StartupDisposition> {
let guard = self.startup_lock.startup()?;
match self.startup_internal().await {
Ok(StartupDisposition::Success) => {
veilid_log!(self debug "Network started");
guard.success();
Ok(StartupDisposition::Success)
}
Ok(StartupDisposition::BindRetry) => {
debug!("network bind retry");
Ok(StartupDisposition::BindRetry)
}
Err(e) => {
debug!("network failed to start");
Err(e)
}
}
}
pub fn needs_restart(&self) -> bool {
self.inner.lock().network_needs_restart
}
pub fn is_started(&self) -> bool {
self.startup_lock.is_started()
}
#[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(__VEILID_LOG_KEY = self.log_key())))]
pub fn restart_network(&self) {
self.inner.lock().network_needs_restart = true;
}
#[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(__VEILID_LOG_KEY = self.log_key())))]
pub async fn shutdown(&self) {
veilid_log!(self debug "starting low level network shutdown");
let Ok(guard) = self.startup_lock.shutdown().await else {
veilid_log!(self error "low level network is already shut down");
return;
};
let routing_table = self.routing_table();
routing_table
.edit_public_internet_routing_domain()
.reset()
.await;
*self.inner.lock() = Self::new_inner();
guard.success();
veilid_log!(self debug "finished low level network shutdown");
}
pub fn get_preferred_local_address(&self, _dial_info: &DialInfo) -> Option<SocketAddr> {
None
}
pub fn get_preferred_local_address_by_key(
&self,
_pt: ProtocolType,
_at: AddressType,
) -> Option<SocketAddr> {
None
}
#[expect(dead_code)]
pub fn needs_update_dial_info(&self) -> bool {
let Ok(_guard) = self.startup_lock.enter() else {
veilid_log!(self debug "ignoring 'needs_update_dial_info' due to not started up");
return false;
};
false
}
pub fn resolved_detect_address_changes(&self) -> bool {
let Ok(_guard) = self.startup_lock.enter() else {
veilid_log!(self debug "ignoring 'resolved_detect_address_changes' due to not started up");
return false;
};
false
}
pub fn trigger_update_dial_info(&self, _routing_domain: RoutingDomain) {
let Ok(_guard) = self.startup_lock.enter() else {
veilid_log!(self debug "ignoring 'trigger_update_dial_info' due to not started up");
return;
};
}
#[cfg_attr(
feature = "instrument",
instrument(level = "trace", target = "net", name = "Network::tick", skip_all, err, fields(__VEILID_LOG_KEY = self.log_key()))
)]
#[allow(clippy::unused_async)]
pub async fn tick(&self) -> EyreResult<()> {
let Ok(_guard) = self.startup_lock.enter() else {
veilid_log!(self debug "ignoring 'Network::tick' due to not started up");
return Ok(());
};
Ok(())
}
#[expect(clippy::unused_async)]
pub async fn cancel_tasks(&self) {}
}