use std::future::IntoFuture;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
#[cfg(any(
all(feature = "transport-tcp", not(target_arch = "wasm32")),
all(feature = "transport-local", not(target_arch = "wasm32")),
all(feature = "transport-websocket", not(target_arch = "wasm32"))
))]
use vox_core::initiator;
use vox_core::{
ConnectionError, ConnectionHandle, FromVoxLane, IdentityResolver, LaneAcceptor, LaneRequest,
PendingLane,
};
use vox_types::{
DEFAULT_INITIAL_CHANNEL_CREDIT, Decline, IdentityResolutionContext, Link, MaybeSend, MaybeSync,
Metadata, PeerIdentity, VoxObserver, VoxObserverHandle, metadata_into_owned,
};
mod error;
pub use error::ServeError;
#[cfg(all(feature = "transport-tcp", not(target_arch = "wasm32")))]
mod tcp;
#[cfg(all(feature = "transport-local", not(target_arch = "wasm32")))]
mod local;
#[cfg(all(feature = "transport-websocket", not(target_arch = "wasm32")))]
mod ws;
#[cfg(all(feature = "transport-websocket", not(target_arch = "wasm32")))]
pub use ws::WsListener;
#[cfg(all(feature = "transport-websocket-tls", not(target_arch = "wasm32")))]
mod wss;
#[cfg(all(feature = "transport-websocket-tls", not(target_arch = "wasm32")))]
pub use wss::WssListener;
mod channel;
pub use channel::{ChannelListener, ChannelListenerSender};
/// A source of incoming links for the high-level serve loop.
///
/// [`serve_listener`] repeatedly calls [`accept`](VoxListener::accept) and runs
/// one connection per returned link.
pub trait VoxListener: MaybeSend + 'static {
/// The link type produced for each accepted peer.
type Link: Link + MaybeSend + 'static;
/// Waits for the next incoming link.
///
/// An `Err` returned here is propagated by [`ServeListenerBuilder::run`] as
/// `ConnectionError::Io`, which ends the accept loop.
fn accept(
&mut self,
) -> impl std::future::Future<Output = std::io::Result<Self::Link>> + MaybeSend + '_;
}
/// Boxed, pinned future used as the `IntoFuture` type of the serve builders.
///
/// On non-wasm32 targets the future must be `Send`.
#[cfg(not(target_arch = "wasm32"))]
type BoxHighLevelFuture<T> = Pin<Box<dyn std::future::Future<Output = T> + Send>>;
/// Boxed, pinned future used as the `IntoFuture` type of the serve builders.
///
/// The wasm32 flavour carries no `Send` bound.
#[cfg(target_arch = "wasm32")]
type BoxHighLevelFuture<T> = Pin<Box<dyn std::future::Future<Output = T>>>;
/// Starts building an outgoing connection to `addr`.
///
/// `addr` is rendered through its `Display` impl and stored as a string; it is
/// only parsed when the returned builder is established.
pub fn connect(addr: impl std::fmt::Display) -> ConnectBuilder {
    let target = addr.to_string();
    ConnectBuilder::new(target)
}
/// Starts building an outgoing connection to `addr` that opens a lane of type
/// `Client` once the connection is established.
pub fn connect_lane<Client>(addr: impl std::fmt::Display) -> ConnectLaneBuilder<Client>
where
    Client: FromVoxLane,
{
    let inner = connect(addr);
    ConnectLaneBuilder {
        inner,
        _client: std::marker::PhantomData,
    }
}
/// Transport selected for an outgoing connection, as produced by
/// `parse_connect_address`.
///
/// Each variant only exists when its transport feature is enabled on a
/// non-wasm32 target.
enum ConnectAddress {
/// `tcp` scheme (also used when the address has no scheme); holds the part
/// after `://`, or the whole address when there was no scheme.
#[cfg(all(feature = "transport-tcp", not(target_arch = "wasm32")))]
Tcp(String),
/// `local` scheme; holds the part after `://`.
#[cfg(all(feature = "transport-local", not(target_arch = "wasm32")))]
Local(String),
/// `ws` or `wss` scheme; holds the full URL including the scheme.
#[cfg(all(feature = "transport-websocket", not(target_arch = "wasm32")))]
Ws(String),
}
/// Splits a connect address into its transport and target.
///
/// The text before the first `://` selects the transport; an address without
/// `://` is treated as `tcp`. Schemes whose transport is not compiled in (or
/// that are not recognised at all) yield `ConnectionError::Protocol`.
fn parse_connect_address(addr: String) -> Result<ConnectAddress, ConnectionError> {
    // No `scheme://` prefix means plain TCP.
    let (scheme, host) = addr.split_once("://").unwrap_or(("tcp", addr.as_str()));

    // With every transport compiled out, `host` would otherwise be unused.
    #[cfg(not(any(
        all(feature = "transport-tcp", not(target_arch = "wasm32")),
        all(feature = "transport-local", not(target_arch = "wasm32")),
        all(feature = "transport-websocket", not(target_arch = "wasm32"))
    )))]
    let _ = host;

    match scheme {
        #[cfg(all(feature = "transport-tcp", not(target_arch = "wasm32")))]
        "tcp" => Ok(ConnectAddress::Tcp(host.to_owned())),
        #[cfg(all(feature = "transport-local", not(target_arch = "wasm32")))]
        "local" => Ok(ConnectAddress::Local(host.to_owned())),
        // The websocket transport wants the full URL back, scheme included.
        #[cfg(all(feature = "transport-websocket", not(target_arch = "wasm32")))]
        "ws" | "wss" => Ok(ConnectAddress::Ws(format!("{scheme}://{host}"))),
        _ => Err(ConnectionError::Protocol(format!(
            "unknown transport scheme: {scheme:?}"
        ))),
    }
}
/// Builder for an outgoing connection, created by [`connect`].
///
/// Configure it with the chained setters, then call `establish` (or `.await`
/// the builder directly).
pub struct ConnectBuilder {
/// Address as given to [`connect`]; parsed only when establishing.
addr: String,
/// Metadata handed to the underlying initiator builder.
metadata: Metadata,
/// Optional acceptor installed on the connection via `on_lane`.
lane_acceptor: Option<Arc<dyn LaneAcceptor>>,
/// Timeout forwarded to the underlying initiator builder; defaults to 5 seconds.
connect_timeout: Option<Duration>,
/// Channel capacity forwarded to the initiator builder; zero is rejected when
/// establishing. Defaults to `DEFAULT_INITIAL_CHANNEL_CREDIT`.
channel_capacity: u32,
/// Optional observer forwarded to the initiator builder.
observer: Option<VoxObserverHandle>,
/// Optional identity resolver forwarded to the initiator builder.
identity_resolver: Option<Arc<dyn IdentityResolver>>,
/// When set, establishing retries I/O and timeout failures with backoff for
/// up to this long instead of making a single attempt.
wait_for_service: Option<Duration>,
}
impl ConnectBuilder {
    /// Creates a builder for `addr` with the defaults: empty metadata, a
    /// 5 second connect timeout, `DEFAULT_INITIAL_CHANNEL_CREDIT` channel
    /// capacity, and no acceptor, observer, resolver or service wait.
    fn new(addr: String) -> Self {
        Self {
            addr,
            metadata: Default::default(),
            lane_acceptor: None,
            connect_timeout: Some(Duration::from_secs(5)),
            channel_capacity: DEFAULT_INITIAL_CHANNEL_CREDIT,
            observer: None,
            identity_resolver: None,
            wait_for_service: None,
        }
    }

