use std::{
collections::{BTreeMap, HashMap},
future::Future,
pin::Pin,
sync::{Arc, Mutex},
time::Duration,
};
use facet_core::Shape;
use futures_util::FutureExt as _;
use tracing::{trace, warn};
use vox_rt::sync::{Notify, mpsc, oneshot, watch};
use vox_types::time::Instant;
use vox_types::{
BoxFut, ChannelMessage, ConduitRx, ConduitTx, ConnectionRole, ConnectionSettings, Decline,
EstablishmentContext, EstablishmentDetails, EstablishmentEvent, EstablishmentOutcome,
EstablishmentPhase, Handler, HandshakeResult, IdAllocator, IdentityResolutionContext,
LaneAccept, LaneClose, LaneGrant, LaneId, LaneOpen, LaneReject, MaybeSend, MaybeSync, Message,
MessageFamily, MessagePayload, Metadata, Parity, PeerEvidence, PeerIdentity, RequestBody,
RequestId, RequestMessage, RequestResponse, SchemaMessage, SelfRef, TrySendError,
VoxDebugSnapshot, VoxObserverHandle,
};
use vox_types::{
ConnectionCloseReason, DecodeErrorKind, DriverTaskStatus, LaneDebugSnapshot, LaneDebugState,
};
mod builders;
pub use builders::*;
/// Timing parameters for connection keepalive.
///
/// The logic that consumes these values lives outside this declaration.
#[derive(Debug, Clone, Copy)]
pub struct ConnectionKeepaliveConfig {
/// Ping interval (presumably the delay between outgoing pings — confirm in the run loop).
pub ping_interval: Duration,
/// Pong timeout (presumably how long to wait for a pong before giving up — confirm in the run loop).
pub pong_timeout: Duration,
}
/// Metadata key under which [`LaneRejection`] stores the machine-readable reject reason.
pub const VOX_LANE_REJECT_REASON_METADATA_KEY: &str = "vox-lane-reject-reason";
/// Metadata key under which [`LaneRejection`] stores an optional human-readable message.
pub const VOX_LANE_REJECT_MESSAGE_METADATA_KEY: &str = "vox-lane-reject-message";
/// Machine-readable reason attached to a rejected lane-open request.
///
/// Each variant has a stable kebab-case wire spelling, see [`LaneRejectReason::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LaneRejectReason {
    UnknownService,
    Forbidden,
    NotReady,
    Draining,
    SchemaIncompatible,
    PolicyRejected,
}

impl LaneRejectReason {
    /// Every variant, so parsing can be derived from [`Self::as_str`].
    const ALL: [Self; 6] = [
        Self::UnknownService,
        Self::Forbidden,
        Self::NotReady,
        Self::Draining,
        Self::SchemaIncompatible,
        Self::PolicyRejected,
    ];

    /// Returns the wire spelling of this reason.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::UnknownService => "unknown-service",
            Self::Forbidden => "forbidden",
            Self::NotReady => "not-ready",
            Self::Draining => "draining",
            Self::SchemaIncompatible => "schema-incompatible",
            Self::PolicyRejected => "policy-rejected",
        }
    }

    /// Parses a wire spelling back into a reason.
    ///
    /// Returns `None` for any string that is not exactly one of the spellings
    /// produced by [`Self::as_str`].
    #[must_use]
    pub fn from_metadata_value(value: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == value)
    }
}

impl std::fmt::Display for LaneRejectReason {
    /// Writes the same text as [`LaneRejectReason::as_str`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = self.as_str();
        f.write_str(text)
    }
}
/// A lane-open rejection: a typed reason plus the metadata that carries it.
///
/// The constructors in this file always write `reason` into `metadata` under
/// [`VOX_LANE_REJECT_REASON_METADATA_KEY`].
#[derive(Debug, Clone)]
pub struct LaneRejection {
// Typed reason; mirrored into `metadata` by `with_metadata`.
reason: LaneRejectReason,
// Full metadata, including the reason key and any optional message.
metadata: Metadata,
}
impl LaneRejection {
#[must_use]
pub fn new(reason: LaneRejectReason) -> Self {
Self::with_metadata(reason, Metadata::default())
}
#[must_use]
pub fn with_message(reason: LaneRejectReason, message: impl Into<String>) -> Self {
let metadata = vox_types::metadata()
.str(VOX_LANE_REJECT_MESSAGE_METADATA_KEY, message.into())
.build();
Self::with_metadata(reason, metadata)
}
#[must_use]
pub fn with_metadata(reason: LaneRejectReason, mut metadata: Metadata) -> Self {
vox_types::meta_set(
&mut metadata,
VOX_LANE_REJECT_REASON_METADATA_KEY,
reason.as_str(),
);
Self { reason, metadata }
}
#[must_use]
pub fn from_metadata(metadata: Metadata) -> Self {
let reason = vox_types::metadata_get_str(&metadata, VOX_LANE_REJECT_REASON_METADATA_KEY)
.and_then(LaneRejectReason::from_metadata_value)
.unwrap_or(LaneRejectReason::PolicyRejected);
Self::with_metadata(reason, metadata)
}
#[must_use]
pub fn reason(&self) -> LaneRejectReason {
self.reason
}
#[must_use]
pub fn message(&self) -> Option<&str> {
vox_types::metadata_get_str(&self.metadata, VOX_LANE_REJECT_MESSAGE_METADATA_KEY)
.or_else(|| vox_types::metadata_get_str(&self.metadata, "error"))
}
#[must_use]
pub fn metadata(&self) -> &Metadata {
&self.metadata
}
#[must_use]
pub fn into_metadata(self) -> Metadata {
self.metadata
}
}
impl Default for LaneRejection {
    /// The default rejection is [`LaneRejectReason::PolicyRejected`] with no extra metadata.
    fn default() -> Self {
        let reason = LaneRejectReason::PolicyRejected;
        Self::new(reason)
    }
}
/// Converts a rejection into establishment details that carry its reason string.
fn lane_rejection_details(rejection: &LaneRejection) -> EstablishmentDetails {
    let reason = rejection.reason();
    EstablishmentDetails::rejection_reason(reason.as_str())
}
/// Records the start of an establishment phase and returns the start timestamp.
///
/// The timestamp is taken before the observer (if any) is notified, and is
/// returned whether or not an observer is present.
pub(crate) fn observe_establishment_started(
    observer: Option<&VoxObserverHandle>,
    role: ConnectionRole,
    phase: EstablishmentPhase,
    lane_id: Option<LaneId>,
) -> Instant {
    let started_at = Instant::now();
    let Some(observer) = observer else {
        return started_at;
    };
    let context = EstablishmentContext {
        role,
        phase,
        lane_id,
    };
    observer.establishment_event(EstablishmentEvent::Started { context });
    started_at
}
/// Reports the end of an establishment phase with no extra details.
///
/// Delegates to [`observe_establishment_finished_with_details`] using
/// `EstablishmentDetails::EMPTY`.
pub(crate) fn observe_establishment_finished(
    observer: Option<&VoxObserverHandle>,
    role: ConnectionRole,
    phase: EstablishmentPhase,
    lane_id: Option<LaneId>,
    outcome: EstablishmentOutcome,
    started_at: Instant,
) {
    let no_details = EstablishmentDetails::EMPTY;
    observe_establishment_finished_with_details(
        observer, role, phase, lane_id, outcome, started_at, no_details,
    );
}
/// Reports the end of an establishment phase to the observer, if one is present.
///
/// The reported `elapsed` is measured from `started_at` at the time of the call.
/// Does nothing when `observer` is `None`.
pub(crate) fn observe_establishment_finished_with_details(
    observer: Option<&VoxObserverHandle>,
    role: ConnectionRole,
    phase: EstablishmentPhase,
    lane_id: Option<LaneId>,
    outcome: EstablishmentOutcome,
    started_at: Instant,
    details: EstablishmentDetails,
) {
    let Some(observer) = observer else {
        return;
    };
    let context = EstablishmentContext {
        role,
        phase,
        lane_id,
    };
    let event = EstablishmentEvent::Finished {
        context,
        outcome,
        elapsed: started_at.elapsed(),
        details,
    };
    observer.establishment_event(event);
}
/// Resolves a [`PeerIdentity`] from an [`IdentityResolutionContext`], or declines the peer.
pub trait IdentityResolver: MaybeSend + MaybeSync + 'static {
/// Returns the resolved identity, or a [`Decline`] to refuse the peer.
fn resolve(&self, context: IdentityResolutionContext<'_>) -> Result<PeerIdentity, Decline>;
}
/// Resolver that never declines and labels every peer as anonymous.
#[derive(Debug, Default, Clone, Copy)]
pub struct AnonymousIdentityResolver;

impl IdentityResolver for AnonymousIdentityResolver {
    /// Ignores the context and always returns [`PeerIdentity::anonymous`].
    fn resolve(&self, _context: IdentityResolutionContext<'_>) -> Result<PeerIdentity, Decline> {
        let identity = PeerIdentity::anonymous();
        Ok(identity)
    }
}
/// Adapter that lets a plain closure act as an [`IdentityResolver`].
pub struct IdentityResolverFn<F>(pub F);

impl<F> IdentityResolver for IdentityResolverFn<F>
where
    F: for<'a> Fn(IdentityResolutionContext<'a>) -> Result<PeerIdentity, Decline>
        + MaybeSend
        + MaybeSync
        + 'static,
{
    /// Forwards the context to the wrapped closure and returns its result unchanged.
    fn resolve(&self, context: IdentityResolutionContext<'_>) -> Result<PeerIdentity, Decline> {
        let Self(resolve_fn) = self;
        resolve_fn(context)
    }
}
/// Wraps a closure in [`IdentityResolverFn`] so it can be used as an [`IdentityResolver`].
///
/// The bounds here match those on the `IdentityResolver` impl for `IdentityResolverFn<F>`.
pub fn identity_resolver_fn<F>(f: F) -> IdentityResolverFn<F>
where
F: for<'a> Fn(IdentityResolutionContext<'a>) -> Result<PeerIdentity, Decline>
+ MaybeSend
+ MaybeSync
+ 'static,
{
IdentityResolverFn(f)
}
/// Borrowed view of an incoming lane-open request, handed to a [`LaneAcceptor`].
pub struct LaneRequest<'a> {
// Metadata sent with the lane-open request.
metadata: &'a vox_types::Metadata,
// Value of the `vox-service` metadata entry, extracted once in `new`.
service: &'a str,
// Identity of the peer.
peer_identity: &'a PeerIdentity,
// Evidence associated with the peer.
peer_evidence: &'a PeerEvidence,
}
impl<'a> LaneRequest<'a> {
    /// Builds a request view over `metadata`.
    ///
    /// # Errors
    ///
    /// Returns [`ConnectionError::Protocol`] when the metadata has no string
    /// `vox-service` entry.
    pub fn new(
        metadata: &'a vox_types::Metadata,
        peer_identity: &'a PeerIdentity,
        peer_evidence: &'a PeerEvidence,
    ) -> Result<Self, ConnectionError> {
        match vox_types::metadata_get_str(metadata, "vox-service") {
            Some(service) => Ok(Self {
                metadata,
                service,
                peer_identity,
                peer_evidence,
            }),
            None => Err(ConnectionError::Protocol(
                "missing required vox-service metadata".into(),
            )),
        }
    }

    /// The requested service name (the `vox-service` metadata value).
    pub fn service(&self) -> &str {
        self.service
    }

    /// The `vox-transport` metadata value, if present.
    pub fn transport(&self) -> Option<&str> {
        self.get_str("vox-transport")
    }

    /// The `vox-peer-addr` metadata value, if present.
    pub fn peer_addr(&self) -> Option<&str> {
        self.get_str("vox-peer-addr")
    }

    /// Looks up a string metadata value by key.
    pub fn get_str(&self, key: &str) -> Option<&str> {
        vox_types::metadata_get_str(self.metadata, key)
    }

    /// Looks up a `u64` metadata value by key.
    pub fn get_u64(&self, key: &str) -> Option<u64> {
        vox_types::metadata_get_u64(self.metadata, key)
    }

    /// The full request metadata.
    pub fn metadata(&self) -> &'a vox_types::Metadata {
        self.metadata
    }

    /// Identity of the peer that sent the request.
    pub fn peer_identity(&self) -> &'a PeerIdentity {
        self.peer_identity
    }

    /// Evidence associated with the peer that sent the request.
    pub fn peer_evidence(&self) -> &'a PeerEvidence {
        self.peer_evidence
    }
}
/// A lane handed to a [`LaneAcceptor`] that has not been consumed yet.
///
/// Consuming methods take the handle out; if the value is dropped with the handle
/// still present, the `Drop` impl requests that the lane be closed.
pub struct PendingLane {
// `Some` until one of the consuming methods takes the handle.
handle: Option<LaneHandle>,
// Grant applied to the handle when it is taken; starts as `LaneGrant::empty()`.
lane_grant: LaneGrant,
}
impl PendingLane {
    /// Wraps `handle` with an empty grant.
    fn new(handle: LaneHandle) -> Self {
        let lane_grant = LaneGrant::empty();
        Self {
            handle: Some(handle),
            lane_grant,
        }
    }

    /// Sets the grant that is applied to the lane handle when it is consumed.
    #[must_use]
    pub fn with_grant(mut self, grant: LaneGrant) -> Self {
        self.lane_grant = grant;
        self
    }

    /// Takes the handle out and stamps the configured grant onto it.
    ///
    /// # Panics
    ///
    /// Panics if the handle was already taken.
    fn take_handle(&mut self) -> LaneHandle {
        let grant = self.lane_grant.clone();
        let mut taken = self.handle.take().expect("PendingLane already consumed");
        taken.set_lane_grant(grant);
        taken
    }

    /// Accepts the lane and spawns a driver that serves it with `handler`.
    pub fn handle_with(mut self, handler: impl Handler<crate::DriverReplySink> + 'static) {
        let lane = self.take_handle();
        let conn_id = lane.lane_id();
        trace!(%conn_id, "PendingLane::handle_with: creating driver");
        let mut driver = crate::Driver::new(lane, handler);

        #[cfg(not(target_arch = "wasm32"))]
        tokio::spawn(async move {
            trace!(%conn_id, "PendingLane driver starting");
            driver.run().await;
            trace!(%conn_id, "PendingLane driver exited");
        });
        #[cfg(target_arch = "wasm32")]
        wasm_bindgen_futures::spawn_local(async move { driver.run().await });
    }

    /// Accepts the lane, spawns a driver serving it with `handler`, and returns a
    /// client built from that driver's caller.
    pub fn handle_with_client<C: crate::FromVoxLane>(
        mut self,
        handler: impl Handler<crate::DriverReplySink> + 'static,
    ) -> C {
        let lane = self.take_handle();
        let conn_id = lane.lane_id();
        trace!(%conn_id, "PendingLane::handle_with_client: creating driver");
        let mut driver = crate::Driver::new(lane, handler);
        // The caller must be obtained before the driver is moved into its task.
        let caller = crate::Caller::new(driver.caller());

        #[cfg(not(target_arch = "wasm32"))]
        tokio::spawn(async move {
            trace!(%conn_id, "PendingLane driver starting");
            driver.run().await;
            trace!(%conn_id, "PendingLane driver exited");
        });
        #[cfg(target_arch = "wasm32")]
        wasm_bindgen_futures::spawn_local(async move { driver.run().await });

        C::from_vox_lane(caller, None)
    }

    /// Accepts the lane and spawns a task that proxies it to `other` via
    /// [`proxy_lanes`]; the proxy result is discarded.
    pub fn proxy_to(mut self, other: LaneHandle) {
        let lane = self.take_handle();
        let forward = async move {
            let _ = proxy_lanes(lane, other).await;
        };

        #[cfg(not(target_arch = "wasm32"))]
        tokio::spawn(forward);
        #[cfg(target_arch = "wasm32")]
        wasm_bindgen_futures::spawn_local(forward);
    }

    /// Accepts the lane and returns the raw handle to the caller.
    pub fn into_handle(mut self) -> LaneHandle {
        self.take_handle()
    }
}
impl Drop for PendingLane {
    /// If the lane was never consumed, logs a warning and asks the connection to
    /// close it (best effort; a failed send is ignored).
    fn drop(&mut self) {
        let Some(unclaimed) = self.handle.take() else {
            return;
        };
        let conn_id = unclaimed.lane_id();
        warn!(%conn_id, "PendingLane dropped without being consumed — closing service lane");
        if let Some(tx) = &unclaimed.control_tx {
            let _ = send_drop_control(tx, DropControlRequest::Close(conn_id));
        }
    }
}
/// Decides what to do with an incoming lane-open request.
pub trait LaneAcceptor: MaybeSend + MaybeSync + 'static {
/// Either consumes `connection` and returns `Ok(())`, or returns a [`LaneRejection`].
fn accept(&self, request: &LaneRequest, connection: PendingLane) -> Result<(), LaneRejection>;
}
/// Any cloneable handler is an acceptor: every lane is accepted and served by a
/// fresh clone of the handler.
impl<H> LaneAcceptor for H
where
    H: Handler<crate::DriverReplySink> + Clone + MaybeSend + MaybeSync + 'static,
{
    fn accept(&self, _request: &LaneRequest, connection: PendingLane) -> Result<(), LaneRejection> {
        let handler = self.clone();
        connection.handle_with(handler);
        Ok(())
    }
}
/// Adapter that lets a plain closure act as a [`LaneAcceptor`].
pub struct LaneAcceptorFn<F>(pub F);

