pub mod builder;
pub mod quic;
pub mod scmp_handler;
pub mod socket;
pub(crate) mod udp_polling;
use std::{
borrow::Cow,
fmt, net,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use anyhow::Context as _;
use bytes::Bytes;
use endhost_api_client::client::EndhostApiClient;
use futures::future::BoxFuture;
use quic::{AddressTranslator, Endpoint, ScionAsyncUdpSocket};
use scion_proto::{
address::{Isd, IsdAsn, SocketAddr},
packet::ScionPacketRaw,
path::Path,
};
use scion_sdk_reqwest_connect_rpc::client::CrpcClientError;
use snap_tun::client::ConnectSnapTunSocketError;
pub use socket::{PathUnawareUdpScionSocket, RawScionSocket, ScmpScionSocket, UdpScionSocket};
use url::Url;
pub use self::builder::ScionStackBuilder;
use crate::{
path::{
PathStrategy,
fetcher::{EndhostApiSegmentFetcher, PathFetcherImpl, traits::SegmentFetcher},
manager::{
MultiPathManager, MultiPathManagerConfig,
traits::{PathWaitError, PathWaitTimeoutError},
},
policy::PathPolicy,
scoring::PathScoring,
},
scionstack::{
scmp_handler::{ScmpErrorHandler, ScmpErrorReceiver, ScmpHandler},
socket::SendErrorReceiver,
},
types::Subscribers,
};
pub struct ScionStack {
endhost_api: Option<Url>,
client: Arc<dyn EndhostApiClient>,
underlay: Arc<dyn DynUnderlayStack>,
scmp_error_receivers: Subscribers<dyn ScmpErrorReceiver>,
send_error_receivers: Subscribers<dyn SendErrorReceiver>,
}
impl fmt::Debug for ScionStack {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ScionStack")
.field("client", &"Arc<ConnectRpcClient>")
.field("underlay", &"Arc<dyn DynUnderlayStack>")
.finish()
}
}
impl ScionStack {
pub(crate) fn new(
endhost_api: Option<Url>,
client: Arc<dyn EndhostApiClient>,
underlay: Arc<dyn DynUnderlayStack>,
) -> Self {
Self {
endhost_api,
client,
underlay,
scmp_error_receivers: Subscribers::new(),
send_error_receivers: Subscribers::new(),
}
}
pub async fn bind(
&self,
bind_addr: Option<SocketAddr>,
) -> Result<UdpScionSocket, ScionSocketBindError> {
self.bind_with_config(bind_addr, SocketConfig::default())
.await
}
pub async fn bind_with_config(
&self,
bind_addr: Option<SocketAddr>,
mut socket_config: SocketConfig,
) -> Result<UdpScionSocket, ScionSocketBindError> {
let socket = PathUnawareUdpScionSocket::new(
self.underlay
.bind_socket(SocketKind::Udp, bind_addr)
.await?,
vec![Box::new(ScmpErrorHandler::new(
self.scmp_error_receivers.clone(),
))],
);
if !socket_config.disable_endhost_api_segment_fetcher {
let connect_rpc_fetcher: Box<dyn SegmentFetcher> =
Box::new(EndhostApiSegmentFetcher::new(self.client.clone()));
socket_config
.segment_fetchers
.push(("Endhost API".into(), connect_rpc_fetcher));
}
let fetcher = PathFetcherImpl::new(
socket_config.segment_fetchers,
socket_config.segment_fetcher_timeout,
);
if socket_config.path_strategy.scoring.is_empty() {
socket_config.path_strategy.scoring.use_default_scorers();
}
let pather = Arc::new(
MultiPathManager::new(
MultiPathManagerConfig::default(),
fetcher,
socket_config.path_strategy,
)
.expect("should not fail with default configuration"),
);
self.scmp_error_receivers.register(pather.clone());
self.send_error_receivers.register(pather.clone());
Ok(UdpScionSocket::new(
socket,
pather,
socket_config.connect_timeout,
self.send_error_receivers.clone(),
))
}
pub async fn connect(
&self,
remote_addr: SocketAddr,
bind_addr: Option<SocketAddr>,
) -> Result<UdpScionSocket, ScionSocketConnectError> {
let socket = self.bind(bind_addr).await?;
socket.connect(remote_addr).await
}
pub async fn connect_with_config(
&self,
remote_addr: SocketAddr,
bind_addr: Option<SocketAddr>,
socket_config: SocketConfig,
) -> Result<UdpScionSocket, ScionSocketConnectError> {
let socket = self.bind_with_config(bind_addr, socket_config).await?;
socket.connect(remote_addr).await
}
pub async fn bind_scmp(
&self,
bind_addr: Option<SocketAddr>,
) -> Result<ScmpScionSocket, ScionSocketBindError> {
let socket = self
.underlay
.bind_socket(SocketKind::Scmp, bind_addr)
.await?;
Ok(ScmpScionSocket::new(socket))
}
pub async fn bind_raw(
&self,
bind_addr: Option<SocketAddr>,
) -> Result<RawScionSocket, ScionSocketBindError> {
let socket = self
.underlay
.bind_socket(SocketKind::Raw, bind_addr)
.await?;
Ok(RawScionSocket::new(socket))
}
pub async fn bind_path_unaware(
&self,
bind_addr: Option<SocketAddr>,
) -> Result<PathUnawareUdpScionSocket, ScionSocketBindError> {
let socket = self
.underlay
.bind_socket(SocketKind::Udp, bind_addr)
.await?;
Ok(PathUnawareUdpScionSocket::new(socket, vec![]))
}
#[deprecated(
since = "0.4.0",
note = "will soon be removed; use the quic-scion crate instead"
)]
pub async fn quic_endpoint(
&self,
bind_addr: Option<SocketAddr>,
config: anapaya_quinn::EndpointConfig,
server_config: Option<anapaya_quinn::ServerConfig>,
runtime: Option<Arc<dyn anapaya_quinn::Runtime>>,
) -> anyhow::Result<Endpoint> {
#[allow(deprecated)]
self.quic_endpoint_with_config(
bind_addr,
config,
server_config,
runtime,
SocketConfig::default(),
)
.await
}
#[deprecated(
since = "0.4.0",
note = "will soon be removed; use the quic-scion crate instead"
)]
pub async fn quic_endpoint_with_config(
&self,
bind_addr: Option<SocketAddr>,
config: anapaya_quinn::EndpointConfig,
server_config: Option<anapaya_quinn::ServerConfig>,
runtime: Option<Arc<dyn anapaya_quinn::Runtime>>,
socket_config: SocketConfig,
) -> anyhow::Result<Endpoint> {
let scmp_handlers: Vec<Box<dyn ScmpHandler>> = vec![Box::new(ScmpErrorHandler::new(
self.scmp_error_receivers.clone(),
))];
let socket = self
.underlay
.bind_async_udp_socket(bind_addr, scmp_handlers)
.await?;
let address_translator = Arc::new(AddressTranslator::default());
let pather = {
let mut segment_fetchers = socket_config.segment_fetchers;
if !socket_config.disable_endhost_api_segment_fetcher {
let connect_rpc_fetcher: Box<dyn SegmentFetcher> =
Box::new(EndhostApiSegmentFetcher::new(self.client.clone()));
segment_fetchers.push(("Endhost API".into(), connect_rpc_fetcher));
}
let fetcher =
PathFetcherImpl::new(segment_fetchers, socket_config.segment_fetcher_timeout);
let mut strategy = socket_config.path_strategy;
if strategy.scoring.is_empty() {
strategy.scoring.use_default_scorers();
}
Arc::new(
MultiPathManager::new(MultiPathManagerConfig::default(), fetcher, strategy)
.map_err(|e| anyhow::anyhow!("failed to create path manager: {}", e))?,
)
};
self.scmp_error_receivers.register(pather.clone());
let local_scion_addr = socket.local_addr();
let socket = Arc::new(ScionAsyncUdpSocket::new(
socket,
pather.clone(),
address_translator.clone(),
));
let runtime = match runtime {
Some(runtime) => runtime,
None => anapaya_quinn::default_runtime().context("No runtime found")?,
};
Ok(Endpoint::new_with_abstract_socket(
config,
server_config,
socket,
local_scion_addr,
runtime,
pather,
address_translator,
)?)
}
pub fn local_ases(&self) -> Vec<IsdAsn> {
self.underlay.local_ases()
}
pub fn endhost_api(&self) -> Option<Url> {
self.endhost_api.clone()
}
pub fn create_path_manager(&self) -> MultiPathManager<PathFetcherImpl> {
let fetcher = PathFetcherImpl::new(
vec![(
"Endhost API".into(),
Box::new(EndhostApiSegmentFetcher::new(self.client.clone())),
)],
DEFAULT_SEGMENT_FETCHER_TIMEOUT,
);
let mut strategy = PathStrategy::default();
strategy.scoring.use_default_scorers();
MultiPathManager::new(MultiPathManagerConfig::default(), fetcher, strategy)
.expect("should not fail with default configuration")
}
}
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
pub const DEFAULT_SEGMENT_FETCHER_TIMEOUT: Duration = Duration::from_secs(60);
pub struct SocketConfig {
pub(crate) segment_fetchers: Vec<(String, Box<dyn SegmentFetcher>)>,
pub(crate) segment_fetcher_timeout: Duration,
pub(crate) disable_endhost_api_segment_fetcher: bool,
pub(crate) path_strategy: PathStrategy,
pub(crate) connect_timeout: Duration,
}
impl Default for SocketConfig {
fn default() -> Self {
Self::new()
}
}
impl SocketConfig {
pub fn new() -> Self {
Self {
segment_fetchers: Vec::new(),
segment_fetcher_timeout: DEFAULT_SEGMENT_FETCHER_TIMEOUT,
disable_endhost_api_segment_fetcher: false,
path_strategy: Default::default(),
connect_timeout: DEFAULT_CONNECT_TIMEOUT,
}
}
pub fn with_path_policy(mut self, policy: impl PathPolicy) -> Self {
self.path_strategy.add_policy(policy);
self
}
pub fn with_path_scoring(mut self, scoring: impl PathScoring, impact: f32) -> Self {
self.path_strategy.scoring = self.path_strategy.scoring.with_scorer(scoring, impact);
self
}
pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
self.connect_timeout = timeout;
self
}
pub fn with_segment_fetcher(mut self, name: String, fetcher: Box<dyn SegmentFetcher>) -> Self {
self.segment_fetchers.push((name, fetcher));
self
}
pub fn disable_endhost_api_segment_fetcher(mut self) -> Self {
self.disable_endhost_api_segment_fetcher = true;
self
}
pub fn with_segment_fetcher_timeout(mut self, timeout: Duration) -> Self {
self.segment_fetcher_timeout = timeout;
self
}
}
#[derive(Debug, thiserror::Error)]
pub enum ScionSocketBindError {
#[error(transparent)]
InvalidBindAddress(InvalidBindAddressError),
#[error("port {0} is already in use")]
PortAlreadyInUse(u16),
#[error(transparent)]
SnapConnectionError(SnapConnectionError),
#[error("underlay unavailable for the requested ISD: {0}")]
NoUnderlayAvailable(Isd),
#[error("other error: {0}")]
Other(#[from] Box<dyn std::error::Error + Send + Sync>),
}
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum InvalidBindAddressError {
#[error("cannot bind to service addresses: {0}")]
ServiceAddress(SocketAddr),
#[error("cannot bind to requested address: {0}")]
CannotBindToRequestedAddress(SocketAddr, Cow<'static, str>),
#[error(
"assigned address ({assigned_addr}) does not match requested address ({bind_addr}), likely due to NAT"
)]
AddressMismatch {
assigned_addr: SocketAddr,
bind_addr: SocketAddr,
},
#[error("could not find any local IP address to bind to")]
NoLocalIpAddressFound,
}
#[derive(Debug, thiserror::Error)]
pub enum SnapConnectionError {
#[error("SNAP token source is missing")]
SnapTokenSourceMissing,
#[error("error establishing SNAP tunnel: {0}")]
TunnelEstablishmentError(#[from] ConnectSnapTunSocketError),
#[error("failed to create SNAP control plane client: {0}")]
ControlPlaneClientCreationError(anyhow::Error),
#[error("failed to discover SNAP data plane: {0}")]
DataPlaneDiscoveryError(CrpcClientError),
}
#[derive(Hash, Eq, PartialEq, Clone, Debug, Ord, PartialOrd)]
pub enum SocketKind {
Udp,
Scmp,
Raw,
}
pub(crate) trait UnderlayStack: Send + Sync {
type Socket: UnderlaySocket + 'static;
type AsyncUdpSocket: AsyncUdpUnderlaySocket + 'static;
fn bind_socket(
&self,
kind: SocketKind,
bind_addr: Option<SocketAddr>,
) -> BoxFuture<'_, Result<Self::Socket, ScionSocketBindError>>;
fn bind_async_udp_socket(
&self,
bind_addr: Option<SocketAddr>,
scmp_handlers: Vec<Box<dyn ScmpHandler>>,
) -> BoxFuture<'_, Result<Self::AsyncUdpSocket, ScionSocketBindError>>;
fn local_ases(&self) -> Vec<IsdAsn>;
}
pub(crate) trait DynUnderlayStack: Send + Sync {
fn bind_socket(
&self,
kind: SocketKind,
bind_addr: Option<SocketAddr>,
) -> BoxFuture<'_, Result<Box<dyn UnderlaySocket>, ScionSocketBindError>>;
fn bind_async_udp_socket(
&self,
bind_addr: Option<SocketAddr>,
scmp_handlers: Vec<Box<dyn ScmpHandler>>,
) -> BoxFuture<'_, Result<Arc<dyn AsyncUdpUnderlaySocket>, ScionSocketBindError>>;
fn local_ases(&self) -> Vec<IsdAsn>;
}
impl<U: UnderlayStack> DynUnderlayStack for U {
fn bind_socket(
&self,
kind: SocketKind,
bind_addr: Option<SocketAddr>,
) -> BoxFuture<'_, Result<Box<dyn UnderlaySocket>, ScionSocketBindError>> {
Box::pin(async move {
let socket = self.bind_socket(kind, bind_addr).await?;
Ok(Box::new(socket) as Box<dyn UnderlaySocket>)
})
}
fn bind_async_udp_socket(
&self,
bind_addr: Option<SocketAddr>,
scmp_handlers: Vec<Box<dyn ScmpHandler>>,
) -> BoxFuture<'_, Result<Arc<dyn AsyncUdpUnderlaySocket>, ScionSocketBindError>> {
Box::pin(async move {
let socket = self.bind_async_udp_socket(bind_addr, scmp_handlers).await?;
Ok(Arc::new(socket) as Arc<dyn AsyncUdpUnderlaySocket>)
})
}
fn local_ases(&self) -> Vec<IsdAsn> {
<Self as UnderlayStack>::local_ases(self)
}
}
#[derive(Debug, thiserror::Error)]
pub enum ScionSocketConnectError {
#[error("failed to get path to destination: {0}")]
PathLookupError(#[from] PathWaitTimeoutError),
#[error(transparent)]
BindError(#[from] ScionSocketBindError),
}
#[derive(Debug, thiserror::Error)]
pub enum ScionSocketSendError {
#[error("path lookup error: {0}")]
PathLookupError(#[from] PathWaitError),
#[error("udp next hop {address:?} unreachable: {isd_as}#{interface_id}: {msg}")]
UnderlayNextHopUnreachable {
isd_as: IsdAsn,
interface_id: u16,
address: Option<net::SocketAddr>,
msg: String,
},
#[error("invalid packet: {0}")]
InvalidPacket(Cow<'static, str>),
#[error("underlying socket is closed")]
Closed,
#[error("underlying connection returned an I/O error: {0:?}")]
IoError(#[from] std::io::Error),
#[error("socket is not connected")]
NotConnected,
}
pub const MIN_PATH_BUFFER_SIZE: usize = 1024;
#[derive(Debug, thiserror::Error)]
pub enum ScionSocketReceiveError {
#[error("provided path buffer is too small (at least {MIN_PATH_BUFFER_SIZE} bytes required)")]
PathBufTooSmall,
#[error("i/o error: {0:?}")]
IoError(#[from] std::io::Error),
#[error("socket is not connected")]
NotConnected,
}
pub(crate) trait UnderlaySocket: 'static + Send + Sync {
fn send<'a>(
&'a self,
packet: ScionPacketRaw,
) -> BoxFuture<'a, Result<(), ScionSocketSendError>>;
fn try_send(&self, packet: ScionPacketRaw) -> Result<(), ScionSocketSendError>;
fn recv<'a>(&'a self) -> BoxFuture<'a, Result<ScionPacketRaw, ScionSocketReceiveError>>;
fn local_addr(&self) -> SocketAddr;
fn snap_data_plane(&self) -> Option<net::SocketAddr>;
}
pub(crate) trait AsyncUdpUnderlaySocket: Send + Sync {
fn create_io_poller(self: Arc<Self>) -> Pin<Box<dyn udp_polling::UdpPoller>>;
fn try_send(&self, raw_packet: ScionPacketRaw) -> Result<(), std::io::Error>;
fn poll_recv_from_with_path(
&self,
cx: &mut Context,
) -> Poll<std::io::Result<(SocketAddr, Bytes, Path)>>;
fn local_addr(&self) -> SocketAddr;
fn snap_data_plane(&self) -> Option<net::SocketAddr>;
}
impl Drop for ScionStack {
fn drop(&mut self) {
tracing::warn!("ScionStack was dropped");
}
}