    /// Installs the acceptor used for lanes on this connection.
    pub fn on_lane(self, acceptor: impl LaneAcceptor) -> Self {
        let acceptor: Arc<dyn LaneAcceptor> = Arc::new(acceptor);
        Self {
            lane_acceptor: Some(acceptor),
            ..self
        }
    }

    /// Replaces the metadata passed along when connecting.
    pub fn metadata(self, metadata: Metadata) -> Self {
        Self { metadata, ..self }
    }

    /// Sets the timeout forwarded to the underlying initiator builder.
    pub fn connect_timeout(self, timeout: Duration) -> Self {
        Self {
            connect_timeout: Some(timeout),
            ..self
        }
    }

    /// Sets the channel capacity; zero is rejected when establishing.
    pub fn channel_capacity(self, channel_capacity: u32) -> Self {
        Self {
            channel_capacity,
            ..self
        }
    }

    /// Installs an observer, wrapping it in an `Arc`.
    pub fn observer(self, observer: impl VoxObserver) -> Self {
        let handle: VoxObserverHandle = Arc::new(observer);
        self.observer_handle(handle)
    }

    /// Installs an already shared observer handle.
    pub fn observer_handle(self, observer: VoxObserverHandle) -> Self {
        Self {
            observer: Some(observer),
            ..self
        }
    }