impl<F> LaneAcceptor for LaneAcceptorFn<F>
where
    F: Fn(&LaneRequest, PendingLane) -> Result<(), LaneRejection> + MaybeSend + MaybeSync + 'static,
{
    /// Forwards the request and pending lane to the wrapped closure.
    fn accept(&self, request: &LaneRequest, connection: PendingLane) -> Result<(), LaneRejection> {
        let Self(accept_fn) = self;
        accept_fn(request, connection)
    }
}
/// Wraps a closure in [`LaneAcceptorFn`] so it can be used as a [`LaneAcceptor`].
pub fn lane_acceptor_fn<F>(f: F) -> LaneAcceptorFn<F>
where
F: Fn(&LaneRequest, PendingLane) -> Result<(), LaneRejection> + MaybeSend + MaybeSync + 'static,
{
LaneAcceptorFn(f)
}
/// Request sent from a [`ConnectionHandle`] over `open_tx` to open a new lane.
struct OpenRequest {
// Local settings requested for the new lane.
settings: ConnectionSettings,
// Metadata to accompany the open request.
metadata: Metadata,
// One-shot channel on which the resulting handle (or error) is delivered.
result_tx: vox_rt::sync::oneshot::Sender<Result<LaneHandle, ConnectionError>>,
}
/// Request sent from a [`ConnectionHandle`] over `close_tx` to close a lane.
struct CloseRequest {
// Lane to close.
conn_id: LaneId,
// Metadata to accompany the close.
metadata: Metadata,
// One-shot channel on which the outcome is delivered.
result_tx: vox_rt::sync::oneshot::Sender<Result<(), ConnectionError>>,
}
/// Fire-and-forget control requests sent over the unbounded control channel
/// (see `send_drop_control`).
#[derive(Debug, Clone)]
pub(crate) enum DropControlRequest {
/// Sent by `ConnectionHandle::shutdown`.
Shutdown,
/// Request to close a single lane.
Close(LaneId),
/// Request to close a lane, carrying a description (presumably of a protocol violation — confirm at the use site).
ProtocolClose {
conn_id: LaneId,
description: String,
},
}
/// Disposition reported together with a `RequestId` through `ConnectionSender::mark_failure`.
///
/// NOTE(review): the exact meaning of each variant is defined by the consumer of
/// the failures channel, which is not visible here.
#[derive(Clone, Copy, Debug)]
pub(crate) enum FailureDisposition {
Cancelled,
Indeterminate,
}
/// Queues a control request on the unbounded control channel (non-wasm build).
///
/// Returns `Err(())` when the send fails; the failure detail is discarded.
#[cfg(not(target_arch = "wasm32"))]
fn send_drop_control(
    tx: &mpsc::UnboundedSender<DropControlRequest>,
    req: DropControlRequest,
) -> Result<(), ()> {
    match tx.send(req) {
        Ok(()) => Ok(()),
        Err(_) => Err(()),
    }
}
/// Queues a control request on the unbounded control channel (wasm build, which
/// uses `try_send`).
///
/// Returns `Err(())` when the send fails; the failure detail is discarded.
#[cfg(target_arch = "wasm32")]
fn send_drop_control(
    tx: &mpsc::UnboundedSender<DropControlRequest>,
    req: DropControlRequest,
) -> Result<(), ()> {
    match tx.try_send(req) {
        Ok(()) => Ok(()),
        Err(_) => Err(()),
    }
}
/// Cloneable handle used to open and close lanes on a connection and to request shutdown.
#[derive(Clone)]
pub struct ConnectionHandle {
// Carries `OpenRequest`s (see `open_lane_handle`).
open_tx: mpsc::Sender<OpenRequest>,
// Carries `CloseRequest`s (see `close_lane`).
close_tx: mpsc::Sender<CloseRequest>,
// Carries fire-and-forget control requests (see `shutdown`).
control_tx: mpsc::UnboundedSender<DropControlRequest>,
peer_identity: PeerIdentity,
peer_evidence: PeerEvidence,
// When present, `closed()` waits on this caller; when absent, `closed()` returns immediately.
_control_caller: Option<crate::Caller>,
}
impl ConnectionHandle {
pub fn peer_identity(&self) -> &PeerIdentity {
&self.peer_identity
}
pub fn peer_evidence(&self) -> &PeerEvidence {
&self.peer_evidence
}
pub async fn closed(&self) {
if let Some(caller) = &self._control_caller {
caller.closed().await;
}
}
pub async fn open_lane<Client: crate::FromVoxLane>(&self) -> Result<Client, ConnectionError> {
self.open_lane_with_settings(ConnectionSettings {
parity: Parity::Odd,
max_concurrent_requests: 64,
initial_channel_credit: vox_types::DEFAULT_INITIAL_CHANNEL_CREDIT,
})
.await
}
pub async fn open_lane_with_settings<Client: crate::FromVoxLane>(
&self,
settings: ConnectionSettings,
) -> Result<Client, ConnectionError> {
use crate::{Caller, Driver};
let metadata = vox_types::metadata()
.str(
crate::connection::builders::VOX_SERVICE_METADATA_KEY,
Client::SERVICE_NAME,
)
.build();
let handle = self.open_lane_handle(settings, metadata).await?;
let mut driver = Driver::new(handle, ());
let caller = Caller::new(driver.caller());
#[cfg(not(target_arch = "wasm32"))]
tokio::spawn(async move { driver.run().await });
#[cfg(target_arch = "wasm32")]
wasm_bindgen_futures::spawn_local(async move { driver.run().await });
Ok(Client::from_vox_lane(caller, Some(self.clone())))
}
pub async fn open_lane_handle(
&self,
settings: ConnectionSettings,
metadata: Metadata,
) -> Result<LaneHandle, ConnectionError> {
let (result_tx, result_rx) = vox_rt::sync::oneshot::channel("connection.open_result");
self.open_tx
.send(OpenRequest {
settings,
metadata,
result_tx,
})
.await
.map_err(|_| ConnectionError::Protocol("connection closed".into()))?;
result_rx
.await
.map_err(|_| ConnectionError::Protocol("connection closed".into()))?
}
pub async fn close_lane(
&self,
lane_id: LaneId,
metadata: Metadata,
) -> Result<(), ConnectionError> {
let (result_tx, result_rx) = vox_rt::sync::oneshot::channel("connection.close_result");
self.close_tx
.send(CloseRequest {
conn_id: lane_id,
metadata,
result_tx,
})
.await
.map_err(|_| ConnectionError::Protocol("connection closed".into()))?;
result_rx
.await
.map_err(|_| ConnectionError::Protocol("connection closed".into()))?
}
pub fn shutdown(&self) -> Result<(), ConnectionError> {
send_drop_control(&self.control_tx, DropControlRequest::Shutdown)
.map_err(|_| ConnectionError::Protocol("connection closed".into()))
}
}
/// State of one connection, multiplexing lanes over a single conduit.
///
/// Built by `pre_handshake` with placeholder role/parity/settings, which
/// `establish_from_handshake` then overwrites.
pub struct Connection {
// Inbound half of the conduit (boxed in `pre_handshake`).
rx: Box<dyn DynConduitRx>,
role: ConnectionRole,
parity: Parity,
// Shared outbound core; cloned into every `ConnectionSender`.
connection_core: Arc<ConnectionCore>,
local_connection_settings: ConnectionSettings,
// `None` until `establish_from_handshake` records the peer's settings.
peer_connection_settings: Option<ConnectionSettings>,
peer_identity: PeerIdentity,
peer_evidence: PeerEvidence,
// Per-lane slots keyed by lane id.
conns: BTreeMap<LaneId, ConnectionSlot>,
// Lane id allocator; re-created with the negotiated parity in `establish_from_handshake`.
conn_ids: IdAllocator<LaneId>,
lane_acceptor: Option<Arc<dyn LaneAcceptor>>,
// Receives `OpenRequest`s from `ConnectionHandle`s.
open_rx: mpsc::Receiver<OpenRequest>,
// Receives `CloseRequest`s from `ConnectionHandle`s.
close_rx: mpsc::Receiver<CloseRequest>,
// Cloned into every `LaneHandle` created by `make_connection_handle`.
control_tx: mpsc::UnboundedSender<DropControlRequest>,
control_rx: mpsc::UnboundedReceiver<DropControlRequest>,
keepalive: Option<ConnectionKeepaliveConfig>,
observer: Option<VoxObserverHandle>,
}
/// Mutable keepalive bookkeeping.
///
/// NOTE(review): the code that drives these fields is outside this view; the field
/// comments below are read off the names and should be confirmed against it.
#[derive(Debug)]
struct KeepaliveRuntime {
ping_interval: Duration,
pong_timeout: Duration,
// Presumably the instant at which the next ping is due.
next_ping_at: vox_types::time::tokio::Instant,
// Presumably the nonce of the ping still awaiting a pong, if any.
waiting_pong_nonce: Option<u64>,
// Presumably the deadline for the outstanding pong.
pong_deadline: vox_types::time::tokio::Instant,
// Presumably the nonce to use for the next ping.
next_ping_nonce: u64,
}
/// Connection-side state of an active lane, created by `make_connection_handle`.
#[derive(Debug)]
pub struct LaneState {
pub id: LaneId,
pub local_settings: ConnectionSettings,
pub peer_settings: ConnectionSettings,
// Sender half of the per-lane inbound queue; the receiver is `LaneHandle::rx`.
conn_tx: mpsc::Sender<RecvMessage>,
// Watch sender paired with `LaneHandle::closed_rx`; starts as `None`.
closed_tx: watch::Sender<Option<ConnectionCloseReason>>,
// Per-lane tracker for received schemas.
schema_recv_tracker: Arc<vox_types::SchemaRecvTracker>,
// Shared with `LaneHandle::lane_grant_state`.
lane_grant: Arc<Mutex<LaneGrant>>,
}
/// One entry in `Connection::conns`.
#[derive(Debug)]
enum ConnectionSlot {
/// A lane with live state.
Active(LaneState),
/// A locally requested lane whose result has not been delivered yet (see `PendingOutboundData`).
PendingOutbound(PendingOutboundData),
}
/// Bookkeeping for a lane in the `ConnectionSlot::PendingOutbound` state.
struct PendingOutboundData {
// Settings requested locally for the lane.
local_settings: ConnectionSettings,
// Start timestamp for establishment timing.
establishment_started_at: Instant,
// Where the open result is delivered; an `Option`, presumably so it can be taken exactly once — confirm at the use site.
result_tx: Option<vox_rt::sync::oneshot::Sender<Result<LaneHandle, ConnectionError>>>,
}
impl std::fmt::Debug for PendingOutboundData {
    /// Prints only `local_settings`, under the name `PendingOutbound`; the
    /// timestamp and result sender are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("PendingOutbound");
        builder.field("local_settings", &self.local_settings);
        builder.finish()
    }
}
/// Cloneable sending side of a single lane.
#[derive(Clone)]
pub(crate) struct ConnectionSender {
// Lane id stamped on every outgoing `Message`.
lane_id: LaneId,
// Shared outbound core that performs the actual send.
pub(crate) connection_core: Arc<ConnectionCore>,
// Reports request failures; the receiving end is `LaneHandle::failures_rx`.
failures: Arc<mpsc::UnboundedSender<(RequestId, FailureDisposition)>>,
}
/// Re-borrows an already-encoded payload so it can be forwarded as-is.
///
/// # Panics
///
/// Panics (via `unreachable!`) if the payload is not `Payload::Encoded`.
fn forwarded_payload<'a>(payload: &'a vox_types::Payload<'a>) -> vox_types::Payload<'a> {
    match payload {
        vox_types::Payload::Encoded(bytes) => vox_types::Payload::Encoded(bytes),
        _ => unreachable!("proxy forwarding expects decoded incoming payload bytes"),
    }
}
/// Builds a forwardable copy of a request body.
///
/// Metadata, channels and schemas are cloned; payload bytes are re-borrowed via
/// [`forwarded_payload`].
fn forwarded_request_body<'a>(body: &'a RequestBody<'a>) -> RequestBody<'a> {
    match body {
        RequestBody::Call(call) => {
            let forwarded = vox_types::RequestCall {
                method_id: call.method_id,
                channels: call.channels.clone(),
                metadata: call.metadata.clone(),
                args: forwarded_payload(&call.args),
                schemas: call.schemas.clone(),
            };
            RequestBody::Call(forwarded)
        }
        RequestBody::Response(response) => {
            let forwarded = RequestResponse {
                metadata: response.metadata.clone(),
                ret: forwarded_payload(&response.ret),
                schemas: response.schemas.clone(),
            };
            RequestBody::Response(forwarded)
        }
        RequestBody::Cancel(cancel) => {
            let forwarded = vox_types::RequestCancel {
                metadata: cancel.metadata.clone(),
            };
            RequestBody::Cancel(forwarded)
        }
    }
}
/// Replaces the args of a request *call* message with pre-encoded `bytes`.
///
/// Any other kind of message is returned unchanged.
fn swap_call_args_to_bytes<'s>(mut msg: Message<'s>, bytes: &'s [u8]) -> Message<'s> {
    if let MessagePayload::RequestMessage(req) = &mut msg.payload {
        if let RequestBody::Call(call) = &mut req.body {
            call.args = vox_types::Payload::Encoded(bytes);
        }
    }
    msg
}
fn forwarded_channel_body<'a>(body: &'a vox_types::ChannelBody<'a>) -> vox_types::ChannelBody<'a> {
match body {
vox_types::ChannelBody::Item(item) => {
vox_types::ChannelBody::Item(vox_types::ChannelItem {
item: forwarded_payload(&item.item),
})
}
vox_types::ChannelBody::Close(close) => {
vox_types::ChannelBody::Close(vox_types::ChannelClose {
metadata: close.metadata.clone(),
})
}
vox_types::ChannelBody::Reset(reset) => {
vox_types::ChannelBody::Reset(vox_types::ChannelReset {
metadata: reset.metadata.clone(),
})
}
vox_types::ChannelBody::GrantCredit(credit) => {
vox_types::ChannelBody::GrantCredit(vox_types::ChannelGrantCredit {
additional: credit.additional,
})
}
}
}
impl ConnectionSender {
    /// The lane this sender writes to.
    pub(crate) fn lane_id(&self) -> LaneId {
        self.lane_id
    }