    /// Installs the identity resolver forwarded to the initiator builder.
    pub fn identity_resolver(self, resolver: impl IdentityResolver) -> Self {
        let resolver: Arc<dyn IdentityResolver> = Arc::new(resolver);
        Self {
            identity_resolver: Some(resolver),
            ..self
        }
    }

    /// Enables retrying: I/O and timeout failures are retried with backoff for
    /// up to `timeout` instead of failing on the first attempt.
    pub fn wait_for_service(self, timeout: Duration) -> Self {
        Self {
            wait_for_service: Some(timeout),
            ..self
        }
    }
}
/// Message of the protocol error returned for a zero channel capacity.
const CHANNEL_CAPACITY_ZERO_ERROR: &str = "channel_capacity must be greater than zero";
/// Upper bound for the delay between connect retries.
const INITIAL_CONNECT_BACKOFF_MAX: Duration = Duration::from_secs(5);
/// Delay before the first connect retry; doubled after each failed attempt.
const INITIAL_CONNECT_BACKOFF_MIN: Duration = Duration::from_millis(100);
/// Rejects a zero channel capacity with `ConnectionError::Protocol`; any other
/// value is accepted.
fn validate_channel_capacity(channel_capacity: u32) -> Result<(), ConnectionError> {
    match channel_capacity {
        0 => Err(ConnectionError::Protocol(
            CHANNEL_CAPACITY_ZERO_ERROR.into(),
        )),
        _ => Ok(()),
    }
}
impl ConnectBuilder {
pub async fn establish(self) -> Result<ConnectionHandle, ConnectionError> {
let ConnectBuilder {
addr,
metadata,
lane_acceptor,
connect_timeout,
channel_capacity,
observer,
identity_resolver,
wait_for_service,
} = self;
validate_channel_capacity(channel_capacity)?;
tracing::debug!(
%addr,
channel_capacity,
wait_for_service = wait_for_service.is_some(),
"vox high-level connect starting"
);
let parsed = parse_connect_address(addr)?;
let metadata = metadata_into_owned(metadata);
match wait_for_service {
Some(service_timeout) => {
let deadline = Instant::now() + service_timeout;
let mut backoff = INITIAL_CONNECT_BACKOFF_MIN;
loop {
let now = Instant::now();
if now >= deadline {
return Err(ConnectionError::ConnectTimeout);
}
let remaining = deadline - now;
let attempt = Self::establish_once(
&parsed,
metadata.clone(),
lane_acceptor.clone(),
connect_timeout,
channel_capacity,
observer.clone(),
identity_resolver.clone(),
);
let result = match vox_rt::time::timeout(remaining, attempt).await {
Ok(r) => r,
Err(_) => Err(ConnectionError::ConnectTimeout),
};
match result {
Ok(connection) => {
tracing::debug!("vox high-level connect established");
return Ok(connection);
}
Err(e)
if !matches!(
e,
ConnectionError::Io(_) | ConnectionError::ConnectTimeout
) =>
{
return Err(e);
}
Err(e) => {
let now = Instant::now();
if now >= deadline {
return Err(e);
}
let remaining = deadline - now;
let sleep = backoff.min(remaining);
tracing::debug!(
error = ?e,
?sleep,
"vox high-level connect attempt failed; backing off"
);
vox_rt::time::sleep(sleep).await;
backoff = backoff.saturating_mul(2).min(INITIAL_CONNECT_BACKOFF_MAX);
}
}
}
}
None => {
let result = Self::establish_once(
&parsed,
metadata,
lane_acceptor,
connect_timeout,
channel_capacity,
observer,
identity_resolver,
)
.await;
match &result {
Ok(_) => tracing::debug!("vox high-level connect established"),
Err(error) => tracing::debug!(?error, "vox high-level connect failed"),
}
result
}
}
}
async fn establish_once(
parsed: &ConnectAddress,
metadata: vox_types::Metadata,
lane_acceptor: Option<Arc<dyn LaneAcceptor>>,
connect_timeout: Option<Duration>,
channel_capacity: u32,
observer: Option<VoxObserverHandle>,
identity_resolver: Option<Arc<dyn IdentityResolver>>,
) -> Result<ConnectionHandle, ConnectionError> {
#[cfg(not(any(
all(feature = "transport-tcp", not(target_arch = "wasm32")),
all(feature = "transport-local", not(target_arch = "wasm32")),
all(feature = "transport-websocket", not(target_arch = "wasm32"))
)))]
let _ = (
&metadata,
&lane_acceptor,
&connect_timeout,
channel_capacity,
&observer,
&identity_resolver,
);
match parsed {
#[cfg(all(feature = "transport-tcp", not(target_arch = "wasm32")))]
ConnectAddress::Tcp(host) => {
tracing::trace!(
transport = "tcp",
%host,
"vox high-level connect attempt"
);
let mut builder = initiator(vox_stream::tcp_link_source(host.clone()));
if let Some(acceptor) = lane_acceptor.clone() {
builder = builder.on_lane(AcceptorRef(acceptor));
}
if let Some(timeout) = connect_timeout {
builder = builder.connect_timeout(timeout);
}
builder = builder.channel_capacity(channel_capacity);
if let Some(observer) = observer.clone() {
builder = builder.observer_handle(observer);
}
if let Some(resolver) = identity_resolver.clone() {
builder = builder.identity_resolver(IdentityResolverRef(resolver));
}
builder.metadata(metadata).establish_connection().await
}
#[cfg(all(feature = "transport-local", not(target_arch = "wasm32")))]
ConnectAddress::Local(host) => {
tracing::trace!(
transport = "local",
%host,
"vox high-level connect attempt"
);
let mut builder = initiator(vox_stream::local_link_source(host.clone()));
if let Some(acceptor) = lane_acceptor.clone() {
builder = builder.on_lane(AcceptorRef(acceptor));
}
if let Some(timeout) = connect_timeout {
builder = builder.connect_timeout(timeout);
}
builder = builder.channel_capacity(channel_capacity);
if let Some(observer) = observer.clone() {
builder = builder.observer_handle(observer);
}
if let Some(resolver) = identity_resolver.clone() {
builder = builder.identity_resolver(IdentityResolverRef(resolver));
}
builder.metadata(metadata).establish_connection().await
}
#[cfg(all(feature = "transport-websocket", not(target_arch = "wasm32")))]
ConnectAddress::Ws(url) => {
tracing::trace!(
transport = "ws",
%url,
"vox high-level connect attempt"
);
let mut builder = initiator(vox_websocket::ws_link_source(url.clone()));
if let Some(acceptor) = lane_acceptor {
builder = builder.on_lane(AcceptorRef(acceptor));
}
if let Some(timeout) = connect_timeout {
builder = builder.connect_timeout(timeout);
}
builder = builder.channel_capacity(channel_capacity);
if let Some(observer) = observer {
builder = builder.observer_handle(observer);
}
if let Some(resolver) = identity_resolver {
builder = builder.identity_resolver(IdentityResolverRef(resolver));
}
builder.metadata(metadata).establish_connection().await
}
#[allow(unreachable_patterns)]
_ => Err(ConnectionError::Protocol(
"transport not enabled in this vox build".to_string(),
)),
}
}
}
/// Lets a [`ConnectBuilder`] be awaited directly; awaiting it runs
/// [`ConnectBuilder::establish`].
///
/// NOTE(review): unlike `BoxHighLevelFuture` on non-wasm32 targets, this boxed
/// future carries no `Send` bound. Confirm whether that is intentional.
impl IntoFuture for ConnectBuilder {
    type Output = Result<ConnectionHandle, ConnectionError>;
    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + 'static>>;

    fn into_future(self) -> Self::IntoFuture {
        let establishing = self.establish();
        Box::pin(establishing)
    }
}
/// Builder created by [`connect_lane`]: a [`ConnectBuilder`] that, once the
/// connection is established, opens a lane of type `Client` on it.
pub struct ConnectLaneBuilder<Client> {
/// Connection settings; every configuration method forwards to this builder.
inner: ConnectBuilder,
/// Records the client type to open without storing a value of it.
_client: std::marker::PhantomData<Client>,
}
impl<Client> ConnectLaneBuilder<Client> {
pub fn on_lane(mut self, acceptor: impl LaneAcceptor) -> Self {
self.inner = self.inner.on_lane(acceptor);
self
}
pub fn metadata(mut self, metadata: Metadata) -> Self {
self.inner = self.inner.metadata(metadata);
self
}
pub fn connect_timeout(mut self, timeout: Duration) -> Self {
self.inner = self.inner.connect_timeout(timeout);
self
}
pub fn channel_capacity(mut self, channel_capacity: u32) -> Self {
self.inner = self.inner.channel_capacity(channel_capacity);
self
}
pub fn observer(mut self, observer: impl VoxObserver) -> Self {
self.inner = self.inner.observer(observer);
self
}
pub fn observer_handle(mut self, observer: VoxObserverHandle) -> Self {
self.inner = self.inner.observer_handle(observer);
self
}
pub fn identity_resolver(mut self, resolver: impl IdentityResolver) -> Self {
self.inner = self.inner.identity_resolver(resolver);
self
}
pub fn wait_for_service(mut self, timeout: Duration) -> Self {
self.inner = self.inner.wait_for_service(timeout);
self
}
}
impl<Client: FromVoxLane> ConnectLaneBuilder<Client> {
    /// Establishes the connection, then opens a lane of type `Client` on it.
    ///
    /// Fails with the connection error if establishing fails, otherwise with
    /// whatever `open_lane` returns.
    pub async fn establish(self) -> Result<Client, ConnectionError> {
        let connection = self.inner.establish().await?;
        connection.open_lane::<Client>().await
    }
}
/// Lets a [`ConnectLaneBuilder`] be awaited directly; awaiting it runs
/// [`ConnectLaneBuilder::establish`].
///
/// NOTE(review): unlike `BoxHighLevelFuture` on non-wasm32 targets, this boxed
/// future carries no `Send` bound. Confirm whether that is intentional.
impl<Client: FromVoxLane + 'static> IntoFuture for ConnectLaneBuilder<Client> {
    type Output = Result<Client, ConnectionError>;
    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + 'static>>;

    fn into_future(self) -> Self::IntoFuture {
        let establishing = self.establish();
        Box::pin(establishing)
    }
}
/// Starts building a server that listens on `addr` and hands lanes to
/// `acceptor`.
///
/// `addr` is rendered through its `Display` impl; it is only parsed when the
/// returned builder is run.
pub fn serve<A: LaneAcceptor>(addr: impl std::fmt::Display, acceptor: A) -> ServeBuilder<A> {
    let listen_addr = addr.to_string();
    ServeBuilder::new(listen_addr, acceptor)
}
/// Builder for an address-based server, created by [`serve`].
pub struct ServeBuilder<A> {
/// Address to listen on, optionally prefixed with `scheme://`; an address
/// without a scheme is treated as `tcp`.
addr: String,
/// Acceptor that lanes are handed to.
acceptor: A,
/// Metadata passed on to the per-transport serve implementation.
metadata: Metadata,
/// Channel capacity; zero is rejected when the server is run. Defaults to
/// `DEFAULT_INITIAL_CHANNEL_CREDIT`.
channel_capacity: u32,
/// Optional observer passed on to the per-transport serve implementation.
observer: Option<VoxObserverHandle>,
/// Optional identity resolver passed on to the per-transport serve
/// implementation.
identity_resolver: Option<Arc<dyn IdentityResolver>>,
}
impl<A> ServeBuilder<A> {
    /// Creates a builder for `addr` with empty metadata, the default channel
    /// capacity, and no observer or identity resolver.
    fn new(addr: String, acceptor: A) -> Self {
        Self {
            addr,
            acceptor,
            metadata: Default::default(),
            channel_capacity: DEFAULT_INITIAL_CHANNEL_CREDIT,
            observer: None,
            identity_resolver: None,
        }
    }