    /// Sends `msg` with an optional channel binder and no channel method.
    pub(crate) async fn send_with_binder<'a>(
        &self,
        msg: ConnectionMessage<'a>,
        binder: Option<&'a dyn vox_types::ChannelBinder>,
    ) -> Result<(), ()> {
        self.send_with_binder_and_method(msg, binder, None).await
    }

    /// Sends `msg` with an optional binder and channel method, without observing
    /// declared channels.
    pub(crate) async fn send_with_binder_and_method<'a>(
        &self,
        msg: ConnectionMessage<'a>,
        binder: Option<&'a dyn vox_types::ChannelBinder>,
        channel_method: Option<&'static vox_types::MethodDescriptor>,
    ) -> Result<(), ()> {
        let ignore_channels = |_: &[vox_types::ChannelId]| {};
        self.send_with_binder_and_method_observing_channels(
            msg,
            binder,
            channel_method,
            ignore_channels,
        )
        .await
    }

    /// Wraps `msg` in a lane-addressed [`Message`] and hands it to the connection
    /// core, passing `declared_channels` through to it.
    ///
    /// Any send error is collapsed to `Err(())`.
    pub(crate) async fn send_with_binder_and_method_observing_channels<'a>(
        &self,
        msg: ConnectionMessage<'a>,
        binder: Option<&'a dyn vox_types::ChannelBinder>,
        channel_method: Option<&'static vox_types::MethodDescriptor>,
        declared_channels: impl FnOnce(&[vox_types::ChannelId]),
    ) -> Result<(), ()> {
        let payload = match msg {
            ConnectionMessage::Request(request) => MessagePayload::RequestMessage(request),
            ConnectionMessage::Channel(channel) => MessagePayload::ChannelMessage(channel),
            ConnectionMessage::Schema(schema) => MessagePayload::SchemaMessage(schema),
        };
        let envelope = Message {
            lane_id: self.lane_id,
            payload,
        };
        let outcome = self
            .connection_core
            .send_with_options(
                envelope,
                binder,
                None,
                channel_method,
                Vec::new(),
                declared_channels,
            )
            .await;
        outcome.map_err(|_| ())
    }

    /// Sends a channel message, optionally accompanied by a writer-schema send.
    ///
    /// Any send error is collapsed to `Err(())`.
    pub(crate) async fn send_channel_with_writer_schema<'a>(
        &self,
        channel: ChannelMessage<'a>,
        writer_schema: Option<vox_types::ChannelWriterSchemaPlan>,
    ) -> Result<(), ()> {
        // Zero or one extra schema send, depending on whether a plan was supplied.
        let extra_schema_sends = writer_schema
            .into_iter()
            .map(PendingSchemaSend::from)
            .collect();
        let envelope = Message {
            lane_id: self.lane_id,
            payload: MessagePayload::ChannelMessage(channel),
        };
        let outcome = self
            .connection_core
            .send_with_options(envelope, None, None, None, extra_schema_sends, |_| {})
            .await;
        outcome.map_err(|_| ())
    }

    /// Sends `msg` with no binder.
    pub async fn send<'a>(&self, msg: ConnectionMessage<'a>) -> Result<(), ()> {
        self.send_with_binder(msg, None).await
    }

    /// Non-async variant of [`Self::send_channel_with_writer_schema`]; returns the
    /// core's `try_send_with_options` result unchanged.
    pub(crate) fn try_send_channel_with_writer_schema<'a>(
        &self,
        channel: ChannelMessage<'a>,
        writer_schema: Option<vox_types::ChannelWriterSchemaPlan>,
    ) -> Result<(), TrySendError<()>> {
        let extra_schema_sends = writer_schema
            .into_iter()
            .map(PendingSchemaSend::from)
            .collect();
        let envelope = Message {
            lane_id: self.lane_id,
            payload: MessagePayload::ChannelMessage(channel),
        };
        self.connection_core
            .try_send_with_options(envelope, None, None, None, extra_schema_sends)
    }

    /// Forwards a received message onto this lane, passing `schemas` to the core's
    /// `send`.
    ///
    /// The forwarded body is rebuilt from the received one (see
    /// [`forwarded_request_body`] and [`forwarded_channel_body`]). Any send error
    /// is collapsed to `Err(())`.
    pub(crate) async fn send_owned(
        &self,
        schemas: Arc<vox_types::SchemaRecvTracker>,
        msg: SelfRef<ConnectionMessage<'static>>,
    ) -> Result<(), ()> {
        let payload = match msg.get() {
            ConnectionMessage::Request(request) => {
                let forwarded = RequestMessage {
                    id: request.id,
                    body: forwarded_request_body(&request.body),
                };
                MessagePayload::RequestMessage(forwarded)
            }
            ConnectionMessage::Channel(channel) => {
                let forwarded = ChannelMessage {
                    id: channel.id,
                    body: forwarded_channel_body(&channel.body),
                };
                MessagePayload::ChannelMessage(forwarded)
            }
            ConnectionMessage::Schema(schema) => {
                let forwarded = SchemaMessage {
                    method_id: schema.method_id,
                    direction: schema.direction,
                    schemas: schema.schemas.clone(),
                };
                MessagePayload::SchemaMessage(forwarded)
            }
        };
        let envelope = Message {
            lane_id: self.lane_id,
            payload,
        };
        let outcome = self
            .connection_core
            .send(envelope, None, Some(&*schemas))
            .await;
        outcome.map_err(|_| ())
    }

    /// Sends `response` as the reply to `request_id`.
    pub async fn send_response<'a>(
        &self,
        request_id: RequestId,
        response: RequestResponse<'a>,
    ) -> Result<(), ()> {
        let reply = RequestMessage {
            id: request_id,
            body: RequestBody::Response(response),
        };
        self.send(ConnectionMessage::Request(reply)).await
    }

    /// Prepares `response` for `method_id` (see
    /// [`Self::prepare_response_for_method`]) and then sends it as the reply to
    /// `request_id`.
    pub async fn send_response_for_method<'a>(
        &self,
        request_id: RequestId,
        method_id: vox_types::MethodId,
        mut response: RequestResponse<'a>,
    ) -> Result<(), ()> {
        self.prepare_response_for_method(request_id, method_id, &mut response);
        let reply = RequestMessage {
            id: request_id,
            body: RequestBody::Response(response),
        };
        self.send(ConnectionMessage::Request(reply)).await
    }

    /// Delegates to the connection core's `prepare_response_for_method` for this lane.
    pub(crate) fn prepare_response_for_method(
        &self,
        request_id: RequestId,
        method_id: vox_types::MethodId,
        response: &mut RequestResponse<'_>,
    ) {
        let lane = self.lane_id;
        self.connection_core
            .prepare_response_for_method(lane, request_id, method_id, response);
    }

    /// Delegates to the connection core's `prepare_response_for_shape` for this lane.
    pub(crate) fn prepare_response_for_shape(
        &self,
        request_id: RequestId,
        method_id: vox_types::MethodId,
        shape: &'static Shape,
        response: &mut RequestResponse<'_>,
    ) {
        let lane = self.lane_id;
        self.connection_core
            .prepare_response_for_shape(lane, request_id, method_id, shape, response);
    }

    /// Reports a failed request on the failures channel; a failed send is ignored.
    pub fn mark_failure(&self, request_id: RequestId, disposition: FailureDisposition) {
        let report = (request_id, disposition);
        let _ = self.failures.send(report);
    }
}
/// Owning handle for one lane, created by `Connection::make_connection_handle`.
pub struct LaneHandle {
// Sending side for this lane.
pub(crate) sender: ConnectionSender,
// Inbound messages for this lane; paired with `LaneState::conn_tx`.
pub(crate) rx: mpsc::Receiver<RecvMessage>,
// Failure reports sent through `ConnectionSender::mark_failure`.
pub(crate) failures_rx: mpsc::UnboundedReceiver<(RequestId, FailureDisposition)>,
// Control channel back to the connection, used to request lane close.
pub(crate) control_tx: Option<mpsc::UnboundedSender<DropControlRequest>>,
// `None` while the lane is open; holds the close reason once it is closed.
pub(crate) closed_rx: watch::Receiver<Option<ConnectionCloseReason>>,
pub(crate) local_settings: ConnectionSettings,
pub(crate) peer_settings: ConnectionSettings,
// Copied from `local_settings.parity` when the handle is created.
pub parity: Parity,
pub(crate) observer: Option<VoxObserverHandle>,
pub(crate) peer_identity: PeerIdentity,
pub(crate) peer_evidence: PeerEvidence,
// Local copy of the grant; kept in sync with `lane_grant_state` by `set_lane_grant`.
pub(crate) lane_grant: LaneGrant,
// Grant state shared with the connection-side `LaneState`.
pub(crate) lane_grant_state: Arc<Mutex<LaneGrant>>,
}
impl std::fmt::Debug for LaneHandle {
    /// Prints only the lane id; all other fields are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("LaneHandle");
        builder.field("lane_id", &self.sender.lane_id);
        builder.finish()
    }
}
/// A lane-level message with the lane id stripped; `ConnectionSender` re-attaches
/// the lane id when sending.
pub(crate) enum ConnectionMessage<'payload> {
/// Maps to `MessagePayload::RequestMessage`.
Request(RequestMessage<'payload>),
/// Maps to `MessagePayload::ChannelMessage`.
Channel(ChannelMessage<'payload>),
/// Maps to `MessagePayload::SchemaMessage`.
Schema(SchemaMessage),
}
vox_types::impl_reborrow!(ConnectionMessage);
/// A message delivered to a lane's inbound queue.
pub(crate) struct RecvMessage {
// Schema tracker passed along with the message (forwarded to `send_owned` when proxying).
pub schemas: Arc<vox_types::SchemaRecvTracker>,
// The message itself.
pub msg: SelfRef<ConnectionMessage<'static>>,
// File descriptors attached to the frame.
pub fds: vox_types::FrameFds,
}
impl LaneHandle {
    /// Stores `grant` both in the state shared with the connection and in this
    /// handle's local copy.
    ///
    /// # Panics
    ///
    /// Panics if the shared grant mutex is poisoned.
    fn set_lane_grant(&mut self, grant: LaneGrant) {
        *self
            .lane_grant_state
            .lock()
            .expect("lane grant state mutex poisoned") = grant.clone();
        self.lane_grant = grant;
    }

    /// The id of this lane.
    pub fn lane_id(&self) -> LaneId {
        self.sender.lane_id
    }

    /// Identity of the peer on this lane.
    pub fn peer_identity(&self) -> &PeerIdentity {
        &self.peer_identity
    }

    /// Evidence associated with the peer on this lane.
    pub fn peer_evidence(&self) -> &PeerEvidence {
        &self.peer_evidence
    }

    /// The grant currently recorded on this handle.
    pub fn lane_grant(&self) -> &LaneGrant {
        &self.lane_grant
    }

    /// Waits until the lane has a close reason.
    ///
    /// Returns immediately if one is already set, and also returns if the watch
    /// sender goes away without ever setting one.
    pub async fn closed(&self) {
        if self.closed_rx.borrow().is_some() {
            return;
        }
        // Wait on a clone so this method can take `&self`.
        let mut rx = self.closed_rx.clone();
        while rx.changed().await.is_ok() {
            if rx.borrow().is_some() {
                return;
            }
        }
    }

    /// `true` while no close reason has been recorded.
    pub fn is_connected(&self) -> bool {
        self.closed_rx.borrow().is_none()
    }

    /// The close reason, if the lane has been closed.
    pub fn close_reason(&self) -> Option<ConnectionCloseReason> {
        *self.closed_rx.borrow()
    }

    /// Builds a point-in-time debug snapshot containing this lane only.
    ///
    /// Only the lane id, open/closed state, close reason and outbound queue stats
    /// are filled in; every other field is left empty or `None`.
    pub fn debug_snapshot(&self) -> VoxDebugSnapshot {
        let (outbound_queue_depth, outbound_queue_capacity) =
            self.sender.connection_core.outbound_queue_stats();
        // Read the close watch exactly once so `state` and `close_reason` cannot
        // disagree if the lane closes while the snapshot is being assembled.
        let close_reason = *self.closed_rx.borrow();
        let state = if close_reason.is_some() {
            LaneDebugState::Closed
        } else {
            LaneDebugState::Open
        };
        VoxDebugSnapshot {
            lanes: vec![LaneDebugSnapshot {
                lane_id: self.lane_id(),
                endpoint: None,
                surface: None,
                component: None,
                state,
                outstanding_requests: 0,
                requests: Vec::new(),
                open_channels: Vec::new(),
                outbound_queue_depth: Some(outbound_queue_depth),
                outbound_queue_capacity: Some(outbound_queue_capacity),
                local_control_queue_depth: None,
                local_control_queue_capacity: None,
                last_inbound_message_at: None,
                last_outbound_message_at: None,
                last_progress_at: None,
                close_reason,
                driver_task_status: DriverTaskStatus::Unknown,
            }],
        }
    }

    /// Takes a [`Self::debug_snapshot`], logs it at `info` level, and returns it.
    pub fn dump_debug_snapshot(&self) -> VoxDebugSnapshot {
        let snapshot = self.debug_snapshot();
        tracing::info!(?snapshot, "vox debug snapshot");
        snapshot
    }
}
/// Forwards messages between two lanes until either side ends, then requests that
/// both lanes be closed.
///
/// # Errors
///
/// Returns [`ConnectionError::Protocol`] immediately when both handles have the
/// same parity. Once the forwarding loop has started, the function always returns
/// `Ok(())`, including when a forward fails.
///
/// NOTE(review): on the parity error path the function returns without sending
/// any `Close` control request for either lane — confirm that is intended.
/// NOTE(review): `RecvMessage::fds` is not passed to `send_owned`, so file
/// descriptors attached to a frame are not forwarded — confirm that is intended.
pub async fn proxy_lanes(left: LaneHandle, right: LaneHandle) -> Result<(), ConnectionError> {
if left.parity == right.parity {
return Err(ConnectionError::Protocol(
"proxy_lanes requires opposite parities".into(),
));
}
let left_conn_id = left.lane_id();
let right_conn_id = right.lane_id();
// Take both handles apart. Only `sender`, `rx` and `control_tx` are used below;
// the remaining fields are bound to underscore-prefixed names, which keeps them
// alive until this function returns.
let LaneHandle {
sender: left_sender,
rx: mut left_rx,
failures_rx: _left_failures_rx,
control_tx: left_control_tx,
closed_rx: _left_closed_rx,
local_settings: _left_local_settings,
peer_settings: _left_peer_settings,
parity: _left_parity,
observer: _left_observer,
peer_identity: _left_peer_identity,
peer_evidence: _left_peer_evidence,
lane_grant: _left_lane_grant,
lane_grant_state: _left_lane_grant_state,
} = left;
let LaneHandle {
sender: right_sender,
rx: mut right_rx,
failures_rx: _right_failures_rx,
control_tx: right_control_tx,
closed_rx: _right_closed_rx,
local_settings: _right_local_settings,
peer_settings: _right_peer_settings,
parity: _right_parity,
observer: _right_observer,
peer_identity: _right_peer_identity,
peer_evidence: _right_peer_evidence,
lane_grant: _right_lane_grant,
lane_grant_state: _right_lane_grant_state,
} = right;
loop {
// Which side produced the next message (`None` means that receiver ended).
enum ProxyEvent {
Left(Option<RecvMessage>),
Right(Option<RecvMessage>),
}
let event = {
let left = left_rx.recv().fuse();
let right = right_rx.recv().fuse();
futures_util::pin_mut!(left, right);
// `select_biased!` checks the branches in declaration order, so the left
// lane is preferred when both have a message ready.
futures_util::select_biased! {
recv = left => ProxyEvent::Left(recv),
recv = right => ProxyEvent::Right(recv),
}
};
match event {
// Left -> right; stop on the first failed forward.
ProxyEvent::Left(Some(recv)) => {
if right_sender
.send_owned(recv.schemas, recv.msg)
.await
.is_err()
{
break;
}
}
// Right -> left; stop on the first failed forward.
ProxyEvent::Right(Some(recv)) => {
if left_sender
.send_owned(recv.schemas, recv.msg)
.await
.is_err()
{
break;
}
}
// Either inbound queue ended: stop proxying.
ProxyEvent::Left(None) | ProxyEvent::Right(None) => break,
}
}
// Best-effort close of both lanes; a failed control send is ignored.
if let Some(tx) = left_control_tx.as_ref() {
let _ = send_drop_control(tx, DropControlRequest::Close(left_conn_id));
}
if let Some(tx) = right_control_tx.as_ref() {
let _ = send_drop_control(tx, DropControlRequest::Close(right_conn_id));
}
Ok(())
}
/// Errors surfaced by connection and lane operations.
#[derive(Debug)]
pub enum ConnectionError {
/// An underlying I/O error.
Io(std::io::Error),
/// A protocol-level failure described by a message; this file also uses it for
/// "connection closed".
Protocol(String),
/// Connection establishment was declined.
EstablishmentRejected(vox_types::Decline),
/// A lane-open request was rejected.
Rejected(LaneRejection),
/// Connect timeout (displayed as "connect timeout").
ConnectTimeout,
}
impl ConnectionError {
    /// `true` for the variants treated as transient connect failures: I/O errors
    /// and connect timeouts.
    pub fn is_transient_connect_failure(&self) -> bool {
        match self {
            Self::Io(_) | Self::ConnectTimeout => true,
            Self::Protocol(_) | Self::EstablishmentRejected(_) | Self::Rejected(_) => false,
        }
    }
}
impl std::fmt::Display for ConnectionError {
    /// Formats a one-line, human-readable description of the error.
    ///
    /// For rejections, the message is appended after the reason when present.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(e) => write!(f, "io error: {e}"),
            Self::Protocol(msg) => write!(f, "protocol error: {msg}"),
            Self::EstablishmentRejected(decline) => {
                write!(f, "connection establishment rejected: {}", decline.reason)
            }
            Self::Rejected(rejection) => match rejection.message() {
                Some(message) => {
                    write!(f, "lane open rejected: {}: {message}", rejection.reason())
                }
                None => write!(f, "lane open rejected: {}", rejection.reason()),
            },
            Self::ConnectTimeout => write!(f, "connect timeout"),
        }
    }
}
// No `source()` override: the inner `std::io::Error` of the `Io` variant is exposed
// only through the `Display` text.
impl std::error::Error for ConnectionError {}
/// Classifies a receive error as protocol- or transport-level from its message text.
///
/// The error is `Protocol` when its display text contains `"decode error"` or
/// `"protocol"`, and `Transport` otherwise.
fn classify_connection_recv_error(error: &std::io::Error) -> ConnectionCloseReason {
    let text = error.to_string();
    let looks_like_protocol = ["decode error", "protocol"]
        .iter()
        .any(|needle| text.contains(*needle));
    if looks_like_protocol {
        ConnectionCloseReason::Protocol
    } else {
        ConnectionCloseReason::Transport
    }
}
/// Returns `Some(DecodeErrorKind::Payload)` when the error's display text contains
/// `"decode error"`, and `None` otherwise.
fn classify_decode_error(error: &std::io::Error) -> Option<DecodeErrorKind> {
    let is_decode_error = error.to_string().contains("decode error");
    is_decode_error.then_some(DecodeErrorKind::Payload)
}
impl Connection {
fn observe_connection_recv_error(&self, error: &std::io::Error) {
let Some(observer) = &self.observer else {
return;
};
if let Some(kind) = classify_decode_error(error) {
for conn_id in self.conns.iter().filter_map(|(conn_id, slot)| {
matches!(slot, ConnectionSlot::Active(_)).then_some(*conn_id)
}) {
observer.driver_event(vox_types::DriverEvent::DecodeError {
lane_id: conn_id,
kind,
});
}
return;
}
observer.transport_event(vox_types::TransportEvent::Closed {
lane_id: None,
reason: classify_connection_recv_error(error),
});
}
/// Logs a warning with `detail` and removes the lane with close reason `Protocol`.
fn close_connection_for_protocol_error(
    &mut self,
    conn_id: LaneId,
    detail: impl std::fmt::Display,
) {
    warn!(%conn_id, "closing connection after protocol error: {detail}");
    let reason = ConnectionCloseReason::Protocol;
    self.remove_connection_with_reason(&conn_id, reason);
}
/// Records received schema bytes on the lane's tracker.
///
/// Always returns `true`; `_conn_id` and `_context` are currently unused.
fn record_received_schema_bytes(
    &mut self,
    _conn_id: LaneId,
    schema_recv_tracker: Arc<vox_types::SchemaRecvTracker>,
    method_id: vox_types::MethodId,
    direction: vox_types::BindingDirection,
    schema_bytes: &vox_types::SchemaBytes,
    _context: &str,
) -> bool {
    let raw = schema_bytes.0.clone();
    schema_recv_tracker.record_received(method_id, direction, raw);
    true
}
/// Builds a `Connection` around a conduit before the handshake has run.
///
/// Creates the outbound queue and shared `ConnectionCore`, starts the outbound
/// worker, and fills role, parity, settings, identity and evidence with
/// placeholders that `establish_from_handshake` later overwrites.
#[allow(clippy::too_many_arguments)]
fn pre_handshake<Tx, Rx>(
tx: Tx,
rx: Rx,
lane_acceptor: Option<Arc<dyn LaneAcceptor>>,
open_rx: mpsc::Receiver<OpenRequest>,
close_rx: mpsc::Receiver<CloseRequest>,
control_tx: mpsc::UnboundedSender<DropControlRequest>,
control_rx: mpsc::UnboundedReceiver<DropControlRequest>,
keepalive: Option<ConnectionKeepaliveConfig>,
observer: Option<VoxObserverHandle>,
) -> Self
where
Tx: ConduitTx<Msg = MessageFamily> + MaybeSend + MaybeSync + 'static,
Rx: ConduitRx<Msg = MessageFamily> + MaybeSend + 'static,
{
// Outbound queue labelled "connection.outbound"; 256 is presumably its capacity — confirm.
let (outbound_tx, outbound_rx) = mpsc::channel("connection.outbound", 256);
let connection_core = Arc::new(ConnectionCore {
inner: std::sync::Mutex::new(ConnectionCoreInner {
tx: Arc::new(tx) as Arc<dyn DynConduitTx>,
conns: HashMap::new(),
}),
outbound_tx,
observer: observer.clone(),
channel_gates: std::sync::Mutex::new(HashMap::new()),
});
// The worker takes ownership of the receiving end of the outbound queue.
spawn_outbound_worker(outbound_rx);
Connection {
rx: Box::new(rx),
// Placeholders until `establish_from_handshake` runs.
role: ConnectionRole::Initiator, parity: Parity::Odd, connection_core,
// NOTE(review): `initial_channel_credit` is the literal 16 here, while
// `ConnectionHandle::open_lane` uses `vox_types::DEFAULT_INITIAL_CHANNEL_CREDIT` — confirm the two agree.
local_connection_settings: ConnectionSettings {
parity: Parity::Odd,
max_concurrent_requests: 64,
initial_channel_credit: 16,
},
peer_connection_settings: None,
peer_identity: PeerIdentity::anonymous(),
peer_evidence: PeerEvidence::none(),
conns: BTreeMap::new(),
conn_ids: IdAllocator::new(Parity::Odd), lane_acceptor,
open_rx,
close_rx,
control_tx,
control_rx,
keepalive,
observer,
}
}
/// Applies the handshake result to this connection and creates the control lane.
///
/// Overwrites the placeholder role, parity, id allocator, settings, peer identity
/// and peer evidence, then returns a handle for `LaneId::CONTROL`. As written,
/// this never returns `Err`.
fn establish_from_handshake(
    &mut self,
    result: HandshakeResult,
) -> Result<LaneHandle, ConnectionError> {
    let negotiated_parity = result.our_settings.parity;
    self.role = result.role;
    self.parity = negotiated_parity;
    self.conn_ids = IdAllocator::new(negotiated_parity);

    // Identity and evidence must be in place before the control handle is
    // created, because the handle clones them from `self`.
    self.peer_identity = result.peer_identity.clone();
    self.peer_evidence = result.peer_evidence.clone();

    self.local_connection_settings = result.our_settings.clone();
    self.peer_connection_settings = Some(result.peer_settings.clone());

    let control = self.make_control_lane_handle(result.our_settings, result.peer_settings);
    Ok(control)
}
/// Creates the handle for the control lane (`LaneId::CONTROL`).
fn make_control_lane_handle(
    &mut self,
    local_settings: ConnectionSettings,
    peer_settings: ConnectionSettings,
) -> LaneHandle {
    let control_id = LaneId::CONTROL;
    self.make_connection_handle(control_id, local_settings, peer_settings)
}
/// Registers an active lane under `conn_id` and returns the handle for it.
///
/// Creates the per-lane channels (inbound messages with capacity 64, an
/// unbounded failures channel, and a close-reason watch), reports
/// `DriverEvent::LaneOpened` to the observer if one is installed, stores a
/// `ConnectionSlot::Active` entry in `self.conns` (replacing any entry already
/// stored under the same id), and hands the receiving halves to the returned
/// `LaneHandle`. The lane-grant state is shared between the slot and the handle.
fn make_connection_handle(
    &mut self,
    conn_id: LaneId,
    local_settings: ConnectionSettings,
    peer_settings: ConnectionSettings,
) -> LaneHandle {
    let label = format!("connection.lane{}", conn_id.0);
    let (inbound_tx, inbound_rx) = mpsc::channel::<RecvMessage>(&label, 64);
    let (failures_tx, failures_rx) = mpsc::unbounded_channel(format!("{label}.failures"));
    let (closed_tx, closed_rx) = watch::channel(None);
    let lane_grant_state = Arc::new(Mutex::new(LaneGrant::empty()));
    let parity = local_settings.parity;

    trace!(%conn_id, "make_connection_handle: inserting slot into conns");
    if let Some(observer) = self.observer.as_ref() {
        observer.driver_event(vox_types::DriverEvent::LaneOpened { lane_id: conn_id });
    }

    // The slot keeps its own copy of the settings; the handle takes the originals.
    let slot = ConnectionSlot::Active(LaneState {
        id: conn_id,
        local_settings: local_settings.clone(),
        peer_settings: peer_settings.clone(),
        conn_tx: inbound_tx,
        closed_tx,
        schema_recv_tracker: Arc::new(vox_types::SchemaRecvTracker::new()),
        lane_grant: Arc::clone(&lane_grant_state),
    });
    self.conns.insert(conn_id, slot);

    LaneHandle {
        sender: ConnectionSender {
            lane_id: conn_id,
            connection_core: Arc::clone(&self.connection_core),
            failures: Arc::new(failures_tx),
        },
        rx: inbound_rx,
        failures_rx,
        control_tx: Some(self.control_tx.clone()),
        closed_rx,
        local_settings,
        peer_settings,
        parity,
        observer: self.observer.clone(),
        peer_identity: self.peer_identity.clone(),
        peer_evidence: self.peer_evidence.clone(),
        lane_grant: LaneGrant::empty(),
        lane_grant_state,
    }
}
/// Drives the connection until the conduit ends or a fatal condition occurs.
///
/// Each iteration waits for exactly one of: an inbound conduit message, a
/// lane-open request, a lane-close request, a drop/control request, or a
/// keepalive tick, and dispatches it to the matching handler. The loop exits
/// on conduit EOF, on a conduit receive error, when a control request asks
/// for shutdown, or when the keepalive check fails. Every exit path closes
/// all lanes with a reason describing the cause.
pub async fn run(&mut self) {
let mut keepalive_runtime = self.make_keepalive_runtime();
// The ticker only exists when keepalive is enabled; it fires every 10ms and
// the keepalive handler decides whether anything is actually due.
let mut keepalive_tick = keepalive_runtime.as_ref().map(|_| {
let mut interval = vox_types::time::tokio::interval(Duration::from_millis(10));
interval.set_missed_tick_behavior(vox_types::time::tokio::MissedTickBehavior::Delay);
interval
});
// Once a request channel reports `None` it is never polled again; its
// branch is replaced by a future that never completes.
let mut open_rx_closed = false;
let mut close_rx_closed = false;
let mut control_rx_closed = false;
loop {
// One wake-up of the loop; `None` payloads mean the source channel closed.
enum RunEvent {
Message(std::io::Result<Option<SelfRef<Message<'static>>>>),
Open(Option<OpenRequest>),
Close(Option<CloseRequest>),
Control(Option<DropControlRequest>),
Keepalive,
}
// The futures borrow `self`, so they live in their own block and are
// dropped before the event is handled below.
let event = {
let msg = self.rx.recv_msg().fuse();
let open = async {
if open_rx_closed {
futures_util::future::pending().await
} else {
self.open_rx.recv().await
}
}
.fuse();
let close = async {
if close_rx_closed {
futures_util::future::pending().await
} else {
self.close_rx.recv().await
}
}
.fuse();
let control = async {
if control_rx_closed {
futures_util::future::pending().await
} else {
self.control_rx.recv().await
}
}
.fuse();
let keepalive = async {
if let Some(interval) = keepalive_tick.as_mut() {
interval.tick().await;
} else {
futures_util::future::pending::<()>().await;
}
}
.fuse();
futures_util::pin_mut!(msg, open, close, control, keepalive);
// Biased selection: branches are tried in the order written, so inbound
// messages take priority over local requests and keepalive ticks.
futures_util::select_biased! {
msg = msg => RunEvent::Message(msg),
req = open => RunEvent::Open(req),
req = close => RunEvent::Close(req),
req = control => RunEvent::Control(req),
() = keepalive => RunEvent::Keepalive,
}
};
match event {
RunEvent::Message(msg) => {
vox_types::dlog!("[connection {:?}] recv_msg returned", self.role);
match msg {
Ok(Some(msg)) => {
// File descriptors attached to the frame are taken right after
// the receive and travel with the message.
let fds = self.rx.take_frame_fds();
self.handle_message(msg, fds, &mut keepalive_runtime).await;
}
Ok(None) => {
// Clean EOF from the conduit: the peer went away.
vox_types::dlog!(
"[connection {:?}] recv loop: conduit returned EOF",
self.role
);
self.close_all_connections(ConnectionCloseReason::Remote);
break;
}
Err(error) => {
// Receive failure: classify it, report it to the observer, and
// close every lane with the classified reason.
let close_reason = classify_connection_recv_error(&error);
self.observe_connection_recv_error(&error);
warn!(
role = ?self.role,
%error,
?close_reason,
"connection receive failed; closing connections if recovery is unavailable"
);
vox_types::dlog!(
"[connection {:?}] recv loop: conduit recv error: {}",
self.role,
error
);
self.close_all_connections(close_reason);
break;
}
}
}
RunEvent::Open(Some(req)) => {
self.handle_open_request(req).await;
}
RunEvent::Close(Some(req)) => {
self.handle_close_request(req).await;
}
RunEvent::Control(Some(req)) => {
// `false` from the handler means the connection must shut down.
if !self.handle_drop_control_request(req).await {
self.close_all_connections(ConnectionCloseReason::Local);
break;
}
}
RunEvent::Keepalive => {
// `false` means a pong timed out or the ping could not be sent.
if !self.handle_keepalive_tick(&mut keepalive_runtime).await {
self.close_all_connections(ConnectionCloseReason::Protocol);
break;
}
}
RunEvent::Open(None) => {
open_rx_closed = true;
}
RunEvent::Close(None) => {
close_rx_closed = true;
}
RunEvent::Control(None) => {
control_rx_closed = true;
}
}
}
// Every `break` above has already closed and cleared the lanes, so this
// final call only affects lanes that are somehow still registered.
self.close_all_connections(ConnectionCloseReason::ConnectionShutdown);
trace!("connection recv loop exited");
}
/// Routes one inbound message to keepalive handling, lane management, or the
/// owning lane's inbound queue.
///
/// `fds` are the file descriptors received with the frame; they are forwarded
/// with schema, request and channel messages and dropped otherwise.
/// `keepalive_runtime` is updated when a matching pong arrives on the control
/// lane. Messages addressed to a lane that has no active slot are dropped
/// silently for schema, request and channel payloads.
async fn handle_message(
&mut self,
msg: SelfRef<Message<'static>>,
fds: vox_types::FrameFds,
keepalive_runtime: &mut Option<KeepaliveRuntime>,
) {
let msg_ref = msg.get();
let conn_id = msg_ref.lane_id;
// First pass by reference: payloads that are answered or forwarded
// without going through the `selfref_match!` dispatch below.
match &msg_ref.payload {
MessagePayload::Ping(ping) => {
// Echo the nonce back on the lane the ping arrived on; a failed send
// is ignored.
let _ = self
.connection_core
.send(
Message {
lane_id: conn_id,
payload: MessagePayload::Pong(vox_types::Pong { nonce: ping.nonce }),
},
None,
None,
)
.await;
return;
}
MessagePayload::Pong(pong) => {
// Only pongs on the control lane feed the keepalive state.
if conn_id.is_control() {
self.handle_keepalive_pong(pong.nonce, keepalive_runtime);
}
return;
}
MessagePayload::SchemaMessage(schema_msg) => {
let (schema_recv_tracker, conn_tx) = match self.conns.get(&conn_id) {
Some(ConnectionSlot::Active(state)) => (
Arc::clone(&state.schema_recv_tracker),
state.conn_tx.clone(),
),
_ => return,
};
// NOTE(review): the result is ignored here, whereas the inlined-schema
// path below stops when recording returns false — confirm this
// difference is intentional.
let _ = self.record_received_schema_bytes(
conn_id,
Arc::clone(&schema_recv_tracker),
schema_msg.method_id,
schema_msg.direction,
&schema_msg.schemas,
"standalone schema message",
);
let recv_msg = RecvMessage {
schemas: schema_recv_tracker,
msg: msg.map(|m| match m.payload {
MessagePayload::SchemaMessage(schema) => ConnectionMessage::Schema(schema),
_ => unreachable!(),
}),
fds,
};
// A closed lane queue means the lane's consumer is gone; drop the slot.
if conn_tx.send(recv_msg).await.is_err() {
self.remove_connection_with_reason(&conn_id, ConnectionCloseReason::Unknown);
}
return;
}
_ => {}
}
// Second pass: dispatch on the payload for the remaining message kinds.
vox_types::selfref_match!(msg, payload {
MessagePayload::LaneClose(_) => {
// The slot is removed in both cases; a close for the control lane is
// only logged at a higher level.
if conn_id.is_control() {
warn!("received LaneClose for control lane");
} else {
trace!(conn_id = conn_id.0, "received LaneClose for service lane");
}
self.remove_connection_with_reason(&conn_id, ConnectionCloseReason::Remote);
}
MessagePayload::LaneOpen(open) => {
self.handle_inbound_open(conn_id, open).await;
}
MessagePayload::LaneAccept(accept) => {
self.handle_inbound_accept(conn_id, accept);
}
MessagePayload::LaneReject(reject) => {
self.handle_inbound_reject(conn_id, reject);
}
MessagePayload::RequestMessage(r) => {
let r_ref = r.get();
vox_types::dlog!(
"[connection {:?}] recv request: conn={:?} req={:?} body={} method={:?}",
self.role,
conn_id,
r_ref.id,
match &r_ref.body {
RequestBody::Call(_) => "Call",
RequestBody::Response(_) => "Response",
RequestBody::Cancel(_) => "Cancel",
},
match &r_ref.body {
RequestBody::Call(call) => Some(call.method_id),
RequestBody::Response(_) | RequestBody::Cancel(_) => None,
}
);
// Remembered so the outgoing-call entry is taken exactly once: inside
// the schema block when the response carries schemas, after it otherwise.
let response_had_schema_payload = matches!(&r_ref.body, RequestBody::Response(resp) if !resp.schemas.is_empty());
{
let schema_bytes = match &r_ref.body {
RequestBody::Call(call) => Some(&call.schemas),
RequestBody::Response(resp) => Some(&resp.schemas),
_ => None,
};
vox_types::dlog!(
"[schema] recv ({:?}): req={:?} body={} schemas_len={:?}",
self.role,
r_ref.id,
match &r_ref.body {
RequestBody::Call(_) => "Call",
RequestBody::Response(_) => "Response",
RequestBody::Cancel(_) => "Cancel",
},
schema_bytes.map(|s| s.0.len())
);
let schema_recv_tracker = match self.conns.get(&conn_id) {
Some(ConnectionSlot::Active(state)) => {
Arc::clone(&state.schema_recv_tracker)
}
_ => return,
};
if let Some(schema_bytes) = schema_bytes
&& !schema_bytes.is_empty()
{
let (method_id, direction) = match &r_ref.body {
RequestBody::Call(call) => {
(call.method_id, vox_types::BindingDirection::Args)
}
RequestBody::Response(_) => {
// A response does not name its method; it is looked up from the
// outgoing call recorded under this request id. A response with
// schemas for an unknown request is a protocol error.
let Some(method_id) =
self.connection_core.take_outgoing_call_method(conn_id, r_ref.id)
else {
self.close_connection_for_protocol_error(
conn_id,
format!(
"response schemas for unknown inflight request {:?}",
r_ref.id
),
);
return;
};
(method_id, vox_types::BindingDirection::Response)
}
// `schema_bytes` is `None` for Cancel, so this arm cannot be reached.
RequestBody::Cancel(_) => unreachable!(),
};
if !self.record_received_schema_bytes(
conn_id,
schema_recv_tracker,
method_id,
direction,
schema_bytes,
"inlined request schemas",
) {
return;
}
}
}
// Response without schemas: still clear the outgoing-call entry.
if matches!(&r_ref.body, RequestBody::Response(_)) && !response_had_schema_payload {
let _ = self.connection_core.take_outgoing_call_method(conn_id, r_ref.id);
}
if let RequestBody::Call(call) = &r_ref.body {
self.connection_core.record_incoming_call(conn_id, r_ref.id, call.method_id);
}
// Looked up again because the schema handling above may have closed the lane.
let state = match self.conns.get(&conn_id) {
Some(ConnectionSlot::Active(state)) => state,
_ => return,
};
let conn_tx = state.conn_tx.clone();
let request_id = r_ref.id;
let body_kind = match &r_ref.body {
RequestBody::Call(_) => "Call",
RequestBody::Response(_) => "Response",
RequestBody::Cancel(_) => "Cancel",
};
let recv_msg = RecvMessage {
schemas: Arc::clone(&state.schema_recv_tracker),
msg: r.map(ConnectionMessage::Request),
fds,
};
vox_types::dlog!(
"[connection {:?}] dispatch request: conn={:?} req={:?} body={}",
self.role,
conn_id,
request_id,
body_kind
);
if conn_tx.send(recv_msg).await.is_err() {
self.remove_connection_with_reason(&conn_id, ConnectionCloseReason::Unknown);
}
}
MessagePayload::ChannelMessage(c) => {
let state = match self.conns.get(&conn_id) {
Some(ConnectionSlot::Active(state)) => state,
_ => return,
};
let conn_tx = state.conn_tx.clone();
let recv_msg = RecvMessage {
schemas: Arc::clone(&state.schema_recv_tracker),
msg: c.map(ConnectionMessage::Channel),
fds,
};
if conn_tx.send(recv_msg).await.is_err() {
self.remove_connection_with_reason(&conn_id, ConnectionCloseReason::Unknown);
}
}
MessagePayload::ProtocolError(_) => {
// The peer reported a protocol error: close every lane and queue a
// shutdown request for the run loop.
warn!(%conn_id, "received protocol error from peer");
self.close_all_connections(ConnectionCloseReason::Protocol);
let _ = send_drop_control(&self.control_tx, DropControlRequest::Shutdown);
}
})
}
fn make_keepalive_runtime(&self) -> Option<KeepaliveRuntime> {
let config = self.keepalive?;
if config.ping_interval.is_zero() || config.pong_timeout.is_zero() {
warn!("keepalive disabled due to non-positive interval/timeout");
return None;
}
let now = vox_types::time::tokio::Instant::now();
Some(KeepaliveRuntime {
ping_interval: config.ping_interval,
pong_timeout: config.pong_timeout,
next_ping_at: now + config.ping_interval,
waiting_pong_nonce: None,
pong_deadline: now,
next_ping_nonce: 1,
})
}
/// Records a pong for the outstanding keepalive ping.
///
/// Does nothing when keepalive is disabled or when `nonce` is not the one
/// currently awaited. On a match, the pending state is cleared and the next
/// ping is scheduled one `ping_interval` from now.
fn handle_keepalive_pong(&self, nonce: u64, keepalive_runtime: &mut Option<KeepaliveRuntime>) {
    match keepalive_runtime.as_mut() {
        Some(runtime) if runtime.waiting_pong_nonce == Some(nonce) => {
            runtime.waiting_pong_nonce = None;
            runtime.next_ping_at = vox_types::time::tokio::Instant::now() + runtime.ping_interval;
        }
        _ => {}
    }
}
/// Advances the keepalive state machine by one tick.
///
/// Returns `false` when the connection should be torn down: either the
/// awaited pong did not arrive before its deadline, or the ping could not be
/// sent. Returns `true` otherwise, including when keepalive is disabled, a
/// pong is still pending within its deadline, or the next ping is not due yet.
async fn handle_keepalive_tick(
    &mut self,
    keepalive_runtime: &mut Option<KeepaliveRuntime>,
) -> bool {
    let runtime = match keepalive_runtime.as_mut() {
        Some(runtime) => runtime,
        None => return true,
    };
    let now = vox_types::time::tokio::Instant::now();

    match runtime.waiting_pong_nonce {
        // A ping is outstanding: only check whether it has timed out.
        Some(waiting_nonce) => {
            let timed_out = now >= runtime.pong_deadline;
            if timed_out {
                warn!(
                    nonce = waiting_nonce,
                    timeout_ms = runtime.pong_timeout.as_millis(),
                    "keepalive timeout waiting for pong"
                );
            }
            return !timed_out;
        }
        None if now < runtime.next_ping_at => return true,
        None => {}
    }

    let nonce = runtime.next_ping_nonce;
    let ping = Message {
        lane_id: LaneId::CONTROL,
        payload: MessagePayload::Ping(vox_types::Ping { nonce }),
    };
    let sent = self.connection_core.send(ping, None, None).await;
    if sent.is_err() {
        warn!("failed to send keepalive ping");
        return false;
    }

    // Deadlines are measured from the instant sampled before the send.
    runtime.waiting_pong_nonce = Some(nonce);
    runtime.pong_deadline = now + runtime.pong_timeout;
    runtime.next_ping_at = now + runtime.ping_interval;
    runtime.next_ping_nonce = runtime.next_ping_nonce.wrapping_add(1);
    true
}
/// Sends a `LaneReject` for `conn_id` carrying the rejection's metadata.
///
/// Best effort: a failed send is ignored.
async fn send_lane_reject(
    connection_core: Arc<ConnectionCore>,
    conn_id: LaneId,
    rejection: LaneRejection,
) {
    let reject = vox_types::LaneReject {
        metadata: rejection.into_metadata(),
    };
    let message = Message {
        lane_id: conn_id,
        payload: MessagePayload::LaneReject(reject),
    };
    let _ = connection_core.send(message, None, None).await;
}
/// Handles a peer's request to open a service lane with id `conn_id`.
///
/// The request is validated in order: lane id parity, lane id not already in
/// use, a lane acceptor being configured, a non-zero initial channel credit,
/// and a well-formed lane request built from the open metadata. Any failure
/// sends a `LaneReject` to the peer. If all checks pass, the configured
/// acceptor decides: on acceptance a `LaneAccept` is sent with our settings
/// and the lane's grant metadata; on rejection the freshly inserted slot is
/// removed and a `LaneReject` is sent. Every outcome is reported to the
/// observer as establishment started/finished events.
async fn handle_inbound_open(&mut self, conn_id: LaneId, open: SelfRef<LaneOpen>) {
let establishment_started_at = observe_establishment_started(
self.observer.as_ref(),
self.role,
EstablishmentPhase::ServiceLaneOpen,
Some(conn_id),
);
// Lanes opened by the peer must carry the peer's id parity, i.e. the
// opposite of ours.
let peer_parity = self.parity.other();
if !conn_id.has_parity(peer_parity) {
let rejection = LaneRejection::with_message(
LaneRejectReason::PolicyRejected,
"lane id parity does not match peer",
);
let details = lane_rejection_details(&rejection);
Self::send_lane_reject(Arc::clone(&self.connection_core), conn_id, rejection).await;
observe_establishment_finished_with_details(
self.observer.as_ref(),
self.role,
EstablishmentPhase::ServiceLaneOpen,
Some(conn_id),
EstablishmentOutcome::Error,
establishment_started_at,
details,
);
return;
}
// Any existing slot (active or pending) blocks reuse of the id.
if self.conns.contains_key(&conn_id) {
let rejection = LaneRejection::with_message(
LaneRejectReason::PolicyRejected,
"lane id is already in use",
);
let details = lane_rejection_details(&rejection);
Self::send_lane_reject(Arc::clone(&self.connection_core), conn_id, rejection).await;
observe_establishment_finished_with_details(
self.observer.as_ref(),
self.role,
EstablishmentPhase::ServiceLaneOpen,
Some(conn_id),
EstablishmentOutcome::Error,
establishment_started_at,
details,
);
return;
}
// Without an acceptor nothing can authorize the lane. This is reported as
// a rejected authorization phase nested inside the rejected open phase.
if self.lane_acceptor.is_none() {
let authorization_started_at = observe_establishment_started(
self.observer.as_ref(),
self.role,
EstablishmentPhase::LaneAuthorization,
Some(conn_id),
);
let rejection = LaneRejection::with_message(
LaneRejectReason::NotReady,
"no lane acceptor configured",
);
let details = lane_rejection_details(&rejection);
Self::send_lane_reject(Arc::clone(&self.connection_core), conn_id, rejection).await;
observe_establishment_finished_with_details(
self.observer.as_ref(),
self.role,
EstablishmentPhase::LaneAuthorization,
Some(conn_id),
EstablishmentOutcome::Rejected,
authorization_started_at,
details,
);
observe_establishment_finished_with_details(
self.observer.as_ref(),
self.role,
EstablishmentPhase::ServiceLaneOpen,
Some(conn_id),
EstablishmentOutcome::Rejected,
establishment_started_at,
details,
);
return;
}
let open = open.get();
if open.connection_settings.initial_channel_credit == 0 {
let rejection = LaneRejection::with_message(
LaneRejectReason::PolicyRejected,
"initial_channel_credit must be greater than zero",
);
let details = lane_rejection_details(&rejection);
Self::send_lane_reject(Arc::clone(&self.connection_core), conn_id, rejection).await;
observe_establishment_finished_with_details(
self.observer.as_ref(),
self.role,
EstablishmentPhase::ServiceLaneOpen,
Some(conn_id),
EstablishmentOutcome::Error,
establishment_started_at,
details,
);
return;
}
// Our settings mirror the peer's limits with the opposite parity.
let our_settings = ConnectionSettings {
parity: open.connection_settings.parity.other(),
max_concurrent_requests: open.connection_settings.max_concurrent_requests,
initial_channel_credit: open.connection_settings.initial_channel_credit,
};
// The slot is inserted (and LaneOpened reported) before authorization;
// the rejection paths below remove it again with a plain map removal.
let handle = self.make_connection_handle(
conn_id,
our_settings.clone(),
open.connection_settings.clone(),
);
let metadata = open.metadata.clone();
let request = match LaneRequest::new(&metadata, &self.peer_identity, &self.peer_evidence) {
Ok(r) => r,
Err(e) => {
trace!(%conn_id, %e, "rejecting service lane");
let authorization_started_at = observe_establishment_started(
self.observer.as_ref(),
self.role,
EstablishmentPhase::LaneAuthorization,
Some(conn_id),
);
self.conns.remove(&conn_id);
let rejection =
LaneRejection::with_message(LaneRejectReason::UnknownService, e.to_string());
let details = lane_rejection_details(&rejection);
Self::send_lane_reject(Arc::clone(&self.connection_core), conn_id, rejection).await;
observe_establishment_finished_with_details(
self.observer.as_ref(),
self.role,
EstablishmentPhase::LaneAuthorization,
Some(conn_id),
EstablishmentOutcome::Rejected,
authorization_started_at,
details,
);
observe_establishment_finished_with_details(
self.observer.as_ref(),
self.role,
EstablishmentPhase::ServiceLaneOpen,
Some(conn_id),
EstablishmentOutcome::Rejected,
establishment_started_at,
details,
);
return;
}
};
let pending = PendingLane::new(handle);
// Cannot be `None`: the missing-acceptor case returned earlier.
let acceptor = self.lane_acceptor.as_ref().unwrap();
trace!(%conn_id, "calling acceptor for service lane");
let authorization_started_at = observe_establishment_started(
self.observer.as_ref(),
self.role,
EstablishmentPhase::LaneAuthorization,
Some(conn_id),
);
match acceptor.accept(&request, pending) {
Ok(()) => {
observe_establishment_finished(
self.observer.as_ref(),
self.role,
EstablishmentPhase::LaneAuthorization,
Some(conn_id),
EstablishmentOutcome::Ok,
authorization_started_at,
);
trace!(%conn_id, "acceptor accepted service lane, sending LaneAccept");
let grant_started_at = observe_establishment_started(
self.observer.as_ref(),
self.role,
EstablishmentPhase::LaneGrant,
Some(conn_id),
);
// The accept carries the grant metadata currently stored in the lane's
// slot. NOTE(review): presumably the acceptor fills this in through the
// pending lane's shared grant state — confirm. A failed send is ignored
// and the grant and open phases are still reported as Ok.
let _ = self
.connection_core
.send(
Message {
lane_id: conn_id,
payload: MessagePayload::LaneAccept(vox_types::LaneAccept {
connection_settings: our_settings,
metadata: self.lane_grant_metadata(&conn_id),
}),
},
None,
None,
)
.await;
observe_establishment_finished(
self.observer.as_ref(),
self.role,
EstablishmentPhase::LaneGrant,
Some(conn_id),
EstablishmentOutcome::Ok,
grant_started_at,
);
observe_establishment_finished(
self.observer.as_ref(),
self.role,
EstablishmentPhase::ServiceLaneOpen,
Some(conn_id),
EstablishmentOutcome::Ok,
establishment_started_at,
);
}
Err(rejection) => {
let details = lane_rejection_details(&rejection);
observe_establishment_finished_with_details(
self.observer.as_ref(),
self.role,
EstablishmentPhase::LaneAuthorization,
Some(conn_id),
EstablishmentOutcome::Rejected,
authorization_started_at,
details,
);
trace!(%conn_id, "acceptor rejected, removing conn slot");
self.conns.remove(&conn_id);
Self::send_lane_reject(Arc::clone(&self.connection_core), conn_id, rejection).await;
observe_establishment_finished_with_details(
self.observer.as_ref(),
self.role,
EstablishmentPhase::ServiceLaneOpen,
Some(conn_id),
EstablishmentOutcome::Rejected,
establishment_started_at,
details,
);
}
}
}
fn handle_inbound_accept(&mut self, conn_id: LaneId, accept: SelfRef<LaneAccept>) {
let accept = accept.get();
let slot = self.remove_connection(&conn_id);
match slot {
Some(ConnectionSlot::PendingOutbound(mut pending))
if accept.connection_settings.initial_channel_credit == 0 =>
{
observe_establishment_finished(
self.observer.as_ref(),
self.role,
EstablishmentPhase::ServiceLaneOpen,
Some(conn_id),
EstablishmentOutcome::Error,
pending.establishment_started_at,
);
if let Some(tx) = pending.result_tx.take() {
let _ = tx.send(Err(ConnectionError::Protocol(
"initial_channel_credit must be greater than zero".into(),
)));
}
}
Some(ConnectionSlot::PendingOutbound(mut pending)) => {
let mut handle = self.make_connection_handle(
conn_id,
pending.local_settings.clone(),
accept.connection_settings.clone(),
);
let grant = LaneGrant::from_metadata(accept.metadata.clone());
self.observe_lane_grant_creation(conn_id, &grant);
handle.set_lane_grant(grant);
observe_establishment_finished(
self.observer.as_ref(),
self.role,
EstablishmentPhase::ServiceLaneOpen,
Some(conn_id),
EstablishmentOutcome::Ok,
pending.establishment_started_at,
);
if let Some(tx) = pending.result_tx.take() {
let _ = tx.send(Ok(handle));
}
}
Some(other) => {
self.conns.insert(conn_id, other);
}
None => {
}
}
}
fn handle_inbound_reject(&mut self, conn_id: LaneId, reject: SelfRef<LaneReject>) {
let reject = reject.get();
let slot = self.remove_connection(&conn_id);
match slot {
Some(ConnectionSlot::PendingOutbound(mut pending)) => {
let rejection = LaneRejection::from_metadata(reject.metadata.clone());
let details = lane_rejection_details(&rejection);
observe_establishment_finished_with_details(
self.observer.as_ref(),
self.role,
EstablishmentPhase::ServiceLaneOpen,
Some(conn_id),
EstablishmentOutcome::Rejected,
pending.establishment_started_at,
details,
);
if let Some(tx) = pending.result_tx.take() {
let _ = tx.send(Err(ConnectionError::Rejected(rejection)));
}
}
Some(other) => {
self.conns.insert(conn_id, other);
}
None => {}
}
}
async fn handle_open_request(&mut self, req: OpenRequest) {
if req.settings.initial_channel_credit == 0 {
let _ = req.result_tx.send(Err(ConnectionError::Protocol(
"initial_channel_credit must be greater than zero".into(),
)));
return;
}
let conn_id = self.conn_ids.alloc();
let establishment_started_at = observe_establishment_started(
self.observer.as_ref(),
self.role,
EstablishmentPhase::ServiceLaneOpen,
Some(conn_id),
);
let send_result = self
.connection_core
.send(
Message {
lane_id: conn_id,
payload: MessagePayload::LaneOpen(LaneOpen {
connection_settings: req.settings.clone(),
metadata: req.metadata,
}),
},
None,
None,
)
.await;
if send_result.is_err() {
observe_establishment_finished(
self.observer.as_ref(),
self.role,
EstablishmentPhase::ServiceLaneOpen,
Some(conn_id),
EstablishmentOutcome::Error,
establishment_started_at,
);
let _ = req.result_tx.send(Err(ConnectionError::Protocol(
"failed to send LaneOpen".into(),
)));
return;
}
self.conns.insert(
conn_id,
ConnectionSlot::PendingOutbound(PendingOutboundData {
local_settings: req.settings,
establishment_started_at,
result_tx: Some(req.result_tx),
}),
);
}
async fn handle_close_request(&mut self, req: CloseRequest) {
if req.conn_id.is_control() {
let _ = req.result_tx.send(Err(ConnectionError::Protocol(
"cannot close control lane".into(),
)));
return;
}
if self
.remove_connection_with_reason(&req.conn_id, ConnectionCloseReason::Local)
.is_none()
{
let _ = req.result_tx.send(Err(ConnectionError::Protocol(
"connection not found".into(),
)));
return;
}
let send_result = self
.connection_core
.send(
Message {
lane_id: req.conn_id,
payload: MessagePayload::LaneClose(LaneClose {
metadata: req.metadata,
}),
},
None,
None,
)
.await;
if send_result.is_err() {
let _ = req.result_tx.send(Err(ConnectionError::Protocol(
"failed to send LaneClose".into(),
)));
return;
}
let _ = req.result_tx.send(Ok(()));
}
/// Applies a drop/control request and tells the run loop whether to continue.
///
/// Returns `false` for `Shutdown` and `ProtocolClose` (the connection must
/// stop) and `true` for `Close`. A `Close` for the control lane is ignored;
/// for a registered service lane the slot is removed with reason `Local` and
/// a `LaneClose` with empty metadata is sent. `ProtocolClose` sends a
/// `ProtocolError` on the control lane and closes every lane with reason
/// `Protocol`. Send failures are ignored in both cases.
async fn handle_drop_control_request(&mut self, req: DropControlRequest) -> bool {
    match req {
        DropControlRequest::Shutdown => {
            trace!("connection shutdown requested");
            false
        }
        DropControlRequest::Close(conn_id) if conn_id.is_control() => {
            trace!("ignoring root close control request");
            true
        }
        DropControlRequest::Close(conn_id) => {
            let was_registered = self
                .remove_connection_with_reason(&conn_id, ConnectionCloseReason::Local)
                .is_some();
            if was_registered {
                let close_message = Message {
                    lane_id: conn_id,
                    payload: MessagePayload::LaneClose(LaneClose {
                        metadata: vox_types::Metadata::default(),
                    }),
                };
                let _ = self.connection_core.send(close_message, None, None).await;
            }
            true
        }
        DropControlRequest::ProtocolClose {
            conn_id,
            description,
        } => {
            trace!(%conn_id, %description, "protocol close requested");
            // The error is always reported on the control lane, whichever lane caused it.
            let error_message = Message {
                lane_id: LaneId::CONTROL,
                payload: MessagePayload::ProtocolError(vox_types::ProtocolError {
                    description: &description,
                }),
            };
            let _ = self.connection_core.send(error_message, None, None).await;
            self.close_all_connections(ConnectionCloseReason::Protocol);
            false
        }
    }
}
/// Removes the slot for `conn_id`, using `ConnectionCloseReason::Unknown` as
/// the close reason. See `remove_connection_with_reason` for the side effects
/// on active lanes.
fn remove_connection(&mut self, conn_id: &LaneId) -> Option<ConnectionSlot> {
    let reason = ConnectionCloseReason::Unknown;
    self.remove_connection_with_reason(conn_id, reason)
}
/// Returns a copy of the grant metadata stored for an active lane, or default
/// (empty) metadata when `conn_id` has no active slot.
///
/// # Panics
///
/// Panics if the lane's grant mutex is poisoned.
fn lane_grant_metadata(&self, conn_id: &LaneId) -> Metadata {
    match self.conns.get(conn_id) {
        Some(ConnectionSlot::Active(state)) => state
            .lane_grant
            .lock()
            .expect("lane grant state mutex poisoned")
            .metadata()
            .clone(),
        _ => Metadata::default(),
    }
}
/// Reports a `LaneGrant` establishment phase (started, then finished `Ok`) to
/// the observer for `conn_id`. Nothing is reported for an empty grant.
fn observe_lane_grant_creation(&self, conn_id: LaneId, grant: &LaneGrant) {
    if !grant.is_empty() {
        let observer = self.observer.as_ref();
        let started_at = observe_establishment_started(
            observer,
            self.role,
            EstablishmentPhase::LaneGrant,
            Some(conn_id),
        );
        observe_establishment_finished(
            observer,
            self.role,
            EstablishmentPhase::LaneGrant,
            Some(conn_id),
            EstablishmentOutcome::Ok,
            started_at,
        );
    }
}
/// Reports a `LaneGrantRevocation` establishment phase (started, then
/// finished `Ok`) for a lane whose stored grant is non-empty. Lanes with an
/// empty grant produce no events.
///
/// # Panics
///
/// Panics if the lane's grant mutex is poisoned.
fn observe_lane_grant_revocation(&self, conn_id: LaneId, state: &LaneState) {
    // The guard is released at the end of this statement, before any observer call.
    let grant_is_empty = state
        .lane_grant
        .lock()
        .expect("lane grant state mutex poisoned")
        .is_empty();
    if grant_is_empty {
        return;
    }

    let observer = self.observer.as_ref();
    let started_at = observe_establishment_started(
        observer,
        self.role,
        EstablishmentPhase::LaneGrantRevocation,
        Some(conn_id),
    );
    observe_establishment_finished(
        observer,
        self.role,
        EstablishmentPhase::LaneGrantRevocation,
        Some(conn_id),
        EstablishmentOutcome::Ok,
        started_at,
    );
}
/// Removes and returns the slot stored under `conn_id`, if any.
///
/// For an active lane this also publishes `reason` on the lane's close watch,
/// reports a grant revocation when the lane held a grant, and emits
/// `DriverEvent::LaneClosed` to the observer. Slots in any other state are
/// removed without side effects.
fn remove_connection_with_reason(
    &mut self,
    conn_id: &LaneId,
    reason: ConnectionCloseReason,
) -> Option<ConnectionSlot> {
    trace!(%conn_id, "remove_connection called");
    let removed = self.conns.remove(conn_id);

    if let Some(ConnectionSlot::Active(state)) = removed.as_ref() {
        let lane_id = *conn_id;
        // Nobody may be watching any more; a failed publish is ignored.
        let _ = state.closed_tx.send(Some(reason));
        self.observe_lane_grant_revocation(lane_id, state);
        if let Some(observer) = self.observer.as_ref() {
            observer.driver_event(vox_types::DriverEvent::LaneClosed { lane_id, reason });
        }
    }

    removed
}
/// Closes every registered lane with `reason` and empties the slot map.
///
/// Each active lane gets `reason` published on its close watch, a grant
/// revocation report when it held a grant, and a `DriverEvent::LaneClosed`.
/// Slots in other states are dropped without notification. Calling this on an
/// already empty map does nothing beyond logging.
fn close_all_connections(&mut self, reason: ConnectionCloseReason) {
    trace!(role = ?self.role, count = self.conns.len(), "close_all_connections");
    vox_types::dlog!(
        "[connection {:?}] close_all_lanes: {} slots",
        self.role,
        self.conns.len()
    );

    for (conn_id, slot) in &self.conns {
        let ConnectionSlot::Active(state) = slot else {
            continue;
        };
        vox_types::dlog!(
            "[connection {:?}] closing connection {:?}",
            self.role,
            conn_id
        );
        let lane_id = *conn_id;
        let _ = state.closed_tx.send(Some(reason));
        self.observe_lane_grant_revocation(lane_id, state);
        if let Some(observer) = self.observer.as_ref() {
            observer.driver_event(vox_types::DriverEvent::LaneClosed { lane_id, reason });
        }
    }

    self.conns.clear();
}
}
/// A one-shot gate for a single channel: waiters block in
/// `ConnectionCore::await_channel_gate` until
/// `ConnectionCore::open_channel_gates` opens it.
struct ChannelGate {
// Set to `true` (Release) when the gate is opened; read with Acquire by waiters.
opened: std::sync::atomic::AtomicBool,
// Wakes every task currently waiting on the gate when it opens.
notify: Notify,
}
/// Send-side state of a connection, shared through an `Arc`.
pub(crate) struct ConnectionCore {
// The conduit sender plus per-lane send state, guarded by one mutex.
inner: std::sync::Mutex<ConnectionCoreInner>,
// Queue feeding the outbound worker task.
outbound_tx: mpsc::Sender<OutboundBatch>,
// Optional observer handle.
observer: Option<VoxObserverHandle>,
// Gates registered per channel id; an entry is removed when its gate is opened.
channel_gates: std::sync::Mutex<HashMap<vox_types::ChannelId, Arc<ChannelGate>>>,
}
/// Marker trait for a `'static` future that resolves to `std::io::Result<()>`
/// and satisfies `MaybeSend`. It exists so such futures can be named as a
/// single trait object type.
pub trait OutboundSendFuture: Future<Output = std::io::Result<()>> + MaybeSend + 'static {}
// Blanket implementation: every future meeting the bounds is an `OutboundSendFuture`.
impl<T> OutboundSendFuture for T where T: Future<Output = std::io::Result<()>> + MaybeSend + 'static {}
/// A boxed, pinned [`OutboundSendFuture`], as stored in an `OutboundBatch`.
type OutboundSend = Pin<Box<dyn OutboundSendFuture>>;
/// A schema message that the outbound worker sends ahead of a batch's payload.
#[derive(Clone)]
struct PendingSchemaSend {
// Method the schemas belong to.
method_id: vox_types::MethodId,
// Binding direction the schemas describe.
direction: vox_types::BindingDirection,
// Prepared plan; the worker previews it to get the schemas to send and
// marks it sent after a successful send.
prepared: vox_types::PreparedSchemaPlan,
}
impl From<vox_types::ChannelWriterSchemaPlan> for PendingSchemaSend {
fn from(plan: vox_types::ChannelWriterSchemaPlan) -> Self {
let _ = plan.role;
Self {
method_id: plan.method_id,
direction: plan.direction,
prepared: plan.prepared,
}
}
}
/// One unit of work for the outbound worker: optional schema messages
/// followed by a single payload send, with a channel to report the outcome.
struct OutboundBatch {
// Lane the batch belongs to; also used as the lane id of schema messages.
conn_id: LaneId,
// Request id of the payload, when it is a request message (used for tracing).
request_id: Option<RequestId>,
// Short label of the payload kind (used for tracing).
payload_kind: &'static str,
// Send-side state of the lane; the worker updates its schema tracker and
// planned bindings.
conn_state: Arc<std::sync::Mutex<SendConnState>>,
// Conduit sender used for the schema messages.
tx: Arc<dyn DynConduitTx>,
// Schema messages to send, in order, before the payload.
schema_sends: Vec<PendingSchemaSend>,
// The prepared payload send; only awaited if all schema sends succeeded.
payload_send: OutboundSend,
// Receives the overall result of the batch.
result_tx: oneshot::Sender<std::io::Result<()>>,
}
/// Result of preparing an outbound message: the batch to queue, the receiver
/// on which the worker reports the batch's outcome, and a list of channel ids.
// NOTE(review): the channel ids are presumably the gated channels to open once
// the batch has been sent — confirm against `prepare_outbound_batch`.
type PreparedOutboundBatch = (
OutboundBatch,
oneshot::Receiver<std::io::Result<()>>,
Vec<vox_types::ChannelId>,
);
/// Error returned when an outbound batch could not be prepared.
struct PrepareOutboundError {
// NOTE(review): presumably the gated channels collected before the failure,
// so the caller can still release them — confirm against the caller.
gated_channels: Vec<vox_types::ChannelId>,
}
/// Processes outbound batches one at a time until the queue is closed.
///
/// For each batch the pending schema messages are sent first, in order; the
/// payload is sent only if all of them succeeded. The first error stops the
/// batch and becomes its result, which is delivered on the batch's
/// `result_tx`.
async fn run_outbound_worker(mut rx: mpsc::Receiver<OutboundBatch>) {
while let Some(batch) = rx.recv().await {
trace!(
conn_id = %batch.conn_id,
request_id = ?batch.request_id,
payload_kind = batch.payload_kind,
schema_count = batch.schema_sends.len(),
"connection outbound worker received batch"
);
let mut result = Ok(());
for schema_send in batch.schema_sends {
trace!(
conn_id = %batch.conn_id,
request_id = ?batch.request_id,
method_id = ?schema_send.method_id,
direction = ?schema_send.direction,
"connection outbound worker sending schema batch"
);
// The lock is confined to this block so it is released before the
// send below is awaited.
let schemas = {
let mut conn_state = batch
.conn_state
.lock()
.expect("send conn state mutex poisoned");
conn_state.send_tracker.preview_prepared_plan(
schema_send.method_id,
schema_send.direction,
&schema_send.prepared,
)
};
// Nothing to send for this plan. The plan is not marked as sent and its
// planned binding is left in place.
if schemas.is_empty() {
continue;
}
let schema_msg = Message {
lane_id: batch.conn_id,
payload: MessagePayload::SchemaMessage(SchemaMessage {
method_id: schema_send.method_id,
direction: schema_send.direction,
schemas,
}),
};
let send = match batch.tx.clone().prepare_msg(schema_msg, None) {
Ok(send) => send,
Err(error) => {
result = Err(error);
break;
}
};
if let Err(error) = send.await {
result = Err(error);
break;
}
// Only after a successful send is the plan recorded as sent and its
// planned binding cleared.
let mut conn_state = batch
.conn_state
.lock()
.expect("send conn state mutex poisoned");
conn_state.send_tracker.mark_prepared_plan_sent(
schema_send.method_id,
schema_send.direction,
&schema_send.prepared,
);
conn_state
.planned_bindings
.remove(&(schema_send.direction, schema_send.method_id));
}
// The payload is skipped entirely when a schema send failed.
if result.is_ok()
&& let Err(error) = batch.payload_send.await
{
trace!(
conn_id = %batch.conn_id,
request_id = ?batch.request_id,
payload_kind = batch.payload_kind,
?error,
"connection outbound worker payload send failed"
);
result = Err(error);
}
trace!(
conn_id = %batch.conn_id,
request_id = ?batch.request_id,
payload_kind = batch.payload_kind,
ok = result.is_ok(),
"connection outbound worker finished batch"
);
// The sender of the batch may have stopped waiting; that is not an error.
let _ = batch.result_tx.send(result);
}
}
/// Starts the outbound worker for `rx` on non-wasm targets.
///
/// When called from inside a Tokio runtime the worker is spawned as a task on
/// it. Otherwise a dedicated OS thread is started that builds its own
/// current-thread runtime and runs the worker to completion.
///
/// # Panics
///
/// The fallback thread panics if its runtime cannot be built.
#[cfg(not(target_arch = "wasm32"))]
fn spawn_outbound_worker(rx: mpsc::Receiver<OutboundBatch>) {
    let inside_runtime = tokio::runtime::Handle::try_current().is_ok();
    if !inside_runtime {
        std::thread::spawn(move || {
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("build outbound worker runtime");
            runtime.block_on(run_outbound_worker(rx));
        });
        return;
    }
    tokio::spawn(run_outbound_worker(rx));
}
/// Starts the outbound worker for `rx` on wasm32 by spawning it as a local
/// future.
#[cfg(target_arch = "wasm32")]
fn spawn_outbound_worker(rx: mpsc::Receiver<OutboundBatch>) {
    let worker = run_outbound_worker(rx);
    wasm_bindgen_futures::spawn_local(worker);
}
/// Per-lane send-side bookkeeping, shared behind a mutex.
struct SendConnState {
// Tracks schema plans; the outbound worker previews plans against it and
// marks them as sent.
send_tracker: vox_types::SchemaSendTracker,
// Method id per request id for calls received from the peer.
// NOTE(review): presumably consulted when the response is sent — confirm.
inflight_incoming: HashMap<RequestId, vox_types::MethodId>,
// Method id per request id for calls sent to the peer.
// NOTE(review): presumably what `take_outgoing_call_method` reads — confirm.
inflight_outgoing: HashMap<RequestId, vox_types::MethodId>,
// Prepared plans keyed by (direction, method); the outbound worker removes
// an entry once the corresponding schema message has been sent.
planned_bindings:
HashMap<(vox_types::BindingDirection, vox_types::MethodId), vox_types::PreparedSchemaPlan>,
}
impl SendConnState {
    /// Creates empty send state: a fresh schema tracker and no inflight
    /// requests or planned bindings.
    fn new() -> Self {
        let send_tracker = vox_types::SchemaSendTracker::new();
        Self {
            send_tracker,
            inflight_incoming: HashMap::new(),
            inflight_outgoing: HashMap::new(),
            planned_bindings: HashMap::new(),
        }
    }
}
/// The mutex-protected part of `ConnectionCore`.
struct ConnectionCoreInner {
// Type-erased conduit sender; cloned for each outbound batch.
tx: Arc<dyn DynConduitTx>,
// Send-side state per lane, created on first use.
conns: HashMap<LaneId, Arc<std::sync::Mutex<SendConnState>>>,
}
/// Returns the shared send state for `conn_id`, inserting fresh state into
/// `inner.conns` the first time the lane is seen.
fn get_or_create_send_conn_state(
    inner: &mut ConnectionCoreInner,
    conn_id: LaneId,
) -> Arc<std::sync::Mutex<SendConnState>> {
    let state: &Arc<std::sync::Mutex<SendConnState>> = inner
        .conns
        .entry(conn_id)
        .or_insert_with(|| Arc::new(std::sync::Mutex::new(SendConnState::new())));
    Arc::clone(state)
}
/// Returns the channel id of `msg` when it is a channel message whose body is
/// an `Item` or a `Close`; returns `None` for every other message.
fn gated_channel_id(msg: &Message<'_>) -> Option<vox_types::ChannelId> {
    let MessagePayload::ChannelMessage(channel) = &msg.payload else {
        return None;
    };
    let is_gated = matches!(
        &channel.body,
        vox_types::ChannelBody::Item(_) | vox_types::ChannelBody::Close(_)
    );
    if is_gated { Some(channel.id) } else { None }
}
impl ConnectionCore {
/// Returns `(in_use, capacity)` for the outbound queue: the number of slots
/// currently occupied and the queue's maximum capacity.
pub(crate) fn outbound_queue_stats(&self) -> (usize, usize) {
    let total = self.outbound_tx.max_capacity();
    let free = self.outbound_tx.capacity();
    let in_use = total.saturating_sub(free);
    (in_use, total)
}
/// Registers a closed gate for `channel_id`. An existing gate for the same
/// channel is left untouched.
///
/// # Panics
///
/// Panics if the channel gates mutex is poisoned.
pub(crate) fn register_channel_gate(&self, channel_id: vox_types::ChannelId) {
    let mut gates = self
        .channel_gates
        .lock()
        .expect("channel gates mutex poisoned");
    gates.entry(channel_id).or_insert_with(|| {
        let gate = ChannelGate {
            opened: std::sync::atomic::AtomicBool::new(false),
            notify: Notify::new("connection.channel_gate"),
        };
        Arc::new(gate)
    });
}
/// Opens and unregisters the gates of the given channels, waking every task
/// waiting on them. Channel ids without a registered gate are skipped, and an
/// empty slice returns without taking the lock.
///
/// # Panics
///
/// Panics if the channel gates mutex is poisoned.
fn open_channel_gates(&self, channels: &[vox_types::ChannelId]) {
    if channels.is_empty() {
        return;
    }
    let mut gates = self
        .channel_gates
        .lock()
        .expect("channel gates mutex poisoned");
    for gate in channels.iter().filter_map(|id| gates.remove(id)) {
        // Publish the flag before waking, so woken waiters observe it as open.
        gate.opened
            .store(true, std::sync::atomic::Ordering::Release);
        gate.notify.notify_waiters();
    }
}
/// Waits until the gate for `channel_id` has been opened.
///
/// Returns immediately when no gate is registered for the channel (including
/// when it was already opened and unregistered) or when the gate is already
/// open.
///
/// # Panics
///
/// Panics if the channel gates mutex is poisoned.
async fn await_channel_gate(&self, channel_id: vox_types::ChannelId) {
    // The map lock is released at the end of this statement, before any await.
    let registered = self
        .channel_gates
        .lock()
        .expect("channel gates mutex poisoned")
        .get(&channel_id)
        .cloned();
    let Some(gate) = registered else {
        return;
    };

    while !gate.opened.load(std::sync::atomic::Ordering::Acquire) {
        // Create the notification future first, then re-check the flag, so an
        // open that happens between the check and the await is not missed.
        let notified = gate.notify.notified();
        if gate.opened.load(std::sync::atomic::Ordering::Acquire) {
            return;
        }
        notified.await;
    }
}
/// Reports whether messages for `channel_id` may be sent right now.
///
/// A channel with no registered gate counts as open; otherwise the gate's
/// `opened` flag decides.
///
/// # Panics
/// Panics if the channel gates mutex is poisoned.
fn channel_gate_open(&self, channel_id: vox_types::ChannelId) -> bool {
    let gates = self
        .channel_gates
        .lock()
        .expect("channel gates mutex poisoned");
    match gates.get(&channel_id) {
        Some(gate) => gate.opened.load(std::sync::atomic::Ordering::Acquire),
        None => true,
    }
}
/// Turns `msg` into an `OutboundBatch` ready to be queued for sending.
///
/// For request messages this plans the schema message that must accompany
/// the call or response (appended after `extra_schema_sends`) and resets the
/// inline `schemas` field to its default. For calls whose argument shape
/// contains a channel, the arguments are encoded here so the channel ids
/// collected during encoding can be stored in `call.channels`.
///
/// # Returns
/// On success: the batch, the receiver for its send result, and the channel
/// ids collected while encoding call arguments (empty when no such encoding
/// took place).
///
/// # Errors
/// Returns `PrepareOutboundError` carrying the collected channel ids when
/// encoding the arguments or preparing the message on the conduit fails.
///
/// # Panics
/// Panics if the connection core mutex or the lane's send-state mutex is
/// poisoned.
fn prepare_outbound_batch<'a>(
&self,
mut msg: Message<'a>,
binder: Option<&'a dyn vox_types::ChannelBinder>,
forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
channel_method: Option<&'static vox_types::MethodDescriptor>,
extra_schema_sends: Vec<PendingSchemaSend>,
) -> Result<PreparedOutboundBatch, PrepareOutboundError> {
let conn_id = msg.lane_id;
// Label used in trace output and stored on the batch; only request
// messages carry a request id.
let (request_id, payload_kind) = match &msg.payload {
MessagePayload::RequestMessage(req) => {
let kind = match &req.body {
RequestBody::Call(_) => "request.call",
RequestBody::Response(_) => "request.response",
RequestBody::Cancel(_) => "request.cancel",
};
(Some(req.id), kind)
}
MessagePayload::SchemaMessage(_) => (None, "schema"),
MessagePayload::ChannelMessage(_) => (None, "channel"),
MessagePayload::LaneOpen(_) => (None, "connection.open"),
MessagePayload::LaneAccept(_) => (None, "connection.accept"),
MessagePayload::LaneReject(_) => (None, "connection.reject"),
MessagePayload::LaneClose(_) => (None, "connection.close"),
MessagePayload::ProtocolError(_) => (None, "protocol.error"),
MessagePayload::Ping(_) => (None, "ping"),
MessagePayload::Pong(_) => (None, "pong"),
};
trace!(
conn_id = %conn_id,
?request_id,
payload_kind,
"connection preparing outbound message"
);
let (tx, conn_state, schema_sends) = {
// Hold the core lock only long enough to clone the conduit handle and
// fetch (or create) the lane's send state.
let mut inner = self.inner.lock().expect("connection core mutex poisoned");
let tx = inner.tx.clone();
let conn_state = get_or_create_send_conn_state(&mut inner, conn_id);
drop(inner);
if let MessagePayload::RequestMessage(req) = &mut msg.payload {
vox_types::dlog!(
"[connection-core] send request: conn={:?} req={:?} body={} forwarded={}",
conn_id,
req.id,
match &req.body {
RequestBody::Call(_) => "Call",
RequestBody::Response(_) => "Response",
RequestBody::Cancel(_) => "Cancel",
},
forwarded_schemas.is_some()
);
let schema_sends = {
let mut conn_state_guard =
conn_state.lock().expect("send conn state mutex poisoned");
let mut schema_sends = extra_schema_sends;
match &mut req.body {
RequestBody::Call(call) => {
if let Some(schema_send) = Self::plan_call_schema_send(
&mut conn_state_guard,
req.id,
call.method_id,
call,
forwarded_schemas,
) {
schema_sends.push(schema_send);
}
// Schemas travel as a separate planned send, never inline.
call.schemas = Default::default();
}
RequestBody::Response(resp) => {
// A response schema is planned only when the matching incoming
// call was recorded; the record is consumed here.
if let Some(method_id) =
conn_state_guard.inflight_incoming.remove(&req.id)
&& let Some(schema_send) = Self::plan_response_schema_send(
&mut conn_state_guard,
req.id,
method_id,
resp,
forwarded_schemas,
)
{
schema_sends.push(schema_send);
}
resp.schemas = Default::default();
}
RequestBody::Cancel(_) => {}
}
schema_sends
};
(tx, conn_state, schema_sends)
} else {
// Non-request messages only carry the caller-supplied schema sends.
(tx, conn_state, extra_schema_sends)
}
};
trace!(
conn_id = %conn_id,
?request_id,
payload_kind,
schema_count = schema_sends.len(),
"connection preparing outbound payload"
);
let mut gated_channels: Vec<vox_types::ChannelId> = Vec::new();
// Calls whose argument shape contains a channel are encoded up front so
// the channel ids collected during encoding can be recorded on the call.
let channel_storage: Option<Vec<u8>> = if let MessagePayload::RequestMessage(req) =
&msg.payload
&& let RequestBody::Call(call) = &req.body
&& let vox_types::Payload::Value { ptr, shape, .. } = &call.args
&& vox_types::shape_contains_channel(shape)
{
let (ptr, shape) = (*ptr, *shape);
let encode_args = || match binder {
Some(b) => {
vox_types::with_channel_binder(b, || vox_phon::to_vec_for_shape(ptr, shape))
}
None => vox_phon::to_vec_for_shape(ptr, shape),
};
let (encoded, channels) = match channel_method {
Some(method) => vox_types::collect_channels_for_method(method, encode_args),
None => vox_types::collect_channels(encode_args),
};
// Remember the collected channels even if encoding failed, so the
// error can report them to the caller.
gated_channels = channels.clone();
let encoded = match encoded {
Ok(encoded) => encoded,
Err(_) => {
return Err(PrepareOutboundError { gated_channels });
}
};
if let MessagePayload::RequestMessage(req) = &mut msg.payload
&& let RequestBody::Call(call) = &mut req.body
{
call.channels = channels;
}
Some(encoded)
} else {
None
};
let prepared = if let Some(bytes) = &channel_storage {
// NOTE(review): `swap_call_args_to_bytes` presumably replaces the call
// arguments with the pre-encoded bytes — confirm against its definition.
let msg = swap_call_args_to_bytes(msg, bytes);
tx.clone().prepare_msg(msg, binder)
} else {
tx.clone().prepare_msg(msg, binder)
};
let payload_send = match prepared {
Ok(send) => send,
Err(_) => {
return Err(PrepareOutboundError { gated_channels });
}
};
trace!(
conn_id = %conn_id,
?request_id,
payload_kind,
"connection prepared outbound payload"
);
// The sending half travels with the batch; the receiving half is handed
// back to the caller.
let (result_tx, result_rx) = oneshot::channel("connection.outbound.result");
Ok((
OutboundBatch {
conn_id,
request_id,
payload_kind,
conn_state,
tx,
schema_sends,
payload_send,
result_tx,
},
result_rx,
gated_channels,
))
}
/// Sends `msg` with default options: no channel method descriptor, no extra
/// schema sends, and no interest in the channels declared by the message.
///
/// # Errors
/// Returns `Err(())` when `send_with_options` fails.
pub(crate) async fn send<'a>(
    &self,
    msg: Message<'a>,
    binder: Option<&'a dyn vox_types::ChannelBinder>,
    forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
) -> Result<(), ()> {
    /// Callback for callers that do not care about declared channels.
    fn ignore_declared_channels(_: &[vox_types::ChannelId]) {}

    let no_extra_schema_sends = Vec::new();
    self.send_with_options(
        msg,
        binder,
        forwarded_schemas,
        None,
        no_extra_schema_sends,
        ignore_declared_channels,
    )
    .await
}
/// Prepares `msg`, queues it on the outbound queue, and waits for the send
/// result.
///
/// Channel `Item`/`Close` messages first wait for their channel gate.
/// `declared_channels` is invoked exactly once with the channel ids collected
/// while preparing the message (also on a preparation failure). The gates of
/// those channels are opened once the queue send has finished, or right away
/// when preparation fails.
///
/// # Errors
/// Returns `Err(())` when preparation fails, the outbound queue is closed
/// (reported as `OutboundQueueClosed`), the result channel is dropped, or the
/// send itself fails (reported as a `Transport` encode error).
async fn send_with_options<'a>(
    &self,
    msg: Message<'a>,
    binder: Option<&'a dyn vox_types::ChannelBinder>,
    forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
    channel_method: Option<&'static vox_types::MethodDescriptor>,
    extra_schema_sends: Vec<PendingSchemaSend>,
    declared_channels: impl FnOnce(&[vox_types::ChannelId]),
) -> Result<(), ()> {
    let lane_id = msg.lane_id;
    if let Some(channel_id) = gated_channel_id(&msg) {
        self.await_channel_gate(channel_id).await;
    }

    let preparation = self.prepare_outbound_batch(
        msg,
        binder,
        forwarded_schemas,
        channel_method,
        extra_schema_sends,
    );
    let (batch, result_rx, gated_channels) = match preparation {
        Ok(parts) => parts,
        Err(err) => {
            // Nothing will be queued: release the waiters before reporting.
            self.open_channel_gates(&err.gated_channels);
            declared_channels(&err.gated_channels);
            return Err(());
        }
    };
    declared_channels(&gated_channels);

    let queued = self.outbound_tx.send(batch).await;
    self.open_channel_gates(&gated_channels);
    if queued.is_err() {
        if let Some(observer) = &self.observer {
            observer.driver_event(vox_types::DriverEvent::OutboundQueueClosed { lane_id });
        }
        return Err(());
    }
    trace!(conn_id = %lane_id, "connection queued outbound batch");

    let outcome = result_rx.await.map_err(|_| ());
    let delivered = matches!(outcome, Ok(Ok(())));
    trace!(
        conn_id = %lane_id,
        ok = delivered,
        "connection outbound batch completed"
    );
    if outcome?.is_ok() {
        return Ok(());
    }
    if let Some(observer) = &self.observer {
        observer.driver_event(vox_types::DriverEvent::EncodeError {
            lane_id,
            kind: vox_types::EncodeErrorKind::Transport,
        });
    }
    Err(())
}
fn try_send_with_options<'a>(
&self,
msg: Message<'a>,
binder: Option<&'a dyn vox_types::ChannelBinder>,
forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
channel_method: Option<&'static vox_types::MethodDescriptor>,
extra_schema_sends: Vec<PendingSchemaSend>,
) -> Result<(), TrySendError<()>> {
let lane_id = msg.lane_id;
if let Some(channel_id) = gated_channel_id(&msg)
&& !self.channel_gate_open(channel_id)
{
return Err(TrySendError::Full(()));
}
let (batch, _result_rx, gated_channels) = match self.prepare_outbound_batch(
msg,
binder,
forwarded_schemas,
channel_method,
extra_schema_sends,
) {
Ok(prepared) => prepared,
Err(err) => {
self.open_channel_gates(&err.gated_channels);
return Err(TrySendError::Closed(()));
}
};
let result = self.outbound_tx.try_send(batch).map_err(|err| match err {
mpsc::error::TrySendError::Full(_) => {
if let Some(observer) = &self.observer {
observer.driver_event(vox_types::DriverEvent::OutboundQueueFull { lane_id });
}
TrySendError::Full(())
}
mpsc::error::TrySendError::Closed(_) => {
if let Some(observer) = &self.observer {
observer.driver_event(vox_types::DriverEvent::OutboundQueueClosed { lane_id });
}
TrySendError::Closed(())
}
});
self.open_channel_gates(&gated_channels);
result
}
/// Remembers that `request_id` on lane `conn_id` is an incoming call to
/// `method_id`, creating the lane's send state if needed.
///
/// # Panics
/// Panics if the connection core mutex or the lane's send-state mutex is
/// poisoned.
pub(crate) fn record_incoming_call(
    &self,
    conn_id: LaneId,
    request_id: RequestId,
    method_id: vox_types::MethodId,
) {
    let mut core = self.inner.lock().expect("connection core mutex poisoned");
    let lane_state = get_or_create_send_conn_state(&mut core, conn_id);
    vox_types::dlog!(
        "[schema] record_incoming_call: conn={:?} req={:?} method={:?}",
        conn_id,
        request_id,
        method_id
    );
    let mut lane_state = lane_state
        .lock()
        .expect("send conn state mutex poisoned");
    lane_state.inflight_incoming.insert(request_id, method_id);
}
/// Removes and returns the method id recorded for the outgoing call
/// `request_id` on lane `conn_id`.
///
/// Returns `None` when the lane has no send state or no such call is
/// recorded.
///
/// # Panics
/// Panics if the connection core mutex or the lane's send-state mutex is
/// poisoned.
pub(crate) fn take_outgoing_call_method(
    &self,
    conn_id: LaneId,
    request_id: RequestId,
) -> Option<vox_types::MethodId> {
    let core = self.inner.lock().expect("connection core mutex poisoned");
    let lane_state = core.conns.get(&conn_id)?;
    let mut lane_state = lane_state
        .lock()
        .expect("send conn state mutex poisoned");
    lane_state.inflight_outgoing.remove(&request_id)
}
/// Attaches the response schemas for `method_id` to `response`, unless the
/// lane has already sent that response binding.
///
/// - When the lane's send tracker reports the response binding as sent,
///   `response.schemas` is reset to its default.
/// - For a `Payload::Value` return, the plan is taken from (or added to) the
///   lane's planned bindings and written to `response.schemas`. If planning
///   fails, `response` is left untouched.
/// - For a `Payload::Encoded` return, an error is logged and `response` is
///   left untouched.
///
/// The connection-wide mutex is held only while the lane's send state is
/// looked up, matching `prepare_outbound_batch`; schema planning then runs
/// under the lane's own mutex so it does not stall other lanes.
///
/// # Panics
/// Panics if the connection core mutex or the lane's send-state mutex is
/// poisoned.
pub(crate) fn prepare_response_for_method(
    &self,
    conn_id: LaneId,
    request_id: RequestId,
    method_id: vox_types::MethodId,
    response: &mut RequestResponse<'_>,
) {
    let conn_state = {
        let mut inner = self.inner.lock().expect("connection core mutex poisoned");
        get_or_create_send_conn_state(&mut inner, conn_id)
    };
    let mut conn_state = conn_state.lock().expect("send conn state mutex poisoned");
    let key = (vox_types::BindingDirection::Response, method_id);
    if conn_state
        .send_tracker
        .has_sent_binding(method_id, vox_types::BindingDirection::Response)
    {
        response.schemas = Default::default();
        return;
    }
    let prepared = match &response.ret {
        vox_types::Payload::Value { shape, .. } => {
            match Self::get_or_plan_binding_for_shape(
                &mut conn_state,
                key,
                request_id,
                "response",
                shape,
            ) {
                Some(prepared) => prepared,
                None => return,
            }
        }
        vox_types::Payload::Encoded(_) => {
            tracing::error!(
                "schema attachment failed: missing forwarded response schemas for method {:?}",
                method_id
            );
            return;
        }
    };
    response.schemas = prepared.to_payload();
}
/// Attaches the response schemas derived from `shape` to `response`, unless
/// the lane has already sent the response binding for `method_id`.
///
/// - When the lane's send tracker reports the response binding as sent,
///   `response.schemas` is reset to its default.
/// - Otherwise a plan is built for `shape` and committed to the send tracker;
///   the value returned by the commit becomes `response.schemas`.
/// - When planning fails, an error is logged and `response` is left
///   untouched.
///
/// The connection-wide mutex is held only while the lane's send state is
/// looked up, matching `prepare_outbound_batch`; schema planning then runs
/// under the lane's own mutex so it does not stall other lanes.
///
/// # Panics
/// Panics if the connection core mutex or the lane's send-state mutex is
/// poisoned.
pub(crate) fn prepare_response_for_shape(
    &self,
    conn_id: LaneId,
    _request_id: RequestId,
    method_id: vox_types::MethodId,
    shape: &'static Shape,
    response: &mut RequestResponse<'_>,
) {
    let conn_state = {
        let mut inner = self.inner.lock().expect("connection core mutex poisoned");
        get_or_create_send_conn_state(&mut inner, conn_id)
    };
    let mut conn_state = conn_state.lock().expect("send conn state mutex poisoned");
    if conn_state
        .send_tracker
        .has_sent_binding(method_id, vox_types::BindingDirection::Response)
    {
        response.schemas = Default::default();
        return;
    }
    match vox_types::SchemaSendTracker::plan_for_shape(shape) {
        Ok(prepared) => {
            response.schemas = conn_state.send_tracker.commit_prepared_plan(
                method_id,
                vox_types::BindingDirection::Response,
                prepared,
            );
        }
        Err(e) => tracing::error!("error-response schema extraction failed: {e}"),
    }
}
/// Returns the schema plan for `key`, building it from `shape` on first use.
///
/// A plan already stored in `conn_state.planned_bindings` is returned as a
/// clone. Otherwise a new plan is built, stored under `key`, and returned.
/// `request_id` and `kind` are used for debug logging only.
///
/// Returns `None` (after logging an error) when planning fails.
fn get_or_plan_binding_for_shape(
    conn_state: &mut SendConnState,
    key: (vox_types::BindingDirection, vox_types::MethodId),
    request_id: RequestId,
    kind: &str,
    shape: &'static Shape,
) -> Option<vox_types::PreparedSchemaPlan> {
    let cached = conn_state.planned_bindings.get(&key).cloned();
    if cached.is_some() {
        return cached;
    }
    let planned = match vox_types::SchemaSendTracker::plan_for_shape(shape) {
        Ok(planned) => planned,
        Err(e) => {
            tracing::error!("schema extraction failed: {e}");
            return None;
        }
    };
    vox_types::dlog!(
        "[schema] planned {} {} schemas for method {:?} (req {:?})",
        planned.bytes.len(),
        kind,
        key.1,
        request_id
    );
    conn_state.planned_bindings.insert(key, planned.clone());
    Some(planned)
}
/// Returns the schema plan for `key`, taking the schema bytes from `tracker`
/// on first use.
///
/// A plan already stored in `conn_state.planned_bindings` is returned as a
/// clone. Otherwise the bytes reported by `tracker.writer_schema_bytes` are
/// wrapped in a new plan, stored under `key`, and returned. Returns `None`
/// when the tracker has no bytes for the binding.
fn get_or_plan_binding_from_tracker(
    conn_state: &mut SendConnState,
    key: (vox_types::BindingDirection, vox_types::MethodId),
    tracker: &vox_types::SchemaRecvTracker,
) -> Option<vox_types::PreparedSchemaPlan> {
    let cached = conn_state.planned_bindings.get(&key).cloned();
    if cached.is_some() {
        return cached;
    }
    let forwarded = vox_types::PreparedSchemaPlan {
        bytes: tracker.writer_schema_bytes(key.1, key.0)?,
    };
    conn_state.planned_bindings.insert(key, forwarded.clone());
    Some(forwarded)
}
/// Plans the schema message that must precede a response for `method_id`.
///
/// Returns `None` when the response binding was already sent (in which case
/// `response.schemas` is reset to its default) or when no plan can be
/// obtained. The plan source depends on the response:
/// - schemas already attached to the response: the lane's planned binding if
///   one exists, otherwise the attached bytes;
/// - a `Payload::Value` return: a plan built from its shape;
/// - a `Payload::Encoded` return: the bytes from `forwarded_schemas`; an
///   error is logged when no tracker was supplied.
fn plan_response_schema_send(
    conn_state: &mut SendConnState,
    request_id: RequestId,
    method_id: vox_types::MethodId,
    response: &mut RequestResponse<'_>,
    forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
) -> Option<PendingSchemaSend> {
    let already_sent = conn_state
        .send_tracker
        .has_sent_binding(method_id, vox_types::BindingDirection::Response);
    if already_sent {
        response.schemas = Default::default();
        return None;
    }
    let key = (vox_types::BindingDirection::Response, method_id);
    let prepared = if response.schemas.is_empty() {
        match &response.ret {
            vox_types::Payload::Value { shape, .. } => Self::get_or_plan_binding_for_shape(
                conn_state, key, request_id, "response", shape,
            )?,
            vox_types::Payload::Encoded(_) => match forwarded_schemas {
                Some(source) => Self::get_or_plan_binding_from_tracker(conn_state, key, source)?,
                None => {
                    tracing::error!(
                        "schema attachment failed: missing forwarded response schemas for method {:?}",
                        method_id
                    );
                    return None;
                }
            },
        }
    } else {
        match conn_state.planned_bindings.get(&key) {
            Some(planned) => planned.clone(),
            None => vox_types::PreparedSchemaPlan {
                bytes: response.schemas.0.clone(),
            },
        }
    };
    Some(PendingSchemaSend {
        method_id,
        direction: vox_types::BindingDirection::Response,
        prepared,
    })
}
/// Records the outgoing call and plans the schema message that must precede
/// it.
///
/// The call is always entered into `conn_state.inflight_outgoing`. Returns
/// `None` when the args binding was already sent (in which case
/// `call.schemas` is reset to its default) or when no plan can be obtained.
/// The plan source depends on the call:
/// - schemas already attached to the call: the lane's planned binding if one
///   exists, otherwise the attached bytes;
/// - `Payload::Value` args: a plan built from their shape;
/// - `Payload::Encoded` args: the bytes from `forwarded_schemas`; an error is
///   logged when no tracker was supplied.
fn plan_call_schema_send(
    conn_state: &mut SendConnState,
    request_id: RequestId,
    method_id: vox_types::MethodId,
    call: &mut vox_types::RequestCall<'_>,
    forwarded_schemas: Option<&vox_types::SchemaRecvTracker>,
) -> Option<PendingSchemaSend> {
    conn_state.inflight_outgoing.insert(request_id, method_id);
    let already_sent = conn_state
        .send_tracker
        .has_sent_binding(method_id, vox_types::BindingDirection::Args);
    if already_sent {
        call.schemas = Default::default();
        return None;
    }
    let key = (vox_types::BindingDirection::Args, method_id);
    let prepared = if call.schemas.is_empty() {
        match &call.args {
            vox_types::Payload::Value { shape, .. } => {
                Self::get_or_plan_binding_for_shape(conn_state, key, request_id, "args", shape)?
            }
            vox_types::Payload::Encoded(_) => match forwarded_schemas {
                Some(source) => Self::get_or_plan_binding_from_tracker(conn_state, key, source)?,
                None => {
                    tracing::error!(
                        "schema attachment failed: missing forwarded args schemas for method {:?}",
                        method_id
                    );
                    return None;
                }
            },
        }
    } else {
        match conn_state.planned_bindings.get(&key) {
            Some(planned) => planned.clone(),
            None => vox_types::PreparedSchemaPlan {
                bytes: call.schemas.0.clone(),
            },
        }
    };
    Some(PendingSchemaSend {
        method_id,
        direction: vox_types::BindingDirection::Args,
        prepared,
    })
}
}
/// Type-erased transmit half of a conduit, usable as `Arc<dyn DynConduitTx>`.
///
/// A blanket implementation covers every `ConduitTx` whose message family is
/// `MessageFamily`.
pub trait DynConduitTx: MaybeSend + MaybeSync {
/// Prepares `msg` for sending and returns the pending send operation.
///
/// `binder`, when given, is the channel binder made available while the
/// message is prepared.
///
/// # Errors
/// Returns an `std::io::Error` when the message cannot be prepared.
fn prepare_msg<'a>(
self: Arc<Self>,
msg: Message<'a>,
binder: Option<&'a dyn vox_types::ChannelBinder>,
) -> std::io::Result<OutboundSend>;
}
/// Type-erased receive half of a conduit.
///
/// A blanket implementation covers every `ConduitRx` whose message family is
/// `MessageFamily`.
pub trait DynConduitRx: MaybeSend {
/// Receives the next message as a boxed future.
///
/// NOTE(review): `Ok(None)` presumably signals that the peer closed the
/// conduit — confirm against `ConduitRx::recv`.
fn recv_msg<'a>(&'a mut self)
-> BoxFut<'a, std::io::Result<Option<SelfRef<Message<'static>>>>>;
/// Takes the frame file descriptors held by the receiver, as defined by
/// `ConduitRx::take_frame_fds`.
fn take_frame_fds(&mut self) -> vox_types::FrameFds;
}
/// Adapts any `ConduitTx` over `MessageFamily` to the type-erased
/// `DynConduitTx` interface.
impl<T> DynConduitTx for T
where
    T: ConduitTx<Msg = MessageFamily> + MaybeSend + MaybeSync + 'static,
{
    /// Prepares `msg` synchronously (under `binder` when one is given) and
    /// returns a boxed future that performs the actual send.
    ///
    /// Conduit errors from either step are converted to `std::io::Error`
    /// carrying the error's display text.
    fn prepare_msg<'a>(
        self: Arc<Self>,
        msg: Message<'a>,
        binder: Option<&'a dyn vox_types::ChannelBinder>,
    ) -> std::io::Result<OutboundSend> {
        let outcome = match binder {
            Some(binder) => vox_types::with_channel_binder(binder, || self.prepare_send(msg)),
            None => self.prepare_send(msg),
        };
        let prepared = match outcome {
            Ok(prepared) => prepared,
            Err(e) => return Err(std::io::Error::other(e.to_string())),
        };
        Ok(Box::pin(async move {
            match self.send_prepared(prepared).await {
                Ok(done) => Ok(done),
                Err(e) => Err(std::io::Error::other(e.to_string())),
            }
        }))
    }
}
/// Adapts any `ConduitRx` over `MessageFamily` to the type-erased
/// `DynConduitRx` interface.
impl<T> DynConduitRx for T
where
    T: ConduitRx<Msg = MessageFamily> + MaybeSend,
{
    /// Boxes `ConduitRx::recv`, converting its error to `std::io::Error`
    /// carrying the error's display text.
    fn recv_msg<'a>(
        &'a mut self,
    ) -> BoxFut<'a, std::io::Result<Option<SelfRef<Message<'static>>>>> {
        Box::pin(async move {
            match self.recv().await {
                Ok(message) => Ok(message),
                Err(error) => Err(std::io::Error::other(error.to_string())),
            }
        })
    }

    /// Forwards to `ConduitRx::take_frame_fds`.
    fn take_frame_fds(&mut self) -> vox_types::FrameFds {
        <T as ConduitRx>::take_frame_fds(self)
    }
}
#[cfg(test)]
mod tests {
use std::sync::Mutex;
use vox_rt::sync::mpsc;
use vox_types::{
Backing, BindingDirection, Conduit, DriverEvent, HandshakeResult, LaneAccept, LaneReject,
Payload, RequestCall, SelfRef, TransportEvent, VoxObserverHandle,
};
use super::*;
/// Test transmit half that records every sent message instead of
/// transmitting it.
#[derive(Clone)]
struct CapturingTx {
/// Messages pushed by `send_prepared`, in send order; shared with the test.
sent: Arc<Mutex<Vec<CapturedMessage>>>,
}
/// Summary of one message recorded by `CapturingTx`.
#[derive(Debug)]
struct CapturedMessage {
/// Lane the message was addressed to.
lane_id: LaneId,
/// The parts of the payload the tests assert on.
payload: CapturedPayload,
}
/// Payload details extracted from a message by `CapturingTx::prepare_send`.
#[derive(Debug)]
enum CapturedPayload {
/// A schema message, with a copy of its schema bytes.
Schema {
method_id: vox_types::MethodId,
direction: BindingDirection,
schemas: Vec<u8>,
},
/// A request call; `schemas_len` is the length of its inline schema bytes.
Call {
request_id: RequestId,
method_id: vox_types::MethodId,
schemas_len: usize,
},
/// A request response; `schemas_len` is the length of its inline schema
/// bytes.
Response {
request_id: RequestId,
schemas_len: usize,
},
/// Any other message.
Other,
}
/// `ConduitTx` implementation that summarises each message on prepare and
/// appends the summary to the shared `sent` list on send.
impl ConduitTx for CapturingTx {
    type Error = std::io::Error;
    type Msg = MessageFamily;
    type Prepared = CapturedMessage;