    /// Replaces the metadata used for accepted connections.
    pub fn metadata(self, metadata: Metadata) -> Self {
        Self { metadata, ..self }
    }

    /// Sets the channel capacity; zero is rejected when the server is run.
    pub fn channel_capacity(self, channel_capacity: u32) -> Self {
        Self {
            channel_capacity,
            ..self
        }
    }

    /// Installs an observer, wrapping it in an `Arc`.
    pub fn observer(self, observer: impl VoxObserver) -> Self {
        let handle: VoxObserverHandle = Arc::new(observer);
        self.observer_handle(handle)
    }

    /// Installs an already shared observer handle.
    pub fn observer_handle(self, observer: VoxObserverHandle) -> Self {
        Self {
            observer: Some(observer),
            ..self
        }
    }

    /// Installs the identity resolver used for accepted connections.
    pub fn identity_resolver(self, resolver: impl IdentityResolver) -> Self {
        let resolver: Arc<dyn IdentityResolver> = Arc::new(resolver);
        Self {
            identity_resolver: Some(resolver),
            ..self
        }
    }
}
impl<A> ServeBuilder<A>
where
    A: LaneAcceptor,
{
    /// Binds the configured address and serves connections on it.
    ///
    /// The text before the first `://` selects the transport (`tcp`, `local`,
    /// `ws`, `wss`); an address without `://` is treated as `tcp`. Each
    /// transport is only available when its feature is enabled on a
    /// non-wasm32 target.
    ///
    /// # Errors
    ///
    /// - A zero channel capacity is rejected before anything is bound.
    /// - Bind failures and errors from the serve loop are propagated.
    /// - `ServeError::UnsupportedScheme` if no enabled transport matches the
    ///   scheme.
    pub async fn run(self) -> Result<(), ServeError> {
        let Self {
            addr,
            acceptor,
            metadata,
            channel_capacity,
            observer,
            identity_resolver,
        } = self;
        validate_channel_capacity(channel_capacity)?;
        let metadata = metadata_into_owned(metadata);
        let (scheme, host) = match addr.split_once("://") {
            Some((scheme, host)) => (scheme.to_string(), host.to_string()),
            None => ("tcp".to_string(), addr),
        };
        // With every transport compiled out these bindings would be unused.
        // Each condition mirrors the cfg on the matching arm below, including
        // the wasm32 exclusion, so the suppressor is present exactly when no
        // arm consumes the values.
        #[cfg(not(any(
            all(feature = "transport-tcp", not(target_arch = "wasm32")),
            all(feature = "transport-local", not(target_arch = "wasm32")),
            all(feature = "transport-websocket", not(target_arch = "wasm32")),
            all(feature = "transport-websocket-tls", not(target_arch = "wasm32"))
        )))]
        let _ = (&host, &acceptor, &metadata, &observer, &identity_resolver);
        match scheme.as_str() {
            #[cfg(all(feature = "transport-tcp", not(target_arch = "wasm32")))]
            "tcp" => {
                let listener = tokio::net::TcpListener::bind(&host).await?;
                let mut builder =
                    serve_listener(listener, acceptor).channel_capacity(channel_capacity);
                builder = builder.metadata(metadata);
                if let Some(observer) = observer {
                    builder = builder.observer_handle(observer);
                }
                if let Some(resolver) = identity_resolver {
                    builder = builder.identity_resolver(IdentityResolverRef(resolver));
                }
                Ok(builder.await?)
            }
            #[cfg(all(feature = "transport-local", not(target_arch = "wasm32")))]
            "local" => {
                local::serve_local(
                    &host,
                    acceptor,
                    metadata,
                    channel_capacity,
                    observer,
                    identity_resolver,
                )
                .await
            }
            #[cfg(all(feature = "transport-websocket", not(target_arch = "wasm32")))]
            "ws" => {
                let listener = WsListener::bind(&host).await?;
                let mut builder =
                    serve_listener(listener, acceptor).channel_capacity(channel_capacity);
                builder = builder.metadata(metadata);
                if let Some(observer) = observer {
                    builder = builder.observer_handle(observer);
                }
                if let Some(resolver) = identity_resolver {
                    builder = builder.identity_resolver(IdentityResolverRef(resolver));
                }
                Ok(builder.await?)
            }
            #[cfg(all(feature = "transport-websocket-tls", not(target_arch = "wasm32")))]
            "wss" => {
                wss::serve_wss(
                    &host,
                    acceptor,
                    metadata,
                    channel_capacity,
                    observer,
                    identity_resolver,
                )
                .await
            }
            _ => Err(ServeError::UnsupportedScheme { scheme }),
        }
    }
}
/// Lets a [`ServeBuilder`] be awaited directly; awaiting it runs
/// [`ServeBuilder::run`].
impl<A: LaneAcceptor> IntoFuture for ServeBuilder<A> {
    type Output = Result<(), ServeError>;
    type IntoFuture = BoxHighLevelFuture<Self::Output>;

    fn into_future(self) -> Self::IntoFuture {
        let serving = self.run();
        Box::pin(serving)
    }
}
/// Starts building a server that accepts links from a caller-supplied
/// `listener` and hands lanes to `acceptor`.
pub fn serve_listener<L, A>(listener: L, acceptor: A) -> ServeListenerBuilder<L, A>
where
    L: VoxListener,
    <L::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
    <L::Link as Link>::Rx: MaybeSend + 'static,
    A: LaneAcceptor,
{
    ServeListenerBuilder::<L, A>::new(listener, acceptor)
}
/// Builder for a server driven by a caller-supplied listener, created by
/// [`serve_listener`].
pub struct ServeListenerBuilder<L, A> {
/// Listener that incoming links are accepted from.
listener: L,
/// Acceptor that lanes are handed to; shared across all connections when run.
acceptor: A,
/// Metadata cloned into every accepted connection.
metadata: Metadata,
/// Channel capacity applied to every accepted connection; zero is rejected
/// when the server is run. Defaults to `DEFAULT_INITIAL_CHANNEL_CREDIT`.
channel_capacity: u32,
/// Optional observer cloned into every accepted connection.
observer: Option<VoxObserverHandle>,
/// Optional identity resolver cloned into every accepted connection.
identity_resolver: Option<Arc<dyn IdentityResolver>>,
}
impl<L, A> ServeListenerBuilder<L, A> {
    /// Creates a builder with empty metadata, the default channel capacity,
    /// and no observer or identity resolver.
    fn new(listener: L, acceptor: A) -> Self {
        Self {
            listener,
            acceptor,
            metadata: Default::default(),
            channel_capacity: DEFAULT_INITIAL_CHANNEL_CREDIT,
            observer: None,
            identity_resolver: None,
        }
    }