    /// Extracts the lane id and the payload details the tests care about.
    /// Never fails.
    fn prepare_send(&self, item: Message<'_>) -> Result<Self::Prepared, Self::Error> {
        let lane_id = item.lane_id;
        let payload = match &item.payload {
            MessagePayload::SchemaMessage(schema) => CapturedPayload::Schema {
                method_id: schema.method_id,
                direction: schema.direction,
                schemas: schema.schemas.0.clone(),
            },
            MessagePayload::RequestMessage(request) => {
                let request_id = request.id;
                match &request.body {
                    RequestBody::Call(call) => CapturedPayload::Call {
                        request_id,
                        method_id: call.method_id,
                        schemas_len: call.schemas.0.len(),
                    },
                    RequestBody::Response(response) => CapturedPayload::Response {
                        request_id,
                        schemas_len: response.schemas.0.len(),
                    },
                    _ => CapturedPayload::Other,
                }
            }
            _ => CapturedPayload::Other,
        };
        Ok(CapturedMessage { lane_id, payload })
    }

    /// Appends the prepared summary to the shared list.
    async fn send_prepared(&self, prepared: Self::Prepared) -> Result<(), Self::Error> {
        let mut recorded = self.sent.lock().expect("captured message mutex poisoned");
        recorded.push(prepared);
        Ok(())
    }

    /// Closing is a no-op for the capturing conduit.
    async fn close(self) -> std::io::Result<()> {
        Ok(())
    }
}
/// Test receive half whose `recv` never completes.
struct PendingRx;
/// `ConduitRx` implementation that never yields a message.
impl ConduitRx for PendingRx {
    type Error = std::io::Error;
    type Msg = MessageFamily;

    /// Awaits a future that is never ready, so this call never returns.
    async fn recv(&mut self) -> Result<Option<SelfRef<Message<'static>>>, Self::Error> {
        let never_ready = std::future::pending();
        never_ready.await
    }
}
/// Test observer that stores every event it receives.
struct RecordingObserver {
/// Driver events in arrival order; shared with the test.
driver_events: Arc<Mutex<Vec<DriverEvent>>>,
/// Transport events in arrival order; shared with the test.
transport_events: Arc<Mutex<Vec<TransportEvent>>>,
}
/// Observer implementation that appends each event to the matching list.
impl vox_types::VoxObserver for RecordingObserver {
    /// Records a driver event.
    fn driver_event(&self, event: DriverEvent) {
        let mut recorded = self
            .driver_events
            .lock()
            .expect("driver events mutex poisoned");
        recorded.push(event);
    }

    /// Records a transport event.
    fn transport_event(&self, event: TransportEvent) {
        let mut recorded = self
            .transport_events
            .lock()
            .expect("transport events mutex poisoned");
        recorded.push(event);
    }
}
/// Builds a pre-handshake `Connection` over one end of an in-memory link,
/// without an observer.
fn make_connection() -> Connection {
    let (local_link, remote_link) = crate::memory_link_pair(32);
    // Leak the remote end so its destructor never runs.
    std::mem::forget(remote_link);
    let conduit = crate::BareConduit::new(local_link);
    let (tx, rx) = conduit.split();
    let (_open_sender, open_rx) = mpsc::channel::<OpenRequest>("connection.open.test", 4);
    let (_close_sender, close_rx) = mpsc::channel::<CloseRequest>("connection.close.test", 4);
    let (control_tx, control_rx) = mpsc::unbounded_channel("connection.control.test");
    Connection::pre_handshake(
        tx,
        rx,
        None,
        open_rx,
        close_rx,
        control_tx,
        control_rx,
        None,
        None,
    )
}
/// Builds a pre-handshake `Connection` over one end of an in-memory link
/// that reports to `observer`.
fn make_connection_with_observer(observer: VoxObserverHandle) -> Connection {
    let (local_link, remote_link) = crate::memory_link_pair(32);
    // Leak the remote end so its destructor never runs.
    std::mem::forget(remote_link);
    let conduit = crate::BareConduit::new(local_link);
    let (tx, rx) = conduit.split();
    let (_open_sender, open_rx) =
        mpsc::channel::<OpenRequest>("connection.open.observed.test", 4);
    let (_close_sender, close_rx) =
        mpsc::channel::<CloseRequest>("connection.close.observed.test", 4);
    let (control_tx, control_rx) = mpsc::unbounded_channel("connection.control.observed.test");
    let observer = Some(observer);
    Connection::pre_handshake(
        tx, rx, None, open_rx, close_rx, control_tx, control_rx, None, observer,
    )
}
/// Builds an established `Connection` whose outbound messages are recorded
/// in `sent` and whose receive half never yields anything.
///
/// Returns the connection together with the handle produced by
/// `establish_from_handshake`.
fn make_capturing_connection(
    sent: Arc<Mutex<Vec<CapturedMessage>>>,
) -> (Connection, LaneHandle) {
    let (_open_sender, open_rx) = mpsc::channel::<OpenRequest>("connection.open.capture.test", 4);
    let (_close_sender, close_rx) =
        mpsc::channel::<CloseRequest>("connection.close.capture.test", 4);
    let (control_tx, control_rx) = mpsc::unbounded_channel("connection.control.capture.test");
    let capturing_tx = CapturingTx { sent };
    let mut connection = Connection::pre_handshake(
        capturing_tx,
        PendingRx,
        None,
        open_rx,
        close_rx,
        control_tx,
        control_rx,
        None,
        None,
    );
    let our_settings = ConnectionSettings {
        parity: Parity::Odd,
        max_concurrent_requests: 64,
        initial_channel_credit: 16,
    };
    let peer_settings = ConnectionSettings {
        parity: Parity::Even,
        max_concurrent_requests: 64,
        initial_channel_credit: 16,
    };
    let handle = connection
        .establish_from_handshake(test_handshake(our_settings, peer_settings))
        .expect("establish captured connection");
    (connection, handle)
}
/// Builds an initiator-side `HandshakeResult` with the given settings, empty
/// schemas, default metadata, no peer evidence, and an anonymous peer
/// identity.
fn test_handshake(
    our_settings: ConnectionSettings,
    peer_settings: ConnectionSettings,
) -> HandshakeResult {
    let peer_metadata = vox_types::Metadata::default();
    let peer_evidence = vox_types::PeerEvidence::none();
    let peer_identity = vox_types::PeerIdentity::anonymous();
    HandshakeResult {
        role: ConnectionRole::Initiator,
        our_settings,
        peer_settings,
        our_schema: Vec::new(),
        peer_schema: Vec::new(),
        peer_metadata,
        peer_evidence,
        peer_identity,
    }
}
/// Builds an owned `LaneAccept` with even parity, 64 concurrent requests, an
/// initial channel credit of 16, and default metadata.
fn accept_ref() -> SelfRef<LaneAccept> {
    let backing = Backing::Boxed(Box::<[u8]>::default());
    let connection_settings = ConnectionSettings {
        parity: Parity::Even,
        max_concurrent_requests: 64,
        initial_channel_credit: 16,
    };
    let accept = LaneAccept {
        connection_settings,
        metadata: vox_types::Metadata::default(),
    };
    SelfRef::owning(backing, accept)
}
/// Builds an owned `LaneAccept` like `accept_ref`, but with an initial
/// channel credit of zero.
fn zero_credit_accept_ref() -> SelfRef<LaneAccept> {
    let backing = Backing::Boxed(Box::<[u8]>::default());
    let connection_settings = ConnectionSettings {
        parity: Parity::Even,
        max_concurrent_requests: 64,
        initial_channel_credit: 0,
    };
    let accept = LaneAccept {
        connection_settings,
        metadata: vox_types::Metadata::default(),
    };
    SelfRef::owning(backing, accept)
}
/// Builds an owned `LaneReject` with default metadata.
fn reject_ref() -> SelfRef<LaneReject> {
    let backing = Backing::Boxed(Box::<[u8]>::default());
    let reject = LaneReject {
        metadata: vox_types::Metadata::default(),
    };
    SelfRef::owning(backing, reject)
}
/// Checks how receive errors are reported: a decode error must close the
/// control lane with `Protocol` and emit a `DecodeError` driver event, while
/// any other error must close it with `Transport` and emit a
/// `TransportEvent::Closed`.
#[test]
fn connection_receive_errors_emit_diagnostics_and_non_graceful_close_reasons() {
// Runs one scenario: observe `error`, close all lanes with its classified
// reason, then check the recorded events.
fn run_case(
error: std::io::Error,
expected_reason: ConnectionCloseReason,
expect_decode_error: bool,
) {
let driver_events = Arc::new(Mutex::new(Vec::new()));
let transport_events = Arc::new(Mutex::new(Vec::new()));
let observer: VoxObserverHandle = Arc::new(RecordingObserver {
driver_events: driver_events.clone(),
transport_events: transport_events.clone(),
});
let mut connection = make_connection_with_observer(observer);
let handle = connection
.establish_from_handshake(test_handshake(
ConnectionSettings {
parity: Parity::Odd,
max_concurrent_requests: 64,
initial_channel_credit: 16,
},
ConnectionSettings {
parity: Parity::Even,
max_concurrent_requests: 64,
initial_channel_credit: 16,
},
))
.expect("establish observed connection");
// Discard anything recorded during establishment so only the events
// caused by the error are inspected.
driver_events
.lock()
.expect("driver events mutex poisoned")
.clear();
transport_events
.lock()
.expect("transport events mutex poisoned")
.clear();
connection.observe_connection_recv_error(&error);
connection.close_all_connections(classify_connection_recv_error(&error));
assert_eq!(handle.close_reason(), Some(expected_reason));
let driver_events = driver_events.lock().expect("driver events mutex poisoned");
// The control lane must always be reported closed with the expected reason.
assert!(driver_events.iter().any(|event| matches!(
event,
DriverEvent::LaneClosed {
lane_id: LaneId::CONTROL,
reason
} if *reason == expected_reason
)));
// A payload decode error is reported exactly when one is expected.
assert_eq!(
driver_events.iter().any(|event| matches!(
event,
DriverEvent::DecodeError {
lane_id: LaneId::CONTROL,
kind: DecodeErrorKind::Payload,
}
)),
expect_decode_error
);
let transport_events = transport_events
.lock()
.expect("transport events mutex poisoned");
// A transport-level close event is expected only for non-decode errors.
assert_eq!(
transport_events.iter().any(|event| matches!(
event,
TransportEvent::Closed {
lane_id: None,
reason
} if *reason == expected_reason
)),
!expect_decode_error
);
}
run_case(
std::io::Error::other("decode error: invalid Message payload"),
ConnectionCloseReason::Protocol,
true,
);
run_case(
std::io::Error::other("connection reset by peer"),
ConnectionCloseReason::Transport,
false,
);
}
/// Sends two calls to the same method and checks that an `Args` schema
/// message precedes the first call only, and that neither call carries
/// inline schemas.
#[tokio::test]
async fn caller_schema_exchange_sends_binding_once_before_request() {
use facet::Facet;
let sent = Arc::new(Mutex::new(Vec::new()));
let (_connection, handle) = make_capturing_connection(Arc::clone(&sent));
let method_id = vox_types::MethodId(700);
let first_arg = 42_u32;
handle
.sender
.send(ConnectionMessage::Request(RequestMessage {
id: RequestId(1),
body: RequestBody::Call(RequestCall {
method_id,
channels: Vec::new(),
metadata: Metadata::default(),
args: Payload::outgoing(&first_arg),
schemas: Default::default(),
}),
}))
.await
.expect("first call send");
// Scope the lock so it is released before the second send.
{
let captured = sent.lock().expect("captured message mutex poisoned");
// First call: schema message followed by the call itself.
assert_eq!(captured.len(), 2);
assert_eq!(captured[0].lane_id, LaneId::CONTROL);
match &captured[0].payload {
CapturedPayload::Schema {
method_id: actual_method_id,
direction,
schemas,
} => {
assert_eq!(*actual_method_id, method_id);
assert_eq!(*direction, BindingDirection::Args);
let parsed =
vox_phon::parse_schema_bytes(schemas).expect("parse args schema binding");
let expected_root =
vox_phon::schema_id_for_shape(<u32 as Facet>::SHAPE).expect("u32 root");
assert_eq!(parsed.root, expected_root);
}
other => panic!("expected schema message before request, got {other:?}"),
}
assert_eq!(captured[1].lane_id, LaneId::CONTROL);
match &captured[1].payload {
CapturedPayload::Call {
request_id,
method_id: actual_method_id,
schemas_len,
} => {
assert_eq!(*request_id, RequestId(1));
assert_eq!(*actual_method_id, method_id);
assert_eq!(*schemas_len, 0);
}
other => panic!("expected request after schema message, got {other:?}"),
}
}
let second_arg = 43_u32;
handle
.sender
.send(ConnectionMessage::Request(RequestMessage {
id: RequestId(3),
body: RequestBody::Call(RequestCall {
method_id,
channels: Vec::new(),
metadata: Metadata::default(),
args: Payload::outgoing(&second_arg),
schemas: Default::default(),
}),
}))
.await
.expect("second call send");
let captured = sent.lock().expect("captured message mutex poisoned");
// Second call: exactly one more message, and it is the call.
assert_eq!(captured.len(), 3);
match &captured[2].payload {
CapturedPayload::Call {
request_id,
method_id: actual_method_id,
schemas_len,
} => {
assert_eq!(*request_id, RequestId(3));
assert_eq!(*actual_method_id, method_id);
assert_eq!(*schemas_len, 0);
}
other => panic!("expected second request without schema resend, got {other:?}"),
}
}
/// Sends two responses for the same method and checks that a `Response`
/// schema message precedes the first response only, and that neither
/// response carries inline schemas.
#[tokio::test]
async fn callee_schema_exchange_sends_binding_once_before_response() {
use facet::Facet;
let sent = Arc::new(Mutex::new(Vec::new()));
let (_connection, handle) = make_capturing_connection(Arc::clone(&sent));
let method_id = vox_types::MethodId(701);
let request_id = RequestId(11);
// Record the incoming call first so the response can be matched to its
// method.
handle
.sender
.connection_core
.record_incoming_call(LaneId::CONTROL, request_id, method_id);
let first_response: Result<u32, vox_types::VoxError<core::convert::Infallible>> = Ok(99);
handle
.sender
.send_response(
request_id,
RequestResponse {
metadata: Metadata::default(),
ret: Payload::outgoing(&first_response),
schemas: Default::default(),
},
)
.await
.expect("first response send");
// Scope the lock so it is released before the second send.
{
let captured = sent.lock().expect("captured message mutex poisoned");
// First response: schema message followed by the response itself.
assert_eq!(captured.len(), 2);
assert_eq!(captured[0].lane_id, LaneId::CONTROL);
match &captured[0].payload {
CapturedPayload::Schema {
method_id: actual_method_id,
direction,
schemas,
} => {
assert_eq!(*actual_method_id, method_id);
assert_eq!(*direction, BindingDirection::Response);
let parsed = vox_phon::parse_schema_bytes(schemas)
.expect("parse response schema binding");
let expected_root = vox_phon::schema_id_for_shape(
<Result<
u32,
vox_types::VoxError<core::convert::Infallible>,
> as Facet>::SHAPE,
)
.expect("response root");
assert_eq!(parsed.root, expected_root);
}
other => panic!("expected schema message before response, got {other:?}"),
}
assert_eq!(captured[1].lane_id, LaneId::CONTROL);
match &captured[1].payload {
CapturedPayload::Response {
request_id: actual_request_id,
schemas_len,
} => {
assert_eq!(*actual_request_id, request_id);
assert_eq!(*schemas_len, 0);
}
other => panic!("expected response after schema message, got {other:?}"),
}
}
let second_request_id = RequestId(13);
handle.sender.connection_core.record_incoming_call(
LaneId::CONTROL,
second_request_id,
method_id,
);
let second_response: Result<u32, vox_types::VoxError<core::convert::Infallible>> = Ok(100);
handle
.sender
.send_response(
second_request_id,
RequestResponse {
metadata: Metadata::default(),
ret: Payload::outgoing(&second_response),
schemas: Default::default(),
},
)
.await
.expect("second response send");
let captured = sent.lock().expect("captured message mutex poisoned");
// Second response: exactly one more message, and it is the response.
assert_eq!(captured.len(), 3);
match &captured[2].payload {
CapturedPayload::Response {
request_id: actual_request_id,
schemas_len,
} => {
assert_eq!(*actual_request_id, second_request_id);
assert_eq!(*schemas_len, 0);
}
other => panic!("expected second response without schema resend, got {other:?}"),
}
}
/// A second accept for an already accepted lane must leave the active lane
/// state in place.
#[tokio::test]
async fn duplicate_connection_accept_is_ignored_after_first() {
    let conn_id = LaneId(1);
    let mut connection = make_connection();
    let (result_tx, result_rx) = vox_rt::sync::oneshot::channel("connection.test.open_result");
    let pending = PendingOutboundData {
        local_settings: ConnectionSettings {
            parity: Parity::Odd,
            max_concurrent_requests: 64,
            initial_channel_credit: 16,
        },
        establishment_started_at: Instant::now(),
        result_tx: Some(result_tx),
    };
    connection
        .conns
        .insert(conn_id, ConnectionSlot::PendingOutbound(pending));

    // First accept resolves the pending open.
    connection.handle_inbound_accept(conn_id, accept_ref());
    let opened = result_rx
        .await
        .expect("pending outbound result should resolve")
        .expect("accept should resolve as Ok");
    assert_eq!(opened.lane_id(), conn_id);

    // Second accept must not disturb the active slot.
    connection.handle_inbound_accept(conn_id, accept_ref());
    let still_active = matches!(
        connection.conns.get(&conn_id),
        Some(ConnectionSlot::Active(LaneState { id, .. })) if *id == conn_id
    );
    assert!(
        still_active,
        "duplicate accept should keep existing active connection state"
    );
}
/// A second reject for an already rejected lane must not bring the lane's
/// slot back.
#[tokio::test]
async fn duplicate_connection_reject_is_ignored_after_first() {
    let conn_id = LaneId(1);
    let mut connection = make_connection();
    let (result_tx, result_rx) = vox_rt::sync::oneshot::channel("connection.test.open_result");
    let pending = PendingOutboundData {
        local_settings: ConnectionSettings {
            parity: Parity::Odd,
            max_concurrent_requests: 64,
            initial_channel_credit: 16,
        },
        establishment_started_at: Instant::now(),
        result_tx: Some(result_tx),
    };
    connection
        .conns
        .insert(conn_id, ConnectionSlot::PendingOutbound(pending));

    // First reject resolves the pending open with a rejection.
    connection.handle_inbound_reject(conn_id, reject_ref());
    let result = result_rx
        .await
        .expect("pending outbound result should resolve");
    assert!(
        matches!(result, Err(ConnectionError::Rejected(_))),
        "expected rejection, got: {result:?}"
    );

    // Second reject must leave the table without an entry for the lane.
    connection.handle_inbound_reject(conn_id, reject_ref());
    let slot_present = connection.conns.contains_key(&conn_id);
    assert!(
        !slot_present,
        "duplicate reject should not recreate connection state"
    );
}
/// An accept advertising zero initial channel credit must fail the pending
/// open with a protocol error and leave no slot behind.
#[tokio::test]
async fn inbound_accept_with_zero_initial_credit_rejects_pending_open() {
    let conn_id = LaneId(1);
    let mut connection = make_connection();
    let (result_tx, result_rx) = vox_rt::sync::oneshot::channel("connection.test.open_result");
    let pending = PendingOutboundData {
        local_settings: ConnectionSettings {
            parity: Parity::Odd,
            max_concurrent_requests: 64,
            initial_channel_credit: 16,
        },
        establishment_started_at: Instant::now(),
        result_tx: Some(result_tx),
    };
    connection
        .conns
        .insert(conn_id, ConnectionSlot::PendingOutbound(pending));

    connection.handle_inbound_accept(conn_id, zero_credit_accept_ref());
    let result = result_rx
        .await
        .expect("pending outbound result should resolve");
    assert!(
        matches!(
            result,
            Err(ConnectionError::Protocol(ref message))
                if message == "initial_channel_credit must be greater than zero"
        ),
        "expected zero-credit protocol error, got: {result:?}"
    );
    let slot_present = connection.conns.contains_key(&conn_id);
    assert!(
        !slot_present,
        "zero-credit accept should not create an active connection"
    );
}
/// An accept or reject for a lane with no pending open must leave the lane
/// table empty.
#[test]
fn out_of_order_accept_or_reject_without_pending_is_ignored() {
    let unknown_lane = LaneId(99);
    let mut connection = make_connection();
    connection.handle_inbound_accept(unknown_lane, accept_ref());
    connection.handle_inbound_reject(unknown_lane, reject_ref());
    let table_is_empty = connection.conns.is_empty();
    assert!(
        table_is_empty,
        "out-of-order accept/reject should not mutate empty connection table"
    );
}
/// Closing a lane that is still waiting for its accept must succeed and drop
/// the pending open's result sender.
#[tokio::test]
async fn close_request_clears_pending_outbound_open() {
    let conn_id = LaneId(1);
    let mut connection = make_connection();
    let (open_result_tx, open_result_rx) =
        vox_rt::sync::oneshot::channel("connection.open.result");
    let (close_result_tx, close_result_rx) =
        vox_rt::sync::oneshot::channel("connection.close.result");
    let pending = PendingOutboundData {
        local_settings: ConnectionSettings {
            parity: Parity::Odd,
            max_concurrent_requests: 64,
            initial_channel_credit: 16,
        },
        establishment_started_at: Instant::now(),
        result_tx: Some(open_result_tx),
    };
    connection
        .conns
        .insert(conn_id, ConnectionSlot::PendingOutbound(pending));

    let close_request = CloseRequest {
        conn_id,
        metadata: vox_types::Metadata::default(),
        result_tx: close_result_tx,
    };
    connection.handle_close_request(close_request).await;

    let close_result = close_result_rx
        .await
        .expect("close result should be delivered");
    assert!(
        close_result.is_ok(),
        "close should succeed for pending outbound connection"
    );
    let open_result = open_result_rx.await;
    assert!(
        open_result.is_err(),
        "pending open result channel should be closed once the pending slot is removed"
    );
}
}