    /// Replaces the metadata used for accepted connections.
    pub fn metadata(self, metadata: Metadata) -> Self {
        Self { metadata, ..self }
    }

    /// Sets the channel capacity; zero is rejected when the server is run.
    pub fn channel_capacity(self, channel_capacity: u32) -> Self {
        Self {
            channel_capacity,
            ..self
        }
    }

    /// Installs an observer, wrapping it in an `Arc`.
    pub fn observer(self, observer: impl VoxObserver) -> Self {
        let handle: VoxObserverHandle = Arc::new(observer);
        self.observer_handle(handle)
    }

    /// Installs an already shared observer handle.
    pub fn observer_handle(self, observer: VoxObserverHandle) -> Self {
        Self {
            observer: Some(observer),
            ..self
        }
    }

    /// Installs the identity resolver used for accepted connections.
    pub fn identity_resolver(self, resolver: impl IdentityResolver) -> Self {
        let resolver: Arc<dyn IdentityResolver> = Arc::new(resolver);
        Self {
            identity_resolver: Some(resolver),
            ..self
        }
    }
}
impl<L, A> ServeListenerBuilder<L, A>
where
L: VoxListener,
<L::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
<L::Link as Link>::Rx: MaybeSend + 'static,
A: LaneAcceptor,
{
pub async fn run(mut self) -> Result<(), ConnectionError> {
validate_channel_capacity(self.channel_capacity)?;
let acceptor: Arc<dyn LaneAcceptor> = Arc::new(self.acceptor);
loop {
tracing::trace!("vox high-level listener waiting for connection");
let link = self.listener.accept().await.map_err(ConnectionError::Io)?;
tracing::debug!("vox high-level listener accepted raw connection");
let acceptor = acceptor.clone();
let metadata = self.metadata.clone();
let observer = self.observer.clone();
let channel_capacity = self.channel_capacity;
let identity_resolver = self.identity_resolver.clone();
vox_rt::spawn(async move {
tracing::trace!("vox high-level listener establishing connection");
let mut builder = vox_core::acceptor_on(link)
.on_lane(AcceptorRef(acceptor))
.metadata(metadata)
.channel_capacity(channel_capacity);
if let Some(observer) = observer {
builder = builder.observer_handle(observer);
}
if let Some(resolver) = identity_resolver {
builder = builder.identity_resolver(IdentityResolverRef(resolver));
}
let result = builder.establish_connection().await;
match result {
Ok(connection) => {
tracing::debug!("vox high-level listener established connection");
connection.closed().await;
tracing::debug!("vox high-level listener connection closed");
}
Err(error) => {
tracing::debug!(?error, "vox high-level listener connection failed");
}
}
});
}
}
}
/// Lets a [`ServeListenerBuilder`] be awaited directly; awaiting it runs
/// [`ServeListenerBuilder::run`].
impl<L, A> IntoFuture for ServeListenerBuilder<L, A>
where
    L: VoxListener,
    <L::Link as Link>::Tx: MaybeSend + MaybeSync + 'static,
    <L::Link as Link>::Rx: MaybeSend + 'static,
    A: LaneAcceptor,
{
    type Output = Result<(), ConnectionError>;
    type IntoFuture = BoxHighLevelFuture<Self::Output>;

    fn into_future(self) -> Self::IntoFuture {
        let serving = self.run();
        Box::pin(serving)
    }
}
/// Adapter that lets a shared `Arc<dyn LaneAcceptor>` be passed where an owned
/// [`LaneAcceptor`] value is expected; every call is forwarded unchanged.
struct AcceptorRef(Arc<dyn LaneAcceptor>);

impl LaneAcceptor for AcceptorRef {
    fn accept(
        &self,
        request: &LaneRequest,
        connection: PendingLane,
    ) -> Result<(), vox_core::LaneRejection> {
        let AcceptorRef(shared) = self;
        shared.accept(request, connection)
    }
}
/// Adapter that lets a shared `Arc<dyn IdentityResolver>` be passed where an
/// owned [`IdentityResolver`] value is expected; every call is forwarded
/// unchanged.
struct IdentityResolverRef(Arc<dyn IdentityResolver>);

impl IdentityResolver for IdentityResolverRef {
    fn resolve(&self, context: IdentityResolutionContext<'_>) -> Result<PeerIdentity, Decline> {
        let IdentityResolverRef(shared) = self;
        shared.resolve(context)
    }
}